This article explains how Eureka Server synchronizes registry data across a cluster.
Synchronization at cluster startup
protected void initEurekaServerContext() throws Exception {
    // ... much code omitted
    // Synchronize registry information
    int registryCount = this.registry.syncUp();
    // ... much code omitted
}
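Many articles online say that syncUp copies registry information from the other Eureka Server nodes. That is not quite accurate. At this point, syncUp() does not contact other Eureka Server nodes at all. Instead, it reads registry information from local memory, namely the cache held by the Eureka Server's embedded Eureka Client. The source code makes this clear.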
public int syncUp() {
    // Copy entire entry from neighboring DS node
    // Number of registered instances obtained
    int count = 0;

    // While count == 0, attempt up to 5 times by default
    // (only when register-with-eureka = true; otherwise the retry count is 0)
    for (int i = 0; ((i < serverConfig.getRegistrySyncRetries()) && (count == 0)); i++) {
        if (i > 0) {
            try {
                // From the second attempt on, sleep 30 seconds by default before each try
                Thread.sleep(serverConfig.getRegistrySyncRetryWaitMs());
            } catch (InterruptedException e) {
                logger.warn("Interrupted during registry transfer..");
                break;
            }
        }
        // Read the registered instances from local memory (the embedded client's cache)
        Applications apps = eurekaClient.getApplications();
        for (Application app : apps.getRegisteredApplications()) {
            for (InstanceInfo instance : app.getInstances()) {
                try {
                    // Check whether this instance may be registered
                    if (isRegisterable(instance)) {
                        // Register it in this Eureka Server
                        register(instance, instance.getLeaseInfo().getDurationInSecs(), true);
                        count++;
                    }
                } catch (Throwable t) {
                    logger.error("During DS init copy", t);
                }
            }
        }
    }
    return count;
}
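Parameters:
registrySyncRetries: the number of attempts a starting Eureka Server makes to obtain registry information from the other servers in the cluster. It defaults to 5 only when eureka.client.register-with-eureka = true. If that setting is false, the value is 0 and no attempt is made.
registrySyncRetryWaitMs: how long a starting Eureka Server waits before trying again after it fails to obtain registry information from the other servers. The default is 30 * 1000 ms.
count: the number of registered instances obtained. If it is 0, the loop retries up to the configured retry count and sleeps 30 seconds before each retry.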
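The following self-contained simulation makes the retry arithmetic concrete. It is a sketch, not Eureka code. The class SyncUpSketch and the Supplier that stands in for eurekaClient.getApplications() are hypothetical, and registration is reduced to counting. The first attempt does not sleep, so the default settings read the local cache at most 5 times and wait at most 4 times. In the worst case, syncUp() runs for about 4 × 30 s = 120 s before it gives up with 0.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Hypothetical simulation of the syncUp() retry loop; not Eureka's actual class.
class SyncUpSketch {
    // retries ~ registrySyncRetries, waitMs ~ registrySyncRetryWaitMs.
    // localCache stands in for eurekaClient.getApplications(): each call returns
    // whatever the embedded client has fetched so far (instance ids only).
    static int syncUp(int retries, long waitMs, Supplier<List<String>> localCache) {
        int count = 0;
        for (int i = 0; i < retries && count == 0; i++) {
            if (i > 0) {
                try {
                    Thread.sleep(waitMs); // only retries wait; attempt 0 runs immediately
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    break;
                }
            }
            for (String instanceId : localCache.get()) {
                // The real loop calls isRegisterable(...) and register(..., true) here.
                count++;
            }
        }
        return count;
    }

    public static void main(String[] args) {
        AtomicInteger reads = new AtomicInteger();
        // The cache is empty on the first two reads, then holds three instances.
        Supplier<List<String>> cache = () -> reads.incrementAndGet() < 3
                ? Collections.<String>emptyList()
                : Arrays.asList("a:1", "b:1", "c:1");
        // prints "3 instances after 3 reads"
        System.out.println(syncUp(5, 0, cache) + " instances after " + reads.get() + " reads");
        // retries = 0: the loop never runs, prints 0
        System.out.println(syncUp(0, 0, cache));
    }
}
```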
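PS: The earlier article "How Eureka Fetches Registry Information" explained that, by default, a Eureka Client automatically fetches registry information from the Eureka Server at startup. For a Eureka Server to synchronize registry information from the other cluster nodes when it starts, the following client settings must be enabled: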
# Whether to register with the Eureka Server as a Eureka Client
eureka.client.register-with-eureka=true
# Whether to fetch registry information from the Eureka Server into the local cache
eureka.client.fetch-registry=true
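Only when both settings are enabled does a cluster node initialize its Eureka Client at startup and pull registry information from the other Eureka Servers into local memory. Later, while the Eureka Server context is being initialized, syncUp() reads that registry information from local memory and registers it with the server itself.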
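The dependency between the two flags can be written out as a small truth table. This is a deliberately simplified model, not Spring Cloud or Eureka code. The class and method names are hypothetical. The model assumes the embedded client has already fetched everything the peers hold, and it ignores timing. A starting node copies anything only when both flags are true.

```java
// Hypothetical model of how the two client flags combine during startup sync.
class StartupSyncFlags {
    // Default retry count described above: 5 with register-with-eureka=true, else 0.
    static int registrySyncRetries(boolean registerWithEureka) {
        return registerWithEureka ? 5 : 0;
    }

    // Number of instances a starting node copies into its own registry via syncUp().
    static int instancesCopied(boolean registerWithEureka, boolean fetchRegistry, int instancesOnPeers) {
        // With fetch-registry=false the embedded client never fills its local cache.
        int localCache = fetchRegistry ? instancesOnPeers : 0;
        // With zero retries the syncUp() loop body never executes.
        return registrySyncRetries(registerWithEureka) > 0 ? localCache : 0;
    }

    public static void main(String[] args) {
        boolean[] values = {true, false};
        for (boolean register : values) {
            for (boolean fetch : values) {
                System.out.printf("register-with-eureka=%b fetch-registry=%b -> %d copied%n",
                        register, fetch, instancesCopied(register, fetch, 4));
            }
        }
    }
}
```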
Cluster synchronization types
public enum Action {
    Heartbeat, Register, Cancel, StatusUpdate, DeleteStatusOverride;

    private com.netflix.servo.monitor.Timer timer = Monitors.newTimer(this.name());

    public com.netflix.servo.monitor.Timer getTimer() {
        return this.timer;
    }
}
// Heartbeat: heartbeat (lease renewal)
// Register: registration
// Cancel: deregistration (going offline)
// StatusUpdate: set an overridden status
// DeleteStatusOverride: delete an overridden status
The registration code serves as the example here.
@Override
public void register(final InstanceInfo info, final boolean isReplication) {
    int leaseDuration = Lease.DEFAULT_DURATION_IN_SECS;
    if (info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs() > 0) {
        leaseDuration = info.getLeaseInfo().getDurationInSecs();
    }
    // Register locally
    super.register(info, leaseDuration, isReplication);
    // After registering, replicate to peers with the action type Register
    replicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication);
}

private void replicateToPeers(Action action, String appName, String id,
                              InstanceInfo info /* optional */,
                              InstanceStatus newStatus /* optional */, boolean isReplication) {
    Stopwatch tracer = action.getTimer().start();
    try {
        // If this is a replication request from a peer, count it toward the last-minute replication count
        if (isReplication) {
            numberOfReplicationsLastMin.increment();
        }
        // If it is a replication already, do not replicate again as this will create a poison replication
        // Return immediately if there are no peer nodes or this request came from another Eureka Server
        if (peerEurekaNodes == Collections.EMPTY_LIST || isReplication) {
            return;
        }
        // Iterate over the peer Eureka Server nodes and replicate to each one
        for (final PeerEurekaNode node : peerEurekaNodes.getPeerEurekaNodes()) {
            // Skip this server's own URL
            if (peerEurekaNodes.isThisMyUrl(node.getServiceUrl())) {
                continue;
            }
            // Send the replication request
            replicateInstanceActionsToPeers(action, appName, id, info, newStatus, node);
        }
    } finally {
        tracer.stop();
    }
}
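Steps:
1. register() first registers the instance in the local registry via super.register().
2. It then calls replicateToPeers() with the action Action.Register.
3. replicateToPeers() counts incoming replication requests. It returns immediately if there are no peer nodes or if isReplication is true, so a replicated registration is never forwarded again.
4. Otherwise, it iterates over the peer nodes, skips the node whose URL is this server's own, and calls replicateInstanceActionsToPeers() for each remaining peer.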
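The loop-prevention rule in these steps is what keeps replication from bouncing between peers. The sketch below models it with an in-memory three-node cluster. PeerNode, its fields, and ReplicationDemo are hypothetical stand-ins for PeerAwareInstanceRegistryImpl and PeerEurekaNode, with no HTTP and no batching. A client registers on node A only. A forwards the registration to B and C with isReplication = true, and the replication stops there. Without the isReplication check, B would forward back to A and C, and the nodes would keep replicating to one another without end.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory cluster node; names are illustrative, not Eureka's API.
class PeerNode {
    final String url;
    final List<PeerNode> peers = new ArrayList<>();
    final Map<String, String> registry = new LinkedHashMap<>();
    int replicationsSent = 0;

    PeerNode(String url) {
        this.url = url;
    }

    void register(String instanceId, boolean isReplication) {
        registry.put(instanceId, "UP");               // step 1: register locally
        replicateToPeers(instanceId, isReplication);  // step 2: then replicate
    }

    private void replicateToPeers(String instanceId, boolean isReplication) {
        if (peers.isEmpty() || isReplication) {
            return;                                   // step 3: never forward a replicated request
        }
        for (PeerNode node : peers) {
            if (node.url.equals(url)) {
                continue;                             // step 4: skip our own URL
            }
            replicationsSent++;
            node.register(instanceId, true);          // peers receive isReplication = true
        }
    }
}

class ReplicationDemo {
    // Every node's peer list contains all nodes, including itself.
    static List<PeerNode> cluster(String... urls) {
        List<PeerNode> nodes = new ArrayList<>();
        for (String u : urls) {
            nodes.add(new PeerNode(u));
        }
        for (PeerNode n : nodes) {
            n.peers.addAll(nodes);
        }
        return nodes;
    }

    public static void main(String[] args) {
        List<PeerNode> nodes = cluster("http://a", "http://b", "http://c");
        nodes.get(0).register("order-service:1", false); // a client request hits node A
        // A sends 2 replications; B and C send 0, yet all three hold the instance
        for (PeerNode n : nodes) {
            System.out.println(n.url + " registry=" + n.registry.keySet() + " replicationsSent=" + n.replicationsSent);
        }
    }
}
```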
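PS: This code uses PeerEurekaNode. For details on how the PeerEurekaNodes cluster node list is updated and read, see the earlier article "An In-Depth Analysis of the Eureka Server Startup Source Code". At startup, the server launches a task that periodically refreshes the PeerEurekaNodes list. By default, this happens every 10 minutes (peerEurekaNodesUpdateIntervalMs).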
private void replicateInstanceActionsToPeers(Action action, String appName,
                                             String id, InstanceInfo info, InstanceStatus newStatus,
                                             PeerEurekaNode node) {
    try {
        InstanceInfo infoFromRegistry = null;
        CurrentRequestVersion.set(Version.V2);
        switch (action) {
            case Cancel:
                // Deregistration
                node.cancel(appName, id);
                break;
            case Heartbeat:
                // Heartbeat
                InstanceStatus overriddenStatus = overriddenInstanceStatusMap.get(id);
                // Get the latest local copy of the instance
                infoFromRegistry = getInstanceByAppAndId(appName, id, false);
                node.heartbeat(appName, id, infoFromRegistry, overriddenStatus, false);
                break;
            case Register:
                // Registration
                node.register(info);
                break;
            case StatusUpdate:
                // Set the overridden status
                infoFromRegistry = getInstanceByAppAndId(appName, id, false);
                node.statusUpdate(appName, id, newStatus, infoFromRegistry);
                break;
            case DeleteStatusOverride:
                // Delete the overridden status
                infoFromRegistry = getInstanceByAppAndId(appName, id, false);
                node.deleteStatusOverride(appName, id, infoFromRegistry);
                break;
        }
    } catch (Throwable t) {
        logger.error("Cannot replicate information to {} for action {}", node.getServiceUrl(), action.name(), t);
    }
}
We will look only at registration here, because the other actions work the same way in principle. PeerEurekaNode's register method is shown below.
public void register(final InstanceInfo info) throws Exception {
long expiryTime = System.currentTimeMillis() + getLeaseRenewalOf(info);
// Batching is used by default
batchingDispatcher.process(
taskId("register", info),
new InstanceReplicationTask(targetHost, Action.Register, info, null, true) {
public EurekaHttpResponse<Void> execute() {
return replicationClient.register(info);
}
},
expiryTime
);
}
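By default, a batching task dispatcher is used. Tasks are placed in a task queue. Worker threads, created in a manner similar to a ThreadPoolExecutor, pull tasks off the queue and execute them together in batches. The receiving Eureka Server accepts each batch as a single request. Together, these mechanisms make synchronization more efficient. The batch task executor is com.netflix.eureka.cluster.ReplicationTaskProcessor.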
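The queue-then-batch idea fits in a few lines. This is a minimal single-threaded sketch with stated assumptions. BatchingSketch and drainInBatches are hypothetical names, and tasks are plain strings. Eureka drains the queue on background worker threads, but this sketch drains it synchronously so that the result is deterministic. With a maximum batch size of 3, seven queued tasks become three requests instead of seven.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical sketch of queue-based batching; not Eureka's dispatcher.
class BatchingSketch {
    // Drains the queue into batches of at most maxBatchSize tasks.
    // Each returned batch stands for one POST to the peer's batch endpoint.
    static List<List<String>> drainInBatches(BlockingQueue<String> queue, int maxBatchSize) {
        if (maxBatchSize < 1) {
            throw new IllegalArgumentException("maxBatchSize must be >= 1");
        }
        List<List<String>> batches = new ArrayList<>();
        while (true) {
            List<String> batch = new ArrayList<>(maxBatchSize);
            // Removes up to maxBatchSize tasks in FIFO order; 0 means the queue is empty.
            if (queue.drainTo(batch, maxBatchSize) == 0) {
                break;
            }
            batches.add(batch);
        }
        return batches;
    }

    public static void main(String[] args) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        for (int i = 1; i <= 7; i++) {
            queue.add("register:instance-" + i);
        }
        List<List<String>> batches = drainInBatches(queue, 3);
        System.out.println(batches.size() + " batch requests for 7 tasks"); // prints "3 batch requests for 7 tasks"
    }
}
```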
@Override
public ProcessingResult process(List<ReplicationTask> tasks) {
    // Build a ReplicationInstance for each task and put them into a ReplicationList
    ReplicationList list = createReplicationListOf(tasks);
    try {
        // Send the batch request
        EurekaHttpResponse<ReplicationListResponse> response = replicationClient.submitBatchUpdates(list);
        int statusCode = response.getStatusCode();
        if (!isSuccess(statusCode)) {
            if (statusCode == 503) {
                logger.warn("Server busy (503) HTTP status code received from the peer {}; rescheduling tasks after delay", peerId);
                return ProcessingResult.Congestion;
            } else {
                // Unexpected error returned from the server. This should ideally never happen.
                logger.error("Batch update failure with HTTP status code {}; discarding {} replication tasks", statusCode, tasks.size());
                return ProcessingResult.PermanentError;
            }
        } else {
            // Handle the per-task results: handleSuccess on success, handleFailure on failure
            handleBatchResponse(tasks, response.getEntity().getResponseList());
        }
    } catch (Throwable e) {
        if (isNetworkConnectException(e)) {
            logNetworkErrorSample(null, e);
            return ProcessingResult.TransientError;
        } else {
            logger.error("Not re-trying this exception because it does not seem to be a network exception", e);
            return ProcessingResult.PermanentError;
        }
    }
    return ProcessingResult.Success;
}
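The decision logic in process() reduces to a mapping from the batch outcome to one of four results. The sketch below restates that mapping in isolation. BatchResult and BatchOutcome are hypothetical names. Treating any IOException in the cause chain as a network error is an assumption about what isNetworkConnectException() checks. According to the log messages above, Congestion means the tasks are rescheduled after a delay, and PermanentError means they are discarded. TransientError marks a network failure that is worth retrying.

```java
import java.io.IOException;
import java.net.ConnectException;

// Hypothetical mirror of the outcomes process() can return; not Eureka's ProcessingResult.
enum BatchResult { SUCCESS, CONGESTION, TRANSIENT_ERROR, PERMANENT_ERROR }

class BatchOutcome {
    // HTTP status of the whole batch request.
    static BatchResult classify(int statusCode) {
        if (statusCode >= 200 && statusCode < 300) {
            return BatchResult.SUCCESS;        // per-task results are handled next
        }
        if (statusCode == 503) {
            return BatchResult.CONGESTION;     // peer busy: reschedule after a delay
        }
        return BatchResult.PERMANENT_ERROR;    // anything else: tasks are discarded
    }

    // Exception thrown while sending. Assumption: an IOException anywhere in the
    // cause chain is treated as a network problem worth retrying.
    static BatchResult classify(Throwable error) {
        for (Throwable e = error; e != null; e = e.getCause()) {
            if (e instanceof IOException) {
                return BatchResult.TRANSIENT_ERROR;
            }
        }
        return BatchResult.PERMANENT_ERROR;
    }

    public static void main(String[] args) {
        System.out.println(classify(200)); // SUCCESS
        System.out.println(classify(503)); // CONGESTION
        System.out.println(classify(500)); // PERMANENT_ERROR
        System.out.println(classify(new RuntimeException(new ConnectException("refused")))); // TRANSIENT_ERROR
        System.out.println(classify(new IllegalStateException("bug"))); // PERMANENT_ERROR
    }
}
```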
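The batch request is sent to the endpoint peerreplication/batch/. handleBatchResponse(tasks, response.getEntity().getResponseList()) iterates over the per-task results. It calls handleSuccess for each success and handleFailure for each failure. For example, when a replicated heartbeat returns status code 404, the node registers the instance again: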
ReplicationTask replicationTask = new InstanceReplicationTask(targetHost, Action.Heartbeat, info, overriddenStatus, false) {
    @Override
    public EurekaHttpResponse<InstanceInfo> execute() throws Throwable {
        return replicationClient.sendHeartBeat(appName, id, info, overriddenStatus);
    }

    @Override
    public void handleFailure(int statusCode, Object responseEntity) throws Throwable {
        super.handleFailure(statusCode, responseEntity);
        if (statusCode == 404) {
            // The peer does not know this instance: register it again
            register(info);
        } else if (config.shouldSyncWhenTimestampDiffers()) {
            // Otherwise, if enabled, reconcile the instance when the peer's timestamp differs
            InstanceInfo peerInstanceInfo = (InstanceInfo) responseEntity;
            if (peerInstanceInfo != null) {
                syncInstancesIfTimestampDiffers(appName, id, info, peerInstanceInfo);
            }
        }
    }
};
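The 404 fallback lets a peer that missed a registration heal itself on the next heartbeat. For example, the peer might have been unreachable when the Register task was sent. A minimal sketch of this self-healing step follows. FakePeer and HeartbeatReplication are hypothetical, and the timestamp-based reconciliation branch is omitted.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory peer; not Eureka's PeerEurekaNode or registry.
class FakePeer {
    final Map<String, Long> leases = new HashMap<>();

    int heartbeat(String instanceId) {
        if (!leases.containsKey(instanceId)) {
            return 404; // the peer has never seen this instance
        }
        leases.put(instanceId, System.currentTimeMillis()); // renew the lease
        return 200;
    }

    int register(String instanceId) {
        leases.put(instanceId, System.currentTimeMillis());
        return 204; // the status addInstance returns
    }
}

class HeartbeatReplication {
    // Replicates a heartbeat; on 404 falls back to a full registration,
    // mirroring what handleFailure does in the task above.
    static int replicateHeartbeat(FakePeer peer, String instanceId) {
        int status = peer.heartbeat(instanceId);
        if (status == 404) {
            return peer.register(instanceId);
        }
        return status;
    }

    public static void main(String[] args) {
        FakePeer peer = new FakePeer();
        System.out.println(replicateHeartbeat(peer, "order-service:1")); // 204: re-registered
        System.out.println(replicateHeartbeat(peer, "order-service:1")); // 200: normal renewal
    }
}
```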
Entry point on the receiving side: com.netflix.eureka.resources.PeerReplicationResource
@Path("batch")
@POST
public Response batchReplication(ReplicationList replicationList) {
    try {
        ReplicationListResponse batchResponse = new ReplicationListResponse();
        // Iterate over the requested tasks
        for (ReplicationInstance instanceInfo : replicationList.getReplicationList()) {
            try {
                // Dispatch each task and collect its result so all results are returned together
                batchResponse.addResponse(dispatch(instanceInfo));
            } catch (Exception e) {
                batchResponse.addResponse(new ReplicationInstanceResponse(Status.INTERNAL_SERVER_ERROR.getStatusCode(), null));
                logger.error(instanceInfo.getAction() + " request processing failed for batch item "
                        + instanceInfo.getAppName() + '/' + instanceInfo.getId(), e);
            }
        }
        return Response.ok(batchResponse).build();
    } catch (Throwable e) {
        logger.error("Cannot execute batch Request", e);
        return Response.status(Status.INTERNAL_SERVER_ERROR).build();
    }
}

private ReplicationInstanceResponse dispatch(ReplicationInstance instanceInfo) {
    // Create the ApplicationResource for the instance's application
    ApplicationResource applicationResource = createApplicationResource(instanceInfo);
    // Create the InstanceResource for the instance
    InstanceResource resource = createInstanceResource(instanceInfo, applicationResource);
    // The client instance's lastDirtyTimestamp, which works somewhat like a version number
    String lastDirtyTimestamp = toString(instanceInfo.getLastDirtyTimestamp());
    // The overridden status
    String overriddenStatus = toString(instanceInfo.getOverriddenStatus());
    // The instance status
    String instanceStatus = toString(instanceInfo.getStatus());

    Builder singleResponseBuilder = new Builder();
    switch (instanceInfo.getAction()) {
        case Register:
            // Registration
            singleResponseBuilder = handleRegister(instanceInfo, applicationResource);
            break;
        case Heartbeat:
            // Heartbeat (lease renewal)
            singleResponseBuilder = handleHeartbeat(resource, lastDirtyTimestamp, overriddenStatus, instanceStatus);
            break;
        case Cancel:
            // Deregistration
            singleResponseBuilder = handleCancel(resource);
            break;
        case StatusUpdate:
            // Update the overridden status
            singleResponseBuilder = handleStatusUpdate(instanceInfo, resource);
            break;
        case DeleteStatusOverride:
            // Delete the overridden status
            singleResponseBuilder = handleDeleteStatusOverride(instanceInfo, resource);
            break;
    }
    return singleResponseBuilder.build();
}
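A key property of the receiving side is that a batch succeeds or fails per item. Each ReplicationInstance gets its own response, and an exception in one item becomes a 500 for that item only. The sketch below models just this property. BatchReceiver, Item, and the three-action enum are hypothetical simplifications, with no overridden status and no lastDirtyTimestamp checks. The 404 returned for an unknown heartbeat is what triggers the sender's re-registration fallback described earlier.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;

// Hypothetical receiver mirroring batchReplication()/dispatch(); not Eureka's resource classes.
class BatchReceiver {
    enum Action { REGISTER, HEARTBEAT, CANCEL }

    static final class Item {
        final Action action;
        final String id;

        Item(Action action, String id) {
            this.action = action;
            this.id = id;
        }
    }

    final Set<String> registry = new HashSet<>();

    // One status per item, in request order; a failing item yields 500
    // without aborting the rest of the batch.
    List<Integer> batchReplication(List<Item> items) {
        List<Integer> statuses = new ArrayList<>();
        for (Item item : items) {
            try {
                statuses.add(dispatch(item));
            } catch (RuntimeException e) {
                statuses.add(500);
            }
        }
        return statuses;
    }

    private int dispatch(Item item) {
        Objects.requireNonNull(item.id, "instance id"); // malformed item -> exception -> 500
        switch (item.action) {
            case REGISTER:
                registry.add(item.id);
                return 200;
            case HEARTBEAT:
                return registry.contains(item.id) ? 200 : 404;
            case CANCEL:
                return registry.remove(item.id) ? 200 : 404;
            default:
                throw new IllegalArgumentException("unsupported action " + item.action);
        }
    }

    public static void main(String[] args) {
        BatchReceiver receiver = new BatchReceiver();
        List<Integer> statuses = receiver.batchReplication(Arrays.asList(
                new Item(Action.REGISTER, "a:1"),
                new Item(Action.HEARTBEAT, "b:1"),  // unknown instance
                new Item(Action.HEARTBEAT, null),   // malformed item
                new Item(Action.CANCEL, "a:1")));
        System.out.println(statuses); // [200, 404, 500, 200]
    }
}
```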
We won't go through all five cases one by one; we will look only at registration.
private static Builder handleRegister(ReplicationInstance instanceInfo, ApplicationResource applicationResource) {
// Call the ApplicationResource (controller layer) endpoint to add the instance
applicationResource.addInstance(instanceInfo.getInstanceInfo(), REPLICATION);
return new Builder().setStatusCode(Status.OK.getStatusCode());
}
@POST
@Consumes({"application/json", "application/xml"})
public Response addInstance(InstanceInfo info,@HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication) {
// ... lots of code omitted
return Response.status(204).build(); // 204 to be backwards compatible
}
REPLICATION = "true": for this request the header value is "true", which marks it as a replication request from another server.
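The flag travels as a string header, so a client request that omits it must still be treated as a non-replication request. Here is a minimal sketch of that conversion, assuming a null-safe comparison with "true". The class and method names are hypothetical.

```java
// Hypothetical helper; assumes the header value is compared with "true",
// so a missing header (a normal client request) means "not a replication".
class ReplicationHeader {
    static boolean isReplication(String headerValue) {
        return "true".equals(headerValue);
    }

    public static void main(String[] args) {
        System.out.println(isReplication(null));   // false: client registration, will be replicated
        System.out.println(isReplication("true")); // true: peer replication, will not be replicated again
    }
}
```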
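As shown above, cluster synchronization follows the same downstream flow as a client registration. The only difference is that isReplication = true marks the request as a cluster replication request, so the receiving node does not replicate it again.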
This article has covered Eureka Server cluster synchronization and the core source code behind it in detail.
Copyright © 2003-2013 www.wpsshop.cn. All rights reserved.