赞
踩
最近项目中需要用到了RabbitMQ
来监听消息队列,监听的消息队列的 虚拟主机(virtualHost
)和队列名(queueName
)是不一致的,但是接收到的消息格式相同的。而且可能还存在程序不停机的情况下,动态的增加新的队列(queue
)的监听,因此就需要我们自己在程序中实现一种方法实现动态配置RabbitMQ
。
我们有2
个RabbitMQ
的配置,在程序启动的时候,动态的配置好这2个RabbitMQ
,实现消息的监听。
RabbitMQ
的配置信息
host | port | username | password | virtualHost | queueName |
---|---|---|---|---|---|
47.101.130.164 | 5672 | rabbit-multi-01 | rabbit-multi-01 | /rabbit-multi-01 | queue-rabbit-multi-01 |
47.101.130.164 | 5672 | rabbit-multi-02 | rabbit-multi-02 | /rabbit-multi-02 | queue-rabbit-multi-02 |
包括 ConnectionFactory
,RabbitAdmin
,RabbitTemplate
,SimpleMessageListenerContainer
等
向Spring
容器中注入Bean
的方法
DefaultListableBeanFactory#registerSingleton
DefaultListableBeanFactory#registerBeanDefinition
<dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <optional>true</optional> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> </dependencies>
@Data
@NoArgsConstructor
@AllArgsConstructor
@Builder
public class RabbitProperties {
private String host;
private Integer port;
private String username;
private String password;
private String virtualHost;
private String queueName;
}
配置 ConnectionFactory
,RabbitAdmin
,RabbitTemplate
,SimpleMessageListenerContainer
等,并动态注入到Spring
容器中
@Configuration @RequiredArgsConstructor @Slf4j public class MultiRabbitMqConfig { private final DefaultListableBeanFactory defaultListableBeanFactory; private static Map<String, RabbitProperties> multiMqPropertiesMap = new HashMap<String, RabbitProperties>() { { put("first", RabbitProperties.builder() .host("47.101.130.164") .port(5672) .username("rabbit-multi-01") .password("rabbit-multi-01") .virtualHost("/rabbit-multi-01") .queueName("queue-rabbit-multi-01").build()); put("second", RabbitProperties.builder() .host("47.101.130.164") .port(5672) .username("rabbit-multi-02") .password("rabbit-multi-02") .virtualHost("/rabbit-multi-02") .queueName("queue-rabbit-multi-02").build()); } }; @PostConstruct public void initRabbitmq() { multiMqPropertiesMap.forEach((key, rabbitProperties) -> { AbstractBeanDefinition beanDefinition = BeanDefinitionBuilder.genericBeanDefinition(CachingConnectionFactory.class) .addPropertyValue("cacheMode", CachingConnectionFactory.CacheMode.CHANNEL) .addPropertyValue("host", rabbitProperties.getHost()) .addPropertyValue("port", rabbitProperties.getPort()) .addPropertyValue("username", rabbitProperties.getUsername()) .addPropertyValue("password", rabbitProperties.getPassword()) .addPropertyValue("virtualHost", rabbitProperties.getVirtualHost()) .getBeanDefinition(); String connectionFactoryName = String.format("%s%s", key, "ConnectionFactory"); defaultListableBeanFactory.registerBeanDefinition(connectionFactoryName, beanDefinition); CachingConnectionFactory connectionFactory = defaultListableBeanFactory.getBean(connectionFactoryName, CachingConnectionFactory.class); String rabbitAdminName = String.format("%s%s", key, "RabbitAdmin"); AbstractBeanDefinition rabbitAdminBeanDefinition = BeanDefinitionBuilder.genericBeanDefinition(RabbitAdmin.class) .addConstructorArgValue(connectionFactory) .addPropertyValue("autoStartup", true) .getBeanDefinition(); defaultListableBeanFactory.registerBeanDefinition(rabbitAdminName, rabbitAdminBeanDefinition); RabbitAdmin rabbitAdmin = defaultListableBeanFactory.getBean(rabbitAdminName, RabbitAdmin.class); log.info("rabbitAdmin:[{}]", rabbitAdmin); RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory); defaultListableBeanFactory.registerSingleton(String.format("%s%s", key, "RabbitTemplate"), rabbitTemplate); SimpleMessageListenerContainer simpleMessageListenerContainer = new SimpleMessageListenerContainer(connectionFactory); // 设置监听的队列 simpleMessageListenerContainer.setQueueNames(rabbitProperties.getQueueName()); // 指定要创建的并发使用者的数量,默认值是1,当并发高时可以增加这个的数值,同时下方max的数值也要增加 simpleMessageListenerContainer.setConcurrentConsumers(3); // 最大的并发消费者 simpleMessageListenerContainer.setMaxConcurrentConsumers(10); // 设置是否重回队列 simpleMessageListenerContainer.setDefaultRequeueRejected(false); // 设置签收模式 simpleMessageListenerContainer.setAcknowledgeMode(AcknowledgeMode.MANUAL); // 设置非独占模式 simpleMessageListenerContainer.setExclusive(false); // 设置consumer未被 ack 的消息个数 simpleMessageListenerContainer.setPrefetchCount(1); // 设置消息监听器 simpleMessageListenerContainer.setMessageListener((ChannelAwareMessageListener) (message, channel) -> { try { log.info("============> Thread:[{}] 接收到消息:[{}] ", Thread.currentThread().getName(), new String(message.getBody())); log.info("====>connection:[{}]", channel.getConnection()); channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } catch (Exception e) { log.error(e.getMessage(), e); // 发生异常此处需要捕获到 channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true); } }); /** * 1、simpleMessageListenerContainer.start(); * 2、simpleMessageListenerContainer.stop(); * 3、如果后期rabbitmq 的配置是从数据库中读取,即用户在页面上添加一个配置,就动态创建这个 * 此时就需要调用 SimpleMessageListenerContainer#start 方法 */ defaultListableBeanFactory.registerSingleton(String.format("%s%s", key, "SimpleMessageListenerContainer"), simpleMessageListenerContainer); }); new Thread(() -> { try { TimeUnit.SECONDS.sleep(3); } catch (InterruptedException e) { e.printStackTrace(); } RabbitTemplate firstRabbitTemplate = (RabbitTemplate) defaultListableBeanFactory.getBean("firstRabbitTemplate"); firstRabbitTemplate.convertAndSend("exchange-rabbit-multi-01", "", "first queue message"); log.info("over..."); }).start(); } }
https://gitee.com/huan1993/rabbitmq/tree/master/rabbitmq-springboot-multi
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。