赞
踩
阅读源码的方式,很重要:大体架构,静态代码与断点运行相结合
1、尽量下载source download,进去了.class文件,下载下来才是.java文件,.java文件比.class文件好看些
2、静态代码:ctrl+alt+B 查看接口实现 左边可以看Structure 鼠标放在一个类上,ctrl+h 右边看继承体系 光标放在一个类上,右键Diagrams,可以看类图
3、断点运行:静态代码局限性,方法转到定义一遇到接口就gg,不知道走的是哪个具体实现,就断点在该方法调用,然后断点进入就好了
看源码的技巧:
1、定位if(xxx instanceof Xxx),我们只要实现Xxx接口或类就好,源码自动调用实现方法。
2、定位ConcurrentHashMap,这是源码对于线程安全的处理。
3、定位()-> 这是new Runnable() 多线程处理
定位Event -> 这是事件注册,仅仅注册,并未调用
4、@Override接口都是自动调用(实现接口和继承类)
本文讲解Springcloud Eureka源码,从Eureka运行的流程来看整个源码,Eureka具体执行流程
对于上图的解释:
Register 服务注册,从eureka client发起请求到eureka server,重要,包括两个部分,eureka-server如何接收请求+ eureka-client如何注册。
Get Register 拉取服务列表,从eureka client发起请求到eureka server,重要,包括两个部分,eureka-server如何存储地址列表 + eureka-client如何拉取地址列表。
Renew 服务续约,这是无意义的心跳,只要收到ok回复就好,DiscoveryClient类中的renew()方法和initScheduledTasks()心跳计时器和TimedSupervisorTask类中run()方法中的2倍重试;
Cancel 服务取消,从eureka client发起请求到eureka server,不重要,本文未涉及源码;
Make Remote Call,从eureka client到eureka client,远程调用,不重要,本文未涉及源码;
分为六个部分:
1、Eureka Server如何接收请求
2、Eureka Client如何注册
3、Eureka Server如何存储服务地址
4、Eureka Client如何拉取服务端地址列表
5、心跳续约
6、Eureka服务延迟
金手指:服务端接收客户端请求,必须进行监听和通信,通信方式包括两种: http通信 socket通信 (nio netty)
注意到两个比较重要的类 ApplicationsResource类和ApplicationResource类
金手指:ApplicationsResource类和ApplicationResource类 简单理解为接收服务端请求的两个controller
先看ApplicationsResource类,这个类保存eureka-server 各个服务的各个实例的信息
eureka-server是没有持久化的,就是存在内存中,eureka-server的服务停止,注册的实例信息就丢失了,要重新注册。
专门的分布式配置中心Spring cloud config是可以持久化配置信息的,放在database git disk三个地方都可以持久化。
大致阅读以下 applicationsResource中的实例信息
金手指:ApplicatinsResource类下面的两个比较重要的方法
全量查询eureka-server的地址列表和增量查询eureka-server地址列表,在本文第五部分,Eureka-Client如何拉取服务端地址列表用到
再看ApplicationResource类
我们的程序,让我们从springboot 的 run() 方法开始吧
我们的目标是要搞懂Eureka Client 如何注册的,这里用user-service为例。
注意两个比较重要的接口 Registration 和 ServiceRegistry
Springcloud 服务注册的接口定义和实现
这里我们看到,Registration 和 ServiceRegistry 两个接口都是springcloud自带的,都是在package org.springframework.cloud.client.serviceregistry;里面,但是springcloud并没有提供这两个接口的实现,
Eureka作为Netflix提供的组件,当它需要注册的时候,实现Springcloud的Registration接口,
public class EurekaRegistration implements Registration
所以,EurekaRegistration 在 package org.springframework.cloud.netflix.eureka.serviceregistry; 包里面。
金手指:springcloud提供一个Registration接口,用来给需要注册的组件使用的,Eureka需要注册,所以通过实现Registration接口完成一个EurekaRegistration 类,其他组件也是也是这样,这就是springcloud的一种插拔式的设计,仅提供接口,实现交给具体组件。
EurekaServiceRegistry 在 package org.springframework.cloud.netflix.eureka.serviceregistry; 包路径下,是eureka的类,实现了ServiceRegistry接口,该接口在 package org.springframework.cloud.client.serviceregistry; 包路径下,是springcloud的接口。
则,springcloud提供一个接口,给第三方组件使用,这里是eureka。
EurekaRegistration 在 package org.springframework.cloud.netflix.eureka.serviceregistry;包路径下,是eureka的类,实现了Registration接口,该接口在 package org.springframework.cloud.client.serviceregistry; 包路径下,是springlcoud的接口。
则,springcloud提供一个接口,给第三方组件使用,这里是eureka。
这就是springcloud的插拔式的设计。
springboot应用启动后(我们的user-service也是一个springboot应用),对于所有实现了SmartLifeCycle接口的类触发调用
演示一下是如何触发调用的,spring容器加载完所有bean并加载完成后,触发这个调用
当spring容器加载完成所有bean并加载完成后,将实现了SmartLifeCycle接口的类注册到容器里面,注意这个类上要加上@Component注解,才能注册到Spring IOC容器中。
金手指:
我们下面会看到eureka 实现SmartLifeCycle接口的类没有家伙是那个@Component注解,它是在自动装配类(使用@Configuration注解修饰)里面使用@Bean注解注入进去的。
实现三个方法 start() stop() isRunning 现在只有start()有用,用来验证spring容器完成所有bean初始化之后,调用SmartLifeCycle实现类
spring容器触发了 good
我们进入run()方法里面的逻辑, springboot的初始化工作比spring更加复杂
点击进入到run方法里面去
刷新上下文方法不断往下走,走到无路可走,走到接口的实现,这个接口是ConfigurableApplicationContext.java
ctrl + alt +B 查看接口的实现
一般进入抽象类,我们这里进入 AbstractApplicationContext.java类
在AbstractApplicationContext类中,找到finishRefresh()方法
本节概要:DefaultLifeCycleProcessor类,获取所有的smartLifeCycle并启动
找到DefaultLifeCycleProcessor.java
进入startBeans()
先启动程序,然后 debug 启动user-service
点击进入start()方法
点击进入doStart()方法
引入的类:EurekaAutoServiceRegistration EurekaClientAutoConfiguration
class EurekaAutoServiceRegistration implements SmartLifecycle
找到EurekaAutoServiceRegistration类,这个类实现了SmartLifeCycle接口
所以,实现了start() stop() isRunning() 三个方法
好了,smartlifecycle完成
那么这个EurekaAutoServiceRegistration类的又是什么时候被初始化的
刚才我们的TestSmartLifeCycle implements SmartLifecycle,有一个@Component注解,没有注解无法被装配到Spring IOC容器中,那么这里是这样装配的呢?答案是在EurekaClientAutoConfiguration类(该类被@Configuration注解修饰,是一个配置类)中使用@Bean注解装配到Spring IOC容器
金手指:
金手指
我们所有的阐述的中心都是实现了SmartLifeCycle接口的EurekaServcieRegistry这个类。
现在,完成了EurekaServcieRegistry这个类(这个实现springcloud提供Registry接口的这个类)是怎么的来?是在EurekaAutoServiceRegistration类的初始化中被赋值的
EurekaServcieRegistry这个类是怎样触发的?在EurekaClientAutoConfiguration类(该类被@Configuration注解修饰,是一个配置类)中使用@Bean注解装配到Spring IOC容器,从而在启动的时候触发。
找到EurekaAutoServiceRegistration这个实现了SmartLifeCycle接口的类,找到它的start()方法,这个方法会在spring完成所有bean初始化加载后调用(我们已经用TestSmartLifeCycle测试过了)。
这里方法里,完成
this.serviceRegistry.register(this.registration);
其中,
private EurekaServiceRegistry serviceRegistry;
private EurekaRegistration registration;
ServiceRegistry里面有register() deregister() close() setStatus() getStatus() 五个方法,
Registration没有任何方法,它继承自ServiceInstance,该类中都是一些getXxx()方法,getInstanceId() getServiceId() getHost() getPort() isSecure() getUri() getMetadata() getScheme(),所以,Eureka的注册过程是以EurekaServiceRegistry类为主体,EurekaRegistration只是作为参数罢了。
进入register方法
涉及的类:EurekaServiceRegistry类 和 ApplicationInfoManager类
public void register(EurekaRegistration reg) { maybeInitializeClient(reg); if (log.isInfoEnabled()) { log.info("Registering application " + reg.getApplicationInfoManager().getInfo().getAppName() + " with eureka with status " + reg.getInstanceConfig().getInitialStatus()); } // 设置指定的应用实例状态(状态更新通知) reg.getApplicationInfoManager() .setInstanceStatus(reg.getInstanceConfig().getInitialStatus()); // 健康检测 reg.getHealthCheckHandler().ifAvailable(healthCheckHandler -> reg .getEurekaClient().registerHealthCheck(healthCheckHandler)); }
到目前为止,还是没有找到eureka-client(即user-service)的注册逻辑
仅仅发布一个事件,没有达到服务注册
我们的流程被中断了,没有找到注册的逻辑,先放在这里
SpringApplication.run() // 启动
AbstractApplicationContext.finishRefresh() // 刷新 DefaultLifecycleProcessor
DefaultLifecycleProcessor.doStart() // 获取所有的smartLifeCycle并启动
EurekaAutoServiceRegistration.start() SmartLifeCycle EurekaClientAutoConfiguration // 注册
EurekaServiceRegistry.register() // 发布通知
找到一个DiscoveryClient类,服务发现就是通过这个类。
找到另外一个类 EurekaClientAutoConfiguration,就是装配EurekaAutoServiceRegisteration 的配置类。
金手指:
eureka里面,client都和通信有关。
找到其中一个内部类EurekaClientConfig,这个类是自动装配的子类,也是一个装配类
找到EurekaClient
提醒,在EurekaServiceRegistry类的register()方法中,有getApplicationInfoManager()和getEurekaClient()
ApplicationInfoManager和EurekaClient 都是在这里装配的
进入CloudEurekaClient类
不注册不拉取 直接return
心跳和缓存刷新 计时器 (重点:更新示例状态,服务注册的逻辑是怎样做的)
心跳计时器和缓存计时器,换行看(很重要)
强制全量拉取信息 focreFullRigistryFetch 默认情况下不为false
看到initScheduledTasks()方法
整体分析
注册到eureka-server那一段逻辑有一个心跳,两行代码
匿名内部类实现notify()
金手指:就是之前ApplicationInfoManager中的notify()
这里是一个复制 DiscoveryClient类中的instanceInfoReplicator
点击进入start()方法,进入 InstanceInfoReplicator类。
本节概要:注册逻辑 + start() + run()
进入InstanceInfoReplicator类
InstanceInfoReplicator类的start()方法
InstanceInfoReplicator类的run()方法
终于找到注册了
没有必要再看了 到了jersey 远程通行
不是socket通信,是http通信,jesery 就是一个httpclient 之前restTemplate也是这样,只要是http通信就好。
EurekaClientAutoConfiguration
EurekaClientConfiguration
装配bean -> CloudEurekaClient
DiscoveryClient(CloudEurekaClient 调用父类构造方法进入 DiscoveryClient)
heartbeatExecutor 心跳定时任务
cacheRefreshExecutor 定时去拉取服务端地址列表
register() 发起服务注册
initScheduledTasks() 建立心跳检测机制
initScheduledTasks() 通过内部类实例化StatusChangeListener 的实例状态监测接口
initScheduledTasks() 定时上报服务状态
金手指:eureka各个节点使用http通信
刚才那里发送一个事件是干嘛?用来更新 eurekaServer上的服务状态和自己的状态,up down
知道了注册入口,现在看eureka-server地址在服务端如何保存的。
controller收到一个请求后,怎么走?
回到ApplicationsResource类,找到其中的addInstance()方法
addInstance()方法中,找到
this.registry.register(info, "true".equals(isReplication));
ctrl + alt +B
进入父类AbstractInstanceRegistry类register(),这里是eureka-server存放地址的关键
点进去就可以看到registry是一个双层map设计
那么服务端registry中的实例信息又是怎么来的,是之前注册来的
服务端registry信息存放是在一个 ConcurrentHashMap<String, Map<String, Lease>>
金手指: eureka-server 实例信息没有持久化,就是保存在内存里面
嵌入:Lease中存放Instance实例信息,一起来看一看Lease内部结构
回到 AbstractInstanceRegistry 类 register() 方法,继续往下看,自我保护机制在这里
双层map结构
ConcurrentHashMap<String, Map<String, Lease>>
第一层map key (String)是serviceId 服务名,如user-service;
第一层map value (Map<String, Lease>) 是 服务具体信息;
第二层map key (String) 是instanceId 实例名,如DESKTOP-BFM6AVN:user-service:8081;
第二层map key ( Lease) 是实例信息。
因为一个服务对应多个实例,所以String, Map<String, Lease> ,使一个String类型的serviceId对应一个Map类型Instances;
因为一个实例有很多信息,所有封装到一个Lease类中,是一个String类型的instanceId对应一个Lease对象,前者为key,后者为value,放在一个map里面。
为什么第一层是 ConcurrentHashMap,第二层是Map?
第一层ConcurrentHashMap高性能的线程安全(serviceId,服务具体信息);
第一层Map是(instanceId,实例具体信息);
说明第一层要保证多线程情况下的线程安全问题,但是第二层就不需要了,这很正常,就是这样。
金手指:看源码的时候遵循源码优美原则,源码一个类中要找到对于线程安全的处理,就找到一个ConcurrentHashMap类型的集合框架,源码中既然使用ConcurrentHashMap,就说明这就是源码对于多线程的处理,否则它为什么不使用HashMap呢,源码设计师非常优美的,既然使用了ConcurrentHashMap,就说明这里出现多线程问题。
所以,Eureka Server是 如何存放服务地址的,使用一个双层map存放 ConcurrentHashMap<String, Map<String, Lease>>
本节概要:eureka-server服务端的存放地址的三级缓存的设置
继续走AbstractInstanceRegistry的register()方法
eureka-server中的缓存和mybatis中的缓存不一样,这里的是三级缓存就是让缓存失效
这个方法点进去 good 看源码 重要三级缓存
三级缓存设计如下,分别是:
registry(双层map结构设计 ConcurrentHashMap<String, Map<String, Lease>> )
readWriteCacheMap 读写缓存,也称为写缓存 LoadingCache<Key, Value> (interface LoadingCache<K, V> extends Cache<K, V>, Function<K, V>)
readOnlyCacheMap 只读缓存,也称为读缓存 ConcurrentMap<Key, Value>
其中,服务注册到registry这个双层map(serviceId instanceId 实例信息),然后每隔60s,registry中的信息同步到readWriteCacheMap,每隔30s,readWriteCacheMap 中的信息同步到 readOnlyCacheMap
不还不够,任何eureka-client可以注册到eureka-server(写请求),同时,任何eureka-client可以拉取eureka-server地址列表(读请求)
对于写请求,直接写入到registry的双层map结构,然后60s同步到readWriteCacheMap,每隔30s,readWriteCacheMap 中的信息同步到 readOnlyCacheMap
对于读请求,直接拉取readOnlyCacheMap的服务地址列表
因为三级缓存信息不是实时一致的,所以,eureka集群是一个AP模拟,高可用模型,不要求强一致性,只要保证最终一致性就好了。
金手指:这两个同步都是使用计时器来完成。
三级缓存的意义:
N个请求,高并发,使用读写分离,减少使用线程安全的带来性能开销。
点击进入invalidateCache()方法
金手指:ResponseCache
回到
ReponseCacheImpl 是 ResponeCache 的唯一继承类,所以刚才ctrl + alt + B 直接就找到实现了。
三级缓存核心源码来了
写缓存意义在于缓存同步
回调事件
存储地址完成,查询地址开始
Eureka Server如何存放地址知道了,现在Eureka Client 如何拉取服务端地址列表
1、DiscoveryClient构造方法中会触发(拉取服务端地址列表)
2、30s执行一次的定时任务(拉取服务端地址列表)
schedule方法三个参数,分别是
Runnable command, 命令,调用计时器
long delay, 延迟,多久调用一次
TimeUnit unit 单位,延迟单位,一般是秒或毫秒,这里是毫秒
这是一种两倍衰减
this.cacheRefreshTask = new TimedSupervisorTask(
"cacheRefresh",
this.scheduler,
this.cacheRefreshExecutor,
renewalIntervalInSecs,
TimeUnit.SECONDS,
expBackOffBound,
new DiscoveryClient.CacheRefreshThread());
this.scheduler.schedule(
this.cacheRefreshTask,
(long)renewalIntervalInSecs, TimeUnit.SECONDS);
现在回到DiscoveryClient类 initScheduledTasks()方法,这就是
找到 refreshRegistry() 方法
找到 fetchRegistry() 方法,这个方法是用来客户端出服务端拉取地址列表的
又因为fetchRegistry()是从远程拉取地址列表,一定是一个远程调用
找到DiscoveryClient中的fetchRegistry()方法
拉取地址列表分为两个,全量拉取Full和增量拉取Delta
先介绍全量Full 再介绍增量
全量开始
全量拉取
EurekaHttpResponse httpResponse = this.clientConfig.getRegistryRefreshSingleVipAddress() == null? this.eurekaTransport.queryClient.getApplications((String[])this.remoteRegionsRef.get()):
this.eurekaTransport.queryClient.getVip(this.clientConfig.getRegistryRefreshSingleVipAddress(), (String[])this.remoteRegionsRef.get());
vipAddress为null 调用getApplications() 不为null 调用getVip()
先看getApplications()
再看getVip
回到DiscoveryClient的全量拉取地址列表方法 getAndStoreFullRegistry
好了,全量拉取完成。
看一看增量拉取
eureka-client 拉取服务端列表完成。
那么,eureka-server如何响应eureka-client的请求,将地址列表返回给eureka-client,且看下面。
ResponseCacheImpl类中有一个get()方法,先来看看这个get()方法是用来干什么的?
找到getValue()方法
那么,这个get()方法在哪里有用到
看一看ApplicationsResource类getContainers(),原来,这个方法是eureak-server返回eureka-client拉取地址信息的全量查询
缓存失效时间
本地缓存是原子引用
这个本地缓存用在哪里
客户端拉取服务端地址列表完成。
找到 BaseLoadBalancer类 和 DynamicServerListLoadBalancer类
集成ribbon ribbon如何拿到信息,DynamicServerListLoadBalancer类中的updateListOfServers()方法
看一看心跳续约的源码如何实现的
进入DiscoveryClient类
找到renew()方法
心跳:注册服务
this.heartbeatTask = new TimedSupervisorTask(
"heartbeat", // 这里字符串不一样
this.scheduler,
this.heartbeatExecutor,
renewalIntervalInSecs,
TimeUnit.SECONDS,
expBackOffBound,
new DiscoveryClient.HeartbeatThread(null)); // 这里对象不一样
this.scheduler.schedule(this.heartbeatTask, (long)renewalIntervalInSecs, TimeUnit.SECONDS);
缓存:拉取服务 30s
this.cacheRefreshTask = new TimedSupervisorTask(
"cacheRefresh", // 这里字符串不一样
this.scheduler,
this.cacheRefreshExecutor,
renewalIntervalInSecs,
TimeUnit.SECONDS,
expBackOffBound,
new DiscoveryClient.CacheRefreshThread()); // 这里对象不一样
this.scheduler.schedule(this.cacheRefreshTask, (long)renewalIntervalInSecs, TimeUnit.SECONDS);
服务上线到被感知最长 90s :
readOnlyCache 30s同步一次 (即缓存同步的30s,可以修改 responseCacheUpdateIntervalMs)
eureka-client 每30s fetch拉取列表一次 (可以修改 registryFetchIntervalSeconds)
ribbon 每30s 更新一次serverList (可以修改 ServerListRefreshInterval)
服务正常下线到被感知最长 90s :
readOnlyCache 30s同步一次 (即缓存同步的30s,可以修改 responseCacheUpdateIntervalMs)
eureka-client 每30s fetch拉取列表一次 (可以修改 registryFetchIntervalSeconds)
ribbon 每30s 更新一次serverList (可以修改 ServerListRefreshInterval)
服务正常下线到被感知最长 240s (无限趋近于240s,但是不会是240s)
因为eureka-server服务端每60s会清理超过90s未续约的服务, 每60s清理超过90s没有获取的服务,移除缓存,30s 29-89 89-149 跨三个30s
金手指:240s不是同时执行的。
回到InstanceInfoReplicator 的 run()方法
如何看源码
1、先知道技术主体,再看源码
2、了解技术组件的原理,合理猜想
3、开始看源码,看主线
一个接口的多个实现,抽象工厂模式
4、看分支
Eureka 源码分析完了。
天天打码,天天进步!!!
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。