当前位置:   article > 正文

通过Nacos配置刷新进行RabbitMQ消费者在线启停_rabbitmq nacos

rabbitmq nacos

前提

公司在做一些金融相关业务,某些时候由于数据提供商定期维护或者特殊原因需要暂停某些服务的消费者。之前选用的消息队列技术栈是RabbitMQ,用于微服务之间的消息投递,对于这类需要暂停消费者的场景是选用注释掉消费者Bean中的相应Spring(Boot)注解重新发布来实现,后面需要重新启动消费就是解开对应的注释再发布一次。这样的处理流程既繁琐,也显得没有技术含量,所以笔者就这个问题结合已有的配置中心Nacos集群做了一个方案,使用Nacos的配置准实时刷新功能去控制某个微服务实例的所有RabbitMQ消费者(容器)的停止和启动。

方案原理

下面探讨一下方案的原理和可行性,主要包括:

  • RabbitMQ消费者生命周期管理
  • Nacos长轮询与配置刷新

因为工作中的主要技术栈是SpringBoot + RabbitMQ,下文是探讨场景针对spring-boot-starter-amqp(下面简称amqp)展开。

使用SpringBoot版本为2.3.0.RELEASE,spring-cloud-alibaba-nacos-config的版本为2.2.0.RELEASE

RabbitMQ消费者生命周期管理

查看RabbitAnnotationDrivenConfiguration的源码:

amqp中默认启用spring.rabbitmq.listener.type=simple,使用的RabbitListenerContainerFactory(消息监听器容器工厂)实现为SimpleRabbitListenerContainerFactory,使用的MessageListenerContainer(消息监听器容器)实现为SimpleMessageListenerContainer。在amqp中,无论注解声明式或者编程式注册的消费者最终都会封装为MessageListenerContainer实例,因此消费者生命周期可以直接通过MessageListenerContainer进行管理,MessageListenerContainer的生命周期管理API会直接作用于最底层的真实消费者实现BlockingQueueConsumer。几者的关系如下:

一般声明式消费者注册方式如下:

  1. @Slf4j
  2. @RabbitListener(id = "SingleAnnoMethodDemoConsumer", queues = "srd->srd.demo")
  3. @Component
  4. public class SingleAnnoMethodDemoConsumer {
  5. @RabbitHandler
  6. public void onMessage(Message message) {
  7. log.info("SingleAnnoMethodDemoConsumer.onMessage => {}", new String(message.getBody(), StandardCharsets.UTF_8));
  8. }
  9. }
  10. @RabbitListener(id = "MultiAnnoMethodDemoConsumer", queues = "srd->srd.demo")
  11. @Component
  12. @Slf4j
  13. public class MultiAnnoMethodDemoConsumer {
  14. @RabbitHandler
  15. public void firstOnMessage(Message message) {
  16. log.info("MultiAnnoMethodDemoConsumer.firstOnMessage => {}", new String(message.getBody(), StandardCharsets.UTF_8));
  17. }
  18. @RabbitHandler
  19. public void secondOnMessage(Message message) {
  20. log.info("MultiAnnoMethodDemoConsumer.secondOnMessage => {}", new String(message.getBody(), StandardCharsets.UTF_8));
  21. }
  22. }
  23. @Component
  24. @Slf4j
  25. public class MultiAnnoInstanceDemoConsumer {
  26. @RabbitListener(id = "MultiAnnoInstanceDemoConsumer-firstOnInstanceMessage", queues = "srd->srd.demo")
  27. public void firstOnInstanceMessage(Message message) {
  28. log.info("MultiAnnoInstanceDemoConsumer.firstOnInstanceMessage => {}", new String(message.getBody(), StandardCharsets.UTF_8));
  29. }
  30. @RabbitListener(id = "MultiAnnoInstanceDemoConsumer-secondOnInstanceMessage", queues = "srd->srd.sec")
  31. public void secondOnInstanceMessage(Message message) {
  32. log.info("MultiAnnoInstanceDemoConsumer.secondOnInstanceMessage => {}", new String(message.getBody(), StandardCharsets.UTF_8));
  33. }
  34. }
  35. 复制代码

对于基于@RabbitListener进行声明式注册的消费者,每个被@RabbitListener修饰的Bean或者方法最终都会单独生成一个SimpleMessageListenerContainer实例,这些SimpleMessageListenerContainer实例的唯一标识由@RabbitListenerid属性指定,缺省值为org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#N,建议在使用时候通过规范约束必须定义此id属性。分析源码可以得知这类型的消费者通过RabbitListenerAnnotationBeanPostProcessor进行发现和自动注册,并且在RabbitListenerEndpointRegistry缓存了注册信息,因此可以通过RabbitListenerEndpointRegistry直接获取这些声明式的消费者容器实例:

  1. RabbitListenerEndpointRegistry endpointRegistry = configurableListableBeanFactory.getBean(
  2. RabbitListenerConfigUtils.RABBIT_LISTENER_ENDPOINT_REGISTRY_BEAN_NAME,
  3. RabbitListenerEndpointRegistry.class);
  4. Set<String> listenerContainerIds = endpointRegistry.getListenerContainerIds();
  5. for (String containerId : listenerContainerIds) {
  6. MessageListenerContainer messageListenerContainer = endpointRegistry.getListenerContainer(containerId);
  7. // do something with messageListenerContainer
  8. }
  9. 复制代码

一般编程式消费者注册方式如下:

  1. // MessageListenerDemoConsumer
  2. @Component
  3. @Slf4j
  4. public class MessageListenerDemoConsumer implements MessageListener {
  5. @Override
  6. public void onMessage(Message message) {
  7. log.info("MessageListenerDemoConsumer.onMessage => {}", new String(message.getBody(), StandardCharsets.UTF_8));
  8. }
  9. }
  10. // CustomMethodDemoConsumer
  11. @Component
  12. @Slf4j
  13. public class CustomMethodDemoConsumer {
  14. public void customOnMessage(Message message) {
  15. log.info("CustomMethodDemoConsumer.customOnMessage => {}", new String(message.getBody(), StandardCharsets.UTF_8));
  16. }
  17. }
  18. // configuration class
  19. // 通过现存的MessageListener实例进行消费
  20. @Bean
  21. public SimpleMessageListenerContainer messageListenerDemoConsumerContainer(
  22. ConnectionFactory connectionFactory,
  23. @Qualifier("messageListenerDemoConsumer") MessageListener messageListener) {
  24. SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
  25. container.setListenerId("MessageListenerDemoConsumer");
  26. container.setConnectionFactory(connectionFactory);
  27. container.setConcurrentConsumers(1);
  28. container.setMaxConcurrentConsumers(1);
  29. container.setQueueNames("srd->srd.demo");
  30. container.setAcknowledgeMode(AcknowledgeMode.AUTO);
  31. container.setPrefetchCount(10);
  32. container.setAutoStartup(true);
  33. container.setMessageListener(messageListener);
  34. return container;
  35. }
  36. // 通过IOC容器中某个Bean的具体方法进行消费
  37. @Bean
  38. public SimpleMessageListenerContainer customMethodDemoConsumerContainer(
  39. ConnectionFactory connectionFactory,
  40. CustomMethodDemoConsumer customMethodDemoConsumer) {
  41. SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
  42. container.setListenerId("CustomMethodDemoConsumer");
  43. container.setConnectionFactory(connectionFactory);
  44. container.setConcurrentConsumers(1);
  45. container.setMaxConcurrentConsumers(1);
  46. container.setQueueNames("srd->srd.demo");
  47. container.setAcknowledgeMode(AcknowledgeMode.AUTO);
  48. container.setPrefetchCount(10);
  49. container.setAutoStartup(true);
  50. MessageListenerAdapter messageListenerAdapter = new MessageListenerAdapter();
  51. messageListenerAdapter.setDelegate(customMethodDemoConsumer);
  52. messageListenerAdapter.setDefaultListenerMethod("customOnMessage");
  53. container.setMessageListener(messageListenerAdapter);
  54. return container;
  55. }
  56. 复制代码

编程式注册的SimpleMessageListenerContainer可以直接从IOC容器中获取:

  1. Map<String, MessageListenerContainer> messageListenerContainerBeans
  2. = configurableListableBeanFactory.getBeansOfType(MessageListenerContainer.class);
  3. if (!CollectionUtils.isEmpty(messageListenerContainerBeans)) {
  4. messageListenerContainerBeans.forEach((beanId, messageListenerContainer) -> {
  5. // do something with messageListenerContainer
  6. });
  7. }
  8. 复制代码

至此,我们知道可以比较轻松地拿到服务中所有的MessageListenerContainer的实例,从而可以管理服务内所有消费者的生命周期。

Nacos长轮询与配置刷新

Nacos的客户端通过LongPolling(长轮询)的方式监听Nacos服务端集群对应dataIdgroup的配置数据变更,具体可以参考ClientWorker的源码实现,实现的过程大致如下:

在非Spring(Boot)体系中,可以通过ConfigService#addListener()进行配置变更监听,示例代码如下:

  1. Properties properties = new Properties();
  2. properties.put(PropertyKeyConst.SERVER_ADDR, "127.0.0.1:8848");
  3. properties.put(PropertyKeyConst.NAMESPACE, "LOCAL");
  4. ConfigService configService = NacosFactory.createConfigService(properties);
  5. Executor executor = Executors.newSingleThreadExecutor(runnable -> {
  6. Thread thread = new Thread(runnable);
  7. thread.setDaemon(true);
  8. thread.setName("NacosConfigSyncWorker");
  9. return thread;
  10. });
  11. configService.addListener("application-aplha.properties", "customer-service", new Listener() {
  12. @Override
  13. public Executor getExecutor() {
  14. return executor;
  15. }
  16. @Override
  17. public void receiveConfigInfo(String configInfo) {
  18. // do something with 'configInfo'
  19. }
  20. });
  21. 复制代码

这种LongPolling的方式目前来看可靠性是比较高,因为Nacos服务端集群一般在生产部署是大于3的奇数个实例节点,并且底层基于raft共识算法实现集群通讯,只要不是同一时间超过半数节点宕机集群还是能正常提供服务。但是从实现上来看会有一些局限性:

  • 如果注册过多的配置变更监听器有可能会对Nacos服务端造成比较大的压力,毕竟是多个客户端进行轮询
  • 配置变更是由Nacos客户端向Nacos服务端发起请求,因此监听器回调有可能不是实时的(有可能延迟到客户端下一轮的LongPolling提交)
  • Nacos客户端会缓存每次从Nacos服务端拉取的配置内容,如果要变更配置文件过大有可能导致缓存的数据占用大量内存,影响客户端所在服务的性能

关于配置变更监听其实有其他候选的方案,例如Redis的发布订阅,Zookeeper的节点路径变更监听甚至是使用消息队列进行通知,本文使用Nacos配置变更监听的原因是更好的划分不同应用配置文件的编辑查看权限方便进行管理,其他候选方案要实现分权限管理需要二次开发

使用SpringCloudAlibaba提供的spring-cloud-alibaba-nacos-config可以更加简便地使用Nacos配置刷新监听,并且会把变更的PropertySource重新绑定到对应的配置属性Bean。引入依赖:

  1. <dependency>
  2. <groupId>com.alibaba.cloud</groupId>
  3. <artifactId>spring-cloud-alibaba-nacos-config</artifactId>
  4. </dependency>
  5. <dependency>
  6. <groupId>com.alibaba.nacos</groupId>
  7. <artifactId>nacos-client</artifactId>
  8. </dependency>
  9. 复制代码

具体的配置类是NacosConfigProperties

红圈中是需要关注的配置项,refreshEnabled是配置刷新的开关,默认是开启的。sharedConfigsextensionConfigs虽然命名不同,但是两者实现和功能没有差异,都是类似于共享或者说扩展配置,每个共享(扩展)配置支持单独配置刷新开关。举个例子,在Nacos服务端的某个配置如下图:

为了支持配置变更和对应的实体类成员变量更新,对应客户端的配置文件是这样的:

  1. spring.cloud.nacos.config.refresh-enabled=true
  2. spring.cloud.nacos.config.shared-configs[0].data-id=shared.properties
  3. spring.cloud.nacos.config.shared-configs[0].group=shared-conf
  4. spring.cloud.nacos.config.shared-configs[0].refresh=true
  5. 复制代码

对应的配置属性Bean如下:

  1. @Data
  2. @ConfigurationProperties(prefix = "shared")
  3. public class SharedProperties {
  4. private String foo;
  5. }
  6. 复制代码

只要客户端所在SpringBoot服务启动完成后,修改Nacos服务端对应dataIdshared.propertiesshared.foo属性值,那边SharedPropertiesfoo属性就会准实时刷新。可以在SharedProperties添加一个@PostConstruct来观察这个属性更新的过程:

  1. @Slf4j
  2. @Data
  3. @ConfigurationProperties(prefix = "shared")
  4. public class SharedProperties {
  5. private final AtomicBoolean firstInit = new AtomicBoolean();
  6. private String foo;
  7. @PostConstruct
  8. public void postConstruct() {
  9. if (!firstInit.compareAndSet(false, true)) {
  10. log.info("SharedProperties refresh...");
  11. } else {
  12. log.info("SharedProperties first init...");
  13. }
  14. }
  15. }
  16. 复制代码

方案实施

整个方案实施包括下面几步:

  • 配置变更通知与配置类刷新
  • 发现所有消费者容器
  • 管理消费者容器生命周期

初始化一个Maven项目,引入下面的依赖:

  • org.projectlombok:lombok:1.18.12
  • org.springframework.boot:spring-boot-starter-web:2.3.0.RELEASE
  • org.springframework.boot:spring-boot-starter-amqp:2.3.0.RELEASE
  • com.alibaba.cloud:spring-cloud-alibaba-nacos-config:2.2.0.RELEASE
  • com.alibaba.nacos:nacos-client:1.4.4

下载Nacos服务并且启动一个单机实例(当前2023-02的最新稳定版为2.2.0),新建命名空间LOCAL并且添加四份配置文件:

可以使用1.x的Nacos客户端去连接2.x的Nacos服务端,这个是Nacos做的向下兼容,反过来不行

前文提到的Nacos客户端中,ConfigService是通过dataIdgroup定位到具体的配置文件,一般dataId按照配置文件的内容命名,对于SpringBoot的应用配置文件一般命名为application-${profile}.[properties,yml]group是配置文件的分组,对于SpringBoot的应用配置文件一般命名为${spring.application.name}。笔者在在这份SpringBoot的应用配置文件中只添加了RabbitMQ的配置:

确保本地或者远程有一个可用的RabbitMQ服务,接下来往下开始实施方案。

配置变更通知与配置类刷新

前面已经提到过SpringBoot结合Nacos进行配置属性Bean的成员变量刷新,在项目的Classpathresources文件夹)添加bootstrap.properties文件,内容如下:

  1. spring.application.name=rabbitmq-rocketmq-demo
  2. spring.profiles.active=default
  3. # nacos配置
  4. spring.cloud.nacos.config.enabled=true
  5. spring.cloud.nacos.config.server-addr=127.0.0.1:8848
  6. spring.cloud.nacos.config.namespace=LOCAL
  7. spring.cloud.nacos.config.group=rabbitmq-rocketmq-demo
  8. spring.cloud.nacos.config.prefix=application
  9. spring.cloud.nacos.config.file-extension=properties
  10. spring.cloud.nacos.config.refresh-enabled=true
  11. spring.cloud.nacos.config.shared-configs[0].data-id=shared.properties
  12. spring.cloud.nacos.config.shared-configs[0].group=shared-conf
  13. spring.cloud.nacos.config.shared-configs[0].refresh=true
  14. spring.cloud.nacos.config.extension-configs[0].data-id=extension.properties
  15. spring.cloud.nacos.config.extension-configs[0].group=extension-conf
  16. spring.cloud.nacos.config.extension-configs[0].refresh=true
  17. spring.cloud.nacos.config.extension-configs[1].data-id=rabbitmq-toggle.properties
  18. spring.cloud.nacos.config.extension-configs[1].group=rabbitmq-rocketmq-demo
  19. spring.cloud.nacos.config.extension-configs[1].refresh=true
  20. 复制代码

这里profile定义为default也就是会关联到NacosdataId = 'application.properties', group = 'rabbitmq-rocketmq-demo'那份配置文件,主要是用于定义amqp需要的配置属性。对于RabbitMQ消费者的开关,定义在dataId = 'rabbitmq-toggle.properties', group = 'rabbitmq-rocketmq-demo'的文件中。添加RabbitmqToggleProperties

  1. // RabbitmqToggleProperties
  2. @Slf4j
  3. @Data
  4. @ConfigurationProperties(prefix = "rabbitmq.toggle")
  5. public class RabbitmqToggleProperties {
  6. private final AtomicBoolean firstInit = new AtomicBoolean();
  7. private List<RabbitmqConsumer> consumers;
  8. @PostConstruct
  9. public void postConstruct() {
  10. if (!firstInit.compareAndSet(false, true)) {
  11. StaticEventPublisher.publishEvent(new RabbitmqToggleRefreshEvent(this));
  12. log.info("RabbitmqToggleProperties refresh, publish RabbitmqToggleRefreshEvent...");
  13. } else {
  14. log.info("RabbitmqToggleProperties first init...");
  15. }
  16. }
  17. @Data
  18. public static class RabbitmqConsumer {
  19. private String listenerId;
  20. private Integer concurrentConsumers;
  21. private Integer maxConcurrentConsumers;
  22. private Boolean enable;
  23. }
  24. }
  25. // RabbitmqToggleRefreshEvent
  26. @Getter
  27. public class RabbitmqToggleRefreshEvent extends ApplicationEvent {
  28. private final RabbitmqToggleProperties rabbitmqToggleProperties;
  29. public RabbitmqToggleRefreshEvent(RabbitmqToggleProperties rabbitmqToggleProperties) {
  30. super("RabbitmqToggleRefreshEvent");
  31. this.rabbitmqToggleProperties = rabbitmqToggleProperties;
  32. }
  33. }
  34. // StaticEventPublisher
  35. public class StaticEventPublisher {
  36. private static ApplicationEventPublisher PUBLISHER = null;
  37. public static void publishEvent(ApplicationEvent applicationEvent) {
  38. if (Objects.nonNull(PUBLISHER)) {
  39. PUBLISHER.publishEvent(applicationEvent);
  40. }
  41. }
  42. public static void attachApplicationEventPublisher(ApplicationEventPublisher publisher) {
  43. PUBLISHER = publisher;
  44. }
  45. }
  46. 复制代码

这里prefix定义为rabbitmq.toggle,为了和rabbitmq-toggle.properties的属性一一绑定,该文件中的配置Key必须以rabbitmq.toggle为前缀。RabbitmqToggleProperties首次回调@PostConstruct方法只打印初始化日志,再次回调@PostConstruct方法则发布RabbitmqToggleRefreshEvent事件,用于后面通知对应的消费者容器Bean进行启停。

发现所有消费者容器

为了统一管理服务中所有消费者容器Bean,需要定义一个类似于消费者容器注册或者缓存中心类,缓存Key可以考虑使用listenerIdValue就直接使用MessageListenerContainer实例即可:

  1. private final ConcurrentMap<String, MessageListenerContainer> containerCache = Maps.newConcurrentMap();
  2. 复制代码

这里既然选定了listenerId作为缓存的Key,那么必须定义好规范,要求无论注解声明式定义的消费者还是编程式定义的消费者,必须明确指定具体意义的listenerId,否则到时候存在Key的格式为org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#N会比较混乱

接下来发现和缓存所有消费者容器:

  1. private ConfigurableListableBeanFactory configurableListableBeanFactory;
  2. private ApplicationEventPublisher applicationEventPublisher;
  3. // ----------------------------------------------------------------------
  4. // 获取声明式消费者容器
  5. RabbitListenerEndpointRegistry endpointRegistry = configurableListableBeanFactory.getBean(
  6. RabbitListenerConfigUtils.RABBIT_LISTENER_ENDPOINT_REGISTRY_BEAN_NAME,
  7. RabbitListenerEndpointRegistry.class);
  8. Set<String> listenerContainerIds = endpointRegistry.getListenerContainerIds();
  9. for (String containerId : listenerContainerIds) {
  10. MessageListenerContainer messageListenerContainer = endpointRegistry.getListenerContainer(containerId);
  11. containerCache.putIfAbsent(containerId, messageListenerContainer);
  12. }
  13. // 获取编程式消费者容器
  14. Map<String, MessageListenerContainer> messageListenerContainerBeans
  15. = configurableListableBeanFactory.getBeansOfType(MessageListenerContainer.class);
  16. if (!CollectionUtils.isEmpty(messageListenerContainerBeans)) {
  17. messageListenerContainerBeans.forEach((beanId, bean) -> {
  18. if (bean instanceof AbstractMessageListenerContainer) {
  19. AbstractMessageListenerContainer abstractMessageListenerContainer = (AbstractMessageListenerContainer) bean;
  20. String listenerId = abstractMessageListenerContainer.getListenerId();
  21. if (StringUtils.hasLength(listenerId)) {
  22. containerCache.putIfAbsent(listenerId, abstractMessageListenerContainer);
  23. } else {
  24. containerCache.putIfAbsent(beanId, bean);
  25. }
  26. } else {
  27. containerCache.putIfAbsent(beanId, bean);
  28. }
  29. });
  30. }
  31. Set<String> listenerIds = containerCache.keySet();
  32. listenerIds.forEach(listenerId -> log.info("Cache message listener container => {}", listenerId));
  33. // 所有消费者容器Bean发现完成后才接收刷新事件
  34. StaticEventPublisher.attachApplicationEventPublisher(this.applicationEventPublisher);
  35. 复制代码

StaticEventPublisher中的ApplicationEventPublisher属性延迟到所有消费者容器缓存完成后赋值,防止过早的属性变更通知导致部分消费者容器的启停操作被忽略。

管理消费者容器生命周期

接收到RabbitmqToggleRefreshEvent事件后,然后遍历传递过来的RabbitmqToggleProperties里面的consumers,再基于已经发现的消费者容器进行处理,代码大概如下:

  1. @EventListener(classes = RabbitmqToggleRefreshEvent.class)
  2. public void onRabbitmqToggleRefreshEvent(RabbitmqToggleRefreshEvent event) {
  3. RabbitmqToggleProperties rabbitmqToggleProperties = event.getRabbitmqToggleProperties();
  4. List<RabbitmqToggleProperties.RabbitmqConsumer> consumers = rabbitmqToggleProperties.getConsumers();
  5. if (!CollectionUtils.isEmpty(consumers)) {
  6. consumers.forEach(consumerConf -> {
  7. String listenerId = consumerConf.getListenerId();
  8. if (StringUtils.hasLength(listenerId)) {
  9. MessageListenerContainer messageListenerContainer = containerCache.get(listenerId);
  10. if (Objects.nonNull(messageListenerContainer)) {
  11. // running -> stop
  12. if (messageListenerContainer.isRunning() && Objects.equals(Boolean.FALSE, consumerConf.getEnable())) {
  13. messageListenerContainer.stop();
  14. log.info("Message listener container => {} stop successfully", listenerId);
  15. }
  16. // modify concurrency
  17. if (messageListenerContainer instanceof SimpleMessageListenerContainer) {
  18. SimpleMessageListenerContainer simpleMessageListenerContainer
  19. = (SimpleMessageListenerContainer) messageListenerContainer;
  20. if (Objects.nonNull(consumerConf.getConcurrentConsumers())) {
  21. simpleMessageListenerContainer.setConcurrentConsumers(consumerConf.getConcurrentConsumers());
  22. }
  23. if (Objects.nonNull(consumerConf.getMaxConcurrentConsumers())) {
  24. simpleMessageListenerContainer.setMaxConcurrentConsumers(consumerConf.getMaxConcurrentConsumers());
  25. }
  26. }
  27. // stop -> running
  28. if (!messageListenerContainer.isRunning() && Objects.equals(Boolean.TRUE, consumerConf.getEnable())) {
  29. messageListenerContainer.start();
  30. log.info("Message listener container => {} start successfully", listenerId);
  31. }
  32. }
  33. }
  34. });
  35. }
  36. }
  37. 复制代码

修改Nacos服务里面的rabbitmq-toggle.properties文件,输入内容如下:

  1. rabbitmq.toggle.consumers[0].listenerId=MultiAnnoInstanceDemoConsumer-firstOnInstanceMessage
  2. rabbitmq.toggle.consumers[0].enable=true
  3. rabbitmq.toggle.consumers[1].listenerId=MultiAnnoInstanceDemoConsumer-secondOnInstanceMessage
  4. rabbitmq.toggle.consumers[1].enable=true
  5. rabbitmq.toggle.consumers[2].listenerId=MultiAnnoMethodDemoConsumer
  6. rabbitmq.toggle.consumers[2].enable=true
  7. rabbitmq.toggle.consumers[3].listenerId=SingleAnnoMethodDemoConsumer
  8. rabbitmq.toggle.consumers[3].enable=true
  9. rabbitmq.toggle.consumers[4].listenerId=CustomMethodDemoConsumer
  10. rabbitmq.toggle.consumers[4].enable=true
  11. rabbitmq.toggle.consumers[5].listenerId=MessageListenerDemoConsumer
  12. rabbitmq.toggle.consumers[5].enable=true
  13. 复制代码

启动项目,观察RabbitMQ WebUI对应的队列消费者数量:

然后随机修改rabbitmq-toggle.properties文件某个消费者容器设置为enable = 'fasle',观察服务日志和观察RabbitMQ WebUI的变化:

可见RabbitMQ WebUI中队列消费者数量减少,服务日志也提示listenerId = 'MessageListenerDemoConsumer'的消费者容器被停止了。

一些思考

为了更精确控制有消费者容器的启停,可以考虑在配置文件中定义关闭消费者容器的自动启动开关:

  1. spring.rabbitmq.listener.simple.auto-startup=false
  2. 复制代码

可以考虑在RabbitmqToggleProperties首次回调@PostConstruct方法时候发布RabbitmqToggleInitEvent事件,然后监听此事件启动所有已经发现的消费者容器。这样就能做到应用内部的消费者的启停行为总是以Nacos的开关配置文件为准,并且可以实现在线启停和动态调整最小最大消费者数量。

另外,如果细心的话能够观察到服务日志中,每当监听到Nacos配置变动会打印Started application in N seconds (JVM running for M)的日志,这个并不是服务重启了,而是启动了一个Spring子容器用于构建一个全新的StandardEnvironment(见文末Demo项目中的EnvironmentCaptureApplicationRunner)用来承载刷新后的配置文件内容,然后再拷贝或者覆盖到当前的Spring容器中的PropertySources,这个过程的代码实现类似这样:

小结

本文探讨了一种通过Nacos配置刷新方式管理SpringBoot服务中RabbitMQ消费者生命周期管理的方案,目前只是提供了完整的思路和一些Demo级别代码,后续应该会完善方案和具体的工程级别编码实现。

本文Demo项目仓库:

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/小蓝xlanll/article/detail/414755
推荐阅读
相关标签
  

闽ICP备14008679号