是什么?
俗称:配置中心,是一个基于“发布-订阅”模型的分布式通信框架
常用应用场景
- 场景1. HSF服务 。服务发布者发布服务,服务调用者订阅服务,ConfigServer向订阅服务的调用者推送可访问的服务IP地址列表,服务调用者从中选一个发起远程调用。
- 场景2. notify 。消息发送和接收,都要通过notifyserver来完成。notifyclient端也是通过configserver来获取notifyserver的地址列表的。
特点
主动推
客户端(不管是发布者还是订阅者)和Server之间维持着一个TCP长连接,server通过连接往里面写数据,相当于主动的通知了客户端。
数据聚合
ConfigServer会自动把注册相同dataId的多个客户端聚合在一起
适合非持久型数据
数据的生命周期和发布者的TCP连接的生命周期相同,一旦发布者连接断开后,发布的数据就失效了。
动态感知发布者断连
两种情况
- 发布者 正常断开连接 (TCP连接):通过连接事件,TCP连接断开时,应用层ConfigServer是可以感知到的,然后ConfigServer把断开的连接对应的数据删除。
- 发布者 异常断开连接 ,比如直接拔网线。这样应用层是无法直接感知到的。所以需要一种通知机制: 心跳机制 。发布者每隔3秒向ConfigServer发送一个心跳包,用以维持与ConfigServer的连接。ConfigServer定时查看当前时间与上次更新时间的间隔,判断连接是否失效。如果失效,就删除这个连接对应的数据,同时通知订阅者。
Server端
- Server端是分布式部署的,有多个节点,无主仆之分,是对等关系,同时工作。
- 每个Server数据是一致的: 每个Server节点的数据都是 全量数据 ,Server两两之间进行数据同步。同步过程中通过更新数据的版本号,保证数据一致性。
发布数据的表示
客户端连接的表示
ClientConnection类
主要功能:
- 注册发布者 addPublisher(GroupId group,String clientId)
- 注册订阅者 addSubscriber(GroupId group,String clientId)
- 发布数据 publishData(..) 发布的数据封装在PubInfo类中
- 删除发布者 removePublisher(..)
- 延续指定client的生命周期 通过renew() 方法更新lastRenewTime为当前时间
通过两个ConcurrentHashMap保存连接上注册的发布者和订阅者:
- // 并发访问,client线程和推送线程
- private final ConcurrentHashMap<GroupId, String> subscribers = new ConcurrentHashMap<GroupId, String>(
- 16, 0.75f, 1);
- // 并发访问,client线程和推送线程
- private final ConcurrentHashMap<GroupId, PubInfo> publishers = new ConcurrentHashMap<GroupId, PubInfo>(
- 16, 0.75f, 1);
连接管理
ClientConnectionWorker类
此类是用于管理客户端连接的线程。
执行以下任务:
- 处理连接上的请求
- 向连接推送数据 通过对连接的哈希值对线程个数取模,对连接归组,每个cconnectionWorker管理一组连接
- private final Map<Connection, ClientConnection> nativeClients;
- private final Map<String, ClientConnection> clusterClients;
clusterClients:集群客户, 保存集群中其他服务器同步来的客户端
nativeClients :原生客户端,保存直连到本服务器的客户端
ClientConnectionWorkers 类
客户端连接线程池,向ClientConnectionWorker分派任务。
static private final ClientConnectionWorker[] clientWorkers;
有两种分派方式:
- /**
- * 由originalClientIp找到client线程,并向其分派执行任务。请求线程不需要知道结果。
- */
- static public void dispatch(String clientIp, Runnable task) {
- if (clientIp.contains(":")) {
- new Exception("illegal clientIp, " + clientIp).printStackTrace();
- System.exit(0);
- }
-
- ClientConnectionWorker worker = getWorker(clientIp);
- worker.execute(task);
- }
- /**
- * 向所有ConnectionWorker线程分派执行任务。请求线程通过返回的List<Future>来获得执行结果。
- */
- static public <V> List<Future<V>> dispatchToAllWorkers(Callable<V> task) {
- List<Future<V>> futures = new ArrayList<Future<V>>(clientWorkers.length);
- for (ClientConnectionWorker worker : clientWorkers) {
- futures.add(worker.execute(task));
- }
- return futures;
- }
数据推送
DefaultPushService类(向客户端推送)
监听组数据的变化和订阅者新增事件,并向客户端推送数据。
继承了EventListener的时间监听机制
onEvent(Event event)方法:根据不同的事件,做出不同处理
组数据变化事件:全推(获取该组所有发布者的数据,向该组所有订阅者所在的连接发送数据)
新增订阅者事件:单推(获取该订阅者所在组中所有发布者的数据,向该订阅者所属的连接发送数据)
- @Override
- public void onEvent(Event event) {
- if (event instanceof GroupDataChangedEvent) {
- GroupDataChangedEvent gdce = (GroupDataChangedEvent) event;
- scheduleAllPush(gdce.group);
- }
- else if (event instanceof SubscriberAddedEvent) {
- SubscriberAddedEvent sae = (SubscriberAddedEvent) event;
- scheduleSinglePush(sae.group, sae.client);
- }
- }
-
- @Override
- public void scheduleSinglePush(GroupId group, ClientConnection dest) {
- if (dest.getConnection().isConnected()) {
- PushDelayTask task = new PushDelayTask(group, dest);
- pushTaskManager.addTask(task);
- }
- }
-
- @Override
- public void scheduleAllPush(GroupId group) {
- PushDelayTask task = new PushDelayTask(group);
- pushTaskManager.addTask(task);
- }
上面代码里的PushDelayTask是延期推送任务类,由TaskManager线程执行,到期后立即推送。
推送执行任务类是PushExecuteTask,实现了Runnable接口,其run方法主要工作就是数据压缩、打包、发送。
推送部分源码:
- // Send
- Connection connection = client.getConnection();
- RequestControl control = new RequestControlImpl(connection.lastRequestProtocol(),
- RESP_TIMEOUT);
- ResponseCallback callback = new PushDataCallback(client);
- //回调Client的invokeWithCallback()方法,把打包好的数据protocolPackage作为参数推送到客户端。
- connection.getClient().invokeWithCallback(protocolPackage, callback, control);
DefaultConfigClusterService类(集群间推送)
监听:数据发布、发布者注销、客户端断连。
事件处理:通知其他Server
数据发布事件包括:原生发布、集群间发布
连接断开事件的触发条件包括:
1. 原生连接(直接TCP连接到本服务器)断开
2. 收到其他Server的通知
3. 长时间收不到renew包
DefaultConfigClusterService类会启动一个周期性任务,向连接线程池中的所有线程委派心跳任务 SyncRenewTask
- // 定时发送renew包
- timerService.scheduleWithFixedDelay(new SyncRenewSchedulerTask(),SyncRenewSchedulerTask.RENEW_PERIOD);
SyncRenewTask表示心跳任务,被委派给ConnectionWorker线程去执行。
1. 对管理的native client,向其他server发送心跳包
2. 对管理的cluster client,计算最后一次renew到当前的时间,如果超过特定时长(过期时长是3分钟),视为断开连接,将其删除。
Client端原理
发布者和订阅者
发布者和订阅者的注册
类结构如下:
基本注册信息:
心跳
使用的是remoting的ConnectionHeartBeat类(心跳包)和HeartBeatProcessor类(心跳处理类)
内部只有一个String 类型的clientUrl属性
也就是说客户端发送心跳数据包,内容很少,只有客户端连接的url