赞
踩
目录
流程解析(用户界面和服务端(apollo-adminservice)通信过程)
流程解析(服务端(apollo-adminservice)和configservice端通信过程)
流程解析(configservice和client端通信过程)
1、用户在前端触发增、删、改操作
2、前端服务会将请求发送至apollo-portal服务(ItemController),在发送请求之前apollo-portal做了以下事情:
- @PreAuthorize(value = "@permissionValidator.hasModifyNamespacePermission(#appId, #namespaceName, #env)")
- @PostMapping("/apps/{appId}/envs/{env}/clusters/{clusterName}/namespaces/{namespaceName}/item")
- public ItemDTO createItem(@PathVariable String appId, @PathVariable String env,
- @PathVariable String clusterName, @PathVariable String namespaceName,
- @RequestBody ItemDTO item) {
- checkModel(isValidItem(item));//验证
- item.setLineNum(0);
- item.setId(0);
- String userId = userInfoHolder.getUser().getUserId();//获取用户信息
- item.setDataChangeCreatedBy(userId);
- item.setDataChangeLastModifiedBy(userId);
- item.setDataChangeCreatedTime(null);
- item.setDataChangeLastModifiedTime(null);
-
- return configService.createItem(appId, Env.valueOf(env), clusterName, namespaceName, item);
- }
3、apollo-adminservice将数据持久化到apolloconfigdb库的item表中,同时持久化apolloconfigdb库的audit表,同时将本次变更(增删改)的数据转换为json后持久化至apolloconfigdb库commit表
- @PreAcquireNamespaceLock
- @PostMapping("/apps/{appId}/clusters/{clusterName}/namespaces/{namespaceName}/items")
- public ItemDTO create(@PathVariable("appId") String appId,
- @PathVariable("clusterName") String clusterName,
- @PathVariable("namespaceName") String namespaceName, @RequestBody ItemDTO dto) {
- Item entity = BeanUtils.transform(Item.class, dto);
-
- ConfigChangeContentBuilder builder = new ConfigChangeContentBuilder();
- Item managedEntity = itemService.findOne(appId, clusterName, namespaceName, entity.getKey());
- if (managedEntity != null) {
- throw new BadRequestException("item already exists");
- } else {
- entity = itemService.save(entity);
- builder.createItem(entity);
- }
- dto = BeanUtils.transform(ItemDTO.class, entity);
-
- Commit commit = new Commit();
- commit.setAppId(appId);
- commit.setClusterName(clusterName);
- commit.setNamespaceName(namespaceName);
- commit.setChangeSets(builder.build());
- commit.setDataChangeCreatedBy(dto.getDataChangeLastModifiedBy());
- commit.setDataChangeLastModifiedBy(dto.getDataChangeLastModifiedBy());
- commitService.save(commit);
-
- return dto;
- }
1、用户在前端触发发布操作
2、前端服务会将请求发送至apollo-portal服务(ReleaseController)
apollo-adminservice将appId+cluster+namespace保存至apolloconfigdb库的releasemessage表中,每次发布都会新增一条新数据,并且删除老数据, 其实就是id会增大,并且保持同一个appid+cluster+namespace只有一条数据,这一步就是apollo-adminservice和configservice通信的连接点,在 这一步将2个服务串起来了(底下的configservice服务会定时默认1秒扫描一次这个表)。很多场景下我们是用消息队列来做这一步进行解耦,但是apollo这里 采用了本地数据库的方式,其实和消息队列是一样的。
- @PreAuthorize(value = "@permissionValidator.hasReleaseNamespacePermission(#appId, #namespaceName, #env)")
- @PostMapping(value = "/apps/{appId}/envs/{env}/clusters/{clusterName}/namespaces/{namespaceName}/releases")
- public ReleaseDTO createRelease(@PathVariable String appId,
- @PathVariable String env, @PathVariable String clusterName,
- @PathVariable String namespaceName, @RequestBody NamespaceReleaseModel model) {
- model.setAppId(appId);
- model.setEnv(env);
- model.setClusterName(clusterName);
- model.setNamespaceName(namespaceName);
-
- if (model.isEmergencyPublish() && !portalConfig.isEmergencyPublishAllowed(Env.valueOf(env))) {
- throw new BadRequestException(String.format("Env: %s is not supported emergency publish now", env));
- }
-
- ReleaseDTO createdRelease = releaseService.publish(model);
-
- ConfigPublishEvent event = ConfigPublishEvent.instance();
- event.withAppId(appId)
- .withCluster(clusterName)
- .withNamespace(namespaceName)
- .withReleaseId(createdRelease.getId())
- .setNormalPublishEvent(true)
- .setEnv(Env.valueOf(env));
-
- publisher.publishEvent(event);
-
- return createdRelease;
- }
1、apollo-configservice服务中,通过自动装配类ConfigServiceAutoConfiguration在初始化bean ReleaseMessageScanner的过程中做了以下事情:
- @Bean
- public ReleaseMessageScanner releaseMessageScanner() {
- ReleaseMessageScanner releaseMessageScanner = new ReleaseMessageScanner();
- releaseMessageScanner.addMessageListener(releaseMessageServiceWithCache);
- releaseMessageScanner.addMessageListener(grayReleaseRulesHolder);
- //主要关注这个
- releaseMessageScanner.addMessageListener(configService);
- releaseMessageScanner.addMessageListener(configFileController);
- //主要关注这个
- releaseMessageScanner.addMessageListener(notificationControllerV2);
- releaseMessageScanner.addMessageListener(notificationController);
- return releaseMessageScanner;
- }
2、apollo-configservice执行定时任务:
- public ReleaseMessageScanner() {
- listeners = Lists.newCopyOnWriteArrayList();
- executorService = Executors.newScheduledThreadPool(1, ApolloThreadFactory
- .create("ReleaseMessageScanner", true));
- }
-
- @Override
- public void afterPropertiesSet() throws Exception {
- databaseScanInterval = bizConfig.releaseMessageScanIntervalInMilli();
- maxIdScanned = loadLargestMessageId();
- //注册定时任务
- executorService.scheduleWithFixedDelay((Runnable) () -> {
- Transaction transaction = Tracer.newTransaction("Apollo.ReleaseMessageScanner", "scanMessage");
- try {
- //具体的扫描逻辑
- scanMessages();
- transaction.setStatus(Transaction.SUCCESS);
- } catch (Throwable ex) {
- transaction.setStatus(ex);
- logger.error("Scan and send message failed", ex);
- } finally {
- transaction.complete();
- }
- }, databaseScanInterval, databaseScanInterval, TimeUnit.MILLISECONDS);
-
- }
3、列举apollo-configservice服务的ConfigServiceWithCache(上述2步骤的listener之一,也是client通信获取服务端数据的数据源)
- @Override
- public void handleMessage(ReleaseMessage message, String channel) {
- logger.info("message received - channel: {}, message: {}", channel, message);
- if (!Topics.APOLLO_RELEASE_TOPIC.equals(channel) || Strings.isNullOrEmpty(message.getMessage())) {
- return;
- }
-
- try {
- //让缓存失效
- invalidate(message.getMessage());
-
- //获取如果没有则从数据库查
- configCache.getUnchecked(message.getMessage());
- } catch (Throwable ex) {
- //ignore
- }
- }
4、client端与服务端的通信的最终原理,就是服务端会调用3步骤中configservice提供的接口/configs/{appId}/{clusterName}/{namespace},具体看下client如何调用的。
5、client调用apollo-configservice的接口
- //这里才是真正设置application namespace的值进入spring中的地方
- protected void initialize(ConfigurableEnvironment environment) {
-
- if (environment.getPropertySources().contains(PropertySourcesConstants.APOLLO_BOOTSTRAP_PROPERTY_SOURCE_NAME)) {
- //already initialized
- return;
- }
- //这一行,获取不到这个变量,用默认的,即application namespace
- String namespaces = environment.getProperty(PropertySourcesConstants.APOLLO_BOOTSTRAP_NAMESPACES, ConfigConsts.NAMESPACE_APPLICATION);
- logger.debug("Apollo bootstrap namespaces: {}", namespaces);
- List<String> namespaceList = NAMESPACE_SPLITTER.splitToList(namespaces);
-
- CompositePropertySource composite = new CompositePropertySource(PropertySourcesConstants.APOLLO_BOOTSTRAP_PROPERTY_SOURCE_NAME);
- for (String namespace : namespaceList) {
- Config config = ConfigService.getConfig(namespace);
-
- composite.addPropertySource(configPropertySourceFactory.getConfigPropertySource(namespace, config));
- }
-
- environment.getPropertySources().addFirst(composite);
- }
- LocalFileConfigRepository createLocalConfigRepository(String namespace) {
- if (m_configUtil.isInLocalMode()) {
- logger.warn(
- "==== Apollo is in local mode! Won't pull configs from remote server for namespace {} ! ====",
- namespace);
- return new LocalFileConfigRepository(namespace);
- }
- return new LocalFileConfigRepository(namespace, //看这里 createRemoteConfigRepository(namespace));
- }
- public RemoteConfigRepository(String namespace) {
- m_namespace = namespace;
- m_configCache = new AtomicReference<>();
- m_configUtil = ApolloInjector.getInstance(ConfigUtil.class);
- m_httpUtil = ApolloInjector.getInstance(HttpUtil.class);
- m_serviceLocator = ApolloInjector.getInstance(ConfigServiceLocator.class);
- remoteConfigLongPollService = ApolloInjector.getInstance(RemoteConfigLongPollService.class);
- m_longPollServiceDto = new AtomicReference<>();
- m_remoteMessages = new AtomicReference<>();
- m_loadConfigRateLimiter = RateLimiter.create(m_configUtil.getLoadConfigQPS());
- m_configNeedForceRefresh = new AtomicBoolean(true);
- m_loadConfigFailSchedulePolicy = new ExponentialSchedulePolicy(m_configUtil.getOnErrorRetryInterval(),
- m_configUtil.getOnErrorRetryInterval() * 8);
- gson = new Gson();
- //先看这里,去同步数据
- this.trySync();
- //这个是默认5分钟调用一次this.trySync();
- this.schedulePeriodicRefresh();
- //这个是默认定时5秒去拉取是否变更的数据,也是最终调用this.trySync();简单理解为5秒调用一次this.trySync();但是不严谨
- this.scheduleLongPollingRefresh();
- }
- @Override
- protected synchronized void sync() {
- Transaction transaction = Tracer.newTransaction("Apollo.ConfigService", "syncRemoteConfig");
-
- try {
- ApolloConfig previous = m_configCache.get();
- //看这里
- ApolloConfig current = loadApolloConfig();
-
- // reference equals means HTTP 304
- if (previous != current) {
- logger.debug("Remote Config refreshed!");
- m_configCache.set(current);
- this.fireRepositoryChange(m_namespace, this.getConfig());
- }
-
- if (current != null) {
- Tracer.logEvent(String.format("Apollo.Client.Configs.%s", current.getNamespaceName()),
- current.getReleaseKey());
- }
-
- transaction.setStatus(Transaction.SUCCESS);
- } catch (Throwable ex) {
- transaction.setStatus(ex);
- throw ex;
- } finally {
- transaction.complete();
- }
- }
- @Override
- protected synchronized void sync() {
- Transaction transaction = Tracer.newTransaction("Apollo.ConfigService", "syncRemoteConfig");
-
- try {
- ApolloConfig previous = m_configCache.get();
- ApolloConfig current = loadApolloConfig();
-
- // reference equals means HTTP 304
- if (previous != current) {
- //不一样,说明有变更(增/删/改)
- logger.debug("Remote Config refreshed!");
- m_configCache.set(current);
- //记住这里,这里很重要,而且很绕,现在先不用深究细节
- this.fireRepositoryChange(m_namespace, this.getConfig());
- }
-
- if (current != null) {
- Tracer.logEvent(String.format("Apollo.Client.Configs.%s", current.getNamespaceName()),
- current.getReleaseKey());
- }
-
- transaction.setStatus(Transaction.SUCCESS);
- } catch (Throwable ex) {
- transaction.setStatus(ex);
- throw ex;
- } finally {
- transaction.complete();
- }
- }
- protected void initialize(ConfigurableEnvironment environment) {
-
- if (environment.getPropertySources().contains(PropertySourcesConstants.APOLLO_BOOTSTRAP_PROPERTY_SOURCE_NAME)) {
- //already initialized
- return;
- }
- String namespaces = environment.getProperty(PropertySourcesConstants.APOLLO_BOOTSTRAP_NAMESPACES, ConfigConsts.NAMESPACE_APPLICATION);
- logger.debug("Apollo bootstrap namespaces: {}", namespaces);
- List<String> namespaceList = NAMESPACE_SPLITTER.splitToList(namespaces);
-
- CompositePropertySource composite = new CompositePropertySource(PropertySourcesConstants.APOLLO_BOOTSTRAP_PROPERTY_SOURCE_NAME);
- for (String namespace : namespaceList) {
- Config config = ConfigService.getConfig(namespace);
-
- //看这里 composite.addPropertySource(configPropertySourceFactory.getConfigPropertySource(namespace, config));
- }
-
- environment.getPropertySources().addFirst(composite);
- }
- //注意这里也很重要,和上面的this.fireRepositoryChange(m_namespace, this.getConfig());有关系
- private void initializeAutoUpdatePropertiesFeature(ConfigurableListableBeanFactory beanFactory) {
- if (!configUtil.isAutoUpdateInjectedSpringPropertiesEnabled() ||
- !AUTO_UPDATE_INITIALIZED_BEAN_FACTORIES.add(beanFactory)) {
- return;
- }
-
- AutoUpdateConfigChangeListener autoUpdateConfigChangeListener = new AutoUpdateConfigChangeListener(
- environment, beanFactory);
-
- List<ConfigPropertySource> configPropertySources = configPropertySourceFactory.getAllConfigPropertySources();
- for (ConfigPropertySource configPropertySource : configPropertySources) {
- configPropertySource.addChangeListener(autoUpdateConfigChangeListener);
- }
- }
6、在client接收到服务端的配置变更后会有一系列的监听器的监听和处理,主要有ConfigChangeListener和RepositoryChangeListener2种类型,大致的顺序和过程如下
- public LocalFileConfigRepository(String namespace, //注意这个对象就是上述的RemoteConfigRepository ConfigRepository upstream) {
- m_namespace = namespace;
- m_configUtil = ApolloInjector.getInstance(ConfigUtil.class);
- this.setLocalCacheDir(findLocalCacheDir(), false);
- //再看这里
- this.setUpstreamRepository(upstream);
- this.trySync();
- }
-
- @Override
- public void setUpstreamRepository(ConfigRepository upstreamConfigRepository) {
- if (upstreamConfigRepository == null) {
- return;
- }
- //clear previous listener
- if (m_upstream != null) {
- m_upstream.removeChangeListener(this);
- }
- m_upstream = upstreamConfigRepository;
- trySyncFromUpstream();‘
- //看这里,记住了,给RemoteConfigRepository里面加了一个LocalFileConfigRepository的listener
- upstreamConfigRepository.addChangeListener(this);
- }
- @Override
- public Config create(String namespace) {
- ConfigFileFormat format = determineFileFormat(namespace);
- if (ConfigFileFormat.isPropertiesCompatible(format)) {
- return new DefaultConfig(namespace, createPropertiesCompatibleFileConfigRepository(namespace, format));
- }
- //看这里new DefaultConfig
- return new DefaultConfig(namespace, createLocalConfigRepository(namespace));
- }
-
- //构造方法调用了这里
- private void initialize() {
- try {
- updateConfig(m_configRepository.getConfig(), m_configRepository.getSourceType());
- } catch (Throwable ex) {
- Tracer.logError(ex);
- logger.warn("Init Apollo Local Config failed - namespace: {}, reason: {}.",
- m_namespace, ExceptionUtil.getDetailMessage(ex));
- } finally {
- //看这里记住了吗,在当前LocalFileConfigRepository对象中加入了DefaultConfig为listener
- m_configRepository.addChangeListener(this);
- }
- }
- @Override
- public synchronized void onRepositoryChange(String namespace, Properties newProperties) {
- if (newProperties.equals(m_configProperties.get())) {
- return;
- }
- ConfigSourceType sourceType = m_configRepository.getSourceType();
- Properties newConfigProperties = new Properties();
- newConfigProperties.putAll(newProperties);
- Map<String, ConfigChange> actualChanges = updateAndCalcConfigChanges(newConfigProperties, sourceType);
- if (actualChanges.isEmpty()) {
- return;
- }
- this.fireConfigChange(new ConfigChangeEvent(m_namespace, actualChanges));
- Tracer.logEvent("Apollo.Client.ConfigChanges", m_namespace);
- }
- protected void fireConfigChange(final ConfigChangeEvent changeEvent) {
- for (final ConfigChangeListener listener : m_listeners) {
- // check whether the listener is interested in this change event
- if (!isConfigChangeListenerInterested(listener, changeEvent)) {
- continue;
- }
- m_executorService.submit(new Runnable() {
- @Override
- public void run() {
- String listenerName = listener.getClass().getName();
- Transaction transaction = Tracer.newTransaction("Apollo.ConfigChangeListener", listenerName);
- try {
- listener.onChange(changeEvent);
- transaction.setStatus(Transaction.SUCCESS);
- } catch (Throwable ex) {
- transaction.setStatus(ex);
- Tracer.logError(ex);
- logger.error("Failed to invoke config change listener {}", listenerName, ex);
- } finally {
- transaction.complete();
- }
- }
- });
- }
- }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。