赞
踩
Dubbo通过注册中心实现了分布式环境中各服务之间的注册与发现,是各个分布式节点之间的纽带。其主要作用如下:
Dubbo主要包含ZooKeeper、Nacos、Multicast 等注册中心的实现,其中ZooKeeper是官方推荐的注册中心。
ZooKeeper是树形结构的注册中心,存在多种节点类型,具体可分为:
Dubbo使用ZooKeeper作为注册中心时,只会创建 持久节点 和 临时节点 两种,节点树形结构如图:
在Dubbo框架启动时,会根据用户配置的服务,在注册中心中创建4个目录,在providers和consumers目录中分别存储服务提供方、消费方元数据信息,主要包括IP、端口、权重和应用名等数据,同时服务元数据中的所有参数都是以键值对形式存储的。
ZooKeeper发布代码非常简单,只是调用了ZooKeeper的客户端库在注册中心上创建一个目录,取消发布也很简单,只是把ZooKeeper注册中心上对应的路径删除,代码如下:
# 发布代码
zkClient.create(toUrlPath(url), url.getParameter(DYNAMIC_KEY, true));
# 取消发布
zkClient.delete(toUrlPath(url));
订阅通常有pull和push两种方式,一种是客户端定时轮询注册中心拉取配置,另一种是注册中心主动推送数据给客户端。这两种方式各有利弊,目前Dubbo采用的是第一次启动拉取方式,后续接收事件重新拉取数据。
在服务暴露时,服务端会订阅configurators用于监听动态配置,在消费端启动时,消费端会订阅providers、routers和configurators这三个目录,分别对应服务提供者、路由和动态配置变更通知。 核心代码来自 ZookeeperRegistry,具体代码如下:
public void doSubscribe(final URL url, final NotifyListener listener) { try { checkDestroyed(); // 订阅全量服务(主要支持Dubbo服务治理平台(dubbo-admin),平台在启动时会订阅全量接口,它会感知每个服务的状态) if (ANY_VALUE.equals(url.getServiceInterface())) { String root = toRootPath(); boolean check = url.getParameter(CHECK_KEY, false); // 订阅所有数据 ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.computeIfAbsent(url, k -> new ConcurrentHashMap<>()); ChildListener zkListener = listeners.computeIfAbsent(listener, k -> (parentPath, currentChildren) -> { // 子节点有变化则会接到通知,遍历所有的子节点 for (String child : currentChildren) { child = URL.decode(child); // 如果存在子节点还未被订阅,说明是新的节点,则订阅 if (!anyServices.contains(child)) { anyServices.add(child); subscribe(url.setPath(child).addParameters(INTERFACE_KEY, child, Constants.CHECK_KEY, String.valueOf(check)), k); } } }); // 创建持久节点,接下来订阅持久节点的直接子节点 zkClient.create(root, false); // 增加当前节点的订阅,并且会返回该节点下所有子节点列表 List<String> services = zkClient.addChildListener(root, zkListener); if (CollectionUtils.isNotEmpty(services)) { // 遍历所有节点进行订阅 for (String service : services) { service = URL.decode(service); anyServices.add(service); subscribe(url.setPath(service).addParameters(INTERFACE_KEY, service, Constants.CHECK_KEY, String.valueOf(check)), listener); } } } else { // 订阅类别服务(普通消费者的订阅逻辑) CountDownLatch latch = new CountDownLatch(1); try { List<URL> urls = new ArrayList<>(); // 根据URL的类别得到一组需要订阅的路径。如果类别是*,则会订阅四种类型的路径(providers、routers、consumers> configurators),否则只订阅providers路径 for (String path : toCategoriesPath(url)) { ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.computeIfAbsent(url, k -> new ConcurrentHashMap<>()); ChildListener zkListener = listeners.computeIfAbsent(listener, k -> new RegistryChildListenerImpl(url, k, latch)); if (zkListener instanceof RegistryChildListenerImpl) { ((RegistryChildListenerImpl) zkListener).setLatch(latch); } zkClient.create(path, false); // 订阅,返回该节点下的子路径并缓存 List<String> children = zkClient.addChildListener(path, zkListener); if (children != null) { urls.addAll(toUrlsWithEmpty(url, path, children)); } } // 回调 NotifyListener 更新本地缓存信息 notify(url, listener, urls); } finally { // 告诉侦听器仅在主线程的同步通知完成后运行 latch.countDown(); } } } catch (Throwable e) { throw new RpcException("Failed to subscribe " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e); } }
缓存的存在就是用空间换取时间,如果每次远程调用都要先从注册中心获取一次可调用的服务列表,则会让注册中心承受巨大的流量压力。另外,每次额外的网络请求也会让整个系统的性能下降,Dubbo的注册中心实现了通用的缓存机制,在抽象类AbstractRegistry中实现。AbstractRegistry类结构关系如图:
消费者或服务治理中心获取注册信息后会做本地缓存。内存中会有一份,保存在 Properties 对象里,磁盘上也会持久化一份文件,通过file对象引用。在AbstractRegistry抽象类中有如下定义:
// 本地磁盘缓存,其中特殊key value.registries记录注册中心列表,其他为通知服务提供者列表
private final Properties properties = new Properties();
// 盘文件服务缓存对象
private File file;
// 内存中的服务缓存对象,外层Map的key是消费者的 URL,内层 Map 的 key 是分类,包含 providers、consumers、routes、configurators四种。value则是对应的服务列表,对于没有服务提供者提供服务的URL,它会以特殊的empty://前缀开头
private final ConcurrentMap<URL, Map<String, List<URL>>> notified = new ConcurrentHashMap<>();
在服务初始化的时候,AbstractRegistry构造函数里会从本地磁盘文件中把持久化的注册数据读到Properties对象里,并加载到内存缓存中,核心代码如下:
public AbstractRegistry(URL url) { ······ loadProperties(); notify(url.getBackupUrls()); } private void loadProperties() { ······ // 读取磁盘上的文件并加载到 properties 中 try (InputStream in = Files.newInputStream(file.toPath())) { properties.load(in); if (logger.isInfoEnabled()) { logger.info("Loaded registry cache file " + file); } } ······ }
Properties保存了所有服务提供者的URL,使用URL#serviceKey()作为key,提供者列表、 路由规则列表、配置规则列表等作为value。由于value是列表,当存在多个的时候使用空格隔开。还有一个特殊的key.registies,保存所有的注册中心的地址。如果应用在启动过程中,注册中心无法连接或宕机,则Dubbo框架会自动通过本地缓存加载Invoker
缓存的保存有同步和异步两种方式。异步会使用线程池异步保存,具体核心代码如下:
private void saveProperties(URL url) {
if (syncSaveFile) {
// 同步保存
doSaveProperties(version);
} else {
// 异步保存
registryCacheExecutor.execute(() -> doSaveProperties(version));
}
}
AbstractRegistry#notify方法中封装了更新内存缓存和更新文件缓存的逻辑,具体代码如下:
/** * Notify changes from the Provider side. * * @param url consumer side url * @param listener listener * @param urls provider latest urls */ protected void notify(URL url, NotifyListener listener, List<URL> urls) { if (url == null) { throw new IllegalArgumentException("notify url == null"); } if (listener == null) { throw new IllegalArgumentException("notify listener == null"); } if ((CollectionUtils.isEmpty(urls)) && !ANY_VALUE.equals(url.getServiceInterface())) { logger.warn("Ignore empty notify urls for subscribe url " + url); return; } if (logger.isInfoEnabled()) { logger.info("Notify urls for subscribe url " + url + ", url size: " + urls.size()); } // keep every provider's category. Map<String, List<URL>> result = new HashMap<>(); for (URL u : urls) { if (UrlUtils.isMatch(url, u)) { String category = u.getCategory(DEFAULT_CATEGORY); List<URL> categoryList = result.computeIfAbsent(category, k -> new ArrayList<>()); categoryList.add(u); } } if (result.size() == 0) { return; } Map<String, List<URL>> categoryNotified = notified.computeIfAbsent(url, u -> new ConcurrentHashMap<>()); for (Map.Entry<String, List<URL>> entry : result.entrySet()) { String category = entry.getKey(); List<URL> categoryList = entry.getValue(); categoryNotified.put(category, categoryList); listener.notify(categoryList); // 在每次通知后更新缓存文件,当由于网络抖动导致订阅失败时,我们至少可以返回现有的缓存URL if (localCacheEnabled) { saveProperties(url); } } }
FailbackRegistry 继承了 AbstractRegistry,并在此基础上增加了失败重试机制作为抽象能力,FailbackRegistry 抽象类中定义了一个 HashedWheelTimer,每经过固定间隔(默认为5秒)调用AbstractRetryTask#doRetry()方法,AbstractRetryTask类结构信息如下:
/* retry task map */
// 取消订阅失败的监听器集合
private final ConcurrentMap<Holder, FailedUnsubscribedTask> failedUnsubscribed = new ConcurrentHashMap<>();
// 发起订阅失败的监听器集合
private final ConcurrentMap<Holder, FailedSubscribedTask> failedSubscribed = new ConcurrentHashMap<>();
// 取消注册失败的URL集合
private final ConcurrentMap<URL, FailedUnregisteredTask> failedUnregistered = new ConcurrentHashMap<>();
// 发起注册失败的URL集合
private final ConcurrentMap<URL, FailedRegisteredTask> failedRegistered = new ConcurrentHashMap<>();
在定时器中调用retry方法的时候,会把对应集合分别遍历和重试,重试成功则从集合中移除。
Dubbo注册中心拥有良好的扩展性,用户可以在其基础上,快速开发出符合自己业务需求的注册中心。这种扩展性和Dubbo中使用的设计模式密不可分,注册中心模块使用的设计模式:
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。