当前位置:   article > 正文

Spring Cloud 2.2.2 源码之五十八nacos服务端处理实例心跳二_nacos 心跳地址

nacos 心跳地址

心跳处理基本流程

在这里插入图片描述

ClientBeatProcessor的run处理临时实例心跳

这里就体现出RsInfo的用途啦,其实就是保存下实例的相关信息,IP,端口,集群。遍历实例集群的实例,找出对应的IP和端口的实例进行状态更新。如果发现有问题,还要用PushService进行UDP进行通知,UDP端口是客户端请求的时候刷新服务实例列表的使用传上来的,客户端也有个PushReceiver就是来接受UDP报文信息,具体可以看这篇文章

 @Override
    public void run() {
        Service service = this.service;
        ...
        String ip = rsInfo.getIp();//IP
        String clusterName = rsInfo.getCluster();//集群名字
        int port = rsInfo.getPort();//端口
        Cluster cluster = service.getClusterMap().get(clusterName);//获取集群
        List<Instance> instances = cluster.allIPs(true);//获取集群所有的临时服务实例
		//遍历更新对应的状态
        for (Instance instance : instances) {
            if (instance.getIp().equals(ip) && instance.getPort() == port) {
                if (Loggers.EVT_LOG.isDebugEnabled()) {
                    Loggers.EVT_LOG.debug("[CLIENT-BEAT] refresh beat: {}", rsInfo.toString());
                }
                instance.setLastBeat(System.currentTimeMillis());//刷新心跳时间
                if (!instance.isMarked()) {//没被标记的
                    if (!instance.isHealthy()) {//不健康的
                        instance.setHealthy(true);//设置为健康
                        Loggers.EVT_LOG.info("service: {} {POS} {IP-ENABLED} valid: {}:{}@{}, region: {}, msg: client beat ok",
                            cluster.getService().getName(), ip, port, cluster.getName(), UtilsAndCommons.LOCALHOST_SITE);
                        getPushService().serviceChanged(service);//UDP发送服务改变通知
                    }
                }
            }
        }
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27

PushService的serviceChanged服务有改变UDP客户端

实现了监听接口:
在这里插入图片描述
缓存里没有的话,服务有改变的时候用上下文来发送ServiceChangeEvent事件。
在这里插入图片描述

UDP调度任务

其实就是遍历需要推送的客户端,然后封装数据,推送。

    @Override
    public void onApplicationEvent(ServiceChangeEvent event) {
        Service service = event.getService();
        String serviceName = service.getName();
        String namespaceId = service.getNamespaceId();

        Future future = udpSender.schedule(new Runnable() {
            @Override
            public void run() {
                try {
                    ...
                    //获取所有需要推送的PushClient
                    ConcurrentMap<String, PushClient> clients = clientMap.get(UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName));
                    if (MapUtils.isEmpty(clients)) {
                        return;
                    }

                    Map<String, Object> cache = new HashMap<>(16);
                    long lastRefTime = System.nanoTime();
                    for (PushClient client : clients.values()) {
                        if (client.zombie()) {//超时的不删除不处理
                          	...
                            clients.remove(client.toString());
                           	...
                            continue;
                        }

                        Receiver.AckEntry ackEntry;
                       	...
                        String key = getPushCacheKey(serviceName, client.getIp(), client.getAgent());
                        byte[] compressData = null;
                        Map<String, Object> data = null;
                        //有压缩数据
                        if (switchDomain.getDefaultPushCacheMillis() >= 20000 && cache.containsKey(key)) {
                            org.javatuples.Pair pair = (org.javatuples.Pair) cache.get(key);
                            compressData = (byte[]) (pair.getValue0());
                            data = (Map<String, Object>) pair.getValue1();
							...
                        }
						//准备UDP数据
                        if (compressData != null) {
                            ackEntry = prepareAckEntry(client, compressData, data, lastRefTime);
                        } else {
                            ackEntry = prepareAckEntry(client, prepareHostsData(client), lastRefTime);
                            if (ackEntry != null) {
                                cache.put(key, new org.javatuples.Pair<>(ackEntry.origin.getData(), ackEntry.data));
                            }
                        }

                        Loggers.PUSH.info("serviceName: {} changed, schedule push for: {}, agent: {}, key: {}",
                            client.getServiceName(), client.getAddrStr(), client.getAgent(), (ackEntry == null ? null : ackEntry.key));
						//发送
                        udpPush(ackEntry);
                    }
                } catch (Exception e) {
                    Loggers.PUSH.error("[NACOS-PUSH] failed to push serviceName: {} to client, error: {}", serviceName, e);

                } finally {
                	//发送完删除
                    futureMap.remove(UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName));
                }

            }
        }, 1000, TimeUnit.MILLISECONDS);
		//放缓存,不会重复发送
        futureMap.put(UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName), future);

    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68

udpPush

判断是否大于重试次数,因为UDP不可靠,可能发出去没收到,也可能客户端发来的没收到,所以要尝试,后面有开启任务重试的。然后封装好数据发送,再开启一个Retransmitter任务10秒后看有没有成功响应,没有的话就重新发送。

 private static Receiver.AckEntry udpPush(Receiver.AckEntry ackEntry) {
        if (ackEntry == null) {
            Loggers.PUSH.error("[NACOS-PUSH] ackEntry is null.");
            return null;
        }
        //大于尝试的次数
        if (ackEntry.getRetryTimes() > MAX_RETRY_TIMES) {
            Loggers.PUSH.warn("max re-push times reached, retry times {}, key: {}", ackEntry.retryTimes, ackEntry.key);
            ackMap.remove(ackEntry.key);
            udpSendTimeMap.remove(ackEntry.key);
            failedPush += 1;
            return ackEntry;
        }

        try {
            if (!ackMap.containsKey(ackEntry.key)) {
                totalPush++;
            }
            ackMap.put(ackEntry.key, ackEntry);
            udpSendTimeMap.put(ackEntry.key, System.currentTimeMillis());

            Loggers.PUSH.info("send udp packet: " + ackEntry.key);
            udpSocket.send(ackEntry.origin);//发送UDP报文

            ackEntry.increaseRetryTime();
            //10秒没应答就再尝试一次
            executorService.schedule(new Retransmitter(ackEntry), TimeUnit.NANOSECONDS.toMillis(ACK_TIMEOUT_NANOS),
                TimeUnit.MILLISECONDS);

            return ackEntry;
        } catch (Exception e) {
            Loggers.PUSH.error("[NACOS-PUSH] failed to push data: {} to client: {}, error: {}",
                ackEntry.data, ackEntry.origin.getAddress().getHostAddress(), e);
            ackMap.remove(ackEntry.key);
            udpSendTimeMap.remove(ackEntry.key);
            failedPush += 1;

            return null;
        }
    }

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41

Retransmitter

如果发现ackMap中还有,说明没收到客户端响应。

    public static class Retransmitter implements Runnable {
        Receiver.AckEntry ackEntry;

        public Retransmitter(Receiver.AckEntry ackEntry) {
            this.ackEntry = ackEntry;
        }

        @Override
        public void run() {
            if (ackMap.containsKey(ackEntry.key)) {//没接受到响应
                Loggers.PUSH.info("retry to push data, key: " + ackEntry.key);
                udpPush(ackEntry);
            }
        }
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15

Receiver

如果收到客户端响应的话会从ackMap中删除ackEntry.key
在这里插入图片描述

好了,心跳的接受,处理,如果有数据改变的通知和UDP推送原理基本都讲了,具体细节看源码吧。

好了,今天就到这里了,希望对学习理解有帮助,大神看见勿喷,仅为自己的学习理解,能力有限,请多包涵。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/2023面试高手/article/detail/137665
推荐阅读
相关标签
  

闽ICP备14008679号