dubbo之timeout超时分析

栏目: 后端 · 发布时间: 6年前

内容简介:在使用dubbo时,通常会遇到timeout这个属性,timeout属性的作用是:给某个服务调用设置超时时间,如果服务在设置的时间内未返回结果,则会抛出调用超时异常:TimeoutException,在使用的过程中,我们有时会对provider和consumer两个配置都会设置timeout值,那么服务调用过程中会以哪个为准?本文主要针对这个问题进行分析和扩展以provider配置为例:在dubbo中如果provider和consumer都配置了相同的一个属性,比如本文分析的timeout,其实是有一个优先

背景

在使用dubbo时,通常会遇到timeout这个属性,timeout属性的作用是:给某个服务调用设置超时时间,如果服务在设置的时间内未返回结果,则会抛出调用超时异常:TimeoutException,在使用的过程中,我们有时会对provider和consumer两个配置都会设置timeout值,那么服务调用过程中会以哪个为准?本文主要针对这个问题进行分析和扩展

三种设置方式

以provider配置为例:

  • 方法级别

    设置方式如下所示:

<dubbo:service interface="fy.test.service.TestService" ref="testServiceImpl">
   <dubbo:method name="test" timeout="10000"/>
</dubbo:service>
  • 接口级别
<dubbo:service interface="fy.test.service.TestService" ref="testServiceImpl" timeout="10000"/>
  • 全局级别
<dubbo:provider timeout="10000"/>

优先级选择

在dubbo中如果provider和consumer都配置了相同的一个属性,比如本文分析的timeout,其实是有一个优先级的,优先级:

consumer方法配置 > provider方法配置 > consumer接口配置 > provider接口配置 > consumer全局配置 > provider全局配置。所以对于本文开始的提出的问题就有了结果,会以消费者配置的为准,接下结合源码来进行解析,其实源码很简单,在RegistryDirectory类中将服务列表转换为DubboInvlker方法中进行了处理:

private Map<String, Invoker<T>> toInvokers(List<URL> urls) {
        Map<String, Invoker<T>> newUrlInvokerMap = new HashMap<String, Invoker<T>>();
        if (urls == null || urls.isEmpty()) {
            return newUrlInvokerMap;
        }
        Set<String> keys = new HashSet<String>();
        String queryProtocols = this.queryMap.get(Constants.PROTOCOL_KEY);
        for (URL providerUrl : urls) {
            // If protocol is configured at the reference side, only the matching protocol is selected
            if (queryProtocols != null && queryProtocols.length() > 0) {
                boolean accept = false;
                String[] acceptProtocols = queryProtocols.split(",");
                for (String acceptProtocol : acceptProtocols) {
                    if (providerUrl.getProtocol().equals(acceptProtocol)) {
                        accept = true;
                        break;
                    }
                }
                if (!accept) {
                    continue;
                }
            }
            if (Constants.EMPTY_PROTOCOL.equals(providerUrl.getProtocol())) {
                continue;
            }
            if (!ExtensionLoader.getExtensionLoader(Protocol.class).hasExtension(providerUrl.getProtocol())) {
                logger.error(new IllegalStateException("Unsupported protocol " + providerUrl.getProtocol() +
                        " in notified url: " + providerUrl + " from registry " + getUrl().getAddress() +
                        " to consumer " + NetUtils.getLocalHost() + ", supported protocol: " +
                        ExtensionLoader.getExtensionLoader(Protocol.class).getSupportedExtensions()));
                continue;
            }
            // 重点就是下面这个方法
            URL url = mergeUrl(providerUrl);

            String key = url.toFullString(); // The parameter urls are sorted
            if (keys.contains(key)) { // Repeated url
                continue;
            }
            keys.add(key);
            // Cache key is url that does not merge with consumer side parameters, regardless of how the consumer combines parameters, if the server url changes, then refer again
            Map<String, Invoker<T>> localUrlInvokerMap = this.urlInvokerMap; // local reference
            Invoker<T> invoker = localUrlInvokerMap == null ? null : localUrlInvokerMap.get(key);
            if (invoker == null) { // Not in the cache, refer again
                try {
                    boolean enabled = true;
                    if (url.hasParameter(Constants.DISABLED_KEY)) {
                        enabled = !url.getParameter(Constants.DISABLED_KEY, false);
                    } else {
                        enabled = url.getParameter(Constants.ENABLED_KEY, true);
                    }
                    if (enabled) {
                        invoker = new InvokerDelegate<T>(protocol.refer(serviceType, url), url, providerUrl);
                    }
                } catch (Throwable t) {
                    logger.error("Failed to refer invoker for interface:" + serviceType + ",url:(" + url + ")" + t.getMessage(), t);
                }
                if (invoker != null) { // Put new invoker in cache
                    newUrlInvokerMap.put(key, invoker);
                }
            } else {
                newUrlInvokerMap.put(key, invoker);
            }
        }
        keys.clear();
        return newUrlInvokerMap;
    }

重点就是上面 mergeUrl() 方法,将provider和comsumer的url参数进行了整合,在

mergeUrl()方法有会调用ClusterUtils.mergeUrl方法进行整合,因为这个方法比较简单,就是对一些参数进行了整合了,会用consumer参数进行覆盖,咱们这里就不分析了,如果感兴趣的同学可以去研究一下。

超时处理

在配置设置了超时timeout,那么代码中是如何处理的,这里咱们在进行一下扩展,分析一下dubbo中是如何处理超时的,在调用服务方法,最后都会调用DubboInvoker.doInvoke方法,咱们就从这个方法开始分析:

@Override
    protected Result doInvoke(final Invocation invocation) throws Throwable {
        RpcInvocation inv = (RpcInvocation) invocation;
        final String methodName = RpcUtils.getMethodName(invocation);
        inv.setAttachment(Constants.PATH_KEY, getUrl().getPath());
        inv.setAttachment(Constants.VERSION_KEY, version);

        ExchangeClient currentClient;
        if (clients.length == 1) {
            currentClient = clients[0];
        } else {
            currentClient = clients[index.getAndIncrement() % clients.length];
        }
        try {
            boolean isAsync = RpcUtils.isAsync(getUrl(), invocation);
            boolean isAsyncFuture = RpcUtils.isReturnTypeFuture(inv);
            boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
            int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
            if (isOneway) {
                boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
                currentClient.send(inv, isSent);
                RpcContext.getContext().setFuture(null);
                return new RpcResult();
            } else if (isAsync) {
                ResponseFuture future = currentClient.request(inv, timeout);
                // For compatibility
                FutureAdapter<Object> futureAdapter = new FutureAdapter<>(future);
                RpcContext.getContext().setFuture(futureAdapter);

                Result result;
                // 异步处理
                if (isAsyncFuture) {
                    // register resultCallback, sometimes we need the async result being processed by the filter chain.
                    result = new AsyncRpcResult(futureAdapter, futureAdapter.getResultFuture(), false);
                } else {
                    result = new SimpleAsyncRpcResult(futureAdapter, futureAdapter.getResultFuture(), false);
                }
                return result;
            } else {
                // 同步处理
                RpcContext.getContext().setFuture(null);
                return (Result) currentClient.request(inv, timeout).get();
            }
        } catch (TimeoutException e) {
            throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
        } catch (RemotingException e) {
            throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
        }
    }

在这个方法中,咱们就以同步模式进行分析,看request方法,request()方法会返回一个DefaultFuture类,在去调用DefaultFuture.get()方法,这里其实涉及到一个在异步中实现同步的技巧,咱们这里不做分析,所以重点就在get()方法里:

@Override
    public Object get() throws RemotingException {
        return get(timeout);
    }

    @Override
    public Object get(int timeout) throws RemotingException {
        if (timeout <= 0) {
            timeout = Constants.DEFAULT_TIMEOUT;
        }
        if (!isDone()) {
            long start = System.currentTimeMillis();
            lock.lock();
            try {
                while (!isDone()) {
                    done.await(timeout, TimeUnit.MILLISECONDS);
                    if (isDone() || System.currentTimeMillis() - start > timeout) {
                        break;
                    }
                }
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            } finally {
                lock.unlock();
            }
            if (!isDone()) {
                throw new TimeoutException(sent > 0, channel, getTimeoutMessage(false));
            }
        }
        return returnFromResponse();
    }

在调用get()方法时,会去调用get(timeout)这个方法,在这个方法中会传一个timeout字段,在和timeout就是咱们配置的那个参数,在这个方法中咱们要关注下面一个代码块:

if (!isDone()) {
            long start = System.currentTimeMillis();
            lock.lock();
            try {
                while (!isDone()) {
                    // 线程阻塞
                    done.await(timeout, TimeUnit.MILLISECONDS);
                    if (isDone() || System.currentTimeMillis() - start > timeout) {
                        break;
                    }
                }
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            } finally {
                lock.unlock();
            }
            // 在超时时间里,还没有结果,则抛出超时异常
            if (!isDone()) {
                throw new TimeoutException(sent > 0, channel, getTimeoutMessage(false));
            }
        }

重点看await()方法,会进行阻塞timeout时间,如果阻塞时间到了,则会唤醒往下执行,超时跳出while循环中,判断是否有结果返回,如果没有(这个地方要注意:只有有结果返回,或超时才跳出循环中),则抛出超时异常。讲到这里,超时原理基本上其实差不多了,DefaultFuture这个类还有个地方需要注意,在初始化DefaultFuture对象时,会去创建一个超时的延迟任务,延迟时间就是timeout值,在这个延迟任务中也会调用signal()方法唤醒阻塞


以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

Programming PHP

Programming PHP

Rasmus Lerdorf、Kevin Tatroe、Peter MacIntyre / O'Reilly Media / 2006-5-5 / USD 39.99

Programming PHP, 2nd Edition, is the authoritative guide to PHP 5 and is filled with the unique knowledge of the creator of PHP (Rasmus Lerdorf) and other PHP experts. When it comes to creating websit......一起来看看 《Programming PHP》 这本书的介绍吧!

随机密码生成器
随机密码生成器

多种字符组合密码

MD5 加密
MD5 加密

MD5 加密工具

正则表达式在线测试
正则表达式在线测试

正则表达式在线测试