赞
踩
队列分配消息给监听消费者时,该消息处于未确认状态,不会被删除;当接收到消费者的确认回复才会将消息移除。
RabbitMQ默认的消息确认机制是:自动确认的 。
修改为手动确认模式,然后不手动确认看看结果
在application.yml中
spring:
rabbitmq:
port: 5672
host: 127.0.0.1
username: guest
password: guest
listener:
simple:
prefetch: 1
acknowledge-mode: manual # 开启手动确认,自动是auto
package com.yzm.rabbitmq_02.config; import org.springframework.amqp.core.Queue; import org.springframework.amqp.core.QueueBuilder; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class RabbitConfig { public static final String ACK_QUEUE = "ack_queue"; /** * 消息队列 * durable:设置是否持久化。持久化的队列会存盘,在服务器重启的时候可以保证不丢失相关信息 * exclusive:设置是否排他。如果一个队列被声明为排他队列,该队列仅对首次声明它的连接可见,并在连接断开时自动删除 * autoDelete:设置是否自动删除。自动删除的前提是:至少有一个消费者连接到这个队列,之后所有与这个队列连接的消费者都断开时,才会自动删除 */ @Bean public Queue queue() { return QueueBuilder.durable(ACK_QUEUE).build(); } }
package com.yzm.rabbitmq_02.sender; import com.yzm.rabbitmq_02.config.RabbitConfig; import org.springframework.amqp.core.AmqpTemplate; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.RestController; /** * 消息发布 */ @RestController public class AckSender { private final AmqpTemplate template; public AckSender(AmqpTemplate template) { this.template = template; } @GetMapping("/send") public void send(@RequestParam(value = "message", required = false, defaultValue = "Hello World") String message) { for (int i = 1; i <= 10; i++) { String msg = message + " ..." + i; System.out.println(" [ 生产者 ] Sent ==> '" + msg + "'"); template.convertAndSend(RabbitConfig.ACK_QUEUE, msg); } } }
package com.yzm.rabbitmq_02.receiver; import com.yzm.rabbitmq_02.config.RabbitConfig; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; /** * 消息监听 */ @Component public class AckReceiver { private int count1 = 1; private int count2 = 1; @RabbitListener(queues = RabbitConfig.ACK_QUEUE) public void receive1(Message message) throws InterruptedException { Thread.sleep(200); System.out.println(" [ 消费者@1号 ] Received ==> '" + new String(message.getBody()) + "'"); System.out.println(" [ 消费者@1号 ] 处理消息数:" + count1++); } @RabbitListener(queues = RabbitConfig.ACK_QUEUE) public void receive2(Message message) throws InterruptedException { Thread.sleep(1000); System.out.println(" [ 消费者@2号 ] Received ==> '" + new String(message.getBody()) + "'"); System.out.println(" [ 消费者@2号 ] 处理消息数:" + count2++); } }
运行结果:
消费者1号、2号分别拿到一条消息进行消费,但没有确认,处于阻塞状态,所以队列不会移除这两条消息,同时设置了prefetch=1,在消费者未确认之前不会重新推送消息给消费者
停止程序,发现2条未确认的消息会回到Ready里面等待重新消费
再次重启,再次消费2条消息,但仍未确认
访问/send,再次发布消息,消息堆积
好了,来看看如何手动确认吧。修改消费者
package com.yzm.rabbitmq_02.receiver; import com.rabbitmq.client.Channel; import com.yzm.rabbitmq_02.config.RabbitConfig; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.amqp.support.AmqpHeaders; import org.springframework.messaging.handler.annotation.Header; import org.springframework.messaging.handler.annotation.Headers; import org.springframework.stereotype.Component; import java.io.IOException; import java.util.Map; /** * 消息监听 */ @Component public class AckReceiver { private int count1 = 1; private int count2 = 1; private int count3 = 1; @RabbitListener(queues = RabbitConfig.ACK_QUEUE) public void receive1( Message message, Channel channel) throws IOException, InterruptedException { Thread.sleep(200); System.out.println(" [ 消费者@1号 ] Received ==> '" + new String(message.getBody()) + "'"); System.out.println(" [ 消费者@1号 ] 处理消息数:" + count1++); // 确认消息 // 第一个参数,交付标签,相当于消息ID 64位的长整数(从1开始递增) // 第二个参数,false表示仅确认提供的交付标签;true表示批量确认所有消息(消息ID小于自身的ID),包括提供的交付标签 channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } @RabbitListener(queues = RabbitConfig.ACK_QUEUE) public void receive2( Message message, Channel channel, @Headers Map<String, Object> map) throws IOException, InterruptedException { Thread.sleep(600); System.out.println(" [ 消费者@2号 ] Received ==> '" + new String(message.getBody()) + "'"); System.out.println(" [ 消费者@2号 ] 处理消息数:" + count2++); // 确认消息 channel.basicAck((Long) map.get(AmqpHeaders.DELIVERY_TAG), false); } @RabbitListener(queues = RabbitConfig.ACK_QUEUE) public void receive3( Message message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) throws IOException, InterruptedException { Thread.sleep(1000); System.out.println(" [ 消费者@3号 ] Received ==> '" + new String(message.getBody()) + "'"); System.out.println(" [ 消费者@3号 ] 处理消息数:" + count3++); // 确认消息 channel.basicAck(deliveryTag, false); } }
刚启动,就把前两次积累的消息先被消费完
接着发布消息
手动确认通过调用方法实现
basicAck(long deliveryTag, boolean multiple)
deliveryTag:交付标签,相当于消息ID 64位的长整数(从1开始递增)
multiple:false表示仅确认提供的交付标签;true表示批量确认所有消息(消息ID小于自身的ID),包括提供的交付标签
能手动确认,同样也可以手动拒绝,修改消费者
@Component public class AckReceiver { private int count1 = 1; private int count2 = 1; private int count3 = 1; // @RabbitListener(queues = RabbitConfig.ACK_QUEUE) public void receive1( Message message, Channel channel) throws IOException, InterruptedException { Thread.sleep(200); System.out.println(" [ 消费者@1号 ] Received ==> '" + new String(message.getBody()) + "'"); System.out.println(" [ 消费者@1号 ] 处理消息数:" + count1++); // 确认消息 // 第一个参数,交付标签,相当于消息ID 64位的长整数(从1开始递增) // 第二个参数,false表示仅确认提供的交付标签;true表示批量确认所有消息(消息ID小于自身的ID),包括提供的交付标签 channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } // @RabbitListener(queues = RabbitConfig.ACK_QUEUE) public void receive2( Message message, Channel channel, @Headers Map<String, Object> map) throws IOException, InterruptedException { Thread.sleep(600); System.out.println(" [ 消费者@2号 ] Received ==> '" + new String(message.getBody()) + "'"); System.out.println(" [ 消费者@2号 ] 处理消息数:" + count2++); // 确认消息 channel.basicAck((Long) map.get(AmqpHeaders.DELIVERY_TAG), false); } // @RabbitListener(queues = RabbitConfig.ACK_QUEUE) public void receive3( Message message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) throws IOException, InterruptedException { Thread.sleep(1000); System.out.println(" [ 消费者@3号 ] Received ==> '" + new String(message.getBody()) + "'"); System.out.println(" [ 消费者@3号 ] 处理消息数:" + count3++); // 确认消息 channel.basicAck(deliveryTag, false); } @RabbitListener(queues = RabbitConfig.ACK_QUEUE) public void receive4( Message message, Channel channel) throws IOException, InterruptedException { Thread.sleep(200); System.out.println(" [ 消费者@4号 ] Received ==> '" + new String(message.getBody()) + "'"); System.out.println(" [ 消费者@4号 ] 消息被我拒绝了:" + count1++); // 拒绝消息方式一 // 第一个参数,交付标签 // 第二个参数,false表示仅拒绝提供的交付标签;true表示批量拒绝所有消息,包括提供的交付标签 // 第三个参数,false表示直接丢弃消息,true表示重新排队 //channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false); // 拒绝消息方式二 // 第一个参数,交付标签 // 第二个参数,false表示直接丢弃消息,true表示重新排队 // 跟basicNack的区别就是始终只拒绝提供的交付标签 channel.basicReject(message.getMessageProperties().getDeliveryTag(), true); } }
运行结果:
channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
这里是拒绝后,重新进入队列,所以消费的总是第一条消息并且循环不停
停止程序后,队列仍然是10条消息
channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
改成false,拒绝后直接丢弃
重启后:
总结一下 手动确认模式的各种情况
未确认:什么也不用写,消息不会移除,重复消费,积攒越来越多
确认:channel.basicAck();确认后,消息从队列中移除
拒绝:channel.basicNack()或channel.basicReject();拒绝后,消息先从队列中移除,然后可以选择重新排队,或者直接丢弃(丢弃还有一种选择,就是加入到死信队列中,用于追踪问题)
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。