当前位置:   article > 正文

RocketMQ-消息重试_rocketmq消息java手动重发

rocketmq消息java手动重发

        任何MQ产品都可能存在各种异常,这些异常可能导致消息无法被发送到Broker,或者消息无法被消费者接收到,因此大部分MQ产品都会提供消息失败的重试机制。RocketMQ也不例外,在RocketMQ中消息重试分为生产者端重试和消费者端重试两种类型。

 

生产者端重试

        生产者端重试是指当生产者向Broker发送消息时,如果当前网络抖动等原因导致消息发送失败,此时可以通过手动设置发送失败重试次数的方式让消息重发一次。

  1. @RunWith(SpringRunner.class)
  2. @SpringBootTest
  3. public class RockermqproducerApplicationTests {
  4. @Value("${apache.rocketmq.producer.producerGroup}")
  5. private String producerGroup;
  6. /**
  7. * NameServer 地址
  8. */
  9. @Value("${apache.rocketmq.namesrvAddr}")
  10. private String namesrvAddr;
  11. @Test
  12. public void contextLoads() {
  13. //生产者的组名
  14. DefaultMQProducer producer=new DefaultMQProducer(producerGroup);
  15. //指定NameServer地址,多个地址以 ; 隔开
  16. producer.setNamesrvAddr(namesrvAddr);
  17. //消息发送失败重试次数
  18. producer.setRetryTimesWhenSendFailed(3);
  19. //异步发送失败重试次数
  20. producer.setRetryTimesWhenSendAsyncFailed(3);
  21. //消息没有发送成功,是否发送到另外一个Broker中
  22. producer.setRetryAnotherBrokerWhenNotStoreOK(true);
  23. try {
  24. /**
  25. * Producer对象在使用之前必须要调用start初始化,初始化一次即可
  26. * 注意:切记不可以在每次发送消息时,都调用start方法
  27. */
  28. producer.start();
  29. for (int i=0;i<=10000;i++)
  30. {
  31. Message msg=new Message("topic_example_java","TagA",("Hello Java Demo RocketMQ:"+i).getBytes(Charset.defaultCharset()));
  32. SendResult result=producer.send(msg);
  33. System.out.println("消息发送结果:"+result);
  34. }
  35. }catch (Exception e)
  36. {
  37. e.printStackTrace();
  38. }finally {
  39. producer.shutdown();
  40. }
  41. }
  42. }

        可以看到通过org.apache.rocketmq.client.producer.DefaultMQProducer类的setRetryTimesWhenSendFailed和setRetryTimesWhenSendAsyncFailed方法设置了重试次数。这里实现重试逻辑的代码主要在DefaultMQProducerImpl类的sendDefaultImpl方法中。

 

消费者端重试

        消费者端的失败一般分为两种情况,一种是由于网络等原因导致消息没法从Broker发送到消费者端,这时在RocketMQ内部会不断尝试发送这条消息,直到发送成功为止(比如向集群中的一个Broker实例发送失败,就尝试发往另一个Broker实例);二是消费者端已经正常接收到消息了,但是在执行后续消息处理逻辑时发生了异常,最终反馈给MQ消费者处理失败,例如所接收到的消息数据可能不符合本身的业务要求,如当前卡号未激活不能执行业务等,这时就需要通过业务代码返回消息消费的不同状态来控制。

        接下来以普通消费为例,看一下当消费者端出现业务消息消费异常之后时如何进行重试的。下面死在消费者端代码中注册消息监听器的consumeMessage方法最终返回的消息消费状态ConsumeConcurrentlyStatus的定义:

  1. public enum ConsumeConcurrentlyStatus {
  2. CONSUME_SUCCESS,
  3. RECONSUME_LATER;
  4. private ConsumeConcurrentlyStatus() {
  5. }
  6. }

        CONSUME_SUCCESS表示消费成功,这是正常业务代码中返回的状态。RECONSUME_LATER表示当前消费失败,需要稍后进行重试。在RocketMQ中只有业务消费者侧返了CONSUME_SUCCESS才会认为消息消费时成功的,如果返回RECONSUME_LATER,RocketMQ则会认为消费失败,需要重新投递。为了保证消息至少被成功消费一次,RocketMQ会把认为消费失败的消息发回Broker,在接下来的某个时间点(默认是10秒,可修改)再次投递给消费者。如果一直重复消息都失败的话,当失败累积到一定次数后(默认16次)将消息投递到死信队列(Dead Letter Queue)中,此时需要监控死信队列进行人工干预。

  1. @Bean("consumer1")
  2. public DefaultMQPushConsumer consumer1()
  3. {
  4. //创建一个消息消费者,并设置一个消息消费者组
  5. DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("niwei_consumer_group");
  6. //指定 NameServer 地址
  7. consumer.setNamesrvAddr("localhost:9876");
  8. consumer.setInstanceName("PushConsumer1");
  9. //设置 Consumer 第一次启动时从队列头部开始消费还是队列尾部开始消费
  10. consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
  11. try {
  12. //订阅指定 Topic 下的所有消息
  13. consumer.subscribe("topic_example_java", "*");
  14. //注册消息监听器
  15. consumer.registerMessageListener(new MessageListenerConcurrently() {
  16. public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext context) {
  17. //默认 list 里只有一条消息,可以通过设置参数来批量接收消息
  18. if (list != null) {
  19. for (MessageExt ext : list) {
  20. String msgBody=new String(ext.getBody());
  21. if (ext.getReconsumeTimes()==3)
  22. {
  23. saveReconsumeStillMessage(ext);
  24. return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
  25. }else
  26. {
  27. try {
  28. doBusiness(msgBody);
  29. }catch (Exception e)
  30. {
  31. return ConsumeConcurrentlyStatus.RECONSUME_LATER;
  32. }
  33. }
  34. }
  35. }
  36. return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
  37. }
  38. });
  39. // 消费者对象在使用之前必须要调用 start 初始化
  40. consumer.start();
  41. System.out.println("消息消费者已启动");
  42. }catch (Exception e)
  43. {
  44. e.printStackTrace();
  45. }
  46. return consumer;
  47. }

        示例中,判断当前消息是经过重试3次之后发出的,则不再继续执行业务代码,直接记录消息数据并返回消费成功状态。如果早业务的回调中没有处理好异常返回状态,而是在方法执行过程中抛出异常,那么RocketMQ认为消费也是失败的,会当作RECONSUME_LATER来处理。

       当使用顺序消费的回调(实现了MessageListenerOrderly接口)时,由于顺序消费是只有前一条消息消费成功了才能继续,所以在其消息状态定义(ConsumeOrderlyStatus)中并没有RECONSUME_LATER状态,而是用SUSPEND_CURRENT_QUEUE_A_MOMENT来暂停当前队列的消费动作,直到消息经过不断重试成功为止。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/Cpp五条/article/detail/603881
推荐阅读
相关标签
  

闽ICP备14008679号