当前位置:   article > 正文

深入理解Eureka Server集群同步(十)_获取eureka集群node数量

获取eureka集群node数量

集群启动同步

  1. protected void initEurekaServerContext() throws Exception {
  2. // ....省略N多代码
  3. // 同步信息
  4. int registryCount = this.registry.syncUp();
  5. // ....省略N多代码
  6. }

网上很多文章说是调用syncUp这个方法去其他Eureka Server节点复制注册信息,这个说法不是很准确, 在这个地方,SyncUp()这个方法并不会去其他Eureka Server节点复制信息,而是从本地内存里面获取注册信息, 看源码就知道了。

  1. public int syncUp() {
  2. // Copy entire entry from neighboring DS node
  3. // 获取到的注册节点数量
  4. int count = 0;
  5. // 如果count==0 , 那么默认重试5次(前提是开启了register-with-eureka = true,否则为0
  6. for (int i = 0; ((i < serverConfig.getRegistrySyncRetries()) && (count == 0)); i++) {
  7. if (i > 0) {
  8. try {
  9. // 从第二次开始,每次默认沉睡30
  10. Thread.sleep(serverConfig.getRegistrySyncRetryWaitMs());
  11. } catch (InterruptedException e) {
  12. logger.warn("Interrupted during registry transfer..");
  13. break;
  14. }
  15. }
  16. // 从本地内存里面获取注册实例信息
  17. Applications apps = eurekaClient.getApplications();
  18. for (Application app : apps.getRegisteredApplications()) {
  19. for (InstanceInfo instance : app.getInstances()) {
  20. try {
  21. // 判断是否可以注册
  22. if (isRegisterable(instance)) {
  23. // 注册到当前Eureka Server里面
  24. register(instance, instance.getLeaseInfo().getDurationInSecs(), true);
  25. count++;
  26. }
  27. } catch (Throwable t) {
  28. logger.error("During DS init copy", t);
  29. }
  30. }
  31. }
  32. }
  33. return count;
  34. }

参数说明:

regirstrySyncRetries : 当eureka服务器启动时尝试去获取集群里其他服务器上的注册信息的次数,默认为5,

只有当 eureka.client.register-with-eureka = true 的时候才会是5,如果是false ,则为0

registrySyncRetryWaitMs : 当eureka服务器启动时获取其他服务器的注册信息失败时,会再次尝试获取,期间需要等待的时间,默认为30 * 1000毫秒

count : 获取到的注册实例数量,如果为0 则根据重试次数进行重试,每次重试前沉默 30秒

PS: 在之前的文章中 7. 深入理解Eureka 获取注册信息(七) ,讲过Eureka Client启动的时候默认会自动从Eureka Server获取注册信息, 要想Eureka Server在启动的时候可以同步其他集群节点的注册信息,那么必须开启客户端配置

  1. eureka.client.register-with-eureka = true ## 是否作为一个Eureka Client 注册到Eureka Server上去
  2. eureka.client.fetch-registry = true ## 是否需要从Eureka Server上拉取注册信息到本地。

只有开启了上面两个配置,那么集群节点在启动的时候,会初始化Eureka Client端的配置 ,会从其他Eureka Server拉取注册信息到本地,同时

在初始化Eureka Server的时候,会从本地内存里面读取 注册信息,自动注册到本身的服务上

集群同步类型

  1. public enum Action {
  2. Heartbeat, Register, Cancel, StatusUpdate, DeleteStatusOverride;
  3. private com.netflix.servo.monitor.Timer timer = Monitors.newTimer(this.name());
  4. public com.netflix.servo.monitor.Timer getTimer() {
  5. return this.timer;
  6. }
  7. }
  8. Heartbeat : 心跳续约
  9. Register : 注册
  10. Cancel : 下线
  11. StatusUpdate : 添加覆盖状态
  12. DeleteStatusOverride : 删除覆盖状态

发起同步

这里以注册的代码为例

  1. @Override
  2. public void register(final InstanceInfo info, final boolean isReplication) {
  3. int leaseDuration = Lease.DEFAULT_DURATION_IN_SECS;
  4. if (info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs() > 0) {
  5. leaseDuration = info.getLeaseInfo().getDurationInSecs();
  6. }
  7. // 发起注册
  8. super.register(info, leaseDuration, isReplication);
  9. // 注册完成后,在这里发起同步,同步类型为Register
  10. replicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication);
  11. }
  12. private void replicateToPeers(Action action, String appName, String id,
  13. InstanceInfo info /* optional */,
  14. InstanceStatus newStatus /* optional */, boolean isReplication) {
  15. Stopwatch tracer = action.getTimer().start();
  16. try {
  17. // 判断是否是集群同步请求,如果是,则记录最后一分钟的同步次数
  18. if (isReplication) {
  19. numberOfReplicationsLastMin.increment();
  20. }
  21. // If it is a replication already, do not replicate again as this will create a poison replication
  22. // 集群节点为空,或者这是一个Eureka Server 同步请求,直接return
  23. if (peerEurekaNodes == Collections.EMPTY_LIST || isReplication) {
  24. return;
  25. }
  26. // 循环相邻的Eureka Server Node, 分别发起请求同步
  27. for (final PeerEurekaNode node : peerEurekaNodes.getPeerEurekaNodes()) {
  28. // 判断是否是自身的URL,过滤掉
  29. if (peerEurekaNodes.isThisMyUrl(node.getServiceUrl())) {
  30. continue;
  31. }
  32. // 发起同步请求
  33. replicateInstanceActionsToPeers(action, appName, id, info, newStatus, node);
  34. }
  35. } finally {
  36. tracer.stop();
  37. }
  38. }

步骤说明:

1.判断集群节点是否为空,为空则返回

2.isReplication 代表是否是一个复制请求, isReplication = true 表示是其他Eureka Server发过来的同步请求

这个时候是不需要继续往下同步的。否则会陷入同步死循环

3.循环集群节点,过滤掉自身的节点

4.发起同步请求 ,调用replicateInstanceActionsToPeers

PS: 这里提到了PeerEurekaNode , 对于PeerEurekaNodes的集群节点更新及数据读取,可以看这个1. 深入理解Eureka Server启动(一)在服务启动的时候,对PeerEurekaNodes集群开启了线程更新集群节点信息。每15分钟一次

  1. private void replicateInstanceActionsToPeers(Action action, String appName,
  2. String id, InstanceInfo info, InstanceStatus newStatus,
  3. PeerEurekaNode node) {
  4. try {
  5. InstanceInfo infoFromRegistry = null;
  6. CurrentRequestVersion.set(Version.V2);
  7. switch (action) {
  8. case Cancel: // 下线
  9. node.cancel(appName, id);
  10. break;
  11. case Heartbeat:
  12. // 心跳
  13. InstanceStatus overriddenStatus = overriddenInstanceStatusMap.get(id);
  14. // 获取本地最新的实例信息
  15. infoFromRegistry = getInstanceByAppAndId(appName, id, false);
  16. node.heartbeat(appName, id, infoFromRegistry, overriddenStatus, false);
  17. break;
  18. case Register: // 注册
  19. node.register(info);
  20. break;
  21. case StatusUpdate: // 设置覆盖状态
  22. infoFromRegistry = getInstanceByAppAndId(appName, id, false);
  23. node.statusUpdate(appName, id, newStatus, infoFromRegistry);
  24. break;
  25. case DeleteStatusOverride: //删除覆盖状态
  26. infoFromRegistry = getInstanceByAppAndId(appName, id, false);
  27. node.deleteStatusOverride(appName, id, infoFromRegistry);
  28. break;
  29. }
  30. } catch (Throwable t) {
  31. logger.error("Cannot replicate information to {} for action {}", node.getServiceUrl(), action.name(), t);
  32. }
  33. }

这里直接看注册,其他的原理上是一致的。

PeerEurekaNode的register方法如下。

  1. public void register(final InstanceInfo info) throws Exception {
  2. long expiryTime = System.currentTimeMillis() + getLeaseRenewalOf(info);
  3. // 默认采用的是批处理
  4. batchingDispatcher.process(
  5. taskId("register", info),
  6. new InstanceReplicationTask(targetHost, Action.Register, info, null, true) {
  7. public EurekaHttpResponse<Void> execute() {
  8. return replicationClient.register(info);
  9. }
  10. },
  11. expiryTime
  12. );
  13. }

默认采用的是批量任务处理器,就是将task放入任务队列中,然后通过线程获取任务队列里面的任务,模仿ThreadExecutorPool的方式,生成线程,

从队列里面抓取任务处理,统一批量执行,Eureka Server 那边也是统一接收,这样提高了同步效率

批量处理的任务执行器是com.netflix.eureka.cluster.ReplicationTaskProcessor

  1. @Override
  2. public ProcessingResult process(List<ReplicationTask> tasks) {
  3. // 构建ReplicationInstance放入ReplicationList
  4. ReplicationList list = createReplicationListOf(tasks);
  5. try {
  6. // 发起批量处理请求
  7. EurekaHttpResponse<ReplicationListResponse> response = replicationClient.submitBatchUpdates(list);
  8. int statusCode = response.getStatusCode();
  9. if (!isSuccess(statusCode)) {
  10. if (statusCode == 503) {
  11. logger.warn("Server busy (503) HTTP status code received from the peer {}; rescheduling tasks after delay", peerId);
  12. return ProcessingResult.Congestion;
  13. } else {
  14. // Unexpected error returned from the server. This should ideally never happen.
  15. logger.error("Batch update failure with HTTP status code {}; discarding {} replication tasks", statusCode, tasks.size());
  16. return ProcessingResult.PermanentError;
  17. }
  18. } else {
  19. // 处理执行结果 ,成功则调用handleSuccess ,失败则调用handleFailure。
  20. handleBatchResponse(tasks, response.getEntity().getResponseList());
  21. }
  22. } catch (Throwable e) {
  23. if (isNetworkConnectException(e)) {
  24. logNetworkErrorSample(null, e);
  25. return ProcessingResult.TransientError;
  26. } else {
  27. logger.error("Not re-trying this exception because it does not seem to be a network exception", e);
  28. return ProcessingResult.PermanentError;
  29. }
  30. }
  31. return ProcessingResult.Success;
  32. }

请求批量处理的接口地址 : peerreplication/batch/

handleBatchResponse(tasks, response.getEntity().getResponseList()) , 循环调用处理结果,

成功则调用handleSuccess. , 失败则调用handleFailure , 比如hearbeat的时候,调用返回码为

404的时候,会重新发起注册。

  1. ReplicationTask replicationTask = new InstanceReplicationTask(targetHost, Action.Heartbeat, info, overriddenStatus, false) {
  2. @Override
  3. public EurekaHttpResponse<InstanceInfo> execute() throws Throwable {
  4. return replicationClient.sendHeartBeat(appName, id, info, overriddenStatus);
  5. }
  6. @Override
  7. public void handleFailure(int statusCode, Object responseEntity) throws Throwable {
  8. super.handleFailure(statusCode, responseEntity);
  9. if (statusCode == 404) {
  10. // 重新发起注册。
  11. register(info);
  12. }
  13. } else if (config.shouldSyncWhenTimestampDiffers()) {
  14. InstanceInfo peerInstanceInfo = (InstanceInfo) responseEntity;
  15. if (peerInstanceInfo != null) {
  16. syncInstancesIfTimestampDiffers(appName, id, info, peerInstanceInfo);
  17. }
  18. }
  19. }
  20. };

Eureka Server接收同步

程序入口 : com.netflix.eureka.resources.PeerReplicationResource

  1. @Path("batch")
  2. @POST
  3. public Response batchReplication(ReplicationList replicationList) {
  4. try {
  5. ReplicationListResponse batchResponse = new ReplicationListResponse();
  6. // 循环请求的任务
  7. for (ReplicationInstance instanceInfo : replicationList.getReplicationList()) {
  8. try {
  9. // 分发任务,同时将处理结果收集起来,等会统一返回
  10. batchResponse.addResponse(dispatch(instanceInfo));
  11. } catch (Exception e) {
  12. batchResponse.addResponse(new ReplicationInstanceResponse(Status.INTERNAL_SERVER_ERROR.getStatusCode(), null));
  13. logger.error(instanceInfo.getAction() + " request processing failed for batch item "
  14. + instanceInfo.getAppName() + '/' + instanceInfo.getId(), e);
  15. }
  16. }
  17. return Response.ok(batchResponse).build();
  18. } catch (Throwable e) {
  19. logger.error("Cannot execute batch Request", e);
  20. return Response.status(Status.INTERNAL_SERVER_ERROR).build();
  21. }
  22. }
  23. private ReplicationInstanceResponse dispatch(ReplicationInstance instanceInfo) {
  24. // 创建实例
  25. ApplicationResource applicationResource = createApplicationResource(instanceInfo);
  26. // 创建实例
  27. InstanceResource resource = createInstanceResource(instanceInfo, applicationResource);
  28. //获取客户端instance的lastDirtyTimestamp ,有点类似于版本号的概念。
  29. String lastDirtyTimestamp = toString(instanceInfo.getLastDirtyTimestamp());
  30. // 获取覆盖状态
  31. String overriddenStatus = toString(instanceInfo.getOverriddenStatus());
  32. // 获取instance的状态
  33. String instanceStatus = toString(instanceInfo.getStatus());
  34. Builder singleResponseBuilder = new Builder();
  35. switch (instanceInfo.getAction()) {
  36. case Register: // 注册
  37. singleResponseBuilder = handleRegister(instanceInfo, applicationResource);
  38. break;
  39. case Heartbeat: // 心跳续约
  40. singleResponseBuilder = handleHeartbeat(resource, lastDirtyTimestamp, overriddenStatus, instanceStatus);
  41. break;
  42. case Cancel: // 下线
  43. singleResponseBuilder = handleCancel(resource);
  44. break;
  45. case StatusUpdate: // 修改覆盖状态
  46. singleResponseBuilder = handleStatusUpdate(instanceInfo, resource);
  47. break;
  48. case DeleteStatusOverride: // 删除覆盖状态
  49. singleResponseBuilder = handleDeleteStatusOverride(instanceInfo, resource);
  50. break;
  51. }
  52. return singleResponseBuilder.build();
  53. }

以上五个场景,这里就不一一说了,就说一下注册吧,

  1. private static Builder handleRegister(ReplicationInstance instanceInfo, ApplicationResource applicationResource) {
  2. // 调用Application控制层的接口,添加实例
  3. applicationResource.addInstance(instanceInfo.getInstanceInfo(), REPLICATION);
  4. return new Builder().setStatusCode(Status.OK.getStatusCode());
  5. }
  6. @POST
  7. @Consumes({"application/json", "application/xml"})
  8. public Response addInstance(InstanceInfo info,
  9. @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication) {
  10. // 省略代码1000行
  11. return Response.status(204).build(); // 204 to be backwards compatible
  12. }

REPLICATION = “true” ,此次请求为true,表示是一个服务端的复制请求。

由上面可以知道,集群同步走的和客户端注册的后续流程是一样的,只不过isReplication=true , 表明这是一个集群同步的请求

深入理解Eureka Server集群同步(十) - 简书
https://www.jianshu.com/p/b8c614c442e0

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/神奇cpp/article/detail/818203
推荐阅读
相关标签
  

闽ICP备14008679号