当前位置:   article > 正文

RabbitMQ之消息确认机制ACK_rabbitmq ack required

rabbitmq ack required

消息确认机制(ack)

队列分配消息给监听消费者时,该消息处于未确认状态,不会被删除;当接收到消费者的确认回复才会将消息移除。
RabbitMQ默认的消息确认机制是:自动确认的 。
在这里插入图片描述

修改为手动确认模式,然后不手动确认看看结果
在application.yml中

spring:
  rabbitmq:
    port: 5672
    host: 127.0.0.1
    username: guest
    password: guest
    listener:
      simple:
        prefetch: 1
        acknowledge-mode: manual # 开启手动确认,自动是auto
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
package com.yzm.rabbitmq_02.config;

import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitConfig {

    public static final String ACK_QUEUE = "ack_queue";

    /**
     * 消息队列
     * durable:设置是否持久化。持久化的队列会存盘,在服务器重启的时候可以保证不丢失相关信息
     * exclusive:设置是否排他。如果一个队列被声明为排他队列,该队列仅对首次声明它的连接可见,并在连接断开时自动删除
     * autoDelete:设置是否自动删除。自动删除的前提是:至少有一个消费者连接到这个队列,之后所有与这个队列连接的消费者都断开时,才会自动删除
     */
    @Bean
    public Queue queue() {
        return QueueBuilder.durable(ACK_QUEUE).build();
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
package com.yzm.rabbitmq_02.sender;

import com.yzm.rabbitmq_02.config.RabbitConfig;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

/**
 * 消息发布
 */
@RestController
public class AckSender {

    private final AmqpTemplate template;

    public AckSender(AmqpTemplate template) {
        this.template = template;
    }

    @GetMapping("/send")
    public void send(@RequestParam(value = "message", required = false, defaultValue = "Hello World") String message) {
        for (int i = 1; i <= 10; i++) {
            String msg = message + " ..." + i;
            System.out.println(" [ 生产者 ] Sent ==> '" + msg + "'");
            template.convertAndSend(RabbitConfig.ACK_QUEUE, msg);
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
package com.yzm.rabbitmq_02.receiver;

import com.yzm.rabbitmq_02.config.RabbitConfig;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * 消息监听
 */
@Component
public class AckReceiver {

    private int count1 = 1;
    private int count2 = 1;

    @RabbitListener(queues = RabbitConfig.ACK_QUEUE)
    public void receive1(Message message) throws InterruptedException {
        Thread.sleep(200);
        System.out.println(" [ 消费者@1号 ] Received ==> '" + new String(message.getBody()) + "'");
        System.out.println(" [ 消费者@1号 ] 处理消息数:" + count1++);
    }

    @RabbitListener(queues = RabbitConfig.ACK_QUEUE)
    public void receive2(Message message) throws InterruptedException {
        Thread.sleep(1000);
        System.out.println(" [ 消费者@2号 ] Received ==> '" + new String(message.getBody()) + "'");
        System.out.println(" [ 消费者@2号 ] 处理消息数:" + count2++);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30

运行结果:
在这里插入图片描述
消费者1号、2号分别拿到一条消息进行消费,但没有确认,处于阻塞状态,所以队列不会移除这两条消息,同时设置了prefetch=1,在消费者未确认之前不会重新推送消息给消费者

停止程序,发现2条未确认的消息会回到Ready里面等待重新消费
在这里插入图片描述
再次重启,再次消费2条消息,但仍未确认
在这里插入图片描述
访问/send,再次发布消息,消息堆积
在这里插入图片描述

好了,来看看如何手动确认吧。修改消费者

package com.yzm.rabbitmq_02.receiver;

import com.rabbitmq.client.Channel;
import com.yzm.rabbitmq_02.config.RabbitConfig;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.util.Map;

/**
 * 消息监听
 */
@Component
public class AckReceiver {

    private int count1 = 1;
    private int count2 = 1;
    private int count3 = 1;

    @RabbitListener(queues = RabbitConfig.ACK_QUEUE)
    public void receive1(
            Message message, Channel channel) throws IOException, InterruptedException {
        Thread.sleep(200);
        System.out.println(" [ 消费者@1号 ] Received ==> '" + new String(message.getBody()) + "'");
        System.out.println(" [ 消费者@1号 ] 处理消息数:" + count1++);

        // 确认消息
        // 第一个参数,交付标签,相当于消息ID 64位的长整数(从1开始递增)
        // 第二个参数,false表示仅确认提供的交付标签;true表示批量确认所有消息(消息ID小于自身的ID),包括提供的交付标签
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }

    @RabbitListener(queues = RabbitConfig.ACK_QUEUE)
    public void receive2(
            Message message, Channel channel,
            @Headers Map<String, Object> map) throws IOException, InterruptedException {
        Thread.sleep(600);
        System.out.println(" [ 消费者@2号 ] Received ==> '" + new String(message.getBody()) + "'");
        System.out.println(" [ 消费者@2号 ] 处理消息数:" + count2++);

        // 确认消息
        channel.basicAck((Long) map.get(AmqpHeaders.DELIVERY_TAG), false);
    }

    @RabbitListener(queues = RabbitConfig.ACK_QUEUE)
    public void receive3(
            Message message, Channel channel,
            @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) throws IOException, InterruptedException {
        Thread.sleep(1000);
        System.out.println(" [ 消费者@3号 ] Received ==> '" + new String(message.getBody()) + "'");
        System.out.println(" [ 消费者@3号 ] 处理消息数:" + count3++);

        // 确认消息
        channel.basicAck(deliveryTag, false);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61

刚启动,就把前两次积累的消息先被消费完
在这里插入图片描述
接着发布消息
在这里插入图片描述
手动确认通过调用方法实现
basicAck(long deliveryTag, boolean multiple)
deliveryTag:交付标签,相当于消息ID 64位的长整数(从1开始递增)
multiple:false表示仅确认提供的交付标签;true表示批量确认所有消息(消息ID小于自身的ID),包括提供的交付标签

能手动确认,同样也可以手动拒绝,修改消费者

@Component
public class AckReceiver {

    private int count1 = 1;
    private int count2 = 1;
    private int count3 = 1;

//    @RabbitListener(queues = RabbitConfig.ACK_QUEUE)
    public void receive1(
            Message message, Channel channel) throws IOException, InterruptedException {
        Thread.sleep(200);
        System.out.println(" [ 消费者@1号 ] Received ==> '" + new String(message.getBody()) + "'");
        System.out.println(" [ 消费者@1号 ] 处理消息数:" + count1++);

        // 确认消息
        // 第一个参数,交付标签,相当于消息ID 64位的长整数(从1开始递增)
        // 第二个参数,false表示仅确认提供的交付标签;true表示批量确认所有消息(消息ID小于自身的ID),包括提供的交付标签
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }

//    @RabbitListener(queues = RabbitConfig.ACK_QUEUE)
    public void receive2(
            Message message, Channel channel,
            @Headers Map<String, Object> map) throws IOException, InterruptedException {
        Thread.sleep(600);
        System.out.println(" [ 消费者@2号 ] Received ==> '" + new String(message.getBody()) + "'");
        System.out.println(" [ 消费者@2号 ] 处理消息数:" + count2++);

        // 确认消息
        channel.basicAck((Long) map.get(AmqpHeaders.DELIVERY_TAG), false);
    }

//    @RabbitListener(queues = RabbitConfig.ACK_QUEUE)
    public void receive3(
            Message message, Channel channel,
            @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) throws IOException, InterruptedException {
        Thread.sleep(1000);
        System.out.println(" [ 消费者@3号 ] Received ==> '" + new String(message.getBody()) + "'");
        System.out.println(" [ 消费者@3号 ] 处理消息数:" + count3++);

        // 确认消息
        channel.basicAck(deliveryTag, false);
    }

    @RabbitListener(queues = RabbitConfig.ACK_QUEUE)
    public void receive4(
            Message message, Channel channel) throws IOException, InterruptedException {
        Thread.sleep(200);
        System.out.println(" [ 消费者@4号 ] Received ==> '" + new String(message.getBody()) + "'");
        System.out.println(" [ 消费者@4号 ] 消息被我拒绝了:" + count1++);
        
        // 拒绝消息方式一
        // 第一个参数,交付标签
        // 第二个参数,false表示仅拒绝提供的交付标签;true表示批量拒绝所有消息,包括提供的交付标签
        // 第三个参数,false表示直接丢弃消息,true表示重新排队
        //channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);

        // 拒绝消息方式二
        // 第一个参数,交付标签
        // 第二个参数,false表示直接丢弃消息,true表示重新排队
        // 跟basicNack的区别就是始终只拒绝提供的交付标签
        channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64

运行结果:
在这里插入图片描述
channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
这里是拒绝后,重新进入队列,所以消费的总是第一条消息并且循环不停
停止程序后,队列仍然是10条消息

channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
改成false,拒绝后直接丢弃
重启后:
在这里插入图片描述

总结一下 手动确认模式的各种情况
未确认:什么也不用写,消息不会移除,重复消费,积攒越来越多
确认:channel.basicAck();确认后,消息从队列中移除
拒绝:channel.basicNack()或channel.basicReject();拒绝后,消息先从队列中移除,然后可以选择重新排队,或者直接丢弃(丢弃还有一种选择,就是加入到死信队列中,用于追踪问题)

相关链接

首页
上一篇:快速入门
下一篇:交换机

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/我家自动化/article/detail/878478
推荐阅读
相关标签
  

闽ICP备14008679号