当前位置:   article > 正文

SpringBoot整合RabbitMQ 消息可靠投递、手动ack、延迟队列、死信队列、消息幂等性保障、消息积压_springcloud整合rabbitmq中延时队列+死信队列

springcloud整合rabbitmq中延时队列+死信队列

1、消息可靠投递

在使用 RabbitMQ 的时候,作为消息发送方希望杜绝任何消息丢失或者投递失败场景。RabbitMQ 为我们提供了两种方式用来控制消息的投递可靠性模式。

  • confirm 确认模式
  • return 退回模式

rabbitmq 整个消息投递的路径为:

  • producer—>rabbitmq broker—>exchange—>queue—>consumer

  • 消息从 producer 到 exchange 则会返回一个 confirmCallback 。

  • 消息从 exchange–>queue 投递失败则会返回一个 returnCallback 。

  • 将利用这两个 callback 控制消息的可靠性投递

因SpringBoot 整合RabbitMQ 当队列或交换机不存在是,自动创建,所以可靠性检测的一般是服务是否宕机。与消费者是否接收/确认消息无无关

1.1、SpringBoot整合

生产端

  • yaml

    spring:
      rabbitmq:
        host: 192.168.0.134
        port: 5672
        username: admin
        password: admin
        virtual-host: /admin
        # 开启publisher-confirm 有以下可选值
        # simple:同步等待confirm结果,直到超时
        # correlated:异步回调,定义ConfirmCallback。mq返回结果时会回调这个ConfirmCallback
        # NONE:默认不开启
        publisher-confirm-type: correlated
        # 开启publish-return功能。可以定义ReturnCallback
        # true:调用ReturnCallback
        # false:直接丢弃消息
        publisher-returns: true
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
  • 自定义Callback类

    
    /**
     * 消息推送确认机制配置文件
     * @author codinganhour
     */
    @Component
    public class PublisherConfirmAndReturnConfig implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnsCallback {
    
        @Autowired
        RabbitTemplate rabbitTemplate;
    
        /**
         * 初始化方法
         */
        @PostConstruct
        public void initMethod() {
            rabbitTemplate.setConfirmCallback(this);
            rabbitTemplate.setReturnsCallback(this);
        }
    
        @Override
        public void confirm(CorrelationData correlationData, boolean ack, String s) {
            Integer receivedDelay = null;
            if(null != correlationData){
                correlationData.getReturned().getMessage().getMessageProperties().getReceivedDelay();
            }
            if (receivedDelay != null && receivedDelay > 0) {
                // 是一个延迟消息,忽略这个错误提示
                return;
            }
            if (ack) {
                System.out.println("消息已经送达Exchange,ack已发");
            } else {
                System.out.println("消息没有送达Exchange");
            }
        }
    
        @Override
        public void returnedMessage(ReturnedMessage returnedMessage) {
            System.out.println("消息没有送到队列中");
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42

2、手动ACK确认机制

在RabbitMQ中指代的是消费者收到消息后确认的一种行为,关注点在于消费者能否实际接收到MQ发送的消息。

  • 自动Ack时,消费者接收消息后立即ack,然后慢慢处理,重启消费者会丢失消息。
  • 手动Ack时,消费者接收消息后,消息状态为 Unacked,如果消费的时候没有手动ack,则mq中的消息总量Total不会减少。

RabbitMQ默认的消息确认机制是:自动确认的

队列分配消息给监听消费者时,该消息处于未确认状态,不会被删除;当接收到消费者的确认回复才会将消息移除。

其提供了三种确认方式:

  • 自动确认acknowledge=“none”:当消费者接收到消息的时候,就会自动给到RabbitMQ一个回执,告诉MQ我已经收到消息了,不在乎消费者接收到消息之后业务处理的成功与否。

  • 手动确认acknowledge=“manual”:当消费者收到消息后,不会立刻告诉RabbitMQ已经收到消息了,而是等待业务处理成功后,通过调用代码的方式手动向MQ确认消息已经收到。当业务处理失败,就可以做一些重试机制,甚至让MQ重新向消费者发送消息都是可以的。

  • 根据异常情况确认acknowledge=“auto”:该方式是通过抛出异常的类型,来做响应的处理(如重发、确认等)。这种方式比较麻烦。

1.1、SpringBoot 整合RabbitMQ ACK

消费端

manual方式
  • yaml配置文件
    spring:
      rabbitmq:
        host: 192.168.0.134
        port: 5672
        username: admin
        password: admin
        virtual-host: /admin
        listener:
          # 容器类型simple或direct 简单理解为一对一;direct理解为一对多个消费者
          simple:
            # ACK模式(none,auto,manual,默认为auto)
            acknowledge-mode: manual
            # 开启重试
            retry:
              # 是否开启重试机制
              enabled: true
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
  • 消费者

/**
 * @author
 */
@Slf4j
@Component
public class DirectManualListener {


    /**
     * 消息最大重试次数
     */
    private static final int MAX_RETRIES = 3;

    /**
     * 重试间隔(秒)
     */
    private static final long RETRY_INTERVAL = 5;

    /**
     * 手动进入死信队列
     * RabbitListener中的参数用于表示监听的是哪一个队列
     */
    @RabbitListener(queues = DirectManualConfig.DIRECT_QUEUE)
    public void manualListenerQueue(String message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) throws Exception {

        // 重试次数
        int retryCount = 0;
        boolean success = false;
        // 消费失败并且重试次数<=重试上限次数
        while (!success && retryCount < MAX_RETRIES) {
            retryCount++;
            // 具体业务逻辑
            System.out.println("处理业务逻辑");
            // 如果失败则重试
            if (!success) {
                String errorTip = "第" + retryCount + "次消费失败" +
                        ((retryCount < 3) ? "," + RETRY_INTERVAL + "s后重试" : ",进入死信队列");
                log.error(errorTip);
                Thread.sleep(RETRY_INTERVAL * 1000);
            }
        }
        if (success) {
            // 消费成功,确认
            channel.basicAck(deliveryTag, false);
            log.info("创建订单数据消费成功");
        } else {
            // requeue:false 手动拒绝,进入抛弃或进入死信队列
            channel.basicNack(deliveryTag, false, false);
            log.info("创建订单数据消费失败");
        }

    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
auto方式
  • yaml配置文件
    spring:
      rabbitmq:
        host: 192.168.0.134
        port: 5672
        username: admin
        password: admin
        virtual-host: /admin
        listener:
          simple:
            # ACK模式(none,auto,manual,默认为auto)
            acknowledge-mode: auto
            # 开启重试
            retry:
              # 是否开启重试机制
              enabled: true
              # 最大重试次数,默认3
              max-attempts: 5
              # 重试间隔(ms) 默认1秒
              initial-interval: 500
              # 重试因子,默认是1。本次推送时间间隔 = 上一次间隔时间 * multiplier
              multiplier: 2
              # 最大间隔时间(ms),默认10秒
              maxInterval: 20000
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
  • 消费者

@Slf4j
@Component
public class DirectAutoListener {


    /**
     * auto手动抛出异常方式进入死信队列,yaml中max-attempts,initial-interval生效
     * RabbitListener中的参数用于表示监听的是哪一个队列
     */
    @RabbitListener(queues = DirectManualConfig.DIRECT_QUEUE)
    public void autoListenerQueue(String message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) throws Exception {
        log.info("消息信息:"+message+";消息deliveryTag="+deliveryTag);
        Thread.sleep(1000);
        if(deliveryTag != 8){
            throw new RuntimeException("操作异常");
        }else{
            log.info("消息Ack deliveryTag="+deliveryTag);
            channel.basicAck(deliveryTag, false);
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22

3、延迟队列

延迟队列,即消息进入队列后不会立即被消费,只有到达指定时间后,才会被消费。

场景:

  1. 下单后,30分钟未支付,取消订单,回滚库存。

  2. 新用户注册成功7天后,发送短信问候。

实现方式:

  1. 定时器
    缺点:触发时,会扫描数据库,难以精确定位触发时间,数据量大时数据库承受压力过大;

  2. 延迟队列(TTL+死信队列组合实现延迟队列的效果)
    精确触发,触发时只查询单一数据即可
    在这里插入图片描述

延迟队列
在这里插入图片描述


/**
 * 延迟队列
 * @author
 */
@Slf4j
@Configuration
public class DirectTtlConfig {

    /**
     * direct路由模式-交换机
     */
    public static final String  DIRECT_EXCHANGE = "direct_ttl_exchange";
    /**
     * direct路由模式-队列
     */
    public static final String DIRECT_QUEUE = "direct_ttl_queue";
    /**
     * direct路由模式-路由键
     */
    public static final String DIRECT_ROUTING = "direct.ttl.routing";


    /**
     * direct路由模式-死信交换机
     */
    public static final String  DIRECT_DLX_EXCHANGE = "direct_ttl_dlx_exchange";
    /**
     * direct路由模式-死信队列
     */
    public static final String DIRECT_DLX_QUEUE = "direct_ttl_dlx_queue";
    /**
     * direct路由模式-路由键
     */
    public static final String DIRECT_DLX_ROUTING = "direct.ttl.dlx.routing";

    /**
     * 1、声明交换机
     * direct路由模式,默认持久化,非自动删除
     * @return
     */
    @Bean(DIRECT_EXCHANGE)
    public Exchange directTtlExchange(){

        return ExchangeBuilder.directExchange(DIRECT_EXCHANGE).build();
    }

    /**
     * 2、声明队列
     * direct路由模式
     * @return
     */
    @Bean(DIRECT_QUEUE)
    public Queue directTtlQueue(){
        // ttl:延迟队列时间,超时为消费则进入死信队列中
        // deadLetterExchange:绑定死信交换机
        // deadLetterRoutingKey:绑定死信路由
        return QueueBuilder.durable(DIRECT_QUEUE).ttl(1000).deadLetterExchange(DIRECT_DLX_EXCHANGE).deadLetterRoutingKey(DIRECT_DLX_ROUTING).build();
    }

    /**
     * 3、队列与交换机进行绑定
     * direct路由模式
     * @param queue @Qualifier 将 value 对应的bean 注入到参数中
     * @param exchange @Qualifier 将 value 对应的bean 注入到参数中
     * @return
     */
    @Bean
    public Binding directTtlQueueExchange(@Qualifier(DIRECT_QUEUE) Queue queue, @Qualifier(DIRECT_EXCHANGE) Exchange exchange){
        return BindingBuilder.bind(queue).to(exchange).with(DIRECT_ROUTING).noargs();
    }



    /**
     * 1、声明死信交换机
     * direct路由模式,默认持久化,非自动删除
     * @return
     */
    @Bean(DIRECT_DLX_EXCHANGE)
    public Exchange directDlxExchange(){

        return ExchangeBuilder.directExchange(DIRECT_DLX_EXCHANGE).build();
    }

    /**
     * 2、声明死信队列
     * direct路由模式
     * @return
     */
    @Bean(DIRECT_DLX_QUEUE)
    public Queue directDlxQueue(){
        return QueueBuilder.durable(DIRECT_DLX_QUEUE).build();
    }

    /**
     * 3、死信队列与死信交换机进行绑定
     * direct路由模式
     * @param queue @Qualifier 将 value 对应的bean 注入到参数中
     * @param exchange @Qualifier 将 value 对应的bean 注入到参数中
     * @return
     */
    @Bean
    public Binding directDlxQueueExchange(@Qualifier(DIRECT_DLX_QUEUE) Queue queue, @Qualifier(DIRECT_DLX_EXCHANGE) Exchange exchange){
        return BindingBuilder.bind(queue).to(exchange).with(DIRECT_DLX_ROUTING).noargs();
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107

消费者只需要监听死信队列中消息即可

4、死信队列

死信队列,英文缩写:DLX 。Dead Letter Exchange(死信交换机),当消息成为Dead message后,可以被重新发送到另一个交换机,这个交换机就是DLX。
在这里插入图片描述
死信的三种情况:

  1. 队列消息长度到达限制;

  2. 消费者拒接消费消息,basicNack/basicReject,并且不把消息重新放入原目标队列,requeue=false;(手动ack(auto,manual)都可以触发)

  3. 原队列存在消息过期设置,消息到达超时时间未被消费;

死信队列与延期队列实现方式一致,只是会监听2个消费者,正常队列采用ack(auto,manual)触发是否进入死信队列

QueueBuilder.durable(DIRECT_QUEUE).maxLength():队列中等待消费的数量大于maxLength的数量就会进入死信队列

5、消息幂等性保障

幂等性指一次和多次请求某一个资源,对于资源本身应该具有同样的结果。也就是说,其任意多次执行对资源本身所产生的影响均与一次执行的影响相同。

在MQ中指,消费多条相同的消息,得到与消费该消息一次相同的结果。

处理方式

  • 传递消息唯一值记录数据库中或者redis中,消费时判断,防止重复消费

  • 更新数据库时可以采用乐观锁方式,关键字段值发生变化则不消费

6、消息积压

  • 消费者宕机积压
  • 消费者消费能力不足积压
  • 发送者发流量太大

解决方案:上线更多的消费者,进行正常消费上线专门的队列消费服务,将消息先批量取出来,记录数据库,再慢慢处理

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/羊村懒王/article/detail/127119
推荐阅读
相关标签
  

闽ICP备14008679号