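RocketMQ Retry Mechanism
Message retry comes in two kinds: producer-side retry and consumer-side retry.
Producer-side retry
A producer-side failure means the producer did not manage to deliver a message to the MQ broker, for example because network jitter broke the send.
For this kind of failure we can set the number of retries ourselves. Here is the code: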
```java
package com.rocketmq.demo.retry;

import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;

public class RetryProducer {

    public static void main(String[] args) throws MQClientException, InterruptedException {
        DefaultMQProducer producer = new DefaultMQProducer("retry_producer_group");
        producer.setNamesrvAddr("127.0.0.1:9876");
        // Number of retries when a (synchronous) send fails
        producer.setRetryTimesWhenSendFailed(3);
        // Whether to retry on another broker when the message was not stored successfully
        producer.setRetryAnotherBrokerWhenNotStoreOK(true);
        producer.start();

        for (int i = 0; i < 100; i++) {
            try {
                // Create a message instance, specifying topic, tag and message body.
                Message msg = new Message(
                        "RetryTopicTest" /* Topic */,
                        "TagA" /* Tag */,
                        ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
                );
                // Synchronous send to one of the brokers, with a 1000 ms timeout.
                SendResult sendResult = producer.send(msg, 1000);
                System.out.printf("%s%n", sendResult);
            } catch (Exception e) {
                e.printStackTrace();
                Thread.sleep(1000);
            }
        }
        // Shut down once the producer instance is no longer in use.
        producer.shutdown();
    }
}
```
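The retry count is set by this line:
producer.setRetryTimesWhenSendFailed(3);
The retry logic is implemented in DefaultMQProducerImpl.sendDefaultImpl. Note that the attempt budget timesTotal is 1 + retryTimesWhenSendFailed only for synchronous (SYNC) sends; asynchronous and one-way sends get a single attempt in this loop: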
```java
private SendResult sendDefaultImpl(
        Message msg,
        final CommunicationMode communicationMode,
        final SendCallback sendCallback,
        final long timeout
) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {

    // ....
    if (topicPublishInfo != null && topicPublishInfo.ok()) {
        MessageQueue mq = null;
        Exception exception = null;
        SendResult sendResult = null;
        // SYNC: 1 attempt + retryTimesWhenSendFailed retries; ASYNC/ONEWAY: 1 attempt
        int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
        int times = 0;
        String[] brokersSent = new String[timesTotal];

        // Retry loop: stop after timesTotal attempts
        for (; times < timesTotal; times++) {
            String lastBrokerName = null == mq ? null : mq.getBrokerName();
            MessageQueue tmpmq = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
            if (tmpmq != null) {
                mq = tmpmq;
                brokersSent[times] = mq.getBrokerName();
                try {
                    // ...
                    sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout);
                    // ...
                    switch (communicationMode) {
                        case ASYNC:
                            return null;
                        case ONEWAY:
                            return null;
                        case SYNC:
                            if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
                                // Not SEND_OK: try again (on another broker) if retryAnotherBrokerWhenNotStoreOK is set
                                if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {
                                    continue;
                                }
                            }
                            // Return the send result
                            return sendResult;
                        default:
                            break;
                    }
                } catch (RemotingException e) {
                    // .... On RemotingException, retry
                    continue;
                } catch (MQClientException e) {
                    // ....
                    // .... On MQClientException, retry
                    continue;
                } catch (MQBrokerException e) {
                    // ....
                    // Only these broker response codes are retried; anything else is rethrown
                    switch (e.getResponseCode()) {
                        case ResponseCode.TOPIC_NOT_EXIST:
                        case ResponseCode.SERVICE_NOT_AVAILABLE:
                        case ResponseCode.SYSTEM_ERROR:
                        case ResponseCode.NO_PERMISSION:
                        case ResponseCode.NO_BUYER_ID:
                        case ResponseCode.NOT_IN_CURRENT_UNIT:
                            continue;
                        default:
                            if (sendResult != null) {
                                return sendResult;
                            }

                            throw e;
                    }
                } catch (InterruptedException e) {
                    // ...
                    throw e;
                }
            } else {
                break;
            }
        }

        if (sendResult != null) {
            return sendResult;
        }

        String info = String.format("Send [%d] times, still failed, cost [%d]ms, Topic: %s, BrokersSent: %s",
                times,
                System.currentTimeMillis() - beginTimestampFirst,
                msg.getTopic(),
                Arrays.toString(brokersSent));

        info += FAQUrl.suggestTodo(FAQUrl.SEND_MSG_FAILED);

        MQClientException mqClientException = new MQClientException(info, exception);
        // ...
        throw mqClientException;
    }

    //...
}
```
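The control flow of this loop can be modelled without a running broker. The following is a simplified, hypothetical simulation, not RocketMQ code: FakeBroker is an invented stand-in that fails a configurable number of times, select() simply prefers any broker other than the one used by the previous attempt (an approximation of what passing lastBrokerName to selectOneMessageQueue aims for), and every failure is treated as retriable, like the RemotingException and MQClientException branches. The real code rethrows most MQBrokerException codes instead of retrying.

```java
import java.util.ArrayList;
import java.util.List;

final class SendRetrySimulation {

    // Hypothetical fake broker: fails its first `failuresLeft` sends, then succeeds.
    static final class FakeBroker {
        final String name;
        int failuresLeft;

        FakeBroker(String name, int failuresBeforeSuccess) {
            this.name = name;
            this.failuresLeft = failuresBeforeSuccess;
        }

        String send(String body) {
            if (failuresLeft > 0) {
                failuresLeft--;
                throw new IllegalStateException("send to " + name + " failed");
            }
            return "SEND_OK@" + name;
        }
    }

    // Prefer the first broker whose name differs from the last one tried;
    // fall back to the first broker if there is no alternative.
    static FakeBroker select(List<FakeBroker> brokers, String lastBrokerName) {
        for (FakeBroker b : brokers) {
            if (!b.name.equals(lastBrokerName)) {
                return b;
            }
        }
        return brokers.get(0);
    }

    // Synchronous send with 1 + retries attempts; returns the brokers tried,
    // the last one being the broker that accepted the message.
    static List<String> sendSync(List<FakeBroker> brokers, String body, int retries) {
        int timesTotal = 1 + retries;
        List<String> brokersSent = new ArrayList<>();
        String lastBrokerName = null;
        RuntimeException last = null;
        for (int times = 0; times < timesTotal; times++) {
            FakeBroker broker = select(brokers, lastBrokerName);
            lastBrokerName = broker.name;
            brokersSent.add(broker.name);
            try {
                broker.send(body);
                return brokersSent;
            } catch (IllegalStateException e) {
                last = e; // treated as retriable, like the RemotingException branch
            }
        }
        throw new IllegalStateException(
                "Send [" + timesTotal + "] times, still failed, BrokersSent: " + brokersSent, last);
    }

    public static void main(String[] args) {
        List<FakeBroker> brokers = List.of(new FakeBroker("broker-a", 1), new FakeBroker("broker-b", 0));
        // broker-a fails once, the retry goes to broker-b and succeeds
        System.out.println(sendSync(brokers, "Hello RocketMQ 0", 3));
    }
}
```

With retries set to 3, a message that keeps failing is attempted four times, alternating between the two brokers, before the final exception is thrown.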
Consumer-side retry
A concurrent consumer (MessageListenerConcurrently) reports one of two consumption statuses:
```java
public enum ConsumeConcurrentlyStatus {
    /**
     * Success consumption
     */
    CONSUME_SUCCESS,
    /**
     * Failure consumption, later try to consume
     */
    RECONSUME_LATER;
}
```
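CONSUME_SUCCESS means the message was consumed successfully; RECONSUME_LATER means consumption failed and should be retried later.
To make sure messages really are consumed, RocketMQ considers a message consumed only when the application explicitly returns CONSUME_SUCCESS.
If consumption fails, returning ConsumeConcurrentlyStatus.RECONSUME_LATER tells RocketMQ that the message failed and must be redelivered.
To guarantee that each message is successfully consumed at least once, RocketMQ sends the failed batch back to the Broker, into a RETRY topic rather than the original topic, and redelivers it after a delay (10 seconds by default; the application can change it). If consumption keeps failing up to a limit (16 times by default), the message is delivered to the dead-letter queue (DLQ). Applications can monitor the DLQ and intervene manually.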
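The routing rule above can be written down as a small function. This is a rough sketch that assumes the RocketMQ 4.x broker behaviour: the retry topic is named "%RETRY%" plus the consumer group, the dead-letter topic "%DLQ%" plus the consumer group, a message whose reconsume count has reached the maximum (16 by default) goes to the DLQ, and otherwise, when the consumer does not override it, the next delay level is 3 + reconsumeTimes. The class and method names are hypothetical and exist only for illustration.

```java
final class SendBackRouting {
    static final String RETRY_PREFIX = "%RETRY%";   // prefix of a group's retry topic
    static final String DLQ_PREFIX = "%DLQ%";       // prefix of a group's dead-letter topic
    static final int DEFAULT_MAX_RECONSUME_TIMES = 16;

    // Topic a failed message is sent back to, given how often it was already redelivered.
    static String targetTopic(String consumerGroup, int reconsumeTimes, int maxReconsumeTimes) {
        return reconsumeTimes >= maxReconsumeTimes
                ? DLQ_PREFIX + consumerGroup
                : RETRY_PREFIX + consumerGroup;
    }

    // Delay level for the next redelivery when the consumer leaves it at the default
    // (level 3 is 10s in the default messageDelayLevel table).
    static int nextDelayLevel(int reconsumeTimes) {
        return 3 + reconsumeTimes;
    }

    public static void main(String[] args) {
        String group = "RetryConsumerGroup";
        for (int times = 0; times <= DEFAULT_MAX_RECONSUME_TIMES; times++) {
            String topic = targetTopic(group, times, DEFAULT_MAX_RECONSUME_TIMES);
            String delay = topic.startsWith(DLQ_PREFIX) ? "no further delivery" : "delay level " + nextDelayLevel(times);
            System.out.println("reconsumeTimes=" + times + " -> " + topic + " (" + delay + ")");
        }
    }
}
```

Under these assumptions, reconsumeTimes 0 to 15 map to delay levels 3 to 18, which is exactly 16 retries, and the 17th failure lands in the DLQ.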
When the Broker starts, you can see output like the following:
2017-12-04 16:29:58 INFO main - messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
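The messageDelayLevel string maps level 1 to its first entry, level 2 to the second, and so on. The sketch below parses that string into seconds and prints the delay before each consumer retry, assuming (as in RocketMQ 4.x) that retry n, counting from 1, uses delay level 2 + n, so the first retry starts at level 3. DelayLevels and its methods are illustrative names, not RocketMQ APIs.

```java
import java.util.ArrayList;
import java.util.List;

final class DelayLevels {
    // Default table, as printed by the broker at startup.
    static final String DEFAULT = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";

    // Parses entries such as "10s", "3m" or "2h" into seconds; index 0 holds delay level 1.
    static List<Long> parseSeconds(String table) {
        List<Long> result = new ArrayList<>();
        for (String token : table.trim().split("\\s+")) {
            char unit = token.charAt(token.length() - 1);
            long value = Long.parseLong(token.substring(0, token.length() - 1));
            switch (unit) {
                case 's': result.add(value); break;
                case 'm': result.add(value * 60); break;
                case 'h': result.add(value * 3600); break;
                default: throw new IllegalArgumentException("unknown unit in " + token);
            }
        }
        return result;
    }

    // Delay in seconds before retry n (n >= 1): level 2 + n, stored at index (2 + n) - 1.
    static long retryDelaySeconds(List<Long> levels, int n) {
        return levels.get(2 + n - 1);
    }

    public static void main(String[] args) {
        List<Long> levels = parseSeconds(DEFAULT);
        int maxRetries = levels.size() - 2; // levels 3..18 give 16 retries
        for (int n = 1; n <= maxRetries; n++) {
            System.out.println("retry " + n + " after " + retryDelaySeconds(levels, n) + "s");
        }
    }
}
```

Skipping the first two levels is what makes the default table yield 16 retries, the first after 10s and the last after 2h.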
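From these levels you can read off the RECONSUME_LATER schedule: level 1 is 1s, level 2 is 5s, and so on up to level 18 at 2h. Consumer retries do not start at level 1, however. By default the first retry uses level 3 (10s, the default delay mentioned above), the next one 30s, then 1m, 2m, and so on; if the 16th retry, after 2h, also fails, RocketMQ stops delivering the message to the consumer and moves it to the dead-letter queue. RocketMQ offers this many retries, but in practice we often do not need them all. For example, if a message still fails after 3 retries, we may want to store it and handle it some other way, and have RocketMQ stop retrying, because further retries will not solve the problem. How can we do that?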
```java
package com.rocketmq.demo.retry;

import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;

import java.nio.charset.StandardCharsets;
import java.util.List;

public class RetryConsumer02 {

    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("RetryConsumerGroup");

        consumer.setNamesrvAddr("127.0.0.1:9876");
        consumer.setInstanceName("RetryConsumerGroup");
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

        consumer.subscribe("RetryTopicTest", "*");
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messages, ConsumeConcurrentlyContext context) {
                // The returned status applies to the whole batch (one message by default)
                for (MessageExt message : messages) {
                    // The producer encoded the body as UTF-8
                    String msg = new String(message.getBody(), StandardCharsets.UTF_8);
                    // "Hello RocketMQ " is 15 characters long; the rest is the message number
                    int num = Integer.parseInt(StringUtils.substring(msg, 15));
                    if (message.getReconsumeTimes() >= 3) {
                        // Already redelivered three times: stop retrying.
                        // Messages that reach this point can be recorded for separate handling.
                        System.out.println("Receive message[msgId=" + message.getMsgId() + "],[body=" + msg + "]final failed!!!!");
                        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                    } else {
                        if (num % 2 == 0) {
                            // Even-numbered messages fail on purpose to trigger a retry
                            System.out.println("Receive message[msgId=" + message.getMsgId() + "],[body=" + msg + "]," +
                                    "[reconsumeTimes=" + message.getReconsumeTimes() + "]failed!!!!");
                            return ConsumeConcurrentlyStatus.RECONSUME_LATER; // ask RocketMQ to redeliver
                        }
                        System.out.println("Receive message[msgId=" + message.getMsgId() + "],[body=" + msg + "]");
                    }
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        // Launch consumer
        consumer.start();
    }
}
```
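Because the retry cut-off lives inside the listener, it can help to pull the decision out into a plain function that can be tested without a broker. The following is an illustrative refactoring, not part of RocketMQ: decide() returns the status the listener above would return for a given body and reconsumeTimes, a local Status enum stands in for ConsumeConcurrentlyStatus so the example compiles on its own, and deliveries() simulates redelivery by incrementing reconsumeTimes until the message is acknowledged.

```java
final class RetryDecision {
    // Local stand-in for ConsumeConcurrentlyStatus.
    enum Status { CONSUME_SUCCESS, RECONSUME_LATER }

    static final int APP_MAX_RETRIES = 3;

    // Same rule as the listener: acknowledge after 3 redeliveries,
    // otherwise fail even-numbered messages so that they are redelivered.
    static Status decide(String body, int reconsumeTimes) {
        if (reconsumeTimes >= APP_MAX_RETRIES) {
            return Status.CONSUME_SUCCESS; // record the message elsewhere at this point
        }
        int num = Integer.parseInt(body.substring("Hello RocketMQ ".length()));
        return num % 2 == 0 ? Status.RECONSUME_LATER : Status.CONSUME_SUCCESS;
    }

    // Number of deliveries (first delivery plus retries) before the message is acknowledged.
    static int deliveries(String body) {
        int reconsumeTimes = 0;
        while (decide(body, reconsumeTimes) == Status.RECONSUME_LATER) {
            reconsumeTimes++;
        }
        return reconsumeTimes + 1;
    }

    public static void main(String[] args) {
        System.out.println(deliveries("Hello RocketMQ 0")); // 4: first delivery + 3 retries
        System.out.println(deliveries("Hello RocketMQ 1")); // 1
    }
}
```

As a design alternative, DefaultMQPushConsumer in RocketMQ 4.x also has a setMaxReconsumeTimes setting; with it, messages that exceed the limit go to the DLQ instead of being acknowledged by the listener, which may or may not be what you want.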
Log output:
```
Receive message[msgId=0A63209364C214DAD5DC1316E3720000],[body=Hello RocketMQ 0],[reconsumeTimes=0]failed!!!!
Receive message[msgId=0A63209364C214DAD5DC1316E3810001],[body=Hello RocketMQ 1]
Receive message[msgId=0A63209364C214DAD5DC1316E3720000],[body=Hello RocketMQ 0],[reconsumeTimes=1]failed!!!!
Receive message[msgId=0A63209364C214DAD5DC1316E3720000],[body=Hello RocketMQ 0],[reconsumeTimes=2]failed!!!!
Receive message[msgId=0A63209364C214DAD5DC1316E3720000],[body=Hello RocketMQ 0]final failed!!!!
```
After three retries, the listener returns CONSUME_SUCCESS on the final delivery, so the message is not delivered again.
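Notes:
- If the business callback throws an exception instead of handling it, the consumption is considered failed and treated as ConsumeConcurrentlyStatus.RECONSUME_LATER.
- With the orderly listener MessageListenerOrderly, a message can only be consumed after the one before it has succeeded, so there is no RECONSUME_LATER status. Instead, SUSPEND_CURRENT_QUEUE_A_MOMENT suspends consumption of the rest of the queue, and consumption resumes only once the failing message has been retried successfully.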
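The two notes can be illustrated with a small simulation. It is a hypothetical model, not RocketMQ's orderly consumption service: a local Status enum stands in for ConsumeOrderlyStatus, a callback that throws is counted as a failed attempt (mirroring the first note), and a failure suspends the queue so that the same head message is retried before anything behind it is processed. A real consumer pauses briefly between attempts; this sketch retries immediately.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

final class OrderlySimulation {
    // Local stand-in for ConsumeOrderlyStatus.
    enum Status { SUCCESS, SUSPEND_CURRENT_QUEUE_A_MOMENT }

    // Runs the callback; returning false or throwing both count as a failed attempt.
    static Status safeConsume(Predicate<String> callback, String msg) {
        try {
            return callback.test(msg) ? Status.SUCCESS : Status.SUSPEND_CURRENT_QUEUE_A_MOMENT;
        } catch (RuntimeException e) {
            return Status.SUSPEND_CURRENT_QUEUE_A_MOMENT;
        }
    }

    // Consumes messages strictly in order and returns a log of every attempt.
    static List<String> consumeInOrder(List<String> queue, Predicate<String> callback) {
        List<String> log = new ArrayList<>();
        int head = 0;
        while (head < queue.size()) {
            String msg = queue.get(head);
            Status status = safeConsume(callback, msg);
            log.add(msg + ":" + status);
            if (status == Status.SUCCESS) {
                head++; // only now may the next message be consumed
            }
            // otherwise the queue stays suspended on the same head message
        }
        return log;
    }

    public static void main(String[] args) {
        int[] failuresLeft = {2}; // "m1" fails twice: once by returning false, once by throwing
        Predicate<String> callback = msg -> {
            if (msg.equals("m1") && failuresLeft[0] > 0) {
                failuresLeft[0]--;
                if (failuresLeft[0] == 0) {
                    throw new IllegalStateException("boom");
                }
                return false;
            }
            return true;
        };
        consumeInOrder(List.of("m0", "m1", "m2"), callback).forEach(System.out::println);
    }
}
```

In this run "m2" is attempted only after "m1" has finally succeeded on its third attempt, which is the head-of-line blocking the second note describes.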
References:
http://blog.csdn.net/zhanglianhai555/article/details/77162208
https://zhuanlan.zhihu.com/p/25265380?refer=rocketmq