赞
踩
经常开发的人都知道、RabbitMq常用于并发、流量大的场景,因为RabbitMq属于中间件需要维护,所以一般小项目几乎不会使用。而在于大型的并发环境下,大量的流量积压到接口中,使Mysql连接分配出现不够使用的情况,此时就可以使用RabbitMq来解决。
削峰:
当流量洪峰到达接口时,可以用现实中来举例子,mq就相当于一个独木桥,mysq就相当于河对岸,使大量的人从容有序的排队过河,而不会出现所有人全部淌水过河到河对岸,大大减少MySQL的压力。
异步:
通常采用异步通知的方式,就好比我们在抢票的时候,点击提交,系统会返回一个提示正在努力抢票中,而实际上你的订单正在mq队列中排队处理,处理结果会在后续异步通知结果。
解耦:
解耦主要两方面:
1.生产消息的应用 和 消费消息的应用不是同一种语言可以解耦
2.生产消息的应用 宕机,不会影响到消费者消费消息
市面上比较火爆的几款MQ:
ActiveMQ,RocketMQ,Kafka,RabbitMQ。
语言的支持:ActiveMQ,RocketMQ只支持Java语言,Kafka可以支持多们语言,RabbitMQ支持多种语言。
效率方面:ActiveMQ,RocketMQ,Kafka效率都是毫秒级别,RabbitMQ是微秒级别的。
消息丢失,消息重复问题: RabbitMQ针对消息的持久化,和重复问题都有比较成熟的解决方案。
学习成本:RabbitMQ非常简单。
RabbitMQ是由Rabbit公司去研发和维护的,最终是在Pivotal。
RabbitMQ严格的遵循AMQP协议,高级消息队列协议,帮助我们在进程之间传递异步消息。
支持多语言:RabbitMQ,Kafaka ;ActiveMQ,RocketMQ只支持java
传输速度:RabbitMQ微秒级,其他毫秒级别
吞吐量:kafka 吞吐量和磁盘性能* 集群数量相关 次之RocketMQ
消息高可靠:每个一个都可以保证不丢失,不重复
我本机采用docker安装,比较简洁方便
第一步docker命令创建文件夹:mkdir rabbit
切换到刚刚创建的文件夹中:cd rabbit/
创建配置文件:vim docker-compose.yaml
将下面配置信息粘贴进去:
version: "3.1" services: rabbitmq: image: daocloud.io/library/rabbitmq:management restart: always container_name: rabbitmq ports: - 5672:5672 - 15672:15672 volumes: - ./data:/var/lib/rabbitmq
按esc按键然后按下:输入wq保存并退出
启动rabbitmq:docker-compose up
游览器输入虚拟机地址+15672端口号访问RabbitMq可视化界面 默认用户名密码都是guest
Publisher - 生产者:发布消息到RabbitMQ中的Exchange
Consumer - 消费者:监听RabbitMQ中的Queue中的消息
Exchange - 交换机:和生产者建立连接并接收生产者的消息
Queue - 队列:Exchange会将消息分发到指定的Queue,Queue和消费者进行交互
Routes - 路由:交换机以什么样的策略将消息发布到Queue
主要模式:
Simple Work Queue (简单工作队列):也就是常说的点对点模式,一条消息由一个消费者进行消费。(当有多个消费者时,默认使用轮训机制把消息分配给消费者)。 Work Queues (工作队列):也叫公平队列,能者多劳的消息队列模型。队列必须接收到来自消费者的 手动ack 才可以继续往消费者发送消息。 Publish/Subscribe (发布订阅模式):一条消息被多个消费者消费。 Routing(路由模式):有选择的接收消息。 Topics (主题模式):通过一定的规则来选择性的接收消息 RPC 模式:发布者发布消息,并且通过 RPC 方式等待结果。目前这个应该场景少,而且代码也较为复杂
交换机类型:
direct(直连交换机):将队列绑定到交换机,消息的 routeKey 需要与队列绑定的 routeKey 相同。 fanout (扇形交换机):不处理 routeKey ,直接把消息转发到与其绑定的所有队列中。 topic(主题交换机):根据一定的规则,根据 routeKey 把消息转发到符合规则的队列中,其中 # 用于匹配符合一个或者多个词(范围更广), * 用于匹配一个词。 headers (头部交换机):根据消息的 headers 转发消息而不是根据 routeKey 来转发消息, 其中 header 是一个 Map,也就意味着不仅可以匹配字符串类型,也可以匹配其他类型数据。 规则可以分为所有键值对匹配或者单一键值对匹配。
导入依赖
<!--rabbitmq--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
这里有两种方式整合RabbitMq
第一种采用其本身的框架 获取连接
- package com.wwy.config;
-
- /**
- * @author 王伟羽
- * @date 2024/3/13 9:39
- */
-
- import com.rabbitmq.client.Connection;
- import com.rabbitmq.client.ConnectionFactory;
-
- import java.io.IOException;
- import java.util.concurrent.TimeoutException;
-
- /**
- * 配置获取RabbitMq的静态方法
- */
- public class RabbitMqUtils {
-
-
- public static Connection getConnection() {
- ConnectionFactory factory = new ConnectionFactory(); //创建连接工厂
- //设置相关属性
- factory.setUsername("guest");
- factory.setPassword("guest");
- factory.setVirtualHost("/");
- factory.setHost("192.168.60.139");
- factory.setPort(5672);
- try {
- //获取连接
- Connection conn = factory.newConnection();
- return conn;
- } catch (IOException e) {
- e.printStackTrace();
- return null;
- } catch (TimeoutException e) {
- e.printStackTrace();
- return null;
- }
-
- }
- }
创建生产者
- package com.wwy.producter;
-
-
- import com.rabbitmq.client.Channel;
- import com.rabbitmq.client.Connection;
- import com.wwy.config.RabbitMqUtils;
- import org.springframework.web.bind.annotation.GetMapping;
- import org.springframework.web.bind.annotation.RequestMapping;
- import org.springframework.web.bind.annotation.RestController;
-
- import java.io.IOException;
- import java.util.Objects;
-
- /**
- * @author 王伟羽
- * @date 2024/3/13 9:52
- */
-
- @RestController
- @RequestMapping(value = "/test")
- public class producer {
-
- private final static String QUERE_NAME = "quere_name";
-
- @GetMapping(value = "/sendMessage")
- public String sendMessage(String message) {
- System.out.println(message);
- Connection connection = RabbitMqUtils.getConnection();
- if (Objects.nonNull(connection)) {
- try {
- Channel channel = connection.createChannel();
- // 参数1:指定exchange,使用""。 最简模式(helloword) 使用默认交换机
- // 参数2:指定路由的规则,
- // 使用具体的队列名称。
- // 参数2可以是队列名 也可以是路由规则
-
- // 参数3:指定传递的消息所携带的properties,使用null。
- // 参数4:指定发布的具体消息,byte[]类型
- channel.basicPublish("", QUERE_NAME, null, "马上下课".getBytes("utf-8"));
- return "发送消息成功!";
- } catch (IOException e) {
- e.printStackTrace();
- return "发送消息失败!";
- }
- }
- return "mq初始化失败!";
- }
-
- }
消费者消费消息
- package com.wwy.consumer;
-
- import com.rabbitmq.client.*;
- import com.wwy.config.RabbitMqUtils;
- import org.springframework.context.annotation.Bean;
- import org.springframework.stereotype.Service;
- import org.springframework.util.CollectionUtils;
-
- import java.io.IOException;
- import java.util.Objects;
-
- /**
- * @author 王伟羽
- * @date 2024/3/13 10:33
- */
- @Service
- public class ConsumerTest {
-
- private final static String QUERE_NAME = "quere_name";
- @Bean
- public void consumeMessage() {
- Connection connection = RabbitMqUtils.getConnection();
- if(Objects.nonNull(connection)){
- try {
- Channel channel = connection.createChannel();
- channel.queueDeclare(QUERE_NAME,true,false,false,null);
- // 第二步创建消费者
- Consumer consumer = new DefaultConsumer(channel){
- @Override
- public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
-
- // byte[] body 就是消费者得到的数据
-
- System.out.println("消费者 得到消息body = " + new String(body,"utf-8"));
-
- }
- };
- channel.basicConsume(QUERE_NAME,true,consumer);
-
- // 让当前程序卡在 这里
- System.in.read();
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
- }
- }
消费者获取消息的另一种方式(官网)
- package com.wwy.consumer;
-
- import com.rabbitmq.client.*;
- import com.wwy.config.RabbitMqUtils;
- import org.springframework.context.annotation.Bean;
- import org.springframework.stereotype.Service;
- import org.springframework.util.CollectionUtils;
-
- import java.io.IOException;
- import java.util.Objects;
-
- /**
- * @author 王伟羽
- * @date 2024/3/13 10:33
- */
- @Service
- public class ConsumerTest {
-
- private final static String QUERE_NAME = "quere_name";
- @Bean
- public void consumeMessage() {
- Connection connection = RabbitMqUtils.getConnection();
- if(Objects.nonNull(connection)){
- try {
- Channel channel = connection.createChannel();
- channel.queueDeclare(QUERE_NAME,true,false,false,null);
- // // 第二步创建消费者
- // Consumer consumer = new DefaultConsumer(channel){
- // @Override
- // public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
- //
- // // byte[] body 就是消费者得到的数据
- //
- // System.out.println("消费者 得到消息body = " + new String(body,"utf-8"));
- //
- // }
- // };
- // channel.basicConsume(QUERE_NAME,true,consumer);
- DeliverCallback deliverCallback = (consumerTag, delivery) -> {
- String message = new String(delivery.getBody(), "UTF-8");
- System.out.println(" [x] Received '" + message + "'");
- };
- channel.basicConsume(QUERE_NAME, true, deliverCallback, consumerTag -> { });
- // 让当前程序卡在 这里
- System.in.read();
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
- }
- }
启动程序访问接口
生产者生产成功
消费者收到生产者的消息
使用封装好的RabbitTemplate进行操作,比较方便快捷
第一步配置信息
- server:
- port: 8083
- spring:
- rabbitmq: # 单机版配置
- host: 192.168.60.139
- port: 5672
- username: guest #账户名密码默认都是guest
- password: guest
- publisher-confirm-type: simple
- publisher-returns: true
- listener:
- simple:
- acknowledge-mode: manual
生产者:
- package com.wwy.producter;
-
-
- import com.rabbitmq.client.Channel;
- import com.rabbitmq.client.Connection;
- import com.wwy.config.RabbitMqUtils;
- import org.springframework.amqp.rabbit.core.RabbitTemplate;
- import org.springframework.web.bind.annotation.GetMapping;
- import org.springframework.web.bind.annotation.RequestMapping;
- import org.springframework.web.bind.annotation.RestController;
-
- import javax.annotation.Resource;
-
-
- /**
- * @author 王伟羽
- * @date 2024/3/13 9:52
- */
-
- @RestController
- @RequestMapping(value = "/test")
- public class producer {
-
- private final static String QUERE_NAME = "quere_name";
-
- @Resource
- private RabbitTemplate rabbitTemplate;
-
- @GetMapping(value = "/sendMessage")
- public String sendMessage(String message) {
- System.out.println(message);
- rabbitTemplate.convertAndSend(QUERE_NAME,message);
- return "发送成功!";
- }
-
- }
消费者:
- package com.wwy.consumer;
- import org.springframework.amqp.rabbit.annotation.RabbitListener;
- import org.springframework.stereotype.Component;
-
-
- /**
- * @author 王伟羽
- * @date 2024/3/13 10:33
- */
- @Component
- public class ConsumerTest {
-
- private final static String QUERE_NAME = "quere_name";
-
-
- @RabbitListener(queues = QUERE_NAME)
- public void handleMessage(String msg) {
- System.out.println("listener 收到消息4 " + msg);
- }
- }
运行
生产者:
- package com.wwy.producter;
-
- import com.rabbitmq.client.Channel;
- import com.rabbitmq.client.Connection;
- import com.rabbitmq.client.MessageProperties;
- import com.wwy.config.RabbitMqUtils;
- import org.springframework.amqp.rabbit.core.RabbitTemplate;
- import org.springframework.util.CollectionUtils;
- import org.springframework.web.bind.annotation.GetMapping;
- import org.springframework.web.bind.annotation.RequestMapping;
- import org.springframework.web.bind.annotation.RestController;
-
- import javax.annotation.Resource;
- import java.io.IOException;
- import java.util.Objects;
-
- /**
- * @author 王伟羽
- * @date 2024/3/13 13:53
- */
- @RestController
- @RequestMapping(value = "/workQueues")
- public class WorkQueuesProducer {
-
- private final static String WORK_QUEUES = "work_queues";
-
- @Resource
- private RabbitTemplate rabbitTemplate;
-
-
- //第一种
- @GetMapping(value = "/workQueuesSendMessage")
- public String workQueuesSendMessage(String message) {
- System.out.println("接收消息");
- //第一种生产消息方法
- //获取连接
- Connection connection = RabbitMqUtils.getConnection();
- if (Objects.nonNull(connection)) {
- try {
- Channel channel = connection.createChannel();
- //发送消息
- for (int i = 0; i < 10; i++) {
- channel.basicPublish("", WORK_QUEUES, null, (message + i).getBytes("utf-8"));
- }
- return "发送成功!";
- } catch (IOException e) {
- e.printStackTrace();
- return "发送失败!";
- }
- }
- return "初始mq失败!";
- }
-
- //第二种
- @GetMapping(value = "/workQueuesSendMessage01")
- public String workQueuesSendMessage01(String message) {
- System.out.println("接收消息");
- //发送消息
- for (int i = 0; i < 10; i++) {
- System.out.println(i);
- rabbitTemplate.convertAndSend(WORK_QUEUES, message);
- }
- return "发送成功!";
- }
-
-
- }
消费者消费消息
第一种:
- package com.wwy.consumer;
-
- import com.rabbitmq.client.Channel;
- import com.rabbitmq.client.Connection;
- import com.rabbitmq.client.DeliverCallback;
- import com.wwy.config.RabbitMqUtils;
- import org.springframework.boot.SpringBootConfiguration;
- import org.springframework.context.annotation.Bean;
- import org.springframework.stereotype.Component;
- import org.springframework.context.annotation.Configuration;
- import org.springframework.core.annotation.Order;
- import org.springframework.stereotype.Service;
-
-
- import javax.annotation.Resource;
- import java.io.IOException;
- import java.util.Objects;
-
- /**
- * @author 王伟羽
- * @date 2024/3/13 14:13
- */
- @Service
- public class WorkQueuesConsumerOne {
- private final static String WORK_QUEUES = "work_queues";
-
- @Bean
- public void getMessageInfoOne() {
- Connection connection = RabbitMqUtils.getConnection();
- if (Objects.nonNull(connection)) {
- try {
- Channel channel = connection.createChannel();
- channel.queueDeclare(WORK_QUEUES, true, false, false, null);
- //设置每次消费消息的数量
- channel.basicQos(1);
- DeliverCallback deliverCallback = (consumerTag, delivery) -> {
- String message = new String(delivery.getBody(), "UTF-8");
- System.out.println(" [x] 队列1消息内容 '" + message + "'");
- System.out.println("队列1获取到消息");
- //手动ACK
- channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
- };
- channel.basicConsume(WORK_QUEUES, false, deliverCallback, consumerTag -> {
- System.out.println("队列1消息消费被中断");
- });
- } catch (IOException e) {
- e.printStackTrace();
- }
-
- }
- }
-
- @Bean
- public void getMessageInfoTwo() {
- Connection connection = RabbitMqUtils.getConnection();
- if (Objects.nonNull(connection)) {
- try {
- Channel channel = connection.createChannel();
- channel.queueDeclare(WORK_QUEUES, true, false, false, null);
-
- //设置每次消费消息的数量
- channel.basicQos(1);
- System.out.println("队列2获取到消息");
- DeliverCallback deliverCallback = (consumerTag, delivery) -> {
- String message = new String(delivery.getBody(), "UTF-8");
- System.out.println(" [x] 队列2消息内容 '" + message + "'");
- //手动ACK
- channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
- };
- channel.basicConsume(WORK_QUEUES, false, deliverCallback, consumerTag -> {
- System.out.println("队列2消息消费被中断");
- });
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
- }
-
- }
第二种注解方式:
- package com.wwy.consumer;
-
- import org.springframework.amqp.rabbit.annotation.RabbitListener;
- import org.springframework.amqp.rabbit.core.RabbitTemplate;
- import org.springframework.stereotype.Component;
-
- import javax.annotation.Resource;
-
- /**
- * @author 王伟羽
- * @date 2024/3/13 15:10
- */
-
- @Component
- public class WorkQueuesConsumersOne {
-
- public final static String WORK_QUEUES = "work_queues";
-
-
- @RabbitListener(queues = WORK_QUEUES)
- public void consumerOne(String message) {
- System.out.println("队列1收到消息"+message);
- }
-
- @RabbitListener(queues = WORK_QUEUES)
- public void consumerTwo(String message) {
- System.out.println("队列2收到消息"+message);
- }
- }
1、1个生产者,多个消费者
2、每一个消费者都有自己的一个队列
3、生产者没有将消息直接发送到队列,而是发送到了交换机
4、每个队列都要绑定到交换机
5、生产者发送的消息,经过交换机到达队列,实现一个消息被多个消费者获取的目的
X(Exchanges):交换机一方面:接收生产者发送的消息。另一方面:知道如何处理消息,例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange的类型。
第一种生产者方式:
- package com.wwy.producter;
-
- import com.rabbitmq.client.BuiltinExchangeType;
- import com.rabbitmq.client.Channel;
- import com.rabbitmq.client.Connection;
- import com.wwy.config.RabbitMqUtils;
- import org.springframework.web.bind.annotation.GetMapping;
- import org.springframework.web.bind.annotation.RequestMapping;
- import org.springframework.web.bind.annotation.RestController;
-
- import java.io.IOException;
- import java.util.Objects;
-
- /**
- * @author 王伟羽
- * @date 2024/3/13 16:54
- */
-
- @RestController
- @RequestMapping(value = "/publishProducer")
- public class PublishProducer {
-
-
- private final static String PUBLISH_PRODUCER1 = "publish_name_one";
- private final static String PUBLISH_PRODUCER2 = "publish_name_two";
-
- //交换机名称
- private final static String EXCHANGE_NAME = "publish_exchange";
-
- @GetMapping(value = "/publishSendMessage")
- public String publishSendMessage(String message) {
- Connection connection = RabbitMqUtils.getConnection();
- if (Objects.nonNull(connection)) {
- Channel channel = null;
- try {
- channel = connection.createChannel();
- // 绑定交换机
- //参数1: exchange的名称
- //参数2: 指定exchange的类型
- // FANOUT - pubsub , 发布订阅
- // DIRECT - Routing , 路由模式
- // TOPIC - Topics topic
- channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
- //给交换机绑定对应的队列
- //将队列和交换机绑定
- //String var1, 队列名
- //String var2, 交换机名
- //String var3, 对应绑定队列 路由规则 "" 没有规则所有的队列消息一样
- channel.queueBind(PUBLISH_PRODUCER1, EXCHANGE_NAME, "");
- channel.queueBind(PUBLISH_PRODUCER2, EXCHANGE_NAME, "");
- // 参数1:指定exchange,使用""。 最简模式(helloword) 使用默认交换机
- // 参数2:指定路由的规则,
- // 使用具体的队列名称。
- // 参数2可以是队列名 也可以是路由规则
-
- // 参数3:指定传递的消息所携带的properties,使用null。
- // 参数4:指定发布的具体消息,byte[]类型
-
- for (int i = 1; i < 11; i++) {
- // 消息向交换机发送,没有匹配路由规则
- channel.basicPublish(EXCHANGE_NAME, PUBLISH_PRODUCER1, null, (message + i).getBytes("utf-8"));
- }
- return "发送成功!";
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
- return "mq出错!";
- }
- }
消费者消费信息:
- package com.wwy.consumer;
-
- import com.rabbitmq.client.Channel;
- import com.rabbitmq.client.Connection;
- import com.rabbitmq.client.DeliverCallback;
- import com.wwy.config.RabbitMqUtils;
- import org.springframework.context.annotation.Bean;
- import org.springframework.stereotype.Component;
-
- import java.io.IOException;
- import java.util.Objects;
-
- /**
- * @author 王伟羽
- * @date 2024/3/13 17:06
- */
- @Component
- public class PublishConsumerOne {
-
- private final static String PUBLISH_PRODUCER1 = "publish_name_one";
- private final static String PUBLISH_PRODUCER2 = "publish_name_two";
-
- //交换机名称
- private final static String EXCHANGE_NAME = "publish_exchange";
-
- @Bean
- public void publishGetInfo() {
- Connection connection = RabbitMqUtils.getConnection();
- if (Objects.nonNull(connection)) {
- try {
- Channel channel = connection.createChannel();
- //保证消费者 每次只消费一条消息
- channel.basicQos(1);
-
- // 第一步声明 队列
- channel.queueDeclare(PUBLISH_PRODUCER1,true,false,false,null);
- channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
- channel.queueBind(PUBLISH_PRODUCER1, EXCHANGE_NAME, "");
- DeliverCallback deliverCallback = (consumerTag, delivery) -> {
- String message = new String(delivery.getBody(), "UTF-8");
- System.out.println(" 队列一消费消息" + message);
- channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
- };
- channel.basicConsume(PUBLISH_PRODUCER1, true, deliverCallback, consumerTag -> {
- });
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
- }
-
- @Bean
- public void publishGetInfoTwo() {
- Connection connection = RabbitMqUtils.getConnection();
- if (Objects.nonNull(connection)) {
- try {
- Channel channel = connection.createChannel();
- //保证消费者 每次只消费一条消息
- channel.basicQos(1);
- channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
- channel.queueDeclare(PUBLISH_PRODUCER2,true,false,false,null);
- channel.queueBind(PUBLISH_PRODUCER2, EXCHANGE_NAME, "");
- DeliverCallback deliverCallback = (consumerTag, delivery) -> {
- String message = new String(delivery.getBody(), "UTF-8");
- System.out.println(" 队列二消费消息" + message);
- channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
- };
- channel.basicConsume(PUBLISH_PRODUCER2, true, deliverCallback, consumerTag -> {
- });
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
- }
- }
运行结果可知,两个消费者都能获取到信息,此种情况适合用户注册业务,一个队列接收短信发送,一个队列接收邮件发送
第二种简便方式 生产者
-
- @GetMapping(value = "/PublicSubscribe")
- public void PublicSubscribe() {
- rabbitTemplate.convertAndSend("publish_exchange_one", "", "发布订阅模式", new CorrelationData("我是大帅逼"));
- }
消费者:
- package com.wwy.consumer;
-
- import lombok.extern.apachecommons.CommonsLog;
- import org.springframework.amqp.rabbit.annotation.RabbitListener;
- import org.springframework.boot.SpringBootConfiguration;
- import org.springframework.stereotype.Component;
-
- /**
- * @author 王伟羽
- * @date 2024/3/13 17:49
- */
-
- @Component
- public class PublishConsumerTwo {
-
- @RabbitListener(queues = "publish_queue_one")
- public void publishOne(String message) {
- System.out.println("队列1收到的消息" + message);
- }
-
-
- @RabbitListener(queues = "publish_queue_two")
- public void publishTwo(String message) {
- System.out.println("队列2收到的消息" + message);
- }
- }
由于上述第一种方法太过于繁琐,所以主题模式只采用第二种方法,第一种在后续代码里展示
配置主题模式下的队列、交换机并将其绑定起来
- package com.wwy.config;
-
- import org.springframework.amqp.core.*;
- import org.springframework.boot.SpringBootConfiguration;
- import org.springframework.context.annotation.Bean;
-
- /**
- * @author 王伟羽
- * @date 2024/3/14 9:29
- */
- @SpringBootConfiguration
- public class TopicConfiguration {
-
- @Bean
- public TopicExchange getTopicExchange(){
- return new TopicExchange("topic_exchange_one");
- }
-
- @Bean
- public Queue getTopicQueueOne(){
- return new Queue("topic_queue_one");
- }
-
- @Bean
- public Queue getTopicQueueTwo(){
- return new Queue("topic_queue_two");
- }
-
- @Bean
- public Queue getTopicQueueThree(){
- return new Queue("topic_queue_three");
- }
- //* 代表一个词
- //# 代表零个或者多个词
- @Bean
- public Binding getTopicBindingOne(){
- return BindingBuilder.bind(getTopicQueueOne()).to(getTopicExchange()).with("a.*");
- }
-
- @Bean
- public Binding getTopicBindingThree(){
- return BindingBuilder.bind(getTopicQueueThree()).to(getTopicExchange()).with("a.#");
- }
-
-
- @Bean
- public Binding getTopicBindingTwo(){
- return BindingBuilder.bind(getTopicQueueTwo()).to(getTopicExchange()).with("a.111");
- }
- }
创建生产者:
- package com.wwy.producter;
-
- import org.springframework.amqp.rabbit.core.RabbitTemplate;
- import org.springframework.web.bind.annotation.GetMapping;
- import org.springframework.web.bind.annotation.RequestMapping;
- import org.springframework.web.bind.annotation.RestController;
-
- import javax.annotation.Resource;
- import java.io.UnsupportedEncodingException;
-
- /**
- * @author 王伟羽
- * @date 2024/3/14 9:36
- */
- @RestController
- @RequestMapping(value = "/topic")
- public class TopicProducer {
-
- @Resource
- private RabbitTemplate rabbitTemplate;
-
- @GetMapping(value = "/sendTopicMessage")
- public String sendTopicMessage(String message){
- try {
- rabbitTemplate.convertAndSend("topic_exchange_one","a.123",message.getBytes("utf-8"));
- return "生产者发送消息成功";
- } catch (UnsupportedEncodingException e) {
- e.printStackTrace();
- return "发送消息失败!";
- }
- }
- }
创建消费者:
- package com.wwy.consumer;
-
- import lombok.extern.apachecommons.CommonsLog;
- import org.springframework.amqp.rabbit.annotation.RabbitListener;
- import org.springframework.stereotype.Component;
-
- /**
- * @author 王伟羽
- * @date 2024/3/14 9:45
- */
-
- @Component
- public class TopicConsumerOne {
-
- @RabbitListener(queues = "topic_queue_one")
- public void getTopicMessageOne(String message) {
- System.out.println("队列一收到消息:" + message);
- }
-
- @RabbitListener(queues = "topic_queue_two")
- public void getTopicMessageTwo(String message) {
- System.out.println("队列二收到消息:" + message);
- }
-
- @RabbitListener(queues = "topic_queue_three")
- public void getTopicMessageThree(String message) {
- System.out.println("队列三收到消息:" + message);
- }
- }
这里有一个问题,在每次消费端重启的时候会继续消费队列里的数据,为了防止这种情况,可以消费者在消费到数据的时候进行手动ack
- package com.wwy.consumer;
-
- import com.rabbitmq.client.Channel;
- import lombok.extern.apachecommons.CommonsLog;
- import org.springframework.amqp.core.Message;
- import org.springframework.amqp.rabbit.annotation.RabbitListener;
- import org.springframework.amqp.support.AmqpHeaders;
- import org.springframework.messaging.handler.annotation.Header;
- import org.springframework.messaging.handler.annotation.Payload;
- import org.springframework.stereotype.Component;
-
- import java.io.IOException;
-
- /**
- * @author 王伟羽
- * @date 2024/3/14 9:45
- */
-
- @Component
- public class TopicConsumerOne {
-
- @RabbitListener(queues = "topic_queue_one")
- public void getTopicMessageOne(@Payload Message message, @Header(AmqpHeaders.CHANNEL) Channel channel) {
- try {
- // 获取消息内容
- String messageBody = new String(message.getBody());
- System.out.println("队列一收到消息:"+messageBody);
- long deliveryTag = message.getMessageProperties().getDeliveryTag();
- channel.basicAck(deliveryTag, false);
- System.out.println("队列一收到ack");
- } catch (Exception e) {
- // 如果处理消息时出现异常,可以拒绝消息
- Long deliveryTag = (Long) message.getMessageProperties().getDeliveryTag();
- if (!channel.isOpen()) {
- // 如果channel已经关闭,则无法执行basicNack或basicReject
- return;
- }
- try {
- channel.basicNack(deliveryTag, false, true); // 拒绝消息并重新放回队列
- } catch (IOException ex) {
- ex.printStackTrace();
- }
- // 或者可以选择 basicReject 如果不需要重新放回队列
- // channel.basicReject(deliveryTag, true); // 拒绝消息并丢弃
- }
- }
-
- @RabbitListener(queues = "topic_queue_two")
- public void getTopicMessageTwo(@Payload Message message, @Header(AmqpHeaders.CHANNEL) Channel channel) {
- try {
- // 获取消息内容
- String messageBody = new String(message.getBody());
- System.out.println("队列二收到消息:"+messageBody);
- long deliveryTag = message.getMessageProperties().getDeliveryTag();
- channel.basicAck(deliveryTag, false);
- System.out.println("队列二收到ack");
- } catch (Exception e) {
- // 如果处理消息时出现异常,可以拒绝消息
- Long deliveryTag = (Long) message.getMessageProperties().getDeliveryTag();
- if (!channel.isOpen()) {
- // 如果channel已经关闭,则无法执行basicNack或basicReject
- return;
- }
- try {
- channel.basicNack(deliveryTag, false, true); // 拒绝消息并重新放回队列
- } catch (IOException ex) {
- ex.printStackTrace();
- }
- // 或者可以选择 basicReject 如果不需要重新放回队列
- // channel.basicReject(deliveryTag, true); // 拒绝消息并丢弃
- }
- }
-
- @RabbitListener(queues = "topic_queue_three")
- public void getTopicMessageThree(@Payload Message message, @Header(AmqpHeaders.CHANNEL) Channel channel) {
- try {
- // 获取消息内容
- String messageBody = new String(message.getBody());
- System.out.println("队列三收到消息:"+messageBody);
- long deliveryTag = message.getMessageProperties().getDeliveryTag();
- channel.basicAck(deliveryTag, false);
- System.out.println("队列三收到ack");
- } catch (Exception e) {
- // 如果处理消息时出现异常,可以拒绝消息
- Long deliveryTag = (Long) message.getMessageProperties().getDeliveryTag();
- if (!channel.isOpen()) {
- // 如果channel已经关闭,则无法执行basicNack或basicReject
- return;
- }
- try {
- channel.basicNack(deliveryTag, false, true); // 拒绝消息并重新放回队列
- } catch (IOException ex) {
- ex.printStackTrace();
- }
- // 或者可以选择 basicReject 如果不需要重新放回队列
- // channel.basicReject(deliveryTag, true); // 拒绝消息并丢弃
- }
- }
- }
路由模式几乎与主题模式相同,也是通过key去发送到对应的消费者中去
配置队列,交换机并将其绑定到一起
- package com.wwy.config;
-
- import org.springframework.amqp.core.*;
- import org.springframework.boot.SpringBootConfiguration;
- import org.springframework.context.annotation.Bean;
-
- /**
- * @author 王伟羽
- * @date 2024/3/14 10:49
- */
-
- @SpringBootConfiguration
- public class RouterConfiguration {
-
-
- @Bean
- public DirectExchange getRouterExchange(){
- return new DirectExchange("router_exchange_one");
- }
-
- @Bean
- public Queue getRouterQueueOne(){
- return new Queue("router_queue_one");
- }
-
- @Bean
- public Queue getRouterQueueTwo(){
- return new Queue("router_queue_two");
- }
-
- @Bean
- public Queue getRouterQueueThree(){
- return new Queue("router_queue_three");
- }
-
- @Bean
- public Binding getRouterBindingOne(){
- return BindingBuilder.bind(getRouterQueueOne()).to(getRouterExchange()).with("aaa");
- }
-
- @Bean
- public Binding getRouterBindingThree(){
- return BindingBuilder.bind(getRouterQueueThree()).to(getRouterExchange()).with("bbb");
- }
-
-
- @Bean
- public Binding getRouterBindingTwo(){
- return BindingBuilder.bind(getRouterQueueTwo()).to(getRouterExchange()).with("ccc");
- }
-
-
- }
生产者:
- package com.wwy.producter;
-
- import org.springframework.amqp.rabbit.core.RabbitTemplate;
- import org.springframework.web.bind.annotation.GetMapping;
- import org.springframework.web.bind.annotation.RequestMapping;
- import org.springframework.web.bind.annotation.RestController;
-
- import javax.annotation.Resource;
- import java.io.UnsupportedEncodingException;
-
- /**
- * @author 王伟羽
- * @date 2024/3/14 10:48
- */
- @RestController
- @RequestMapping(value = "/router")
- public class RouterProducer {
-
-
- @Resource
- private RabbitTemplate rabbitTemplate;
-
- @GetMapping(value = "/sendRouterMessage")
- public String sendTopicMessage(String message){
- try {
- rabbitTemplate.convertAndSend("router_exchange_one","aaa",message.getBytes("utf-8"));
- return "生产者发送消息成功";
- } catch (UnsupportedEncodingException e) {
- e.printStackTrace();
- return "发送消息失败!";
- }
- }
- }
消费者:
- package com.wwy.consumer;
-
- import com.rabbitmq.client.Channel;
- import org.springframework.amqp.core.Message;
- import org.springframework.amqp.rabbit.annotation.RabbitListener;
- import org.springframework.amqp.support.AmqpHeaders;
- import org.springframework.messaging.handler.annotation.Header;
- import org.springframework.messaging.handler.annotation.Payload;
- import org.springframework.stereotype.Component;
-
- import java.io.IOException;
-
- /**
- * @author 王伟羽
- * @date 2024/3/14 10:57
- */
- @Component
- public class RouterConsumer {
-
- @RabbitListener(queues = "router_queue_one")
- public void getTopicMessageOne(@Payload Message message, @Header(AmqpHeaders.CHANNEL) Channel channel) {
- try {
- // 获取消息内容
- String messageBody = new String(message.getBody());
- System.out.println("队列一收到消息:"+messageBody);
- long deliveryTag = message.getMessageProperties().getDeliveryTag();
- channel.basicAck(deliveryTag, false);
- System.out.println("队列一收到ack");
- } catch (Exception e) {
- // 如果处理消息时出现异常,可以拒绝消息
- Long deliveryTag = (Long) message.getMessageProperties().getDeliveryTag();
- if (!channel.isOpen()) {
- // 如果channel已经关闭,则无法执行basicNack或basicReject
- return;
- }
- try {
- channel.basicNack(deliveryTag, false, true); // 拒绝消息并重新放回队列
- } catch (IOException ex) {
- ex.printStackTrace();
- }
- // 或者可以选择 basicReject 如果不需要重新放回队列
- // channel.basicReject(deliveryTag, true); // 拒绝消息并丢弃
- }
- }
-
- @RabbitListener(queues = "router_queue_two")
- public void getTopicMessageTwo(@Payload Message message, @Header(AmqpHeaders.CHANNEL) Channel channel) {
- try {
- // 获取消息内容
- String messageBody = new String(message.getBody());
- System.out.println("队列二收到消息:"+messageBody);
- long deliveryTag = message.getMessageProperties().getDeliveryTag();
- channel.basicAck(deliveryTag, false);
- System.out.println("队列二收到ack");
- } catch (Exception e) {
- // 如果处理消息时出现异常,可以拒绝消息
- Long deliveryTag = (Long) message.getMessageProperties().getDeliveryTag();
- if (!channel.isOpen()) {
- // 如果channel已经关闭,则无法执行basicNack或basicReject
- return;
- }
- try {
- channel.basicNack(deliveryTag, false, true); // 拒绝消息并重新放回队列
- } catch (IOException ex) {
- ex.printStackTrace();
- }
- // 或者可以选择 basicReject 如果不需要重新放回队列
- // channel.basicReject(deliveryTag, true); // 拒绝消息并丢弃
- }
- }
-
- @RabbitListener(queues = "router_queue_three")
- public void getTopicMessageThree(@Payload Message message, @Header(AmqpHeaders.CHANNEL) Channel channel) {
- try {
- // 获取消息内容
- String messageBody = new String(message.getBody());
- System.out.println("队列三收到消息:"+messageBody);
- long deliveryTag = message.getMessageProperties().getDeliveryTag();
- channel.basicAck(deliveryTag, false);
- System.out.println("队列三收到ack");
- } catch (Exception e) {
- // 如果处理消息时出现异常,可以拒绝消息
- Long deliveryTag = (Long) message.getMessageProperties().getDeliveryTag();
- if (!channel.isOpen()) {
- // 如果channel已经关闭,则无法执行basicNack或basicReject
- return;
- }
- try {
- channel.basicNack(deliveryTag, false, true); // 拒绝消息并重新放回队列
- } catch (IOException ex) {
- ex.printStackTrace();
- }
- // 或者可以选择 basicReject 如果不需要重新放回队列
- // channel.basicReject(deliveryTag, true); // 拒绝消息并丢弃
- }
- }
- }
这里生产者发送消息指定了key为aaa的,所以只有消费者一匹配并接收到消息
以上就是本次测试的所以队列名,大家可以在测试的时候进入可视化界面查看消息、队列、交换机状态。本文只是简单的对RabbitMq的各种模式进行简单了解,后续的如何在项目中实现、死信队列等在下章博客分享,对于本篇如有错误的地方欢迎大家指正。
wangweiyuyu/rabbitmq - 码云 - 开源中国 (gitee.com)https://gitee.com/wangweiyuyu/rabbitmq
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。