赞
踩
首先搞清楚一个问题:Eureka Server集群数据同步与Eureka Server集群数据备份,是同一个概念吗?
很显然,不是一个概念。
Eureka Server集群数据同步处理的是Eureka Client发起的数据同步请求。
Eureka Server集群数据备份处理的是Eureka Server发起的数据备份请求。
Eureka服务的启动类是EurekaBootStrap,这个类在eureka-core-1.4.6-source.jar包下。
总体来说,Eureka Server集群数据同步分为2个阶段:
第一阶段:Eureka Client发起数据同步请求
第二阶段:Eureka Server处理同步请求
Eureka Client发起数据同步请求有5种Heartbeat, Register, Cancel, StatusUpdate, DeleteStatusOverride
相应的,Eureka Server处理同步请求也是这5种。
数据同步类型定义如下:
- public enum Action {
-
- Heartbeat, Register, Cancel, StatusUpdate, DeleteStatusOverride;
-
- private com.netflix.servo.monitor.Timer timer = Monitors.newTimer(this.name());
-
- public com.netflix.servo.monitor.Timer getTimer() {
- return this.timer;
- }
- }
Heartbeat:心跳检测
Register:服务注册
Cancel:失效节点清除
StatusUpdate:服务节点状态更新
DeleteStatusOverride:删除状态覆盖,也就是把InstanceInfo.overriddenStatus状态置为InstanceStatus.UNKNOWN
Eureka Client如何触发数据同步呢?入口有很多,比如服务注册,心跳检测等等都会触发数据同步,我们这里以服务注册这个入口为例,
来分析Eureka Server的集群数据同步。
PeerAwareInstanceRegistry.register(InstanceInfo info, boolean isReplication);
OK,来看源码,有注释
- /**
- * Registers the information about the {@link InstanceInfo} and replicates
- * this information to all peer eureka nodes. If this is replication event
- * from other replica nodes then it is not replicated.
- *
- * @param info
- * the {@link InstanceInfo} to be registered and replicated.
- * @param isReplication
- * true if this is a replication event from other replica nodes,
- * false otherwise.
- */
- @Override
- public void register(final InstanceInfo info, final boolean isReplication) {
-
- // 心跳续约租期,默认90秒
- int leaseDuration = Lease.DEFAULT_DURATION_IN_SECS;
- if (info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs() > 0) {
- leaseDuration = info.getLeaseInfo().getDurationInSecs();
- }
-
- // 服务注册
- super.register(info, leaseDuration, isReplication);
-
- // 向Eureka Server发起数据同步请求 类型:Action.Register
- replicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication);
- }
Eureka Client端发起数据同步请求的关键方法是这个 replicateInstanceActionsToPeers,会根据同步类型,
进行相应的处理。
- /**
- * Replicates all instance changes to peer eureka nodes except for
- * replication traffic to this node.
- *
- */
- private void replicateInstanceActionsToPeers(Action action, String appName,
- String id, InstanceInfo info, InstanceStatus newStatus,
- PeerEurekaNode node) {
- try {
- InstanceInfo infoFromRegistry = null;
- CurrentRequestVersion.set(Version.V2);
- switch (action) {
- // 失效节点清理
- case Cancel:
- // 发送http请求,清理失效节点 jersey框架
- node.cancel(appName, id);
- break;
- // 心跳检测
- case Heartbeat:
- InstanceStatus overriddenStatus = overriddenInstanceStatusMap.get(id);
- infoFromRegistry = getInstanceByAppAndId(appName, id, false);
- node.heartbeat(appName, id, infoFromRegistry, overriddenStatus, false);
- break;
- // 服务注册
- case Register:
- node.register(info);
- break;
- // 服务状态更新 UP/DOWN/STARTING/OUT_OF_SERVICE/UNKNOWN
- case StatusUpdate:
- infoFromRegistry = getInstanceByAppAndId(appName, id, false);
- node.statusUpdate(appName, id, newStatus, infoFromRegistry);
- break;
- // 删除状态覆盖,也就是把InstanceInfo.overriddenStatus状态置为InstanceStatus.UNKNOWN
- case DeleteStatusOverride:
- infoFromRegistry = getInstanceByAppAndId(appName, id, false);
- node.deleteStatusOverride(appName, id, infoFromRegistry);
- break;
- }
- } catch (Throwable t) {
- logger.error("Cannot replicate information to {} for action {}", node.getServiceUrl(), action.name(), t);
- }
- }
下面我们以清理失效节点为例,来看看后续发送请求的操作,这里其实很容易理解了。就是封装http请求入参,然后发送http请求。
- @Override
- public EurekaHttpResponse<Void> cancel(String appName, String id) {
- String urlPath = "apps/" + appName + '/' + id;
- ClientResponse response = null;
- try {
- // 组装http请求入参
- Builder resourceBuilder = jerseyClient.resource(serviceUrl).path(urlPath).getRequestBuilder();
- addExtraHeaders(resourceBuilder);
- // 发送http请求
- response = resourceBuilder.delete(ClientResponse.class);
- // 封装http响应,并返回
- return anEurekaHttpResponse(response.getStatus()).headers(headersOf(response)).build();
- } finally {
- if (logger.isDebugEnabled()) {
- logger.debug("Jersey HTTP DELETE {}/{}; statusCode={}", serviceUrl, urlPath, response == null ? "N/A" : response.getStatus());
- }
- if (response != null) {
- response.close();
- }
- }
- }
这里有一点要提一下,就是有人会问了,这里都是一个项目,为什么还要发送http请求呢?这个其实就属于架构方面的问题了,Spring Cloud
被设计成微服务架构,就是分布式架构,在进行部署的时候,Eureka Server和Eureka Client可能被部署到了不同的服务器节点,所以需要
进行http通信。
最后,Eureka Server提供一个处理数据同步请求的接口,处理请求,这样,整个数据同步的流程就结束了。Eureka Server处理
数据同步请求的内容,还是以清除失效节点为例,Eureka Server接收到这个请求后,都做了哪些操作呢,这个节点清除操作,
都有哪些流程需要处理呢,敬请关注下一篇博客。
Eureka Server集群数据同步源码解析(下)Eureka Server接收并处理数据同步请求
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。