赞
踩
这里就体现出RsInfo
的用途啦,其实就是保存下实例的相关信息,IP
,端口,集群。遍历实例集群的实例,找出对应的IP
和端口的实例进行状态更新。如果发现有问题,还要用PushService
进行UDP
进行通知,UDP
端口是客户端请求的时候刷新服务实例列表的使用传上来的,客户端也有个PushReceiver
就是来接受UDP
报文信息,具体可以看这篇文章。
@Override public void run() { Service service = this.service; ... String ip = rsInfo.getIp();//IP String clusterName = rsInfo.getCluster();//集群名字 int port = rsInfo.getPort();//端口 Cluster cluster = service.getClusterMap().get(clusterName);//获取集群 List<Instance> instances = cluster.allIPs(true);//获取集群所有的临时服务实例 //遍历更新对应的状态 for (Instance instance : instances) { if (instance.getIp().equals(ip) && instance.getPort() == port) { if (Loggers.EVT_LOG.isDebugEnabled()) { Loggers.EVT_LOG.debug("[CLIENT-BEAT] refresh beat: {}", rsInfo.toString()); } instance.setLastBeat(System.currentTimeMillis());//刷新心跳时间 if (!instance.isMarked()) {//没被标记的 if (!instance.isHealthy()) {//不健康的 instance.setHealthy(true);//设置为健康 Loggers.EVT_LOG.info("service: {} {POS} {IP-ENABLED} valid: {}:{}@{}, region: {}, msg: client beat ok", cluster.getService().getName(), ip, port, cluster.getName(), UtilsAndCommons.LOCALHOST_SITE); getPushService().serviceChanged(service);//UDP发送服务改变通知 } } } } }
实现了监听接口:
缓存里没有的话,服务有改变的时候用上下文来发送ServiceChangeEvent
事件。
其实就是遍历需要推送的客户端,然后封装数据,推送。
@Override public void onApplicationEvent(ServiceChangeEvent event) { Service service = event.getService(); String serviceName = service.getName(); String namespaceId = service.getNamespaceId(); Future future = udpSender.schedule(new Runnable() { @Override public void run() { try { ... //获取所有需要推送的PushClient ConcurrentMap<String, PushClient> clients = clientMap.get(UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName)); if (MapUtils.isEmpty(clients)) { return; } Map<String, Object> cache = new HashMap<>(16); long lastRefTime = System.nanoTime(); for (PushClient client : clients.values()) { if (client.zombie()) {//超时的不删除不处理 ... clients.remove(client.toString()); ... continue; } Receiver.AckEntry ackEntry; ... String key = getPushCacheKey(serviceName, client.getIp(), client.getAgent()); byte[] compressData = null; Map<String, Object> data = null; //有压缩数据 if (switchDomain.getDefaultPushCacheMillis() >= 20000 && cache.containsKey(key)) { org.javatuples.Pair pair = (org.javatuples.Pair) cache.get(key); compressData = (byte[]) (pair.getValue0()); data = (Map<String, Object>) pair.getValue1(); ... } //准备UDP数据 if (compressData != null) { ackEntry = prepareAckEntry(client, compressData, data, lastRefTime); } else { ackEntry = prepareAckEntry(client, prepareHostsData(client), lastRefTime); if (ackEntry != null) { cache.put(key, new org.javatuples.Pair<>(ackEntry.origin.getData(), ackEntry.data)); } } Loggers.PUSH.info("serviceName: {} changed, schedule push for: {}, agent: {}, key: {}", client.getServiceName(), client.getAddrStr(), client.getAgent(), (ackEntry == null ? null : ackEntry.key)); //发送 udpPush(ackEntry); } } catch (Exception e) { Loggers.PUSH.error("[NACOS-PUSH] failed to push serviceName: {} to client, error: {}", serviceName, e); } finally { //发送完删除 futureMap.remove(UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName)); } } }, 1000, TimeUnit.MILLISECONDS); //放缓存,不会重复发送 futureMap.put(UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName), future); }
判断是否大于重试次数,因为UDP
不可靠,可能发出去没收到,也可能客户端发来的没收到,所以要尝试,后面有开启任务重试的。然后封装好数据发送,再开启一个Retransmitter
任务10
秒后看有没有成功响应,没有的话就重新发送。
private static Receiver.AckEntry udpPush(Receiver.AckEntry ackEntry) { if (ackEntry == null) { Loggers.PUSH.error("[NACOS-PUSH] ackEntry is null."); return null; } //大于尝试的次数 if (ackEntry.getRetryTimes() > MAX_RETRY_TIMES) { Loggers.PUSH.warn("max re-push times reached, retry times {}, key: {}", ackEntry.retryTimes, ackEntry.key); ackMap.remove(ackEntry.key); udpSendTimeMap.remove(ackEntry.key); failedPush += 1; return ackEntry; } try { if (!ackMap.containsKey(ackEntry.key)) { totalPush++; } ackMap.put(ackEntry.key, ackEntry); udpSendTimeMap.put(ackEntry.key, System.currentTimeMillis()); Loggers.PUSH.info("send udp packet: " + ackEntry.key); udpSocket.send(ackEntry.origin);//发送UDP报文 ackEntry.increaseRetryTime(); //10秒没应答就再尝试一次 executorService.schedule(new Retransmitter(ackEntry), TimeUnit.NANOSECONDS.toMillis(ACK_TIMEOUT_NANOS), TimeUnit.MILLISECONDS); return ackEntry; } catch (Exception e) { Loggers.PUSH.error("[NACOS-PUSH] failed to push data: {} to client: {}, error: {}", ackEntry.data, ackEntry.origin.getAddress().getHostAddress(), e); ackMap.remove(ackEntry.key); udpSendTimeMap.remove(ackEntry.key); failedPush += 1; return null; } }
如果发现ackMap
中还有,说明没收到客户端响应。
public static class Retransmitter implements Runnable {
Receiver.AckEntry ackEntry;
public Retransmitter(Receiver.AckEntry ackEntry) {
this.ackEntry = ackEntry;
}
@Override
public void run() {
if (ackMap.containsKey(ackEntry.key)) {//没接受到响应
Loggers.PUSH.info("retry to push data, key: " + ackEntry.key);
udpPush(ackEntry);
}
}
}
如果收到客户端响应的话会从ackMap
中删除ackEntry.key
:
好了,心跳的接受,处理,如果有数据改变的通知和UDP
推送原理基本都讲了,具体细节看源码吧。
好了,今天就到这里了,希望对学习理解有帮助,大神看见勿喷,仅为自己的学习理解,能力有限,请多包涵。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。