赞
踩
为了保证消息从队列可靠的达到消费者,RabbitMQ 提供了消息确认机制(Message Acknowledgement)。消费者在订阅队列时,可以指定 autoAck 参数,当 autoAck 参数等于 false 时,RabbitMQ 会等待消费者显式地回复确认信号后才从内存(或者磁盘)中移除消息(实际上是先打上删除标记,之后在删除)。当 autoAck 参数等于 true 时,RabbitMQ 会自动把发送出去的消息置为确认,然后从内存(或者磁盘)中删除,而不管消费者是否真正地消费到了这些消息。
采用消息确认机制后,只要设置 autoAck 参数为 false,消费者就有足够的时间处理消息(任务),不用担心处理消息过程中消费者进程挂掉后消息丢失的问题,因为 RabbitMQ 会一直等待持有消息直到消费者显式调用 Basic.Ack 命令为止。
当autoAck 参数为 false 时,对于 RabbitMQ 服务器端而言,队列中的消息分成了两部分:一部分是等待投递给消费者的消息;一部分是已经投递给消费者,但是还没有收到消费者确认信号的消息。如果 RabbitMQ 服务器端一直没有收到消费者的确认信号,并且消费此消息的消费者已经断开连接,则服务器端会安排该消息重新进入队列,等待投递给下一个消费者(也可能还是原来的那个消费者)。
RabbitMQ 不会为未确认的消息设置过期时间,它判断此消息是否需要重新投递给消费者的唯一依据是消费该消息连接是否已经断开,这个设置的原因是 RabbitMQ 允许消费者消费一条消息的时间可以很久很久。
RabbitMQ 的 Web 管理平台上可以看到当前队列中的 “Ready” 状态和 “Unacknowledged” 状态的消息数,分别对应等待投递给消费者的消息数和已经投递给消费者但是未收到确认信号的消息数。
RabbitMQ 消息确认机制分为两大类:发送方确认、接收方确认。
其中发送方确认又分为:生产者到交换器到确认、交换器到队列的确认。如下图:
- //true:告诉RabbitMQ服务器 我处理完成了 队列中数据可以删除了
- //false:告诉RabbitMQ服务器 我处理失败了 你不要删除 待会我重启你再发我一次
- boolean autoAck = true;
- package com.rabbit.ack;
-
- import com.rabbit.util.ConnectionUtil;
- import com.rabbitmq.client.*;
-
- import java.io.IOException;
- import java.util.concurrent.TimeoutException;
-
- /**
- * 数据接收端
- */
- public class Recv {
- //简单队列中:接收端和发送端需要是同一个队列
- private static String simple_queue = "simple_queue";
- public static void main(String[] args) throws IOException, TimeoutException {
- //1、拿到RabbitMQ的连接
- Connection connection = ConnectionUtil.getConnection();
-
- //2、创建通道
- Channel channel = connection.createChannel();
-
- //3、声明一个队列
- channel.queueDeclare(simple_queue,true,false,false,null);
-
- //4、获得数据
- Consumer consumer = new DefaultConsumer(channel){
- //处理接收到的数据
- @Override
- public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
- String message = new String(body);
- System.out.println("接收到的数据 : " + message);
- }
- };
-
- //5、自动应答:告诉RabbitMQ服务器 我已经处理完成数据了
- //true:自动告诉服务器 我处理完成了 false:手动告诉服务器处理完【后面再学习】
- //true:告诉RabbitMQ服务器 我处理完成了 队列中数据可以删除了
- //false:告诉RabbitMQ服务器 我处理失败了 你不要删除 待会我重启你再发我一次
- boolean autoAck = true;
- /**
- * queue – 队列的名称
- * autoAck – 如果服务器应考虑消息在传递后确认,则为 true;如果服务器应该期待明确的确认,则为 false
- * 回调——消费者对象的接口
- * String basicConsume(String queue, boolean autoAck, Consumer callback)
- */
- channel.basicConsume(simple_queue,autoAck,consumer);
-
-
- }
- }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。