赞
踩
先参考单个springboot使用rabbitmq和了解rabbitmq的五种模式
单个springboot整合rabbitmq_java-zh的博客-CSDN博客
1、先创建两个springboot项目,一个做生产者,一个做消费者
2、导包(生产者和消费者对应的内容都是一样)
- <?xml version="1.0" encoding="UTF-8"?>
- <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
- <modelVersion>4.0.0</modelVersion>
- <parent>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-parent</artifactId>
- <version>2.4.1</version>
- <relativePath/>
- </parent>
- <groupId>com.mq</groupId>
- <artifactId>mqcloud</artifactId>
- <version>0.0.1-SNAPSHOT</version>
- <properties>
- <spring-cloud-alibaba.version>2021.1</spring-cloud-alibaba.version>
- <java.version>1.8</java.version>
- <spring-boot.version>2.5.3</spring-boot.version>
- <lombok.version>1.18.14</lombok.version>
- <mybaits.plus.version>3.3.2</mybaits.plus.version>
- <alibaba.json.version>1.2.75</alibaba.json.version>
- <spring-cloud-starter-bootstrap.version>3.0.2</spring-cloud-starter-bootstrap.version>
- </properties>
-
- <!-- <dependencyManagement>-->
- <dependencies>
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-web</artifactId>
- </dependency>
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-actuator</artifactId>
- </dependency>
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-devtools</artifactId>
- </dependency>
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-test</artifactId>
- <scope>test</scope>
- </dependency>
- <!--AOP-->
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-aop</artifactId>
- </dependency>
- <!--bootstrap 启动器-->
- <dependency>
- <groupId>org.springframework.cloud</groupId>
- <artifactId>spring-cloud-starter-bootstrap</artifactId>
- <version>${spring-cloud-starter-bootstrap.version}</version>
- </dependency>
- <dependency>
- <groupId>com.alibaba</groupId>
- <artifactId>fastjson</artifactId>
- <version>${alibaba.json.version}</version>
- </dependency>
- <dependency>
- <groupId>org.projectlombok</groupId>
- <artifactId>lombok</artifactId>
- <version>${lombok.version}</version>
- <optional>true</optional>
- </dependency>
- <!--jjwt-->
- <dependency>
- <groupId>io.jsonwebtoken</groupId>
- <artifactId>jjwt</artifactId>
- <version>0.9.0</version>
- </dependency>
- <dependency>
- <groupId>com.google.guava</groupId>
- <artifactId>guava</artifactId>
- <version>31.1-jre</version>
- </dependency>
- <dependency>
- <groupId>com.alibaba.cloud</groupId>
- <artifactId>spring-cloud-starter-alibaba-nacos-config</artifactId>
- <version>${spring-cloud-alibaba.version}</version>
- </dependency>
- <dependency>
- <groupId>com.alibaba.cloud</groupId>
- <artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
- <version>${spring-cloud-alibaba.version}</version>
- </dependency>
-
-
- <dependency>
- <groupId>org.apache.commons</groupId>
- <artifactId>commons-lang3</artifactId>
- <version>3.12.0</version>
- </dependency>
- <!--zhong 基础jar-->
- <dependency>
- <groupId>com.common</groupId>
- <artifactId>base</artifactId>
- <version>1.0-SNAPSHOT</version>
- </dependency>
-
- <!--消息队列 -->
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-amqp</artifactId>
- <version>3.0.4</version>
- </dependency>
- </dependencies>
- <!-- </dependencyManagement>-->
-
-
-
- </project>

3、编写配置文件
这里的配置文件生产者和消费者都一样
spring: rabbitmq: #如果是集群,用,隔开 connection-timeout: 15000 username: admin password: 123456 #虚拟host,可以不设置 virtual-host: / listener: simple: acknowledge-mode: auto #manual:手动处理 auto:自动处理 #消费端监听个数(即@RabbitListenter开启几个线程去处理) concurrency: 10 #消费端监听的最大个数 max-concurrency: 10 prefetch: 5 default-requeue-rejected: true #消费不成功的消息,拒绝入队 retry: enabled: true #开启消息重试 max-attempts: 4 #重试次数 max-interval: 10000 #重试最大间隔时间 initial-interval: 2000 #重试初始间隔时间 #消息确认方式,通过correlated来确认 publisher-confirm-type: correlated publisher-returns: true host: 192.168.139.128 port: 5672 rabbitmq: #订阅模式(扇形模式) fanout: exchange: #交换机名称 name1: exchange.fanout queue: #通道名称 name1: exchange.fanout.queue_1 name2: exchange.fanout.queue_2 #交换机模式 direct: exchange: name1: exchange.direct queue: name1: exchange.direct.queue_1 name2: exchange.direct.queue_2 routing: name1: exchange.direct.routing.1 name2: exchange.direct.routing.2 #主题模式 topic: exchange: name1: exchange.topic queue: name1: exchange.topic.queue_1 name2: exchange.topic.queue_2 routing: name1: '#.routingkey.#' name2: routingkey.*
这里以rabbitmq的订阅模式(扇形模式)、路由器模式、主题模式为案例
生产者
- @Autowired
- private RabbitTemplate rabbitTemplate;
-
- // 订阅模式的交换机名称
- @Value("${rabbitmq.fanout.exchange.name1}")
- private String exchangeName;
-
-
- /**
- * 订阅模式(扇形模式)生产者
- *
- * @param context
- */
- @GetMapping("/fanout/{context}")
- public void sendMessage(@PathVariable String context) {
- System.out.println("需要发送得内容为:" + context);
- rabbitTemplate.convertAndSend(exchangeName, "", context);
- }

消费者
注意:消费者绑定交换机和通道的值必须是固定常量值,所以我们直接从配置文件中读取
- @RabbitListener(bindings = @QueueBinding(exchange = @Exchange(value = "${rabbitmq.fanout.exchange.name1}", type = ExchangeTypes.FANOUT),
- value = @Queue(value = "${rabbitmq.fanout.queue.name1}")))
- @RabbitHandler
- public void consumerFanoutExchange(String context) {
- System.out.println("订阅模式,通道一接收到的内容为内容:" + context);
- }
-
- @RabbitListener(bindings = @QueueBinding(exchange = @Exchange(value = "${rabbitmq.fanout.exchange.name1}", type = ExchangeTypes.FANOUT),
- value = @Queue(value = "${rabbitmq.fanout.queue.name2}")))
- @RabbitHandler
- public void consumerFanoutExchange2(String context) {
- System.out.println("订阅模式,通道二接收到的内容为内容:" + context);
- }
生产者
- @Autowired
- private RabbitTemplate rabbitTemplate;
-
- // 路由器模式的交换机名称
- @Value("${rabbitmq.direct.exchange.name1}")
- private String directName;
-
- // 路由器模式的路由key1
- @Value("${rabbitmq.direct.routing.name1}")
- private String directRoutingKeyName1;
-
- //路由器模式的路由key2
- @Value("${rabbitmq.direct.routing.name2}")
- private String directRoutingKeyName2;
-
- /**
- * 路由器模式
- * @param context
- * @param routingkey
- */
- @GetMapping("/direct")
- public void sendMessageDirect(@RequestParam("context") String context, @RequestParam("routingkey") Integer routingkey) {
- if (1 == routingkey) {
- rabbitTemplate.convertAndSend(directName, directRoutingKeyName1, context);
- } else if (2 == routingkey) {
- rabbitTemplate.convertAndSend(directName, directRoutingKeyName2, context);
- } else {
- System.out.println("数据非法");
- }
- }

消费者
-
- @RabbitListener(bindings = @QueueBinding(
- exchange = @Exchange(value = "${rabbitmq.direct.exchange.name1}", type = ExchangeTypes.DIRECT),
- value = @Queue(value = "${rabbitmq.direct.queue.name1}"),
- key = "${rabbitmq.direct.routing.name1}"))
- public void exchangeDirectRoutingKey1(String context, Message message) {
- System.out.println("key1:" + message.getMessageProperties().getReceivedRoutingKey());
- System.out.println("路由器模式1 接收到的消息为:" + context);
- }
-
- @RabbitListener(bindings = @QueueBinding(
- exchange = @Exchange(value = "${rabbitmq.direct.exchange.name1}", type = ExchangeTypes.DIRECT),
- value = @Queue(value = "${rabbitmq.direct.queue.name2}"),
- key = "${rabbitmq.direct.routing.name2}"))
- public void exchangeDirectRoutingKey2(String context, Message message) {
- System.out.println("key2:" + message.getMessageProperties().getReceivedRoutingKey());
- System.out.println("路由器模式2 接收到的消息为" + context);
- }

生产者
- // 主题模式的交换机名称
- @Value("${rabbitmq.topic.exchange.name1}")
- private String topicName;
-
- //用来匹配主题模式对应的key
- public static final String EXCHANGE_TOPIC_CASE_KEY_1 = "topic.routingkey.case1";
-
- //如果case_key_2这样写,那么绑定case_key_1的队列一样会接收到,因为case_key_2也一样和key1匹配上
- public static final String EXCHANGE_TOPIC_CASE_KEY_2 = "routingkey.case2";
-
- @GetMapping("/topic")
- public void sendMessageTopic(@RequestParam("context") String context, @RequestParam("routingkey") Integer routingkey) {
- if (1 == routingkey) {
- rabbitTemplate.convertAndSend(topicName, EXCHANGE_TOPIC_CASE_KEY_1, context + routingkey);
- } else if (2 == routingkey) {
- rabbitTemplate.convertAndSend(topicName, EXCHANGE_TOPIC_CASE_KEY_2, context + routingkey);
- } else {
- System.out.println("数据非法");
- }
- }

消费者
- @RabbitListener(bindings = @QueueBinding(
- exchange = @Exchange(value = "${rabbitmq.topic.exchange.name1", type = ExchangeTypes.TOPIC),
- value = @Queue(value = "${rabbitmq.topic.queue.name1}"),
- key = "${rabbitmq.topic.routing.name1}"))
- @RabbitHandler
- public void exchangeTopicRoutingKey1(String context, Message message) {
- System.out.println("key1:"+message.getMessageProperties().getReceivedRoutingKey());
- System.out.println("主题模式1:接收的内容为:"+ context);
- }
-
- @RabbitListener(bindings = @QueueBinding(
- exchange = @Exchange(value = "${rabbitmq.topic.exchange.name1", type = ExchangeTypes.TOPIC),
- value = @Queue(value = "${rabbitmq.topic.queue.name2}"),
- key = "${rabbitmq.topic.routing.name2}"))
- @RabbitHandler
- public void exchangeTopicRoutingKey2(String context, Message message) {
- System.out.println("key2:"+message.getMessageProperties().getReceivedRoutingKey());
- System.out.println("主题模式2:接收的内容为:"+ context);
- }

生产者
- @RestController
- @RequiredArgsConstructor
- @RequestMapping("/test/mq")
- public class ProductController {
-
- private final RabbitTemplate rabbitTemplate;
-
- // 订阅模式的交换机名称
- @Value("${rabbitmq.fanout.exchange.name1}")
- private String exchangeName;
-
- // 路由器模式的交换机名称
- @Value("${rabbitmq.direct.exchange.name1}")
- private String directName;
-
- // 路由器模式的路由key1
- @Value("${rabbitmq.direct.routing.name1}")
- private String directRoutingKeyName1;
-
- //路由器模式的路由key2
- @Value("${rabbitmq.direct.routing.name2}")
- private String directRoutingKeyName2;
-
- // 主题模式的交换机名称
- @Value("${rabbitmq.topic.exchange.name1}")
- private String topicName;
-
- //用来匹配主题模式对应的key
- public static final String EXCHANGE_TOPIC_CASE_KEY_1 = "topic.routingkey.case1";
-
- //如果case_key_2这样写,那么绑定case_key_1的队列一样会接收到,因为case_key_2也一样和key1匹配上
- public static final String EXCHANGE_TOPIC_CASE_KEY_2 = "routingkey.case2";
-
-
- /**
- * 订阅模式(扇形模式)生产者
- *
- * @param context
- */
- @GetMapping("/fanout/{context}")
- public void sendMessage(@PathVariable String context) {
- System.out.println("需要发送得内容为:" + context);
- rabbitTemplate.convertAndSend(exchangeName, "", context);
- }
-
- /**
- * 路由器模式
- * @param context
- * @param routingkey
- */
- @GetMapping("/direct")
- public void sendMessageDirect(@RequestParam("context") String context, @RequestParam("routingkey") Integer routingkey) {
- if (1 == routingkey) {
- rabbitTemplate.convertAndSend(directName, directRoutingKeyName1, context);
- } else if (2 == routingkey) {
- rabbitTemplate.convertAndSend(directName, directRoutingKeyName2, context);
- } else {
- System.out.println("数据非法");
- }
- }
-
- @GetMapping("/topic")
- public void sendMessageTopic(@RequestParam("context") String context, @RequestParam("routingkey") Integer routingkey) {
- if (1 == routingkey) {
- rabbitTemplate.convertAndSend(topicName, EXCHANGE_TOPIC_CASE_KEY_1, context + routingkey);
- } else if (2 == routingkey) {
- rabbitTemplate.convertAndSend(topicName, EXCHANGE_TOPIC_CASE_KEY_2, context + routingkey);
- } else {
- System.out.println("数据非法");
- }
- }
- }

消费者
- @Component
- public class ConsumerTest {
-
-
- @RabbitListener(bindings = @QueueBinding(exchange = @Exchange(value = "${rabbitmq.fanout.exchange.name1}", type = ExchangeTypes.FANOUT),
- value = @Queue(value = "${rabbitmq.fanout.queue.name1}")))
- @RabbitHandler
- public void consumerFanoutExchange(String context) {
- System.out.println("订阅模式,通道一接收到的内容为内容:" + context);
- }
-
- @RabbitListener(bindings = @QueueBinding(exchange = @Exchange(value = "${rabbitmq.fanout.exchange.name1}", type = ExchangeTypes.FANOUT),
- value = @Queue(value = "${rabbitmq.fanout.queue.name2}")))
- @RabbitHandler
- public void consumerFanoutExchange2(String context) {
- System.out.println("订阅模式,通道二接收到的内容为内容:" + context);
- }
-
-
- @RabbitListener(bindings = @QueueBinding(
- exchange = @Exchange(value = "${rabbitmq.direct.exchange.name1}", type = ExchangeTypes.DIRECT),
- value = @Queue(value = "${rabbitmq.direct.queue.name1}"),
- key = "${rabbitmq.direct.routing.name1}"))
- public void exchangeDirectRoutingKey1(String context, Message message) {
- System.out.println("key1:" + message.getMessageProperties().getReceivedRoutingKey());
- System.out.println("路由器模式1 接收到的消息为:" + context);
- }
-
- @RabbitListener(bindings = @QueueBinding(
- exchange = @Exchange(value = "${rabbitmq.direct.exchange.name1}", type = ExchangeTypes.DIRECT),
- value = @Queue(value = "${rabbitmq.direct.queue.name2}"),
- key = "${rabbitmq.direct.routing.name2}"))
- public void exchangeDirectRoutingKey2(String context, Message message) {
- System.out.println("key2:" + message.getMessageProperties().getReceivedRoutingKey());
- System.out.println("路由器模式2 接收到的消息为" + context);
- }
-
- @RabbitListener(bindings = @QueueBinding(
- exchange = @Exchange(value = "${rabbitmq.topic.exchange.name1", type = ExchangeTypes.TOPIC),
- value = @Queue(value = "${rabbitmq.topic.queue.name1}"),
- key = "${rabbitmq.topic.routing.name1}"))
- @RabbitHandler
- public void exchangeTopicRoutingKey1(String context, Message message) {
- System.out.println("key1:"+message.getMessageProperties().getReceivedRoutingKey());
- System.out.println("主题模式1:接收的内容为:"+ context);
- }
-
- @RabbitListener(bindings = @QueueBinding(
- exchange = @Exchange(value = "${rabbitmq.topic.exchange.name1", type = ExchangeTypes.TOPIC),
- value = @Queue(value = "${rabbitmq.topic.queue.name2}"),
- key = "${rabbitmq.topic.routing.name2}"))
- @RabbitHandler
- public void exchangeTopicRoutingKey2(String context, Message message) {
- System.out.println("key2:"+message.getMessageProperties().getReceivedRoutingKey());
- System.out.println("主题模式2:接收的内容为:"+ context);
- }
-
-
- }

Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。