当前位置:   article > 正文

rabbitmq消息确认机制_rabbitmq手动确认机制

rabbitmq手动确认机制

在这里插入图片描述

一、发送端消息确认

(1)publish ===> broker
只要broker收到消息,就会执行 confirmCallback

# 开启发送端消息抵达Broker确认 
spring.rabbitmq.publisher-confirms=true
  • 1
  • 2

(2)exchange ===> queue
如果exchange有消息没有成功发送至queue,就会执行RuturnCallback,例:routing key错误导致发送消息到队列失败

# 开启发送端消息抵达Queue确认
spring.rabbitmq.publisher-returns=true
# 只要消息抵达Queue,就会异步发送优先回调returnfirm
spring.rabbitmq.template.mandatory=true
  • 1
  • 2
  • 3
  • 4

(3)RabbitmqConfig

/**
     * 定制RabbitTemplate
     * 1、服务收到消息就会回调
     *      1、spring.rabbitmq.publisher-confirms: true
     *      2、设置确认回调
     * 2、消息正确抵达队列就会进行回调
     *      1、spring.rabbitmq.publisher-returns: true
     *         spring.rabbitmq.template.mandatory: true
     *      2、设置确认回调ReturnCallback
     *
     * 3、消费端确认(保证每个消息都被正确消费,此时才可以broker删除这个消息)
     *
     */
    // @PostConstruct  //MyRabbitConfig对象创建完成以后,执行这个方法
    public void initRabbitTemplate() {

        /**
         * 1、只要消息抵达Broker就ack=true
         * correlationData:当前消息的唯一关联数据(这个是消息的唯一id)
         * ack:消息是否成功收到
         * cause:失败的原因
         */
        //设置确认回调
        rabbitTemplate.setConfirmCallback((correlationData,ack,cause) -> {
            System.out.println("confirm...correlationData["+correlationData+"]==>ack:["+ack+"]==>cause:["+cause+"]");
        });


        /**
         * 只要消息没有投递给指定的队列,就触发这个失败回调
         * message:投递失败的消息详细信息
         * replyCode:回复的状态码
         * replyText:回复的文本内容
         * exchange:当时这个消息发给哪个交换机
         * routingKey:当时这个消息用哪个路邮键
         */
        rabbitTemplate.setReturnCallback((message,replyCode,replyText,exchange,routingKey) -> {
            System.out.println("Fail Message["+message+"]==>replyCode["+replyCode+"]" +
                    "==>replyText["+replyText+"]==>exchange["+exchange+"]==>routingKey["+routingKey+"]");
        });
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41

二、消费端消息确认

在这里插入图片描述
(1) queue ===> consumer
默认是ack,consumer只要拿到消息就会自动确认,服务端就会删除queue中的消息,如果业务出现问题只有部分消息签收成功,剩余未签收的消息也会删除,为了能保存消息,需要设置为客户端手动确认签收

#手动签收ack
spring.rabbitmq.listener.simple.acknowledge-mode=manual
  • 1
  • 2

(2) 签收消息channel.basicAck(自增id,是否批量)

设置手动ack签收之后,如果有消息没有签收(签收失败),会显示未签收
在这里插入图片描述

自增id:2,签收完成2
自增id:4,签收完成4

  • 1
  • 2
  • 3

如果服务重启,未签收的消息就会加入queue重新发送:Unacked ===> Ready

在这里插入图片描述

(3)拒签 channel.basicNack(自增id,是否批量,是否重新加入队列)
requeue=true 拒签的消息会加入queue重新发送
requeue=false 拒签的消息会直接丢弃,不会加入队列重新发送

@RabbitListener(queues = {"durunwu.queue"})
@Service("orderService")
public class OrderServiceImpl extends ServiceImpl<OrderDao, OrderEntity> implements OrderService {

    @RabbitHandler
    public void msgListener2(Message message, OrderItemEntity orderItemEntity, Channel channel) throws IOException {
        //顺序id,channel内按顺序自增了
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        if (deliveryTag % 2 == 0){
            //basicAck(long deliveryTag, boolean multiple)
            //channel内按顺序自增 
            channel.basicAck(deliveryTag,false);
            System.out.println("自增id:"+deliveryTag+",签收完成" + orderItemEntity.getId());
        }else {
            //basicNack(long deliveryTag, boolean multiple, boolean requeue)
            channel.basicNack(deliveryTag,false,true);
            System.out.println("自增id:"+deliveryTag+",拒签"+orderItemEntity.getId());
        }

    }

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22

控制台打印

自增id:1,拒签1
自增id:2,签收完成2
自增id:3,拒签3
自增id:4,签收完成4
自增id:5,拒签5
自增id:6,签收完成1
自增id:7,拒签3
自增id:8,签收完成5
自增id:9,拒签3
自增id:10,签收完成3
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

(4)整合业务
如何签收?
业务完成 ===> 签收:basicAck(deliveryTag, false)
业务未完成 ===> 拒签:basicNack(deliveryTag, false, true)

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/不正经/article/detail/385790
推荐阅读
相关标签
  

闽ICP备14008679号