当前位置:   article > 正文

SpringBoot优雅的封装不同研发环境下(环境隔离)RocketMq自动ack和手动ack

SpringBoot优雅的封装不同研发环境下(环境隔离)RocketMq自动ack和手动ack

1. RocketMq的maven依赖版本:

  1. <dependency>
  2. <groupId>org.apache.rocketmq</groupId>
  3. <artifactId>rocketmq-spring-boot-starter</artifactId>
  4. <version>2.3.0</version>
  5. </dependency>

2.RocketMq的yml文件:

  1. # 自定义属性
  2. system:
  3. environment:
  4. # 隔离环境名称,拼接到topic后,xxx_topic_tianxin,默认空字符串
  5. name: dev
  6. # 启动隔离,会自动在topic上拼接激活的配置文件,达到自动隔离的效果
  7. # 默认为true,配置类:EnvironmentIsolationConfig
  8. isolation: true
  9. rocketmq:
  10. # 多个NameServer,host:port;host:port,RocketMQProperties
  11. nameServer: 你的NameServer
  12. producer:
  13. # 发o送同一类消息的设置为同一个grup,保证唯一
  14. group: logistics_group
  15. # 发送消息失败重试次数,默认2
  16. retryTimesWhenSendFailed: 2
  17. # 异步消息重试此处,默认2
  18. retryTimesWhenSendAsyncFailed: 2
  19. # 发送消息超时时间,默认3000
  20. sendMessageTimeout: 10000
  21. # 消息最大长度,默认1024 * 1024 * 4(默认4M)
  22. maxMessageSize: 4096
  23. # 压缩消息阈值,默认4k(1024 * 4)
  24. compressMessageBodyThreshold: 4096
  25. # 是否在内部发送失败时重试另一个broker,默认false
  26. retryNextServer: false
  27. # access-key
  28. accessKey: 你的access-key
  29. # secret-key
  30. secretKey: 你的secret-key
  31. # 是否启用消息跟踪,默认false
  32. enableMsgTrace: false
  33. # 消息跟踪主题的名称值。如果不进行配置,可以使用默认的跟踪主题名称
  34. customizedTraceTopic: RMQ_SYS_TRACE_TOPIC
  35. consumer:
  36. # 指定消费组
  37. group: logistics_group
  38. #广播消费模式 CLUSTERING(集群消费)、BROADCASTING(广播消费)
  39. messageModel: CLUSTERING
  40. #设置消费超时时间(分钟)
  41. consumeTimeout: 1
  42. # 最大重试次数,默认16
  43. maxReconsumeTimes: 3
  44. # 其他配置参考属性类

3 IsolationConfig读取yml文件配置

  1. package com.logistics.common.rocketMq.config;
  2. import lombok.Data;
  3. import org.springframework.beans.factory.annotation.Value;
  4. import org.springframework.context.annotation.Configuration;
  5. /**
  6. * @author: 吴顺杰
  7. * @create: 2024-06-18 10:01
  8. * @Description:
  9. * RocketMQ多环境隔离配置
  10. * 原理:对于每个配置的Bean在实例化前,拿到Bean的监听器注解把group或者topic改掉
  11. */
  12. @Configuration
  13. @Data
  14. public class IsolationConfig {
  15. @Value("${system.environment.isolation:true}")
  16. private boolean enabledIsolation;
  17. @Value("${system.environment.name:''}")
  18. private String environmentName;
  19. @Value("${rocketmq.nameServer:''}")
  20. private String nameServer;
  21. @Value("${rocketmq.consumer.group:''}")
  22. private String group;
  23. @Value("${rocketmq.consumer.messageModel:''}")
  24. private String messageModel;
  25. @Value("${rocketmq.consumer.consumeTimeout:''}")
  26. private int consumeTimeout;
  27. @Value("${rocketmq.consumer.maxReconsumeTimes:''}")
  28. private int maxReconsumeTimes;
  29. }

4.RocketMQ序列化器处理RocketMqConfig文件

主要是为了解决RocketMQ Jackson不支持Java时间类型配置

  1. package com.logistics.common.rocketMq.config;
  2. import com.fasterxml.jackson.databind.ObjectMapper;
  3. import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
  4. import org.apache.rocketmq.spring.support.RocketMQMessageConverter;
  5. import org.springframework.context.annotation.Bean;
  6. import org.springframework.context.annotation.Configuration;
  7. import org.springframework.context.annotation.Primary;
  8. import org.springframework.messaging.converter.CompositeMessageConverter;
  9. import org.springframework.messaging.converter.MappingJackson2MessageConverter;
  10. import org.springframework.messaging.converter.MessageConverter;
  11. import java.util.List;
  12. /**
  13. * RocketMQ序列化器处理
  14. *
  15. * @author 吴顺杰
  16. * @since 2024/8/04
  17. */
  18. @Configuration
  19. public class RocketMqConfig {
  20. /**
  21. * 解决RocketMQ Jackson不支持Java时间类型配置
  22. */
  23. @Bean
  24. @Primary
  25. public RocketMQMessageConverter createRocketMQMessageConverter() {
  26. RocketMQMessageConverter converter = new RocketMQMessageConverter();
  27. CompositeMessageConverter compositeMessageConverter = (CompositeMessageConverter) converter.getMessageConverter();
  28. List<MessageConverter> messageConverterList = compositeMessageConverter.getConverters();
  29. for (MessageConverter messageConverter : messageConverterList) {
  30. if (messageConverter instanceof MappingJackson2MessageConverter) {
  31. MappingJackson2MessageConverter jackson2MessageConverter = (MappingJackson2MessageConverter) messageConverter;
  32. ObjectMapper objectMapper = jackson2MessageConverter.getObjectMapper();
  33. // 增加Java8时间模块支持,实体类可以传递LocalDate/LocalDateTime
  34. objectMapper.registerModules(new JavaTimeModule());
  35. }
  36. }
  37. return converter;
  38. }
  39. }

JSON工具类封装

  1. package com.logistics.common.rocketMq.utils;
  2. import com.alibaba.fastjson.JSONObject;
  3. /**
  4. * JSON工具类
  5. * 像工具类这种,建议一定要二次封装,避免出现漏洞时可以快速替换
  6. *
  7. * @author 吴顺杰
  8. * @since 2024/6/16
  9. */
  10. public class JsonUtil {
  11. private JsonUtil() {}
  12. public static String toJson(Object value) {
  13. return JSONObject.toJSONString(value);
  14. }
  15. public static <T> T toObject(String jsonStr, Class<T> clazz) {
  16. return JSONObject.parseObject(jsonStr, clazz);
  17. }
  18. }

5.rocketMq生产者封装

调度任务生产者:LogisticsAddDispatchMqProducer

  1. package com.logistics.business.rocketMq.producer;
  2. import com.logistics.common.exception.base.BaseException;
  3. import com.logistics.common.rocketMq.constant.AddDispatchMqContant;
  4. import com.logistics.common.rocketMq.domain.AddDispatchMqMessage;
  5. import com.logistics.common.rocketMq.template.RocketMqTemplate;
  6. import com.logistics.common.utils.spring.SpringUtils;
  7. import lombok.extern.slf4j.Slf4j;
  8. /**
  9. * 新增调度任务生产者MQ队列
  10. */
  11. @Slf4j
  12. public class LogisticsAddDispatchMqProducer {
  13. private static RocketMqTemplate rocketMqTemplate = SpringUtils.getBean(RocketMqTemplate.class);
  14. public static void sendAddDispatchMqMessage(AddDispatchMqMessage message) {
  15. log.info("新增调度任务成功发送新增调度消息MQ,内容: {}", message);
  16. try {
  17. rocketMqTemplate.asyncSend(AddDispatchMqContant.ADD_DISPATCH_TOPIC, AddDispatchMqContant.ADD_DISPATCH_TAG, message);
  18. } catch (Exception e) {
  19. log.error("新增调度任务成功发送新增调度消息MQ失败,内容: {}", message, e);
  20. throw new BaseException("新增调度任务成功发送新增调度消息MQ失败,请联系管理员");
  21. }
  22. }
  23. }

RocketMQ模板类

  1. package com.logistics.common.rocketMq.template;
  2. import com.alibaba.fastjson.JSONObject;
  3. import com.logistics.common.rocketMq.config.IsolationConfig;
  4. import com.logistics.common.rocketMq.constant.RocketMqSysConstant;
  5. import com.logistics.common.rocketMq.domain.BaseMqMessage;
  6. import com.logistics.common.rocketMq.utils.JsonUtil;
  7. import lombok.extern.slf4j.Slf4j;
  8. import org.apache.rocketmq.client.producer.SendCallback;
  9. import org.apache.rocketmq.client.producer.SendResult;
  10. import org.apache.rocketmq.client.producer.SendStatus;
  11. import org.apache.rocketmq.spring.core.RocketMQTemplate;
  12. import org.apache.rocketmq.spring.support.RocketMQHeaders;
  13. import org.slf4j.Logger;
  14. import org.slf4j.LoggerFactory;
  15. import org.springframework.messaging.Message;
  16. import org.springframework.messaging.support.MessageBuilder;
  17. import org.springframework.stereotype.Component;
  18. import org.springframework.util.StringUtils;
  19. import javax.annotation.Resource;
  20. /**
  21. * RocketMQ模板类
  22. *
  23. */
  24. @Component
  25. @Slf4j
  26. public class RocketMqTemplate {
  27. private static final Logger LOGGER = LoggerFactory.getLogger(RocketMqTemplate.class);
  28. @Resource(name = "rocketMQTemplate")
  29. private RocketMQTemplate template;
  30. @Resource
  31. private IsolationConfig isolationConfig;
  32. /**
  33. * 获取模板,如果封装的方法不够提供原生的使用方式
  34. */
  35. public RocketMQTemplate getTemplate() {
  36. return template;
  37. }
  38. /**
  39. * 构建目的地
  40. */
  41. public String buildDestination(String topic, String tag) {
  42. return topic + RocketMqSysConstant.DELIMITER + tag;
  43. }
  44. /**
  45. * 发送同步消息
  46. */
  47. public <T extends BaseMqMessage> SendResult syncSend(String topic, String tag, T message) {
  48. // 开启消息隔离情况下获取隔离配置,隔离topic,根据自己的需求隔离group或者tag
  49. if (isolationConfig.isEnabledIsolation() && StringUtils.hasText(isolationConfig.getEnvironmentName())) {
  50. topic = topic + "_" + isolationConfig.getEnvironmentName();
  51. }
  52. // 设置业务键,此处根据公共的参数进行处理
  53. Message<T> sendMessage = MessageBuilder.withPayload(message).setHeader(RocketMQHeaders.KEYS, message.getKey()).build();
  54. String buildDestination = buildDestination(topic, tag);
  55. SendResult sendResult = template.syncSend(buildDestination, sendMessage);
  56. // 此处为了方便查看给日志转了json,根据选择选择日志记录方式,例如ELK采集
  57. LOGGER.info("[{}]同步消息[{}]发送结果[{}]", buildDestination, JsonUtil.toJson(message), JSONObject.toJSON(sendResult));
  58. return sendResult;
  59. }
  60. /**
  61. * 发送异步消息
  62. *
  63. * @param topic
  64. * @param tag
  65. * @param message
  66. * @param <T>
  67. */
  68. public <T extends BaseMqMessage> void asyncSend(String topic, String tag, T message) {
  69. // 开启消息隔离情况下获取隔离配置,隔离topic,根据自己的需求隔离group或者tag
  70. if (isolationConfig.isEnabledIsolation() && StringUtils.hasText(isolationConfig.getEnvironmentName())) {
  71. topic = topic + "_" + isolationConfig.getEnvironmentName();
  72. }
  73. // 设置业务键,此处根据公共的参数进行处理
  74. Message<T> sendMessage = MessageBuilder.withPayload(message).setHeader(RocketMQHeaders.KEYS, message.getKey()).build();
  75. String buildDestination = buildDestination(topic, tag);
  76. template.asyncSend(buildDestination, sendMessage, new SendCallback() {
  77. @Override
  78. public void onSuccess(SendResult sendResult) {
  79. LOGGER.info("[{}]MQ异步消息[{}]发送成功结果[{}]", buildDestination, JsonUtil.toJson(message), JSONObject.toJSON(sendResult));
  80. if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
  81. //可以存入数据库做处理
  82. log.error("Consumer replies message failed. SendStatus: {}", sendResult.getSendStatus());
  83. }
  84. }
  85. @Override
  86. public void onException(Throwable throwable) {
  87. LOGGER.info("[{}]MQ异步消息[{}]发送失败结果[{}]", buildDestination, JsonUtil.toJson(message), JSONObject.toJSON(throwable.getMessage()));
  88. //可以存入数据库做处理
  89. }
  90. });
  91. }
  92. /**
  93. * 发送延迟消息
  94. *
  95. * @param message
  96. * @param delayLevel
  97. * @param <T>
  98. * @return
  99. */
  100. public <T extends BaseMqMessage> SendResult syncDelaySend(String topic, String tag, T message, int delayLevel) {
  101. // 开启消息隔离情况下获取隔离配置,隔离topic,根据自己的需求隔离group或者tag
  102. if (isolationConfig.isEnabledIsolation() && StringUtils.hasText(isolationConfig.getEnvironmentName())) {
  103. topic = topic + "_" + isolationConfig.getEnvironmentName();
  104. }
  105. Message<T> sendMessage = MessageBuilder.withPayload(message).setHeader(RocketMQHeaders.KEYS, message.getKey()).build();
  106. String destination = buildDestination(topic, tag);
  107. SendResult sendResult = template.syncSend(destination, sendMessage, 3000, delayLevel);
  108. LOGGER.info("[{}]延迟等级[{}]消息[{}]发送结果[{}]", destination, delayLevel, JsonUtil.toJson(message), JsonUtil.toJson(sendResult));
  109. return sendResult;
  110. }
  111. }

AddDispatchMqMessage消息实体

  1. package com.logistics.common.rocketMq.domain;
  2. import lombok.Builder;
  3. import lombok.Data;
  4. import lombok.NoArgsConstructor;
  5. /**
  6. * 新增调度任务发送队列消息体
  7. */
  8. @Data
  9. @Builder
  10. @NoArgsConstructor
  11. public class AddDispatchMqMessage extends BaseMqMessage {
  12. /**
  13. * 调度id
  14. */
  15. private Long dispatchId;
  16. public AddDispatchMqMessage(Long dispatchId) {
  17. this.dispatchId = dispatchId;
  18. }
  19. }

AddDispatchMqContant类

  1. package com.logistics.common.rocketMq.constant;
  2. /**
  3. * 新增调度任务MQ队列
  4. */
  5. public class AddDispatchMqContant {
  6. /**
  7. * 消费主题
  8. */
  9. public static final String ADD_DISPATCH_TOPIC = "add_dispatch_topic";
  10. /**
  11. * 消费标签
  12. */
  13. public static final String ADD_DISPATCH_TAG = "add_dispatch_tag";
  14. /**
  15. * 消费组
  16. */
  17. public static final String ADD_DISPATCH_GROUP = "add_dispatch_group";
  18. }

6.rocketMq消费者封装

ACK简介
在实际使用RocketMQ的时候我们并不能保证每次发送的消息都刚好能被消费者一次性正常消费成功,可能会存在需要多次消费才能成功或者一直消费失败的情况,那作为发送者该做如何处理呢?

RocketMQ提供了ack机制,以保证消息能够被正常消费。发送者为了保证消息肯定消费成功,只有使用方明确表示消费成功,RocketMQ才会认为消息消费成功。中途断电,抛出异常等都不会认为成功——即都会重新投递。

1.手动ack封装

新增增调度任务消费者启动监听类

  1. package com.logistics.business.rocketMq.listener;
  2. import com.logistics.business.rocketMq.comsumer.LogisticsAddDispatchMqComsumer;
  3. import com.logistics.common.rocketMq.config.IsolationConfig;
  4. import com.logistics.common.rocketMq.constant.AddDispatchMqContant;
  5. import lombok.extern.slf4j.Slf4j;
  6. import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
  7. import org.apache.rocketmq.client.exception.MQClientException;
  8. import org.apache.rocketmq.remoting.protocol.heartbeat.MessageModel;
  9. import org.springframework.stereotype.Component;
  10. import org.springframework.util.StringUtils;
  11. import javax.annotation.PostConstruct;
  12. import javax.annotation.Resource;
  13. /**
  14. * 新增调度任务消费者启动监听类
  15. */
  16. @Component
  17. @Slf4j
  18. public class RetryLogisticsAddDispatchListener {
  19. @Resource
  20. private IsolationConfig isolationConfig;
  21. private final DefaultMQPushConsumer consumer = new DefaultMQPushConsumer();
  22. @PostConstruct
  23. public void start() {
  24. try {
  25. //启动环境隔离
  26. String topic = AddDispatchMqContant.ADD_DISPATCH_TOPIC;
  27. if (isolationConfig.isEnabledIsolation() && StringUtils.hasText(isolationConfig.getEnvironmentName())) {
  28. consumer.setConsumerGroup(AddDispatchMqContant.ADD_DISPATCH_GROUP + "_" + isolationConfig.getEnvironmentName());
  29. topic = topic + "_" + isolationConfig.getEnvironmentName();
  30. } else {
  31. consumer.setConsumerGroup(AddDispatchMqContant.ADD_DISPATCH_GROUP);
  32. }
  33. consumer.setNamesrvAddr(isolationConfig.getNameServer());
  34. //设置集群消费模式
  35. consumer.setMessageModel(MessageModel.valueOf(isolationConfig.getMessageModel()));
  36. //设置消费超时时间(分钟)
  37. consumer.setConsumeTimeout(isolationConfig.getConsumeTimeout());
  38. //最大重试次数
  39. consumer.setMaxReconsumeTimes(isolationConfig.getMaxReconsumeTimes());
  40. //订阅主题
  41. consumer.subscribe(topic, AddDispatchMqContant.ADD_DISPATCH_TAG);
  42. //注册消息监听器
  43. consumer.registerMessageListener(new LogisticsAddDispatchMqComsumer());
  44. //启动消费端
  45. consumer.start();
  46. log.info("新增调度任务消费者MQ监听队列启动成功");
  47. } catch (MQClientException e) {
  48. e.printStackTrace();
  49. }
  50. }
  51. }

LogisticsAddDispatchMqComsumer注册消息监听器

  1. package com.logistics.business.rocketMq.comsumer;
  2. import com.logistics.common.rocketMq.domain.AddDispatchMqMessage;
  3. import com.logistics.common.rocketMq.utils.JsonUtil;
  4. import lombok.extern.slf4j.Slf4j;
  5. import org.apache.commons.collections.CollectionUtils;
  6. import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
  7. import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
  8. import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
  9. import org.apache.rocketmq.common.message.MessageExt;
  10. import java.nio.charset.StandardCharsets;
  11. import java.util.List;
  12. /**
  13. * 新增调度任务消费者MQ队列
  14. */
  15. @Slf4j
  16. public class LogisticsAddDispatchMqComsumer implements MessageListenerConcurrently {
  17. @Override
  18. public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
  19. if (CollectionUtils.isEmpty(msgs)) {
  20. return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
  21. }
  22. MessageExt message = msgs.get(0);
  23. try {
  24. String messageBody = new String(message.getBody(), StandardCharsets.UTF_8);
  25. AddDispatchMqMessage addDispatchMqMessage = JsonUtil.toObject(messageBody, AddDispatchMqMessage.class);
  26. System.out.println("messageId: " + message.getMsgId() + ",topic: " +
  27. message.getTopic() + ",addDispatchMqMessage: " + addDispatchMqMessage);
  28. System.out.println(1 / 0);
  29. return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
  30. } catch (Exception e) {
  31. return ConsumeConcurrentlyStatus.RECONSUME_LATER;
  32. }
  33. }
  34. }

return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;

返回ConsumeConcurrentlyStatus.CONSUME_SUCCESS,mq的偏移量才会下移。也就是手动ack,也只有手动返回CONSUME_SUCCESS,消息体才会偏移。

return ConsumeConcurrentlyStatus.RECONSUME_LATER;

返回 ConsumeConcurrentlyStatus.RECONSUME_LATER; mq会默认重试16次,每次执行间隔

不等。最长好像是2个多小时,具体多少自己看官方文档,一般线上环境设置重试五次失败就进入死信队列了,我这里设置的是重试三次

  1. onsumer Started.
  2. date=Fri Aug 05 14:08:52 CST 2022 *******
  3. msg=0A28A4923EC018B4AAC217A272330000
  4. date=Fri Aug 05 14:08:52 CST 2022
  5. ReconsumeTimes=0 '第一次处理'
  6. date=Fri Aug 05 14:09:02 CST 2022 *******
  7. msg=0A28A4923EC018B4AAC217A272330000
  8. date=Fri Aug 05 14:09:02 CST 2022
  9. ReconsumeTimes=1 '第2次处理 与第一次间隔10s'
  10. date=Fri Aug 05 14:09:33 CST 2022 *******
  11. msg=0A28A4923EC018B4AAC217A272330000
  12. date=Fri Aug 05 14:09:33 CST 2022
  13. ReconsumeTimes=2 '第3次处理 与第2次间隔20s'
  14. date=Fri Aug 05 14:10:33 CST 2022 *******
  15. msg=0A28A4923EC018B4AAC217A272330000
  16. date=Fri Aug 05 14:10:33 CST 2022
  17. ReconsumeTimes=3 '第4次处理 与第3次间隔1m'

2.自动ack封装

BaseMqMessageListener封装

  1. package com.zhjt.rocketmq.listener;
  2. import com.zhjt.rocketmq.constant.RocketMqSysConstant;
  3. import com.zhjt.rocketmq.domain.BaseMqMessage;
  4. import com.zhjt.rocketmq.template.RocketMqTemplate;
  5. import com.zhjt.rocketmq.utils.JsonUtil;
  6. import org.apache.rocketmq.client.producer.SendResult;
  7. import org.apache.rocketmq.client.producer.SendStatus;
  8. import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
  9. import org.slf4j.Logger;
  10. import org.slf4j.LoggerFactory;
  11. import org.slf4j.MDC;
  12. import javax.annotation.Resource;
  13. import java.time.Instant;
  14. import java.util.Objects;
  15. /**
  16. * 抽象消息监听器,封装了所有公共处理业务,如
  17. * 1、基础日志记录
  18. * 2、异常处理
  19. * 3、消息重试
  20. * 4、警告通知
  21. * 5、....
  22. *
  23. * @author 吴顺杰
  24. * @since 2024/6/17
  25. */
  26. public abstract class BaseMqMessageListener<T extends BaseMqMessage> {
  27. /**
  28. * 这里的日志记录器是哪个子类的就会被哪个子类的类进行初始化
  29. */
  30. protected final Logger logger = LoggerFactory.getLogger(this.getClass());
  31. @Resource
  32. private RocketMqTemplate rocketMqTemplate;
  33. /**
  34. * 消息者名称
  35. *
  36. * @return 消费者名称
  37. */
  38. protected abstract String consumerName();
  39. /**
  40. * 消息处理
  41. *
  42. * @param message 待处理消息
  43. * @throws Exception 消费异常
  44. */
  45. protected abstract void handleMessage(T message) throws Exception;
  46. /**
  47. * 超过重试次数消息,需要启用isRetry
  48. *
  49. * @param message 待处理消息
  50. */
  51. protected abstract void overMaxRetryTimesMessage(T message);
  52. /**
  53. * 是否过滤消息,例如某些
  54. *
  55. * @param message 待处理消息
  56. * @return true: 本次消息被过滤,false:不过滤
  57. */
  58. protected boolean isFilter(T message) {
  59. return false;
  60. }
  61. /**
  62. * 是否异常时重复发送
  63. *
  64. * @return true: 消息重试,false:不重试
  65. */
  66. protected abstract boolean isRetry();
  67. /**
  68. * 消费异常时是否抛出异常
  69. *
  70. * @return true: 抛出异常,false:消费异常(如果没有开启重试则消息会被自动ack)
  71. */
  72. protected abstract boolean isThrowException();
  73. /**
  74. * 最大重试此处
  75. *
  76. * @return 最大重试次数,默认10次
  77. */
  78. protected int maxRetryTimes() {
  79. return 10;
  80. }
  81. /**
  82. * isRetry开启时,重新入队延迟时间
  83. *
  84. * @return -1:立即入队重试
  85. */
  86. protected int retryDelayLevel() {
  87. return -1;
  88. }
  89. /**
  90. * 由父类来完成基础的日志和调配,下面的只是提供一个思路
  91. */
  92. public void dispatchMessage(T message) {
  93. MDC.put(RocketMqSysConstant.TRACE_ID, message.getTraceId());
  94. // 基础日志记录被父类处理了
  95. logger.info("[{}]消费者收到消息[{}]", consumerName(), JsonUtil.toJson(message));
  96. if (isFilter(message)) {
  97. logger.info("消息不满足消费条件,已过滤");
  98. return;
  99. }
  100. // 超过最大重试次数时调用子类方法处理
  101. if (message.getRetryTimes() > maxRetryTimes()) {
  102. overMaxRetryTimesMessage(message);
  103. return;
  104. }
  105. try {
  106. long start = Instant.now().toEpochMilli();
  107. handleMessage(message);
  108. long end = Instant.now().toEpochMilli();
  109. logger.info("消息消费成功,耗时[{}ms]", (end - start));
  110. } catch (Exception e) {
  111. logger.error("消息消费异常", e);
  112. // 是捕获异常还是抛出,由子类决定
  113. if (isThrowException()) {
  114. throw new RuntimeException(e);
  115. }
  116. if (isRetry()) {
  117. // 获取子类RocketMQMessageListener注解拿到topic和tag
  118. RocketMQMessageListener annotation = this.getClass().getAnnotation(RocketMQMessageListener.class);
  119. if (Objects.nonNull(annotation)) {
  120. message.setSource(message.getSource() + "消息重试");
  121. message.setRetryTimes(message.getRetryTimes() + 1);
  122. SendResult sendResult;
  123. try {
  124. // 如果消息发送不成功,则再次重新发送,如果发送异常则抛出由MQ再次处理(异常时不走延迟消息)
  125. // 此处捕获之后,相当于此条消息被消息完成然后重新发送新的消息
  126. sendResult = rocketMqTemplate.send(annotation.topic(), annotation.selectorExpression(), message, retryDelayLevel());
  127. } catch (Exception ex) {
  128. throw new RuntimeException(ex);
  129. }
  130. // 发送失败的处理就是不进行ACK,由RocketMQ重试
  131. if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
  132. throw new RuntimeException("重试消息发送失败");
  133. }
  134. }
  135. }
  136. }
  137. }
  138. }

IsolationConfigNew文件里面的方法:

  1. package com.zhjt.rocketmq.config;
  2. /**
  3. * @author: 吴顺杰
  4. * @create: 2024-06-18 10:01
  5. * @Description:
  6. */
  7. import org.apache.rocketmq.spring.support.DefaultRocketMQListenerContainer;
  8. import org.springframework.beans.BeansException;
  9. import org.springframework.beans.factory.annotation.Value;
  10. import org.springframework.beans.factory.config.BeanPostProcessor;
  11. import org.springframework.context.annotation.Configuration;
  12. import org.springframework.lang.NonNull;
  13. import org.springframework.util.StringUtils;
  14. /**
  15. * RocketMQ多环境隔离配置
  16. * 原理:对于每个配置的Bean在实例化前,拿到Bean的监听器注解把group或者topic改掉
  17. *
  18. * @author tianxincoord@163.com
  19. * @since 2022/5/18
  20. */
  21. @Configuration
  22. public class IsolationConfigNew implements BeanPostProcessor {
  23. @Value("${system.environment.isolation:true}")
  24. private boolean enabledIsolation;
  25. @Value("${system.environment.name:''}")
  26. private String environmentName;
  27. @Override
  28. public Object postProcessBeforeInitialization(@NonNull Object bean,
  29. @NonNull String beanName) throws BeansException {
  30. // DefaultRocketMQListenerContainer是监听器实现类
  31. if (bean instanceof DefaultRocketMQListenerContainer) {
  32. DefaultRocketMQListenerContainer container = (DefaultRocketMQListenerContainer) bean;
  33. // 开启消息隔离情况下获取隔离配置,隔离topic,根据自己的需求隔离group或者tag
  34. if (enabledIsolation && StringUtils.hasText(environmentName)) {
  35. container.setTopic(String.join("_", container.getTopic(), environmentName));
  36. container.setConsumerGroup(String.join("_", container.getConsumerGroup(), environmentName));
  37. }
  38. return container;
  39. }
  40. return bean;
  41. }
  42. }

消费者监听实现LogisticsAddDispatchMqComsumer2

  1. package com.logistics.business.rocketMq.comsumer;
  2. import com.logistics.common.rocketMq.constant.AddDispatchMqContant;
  3. import com.logistics.common.rocketMq.domain.AddDispatchMqMessage;
  4. import com.logistics.common.rocketMq.listener.BaseMqMessageListener;
  5. import lombok.extern.slf4j.Slf4j;
  6. import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
  7. import org.apache.rocketmq.spring.core.RocketMQListener;
  8. import org.springframework.stereotype.Component;
  9. /**
  10. * 新增调度任务消费者MQ队列
  11. */
  12. @Slf4j
  13. @Component
  14. @RocketMQMessageListener(
  15. topic = AddDispatchMqContant.ADD_DISPATCH_TOPIC,
  16. consumerGroup = AddDispatchMqContant.ADD_DISPATCH_GROUP,
  17. selectorExpression = AddDispatchMqContant.ADD_DISPATCH_GROUP,
  18. // 指定消费者线程数,默认64,生产中请注意配置,避免过大或者过小
  19. consumeThreadMax = 21
  20. )
  21. public class LogisticsAddDispatchMqComsumer2 extends BaseMqMessageListener<AddDispatchMqMessage>
  22. implements RocketMQListener<AddDispatchMqMessage> {
  23. /**
  24. * 此处只是说明封装的思想,更多还是要根据业务操作决定
  25. * 内功心法有了,无论什么招式都可以发挥最大威力
  26. */
  27. @Override
  28. protected String consumerName() {
  29. return "RocketMq监听消息";
  30. }
  31. @Override
  32. public void onMessage(AddDispatchMqMessage message) {
  33. // 注意,此时这里没有直接处理业务,而是先委派给父类做基础操作,然后父类做完基础操作后会调用子类的实际处理类型
  34. super.dispatchMessage(message);
  35. }
  36. @Override
  37. protected void handleMessage(AddDispatchMqMessage message) throws Exception {
  38. // 此时这里才是最终的业务处理,代码只需要处理资源类关闭异常,其他的可以交给父类重试
  39. // 业务异常直接抛出异常即可 否则捕获异常没有抛出 无法进行重试
  40. log.info("RocketMq监消息消费数据:{}", message);
  41. }
  42. @Override
  43. protected void overMaxRetryTimesMessage(AddDispatchMqMessage message) {
  44. // 当超过指定重试次数消息时此处方法会被调用
  45. // 生产中可以进行回退或其他业务操作
  46. }
  47. @Override
  48. protected boolean isRetry() {
  49. return true;
  50. }
  51. @Override
  52. protected int maxRetryTimes() {
  53. // 指定需要的重试次数,超过重试次数overMaxRetryTimesMessage会被调用
  54. return 5;
  55. }
  56. @Override
  57. protected boolean isThrowException() {
  58. // 是否抛出异常,到消费异常时是被父类拦截处理还是直接抛出异常
  59. return false;
  60. }
  61. }

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/运维做开发/article/detail/988457
推荐阅读
相关标签
  

闽ICP备14008679号