聊聊Elasticsearch RestClient的DeadHostState

栏目: 后端 · 发布时间: 4年前

内容简介:本文主要研究一下Elasticsearch RestClient的DeadHostStateelasticsearch-7.0.1/client/rest/src/main/java/org/elasticsearch/client/DeadHostState.javaelasticsearch-7.0.1/client/rest/src/main/java/org/elasticsearch/client/RestClient.java

本文主要研究一下Elasticsearch RestClient的DeadHostState

DeadHostState

elasticsearch-7.0.1/client/rest/src/main/java/org/elasticsearch/client/DeadHostState.java

/**
 * Holds the state of a dead connection to a host. Keeps track of how many failed attempts were performed and
 * when the host should be retried (based on number of previous failed attempts).
 * Class is immutable, a new copy of it should be created each time the state has to be changed.
 */
final class DeadHostState implements Comparable<DeadHostState> {

    private static final long MIN_CONNECTION_TIMEOUT_NANOS = TimeUnit.MINUTES.toNanos(1);
    static final long MAX_CONNECTION_TIMEOUT_NANOS = TimeUnit.MINUTES.toNanos(30);

    private final int failedAttempts;
    private final long deadUntilNanos;
    private final TimeSupplier timeSupplier;

    /**
     * Build the initial dead state of a host. Useful when a working host stops functioning
     * and needs to be marked dead after its first failure. In such case the host will be retried after a minute or so.
     *
     * @param timeSupplier a way to supply the current time and allow for unit testing
     */
    DeadHostState(TimeSupplier timeSupplier) {
        this.failedAttempts = 1;
        this.deadUntilNanos = timeSupplier.nanoTime() + MIN_CONNECTION_TIMEOUT_NANOS;
        this.timeSupplier = timeSupplier;
    }

    /**
     * Build the dead state of a host given its previous dead state. Useful when a host has been failing before, hence
     * it already failed for one or more consecutive times. The more failed attempts we register the longer we wait
     * to retry that same host again. Minimum is 1 minute (for a node the only failed once created
     * through {@link #DeadHostState(TimeSupplier)}), maximum is 30 minutes (for a node that failed more than 10 consecutive times)
     *
     * @param previousDeadHostState the previous state of the host which allows us to increase the wait till the next retry attempt
     */
    DeadHostState(DeadHostState previousDeadHostState) {
        long timeoutNanos = (long)Math.min(MIN_CONNECTION_TIMEOUT_NANOS * 2 * Math.pow(2, previousDeadHostState.failedAttempts * 0.5 - 1),
                MAX_CONNECTION_TIMEOUT_NANOS);
        this.deadUntilNanos = previousDeadHostState.timeSupplier.nanoTime() + timeoutNanos;
        this.failedAttempts = previousDeadHostState.failedAttempts + 1;
        this.timeSupplier = previousDeadHostState.timeSupplier;
    }

    /**
     * Indicates whether it's time to retry to failed host or not.
     *
     * @return true if the host should be retried, false otherwise
     */
    boolean shallBeRetried() {
        return timeSupplier.nanoTime() - deadUntilNanos > 0;
    }

    /**
     * Returns the timestamp (nanos) till the host is supposed to stay dead without being retried.
     * After that the host should be retried.
     */
    long getDeadUntilNanos() {
        return deadUntilNanos;
    }

    int getFailedAttempts() {
        return failedAttempts;
    }

    @Override
    public int compareTo(DeadHostState other) {
        if (timeSupplier != other.timeSupplier) {
            throw new IllegalArgumentException("can't compare DeadHostStates with different clocks ["
                    + timeSupplier + " != " + other.timeSupplier + "]");
        }
        return Long.compare(deadUntilNanos, other.deadUntilNanos);
    }

    @Override
    public String toString() {
        return "DeadHostState{" +
                "failedAttempts=" + failedAttempts +
                ", deadUntilNanos=" + deadUntilNanos +
                ", timeSupplier=" + timeSupplier +
                '}';
    }

    /**
     * Time supplier that makes timing aspects pluggable to ease testing
     */
    interface TimeSupplier {
        TimeSupplier DEFAULT = new TimeSupplier() {
            @Override
            public long nanoTime() {
                return System.nanoTime();
            }

            @Override
            public String toString() {
                return "nanoTime";
            }
        };

        long nanoTime();
    }
}
(long)Math.min(MIN_CONNECTION_TIMEOUT_NANOS * 2 * Math.pow(2, previousDeadHostState.failedAttempts * 0.5 - 1)
timeSupplier.nanoTime() - deadUntilNanos > 0

RestClient

elasticsearch-7.0.1/client/rest/src/main/java/org/elasticsearch/client/RestClient.java

public class RestClient implements Closeable {
    //......

    /**
     * Select nodes to try and sorts them so that the first one will be tried initially, then the following ones
     * if the previous attempt failed and so on. Package private for testing.
     */
    static Iterable<Node> selectNodes(NodeTuple<List<Node>> nodeTuple, Map<HttpHost, DeadHostState> blacklist,
                                      AtomicInteger lastNodeIndex, NodeSelector nodeSelector) throws IOException {
        /*
         * Sort the nodes into living and dead lists.
         */
        List<Node> livingNodes = new ArrayList<>(Math.max(0, nodeTuple.nodes.size() - blacklist.size()));
        List<DeadNode> deadNodes = new ArrayList<>(blacklist.size());
        for (Node node : nodeTuple.nodes) {
            DeadHostState deadness = blacklist.get(node.getHost());
            if (deadness == null) {
                livingNodes.add(node);
                continue;
            }
            if (deadness.shallBeRetried()) {
                livingNodes.add(node);
                continue;
            }
            deadNodes.add(new DeadNode(node, deadness));
        }

        if (false == livingNodes.isEmpty()) {
            /*
             * Normal state: there is at least one living node. If the
             * selector is ok with any over the living nodes then use them
             * for the request.
             */
            List<Node> selectedLivingNodes = new ArrayList<>(livingNodes);
            nodeSelector.select(selectedLivingNodes);
            if (false == selectedLivingNodes.isEmpty()) {
                /*
                 * Rotate the list using a global counter as the distance so subsequent
                 * requests will try the nodes in a different order.
                 */
                Collections.rotate(selectedLivingNodes, lastNodeIndex.getAndIncrement());
                return selectedLivingNodes;
            }
        }

        /*
         * Last resort: there are no good nodes to use, either because
         * the selector rejected all the living nodes or because there aren't
         * any living ones. Either way, we want to revive a single dead node
         * that the NodeSelectors are OK with. We do this by passing the dead
         * nodes through the NodeSelector so it can have its say in which nodes
         * are ok. If the selector is ok with any of the nodes then we will take
         * the one in the list that has the lowest revival time and try it.
         */
        if (false == deadNodes.isEmpty()) {
            final List<DeadNode> selectedDeadNodes = new ArrayList<>(deadNodes);
            /*
             * We'd like NodeSelectors to remove items directly from deadNodes
             * so we can find the minimum after it is filtered without having
             * to compare many things. This saves us a sort on the unfiltered
             * list.
             */
            nodeSelector.select(new Iterable<Node>() {
                @Override
                public Iterator<Node> iterator() {
                    return new DeadNodeIteratorAdapter(selectedDeadNodes.iterator());
                }
            });
            if (false == selectedDeadNodes.isEmpty()) {
                return singletonList(Collections.min(selectedDeadNodes).node);
            }
        }
        throw new IOException("NodeSelector [" + nodeSelector + "] rejected all nodes, "
                + "living " + livingNodes + " and dead " + deadNodes);
    }

    /**
     * Called after each successful request call.
     * Receives as an argument the host that was used for the successful request.
     */
    private void onResponse(Node node) {
        DeadHostState removedHost = this.blacklist.remove(node.getHost());
        if (logger.isDebugEnabled() && removedHost != null) {
            logger.debug("removed [" + node + "] from blacklist");
        }
    }

    /**
     * Called after each failed attempt.
     * Receives as an argument the host that was used for the failed attempt.
     */
    private void onFailure(Node node) {
        while(true) {
            DeadHostState previousDeadHostState =
                blacklist.putIfAbsent(node.getHost(), new DeadHostState(TimeSupplier.DEFAULT));
            if (previousDeadHostState == null) {
                if (logger.isDebugEnabled()) {
                    logger.debug("added [" + node + "] to blacklist");
                }
                break;
            }
            if (blacklist.replace(node.getHost(), previousDeadHostState,
                    new DeadHostState(previousDeadHostState))) {
                if (logger.isDebugEnabled()) {
                    logger.debug("updated [" + node + "] already in blacklist");
                }
                break;
            }
        }
        failureListener.onFailure(node);
    }

    //......
}
  • RestClient的selectNodes方法首先将不在blacklist中的node,或者在blacklist中但是shallBeRetried返回为true的node归为livingNodes,之后通过nodeSelector来从这些livingNodes中选择一个node
  • RestClient的onResponse方法会将该node的host从blacklist中移除
  • RestClient的onFailure方法会往blacklist创建或更新host对应的DeadHostState,如果之前该host没有DeadHostState则使用TimeSupplier.DEFAULT创建一个新的并放入blacklist,如果该host已经有DeadHostState则使用该DeadHostState创建新的DeadHostState然后更新到blacklist中

小结

timeSupplier.nanoTime() - deadUntilNanos > 0
(long)Math.min(MIN_CONNECTION_TIMEOUT_NANOS * 2 * Math.pow(2, previousDeadHostState.failedAttempts * 0.5 - 1)

doc


以上所述就是小编给大家介绍的《聊聊Elasticsearch RestClient的DeadHostState》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

解密搜索引擎技术实战

解密搜索引擎技术实战

罗刚 / 2011-6 / 69.80元

《解密搜索引擎技术实战-Lucene&Java精华版(附盘)》,本书主要包括总体介绍部分、爬虫部分、自然语言处理部分、全文检索部分以及相关案例分析。爬虫部分介绍了网页遍历方法和如何实现增量抓取,并介绍了从网页等各种格式的文档中提取主要内容的方法。自然语言处理部分从统计机器学习的原理出发,包括了中文分词与词性标注的理论与实现以及在搜索引擎中的实用等细节,同时对文档排重、文本分类、自动聚类、句法分析树......一起来看看 《解密搜索引擎技术实战》 这本书的介绍吧!

JS 压缩/解压工具
JS 压缩/解压工具

在线压缩/解压 JS 代码

图片转BASE64编码
图片转BASE64编码

在线图片转Base64编码工具