
ReleaseMessageScanner: Scanning for Changed Configuration and Notifying the Listeners


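6. ReleaseMessageScanner: scan for changed configuration and notify the listeners

This step picks up the change records that step 5 inserted into the database. A scheduled task scans for new ReleaseMessage rows once per second (the default interval) and passes every row it finds to the registered listeners (recall that NotificationControllerV2 implements ReleaseMessageListener).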

    public class ReleaseMessageScanner implements InitializingBean {
      private static final Logger logger = LoggerFactory.getLogger(ReleaseMessageScanner.class);

      @Autowired
      private BizConfig bizConfig;
      @Autowired
      private ReleaseMessageRepository releaseMessageRepository;
      private int databaseScanInterval;
      private List<ReleaseMessageListener> listeners;
      private ScheduledExecutorService executorService;
      private long maxIdScanned;

      public ReleaseMessageScanner() {
        listeners = Lists.newCopyOnWriteArrayList();
        executorService = Executors.newScheduledThreadPool(1, ApolloThreadFactory
            .create("ReleaseMessageScanner", true));
      }

      @Override
      public void afterPropertiesSet() throws Exception {
        // Interval of the scheduled scan, 1000 ms by default
        databaseScanInterval = bizConfig.releaseMessageScanIntervalInMilli();
        // Largest ReleaseMessage id at startup; scanning begins after it
        maxIdScanned = loadLargestMessageId();
        // Schedule the scan: each run starts databaseScanInterval ms after the previous run ends
        executorService.scheduleWithFixedDelay((Runnable) () -> {
          Transaction transaction = Tracer.newTransaction("Apollo.ReleaseMessageScanner", "scanMessage");
          try {
            scanMessages();
            transaction.setStatus(Transaction.SUCCESS);
          } catch (Throwable ex) {
            transaction.setStatus(ex);
            logger.error("Scan and send message failed", ex);
          } finally {
            transaction.complete();
          }
        }, databaseScanInterval, databaseScanInterval, TimeUnit.MILLISECONDS);
      }

      /**
       * add message listeners for release message
       * @param listener the listener to register
       */
      public void addMessageListener(ReleaseMessageListener listener) {
        if (!listeners.contains(listener)) {
          listeners.add(listener);
        }
      }

      /**
       * Scan messages, continue scanning until there is no more messages
       */
      private void scanMessages() {
        boolean hasMoreMessages = true;
        while (hasMoreMessages && !Thread.currentThread().isInterrupted()) {
          hasMoreMessages = scanAndSendMessages();
        }
      }

      /**
       * scan messages and send them to the listeners
       *
       * @return whether there are more messages to scan
       */
      private boolean scanAndSendMessages() {
        // current batch is 500
        List<ReleaseMessage> releaseMessages =
            releaseMessageRepository.findFirst500ByIdGreaterThanOrderByIdAsc(maxIdScanned);
        if (CollectionUtils.isEmpty(releaseMessages)) {
          return false;
        }
        // Notify the listeners
        fireMessageScanned(releaseMessages);
        int messageScanned = releaseMessages.size();
        // Remember the id of the last (largest) message in this batch
        maxIdScanned = releaseMessages.get(messageScanned - 1).getId();
        // A full batch means there may be more rows to fetch
        return messageScanned == 500;
      }

      /**
       * find largest message id as the current start point
       * @return current largest message id
       */
      private long loadLargestMessageId() {
        ReleaseMessage releaseMessage = releaseMessageRepository.findTopByOrderByIdDesc();
        return releaseMessage == null ? 0 : releaseMessage.getId();
      }

      /**
       * Notify listeners with messages loaded
       * @param messages the messages loaded in the current batch
       */
      private void fireMessageScanned(List<ReleaseMessage> messages) {
        for (ReleaseMessage message : messages) {
          // Deliver each message to every registered listener in turn
          for (ReleaseMessageListener listener : listeners) {
            try {
              // Invoke the listener; a failure is logged and does not stop the others
              listener.handleMessage(message, Topics.APOLLO_RELEASE_TOPIC);
            } catch (Throwable ex) {
              Tracer.logError(ex);
              logger.error("Failed to invoke message listener {}", listener.getClass(), ex);
            }
          }
        }
      }
    }
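The cursor logic in scanMessages and scanAndSendMessages can be tried out in isolation. The following is only a minimal sketch, not Apollo code: InMemoryMessageStore and ScanLoopSketch are hypothetical stand-ins, messages are reduced to bare ids, and the batch size is a constructor parameter instead of the fixed 500 so that the draining behaviour is easy to observe.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

/** Hypothetical stand-in for the ReleaseMessage table; only ids are stored. */
class InMemoryMessageStore {
    private final List<Long> ids = new ArrayList<>();

    void insert(long id) {
        ids.add(id);
    }

    /** Mimics "first N rows with id greater than minId, ordered by id ascending". */
    List<Long> findFirstByIdGreaterThan(long minId, int batchSize) {
        return ids.stream()
                .filter(id -> id > minId)
                .sorted()
                .limit(batchSize)
                .collect(Collectors.toList());
    }
}

/** Hypothetical scanner that keeps a cursor (maxIdScanned) and drains in batches. */
class ScanLoopSketch {
    private final InMemoryMessageStore store;
    private final int batchSize;
    private final List<Long> delivered = new ArrayList<>();
    private long maxIdScanned;
    private int queries;

    ScanLoopSketch(InMemoryMessageStore store, int batchSize, long startId) {
        this.store = store;
        this.batchSize = batchSize;
        this.maxIdScanned = startId; // ids up to startId are treated as already seen
    }

    /** One scheduled run: keep pulling batches until a batch is empty or short. */
    void scanMessages() {
        boolean hasMore = true;
        while (hasMore) {
            hasMore = scanAndSend();
        }
    }

    private boolean scanAndSend() {
        queries++;
        List<Long> batch = store.findFirstByIdGreaterThan(maxIdScanned, batchSize);
        if (batch.isEmpty()) {
            return false;
        }
        delivered.addAll(batch);                    // stands in for notifying listeners
        maxIdScanned = batch.get(batch.size() - 1); // advance the cursor
        return batch.size() == batchSize;           // a full batch may have successors
    }

    List<Long> delivered() { return delivered; }
    long maxIdScanned() { return maxIdScanned; }
    int queries() { return queries; }
}
```

Because the cursor only moves forward and the query is ordered by ascending id, a message is delivered at most once per scanner instance, and rows that already existed at startup are skipped.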

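Summary:

1. Start a scheduled task (once per second by default).

2. Query the database for up to 500 rows whose id is greater than the largest id scanned so far, ordered by id ascending.

3. Update the largest scanned id to the id of the last row in the batch.

4. Notify the listeners. Recall the listener registration in com.ctrip.framework.apollo.configservice.ConfigServiceAutoConfiguration.MessageScannerConfiguration#releaseMessageScanner: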

    /**
     * Registers the listeners that receive a message after a namespace changes.
     * The listener beans are fields of the enclosing configuration class.
     * @return the scanner with all listeners registered
     */
    @Bean
    public ReleaseMessageScanner releaseMessageScanner() {
      ReleaseMessageScanner releaseMessageScanner = new ReleaseMessageScanner();
      //0. handle release message cache
      releaseMessageScanner.addMessageListener(releaseMessageServiceWithCache);
      //1. handle gray release rule
      releaseMessageScanner.addMessageListener(grayReleaseRulesHolder);
      //2. handle server cache
      releaseMessageScanner.addMessageListener(configService);
      releaseMessageScanner.addMessageListener(configFileController);
      //3. notify clients
      releaseMessageScanner.addMessageListener(notificationControllerV2);
      releaseMessageScanner.addMessageListener(notificationController);
      return releaseMessageScanner;
    }
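Two properties of this registration are worth seeing in action: listeners are called in the order they were added, and an exception thrown by one listener does not prevent the later ones from receiving the message. The sketch below illustrates both under simplifying assumptions. MessageListenerSketch and ListenerRegistrySketch are hypothetical names, the message is a plain String rather than a ReleaseMessage, and failures are collected in a list instead of being logged.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

/** Hypothetical stand-in for ReleaseMessageListener, reduced to a String payload. */
interface MessageListenerSketch {
    void handleMessage(String message, String channel);
}

/** Hypothetical registry that mirrors addMessageListener and fireMessageScanned. */
class ListenerRegistrySketch {
    private final List<MessageListenerSketch> listeners = new CopyOnWriteArrayList<>();
    private final List<Throwable> failures = new ArrayList<>();

    /** Registers a listener once; adding the same instance again is a no-op. */
    void addMessageListener(MessageListenerSketch listener) {
        if (!listeners.contains(listener)) {
            listeners.add(listener);
        }
    }

    /** Delivers every message to every listener, isolating listener failures. */
    void fire(List<String> messages, String channel) {
        for (String message : messages) {
            for (MessageListenerSketch listener : listeners) {
                try {
                    listener.handleMessage(message, channel);
                } catch (Throwable ex) {
                    failures.add(ex); // the scanner logs here; the loop carries on
                }
            }
        }
    }

    int listenerCount() { return listeners.size(); }
    List<Throwable> failures() { return failures; }
}
```

With this ordering, the cache-related listeners registered first have already processed a message by the time the client-notifying listeners registered last see it, which matches the numbered comments in the registration method.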
