赞
踩
某服务配置了两个RabbitMQ数据源,并且在这两个数据源中分别建立一个exchange以及queue,但启动服务后发现所有的exchange和queue都被创建到某一个数据源中,服务启动失败。
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory; import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Primary; @Configuration public class RabbitMQConfig { @Value("${spring.rabbitmq.concurrentConsumers}") int devAlarmConcurrentConsumers; @Value("${spring.rabbitmq.maxConcurrentConsumers}") int devAlarmMaxConcurrentConsumers; @Value("${spring.rabbitmq.addresses}") private String host; @Value("${spring.rabbitmq.port}") private int port; @Value("${spring.rabbitmq.username}") private String username; @Value("${spring.rabbitmq.password}") private String password; @Value("${spring.rabbitmq.virtual-host}") private String virtual_host; @Value("${spring.rabbitmq2.concurrentConsumers}") int concurrentConsumers2; @Value("${spring.rabbitmq2.maxConcurrentConsumers}") int maxConcurrentConsumers2; @Value("${spring.rabbitmq2.host}") private String host2; @Value("${spring.rabbitmq2.port}") private int port2; @Value("${spring.rabbitmq2.username}") private String username2; @Value("${spring.rabbitmq2.password}") private String password2; @Value("${spring.rabbitmq2.virtual-host}") private String virtual_host2; // ====================== 第一个mq数据源(主数据源) @Primary @Bean(name = "firstConnectionFactory") public ConnectionFactory firstConnectionFactory(){ CachingConnectionFactory connectionFactory = new CachingConnectionFactory(); connectionFactory.setHost(this.host); connectionFactory.setPort(this.port); 
connectionFactory.setUsername(this.username); connectionFactory.setPassword(this.password); connectionFactory.setVirtualHost(this.virtual_host); connectionFactory.setPublisherConfirms(true); connectionFactory.setPublisherReturns(true); return connectionFactory; } @Bean(name = "firstRabbitTemplate") public RabbitTemplate firstRabbitTemplate(@Qualifier("firstConnectionFactory") ConnectionFactory connectionFactory){ RabbitTemplate rabbtiTemplate = new RabbitTemplate(connectionFactory); return rabbtiTemplate; } @Bean(name = "firstContainerFactory") public SimpleRabbitListenerContainerFactory firstContainerFactory(SimpleRabbitListenerContainerFactoryConfigurer rabbitListenerContainerFactoryConfigurer, @Qualifier("firstConnectionFactory") ConnectionFactory connectionFactory) { SimpleRabbitListenerContainerFactory containerFactory=new SimpleRabbitListenerContainerFactory(); rabbitListenerContainerFactoryConfigurer.configure(containerFactory,connectionFactory); containerFactory.setConcurrentConsumers(devAlarmConcurrentConsumers); containerFactory.setMaxConcurrentConsumers(devAlarmMaxConcurrentConsumers); return containerFactory; } // ====================== 第二个mq数据源 @Bean(name = "secondConnectionFactory") public ConnectionFactory secondConnectionFactory(){ CachingConnectionFactory connectionFactory = new CachingConnectionFactory(); connectionFactory.setHost(this.host2); connectionFactory.setPort(this.port2); connectionFactory.setUsername(this.username2); connectionFactory.setPassword(this.password2); connectionFactory.setVirtualHost(this.virtual_host2); connectionFactory.setPublisherConfirms(true); connectionFactory.setPublisherReturns(true); return connectionFactory; } @Bean(name = "secondRabbitTemplate") public RabbitTemplate secondRabbitTemplate(@Qualifier("secondConnectionFactory") ConnectionFactory connectionFactory){ RabbitTemplate rabbtiTemplate = new RabbitTemplate(connectionFactory); return rabbtiTemplate; } @Bean(name = "secondContainerFactory") public 
SimpleRabbitListenerContainerFactory secondContainerFactory(SimpleRabbitListenerContainerFactoryConfigurer rabbitListenerContainerFactoryConfigurer, @Qualifier("secondConnectionFactory") ConnectionFactory connectionFactory) { SimpleRabbitListenerContainerFactory containerFactory=new SimpleRabbitListenerContainerFactory(); rabbitListenerContainerFactoryConfigurer.configure(containerFactory,connectionFactory); containerFactory.setConcurrentConsumers(concurrentConsumers2); containerFactory.setMaxConcurrentConsumers(maxConcurrentConsumers2); return containerFactory; } }
import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.ExchangeTypes; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.*; import org.springframework.stereotype.Component; @Slf4j @Component public class FirstReceiver { @RabbitListener(bindings = @QueueBinding( value = @Queue("test.first.queue"), exchange = @Exchange(name = "test.first.exchange", type = ExchangeTypes.TOPIC), key = "test.first" ), containerFactory = "firstContainerFactory") @RabbitHandler public void handleMsg(Message message) { // 代码逻辑... } }
import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.ExchangeTypes; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.*; import org.springframework.stereotype.Component; @Slf4j @Component public class SecondReceiver { @RabbitListener(bindings = @QueueBinding( value = @Queue("test.second.queue"), exchange = @Exchange(name = "test.second.exchange", type = ExchangeTypes.FANOUT) ), containerFactory = "secondContainerFactory") @RabbitHandler public void handleMsg(Message message) { // 代码逻辑... } }
有了当前错误现象,进行如下步骤的问题根源定位:
如图所示,所有的exchange和queue都被同一个MQ数据源创建了。
发现【org.springframework.amqp.rabbit.core.RabbitAdmin】类有相关加载exchange和queue的逻辑,如图所示:
在开发环境服务中打断点后发现:exchange和queue由谁创建,取决于是否指定了它们绑定到哪个MQ数据源;也就是说,需要通过【RabbitAdmin.java】实现【MQ数据源、exchange、queue】三者之间的绑定。如果没有定义RabbitAdmin,那么所有的exchange和queue默认都会由MQ主数据源创建并绑定。
MQ主数据源:在一个集成了多个MQ数据源的Springboot项目中,必须要有一个被@Primary注解的主数据源,否则项目启动失败。
在原 RabbitMQConfig 类的基础上新增两个RabbitAdmin Bean(下面只列出新增部分,原有内容保持不变):
/**
 * Declares one {@code RabbitAdmin} per RabbitMQ connection factory so that each admin
 * can be referenced by bean name.
 */
@Configuration
public class RabbitMQConfig {

    /**
     * Creates the admin that works against the first connection factory.
     *
     * @param connectionFactory the bean qualified as {@code firstConnectionFactory}
     * @return a new admin using that connection factory
     */
    @Bean("firstRabbitAdmin")
    public RabbitAdmin firstRabbitAdmin(
            @Qualifier("firstConnectionFactory") ConnectionFactory connectionFactory) {
        RabbitAdmin firstAdmin = new RabbitAdmin(connectionFactory);
        return firstAdmin;
    }

    /**
     * Creates the admin that works against the second connection factory.
     *
     * @param connectionFactory the bean qualified as {@code secondConnectionFactory}
     * @return a new admin using that connection factory
     */
    @Bean("secondRabbitAdmin")
    public RabbitAdmin secondRabbitAdmin(
            @Qualifier("secondConnectionFactory") ConnectionFactory connectionFactory) {
        RabbitAdmin secondAdmin = new RabbitAdmin(connectionFactory);
        return secondAdmin;
    }
}
在原 FirstReceiver 类的@RabbitListener注解中新增admins参数(指向firstRabbitAdmin):
import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.ExchangeTypes; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.*; import org.springframework.stereotype.Component; @Slf4j @Component public class FirstReceiver { @RabbitListener(bindings = @QueueBinding( value = @Queue(value = "test.first.queue",admins = "firstRabbitAdmin"), exchange = @Exchange(name = "test.first.exchange", type = ExchangeTypes.TOPIC,admins = "firstRabbitAdmin"), key = "test.first", admins = "firstRabbitAdmin" ), containerFactory = "firstContainerFactory") @RabbitHandler public void handleMsg(Message message) { // 代码逻辑... } }
在原 SecondReceiver 类的@RabbitListener注解中同样新增admins参数(指向secondRabbitAdmin):
import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.ExchangeTypes; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.*; import org.springframework.stereotype.Component; @Slf4j @Component public class SecondReceiver { @RabbitListener(bindings = @QueueBinding( value = @Queue(value = "test.second.queue",admins = "secondRabbitAdmin"), exchange = @Exchange(name = "test.second.exchange", type = ExchangeTypes.FANOUT,admins = "secondRabbitAdmin"), admins = "secondRabbitAdmin" ), containerFactory = "secondContainerFactory") @RabbitHandler public void handleMsg(Message message) { // 代码逻辑... } }
最终两个MQ数据源可以分别创建对应的exchange和queue了。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。