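Configuring a single RabbitMQ broker in a Spring Boot project is routine, but how do you configure two or more? This post walks through the code for configuring two RabbitMQ brokers (talk is cheap, show me the code).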
- <groupId>cn.honorzhang</groupId>
- <artifactId>my-springboot-rabbitmq</artifactId>
- <version>1.0.0</version>
- <modules>
- <module>my-springboot-rabbitmq-amqp</module>
- <module>my-springboot-rabbitmq-model</module>
- </modules>
- <packaging>pom</packaging>
- <parent>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-parent</artifactId>
- <version>2.2.7.RELEASE</version>
- </parent>
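As the pom above shows, the project has two modules. The model module defines the message model carried over MQ; incoming MQ messages are parsed and converted into this model. The amqp module holds the configuration for each of the two MQs, the configuration common to both, and the logic for sending and receiving messages. This example defines two MQs, each with its own producer and consumer.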
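The model class itself is not shown in this post. As a rough sketch, a plain-Java `RabbitMqMsg` could look like the following. The field names `msgId` and `msgText` come from the builder calls in the producer code; everything else (the hand-written builder, the getters and setters, the `toString` format) is an assumption. The original project may well generate these with Lombok instead.

```java
// Hypothetical plain-Java sketch of the message model from the model module.
// Package-private here so the snippet compiles on its own; in a real project
// it would normally be a public class in its own file.
class RabbitMqMsg {
    private String msgId;
    private String msgText;

    // No-argument constructor and setters so a JSON mapper can populate the object.
    public RabbitMqMsg() {
    }

    public RabbitMqMsg(String msgId, String msgText) {
        this.msgId = msgId;
        this.msgText = msgText;
    }

    // Entry point for the fluent style used by the producer:
    // RabbitMqMsg.builder().msgId(...).msgText(...).build()
    public static Builder builder() {
        return new Builder();
    }

    public String getMsgId() {
        return msgId;
    }

    public void setMsgId(String msgId) {
        this.msgId = msgId;
    }

    public String getMsgText() {
        return msgText;
    }

    public void setMsgText(String msgText) {
        this.msgText = msgText;
    }

    @Override
    public String toString() {
        return "RabbitMqMsg(msgId=" + msgId + ", msgText=" + msgText + ")";
    }

    // Minimal hand-written builder; each setter-like method returns the builder for chaining.
    public static final class Builder {
        private String msgId;
        private String msgText;

        public Builder msgId(String msgId) {
            this.msgId = msgId;
            return this;
        }

        public Builder msgText(String msgText) {
            this.msgText = msgText;
            return this;
        }

        public RabbitMqMsg build() {
            return new RabbitMqMsg(msgId, msgText);
        }
    }
}
```

Keeping the model in its own module means both the producer and the consumer depend on the same class, so the JSON written on one side matches what the other side expects.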
- spring.rabbitmq.first.host=${spring.rabbitmq.first.host}
- spring.rabbitmq.first.port=${spring.rabbitmq.first.port}
- spring.rabbitmq.first.user=${spring.rabbitmq.first.user}
- spring.rabbitmq.first.password=${spring.rabbitmq.first.password}
- spring.rabbitmq.first.virtual-host=${spring.rabbitmq.first.virtual-host}
- spring.rabbitmq.first.exchange=${spring.rabbitmq.first.exchange}
- spring.rabbitmq.first.routing.key=${spring.rabbitmq.first.routing.key}
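The values in this file are placeholders, so they must be supplied by a higher-precedence property source such as environment variables. The queue name is likewise configured as an environment variable, and the consumer's @RabbitListener reads that variable's value directly.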
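Spring Boot's documented rule for turning a property name into an environment variable name is: replace dots with underscores, remove dashes, and convert to uppercase. The sketch below only illustrates that rule so you can work out which variable to export; `EnvVarNames` and `toEnvVarName` are hypothetical names, not part of Spring.

```java
// Illustration of the naming rule only; Spring Boot performs the real lookup internally.
final class EnvVarNames {
    private EnvVarNames() {
    }

    // Hypothetical helper: property name -> environment variable name.
    static String toEnvVarName(String propertyName) {
        return propertyName
                .replace('.', '_')                   // dots become underscores
                .replace("-", "")                    // dashes are removed
                .toUpperCase(java.util.Locale.ROOT); // everything is uppercased
    }
}
```

Under this rule, `EnvVarNames.toEnvVarName("spring.rabbitmq.first.queue.name")` returns `SPRING_RABBITMQ_FIRST_QUEUE_NAME`, which would be the variable to set for the queue name read by the consumer.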
- @Configuration
- @PropertySource(value = "classpath:first-mq-config.properties")
- public class FirstMqConfig {
-
- @Bean(name = "firstConnectionFactory")
- @Primary
- public ConnectionFactory firstConnectionFactory(
- @Value("${spring.rabbitmq.first.host}") String host,
- @Value("${spring.rabbitmq.first.port}") String port,
- @Value("${spring.rabbitmq.first.user}") String username,
- @Value("${spring.rabbitmq.first.password}") String password,
- @Value("${spring.rabbitmq.first.virtual-host}") String virtualHost){
-
- return constructConnectionFactory(host, port, username, password, virtualHost);
- }
-
- @Bean(name = "firstRabbitTemplate")
- public RabbitTemplate firstRabbitTemplate(
- @Qualifier("firstConnectionFactory") ConnectionFactory connectionFactory
- ) {
- RabbitTemplate firstRabbitTemplate = new RabbitTemplate(connectionFactory);
- return firstRabbitTemplate;
- }
-
-
- // Listener container factory for the first MQ
- @Bean(name = "firstFactory")
- public SimpleRabbitListenerContainerFactory firstFactory(
- SimpleRabbitListenerContainerFactoryConfigurer configurer,
- @Qualifier("firstConnectionFactory") ConnectionFactory connectionFactory
- ) {
- SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
- configurer.configure(factory, connectionFactory);
- return factory;
- }
-
-
-
-
- private ConnectionFactory constructConnectionFactory(
- String host, String port, String username, String password, String virtualHost){
- CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
- connectionFactory.setHost(host);
- connectionFactory.setPort(Integer.parseInt(port));
- connectionFactory.setUsername(username);
- connectionFactory.setPassword(password);
- connectionFactory.setVirtualHost(virtualHost);
-
- return connectionFactory;
- }
-
- }

There is also configuration common to both MQs: serialization of outgoing messages and conversion of incoming messages into the model.
- @Configuration
- public class MqCommonConfig {
- @Bean
- public MessageConverter jackson2JsonMessageConverter(){
- return new Jackson2JsonMessageConverter();
- }
-
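- /**
- * Workaround: register this converter to handle JSON serialization and parsing;
- * Spring detects it automatically.
- * @param objectMapper the JSON serialization implementation
- * @return the message converter used for MQ messages
- */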
- @Bean
- public MessageConverter jsonMessageConverter(ObjectMapper objectMapper) {
- return new Jackson2JsonMessageConverter(objectMapper);
- }
- }

Producer code
- @Resource(name = "firstRabbitTemplate")
- private RabbitTemplate firstRabbitTemplate;
-
-
- @Value("${spring.rabbitmq.first.exchange:@null}")
- private String firstExchange;
-
- @Value("${spring.rabbitmq.first.routing.key:@null}")
- private String firstRoutingKeyRoute;
-
-
- public void sendMessageByFirstMq() {
- RabbitMqMsg rabbitMqMsg = constructMqMsg("first", "**********first**********");
- log.info("-------------------send first rabbit mq info-----------------");
- firstRabbitTemplate.convertAndSend(
- firstExchange, firstRoutingKeyRoute, rabbitMqMsg, generateMqHeader());
-
- }
-
-
- private RabbitMqMsg constructMqMsg(String id, String text) {
-
- return RabbitMqMsg.builder()
- .msgId(id)
- .msgText(text)
- .build();
-
- }
-
-
- private MessagePostProcessor generateMqHeader() {
- return message -> {
- MessageProperties properties = message.getMessageProperties();
- properties.setHeader("content-type", "application/json");
- properties.setHeader("content-encoding", "UTF-8");
- return message;
- };
- }

Consumer code
- @RabbitListener(queues = "${spring.rabbitmq.first.queue.name}", containerFactory="firstFactory")
- public void processFirstMqMessage(@Payload RabbitMqMsg message) {
- log.info("**********processMessage first message****************: {}", message);
- }
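The code in part three shows the configuration for only one MQ. Additional MQs follow the same pattern: give each one its own properties file and its own connection factory, template, and listener container factory beans under distinct names, and keep @Primary on only one connection factory. In this example the author configured two MQs, each sending and receiving messages; the final result is shown in the figure below.
The complete code for this post is available on GitHub.
This post gave a working example of configuring multiple MQs in Spring Boot. The code has run successfully in a real application, and discussion is welcome.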