赞
踩
作者:ChenZhen
博客地址:https://www.chenzhen.space/
版权:来自b站视频
【SpringCloud+RabbitMQ+Docker+Redis+搜索+分布式,系统详解springcloud微服务技术栈课程|黑马程序员Java微服务】【尚硅谷RabbitMQ教程丨快速掌握MQ消息中间件rabbitmq】
将视频中的笔记整理出来
如果对你有帮助,请给一个小小的star⭐
RabbitMQ 是一个消息中间件:它接受并转发消息。你可以把它当做一个快递站点,当你要发送一个包
裹时,你把你的包裹放到快递站,快递员最终会把你的快递送到收件人那里,按照这种逻辑 RabbitMQ 是
一个快递站,一个快递员帮你传递快件。RabbitMQ 与快递站的主要区别在于,它不处理快件而是接收,
存储和转发消息数据。
文件上传
安装文件:
链接:https://pan.baidu.com/s/1YA0-pBKXDbDMR9UnBGpe2g?pwd=7mo0
提取码:7mo0
上传我提供文件到/usr/local/software 目录下(如果没有 software 需要自己创建)
3. 安装文件(分别按照以下顺序安装)
1.安装socat 插件
yum install -y socat
2.安装erlang,erlang是rabbitmq需要的环境语言,el7表示可以在linux7上安装
rpm -ivh erlang-21.3-1.el7.x86_64.rpm
rpm -ivh openssl-libs-1.0.2k-19.el7.x86_64.rpm --force
dnf install libnsl
安装完依赖和类库之后,继续执行安装erlang
3.安装rabbitmq
rpm -ivh rabbitmq-server-3.8.8-1.el7.noarch.rpm
分别输入上面给出的3条命令
添加开机启动 RabbitMQ 服务
chkconfig rabbitmq-server on
启动服务
/sbin/service rabbitmq-server start
查看服务状态
/sbin/service rabbitmq-server status
停止服务
/sbin/service rabbitmq-server stop
开启 web 管理插件
rabbitmq-plugins enable rabbitmq_management
重新启动服务
/sbin/service rabbitmq-server stop
/sbin/service rabbitmq-server start
使用
uname - a
可以查看版本
web管理界面会在15672端口开启,所以我们需要将15672端口开放( 需要重启防火墙服务)
firewall-cmd --zone=public --add-port=15672/tcp --permanent
重启防火墙服务
firewall-cmd --reload
访问地址 http://192.168.43.128:15672/
将ip地址改为你们自己的linux服务器的ip地址
初始账户密码都为:guest
但是一开始用默认账号密码(guest)访问会出现权限问题,guest账户只能在localhost(本机)下面访问,所以需要创建一个管理员账号我们需要添加一个新的用户来进行登录。
添加一个新的用户
创建账号(用户名:admin,密码:123)
rabbitmqctl add_user admin 123
设置用户角色(设置admin的角色为administrator)
rabbitmqctl set_user_tags admin administrator
设置用户权限
set_permissions [-p <vhostpath>] <user> <conf> <write> <read>
rabbitmqctl set_permissions -p "/" admin ".*" ".*" ".*"
用户 user_admin 具有/ 这个 virtual host 中所有资源的配置、写、读权限
查询当前用户和角色
rabbitmqctl list_users
再次利用 admin 用户登录,成功登录
我们将用 Java 编写两个程序。发送单个消息的生产者和接收消息并打印出来的消费者。我们将介绍 Java API 中的一些细节。
在这之前我们要先将Linux服务器的5672端口开放,用于tcp连接。
firewall-cmd --zone=public --add-port=5672/tcp --permanent
重启防火墙服务
firewall-cmd --reload
<!--rabbitmq 依赖客户端-->
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.8.0</version>
</dependency>
<!--操作文件流的一个依赖-->
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<version>2.6</version>
</dependency>
java代码
public class Producer { //队列名称 public static final String QUEUE_NAME = "queue1"; //发消息 public static void main(String[] args) throws IOException, TimeoutException { //创建一个连接工厂 ConnectionFactory factory = new ConnectionFactory(); //工厂IP 连接RabbitMQ队列 factory.setHost("192.168.43.128"); //用户名 factory.setUsername("admin"); //密码 factory.setPassword("123"); //创建连接 Connection connection = factory.newConnection(); //获取信道 Channel channel = connection.createChannel(); /** * 生成队列 * 1.队列名称 * 2.队列里面的消息是否持久化 * 3.是否只供一个消费者进行消费 是否进行消息共享 * 4.是否进行自动删除 * 5.其他参数 */ channel.queueDeclare(QUEUE_NAME,false,false,false,null); //消息 String message = "hello world"; /** * 发消息 * 1.发送到哪个交换机 * 2.路由的key值是哪个 本次队列的名称 * 3.其他参数 * 4.发送的消息的消息体 */ channel.basicPublish(QUEUE_NAME,"",null, message.getBytes()); System.out.println("消息发送完毕"); } }
运行后,查看web管理界面,可以发现已经接受到了一个消息
package com.chenzhen.rabbitmq; import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * @author ChenZhen * @Description * @create 2023/3/6 17:11 * @QQ 1583296383 * @WeXin(WeChat) ShockChen7 */ public class Consumer { //队列名称 public static final String QUEUE_NAME = "queue1"; //接受消息 public static void main(String[] args) throws IOException, TimeoutException { //创建连接工厂 ConnectionFactory factory = new ConnectionFactory(); //工厂IP 连接RabbitMQ队列 factory.setHost("192.168.43.128"); //用户名 factory.setUsername("admin"); //密码 factory.setPassword("123"); //创建连接 Connection connection = factory.newConnection(); //获取信道 Channel channel = connection.createChannel(); //声明 接受消息 DeliverCallback deliverCallback = (consumerTag,message) ->{ System.out.println("message = " + new String(message.getBody())); }; //取消消息时的回调 CancelCallback cancelCallback = consumerTag ->{ System.out.println("消息消费被中断"); }; /** * 消费者消费消息 * 1.消费哪个队列 * 2.消费成功后是否要自动应答 true:自动 false:手动 * 3.消费成功之后的回调 * 4.消费者取消消费的回调 */ channel.basicConsume(QUEUE_NAME, true,deliverCallback,cancelCallback); } }
运行后
package com.chenzhen.rabbitmq.utils; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; /** * @author ChenZhen * @Description * @create 2023/3/6 17:43 * @QQ 1583296383 * @WeXin(WeChat) ShockChen7 */ public class RabbitMqUtils { public static Channel getChannel() throws Exception{ //创建连接工厂 ConnectionFactory factory = new ConnectionFactory(); //工厂IP 连接RabbitMQ队列 factory.setHost("192.168.43.128"); //用户名 factory.setUsername("admin"); //密码 factory.setPassword("123"); //创建连接 Connection connection = factory.newConnection(); //获取信道 Channel channel = connection.createChannel(); return channel; } }
消费者完成一个任务可能需要一段时间,如果其中一个消费者处理一个长的任务并仅只完成了部分突然它挂掉了,会发生什么情况。RabbitMQ 一旦向消费者传递了一条消息,便立即将该消息标记为删除。在这种情况下,突然有个消费者挂掉了,我们将丢失正在处理的消息。以及后续发送给该消费这的消息,因为它无法接收到。
为了保证消息在发送过程中不丢失,rabbitmq 引入消息应答机制,消息应答就是:消费者在接收到消息并且处理该消息之后,告诉 rabbitmq 它已经处理了,rabbitmq 可以把该消息删除了。
消息发送后立即被认为已经传送成功,这种模式需要在高吞吐量和数据传输安全性方面做权衡,因为这种模式如果消息在接收到之前,消费者那边出现连接或者 channel 关闭,那么消息就丢失了,当然另一方面这种模式消费者那边可以传递过载的消息,没有对传递的消息数量进行限制,当然这样有可能使得消费者这边由于接收太多还来不及处理的消息,导致这些消息的积压,最终使得内存耗尽,最终这些消费者线程被操作系统杀死,所以这种模式仅适用在消费者可以高效并以某种速率能够处理这些消息的情况下使用
A.Channel.basicAck(用于肯定确认)
RabbitMQ 已知道该消息并且成功的处理消息,可以将其丢弃了
B.Channel.basicNack(用于否定确认)
C.Channel.basicReject(用于否定确认)
与 Channel.basicNack 相比少一个参数
不处理该消息了直接拒绝,可以将其丢弃了
如果消费者由于某些原因失去连接(其通道已关闭,连接已关闭或 TCP 连接丢失),导致消息未发送 ACK 确认,RabbitMQ 将了解到消息未完全处理,并将对其重新排队。如果此时其他消费者可以处理,它将很快将其重新分发给另一个消费者。这样,即使某个消费者偶尔死亡,也可以确保不会丢失任何消息。
消息生产者代码
package com.chenzhen.rabbitmq.three; import com.chenzhen.rabbitmq.utils.RabbitMqUtils; import com.rabbitmq.client.Channel; import java.util.Scanner; /** * @author ChenZhen * @Description * @create 2023/3/7 16:51 * @QQ 1583296383 * @WeXin(WeChat) ShockChen7 */ public class Task2 { //队列名称 public static final String TASK_QUEUE_NAME = "ack_queue"; public static void main(String[] args) throws Exception { Channel channel = RabbitMqUtils.getChannel(); //声明队列 channel.queueDeclare(TASK_QUEUE_NAME, false, false, false, null); Scanner scanner = new Scanner(System.in); while (scanner.hasNextInt()){ String message = scanner.next(); channel.basicPublish(TASK_QUEUE_NAME,"", null,message.getBytes("UTF-8")); System.out.println("生产者发出消息:"+ message); } } }
消息消费者代码,其一
package com.chenzhen.rabbitmq.three; import com.chenzhen.rabbitmq.utils.RabbitMqUtils; import com.rabbitmq.client.CancelCallback; import com.rabbitmq.client.Channel; import com.rabbitmq.client.DeliverCallback; import java.util.concurrent.TimeUnit; /** * @author ChenZhen * @Description * @create 2023/3/7 17:17 * @QQ 1583296383 * @WeXin(WeChat) ShockChen7 */ public class Work3 { //队列名称 public static final String TASK_QUEUE_NAME = "ack_queue"; //接收消息 public static void main(String[] args) throws Exception { Channel channel = RabbitMqUtils.getChannel(); System.out.println("c3等待接受消息处理"); DeliverCallback deliverCallback = (consumerTag, message) -> { try { //等待1s 模拟处理消息 Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(new String (message.getBody(),"UTF-8")); //手动应答 /** * 1.消息的标记 tag * 2.是否批量应答 false:不批量应答 */ channel.basicAck(message.getEnvelope().getDeliveryTag(), false); }; //取消消息时的回调 CancelCallback cancelCallback = consumerTag ->{ System.out.println("消息消费被中断"); }; boolean autoAck = false; //采用手动应答 channel.basicConsume(TASK_QUEUE_NAME, autoAck,deliverCallback,cancelCallback); } }
消息消费者,其二
package com.chenzhen.rabbitmq.three; import com.chenzhen.rabbitmq.utils.RabbitMqUtils; import com.rabbitmq.client.CancelCallback; import com.rabbitmq.client.Channel; import com.rabbitmq.client.DeliverCallback; import java.util.concurrent.TimeUnit; /** * @author ChenZhen * @Description * @create 2023/3/7 17:17 * @QQ 1583296383 * @WeXin(WeChat) ShockChen7 */ public class Work4 { //队列名称 public static final String TASK_QUEUE_NAME = "ack_queue"; //接收消息 public static void main(String[] args) throws Exception { Channel channel = RabbitMqUtils.getChannel(); System.out.println("c4等待接受消息处理"); DeliverCallback deliverCallback = (consumerTag, message) -> { try { //等待30s 模拟处理消息 Thread.sleep(1000*30); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(new String (message.getBody(),"UTF-8")); //手动应答 /** * 1.消息的标记 tag * 2.是否批量应答 false:不批量应答 */ channel.basicAck(message.getEnvelope().getDeliveryTag(), false); }; //取消消息时的回调 CancelCallback cancelCallback = consumerTag ->{ System.out.println("消息消费被中断"); }; boolean autoAck = false; //采用手动应答 channel.basicConsume(TASK_QUEUE_NAME, autoAck,deliverCallback,cancelCallback); } }
分别启动3个进程
生产者发送2个消息
如果第二个消费者在消费的时候挂机了,就不会进行应答,队列中的消息不会被销毁,则被转发给第一个消费者
//消息队列持久化
boolean durable = true;
//声明队列
channel.queueDeclare(TASK_QUEUE_NAME, durable, false, false, null);
在声明队列持久化前,如果该队列已存在,则需要先将该队列删除
否则会出现以下错误
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - inequivalent arg 'durable' for queue 'ack_queue' in vhost '/': received 'true' but current is 'false', class-id=50, method-id=10)
运行代码后,ack_queue队列已开启持久化
要想让消息实现持久化需要在消息生产者修改代码,MessageProperties.PERSISTENT_TEXT_PLAIN 添
加这个属性。
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
添加持久化参数后
//设置生产者发送信息为持久化消息(要求保存到磁盘上)
channel.basicPublish("", TASK_QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes("UTF-8"));
将消息标记为持久化并不能完全保证不会丢失消息。尽管它告诉 RabbitMQ 将消息保存到磁盘,但是这里依然存在当消息刚准备存储在磁盘的时候 但是还没有存储完,消息还在缓存的一个间隔点。此时并没有真正写入磁盘。持久性保证并不强,但是对于我们的简单任务队列而言,这已经绰绰有余了.
RabbitMQ官网介绍了,它支持六种应用场景:简单队列、工作队列、发布/订阅、路由模式、Topics主题模式、RPC,接下来分别介绍。
简单队列缺点
耦合度高,队列名在一端改动,另一端也要跟着改动。生产者和消费者一一对应,不支持多个消费者。
工作队列(又称任务队列)的主要思想是避免立即执行资源密集型任务,而不得不等待它完成。相反我们安排任务在之后执行。我们把任务封装为消息并将其发送到队列。在后台运行的工作进程将弹出任务并最终执行作业。当有多个工作线程时,这些工作线程将一起处理这些任务。
一般在实际应用中,生产者发送消息耗时较少,反应较快,反而是消费者因为要处理业务逻辑,处理时间可能会很慢,这样队列中会积压很多消息,所以需要多个消费者分摊压力,这个时候可以使用工作队列。
工作队列的逻辑就是队列拿到生产者的消息后会在消费者中选择一个把消息发送过去,并不是把消息同时发送给两个消费者。
听不太懂,总之就是用多个消费者来并发的处理一个队列里的消息。
在这个案例中我们会启动两个工作线程,一个消息发送线程,我们来看看他们两个工作线程是如何工作的。
package com.chenzhen.rabbitmq.two; import com.chenzhen.rabbitmq.utils.RabbitMqUtils; import com.rabbitmq.client.CancelCallback; import com.rabbitmq.client.Channel; import com.rabbitmq.client.DeliverCallback; /** * @author ChenZhen * @Description 这是一个工作线程,相当于是消费者 * @create 2023/3/6 18:02 * @QQ 1583296383 * @WeXin(WeChat) ShockChen7 */ //这是一个工作线程,相当于是消费者 public class Worker01 { //队列的名称 public static final String QUEUE_NAME = "queue1"; //接受消息 public static void main(String[] args) throws Exception { Channel channel = RabbitMqUtils.getChannel(); DeliverCallback deliverCallback = (consumerTag,message) -> { System.out.println(new String (message.getBody())); }; //取消消息时的回调 CancelCallback cancelCallback = consumerTag ->{ System.out.println("消息消费被中断"); }; System.out.println("c1等待接受消息......"); //消息的接受 channel.basicConsume(QUEUE_NAME, true,deliverCallback,cancelCallback); } }
运行后
我们在设置中开启允许多线程。
随后修改输出语句c1为c2,然后再次启动主函数。
可以看到此时运行了2个工作进程。
用来发送多个消息的生产者的代码,即消息发送线程
package com.chenzhen.rabbitmq.two; import com.chenzhen.rabbitmq.utils.RabbitMqUtils; import com.rabbitmq.client.Channel; import java.util.Scanner; /** * @author ChenZhen * @Description * @create 2023/3/7 11:14 * @QQ 1583296383 * @WeXin(WeChat) ShockChen7 */ public class Task01 { public static final String QUEUE_NAME = "queue1"; //发送大量消息 public static void main(String[] args) throws Exception { Channel channel = RabbitMqUtils.getChannel(); /** * 生成队列 * 1.队列名称 * 2.队列里面的消息是否持久化 * 3.是否只供一个消费者进行消费 是否进行消息共享 * 4.是否进行自动删除 * 5.其他参数 */ channel.queueDeclare(QUEUE_NAME,false,false,false,null); //从控制台中接受信息 Scanner scanner = new Scanner(System.in); while(scanner.hasNext()){ String message = scanner.next(); /** * 发消息 * 1.发送到哪个交换机 * 2.路由的key值是哪个 本次队列的名称 * 3.其他参数 * 4.发送的消息的消息体 */ channel.basicPublish(QUEUE_NAME,"", null, message.getBytes()); System.out.println("发送消息:"+ message); } } }
消息发送线程,发送4个消息abcd
第一个消费者接受a c 消息
第二个消费者接受b d消息
在订阅模型中,多了一个 Exchange 角色,而且过程略有变化:
⚫ P:生产者,也就是要发送消息的程序,但是不再发送到队列中,而是发给X(交换机)
⚫ C:消费者,消息的接收者,会一直等待消息到来
⚫ Queue:消息队列,接收消息、缓存消息
⚫ Exchange:交换机(X)。一方面,接收生产者发送的消息。另一方面,知道如何处理消息,例如递交给某个特别队列、
递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange的类型。Exchange有常见以下3种类型:
➢ Fanout:广播,将消息交给所有绑定到交换机的队列
➢ Direct:定向,把消息交给符合指定routing key 的队列
➢ Topic:通配符,把消息交给符合routing pattern(路由模式) 的队列
Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与 Exchange 绑定,或者没有符合
路由规则的队列,那么消息会丢失!
创建生产者代码
package com.chenzhen.rabbitmq.four; import com.chenzhen.rabbitmq.utils.RabbitMqUtils; import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import java.util.Scanner; /** * @author ChenZhen * @Description * @create 2023/3/7 16:51 * @QQ 1583296383 * @WeXin(WeChat) ShockChen7 */ public class PubSub_Producer { public static void main(String[] args) throws Exception { Channel channel = RabbitMqUtils.getChannel(); // 创建交换机 /** * 1.exchange:交换机名称 * 2.type : 交换机类型 * DIRECT("direct"):定向 * FANOUT("fanout"):扇形(广播) * TOPIC("topic"):通配符 * HEADERS("headers"):参数匹配 * 3.durable : 是否持久化 * 4.autoDelete : 是否自动删除 * 5.internal : 内部使用。默认false * 6.arguments : 参数列表 */ String exchangeName = "test_fanout"; channel.exchangeDeclare(exchangeName, BuiltinExchangeType.FANOUT,false,false,false,null); //创建队列 String queue1Name = "test_fanout_queue1"; String queue2Name = "test_fanout_queue2"; //消息队列持久化 boolean durable = false; //声明队列 channel.queueDeclare(queue1Name, durable, false, false, null); channel.queueDeclare(queue2Name, durable, false, false, null); //绑定队列和交换机 /** * 1. queue :队列名称 * 2.exchange: 交换机名称 * 3.routingKey : 路由键,绑定规则 * 如果交换机的类型为 fanout,routinKye设置为“”, */ channel.queueBind(queue1Name,exchangeName,""); channel.queueBind(queue2Name,exchangeName,""); Scanner scanner = new Scanner(System.in); while (scanner.hasNext()){ String message = scanner.next(); //发送消息 channel.basicPublish(exchangeName,"", null ,message.getBytes("UTF-8")); System.out.println("生产者发出消息:"+ message); } } }
启动后发出消息
查看队列,2个队列都接受到了消息。
代码跟前面的基本都差不多,这里就不重复打了
在生产者代码里,将模式修改为BuiltinExchangeType.DIRECT
String exchangeName = "test_direct";
channel.exchangeDeclare(exchangeName, BuiltinExchangeType.DIRECT,false,false,false,null);
并在队列和交换机绑定时指定路由key
channel.queueBind(queue1Name,exchangeName,"error");
channel.queueBind(queue2Name,exchangeName,"info");
channel.queueBind(queue2Name,exchangeName,"error");
channel.queueBind(queue2Name,exchangeName,"warning");
Topic 类型与 Direct 相比,都是可以根据 RoutingKey 把消息路由到不同的队列。只不过 Topic 类型
Exchange 可以让队列在绑定 Routing key 的时候使用通配符!
Routingkey 一般都是有一个或多个单词组成,多个单词之间以”.”分割,例如: item.insert
通配符规则:# 匹配一个或多个词,* 匹配不多不少恰好1个词,例如:item.# 能够匹配 item.insert.abc
或者 item.insert,item.* 只能匹配 item.insert
这个和路由模式很像,只不过将路由key改成了通配符的形式,代码也是十分简单,修改交换机规则和路由键即可。就不将代码给出了。
<!-- Spring Boot整合RabbitMQ-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
# 配置rabbitmq的基本信息
spring:
rabbitmq:
host: 192.168.43.128
port: 5672
username: admin
password: 123
virtual-host: /
package com.chenzhen.springboot_rabbitmq.config; import org.springframework.amqp.core.*; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * @author ChenZhen * @Description * @create 2023/3/8 17:45 * @QQ 1583296383 * @WeXin(WeChat) ShockChen7 */ @Configuration public class RabbitMQConfig { public static final String EXCHANGE_NAME = "boot_topic_exchange"; public static final String QUEUE_NAME = "boot_queue"; //1.交换机 @Bean("bootExchange") public Exchange bootExchange(){ return ExchangeBuilder.topicExchange(EXCHANGE_NAME).durable(true).build(); } //2.queue队列 @Bean("bootQueue") public Queue bootQueue(){ return QueueBuilder.durable(QUEUE_NAME).build(); } //3.队列和交换机的绑定关系 @Bean public Binding bindQueueExchange(@Qualifier("bootQueue") Queue queue,@Qualifier("bootExchange") Exchange exchange){ return BindingBuilder.bind(queue).to(exchange).with("boot.#").noargs(); } }
package com.chenzhen.springboot_rabbitmq; import com.chenzhen.springboot_rabbitmq.config.RabbitMQConfig; import org.junit.jupiter.api.Test; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; @SpringBootTest class SpringbootRabbitmqApplicationTests { //1.注入rabbitmqTemplate @Autowired private RabbitTemplate rabbitTemplate; @Test void contextLoads() { rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME,"boot.haha","boot mq hello "); } }
运行测试类后,队列成功接受到消息
消费者的代码比较简单,只要创建一个监听类并加上@RabbitListener注解即可。
package com.chenzhen.springboot_rabbitmq; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; /** * @author ChenZhen * @Description * @create 2023/3/11 16:14 * @QQ 1583296383 * @WeXin(WeChat) ShockChen7 */ @Component public class RabbitMQListener { @RabbitListener(queues = "boot_queue") public void listenerQueue(Message message){ System.out.println("message = " + new String(message.getBody())); } }
在使用 RabbitMQ 的时候,作为消息发送方希望杜绝任何消息丢失或者投递失败场景。RabbitMQ 为我们提
供了两种方式用来控制消息的投递可靠性模式。
rabbitmq 整个消息投递的路径为:
producer—>rabbitmq broker—>exchange—>queue—>consumer
我们将利用这两个 callback 控制消息的可靠性投递
在rabbitmq-provider项目的application.yml文件上,加上消息确认的配置项后:
server: port: 8021 spring: #给项目来个名字 application: name: rabbitmq-provider #配置rabbitMq 服务器 rabbitmq: host: 127.0.0.1 port: 5672 username: root password: root #虚拟host 可以不设置,使用server默认host virtual-host: / #消息确认配置项 #确认消息已发送到交换机(Exchange) publisher-confirms: true #确认消息已发送到队列(Queue) publisher-returns: true
然后是配置相关的消息确认回调函数,RabbitConfig.java:
import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class RabbitConfig { @Bean public RabbitTemplate createRabbitTemplate(ConnectionFactory connectionFactory){ RabbitTemplate rabbitTemplate = new RabbitTemplate(); rabbitTemplate.setConnectionFactory(connectionFactory); //设置开启Mandatory,才能触发回调函数,无论消息推送结果怎么样都强制调用回调函数 rabbitTemplate.setMandatory(true); rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() { @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { System.out.println("ConfirmCallback: "+"相关数据:"+correlationData); System.out.println("ConfirmCallback: "+"确认情况:"+ack); System.out.println("ConfirmCallback: "+"原因:"+cause); } }); rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() { @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { System.out.println("ReturnCallback: "+"消息:"+message); System.out.println("ReturnCallback: "+"回应码:"+replyCode); System.out.println("ReturnCallback: "+"回应信息:"+replyText); System.out.println("ReturnCallback: "+"交换机:"+exchange); System.out.println("ReturnCallback: "+"路由键:"+routingKey); } }); return rabbitTemplate; } }
到这里,生产者推送消息的消息确认调用回调函数已经完毕。
可以看到上面写了两个回调函数, ConfirmCallback 和RetrunCallback;
那么以上这两种回调函数都是在什么情况会触发呢?
在 RabbitMQ 中,ConfirmCallback 和 ReturnCallback 是两种不同的回调函数,用于处理消息的确认和退回(返回)情况。
1. ConfirmCallback:
ConfirmCallback 用于处理消息的确认情况。当消息发送到 Exchange 时,Broker(RabbitMQ 服务器)会发送一个确认消息给生产者,告诉生产者消息已经被成功接收。这样可以确保消息成功发送到了 RabbitMQ,从而提供了一定的可靠性保证。
2. ReturnCallback
用于处理消息退回情况。当消息无法被正确路由到任何队列时,Broker 会将该消息退回给生产者。这可以帮助生产者处理无法投递的消息。
和生产者的消息确认机制不同,因为消息接收本来就是在监听消息,符合条件的消息就会消费下来。
所以,消息接收的确认机制主要存在三种模式:
1、不确认, 这也是默认的消息确认情况。
AcknowledgeMode.NONE
RabbitMQ
成功将消息发出(即将消息成功写入TCP Socket)中立即认为本次投递已经被正确处理,不管消费者端是否成功处理本次投递。
所以这种情况如果消费端消费逻辑抛出异常,也就是消费端没有处理成功这条消息,那么就相当于丢失了消息。
一般这种情况我们都是使用try catch
捕捉异常后,打印日志用于追踪数据,这样找出对应数据再做后续处理。
2、 自动确认
根据请况确认 listener.simple.acknowledge-mode: auto
,主要分成以下几种情况:
AmqpRejectAndDontRequeueException
异常的时候,则消息会被拒绝,且该消息不会重回队列。ImmediateAcknowledgeAmqpException
异常,消息会被确认。3、手动确认
这个比较关键,也是我们配置接收消息确认机制时,多数选择的模式。
消费者收到消息后,手动调用basic.ack/basic.nack/basic.reject
后,RabbitMQ
收到这些消息后,才认为本次投递成功。
basic.ack
用于肯定确认basic.nack
用于否定确认(注意:这是AMQP 0-9-1的RabbitMQ扩展)basic.reject
用于否定确认,但与basic.nack
相比有一个限制:一次只能拒绝单条消息消费者端以上的3``个方法都表示消息已经被正确投递,但是basic.ack
表示消息已经被正确处理。
而basic.nack,basic.reject
表示没有被正确处理:
着重讲下reject
,因为有时候一些场景是需要重新入列的。
channel.basicReject(deliveryTag, true);
拒绝消费当前消息,如果第二参数传入true
,就是将数据重新丢回队列里,那么下次还会消费这消息。设置false
,就是告诉服务器,我已经知道这条消息数据了,因为一些原因拒绝它,而且服务器把这个消息丢掉就行,下次不想再消费这条消息了。
使用拒绝后重新入列这个确认模式要谨慎,因为一般都是出现异常的时候,catch
异常再拒绝入列,选择是否重入列。
但是如果使用不当会导致一些每次都被你重入列的消息一直消费-入列-消费-入列这样循环,会导致消息积压。
顺便也简单讲讲 nack
,这个也是相当于设置不消费某条消息。
channel.basicNack(deliveryTag, false, true);
第一个参数依然是当前消息到的数据的唯一id
;
第二个参数是指是否针对多条消息;如果是true,也就是说一次性针对当前通道的消息的tagID
小于当前这条消息的,都拒绝确认。
第三个参数是指是否重新入列,也就是指不确认的消息是否重新丢回到队列里面去。
同样使用不确认后重新入列这个确认模式要谨慎,因为这里也可能因为考虑不周出现消息一直被重新丢回去的情况,导致积压。
看了上面这么多介绍,接下来我们一起配置下,看看一般的消息接收手动确认是怎么样的。
在消费者项目里,新建MessageListenerConfig.java
上添加代码相关的配置代码:
import com.elegant.rabbitmqconsumer.receiver.MyAckReceiver; import org.springframework.amqp.core.AcknowledgeMode; import org.springframework.amqp.core.Queue; import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class MessageListenerConfig { @Autowired private CachingConnectionFactory connectionFactory; @Autowired private MyAckReceiver myAckReceiver;//消息接收处理类 @Bean public SimpleMessageListenerContainer simpleMessageListenerContainer() { SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory); container.setConcurrentConsumers(1); container.setMaxConcurrentConsumers(1); // RabbitMQ默认是自动确认,这里改为手动确认消息 container.setAcknowledgeMode(AcknowledgeMode.MANUAL); //设置一个队列 container.setQueueNames("TestDirectQueue"); //如果同时设置多个如下: 前提是队列都是必须已经创建存在的 // container.setQueueNames("TestDirectQueue","TestDirectQueue2","TestDirectQueue3"); //另一种设置队列的方法,如果使用这种情况,那么要设置多个,就使用addQueues //container.setQueues(new Queue("TestDirectQueue",true)); //container.addQueues(new Queue("TestDirectQueue2",true)); //container.addQueues(new Queue("TestDirectQueue3",true)); container.setMessageListener(myAckReceiver); return container; } }
对应的手动确认消息监听类,MyAckReceiver.java(手动确认模式需要实现 ChannelAwareMessageListener):
import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener; import org.springframework.stereotype.Component; import com.rabbitmq.client.Channel; @Component public class ManualAckConsumer implements ChannelAwareMessageListener { @RabbitListener(queues = "your-queue-name") @Override public void onMessage(Message message, Channel channel) throws Exception { try { // 模拟消费逻辑 String messageBody = new String(message.getBody()); System.out.println("Received message: " + messageBody); // 手动确认消息 channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } catch (Exception e) { // 发生异常时可以选择拒绝消息或进行其他处理 // 重新入队列:channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true); // 拒绝并丢弃:channel.basicReject(message.getMessageProperties().getDeliveryTag(), false); } } }
VirtualHost 作用及用法
Virtual Hosts的使用场景
多租户的使用场景,比如主机资源紧缺情况下开发和测试共用一个RabbitMQ,可以使用Virtual Hosts将开发和测试隔离开
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。