内容简介:目前dubbo支持多种注册中心:Zookeeper、Redis、Simple、Multicast、Etcd3。本编文章是分析使用Zookeeper作为注册中心,dubbo如何整合Zookeeper进行服务注册和订阅服务。首先dubbo将服务注册到Zookeeper后,目录结构如下所示:(注册接口名:com.bob.dubbo.service.CityDubboService)
目前dubbo支持多种注册中心:Zookeeper、 Redis 、Simple、Multicast、Etcd3。
本编文章是分析使用Zookeeper作为注册中心,dubbo如何整合Zookeeper进行服务注册和订阅服务。
首先dubbo将服务注册到Zookeeper后,目录结构如下所示:(注册接口名:com.bob.dubbo.service.CityDubboService)
在consumer和provider服务启动的时候,去把自身URL格式化成字符串,然后注册到zookeeper相应节点下,作为临时节点,断开连接后,节点删除;consumer启动时,不仅会订阅服务,同时也会将自己的URL注册到zookeeper中;
ZookeeperRegistry
ZookeeperRegistry:dubbo与zookeeper交互主要的类,已下结合源码进行分析,先来看doSubcribe()方法:
@Override public void doSubscribe(final URL url, final NotifyListener listener) { try { // 处理所有service层发起的订阅,例如监控中心的订阅 if (Constants.ANY_VALUE.equals(url.getServiceInterface())) { String root = toRootPath(); ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url); if (listeners == null) { zkListeners.putIfAbsent(url, new ConcurrentHashMap<>()); listeners = zkListeners.get(url); } ChildListener zkListener = listeners.get(listener); if (zkListener == null) { listeners.putIfAbsent(listener, (parentPath, currentChilds) -> { for (String child : currentChilds) { child = URL.decode(child); if (!anyServices.contains(child)) { anyServices.add(child); subscribe(url.setPath(child).addParameters(Constants.INTERFACE_KEY, child, Constants.CHECK_KEY, String.valueOf(false)), listener); } } }); zkListener = listeners.get(listener); } zkClient.create(root, false); List<String> services = zkClient.addChildListener(root, zkListener); if (services != null && !services.isEmpty()) { for (String service : services) { service = URL.decode(service); anyServices.add(service); subscribe(url.setPath(service).addParameters(Constants.INTERFACE_KEY, service, Constants.CHECK_KEY, String.valueOf(false)), listener); } } // 处理指定service层发起的订阅,例如服务消费者的订阅 } else { List<URL> urls = new ArrayList<>(); // 循环分类数组 , router, configurator, provider for (String path : toCategoriesPath(url)) { // 获得 url 对应的监听器集合 ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url); if (listeners == null) {// 不存在,进行创建 zkListeners.putIfAbsent(url, new ConcurrentHashMap<>()); listeners = zkListeners.get(url); } // 获得 ChildListener 对象 ChildListener zkListener = listeners.get(listener); if (zkListener == null) {// 不存在子目录的监听器,进行创建 ChildListener 对象 // 订阅父级目录, 当有子节点发生变化时,触发此回调函数,回调listener中的notify()方法 listeners.putIfAbsent(listener, (parentPath, currentChilds) -> ZookeeperRegistry.this.notify(url, listener, toUrlsWithEmpty(url, parentPath, currentChilds))); zkListener = listeners.get(listener); } // 向 Zookeeper ,PATH 节点,发起订阅,返回此节点下的所有子元素 path : /根节点/接口全名/providers, 比如 : /dubbo/com.bob.service.CityService/providers zkClient.create(path, false); List<String> children = zkClient.addChildListener(path, zkListener); if (children != null) { urls.addAll(toUrlsWithEmpty(url, path, children)); } } // 首次全量数据获取完成时,调用 `#notify(...)` 方法,回调 NotifyListener, 在这一步从连接Provider,实例化Invoker notify(url, listener, urls); } } catch (Throwable e) { throw new RpcException("Failed to subscribe " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e); } }
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网
猜你喜欢:本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。