当前位置:   article > 正文

RocketMQ发送及消费重试机制_rocketmq 广播模式不支持消费重试怎么办

rocketmq 广播模式不支持消费重试怎么办

发送重试机制:

        RocketMQ支持发送失败内部重试,默认是2次,在异步、SendOneWay模式下不支持重试。

消费重试机制:

        RocketMQ在消费失败时,支持消费端重试继续消费消息,默认支持16次重试,每次重试的时间间隔增加,只有在Cluster模式支持重试、广播模式不支持重试。

        在重试期间,消息的key和ID不会发生改变,应用程序可以以此做好幂等性控制。

        在重试期间,还可以继续消费其它新消息。

 

  1. package com.tech.rocketmq.jms;
  2. /**
  3. * @author lw
  4. * @since 2021/11/15
  5. */
  6. public class JmsConfig {
  7. public static final String NAME_SERVER = "192.168.50.135:9876;192.168.50.136:9876";
  8. public static final String TOPIC = "tech_pay_test_topic_aaa";
  9. }
  1. package com.tech.rocketmq.jms;
  2. import org.apache.rocketmq.client.exception.MQClientException;
  3. import org.apache.rocketmq.client.producer.DefaultMQProducer;
  4. import org.springframework.stereotype.Component;
  5. /**
  6. * @author lw
  7. * @since 2021/11/15
  8. */
  9. @Component
  10. public class PayProducer {
  11. private String producerGroup="pay_group";
  12. private DefaultMQProducer producer;
  13. public PayProducer(){
  14. producer=new DefaultMQProducer(producerGroup);
  15. //指定NameServer地址,多个地址以;隔开
  16. //如producer.setNamesrvAddr("127.0.0.1:9876;127.0.0.1:9877")
  17. producer.setNamesrvAddr(JmsConfig.NAME_SERVER);
  18. //生产者投递消息重试次数(内部重试),默认是2次,异步和SendOneWay下配置无效
  19. producer.setRetryTimesWhenSendFailed(3);
  20. start();
  21. }
  22. public DefaultMQProducer getProducer() {
  23. return producer;
  24. }
  25. /**
  26. * 对象使用之前必须调用一次,只能初始化一次
  27. */
  28. private void start() {
  29. try {
  30. this.producer.start();
  31. } catch (MQClientException e) {
  32. e.printStackTrace();
  33. }
  34. }
  35. /**
  36. * 一般在应用上下文,使用上下文监听器,进行关闭
  37. */
  38. private void shutDown(){
  39. this.producer.shutdown();
  40. }
  41. }
  1. package com.tech.rocketmq.controller;
  2. import com.tech.rocketmq.jms.JmsConfig;
  3. import com.tech.rocketmq.jms.PayProducer;
  4. import org.apache.rocketmq.client.exception.MQBrokerException;
  5. import org.apache.rocketmq.client.exception.MQClientException;
  6. import org.apache.rocketmq.client.producer.SendResult;
  7. import org.apache.rocketmq.common.message.Message;
  8. import org.apache.rocketmq.remoting.exception.RemotingException;
  9. import org.springframework.beans.factory.annotation.Autowired;
  10. import org.springframework.web.bind.annotation.GetMapping;
  11. import org.springframework.web.bind.annotation.RestController;
  12. import java.util.HashMap;
  13. /**
  14. * @author lw
  15. * @since 2021/11/15
  16. */
  17. @RestController
  18. public class PayController {
  19. @Autowired
  20. private PayProducer payProducer;
  21. @GetMapping("send")
  22. Object callback(String text) throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
  23. // Message message = new Message(JmsConfig.TOPIC, "taga", ("hello word = " + text).getBytes());
  24. //发送消息时,指定消息的key,比如订单编号
  25. Message message = new Message(JmsConfig.TOPIC, "taga", "666", ("hello word = " + text).getBytes());
  26. SendResult sendResult = payProducer.getProducer().send(message);
  27. System.out.println(sendResult);
  28. return new HashMap<>();
  29. }
  30. }
  1. package com.tech.rocketmq.jms;
  2. import lombok.extern.slf4j.Slf4j;
  3. import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
  4. import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
  5. import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
  6. import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
  7. import org.apache.rocketmq.client.exception.MQClientException;
  8. import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
  9. import org.apache.rocketmq.common.message.Message;
  10. import org.apache.rocketmq.common.message.MessageExt;
  11. import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
  12. import org.springframework.stereotype.Component;
  13. import java.util.List;
  14. /**
  15. * @author lw
  16. * @since 2021/11/15
  17. */
  18. @Slf4j
  19. @Component
  20. public class PayConsumer {
  21. private DefaultMQPushConsumer consumer;
  22. private String consumerGroup = "pay_consumer_group";
  23. public PayConsumer() throws MQClientException {
  24. consumer = new DefaultMQPushConsumer(consumerGroup);
  25. consumer.setNamesrvAddr(JmsConfig.NAME_SERVER);
  26. consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
  27. //默认是集群模式,如果改为广播模式不支持消费端重试
  28. // consumer.setMessageModel(MessageModel.BROADCASTING);
  29. consumer.subscribe(JmsConfig.TOPIC, "*");
  30. consumer.registerMessageListener(new MessageListenerConcurrently() {
  31. @Override
  32. public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
  33. MessageExt message = list.get(0);
  34. int reconsumeTimes = message.getReconsumeTimes();
  35. log.info("重试次数:{}", reconsumeTimes);
  36. try {
  37. log.info("Receive New Message: {}", new String(message.getBody()));
  38. String topic = message.getTopic();
  39. String tags = message.getTags();
  40. String keys = message.getKeys();
  41. if(keys.equals("666")){
  42. throw new Exception("模拟异常");
  43. }
  44. log.info("topic={} tags={} keys={}", topic, tags, keys);
  45. return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
  46. } catch (Exception e) {
  47. log.error("消费异常",e);
  48. if(reconsumeTimes>=2){
  49. log.info("重试次数大于等于2,记录数据库,发短信通知开发人员或者运营人员");
  50. //告诉broker,本次消费成功
  51. return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
  52. }
  53. return ConsumeConcurrentlyStatus.RECONSUME_LATER;
  54. }
  55. }
  56. });
  57. consumer.start();
  58. System.out.println("consumer start ...");
  59. }
  60. }

当重试2次,给broker回复 CONSUME_SUCCESS 不再重试。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/IT小白/article/detail/603768
推荐阅读
相关标签
  

闽ICP备14008679号