当前位置:   article > 正文

eureka集群间的数据是如何同步的_eureka集群同步数据的过程

eureka集群同步数据的过程

客户端注册

启动的时候是触发注册,看下图是客户端注册的过程

1.com.netflix.discovery.DiscoveryClient#initScheduledTasks
statusChangeListener = new ApplicationInfoManager.StatusChangeListener() {
               @Override
               public void notify(StatusChangeEvent statusChangeEvent) {
                   if (statusChangeEvent.getStatus() == InstanceStatus.DOWN) {
                       logger.error("Saw local status change event {}", statusChangeEvent);
                   } else {
                       logger.info("Saw local status change event {}", statusChangeEvent);
                   }
                   instanceInfoReplicator.onDemandUpdate();//看这里
               }
           };

2.com.netflix.discovery.InstanceInfoReplicator#onDemandUpdate
public boolean onDemandUpdate() {
       if (rateLimiter.acquire(burstSize, allowedRatePerMinute)) {
           if (!scheduler.isShutdown()) {
               scheduler.submit(new Runnable() {
                   @Override
                   public void run() {  
                       Future latestPeriodic = scheduledPeriodicRef.get();
                       if (latestPeriodic != null && !latestPeriodic.isDone()) {
                           logger.debug("Canceling the latest scheduled update, it will be rescheduled at the end of on demand update");
                           latestPeriodic.cancel(false);
                       }
                          InstanceInfoReplicator.this.run();//看这里
                   }
               });
               return true;
           } else {
               return false;
           }
       } else {
           return false;
       }
   }
3.com.netflix.discovery.InstanceInfoReplicator#run
public void run() {
       try {
           discoveryClient.refreshInstanceInfo();
           Long dirtyTimestamp = instanceInfo.isDirtyWithTime();
           if (dirtyTimestamp != null) {
               discoveryClient.register();//注册
               instanceInfo.unsetIsDirty(dirtyTimestamp);
           }
       } catch (Throwable t) {
           logger.warn("There was a problem with the instance info replicator", t);
       } finally {
           Future next = scheduler.schedule(this, replicationIntervalSeconds, TimeUnit.SECONDS);//注册失败,会进行重试
           scheduledPeriodicRef.set(next);
       }
   }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52

服务端接受注册信息并同步到其他节点

服务端接受客户端的注册信息并同步到其他节点的过程,看下图源码,我将一些不关注的信息给删除了

1.com.netflix.eureka.resources.ApplicationResource#addInstance
@POST
@Consumes({"application/json", "application/xml"})
public Response addInstance(InstanceInfo info,
                           @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication) {
   DataCenterInfo dataCenterInfo = info.getDataCenterInfo();
   if (dataCenterInfo instanceof UniqueIdentifier) {
       String dataCenterInfoId = ((UniqueIdentifier) dataCenterInfo).getId();
       if (isBlank(dataCenterInfoId)) {
           boolean experimental = "true".equalsIgnoreCase(serverConfig.getExperimental("registration.validation.dataCenterInfoId"));
           if (experimental) {
               String entity = "DataCenterInfo of type " + dataCenterInfo.getClass() + " must contain a valid id";
               return Response.status(400).entity(entity).build();
           } else if (dataCenterInfo instanceof AmazonInfo) {
               AmazonInfo amazonInfo = (AmazonInfo) dataCenterInfo;
               String effectiveId = amazonInfo.get(AmazonInfo.MetaDataKey.instanceId);
               if (effectiveId == null) {
                   amazonInfo.getMetadata().put(AmazonInfo.MetaDataKey.instanceId.getName(), info.getId());
               }
           } else {
               logger.warn("Registering DataCenterInfo of type {} without an appropriate id", dataCenterInfo.getClass());
           }
       }
   }
   registry.register(info, "true".equals(isReplication));//请看这里
   return Response.status(204).build();  // 204 to be backwards compatible
}

 2.com.netflix.eureka.registry.PeerAwareInstanceRegistryImpl#register
public void register(final InstanceInfo info, final boolean isReplication) {
        int leaseDuration = Lease.DEFAULT_DURATION_IN_SECS;
        if (info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs() > 0) {
            leaseDuration = info.getLeaseInfo().getDurationInSecs();
        }
        super.register(info, leaseDuration, isReplication);
        replicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication);//请看这里
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37

服务端同步注册信息

1.com.netflix.eureka.registry.PeerAwareInstanceRegistryImpl#replicateToPeers
private void replicateToPeers(Action action, String appName, String id,
                                  InstanceInfo info ,
                                  InstanceStatus newStatus, boolean isReplication) {
        Stopwatch tracer = action.getTimer().start();
        try {
            if (isReplication) {
                numberOfReplicationsLastMin.increment();
            }
            // If it is a replication already, do not replicate again as this will create a poison replication
          if (peerEurekaNodes == Collections.EMPTY_LIST || isReplication) {
                return;
            }
            for (final PeerEurekaNode node : peerEurekaNodes.getPeerEurekaNodes()) {
                // If the url represents this host, do not replicate to yourself.
                if (peerEurekaNodes.isThisMyUrl(node.getServiceUrl())) {
                    continue;
                }
                replicateInstanceActionsToPeers(action, appName, id, info, newStatus, node);//还想往下看,就去看这个方法,这次我们就追到这里就不往下追了
            }
        } finally {
            tracer.stop();
        }
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24

isReplication解读
true表示是服务节点之间的数据同步
false表示是客户端请求
假如现在有eureka集群有3个节点 a,b,c.当客户端user将自己的服务信息注册给a节点的时候,a节点要将客户端user的服务信息同步给b,c节点。此时他也是和客户端一样,发送http请求,调用com.netflix.eureka.resources.ApplicationResource#addInstance这个方法,但是b,c节点收到信息后,同样也要同步给其他节点,这时候就会有一个死循环的问题。(a同步给b,c;b同步给a,c;c同步给a,b。这不就死循环了吗。) 当服务节点之间同步数据的时候,isReplication=true。就可以通过下面的代码完美解决。
if (peerEurekaNodes == Collections.EMPTY_LIST || isReplication) {
return;
}

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/笔触狂放9/article/detail/395452
推荐阅读
相关标签
  

闽ICP备14008679号