当前位置:   article > 正文

RabbitMQ:消费者ACK机制、生产者消息确认_rabbitmq 生产ack

rabbitmq 生产ack

基础案例环境搭建:

基础环境搭建(手把手教你环境搭建和五种工作模式):https://blog.csdn.net/m0_48325361/article/details/123174843?spm=1001.2014.3001.5502

环境:

队列:
在这里插入图片描述
交换机:
在这里插入图片描述
交换机和队列进行绑定:
在这里插入图片描述

1. 生产者发送消息确认

如果保证消息的可靠性?需要解决如下问题

问题1:生产者能百分之百将消息发送给消息队列!

  • 两种意外情况:
    • 第一,消费者发送消息给MQ失败,消息丢失;
    • 第二,交换机路由到队列失败,路由键写错;

在使用 RabbitMQ 的时候,作为消息发送方希望杜绝任何消息丢失或者投递失败场景。RabbitMQ 为我们提供了两种方式用来控制消息的投递可靠性模式。

  • confirm 确认模式

  • return 退回模式

rabbitmq 整个消息投递的路径为:
在这里插入图片描述

  • 消息从生产者(producer)发送消息到交换机(exchange),不论是否成功,都会执行一个确认回调方法confirmCallback 。

  • 消息从交换机(exchange)到消息队列( queue )投递失败则会执行一个返回回调方法 returnCallback 。

我们将利用这两个 callback 控制消息的可靠性投递

1.1 confirm 确认模式

目标:演示消息确认模式效果

生产者发布消息确认模式特点,不论消息是否进入交换机均执行回调方法

实现过程:

  1. 生产者配置文件中,开启生产者发布消息确认模式

    # 开启生产者确认模式:(confirm),投递到交换机,不论失败或者成功都回调
    spring.rabbitmq.publisher-confirm-type=correlated
    # 开启return退回模式
    spring.rabbitmq.publisher-returns=true
    
    • 1
    • 2
    • 3
    • 4
  2. 编写生产者确认回调方法

    //发送消息回调确认类,实现回调接口ConfirmCallback,重写其中confirm()方法
    @Component
    public class MessageConfirmCallback implements RabbitTemplate.ConfirmCallback {
        /**
         * 投递到交换机,不论投递成功还是失败都回调次方法
         * @param correlationData 投递相关数据
         * @param ack 是否投递到交换机
         * @param cause 投递失败原因
         */
        @Override
        public void confirm(CorrelationData correlationData, boolean ack, String cause) {
            if (ack){
                System.out.println("消息进入交换机成功{}");
            } else {
                System.out.println("消息进入交换机失败{} , 失败原因:" + cause);
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
  3. 在RabbitTemplate中,设置消息发布确认回调方法

    @Component
    public class MessageConfirmCallback implements RabbitTemplate.ConfirmCallback{
    
        @Autowired
        private RabbitTemplate rabbitTemplate;
        /**
         * 创建RabbitTemplate对象之后执行当前方法,为模板对象设置回调确认方法
         * 设置消息确认回调方法
         * 设置消息回退回调方法
         */
        @PostConstruct
        public void initRabbitTemplate(){
            //设置消息确认回调方法
            rabbitTemplate.setConfirmCallback(this::confirm);
        }
        /**
         * 投递到交换机,不论投递成功还是失败都回调次方法
         * @param correlationData 投递相关数据
         * @param ack 是否投递到交换机
         * @param cause 投递失败原因
         */
        @Override
        public void confirm(CorrelationData correlationData, boolean ack, String cause) {
            if (ack){
                System.out.println("消息进入交换机成功{}");
            } else {
                System.out.println("消息进入交换机失败{} , 失败原因:" + cause);
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
  4. 测试:
    成功:
    在这里插入图片描述
    失败:
    在这里插入图片描述

1.2 return 退回模式

消息回退模式特点:消息进入交换机,路由到队列过程中出现异常则执行回调方法
实现ReturnCallback接口

实现过程:

  1. 在配置文件中,开启生产者发布消息回退模式

    # 开启生产者回退模式:(returns),交换机将消息路由到队列,出现异常则回调
    spring.rabbitmq.publisher-returns=true
    
    • 1
    • 2
  2. 在MessageConfirmCallback类中,实现接口RabbitTemplate.ReturnCallback

    @Component
    public class RabbitConfirm implements RabbitTemplate.ConfirmCallback
        ,RabbitTemplate.ReturnCallback {
    	//..省略
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
  3. 并重写RabbitTemplate.ReturnCallback接口中returnedMessage()方法

    /**
         * 当消息投递到交换机,交换机路由到消息队列中出现异常,执行returnedMessaged方法
         * @param message 投递消息内容
         * @param replyCode 返回错误状态码
         * @param replyText 返回错误内容
         * @param exchange 交换机名称
         * @param routingKey 路由键
         */
        @Override
        public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
            System.out.println("交换机路由至消息队列出错:>>>>>>>");
            System.out.println("交换机:"+exchange);
            System.out.println("路由键:"+routingKey);
            System.out.println("错误状态码:"+replyCode);
            System.out.println("错误原因:"+replyText);
            System.out.println("发送消息内容:"+message.toString());
            System.out.println("<<<<<<<<");
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
  4. 在RabbitTemplate中,设置消息发布回退回调方法

    @PostConstruct
    public void initRabbitTemplate(){
        //设置消息确认回调方法
        rabbitTemplate.setConfirmCallback(this::confirm);
        //设置消息回退回调方法
        rabbitTemplate.setReturnCallback(this::returnedMessage);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

测试:

成功:在这里插入图片描述失败:
在这里插入图片描述

源代码
@Component
public class MessageConfirmCallback implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnsCallback {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    /**
     * 创建RabbitTemplate对象之后执行当前方法,为模板对象设置回调确认方法
     * 设置消息确认回调方法
     * 设置消息回退回调方法
     */
    @PostConstruct
    public void initRabbitTemplate() {
        //设置消息确认回调方法
        rabbitTemplate.setConfirmCallback(this::confirm);
        //设置消息退回方法
        rabbitTemplate.setReturnsCallback(this::returnedMessage);
    }
    /**
     * 投递到交换机,不论投递成功还是失败都回调次方法
     *
     * @param correlationData 投递相关数据
     * @param ack             是否投递到交换机
     * @param cause           投递失败原因
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        if (ack) {
            System.out.println("消息进入交换机成功{}");
        } else {
            System.out.println("消息进入交换机失败{} , 失败原因:" + cause);
        }
    }
    /**
     * 当消息投递到交换机,交换机路由到消息队列中出现异常,执行returnedMessage方法
     */
    @Override
    public void returnedMessage(ReturnedMessage returnedMessage) {
        System.out.println("交换机路由至消息队列出错:>>>>>>>");
        System.out.println("错误原因:" + returnedMessage.getReplyText());
        System.out.println("发送消息内容:" + returnedMessage.getMessage());
        System.out.println("错误状态码:" + returnedMessage.getReplyCode());
        System.out.println("路由键:" + returnedMessage.getRoutingKey());
        System.out.println("交换机:" + returnedMessage.getExchange());
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44

1.1.3 小结

确认模式:

  • 设置publisher-confirms=“true” 开启 确认模式。
  • 实现RabbitTemplate.ConfirmCallback接口,重写confirm方法
  • 特点:不论消息是否成功投递至交换机,都回调confirm方法,只有在发送失败时需要写业务代码进行处理。

退回模式

  • 设置publisher-returns=“true” 开启 退回模式。
  • 实现RabbitTemplate.ReturnCallback接口,重写returnedMessage方法
  • 特点:消息进入交换机后,只有当从exchange路由到queue失败,才触发回调returnedMessage方法;

2. 消费者签收消息(ACK)

问题2:如何保证消费者能百分百接收到请求,且业务执行过程中还不能出错!

ack指 Acknowledge,拥有确认的含义,是消费端收到消息的一种确认机制;

消息确认的三种类型:

  • 自动确认:acknowledge=“none

  • 手动确认:acknowledge=“manual

  • 根据异常情况确认:acknowledge=“auto”,(这种方式使用麻烦,不作讲解)

其中自动确认是指,当消息一旦被Consumer接收到,则自动确认收到,并将相应 message 从 RabbitMQ 的消息缓存中移除。但是在实际业务处理中,很可能消息接收到,业务处理出现异常,那么该消息就会丢失。

如果设置了手动确认方式,则需要在业务处理成功后,调用channel.basicAck(),手动签收,如果出现异常,则调用channel.basicNack()方法,让其自动重新发送消息。
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-tO0A0lBl-1646208065783)(RabbitMQ%E9%AB%98%E7%BA%A7%20%E8%AE%B2%E4%B9%89.assets/image-20191215164355458.png)]

2.1 代码实现

目标:演示消费者手动确认效果

  • 1.在消费者配置文件中开启ack机制
  • 2.在@RabbitListener消费者监听器方法中加入Message和Channel参数

实现过程:

消费者工程中,创建自定义监听器类CustomAckConsumerListener,实现ChannelAwareMessageListener接口

#rabbitmq启动ack机制,手动确认
spring.rabbitmq.listener.direct.acknowledge-mode=manual
spring.rabbitmq.listener.simple.acknowledge-mode=manual
  • 1
  • 2
  • 3

修改消费者监听器方法
在这里插入图片描述
测试成功案例:
发送请求:
在这里插入图片描述
消费者控制台打印:
在这里插入图片描述
测试失败案例:
修改topic_queue1队列的业务逻辑,让其抛出异常
在这里插入图片描述
在可视化界面也可以看到消息一直在队列中
在这里插入图片描述

  • 如果想手动清楚队列的消息
    点击队列
    在这里插入图片描述
    在这里插入图片描述
源代码
    @RabbitListener(queues = "topic_queue2")
    public void topic2Ack(String msg, Channel channel, Message message) throws IOException {
        System.out.println("=====routingInfo====>" + msg);
        /**
         * 手动拒绝签收
         * 参数1:当前消息的投递标签
         * 参数2:是否批量签收:true一次性签收所有,false,只签收当前消息
         */
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }


    //ack方式二
    @RabbitListener(queues = "topic_queue1")
    public void routingAck(String msg, Channel channel, Message message) throws Exception {
        System.out.println("=====routingAck====>" + msg);
        try {
            int i = 1 / 0;
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
            System.out.println("receiver success");
        } catch (Exception e) {
            System.out.println("业务逻辑产生异常" + e.getMessage());
            /**
             * 手动拒绝签收
             * 参数1:当前消息的投递标签
             * 参数2:是否批量签收:true一次性签收所有,false,只签收当前消息
             * 参数3:是否重回队列,true为重回队列,false为不重回
             */
            //消息重回队列
            channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
            System.out.println("receiver fail");
        }
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33

总结

后续还会更新TTL,死信队列,延迟队列等内容

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/寸_铁/article/detail/951380
推荐阅读
相关标签
  

闽ICP备14008679号