赞
踩
1、我的另一篇文章中介绍了RabbitMQ的交换机类型、消费类型,经过一系列的demo测试时,我们在RabbitMQ的后台监控中可以发现一个现象,就是消息在被成功消费后,队列中就没有这个消息了,也就是删除了,那么疑问来了,RabbitMQ是怎么知道这个消息该不该被删除?什么时候删除?
2、除了消费者,那么生产者怎么知道自己消息是否发送成功到RabbitMQ中了?
解决这个疑问的方式就是:RabbitMQ消息确认机制
RabbitMQ有两种解决消息丢失的方案,一种是事务机制,一种是消息确认(confirm机制),这里主要介绍消息确认
通过RabbitMQ的原理图可知,生产者和消费者并没有直接进行通信,中间要使用RabbitMQ传递消息,所以生产者只需要把消息发送到Rabbit,而消费者只需要从队列获取消息就可以了
上述两种就是消息确认机制
RabbitMQ是基于AMQP协议的,从官网给出的解释来说,会有两种建议:
前者被称作自动确认模式(automatic acknowledgement model),后者被称作显式确认模式(explicit acknowledgement model)。在显式模式下,由消费者应用来选择什么时候发送确认回执(acknowledgement)。应用可以在收到消息后立即发送,或将未处理的消息存储后发送,或等到消息被处理完毕后再发送确认回执(例如,成功获取一个网页内容并将其存储之后)。
如果一个消费者在尚未发送确认回执的情况下挂掉了,那AMQP代理会将消息重新投递给另一个消费者。如果当时没有可用的消费者了,消息代理会等下一个注册到此队列的消费者,然后再次尝试投递。
生产者端可通过两个callback接口来确认消息的是否传递到Broker,但是两个接口默认都是不开启的
ConfirmCallback方法:消息成功从生产者到Broker(RabbitMQ主机)后触发的回调,只用来确认是否正确到达了Exchange(交换机),发布者确认默认是不开启的,开启发布者确认需要添加如下配置
// springboot配置
spring.rabbitmq.publisher-confirm-type=correlated # 新版本
spring.rabbitmq.publisher-confirms=true # 老版本
ReturnCallback方法:消息从Exchange到queue投递失败,则会触发的回调,但是这个方法被触发的概率很小,因为交换机和队列的绑定是在代码中显式完成的,只要代码正确这个方法出发的概率就会很小,即便触发了也大概率是代码的问题,而这个方法也需要通过配置来开启
// springboot配置
spring.rabbitmq.publisher-returns=true
以上两种配置适用于springboot项目,如果没有使用springboot的话可以使用如下代码来进行配置
消费者确认有两种方式:自动、手动
消息自动确认是指消息发出后就认为消息消费成功,消息就会被RabbitMQ从队列中删除掉,并不会在意消费者处理业务的成功与否,也就是“发送即成功”,这种模式是一种非常不安全的方式,因为业务存在处理失败的情况,这样的话数据就会丢失
如下代码中的第二个参数,就是设定为自动确认模式(autoAck:true)
手动应答提供三种接口
channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
channel.basicNack(message.getEnvelope().getDeliveryTag(),false, true);
相比nack方法,reject没有multiple参数,所以reject只能处理单个消息
channel.basicReject(message.getEnvelope().getDeliveryTag(), true);
RabbitMQ在通过队列以及信道(channel)传递消息给消费者的时候,都会带一个传递标签,每个channel给每条消息分配一个标签,一个channel对应一个传递标签。
这个标签是一个Long类型的序列号,从1 开始到 9223372036854775807,理论上这个范围非常大,按照每秒一百万的数据量来算的话,大概需要292年才能用完。
通过Delivery.getEnvelope().getDeliveryTag()可以获取当前消息的传递标签
代码示例中查看这个传递标签的值为22,这表示这个消息是这个channel传递的第22条
RabbitMQ就是通过传递标签确认消费者返回的是哪条消息的结果,从而进行处理
这里是指,在做消息确认的时候,并没有传递正确的传递标签,那么会出现什么情况?
经过测试如下
public class Producer { public static String QUEUE_NAME = "q_test_01"; public static void main(String[] args) throws Exception { Connection connection = getConnection(); Channel channel = connection.createChannel(); String msg = "测试消息一"; channel.queueDeclare(QUEUE_NAME, true, false, false, null); for (int i = 0; i < 10; i++) { channel.basicPublish("", QUEUE_NAME, null, (msg + i).getBytes()); } //关闭通道和连接 channel.close(); connection.close(); } public static Connection getConnection() throws Exception { //定义连接工厂 ConnectionFactory factory = new ConnectionFactory(); //设置服务地址 factory.setHost("localhost"); //端口 factory.setPort(5672); //设置账号信息,用户名、密码、vhost factory.setVirtualHost("testhost"); factory.setUsername("admin"); factory.setPassword("1111"); // 通过工程获取连接 Connection connection = factory.newConnection(); return connection; } }
public class ConsumerClient { public static void main(String[] args) throws Exception { Connection connection = Producer.getConnection(); Channel channel = connection.createChannel(); DeliverCallback deliverCallback = (consumerTag, message) -> { String message1 = new String(message.getBody()); System.out.println("DeliverCallback:" + consumerTag + "-" + message1); // 注意这里message.getEnvelope().getDeliveryTag()就是获取正确的传递标签 // 然后我在后面 +9999 改变正确的值 channel.basicAck(message.getEnvelope().getDeliveryTag()+99999, false); }; CancelCallback cancelCallback = consumerTag -> { System.out.println("CancelCallback:" + consumerTag); }; channel.basicConsume(Producer.QUEUE_NAME, false, deliverCallback, cancelCallback); } }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。