当前位置:   article > 正文

微服务系列(二)(1) Eureka源码分析_eureka acceptorexecutors源码分析

eureka acceptorexecutors源码分析

微服务系列(二)(1) Eureka源码分析

关于eureka的使用,就不做介绍了,不熟悉的可以参考官方文档

引入依赖,修改好配置文件,在主类上加上注解@EnableEurekaServer,启动服务,一个简单的eureka搭建好了。

先看看@EnableEurekaServer

@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import(EurekaServerMarkerConfiguration.class)
public @interface EnableEurekaServer {

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

继续EurekaServerMarkerConfiguration.class

@Configuration
public class EurekaServerMarkerConfiguration {

   @Bean
   public Marker eurekaServerMarkerBean() {
      return new Marker();
   }

   class Marker {
   }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11

这里的Marker内部类并没有具体的实现逻辑,因为它仅仅是一个“标记”,用于判断“真正干活的类”是否要加载。

以后在Spring Cloud的源码探索过程会常常看到,可以加入到自己的“技能库”。

别走丢,真正干活的类在这里。

org.springframework.cloud.netflix.eureka.server.EurekaServerAutoConfiguration

关注其上的几个眼熟的类:

@Configuration
@Import(EurekaServerInitializerConfiguration.class)
@ConditionalOnBean(EurekaServerMarkerConfiguration.Marker.class)
@EnableConfigurationProperties({ EurekaDashboardProperties.class,
		InstanceRegistryProperties.class })
@PropertySource("classpath:/eureka/server.properties")
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • EurekaServerInitializerConfiguration.class:用于eureka初始化,别急,等会进入看看详请,看看它到底初始化了什么,如何初始化,以哪种顺序
  • EurekaServerMarkerConfiguration.Marker.class:熟悉吗,刚刚说到的"标记"
  • EurekaDashboardProperties.class:控制台配置信息,常见的*Properties类,不做介绍
  • InstanceRegistryProperties.class:注册中心相关配置信息,可以无需关心,和配置文件中的配置内容不同
  • @PropertySource("classpath:/eureka/server.properties"):原来eureka提供了properties的配置方式

这里顺带提一下EurekaServerConfigBean这个类才是跟我们经常打交道的配置bean,不要和InstanceRegistryProperties混淆。

找到了eureka入口,那么用问题来驱动看源码的方向。

问题1:如何解决单点故障问题

关注RefreshablePeerEurekaNodes这个类

它有以下方法:

shouldUpdate:判断是否刷新PeerEurekaNodes

onApplicationEvent:Spring事件监听器,当触发EnvironmentChangeEvent时,判断shouldUpdate,并选择是否update(com.netflix.eureka.cluster.PeerEurekaNodes#updatePeerEurekaNodes

更新PeerEurekaNodes的逻辑

protected void updatePeerEurekaNodes(List<String> newPeerUrls) {
    if (newPeerUrls.isEmpty()) {
        logger.warn("The replica size seems to be empty. Check the route 53 DNS Registry");
        return;
    }

    Set<String> toShutdown = new HashSet<>(peerEurekaNodeUrls);
    toShutdown.removeAll(newPeerUrls);
    Set<String> toAdd = new HashSet<>(newPeerUrls);
    toAdd.removeAll(peerEurekaNodeUrls);

    if (toShutdown.isEmpty() && toAdd.isEmpty()) { // No change
        return;
    }

    // Remove peers no long available
    List<PeerEurekaNode> newNodeList = new ArrayList<>(peerEurekaNodes);

    if (!toShutdown.isEmpty()) {
        logger.info("Removing no longer available peer nodes {}", toShutdown);
        int i = 0;
        while (i < newNodeList.size()) {
            PeerEurekaNode eurekaNode = newNodeList.get(i);
            if (toShutdown.contains(eurekaNode.getServiceUrl())) {
                newNodeList.remove(i);
                eurekaNode.shutDown();
            } else {
                i++;
            }
        }
    }

    // Add new peers
    if (!toAdd.isEmpty()) {
        logger.info("Adding new peer nodes {}", toAdd);
        for (String peerUrl : toAdd) {
            newNodeList.add(createPeerEurekaNode(peerUrl));
        }
    }

    this.peerEurekaNodes = newNodeList;
    this.peerEurekaNodeUrls = new HashSet<>(newPeerUrls);
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43

逻辑很简单,更新peerEurekaNodeUrlspeerEurekaNodes,先创建副本,在副本中移除旧节点再新增节点,最后用副本替换原集合,很常见的线程安全的编程方式,优点在于逻辑简单、线程安全、效率高,缺点在于浪费内存。

peerEurekaNodeUrls:同伴节点(暂时这样翻译吧,挺好理解的)的url地址

peerEurekaNodes:同伴节点的详细信息,包括serviceUrl、config等信息,其中还有PeerAwareInstanceRegistryHttpReplicationClientTaskDispatcher这三个重要的工作类,暂且不看

先看看com.netflix.eureka.cluster.PeerEurekaNodes#start中有一段很重要的代码

Runnable peersUpdateTask = new Runnable() {
                @Override
                public void run() {
                    try {
                        updatePeerEurekaNodes(resolvePeerUrls());
                    } catch (Throwable e) {
                        logger.error("Cannot update the replica Nodes", e);
                    }

                }
            };
taskExecutor.scheduleWithFixedDelay(
        peersUpdateTask,
        serverConfig.getPeerEurekaNodesUpdateIntervalMs(),
        serverConfig.getPeerEurekaNodesUpdateIntervalMs(),
        TimeUnit.MILLISECONDS
);
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17

可以发现,在PeerEurekaNodes调用start()方法后,会创建一个守护进程来定期更新同伴节点的信息,至于频率怎样,当然是提供了配置入口,在EurekaServerConfigBean可以找到默认是10分钟。

继续进入peerEurekaNode

去看看PeerAwareInstanceRegistryHttpReplicationClientTaskDispatcher做了什么重要的工作

这里的PeerAwareInstanceRegistry是通过@inject注解注入,想了解@inject可以参考JSR330规范,注入过程就不说了,并不是本文重点。

PeerAwareInstanceRegistry作为一个接口,需要找到其实现类,在EurekaServerAutoConfiguration中:

@Bean
public PeerAwareInstanceRegistry peerAwareInstanceRegistry(
      ServerCodecs serverCodecs) {
   this.eurekaClient.getApplications(); // force initialization
   return new InstanceRegistry(this.eurekaServerConfig, this.eurekaClientConfig,
         serverCodecs, this.eurekaClient,
         this.instanceRegistryProperties.getExpectedNumberOfRenewsPerMin(),
         this.instanceRegistryProperties.getDefaultOpenForTrafficCount());
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

PeerAwareInstanceRegistry提供了以下方法:

init:做一些初始化的工作,用于从远程区域获取注册表信息及增量更新的RemoteRegionRegistry,以及用Monitors为该类提供JMS远程监控服务等

syncUp:从同伴节点拉取注册表信息,这样如果出现单节点故障问题时,会访问到其他存活的同伴节点上,以此来保证eureka的高可用性

shouldAllowAccess:检查此时是否允许注册表的访问

register:主要做了两件事,1. 注册节点信息 2.如果不是副本节点,则同步到同伴节点上

statusUpdate:更新实例的状态,并同步到同伴节点上

初始化过程中有一个这样的方法需要关注

private void scheduleRenewalThresholdUpdateTask() {
    timer.schedule(new TimerTask() {
                       @Override
                       public void run() {
                           updateRenewalThreshold();
                       }
                   }, serverConfig.getRenewalThresholdUpdateIntervalMs(),
            serverConfig.getRenewalThresholdUpdateIntervalMs());
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

可以看到它是一个定时任务线程,它是为了防止在网络分区故障下,失效大量实例而导致服务大面积瘫痪,它通过调整更新阈值来控制短时间内大量实例的状态变更。

现在重新进入PeerEurekaNode,关注这样一个方法com.netflix.eureka.cluster.PeerEurekaNode#syncInstancesIfTimestampDiffers

private void syncInstancesIfTimestampDiffers(String appName, String id, InstanceInfo info, InstanceInfo infoFromPeer) {
    try {
        if (infoFromPeer != null) {
            logger.warn("Peer wants us to take the instance information from it, since the timestamp differs,"
                    + "Id : {} My Timestamp : {}, Peer's timestamp: {}", id, info.getLastDirtyTimestamp(), infoFromPeer.getLastDirtyTimestamp());

            if (infoFromPeer.getOverriddenStatus() != null && !InstanceStatus.UNKNOWN.equals(infoFromPeer.getOverriddenStatus())) {
                logger.warn("Overridden Status info -id {}, mine {}, peer's {}", id, info.getOverriddenStatus(), infoFromPeer.getOverriddenStatus());
                registry.storeOverriddenStatusIfRequired(appName, id, infoFromPeer.getOverriddenStatus());
            }
            registry.register(infoFromPeer, true);
        }
    } catch (Throwable e) {
        logger.warn("Exception when trying to set information from peer :", e);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16

当发现同伴eureka实例的时间戳与本地不同时,从同伴节点同步信息并设置最新的重写状态(Overridden Status: 通常由外部进程设置以禁用实例获取流量)。

可以发现,PeerAwareInstanceRegistry主要工作内容是与同伴节点进行信息的交互,包括从同伴节点同步信息、推送新节点信息到同伴节点等。

下面来了解一下HttpReplicationClient,先看看它的API,大致做哪些事

register/cancel/statusUpdate/sendHeartBeat…等

感觉和PeerAwareInstanceRegistry比较相似…

打开它的实现类,只有一个com.netflix.eureka.transport.JerseyReplicationClient

原来它是为了支持Jersey框架的通讯类,至于什么是Jersey

来看看PeerEurekaNode中的最后一个工作的类TaskDispatcher

先看看PeerEurekaNode是如何使用它的

public void register(final InstanceInfo info) throws Exception {
    long expiryTime = System.currentTimeMillis() + getLeaseRenewalOf(info);
    batchingDispatcher.process(
            taskId("register", info),
            new InstanceReplicationTask(targetHost, Action.Register, info, null, true) {
                public EurekaHttpResponse<Void> execute() {
                    return replicationClient.register(info);
                }
            },
            expiryTime
    );
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
public void cancel(final String appName, final String id) throws Exception {
    long expiryTime = System.currentTimeMillis() + maxProcessingDelayMs;
    batchingDispatcher.process(
            taskId("cancel", appName, id),
            new InstanceReplicationTask(targetHost, Action.Cancel, appName, id) {
                @Override
                public EurekaHttpResponse<Void> execute() {
                    return replicationClient.cancel(appName, id);
                }

                @Override
                public void handleFailure(int statusCode, Object responseEntity) throws Throwable {
                    super.handleFailure(statusCode, responseEntity);
                    if (statusCode == 404) {
                        logger.warn("{}: missing entry.", getTaskName());
                    }
                }
            },
            expiryTime
    );
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21

看起来像是一个分发器,并且在PeerEurekaNode内部有batchDispachernobatchDispacher两种,初步推测是一种通用api,将register/cancel/heartbeat等操作抽象,通过调用相同的API来执行操作。

慢慢看

this.batchingDispatcher = TaskDispatchers.createBatchingTaskDispatcher(
        batcherName,
        config.getMaxElementsInPeerReplicationPool(),
        batchSize,
        config.getMaxThreadsForPeerReplication(),
        maxBatchingDelayMs,
        serverUnavailableSleepTimeMs,
        retrySleepTimeMs,
        taskProcessor
);
this.nonBatchingDispatcher = TaskDispatchers.createNonBatchingTaskDispatcher(
        targetHost,
        config.getMaxElementsInStatusReplicationPool(),
        config.getMaxThreadsForStatusReplication(),
        maxBatchingDelayMs,
        serverUnavailableSleepTimeMs,
        retrySleepTimeMs,
        taskProcessor
);
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
public static <ID, T> TaskDispatcher<ID, T> createBatchingTaskDispatcher(String id,
                                                                         int maxBufferSize,
                                                                         int workloadSize,
                                                                         int workerCount,
                                                                         long maxBatchingDelay,
                                                                         long congestionRetryDelayMs,
                                                                         long networkFailureRetryMs,
                                                                         TaskProcessor<T> taskProcessor) {
    final AcceptorExecutor<ID, T> acceptorExecutor = new AcceptorExecutor<>(
            id, maxBufferSize, workloadSize, maxBatchingDelay, congestionRetryDelayMs, networkFailureRetryMs
    );
    final TaskExecutors<ID, T> taskExecutor = TaskExecutors.batchExecutors(id, workerCount, taskProcessor, acceptorExecutor);
    return new TaskDispatcher<ID, T>() {
        @Override
        public void process(ID id, T task, long expiryTime) {
            acceptorExecutor.process(id, task, expiryTime);
        }

        @Override
        public void shutdown() {
            acceptorExecutor.shutdown();
            taskExecutor.shutdown();
        }
    };
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25

看来它实际上就是一个调用框架,作为分发器,实际上最终逻辑就是调用:

acceptorExecutor.process(id, task, expiryTime);
  • 1

进入com.netflix.eureka.util.batcher.AcceptorExecutor

这里代码比较多,就不贴了,感兴趣的可以打开idea去了解下

它是一个任务分配器,由一个工作线程分配,可以有批量分配和单独分配两种方式,并且它能在旧版本未分派出去的情况下有新版本的任务进入时替换旧版本的任务,且当任务执行失败时会将任务放回。

这样的设计方式是值得借鉴的,把不同的任务抽象成task,交给单独的线程处理,在调用时无需关心其实现且使用统一的API,达到了解耦的效果。

到这里,可以回答这几个问题了:

  1. 如何解决单点故障问题
  2. 如何保证信息不丢失
  3. 通讯方式是怎样的,效率如何

问题1:

eureka使用了同伴节点的方式,当一个eureka初始化时,会主动拉取已在线的eureka的信息,包括eureka节点信息、配置信息、注册表信息等,并定期更新是否有新的eureka节点加入,并进行增量的信息同步。当一个节点发生故障时,会将其移除本地维护的在线列表,并继续与其他节点交互,这样在eureka集群下,即使eureka节点大量瘫痪,只要有一个eureka存活,就可以提供服务,但也会存在数据丢失、不同步的问题。

问题2:

eureka保证信息不丢失的方式是”弱一致性“的,eureka通过在同伴节点上冗余自己的节点信息来”尽量保证“数据不丢失,在同步过程中依然可以对外提供服务,因为理论上是会存在节点数据未同步成功就出现服务器宕机的情况,可以看到,eureka在CAP的抉择中选择了AP,对数据一致性的要求降低了许多。

问题3:

eureka节点间是采用http协议通信,从JerseyReplicationClient类可以看到,它还支持了rest风格Jersey框架,http协议的特点在于无状态性,这也是eureka无法保证数据强一致性的原因之一。

关于问题3.信息是如何存储的,关注这个类AbstractInstanceRegistry,不要觉得陌生,它其实是org.springframework.cloud.netflix.eureka.server.InstanceRegistry的父抽象类,而InstanceRegistry则是com.netflix.eureka.registry.PeerAwareInstanceRegistry的注入实现类

private final ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>> registry
        = new ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>>();
protected Map<String, RemoteRegionRegistry> regionNameVSRemoteRegistry = new HashMap<String, RemoteRegionRegistry>();
protected final ConcurrentMap<String, InstanceStatus> overriddenInstanceStatusMap = CacheBuilder
        .newBuilder().initialCapacity(500)
        .expireAfterAccess(1, TimeUnit.HOURS)
        .<String, InstanceStatus>build().asMap();
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

通过前文的深入,这几个类应该比较熟悉了,InstanceInfo实例信息/RemoteRegionRegistry远程区域注册中心/InstanceStatus实例状态

可以知道的是,eureka完全采用内存存储信息,且都是JVM内存,所以如果用eureka支持大量的服务时,一定要调整好JVM堆参数,防止内存溢出。

对于问题5.功能是否丰富,从编码上可以看到一些灵活的配置项,本质上其功能仅有服务发现、服务注册、限流,如:

renewalThresholdUpdateIntervalMs:刷新阈值更新间隔时间

peerEurekaNodesUpdateIntervalMs:同伴节点更新间隔时间

peerEurekaStatusRefreshTimeIntervalMs:同伴节点状态刷新超时间隔

waitTimeInMsWhenSyncEmpty:同步到空信息时的等待时间

…由于eureka的运行机制,大量运用”轮询",有很多时间上的配置

另外:

rateLimiterEnabled用于流控

org.springframework.cloud.netflix.eureka.server.EurekaDashboardProperties配置控制台

以上均为eureka-server端配置信息org.springframework.cloud.netflix.eureka.server.EurekaServerConfigBean,client配置信息请参考org.springframework.cloud.netflix.eureka.EurekaClientConfigBean

到这里,这些问题都有了答案,但看源码的路并没有到尽头,最后去了解一下eureka的初始化过程吧。

不要忘了这个类org.springframework.cloud.netflix.eureka.server.EurekaServerInitializerConfiguration

它实现了这三个接口ServletContextAware, SmartLifecycle, Ordered,这意味着它用到了ServletContext并把自己这个bean交给了Spring来管理它的生命周期,由Spring来初始化和销毁它。

public void start() {
   new Thread(new Runnable() {
      @Override
      public void run() {
         try {
            //TODO: is this class even needed now?
            eurekaServerBootstrap.contextInitialized(EurekaServerInitializerConfiguration.this.servletContext);
            log.info("Started Eureka Server");

            publish(new EurekaRegistryAvailableEvent(getEurekaServerConfig()));
            EurekaServerInitializerConfiguration.this.running = true;
            publish(new EurekaServerStartedEvent(getEurekaServerConfig()));
         }
         catch (Exception ex) {
            // Help!
            log.error("Could not initialize Eureka servlet context", ex);
         }
      }
   }).start();
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20

代码很简单,只做了三件事,一、使用eurekaServerBootstrap引导上下文的初始化,二、发布eureka spring事件,三、修改初始化Bean的状态

public void contextInitialized(ServletContext context) {
   try {
      initEurekaEnvironment();
      initEurekaServerContext();

      context.setAttribute(EurekaServerContext.class.getName(), this.serverContext);
   }
   catch (Throwable e) {
      log.error("Cannot bootstrap eureka server :", e);
      throw new RuntimeException("Cannot bootstrap eureka server :", e);
   }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12

eureka上下文初始化过程:1.环境初始化 2.eureka服务上下文初始化 3.把eureka服务上下文交给servletContext

环境初始化:设置dataCenter和environment值

服务上下文初始化:JSON解析器、XML解析器初始化、构建EurekaServerContextHolder、从同伴节点同步信息、注册JMX等远程服务

那么这次的源码追踪就结束了,关于一些细节上的实现,如线程模型、调度规则、流控规则、AWS支持等,如果不是工作内容是做类似中间件的话,很难遇到相关问题,那就等真正遇到问题的时候再去驱动源码追踪吧。

无论如何,这次源码追踪也是有不少的收获,也相信读者也清楚的认识到了eureka的工作原理及存储方式。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/一键难忘520/article/detail/818226
推荐阅读
相关标签
  

闽ICP备14008679号