当前位置:   article > 正文

进阶分布式系统架构系列(十):Zookeeper 注册中心

curatorcachelistener.builder().forpathchildrencache

点击下方名片,设为星标

回复“1024”获取2TB学习资源!

前面介绍了 Zookeeper 节点选举机制监听机制集群 ZAB 协议配置中心等相关的知识点,今天我将详细的为大家介绍 zookeeper 注册中心相关知识,希望大家能够从中收获多多!如有帮助,请点在看、转发支持一波!!!

什么是注册中心

注册中心主要有三种角色:

服务提供者(RPC Server):在启动时,向 Registry 注册自身服务,并向 Registry 定期发送心跳汇报存活状态。

服务消费者(RPC Client):在启动时,向 Registry 订阅服务,把 Registry 返回的服务节点列表缓存在本地内存中,并与 RPC Sever 建立连接。

服务注册中心(Registry):用于保存 RPC Server 的注册信息,当 RPC Server 节点发生变更时,Registry 会同步变更,RPC Client 感知后会刷新本地 内存中缓存的服务节点列表。

最后,RPC Client 从本地缓存的服务节点列表中,基于负载均衡算法选择一台 RPC Sever 发起调用。bb1663edee7aec1ef7fb661572dd9b78.png

zookeeper注册中心实现原理

服务注册
  • springboot项目启动时,自定义监听器ApplicationListener去监听web服务启动事件。

  • web server启动成功,则触发事件回调方法。

  • 回调方法中,在zookeeper指定节点下创建临时节点,临时节点的值保存当前项目启动的 ip + port。

  • 如果某个服务宕机,服务断开一定时间(默认30s)临时节点会自动删除。

服务发现
  • springboot项目启动时,会从zookeeper指定节点获取对应服务的所有可用url列表(可以缓存此url列表)。

  • 然后根据负载均衡算法,将请求负载到url列表中的某一个server上。

  • 利用spring初始化器扩展机制创建zookeeper节点监听,当节点列表发生变更,则更新url列表缓存。

服务注册大体流程

8cb532038591269e0fff00d9c5ae8f38.png服务注册,等对应的service的容器启动成功,针对微服务项目,一般是spring boot内置的tomcat启动成功,这个服务才可以使用,这个时候才可以将服务注册到zookeeper中。

那么如何知道tomcat容器启动成功了呢?

通过spring的事件监听机制,当tomcat启动成功会发布一个事件,我们可以监听这个事件,当tomcat启动成功做出相应。

Spring事件监听机制

772650bcf97eb08870f0b913b039b479.png

手写注册中心

服务注册
服务注册原理
  • 自定义监听器ApplicationListener去监听springboot项目的内置容器启动事件。

  • 一旦springboot项目内置的tomcat启动成功,会触发监听器回调方法。

  • 在回调方法中,创建临时节点在zookeeper指定的节点下,当前项目启动的 ip:port 即为节点名称。

创建springboot项目order-service
  • pom.xml

  1. <parent>
  2.     <groupId>org.springframework.boot</groupId>
  3.     <artifactId>spring-boot-starter-parent</artifactId>
  4.     <version>2.6.8</version>
  5.     <relativePath/>
  6. </parent>
  7.  
  8. <dependencies>
  9.     <dependency>
  10.         <groupId>org.springframework.boot</groupId>
  11.         <artifactId>spring-boot-starter</artifactId>
  12.     </dependency>
  13.     <dependency>
  14.         <groupId>org.springframework.boot</groupId>
  15.         <artifactId>spring-boot-starter-web</artifactId>
  16.     </dependency>
  17.     <dependency>
  18.         <groupId>org.springframework.boot</groupId>
  19.         <artifactId>spring-boot-starter-test</artifactId>
  20.         <scope>test</scope>
  21.     </dependency>
  22.  
  23.     <dependency>
  24.         <groupId>org.apache.curator</groupId>
  25.         <artifactId>curator-recipes</artifactId>
  26.         <version>5.2.1</version>
  27.     </dependency>
  28. </dependencies>
  • 配置文件application.properties

  1. server.ip=192.168.9.1
  2. server.port=9090
  3.  
  4. # 自定义的配置信息
  5. zk.service-name=order-service
  6. zk.server=192.168.1.104:2181
  • 创建监听器 ApplicationListener

监听spring web服务器已经初始化完成事件 WebServerInitializedEvent

  1. public class ZkApplicationListener implements ApplicationListener<WebServerInitializedEvent> {
  2.     @Override
  3.     public void onApplicationEvent(WebServerInitializedEvent event) {
  4.         System.out.println("事件监听机制的回调...");
  5.  
  6.         // 获取app.properties配置属性
  7.         Environment environment = event.getApplicationContext().getEnvironment();
  8.         String serviceName = environment.getProperty("zk.service-name");
  9.         String ip = environment.getProperty("server.ip");
  10.         String port = environment.getProperty("server.port");
  11.         String zkServer = environment.getProperty("zk.server");
  12.         // 服务注册
  13.         ServiceRegistry zookeeperServiceRegistry = new ZookeeperServiceRegistry(serviceName,ip,port,zkServer);
  14.         zookeeperServiceRegistry.register();
  15.     }
  16. }
  • SPI配置spring.factories

  1. # Application Listeners
  2. org.springframework.context.ApplicationListener=\
  3.   com.zk.serviceregistry.orderservice.listener.ZkApplicationListener
  • 注册服务到zookeeper

  1. // spring cloud 团队提供了服务注册的接口
  2. public interface ServiceRegistry {
  3.     void register();
  4. }
  5. public class ZookeeperServiceRegistry implements ServiceRegistry {
  6.  
  7.     private CuratorFramework curatorFramework;
  8.     private final String ip;
  9.     private final String port;
  10.     private final String serviceName;
  11.     private final String basePath = "/zk-registry";
  12.  
  13.     public ZookeeperServiceRegistry(String serviceName, String ip, String port, String zkServer) {
  14.         this.serviceName = serviceName;
  15.         this.ip = ip;
  16.         this.port = port;
  17.         this.curatorFramework = CuratorFrameworkFactory
  18.                 .builder()
  19.                 .connectionTimeoutMs(20000)
  20.                 .connectString(zkServer)
  21.                 .retryPolicy(new ExponentialBackoffRetry(10003))
  22.                 .build();
  23.         curatorFramework.start();
  24.     }
  25.  
  26.     @Override
  27.     public void register() {
  28.         // 服务名称
  29.         String serviceNamePath = basePath + "/" + serviceName;
  30.         try {
  31.             if (curatorFramework.checkExists().forPath(serviceNamePath) == null) {
  32.                 // 创建持久化的节点,作为服务名称
  33.                 this.curatorFramework.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(serviceNamePath);
  34.             }
  35.             String urlNode = curatorFramework.create().withMode(CreateMode.EPHEMERAL).forPath(serviceNamePath + "/" + ip + ":" + port);
  36.             System.out.println("服务 " + urlNode + " 成功注册到zookeeper server...");
  37.         } catch (Exception e) {
  38.             e.printStackTrace();
  39.         }
  40.     }
  41. }
  • 启动服务测试

会发现服务注册已经生效,日志中打印127.0.0.1:9090已经注册到zookeeper serverc837b0b2060e1beb35dc1467e95656f3.png查看zookeeper,发现创建了新的节点7525042d317f064571c1faa931edf5d7.png启动多个服务192.168.9.1:9091,192.168.9.1:9092,192.168.9.1:9093,192.168.9.1:9094,新的服务ip:port也会被依次注册到zookeeper中。02925f8ddb4fcf93c91fb07176ba298d.png停掉某个服务比如192.168.9.1:9094去模拟某个服务宕机的情况,当zookeeper server在一定时间内(默认30s)没有收到来自192.168.9.1:9094服务的反馈时,就会认为此服务已经挂了,会将此服务从zookeeper节点中删除。17f6aecc10926c1cfa107e5cc48d5e02.png

服务发现
服务发现原理
  • 项目启动时自动获取zookeeper中配置的需要调用的服务order-service的所有可用url列表

  • 利用zookeeper临时节点特性,如果某个服务节点宕机,那么对应临时节点会在一定时间后自动删除

  • 访问服务user-service时,根据负载均衡算法从可用的服务url列表中获取某个节点url

创建springboot项目user-service
  • pom.xml

  1. <parent>
  2.     <groupId>org.springframework.boot</groupId>
  3.     <artifactId>spring-boot-starter-parent</artifactId>
  4.     <version>2.6.8</version>
  5.     <relativePath/>
  6. </parent>
  7.  
  8. <dependencies>
  9.     <dependency>
  10.         <groupId>org.springframework.boot</groupId>
  11.         <artifactId>spring-boot-starter-web</artifactId>
  12.     </dependency>
  13.     <dependency>
  14.         <groupId>org.springframework.boot</groupId>
  15.         <artifactId>spring-boot-starter-test</artifactId>
  16.         <scope>test</scope>
  17.     </dependency>
  18.  
  19.     <dependency>
  20.         <groupId>org.apache.curator</groupId>
  21.         <artifactId>curator-recipes</artifactId>
  22.         <version>5.2.1</version>
  23.     </dependency>
  24. </dependencies>
  • 配置文件application.properties

  1. server.port=9999
  2. zk.server=192.168.1.104:2181
  • SPI配置spring.factories

  1. org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
  2.   com.zk.servicediscovery.userservice.config.ZookeeperDiscoveryAutoConfiguration
  • 自动配置,项目启动时去执行服务发现

  1. @Configuration
  2. public class ZookeeperDiscoveryAutoConfiguration {
  3.     @Resource
  4.     private Environment environment;
  5.  
  6.     @Bean
  7.     public ServiceDiscoveryImpl serviceDiscovery(){
  8.         return new ServiceDiscoveryImpl(environment.getProperty("zk.server"));
  9.     }
  10. }
  • 服务发现与监听

  1. public interface ServiceDiscovery {
  2.     // 服务发现:获取所有子节点(所有可用的服务url列表)
  3.     List<String> discovery(String serviceName);
  4.     // 注册监听:当子节点发生变更(代表有新服务添加或者有服务宕机),则会触发监听,更新服务url列表
  5.     void registerWatch(String serviceNamePath);
  6. }
  7. public class ServiceDiscoveryImpl implements ServiceDiscovery {
  8.     private final CuratorFramework curatorFramework;
  9.     private final String basePath = "/zk-registry";
  10.  
  11.     public ServiceDiscoveryImpl(String zkServer) {
  12.         this.curatorFramework = CuratorFrameworkFactory
  13.                 .builder()
  14.                 .connectionTimeoutMs(20000)
  15.                 .connectString(zkServer)
  16.                 .retryPolicy(new ExponentialBackoffRetry(10003))
  17.                 .build();
  18.         curatorFramework.start();
  19.     }
  20.  
  21.     @Override
  22.     public List<String> discovery(String serviceName) {
  23.         // /zk-registry/order-service
  24.         String serviceNamePath = basePath + "/" + serviceName;
  25.         try {
  26.             if (this.curatorFramework.checkExists().forPath(serviceNamePath) != null) {
  27.                 return this.curatorFramework.getChildren().forPath(serviceNamePath);
  28.             }
  29.         } catch (Exception e) {
  30.             e.printStackTrace();
  31.         }
  32.         return null;
  33.     }
  34.  
  35.     public void registerWatch(String serviceNamePath) {
  36.         // 永久的监听,当/zk-registry/order-service下的子节点变动,则更新
  37.         CuratorCache curatorCache = CuratorCache.build(curatorFramework, serviceNamePath);
  38.         CuratorCacheListener listener = CuratorCacheListener.builder().forPathChildrenCache(serviceNamePath, curatorFramework, new PathChildrenCacheListener() {
  39.             @Override
  40.             public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
  41.                 // 拉模式
  42.                 System.out.println("最新的urls为: " + curatorFramework.getChildren().forPath(serviceNamePath));
  43.             }
  44.         }).build();
  45.         curatorCache.listenable().addListener(listener);
  46.         curatorCache.start();
  47.     }
  48. }
  • 随机访问某个服务节点:模拟负载均衡

  1. public interface LoadBalance {
  2.     String select(List<String> urls);
  3. }
  4. public class RandomLoadBalance implements LoadBalance{
  5.     @Override
  6.     public String select(List<String> urls) {
  7.         int len=urls.size();
  8.         Random random=new Random();
  9.         return urls.get(random.nextInt(len));
  10.     }
  11. }
  • UserController模拟请求

  1. @RestController
  2. @RequestMapping("/user")
  3. public class UserController {
  4.     @Autowired
  5.     private ServiceDiscovery serviceDiscovery;
  6.  
  7.     @RequestMapping("/discovery")
  8.     public void discovery() throws IOException {
  9.         List<String> urls= this.serviceDiscovery.discovery("order-service");
  10.         LoadBalance loadBalance=new RandomLoadBalance();
  11.         String url = loadBalance.select(urls);
  12.         System.out.println("获取可用的order-service服务节点路径为: "+url);
  13.         String response = new RestTemplate().getForObject("http://" + url + "/order/query", String.class);
  14.         System.out.println("order-service response: "+response);
  15.         // 添加对节点order-service的监听
  16.         this.serviceDiscovery.registerWatch("/zk-registry/order-service");
  17.     }
  18. }
  • 测试

访问http://192.168.9.1:9999/user/discovery测试094f070f5fec66897d06885954569e91.png停掉order-service某个服务节点,不需要重启,再次访问user-service0582e8fd62553f568dbce079f55084d6.png

Spring Cloud Zookeeper实现注册中心

示例代码

创建spring-cloud-zookeeper的spring boot项目,Spring Boot版本为2.6.8

pom.xml
  1. <parent>
  2.     <groupId>org.springframework.boot</groupId>
  3.     <artifactId>spring-boot-starter-parent</artifactId>
  4.     <version>2.6.8</version>
  5.     <relativePath/> <!-- lookup parent from repository -->
  6. </parent>
  7.  
  8. <dependencies>
  9.     <dependency>
  10.         <groupId>org.springframework.boot</groupId>
  11.         <artifactId>spring-boot-starter-web</artifactId>
  12.     </dependency>
  13.     <dependency>
  14.         <groupId>org.springframework.boot</groupId>
  15.         <artifactId>spring-boot-starter-test</artifactId>
  16.         <scope>test</scope>
  17.     </dependency>
  18.  
  19.     <!-- spring cloud zookeeper config 配置中心-->
  20.     <dependency>
  21.         <groupId>org.springframework.cloud</groupId>
  22.         <artifactId>spring-cloud-starter-zookeeper-config</artifactId>
  23.     </dependency>
  24.  
  25.     <!-- bootstrap.yaml文件所需依赖 -->
  26.     <dependency>
  27.         <groupId>org.springframework.cloud</groupId>
  28.         <artifactId>spring-cloud-starter-bootstrap</artifactId>
  29.     </dependency>
  30.  
  31.     <!-- spring cloud zookeeper discovery 注册中心-->
  32.     <dependency>
  33.         <groupId>org.springframework.cloud</groupId>
  34.         <artifactId>spring-cloud-starter-zookeeper-discovery</artifactId>
  35.     </dependency>
  36.  
  37. </dependencies>
  38.  
  39. <!--定义版本的管理-->
  40. <dependencyManagement>
  41.     <dependencies>
  42.         <!--定义spring cloud的版本-->
  43.         <dependency>
  44.             <groupId>org.springframework.cloud</groupId>
  45.             <artifactId>spring-cloud-dependencies</artifactId>
  46.             <version>2021.0.3</version>
  47.             <scope>import</scope>
  48.             <type>pom</type>
  49.         </dependency>
  50.     </dependencies>
  51. </dependencyManagement>
  • 配置文件

  1. application.properties
  2. server.port=9091
  3. spring.cloud.zookeeper.connect-string=192.168.1.104:2181
  4. spring.cloud.zookeeper.discovery.root=/services/registries
  5. spring.application.name=spring-cloud-zookeeper

或者application.yml

  1. spring:
  2.   cloud:
  3.     zookeeper:
  4.       connect-string192.168.1.104:2181
  5.       discovery:
  6.         root: /services/registries
  7.   application:
  8.     name: spring-cloud-zookeeper
  9. server:
  10.   port: 9091
  11. (3) bootstrap.yaml
  12. spring:
  13.   profiles:
  14.     active: dev
  15.   application:
  16.     name: spring-cloud-zookeeper    # 找哪一个ZNode节点  spring-cloud-zookeeper-dev
  17.   cloud:
  18.     zookeeper:
  19.       config:
  20.         root: config    # 相当于 /zk-config/spring-cloud-zookeeper-dev
  21.         profile-separator: "-"
  22.         enabled: true
  23.       connect-string192.168.1.104:2181
  • 启动Spring Boot项目,观察Zookeeper Server上的数据

可以发现zookeeper server上自动创建了对应的节点6274dd54277a832b301a82cc6019d506.png

  • 服务发现代码

  1. @RestController
  2. public class SpringCloudZkDiscoveryController {
  3.     // 1.注入服务发现客户端接口
  4.     @Autowired
  5.     private DiscoveryClient discoveryClient;
  6.  
  7.     @RequestMapping("/sc-zk-discovery")
  8.     public List<ServiceInstance> serviceUrl() {
  9.         // 2.调用getInstances方法可获得所有可用实例
  10.         List<ServiceInstance> instances = discoveryClient.getInstances("spring-cloud-zookeeper");
  11.         String url = instances.get(0).getUri().toString();
  12.         System.out.println("url=" + url);
  13.         return discoveryClient.getInstances("spring-cloud-zookeeper");
  14.     }
  15. }

访问测试dfbd030dcfd531e6b4dbffc7f1c987c4.png7c9a77779e46ab5717678a7036e9dd12.png

Spring Cloud Zookeeper注册中心实现原理
监听器AbstractAutoServiceRegistration监听web容器启动

类似于我们手动实现服务注册,Spring Cloud也自定义了一个监听器 AbstractAutoServiceRegistration 去监听 web服务器启动事件 WebServerInitializedEvent。

  1. org.springframework.cloud.client.serviceregistry.AbstractAutoServiceRegistration#onApplicationEvent(WebServerInitializedEvent)源代码片段:
  2. public abstract class AbstractAutoServiceRegistration<R extends Registration> implements AutoServiceRegistration, ApplicationContextAware, ApplicationListener<WebServerInitializedEvent> {
  3.  
  4.     private final ServiceRegistry<R> serviceRegistry;
  5.     // .................
  6.  
  7.     public void onApplicationEvent(WebServerInitializedEvent event) {
  8.         this.bind(event);
  9.     }
  10.  
  11.     @Deprecated
  12.     public void bind(WebServerInitializedEvent event) {
  13.         ApplicationContext context = event.getApplicationContext();
  14.         if (!(context instanceof ConfigurableWebServerApplicationContext) || !"management".equals(((ConfigurableWebServerApplicationContext)context).getServerNamespace())) {
  15.             this.port.compareAndSet(0, event.getWebServer().getPort());
  16.             this.start();
  17.         }
  18.     }
  19.  
  20.     public void start() {
  21.         if (!this.isEnabled()) {
  22.         } else {
  23.             if (!this.running.get()) {
  24.                 this.context.publishEvent(new InstancePreRegisteredEvent(this, this.getRegistration()));
  25.                 this.register();
  26.                 if (this.shouldRegisterManagement()) {
  27.                     this.registerManagement();
  28.                 }
  29.  
  30.                 this.context.publishEvent(new InstanceRegisteredEvent(this, this.getConfiguration()));
  31.                 this.running.compareAndSet(falsetrue);
  32.             }
  33.         }
  34.     }
  35. }
服务注册与发现

getRegistration() 获取具体注册实现类

  1. org.springframework.cloud.zookeeper.serviceregistry.ZookeeperAutoServiceRegistration
  2. serviceRegistry.register(registration) 具体服务注册实现
  3. public class ZookeeperServiceRegistry implements ServiceRegistry<ZookeeperRegistration>, SmartInitializingSingleton, Closeable {
  4.     protected CuratorFramework curator;
  5.     protected ZookeeperDiscoveryProperties properties;
  6.     private ServiceDiscovery<ZookeeperInstance> serviceDiscovery;
  7.  
  8.     public void register(ZookeeperRegistration registration) {
  9.         try {
  10.             this.getServiceDiscovery().registerService(registration.getServiceInstance());
  11.         } catch (Exception var3) {
  12.             ReflectionUtils.rethrowRuntimeException(var3);
  13.         }
  14.     }
  15. }

服务发现实现类:创建zookeeper节点,创建节点监听

  1. public class ServiceDiscoveryImpl<T> implements ServiceDiscovery<T> {
  2.     public void registerService(ServiceInstance<T> service) throws Exception {
  3.         ServiceDiscoveryImpl.Entry<T> newEntry = new ServiceDiscoveryImpl.Entry(service);
  4.         ServiceDiscoveryImpl.Entry<T> oldEntry = (ServiceDiscoveryImpl.Entry)this.services.putIfAbsent(service.getId(), newEntry);
  5.         ServiceDiscoveryImpl.Entry<T> useEntry = oldEntry != null ? oldEntry : newEntry;
  6.         synchronized(useEntry) {
  7.             if (useEntry == newEntry) {
  8.                 // 创建节点监听
  9.                 useEntry.cache = this.makeNodeCache(service);
  10.             }
  11.             // 创建zookeeper节点
  12.             this.internalRegisterService(service);
  13.         }
  14.     }
  15.     // 创建节点监听
  16.     private CuratorCacheBridge makeNodeCache(ServiceInstance<T> instance) {
  17.         if (!this.watchInstances) {
  18.             return null;
  19.         } else {
  20.             CuratorCacheBridge cache = CuratorCache.bridgeBuilder(this.client, this.pathForInstance(instance.getName(), instance.getId())).withOptions(new Options[]{Options.SINGLE_NODE_CACHE}).withDataNotCached().build();
  21.             CuratorCacheListener listener = CuratorCacheListener.builder().afterInitialized().forAll((__, ___, data) -> {
  22.                 if (data != null) {
  23.                     try {
  24.                         ServiceInstance<T> newInstance = this.serializer.deserialize(data.getData());
  25.                         ServiceDiscoveryImpl.Entry<T> entry = (ServiceDiscoveryImpl.Entry)this.services.get(newInstance.getId());
  26.                         if (entry != null) {
  27.                             synchronized(entry) {
  28.                                 entry.service = newInstance;
  29.                             }
  30.                         }
  31.                     } catch (Exception var10) {
  32.                         this.log.debug("Could not deserialize: " + data.getPath());
  33.                     }
  34.                 } else {
  35.                     this.log.warn("Instance data has been deleted for: " + instance);
  36.                 }
  37.  
  38.             }).build();
  39.             cache.listenable().addListener(listener);
  40.             cache.start();
  41.             return cache;
  42.         }
  43.     }
  44.  
  45.     // 创建zookeeper节点
  46.     @VisibleForTesting
  47.     protected void internalRegisterService(ServiceInstance<T> service) throws Exception {
  48.         byte[] bytes = this.serializer.serialize(service);
  49.         String path = this.pathForInstance(service.getName(), service.getId());
  50.         int MAX_TRIES = true;
  51.         boolean isDone = false;
  52.  
  53.         for(int i = 0; !isDone && i < 2; ++i) {
  54.             try {
  55.                 CreateMode mode;
  56.                 switch(service.getServiceType()) {
  57.                 case DYNAMIC:
  58.                     mode = CreateMode.EPHEMERAL;
  59.                     break;
  60.                 case DYNAMIC_SEQUENTIAL:
  61.                     mode = CreateMode.EPHEMERAL_SEQUENTIAL;
  62.                     break;
  63.                 default:
  64.                     mode = CreateMode.PERSISTENT;
  65.                 }
  66.                 ((ACLBackgroundPathAndBytesable)this.client.create().creatingParentContainersIfNeeded().withMode(mode)).forPath(path, bytes);
  67.                 isDone = true;
  68.             } catch (NodeExistsException var8) {
  69.                 this.client.delete().forPath(path);
  70.             }
  71.         }
  72.     }
  73.  
  74. }
spring容器启动事件

SpringBoot项目启动 -> webServer启动 -> 监听器监听服务启动事件执行流程:

SpringApplication.run(args) -> refreshContext(context) -> refresh(context) -> ServletWebServerApplicationContext.refresh() -> AbstractApplicationContext.refresh() -> finishRefresh() -> DefaultLifecycleProcessor.onRefresh() -> startBeans(true) -> DefaultLifecycleProcessor$LifecycleGroup.start() -> doStart() -> WebServerStartStopLifecycle.start() -> AbstractApplicationContext.publishEvent(new ServletWebServerInitializedEvent(this.webServer, this.applicationContext)) -> SimpleApplicationEventMulticaster.multicastEvent(applicationEvent, eventType) -> invokeListener(listener, event) -> doInvokeListener(listener, event) -> listener.onApplicationEvent(event);

堆栈信息:

  1. onApplicationEvent:12, ZkApplicationListener (com.zk.serviceregistry.orderservice.listener)
  2. doInvokeListener:176, SimpleApplicationEventMulticaster (org.springframework.context.event)
  3. invokeListener:169, SimpleApplicationEventMulticaster (org.springframework.context.event)
  4. multicastEvent:143, SimpleApplicationEventMulticaster (org.springframework.context.event)
  5. publishEvent:421, AbstractApplicationContext (org.springframework.context.support)
  6. publishEvent:378, AbstractApplicationContext (org.springframework.context.support)
  7. start:46, WebServerStartStopLifecycle (org.springframework.boot.web.servlet.context)
  8. doStart:178, DefaultLifecycleProcessor (org.springframework.context.support)
  9. access$200:54, DefaultLifecycleProcessor (org.springframework.context.support)
  10. start:356, DefaultLifecycleProcessor$LifecycleGroup (org.springframework.context.support)
  11. accept:-11643565953 (org.springframework.context.support.DefaultLifecycleProcessor$$Lambda$541)
  12. forEach:75, Iterable (java.lang)
  13. startBeans:155, DefaultLifecycleProcessor (org.springframework.context.support)
  14. onRefresh:123, DefaultLifecycleProcessor (org.springframework.context.support)
  15. finishRefresh:935, AbstractApplicationContext (org.springframework.context.support)
  16. refresh:586, AbstractApplicationContext (org.springframework.context.support)
  17. refresh:145, ServletWebServerApplicationContext (org.springframework.boot.web.servlet.context)
  18. refresh:745, SpringApplication (org.springframework.boot)
  19. refreshContext:420, SpringApplication (org.springframework.boot)
  20. run:307, SpringApplication (org.springframework.boot)
  21. run:1317, SpringApplication (org.springframework.boot)
  22. run:1306, SpringApplication (org.springframework.boot)
  23. main:10, OrderServiceApplication (com.zk.serviceregistry.orderservice)

1)SpringApplicaton.run()

  1. public class SpringApplication {
  2.     public ConfigurableApplicationContext run(String... args) {
  3.         long startTime = System.nanoTime();
  4.         DefaultBootstrapContext bootstrapContext = this.createBootstrapContext();
  5.         ConfigurableApplicationContext context = null;
  6.         this.configureHeadlessProperty();
  7.         SpringApplicationRunListeners listeners = this.getRunListeners(args);
  8.         listeners.starting(bootstrapContext, this.mainApplicationClass);
  9.  
  10.         try {
  11.             ApplicationArguments applicationArguments = new DefaultApplicationArguments(args);
  12.             ConfigurableEnvironment environment = this.prepareEnvironment(listeners, bootstrapContext, applicationArguments);
  13.             this.configureIgnoreBeanInfo(environment);
  14.             Banner printedBanner = this.printBanner(environment);
  15.             context = this.createApplicationContext();
  16.             context.setApplicationStartup(this.applicationStartup);
  17.             this.prepareContext(bootstrapContext, context, environment, listeners, applicationArguments, printedBanner);
  18.             this.refreshContext(context);                // 1
  19.             this.afterRefresh(context, applicationArguments);
  20.             Duration timeTakenToStartup = Duration.ofNanos(System.nanoTime() - startTime);
  21.             if (this.logStartupInfo) {
  22.                 (new StartupInfoLogger(this.mainApplicationClass)).logStarted(this.getApplicationLog(), timeTakenToStartup);
  23.             }
  24.  
  25.             listeners.started(context, timeTakenToStartup);
  26.             this.callRunners(context, applicationArguments);
  27.         } catch (Throwable var12) {
  28.             this.handleRunFailure(context, var12, listeners);
  29.             throw new IllegalStateException(var12);
  30.         }
  31.  
  32.         try {
  33.             Duration timeTakenToReady = Duration.ofNanos(System.nanoTime() - startTime);
  34.             listeners.ready(context, timeTakenToReady);
  35.             return context;
  36.         } catch (Throwable var11) {
  37.             this.handleRunFailure(context, var11, (SpringApplicationRunListeners)null);
  38.             throw new IllegalStateException(var11);
  39.         }
  40.     }
  41. }

2)refreshContext(context)

  1. private void refreshContext(ConfigurableApplicationContext context) {
  2.         if (this.registerShutdownHook) {
  3.             shutdownHook.registerApplicationContext(context);
  4.         }
  5.         this.refresh(context);                // 2
  6.     }

3)AbstractApplicationContext.refresh()

  1. public void refresh() throws BeansException, IllegalStateException {
  2.         synchronized(this.startupShutdownMonitor) {
  3.             StartupStep contextRefresh = this.applicationStartup.start("spring.context.refresh");
  4.             this.prepareRefresh();
  5.             ConfigurableListableBeanFactory beanFactory = this.obtainFreshBeanFactory();
  6.             this.prepareBeanFactory(beanFactory);
  7.  
  8.             try {
  9.                 this.postProcessBeanFactory(beanFactory);
  10.                 StartupStep beanPostProcess = this.applicationStartup.start("spring.context.beans.post-process");
  11.                 this.invokeBeanFactoryPostProcessors(beanFactory);
  12.                 this.registerBeanPostProcessors(beanFactory);
  13.                 beanPostProcess.end();
  14.                 this.initMessageSource();
  15.                 this.initApplicationEventMulticaster();
  16.                 this.onRefresh();
  17.                 this.registerListeners();
  18.                 this.finishBeanFactoryInitialization(beanFactory);
  19.                 this.finishRefresh();                 // 3
  20.             } catch (BeansException var10) {
  21.                 if (this.logger.isWarnEnabled()) {
  22.                     this.logger.warn("Exception encountered during context initialization - cancelling refresh attempt: " + var10);
  23.                 }
  24.  
  25.                 this.destroyBeans();
  26.                 this.cancelRefresh(var10);
  27.                 throw var10;
  28.             } finally {
  29.                 this.resetCommonCaches();
  30.                 contextRefresh.end();
  31.             }
  32.  
  33.         }
  34.     }

4)finishRefresh()

  1. protected void finishRefresh() {
  2.         this.clearResourceCaches();
  3.         this.initLifecycleProcessor();
  4.         this.getLifecycleProcessor().onRefresh();                // 4
  5.         this.publishEvent((ApplicationEvent)(new ContextRefreshedEvent(this)));
  6.         if (!NativeDetector.inNativeImage()) {
  7.             LiveBeansView.registerApplicationContext(this);
  8.         }
  9.     }

5)DefaultLifecycleProcessor.onRefresh()

  1. public void onRefresh() {
  2.         this.startBeans(true);                // 5
  3.         this.running = true;
  4.     }

6)startBeans(true)

  1. private void startBeans(boolean autoStartupOnly) {
  2.         Map<String, Lifecycle> lifecycleBeans = this.getLifecycleBeans();
  3.         Map<Integer, DefaultLifecycleProcessor.LifecycleGroup> phases = new TreeMap();
  4.         lifecycleBeans.forEach((beanName, bean) -> {
  5.             if (!autoStartupOnly || bean instanceof SmartLifecycle && ((SmartLifecycle)bean).isAutoStartup()) {
  6.                 int phase = this.getPhase(bean);
  7.                 ((DefaultLifecycleProcessor.LifecycleGroup)phases.computeIfAbsent(phase, (p) -> {
  8.                     return new DefaultLifecycleProcessor.LifecycleGroup(phase, this.timeoutPerShutdownPhase, lifecycleBeans, autoStartupOnly);
  9.                 })).add(beanName, bean);
  10.             }
  11.         });
  12.         if (!phases.isEmpty()) {
  13.             phases.values().forEach(DefaultLifecycleProcessor.LifecycleGroup::start);                // 6
  14.         }
  15.     }

7)DefaultLifecycleProcessor$LifecycleGroup.start()

  1. public void start() {
  2.             if (!this.members.isEmpty()) {
  3.                 if (DefaultLifecycleProcessor.this.logger.isDebugEnabled()) {
  4.                     DefaultLifecycleProcessor.this.logger.debug("Starting beans in phase " + this.phase);
  5.                 }
  6.                 Collections.sort(this.members);
  7.                 Iterator var1 = this.members.iterator();
  8.                 while(var1.hasNext()) {
  9.                     DefaultLifecycleProcessor.LifecycleGroupMember member = (DefaultLifecycleProcessor.LifecycleGroupMember)var1.next();
  10.                     DefaultLifecycleProcessor.this.doStart(this.lifecycleBeans, member.name, this.autoStartupOnly);                // 7
  11.                 }
  12.             }
  13.         }

8)DefaultLifecycleProcessor.doStart()

  1. private void doStart(Map<String, ? extends Lifecycle> lifecycleBeans, String beanName, boolean autoStartupOnly) {
  2.         Lifecycle bean = (Lifecycle)lifecycleBeans.remove(beanName);
  3.         if (bean != null && bean != this) {
  4.             String[] dependenciesForBean = this.getBeanFactory().getDependenciesForBean(beanName);
  5.             String[] var6 = dependenciesForBean;
  6.             int var7 = dependenciesForBean.length;
  7.             for(int var8 = 0; var8 < var7; ++var8) {
  8.                 String dependency = var6[var8];
  9.                 this.doStart(lifecycleBeans, dependency, autoStartupOnly);
  10.             }
  11.  
  12.             if (!bean.isRunning() && (!autoStartupOnly || !(bean instanceof SmartLifecycle) || ((SmartLifecycle)bean).isAutoStartup())) {
  13.                 if (this.logger.isTraceEnabled()) {
  14.                     this.logger.trace("Starting bean '" + beanName + "' of type [" + bean.getClass().getName() + "]");
  15.                 }
  16.  
  17.                 try {
  18.                     bean.start();                // 8
  19.                 } catch (Throwable var10) {
  20.                     throw new ApplicationContextException("Failed to start bean '" + beanName + "'", var10);
  21.                 }
  22.             }
  23.         }
  24.     }

9)发布web服务启动完成事件

事件 ServletWebServerInitializedEvent extends WebServerInitializedEvent

  1. public void start() {
  2.         this.webServer.start();
  3.         this.running = true;
  4.         this.applicationContext.publishEvent(new ServletWebServerInitializedEvent(this.webServer, this.applicationContext));                // 9
  5.     }

10)发布事件 AbstractApplicationContext.publishEvent()

  1. protected void publishEvent(Object event, @Nullable ResolvableType eventType) {
  2.         if (this.earlyApplicationEvents != null) {
  3.             this.earlyApplicationEvents.add(applicationEvent);
  4.         } else {
  5.             this.getApplicationEventMulticaster().multicastEvent((ApplicationEvent)applicationEvent, eventType);                // 10
  6.         }
  7.  
  8.         if (this.parent != null) {
  9.             if (this.parent instanceof AbstractApplicationContext) {
  10.                 ((AbstractApplicationContext)this.parent).publishEvent(event, eventType);
  11.             } else {
  12.                 this.parent.publishEvent(event);
  13.             }
  14.         }
  15.     }

11)广播器发布事件 SimpleApplicationEventMulticaster.multicastEvent(event, eventType)

  1. public void multicastEvent(final ApplicationEvent event, @Nullable ResolvableType eventType) {
  2.         ResolvableType type = eventType != null ? eventType : this.resolveDefaultEventType(event);
  3.         Executor executor = this.getTaskExecutor();
  4.         Iterator var5 = this.getApplicationListeners(event, type).iterator();
  5.         while(var5.hasNext()) {
  6.             ApplicationListener<?> listener = (ApplicationListener)var5.next();
  7.             if (executor != null) {
  8.                 executor.execute(() -> {
  9.                     this.invokeListener(listener, event);
  10.                 });
  11.             } else {
  12.                 this.invokeListener(listener, event);                // 11
  13.             }
  14.         }
  15.     }

12)调用监听器 invokeListener(listener, event)

  1. protected void invokeListener(ApplicationListener<?> listener, ApplicationEvent event) {
  2.         ErrorHandler errorHandler = this.getErrorHandler();
  3.         if (errorHandler != null) {
  4.             try {
  5.                 this.doInvokeListener(listener, event);
  6.             } catch (Throwable var5) {
  7.                 errorHandler.handleError(var5);
  8.             }
  9.         } else {
  10.             this.doInvokeListener(listener, event);                // 12
  11.         }
  12.     }
  13.  
  14.     private void doInvokeListener(ApplicationListener listener, ApplicationEvent event) {
  15.         try {
  16.             listener.onApplicationEvent(event);                // 13
  17.         } catch (ClassCastException var6) {}
  18.     }

13)调用监听器事件回调方法

  1. public abstract class AbstractAutoServiceRegistration<R extends Registration> implements AutoServiceRegistration, ApplicationContextAware, ApplicationListener<WebServerInitializedEvent> {
  2.     public void onApplicationEvent(WebServerInitializedEvent event) {
  3.         // TODO ..................
  4.     }
  5. }

Zookeeper作为注册中心探讨

作为一个分布式协同服务,ZooKeeper非常好,但是对于Service发现服务来说就不合适了,因为对于Service发现服务来说就算是返回了包含不实的信息的结果也比什么都不返回要好。所以当向注册中心查询服务列表时,我们可以容忍注册中心返回的是几分钟以前的注册信息,但不能接受服务直接down掉不可用。

但是zk会出现这样一种情况,当master节点因为网络故障与其他节点失去联系时,剩余节点会重新进行leader选举。问题在于,选举leader的时间太长,30 ~ 120s, 且选举期间整个zk集群都是不可用的,这就导致在选举期间注册服务瘫痪。在云部署的环境下,因网络问题使得zk集群失去master节点是较大概率会发生的事,虽然服务能够最终恢复,但是漫长的选举时间导致的注册长期不可用是不能容忍的。

所以说,作为注册中心,可用性的要求要高于一致性!

在 CAP 模型中,Zookeeper整体遵循一致性(CP)原则,即在任何时候对 Zookeeper 的访问请求能得到一致的数据结果,但是当机器下线或者宕机时,不能保证服务可用性。

那为什么Zookeeper不使用最终一致性(AP)模型呢?因为这个依赖Zookeeper的核心算法是ZAB,所有设计都是为了强一致性。这个对于分布式协调系统,完全没没有毛病,但是你如果将Zookeeper为分布式协调服务所做的一致性保障,用在注册中心,或者说服务发现场景,这个其实就不合适。

来源:https://blog.csdn.net/luciferlongxu/article/

details/126529104

读者专属技术群

构建高质量的技术交流社群,欢迎从事后端开发、运维技术进群(备注岗位,已在技术交流群的请勿重复添加)。主要以技术交流、内推、行业探讨为主,请文明发言。广告人士勿入,切勿轻信私聊,防止被骗。

扫码加我好友,拉你进群

d518aea6a4b75177530dd2b5c2623cad.jpeg

推荐阅读 点击标题可跳转

一个比 ping 更强大、更牛逼的命令行工具!

外资IT连连败退!Citrix和Radware或将撤离中国

新来个技术总监:谁再用 rebase 提交合并开除

一款超牛 X 的手机端 SSH 工具!吹爆

又一知名互联网大厂员工猝死!打工人务必警醒啊

CentOS 搭建 OpenVPN 服务,一次性成功!收藏了

9a5812a036e5c7509ae2d1072b340b24.png

PS:因为公众号平台更改了推送规则,如果不想错过内容,记得读完点一下在看,加个星标,这样每次新文章推送才会第一时间出现在你的订阅列表里。点在看支持我们吧!

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/木道寻08/article/detail/851638
推荐阅读
相关标签
  

闽ICP备14008679号