当前位置:   article > 正文

【RabbitMQ】消息可靠性投递(四)Queue-->Consumer_rabbitmq $queue->consume

rabbitmq $queue->consume

系列文章:

如果消费者收到消息后没来得及处理就发生异常,或者处理过程中发生异常,会导致④失败。服务端应该以某种方式得知消费者对消息的接收情况,并决定是否重新投递这条消息给其他消费者。

RabbitMQ提供了消费者的消息确认机制(message acknowledgement),消费者可以自动或者手动地发送ACK给服务端没有收到ACK的消息,消费者断开连接后,RabbitMQ会把这条消息发送给其他消费者。如果没有其他消费者,消费者重启后会重新消费这条消息,重复执行业务逻辑。

1.自动ACK

消费者默认采用的是自动ack(autoAck=true),所以我们可以不断的一条一条接收消息。

而自动ack的问题在于消息丢失问题,当消息到达Consumer就会给broker返回ack,若Consumer在处理中就宕机,那么当前消息就丢失了

注:在 Kafka 中,自动 ACK 是每隔一段时间 ACK 一次,而且消息的清除是根据根据配置的消息的清除策略, 所以,消息自动 ACK 容易引起的是消息重复消费(与 RabbitMQ 正好相反)

有没有一种方式,等Consumer处理完消息后,在当前消息的ack发给服务端?手动ACK

2.手动ACK

RabbitMQ会等待消费者显式地回复确认信号后才从队列中移去消息,而这种方式的问题在于,若服务器未收到消费者的ack时会一直阻塞,最终可能引起消息大量堆积。

若是采用原生API,消费者在订阅队列时可以指定 autoAck=false

public class AckConsumer {
    private final static String QUEUE_NAME = "TEST_ACK_QUEUE";

    public static void main(String[] args) throws Exception {
    
        ConnectionFactory factory = new ConnectionFactory();
        factory.setUri(ResourceUtil.getKey("rabbitmq.uri"));
        // 建立连接
        Connection conn = factory.newConnection();
        // 创建消息通道
        final Channel channel = conn.createChannel();

        // 声明队列(默认交换机AMQP default,Direct)
        // String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        System.out.println(" Waiting for message....");

        // 创建消费者,并接收消息
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
                                       byte[] body) throws IOException {
                String msg = new String(body, "UTF-8");
                System.out.println("Received message : '" + msg + "'");

                if (msg.contains("拒收")){
                    // 拒绝消息
                    // requeue:是否重新入队列,true:是;false:直接丢弃,相当于告诉队列可以直接删除掉
                    // TODO 如果只有这一个消费者,requeue 为true 的时候会造成消息重复消费
                    channel.basicReject(envelope.getDeliveryTag(), false);
                } else if (msg.contains("异常")){
                    // 批量拒绝(拒绝deliveryTag之前的消息)
                    // requeue:是否重新入队列
                    // TODO 如果只有这一个消费者,requeue 为true 的时候会造成消息重复消费
                    channel.basicNack(envelope.getDeliveryTag(), true, false);
                } else {
                    // 手工应答
                    // 如果不应答,队列中的消息会一直存在,重新连接的时候会重复消费
                    channel.basicAck(envelope.getDeliveryTag(), true);
                }
            }
        };

        // 开始获取消息,注意这里开启了手工应答
        // String queue, boolean autoAck, Consumer callback
        channel.basicConsume(QUEUE_NAME, false, consumer);
    }

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49

在 Spring AMQP中 MessageListener 相当于消费者,可以在AcknowledgeMode枚举类中看到,关于ack的配置具体有三种选择:

  • NONE:自动ACK(默认)
  • MANUAL: 手动ACK
  • AUTO:如果方法未抛出异常,则发送 ack。
    • 当抛出 AmqpRejectAndDontRequeueException 异常的时候,则消息会被拒绝,且不重新入队。
    • 当抛出 ImmediateAcknowledgeAmqpException 异常,则消费者会发送ACK。
    • 其他的异常,则消息会被拒绝,且 requeue = true会重新入队。
@Bean // 构建MessageListenerContainer的Bean时配置
public SimpleMessageListenerContainer container(ConnectionFactory connectionFactory) {
    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
    
    container.setQueues(getSecondQueue(),getThirdQueue()); // 监听的队列 
    container.setConcurrentConsumers(1); // 最小消费者数 
    container.setMaxConcurrentConsumers(5); // 最大的消费者数量 
    container.setDefaultRequeueRejected(false); // 是否重回队列 
    container.setAcknowledgeMode(AcknowledgeMode.AUTO); // 签收模式!!!
    container.setExposeListenerChannel(true); 
    container.setConsumerTagStrategy(new ConsumerTagStrategy() { // 消费端的标签策略
        public String createConsumerTag(String queue) {
            return null;
        }
    });
    return container;
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17

若在SpringBoot的配置:

spring.rabbitmq.listener.direct.acknowledge-mode=manual  # 默认为NONE,自动ack
spring.rabbitmq.listener.simple.acknowledge-mode=manual
  • 1
  • 2

具体的调用代码如下:

@Component
@RabbitListener(queues = "SECOND_QUEUE")
public class SecondConsumer {
    
    @RabbitHandler 
    // 当要手动确认时,参数中要有Channel和Message (注:Channel是rabbitmq.client.Channel不是amqp的)
    public void process(String msg,Channel channel,Message message){ 
        System.out.println("Second Queue received msg:" + msg); 
        channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); // 手动ack
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11

3.拒绝策略

如果消息无法处理或者消费失败,也有两种拒绝的方式,

  • void basicReject(long deliveryTag, boolean requeue)单条拒绝
  • void basicNack(long deliveryTag, boolean multiple, boolean requeue)批量拒绝

如果requeue参数设置为true,可以把这条消息重新存入队列,以便发给下一个消费者(当然,只有一个消费者的时候,这种方式可能会出现无限循环重复消费的情况。可以投递到新的队列中,或者只打印异常日志)。如果requeue为false,当消息投递失败就会丢弃。

但无论消费者是发送ACK还是NACK,甚至是消费者出现异常,生产者也是完全不知情的。所以,生产者最终确定消费者有没有消费成功的方式:

  • 消费者收到消息,处理完毕后,调用生产者的API
  • 消费者收到消息,处理完毕后,发送一条响应消息给生产者
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/我家自动化/article/detail/126949?site
推荐阅读
相关标签
  

闽ICP备14008679号