当前位置:   article > 正文

多个springboot整合使用rabbitmq(使用注解的方式)_springboot 配置多个rabbitmq

springboot 配置多个rabbitmq

一、简述

先参考单个springboot使用rabbitmq和了解rabbitmq的五种模式

单个springboot整合rabbitmq_java-zh的博客-CSDN博客

二、创建项目

1、先创建两个springboot项目,一个做生产者,一个做消费者

 2、导入依赖(生产者和消费者的 pom.xml 内容相同)

  <?xml version="1.0" encoding="UTF-8"?>
  <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
           xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
      <!-- Shared POM for the producer and the consumer project (both use the same dependencies). -->
      <modelVersion>4.0.0</modelVersion>
      <parent>
          <groupId>org.springframework.boot</groupId>
          <artifactId>spring-boot-starter-parent</artifactId>
          <version>2.4.1</version>
          <relativePath/>
      </parent>
      <groupId>com.mq</groupId>
      <artifactId>mqcloud</artifactId>
      <version>0.0.1-SNAPSHOT</version>
      <properties>
          <spring-cloud-alibaba.version>2021.1</spring-cloud-alibaba.version>
          <java.version>1.8</java.version>
          <spring-boot.version>2.5.3</spring-boot.version>
          <lombok.version>1.18.14</lombok.version>
          <mybaits.plus.version>3.3.2</mybaits.plus.version>
          <alibaba.json.version>1.2.75</alibaba.json.version>
          <spring-cloud-starter-bootstrap.version>3.0.2</spring-cloud-starter-bootstrap.version>
      </properties>
      <!-- <dependencyManagement>-->
      <dependencies>
          <dependency>
              <groupId>org.springframework.boot</groupId>
              <artifactId>spring-boot-starter-web</artifactId>
          </dependency>
          <dependency>
              <groupId>org.springframework.boot</groupId>
              <artifactId>spring-boot-starter-actuator</artifactId>
          </dependency>
          <dependency>
              <groupId>org.springframework.boot</groupId>
              <artifactId>spring-boot-devtools</artifactId>
          </dependency>
          <dependency>
              <groupId>org.springframework.boot</groupId>
              <artifactId>spring-boot-starter-test</artifactId>
              <scope>test</scope>
          </dependency>
          <!-- AOP -->
          <dependency>
              <groupId>org.springframework.boot</groupId>
              <artifactId>spring-boot-starter-aop</artifactId>
          </dependency>
          <!-- bootstrap starter -->
          <dependency>
              <groupId>org.springframework.cloud</groupId>
              <artifactId>spring-cloud-starter-bootstrap</artifactId>
              <version>${spring-cloud-starter-bootstrap.version}</version>
          </dependency>
          <dependency>
              <groupId>com.alibaba</groupId>
              <artifactId>fastjson</artifactId>
              <version>${alibaba.json.version}</version>
          </dependency>
          <dependency>
              <groupId>org.projectlombok</groupId>
              <artifactId>lombok</artifactId>
              <version>${lombok.version}</version>
              <optional>true</optional>
          </dependency>
          <!-- jjwt -->
          <dependency>
              <groupId>io.jsonwebtoken</groupId>
              <artifactId>jjwt</artifactId>
              <version>0.9.0</version>
          </dependency>
          <dependency>
              <groupId>com.google.guava</groupId>
              <artifactId>guava</artifactId>
              <version>31.1-jre</version>
          </dependency>
          <dependency>
              <groupId>com.alibaba.cloud</groupId>
              <artifactId>spring-cloud-starter-alibaba-nacos-config</artifactId>
              <version>${spring-cloud-alibaba.version}</version>
          </dependency>
          <dependency>
              <groupId>com.alibaba.cloud</groupId>
              <artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
              <version>${spring-cloud-alibaba.version}</version>
          </dependency>
          <dependency>
              <groupId>org.apache.commons</groupId>
              <artifactId>commons-lang3</artifactId>
              <version>3.12.0</version>
          </dependency>
          <!-- project-internal common base jar -->
          <dependency>
              <groupId>com.common</groupId>
              <artifactId>base</artifactId>
              <version>1.0-SNAPSHOT</version>
          </dependency>
          <!-- message queue (RabbitMQ) -->
          <dependency>
              <groupId>org.springframework.boot</groupId>
              <artifactId>spring-boot-starter-amqp</artifactId>
              <!-- No explicit version: it is managed by spring-boot-starter-parent (2.4.1).
                   A 3.x starter does not belong with a Spring Boot 2.4 / Java 1.8 build. -->
          </dependency>
      </dependencies>
      <!-- </dependencyManagement>-->
  </project>

3、编写配置文件

这里的配置文件生产者和消费者都一样

  # Shared by the producer and the consumer application.
  spring:
    rabbitmq:
      connection-timeout: 15000
      username: admin
      password: 123456
      # Virtual host; optional.
      virtual-host: /
      listener:
        simple:
          acknowledge-mode: auto # manual: acknowledge by hand, auto: acknowledge automatically
          # Number of consumers (threads) started for each @RabbitListener.
          concurrency: 10
          # Maximum number of consumers for each @RabbitListener.
          max-concurrency: 10
          prefetch: 5
          # true: a rejected (failed) message is put back on the queue;
          # set to false if failed messages must NOT be requeued.
          default-requeue-rejected: true
          retry:
            enabled: true # enable listener retry
            max-attempts: 4 # number of attempts
            max-interval: 10000 # maximum interval between attempts
            initial-interval: 2000 # interval before the first retry
      # Publisher confirm mode: confirms are correlated with the sent message.
      publisher-confirm-type: correlated
      publisher-returns: true
      # Single broker node. For a cluster, configure the nodes as a comma-separated list
      # (see the spring.rabbitmq.addresses property).
      host: 192.168.139.128
      port: 5672
  # Custom keys read by the sample code through ${rabbitmq....} placeholders.
  rabbitmq:
    # Publish/subscribe (fanout) mode
    fanout:
      exchange:
        # exchange name
        name1: exchange.fanout
      queue:
        # queue names
        name1: exchange.fanout.queue_1
        name2: exchange.fanout.queue_2
    # Routing (direct) mode
    direct:
      exchange:
        name1: exchange.direct
      queue:
        name1: exchange.direct.queue_1
        name2: exchange.direct.queue_2
      routing:
        name1: exchange.direct.routing.1
        name2: exchange.direct.routing.2
    # Topic mode
    topic:
      exchange:
        name1: exchange.topic
      queue:
        name1: exchange.topic.queue_1
        name2: exchange.topic.queue_2
      routing:
        name1: '#.routingkey.#'
        name2: routingkey.*

三、编写代码和测试结果

这里以rabbitmq的订阅模式(扇形模式)、路由器模式、主题模式为案例

3.1 扇形模式

生产者

  1. @Autowired
  2. private RabbitTemplate rabbitTemplate;
  3. // 订阅模式的交换机名称
  4. @Value("${rabbitmq.fanout.exchange.name1}")
  5. private String exchangeName;
  6. /**
  7. * 订阅模式(扇形模式)生产者
  8. *
  9. * @param context
  10. */
  11. @GetMapping("/fanout/{context}")
  12. public void sendMessage(@PathVariable String context) {
  13. System.out.println("需要发送得内容为:" + context);
  14. rabbitTemplate.convertAndSend(exchangeName, "", context);
  15. }

消费者

注意:注解属性的值必须是编译期常量,所以消费者绑定的交换机和队列名称不能使用变量,这里通过 ${...} 占位符直接从配置文件中读取

  1. @RabbitListener(bindings = @QueueBinding(exchange = @Exchange(value = "${rabbitmq.fanout.exchange.name1}", type = ExchangeTypes.FANOUT),
  2. value = @Queue(value = "${rabbitmq.fanout.queue.name1}")))
  3. @RabbitHandler
  4. public void consumerFanoutExchange(String context) {
  5. System.out.println("订阅模式,通道一接收到的内容为内容:" + context);
  6. }
  7. @RabbitListener(bindings = @QueueBinding(exchange = @Exchange(value = "${rabbitmq.fanout.exchange.name1}", type = ExchangeTypes.FANOUT),
  8. value = @Queue(value = "${rabbitmq.fanout.queue.name2}")))
  9. @RabbitHandler
  10. public void consumerFanoutExchange2(String context) {
  11. System.out.println("订阅模式,通道二接收到的内容为内容:" + context);
  12. }

3.2 路由器模式

生产者

  1. @Autowired
  2. private RabbitTemplate rabbitTemplate;
  3. // 路由器模式的交换机名称
  4. @Value("${rabbitmq.direct.exchange.name1}")
  5. private String directName;
  6. // 路由器模式的路由key1
  7. @Value("${rabbitmq.direct.routing.name1}")
  8. private String directRoutingKeyName1;
  9. //路由器模式的路由key2
  10. @Value("${rabbitmq.direct.routing.name2}")
  11. private String directRoutingKeyName2;
  12. /**
  13. * 路由器模式
  14. * @param context
  15. * @param routingkey
  16. */
  17. @GetMapping("/direct")
  18. public void sendMessageDirect(@RequestParam("context") String context, @RequestParam("routingkey") Integer routingkey) {
  19. if (1 == routingkey) {
  20. rabbitTemplate.convertAndSend(directName, directRoutingKeyName1, context);
  21. } else if (2 == routingkey) {
  22. rabbitTemplate.convertAndSend(directName, directRoutingKeyName2, context);
  23. } else {
  24. System.out.println("数据非法");
  25. }
  26. }

消费者

  1. @RabbitListener(bindings = @QueueBinding(
  2. exchange = @Exchange(value = "${rabbitmq.direct.exchange.name1}", type = ExchangeTypes.DIRECT),
  3. value = @Queue(value = "${rabbitmq.direct.queue.name1}"),
  4. key = "${rabbitmq.direct.routing.name1}"))
  5. public void exchangeDirectRoutingKey1(String context, Message message) {
  6. System.out.println("key1:" + message.getMessageProperties().getReceivedRoutingKey());
  7. System.out.println("路由器模式1 接收到的消息为:" + context);
  8. }
  9. @RabbitListener(bindings = @QueueBinding(
  10. exchange = @Exchange(value = "${rabbitmq.direct.exchange.name1}", type = ExchangeTypes.DIRECT),
  11. value = @Queue(value = "${rabbitmq.direct.queue.name2}"),
  12. key = "${rabbitmq.direct.routing.name2}"))
  13. public void exchangeDirectRoutingKey2(String context, Message message) {
  14. System.out.println("key2:" + message.getMessageProperties().getReceivedRoutingKey());
  15. System.out.println("路由器模式2 接收到的消息为" + context);
  16. }

3.3 主题模式

生产者

  1. // 主题模式的交换机名称
  2. @Value("${rabbitmq.topic.exchange.name1}")
  3. private String topicName;
  4. //用来匹配主题模式对应的key
  5. public static final String EXCHANGE_TOPIC_CASE_KEY_1 = "topic.routingkey.case1";
  6. //如果case_key_2这样写,那么绑定case_key_1的队列一样会接收到,因为case_key_2也一样和key1匹配上
  7. public static final String EXCHANGE_TOPIC_CASE_KEY_2 = "routingkey.case2";
  8. @GetMapping("/topic")
  9. public void sendMessageTopic(@RequestParam("context") String context, @RequestParam("routingkey") Integer routingkey) {
  10. if (1 == routingkey) {
  11. rabbitTemplate.convertAndSend(topicName, EXCHANGE_TOPIC_CASE_KEY_1, context + routingkey);
  12. } else if (2 == routingkey) {
  13. rabbitTemplate.convertAndSend(topicName, EXCHANGE_TOPIC_CASE_KEY_2, context + routingkey);
  14. } else {
  15. System.out.println("数据非法");
  16. }
  17. }

消费者

  1. @RabbitListener(bindings = @QueueBinding(
  2. exchange = @Exchange(value = "${rabbitmq.topic.exchange.name1", type = ExchangeTypes.TOPIC),
  3. value = @Queue(value = "${rabbitmq.topic.queue.name1}"),
  4. key = "${rabbitmq.topic.routing.name1}"))
  5. @RabbitHandler
  6. public void exchangeTopicRoutingKey1(String context, Message message) {
  7. System.out.println("key1:"+message.getMessageProperties().getReceivedRoutingKey());
  8. System.out.println("主题模式1:接收的内容为:"+ context);
  9. }
  10. @RabbitListener(bindings = @QueueBinding(
  11. exchange = @Exchange(value = "${rabbitmq.topic.exchange.name1", type = ExchangeTypes.TOPIC),
  12. value = @Queue(value = "${rabbitmq.topic.queue.name2}"),
  13. key = "${rabbitmq.topic.routing.name2}"))
  14. @RabbitHandler
  15. public void exchangeTopicRoutingKey2(String context, Message message) {
  16. System.out.println("key2:"+message.getMessageProperties().getReceivedRoutingKey());
  17. System.out.println("主题模式2:接收的内容为:"+ context);
  18. }

3.4 全量代码

生产者

  1. @RestController
  2. @RequiredArgsConstructor
  3. @RequestMapping("/test/mq")
  4. public class ProductController {
  5. private final RabbitTemplate rabbitTemplate;
  6. // 订阅模式的交换机名称
  7. @Value("${rabbitmq.fanout.exchange.name1}")
  8. private String exchangeName;
  9. // 路由器模式的交换机名称
  10. @Value("${rabbitmq.direct.exchange.name1}")
  11. private String directName;
  12. // 路由器模式的路由key1
  13. @Value("${rabbitmq.direct.routing.name1}")
  14. private String directRoutingKeyName1;
  15. //路由器模式的路由key2
  16. @Value("${rabbitmq.direct.routing.name2}")
  17. private String directRoutingKeyName2;
  18. // 主题模式的交换机名称
  19. @Value("${rabbitmq.topic.exchange.name1}")
  20. private String topicName;
  21. //用来匹配主题模式对应的key
  22. public static final String EXCHANGE_TOPIC_CASE_KEY_1 = "topic.routingkey.case1";
  23. //如果case_key_2这样写,那么绑定case_key_1的队列一样会接收到,因为case_key_2也一样和key1匹配上
  24. public static final String EXCHANGE_TOPIC_CASE_KEY_2 = "routingkey.case2";
  25. /**
  26. * 订阅模式(扇形模式)生产者
  27. *
  28. * @param context
  29. */
  30. @GetMapping("/fanout/{context}")
  31. public void sendMessage(@PathVariable String context) {
  32. System.out.println("需要发送得内容为:" + context);
  33. rabbitTemplate.convertAndSend(exchangeName, "", context);
  34. }
  35. /**
  36. * 路由器模式
  37. * @param context
  38. * @param routingkey
  39. */
  40. @GetMapping("/direct")
  41. public void sendMessageDirect(@RequestParam("context") String context, @RequestParam("routingkey") Integer routingkey) {
  42. if (1 == routingkey) {
  43. rabbitTemplate.convertAndSend(directName, directRoutingKeyName1, context);
  44. } else if (2 == routingkey) {
  45. rabbitTemplate.convertAndSend(directName, directRoutingKeyName2, context);
  46. } else {
  47. System.out.println("数据非法");
  48. }
  49. }
  50. @GetMapping("/topic")
  51. public void sendMessageTopic(@RequestParam("context") String context, @RequestParam("routingkey") Integer routingkey) {
  52. if (1 == routingkey) {
  53. rabbitTemplate.convertAndSend(topicName, EXCHANGE_TOPIC_CASE_KEY_1, context + routingkey);
  54. } else if (2 == routingkey) {
  55. rabbitTemplate.convertAndSend(topicName, EXCHANGE_TOPIC_CASE_KEY_2, context + routingkey);
  56. } else {
  57. System.out.println("数据非法");
  58. }
  59. }
  60. }

消费者

  1. @Component
  2. public class ConsumerTest {
  3. @RabbitListener(bindings = @QueueBinding(exchange = @Exchange(value = "${rabbitmq.fanout.exchange.name1}", type = ExchangeTypes.FANOUT),
  4. value = @Queue(value = "${rabbitmq.fanout.queue.name1}")))
  5. @RabbitHandler
  6. public void consumerFanoutExchange(String context) {
  7. System.out.println("订阅模式,通道一接收到的内容为内容:" + context);
  8. }
  9. @RabbitListener(bindings = @QueueBinding(exchange = @Exchange(value = "${rabbitmq.fanout.exchange.name1}", type = ExchangeTypes.FANOUT),
  10. value = @Queue(value = "${rabbitmq.fanout.queue.name2}")))
  11. @RabbitHandler
  12. public void consumerFanoutExchange2(String context) {
  13. System.out.println("订阅模式,通道二接收到的内容为内容:" + context);
  14. }
  15. @RabbitListener(bindings = @QueueBinding(
  16. exchange = @Exchange(value = "${rabbitmq.direct.exchange.name1}", type = ExchangeTypes.DIRECT),
  17. value = @Queue(value = "${rabbitmq.direct.queue.name1}"),
  18. key = "${rabbitmq.direct.routing.name1}"))
  19. public void exchangeDirectRoutingKey1(String context, Message message) {
  20. System.out.println("key1:" + message.getMessageProperties().getReceivedRoutingKey());
  21. System.out.println("路由器模式1 接收到的消息为:" + context);
  22. }
  23. @RabbitListener(bindings = @QueueBinding(
  24. exchange = @Exchange(value = "${rabbitmq.direct.exchange.name1}", type = ExchangeTypes.DIRECT),
  25. value = @Queue(value = "${rabbitmq.direct.queue.name2}"),
  26. key = "${rabbitmq.direct.routing.name2}"))
  27. public void exchangeDirectRoutingKey2(String context, Message message) {
  28. System.out.println("key2:" + message.getMessageProperties().getReceivedRoutingKey());
  29. System.out.println("路由器模式2 接收到的消息为" + context);
  30. }
  31. @RabbitListener(bindings = @QueueBinding(
  32. exchange = @Exchange(value = "${rabbitmq.topic.exchange.name1", type = ExchangeTypes.TOPIC),
  33. value = @Queue(value = "${rabbitmq.topic.queue.name1}"),
  34. key = "${rabbitmq.topic.routing.name1}"))
  35. @RabbitHandler
  36. public void exchangeTopicRoutingKey1(String context, Message message) {
  37. System.out.println("key1:"+message.getMessageProperties().getReceivedRoutingKey());
  38. System.out.println("主题模式1:接收的内容为:"+ context);
  39. }
  40. @RabbitListener(bindings = @QueueBinding(
  41. exchange = @Exchange(value = "${rabbitmq.topic.exchange.name1", type = ExchangeTypes.TOPIC),
  42. value = @Queue(value = "${rabbitmq.topic.queue.name2}"),
  43. key = "${rabbitmq.topic.routing.name2}"))
  44. @RabbitHandler
  45. public void exchangeTopicRoutingKey2(String context, Message message) {
  46. System.out.println("key2:"+message.getMessageProperties().getReceivedRoutingKey());
  47. System.out.println("主题模式2:接收的内容为:"+ context);
  48. }
  49. }

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/运维做开发/article/detail/789213
推荐阅读
相关标签
  

闽ICP备14008679号