赞
踩
我们使用springboot集成rabbitmq时会配置消费者数量,然而我们想调整这个数量时却每次都要重启,这样就很麻烦。如果能在不重启服务的情况下,可以动态调整消费者数量的话就会是分方便了。
先看下springboot中关于rabbitmq的自动配置类,RabbitAutoConfiguration,
- @Configuration
- @ConditionalOnClass({ RabbitTemplate.class, Channel.class })
- @EnableConfigurationProperties(RabbitProperties.class)
- @Import(RabbitAnnotationDrivenConfiguration.class)
- public class RabbitAutoConfiguration {
- 。。。。。。
- }
@Import导入了RabbitAnnotationDrivenConfiguration
- @Configuration
- @ConditionalOnClass(EnableRabbit.class)
- class RabbitAnnotationDrivenConfiguration {
- ......
- }
RabbitAnnotationDrivenConfiguration上面有个EnableRabbit,打开看一下EnableRabbit是一个注解,里面又导入了RabbitBootstrapConfiguration
- @Target(ElementType.TYPE)
- @Retention(RetentionPolicy.RUNTIME)
- @Documented
- @Import(RabbitBootstrapConfiguration.class)
- public @interface EnableRabbit {
- }
RabbitBootstrapConfiguration类内容如下:
- @Configuration
- public class RabbitBootstrapConfiguration {
-
- @Bean(name = RabbitListenerConfigUtils.RABBIT_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME)
- @Role(BeanDefinition.ROLE_INFRASTRUCTURE)
- public RabbitListenerAnnotationBeanPostProcessor rabbitListenerAnnotationProcessor() {
- return new RabbitListenerAnnotationBeanPostProcessor();
- }
-
- @Bean(name = RabbitListenerConfigUtils.RABBIT_LISTENER_ENDPOINT_REGISTRY_BEAN_NAME)
- public RabbitListenerEndpointRegistry defaultRabbitListenerEndpointRegistry() {
- return new RabbitListenerEndpointRegistry();
- }
-
- }
RabbitListenerAnnotationBeanPostProcessor用于处理@RabbitListener注解修饰的方法
RabbitListenerEndpointRegistry用于创建和管理消息监听容器MessageListenerContainer,重点看这里。。。。。
RabbitListenerEndpointRegistry类中有个registerListenerContainer注册消息监听容器的方法,该方法被RabbitListenerEndpointRegistrar的registerAllEndpoints调用,endpointDescriptors是前面的RabbitListenerAnnotationBeanPostProcessor获取的@RabbitListener注解修饰的消息消费处理的方法集合。
- @Override
- public void afterPropertiesSet() {
- registerAllEndpoints();
- }
-
- protected void registerAllEndpoints() {
- Assert.state(this.endpointRegistry != null, "No registry available");
- synchronized (this.endpointDescriptors) {
- for (AmqpListenerEndpointDescriptor descriptor : this.endpointDescriptors) {
- this.endpointRegistry.registerListenerContainer(// NOSONAR never null
- descriptor.endpoint, resolveContainerFactory(descriptor));
- }
- this.startImmediately = true; // trigger immediate startup
- }
- }
这里不再深入探究具体的源码了,感兴趣的话可以自己翻看一下。大致调用顺序为:
RabbitListenerEndpointRegistry.registerListenerContainer方法如下,将所有创建的消息监听容器MessageListenerContainer都放到了listenerContainers这个map中。
- private final Map<String, MessageListenerContainer> listenerContainers =
- new ConcurrentHashMap<String, MessageListenerContainer>();
- public void registerListenerContainer(RabbitListenerEndpoint endpoint, RabbitListenerContainerFactory<?> factory,
- boolean startImmediately) {
- Assert.notNull(endpoint, "Endpoint must not be null");
- Assert.notNull(factory, "Factory must not be null");
-
- String id = endpoint.getId();
- Assert.hasText(id, "Endpoint id must not be empty");
- synchronized (this.listenerContainers) {
- Assert.state(!this.listenerContainers.containsKey(id),
- "Another endpoint is already registered with id '" + id + "'");
- MessageListenerContainer container = createListenerContainer(endpoint, factory);
- this.listenerContainers.put(id, container);
- if (StringUtils.hasText(endpoint.getGroup()) && this.applicationContext != null) {
- List<MessageListenerContainer> containerGroup;
- if (this.applicationContext.containsBean(endpoint.getGroup())) {
- containerGroup = this.applicationContext.getBean(endpoint.getGroup(), List.class);
- }
- else {
- containerGroup = new ArrayList<MessageListenerContainer>();
- this.applicationContext.getBeanFactory().registerSingleton(endpoint.getGroup(), containerGroup);
- }
- containerGroup.add(container);
- }
- if (startImmediately) {
- startIfNecessary(container);
- }
- }
- }
我们遍历listenerContainers就能拿到对应的消息监听容器MessageListenerContainer,然后调用MessageListenerContainer的setConcurrentConsumers、setMaxConcurrentConsumers方法就可以调整消费者数量了。
RabbitListenerEndpointRegistry.getListenerContainers可以获取所有消费监听容器
- public void setConcurrentConsumers(final int concurrentConsumers) {
- Assert.isTrue(concurrentConsumers > 0, "'concurrentConsumers' value must be at least 1 (one)");
- Assert.isTrue(!isExclusive() || concurrentConsumers == 1,
- "When the consumer is exclusive, the concurrency must be 1");
- if (this.maxConcurrentConsumers != null) {
- Assert.isTrue(concurrentConsumers <= this.maxConcurrentConsumers,
- "'concurrentConsumers' cannot be more than 'maxConcurrentConsumers'");
- }
- synchronized (this.consumersMonitor) {
- if (logger.isDebugEnabled()) {
- logger.debug("Changing consumers from " + this.concurrentConsumers + " to " + concurrentConsumers);
- }
- int delta = this.concurrentConsumers - concurrentConsumers;
- this.concurrentConsumers = concurrentConsumers;
- if (isActive()) {
- adjustConsumers(delta);
- }
- }
- }
-
-
- public void setMaxConcurrentConsumers(int maxConcurrentConsumers) {
- Assert.isTrue(maxConcurrentConsumers >= this.concurrentConsumers,
- "'maxConcurrentConsumers' value must be at least 'concurrentConsumers'");
- Assert.isTrue(!isExclusive() || maxConcurrentConsumers == 1,
- "When the consumer is exclusive, the concurrency must be 1");
- Integer oldMax = this.maxConcurrentConsumers;
- this.maxConcurrentConsumers = maxConcurrentConsumers;
- if (oldMax != null && isActive()) {
- int delta = oldMax - maxConcurrentConsumers;
- if (delta > 0) { // only decrease, not increase
- adjustConsumers(delta);
- }
- }
-
- }
废话不多说,直接上代码示例:
- @Resource
- RabbitListenerEndpointRegistry rabbitListenerEndpointRegistry;
-
- @RequestMapping(value = "/modifyMqConsumerNum")
- @ApiOperation(value = "更新队列消费者数量接口")
- public Response modifyMqConsumerNum(@RequestParam(value = "queueName", required = false) String queueName,
- @RequestParam(value = "concurrentConsumers") Integer concurrentConsumers,
- @RequestParam(value = "maxConcurrentConsumers") Integer maxConcurrentConsumers) {
- Collection<MessageListenerContainer> listenerContainers = rabbitListenerEndpointRegistry.getListenerContainers();
- for (MessageListenerContainer container : listenerContainers) {
- SimpleMessageListenerContainer con = (SimpleMessageListenerContainer) container;
- //消息监听容器要消费的队列名称集合
- List<String> queueNamesList = Arrays.asList(con.getQueueNames());
- //判断容器中的队列名称是否包含需要调整的队列名参数
- if (queueNamesList.contains(queueName)) {
- //注意先设置最大的消费者数量,再设置最小的消费者数量,因为先修改最小数量超过修改前的最大数量时会报异常修改失败
- con.setMaxConcurrentConsumers(maxConcurrentConsumers);
- con.setConcurrentConsumers(concurrentConsumers);
- }
- }
- return Response.success();
- }
调用RabbitListenerEndpointRegistry.getListenerContainers获取所有消费者监听容器,判断是否包含要调整的队列名称,如果包含则进行调整。
注意:先设置最大的消费者数量,再设置最小的消费者数量,因为先修改最小数量超过修改前的最大数量时会报异常修改失败。例如之前最小最大分别是2和4,如果先将最小改成5则会报参数异常,即最小数量超过了最大数量。
java.lang.IllegalArgumentException: 'concurrentConsumers' cannot be more than 'maxConcurrentConsumers'
at org.springframework.util.Assert.isTrue(Assert.java:118)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.setConcurrentConsumers(SimpleMessageListenerContainer.java:161)
另外,这里通过接口修改只能改服务的一个实例的消费者数量,生产上面一个服务都是集群部署的,可以结合配置中心(Nacos、Apollo等)进行处理。程序中监听配置中心的对应队列的消费者数量,如果数值发生了变化,则调用上面的方法进行变更就好了,这里就不再进行实现了。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。