赞
踩
死信队列是什么?
死信队列指的是种正常情况下无法被消费的消息称为死信消息(Dead-Letter Message),存储死信消息的特殊队列称为死信队列(Dead-Letter Queue)。
在RocketMQ中消息重试超过一定次数后(默认16次)就会被放到死信队列中,在消息队列。
可以在控制台Topic列表中看到“DLQ”相关的Topic,默认命名是:
%RETRY%消费组名称(重试Topic)
%DLQ%消费组名称(死信Topic)
死信队列也可以被订阅和消费,并且也会过期RocketMQ 中
其中包括重试之后也无法消费的消息也会
如我们平时下单后未在指定时间内付款,过来这个时间,我们的订单会被放入死信队列中。当我们再去付款时候,会发现订单已经被取消,此时我们只需要去死信队列中查该订单是否存在。
如当一些消息出现异常迟迟未被消费(或者最大重试次数后也未成功消费),这时候就会将消息存放到死信队列中。
这里我们定义生产者
// 实例化生产者,并指定生产组名称 DefaultMQProducer producer = new DefaultMQProducer("myproducer_group_topic_name_dle_01"); //设置实例名称,一个jvm中有多个生产者可以根据实例名区分 //默认default producer.setInstanceName("topic_name_dle"); // 指定nameserver的地址 producer.setNamesrvAddr("localhost:9876"); //设置同步重试次数 producer.setRetryTimesWhenSendFailed(2); //设置异步发送次数 //producer.setRetryTimesWhenSendAsyncFailed(2); // 初始化生产者 producer.start(); for (int i = 0; i <10 ; i++) { Message message = new Message("topic_name_dle", ("key=" + i).getBytes("utf-8")); // 1 同步发送 如果发送失败会根据重试次数重试 SendResult send = producer.send(message); SendStatus sendStatus = send.getSendStatus(); System.out.println(sendStatus.toString()); }
消费者
这里默认返回消息消费失败,指定消费者重试一次。
/** * 推消息消费 */ DefaultMQPushConsumer defaultMQPushConsumer = new DefaultMQPushConsumer("consumer_group_topic_name_dle_01"); // 指定nameserver的地址 defaultMQPushConsumer.setNamesrvAddr("localhost:9876"); // defaultMQPushConsumer.subscribe("topic_name_dle", "*"); /** * 推送消息 提高消费处理能力 * 1 提高消费并行度 * 2 以批量方式进行 消费 * 3 检测延时情况,跳过非重要消息 */ //消费限流 只针对推送来设置,拉取消息自己控制 // 1 提高消费并行度 defaultMQPushConsumer.setConsumeThreadMax(10); defaultMQPushConsumer.setConsumeThreadMin(1); // 2 以批量方式进行 消费 // 设置消息批处理的一个批次中消息的最大个数 defaultMQPushConsumer.setConsumeMessageBatchMaxSize(10); //设置重试次数 默认16次 defaultMQPushConsumer.setMaxReconsumeTimes(1); // 添加消息监听器,一旦有消息推送过来,就进行消费 defaultMQPushConsumer.setMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { //final MessageQueue messageQueue = context.getMessageQueue(); for (MessageExt msg : msgs) { System.out.println(msg); try { System.out.println(new String(msg.getBody(), "utf-8")); } catch (UnsupportedEncodingException e) { e.printStackTrace(); } } // 消息消费成功 //return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; //null 也表示推送失败,会进行重试 return null; // 消息消费失败 // return ConsumeConcurrentlyStatus.RECONSUME_LATER; } });
启动消费者和生产者之后,消费者可以看到,消息重发了一次,这里图没截全。
RocketMq 可视化工具:rocketmq-console
下载地址:
https://github.com/apache/rocketmq-externals/archive/rocketmq-console-1.0.0.zip
下载成功带入idea 将配置文件改成自己的地址,然后启动
可以从控制台中看到,没有被正常消费的消息被发送到死信队列中
这里与RocketMQ不同的是RabbitMQ需要自己定义一个队列与交换机绑定,没有被成功消费会将消息发送到自己创建的死信队列中去,而RocketMQ不需要我们自己去指定死信队列,会自己根据重试次数以及消息是否消费成功,将消息发送到死信队列(不需要我们去创建)。
死信队列
重试的队列以
可以看到队列名前会%RETRY%前缀 表示是重试队列
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。