赞
踩
这一步就是将第5步插入数据库的变更配置,每秒扫描一次,扫出来之后发送给监听者(还记得NotificationControllerV2 implements ReleaseMessageListener)
- public class ReleaseMessageScanner implements InitializingBean {
- private static final Logger logger = LoggerFactory.getLogger(ReleaseMessageScanner.class);
- @Autowired
- private BizConfig bizConfig;
- @Autowired
- private ReleaseMessageRepository releaseMessageRepository;
- private int databaseScanInterval;
- private List<ReleaseMessageListener> listeners;
- private ScheduledExecutorService executorService;
- private long maxIdScanned;
-
- public ReleaseMessageScanner() {
- listeners = Lists.newCopyOnWriteArrayList();
- executorService = Executors.newScheduledThreadPool(1, ApolloThreadFactory
- .create("ReleaseMessageScanner", true));
- }
-
- @Override
- public void afterPropertiesSet() throws Exception {
- //定时任务频率,默认1000ms
- databaseScanInterval = bizConfig.releaseMessageScanIntervalInMilli();
- //最大的ReleaseMessage编号
- maxIdScanned = loadLargestMessageId();
- //每秒走一次,扫描消息定时
- executorService.scheduleWithFixedDelay((Runnable) () -> {
- Transaction transaction = Tracer.newTransaction("Apollo.ReleaseMessageScanner", "scanMessage");
- try {
- scanMessages();
- transaction.setStatus(Transaction.SUCCESS);
- } catch (Throwable ex) {
- transaction.setStatus(ex);
- logger.error("Scan and send message failed", ex);
- } finally {
- transaction.complete();
- }
- }, databaseScanInterval, databaseScanInterval, TimeUnit.MILLISECONDS);
-
- }
-
- /**
- * add message listeners for release message
- * @param listener
- */
- public void addMessageListener(ReleaseMessageListener listener) {
- if (!listeners.contains(listener)) {
- listeners.add(listener);
- }
- }
-
- /**
- * Scan messages, continue scanning until there is no more messages
- */
- private void scanMessages() {
- boolean hasMoreMessages = true;
- while (hasMoreMessages && !Thread.currentThread().isInterrupted()) {
- hasMoreMessages = scanAndSendMessages();
- }
- }
-
- /**
- * scan messages and send
- * 扫描,触发监听
- * @return 是否有新的Message可以扫描
- *
- * @return whether there are more messages
- */
- private boolean scanAndSendMessages() {
- //current batch is 500
- List<ReleaseMessage> releaseMessages =
- releaseMessageRepository.findFirst500ByIdGreaterThanOrderByIdAsc(maxIdScanned);
- if (CollectionUtils.isEmpty(releaseMessages)) {
- return false;
- }
- //触发监听
- fireMessageScanned(releaseMessages);
- int messageScanned = releaseMessages.size();
- //记录最新一条Id
- maxIdScanned = releaseMessages.get(messageScanned - 1).getId();
- return messageScanned == 500;
- }
-
- /**
- * 获取最大ReleaseMessage编号
- * find largest message id as the current start point
- * @return current largest message id
- */
- private long loadLargestMessageId() {
- ReleaseMessage releaseMessage = releaseMessageRepository.findTopByOrderByIdDesc();
- return releaseMessage == null ? 0 : releaseMessage.getId();
- }
-
- /**
- * Notify listeners with messages loaded
- * @param messages
- */
- private void fireMessageScanned(List<ReleaseMessage> messages) {
- for (ReleaseMessage message : messages) {
- //将每条消息循环通知所有订阅者
- for (ReleaseMessageListener listener : listeners) {
- try {
- //触发监听
- listener.handleMessage(message, Topics.APOLLO_RELEASE_TOPIC);
- } catch (Throwable ex) {
- Tracer.logError(ex);
- logger.error("Failed to invoke message listener {}", listener.getClass(), ex);
- }
- }
- }
- }
总结:
1、启动一个定时任务(每秒一次)
2、根据最大编号查询数据库500条(按id倒叙)
3、设置最新的最大编号
4、触发监听者们--还记得下面这个监听吗com.ctrip.framework.apollo.configservice.ConfigServiceAutoConfiguration.MessageScannerConfiguration#releaseMessageScanner
- /**
- * 监听器注册,namespaces变更后接收消息
- * @return
- */
- @Bean
- public ReleaseMessageScanner releaseMessageScanner() {
- ReleaseMessageScanner releaseMessageScanner = new ReleaseMessageScanner();
- //0. handle release message cache
- releaseMessageScanner.addMessageListener(releaseMessageServiceWithCache);
- //1. handle gray release rule
- releaseMessageScanner.addMessageListener(grayReleaseRulesHolder);
- //2. handle server cache
- releaseMessageScanner.addMessageListener(configService);
- releaseMessageScanner.addMessageListener(configFileController);
- //3. notify clients
- releaseMessageScanner.addMessageListener(notificationControllerV2);
- releaseMessageScanner.addMessageListener(notificationController);
- return releaseMessageScanner;
- }
- }
可关注我的公众号每天更新
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。