当前位置:   article > 正文

springboot整合RabbitMQ消费端手动ACK确认机制_rabbitmq-springboot消费者如何手动ack

rabbitmq-springboot消费者如何手动ack

上文: RabbitMQ
代码地址: 源代码

ack——acknowledge(vt. 承认;答谢;报偿;告知已收到;确认的意思),在RabbitMQ中指代的是消费者收到消息后确认的一种行为,关注点在于消费者能否实际接收到MQ发送的消息

其提供了三种确认方式:

  • 自动确认acknowledge=“none”:当消费者接收到消息的时候,就会自动给到RabbitMQ一个回执,告诉MQ我已经收到消息了,不在乎消费者接收到消息之后业务处理的成功与否。

  • 手动确认acknowledge=“manual”:当消费者收到消息后,不会立刻告诉RabbitMQ已经收到消息了,而是等待业务处理成功后,通过调用代码的方式手动向MQ确认消息已经收到。当业务处理失败,就可以做一些重试机制,甚至让MQ重新向消费者发送消息都是可以的。

  • 根据异常情况确认acknowledge=“auto”:该方式是通过抛出异常的类型,来做响应的处理(如重发、确认等布拉不拉布拉)。这种方式比较麻烦。

当消息一旦被消费者接收到,会立刻自动向MQ确认接收,并将响应的message从RabbitMQ消息缓存中移除,但是在实际的业务处理中,会出现消息收到了,但是业务处理出现异常的情况,在自动确认的模式下,该条业务处理失败的message就相当于被丢弃了。

如果设置了手动确认,则需要在业务处理完成之后,手动调用channel.basicAck(),手动的签收,

如果业务处理失败,则手动调用channel.basicNack()方法拒收,并让MQ重新发送该消息。

如果不做任何关于acknowledge的配置,默认就是自动确认签收的。

1. 消费端配置:开启ack

在这里插入图片描述

2. 通过channel的basicAck方法手动签收,通过basicNack方法拒绝签收

/*
        通过channel的basicAck方法手动签收,通过basicNack方法拒绝签收
     */
    @Override
    @RabbitListener(queues = {RabbitMQConfig.DIRECT_QUEUE}) //监听
    public void receiveMessage(String message,Channel channel,@Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) {
        try{
            //String message= (String)amqpTemplate.receiveAndConvert(RabbitMQConfig.DIRECT_QUEUE);
            System.out.println("接收的mq消息:"+message);

            // 业务处理 异常测试
            // System.out.println("业务处理"+1/0);

            // long deliveryTag 消息接收tag boolean multiple 是否批量确认
            System.out.println("deliveryTag="+deliveryTag);

            /**
             * 无异常就确认消息
             * basicAck(long deliveryTag, boolean multiple)
             * deliveryTag:取出来当前消息在队列中的的索引;
             * multiple:为true的话就是批量确认,如果当前deliveryTag为5,那么就会确认
             * deliveryTag为5及其以下的消息;一般设置为false
             */

            if(deliveryTag==5){
                channel.basicAck(deliveryTag,true);
            }

        }catch (Exception e){

            e.printStackTrace();

            /**
             * 有异常就绝收消息
             * basicNack(long deliveryTag, boolean multiple, boolean requeue)
             * requeue:true为将消息重返当前消息队列,还可以重新发送给消费者;
             *         false:将消息丢弃
             */

            // long deliveryTag, boolean multiple, boolean requeue

            try {

                channel.basicNack(deliveryTag,false,true);
                // long deliveryTag, boolean requeue
                // channel.basicReject(deliveryTag,true);

                Thread.sleep(1000);     // 这里只是便于出现死循环时查看

                /*
				 * 一般实际异常情况下的处理过程:记录出现异常的业务数据,将它单独插入到一个单独的模块,
				 * 然后尝试3次,如果还是处理失败的话,就进行人工介入处理
				*/

            } catch (Exception e1) {
                e1.printStackTrace();
            }

        }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59

3. 测试

3.1 测试消费1条消息

在这里插入图片描述
在这里插入图片描述
由于代码里面deliveryTag为5才会确认,所以这条消息Unacked,如下图:
在这里插入图片描述

3.2 测试消费5条消息

在这里插入图片描述
在这里插入图片描述

3.3 拒签测试

在这里插入图片描述

在这里插入图片描述

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/小丑西瓜9/article/detail/127110
推荐阅读
相关标签
  

闽ICP备14008679号