赞
踩
有这样一个需求:在程序运行过程中需要动态添加Queue,并且要为每个新增的Queue动态添加Listener。Google了半天,发现大多数方案实现的是让已存在的Listener去监听新增的Queue,而不是动态增加Listener。
于是扒了下spring amqp的源代码,从RabbitListener找到RabbitListenerAnnotationBeanPostProcessor,再找到RabbitListenerEndpointRegistry,发现RabbitListenerEndpointRegistry可以注册Listener,如下:
- public void registerListenerContainer(RabbitListenerEndpoint endpoint, RabbitListenerContainerFactory<?> factory,
- boolean startImmediately)
试了一下,果然好使。至于如何动态添加Queue,网上的文章有很多,这里不多做描述;下面主要说明如何动态添加Listener:
- /**
-  * Core RabbitMQ configuration.
-  *
-  * <p>Declares the listener error handler, the connection factory, the listener container
-  * factory and the message-handler method factory as beans, and passes the handler method
-  * factory to the listener endpoint registrar.
-  *
-  * <p>{@code rabbitMqProperties}, {@code retryTemplate()} and the five-argument
-  * {@code connectionFactory(...)} overload are referenced here but belong to the part of
-  * the configuration that this excerpt omits.
-  */
- @Configuration
- public class MyMqConfig implements RabbitListenerConfigurer {
-
-     /**
-      * Error handler for listener failures: prints the stack trace of the failure and then
-      * rethrows the very same exception.
-      */
-     @Bean
-     public RabbitListenerErrorHandler rabbitListenerErrorHandler() {
-         return (rawMessage, convertedMessage, failure) -> {
-             failure.printStackTrace();
-             throw failure;
-         };
-     }
-
-     /**
-      * Builds the connection factory from the hostname, username, password, port and
-      * environment values exposed by {@code rabbitMqProperties}.
-      */
-     @Bean
-     public ConnectionFactory connectionFactory() {
-         return this.connectionFactory(
-                 rabbitMqProperties.getHostname(),
-                 rabbitMqProperties.getUsername(),
-                 rabbitMqProperties.getPasswd(),
-                 rabbitMqProperties.getPort(),
-                 rabbitMqProperties.getEnv());
-     }
-
-     /**
-      * Container factory shared by all listeners: wires in the connection factory, the
-      * consumer concurrency limits, {@code AcknowledgeMode.AUTO}, the advice returned by
-      * {@code retryTemplate()} and the prefetch count.
-      */
-     @Bean
-     public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() {
-         SimpleRabbitListenerContainerFactory containerFactory = new SimpleRabbitListenerContainerFactory();
-         containerFactory.setConnectionFactory(connectionFactory());
-         containerFactory.setConcurrentConsumers(rabbitMqProperties.getConcurrentConsumers());
-         containerFactory.setMaxConcurrentConsumers(rabbitMqProperties.getMaxConcurrentConsumers());
-         containerFactory.setAcknowledgeMode(AcknowledgeMode.AUTO);
-         containerFactory.setAdviceChain(retryTemplate());
-         containerFactory.setPrefetchCount(rabbitMqProperties.getPrefetchCount());
-         return containerFactory;
-     }
-
-     /**
-      * Handler method factory whose message converter is the Jackson converter declared
-      * by {@link #consumerJackson2MessageConverter()}.
-      */
-     @Bean
-     public MessageHandlerMethodFactory messageHandlerMethodFactory() {
-         DefaultMessageHandlerMethodFactory handlerFactory = new DefaultMessageHandlerMethodFactory();
-         handlerFactory.setMessageConverter(consumerJackson2MessageConverter());
-         return handlerFactory;
-     }
-
-     /** Jackson-based message converter used on the consumer side. */
-     @Bean
-     public MappingJackson2MessageConverter consumerJackson2MessageConverter() {
-         MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter();
-         return converter;
-     }
-
-     /** Registers the handler method factory bean with the listener endpoint registrar. */
-     @Override
-     public void configureRabbitListeners(RabbitListenerEndpointRegistrar registrar) {
-         MessageHandlerMethodFactory handlerFactory = messageHandlerMethodFactory();
-         registrar.setMessageHandlerMethodFactory(handlerFactory);
-     }
-
-     // Other configuration omitted.
-
- }
- /**
- * 监听方法类
- **/
- @Component
- public class MyQueueListener {
-
- // 这个方法用来处理消息,具体消息来自哪个queue,可以从参数queue中获取到
- public void testQueues(String message, @Header(AmqpHeaders.CONSUMER_QUEUE) String queue) {
- logger.debug("message..." + message + "...queue..." + queue);
- // 处理消息
- }
-
- }
- /**
-  * Adds RabbitMQ listeners at runtime.
-  *
-  * <p>Each call to {@link #addListener(String)} builds a method endpoint that targets
-  * {@code MyQueueListener#testQueues(String, String)} and registers it with the
-  * {@code RabbitListenerEndpointRegistry}, asking for the container to be started
-  * immediately.
-  */
- @Service
- public class MyListenerService {
-
-     /** Name of the handler method looked up reflectively on {@code MyQueueListener}. */
-     private static final String HANDLER_METHOD_NAME = "testQueues";
-
-     @Autowired
-     private SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory;
-
-     @Autowired
-     private RabbitListenerEndpointRegistry rabbitListenerEndpointRegistry;
-
-     @Autowired
-     private RabbitListenerErrorHandler rabbitListenerErrorHandler;
-
-     @Autowired
-     private MessageHandlerMethodFactory messageHandlerMethodFactory;
-
-     @Autowired
-     private MyQueueListener myQueueListener;
-
-     /**
-      * Registers and starts a listener for the given queue.
-      *
-      * <p>The queue name is also used as the endpoint id.
-      * NOTE(review): the registry presumably rejects a second registration with the same
-      * id - confirm before calling this twice for one queue.
-      *
-      * @param queueName name of the queue to listen on; must not be null or blank
-      * @throws IllegalArgumentException if {@code queueName} is null or blank
-      * @throws IllegalStateException    if the handler method cannot be found on
-      *                                  {@code MyQueueListener}
-      */
-     public void addListener(String queueName) {
-         if (queueName == null || queueName.trim().isEmpty()) {
-             throw new IllegalArgumentException("queueName must not be null or blank");
-         }
-         // Resolve the handler first so that a missing method aborts the registration
-         // instead of registering an endpoint with a null method.
-         Method handlerMethod = resolveHandlerMethod();
-
-         MethodRabbitListenerEndpoint endpoint = new MethodRabbitListenerEndpoint();
-         endpoint.setMessageHandlerMethodFactory(messageHandlerMethodFactory);
-         endpoint.setBean(myQueueListener);
-         endpoint.setId(queueName);
-         endpoint.setMethod(handlerMethod);
-         endpoint.setQueueNames(queueName);
-         endpoint.setErrorHandler(rabbitListenerErrorHandler);
-         rabbitListenerEndpointRegistry.registerListenerContainer(endpoint, rabbitListenerContainerFactory, true);
-     }
-
-     /** Looks up {@code MyQueueListener#testQueues(String, String)}, failing fast if it is missing. */
-     private static Method resolveHandlerMethod() {
-         try {
-             return MyQueueListener.class.getMethod(HANDLER_METHOD_NAME, String.class, String.class);
-         } catch (NoSuchMethodException e) {
-             throw new IllegalStateException(
-                     "Handler method MyQueueListener#" + HANDLER_METHOD_NAME + "(String, String) not found", e);
-         }
-     }
-
- }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。