当前位置:   article > 正文

apollo全流程、原理、源码解析_apollo代码解析

apollo代码解析

目录

apollo架构

流程解析(用户界面和服务端(apollo-adminservice)通信过程)

流程解析(服务端(apollo-adminservice)和configservice端通信过程)

流程解析(configservice和client端通信过程) 


apollo架构

流程解析(用户界面和服务端(apollo-adminservice)通信过程)

1、用户在前端触发增、删、改操作

2、前端服务会将请求发送至apollo-portal服务(ItemController),在发送请求之前apollo-portal做了以下事情:

  • 验证请求对象各参数有效性(是否为空)以及权限验证,有独立的数据库apolloportaldb
  • 获取用户信息并设置进请求对象中
  • 通过apollo-configservice提供的接口(ServiceController),configservice从注册中心拉取apollo-adminservice的实例列表集合(ip,端口等)并且缓存到本地
  • 通过apollo-adminservic提供的接口(NamespaceController),查询apolloconfigdb库中的namespace表,获取namespaceid设置进请求对象中
  • 发送真正的请求到apollo-adminservice(ItemController)
  1. @PreAuthorize(value = "@permissionValidator.hasModifyNamespacePermission(#appId, #namespaceName, #env)")
  2. @PostMapping("/apps/{appId}/envs/{env}/clusters/{clusterName}/namespaces/{namespaceName}/item")
  3. public ItemDTO createItem(@PathVariable String appId, @PathVariable String env,
  4. @PathVariable String clusterName, @PathVariable String namespaceName,
  5. @RequestBody ItemDTO item) {
  6. checkModel(isValidItem(item));//验证
  7. item.setLineNum(0);
  8. item.setId(0);
  9. String userId = userInfoHolder.getUser().getUserId();//获取用户信息
  10. item.setDataChangeCreatedBy(userId);
  11. item.setDataChangeLastModifiedBy(userId);
  12. item.setDataChangeCreatedTime(null);
  13. item.setDataChangeLastModifiedTime(null);
  14. return configService.createItem(appId, Env.valueOf(env), clusterName, namespaceName, item);
  15. }

3、apollo-adminservice将数据持久化到apolloconfigdb库的item表中,同时持久化apolloconfigdb库的audit表,同时将本次变更(增删改)的数据转换为json后持久化至apolloconfigdb库commit表

  1. @PreAcquireNamespaceLock
  2. @PostMapping("/apps/{appId}/clusters/{clusterName}/namespaces/{namespaceName}/items")
  3. public ItemDTO create(@PathVariable("appId") String appId,
  4. @PathVariable("clusterName") String clusterName,
  5. @PathVariable("namespaceName") String namespaceName, @RequestBody ItemDTO dto) {
  6. Item entity = BeanUtils.transform(Item.class, dto);
  7. ConfigChangeContentBuilder builder = new ConfigChangeContentBuilder();
  8. Item managedEntity = itemService.findOne(appId, clusterName, namespaceName, entity.getKey());
  9. if (managedEntity != null) {
  10. throw new BadRequestException("item already exists");
  11. } else {
  12. entity = itemService.save(entity);
  13. builder.createItem(entity);
  14. }
  15. dto = BeanUtils.transform(ItemDTO.class, entity);
  16. Commit commit = new Commit();
  17. commit.setAppId(appId);
  18. commit.setClusterName(clusterName);
  19. commit.setNamespaceName(namespaceName);
  20. commit.setChangeSets(builder.build());
  21. commit.setDataChangeCreatedBy(dto.getDataChangeLastModifiedBy());
  22. commit.setDataChangeLastModifiedBy(dto.getDataChangeLastModifiedBy());
  23. commitService.save(commit);
  24. return dto;
  25. }

流程解析(服务端(apollo-adminservice)和configservice端通信过程)

1、用户在前端触发发布操作

2、前端服务会将请求发送至apollo-portal服务(ReleaseController)

  • 发送真正的请求到apollo-adminservice(ReleaseController)
  • apollo-adminservice查询apolloconfigdb库的item表的当前namespace下所有的items(所有的配置),并且查询当前namespace下的父namespace(如有父)的所有的items
  • 父的items的数据从apolloconfigdb库的release表中查询(json存储),然后合并父(如果有)和子的所有item
  • apollo-adminservice保存上述合并后的父(如果有)子的所有items,保存至apolloconfigdb库的release表,配置以json字段形式保存
  • apollo-adminservice将本次发布的历史记录(操作类型、操作人、操作时间、前后配置等)保存至apolloconfigdb库的releasehistory表
  • apollo-adminservice获取当前namespace存在的子namespace,如果存在则将子namespace的配置也同时获取并做上述的流程(merge、save、save history)
  • apollo-adminservice将appId+cluster+namespace保存至apolloconfigdb库的releasemessage表中,每次发布都会新增一条新数据,并且删除老数据,
    其实就是id会增大,并且保持同一个appid+cluster+namespace只有一条数据,这一步就是apollo-adminservice和configservice通信的连接点,在
    这一步将2个服务串起来了(底下的configservice服务会定时默认1秒扫描一次这个表)。很多场景下我们是用消息队列来做这一步进行解耦,但是apollo这里
    采用了本地数据库的方式,其实和消息队列是一样的。
  • apollo-portal发布事件ConfigPublishEvent(继承自Spring的ApplicationEvent)
  • ConfigPublishListener监听到上述事件后进行异步的发送Email(无需关注)和发消息(ActiveMQ,topic名称为ops.noc.record.created),这里如果我们配置了activemq的话也可以通过监听这个队列来实现配置变更的感知。
  1. @PreAuthorize(value = "@permissionValidator.hasReleaseNamespacePermission(#appId, #namespaceName, #env)")
  2. @PostMapping(value = "/apps/{appId}/envs/{env}/clusters/{clusterName}/namespaces/{namespaceName}/releases")
  3. public ReleaseDTO createRelease(@PathVariable String appId,
  4. @PathVariable String env, @PathVariable String clusterName,
  5. @PathVariable String namespaceName, @RequestBody NamespaceReleaseModel model) {
  6. model.setAppId(appId);
  7. model.setEnv(env);
  8. model.setClusterName(clusterName);
  9. model.setNamespaceName(namespaceName);
  10. if (model.isEmergencyPublish() && !portalConfig.isEmergencyPublishAllowed(Env.valueOf(env))) {
  11. throw new BadRequestException(String.format("Env: %s is not supported emergency publish now", env));
  12. }
  13. ReleaseDTO createdRelease = releaseService.publish(model);
  14. ConfigPublishEvent event = ConfigPublishEvent.instance();
  15. event.withAppId(appId)
  16. .withCluster(clusterName)
  17. .withNamespace(namespaceName)
  18. .withReleaseId(createdRelease.getId())
  19. .setNormalPublishEvent(true)
  20. .setEnv(Env.valueOf(env));
  21. publisher.publishEvent(event);
  22. return createdRelease;
  23. }

流程解析(configservice和client端通信过程) 

1、apollo-configservice服务中,通过自动装配类ConfigServiceAutoConfiguration在初始化bean ReleaseMessageScanner的过程中做了以下事情:

  • 注册了定时任务的线程池,该线程池默认配置核心线程为1,最大线程为Integer.maxvalue,线程存活时间为10ms,执行周期为每秒(默认,可配置)执行一次。
  • 为ReleaseMessageScanner加入6个listener
  1. @Bean
  2. public ReleaseMessageScanner releaseMessageScanner() {
  3. ReleaseMessageScanner releaseMessageScanner = new ReleaseMessageScanner();
  4. releaseMessageScanner.addMessageListener(releaseMessageServiceWithCache);
  5. releaseMessageScanner.addMessageListener(grayReleaseRulesHolder);
  6. //主要关注这个
  7. releaseMessageScanner.addMessageListener(configService);
  8. releaseMessageScanner.addMessageListener(configFileController);
  9. //主要关注这个
  10. releaseMessageScanner.addMessageListener(notificationControllerV2);
  11. releaseMessageScanner.addMessageListener(notificationController);
  12. return releaseMessageScanner;
  13. }
  • 获取最大扫描的全局变量maxIdScanned,此变量的值为服务启动的过程中查询apolloconfigdb库的releasemessage表的当前最大的主键,如果没记录(初次部署apollo的时候)则为0。

2、apollo-configservice执行定时任务:

  • 每次查询apolloconfigdb库的releasemessage表中,大于上述记录中的maxIdScanned值的的500条记录(排好序的),若为空则结束。
  • 将所有扫描到的数据(ReleaseMessage)依次发送给上述1的所有listener串行处理(每个listener处理逻辑不同,只列举一个)
  • 如果单次扫描的有变更的数据量等于500,则继续循环继续扫描,如果小于500则结束循环进行下一步
  • 更新maxIdScanned为实际获取的最后一条message的id
  1. public ReleaseMessageScanner() {
  2. listeners = Lists.newCopyOnWriteArrayList();
  3. executorService = Executors.newScheduledThreadPool(1, ApolloThreadFactory
  4. .create("ReleaseMessageScanner", true));
  5. }
  6. @Override
  7. public void afterPropertiesSet() throws Exception {
  8. databaseScanInterval = bizConfig.releaseMessageScanIntervalInMilli();
  9. maxIdScanned = loadLargestMessageId();
  10. //注册定时任务
  11. executorService.scheduleWithFixedDelay((Runnable) () -> {
  12. Transaction transaction = Tracer.newTransaction("Apollo.ReleaseMessageScanner", "scanMessage");
  13. try {
  14. //具体的扫描逻辑
  15. scanMessages();
  16. transaction.setStatus(Transaction.SUCCESS);
  17. } catch (Throwable ex) {
  18. transaction.setStatus(ex);
  19. logger.error("Scan and send message failed", ex);
  20. } finally {
  21. transaction.complete();
  22. }
  23. }, databaseScanInterval, databaseScanInterval, TimeUnit.MILLISECONDS);
  24. }

3、列举apollo-configservice服务的ConfigServiceWithCache(上述2步骤的listener之一,也是client通信获取服务端数据的数据源)

  • 服务初始化结束,调用initialize方法,此方法初始化了本地缓存的更新策略,使用的是guaua的LoadingCache,具体策略是通过key去查询本地数据库apolloconfigdb库的releasemessage表
  • handleMessage方法,收到上述2发送过来的message数据(当服务启动的时候/配置发布的时候(增删改)),先去缓存LoadingCache中的此key的数据清空失效,然后调用LoadingCache的的getUnchecked方法,这个方法底层的逻辑是先从map中获取值,如果不存在,会执行上述初始化的逻辑,即从数据库去查询数据,在执行getUnchecked方法之前,已经在缓存中删除了此key的数据,所以当收到消息以后会拿着key去数据库查询值。
  • 最终能保证,所有的配置变更都会实时存在此缓存对象中,最终apollo-configservice会对外提供一个接口ConfigController,查询数据的时候会从此缓存中查询,而不访问数据库

  1. @Override
  2. public void handleMessage(ReleaseMessage message, String channel) {
  3. logger.info("message received - channel: {}, message: {}", channel, message);
  4. if (!Topics.APOLLO_RELEASE_TOPIC.equals(channel) || Strings.isNullOrEmpty(message.getMessage())) {
  5. return;
  6. }
  7. try {
  8. //让缓存失效
  9. invalidate(message.getMessage());
  10. //获取如果没有则从数据库查
  11. configCache.getUnchecked(message.getMessage());
  12. } catch (Throwable ex) {
  13. //ignore
  14. }
  15. }

4、client端与服务端的通信的最终原理,就是服务端会调用3步骤中configservice提供的接口/configs/{appId}/{clusterName}/{namespace},具体看下client如何调用的。

5、client调用apollo-configservice的接口

  • ApolloApplicationContextInitializer
  1. //这里才是真正设置application namespace的值进入spring中的地方
  2. protected void initialize(ConfigurableEnvironment environment) {
  3. if (environment.getPropertySources().contains(PropertySourcesConstants.APOLLO_BOOTSTRAP_PROPERTY_SOURCE_NAME)) {
  4. //already initialized
  5. return;
  6. }
  7. //这一行,获取不到这个变量,用默认的,即application namespace
  8. String namespaces = environment.getProperty(PropertySourcesConstants.APOLLO_BOOTSTRAP_NAMESPACES, ConfigConsts.NAMESPACE_APPLICATION);
  9. logger.debug("Apollo bootstrap namespaces: {}", namespaces);
  10. List<String> namespaceList = NAMESPACE_SPLITTER.splitToList(namespaces);
  11. CompositePropertySource composite = new CompositePropertySource(PropertySourcesConstants.APOLLO_BOOTSTRAP_PROPERTY_SOURCE_NAME);
  12. for (String namespace : namespaceList) {
  13. Config config = ConfigService.getConfig(namespace);
  14. composite.addPropertySource(configPropertySourceFactory.getConfigPropertySource(namespace, config));
  15. }
  16. environment.getPropertySources().addFirst(composite);
  17. }
  • 最终会执行DefaultConfigFactory类中的createLocalConfigRepository方法,创建LocalFileConfigRepository之前先创建RemoteConfigRepository对象
  1. LocalFileConfigRepository createLocalConfigRepository(String namespace) {
  2. if (m_configUtil.isInLocalMode()) {
  3. logger.warn(
  4. "==== Apollo is in local mode! Won't pull configs from remote server for namespace {} ! ====",
  5. namespace);
  6. return new LocalFileConfigRepository(namespace);
  7. }
  8. return new LocalFileConfigRepository(namespace, //看这里 createRemoteConfigRepository(namespace));
  9. }

  • 在RemoteConfigRepository对象的构造方法中会调用trySync、schedulePeriodicRefresh、scheduleLongPollingRefresh三个方法
  1. public RemoteConfigRepository(String namespace) {
  2. m_namespace = namespace;
  3. m_configCache = new AtomicReference<>();
  4. m_configUtil = ApolloInjector.getInstance(ConfigUtil.class);
  5. m_httpUtil = ApolloInjector.getInstance(HttpUtil.class);
  6. m_serviceLocator = ApolloInjector.getInstance(ConfigServiceLocator.class);
  7. remoteConfigLongPollService = ApolloInjector.getInstance(RemoteConfigLongPollService.class);
  8. m_longPollServiceDto = new AtomicReference<>();
  9. m_remoteMessages = new AtomicReference<>();
  10. m_loadConfigRateLimiter = RateLimiter.create(m_configUtil.getLoadConfigQPS());
  11. m_configNeedForceRefresh = new AtomicBoolean(true);
  12. m_loadConfigFailSchedulePolicy = new ExponentialSchedulePolicy(m_configUtil.getOnErrorRetryInterval(),
  13. m_configUtil.getOnErrorRetryInterval() * 8);
  14. gson = new Gson();
  15. //先看这里,去同步数据
  16. this.trySync();
  17. //这个是默认5分钟调用一次this.trySync();
  18. this.schedulePeriodicRefresh();
  19. //这个是默认定时5秒去拉取是否变更的数据,也是最终调用this.trySync();简单理解为5秒调用一次this.trySync();但是不严谨
  20. this.scheduleLongPollingRefresh();
  21. }

  • trySync方法调用loadApolloConfig方法获取远程配置数据,从服务的启动参数中获取http请求的必要参数,上述4步骤中占位符的参数{appId}/{clusterName}/{namespace},其中appid我们的ApolloApplicationRunListener在Spring的environmentPrepared阶段有做过2次封装,从启动参数获取不到会获取应用的spring.application.name的值作为appid,准备好请求参数后,去请求apollo的metaService下的ServiceController提供的/services/config接口,从eureka中获取apollo-configservice的实例列表,随机找一个机器去访问上述4步骤提供的获取配置接口拿到数据
  1. @Override
  2. protected synchronized void sync() {
  3. Transaction transaction = Tracer.newTransaction("Apollo.ConfigService", "syncRemoteConfig");
  4. try {
  5. ApolloConfig previous = m_configCache.get();
  6. //看这里
  7. ApolloConfig current = loadApolloConfig();
  8. // reference equals means HTTP 304
  9. if (previous != current) {
  10. logger.debug("Remote Config refreshed!");
  11. m_configCache.set(current);
  12. this.fireRepositoryChange(m_namespace, this.getConfig());
  13. }
  14. if (current != null) {
  15. Tracer.logEvent(String.format("Apollo.Client.Configs.%s", current.getNamespaceName()),
  16. current.getReleaseKey());
  17. }
  18. transaction.setStatus(Transaction.SUCCESS);
  19. } catch (Throwable ex) {
  20. transaction.setStatus(ex);
  21. throw ex;
  22. } finally {
  23. transaction.complete();
  24. }
  25. }

  • RemoteConfigRepository的构造方法注册2个定时任务,第一个schedulePeriodicRefresh是默认5分钟执行一次的请求一次当前namespace的所有config数据,scheduleLongPollingRefresh定时任务是定时5秒去获取apollo-configservice提供的配置变更接口,上述2步骤的其中一个listener(NotificationControllerV2),如果有新的配置变更,则去重新执行remoteRepository的trySync方法。
  • trySync方法调用sync方法,判断RemoteConfigRepository本地缓存对象(ApolloConfig)和loadApolloConfig刚刚拿到的数据是否一致,如果不一致,先替换本地缓存的值为刚刚拿到的数据,再调用fireRepositoryChange方法,此方法里面会去串行执行所有的监听配置变更的监听器去执行自己的onRepositoryChange方法,这个监听器由业务方自己去编写,可以根据实际业务需要编写。apollo默认添加了一个监听器AutoUpdateConfigChangeListener(后面会赘述)
  1. @Override
  2. protected synchronized void sync() {
  3. Transaction transaction = Tracer.newTransaction("Apollo.ConfigService", "syncRemoteConfig");
  4. try {
  5. ApolloConfig previous = m_configCache.get();
  6. ApolloConfig current = loadApolloConfig();
  7. // reference equals means HTTP 304
  8. if (previous != current) {
  9. //不一样,说明有变更(增/删/改)
  10. logger.debug("Remote Config refreshed!");
  11. m_configCache.set(current);
  12. //记住这里,这里很重要,而且很绕,现在先不用深究细节
  13. this.fireRepositoryChange(m_namespace, this.getConfig());
  14. }
  15. if (current != null) {
  16. Tracer.logEvent(String.format("Apollo.Client.Configs.%s", current.getNamespaceName()),
  17. current.getReleaseKey());
  18. }
  19. transaction.setStatus(Transaction.SUCCESS);
  20. } catch (Throwable ex) {
  21. transaction.setStatus(ex);
  22. throw ex;
  23. } finally {
  24. transaction.complete();
  25. }
  26. }

  • ApolloApplicationContextInitializer类中的initialize,放在了spring管理的MutablePropertySource对象中去。
  1. protected void initialize(ConfigurableEnvironment environment) {
  2. if (environment.getPropertySources().contains(PropertySourcesConstants.APOLLO_BOOTSTRAP_PROPERTY_SOURCE_NAME)) {
  3. //already initialized
  4. return;
  5. }
  6. String namespaces = environment.getProperty(PropertySourcesConstants.APOLLO_BOOTSTRAP_NAMESPACES, ConfigConsts.NAMESPACE_APPLICATION);
  7. logger.debug("Apollo bootstrap namespaces: {}", namespaces);
  8. List<String> namespaceList = NAMESPACE_SPLITTER.splitToList(namespaces);
  9. CompositePropertySource composite = new CompositePropertySource(PropertySourcesConstants.APOLLO_BOOTSTRAP_PROPERTY_SOURCE_NAME);
  10. for (String namespace : namespaceList) {
  11. Config config = ConfigService.getConfig(namespace);
  12. //看这里 composite.addPropertySource(configPropertySourceFactory.getConfigPropertySource(namespace, config));
  13. }
  14. environment.getPropertySources().addFirst(composite);
  15. }
  • PropertySourceProcessor类在服务初始化过程中为当前应用添加了配置变更监听器AutoUpdateConfigChangeListener,当运行过程中有变更的时候都会通知此监听器,注意,此监听器在服务启动过程中并未注册,因为还没启动成功,只有全量拉取。
  1. //注意这里也很重要,和上面的this.fireRepositoryChange(m_namespace, this.getConfig());有关系
  2. private void initializeAutoUpdatePropertiesFeature(ConfigurableListableBeanFactory beanFactory) {
  3. if (!configUtil.isAutoUpdateInjectedSpringPropertiesEnabled() ||
  4. !AUTO_UPDATE_INITIALIZED_BEAN_FACTORIES.add(beanFactory)) {
  5. return;
  6. }
  7. AutoUpdateConfigChangeListener autoUpdateConfigChangeListener = new AutoUpdateConfigChangeListener(
  8. environment, beanFactory);
  9. List<ConfigPropertySource> configPropertySources = configPropertySourceFactory.getAllConfigPropertySources();
  10. for (ConfigPropertySource configPropertySource : configPropertySources) {
  11. configPropertySource.addChangeListener(autoUpdateConfigChangeListener);
  12. }
  13. }
  • AutoUpdateConfigChangeListener收到的是增量的配置变更,会调用自身的onChange方法,在此方法中会循环所有变更的配置,以此对spring bean里面的值进行反射的替换。

6、在client接收到服务端的配置变更后会有一系列的监听器的监听和处理,主要有ConfigChangeListener和RepositoryChangeListener2种类型,大致的顺序和过程如下

  • 首先由RemoteConfigRepository这个listener接收到这个namespace有变更(注意拿到的是所有的变更数据包含已变更的和未变更)
  • RemoteConfigRepository会调用自身的fireRepositoryChange方法,此方法由父类AbstractConfigRepository定义和实现,同时LocalFileConfigRepository也实现了AbstractConfigRepository
  • 我们在上述第5步骤的第二小点介绍过,创建LocalFileConfigRepository对象的同时会先去创建RemoteConfigRepository类,并且将自身(this)作为监听器注册到RemoteConfigRepository对象中
  1. public LocalFileConfigRepository(String namespace, //注意这个对象就是上述的RemoteConfigRepository ConfigRepository upstream) {
  2. m_namespace = namespace;
  3. m_configUtil = ApolloInjector.getInstance(ConfigUtil.class);
  4. this.setLocalCacheDir(findLocalCacheDir(), false);
  5. //再看这里
  6. this.setUpstreamRepository(upstream);
  7. this.trySync();
  8. }
  9. @Override
  10. public void setUpstreamRepository(ConfigRepository upstreamConfigRepository) {
  11. if (upstreamConfigRepository == null) {
  12. return;
  13. }
  14. //clear previous listener
  15. if (m_upstream != null) {
  16. m_upstream.removeChangeListener(this);
  17. }
  18. m_upstream = upstreamConfigRepository;
  19. trySyncFromUpstream();‘
  20. //看这里,记住了,给RemoteConfigRepository里面加了一个LocalFileConfigRepository的listener
  21. upstreamConfigRepository.addChangeListener(this);
  22. }

  • 所以RemoteConfigRepository类此时是有一个listener对象的,即为LocalFileConfigRepository,这时候会调用进入LocalFileConfigRepository的onRepositoryChange方法中,做了2件事,先更新本地文件的配置缓存,第二调用自身的fireRepositoryChange方法,本地文件这个不需要太多关注,猜测可能是因为担心服务器挂了的一种兜底吧。主要看LocalFileConfigRepository的fireRepositoryChange
  • 同样的也是调用父类的默认实现,还是找监听器,上述步骤5中的第三小节,createLocalConfigRepository是在DefaultConfigFactory调用工厂方法create的时候执行的调用,在这之前new 了一个DefaultConfig对象,在这个构造方法中,传入的ConfigRepository是上面的LocalFileConfigRepository对象,然后接下来在finally中将当前对象(this)添加进去LocalFileConfigRepository的listener。所以,上述LocalFiIeConfigRepository调用自身的fireRepositoryChange其实是进入到了DefaultConfig类中的onRepositoryChange方法
  1. @Override
  2. public Config create(String namespace) {
  3. ConfigFileFormat format = determineFileFormat(namespace);
  4. if (ConfigFileFormat.isPropertiesCompatible(format)) {
  5. return new DefaultConfig(namespace, createPropertiesCompatibleFileConfigRepository(namespace, format));
  6. }
  7. //看这里new DefaultConfig
  8. return new DefaultConfig(namespace, createLocalConfigRepository(namespace));
  9. }
  10. //构造方法调用了这里
  11. private void initialize() {
  12. try {
  13. updateConfig(m_configRepository.getConfig(), m_configRepository.getSourceType());
  14. } catch (Throwable ex) {
  15. Tracer.logError(ex);
  16. logger.warn("Init Apollo Local Config failed - namespace: {}, reason: {}.",
  17. m_namespace, ExceptionUtil.getDetailMessage(ex));
  18. } finally {
  19. //看这里记住了吗,在当前LocalFileConfigRepository对象中加入了DefaultConfig为listener
  20. m_configRepository.addChangeListener(this);
  21. }
  22. }
  • 到这一步其实已经有变化了,到目前为止DefaultConfig类拿到的还是全量的配置数据,我们可以看见DefaultConfig类中的onRepositoryChange方法调用的是他的父类AbstractConfig中的fireConfigChange,在这里调用的方法名变了,从fireRepositoryChange变为了fireConfigChange,在这里我们可以看见DefaultConfig类中的onRepositoryChange方法通过自身updateAndCalcConfigChanges方法计算出来本次变更涉及的key都有哪些,变更类型是什么(增/删/改),最终封装成一个Map<String,ConfigChange>对象,并且更新内存里面的缓存。
  1. @Override
  2. public synchronized void onRepositoryChange(String namespace, Properties newProperties) {
  3. if (newProperties.equals(m_configProperties.get())) {
  4. return;
  5. }
  6. ConfigSourceType sourceType = m_configRepository.getSourceType();
  7. Properties newConfigProperties = new Properties();
  8. newConfigProperties.putAll(newProperties);
  9. Map<String, ConfigChange> actualChanges = updateAndCalcConfigChanges(newConfigProperties, sourceType);
  10. if (actualChanges.isEmpty()) {
  11. return;
  12. }
  13. this.fireConfigChange(new ConfigChangeEvent(m_namespace, actualChanges));
  14. Tracer.logEvent("Apollo.Client.ConfigChanges", m_namespace);
  15. }

  • 最后发布一个一个ConfigChangeEvent事件,调用AbstractConfig中的fireConfigChange方法,上述第5步骤倒数第二小节已经说过了,apollo已经将此监听器AutoUpdateConfigChangeListener加入进来了,并且只有此一个监听器,直接调用AutoUpdateConfigChangeListener类的onchange方法,注意前面所有的监听器的执行都是串行的,此步骤是异步线程池执行的。
  1. protected void fireConfigChange(final ConfigChangeEvent changeEvent) {
  2. for (final ConfigChangeListener listener : m_listeners) {
  3. // check whether the listener is interested in this change event
  4. if (!isConfigChangeListenerInterested(listener, changeEvent)) {
  5. continue;
  6. }
  7. m_executorService.submit(new Runnable() {
  8. @Override
  9. public void run() {
  10. String listenerName = listener.getClass().getName();
  11. Transaction transaction = Tracer.newTransaction("Apollo.ConfigChangeListener", listenerName);
  12. try {
  13. listener.onChange(changeEvent);
  14. transaction.setStatus(Transaction.SUCCESS);
  15. } catch (Throwable ex) {
  16. transaction.setStatus(ex);
  17. Tracer.logError(ex);
  18. logger.error("Failed to invoke config change listener {}", listenerName, ex);
  19. } finally {
  20. transaction.complete();
  21. }
  22. }
  23. });
  24. }
  25. }
  • 最终就是循环所有变更的key去一一替换spring容器中的值

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/很楠不爱3/article/detail/414091
推荐阅读
相关标签
  

闽ICP备14008679号