当前位置:   article > 正文

2024/5/13 SpringBoot配置多个RabbitMQ_springboot 多个rabbitmq

springboot 多个rabbitmq

目录

 一、单个RabbitMQ配置

        1.1、导入Maven坐标

        1.2、yaml配置

        1.3、java配置类

                1.3.1、交换机配置

                1.3.2、队列配置

                1.3.3、绑定配置

                1.3.4、连接配置

        1.4、生产者与消费者操作配置

                1.4.1、生产者操作配置

                1.4.2、消费者操作配置

二、多个RabbitMQ配置

        2.1、yaml配置 

        2.2、java配置类

         2.3、生产者与消费者操作配置

                2.3.1、生产者操作配置

                2.3.2、消费者操作配置

三、总结 


需求描述:原SpringBoot工程已经配置了一个RabbitMQ,现需求是再配置一个RabbitMQ,实现效果是不同RabbitMQ推送到不同的队列中,且互不干扰影响使用。

 一、单个RabbitMQ配置

        1.1、导入Maven坐标

  1. <dependency>
  2. <groupId>com.rabbitmq</groupId>
  3. <artifactId>amqp-client</artifactId>
  4. <version>5.10.0</version>
  5. </dependency>
  6. <dependency>
  7. <groupId>org.springframework.boot</groupId>
  8. <artifactId>spring-boot-starter-amqp</artifactId>
  9. <version>2.4.4</version>
  10. </dependency>

        1.2、yaml配置

  1. rabbitmq:
  2. host: xx.xxx.xxx.xxx
  3. port: xxxx
  4. username: xxxx
  5. password: xxxxxx
  6. virtual-host: xxxx
  7. publisher-returns: true
  8. publisher-confirms: true
  9. listener:
  10. simple:
  11. default-requeue-rejected: true
  12. retry:
  13. enabled: false
  14. max-attempts: 3
  15. initial-interval: 5000

        1.3、java配置类

                1.3.1、交换机配置

  1. package com.ruoyi.report.config;
  2. import org.springframework.amqp.core.DirectExchange;
  3. import org.springframework.context.annotation.Bean;
  4. import org.springframework.stereotype.Component;
  5. @Component
  6. public class ExchangeConfig {
  7. public static final String ecoa_exchange = "ecoaExchange";
  8. /**
  9. * 1.定义direct exchange
  10. * 2.durable="true" rabbitmq重启的时候不需要创建新的交换机
  11. * 3.direct交换器相对来说比较简单,匹配规则为:如果路由键匹配,消息就被投送到相关的队列
  12. */
  13. @Bean
  14. public DirectExchange ecoaExchange() {
  15. DirectExchange directExchange = new DirectExchange(ecoa_exchange, true, false);
  16. return directExchange;
  17. }
  18. }

                1.3.2、队列配置

  1. package com.ruoyi.report.config;
  2. import org.springframework.amqp.core.Queue;
  3. import org.springframework.context.annotation.Bean;
  4. import org.springframework.stereotype.Component;
  5. /**
  6. * @ClassName QueueConfig
  7. * @Description
  8. * @Author Mr.Huang
  9. * @Date 2023/9/22 16:26
  10. * @Version 1.0
  11. **/
  12. @Component
  13. public class QueueConfig {
  14. private static final String ecoa_file_upload_queue = "ecoa_file_upload_queue";
  15. @Bean
  16. public Queue ecoaFileUploadDispatchQueue() {
  17. /**
  18. durable="true" 持久化 rabbitmq重启的时候不需要创建新的队列
  19. auto-delete 表示消息队列没有在使用时将被自动删除 默认是false
  20. exclusive 表示该消息队列是否只在当前connection生效,默认是false
  21. */
  22. return new Queue(ecoa_file_upload_queue, true, false, false);
  23. }
  24. }

                1.3.3、绑定配置

  1. package com.ruoyi.report.config;
  2. import org.springframework.amqp.core.Binding;
  3. import org.springframework.amqp.core.BindingBuilder;
  4. import org.springframework.beans.factory.annotation.Autowired;
  5. import org.springframework.context.annotation.Bean;
  6. import org.springframework.stereotype.Component;
  7. /**
  8. * @ClassName BindingConfig
  9. * @Description
  10. * @Author Mr.Huang
  11. * @Date 2023/9/22 16:31
  12. * @Version 1.0
  13. **/
  14. @Component
  15. public class BindingConfig {
  16. @Autowired
  17. private QueueConfig queueConfig;
  18. @Autowired
  19. private ExchangeConfig exchangeConfig;
  20. public static final String ECOA_file_upload_key = "ecoa_file_upload_key";
  21. @Bean
  22. public Binding ecoaFileUploadDispatchBinding() {
  23. return BindingBuilder.bind(queueConfig.ecoaFileUploadDispatchQueue()).to(exchangeConfig.ecoaExchange()).with(ECOA_file_upload_key);
  24. }
  25. }

                1.3.4、连接配置

  1. package com.ruoyi.report.config;
  2. import org.springframework.amqp.rabbit.connection.ConnectionFactory;
  3. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  4. import org.springframework.beans.factory.annotation.Autowired;
  5. import org.springframework.context.annotation.Bean;
  6. import org.springframework.context.annotation.Configuration;
  7. /**
  8. * @ClassName RabbitMqConfig
  9. * @Description
  10. * @Author Mr.Huang
  11. * @Date 2023/9/22 16:14
  12. * @Version 1.0
  13. **/
  14. @Configuration
  15. public class RabbitMqConfig {
  16. /**
  17. * 连接工厂
  18. */
  19. @Autowired
  20. private ConnectionFactory connectionFactory;
  21. /**
  22. * 自定义rabbit template用于数据的接收和发送
  23. * 可以设置消息确认机制和回调
  24. *
  25. * @return
  26. */
  27. @Bean
  28. public RabbitTemplate rabbitTemplate() {
  29. RabbitTemplate template = new RabbitTemplate(connectionFactory);
  30. return template;
  31. }
  32. }

        1.4、生产者与消费者操作配置

                1.4.1、生产者操作配置

  1. package com.ruoyi.report.utils;
  2. import com.alibaba.fastjson.JSON;
  3. import com.alibaba.fastjson.serializer.SerializerFeature;
  4. import com.ruoyi.common.utils.StringUtils;
  5. import com.ruoyi.report.config.BindingConfig;
  6. import com.ruoyi.report.config.ExchangeConfig;
  7. import org.springframework.amqp.core.Message;
  8. import org.springframework.amqp.core.MessageBuilder;
  9. import org.springframework.amqp.core.MessageProperties;
  10. import org.springframework.amqp.rabbit.connection.CorrelationData;
  11. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  12. import org.springframework.beans.factory.annotation.Autowired;
  13. import org.springframework.stereotype.Component;
  14. import java.util.UUID;
  15. /**
  16. * @ClassName MessageUtils
  17. * @Description
  18. * @Author Mr.Huang
  19. * @Date 2023/9/22 16:36
  20. * @Version 1.0
  21. **/
  22. @Component
  23. public class MessageUtils {
  24. @Autowired
  25. private RabbitTemplate rabbitTemplate;
  26. /**
  27. * 发送消息
  28. * 发送随货单信息
  29. * @param message 消息
  30. */
  31. public void sendMessage(Object message) {
  32. String uuid = UUID.randomUUID().toString();
  33. CorrelationData correlationId = new CorrelationData(uuid);
  34. Message msg = MessageBuilder.withBody(JSON.toJSONStringWithDateFormat(message, "yyyy-MM-dd HH:mm:ss", SerializerFeature.WriteNullStringAsEmpty).getBytes()).setContentType(MessageProperties.CONTENT_TYPE_JSON).build();
  35. rabbitTemplate.convertAndSend(ExchangeConfig.ecoa_exchange, BindingConfig.ECOA_file_upload_key, msg, correlationId);
  36. }
  37. }

                1.4.2、消费者操作配置

  1. package com.ruoyi.report.consumer;
  2. import com.alibaba.fastjson.JSONObject;
  3. import com.rabbitmq.client.Channel;
  4. import com.ruoyi.report.config.RabbitMqConfig;
  5. import com.ruoyi.report.entity.open.PrintResult;
  6. import com.ruoyi.report.service.open.PrintSendLogService;
  7. import lombok.extern.slf4j.Slf4j;
  8. import org.springframework.amqp.core.Message;
  9. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  10. import org.springframework.beans.factory.annotation.Autowired;
  11. import org.springframework.stereotype.Component;
  12. import java.io.IOException;
  13. /**
  14. * @ClassName PrintFeedbackConsumer
  15. * @Description
  16. * @Author Mr.Huang
  17. * @Date 2024/4/30 10:23
  18. * @Version 1.0
  19. **/
  20. @Slf4j
  21. @Component
  22. public class PrintFeedbackConsumer {
  23. @Autowired
  24. private PrintSendLogService printSendLogService;
  25. @RabbitListener(queues = {RabbitMqConfig.print_4pl_dispatch_info_feedback_queue}, containerFactory = "printContainerFactory")
  26. public void receiveMq(Message message, Channel channel) {
  27. try {
  28. String body = new String(message.getBody());
  29. log.info("接受【Print结果推送】RabbitMQ消息:"+body);
  30. JSONObject objJson = JSONObject.parseObject(body);
  31. Thread.sleep(1000);
  32. PrintResult printResult = JSONObject.toJavaObject(objJson, PrintResult.class);
  33. printSendLogService.updatePrintSendLog(printResult);
  34. }catch (Exception e){
  35. log.error("",e);
  36. try {
  37. channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
  38. } catch (IOException ex) {
  39. ex.printStackTrace();
  40. }
  41. }
  42. }
  43. }

二、多个RabbitMQ配置

        Maven坐标与上面单个RabbitMQ配置一致

        2.1、yaml配置 

  1. rabbitmq:
  2. first:
  3. host: xx.xxx.xxx.xxx
  4. port: xxxx
  5. username: xxxx
  6. password: xxxxxx
  7. virtual-host: xxxx
  8. publisher-returns: true
  9. publisher-confirms: true
  10. listener:
  11. simple:
  12. default-requeue-rejected: true
  13. retry:
  14. enabled: false
  15. max-attempts: 3
  16. initial-interval: 5000
  17. second:
  18. host: xx.xxx.xxx.xxx
  19. port: xxxx
  20. username: xxxx
  21. password: xxxxxx
  22. publisher-returns: true
  23. publisher-confirms: true
  24. virtual-host: xxxx
  25. listener:
  26. simple:
  27. default-requeue-rejected: true
  28. retry:
  29. enabled: false
  30. max-attempts: 3
  31. initial-interval: 5000

        2.2、java配置类

  1. package com.ruoyi.report.config;
  2. import com.rabbitmq.client.Channel;
  3. import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
  4. import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
  5. import org.springframework.amqp.rabbit.connection.Connection;
  6. import org.springframework.amqp.rabbit.connection.ConnectionFactory;
  7. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  8. import org.springframework.beans.factory.annotation.Autowired;
  9. import org.springframework.beans.factory.annotation.Qualifier;
  10. import org.springframework.beans.factory.annotation.Value;
  11. import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
  12. import org.springframework.context.annotation.Bean;
  13. import org.springframework.context.annotation.Configuration;
  14. import org.springframework.context.annotation.Primary;
  15. /**
  16. * @ClassName RabbitMqConfig
  17. * @Description
  18. * @Author Mr.Huang
  19. * @Date 2023/9/22 16:14
  20. * @Version 1.0
  21. **/
  22. @Configuration
  23. public class RabbitMqConfig {
  24. // 第一个MQ电子药检队列与key
  25. public static final String ECOA_file_upload_queue = "ecoa_file_upload_queue";
  26. public static final String ECOA_file_upload_key = "ecoa_file_upload_key";
  27. // 第二个MQ单据打印平台队列与key
  28. public static final String print_tms_dispatch_info_queue = "print_tms_dispatch_info_queue";
  29. public static final String print_4pl_dispatch_info_feedback_queue = "print_4pl_dispatch_info_feedback_queue";
  30. public static final String print_tms_dispatch_info_key = "print_tms_dispatch_info_key";
  31. public static final String print_4pl_dispatch_info_feedback_key = "print_4pl_dispatch_info_feedback_key";
  32. /** 交换机名称 */
  33. public static final String EXCHANGE = "ecoaExchange";
  34. public static final String EXCHANGE2 = "tms_exchange";
  35. /** 第一个rabbitMq队列 */
  36. @Bean(name = "ECOAConnectionFactory")
  37. @Primary
  38. public ConnectionFactory ECOAConnectionFactory(@Value("${spring.rabbitmq.first.host}") String host,
  39. @Value("${spring.rabbitmq.first.port}") int port,
  40. @Value("${spring.rabbitmq.first.username}") String username,
  41. @Value("${spring.rabbitmq.first.password}") String password,
  42. @Value("${spring.rabbitmq.first.virtual-host}") String virtualHost) {
  43. CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
  44. connectionFactory.setHost(host);
  45. connectionFactory.setPort(port);
  46. connectionFactory.setUsername(username);
  47. connectionFactory.setPassword(password);
  48. connectionFactory.setVirtualHost(virtualHost);
  49. return connectionFactory;
  50. }
  51. /** 第二个rabbitMq队列 */
  52. @Bean(name = "printConnectionFactory")
  53. public ConnectionFactory printConnectionFactory(@Value("${spring.rabbitmq.second.host}") String host,
  54. @Value("${spring.rabbitmq.second.port}") int port,
  55. @Value("${spring.rabbitmq.second.username}") String username,
  56. @Value("${spring.rabbitmq.second.password}") String password,
  57. @Value("${spring.rabbitmq.second.virtual-host}") String virtualHost) {
  58. CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
  59. connectionFactory.setHost(host);
  60. connectionFactory.setPort(port);
  61. connectionFactory.setUsername(username);
  62. connectionFactory.setPassword(password);
  63. connectionFactory.setVirtualHost(virtualHost);
  64. return connectionFactory;
  65. }
  66. /** 第一个rabbitMq操作模板 */
  67. @Bean(name="ECOARabbitTemplate")
  68. @Primary
  69. public RabbitTemplate fplRabbitTemplate(@Qualifier("ECOAConnectionFactory") ConnectionFactory connectionFactory){
  70. RabbitTemplate firstRabbitTemplate = new RabbitTemplate(connectionFactory);
  71. return firstRabbitTemplate;
  72. }
  73. /** 第二个rabbitMq操作模板 */
  74. @Bean(name="printRabbitTemplate")
  75. public RabbitTemplate tcscRabbitTemplate(@Qualifier("printConnectionFactory") ConnectionFactory connectionFactory){
  76. RabbitTemplate secondRabbitTemplate = new RabbitTemplate(connectionFactory);
  77. return secondRabbitTemplate;
  78. }
  79. /** 第一个rabbitMq连接工厂 */
  80. @Bean(name="ECOAContainerFactory")
  81. public SimpleRabbitListenerContainerFactory ECOAContainerFactory(SimpleRabbitListenerContainerFactoryConfigurer configurer,
  82. @Qualifier("ECOAConnectionFactory") ConnectionFactory connectionFactory) {
  83. SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
  84. factory.setMaxConcurrentConsumers(5);
  85. factory.setConcurrentConsumers(1);
  86. factory.setPrefetchCount(1);
  87. configurer.configure(factory, connectionFactory);
  88. return factory;
  89. }
  90. /** 第二个rabbitMq连接工厂 */
  91. @Bean(name="printContainerFactory")
  92. public SimpleRabbitListenerContainerFactory printContainerFactory(SimpleRabbitListenerContainerFactoryConfigurer configurer,
  93. @Qualifier("printConnectionFactory") ConnectionFactory connectionFactory) {
  94. SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
  95. factory.setMaxConcurrentConsumers(5);
  96. factory.setConcurrentConsumers(1);
  97. factory.setPrefetchCount(1);
  98. configurer.configure(factory, connectionFactory);
  99. return factory;
  100. }
  101. /** 第一个mq绑定队列绑定交换机 */
  102. @Bean
  103. public String runECOAQueue(@Qualifier("ECOAConnectionFactory") ConnectionFactory connectionFactory) {
  104. System.out.println("configuration ECOAQueue ........................");
  105. Connection connection = connectionFactory.createConnection();
  106. Channel channel = connection.createChannel(false);
  107. try {
  108. channel.exchangeDeclare(EXCHANGE, "direct", true, false, null);
  109. // 单据推送电子药检队列
  110. channel.queueDeclare(ECOA_file_upload_queue, true, false, false, null);
  111. channel.queueBind(ECOA_file_upload_queue, EXCHANGE, ECOA_file_upload_key);
  112. } catch (Exception e) {
  113. e.printStackTrace();
  114. } finally {
  115. return "ECOAQueue";
  116. }
  117. }
  118. /** 第二个mq绑定队列绑定交换机 */
  119. @Bean
  120. public String runPrintQueue(@Qualifier("printConnectionFactory") ConnectionFactory connectionFactory) {
  121. System.out.println("configuration printQueue ........................");
  122. Connection connection = connectionFactory.createConnection();
  123. Channel channel = connection.createChannel(false);
  124. try {
  125. channel.exchangeDeclare(EXCHANGE2, "direct", true, false, null);
  126. // 单据推送单据打印平台队列
  127. channel.queueDeclare(print_tms_dispatch_info_queue, true, false, false, null);
  128. channel.queueBind(print_tms_dispatch_info_queue, EXCHANGE2, print_tms_dispatch_info_key);
  129. // 单据打印平台反馈队列
  130. channel.queueDeclare(print_4pl_dispatch_info_feedback_queue,true,false,false,null);
  131. channel.queueBind(print_4pl_dispatch_info_feedback_queue,EXCHANGE2,print_4pl_dispatch_info_feedback_key);
  132. } catch (Exception e) {
  133. e.printStackTrace();
  134. } finally {
  135. return "printQueue";
  136. }
  137. }
  138. }

注意:需将原MQ:交换机、队列、绑定配置类注释掉,只留这一个配置文件即可,这个配置文件已经将对应的:交换机、队列绑定好,只是需要注意队列名字、交换机不要绑定错了

         2.3、生产者与消费者操作配置

                2.3.1、生产者操作配置

  1. package com.ruoyi.report.utils;
  2. import com.alibaba.fastjson.JSON;
  3. import com.alibaba.fastjson.serializer.SerializerFeature;
  4. import com.ruoyi.report.config.RabbitMqConfig;
  5. import org.springframework.amqp.core.Message;
  6. import org.springframework.amqp.core.MessageBuilder;
  7. import org.springframework.amqp.core.MessageProperties;
  8. import org.springframework.amqp.rabbit.connection.CorrelationData;
  9. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  10. import org.springframework.stereotype.Component;
  11. import javax.annotation.Resource;
  12. import java.util.UUID;
  13. /**
  14. * @ClassName MessageUtils
  15. * @Description
  16. * @Author Mr.Huang
  17. * @Date 2023/9/22 16:36
  18. * @Version 1.0
  19. **/
  20. @Component
  21. public class MessageUtils {
  22. @Resource(name = "ECOARabbitTemplate")
  23. private RabbitTemplate ECOARabbitTemplate;
  24. @Resource(name = "printRabbitTemplate")
  25. private RabbitTemplate printRabbitTemplate;
  26. /**
  27. * 向ECOA发送消息
  28. * 发送随货单信息
  29. * @param message 消息
  30. */
  31. public void sendMessage(Object message) {
  32. String uuid = UUID.randomUUID().toString();
  33. CorrelationData correlationId = new CorrelationData(uuid);
  34. Message msg = MessageBuilder.withBody(JSON.toJSONStringWithDateFormat(message, "yyyy-MM-dd HH:mm:ss", SerializerFeature.WriteNullStringAsEmpty).getBytes()).setContentType(MessageProperties.CONTENT_TYPE_JSON).build();
  35. ECOARabbitTemplate.convertAndSend(RabbitMqConfig.EXCHANGE, RabbitMqConfig.ECOA_file_upload_key, msg, correlationId);
  36. }
  37. /**
  38. * 向print发送消息
  39. * 发送派车单信息
  40. * @param message 消息
  41. */
  42. public void sendPrintMessage(Object message) {
  43. String uuid = UUID.randomUUID().toString();
  44. CorrelationData correlationId = new CorrelationData(uuid);
  45. Message msg = MessageBuilder.withBody(JSON.toJSONStringWithDateFormat(message, "yyyy-MM-dd HH:mm:ss", SerializerFeature.WriteNullStringAsEmpty).getBytes()).setContentType(MessageProperties.CONTENT_TYPE_JSON).build();
  46. printRabbitTemplate.convertAndSend(RabbitMqConfig.EXCHANGE2, RabbitMqConfig.print_tms_dispatch_info_key, msg, correlationId);
  47. }
  48. }

                2.3.2、消费者操作配置

  1. package com.ruoyi.report.consumer;
  2. import com.alibaba.fastjson.JSONObject;
  3. import com.rabbitmq.client.Channel;
  4. import com.ruoyi.report.config.RabbitMqConfig;
  5. import com.ruoyi.report.entity.open.PrintResult;
  6. import com.ruoyi.report.service.open.PrintSendLogService;
  7. import lombok.extern.slf4j.Slf4j;
  8. import org.springframework.amqp.core.Message;
  9. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  10. import org.springframework.beans.factory.annotation.Autowired;
  11. import org.springframework.stereotype.Component;
  12. import java.io.IOException;
  13. /**
  14. * @ClassName PrintFeedbackConsumer
  15. * @Description
  16. * @Author Mr.Huang
  17. * @Date 2024/4/30 10:23
  18. * @Version 1.0
  19. **/
  20. @Slf4j
  21. @Component
  22. public class PrintFeedbackConsumer {
  23. @Autowired
  24. private PrintSendLogService printSendLogService;
  25. @RabbitListener(queues = {RabbitMqConfig.print_4pl_dispatch_info_feedback_queue}, containerFactory = "printContainerFactory")
  26. public void receiveMq(Message message, Channel channel) {
  27. try {
  28. String body = new String(message.getBody());
  29. log.info("接受【Print结果推送】RabbitMQ消息:"+body);
  30. JSONObject objJson = JSONObject.parseObject(body);
  31. Thread.sleep(1000);
  32. PrintResult printResult = JSONObject.toJavaObject(objJson, PrintResult.class);
  33. printSendLogService.updatePrintSendLog(printResult);
  34. }catch (Exception e){
  35. log.error("",e);
  36. try {
  37. channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
  38. } catch (IOException ex) {
  39. ex.printStackTrace();
  40. }
  41. }
  42. }
  43. }

与单个RabbitMQ消费者操作一致,只是注意要消费的队列和连接工厂不要搞错了

三、总结 

配置单个RabbitMQ时不需要关心底层的连接工厂是如何配置的:只要把yaml内容填好,Spring Boot就会自动配置连接工厂,我们只需要把交换机、队列声明好并绑定起来即可。当需要配置多个MQ时,才需要自己手动配置连接工厂;并不是只能配置两个RabbitMQ,可以按这个格式配置更多个。唯一需要注意的是不要把这些队列和交换机搞混。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/小舞很执着/article/detail/789188
推荐阅读
相关标签
  

闽ICP备14008679号