
RocketMQ Retry Mechanism


Message retries fall into two kinds: producer-side retry and consumer-side retry.

Producer-side retry

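A producer-side failure means the Producer could not deliver a message to MQ, for example because network jitter made the send fail.
For this kind of failure, the number of send retries can be set by hand. Here is the code: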

    package com.rocketmq.demo.retry;

    import org.apache.rocketmq.client.exception.MQClientException;
    import org.apache.rocketmq.client.producer.DefaultMQProducer;
    import org.apache.rocketmq.client.producer.SendResult;
    import org.apache.rocketmq.common.message.Message;
    import org.apache.rocketmq.remoting.common.RemotingHelper;

    public class RetryProducer {
        public static void main(String[] args) throws MQClientException, InterruptedException {
            DefaultMQProducer producer = new DefaultMQProducer("retry_producer_group");
            producer.setNamesrvAddr("127.0.0.1:9876");
            // Number of retries when a send fails
            producer.setRetryTimesWhenSendFailed(3);
            // Whether to try another broker when the message was not stored successfully
            producer.setRetryAnotherBrokerWhenNotStoreOK(true);
            producer.start();
            for (int i = 0; i < 100; i++) {
                try {
                    // Create a message instance, specifying topic, tag and message body.
                    Message msg = new Message(
                            "RetryTopicTest" /* Topic */,
                            "TagA" /* Tag */,
                            ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
                    );
                    // Send the message to one of the brokers, with a 1000 ms timeout.
                    SendResult sendResult = producer.send(msg, 1000);
                    System.out.printf("%s%n", sendResult);
                } catch (Exception e) {
                    e.printStackTrace();
                    Thread.sleep(1000);
                }
            }
            // Shut down once the producer instance is no longer in use.
            producer.shutdown();
        }
    }

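The following line sets the number of retries. With a value of 3, a synchronous send is attempted at most 4 times in total (1 initial attempt plus 3 retries):

    producer.setRetryTimesWhenSendFailed(3);

The retry logic itself is implemented in DefaultMQProducerImpl. In the excerpt below, only SYNC sends get 1 + retryTimesWhenSendFailed attempts; ASYNC and ONEWAY sends pass through this loop once: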

    private SendResult sendDefaultImpl(
            Message msg,
            final CommunicationMode communicationMode,
            final SendCallback sendCallback,
            final long timeout
    ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        // ....
        if (topicPublishInfo != null && topicPublishInfo.ok()) {
            MessageQueue mq = null;
            Exception exception = null;
            SendResult sendResult = null;
            // SYNC: 1 initial attempt plus the configured retries; other modes: 1 attempt
            int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
            int times = 0;
            String[] brokersSent = new String[timesTotal];
            // Retry loop, bounded by the number of attempts
            for (; times < timesTotal; times++) {
                // Pass the broker used by the previous attempt into queue selection
                String lastBrokerName = null == mq ? null : mq.getBrokerName();
                MessageQueue tmpmq = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
                if (tmpmq != null) {
                    mq = tmpmq;
                    brokersSent[times] = mq.getBrokerName();
                    try {
                        // ...
                        sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout);
                        // ...
                        switch (communicationMode) {
                            case ASYNC:
                                return null;
                            case ONEWAY:
                                return null;
                            case SYNC:
                                if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
                                    // The send did not end in SEND_OK: retry on another broker if enabled
                                    if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {
                                        continue;
                                    }
                                }
                                // Return the send result
                                return sendResult;
                            default:
                                break;
                        }
                    } catch (RemotingException e) {
                        //.... a RemotingException triggers a retry
                        continue;
                    } catch (MQClientException e) {
                        //....
                        //.... an MQClientException triggers a retry
                        continue;
                    } catch (MQBrokerException e) {
                        //....
                        // Only these broker response codes trigger a retry
                        switch (e.getResponseCode()) {
                            case ResponseCode.TOPIC_NOT_EXIST:
                            case ResponseCode.SERVICE_NOT_AVAILABLE:
                            case ResponseCode.SYSTEM_ERROR:
                            case ResponseCode.NO_PERMISSION:
                            case ResponseCode.NO_BUYER_ID:
                            case ResponseCode.NOT_IN_CURRENT_UNIT:
                                continue;
                            default:
                                if (sendResult != null) {
                                    return sendResult;
                                }
                                throw e;
                        }
                    } catch (InterruptedException e) {
                        //...
                        throw e;
                    }
                } else {
                    break;
                }
            }
            if (sendResult != null) {
                return sendResult;
            }
            // All attempts failed: report how many were made and which brokers were tried
            String info = String.format("Send [%d] times, still failed, cost [%d]ms, Topic: %s, BrokersSent: %s",
                    times,
                    System.currentTimeMillis() - beginTimestampFirst,
                    msg.getTopic(),
                    Arrays.toString(brokersSent));
            info += FAQUrl.suggestTodo(FAQUrl.SEND_MSG_FAILED);
            MQClientException mqClientException = new MQClientException(info, exception);
            // ...
            throw mqClientException;
        }
        //...
    }
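The shape of that loop is easier to see without the RocketMQ types. The following is a minimal, stdlib-only sketch of it, not the library's implementation: `SyncSendRetrySketch`, `Outcome`, `selectBroker`, and the broker names are hypothetical, and the round-robin selection is an assumption standing in for `selectOneMessageQueue`. It shows the two points the excerpt makes: a synchronous send gets `1 + retryTimesWhenSendFailed` attempts, and each attempt tries to avoid the broker used by the previous one.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

/** Stdlib-only model of the synchronous retry loop; hypothetical names, not RocketMQ code. */
final class SyncSendRetrySketch {

    /** Brokers tried, in order, and whether one of them accepted the message. */
    static final class Outcome {
        final List<String> brokersSent;
        final boolean ok;

        Outcome(List<String> brokersSent, boolean ok) {
            this.brokersSent = brokersSent;
            this.ok = ok;
        }
    }

    /** Round-robin pick that skips the broker used by the previous attempt when possible. */
    static String selectBroker(List<String> brokers, String lastBroker, int index) {
        for (int i = 0; i < brokers.size(); i++) {
            String candidate = brokers.get((index + i) % brokers.size());
            if (!candidate.equals(lastBroker)) {
                return candidate;
            }
        }
        // Only one broker exists, so there is nothing else to pick.
        return brokers.get(index % brokers.size());
    }

    /** Tries up to 1 + retryTimesWhenSendFailed brokers and stops at the first success. */
    static Outcome send(List<String> brokers, int retryTimesWhenSendFailed, Predicate<String> brokerAccepts) {
        int timesTotal = 1 + retryTimesWhenSendFailed;
        List<String> sent = new ArrayList<>();
        String last = null;
        for (int times = 0; times < timesTotal; times++) {
            String broker = selectBroker(brokers, last, times);
            sent.add(broker);
            last = broker;
            if (brokerAccepts.test(broker)) {
                return new Outcome(sent, true);
            }
        }
        return new Outcome(sent, false);
    }

    public static void main(String[] args) {
        List<String> brokers = Arrays.asList("broker-a", "broker-b");
        // broker-a rejects, broker-b accepts: the second attempt succeeds.
        Outcome recovered = send(brokers, 3, b -> b.equals("broker-b"));
        System.out.println(recovered.ok + " " + recovered.brokersSent);
        // Every broker rejects: all 1 + 3 attempts are used.
        Outcome exhausted = send(brokers, 3, b -> false);
        System.out.println(exhausted.ok + " " + exhausted.brokersSent);
    }
}
```

In the real client the failure signal is an exception or a non-SEND_OK status rather than a boolean, and the attempts also have to fit within the send timeout.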

 

Consumer-side retry

On the consumer side, consuming a message ends in one of two states:

    public enum ConsumeConcurrentlyStatus {
        /**
         * Success consumption
         */
        CONSUME_SUCCESS,
        /**
         * Failure consumption, later try to consume
         */
        RECONSUME_LATER;
    }

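One is success (CONSUME_SUCCESS); the other is failure, retry later (RECONSUME_LATER).

To guarantee that messages are consumed successfully, RocketMQ treats a message as consumed only when the application explicitly says so by returning CONSUME_SUCCESS.

If consumption fails, returning ConsumeConcurrentlyStatus.RECONSUME_LATER is enough for RocketMQ to treat the message as failed and schedule it for redelivery.

To guarantee that every message is consumed successfully at least once, RocketMQ sends the failed batch back to the Broker (to a RETRY topic, not the original topic) and redelivers it after a delay (10 seconds by default, configurable by the application). If consumption keeps failing for a certain number of attempts (16 by default), the message is moved to the dead letter queue (DLQ). Applications can monitor the DLQ and intervene manually.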
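The routing decision in that paragraph can be sketched as a small function. This is a stdlib-only illustration, not broker code: `RetryRoutingSketch` and `nextTopic` are hypothetical names, and the `%RETRY%` and `%DLQ%` topic prefixes are an assumption based on the constants in RocketMQ's `MixAll` class, so verify them against the version in use.

```java
/** Stdlib-only sketch of where a failed message goes next; hypothetical names, not broker code. */
final class RetryRoutingSketch {
    // Assumed prefixes (believed to match MixAll.RETRY_GROUP_TOPIC_PREFIX and DLQ_GROUP_TOPIC_PREFIX).
    static final String RETRY_PREFIX = "%RETRY%";
    static final String DLQ_PREFIX = "%DLQ%";
    // Default number of retries mentioned in the text.
    static final int DEFAULT_MAX_RECONSUME_TIMES = 16;

    /**
     * @param reconsumeTimes how many times the message has already been retried
     * @return the per-group retry topic, or the per-group DLQ once retries are used up
     */
    static String nextTopic(String consumerGroup, int reconsumeTimes, int maxReconsumeTimes) {
        if (reconsumeTimes >= maxReconsumeTimes) {
            return DLQ_PREFIX + consumerGroup;
        }
        return RETRY_PREFIX + consumerGroup;
    }

    public static void main(String[] args) {
        for (int reconsumeTimes : new int[] {0, 15, 16}) {
            System.out.println(reconsumeTimes + " -> "
                    + nextTopic("RetryConsumerGroup", reconsumeTimes, DEFAULT_MAX_RECONSUME_TIMES));
        }
    }
}
```

Because both topics are keyed by consumer group rather than by the original topic, a monitoring job for dead letters would subscribe per group.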

While the Broker starts up, you can observe the following output:

2017-12-04 16:29:58 INFO main - messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
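The level list in that log line can be turned into concrete waiting times. The sketch below is stdlib-only and makes one assumption that the log line itself does not state: that for concurrent consumption the broker maps a message with `reconsumeTimes = r` to delay level `3 + r` (1-based), which matches the 10-second default first delay mentioned earlier. `DelayLevelSketch`, `parse`, and `delayBeforeRetry` are hypothetical names.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

/** Parses a messageDelayLevel string and looks up the delay before a given retry. */
final class DelayLevelSketch {
    static final String DEFAULT_LEVELS =
            "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";

    /** Converts tokens such as "10s", "5m", "2h" into durations, in order. */
    static List<Duration> parse(String levels) {
        List<Duration> out = new ArrayList<>();
        for (String token : levels.trim().split("\\s+")) {
            long value = Long.parseLong(token.substring(0, token.length() - 1));
            char unit = token.charAt(token.length() - 1);
            switch (unit) {
                case 's': out.add(Duration.ofSeconds(value)); break;
                case 'm': out.add(Duration.ofMinutes(value)); break;
                case 'h': out.add(Duration.ofHours(value)); break;
                case 'd': out.add(Duration.ofDays(value)); break;
                default: throw new IllegalArgumentException("Unknown unit in " + token);
            }
        }
        return out;
    }

    /**
     * Delay before the next redelivery of a message that has already been
     * retried reconsumeTimes times. Assumes level = 3 + reconsumeTimes (1-based),
     * capped at the last configured level.
     */
    static Duration delayBeforeRetry(List<Duration> levels, int reconsumeTimes) {
        if (reconsumeTimes < 0) {
            throw new IllegalArgumentException("reconsumeTimes must be >= 0");
        }
        int level = Math.min(3 + reconsumeTimes, levels.size());
        return levels.get(level - 1);
    }

    public static void main(String[] args) {
        List<Duration> levels = parse(DEFAULT_LEVELS);
        for (int r = 0; r < 16; r++) {
            System.out.println("retry " + (r + 1) + " after " + delayBeforeRetry(levels, r));
        }
    }
}
```

Under that assumption, the 18 default levels leave exactly 16 usable retry delays, from 10s up to 2h, which lines up with the default of 16 retries.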

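These are the Broker's delay levels, and the RECONSUME_LATER policy is built on them: a failed message is redelivered after a delay, and each further failure moves it up one level, so the waits keep growing until the 2h level. For concurrent consumption the first retry should start at the third level (10s, the default mentioned above) rather than at 1s, which is how 18 levels yield 16 retries. If the message still fails after the 2h retry, it is no longer delivered to the consumer and goes to the dead letter queue. RocketMQ provides this many retries, but in practice we may not need them all. For example, if a message still fails after 3 retries, we may want to store it and handle it some other way, and we want RocketMQ to stop retrying because retrying no longer helps. How can this be done?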

    package com.rocketmq.demo.retry;

    import org.apache.commons.lang3.StringUtils;
    import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
    import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
    import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
    import org.apache.rocketmq.common.message.MessageExt;

    import java.nio.charset.StandardCharsets;
    import java.util.List;

    public class RetryConsumer02 {
        public static void main(String[] args) throws Exception {
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("RetryConsumerGroup");
            consumer.setNamesrvAddr("127.0.0.1:9876");
            consumer.setInstanceName("RetryConsumerGroup");
            consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
            consumer.subscribe("RetryTopicTest", "*");
            consumer.registerMessageListener(new MessageListenerConcurrently() {
                @Override
                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messages, ConsumeConcurrentlyContext context) {
                    for (MessageExt message : messages) {
                        // Decode with the charset the producer used (UTF-8)
                        String msg = new String(message.getBody(), StandardCharsets.UTF_8);
                        // Bodies look like "Hello RocketMQ <n>"; the number starts at index 15
                        int num = Integer.parseInt(StringUtils.substring(msg, 15));
                        if (message.getReconsumeTimes() >= 3) {
                            // After three retries, stop retrying; this is the place to record the message
                            System.out.println("Receive message[msgId=" + message.getMsgId() + "],[body=" + msg + "]final failed!!!!");
                            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                        } else {
                            // Simulate a failure for every even-numbered message
                            if (num % 2 == 0) {
                                System.out.println("Receive message[msgId=" + message.getMsgId() + "],[body=" + msg + "]," +
                                        "[reconsumeTimes=" + message.getReconsumeTimes() + "]failed!!!!");
                                return ConsumeConcurrentlyStatus.RECONSUME_LATER; // retry
                            }
                            System.out.println("Receive message[msgId=" + message.getMsgId() + "],[body=" + msg + "]");
                        }
                    }
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
            });
            // Launch consumer
            consumer.start();
        }
    }

The printed log:

    Receive message[msgId=0A63209364C214DAD5DC1316E3720000],[body=Hello RocketMQ 0],[reconsumeTimes=0]failed!!!!
    Receive message[msgId=0A63209364C214DAD5DC1316E3810001],[body=Hello RocketMQ 1]
    Receive message[msgId=0A63209364C214DAD5DC1316E3720000],[body=Hello RocketMQ 0],[reconsumeTimes=1]failed!!!!
    Receive message[msgId=0A63209364C214DAD5DC1316E3720000],[body=Hello RocketMQ 0],[reconsumeTimes=2]failed!!!!
    Receive message[msgId=0A63209364C214DAD5DC1316E3720000],[body=Hello RocketMQ 0]final failed!!!!

After three retries, the final delivery returns CONSUME_SUCCESS, and the message is not sent again.
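The cap-then-acknowledge pattern can be pulled out of the listener into a small decision function that is easy to test on its own. This is a stdlib-only sketch under stated assumptions: `RetryCapSketch`, `Decision`, and `decide` are hypothetical names, and unlike the listener above it attempts the business logic first and only gives up when that attempt fails.

```java
import java.util.ArrayList;
import java.util.List;

/** Decides what a listener should do after one delivery attempt; hypothetical names. */
final class RetryCapSketch {
    enum Decision {
        ACK,          // handled: return CONSUME_SUCCESS
        RETRY,        // failed, retries left: return RECONSUME_LATER
        PARK_AND_ACK  // failed, retries used up: store the message, then return CONSUME_SUCCESS
    }

    /**
     * @param handledOk      whether the business logic succeeded on this delivery
     * @param reconsumeTimes retries already made, as reported by getReconsumeTimes()
     * @param maxRetries     retries the application is willing to wait for
     */
    static Decision decide(boolean handledOk, int reconsumeTimes, int maxRetries) {
        if (handledOk) {
            return Decision.ACK;
        }
        return reconsumeTimes >= maxRetries ? Decision.PARK_AND_ACK : Decision.RETRY;
    }

    public static void main(String[] args) {
        List<String> parked = new ArrayList<>();
        String body = "Hello RocketMQ 0";
        // Simulate a message whose handling always fails, with a cap of 3 retries.
        for (int reconsumeTimes = 0; ; reconsumeTimes++) {
            Decision decision = decide(false, reconsumeTimes, 3);
            System.out.println("reconsumeTimes=" + reconsumeTimes + " -> " + decision);
            if (decision == Decision.PARK_AND_ACK) {
                parked.add(body); // stand-in for a database table or a side topic
            }
            if (decision != Decision.RETRY) {
                break;
            }
        }
        System.out.println("parked=" + parked);
    }
}
```

If sending exhausted messages to the dead letter queue is acceptable, lowering the limit with `DefaultMQPushConsumer.setMaxReconsumeTimes` may be simpler than handling the cap in the listener; check that the client version in use provides it.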

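Notes:

  1. If the business callback does not handle an error and throws an exception, this is treated as a consumption failure and handled as ConsumeConcurrentlyStatus.RECONSUME_LATER.
  2. With the ordered-consumption callback MessageListenerOrderly, a message must be consumed successfully before the next one can be consumed, so there is no RECONSUME_LATER status. Instead, SUSPEND_CURRENT_QUEUE_A_MOMENT pauses consumption of the rest of the queue, and the failed message is retried until it succeeds; only then does consumption continue.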
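The first note can be modeled with a thin wrapper around the listener call. This is a stdlib-only sketch, not the client's code: `ListenerOutcomeSketch` and `invoke` are hypothetical, the local `Status` enum only mirrors the names of ConsumeConcurrentlyStatus, and treating a `null` return the same way as an exception is an assumption about the client's behavior.

```java
import java.util.function.Function;

/** Models how a listener's exception (or missing result) becomes a retry; hypothetical names. */
final class ListenerOutcomeSketch {
    /** Local stand-in for ConsumeConcurrentlyStatus. */
    enum Status { CONSUME_SUCCESS, RECONSUME_LATER }

    /** Calls the listener and maps an exception or a null result to RECONSUME_LATER. */
    static Status invoke(Function<String, Status> listener, String body) {
        Status status;
        try {
            status = listener.apply(body);
        } catch (RuntimeException e) {
            // The callback failed without returning a status.
            status = null;
        }
        return status == null ? Status.RECONSUME_LATER : status;
    }

    public static void main(String[] args) {
        System.out.println(invoke(b -> Status.CONSUME_SUCCESS, "ok"));
        System.out.println(invoke(b -> { throw new IllegalStateException("boom"); }, "bad"));
    }
}
```

One practical consequence: an exception caused by a malformed message, such as a body that cannot be parsed, would be retried like any other failure, so a listener may want to catch such cases itself and decide explicitly.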



Reposted from: https://my.oschina.net/xinxingegeya/blog/1584617
