当前位置:   article > 正文

(17)不重启服务动态调整RabbitMQ消费者数量_setconcurrentconsumers

setconcurrentconsumers

        我们使用springboot集成rabbitmq时会配置消费者数量,然而我们想调整这个数量时却每次都要重启,这样就很麻烦。如果能在不重启服务的情况下,可以动态调整消费者数量的话就会是分方便了。

        先看下springboot中关于rabbitmq的自动配置类,RabbitAutoConfiguration,

  1. @Configuration
  2. @ConditionalOnClass({ RabbitTemplate.class, Channel.class })
  3. @EnableConfigurationProperties(RabbitProperties.class)
  4. @Import(RabbitAnnotationDrivenConfiguration.class)
  5. public class RabbitAutoConfiguration {
  6. 。。。。。。
  7. }

        @Import导入了RabbitAnnotationDrivenConfiguration

  1. @Configuration
  2. @ConditionalOnClass(EnableRabbit.class)
  3. class RabbitAnnotationDrivenConfiguration {
  4. ......
  5. }

        RabbitAnnotationDrivenConfiguration上面有个EnableRabbit,打开看一下EnableRabbit是一个注解,里面又导入了RabbitBootstrapConfiguration

  1. @Target(ElementType.TYPE)
  2. @Retention(RetentionPolicy.RUNTIME)
  3. @Documented
  4. @Import(RabbitBootstrapConfiguration.class)
  5. public @interface EnableRabbit {
  6. }

        RabbitBootstrapConfiguration类内容如下:

  1. @Configuration
  2. public class RabbitBootstrapConfiguration {
  3. @Bean(name = RabbitListenerConfigUtils.RABBIT_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME)
  4. @Role(BeanDefinition.ROLE_INFRASTRUCTURE)
  5. public RabbitListenerAnnotationBeanPostProcessor rabbitListenerAnnotationProcessor() {
  6. return new RabbitListenerAnnotationBeanPostProcessor();
  7. }
  8. @Bean(name = RabbitListenerConfigUtils.RABBIT_LISTENER_ENDPOINT_REGISTRY_BEAN_NAME)
  9. public RabbitListenerEndpointRegistry defaultRabbitListenerEndpointRegistry() {
  10. return new RabbitListenerEndpointRegistry();
  11. }
  12. }

        RabbitListenerAnnotationBeanPostProcessor用于处理@RabbitListener注解修饰的方法

        RabbitListenerEndpointRegistry用于创建和管理消息监听容器MessageListenerContainer,重点看这里。。。。。

        RabbitListenerEndpointRegistry类中有个registerListenerContainer注册消息监听容器的方法,该方法被RabbitListenerEndpointRegistrar的registerAllEndpoints调用,endpointDescriptors是前面的RabbitListenerAnnotationBeanPostProcessor获取的@RabbitListener注解修饰的消息消费处理的方法集合。

  1. @Override
  2. public void afterPropertiesSet() {
  3. registerAllEndpoints();
  4. }
  5. protected void registerAllEndpoints() {
  6. Assert.state(this.endpointRegistry != null, "No registry available");
  7. synchronized (this.endpointDescriptors) {
  8. for (AmqpListenerEndpointDescriptor descriptor : this.endpointDescriptors) {
  9. this.endpointRegistry.registerListenerContainer(// NOSONAR never null
  10. descriptor.endpoint, resolveContainerFactory(descriptor));
  11. }
  12. this.startImmediately = true; // trigger immediate startup
  13. }
  14. }

        这里不再深入探究具体的源码了,感兴趣的话可以自己翻看一下。大致调用顺序为:

  1. RabbitListenerAnnotationBeanPostProcessor.postProcessAfterInitialization(获取被@RabbitListener注解修饰的方法)————>RabbitListenerEndpointRegistrar.registerEndpoint(添加到endpointDescriptors list集合)
  2. RabbitListenerAnnotationBeanPostProcessor.afterSingletonsInstantiated()————>RabbitListenerEndpointRegistrar.afterPropertiesSet————>RabbitListenerEndpointRegistrar.registerAllEndpoints————>RabbitListenerEndpointRegistry.registerListenerContainer

         RabbitListenerEndpointRegistry.registerListenerContainer方法如下,将所有创建的消息监听容器MessageListenerContainer都放到了listenerContainers这个map中。

  1. private final Map<String, MessageListenerContainer> listenerContainers =
  2. new ConcurrentHashMap<String, MessageListenerContainer>();
  3. public void registerListenerContainer(RabbitListenerEndpoint endpoint, RabbitListenerContainerFactory<?> factory,
  4. boolean startImmediately) {
  5. Assert.notNull(endpoint, "Endpoint must not be null");
  6. Assert.notNull(factory, "Factory must not be null");
  7. String id = endpoint.getId();
  8. Assert.hasText(id, "Endpoint id must not be empty");
  9. synchronized (this.listenerContainers) {
  10. Assert.state(!this.listenerContainers.containsKey(id),
  11. "Another endpoint is already registered with id '" + id + "'");
  12. MessageListenerContainer container = createListenerContainer(endpoint, factory);
  13. this.listenerContainers.put(id, container);
  14. if (StringUtils.hasText(endpoint.getGroup()) && this.applicationContext != null) {
  15. List<MessageListenerContainer> containerGroup;
  16. if (this.applicationContext.containsBean(endpoint.getGroup())) {
  17. containerGroup = this.applicationContext.getBean(endpoint.getGroup(), List.class);
  18. }
  19. else {
  20. containerGroup = new ArrayList<MessageListenerContainer>();
  21. this.applicationContext.getBeanFactory().registerSingleton(endpoint.getGroup(), containerGroup);
  22. }
  23. containerGroup.add(container);
  24. }
  25. if (startImmediately) {
  26. startIfNecessary(container);
  27. }
  28. }
  29. }

        我们遍历listenerContainers就能拿到对应的消息监听容器MessageListenerContainer,然后调用MessageListenerContainer的setConcurrentConsumers、setMaxConcurrentConsumers方法就可以调整消费者数量了。

        RabbitListenerEndpointRegistry.getListenerContainers可以获取所有消费监听容器

  1. public void setConcurrentConsumers(final int concurrentConsumers) {
  2. Assert.isTrue(concurrentConsumers > 0, "'concurrentConsumers' value must be at least 1 (one)");
  3. Assert.isTrue(!isExclusive() || concurrentConsumers == 1,
  4. "When the consumer is exclusive, the concurrency must be 1");
  5. if (this.maxConcurrentConsumers != null) {
  6. Assert.isTrue(concurrentConsumers <= this.maxConcurrentConsumers,
  7. "'concurrentConsumers' cannot be more than 'maxConcurrentConsumers'");
  8. }
  9. synchronized (this.consumersMonitor) {
  10. if (logger.isDebugEnabled()) {
  11. logger.debug("Changing consumers from " + this.concurrentConsumers + " to " + concurrentConsumers);
  12. }
  13. int delta = this.concurrentConsumers - concurrentConsumers;
  14. this.concurrentConsumers = concurrentConsumers;
  15. if (isActive()) {
  16. adjustConsumers(delta);
  17. }
  18. }
  19. }
  20. public void setMaxConcurrentConsumers(int maxConcurrentConsumers) {
  21. Assert.isTrue(maxConcurrentConsumers >= this.concurrentConsumers,
  22. "'maxConcurrentConsumers' value must be at least 'concurrentConsumers'");
  23. Assert.isTrue(!isExclusive() || maxConcurrentConsumers == 1,
  24. "When the consumer is exclusive, the concurrency must be 1");
  25. Integer oldMax = this.maxConcurrentConsumers;
  26. this.maxConcurrentConsumers = maxConcurrentConsumers;
  27. if (oldMax != null && isActive()) {
  28. int delta = oldMax - maxConcurrentConsumers;
  29. if (delta > 0) { // only decrease, not increase
  30. adjustConsumers(delta);
  31. }
  32. }
  33. }

        废话不多说,直接上代码示例:

  1. @Resource
  2. RabbitListenerEndpointRegistry rabbitListenerEndpointRegistry;
  3. @RequestMapping(value = "/modifyMqConsumerNum")
  4. @ApiOperation(value = "更新队列消费者数量接口")
  5. public Response modifyMqConsumerNum(@RequestParam(value = "queueName", required = false) String queueName,
  6. @RequestParam(value = "concurrentConsumers") Integer concurrentConsumers,
  7. @RequestParam(value = "maxConcurrentConsumers") Integer maxConcurrentConsumers) {
  8. Collection<MessageListenerContainer> listenerContainers = rabbitListenerEndpointRegistry.getListenerContainers();
  9. for (MessageListenerContainer container : listenerContainers) {
  10. SimpleMessageListenerContainer con = (SimpleMessageListenerContainer) container;
  11. //消息监听容器要消费的队列名称集合
  12. List<String> queueNamesList = Arrays.asList(con.getQueueNames());
  13. //判断容器中的队列名称是否包含需要调整的队列名参数
  14. if (queueNamesList.contains(queueName)) {
  15. //注意先设置最大的消费者数量,再设置最小的消费者数量,因为先修改最小数量超过修改前的最大数量时会报异常修改失败
  16. con.setMaxConcurrentConsumers(maxConcurrentConsumers);
  17. con.setConcurrentConsumers(concurrentConsumers);
  18. }
  19. }
  20. return Response.success();
  21. }

        调用RabbitListenerEndpointRegistry.getListenerContainers获取所有消费者监听容器,判断是否包含要调整的队列名称,如果包含则进行调整。

        注意:先设置最大的消费者数量,再设置最小的消费者数量,因为先修改最小数量超过修改前的最大数量时会报异常修改失败。例如之前最小最大分别是2和4,如果先将最小改成5则会报参数异常,即最小数量超过了最大数量。

java.lang.IllegalArgumentException: 'concurrentConsumers' cannot be more than 'maxConcurrentConsumers'
    at org.springframework.util.Assert.isTrue(Assert.java:118)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.setConcurrentConsumers(SimpleMessageListenerContainer.java:161)

         另外,这里通过接口修改只能改服务的一个实例的消费者数量,生产上面一个服务都是集群部署的,可以结合配置中心(Nacos、Apollo等)进行处理。程序中监听配置中心的对应队列的消费者数量,如果数值发生了变化,则调用上面的方法进行变更就好了,这里就不再进行实现了。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/很楠不爱3/article/detail/515741
推荐阅读
相关标签
  

闽ICP备14008679号