赞
踩
客户端启动的时候会触发注册,下面的源码展示了客户端注册的过程
1.com.netflix.discovery.DiscoveryClient#initScheduledTasks
// Listener invoked on a local status change: logs the event (at error level
// when the new status is DOWN, otherwise at info level) and then asks the
// replicator for an on-demand update.
statusChangeListener = new ApplicationInfoManager.StatusChangeListener() {
    @Override
    public void notify(StatusChangeEvent statusChangeEvent) {
        if (statusChangeEvent.getStatus() == InstanceStatus.DOWN) {
            logger.error("Saw local status change event {}", statusChangeEvent);
        } else {
            logger.info("Saw local status change event {}", statusChangeEvent);
        }
        instanceInfoReplicator.onDemandUpdate();// look here
    }
};
2.com.netflix.discovery.InstanceInfoReplicator#onDemandUpdate
/**
 * Submits an immediate run of this replicator to the scheduler.
 *
 * @return true when the task was submitted; false when the rate limiter
 *         refuses the request or the scheduler is already shut down
 */
public boolean onDemandUpdate() {
    if (rateLimiter.acquire(burstSize, allowedRatePerMinute)) {
        if (!scheduler.isShutdown()) {
            scheduler.submit(new Runnable() {
                @Override
                public void run() {
                    // Cancel the pending periodic run (without interrupting it);
                    // run() below schedules the next one in its finally block.
                    Future latestPeriodic = scheduledPeriodicRef.get();
                    if (latestPeriodic != null && !latestPeriodic.isDone()) {
                        logger.debug("Canceling the latest scheduled update, it will be rescheduled at the end of on demand update");
                        latestPeriodic.cancel(false);
                    }
                    InstanceInfoReplicator.this.run();// look here
                }
            });
            return true;
        } else {
            return false;
        }
    } else {
        return false;
    }
}
3.com.netflix.discovery.InstanceInfoReplicator#run
/**
 * Refreshes the local instance info and, when it is marked dirty, registers
 * it and clears the dirty flag for that timestamp. Any Throwable is logged
 * rather than propagated, and the next run is always scheduled
 * replicationIntervalSeconds later.
 */
public void run() {
    try {
        discoveryClient.refreshInstanceInfo();
        Long dirtyTimestamp = instanceInfo.isDirtyWithTime();
        if (dirtyTimestamp != null) {
            discoveryClient.register();// register
            // Only reached when register() did not throw, so a failed
            // registration leaves the dirty flag set for the next run.
            instanceInfo.unsetIsDirty(dirtyTimestamp);
        }
    } catch (Throwable t) {
        logger.warn("There was a problem with the instance info replicator", t);
    } finally {
        Future next = scheduler.schedule(this, replicationIntervalSeconds, TimeUnit.SECONDS);// if registration failed, it is retried on this next run
        scheduledPeriodicRef.set(next);
    }
}
下面是服务端接收客户端的注册信息并同步到其他节点的过程,请看下面的源码(我将一些无需关注的内容删除了)
1.com.netflix.eureka.resources.ApplicationResource#addInstance
/**
 * Handles a POST that registers an instance.
 *
 * When the data center info is a UniqueIdentifier with a blank id: responds
 * 400 if the "registration.validation.dataCenterInfoId" experimental setting
 * is "true"; otherwise, for AmazonInfo lacking an instanceId entry, stores
 * info.getId() under that metadata key; for any other type, logs a warning.
 * The instance is then registered.
 *
 * @param info          the instance being registered
 * @param isReplication value of the PeerEurekaNode.HEADER_REPLICATION header;
 *                      only the exact string "true" is treated as a replication
 * @return a 400 response when validation rejects the request, otherwise 204
 */
@POST
@Consumes({"application/json", "application/xml"})
public Response addInstance(InstanceInfo info,
                            @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication) {
    DataCenterInfo dataCenterInfo = info.getDataCenterInfo();
    if (dataCenterInfo instanceof UniqueIdentifier) {
        String dataCenterInfoId = ((UniqueIdentifier) dataCenterInfo).getId();
        if (isBlank(dataCenterInfoId)) {
            boolean experimental = "true".equalsIgnoreCase(serverConfig.getExperimental("registration.validation.dataCenterInfoId"));
            if (experimental) {
                String entity = "DataCenterInfo of type " + dataCenterInfo.getClass() + " must contain a valid id";
                return Response.status(400).entity(entity).build();
            } else if (dataCenterInfo instanceof AmazonInfo) {
                AmazonInfo amazonInfo = (AmazonInfo) dataCenterInfo;
                String effectiveId = amazonInfo.get(AmazonInfo.MetaDataKey.instanceId);
                if (effectiveId == null) {
                    // Fall back to the instance's own id as the Amazon instanceId.
                    amazonInfo.getMetadata().put(AmazonInfo.MetaDataKey.instanceId.getName(), info.getId());
                }
            } else {
                logger.warn("Registering DataCenterInfo of type {} without an appropriate id", dataCenterInfo.getClass());
            }
        }
    }
    registry.register(info, "true".equals(isReplication));// look here
    return Response.status(204).build(); // 204 to be backwards compatible
}
2.com.netflix.eureka.registry.PeerAwareInstanceRegistryImpl#register
/**
 * Registers the instance locally through the superclass and then replicates
 * the registration to peers.
 *
 * The lease duration is taken from the instance's LeaseInfo when that is
 * present and positive; otherwise Lease.DEFAULT_DURATION_IN_SECS is used.
 *
 * @param info          the instance being registered
 * @param isReplication whether this call is itself a replication
 */
public void register(final InstanceInfo info, final boolean isReplication) {
    int leaseDuration = Lease.DEFAULT_DURATION_IN_SECS;
    if (info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs() > 0) {
        leaseDuration = info.getLeaseInfo().getDurationInSecs();
    }
    super.register(info, leaseDuration, isReplication);
    replicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication);// look here
}
1.com.netflix.eureka.registry.PeerAwareInstanceRegistryImpl#replicateToPeers
/**
 * Forwards an action to every peer node except this host.
 *
 * When the call is itself a replication, the per-minute replication counter
 * is incremented and nothing is forwarded. Nothing is forwarded either when
 * peerEurekaNodes is the Collections.EMPTY_LIST sentinel. The action's timer
 * is started on entry and stopped in the finally block.
 *
 * @param action        the action being replicated
 * @param appName       application name of the instance
 * @param id            instance id
 * @param info          the instance info passed on to each peer
 * @param newStatus     status passed on to each peer
 * @param isReplication whether this call originated from another peer
 */
private void replicateToPeers(Action action, String appName, String id, InstanceInfo info,
                              InstanceStatus newStatus, boolean isReplication) {
    Stopwatch tracer = action.getTimer().start();
    try {
        if (isReplication) {
            numberOfReplicationsLastMin.increment();
        }
        // If it is a replication already, do not replicate again as this will create a poison replication
        if (peerEurekaNodes == Collections.EMPTY_LIST || isReplication) {
            return;
        }
        for (final PeerEurekaNode node : peerEurekaNodes.getPeerEurekaNodes()) {
            // If the url represents this host, do not replicate to yourself.
            if (peerEurekaNodes.isThisMyUrl(node.getServiceUrl())) {
                continue;
            }
            replicateInstanceActionsToPeers(action, appName, id, info, newStatus, node);// to dig deeper, read this method; this walkthrough stops tracing here
        }
    } finally {
        tracer.stop();
    }
}
isReplication解读
true表示是服务节点之间的数据同步
false表示是客户端请求
假如现在eureka集群有3个节点 a,b,c。当客户端user将自己的服务信息注册给a节点的时候,a节点要将客户端user的服务信息同步给b,c节点。此时a节点也和客户端一样,发送http请求,调用com.netflix.eureka.resources.ApplicationResource#addInstance这个方法。但是b,c节点收到信息后,同样也要同步给其他节点,这时候就会有一个死循环的问题(a同步给b,c;b同步给a,c;c同步给a,b,这不就死循环了吗)。当服务节点之间同步数据的时候,isReplication=true,收到同步请求的节点就不会再向其他节点转发,于是就可以通过下面的代码完美解决。
// Stop before forwarding when peerEurekaNodes is the empty-list sentinel or
// when this call is itself a replication, so it is not replicated again.
if (peerEurekaNodes == Collections.EMPTY_LIST || isReplication) {
return;
}
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。