赞
踩
目录
需求描述:原SpringBoot工程已经配置了一个RabbitMQ,现需求是再配置一个RabbitMQ,实现效果是不同RabbitMQ推送到不同的队列中,且互不干扰影响使用。
- <dependency>
- <groupId>com.rabbitmq</groupId>
- <artifactId>amqp-client</artifactId>
- <version>5.10.0</version>
- </dependency>
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-amqp</artifactId>
- <version>2.4.4</version>
- </dependency>
- rabbitmq:
- host: xx.xxx.xxx.xxx
- port: xxxx
- username: xxxx
- password: xxxxxx
- virtual-host: xxxx
- publisher-returns: true
- publisher-confirms: true
- listener:
- simple:
- default-requeue-rejected: true
- retry:
- enabled: false
- max-attempts: 3
- initial-interval: 5000
- package com.ruoyi.report.config;
-
- import org.springframework.amqp.core.DirectExchange;
- import org.springframework.context.annotation.Bean;
- import org.springframework.stereotype.Component;
-
- @Component
- public class ExchangeConfig {
-
- public static final String ecoa_exchange = "ecoaExchange";
-
- /**
- * 1.定义direct exchange
- * 2.durable="true" rabbitmq重启的时候不需要创建新的交换机
- * 3.direct交换器相对来说比较简单,匹配规则为:如果路由键匹配,消息就被投送到相关的队列
- */
- @Bean
- public DirectExchange ecoaExchange() {
- DirectExchange directExchange = new DirectExchange(ecoa_exchange, true, false);
- return directExchange;
- }
-
- }
![](https://csdnimg.cn/release/blogv2/dist/pc/img/newCodeMoreWhite.png)
- package com.ruoyi.report.config;
-
- import org.springframework.amqp.core.Queue;
- import org.springframework.context.annotation.Bean;
- import org.springframework.stereotype.Component;
-
- /**
- * @ClassName QueueConfig
- * @Description
- * @Author Mr.Huang
- * @Date 2023/9/22 16:26
- * @Version 1.0
- **/
- @Component
- public class QueueConfig {
-
- private static final String ecoa_file_upload_queue = "ecoa_file_upload_queue";
-
- @Bean
- public Queue ecoaFileUploadDispatchQueue() {
- /**
- durable="true" 持久化 rabbitmq重启的时候不需要创建新的队列
- auto-delete 表示消息队列没有在使用时将被自动删除 默认是false
- exclusive 表示该消息队列是否只在当前connection生效,默认是false
- */
- return new Queue(ecoa_file_upload_queue, true, false, false);
- }
- }
![](https://csdnimg.cn/release/blogv2/dist/pc/img/newCodeMoreWhite.png)
- package com.ruoyi.report.config;
-
- import org.springframework.amqp.core.Binding;
- import org.springframework.amqp.core.BindingBuilder;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.context.annotation.Bean;
- import org.springframework.stereotype.Component;
-
- /**
- * @ClassName BindingConfig
- * @Description
- * @Author Mr.Huang
- * @Date 2023/9/22 16:31
- * @Version 1.0
- **/
- @Component
- public class BindingConfig {
- @Autowired
- private QueueConfig queueConfig;
- @Autowired
- private ExchangeConfig exchangeConfig;
-
- public static final String ECOA_file_upload_key = "ecoa_file_upload_key";
-
-
- @Bean
- public Binding ecoaFileUploadDispatchBinding() {
- return BindingBuilder.bind(queueConfig.ecoaFileUploadDispatchQueue()).to(exchangeConfig.ecoaExchange()).with(ECOA_file_upload_key);
- }
- }
![](https://csdnimg.cn/release/blogv2/dist/pc/img/newCodeMoreWhite.png)
- package com.ruoyi.report.config;
-
- import org.springframework.amqp.rabbit.connection.ConnectionFactory;
- import org.springframework.amqp.rabbit.core.RabbitTemplate;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
-
- /**
- * @ClassName RabbitMqConfig
- * @Description
- * @Author Mr.Huang
- * @Date 2023/9/22 16:14
- * @Version 1.0
- **/
- @Configuration
- public class RabbitMqConfig {
- /**
- * 连接工厂
- */
- @Autowired
- private ConnectionFactory connectionFactory;
-
- /**
- * 自定义rabbit template用于数据的接收和发送
- * 可以设置消息确认机制和回调
- *
- * @return
- */
- @Bean
- public RabbitTemplate rabbitTemplate() {
- RabbitTemplate template = new RabbitTemplate(connectionFactory);
- return template;
- }
- }
![](https://csdnimg.cn/release/blogv2/dist/pc/img/newCodeMoreWhite.png)
- package com.ruoyi.report.utils;
-
- import com.alibaba.fastjson.JSON;
- import com.alibaba.fastjson.serializer.SerializerFeature;
- import com.ruoyi.common.utils.StringUtils;
- import com.ruoyi.report.config.BindingConfig;
- import com.ruoyi.report.config.ExchangeConfig;
- import org.springframework.amqp.core.Message;
- import org.springframework.amqp.core.MessageBuilder;
- import org.springframework.amqp.core.MessageProperties;
- import org.springframework.amqp.rabbit.connection.CorrelationData;
- import org.springframework.amqp.rabbit.core.RabbitTemplate;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.stereotype.Component;
-
- import java.util.UUID;
-
- /**
- * @ClassName MessageUtils
- * @Description
- * @Author Mr.Huang
- * @Date 2023/9/22 16:36
- * @Version 1.0
- **/
- @Component
- public class MessageUtils {
- @Autowired
- private RabbitTemplate rabbitTemplate;
-
- /**
- * 发送消息
- * 发送随货单信息
- * @param message 消息
- */
- public void sendMessage(Object message) {
- String uuid = UUID.randomUUID().toString();
- CorrelationData correlationId = new CorrelationData(uuid);
- Message msg = MessageBuilder.withBody(JSON.toJSONStringWithDateFormat(message, "yyyy-MM-dd HH:mm:ss", SerializerFeature.WriteNullStringAsEmpty).getBytes()).setContentType(MessageProperties.CONTENT_TYPE_JSON).build();
- rabbitTemplate.convertAndSend(ExchangeConfig.ecoa_exchange, BindingConfig.ECOA_file_upload_key, msg, correlationId);
- }
- }
![](https://csdnimg.cn/release/blogv2/dist/pc/img/newCodeMoreWhite.png)
- package com.ruoyi.report.consumer;
-
- import com.alibaba.fastjson.JSONObject;
- import com.rabbitmq.client.Channel;
- import com.ruoyi.report.config.RabbitMqConfig;
- import com.ruoyi.report.entity.open.PrintResult;
- import com.ruoyi.report.service.open.PrintSendLogService;
- import lombok.extern.slf4j.Slf4j;
- import org.springframework.amqp.core.Message;
- import org.springframework.amqp.rabbit.annotation.RabbitListener;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.stereotype.Component;
-
- import java.io.IOException;
-
- /**
- * @ClassName PrintFeedbackConsumer
- * @Description
- * @Author Mr.Huang
- * @Date 2024/4/30 10:23
- * @Version 1.0
- **/
- @Slf4j
- @Component
- public class PrintFeedbackConsumer {
-
- @Autowired
- private PrintSendLogService printSendLogService;
-
- @RabbitListener(queues = {RabbitMqConfig.print_4pl_dispatch_info_feedback_queue}, containerFactory = "printContainerFactory")
- public void receiveMq(Message message, Channel channel) {
- try {
- String body = new String(message.getBody());
- log.info("接受【Print结果推送】RabbitMQ消息:"+body);
- JSONObject objJson = JSONObject.parseObject(body);
- Thread.sleep(1000);
- PrintResult printResult = JSONObject.toJavaObject(objJson, PrintResult.class);
- printSendLogService.updatePrintSendLog(printResult);
- }catch (Exception e){
- log.error("",e);
- try {
- channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
- } catch (IOException ex) {
- ex.printStackTrace();
- }
- }
- }
- }
![](https://csdnimg.cn/release/blogv2/dist/pc/img/newCodeMoreWhite.png)
Maven坐标与上面单个RabbitMQ配置一致
- rabbitmq:
- first:
- host: xx.xxx.xxx.xxx
- port: xxxx
- username: xxxx
- password: xxxxxx
- virtual-host: xxxx
- publisher-returns: true
- publisher-confirms: true
- listener:
- simple:
- default-requeue-rejected: true
- retry:
- enabled: false
- max-attempts: 3
- initial-interval: 5000
- second:
- host: xx.xxx.xxx.xxx
- port: xxxx
- username: xxxx
- password: xxxxxx
- publisher-returns: true
- publisher-confirms: true
- virtual-host: xxxx
- listener:
- simple:
- default-requeue-rejected: true
- retry:
- enabled: false
- max-attempts: 3
- initial-interval: 5000
![](https://csdnimg.cn/release/blogv2/dist/pc/img/newCodeMoreWhite.png)
- package com.ruoyi.report.config;
-
- import com.rabbitmq.client.Channel;
- import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
- import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
- import org.springframework.amqp.rabbit.connection.Connection;
- import org.springframework.amqp.rabbit.connection.ConnectionFactory;
- import org.springframework.amqp.rabbit.core.RabbitTemplate;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.beans.factory.annotation.Qualifier;
- import org.springframework.beans.factory.annotation.Value;
- import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
- import org.springframework.context.annotation.Primary;
-
- /**
- * @ClassName RabbitMqConfig
- * @Description
- * @Author Mr.Huang
- * @Date 2023/9/22 16:14
- * @Version 1.0
- **/
- @Configuration
- public class RabbitMqConfig {
- // 第一个MQ电子药检队列与key
- public static final String ECOA_file_upload_queue = "ecoa_file_upload_queue";
- public static final String ECOA_file_upload_key = "ecoa_file_upload_key";
-
- // 第二个MQ单据打印平台队列与key
- public static final String print_tms_dispatch_info_queue = "print_tms_dispatch_info_queue";
- public static final String print_4pl_dispatch_info_feedback_queue = "print_4pl_dispatch_info_feedback_queue";
- public static final String print_tms_dispatch_info_key = "print_tms_dispatch_info_key";
- public static final String print_4pl_dispatch_info_feedback_key = "print_4pl_dispatch_info_feedback_key";
- /** 交换机名称 */
- public static final String EXCHANGE = "ecoaExchange";
- public static final String EXCHANGE2 = "tms_exchange";
-
- /** 第一个rabbitMq队列 */
- @Bean(name = "ECOAConnectionFactory")
- @Primary
- public ConnectionFactory ECOAConnectionFactory(@Value("${spring.rabbitmq.first.host}") String host,
- @Value("${spring.rabbitmq.first.port}") int port,
- @Value("${spring.rabbitmq.first.username}") String username,
- @Value("${spring.rabbitmq.first.password}") String password,
- @Value("${spring.rabbitmq.first.virtual-host}") String virtualHost) {
- CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
- connectionFactory.setHost(host);
- connectionFactory.setPort(port);
- connectionFactory.setUsername(username);
- connectionFactory.setPassword(password);
- connectionFactory.setVirtualHost(virtualHost);
- return connectionFactory;
- }
-
- /** 第二个rabbitMq队列 */
- @Bean(name = "printConnectionFactory")
- public ConnectionFactory printConnectionFactory(@Value("${spring.rabbitmq.second.host}") String host,
- @Value("${spring.rabbitmq.second.port}") int port,
- @Value("${spring.rabbitmq.second.username}") String username,
- @Value("${spring.rabbitmq.second.password}") String password,
- @Value("${spring.rabbitmq.second.virtual-host}") String virtualHost) {
- CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
- connectionFactory.setHost(host);
- connectionFactory.setPort(port);
- connectionFactory.setUsername(username);
- connectionFactory.setPassword(password);
- connectionFactory.setVirtualHost(virtualHost);
- return connectionFactory;
- }
- /** 第一个rabbitMq操作模板 */
- @Bean(name="ECOARabbitTemplate")
- @Primary
- public RabbitTemplate fplRabbitTemplate(@Qualifier("ECOAConnectionFactory") ConnectionFactory connectionFactory){
- RabbitTemplate firstRabbitTemplate = new RabbitTemplate(connectionFactory);
- return firstRabbitTemplate;
- }
- /** 第二个rabbitMq操作模板 */
- @Bean(name="printRabbitTemplate")
- public RabbitTemplate tcscRabbitTemplate(@Qualifier("printConnectionFactory") ConnectionFactory connectionFactory){
- RabbitTemplate secondRabbitTemplate = new RabbitTemplate(connectionFactory);
- return secondRabbitTemplate;
- }
- /** 第一个rabbitMq连接工厂 */
- @Bean(name="ECOAContainerFactory")
- public SimpleRabbitListenerContainerFactory ECOAContainerFactory(SimpleRabbitListenerContainerFactoryConfigurer configurer,
- @Qualifier("ECOAConnectionFactory") ConnectionFactory connectionFactory) {
- SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
- factory.setMaxConcurrentConsumers(5);
- factory.setConcurrentConsumers(1);
- factory.setPrefetchCount(1);
- configurer.configure(factory, connectionFactory);
- return factory;
- }
- /** 第二个rabbitMq连接工厂 */
- @Bean(name="printContainerFactory")
- public SimpleRabbitListenerContainerFactory printContainerFactory(SimpleRabbitListenerContainerFactoryConfigurer configurer,
- @Qualifier("printConnectionFactory") ConnectionFactory connectionFactory) {
- SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
- factory.setMaxConcurrentConsumers(5);
- factory.setConcurrentConsumers(1);
- factory.setPrefetchCount(1);
- configurer.configure(factory, connectionFactory);
- return factory;
- }
- /** 第一个mq绑定队列绑定交换机 */
- @Bean
- public String runECOAQueue(@Qualifier("ECOAConnectionFactory") ConnectionFactory connectionFactory) {
- System.out.println("configuration ECOAQueue ........................");
- Connection connection = connectionFactory.createConnection();
- Channel channel = connection.createChannel(false);
- try {
- channel.exchangeDeclare(EXCHANGE, "direct", true, false, null);
- // 单据推送电子药检队列
- channel.queueDeclare(ECOA_file_upload_queue, true, false, false, null);
- channel.queueBind(ECOA_file_upload_queue, EXCHANGE, ECOA_file_upload_key);
-
- } catch (Exception e) {
- e.printStackTrace();
- } finally {
- return "ECOAQueue";
- }
- }
- /** 第二个mq绑定队列绑定交换机 */
- @Bean
- public String runPrintQueue(@Qualifier("printConnectionFactory") ConnectionFactory connectionFactory) {
- System.out.println("configuration printQueue ........................");
- Connection connection = connectionFactory.createConnection();
- Channel channel = connection.createChannel(false);
- try {
- channel.exchangeDeclare(EXCHANGE2, "direct", true, false, null);
- // 单据推送单据打印平台队列
- channel.queueDeclare(print_tms_dispatch_info_queue, true, false, false, null);
- channel.queueBind(print_tms_dispatch_info_queue, EXCHANGE2, print_tms_dispatch_info_key);
- // 单据打印平台反馈队列
- channel.queueDeclare(print_4pl_dispatch_info_feedback_queue,true,false,false,null);
- channel.queueBind(print_4pl_dispatch_info_feedback_queue,EXCHANGE2,print_4pl_dispatch_info_feedback_key);
-
- } catch (Exception e) {
- e.printStackTrace();
- } finally {
- return "printQueue";
- }
- }
- }
![](https://csdnimg.cn/release/blogv2/dist/pc/img/newCodeMoreWhite.png)
注意:需将原MQ:交换机、队列、绑定配置类注释掉,只留这一个配置文件即可,这个配置文件已经将对应的:交换机、队列绑定好,只是需要注意队列名字、交换机不要绑定错了
- package com.ruoyi.report.utils;
-
- import com.alibaba.fastjson.JSON;
- import com.alibaba.fastjson.serializer.SerializerFeature;
- import com.ruoyi.report.config.RabbitMqConfig;
- import org.springframework.amqp.core.Message;
- import org.springframework.amqp.core.MessageBuilder;
- import org.springframework.amqp.core.MessageProperties;
- import org.springframework.amqp.rabbit.connection.CorrelationData;
- import org.springframework.amqp.rabbit.core.RabbitTemplate;
- import org.springframework.stereotype.Component;
-
- import javax.annotation.Resource;
- import java.util.UUID;
-
- /**
- * @ClassName MessageUtils
- * @Description
- * @Author Mr.Huang
- * @Date 2023/9/22 16:36
- * @Version 1.0
- **/
- @Component
- public class MessageUtils {
-
- @Resource(name = "ECOARabbitTemplate")
- private RabbitTemplate ECOARabbitTemplate;
-
- @Resource(name = "printRabbitTemplate")
- private RabbitTemplate printRabbitTemplate;
-
- /**
- * 向ECOA发送消息
- * 发送随货单信息
- * @param message 消息
- */
- public void sendMessage(Object message) {
- String uuid = UUID.randomUUID().toString();
- CorrelationData correlationId = new CorrelationData(uuid);
- Message msg = MessageBuilder.withBody(JSON.toJSONStringWithDateFormat(message, "yyyy-MM-dd HH:mm:ss", SerializerFeature.WriteNullStringAsEmpty).getBytes()).setContentType(MessageProperties.CONTENT_TYPE_JSON).build();
- ECOARabbitTemplate.convertAndSend(RabbitMqConfig.EXCHANGE, RabbitMqConfig.ECOA_file_upload_key, msg, correlationId);
- }
-
- /**
- * 向print发送消息
- * 发送派车单信息
- * @param message 消息
- */
- public void sendPrintMessage(Object message) {
- String uuid = UUID.randomUUID().toString();
- CorrelationData correlationId = new CorrelationData(uuid);
- Message msg = MessageBuilder.withBody(JSON.toJSONStringWithDateFormat(message, "yyyy-MM-dd HH:mm:ss", SerializerFeature.WriteNullStringAsEmpty).getBytes()).setContentType(MessageProperties.CONTENT_TYPE_JSON).build();
- printRabbitTemplate.convertAndSend(RabbitMqConfig.EXCHANGE2, RabbitMqConfig.print_tms_dispatch_info_key, msg, correlationId);
- }
- }
![](https://csdnimg.cn/release/blogv2/dist/pc/img/newCodeMoreWhite.png)
- package com.ruoyi.report.consumer;
-
- import com.alibaba.fastjson.JSONObject;
- import com.rabbitmq.client.Channel;
- import com.ruoyi.report.config.RabbitMqConfig;
- import com.ruoyi.report.entity.open.PrintResult;
- import com.ruoyi.report.service.open.PrintSendLogService;
- import lombok.extern.slf4j.Slf4j;
- import org.springframework.amqp.core.Message;
- import org.springframework.amqp.rabbit.annotation.RabbitListener;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.stereotype.Component;
-
- import java.io.IOException;
-
- /**
- * @ClassName PrintFeedbackConsumer
- * @Description
- * @Author Mr.Huang
- * @Date 2024/4/30 10:23
- * @Version 1.0
- **/
- @Slf4j
- @Component
- public class PrintFeedbackConsumer {
-
- @Autowired
- private PrintSendLogService printSendLogService;
-
- @RabbitListener(queues = {RabbitMqConfig.print_4pl_dispatch_info_feedback_queue}, containerFactory = "printContainerFactory")
- public void receiveMq(Message message, Channel channel) {
- try {
- String body = new String(message.getBody());
- log.info("接受【Print结果推送】RabbitMQ消息:"+body);
- JSONObject objJson = JSONObject.parseObject(body);
- Thread.sleep(1000);
- PrintResult printResult = JSONObject.toJavaObject(objJson, PrintResult.class);
- printSendLogService.updatePrintSendLog(printResult);
- }catch (Exception e){
- log.error("",e);
- try {
- channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
- } catch (IOException ex) {
- ex.printStackTrace();
- }
- }
- }
- }
![](https://csdnimg.cn/release/blogv2/dist/pc/img/newCodeMoreWhite.png)
与单个RabbitMQ消费者操作一致,只是注意要消费的队列和连接工厂不要搞错了
配置单个RabbitMQ时不需要关心底层的连接工厂是如何配置的,当把yaml内容填好它会自动配置连接工厂,只需要把交换机、队列、配置绑定起来即可。 当需要配置多个mq时才需要自己手动配置连接工厂,并不是只能配置两个RabbitMQ,可以按这个格式配置更多个。唯一注意的是不要把这些队列和交换机搞混了即可。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。