赞
踩
系列文章:
如果消费者收到消息后没来得及处理就发生异常,或者处理过程中发生异常,会导致④失败。服务端应该以某种方式得知消费者对消息的接收情况,并决定是否重新投递这条消息给其他消费者。
RabbitMQ提供了消费者的消息确认机制(message acknowledgement),消费者可以自动或者手动地发送ACK给服务端没有收到ACK的消息,消费者断开连接后,RabbitMQ会把这条消息发送给其他消费者。如果没有其他消费者,消费者重启后会重新消费这条消息,重复执行业务逻辑。
消费者默认采用的是自动ack(autoAck=true),所以我们可以不断的一条一条接收消息。
而自动ack的问题在于消息丢失问题,当消息到达Consumer就会给broker返回ack,若Consumer在处理中就宕机,那么当前消息就丢失了
注:在 Kafka 中,自动 ACK 是每隔一段时间 ACK 一次,而且消息的清除是根据根据配置的消息的清除策略, 所以,消息自动 ACK 容易引起的是消息重复消费(与 RabbitMQ 正好相反)
有没有一种方式,等Consumer处理完消息后,在当前消息的ack发给服务端?手动ACK
RabbitMQ会等待消费者显式地回复确认信号后才从队列中移去消息,而这种方式的问题在于,若服务器未收到消费者的ack时会一直阻塞,最终可能引起消息大量堆积。
若是采用原生API,消费者在订阅队列时可以指定 autoAck=false
public class AckConsumer { private final static String QUEUE_NAME = "TEST_ACK_QUEUE"; public static void main(String[] args) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setUri(ResourceUtil.getKey("rabbitmq.uri")); // 建立连接 Connection conn = factory.newConnection(); // 创建消息通道 final Channel channel = conn.createChannel(); // 声明队列(默认交换机AMQP default,Direct) // String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments channel.queueDeclare(QUEUE_NAME, false, false, false, null); System.out.println(" Waiting for message...."); // 创建消费者,并接收消息 Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String msg = new String(body, "UTF-8"); System.out.println("Received message : '" + msg + "'"); if (msg.contains("拒收")){ // 拒绝消息 // requeue:是否重新入队列,true:是;false:直接丢弃,相当于告诉队列可以直接删除掉 // TODO 如果只有这一个消费者,requeue 为true 的时候会造成消息重复消费 channel.basicReject(envelope.getDeliveryTag(), false); } else if (msg.contains("异常")){ // 批量拒绝(拒绝deliveryTag之前的消息) // requeue:是否重新入队列 // TODO 如果只有这一个消费者,requeue 为true 的时候会造成消息重复消费 channel.basicNack(envelope.getDeliveryTag(), true, false); } else { // 手工应答 // 如果不应答,队列中的消息会一直存在,重新连接的时候会重复消费 channel.basicAck(envelope.getDeliveryTag(), true); } } }; // 开始获取消息,注意这里开启了手工应答 // String queue, boolean autoAck, Consumer callback channel.basicConsume(QUEUE_NAME, false, consumer); } }
在 Spring AMQP中 MessageListener 相当于消费者,可以在AcknowledgeMode
枚举类中看到,关于ack的配置具体有三种选择:
@Bean // 构建MessageListenerContainer的Bean时配置 public SimpleMessageListenerContainer container(ConnectionFactory connectionFactory) { SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory); container.setQueues(getSecondQueue(),getThirdQueue()); // 监听的队列 container.setConcurrentConsumers(1); // 最小消费者数 container.setMaxConcurrentConsumers(5); // 最大的消费者数量 container.setDefaultRequeueRejected(false); // 是否重回队列 container.setAcknowledgeMode(AcknowledgeMode.AUTO); // 签收模式!!! container.setExposeListenerChannel(true); container.setConsumerTagStrategy(new ConsumerTagStrategy() { // 消费端的标签策略 public String createConsumerTag(String queue) { return null; } }); return container; }
若在SpringBoot的配置:
spring.rabbitmq.listener.direct.acknowledge-mode=manual # 默认为NONE,自动ack
spring.rabbitmq.listener.simple.acknowledge-mode=manual
具体的调用代码如下:
@Component
@RabbitListener(queues = "SECOND_QUEUE")
public class SecondConsumer {
@RabbitHandler
// 当要手动确认时,参数中要有Channel和Message (注:Channel是rabbitmq.client.Channel不是amqp的)
public void process(String msg,Channel channel,Message message){
System.out.println("Second Queue received msg:" + msg);
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); // 手动ack
}
}
如果消息无法处理或者消费失败,也有两种拒绝的方式,
void basicReject(long deliveryTag, boolean requeue)
单条拒绝void basicNack(long deliveryTag, boolean multiple, boolean requeue)
批量拒绝如果requeue参数设置为true,可以把这条消息重新存入队列,以便发给下一个消费者(当然,只有一个消费者的时候,这种方式可能会出现无限循环重复消费的情况。可以投递到新的队列中,或者只打印异常日志)。如果requeue为false,当消息投递失败就会丢弃。
但无论消费者是发送ACK还是NACK,甚至是消费者出现异常,生产者也是完全不知情的。所以,生产者最终确定消费者有没有消费成功的方式:
赞
踩
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。