赞
踩
【Eureka】【源码+图解】【一】Eureka的功能及HelloWorld体验
客户端的代码非常简单,简单到我们不知道从何入口,怎么办?只能从启动log下手了,下图为启动日志
日志中大量出现了DiscoveryClient
这个类,并且看到了注册的信息,所以我们就从这个类开始Eureka Client端的源码研究。那么,这个类是怎么加载的呢?一个类要使用就必须要加载和初始化,所以我们就从构造函数开始。打开这个类发现有好几个,别怕,每个构造函数都打个断点,然后重新启动客户端,进到断点后去看下它的debug栈信息,从这些栈的路径中摘取出我们看得懂的类,就画出了下面这个流程图
看debug栈信息的时候一定要找到自己看得懂或者相关性强的记录,看不懂的就放过,不然那么长的栈根本理不出头绪。
图中绿色的三个部分是客户端启动的关键部分,所以我们就探讨这三部分:
这个类的用途就是在Spring启动的时候找到实现了Lifecycle
这个接口的类,然后注入到容器中并调用其start()
方法进行初始化。
@Override public void onRefresh() { startBeans(true); this.running = true; } private void startBeans(boolean autoStartupOnly) { // 1、搜集实现了Lifecycle的类 Map<String, Lifecycle> lifecycleBeans = getLifecycleBeans(); Map<Integer, LifecycleGroup> phases = new TreeMap<>(); lifecycleBeans.forEach((beanName, bean) -> { // 2、判断类是否自动启动 SmartLifecycle && isAutoStartup() if (!autoStartupOnly || (bean instanceof SmartLifecycle && ((SmartLifecycle) bean).isAutoStartup())) { int phase = getPhase(bean); phases.computeIfAbsent( phase, p -> new LifecycleGroup(phase, this.timeoutPerShutdownPhase, lifecycleBeans, autoStartupOnly) ).add(beanName, bean); } }); if (!phases.isEmpty()) { // 3、调用类中的start()方法启动 phases.values().forEach(LifecycleGroup::start); } }
你说巧不巧,Eureka
中的EurekaAutoServiceRegistration
就满足
// 1、实现了SmartLifecycle,SmartLifecycle是Lifecycle的子类
public class EurekaAutoServiceRegistration
implements AutoServiceRegistration, SmartLifecycle, Ordered, SmartApplicationListener {
// 2、开启自动启动
@Override
public boolean isAutoStartup() {
return true;
}
}
因此,在Spring启动的过程中它的start()
方法会执行,中间的过程就如上面的流程图,我们直接跳到关键的地方,也就是接下来的创建实例
它的作用就是创建Instance,相应的配置以及默认值在下面的代码中标出,感兴趣的自己可以修改这些值,然后在这个方法中打断点调试看看是不是能生效
public class InstanceInfoFactory { public InstanceInfo create(EurekaInstanceConfig config) { LeaseInfo.Builder leaseInfoBuilder = LeaseInfo.Builder.newBuilder() // eureka.instance.lease-renewal-interval-in-seconds: 30 .setRenewalIntervalInSecs(config.getLeaseRenewalIntervalInSeconds()) // eureka.instance.lease-expiration-duration-in-seconds: 90 .setDurationInSecs(config.getLeaseExpirationDurationInSeconds()); InstanceInfo.Builder builder = InstanceInfo.Builder.newBuilder(); String namespace = config.getNamespace(); ...... builder // eureka.instance.namespace: eureka .setNamespace(namespace) // eureka.instance.appname > spring.application.name > 'unknown' .setAppName(config.getAppname()) // eureka.instance.instanceId > eureka.instance.metadataMap.instanceId > hostname:appName:port .setInstanceId(config.getInstanceId()) // eureka.instance.appGroupName: .setAppGroupName(config.getAppGroupName()) .setDataCenterInfo(config.getDataCenterInfo()) // eureka.instance.ipAddress: 127.0.0.1 .setIPAddr(config.getIpAddress()) // eureka.instance.hostname: localhost .setHostName(config.getHostName(false)) // eureka.instance.nonSecurePort: 80 .setPort(config.getNonSecurePort()) // eureka.instance.nonSecurePortEnabled: true .enablePort(InstanceInfo.PortType.UNSECURE, config.isNonSecurePortEnabled()) // eureka.instance.securePort: 443 .setSecurePort(config.getSecurePort()) // eureka.instance.securePortEnabled: false .enablePort(InstanceInfo.PortType.SECURE, config.getSecurePortEnabled()) // eureka.instance.virtualHostName: unknown .setVIPAddress(config.getVirtualHostName()) // eureka.instance.secureVirtualHostName: unknown .setSecureVIPAddress(config.getSecureVirtualHostName()) // eureka.instance.homePageUrlPath: / // eureka.instance.homePageUrl: .setHomePageUrl(config.getHomePageUrlPath(), config.getHomePageUrl()) // eureka.instance.statusPageUrlPath: /actuator/info // eureka.instance.statusPageUrl: .setStatusPageUrl(config.getStatusPageUrlPath(), config.getStatusPageUrl()) // eureka.instance.healthCheckUrlPath: /actuator/health // eureka.instance.healthCheckUrl: // eureka.instance.secureHealthCheckUrl: .setHealthCheckUrls(config.getHealthCheckUrlPath(), config.getHealthCheckUrl(), config.getSecureHealthCheckUrl()) // eureka.instance.aSGName: .setASGName(config.getASGName()); // Start off with the STARTING state to avoid traffic if (!config.isInstanceEnabledOnit()) { InstanceInfo.InstanceStatus initialStatus = InstanceInfo.InstanceStatus.STARTING; ...... builder.setStatus(initialStatus); } else { ...... } // eureka.instance.metadataMap for (Map.Entry<String, String> mapEntry : config.getMetadataMap().entrySet()) { ...... } InstanceInfo instanceInfo = builder.build(); instanceInfo.setLeaseInfo(leaseInfoBuilder.build()); return instanceInfo; } }
如果一开始去看这个类的代码估计会两眼一黑,因为实在太多了,所以先不看具体的代码,先看看它的类图,看看它实现了什么接口,毕竟,接口定义了功能嘛
顶层接口定义了发现实例的功能,次级接口EurekaClient
的功能:获取实例、获取当前客户端的信息、注册并访问healthcheck。这些功能都需要DiscoveryClient
来实现,那么它是怎么实现的呢?从它的构造函数说起
它的构造函数代码太多,为了精简代码,先把它们拷到文本编辑器,然后开始精简代码
public class DiscoveryClient implements EurekaClient { DiscoveryClient( ApplicationInfoManager applicationInfoManager, // 实例信息 EurekaClientConfig config, // eureka.client.*的配置信息 AbstractDiscoveryClientOptionalArgs args, // RestTemplateDiscoveryClientOptionalArgs Provider<BackupRegistry> backupRegistryProvider, EndpointRandomizer endpointRandomizer) { ...... // 初始化调度器,线程数为2, 1个给heartbeat续约,1个给cacheRefresh定时获取注册信息 scheduler = Executors.newScheduledThreadPool(...); // 心跳线程池,核心数为1,最大线程数eureka.client.heartbeatExecutorThreadPoolSize,默认2 heartbeatExecutor = new ThreadPoolExecutor(...); // 刷新实例列表线程池,核心数为1,最大线程数eureka.client.cacheRefreshExecutorThreadPoolSize,默认2 cacheRefreshExecutor = new ThreadPoolExecutor(...); eurekaTransport = new EurekaTransport(); // 1、初始化eurekaTransport,它的作用就是把客户端的请求发往服务端,用的是restTemplate scheduleServerEndpointTask(eurekaTransport, args); ...... // eureka.client.fetchRegistry,默认true if (clientConfig.shouldFetchRegistry()) { ...... // 2、获取注册信息 boolean primaryFetchRegistryResult = fetchRegistry(false); ...... } ...... // eureka.client.registerWithEureka,默认true // eureka.client.shouldEnforceRegistrationAtInit默认false,一般客户端启动30s后再register if (clientConfig.shouldRegisterWithEureka() && clientConfig.shouldEnforceRegistrationAtInit()) { ...... if (!register() ) { ...... } ...... } // 3、初始化并开启计划任务 initScheduledTasks(); ...... } }
构造函数主要有3步值得我们去分析:初始化eurekaTransport、获取注册信息、初始化并开启计划任务
主要是初始化eurekaTransport,因为客户端与服务端的通信就靠它,所以这里着重分析下它的每一个组成
private void scheduleServerEndpointTask(EurekaTransport eurekaTransport, // RestTemplateDiscoveryClientOptionalArgs AbstractDiscoveryClientOptionalArgs args ) { ...... TransportClientFactories argsTransportClientFactories = null; if (args != null && args.getTransportClientFactories() != null) { // RestTemplateTransportClientFactories argsTransportClientFactories = args.getTransportClientFactories(); } // 1、eurekaTransport.transportClientFactory,客户端与服务端通信的client eurekaTransport.transportClientFactory = providedJerseyClient == null // transportClientFactories = RestTemplateTransportClientFactories ? transportClientFactories.newTransportClientFactory(...) : transportClientFactories.newTransportClientFactory(...); ...... // 2、eurekaTransport.bootstrapResolver,维护server节点信息 eurekaTransport.bootstrapResolver = EurekaHttpClients.newBootstrapResolver(...); // 3、eurekaTransport.registrationClientFactory,eurekaTransport.registrationClient,更新client if (clientConfig.shouldRegisterWithEureka()) { ...... newRegistrationClientFactory = EurekaHttpClients.registrationClientFactory(...); newRegistrationClient = newRegistrationClientFactory.newClient(); ...... eurekaTransport.registrationClientFactory = newRegistrationClientFactory; eurekaTransport.registrationClient = newRegistrationClient; } // 4、eurekaTransport.queryClientFactory,eurekaTransport.queryClient,查询client if (clientConfig.shouldFetchRegistry()) { ...... newQueryClientFactory = EurekaHttpClients.queryClientFactory(...); newQueryClient = newQueryClientFactory.newClient(); ...... eurekaTransport.queryClientFactory = newQueryClientFactory; eurekaTransport.queryClient = newQueryClient; } }
public class RestTemplateTransportClientFactories implements TransportClientFactories<Void> {
@Override
public TransportClientFactory newTransportClientFactory(final EurekaClientConfig clientConfig,
final Collection<Void> additionalFilters, final InstanceInfo myInstanceInfo,
final Optional<SSLContext> sslContext, final Optional<HostnameVerifier> hostnameVerifier) {
return new RestTemplateTransportClientFactory(this.args.getSSLContext(), this.args.getHostnameVerifier(),
this.args.eurekaClientHttpRequestFactorySupplier);
}
}
eurekaTransport.bootstrapResolver = EurekaHttpClients.newBootstrapResolver(...)
看下整个过程做了什么
看下绿色部分的代码
public class AsyncResolver<T extends EurekaEndpoint> implements ClosableResolver<T> { AsyncResolver(String name, ClusterResolver<T> delegate, List<T> initialValue, int executorThreadPoolSize, // 1 int refreshIntervalMs, int warmUpTimeoutMs) { this.name = name; // ZoneAffinityClusterResolver this.delegate = delegate; // eureka.client.eurekaServiceUrlPollIntervalSeconds,默认5分钟 // refreshIntervalMs = eureka.client.eurekaServiceUrlPollIntervalSeconds * 1000 this.refreshIntervalMs = refreshIntervalMs; this.warmUpTimeoutMs = warmUpTimeoutMs; this.executorService = Executors.newScheduledThreadPool(...); this.threadPoolExecutor = new ThreadPoolExecutor(...); // 这个后台任务的作用是每隔5分钟动态更新它注册的server节点信息,接下来从两个角度分析下它:何时开启、如何更新 this.backgroundTask = new TimedSupervisorTask( this.getClass().getSimpleName(), executorService, threadPoolExecutor, refreshIntervalMs, TimeUnit.MILLISECONDS, 5, updateTask ); this.resultsRef = new AtomicReference<>(initialValue); Monitors.registerObject(name, this); } }
在下面这个方法我们找到了开启定时的代码,在这行代码打个断点,重启client,去debug栈看看log
public class AsyncResolver<T extends EurekaEndpoint> implements ClosableResolver<T> {
void scheduleTask(long delay) {
executorService.schedule(
backgroundTask, delay, TimeUnit.MILLISECONDS);
}
}
debug日志很清楚,是在fetchRegistry的时候开启,这个后面再详谈,这里只要知道哪里开启这个定时任务即可
它的最终用途就是根据eureka.client.region
、eureka.client.availabilityZones
、eureka.client.serviceUrl
更新server节点信息
public class AsyncResolver<T extends EurekaEndpoint> implements ClosableResolver<T> {
private final Runnable updateTask = new Runnable() {
@Override
public void run() {
......
// 获取server节点
List<T> newList = delegate.getClusterEndpoints();
if (newList != null) {
// 更新到本地
resultsRef.getAndSet(newList);
lastLoadTimestamp = System.currentTimeMillis();
}......
};
}
对于delegate.getClusterEndpoints()
最终会调用ConfigClusterResolver.getClusterEndpoints()
不理解的可以回头看看一开始的流程图
public class ConfigClusterResolver implements ClusterResolver<AwsEndpoint> { @Override public List<AwsEndpoint> getClusterEndpoints() { // eureka.client.useDnsForFetchingServiceUrls,默认false if (clientConfig.shouldUseDnsForFetchingServiceUrls()) { return getClusterEndpointsFromDns(); } else { return getClusterEndpointsFromConfig(); } } private List<AwsEndpoint> getClusterEndpointsFromConfig() { // 根据eureka.client.region从eureka.client.availabilityZones获取availZones String[] availZones = clientConfig.getAvailabilityZones(clientConfig.getRegion()); // availZones[0] String myZone = InstanceInfo.getZone(availZones, myInstanceInfo); // 根据availZones获取eureka.client.serviceUrl,这里虽然传入的是zone,但貌似最终还是获取availZones的所有serviceUrl Map<String, List<String>> serviceUrls = EndpointUtils .getServiceUrlsMapFromConfig(clientConfig, myZone, clientConfig.shouldPreferSameZoneEureka()); List<AwsEndpoint> endpoints = new ArrayList<>(); for (String zone : serviceUrls.keySet()) { for (String url : serviceUrls.get(zone)) { // 构建集群节点 endpoints.add(new AwsEndpoint(url, getRegion(), zone)); } } return endpoints; } }
两者的创建过程是类似的,只不过两者的用途以及name不一样而已。这里以eurekaTransport.queryClient
为例,先看下它的类型,就是下图蓝色的SessionedEurekaHttpClient
,它具有绿色的EurekaHttpClient
所有功能,但是真正发出http请求的是RestTemplateEurekaHttpClient
,它是怎么做到的呢?往下看
http请求的过程
上面初始化newQueryClient = newQueryClientFactory.newClient();
这行代码就是上面流程图的绿色过程,当有调用eurekaTransport.queryClient.xxx()
方法时走的就是蓝色过程,所以,后面分析客户端功能的时候最终发的请求都会跑到RestTemplateEurekaHttpClient
,我们以getApplications
为例分析一下源码的过程(为什么用这个API后文解释)
在RestTemplateEurekaHttpClient.getApplications(...)
方法打个断点,把当前IDE启动的Server和Client都关闭,注意,一定要都关闭,因为不管是Server还是Client端都会不断发请求,并且两者的过程是不一样的,debug的时候怕会混淆。重新启动一个Client,看下debug栈信息
看到了吗?顺序就和我们上面的流程图一样,接下来按着栈的步骤逐步分析源码
1、SessionedEurekaHttpClient
// 1、eurekaTransport.queryClient.getApplications(...) @Override public EurekaHttpResponse<Applications> getApplications(final String... regions) { return execute(new RequestExecutor<Applications>() { @Override public EurekaHttpResponse<Applications> execute(EurekaHttpClient delegate) { // 3、这里的delegate即RetryableEurekaHttpClient,接下来就到RetryableEurekaHttpClient.getApplications(...)了 return delegate.getApplications(regions); } ...... }); } // 2、调用excute @Override protected <R> EurekaHttpResponse<R> execute(RequestExecutor<R> requestExecutor) { ...... // 2.1、这里的clientFactory调用newClient()创建出来的是RetryableEurekaHttpClient,不理解的可以回头看看或者自己debug一下 eurekaHttpClient = TransportUtils.getOrSetAnotherClient(eurekaHttpClientRef, clientFactory.newClient()); // 2.2、调用requestExecutor.execute(...) return requestExecutor.execute(eurekaHttpClient); }
2、RetryableEurekaHttpClient
// 4、RetryableEurekaHttpClient.getApplications(...) @Override public EurekaHttpResponse<Applications> getApplications(final String... regions) { return execute(new RequestExecutor<Applications>() { @Override public EurekaHttpResponse<Applications> execute(EurekaHttpClient delegate) { // 6、这里的delegate即RedirectingEurekaHttpClient,接下来就到RedirectingEurekaHttpClient.getApplications(...)了 return delegate.getApplications(regions); } ...... }); } // 5、调用excute(...) @Override protected <R> EurekaHttpResponse<R> execute(RequestExecutor<R> requestExecutor) { ...... // 默认失败重试3次 for (int retry = 0; retry < numberOfRetries; retry++) { ...... // 这里的candidateHosts就是上一节updateTask更新的endpoints candidateHosts = getHostCandidates(); // 5.1、这里的clientFactory调用newClient()创建出来的是RedirectingEurekaHttpClient,不理解的可以回头看看或者自己debug一下 currentHttpClient = clientFactory.newClient(currentEndpoint); ...... // 5.2、调用requestExecutor.execute(...) EurekaHttpResponse<R> response = requestExecutor.execute(currentHttpClient); ...... } ...... }
3、RedirectingEurekaHttpClient
// 7、RedirectingEurekaHttpClient.getApplications(...) @Override public EurekaHttpResponse<Applications> getApplications(final String... regions) { return execute(new RequestExecutor<Applications>() { @Override public EurekaHttpResponse<Applications> execute(EurekaHttpClient delegate) { // 10、这里的delegate即RestTemplateEurekaHttpClient,接下来就到RestTemplateEurekaHttpClient.getApplications(...)了 return delegate.getApplications(regions); } ...... }); } // 8、调用excute(...) @Override protected <R> EurekaHttpResponse<R> execute(RequestExecutor<R> requestExecutor) { EurekaHttpClient currentEurekaClient = delegateRef.get(); ...... // 8.1、这里的clientFactory调用newClient()创建出来的是RestTemplateEurekaHttpClient,不理解的可以回头看看或者自己debug一下 AtomicReference<EurekaHttpClient> currentEurekaClientRef = new AtomicReference<>(factory.newClient(serviceEndpoint)); ...... // 9、调用内部方法 EurekaHttpResponse<R> response = executeOnNewServer(requestExecutor, currentEurekaClientRef); ...... } private <R> EurekaHttpResponse<R> executeOnNewServer(RequestExecutor<R> requestExecutor, AtomicReference<EurekaHttpClient> currentHttpClientRef) { ...... // 失败重定向最多10次 for (int followRedirectCount = 0; followRedirectCount < MAX_FOLLOWED_REDIRECTS; followRedirectCount++) { // 9.1、调用requestExecutor.execute(...) EurekaHttpResponse<R> httpResponse = requestExecutor.execute(currentHttpClientRef.get()); ...... } }
4、RestTemplateEurekaHttpClient
// 11、RestTemplateEurekaHttpClient.getApplications(...)
@Override
public EurekaHttpResponse<Applications> getApplications(String... regions) {
return getApplicationsInternal("apps/", regions);
}
private EurekaHttpResponse<Applications> getApplicationsInternal(String urlPath, String[] regions) {
......
// 11.1、调用restTemplate向服务端发送http请求
ResponseEntity<EurekaApplications> response = restTemplate.exchange(url, HttpMethod.GET, null,
EurekaApplications.class);
......;
}
整个过程就到这,从中我们看到了装饰器、工厂、代理等模式的使用,四个client各司其职,其作用如同它们的名字:Session、Retry、Redirect、Resttemplate。
private boolean fetchRegistry(boolean forceFullRegistryFetch) { ...... Applications applications = getApplications(); if (clientConfig.shouldDisableDelta() || 首次){ // 默认情况下首次走这里,继续分许getAndStoreFullRegistry() getAndStoreFullRegistry(); } else { getAndUpdateDelta(applications); } ...... return true; } private void getAndStoreFullRegistry() throws Throwable { long currentUpdateGeneration = fetchRegistryGeneration.get(); Applications apps = null; EurekaHttpResponse<Applications> httpResponse = // eureka.client.registryRefreshSingleVipAddress,默认null clientConfig.getRegistryRefreshSingleVipAddress() == null // 上一步选getApplications为例的原因就在这,首次启动的时候会调用一次,不受其他request请求影响,方便debug ? eurekaTransport.queryClient.getApplications(remoteRegionsRef.get()) : eurekaTransport.queryClient.getVip(clientConfig.getRegistryRefreshSingleVipAddress(), remoteRegionsRef.get()); ...... // 从服务端拿到信息后更新到本地,以便之后的服务调用 localRegionApps.set(this.filterAndShuffle(apps)); ...... }
启动了三个线程:刷新注册信息、注册、续约,开启一个状态监听器。客户端与服务端的功能主要也在这了,后面会和服务端联动分析,这里只要知道当客户端完全启动后就由这几个线程以及监听器维护就行。
private void initScheduledTasks() { // 是否获取注册信息, if (clientConfig.shouldFetchRegistry()) { ...... // 获取并更新注册信息的task cacheRefreshTask = new TimedSupervisorTask(...); // 开启定时任务 scheduler.schedule( cacheRefreshTask, registryFetchIntervalSeconds, TimeUnit.SECONDS); } // 是否注册,eureka.client.registerWithEureka,默认true if (clientConfig.shouldRegisterWithEureka()) { ...... // 续约task heartbeatTask = new TimedSupervisorTask(......); // 开启定时续约task scheduler.schedule( heartbeatTask, renewalIntervalInSecs, TimeUnit.SECONDS); // 注册线程 instanceInfoReplicator = new InstanceInfoReplicator(...); // 客户端状态监听 statusChangeListener = new ApplicationInfoManager.StatusChangeListener() {...}; // 是否后台监控并更新客户端状态 if (clientConfig.shouldOnDemandUpdateStatusChange()) { applicationInfoManager.registerStatusChangeListener(statusChangeListener); } // 启动注册线程 instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds()); } else { ...... } }
客户端的启动过程就到这,接下来就是服务端。
未完待续,欢迎关注公众号三横兰
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。