当前位置:   article > 正文

浅析RabbitMQ死信队列_rabbitmq 死信队列

rabbitmq 死信队列

原文首发于公众号【程序员三木】

在现代分布式系统中,消息队列扮演着至关重要的角色。它们可以实现应用程序之间的异步通信,并确保数据的可靠传输和处理。而在这个领域中,RabbitMQ作为一种强大而受欢迎的消息队列解决方案,具备了高效、可靠和灵活的特性。

然而,即使使用了RabbitMQ,我们仍然会遇到一些不可预料的情况,例如消费者无法处理某些消息、消息过期或者队列溢出等。为了解决这些问题,RabbitMQ引入了死信队列(Dead Letter Queue)的概念,为开发人员提供了一种有效的错误处理机制。

那么,究竟什么是死信队列呢?

本文结合Spring Boot使用RabbitMQ的死信队列,着重从是什么、为什么、怎么用几个方面对死信队列进行简单介绍。

1. 是什么:

  • 死信队列(Dead Letter Queue)是一种特殊的消息队列,用于存储无法被消费的消息。
  • 当消息满足某些条件无法被正常消费时,将被发送到死信队列中进行处理。
  • 死信队列提供了一种延迟处理、异常消息处理等场景的解决方案。

2. 为什么

  • 用来处理消费者无法正确处理的消息,避免消息丢失或积压
  • 实现延迟消息处理,例如订单超时未支付,可以将该消息发送到死信队列,然后再进行后续处理。
  • 用于实现消息重试机制,当消费者处理失败时,将消息重新发送到死信队列进行重试。
  • 提高了系统的可伸缩性和容错性,能够应对高并发和异常情况。

3. 怎么用

  1. 在Spring Boot中配置和使用死信队列:
    • 首先,在pom.xml文件中添加RabbitMQ的依赖项。
    • 然后,在application.properties文件中配置RabbitMQ连接信息。
    • 接下来,创建生产者和消费者代码,并通过注解将队列和交换机进行绑定。
    • 在队列的声明中添加死信队列的相关参数,如x-dead-letter-exchangex-dead-letter-routing-key等。
    • 最后,在消费者中编写处理消息的逻辑,包括对异常消息进行处理,并设置是否重新发送到死信队列。

简而言之,死信队列可以认为是一个正常队列的备用队列(或者说是兜底队列),当正常队列的消息无法消费的时候mq会重新把该消息发送到死信交换机,由死信交换机根据路由键将消息投递到备用队列,启动服务备用方案。

消息从正常队列到死信队列的三种情况:

1、消息被否定确认使用 channel.basicNackchannel.basicReject ,并且此时requeue 属性被设置为false

2、消息在队列中的时间超过了设置的TTL())时间。

3、消息数量超过了队列的容量限制()。

当一个队列中的消息满足上述三种情况任一个时,改消息就会从原队列移至死信队列,若改队列没有绑定死信队列则消息被丢弃。

4. 实战

以下是一个简单的Spring Boot集成RabbitMQ的死信队列示例代码:

  • 配置
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=rabbit
spring.rabbitmq.password=123456
# 开启消费者手动确认
spring.rabbitmq.listener.type=direct

# 发送到队列失败时的手动处理
spring.rabbitmq.listener.direct.acknowledge-mode=manual
spring.rabbitmq.publisher-returns=true

# 发送到交换机手动确认
spring.rabbitmq.publisher-confirm-type=simple
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 配置类
@Configuration
@Slf4j
public class RabbitCof {

    @Resource
    private MqKeys mqKeys;

    @Bean("normalQueue")
    public Queue normalQueue() {
        /**
         * 为普通队列绑定交换机
         */
        Map<String, Object> args = new HashMap<>();
        args.put("x-dead-letter-exchange", mqKeys.DIE_EXCHANGE);
        args.put("x-dead-letter-routing-key", mqKeys.DIE_ROUTING_KEY);
        args.put("x-message-ttl", 1000); // 队列中的消息未被消费则1秒后过期
        return new Queue(mqKeys.NORMAL_QUEUE, true, false, false, args);
    }

    @Bean("normalExchange")
    public Exchange normalExchange() {
        return new DirectExchange(mqKeys.NORMAL_EXCHANGE);
    }

    @Bean("normalBind")
    public Binding normalBinding(@Qualifier("normalQueue") Queue normalQueue, @Qualifier("normalExchange") Exchange normalExchange) {
        return BindingBuilder.bind(normalQueue).to(normalExchange).with(mqKeys.ROUTING_KEY).noargs();
    }

    /**
     * 死信队列
     * @return
     */
    @Bean("dieQueue")
    public Queue dlQueue() {
        return new Queue(mqKeys.DIE_QUEUE, true, false, false);
    }

    /**
     * 死信交换机
     * @return
     */
    @Bean("dieExchange")
    public Exchange dlExchange() {
        return new DirectExchange(mqKeys.DIE_EXCHANGE);
    }

    @Bean("dieBind")
    public Binding dlBinding(@Qualifier("dieQueue") Queue dlQueue, @Qualifier("dieExchange") Exchange dlExchange) {
        return BindingBuilder.bind(dlQueue).to(dlExchange).with(mqKeys.DIE_ROUTING_KEY).noargs();
    }



    @Resource
    private ConnectionFactory connectionFactory;

    @Bean
    public RabbitTemplate rabbitTemplate() {

        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);

        /**
         * 消费者确认收到消息后,手动ack回调处理
         * spring.rabbitmq.publisher-confirm-type=simple
         */
        rabbitTemplate.setConfirmCallback((CorrelationData correlationData, boolean ack, String cause)->{
            if(!ack) {
                log.info("消息投递到交换机失败,correlationData={} ,ack={}, cause={}", correlationData == null ? "null" : correlationData.getId(), ack, cause);
            } else {
                log.info("消息成功投递到交换机,correlationData={} ,ack={}, cause={}", correlationData == null ? "null" : correlationData.getId(), ack, cause);
            }
        });

        /**
         * 消息投递到队列失败回调处理
         * spring.rabbitmq.listener.direct.acknowledge-mode=manual
         * spring.rabbitmq.publisher-returns=true
         */
        rabbitTemplate.setReturnsCallback((returnedMessage)->{
            Message message = returnedMessage.getMessage();
            log.error("分发到到队列失败, body->{}", message.getBody());
        });
        return rabbitTemplate;
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 生产者类
@Component
public class Producer {

    @Resource
    private  MqKeys mqKeys;

    @Resource
    private RabbitTemplate rabbitTemplate;

    public void sendMessage(String message) {
        rabbitTemplate.convertAndSend(mqKeys.NORMAL_EXCHANGE, mqKeys.ROUTING_KEY, message);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 消费者类
@Component
@RabbitListener(queues = "normal.queue")
@Slf4j
public class Consumer {

    @RabbitHandler
    public void handleMessage(String data, Message message, Channel channel) {
        boolean success = false;
        int retryCount = 3;
        System.out.println(message.toString());
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        while (!success && retryCount-- > 0){
            try {
                // 处理消息
                log.info("收到消息: {}, deliveryTag = {}", data, deliveryTag);
                // 正常处理完毕,手动确认,此处不确认让他进入死信队列
//                success = true;
//                channel.basicAck(deliveryTag, false);
                Thread.sleep(3 * 1000L);
            }catch (Exception e){
                log.error("程序异常:{}", e.getMessage());
            }
        }
        // 达到最大重试次数后仍然消费失败
        if(!success){
            try {
                log.info("move to die queue");
                // 手动拒绝,移至死信队列
                /**
                 *
                 deliveryTag – the tag from the received AMQP.Basic.GetOk or AMQP.Basic.Deliver
                 multiple – true to reject all messages up to and including the supplied delivery tag; false to reject just the supplied delivery tag.
                 requeue – true if the rejected message(s) should be requeued rather than discarded/dead-lettered
                 */
                channel.basicNack(deliveryTag, false, false);
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41

以上代码演示了如何在Spring Boot中配置一个普通队列和一个死信队列,然后通过生产者发送消息到普通队列,在消费者中处理消息,并模拟了当发生异常时将消息重新发送到死信队列。

参考连接

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/繁依Fanyi0/article/detail/569729
推荐阅读
相关标签
  

闽ICP备14008679号