Earlier articles covered Zookeeper nodes, the election mechanism, the watch mechanism, the cluster ZAB protocol, and the configuration center. This article walks through Zookeeper as a service registry in detail.
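A registry involves three main roles:
Service provider (RPC Server): registers its services with the Registry at startup and periodically sends heartbeats to the Registry to report that it is alive.
Service consumer (RPC Client): subscribes to services from the Registry at startup, caches the list of service nodes returned by the Registry in local memory, and establishes connections to the RPC Servers.
Service registry (Registry): stores the registration information of the RPC Servers. When an RPC Server node changes, the Registry updates its records accordingly; once the RPC Client is notified, it refreshes the list of service nodes cached in local memory.
Finally, the RPC Client picks one RPC Server from its locally cached node list using a load-balancing algorithm and sends the call to it.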
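The interplay of the three roles can be sketched without a real registry. The following is a minimal in-memory illustration under stated assumptions: `InMemoryRegistry`, its method names, and the TTL-based eviction are hypothetical and are not ZooKeeper's API (ZooKeeper tracks liveness through sessions and ephemeral nodes instead of an explicit TTL map).

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical in-memory stand-in for a registry; not ZooKeeper's API.
class InMemoryRegistry {
    // service name -> (address -> timestamp of the last heartbeat, in ms)
    private final Map<String, Map<String, Long>> services = new ConcurrentHashMap<>();
    private final long ttlMillis;

    InMemoryRegistry(long ttlMillis) {
        this.ttlMillis = ttlMillis;
    }

    // Provider side: the first call registers the address, later calls act as heartbeats.
    void heartbeat(String service, String address, long nowMillis) {
        services.computeIfAbsent(service, s -> new ConcurrentHashMap<>()).put(address, nowMillis);
    }

    // Registry side: drop providers whose last heartbeat is older than the TTL.
    void evictExpired(long nowMillis) {
        for (Map<String, Long> nodes : services.values()) {
            nodes.values().removeIf(last -> nowMillis - last > ttlMillis);
        }
    }

    // Consumer side: fetch a sorted snapshot that can be cached locally.
    List<String> lookup(String service) {
        Map<String, Long> nodes = services.getOrDefault(service, Collections.emptyMap());
        return new ArrayList<>(new TreeMap<>(nodes).keySet());
    }
}
```

A provider would call `heartbeat` on a timer, the registry would call `evictExpired` periodically, and a consumer would call `lookup` and then apply a load-balancing strategy to the returned list.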
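When the Spring Boot project starts, a custom ApplicationListener listens for the web server startup event.
Once the web server has started successfully, the event callback method is triggered.
In the callback, an ephemeral node is created under the designated Zookeeper node; the ephemeral node records the ip + port of the running project.
If a service instance goes down, its ephemeral node is deleted automatically once the connection has been lost for a certain time (the session timeout, about 30s by default here).
When the consumer's Spring Boot project starts, it fetches the list of all available URLs for the target service from the designated Zookeeper node (this URL list can be cached).
A load-balancing algorithm then routes each request to one server in the URL list.
A Zookeeper node watch is created through Spring's initializer extension mechanism; when the node list changes, the cached URL list is updated.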
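The cached URL list mentioned above has to be safe to read while a watch callback replaces it. A minimal sketch, assuming a hypothetical `ServiceUrlCache` class (the name and methods are illustrative and do not appear in the project) whose `refresh` method would be called from the watch callback:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical local cache of service URLs; a watch callback would call refresh().
class ServiceUrlCache {
    private final AtomicReference<List<String>> urls =
            new AtomicReference<>(Collections.<String>emptyList());

    // Swap in the latest child list as one immutable snapshot.
    void refresh(List<String> latestChildren) {
        urls.set(Collections.unmodifiableList(new ArrayList<>(latestChildren)));
    }

    // Readers always see a complete list, never a half-updated one.
    List<String> snapshot() {
        return urls.get();
    }
}
```

Replacing the whole list atomically keeps request threads from observing a list that is being modified by the watcher thread.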
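For service registration, a service is usable only after its container has started successfully; for microservice projects this usually means Spring Boot's embedded Tomcat. Only at that point should the service be registered in Zookeeper.
So how do we know that the Tomcat container has started successfully?
Through Spring's event listening mechanism: when Tomcat starts successfully an event is published, and we can listen for that event and react to it.
A custom ApplicationListener listens for the startup event of the Spring Boot project's embedded container.
As soon as the embedded Tomcat has started, the listener's callback method is triggered.
In the callback, an ephemeral node is created under the designated Zookeeper node, with the project's ip:port as the node name.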
pom.xml
- <parent>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-parent</artifactId>
- <version>2.6.8</version>
- <relativePath/>
- </parent>
-
- <dependencies>
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter</artifactId>
- </dependency>
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-web</artifactId>
- </dependency>
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-test</artifactId>
- <scope>test</scope>
- </dependency>
-
- <dependency>
- <groupId>org.apache.curator</groupId>
- <artifactId>curator-recipes</artifactId>
- <version>5.2.1</version>
- </dependency>
- </dependencies>
Configuration file application.properties
- server.ip=192.168.9.1
- server.port=9090
-
- # Custom configuration
- zk.service-name=order-service
- zk.server=192.168.1.104:2181
Create the ApplicationListener
It listens for WebServerInitializedEvent, the event published once the Spring web server has finished initializing.
- public class ZkApplicationListener implements ApplicationListener<WebServerInitializedEvent> {
- @Override
- public void onApplicationEvent(WebServerInitializedEvent event) {
- System.out.println("Event listener callback invoked...");
-
- // Read the properties from application.properties
- Environment environment = event.getApplicationContext().getEnvironment();
- String serviceName = environment.getProperty("zk.service-name");
- String ip = environment.getProperty("server.ip");
- String port = environment.getProperty("server.port");
- String zkServer = environment.getProperty("zk.server");
- // Register the service
- ServiceRegistry zookeeperServiceRegistry = new ZookeeperServiceRegistry(serviceName,ip,port,zkServer);
- zookeeperServiceRegistry.register();
- }
- }
SPI configuration: spring.factories
- # Application Listeners
- org.springframework.context.ApplicationListener=\
- com.zk.serviceregistry.orderservice.listener.ZkApplicationListener
Register the service in Zookeeper
- // Simplified registration interface; Spring Cloud provides a similarly named ServiceRegistry interface
- public interface ServiceRegistry {
- void register();
- }
- public class ZookeeperServiceRegistry implements ServiceRegistry {
-
- private CuratorFramework curatorFramework;
- private final String ip;
- private final String port;
- private final String serviceName;
- private final String basePath = "/zk-registry";
-
- public ZookeeperServiceRegistry(String serviceName, String ip, String port, String zkServer) {
- this.serviceName = serviceName;
- this.ip = ip;
- this.port = port;
- this.curatorFramework = CuratorFrameworkFactory
- .builder()
- .connectionTimeoutMs(20000)
- .connectString(zkServer)
- .retryPolicy(new ExponentialBackoffRetry(1000, 3))
- .build();
- curatorFramework.start();
- }
-
- @Override
- public void register() {
- // Service name
- String serviceNamePath = basePath + "/" + serviceName;
- try {
- if (curatorFramework.checkExists().forPath(serviceNamePath) == null) {
- // Create a persistent node that represents the service name
- this.curatorFramework.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(serviceNamePath);
- }
- String urlNode = curatorFramework.create().withMode(CreateMode.EPHEMERAL).forPath(serviceNamePath + "/" + ip + ":" + port);
- System.out.println("Service " + urlNode + " registered with the zookeeper server...");
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- }
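Start the service and test
Service registration takes effect: the log shows that 127.0.0.1:9090 has been registered with the zookeeper server.
Checking zookeeper shows that a new node has been created.
Start several more instances, 192.168.9.1:9091, 192.168.9.1:9092, 192.168.9.1:9093 and 192.168.9.1:9094, and each new ip:port is registered in zookeeper in turn.
Stop one instance, for example 192.168.9.1:9094, to simulate a crash. When the zookeeper server receives no heartbeat from 192.168.9.1:9094 within the session timeout (about 30s by default here), it considers the instance dead and deletes its node.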
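At startup, the project automatically fetches from zookeeper the list of all available URLs of the service it needs to call, order-service.
Thanks to zookeeper's ephemeral nodes, if a service node goes down its ephemeral node is deleted automatically after a certain time.
When user-service is accessed, a load-balancing algorithm picks one node URL from the list of available service URLs.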
pom.xml
- <parent>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-parent</artifactId>
- <version>2.6.8</version>
- <relativePath/>
- </parent>
-
- <dependencies>
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-web</artifactId>
- </dependency>
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-test</artifactId>
- <scope>test</scope>
- </dependency>
-
- <dependency>
- <groupId>org.apache.curator</groupId>
- <artifactId>curator-recipes</artifactId>
- <version>5.2.1</version>
- </dependency>
- </dependencies>
Configuration file application.properties
- server.port=9999
- zk.server=192.168.1.104:2181
SPI configuration: spring.factories
- org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
- com.zk.servicediscovery.userservice.config.ZookeeperDiscoveryAutoConfiguration
Auto-configuration: run service discovery when the project starts
- @Configuration
- public class ZookeeperDiscoveryAutoConfiguration {
- @Resource
- private Environment environment;
-
- @Bean
- public ServiceDiscoveryImpl serviceDiscovery(){
- return new ServiceDiscoveryImpl(environment.getProperty("zk.server"));
- }
- }
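Service discovery and watching
- public interface ServiceDiscovery {
- // Service discovery: fetch all child nodes (the list of all available service URLs)
- List<String> discovery(String serviceName);
- // Register a watch: a change in the child nodes (a new instance added or an instance down) triggers the watch and refreshes the service URL list
- void registerWatch(String serviceNamePath);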
- }
- public class ServiceDiscoveryImpl implements ServiceDiscovery {
- private final CuratorFramework curatorFramework;
- private final String basePath = "/zk-registry";
-
- public ServiceDiscoveryImpl(String zkServer) {
- this.curatorFramework = CuratorFrameworkFactory
- .builder()
- .connectionTimeoutMs(20000)
- .connectString(zkServer)
- .retryPolicy(new ExponentialBackoffRetry(1000, 3))
- .build();
- curatorFramework.start();
- }
-
- @Override
- public List<String> discovery(String serviceName) {
- // /zk-registry/order-service
- String serviceNamePath = basePath + "/" + serviceName;
- try {
- if (this.curatorFramework.checkExists().forPath(serviceNamePath) != null) {
- return this.curatorFramework.getChildren().forPath(serviceNamePath);
- }
- } catch (Exception e) {
- e.printStackTrace();
- }
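- return Collections.emptyList(); // avoid returning null, callers use the list directly (needs java.util.Collections)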
- }
-
- @Override
- public void registerWatch(String serviceNamePath) {
- // Persistent watch: fires whenever the children of /zk-registry/order-service change
- CuratorCache curatorCache = CuratorCache.build(curatorFramework, serviceNamePath);
- CuratorCacheListener listener = CuratorCacheListener.builder().forPathChildrenCache(serviceNamePath, curatorFramework, new PathChildrenCacheListener() {
- @Override
- public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
- // Pull mode: re-read the children on every event
- System.out.println("Latest urls: " + curatorFramework.getChildren().forPath(serviceNamePath));
- }
- }).build();
- curatorCache.listenable().addListener(listener);
- curatorCache.start();
- }
- }
Pick a service node at random to simulate load balancing
- public interface LoadBalance {
- String select(List<String> urls);
- }
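- public class RandomLoadBalance implements LoadBalance {
- @Override
- public String select(List<String> urls) {
- // Fail with a clear message when no instance is available
- if (urls == null || urls.isEmpty()) {
- throw new IllegalStateException("no available service instance");
- }
- // ThreadLocalRandom (java.util.concurrent) avoids allocating a new Random on every call
- return urls.get(ThreadLocalRandom.current().nextInt(urls.size()));
- }
- }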
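Random selection is only one possible strategy behind the `LoadBalance` interface. As an illustration, here is a minimal round-robin sketch; `RoundRobinLoadBalance` is a hypothetical class that is not part of the original project, and the interface is repeated only so that the snippet compiles on its own.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Same shape as the article's LoadBalance interface, repeated so the sketch is self-contained.
interface LoadBalance {
    String select(List<String> urls);
}

// Hypothetical alternative strategy: hand out URLs in rotation.
class RoundRobinLoadBalance implements LoadBalance {
    private final AtomicInteger counter = new AtomicInteger();

    @Override
    public String select(List<String> urls) {
        if (urls == null || urls.isEmpty()) {
            throw new IllegalStateException("no available service instance");
        }
        // floorMod keeps the index non-negative even after the counter overflows.
        int index = Math.floorMod(counter.getAndIncrement(), urls.size());
        return urls.get(index);
    }
}
```

Unlike the random strategy, a round-robin instance carries state, so it would need to be shared across requests (for example as a singleton bean) rather than created per call.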
UserController simulates a request
- @RestController
- @RequestMapping("/user")
- public class UserController {
- @Autowired
- private ServiceDiscovery serviceDiscovery;
-
- // Register the watch on order-service once at startup (javax.annotation.PostConstruct);
- // registering it inside the request handler would build a new CuratorCache on every request
- @PostConstruct
- public void watchOrderService() {
- this.serviceDiscovery.registerWatch("/zk-registry/order-service");
- }
-
- @RequestMapping("/discovery")
- public void discovery() throws IOException {
- List<String> urls = this.serviceDiscovery.discovery("order-service");
- LoadBalance loadBalance = new RandomLoadBalance();
- String url = loadBalance.select(urls);
- System.out.println("Selected order-service node: " + url);
- String response = new RestTemplate().getForObject("http://" + url + "/order/query", String.class);
- System.out.println("order-service response: " + response);
- }
- }
Test
Visit http://192.168.9.1:9999/user/discovery to test. Then stop one order-service node and, without restarting anything, access user-service again.
Create a Spring Boot project named spring-cloud-zookeeper, using Spring Boot 2.6.8
- <parent>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-parent</artifactId>
- <version>2.6.8</version>
- <relativePath/> <!-- lookup parent from repository -->
- </parent>
-
- <dependencies>
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-web</artifactId>
- </dependency>
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-test</artifactId>
- <scope>test</scope>
- </dependency>
-
- <!-- spring cloud zookeeper config: configuration center -->
- <dependency>
- <groupId>org.springframework.cloud</groupId>
- <artifactId>spring-cloud-starter-zookeeper-config</artifactId>
- </dependency>
-
- <!-- dependency required for the bootstrap.yaml file -->
- <dependency>
- <groupId>org.springframework.cloud</groupId>
- <artifactId>spring-cloud-starter-bootstrap</artifactId>
- </dependency>
-
- <!-- spring cloud zookeeper discovery: service registry -->
- <dependency>
- <groupId>org.springframework.cloud</groupId>
- <artifactId>spring-cloud-starter-zookeeper-discovery</artifactId>
- </dependency>
-
- </dependencies>
-
- <!-- version management -->
- <dependencyManagement>
- <dependencies>
- <!-- Spring Cloud version -->
- <dependency>
- <groupId>org.springframework.cloud</groupId>
- <artifactId>spring-cloud-dependencies</artifactId>
- <version>2021.0.3</version>
- <scope>import</scope>
- <type>pom</type>
- </dependency>
- </dependencies>
- </dependencyManagement>
Configuration file application.properties
- server.port=9091
- spring.cloud.zookeeper.connect-string=192.168.1.104:2181
- spring.cloud.zookeeper.discovery.root=/services/registries
- spring.application.name=spring-cloud-zookeeper
Or application.yml
- spring:
-   cloud:
-     zookeeper:
-       connect-string: 192.168.1.104:2181
-       discovery:
-         root: /services/registries
-   application:
-     name: spring-cloud-zookeeper
- server:
-   port: 9091
bootstrap.yaml
- spring:
-   profiles:
-     active: dev
-   application:
-     name: spring-cloud-zookeeper # ZNode to look up: spring-cloud-zookeeper-dev
-   cloud:
-     zookeeper:
-       config:
-         root: config # resolves to /config/spring-cloud-zookeeper-dev
-         profile-separator: "-"
-         enabled: true
-       connect-string: 192.168.1.104:2181
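Start the Spring Boot project and inspect the data on the Zookeeper server
The corresponding nodes have been created automatically on the zookeeper server
Service discovery code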
- @RestController
- public class SpringCloudZkDiscoveryController {
- // 1. Inject the service discovery client interface
- @Autowired
- private DiscoveryClient discoveryClient;
-
- @RequestMapping("/sc-zk-discovery")
- public List<ServiceInstance> serviceUrl() {
- // 2. getInstances returns all available instances
- List<ServiceInstance> instances = discoveryClient.getInstances("spring-cloud-zookeeper");
- // Guard against an empty list before reading the first instance
- if (!instances.isEmpty()) {
- System.out.println("url=" + instances.get(0).getUri());
- }
- // Reuse the list instead of querying the registry a second time
- return instances;
- }
- }
Access test
Much like our hand-written service registration, Spring Cloud also defines a listener, AbstractAutoServiceRegistration, that listens for the web server startup event WebServerInitializedEvent.
Source excerpt of org.springframework.cloud.client.serviceregistry.AbstractAutoServiceRegistration#onApplicationEvent(WebServerInitializedEvent):
- public abstract class AbstractAutoServiceRegistration<R extends Registration> implements AutoServiceRegistration, ApplicationContextAware, ApplicationListener<WebServerInitializedEvent> {
-
- private final ServiceRegistry<R> serviceRegistry;
- // .................
-
- public void onApplicationEvent(WebServerInitializedEvent event) {
- this.bind(event);
- }
-
- @Deprecated
- public void bind(WebServerInitializedEvent event) {
- ApplicationContext context = event.getApplicationContext();
- if (!(context instanceof ConfigurableWebServerApplicationContext) || !"management".equals(((ConfigurableWebServerApplicationContext)context).getServerNamespace())) {
- this.port.compareAndSet(0, event.getWebServer().getPort());
- this.start();
- }
- }
-
- public void start() {
- if (!this.isEnabled()) {
- } else {
- if (!this.running.get()) {
- this.context.publishEvent(new InstancePreRegisteredEvent(this, this.getRegistration()));
- this.register();
- if (this.shouldRegisterManagement()) {
- this.registerManagement();
- }
-
- this.context.publishEvent(new InstanceRegisteredEvent(this, this.getConfiguration()));
- this.running.compareAndSet(false, true);
- }
- }
- }
- }
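Two guards in the excerpt above are easy to miss: `port.compareAndSet(0, ...)` only records the first reported port, and the `running` flag makes `start()` register at most once. The following reduction is a sketch for illustration only; `AutoRegistrationSketch` is a hypothetical class that mimics these two guards and is not Spring Cloud code.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical reduction of the bind()/start() guards; not Spring Cloud's class.
class AutoRegistrationSketch {
    private final AtomicInteger port = new AtomicInteger(0);
    private final AtomicBoolean running = new AtomicBoolean(false);
    private int registerCalls = 0;

    // Stands in for bind(WebServerInitializedEvent): receives the web server port.
    void bind(int webServerPort) {
        // Only the first reported port replaces the initial 0.
        port.compareAndSet(0, webServerPort);
        start();
    }

    void start() {
        if (!running.get()) {
            registerCalls++; // stands in for register()
            running.compareAndSet(false, true);
        }
    }

    int port() {
        return port.get();
    }

    int registerCalls() {
        return registerCalls;
    }
}
```

Calling `bind` a second time, for instance with a different port, leaves both the recorded port and the registration count unchanged.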
getRegistration() obtains the concrete registration implementation: org.springframework.cloud.zookeeper.serviceregistry.ZookeeperAutoServiceRegistration
serviceRegistry.register(registration) is the concrete service registration implementation:
- public class ZookeeperServiceRegistry implements ServiceRegistry<ZookeeperRegistration>, SmartInitializingSingleton, Closeable {
- protected CuratorFramework curator;
- protected ZookeeperDiscoveryProperties properties;
- private ServiceDiscovery<ZookeeperInstance> serviceDiscovery;
-
- public void register(ZookeeperRegistration registration) {
- try {
- this.getServiceDiscovery().registerService(registration.getServiceInstance());
- } catch (Exception var3) {
- ReflectionUtils.rethrowRuntimeException(var3);
- }
- }
- }
Service discovery implementation class: creates the zookeeper node and the node watch
- public class ServiceDiscoveryImpl<T> implements ServiceDiscovery<T> {
- public void registerService(ServiceInstance<T> service) throws Exception {
- ServiceDiscoveryImpl.Entry<T> newEntry = new ServiceDiscoveryImpl.Entry(service);
- ServiceDiscoveryImpl.Entry<T> oldEntry = (ServiceDiscoveryImpl.Entry)this.services.putIfAbsent(service.getId(), newEntry);
- ServiceDiscoveryImpl.Entry<T> useEntry = oldEntry != null ? oldEntry : newEntry;
- synchronized(useEntry) {
- if (useEntry == newEntry) {
- // Create the node watch
- useEntry.cache = this.makeNodeCache(service);
- }
- // Create the zookeeper node
- this.internalRegisterService(service);
- }
- }
- // Create the node watch
- private CuratorCacheBridge makeNodeCache(ServiceInstance<T> instance) {
- if (!this.watchInstances) {
- return null;
- } else {
- CuratorCacheBridge cache = CuratorCache.bridgeBuilder(this.client, this.pathForInstance(instance.getName(), instance.getId())).withOptions(new Options[]{Options.SINGLE_NODE_CACHE}).withDataNotCached().build();
- CuratorCacheListener listener = CuratorCacheListener.builder().afterInitialized().forAll((__, ___, data) -> {
- if (data != null) {
- try {
- ServiceInstance<T> newInstance = this.serializer.deserialize(data.getData());
- ServiceDiscoveryImpl.Entry<T> entry = (ServiceDiscoveryImpl.Entry)this.services.get(newInstance.getId());
- if (entry != null) {
- synchronized(entry) {
- entry.service = newInstance;
- }
- }
- } catch (Exception var10) {
- this.log.debug("Could not deserialize: " + data.getPath());
- }
- } else {
- this.log.warn("Instance data has been deleted for: " + instance);
- }
-
- }).build();
- cache.listenable().addListener(listener);
- cache.start();
- return cache;
- }
- }
-
- // Create the zookeeper node
- @VisibleForTesting
- protected void internalRegisterService(ServiceInstance<T> service) throws Exception {
- byte[] bytes = this.serializer.serialize(service);
- String path = this.pathForInstance(service.getName(), service.getId());
- final int MAX_TRIES = 2; // the decompiler rendered this constant as "true"
- boolean isDone = false;
-
- for(int i = 0; !isDone && i < MAX_TRIES; ++i) {
- try {
- CreateMode mode;
- switch(service.getServiceType()) {
- case DYNAMIC:
- mode = CreateMode.EPHEMERAL;
- break;
- case DYNAMIC_SEQUENTIAL:
- mode = CreateMode.EPHEMERAL_SEQUENTIAL;
- break;
- default:
- mode = CreateMode.PERSISTENT;
- }
- ((ACLBackgroundPathAndBytesable)this.client.create().creatingParentContainersIfNeeded().withMode(mode)).forPath(path, bytes);
- isDone = true;
- } catch (NodeExistsException var8) {
- this.client.delete().forPath(path);
- }
- }
- }
-
- }
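The retry loop in internalRegisterService follows a simple pattern: try to create the node, and if it already exists, delete the old node and try once more. The sketch below reproduces that control flow against an in-memory map. `RegisterRetrySketch` and its methods are hypothetical stand-ins and not Curator's API; `putIfAbsent` plays the role of `create()` and a non-null return plays the role of `NodeExistsException`.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical in-memory stand-in for the znode tree; not Curator's API.
class RegisterRetrySketch {
    private static final int MAX_TRIES = 2;
    private final Map<String, String> nodes = new ConcurrentHashMap<>();

    // Returns the 1-based attempt that succeeded, or 0 if every attempt failed.
    int register(String path, String payload) {
        for (int i = 0; i < MAX_TRIES; i++) {
            // putIfAbsent fails (returns the old value) if the node already exists.
            if (nodes.putIfAbsent(path, payload) == null) {
                return i + 1;
            }
            // Mirrors the NodeExistsException branch: delete the old node, then retry.
            nodes.remove(path);
        }
        return 0;
    }

    String read(String path) {
        return nodes.get(path);
    }
}
```

Registering the same path twice succeeds on the first attempt the first time and on the second attempt the second time, with the newer payload replacing the older one.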
Execution flow: Spring Boot project starts -> web server starts -> the listener receives the server startup event:
SpringApplication.run(args) -> refreshContext(context) -> refresh(context) -> ServletWebServerApplicationContext.refresh() -> AbstractApplicationContext.refresh() -> finishRefresh() -> DefaultLifecycleProcessor.onRefresh() -> startBeans(true) -> DefaultLifecycleProcessor$LifecycleGroup.start() -> doStart() -> WebServerStartStopLifecycle.start() -> AbstractApplicationContext.publishEvent(new ServletWebServerInitializedEvent(this.webServer, this.applicationContext)) -> SimpleApplicationEventMulticaster.multicastEvent(applicationEvent, eventType) -> invokeListener(listener, event) -> doInvokeListener(listener, event) -> listener.onApplicationEvent(event);
Stack trace:
- onApplicationEvent:12, ZkApplicationListener (com.zk.serviceregistry.orderservice.listener)
- doInvokeListener:176, SimpleApplicationEventMulticaster (org.springframework.context.event)
- invokeListener:169, SimpleApplicationEventMulticaster (org.springframework.context.event)
- multicastEvent:143, SimpleApplicationEventMulticaster (org.springframework.context.event)
- publishEvent:421, AbstractApplicationContext (org.springframework.context.support)
- publishEvent:378, AbstractApplicationContext (org.springframework.context.support)
- start:46, WebServerStartStopLifecycle (org.springframework.boot.web.servlet.context)
- doStart:178, DefaultLifecycleProcessor (org.springframework.context.support)
- access$200:54, DefaultLifecycleProcessor (org.springframework.context.support)
- start:356, DefaultLifecycleProcessor$LifecycleGroup (org.springframework.context.support)
- accept:-1, 1643565953 (org.springframework.context.support.DefaultLifecycleProcessor$$Lambda$541)
- forEach:75, Iterable (java.lang)
- startBeans:155, DefaultLifecycleProcessor (org.springframework.context.support)
- onRefresh:123, DefaultLifecycleProcessor (org.springframework.context.support)
- finishRefresh:935, AbstractApplicationContext (org.springframework.context.support)
- refresh:586, AbstractApplicationContext (org.springframework.context.support)
- refresh:145, ServletWebServerApplicationContext (org.springframework.boot.web.servlet.context)
- refresh:745, SpringApplication (org.springframework.boot)
- refreshContext:420, SpringApplication (org.springframework.boot)
- run:307, SpringApplication (org.springframework.boot)
- run:1317, SpringApplication (org.springframework.boot)
- run:1306, SpringApplication (org.springframework.boot)
- main:10, OrderServiceApplication (com.zk.serviceregistry.orderservice)
1)SpringApplication.run()
- public class SpringApplication {
- public ConfigurableApplicationContext run(String... args) {
- long startTime = System.nanoTime();
- DefaultBootstrapContext bootstrapContext = this.createBootstrapContext();
- ConfigurableApplicationContext context = null;
- this.configureHeadlessProperty();
- SpringApplicationRunListeners listeners = this.getRunListeners(args);
- listeners.starting(bootstrapContext, this.mainApplicationClass);
-
- try {
- ApplicationArguments applicationArguments = new DefaultApplicationArguments(args);
- ConfigurableEnvironment environment = this.prepareEnvironment(listeners, bootstrapContext, applicationArguments);
- this.configureIgnoreBeanInfo(environment);
- Banner printedBanner = this.printBanner(environment);
- context = this.createApplicationContext();
- context.setApplicationStartup(this.applicationStartup);
- this.prepareContext(bootstrapContext, context, environment, listeners, applicationArguments, printedBanner);
- this.refreshContext(context); // 1
- this.afterRefresh(context, applicationArguments);
- Duration timeTakenToStartup = Duration.ofNanos(System.nanoTime() - startTime);
- if (this.logStartupInfo) {
- (new StartupInfoLogger(this.mainApplicationClass)).logStarted(this.getApplicationLog(), timeTakenToStartup);
- }
-
- listeners.started(context, timeTakenToStartup);
- this.callRunners(context, applicationArguments);
- } catch (Throwable var12) {
- this.handleRunFailure(context, var12, listeners);
- throw new IllegalStateException(var12);
- }
-
- try {
- Duration timeTakenToReady = Duration.ofNanos(System.nanoTime() - startTime);
- listeners.ready(context, timeTakenToReady);
- return context;
- } catch (Throwable var11) {
- this.handleRunFailure(context, var11, (SpringApplicationRunListeners)null);
- throw new IllegalStateException(var11);
- }
- }
- }
2)refreshContext(context)
- private void refreshContext(ConfigurableApplicationContext context) {
- if (this.registerShutdownHook) {
- shutdownHook.registerApplicationContext(context);
- }
- this.refresh(context); // 2
- }
3)AbstractApplicationContext.refresh()
- public void refresh() throws BeansException, IllegalStateException {
- synchronized(this.startupShutdownMonitor) {
- StartupStep contextRefresh = this.applicationStartup.start("spring.context.refresh");
- this.prepareRefresh();
- ConfigurableListableBeanFactory beanFactory = this.obtainFreshBeanFactory();
- this.prepareBeanFactory(beanFactory);
-
- try {
- this.postProcessBeanFactory(beanFactory);
- StartupStep beanPostProcess = this.applicationStartup.start("spring.context.beans.post-process");
- this.invokeBeanFactoryPostProcessors(beanFactory);
- this.registerBeanPostProcessors(beanFactory);
- beanPostProcess.end();
- this.initMessageSource();
- this.initApplicationEventMulticaster();
- this.onRefresh();
- this.registerListeners();
- this.finishBeanFactoryInitialization(beanFactory);
- this.finishRefresh(); // 3
- } catch (BeansException var10) {
- if (this.logger.isWarnEnabled()) {
- this.logger.warn("Exception encountered during context initialization - cancelling refresh attempt: " + var10);
- }
-
- this.destroyBeans();
- this.cancelRefresh(var10);
- throw var10;
- } finally {
- this.resetCommonCaches();
- contextRefresh.end();
- }
-
- }
- }
4)finishRefresh()
- protected void finishRefresh() {
- this.clearResourceCaches();
- this.initLifecycleProcessor();
- this.getLifecycleProcessor().onRefresh(); // 4
- this.publishEvent((ApplicationEvent)(new ContextRefreshedEvent(this)));
- if (!NativeDetector.inNativeImage()) {
- LiveBeansView.registerApplicationContext(this);
- }
- }
5)DefaultLifecycleProcessor.onRefresh()
- public void onRefresh() {
- this.startBeans(true); // 5
- this.running = true;
- }
6)startBeans(true)
- private void startBeans(boolean autoStartupOnly) {
- Map<String, Lifecycle> lifecycleBeans = this.getLifecycleBeans();
- Map<Integer, DefaultLifecycleProcessor.LifecycleGroup> phases = new TreeMap();
- lifecycleBeans.forEach((beanName, bean) -> {
- if (!autoStartupOnly || bean instanceof SmartLifecycle && ((SmartLifecycle)bean).isAutoStartup()) {
- int phase = this.getPhase(bean);
- ((DefaultLifecycleProcessor.LifecycleGroup)phases.computeIfAbsent(phase, (p) -> {
- return new DefaultLifecycleProcessor.LifecycleGroup(phase, this.timeoutPerShutdownPhase, lifecycleBeans, autoStartupOnly);
- })).add(beanName, bean);
- }
- });
- if (!phases.isEmpty()) {
- phases.values().forEach(DefaultLifecycleProcessor.LifecycleGroup::start); // 6
- }
- }
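The key idea in startBeans is that lifecycle beans are grouped by phase in a TreeMap, so groups start in ascending phase order. A minimal sketch of just that grouping follows; `PhaseStartSketch`, the bean names and the phase values used with it are hypothetical and only illustrate the ordering.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical reduction of startBeans(): group by phase, start the lowest phase first.
class PhaseStartSketch {
    static List<String> startOrder(Map<String, Integer> beanPhases) {
        // TreeMap iterates its keys in ascending order, so lower phases come first.
        Map<Integer, List<String>> phases = new TreeMap<>();
        beanPhases.forEach((beanName, phase) ->
                phases.computeIfAbsent(phase, p -> new ArrayList<>()).add(beanName));
        List<String> order = new ArrayList<>();
        phases.values().forEach(order::addAll);
        return order;
    }
}
```

Beans that share a phase stay in the order in which they were encountered, while beans with a lower phase always come before beans with a higher one.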
7)DefaultLifecycleProcessor$LifecycleGroup.start()
- public void start() {
- if (!this.members.isEmpty()) {
- if (DefaultLifecycleProcessor.this.logger.isDebugEnabled()) {
- DefaultLifecycleProcessor.this.logger.debug("Starting beans in phase " + this.phase);
- }
- Collections.sort(this.members);
- Iterator var1 = this.members.iterator();
- while(var1.hasNext()) {
- DefaultLifecycleProcessor.LifecycleGroupMember member = (DefaultLifecycleProcessor.LifecycleGroupMember)var1.next();
- DefaultLifecycleProcessor.this.doStart(this.lifecycleBeans, member.name, this.autoStartupOnly); // 7
- }
- }
- }
8)DefaultLifecycleProcessor.doStart()
- private void doStart(Map<String, ? extends Lifecycle> lifecycleBeans, String beanName, boolean autoStartupOnly) {
- Lifecycle bean = (Lifecycle)lifecycleBeans.remove(beanName);
- if (bean != null && bean != this) {
- String[] dependenciesForBean = this.getBeanFactory().getDependenciesForBean(beanName);
- String[] var6 = dependenciesForBean;
- int var7 = dependenciesForBean.length;
- for(int var8 = 0; var8 < var7; ++var8) {
- String dependency = var6[var8];
- this.doStart(lifecycleBeans, dependency, autoStartupOnly);
- }
-
- if (!bean.isRunning() && (!autoStartupOnly || !(bean instanceof SmartLifecycle) || ((SmartLifecycle)bean).isAutoStartup())) {
- if (this.logger.isTraceEnabled()) {
- this.logger.trace("Starting bean '" + beanName + "' of type [" + bean.getClass().getName() + "]");
- }
-
- try {
- bean.start(); // 8
- } catch (Throwable var10) {
- throw new ApplicationContextException("Failed to start bean '" + beanName + "'", var10);
- }
- }
- }
- }
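doStart walks a bean's dependencies recursively before starting the bean itself. The sketch below shows the same dependency-first traversal on plain strings; `DependencyFirstStartSketch` and the bean names passed to it are hypothetical, and the `seen` set plays the role that `lifecycleBeans.remove(beanName)` plays in the original (a bean is processed at most once).

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical reduction of doStart(): start dependencies before the bean itself.
class DependencyFirstStartSketch {
    static List<String> start(String bean, Map<String, List<String>> dependsOn) {
        List<String> started = new ArrayList<>();
        doStart(bean, dependsOn, new HashSet<>(), started);
        return started;
    }

    private static void doStart(String bean, Map<String, List<String>> dependsOn,
                                Set<String> seen, List<String> started) {
        if (!seen.add(bean)) {
            return; // already processed, nothing to do
        }
        for (String dependency : dependsOn.getOrDefault(bean, Collections.emptyList())) {
            doStart(dependency, dependsOn, seen, started);
        }
        started.add(bean); // stands in for bean.start()
    }
}
```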
9)WebServerStartStopLifecycle.start() publishes the web server started event
The event is ServletWebServerInitializedEvent extends WebServerInitializedEvent
- public void start() {
- this.webServer.start();
- this.running = true;
- this.applicationContext.publishEvent(new ServletWebServerInitializedEvent(this.webServer, this.applicationContext)); // 9
- }
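10)Publish the event: AbstractApplicationContext.publishEvent()
- protected void publishEvent(Object event, @Nullable ResolvableType eventType) {
- // (excerpt) the code that wraps event into the local variable applicationEvent is omitted here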
- if (this.earlyApplicationEvents != null) {
- this.earlyApplicationEvents.add(applicationEvent);
- } else {
- this.getApplicationEventMulticaster().multicastEvent((ApplicationEvent)applicationEvent, eventType); // 10
- }
-
- if (this.parent != null) {
- if (this.parent instanceof AbstractApplicationContext) {
- ((AbstractApplicationContext)this.parent).publishEvent(event, eventType);
- } else {
- this.parent.publishEvent(event);
- }
- }
- }
11)The multicaster broadcasts the event: SimpleApplicationEventMulticaster.multicastEvent(event, eventType)
- public void multicastEvent(final ApplicationEvent event, @Nullable ResolvableType eventType) {
- ResolvableType type = eventType != null ? eventType : this.resolveDefaultEventType(event);
- Executor executor = this.getTaskExecutor();
- Iterator var5 = this.getApplicationListeners(event, type).iterator();
- while(var5.hasNext()) {
- ApplicationListener<?> listener = (ApplicationListener)var5.next();
- if (executor != null) {
- executor.execute(() -> {
- this.invokeListener(listener, event);
- });
- } else {
- this.invokeListener(listener, event); // 11
- }
- }
- }
12)Invoke the listener: invokeListener(listener, event)
- protected void invokeListener(ApplicationListener<?> listener, ApplicationEvent event) {
- ErrorHandler errorHandler = this.getErrorHandler();
- if (errorHandler != null) {
- try {
- this.doInvokeListener(listener, event);
- } catch (Throwable var5) {
- errorHandler.handleError(var5);
- }
- } else {
- this.doInvokeListener(listener, event); // 12
- }
- }
-
- private void doInvokeListener(ApplicationListener listener, ApplicationEvent event) {
- try {
- listener.onApplicationEvent(event); // 13
- } catch (ClassCastException var6) {}
- }
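Steps 10 to 12 boil down to a typed publish/subscribe dispatch: the multicaster finds the listeners whose event type matches and calls them synchronously when no executor is configured. The following miniature is a sketch for illustration only; `MiniMulticaster` and its methods are hypothetical and far simpler than Spring's SimpleApplicationEventMulticaster.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical miniature of an event multicaster; not Spring's implementation.
class MiniMulticaster {
    private static final class Registration<E> {
        final Class<E> eventType;
        final Consumer<E> listener;

        Registration(Class<E> eventType, Consumer<E> listener) {
            this.eventType = eventType;
            this.listener = listener;
        }

        void deliverIfMatches(Object event) {
            // Only listeners whose declared event type matches receive the event.
            if (eventType.isInstance(event)) {
                listener.accept(eventType.cast(event));
            }
        }
    }

    private final List<Registration<?>> registrations = new ArrayList<>();

    <E> void addListener(Class<E> eventType, Consumer<E> listener) {
        registrations.add(new Registration<>(eventType, listener));
    }

    void multicast(Object event) {
        for (Registration<?> registration : registrations) {
            registration.deliverIfMatches(event); // synchronous, like the no-executor branch
        }
    }
}
```

A listener registered for one event type is simply skipped when an event of another type is multicast, which is why ZkApplicationListener only reacts to WebServerInitializedEvent.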
13)Invoke the listener's event callback method
- public abstract class AbstractAutoServiceRegistration<R extends Registration> implements AutoServiceRegistration, ApplicationContextAware, ApplicationListener<WebServerInitializedEvent> {
- public void onApplicationEvent(WebServerInitializedEvent event) {
- // TODO ..................
- }
- }
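ZooKeeper is an excellent distributed coordination service, but it is a poor fit for service discovery, because for service discovery a result containing stale information is still better than no result at all. When querying the registry for the service list, we can tolerate registration data that is a few minutes old, but we cannot accept the registry itself being down and unavailable.
Yet zk can run into exactly this situation: when the leader node loses contact with the other nodes because of a network failure, the remaining nodes hold a new leader election. The problem is that the election takes too long, 30 to 120s, and the whole zk cluster is unavailable while it runs, which paralyzes the registry for the duration. In cloud deployments, a zk cluster losing its leader because of network problems is fairly likely; although the service eventually recovers, a registry that stays unavailable for the length of the election is unacceptable.
In short, a registry needs availability more than it needs consistency.
In terms of the CAP model, Zookeeper follows the consistency-first (CP) principle: a request to Zookeeper returns a consistent result at any time, but availability is not guaranteed when machines go offline or crash.
So why doesn't Zookeeper adopt an eventually consistent (AP) model? Because its core algorithm is ZAB, and the whole design targets strong consistency. That is perfectly fine for a distributed coordination system, but the consistency guarantees that Zookeeper provides for distributed coordination are not a good fit for a registry, that is, for service discovery.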
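One common way for a client to soften this trade-off is to keep serving the last known instance list while the registry is unreachable. The sketch below illustrates that idea under stated assumptions: `StaleTolerantDiscovery` is a hypothetical wrapper that is not part of the project above, and the `Supplier` stands in for whatever call actually queries the registry.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical wrapper: prefer fresh data, fall back to the last known list on failure.
class StaleTolerantDiscovery {
    private final Supplier<List<String>> registryLookup;
    private volatile List<String> lastKnown = Collections.emptyList();

    StaleTolerantDiscovery(Supplier<List<String>> registryLookup) {
        this.registryLookup = registryLookup;
    }

    List<String> discover() {
        try {
            List<String> fresh = registryLookup.get();
            if (fresh != null) {
                lastKnown = Collections.unmodifiableList(new ArrayList<>(fresh));
            }
        } catch (RuntimeException e) {
            // Registry unreachable (for example during a leader election): keep the stale list.
        }
        return lastKnown;
    }
}
```

The returned list may contain instances that have since gone away, so callers still need to handle connection failures to individual instances.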
Source: https://blog.csdn.net/luciferlongxu/article/details/126529104