赞
踩
nacos 服务端保存了配置信息,客户端连接到服务端之后,根据 dataID、group及tanant(就是Namesapce)可以获取到具体的配置信息,当服务端的配置发生变更时,客户端会通过长轮询的HTTP长连接获取到变更通知,然后再由客户端主动去获取变更后的配置信息。
1、Nacos 服务端创建了相关的配置项(创建 ConfigService并实例化 ConfigService);
2、客户端添加 Listener进行监听:
客户端通过在ClientWorker中启动一个定期检查配置的线程用于检查配置的更新(该线程会10毫秒触发一次),该检查线程中有一个checkConfigInfo()方法:
该方法中通过将任务进行分组的方式,启动一个或多个LongPollingRunnable线程从服务端获取配置:
此处ParamUtil.getPerTaskConfigSize()用于获取每个Long Polling线程任务处理的任务数,默认值为3000,因而通常在单个Consumer中都只会启动一个Long Polling线程。
Long Polling线程启动了一个Long Polling的HTTP长连接,用于从服务端获取配置:
发送的Params内容包括dataId、groupName、本地配置的md5值以及namespace,如下所示:
(注:为了取运行时变量,只有通过拍照才可以得到,截屏运行时变量会自动消失)
Nacos服务端接收到请求后,会比较当前namespace下对应groupName中名为指定dataId的配置的md5值与传上来的md5值是否相等,如果不相等则说明配置有变化,则返回有变化的dataId、groupName及namespace(注:代码中tenant表示namespace),但是不返回实际的配置内容,并断开当前Long Polling Http连接,然后客户端再通过单独的http请求获取实际的配置内容;如果配置没有变化,则不返回任何内容,然后一直保持该Long Polling Http连接直到超时(由header中的Long-Polling-Timeout指定的timeout值,默认为30秒),然后由服务端断开该连接。
每个长连接的默认时间为30秒(可以通过修改属性值configLongPollTimeout修改,):
LongPollingRunnable线程:
其中会通过调用(cacheData.checkListenerMd5())来检查自己监听的配置项的数据的:
一旦服务端的数据发生变化时,客户端将会获取到最新的数据,并将最新的数据保存在一个 CacheData 对象中,然后会重新计算 CacheData 的 md5 属性的值,此时(safeNotifyListener())就会通过单独线程对该 CacheData 所绑定的 Listener 触发 receiveConfigInfo 回调:
listener.receiveConfigInfo(...)中的listener这里是Listener的具体实现DelegatingEventPublishingListener,具体方法如下:
在onReceived方法中,delegate变量为NacosPropertySourcePostProcessor中定义内部Listener实现:
该Listener实现会更新注入到Bean中的变量的值,NacosPropertySourcePostProcessor本身也为解析及更新Nacos注解变量的核心PostProcessor。
综合这些分析,Nacos做为配置中心,通过Long Polling的HTTP请求,可以做到准实时的配置变化通知,也能够通得知Nacos客户端获取到最新的配置,是通过定时Long Polling拉取的方式而非服务端Push给客户端的方式,客户端拉取到最新的配置信息后,再将最新的数据推送给Listener 的持有者,并更新配置。
实际是通过反射调用了 NacosConfigService 的构造方法来创建 ConfigService 的,而且是有一个 Properties 参数的构造方法。
需要注意的是,这里并没有通过单例或者缓存技术,也就是说每次调用都会重新创建一个 ConfigService的实例。
实例化时主要是初始化了两个对象,他们分别是:
agent 是通过装饰者模式实现的,ServerHttpAgent 是实际工作的类,MetricsHttpAgent 在内部也是调用了 ServerHttpAgent 的方法,另外加上了一些统计操作,所以我们只需要关心 ServerHttpAgent 的功能就可以了。
ClientWorker
checkConfigInfo方法是取出了一批任务,然后提交给executorService线程池去执行,执行的任务就是LongPollingRunnable,每个任务都有一个 taskId。
首先取出与该 taskId 相关的 CacheData,然后对 CacheData 进行检查,包括本地配置检查和监听器的 md5 检查,本地检查主要是做一个故障容错,当服务端挂掉后,Nacos 客户端可以从本地的文件系统中获取相关的配置信息,如下图所示:
通过 checkUpdateDataIds() 方法从服务端获取那些值发生了变化的 dataId 列表,
通过 getServerConfig 方法,根据 dataId 到服务端获取最新的配置信息,接着将最新的配置信息保存到 CacheData 中。
最后调用 CacheData 的 checkListenerMd5 方法,可以看到该方法在第一部分也被调用过。
在该任务的最后,也就是在 finally 中又重新通过 executorService 提交了本任务。
调用了ClientWorker 的 addTenantListeners 方法为 ConfigService 来添加一个 Listener
listener 最终是被这里的 CacheData 所持有了,那 listener 的回调方法 receiveConfigInfo 就应该是在 CacheData 中触发的。CacheData 是出现频率非常高的一个类,在 LongPollingRunnable 的任务中,几乎所有的方法都围绕着 CacheData 类,现在添加 Listener 的时候,实际上该 Listener 也被委托给了 CacheData。
成员变量:
在 ClientWorker 中的定时任务中,启动了一个长轮询的任务:LongPollingRunnable,该任务多次执行了 cacheData.checkListenerMd5() 方法:
该方法会检查 CacheData 当前的 md5 与 CacheData 持有的所有 Listener 中保存的 md5 的值是否一致,如果不一致,就执行一个安全的监听器的通知方法:safeNotifyListener(),通应该是通知 Listener 的使用者,该 Listener 所关注的配置信息已经发生改变了。
那 CacheData 的 md5 值是何时发生改变的呢?我们可以回想一下,在上面的 LongPollingRunnable 所执行的任务中,在获取服务端发生变更的配置信息时,将最新的 content 数据写入了 CacheData 中,我们可以看下该方法如下:
在长轮询的任务中,当服务端配置信息发生变更时,客户端将最新的数据获取下来之后,保存在了 CacheData 中,同时更新了该 CacheData 的 md5 值,所以当下次执行 checkListenerMd5 方法时,就会发现当前 listener 所持有的 md5 值已经和 CacheData 的 md5 值不一样了,也就意味着服务端的配置信息发生改变了,这时就需要将最新的数据通知给 Listener 的持有者。
- package com.learn.nacos;
-
- import com.alibaba.nacos.api.NacosFactory;
- import com.alibaba.nacos.api.config.ConfigService;
- import com.alibaba.nacos.api.config.listener.Listener;
- import com.alibaba.nacos.api.exception.NacosException;
-
- import java.util.Properties;
- import java.util.concurrent.Executor;
-
- /**
- * @author liuliuyang001
- *
- */
- public class NacosConfig {
-
- public static void main(String[] args) {
- try {
- String serverAddr = "127.0.0.1:8848";
- String dataId = "nacos-sdk-java-config";
- String group = "DEFAULT_GROUP";
- String content = "nacos-sdk-java-config:init";
- Properties properties = new Properties();
- properties.put("serverAddr", serverAddr);
-
- // 方式一:创建ConfigService
- ConfigService configService = NacosFactory.createConfigService(serverAddr);
-
- // 方式二:创建ConfigService
- //ConfigService configService = ConfigFactory.createConfigService(serverAddr);
-
- // 方式三:创建ConfigService
- //ConfigService configService = NacosFactory.createConfigService(properties);
-
- // 方式四:创建ConfigService
- //ConfigService configService = ConfigFactory.createConfigService(properties);
-
- // 获取服务状态
- System.out.println("当前线程:" + Thread.currentThread().getName() + " ,服务状态:" + configService.getServerStatus());
-
- Listener configListener = new Listener() {
- public Executor getExecutor() {
- return null;
- }
-
- public void receiveConfigInfo(String configInfo) {
- System.out.println("当前线程:" + Thread.currentThread().getName() + " ,监听到配置内容变化:" + configInfo);
- }
- };
-
- // 添加监听(有时候能监听到,有时候监听不到,为什么?)
- System.out.println("添加监听");
- configService.addListener(dataId, group, configListener);
- System.out.println("添加监听成功");
-
- // 发布配置(多次运行,有时候会获取不到配置内容,难道要等待一段时间才能获取?)
- System.out.println("发布配置");
- configService.publishConfig(dataId, group, content);
- System.out.println("发布配置成功");
-
- try {
- Thread.sleep(1000 * 3);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
-
- // 本地缓存:\nacos\config\fixed-127.0.0.1_8848_nacos\snapshot\DEFAULT_GROUP
- // 读取配置
- String configAfterPublish = configService.getConfig(dataId, group, 3000);
- System.out.println("当前线程:" + Thread.currentThread().getName() + " ,发布配置后获取配置内容:" + configAfterPublish);
-
- // 重新发布配置
- content = "sdk-java-config:update";
- System.out.println("重新发布配置");
- boolean rePublishFlag = configService.publishConfig(dataId, group, content);
- if(rePublishFlag) {
- System.out.println("重新发布配置成功");
- } else {
- System.out.println("重新发布配置失败");
- }
-
- // 重新读取配置
- String configAfterUpdate = configService.getConfig(dataId, group, 3000);
- System.out.println("当前线程:" + Thread.currentThread().getName() + " ,重新发布配置后获取配置内容:" + configAfterUpdate);
-
- try {
- Thread.sleep(1000 * 30);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
-
- // 移除配置
- System.out.println("移除配置");
- boolean removeFlag = configService.removeConfig(dataId, group);
- if(removeFlag) {
- System.out.println("移除配置成功");
- } else {
- System.out.println("移除配置失败");
- }
-
- try {
- Thread.sleep(1000 * 3);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
-
- String configAfterRemove = configService.getConfig(dataId, group, 3000);
- System.out.println("当前线程:" + Thread.currentThread().getName() + " ,移除配置后获取配置内容:" + configAfterRemove);
-
- /* // 取消监听
- System.out.println("取消监听");
- configService.removeListener(dataId, group, configListener);
- System.out.println("取消监听成功");*/
- } catch (NacosException e) {
- e.printStackTrace();
- }
- }
- }
声明:
文章的部份内容来自于https://blog.csdn.net/liyanan21/article/details/89161313,我是在该文章的基础之上补充了更多的细节。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。