
Dynamically Adding RabbitMQ Queues and Listeners in Spring Boot


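The requirement: while the application is running, queues have to be added dynamically, and each new queue needs its own dynamically added listener. After a long search on Google, I found that most solutions make an already existing listener listen to additional queues, rather than adding a new listener.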

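So I dug into the Spring AMQP source code. Starting from RabbitListener, I found RabbitListenerAnnotationBeanPostProcessor, and from there RabbitListenerEndpointRegistry. It turns out that RabbitListenerEndpointRegistry can register a listener through the following method: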

    public void registerListenerContainer(RabbitListenerEndpoint endpoint, RabbitListenerContainerFactory<?> factory,
            boolean startImmediately)

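I tried it and it worked. There are plenty of articles online about adding a queue dynamically, so I won't cover that here. The rest of this post shows how to add a listener dynamically: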

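    import org.springframework.amqp.core.AcknowledgeMode;
    import org.springframework.amqp.rabbit.annotation.RabbitListenerConfigurer;
    import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
    import org.springframework.amqp.rabbit.connection.ConnectionFactory;
    import org.springframework.amqp.rabbit.listener.RabbitListenerEndpointRegistrar;
    import org.springframework.amqp.rabbit.listener.api.RabbitListenerErrorHandler;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.messaging.converter.MappingJackson2MessageConverter;
    import org.springframework.messaging.handler.annotation.support.DefaultMessageHandlerMethodFactory;
    import org.springframework.messaging.handler.annotation.support.MessageHandlerMethodFactory;

    /**
     * Core configuration class
     **/
    @Configuration
    public class MyMqConfig implements RabbitListenerConfigurer {

        // Note: rabbitMqProperties, the connectionFactory(...) helper and retryTemplate()
        // used below belong to the configuration that is omitted from this listing.

        // Logs the failure and rethrows it so the container still sees the exception
        @Bean
        public RabbitListenerErrorHandler rabbitListenerErrorHandler() {
            return (amqpMessage, message, exception) -> {
                exception.printStackTrace();
                throw exception;
            };
        }

        @Bean
        public ConnectionFactory connectionFactory() {
            return this.connectionFactory(rabbitMqProperties.getHostname(), rabbitMqProperties.getUsername(),
                    rabbitMqProperties.getPasswd(), rabbitMqProperties.getPort(), rabbitMqProperties.getEnv());
        }

        // Container factory that is later used to build the container for each dynamic listener
        @Bean
        public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() {
            SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
            factory.setConnectionFactory(connectionFactory());
            factory.setConcurrentConsumers(rabbitMqProperties.getConcurrentConsumers());
            factory.setMaxConcurrentConsumers(rabbitMqProperties.getMaxConcurrentConsumers());
            factory.setAcknowledgeMode(AcknowledgeMode.AUTO);
            factory.setAdviceChain(retryTemplate());
            factory.setPrefetchCount(rabbitMqProperties.getPrefetchCount());
            return factory;
        }

        // Resolves handler method arguments (payload, @Header, ...) using the JSON converter
        @Bean
        public MessageHandlerMethodFactory messageHandlerMethodFactory() {
            DefaultMessageHandlerMethodFactory messageHandlerMethodFactory = new DefaultMessageHandlerMethodFactory();
            messageHandlerMethodFactory.setMessageConverter(consumerJackson2MessageConverter());
            return messageHandlerMethodFactory;
        }

        @Bean
        public MappingJackson2MessageConverter consumerJackson2MessageConverter() {
            return new MappingJackson2MessageConverter();
        }

        @Override
        public void configureRabbitListeners(RabbitListenerEndpointRegistrar registrar) {
            registrar.setMessageHandlerMethodFactory(messageHandlerMethodFactory());
        }

        // Other configuration omitted
    }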
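
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.amqp.support.AmqpHeaders;
    import org.springframework.messaging.handler.annotation.Header;
    import org.springframework.stereotype.Component;

    /**
     * Listener method class
     **/
    @Component
    public class MyQueueListener {
        private static final Logger logger = LoggerFactory.getLogger(MyQueueListener.class);

        // Handles messages; the queue a message came from is available in the queue parameter
        public void testQueues(String message, @Header(AmqpHeaders.CONSUMER_QUEUE) String queue) {
            logger.debug("message...{}...queue...{}", message, queue);
            // Process the message
        }
    }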
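
The listener service in this post resolves testQueues reflectively with Class.getMethod, which matches on the method name plus the exact parameter types and only sees public methods. Below is a minimal standalone sketch of that lookup, assuming a stand-in class (DemoListener) and a helper name (findHandler) that are purely illustrative and not part of the original code. It wraps NoSuchMethodException in an unchecked exception, so a wrong name or signature surfaces immediately.

```java
import java.lang.reflect.Method;

public class HandlerMethodLookup {

    // Hypothetical stand-in for the listener class; no Spring annotations so the sketch runs on its own.
    public static class DemoListener {
        public String testQueues(String message, String queue) {
            return queue + ":" + message;
        }

        // Package-private, so Class.getMethod cannot see it.
        void hidden(String message) {
        }
    }

    // Resolves a public handler method and fails fast instead of returning null.
    public static Method findHandler(Class<?> type, String name, Class<?>... parameterTypes) {
        try {
            return type.getMethod(name, parameterTypes);
        } catch (NoSuchMethodException e) {
            throw new IllegalStateException("No public method " + name + " on " + type.getName(), e);
        }
    }

    public static void main(String[] args) throws Exception {
        Method handler = findHandler(DemoListener.class, "testQueues", String.class, String.class);
        Object result = handler.invoke(new DemoListener(), "hello", "demo.queue");
        System.out.println(result); // prints "demo.queue:hello"
    }
}
```

The same lookup with only one String.class argument, or with the name of a non-public method, throws IllegalStateException rather than handing back a null Method.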
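
    import java.lang.reflect.Method;

    import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
    import org.springframework.amqp.rabbit.listener.MethodRabbitListenerEndpoint;
    import org.springframework.amqp.rabbit.listener.RabbitListenerEndpointRegistry;
    import org.springframework.amqp.rabbit.listener.api.RabbitListenerErrorHandler;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.messaging.handler.annotation.support.MessageHandlerMethodFactory;
    import org.springframework.stereotype.Service;

    /**
     * Adds listeners
     **/
    @Service
    public class MyListenerService {
        @Autowired
        private SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory;
        @Autowired
        private RabbitListenerEndpointRegistry rabbitListenerEndpointRegistry;
        @Autowired
        private RabbitListenerErrorHandler rabbitListenerErrorHandler;
        @Autowired
        private MessageHandlerMethodFactory messageHandlerMethodFactory;
        @Autowired
        private MyQueueListener myQueueListener;

        // Adds a listener for the given queue
        public void addListener(String queueName) {
            Method method;
            try {
                method = MyQueueListener.class.getMethod("testQueues", String.class, String.class);
            } catch (NoSuchMethodException e) {
                // Fail fast: an endpoint without a handler method cannot work
                throw new IllegalStateException("Handler method testQueues not found", e);
            }
            MethodRabbitListenerEndpoint endpoint = new MethodRabbitListenerEndpoint();
            endpoint.setMessageHandlerMethodFactory(messageHandlerMethodFactory);
            endpoint.setBean(myQueueListener);
            endpoint.setId(queueName); // the queue name doubles as the endpoint id
            endpoint.setMethod(method);
            endpoint.setQueueNames(queueName);
            endpoint.setErrorHandler(rabbitListenerErrorHandler);
            // true = start the container immediately after registration
            rabbitListenerEndpointRegistry.registerListenerContainer(endpoint, rabbitListenerContainerFactory, true);
        }
    }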
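
One thing to watch: addListener uses the queue name as the endpoint id, and as far as I can tell from the Spring AMQP source, RabbitListenerEndpointRegistry rejects a second registration under an id it already holds with an IllegalStateException. So calling addListener twice for the same queue would fail. The sketch below is one possible guard, not something from the original code: ListenerRegistrationGuard and registerOnce are hypothetical names, and the real registration is passed in as a plain callback so the example runs without Spring.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

public class ListenerRegistrationGuard {

    // Queue names for which a listener has already been registered.
    private final Set<String> registered = ConcurrentHashMap.newKeySet();

    // Callback that performs the real registration (assumption: e.g. myListenerService::addListener).
    private final Consumer<String> registrar;

    public ListenerRegistrationGuard(Consumer<String> registrar) {
        this.registrar = registrar;
    }

    // Invokes the registrar only the first time a queue name is seen.
    // Returns true if the registrar was invoked, false if the name was already known.
    public boolean registerOnce(String queueName) {
        if (!registered.add(queueName)) {
            return false;
        }
        try {
            registrar.accept(queueName);
            return true;
        } catch (RuntimeException e) {
            registered.remove(queueName); // allow a later retry if registration failed
            throw e;
        }
    }

    public static void main(String[] args) {
        ListenerRegistrationGuard guard =
                new ListenerRegistrationGuard(name -> System.out.println("registering " + name));
        System.out.println(guard.registerOnce("orders")); // prints "registering orders", then "true"
        System.out.println(guard.registerOnce("orders")); // prints "false"
    }
}
```

In the application, the callback would presumably be myListenerService::addListener. The set only tracks registrations made through this guard, so listeners registered elsewhere (for example via @RabbitListener) are not covered.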
