当前位置:   article > 正文

RocketMQ 死信队列_rocketmq死信队列

rocketmq死信队列

RocketMQ 死信队列

死信队列

死信队列是什么?

死信队列指的是种正常情况下无法被消费的消息称为死信消息(Dead-Letter Message),存储死信消息的特殊队列称为死信队列(Dead-Letter Queue)。

在RocketMQ中消息重试超过一定次数后(默认16次)就会被放到死信队列中,在消息队列

可以在控制台Topic列表中看到“DLQ”相关的Topic,默认命名是:

%RETRY%消费组名称(重试Topic)

%DLQ%消费组名称(死信Topic)

死信队列也可以被订阅和消费,并且也会过期RocketMQ 中

其中包括重试之后也无法消费的消息也会

死信队列应用场景

如我们平时下单后未在指定时间内付款,过来这个时间,我们的订单会被放入死信队列中。当我们再去付款时候,会发现订单已经被取消,此时我们只需要去死信队列中查该订单是否存在。

如当一些消息出现异常迟迟未被消费(或者最大重试次数后也未成功消费),这时候就会将消息存放到死信队列中。

示例

这里我们定义生产者

     // 实例化生产者,并指定生产组名称
        DefaultMQProducer producer = new DefaultMQProducer("myproducer_group_topic_name_dle_01");


        //设置实例名称,一个jvm中有多个生产者可以根据实例名区分
        //默认default
        producer.setInstanceName("topic_name_dle");

        // 指定nameserver的地址
        producer.setNamesrvAddr("localhost:9876");

        //设置同步重试次数
        producer.setRetryTimesWhenSendFailed(2);
        //设置异步发送次数
        //producer.setRetryTimesWhenSendAsyncFailed(2);


        // 初始化生产者
        producer.start();

        for (int i = 0; i <10 ; i++) {
            Message message = new Message("topic_name_dle", ("key=" + i).getBytes("utf-8"));
            // 1 同步发送  如果发送失败会根据重试次数重试
            SendResult send = producer.send(message);
            SendStatus sendStatus = send.getSendStatus();
            System.out.println(sendStatus.toString());

        }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28

消费者

这里默认返回消息消费失败,指定消费者重试一次。

  /**
         * 推消息消费
         */
        DefaultMQPushConsumer defaultMQPushConsumer = new DefaultMQPushConsumer("consumer_group_topic_name_dle_01");
        // 指定nameserver的地址
        defaultMQPushConsumer.setNamesrvAddr("localhost:9876");

        //
        defaultMQPushConsumer.subscribe("topic_name_dle", "*");
        /**
         * 推送消息 提高消费处理能力
         * 1 提高消费并行度
         * 2 以批量方式进行 消费
         * 3 检测延时情况,跳过非重要消息
         */
        //消费限流 只针对推送来设置,拉取消息自己控制
        // 1 提高消费并行度
        defaultMQPushConsumer.setConsumeThreadMax(10);
        defaultMQPushConsumer.setConsumeThreadMin(1);

        // 2 以批量方式进行 消费
        // 设置消息批处理的一个批次中消息的最大个数
        defaultMQPushConsumer.setConsumeMessageBatchMaxSize(10);

        //设置重试次数 默认16次
        defaultMQPushConsumer.setMaxReconsumeTimes(1);

        // 添加消息监听器,一旦有消息推送过来,就进行消费
        defaultMQPushConsumer.setMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                //final MessageQueue messageQueue = context.getMessageQueue();
                for (MessageExt msg : msgs) {
                    System.out.println(msg);
                    try {
                        System.out.println(new String(msg.getBody(), "utf-8"));
                    } catch (UnsupportedEncodingException e) {
                        e.printStackTrace();
                    }
                }

                // 消息消费成功
                //return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                //null  也表示推送失败,会进行重试
                return null;
                // 消息消费失败
//                return ConsumeConcurrentlyStatus.RECONSUME_LATER;
            }
        });
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49

启动消费者和生产者之后,消费者可以看到,消息重发了一次,这里图没截全。
在这里插入图片描述

RocketMq 可视化工具:rocketmq-console下载地址:

https://github.com/apache/rocketmq-externals/archive/rocketmq-console-1.0.0.zip

下载成功带入idea 将配置文件改成自己的地址,然后启动

在这里插入图片描述

可以从控制台中看到,没有被正常消费的消息被发送到死信队列中

这里与RocketMQ不同的是RabbitMQ需要自己定义一个队列与交换机绑定,没有被成功消费会将消息发送到自己创建的死信队列中去,而RocketMQ不需要我们自己去指定死信队列,会自己根据重试次数以及消息是否消费成功,将消息发送到死信队列(不需要我们去创建)。

死信队列
在这里插入图片描述
重试的队列以
可以看到队列名前会%RETRY%前缀 表示是重试队列
在这里插入图片描述

声明:本文内容由网友自发贡献,转载请注明出处:【wpsshop博客】
推荐阅读
相关标签
  

闽ICP备14008679号