当前位置:   article > 正文

Eureka Server集群数据同步源码解析(上)Eureka Client发起数据同步请求_eureka client增量同步

eureka client增量同步

 

首先搞清楚一个问题:Eureka Server集群数据同步与Eureka Server集群数据备份,是同一个概念吗?
很显然,不是一个概念。
Eureka Server集群数据同步处理的是Eureka Client发起的数据同步请求。
Eureka Server集群数据备份处理的是Eureka Server发起的数据备份请求。

Eureka服务的启动类是EurekaBootStrap,这个类在eureka-core-1.4.6-source.jar包下。
总体来说,Eureka Server集群数据同步分为2个阶段:
第一阶段:Eureka Client发起数据同步请求
第二阶段:Eureka Server处理同步请求

Eureka Client发起数据同步请求有5种Heartbeat, Register, Cancel, StatusUpdate, DeleteStatusOverride
相应的,Eureka Server处理同步请求也是这5种。

数据同步类型定义如下:

  1. public enum Action {
  2. Heartbeat, Register, Cancel, StatusUpdate, DeleteStatusOverride;
  3. private com.netflix.servo.monitor.Timer timer = Monitors.newTimer(this.name());
  4. public com.netflix.servo.monitor.Timer getTimer() {
  5. return this.timer;
  6. }
  7. }

 

Heartbeat:心跳检测
Register:服务注册
Cancel:失效节点清除
StatusUpdate:服务节点状态更新
DeleteStatusOverride:删除状态覆盖,也就是把InstanceInfo.overriddenStatus状态置为InstanceStatus.UNKNOWN

 

Eureka Client如何触发数据同步呢?入口有很多,比如服务注册,心跳检测等等都会触发数据同步,我们这里以服务注册这个入口为例,
来分析Eureka Server的集群数据同步。

PeerAwareInstanceRegistry.register(InstanceInfo info, boolean isReplication);

 

OK,来看源码,有注释

 

  1. /**
  2. * Registers the information about the {@link InstanceInfo} and replicates
  3. * this information to all peer eureka nodes. If this is replication event
  4. * from other replica nodes then it is not replicated.
  5. *
  6. * @param info
  7. * the {@link InstanceInfo} to be registered and replicated.
  8. * @param isReplication
  9. * true if this is a replication event from other replica nodes,
  10. * false otherwise.
  11. */
  12. @Override
  13. public void register(final InstanceInfo info, final boolean isReplication) {
  14. // 心跳续约租期,默认90
  15. int leaseDuration = Lease.DEFAULT_DURATION_IN_SECS;
  16. if (info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs() > 0) {
  17. leaseDuration = info.getLeaseInfo().getDurationInSecs();
  18. }
  19. // 服务注册
  20. super.register(info, leaseDuration, isReplication);
  21. // 向Eureka Server发起数据同步请求 类型:Action.Register
  22. replicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication);
  23. }

 

Eureka Client端发起数据同步请求的关键方法是这个 replicateInstanceActionsToPeers,会根据同步类型,

进行相应的处理。

  1. /**
  2. * Replicates all instance changes to peer eureka nodes except for
  3. * replication traffic to this node.
  4. *
  5. */
  6. private void replicateInstanceActionsToPeers(Action action, String appName,
  7. String id, InstanceInfo info, InstanceStatus newStatus,
  8. PeerEurekaNode node) {
  9. try {
  10. InstanceInfo infoFromRegistry = null;
  11. CurrentRequestVersion.set(Version.V2);
  12. switch (action) {
  13. // 失效节点清理
  14. case Cancel:
  15. // 发送http请求,清理失效节点 jersey框架
  16. node.cancel(appName, id);
  17. break;
  18. // 心跳检测
  19. case Heartbeat:
  20. InstanceStatus overriddenStatus = overriddenInstanceStatusMap.get(id);
  21. infoFromRegistry = getInstanceByAppAndId(appName, id, false);
  22. node.heartbeat(appName, id, infoFromRegistry, overriddenStatus, false);
  23. break;
  24. // 服务注册
  25. case Register:
  26. node.register(info);
  27. break;
  28. // 服务状态更新 UP/DOWN/STARTING/OUT_OF_SERVICE/UNKNOWN
  29. case StatusUpdate:
  30. infoFromRegistry = getInstanceByAppAndId(appName, id, false);
  31. node.statusUpdate(appName, id, newStatus, infoFromRegistry);
  32. break;
  33. // 删除状态覆盖,也就是把InstanceInfo.overriddenStatus状态置为InstanceStatus.UNKNOWN
  34. case DeleteStatusOverride:
  35. infoFromRegistry = getInstanceByAppAndId(appName, id, false);
  36. node.deleteStatusOverride(appName, id, infoFromRegistry);
  37. break;
  38. }
  39. } catch (Throwable t) {
  40. logger.error("Cannot replicate information to {} for action {}", node.getServiceUrl(), action.name(), t);
  41. }
  42. }

 

下面我们以清理失效节点为例,来看看后续发送请求的操作,这里其实很容易理解了。就是封装http请求入参,然后发送http请求。

 

  1. @Override
  2. public EurekaHttpResponse<Void> cancel(String appName, String id) {
  3. String urlPath = "apps/" + appName + '/' + id;
  4. ClientResponse response = null;
  5. try {
  6. // 组装http请求入参
  7. Builder resourceBuilder = jerseyClient.resource(serviceUrl).path(urlPath).getRequestBuilder();
  8. addExtraHeaders(resourceBuilder);
  9. // 发送http请求
  10. response = resourceBuilder.delete(ClientResponse.class);
  11. // 封装http响应,并返回
  12. return anEurekaHttpResponse(response.getStatus()).headers(headersOf(response)).build();
  13. } finally {
  14. if (logger.isDebugEnabled()) {
  15. logger.debug("Jersey HTTP DELETE {}/{}; statusCode={}", serviceUrl, urlPath, response == null ? "N/A" : response.getStatus());
  16. }
  17. if (response != null) {
  18. response.close();
  19. }
  20. }
  21. }

 

这里有一点要提一下,就是有人会问了,这里都是一个项目,为什么还要发送http请求呢?这个其实就属于架构方面的问题了,Spring Cloud
被设计成微服务架构,就是分布式架构,在进行部署的时候,Eureka Server和Eureka Client可能被部署到了不同的服务器节点,所以需要
进行http通信。

最后,Eureka Server提供一个处理数据同步请求的接口,处理请求,这样,整个数据同步的流程就结束了。Eureka Server处理
数据同步请求的内容,还是以清除失效节点为例,Eureka Server接收到这个请求后,都做了哪些操作呢,这个节点清除操作,

都有哪些流程需要处理呢,敬请关注下一篇博客。

Eureka Server集群数据同步源码解析(下)Eureka Server接收并处理数据同步请求

 

 

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/神奇cpp/article/detail/818237
推荐阅读
相关标签
  

闽ICP备14008679号