A Look at Elasticsearch's NodesSniffer

Category: Backend · Published: 4 years ago

This article takes a look at Elasticsearch's NodesSniffer.

NodesSniffer

elasticsearch-7.0.1/client/sniffer/src/main/java/org/elasticsearch/client/sniff/NodesSniffer.java

/**
 * Responsible for sniffing the http hosts
 */
public interface NodesSniffer {
    /**
     * Returns the sniffed Elasticsearch nodes.
     */
    List<Node> sniff() throws IOException;
}
  • The NodesSniffer interface defines a single sniff method that returns the sniffed Elasticsearch nodes; it has an implementation class, ElasticsearchNodesSniffer

ElasticsearchNodesSniffer

elasticsearch-7.0.1/client/sniffer/src/main/java/org/elasticsearch/client/sniff/ElasticsearchNodesSniffer.java

public final class ElasticsearchNodesSniffer implements NodesSniffer {

    private static final Log logger = LogFactory.getLog(ElasticsearchNodesSniffer.class);

    public static final long DEFAULT_SNIFF_REQUEST_TIMEOUT = TimeUnit.SECONDS.toMillis(1);

    private final RestClient restClient;
    private final Request request;
    private final Scheme scheme;
    private final JsonFactory jsonFactory = new JsonFactory();

    public ElasticsearchNodesSniffer(RestClient restClient) {
        this(restClient, DEFAULT_SNIFF_REQUEST_TIMEOUT, ElasticsearchNodesSniffer.Scheme.HTTP);
    }

    public ElasticsearchNodesSniffer(RestClient restClient, long sniffRequestTimeoutMillis, Scheme scheme) {
        this.restClient = Objects.requireNonNull(restClient, "restClient cannot be null");
        if (sniffRequestTimeoutMillis < 0) {
            throw new IllegalArgumentException("sniffRequestTimeoutMillis must be greater than 0");
        }
        this.request = new Request("GET", "/_nodes/http");
        request.addParameter("timeout", sniffRequestTimeoutMillis + "ms");
        this.scheme = Objects.requireNonNull(scheme, "scheme cannot be null");
    }

    /**
     * Calls the elasticsearch nodes info api, parses the response and returns all the found http hosts
     */
    @Override
    public List<Node> sniff() throws IOException {
        Response response = restClient.performRequest(request);
        return readHosts(response.getEntity(), scheme, jsonFactory);
    }

    static List<Node> readHosts(HttpEntity entity, Scheme scheme, JsonFactory jsonFactory) throws IOException {
        try (InputStream inputStream = entity.getContent()) {
            JsonParser parser = jsonFactory.createParser(inputStream);
            if (parser.nextToken() != JsonToken.START_OBJECT) {
                throw new IOException("expected data to start with an object");
            }
            List<Node> nodes = new ArrayList<>();
            while (parser.nextToken() != JsonToken.END_OBJECT) {
                if (parser.getCurrentToken() == JsonToken.START_OBJECT) {
                    if ("nodes".equals(parser.getCurrentName())) {
                        while (parser.nextToken() != JsonToken.END_OBJECT) {
                            JsonToken token = parser.nextToken();
                            assert token == JsonToken.START_OBJECT;
                            String nodeId = parser.getCurrentName();
                            Node node = readNode(nodeId, parser, scheme);
                            if (node != null) {
                                nodes.add(node);
                            }
                        }
                    } else {
                        parser.skipChildren();
                    }
                }
            }
            return nodes;
        }
    }

    //......

    public enum Scheme {
        HTTP("http"), HTTPS("https");

        private final String name;

        Scheme(String name) {
            this.name = name;
        }

        @Override
        public String toString() {
            return name;
        }
    }

}
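  • The ElasticsearchNodesSniffer constructor takes three parameters: restClient, sniffRequestTimeoutMillis, and scheme. sniffRequestTimeoutMillis defaults to 1 second and scheme defaults to HTTP. The constructor builds a GET /_nodes/http request and attaches the timeout to it as the timeout parameter, in milliseconds. The sniff method executes that request with restClient.performRequest and then calls readHosts to parse the response; readHosts walks the nodes object and calls readNode for each entry, dropping entries for which readNode returns null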
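
To make the constructor's validation and the timeout parameter concrete, here is a minimal stand-alone sketch that mirrors just those two lines without needing the Elasticsearch client on the classpath. The class name SniffTimeoutSketch and the method timeoutParameter are hypothetical names introduced for illustration; only the check and the "ms" suffix are taken from the constructor shown above.

```java
import java.util.concurrent.TimeUnit;

/** Hypothetical stand-alone sketch; not part of the Elasticsearch client. */
final class SniffTimeoutSketch {

    /** Same value as DEFAULT_SNIFF_REQUEST_TIMEOUT in the excerpt: one second, in milliseconds. */
    static final long DEFAULT_TIMEOUT_MILLIS = TimeUnit.SECONDS.toMillis(1);

    private SniffTimeoutSketch() {
    }

    /**
     * Applies the same check as the constructor and renders the value the way
     * it is attached to the request as the "timeout" parameter.
     */
    static String timeoutParameter(long sniffRequestTimeoutMillis) {
        // Same condition as the excerpt: only negative values are rejected,
        // so 0 passes even though the message says "greater than 0".
        if (sniffRequestTimeoutMillis < 0) {
            throw new IllegalArgumentException("sniffRequestTimeoutMillis must be greater than 0");
        }
        return sniffRequestTimeoutMillis + "ms";
    }
}
```

With the default, timeoutParameter(DEFAULT_TIMEOUT_MILLIS) yields 1000ms, so the sniff request carries timeout=1000ms. Note that the check only rejects negative values, which is slightly looser than its error message suggests.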

GET /_nodes/http example

{
  "_nodes" : {
    "total" : 1,
    "successful" : 1,
    "failed" : 0
  },
  "cluster_name" : "docker-cluster",
  "nodes" : {
    "d7w2wdw7Q7SqERe5_fxZYA" : {
      "name" : "d7w2wdw",
      "transport_address" : "172.17.0.2:9300",
      "host" : "172.17.0.2",
      "ip" : "172.17.0.2",
      "version" : "6.6.2",
      "build_flavor" : "oss",
      "build_type" : "tar",
      "build_hash" : "3bd3e59",
      "roles" : [
        "master",
        "data",
        "ingest"
      ],
      "http" : {
        "bound_address" : [
          "0.0.0.0:9200"
        ],
        "publish_address" : "192.168.99.100:9200",
        "max_content_length_in_bytes" : 104857600
      }
    }
  }
}
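  • Note that the http section carries the publish_address (along with bound_address); this is what the sniffer uses as the node's HTTP host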
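
As a rough illustration of how an address string such as the publish_address above becomes a host and port, the sketch below repeats the URI.create(scheme + "://" + address) step that the sniffer applies, using only java.net.URI. The class PublishAddressSketch and its toUri method are hypothetical; the real code goes one step further and wraps the result in an Apache HttpHost, which is left out here to keep the example dependency-free.

```java
import java.net.URI;

/** Hypothetical stand-alone sketch; not part of the Elasticsearch client. */
final class PublishAddressSketch {

    private PublishAddressSketch() {
    }

    /**
     * Prefixes the configured scheme and parses the "host:port" string as a URI,
     * so that host, port, and scheme can be read back individually.
     */
    static URI toUri(String scheme, String address) {
        return URI.create(scheme + "://" + address);
    }
}
```

For the sample response, toUri("http", "192.168.99.100:9200") gives a URI whose getHost() is 192.168.99.100 and whose getPort() is 9200. This assumes the address is a plain host:port string as in the sample; any other shape would need extra handling that this sketch does not attempt.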

readNode

elasticsearch-7.0.1/client/sniffer/src/main/java/org/elasticsearch/client/sniff/ElasticsearchNodesSniffer.java

public final class ElasticsearchNodesSniffer implements NodesSniffer {
    //......

    private static Node readNode(String nodeId, JsonParser parser, Scheme scheme) throws IOException {
        HttpHost publishedHost = null;
        /*
         * We sniff the bound hosts so we can look up the node based on any
         * address on which it is listening. This is useful in Elasticsearch's
         * test framework where we sometimes publish ipv6 addresses but the
         * tests contact the node on ipv4.
         */
        Set<HttpHost> boundHosts = new HashSet<>();
        String name = null;
        String version = null;
        /*
         * Multi-valued attributes come with key = `real_key.index` and we
         * unflip them after reading them because we can't rely on the order
         * that they arrive.
         */
        final Map<String, String> protoAttributes = new HashMap<String, String>();

        boolean sawRoles = false;
        boolean master = false;
        boolean data = false;
        boolean ingest = false;

        String fieldName = null;
        while (parser.nextToken() != JsonToken.END_OBJECT) {
            if (parser.getCurrentToken() == JsonToken.FIELD_NAME) {
                fieldName = parser.getCurrentName();
            } else if (parser.getCurrentToken() == JsonToken.START_OBJECT) {
                if ("http".equals(fieldName)) {
                    while (parser.nextToken() != JsonToken.END_OBJECT) {
                        if (parser.getCurrentToken() == JsonToken.VALUE_STRING && "publish_address".equals(parser.getCurrentName())) {
                            URI publishAddressAsURI = URI.create(scheme + "://" + parser.getValueAsString());
                            publishedHost = new HttpHost(publishAddressAsURI.getHost(), publishAddressAsURI.getPort(),
                                    publishAddressAsURI.getScheme());
                        } else if (parser.currentToken() == JsonToken.START_ARRAY && "bound_address".equals(parser.getCurrentName())) {
                            while (parser.nextToken() != JsonToken.END_ARRAY) {
                                URI boundAddressAsURI = URI.create(scheme + "://" + parser.getValueAsString());
                                boundHosts.add(new HttpHost(boundAddressAsURI.getHost(), boundAddressAsURI.getPort(),
                                        boundAddressAsURI.getScheme()));
                            }
                        } else if (parser.getCurrentToken() == JsonToken.START_OBJECT) {
                            parser.skipChildren();
                        }
                    }
                } else if ("attributes".equals(fieldName)) {
                    while (parser.nextToken() != JsonToken.END_OBJECT) {
                        if (parser.getCurrentToken() == JsonToken.VALUE_STRING) {
                            String oldValue = protoAttributes.put(parser.getCurrentName(), parser.getValueAsString());
                            if (oldValue != null) {
                                throw new IOException("repeated attribute key [" + parser.getCurrentName() + "]");
                            }
                        } else {
                            parser.skipChildren();
                        }
                    }
                } else {
                    parser.skipChildren();
                }
            } else if (parser.currentToken() == JsonToken.START_ARRAY) {
                if ("roles".equals(fieldName)) {
                    sawRoles = true;
                    while (parser.nextToken() != JsonToken.END_ARRAY) {
                        switch (parser.getText()) {
                        case "master":
                            master = true;
                            break;
                        case "data":
                            data = true;
                            break;
                        case "ingest":
                            ingest = true;
                            break;
                        default:
                            logger.warn("unknown role [" + parser.getText() + "] on node [" + nodeId + "]");
                        }
                    }
                } else {
                    parser.skipChildren();
                }
            } else if (parser.currentToken().isScalarValue()) {
                if ("version".equals(fieldName)) {
                    version = parser.getText();
                } else if ("name".equals(fieldName)) {
                    name = parser.getText();
                }
            }
        }
        //http section is not present if http is not enabled on the node, ignore such nodes
        if (publishedHost == null) {
            logger.debug("skipping node [" + nodeId + "] with http disabled");
            return null;
        }

        Map<String, List<String>> realAttributes = new HashMap<>(protoAttributes.size());
        List<String> keys = new ArrayList<>(protoAttributes.keySet());
        for (String key : keys) {
            if (key.endsWith(".0")) {
                String realKey = key.substring(0, key.length() - 2);
                List<String> values = new ArrayList<>();
                int i = 0;
                while (true) {
                    String value = protoAttributes.remove(realKey + "." + i);
                    if (value == null) {
                        break;
                    }
                    values.add(value);
                    i++;
                }
                realAttributes.put(realKey, unmodifiableList(values));
            }
        }
        for (Map.Entry<String, String> entry : protoAttributes.entrySet()) {
            realAttributes.put(entry.getKey(), singletonList(entry.getValue()));
        }

        if (version.startsWith("2.")) {
            /*
             * 2.x doesn't send roles, instead we try to read them from
             * attributes.
             */
            boolean clientAttribute = v2RoleAttributeValue(realAttributes, "client", false);
            Boolean masterAttribute = v2RoleAttributeValue(realAttributes, "master", null);
            Boolean dataAttribute = v2RoleAttributeValue(realAttributes, "data", null);
            master = masterAttribute == null ? false == clientAttribute : masterAttribute;
            data = dataAttribute == null ? false == clientAttribute : dataAttribute;
        } else {
            assert sawRoles : "didn't see roles for [" + nodeId + "]";
        }
        assert boundHosts.contains(publishedHost) :
                "[" + nodeId + "] doesn't make sense! publishedHost should be in boundHosts";
        logger.trace("adding node [" + nodeId + "]");
        return new Node(publishedHost, boundHosts, name, version, new Roles(master, data, ingest),
                unmodifiableMap(realAttributes));
    }

    /**
     * Returns {@code defaultValue} if the attribute didn't come back,
     * {@code true} or {@code false} if it did come back as
     * either of those, or throws an IOException if the attribute
     * came back in a strange way.
     */
    private static Boolean v2RoleAttributeValue(Map<String, List<String>> attributes,
            String name, Boolean defaultValue) throws IOException {
        List<String> valueList = attributes.remove(name);
        if (valueList == null) {
            return defaultValue;
        }
        if (valueList.size() != 1) {
            throw new IOException("expected only a single attribute value for [" + name + "] but got "
                    + valueList);
        }
        switch (valueList.get(0)) {
        case "true":
            return true;
        case "false":
            return false;
        default:
            throw new IOException("expected [" + name + "] to be either [true] or [false] but was ["
                    + valueList.get(0) + "]");
        }
    }

    //......
}
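  • readNode parses a single entry under nodes: it reads the http, attributes, roles, version, and name fields. A node without an http section is skipped (readNode returns null). Multi-valued attributes that arrive as key.0, key.1, ... are merged back into lists. Nodes whose version starts with "2." get special handling: their master and data roles are derived from the client, master, and data attributes. Finally it builds and returns a Node from publishedHost, boundHosts, name, version, the roles (master, data, ingest), and realAttributes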
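
The two less obvious steps in readNode, merging indexed attribute keys and falling back to attributes for 2.x roles, can be sketched in isolation as follows. This is a simplified stand-alone version under stated assumptions: ReadNodeSketch, unflatten, and v2Role are hypothetical names, and unlike the original the sketch copies its input map instead of mutating it.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical stand-alone sketch; not part of the Elasticsearch client. */
final class ReadNodeSketch {

    private ReadNodeSketch() {
    }

    /**
     * Merges flat keys such as "rack.0", "rack.1" into "rack" -> [v0, v1].
     * Keys without an index suffix become single-element lists.
     */
    static Map<String, List<String>> unflatten(Map<String, String> flat) {
        // Work on a copy so the caller's map is left untouched.
        Map<String, String> remaining = new HashMap<>(flat);
        Map<String, List<String>> result = new HashMap<>();
        for (String key : new ArrayList<>(remaining.keySet())) {
            if (key.endsWith(".0")) {
                String realKey = key.substring(0, key.length() - 2);
                List<String> values = new ArrayList<>();
                // Pull realKey.0, realKey.1, ... until an index is missing.
                for (int i = 0; ; i++) {
                    String value = remaining.remove(realKey + "." + i);
                    if (value == null) {
                        break;
                    }
                    values.add(value);
                }
                result.put(realKey, Collections.unmodifiableList(values));
            }
        }
        // Whatever is left was never part of an indexed group.
        for (Map.Entry<String, String> entry : remaining.entrySet()) {
            result.put(entry.getKey(), Collections.singletonList(entry.getValue()));
        }
        return result;
    }

    /**
     * 2.x fallback: an explicit role attribute wins; otherwise the role is
     * assumed unless the node is marked as a client node.
     */
    static boolean v2Role(Boolean roleAttribute, boolean clientAttribute) {
        return roleAttribute == null ? !clientAttribute : roleAttribute;
    }
}
```

For example, the flat attributes rack.0=r1, rack.1=r2, zone=z1 come out as rack=[r1, r2] and zone=[z1]. For a 2.x node with no master attribute, v2Role(null, false) is true, while the same call with clientAttribute set to true is false.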

Summary

  • The NodesSniffer interface defines the sniff method for obtaining the sniffed Elasticsearch nodes; it has an implementation class, ElasticsearchNodesSniffer
  • The ElasticsearchNodesSniffer constructor takes restClient, sniffRequestTimeoutMillis (default 1 second), and scheme (default HTTP) and builds a GET /_nodes/http request; sniff executes it with restClient.performRequest and hands the response to readHosts, which calls readNode for each entry under nodes
  • readNode parses the http, attributes, roles, version, and name fields of a node, applies special handling for 2.x versions, and builds the Node from publishedHost, boundHosts, name, version, the roles (master, data, ingest), and realAttributes
