赞
踩
源码里无法直接添加注释,所以把相关代码搬运到这里,并补上注释说明。
public int syncUp() { //获取到的注册节点数量 int count = 0; //如果count==0,默认重试五次(serverConfig.getRegistrySyncRetryWaitMs()默认为5) for(int i = 0; i < this.serverConfig.getRegistrySyncRetries() && count == 0; ++i) { if (i > 0) { try { //从第二次开始,每次默认沉睡30秒 Thread.sleep(this.serverConfig.getRegistrySyncRetryWaitMs()); } catch (InterruptedException var10) { logger.warn("Interrupted during registry transfer.."); break; } } //从本地内存里获取注册实例信息 Applications apps = this.eurekaClient.getApplications(); //获取迭代器 Iterator var4 = apps.getRegisteredApplications().iterator(); while(var4.hasNext()) { Application app = (Application)var4.next(); Iterator var6 = app.getInstances().iterator(); while(var6.hasNext()) { InstanceInfo instance = (InstanceInfo)var6.next(); try { //判断当前节点是否可以注册 if (this.isRegisterable(instance)) { //注册到当前Eureka Server里 this.register(instance, instance.getLeaseInfo().getDurationInSecs(), true); ++count; } } catch (Throwable var9) { logger.error("During DS init copy", var9); } } } } return count; }
/**
 * Registers an instance locally and then forwards the registration to the peer nodes.
 *
 * <p>The lease duration is taken from the instance's lease info when that is present and
 * positive; otherwise 90 seconds is used.
 *
 * @param info          the instance being registered
 * @param isReplication passed through unchanged to the superclass registration and to
 *                      {@code replicateToPeers}
 */
public void register(InstanceInfo info, boolean isReplication) {
    boolean hasUsableLease = info.getLeaseInfo() != null
            && info.getLeaseInfo().getDurationInSecs() > 0;
    int leaseDuration = hasUsableLease ? info.getLeaseInfo().getDurationInSecs() : 90;

    // Perform the registration itself.
    super.register(info, leaseDuration, isReplication);

    // Then hand the event to peer replication, tagged with the Register action.
    this.replicateToPeers(
            PeerAwareInstanceRegistryImpl.Action.Register,
            info.getAppName(),
            info.getId(),
            info,
            (InstanceStatus) null,
            isReplication);
}
private void replicateToPeers(PeerAwareInstanceRegistryImpl.Action action, String appName, String id, InstanceInfo info, InstanceStatus newStatus, boolean isReplication) { Stopwatch tracer = action.getTimer().start(); try { //判断是否集群同步请求,如果是,记录最后一分钟的同步次数 if (isReplication) { this.numberOfReplicationsLastMin.increment(); } //如果集群节点非空,并且不是集群同步请求 if (this.peerEurekaNodes != Collections.EMPTY_LIST && !isReplication) { Iterator var8 = this.peerEurekaNodes.getPeerEurekaNodes().iterator(); //遍历相邻的Eureka Server请求,分别发起请求同步 while(var8.hasNext()) { PeerEurekaNode node = (PeerEurekaNode)var8.next(); //判断是否是自身的url,过滤掉 if (!this.peerEurekaNodes.isThisMyUrl(node.getServiceUrl())) { //发起同步请求 this.replicateInstanceActionsToPeers(action, appName, id, info, newStatus, node); } } return; } } finally { tracer.stop(); } }
private void replicateInstanceActionsToPeers(PeerAwareInstanceRegistryImpl.Action action, String appName, String id, InstanceInfo info, InstanceStatus newStatus, PeerEurekaNode node) { try { InstanceInfo infoFromRegistry = null; CurrentRequestVersion.set(Version.V2); switch(action) { case Cancel: //下线 node.cancel(appName, id); break; case Heartbeat: //心跳 InstanceStatus overriddenStatus = (InstanceStatus)this.overriddenInstanceStatusMap.get(id); //获取本地最新的实例信息 infoFromRegistry = this.getInstanceByAppAndId(appName, id, false); node.heartbeat(appName, id, infoFromRegistry, overriddenStatus, false); break; case Register: //注册 node.register(info); break; case StatusUpdate: //设置覆盖状态 infoFromRegistry = this.getInstanceByAppAndId(appName, id, false); node.statusUpdate(appName, id, newStatus, infoFromRegistry); break; case DeleteStatusOverride: //删除覆盖状态 infoFromRegistry = this.getInstanceByAppAndId(appName, id, false); node.deleteStatusOverride(appName, id, infoFromRegistry); } } catch (Throwable var9) { logger.error("Cannot replicate information to {} for action {}", new Object[]{node.getServiceUrl(), action.name(), var9}); } }
/**
 * Queues a register action for this peer on the batching dispatcher.
 *
 * <p>The task is submitted with an expiry time of "now" plus {@code getLeaseRenewalOf(info)}.
 * When executed, the task calls {@code replicationClient.register(info)}.
 *
 * @param info the instance to register on the peer
 * @throws Exception declared by the signature; no call here throws it explicitly
 */
public void register(final InstanceInfo info) throws Exception {
    long now = System.currentTimeMillis();
    long taskExpiry = now + (long) getLeaseRenewalOf(info);

    // Hand the work to the batching dispatcher instead of calling the peer directly.
    // NOTE(review): the meaning of the trailing `true` constructor argument is not visible here.
    this.batchingDispatcher.process(
            taskId("register", info),
            new InstanceReplicationTask(this.targetHost, Action.Register, info, (InstanceStatus) null, true) {
                public EurekaHttpResponse<Void> execute() {
                    return PeerEurekaNode.this.replicationClient.register(info);
                }
            },
            taskExpiry);
}
默认采用的是批量任务处理器:先把 task 放入任务队列,再仿照 ThreadPoolExecutor 的方式创建工作线程,
由这些线程从队列中取出任务并合并成一批统一发送;对端的 Eureka Server 也统一批量接收和处理,从而提高了同步效率。
批量处理的任务执行器是com.netflix.eureka.cluster.ReplicationTaskProcessor。
@Path("batch") @POST public Response batchReplication(ReplicationList replicationList) { try { ReplicationListResponse batchResponse = new ReplicationListResponse(); Iterator var3 = replicationList.getReplicationList().iterator(); //循环请求的任务 while(var3.hasNext()) { ReplicationInstance instanceInfo = (ReplicationInstance)var3.next(); try { //分发任务,同时将处理结果收集起来,等会统一返回 batchResponse.addResponse(this.dispatch(instanceInfo)); } catch (Exception var6) { batchResponse.addResponse(new ReplicationInstanceResponse (Status.INTERNAL_SERVER_ERROR.getStatusCode(), (InstanceInfo)null)); logger.error("{} request processing failed for batch item {}/{}", new Object[]{instanceInfo.getAction(), instanceInfo.getAppName(), instanceInfo.getId(), var6}); } } return Response.ok(batchResponse).build(); } catch (Throwable var7) { logger.error("Cannot execute batch Request", var7); return Response.status(Status.INTERNAL_SERVER_ERROR).build(); } } private ReplicationInstanceResponse dispatch(ReplicationInstance instanceInfo) { ApplicationResource applicationResource = this.createApplicationResource(instanceInfo); InstanceResource resource = this.createInstanceResource(instanceInfo, applicationResource); //获取客户端instance的最后的时间戳 String lastDirtyTimestamp = toString(instanceInfo.getLastDirtyTimestamp()); //获取覆盖状态 String overriddenStatus = toString(instanceInfo.getOverriddenStatus()); //获取instance状态 String instanceStatus = toString(instanceInfo.getStatus()); Builder singleResponseBuilder = new Builder(); switch(instanceInfo.getAction()) { case Register: //注册 singleResponseBuilder = handleRegister(instanceInfo, applicationResource); break; case Heartbeat: //心跳 singleResponseBuilder = handleHeartbeat(this.serverConfig, resource, lastDirtyTimestamp, overriddenStatus, instanceStatus); break; case Cancel: //下线 singleResponseBuilder = handleCancel(resource); break; case StatusUpdate: //修改覆盖状态 singleResponseBuilder = handleStatusUpdate(instanceInfo, resource); break; case DeleteStatusOverride: //删除覆盖状态 
singleResponseBuilder = handleDeleteStatusOverride(instanceInfo, resource); } return singleResponseBuilder.build(); }
/**
 * Applies a replicated register event by adding the instance through the application resource.
 *
 * <p>The result of {@code addInstance} is not inspected; the returned builder always carries
 * the OK status code.
 *
 * @param instanceInfo        the replication event holding the instance to add
 * @param applicationResource the resource used to add the instance
 * @return a builder whose status code is set to OK
 */
private static Builder handleRegister(ReplicationInstance instanceInfo,
                                      ApplicationResource applicationResource) {
    // Second argument is the literal "true"; presumably it marks the call as a replication - confirm.
    applicationResource.addInstance(instanceInfo.getInstanceInfo(), "true");

    Builder okResponse = new Builder();
    return okResponse.setStatusCode(Status.OK.getStatusCode());
}
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。