当前位置:   article > 正文

二、Eureka之server端集群节点发现,数据同步_nodejs 查询 eureka 集群服务

nodejs 查询 eureka 集群服务

一、前言

        Eureka服务端封装了一个集群节点管理的类名称为PeerEurekaNodes 通过名称翻译出来为对等的Eureka节点集合,可以看出这个类是对eureka服务端集群节点抽象,下面通过源码查询eureka是怎么管理与发现节点信息

  1. //eureka server 集群节点 集合类 帮助管理维护集群节点
  2. @Singleton
  3. public class PeerEurekaNodes {
  4. /**
  5. * 集群节点集合
  6. */
  7. private volatile List<PeerEurekaNode> peerEurekaNodes = Collections.emptyList();
  8. /**
  9. * 集群节点URL集合
  10. */
  11. private volatile Set<String> peerEurekaNodeUrls = Collections.emptySet();
  12. /**
  13. * 定时任务线程池
  14. */
  15. private ScheduledExecutorService taskExecutor;

通过PeerEurekaNodes类属性可以看到提供了两个集合以及一个执行定时任务的线程池,其它配置属性忽略

  1. peerEurekaNodes 表示集群节点集合
  2. peerEurekaNodeUrls 表示集群节点对应的URL集合
  3. taskExecutor 执行定时任务的线程池

同时 PeerEurekaNodes 类提供start 与shutdown方法,接下来主要看start方法的实现

  1. /**
  2. * 启动方法,此方法管理集群节点间的通讯
  3. */
  4. public void start() {
  5. //初始化定时任务线程池
  6. taskExecutor = Executors.newSingleThreadScheduledExecutor(
  7. new ThreadFactory() {
  8. @Override
  9. public Thread newThread(Runnable r) {
  10. Thread thread = new Thread(r, "Eureka-PeerNodesUpdater");
  11. thread.setDaemon(true);
  12. return thread;
  13. }
  14. }
  15. );
  16. try {
  17. updatePeerEurekaNodes(resolvePeerUrls());
  18. //节点更新任务线程
  19. Runnable peersUpdateTask = new Runnable() {
  20. @Override
  21. public void run() {
  22. try {
  23. updatePeerEurekaNodes(resolvePeerUrls());
  24. } catch (Throwable e) {
  25. logger.error("Cannot update the replica Nodes", e);
  26. }
  27. }
  28. };
  29. // 创建并执行一个在给定初始延迟后首次启用的定期操作,随后,
  30. // 在每一次执行终止和下一次执行开始之间都存在给定的延迟。
  31. // 如果任务的任一执行遇到异常,就会取消后续执行。
  32. //否则,只能通过执行程序的取消或终止方法来终止该任务。
  33. // 参数:
  34. // command - 要执行的任务
  35. // initialdelay - 首次执行的延迟时间
  36. // delay - 一次执行终止和下一次执行开始之间的延迟
  37. // unit - initialdelay 和 delay 参数的时间单位
  38. //定时执行节点更新任务线程
  39. taskExecutor.scheduleWithFixedDelay(
  40. peersUpdateTask,
  41. serverConfig.getPeerEurekaNodesUpdateIntervalMs(),
  42. serverConfig.getPeerEurekaNodesUpdateIntervalMs(),
  43. TimeUnit.MILLISECONDS
  44. );
  45. } catch (Exception e) {
  46. throw new IllegalStateException(e);
  47. }
  48. for (PeerEurekaNode node : peerEurekaNodes) {
  49. logger.info("Replica node URL: " + node.getServiceUrl());
  50. }
  51. }

start 方法主要完成以下几件事

  1. 初始化定时任务线程池
  2. 首次更新集群节点 updatePeerEurekaNodes方法
  3. 创建更新集群节点任务线程
  4. 通过定时任务线程池定时执行更新集群节点线程

通过start 可以看出 eureka是通过一个定时线程定时去更新集群的节点信息达到对集群节点的动态发现和感知,在上面我们可以看到更新操作主要由updatePeerEurekaNodes方法完成,下面查看此方法的实现

updatePeerEurekaNodes根据传入的新集群URL集合完成节点的更新

  1. 校验传入的URL集合是否需要更新
  2. 移除新url集合中没有的旧节点并关闭节点
  3. 创建旧节点集合中没有的新URL节点通过createPeerEurekaNode方法
  4. 重新赋值节点集合以及URL集合完成节点的更新

updatePeerEurekaNodes传入的新URL集合是通过resolvePeerUrls方法获取,这个方法实际上是解析配置文件中的eureka.serviceUrl前缀的配置获取,并动态监听配置的更新。 创建新的节点是通过createPeerEurekaNode创建,下面查看此方法源码

  1. /**
  2. * 根据URL创建server新节点信息
  3. * @param peerEurekaNodeUrl
  4. * @return
  5. */
  6. protected PeerEurekaNode createPeerEurekaNode(String peerEurekaNodeUrl) {
  7. //创建一个连接远程节点的客户端
  8. HttpReplicationClient replicationClient = JerseyReplicationClient.createReplicationClient(serverConfig, serverCodecs, peerEurekaNodeUrl);
  9. //获取新节点host信息
  10. String targetHost = hostFromUrl(peerEurekaNodeUrl);
  11. if (targetHost == null) {
  12. targetHost = "host";
  13. }
  14. //创建新节点
  15. return new PeerEurekaNode(registry, targetHost, peerEurekaNodeUrl, replicationClient, serverConfig);
  16. }

PeerEurekaNode 方法

  1. 创建远程通讯客户端replicationClient 用户与此节点间通讯,数据同步等工作
  2. 获取要创建的远程节点的host
  3. 创建一个表示远程节点实例 PeerEurekaNode

PeerEurekaNode 表示一个与当前节点对等的远程节点,当前节点与远程节点的数据同步工作都是在此实例中完成的。

集群节点数据同步

在上面节点发现中知道eureka是通过PeerEurekaNode表示远程对等接点,并将远程通讯客户端replicationClient传入到PeerEurekaNode中,接下来通过查看PeerEurekaNode源码来看eureka集群节点间都有那些数据需要同步以及通讯内容

PeerEurekaNode 完成以下事件

  1. 创建数据同步的任务处理器ReplicationTaskProcessor
  2. 创建批处理任务调度器
  3. 创建单任务处理调度器

说明: eureka将节点间的数据同步工作包装成一个个细微的任务ReplicationTask ,每一个数据操作代表一个任务,将任务发送给任务调度器TaskDispatcher去异步处理。

  • register

  1. /**
  2. * 当eureka server注册新服务时,同时创建一个定时任务将新服务同步到集群其它节点
  3. * Sends the registration information of {@link InstanceInfo} receiving by
  4. * this node to the peer node represented by this class.
  5. */
  6. public void register(final InstanceInfo info) throws Exception {
  7. long expiryTime = System.currentTimeMillis() + getLeaseRenewalOf(info);
  8. //任务调度器中添加一个请求类型为注册register新服务的同步任务
  9. batchingDispatcher.process(
  10. taskId("register", info),
  11. new InstanceReplicationTask(targetHost, Action.Register, info, null, true) {
  12. public EurekaHttpResponse<Void> execute() {
  13. return replicationClient.register(info);
  14. }
  15. },
  16. expiryTime
  17. );
  18. }

注册同步任务,当有服务注册到当前节点时,通过注册同步任务将服务信息同步到集群远程节点

  • cancel
  1. public void cancel(final String appName, final String id) throws Exception {
  2. long expiryTime = System.currentTimeMillis() + maxProcessingDelayMs;
  3. //任务调度器中添加一个请求类型为取消cancel服务的同步任务
  4. batchingDispatcher.process(
  5. taskId("cancel", appName, id),
  6. new InstanceReplicationTask(targetHost, Action.Cancel, appName, id) {
  7. @Override
  8. public EurekaHttpResponse<Void> execute() {
  9. return replicationClient.cancel(appName, id);
  10. }
  11. @Override
  12. public void handleFailure(int statusCode, Object responseEntity) throws Throwable {
  13. super.handleFailure(statusCode, responseEntity);
  14. if (statusCode == 404) {
  15. logger.warn("{}: missing entry.", getTaskName());
  16. }
  17. }
  18. },
  19. expiryTime
  20. );
  21. }

取消服务注册任务,当前节点有服务取消注册,将信息同步到集群远程节点

  • heartbeat

心跳同步任务,当前节点有服务发送心跳续租,将信息同步到集群远程节点

三、集群节点数据同步任务处理

在PeerEurekaNode的构造函数中可以看到同步任务处理由ReplicationTaskProcessor完成,下面看此类源码

  1. /**
  2. * 单个处理ReplicationTask任务
  3. * @param task
  4. * @return
  5. */
  6. @Override
  7. public ProcessingResult process(ReplicationTask task) {
  8. try {
  9. //调用任务execute方法,完成任务的执行
  10. EurekaHttpResponse<?> httpResponse = task.execute();
  11. int statusCode = httpResponse.getStatusCode();
  12. //判断任务返回结果
  13. Object entity = httpResponse.getEntity();
  14. if (logger.isDebugEnabled()) {
  15. logger.debug("Replication task {} completed with status {}, (includes entity {})", task.getTaskName(), statusCode, entity != null);
  16. }
  17. if (isSuccess(statusCode)) {
  18. task.handleSuccess();
  19. } else if (statusCode == 503) {
  20. logger.debug("Server busy (503) reply for task {}", task.getTaskName());
  21. return ProcessingResult.Congestion;
  22. } else {
  23. task.handleFailure(statusCode, entity);
  24. return ProcessingResult.PermanentError;
  25. }
  26. } catch (Throwable e) {
  27. if (isNetworkConnectException(e)) {
  28. logNetworkErrorSample(task, e);
  29. return ProcessingResult.TransientError;
  30. } else {
  31. logger.error(peerId + ": " + task.getTaskName() + "Not re-trying this exception because it does not seem to be a network exception", e);
  32. return ProcessingResult.PermanentError;
  33. }
  34. }
  35. return ProcessingResult.Success;
  36. }

单任务处理

  1. 调用任务task的execute完成远程数据同步
  2. 分析远程返回结果
  1. /**
  2. * 批量处理ReplicationTask任务
  3. * @param tasks
  4. * @return
  5. */
  6. @Override
  7. public ProcessingResult process(List<ReplicationTask> tasks) {
  8. //根据task集合创建ReplicationList
  9. ReplicationList list = createReplicationListOf(tasks);
  10. try {
  11. //调用批量同步接口 将同步集合发送到远端节点同步数据
  12. EurekaHttpResponse<ReplicationListResponse> response = replicationClient.submitBatchUpdates(list);
  13. //判断同步返回结果
  14. int statusCode = response.getStatusCode();
  15. if (!isSuccess(statusCode)) {
  16. if (statusCode == 503) {
  17. logger.warn("Server busy (503) HTTP status code received from the peer {}; rescheduling tasks after delay", peerId);
  18. return ProcessingResult.Congestion;
  19. } else {
  20. // Unexpected error returned from the server. This should ideally never happen.
  21. logger.error("Batch update failure with HTTP status code {}; discarding {} replication tasks", statusCode, tasks.size());
  22. return ProcessingResult.PermanentError;
  23. }
  24. } else {
  25. handleBatchResponse(tasks, response.getEntity().getResponseList());
  26. }
  27. } catch (Throwable e) {
  28. if (isNetworkConnectException(e)) {
  29. logNetworkErrorSample(null, e);
  30. return ProcessingResult.TransientError;
  31. } else {
  32. logger.error("Not re-trying this exception because it does not seem to be a network exception", e);
  33. return ProcessingResult.PermanentError;
  34. }
  35. }
  36. return ProcessingResult.Success;
  37. }

批处理任务,将一组任务一次性发送到远程进行处理

  1. 根据task集合创建ReplicationList
  2. 调用批量同步接口将同步集合发送到远端节点同步数据 即调用rest API /{version}/peerreplication
  3. 分析远程返回结果

        eureka 服务端 集群节点发现,数据同步功能主要是由PeerEurekaNodes与PeerEurekaNode类实现,通过源码的跟踪可以清晰看出集群实现的逻辑,方便在实际应用中对问题的定位

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/黑客灵魂/article/detail/818182
推荐阅读
相关标签
  

闽ICP备14008679号