赞
踩
- <dependency>
- <groupId>org.apache.rocketmq</groupId>
- <artifactId>rocketmq-spring-boot-starter</artifactId>
- <version>2.3.0</version>
- </dependency>
- # 自定义属性
- system:
- environment:
- # 隔离环境名称,拼接到topic后,xxx_topic_tianxin,默认空字符串
- name: dev
- # 启动隔离,会自动在topic上拼接激活的配置文件,达到自动隔离的效果
- # 默认为true,配置类:EnvironmentIsolationConfig
- isolation: true
- rocketmq:
- # 多个NameServer,host:port;host:port,RocketMQProperties
- nameServer: 你的NameServer
- producer:
- # 发o送同一类消息的设置为同一个grup,保证唯一
- group: logistics_group
- # 发送消息失败重试次数,默认2
- retryTimesWhenSendFailed: 2
- # 异步消息重试此处,默认2
- retryTimesWhenSendAsyncFailed: 2
- # 发送消息超时时间,默认3000
- sendMessageTimeout: 10000
- # 消息最大长度,默认1024 * 1024 * 4(默认4M)
- maxMessageSize: 4096
- # 压缩消息阈值,默认4k(1024 * 4)
- compressMessageBodyThreshold: 4096
- # 是否在内部发送失败时重试另一个broker,默认false
- retryNextServer: false
- # access-key
- accessKey: 你的access-key
- # secret-key
- secretKey: 你的secret-key
- # 是否启用消息跟踪,默认false
- enableMsgTrace: false
- # 消息跟踪主题的名称值。如果不进行配置,可以使用默认的跟踪主题名称
- customizedTraceTopic: RMQ_SYS_TRACE_TOPIC
- consumer:
- # 指定消费组
- group: logistics_group
- #广播消费模式 CLUSTERING(集群消费)、BROADCASTING(广播消费)
- messageModel: CLUSTERING
- #设置消费超时时间(分钟)
- consumeTimeout: 1
- # 最大重试次数,默认16
- maxReconsumeTimes: 3
- # 其他配置参考属性类
- package com.logistics.common.rocketMq.config;
- import lombok.Data;
- import org.springframework.beans.factory.annotation.Value;
- import org.springframework.context.annotation.Configuration;
-
- /**
- * @author: 吴顺杰
- * @create: 2024-06-18 10:01
- * @Description:
- * RocketMQ多环境隔离配置
- * 原理:对于每个配置的Bean在实例化前,拿到Bean的监听器注解把group或者topic改掉
- */
- @Configuration
- @Data
- public class IsolationConfig {
- @Value("${system.environment.isolation:true}")
- private boolean enabledIsolation;
- @Value("${system.environment.name:''}")
- private String environmentName;
- @Value("${rocketmq.nameServer:''}")
- private String nameServer;
- @Value("${rocketmq.consumer.group:''}")
- private String group;
- @Value("${rocketmq.consumer.messageModel:''}")
- private String messageModel;
- @Value("${rocketmq.consumer.consumeTimeout:''}")
- private int consumeTimeout;
- @Value("${rocketmq.consumer.maxReconsumeTimes:''}")
- private int maxReconsumeTimes;
- }
主要是为了解决RocketMQ Jackson不支持Java时间类型配置
- package com.logistics.common.rocketMq.config;
-
- import com.fasterxml.jackson.databind.ObjectMapper;
- import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
- import org.apache.rocketmq.spring.support.RocketMQMessageConverter;
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
- import org.springframework.context.annotation.Primary;
- import org.springframework.messaging.converter.CompositeMessageConverter;
- import org.springframework.messaging.converter.MappingJackson2MessageConverter;
- import org.springframework.messaging.converter.MessageConverter;
-
- import java.util.List;
-
- /**
- * RocketMQ序列化器处理
- *
- * @author 吴顺杰
- * @since 2024/8/04
- */
- @Configuration
- public class RocketMqConfig {
-
- /**
- * 解决RocketMQ Jackson不支持Java时间类型配置
- */
- @Bean
- @Primary
- public RocketMQMessageConverter createRocketMQMessageConverter() {
- RocketMQMessageConverter converter = new RocketMQMessageConverter();
- CompositeMessageConverter compositeMessageConverter = (CompositeMessageConverter) converter.getMessageConverter();
- List<MessageConverter> messageConverterList = compositeMessageConverter.getConverters();
- for (MessageConverter messageConverter : messageConverterList) {
- if (messageConverter instanceof MappingJackson2MessageConverter) {
- MappingJackson2MessageConverter jackson2MessageConverter = (MappingJackson2MessageConverter) messageConverter;
- ObjectMapper objectMapper = jackson2MessageConverter.getObjectMapper();
- // 增加Java8时间模块支持,实体类可以传递LocalDate/LocalDateTime
- objectMapper.registerModules(new JavaTimeModule());
- }
- }
- return converter;
- }
- }
JSON工具类封装
- package com.logistics.common.rocketMq.utils;
-
- import com.alibaba.fastjson.JSONObject;
-
- /**
- * JSON工具类
- * 像工具类这种,建议一定要二次封装,避免出现漏洞时可以快速替换
- *
- * @author 吴顺杰
- * @since 2024/6/16
- */
- public class JsonUtil {
- private JsonUtil() {}
-
- public static String toJson(Object value) {
- return JSONObject.toJSONString(value);
- }
-
- public static <T> T toObject(String jsonStr, Class<T> clazz) {
- return JSONObject.parseObject(jsonStr, clazz);
- }
- }
调度任务生产者:LogisticsAddDispatchMqProducer
- package com.logistics.business.rocketMq.producer;
-
- import com.logistics.common.exception.base.BaseException;
- import com.logistics.common.rocketMq.constant.AddDispatchMqContant;
- import com.logistics.common.rocketMq.domain.AddDispatchMqMessage;
- import com.logistics.common.rocketMq.template.RocketMqTemplate;
- import com.logistics.common.utils.spring.SpringUtils;
- import lombok.extern.slf4j.Slf4j;
-
- /**
- * 新增调度任务生产者MQ队列
- */
- @Slf4j
- public class LogisticsAddDispatchMqProducer {
- private static RocketMqTemplate rocketMqTemplate = SpringUtils.getBean(RocketMqTemplate.class);
-
- public static void sendAddDispatchMqMessage(AddDispatchMqMessage message) {
- log.info("新增调度任务成功发送新增调度消息MQ,内容: {}", message);
- try {
- rocketMqTemplate.asyncSend(AddDispatchMqContant.ADD_DISPATCH_TOPIC, AddDispatchMqContant.ADD_DISPATCH_TAG, message);
- } catch (Exception e) {
- log.error("新增调度任务成功发送新增调度消息MQ失败,内容: {}", message, e);
- throw new BaseException("新增调度任务成功发送新增调度消息MQ失败,请联系管理员");
- }
- }
- }
RocketMQ模板类
- package com.logistics.common.rocketMq.template;
-
- import com.alibaba.fastjson.JSONObject;
- import com.logistics.common.rocketMq.config.IsolationConfig;
- import com.logistics.common.rocketMq.constant.RocketMqSysConstant;
- import com.logistics.common.rocketMq.domain.BaseMqMessage;
- import com.logistics.common.rocketMq.utils.JsonUtil;
- import lombok.extern.slf4j.Slf4j;
- import org.apache.rocketmq.client.producer.SendCallback;
- import org.apache.rocketmq.client.producer.SendResult;
- import org.apache.rocketmq.client.producer.SendStatus;
- import org.apache.rocketmq.spring.core.RocketMQTemplate;
- import org.apache.rocketmq.spring.support.RocketMQHeaders;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import org.springframework.messaging.Message;
- import org.springframework.messaging.support.MessageBuilder;
- import org.springframework.stereotype.Component;
- import org.springframework.util.StringUtils;
-
- import javax.annotation.Resource;
-
- /**
- * RocketMQ模板类
- *
- */
- @Component
- @Slf4j
- public class RocketMqTemplate {
- private static final Logger LOGGER = LoggerFactory.getLogger(RocketMqTemplate.class);
- @Resource(name = "rocketMQTemplate")
- private RocketMQTemplate template;
- @Resource
- private IsolationConfig isolationConfig;
-
-
- /**
- * 获取模板,如果封装的方法不够提供原生的使用方式
- */
- public RocketMQTemplate getTemplate() {
- return template;
- }
-
- /**
- * 构建目的地
- */
- public String buildDestination(String topic, String tag) {
- return topic + RocketMqSysConstant.DELIMITER + tag;
- }
-
- /**
- * 发送同步消息
- */
- public <T extends BaseMqMessage> SendResult syncSend(String topic, String tag, T message) {
- // 开启消息隔离情况下获取隔离配置,隔离topic,根据自己的需求隔离group或者tag
- if (isolationConfig.isEnabledIsolation() && StringUtils.hasText(isolationConfig.getEnvironmentName())) {
- topic = topic + "_" + isolationConfig.getEnvironmentName();
- }
- // 设置业务键,此处根据公共的参数进行处理
- Message<T> sendMessage = MessageBuilder.withPayload(message).setHeader(RocketMQHeaders.KEYS, message.getKey()).build();
- String buildDestination = buildDestination(topic, tag);
- SendResult sendResult = template.syncSend(buildDestination, sendMessage);
- // 此处为了方便查看给日志转了json,根据选择选择日志记录方式,例如ELK采集
- LOGGER.info("[{}]同步消息[{}]发送结果[{}]", buildDestination, JsonUtil.toJson(message), JSONObject.toJSON(sendResult));
- return sendResult;
- }
-
-
- /**
- * 发送异步消息
- *
- * @param topic
- * @param tag
- * @param message
- * @param <T>
- */
- public <T extends BaseMqMessage> void asyncSend(String topic, String tag, T message) {
- // 开启消息隔离情况下获取隔离配置,隔离topic,根据自己的需求隔离group或者tag
- if (isolationConfig.isEnabledIsolation() && StringUtils.hasText(isolationConfig.getEnvironmentName())) {
- topic = topic + "_" + isolationConfig.getEnvironmentName();
- }
- // 设置业务键,此处根据公共的参数进行处理
- Message<T> sendMessage = MessageBuilder.withPayload(message).setHeader(RocketMQHeaders.KEYS, message.getKey()).build();
- String buildDestination = buildDestination(topic, tag);
- template.asyncSend(buildDestination, sendMessage, new SendCallback() {
- @Override
- public void onSuccess(SendResult sendResult) {
- LOGGER.info("[{}]MQ异步消息[{}]发送成功结果[{}]", buildDestination, JsonUtil.toJson(message), JSONObject.toJSON(sendResult));
- if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
- //可以存入数据库做处理
- log.error("Consumer replies message failed. SendStatus: {}", sendResult.getSendStatus());
- }
- }
-
- @Override
- public void onException(Throwable throwable) {
- LOGGER.info("[{}]MQ异步消息[{}]发送失败结果[{}]", buildDestination, JsonUtil.toJson(message), JSONObject.toJSON(throwable.getMessage()));
- //可以存入数据库做处理
- }
- });
- }
-
- /**
- * 发送延迟消息
- *
- * @param message
- * @param delayLevel
- * @param <T>
- * @return
- */
- public <T extends BaseMqMessage> SendResult syncDelaySend(String topic, String tag, T message, int delayLevel) {
- // 开启消息隔离情况下获取隔离配置,隔离topic,根据自己的需求隔离group或者tag
- if (isolationConfig.isEnabledIsolation() && StringUtils.hasText(isolationConfig.getEnvironmentName())) {
- topic = topic + "_" + isolationConfig.getEnvironmentName();
- }
- Message<T> sendMessage = MessageBuilder.withPayload(message).setHeader(RocketMQHeaders.KEYS, message.getKey()).build();
- String destination = buildDestination(topic, tag);
- SendResult sendResult = template.syncSend(destination, sendMessage, 3000, delayLevel);
- LOGGER.info("[{}]延迟等级[{}]消息[{}]发送结果[{}]", destination, delayLevel, JsonUtil.toJson(message), JsonUtil.toJson(sendResult));
- return sendResult;
- }
-
- }
AddDispatchMqMessage消息实体
- package com.logistics.common.rocketMq.domain;
-
- import lombok.Builder;
- import lombok.Data;
- import lombok.NoArgsConstructor;
-
-
- /**
- * 新增调度任务发送队列消息体
- */
- @Data
- @Builder
- @NoArgsConstructor
- public class AddDispatchMqMessage extends BaseMqMessage {
- /**
- * 调度id
- */
- private Long dispatchId;
-
- public AddDispatchMqMessage(Long dispatchId) {
- this.dispatchId = dispatchId;
- }
- }
AddDispatchMqContant类
- package com.logistics.common.rocketMq.constant;
-
- /**
- * 新增调度任务MQ队列
- */
- public class AddDispatchMqContant {
- /**
- * 消费主题
- */
- public static final String ADD_DISPATCH_TOPIC = "add_dispatch_topic";
-
- /**
- * 消费标签
- */
- public static final String ADD_DISPATCH_TAG = "add_dispatch_tag";
-
- /**
- * 消费组
- */
- public static final String ADD_DISPATCH_GROUP = "add_dispatch_group";
- }
ACK简介
在实际使用RocketMQ的时候我们并不能保证每次发送的消息都刚好能被消费者一次性正常消费成功,可能会存在需要多次消费才能成功或者一直消费失败的情况,那作为发送者该做如何处理呢?
RocketMQ提供了ack机制,以保证消息能够被正常消费。发送者为了保证消息肯定消费成功,只有使用方明确表示消费成功,RocketMQ才会认为消息消费成功。中途断电,抛出异常等都不会认为成功——即都会重新投递。
新增增调度任务消费者启动监听类
- package com.logistics.business.rocketMq.listener;
-
- import com.logistics.business.rocketMq.comsumer.LogisticsAddDispatchMqComsumer;
- import com.logistics.common.rocketMq.config.IsolationConfig;
- import com.logistics.common.rocketMq.constant.AddDispatchMqContant;
- import lombok.extern.slf4j.Slf4j;
- import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
- import org.apache.rocketmq.client.exception.MQClientException;
- import org.apache.rocketmq.remoting.protocol.heartbeat.MessageModel;
- import org.springframework.stereotype.Component;
- import org.springframework.util.StringUtils;
-
- import javax.annotation.PostConstruct;
- import javax.annotation.Resource;
-
-
- /**
- * 新增调度任务消费者启动监听类
- */
- @Component
- @Slf4j
- public class RetryLogisticsAddDispatchListener {
- @Resource
- private IsolationConfig isolationConfig;
-
- private final DefaultMQPushConsumer consumer = new DefaultMQPushConsumer();
-
- @PostConstruct
- public void start() {
- try {
- //启动环境隔离
- String topic = AddDispatchMqContant.ADD_DISPATCH_TOPIC;
- if (isolationConfig.isEnabledIsolation() && StringUtils.hasText(isolationConfig.getEnvironmentName())) {
- consumer.setConsumerGroup(AddDispatchMqContant.ADD_DISPATCH_GROUP + "_" + isolationConfig.getEnvironmentName());
- topic = topic + "_" + isolationConfig.getEnvironmentName();
- } else {
- consumer.setConsumerGroup(AddDispatchMqContant.ADD_DISPATCH_GROUP);
- }
-
- consumer.setNamesrvAddr(isolationConfig.getNameServer());
- //设置集群消费模式
- consumer.setMessageModel(MessageModel.valueOf(isolationConfig.getMessageModel()));
- //设置消费超时时间(分钟)
- consumer.setConsumeTimeout(isolationConfig.getConsumeTimeout());
- //最大重试次数
- consumer.setMaxReconsumeTimes(isolationConfig.getMaxReconsumeTimes());
- //订阅主题
- consumer.subscribe(topic, AddDispatchMqContant.ADD_DISPATCH_TAG);
- //注册消息监听器
- consumer.registerMessageListener(new LogisticsAddDispatchMqComsumer());
- //启动消费端
- consumer.start();
- log.info("新增调度任务消费者MQ监听队列启动成功");
- } catch (MQClientException e) {
- e.printStackTrace();
- }
- }
- }
LogisticsAddDispatchMqComsumer注册消息监听器
- package com.logistics.business.rocketMq.comsumer;
-
- import com.logistics.common.rocketMq.domain.AddDispatchMqMessage;
- import com.logistics.common.rocketMq.utils.JsonUtil;
- import lombok.extern.slf4j.Slf4j;
- import org.apache.commons.collections.CollectionUtils;
- import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
- import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
- import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
- import org.apache.rocketmq.common.message.MessageExt;
-
- import java.nio.charset.StandardCharsets;
- import java.util.List;
-
- /**
- * 新增调度任务消费者MQ队列
- */
- @Slf4j
- public class LogisticsAddDispatchMqComsumer implements MessageListenerConcurrently {
-
- @Override
- public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
- if (CollectionUtils.isEmpty(msgs)) {
- return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
- }
-
- MessageExt message = msgs.get(0);
- try {
- String messageBody = new String(message.getBody(), StandardCharsets.UTF_8);
- AddDispatchMqMessage addDispatchMqMessage = JsonUtil.toObject(messageBody, AddDispatchMqMessage.class);
-
- System.out.println("messageId: " + message.getMsgId() + ",topic: " +
- message.getTopic() + ",addDispatchMqMessage: " + addDispatchMqMessage);
-
- System.out.println(1 / 0);
- return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
- } catch (Exception e) {
- return ConsumeConcurrentlyStatus.RECONSUME_LATER;
- }
- }
- }
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
返回ConsumeConcurrentlyStatus.CONSUME_SUCCESS,mq的偏移量才会下移。也就是手动ack,也只有手动返回CONSUME_SUCCESS,消息体才会偏移。
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
返回 ConsumeConcurrentlyStatus.RECONSUME_LATER; mq会默认重试16次,每次执行间隔
不等。最长好像是2个多小时,具体多少自己看官方文档,一般线上环境设置重试五次失败就进入死信队列了,我这里设置的是重试三次
- onsumer Started.
- date=Fri Aug 05 14:08:52 CST 2022 *******
- msg=0A28A4923EC018B4AAC217A272330000
- date=Fri Aug 05 14:08:52 CST 2022
- ReconsumeTimes=0 '第一次处理'
-
- date=Fri Aug 05 14:09:02 CST 2022 *******
- msg=0A28A4923EC018B4AAC217A272330000
- date=Fri Aug 05 14:09:02 CST 2022
- ReconsumeTimes=1 '第2次处理 与第一次间隔10s'
-
- date=Fri Aug 05 14:09:33 CST 2022 *******
- msg=0A28A4923EC018B4AAC217A272330000
- date=Fri Aug 05 14:09:33 CST 2022
- ReconsumeTimes=2 '第3次处理 与第2次间隔20s'
-
- date=Fri Aug 05 14:10:33 CST 2022 *******
- msg=0A28A4923EC018B4AAC217A272330000
- date=Fri Aug 05 14:10:33 CST 2022
- ReconsumeTimes=3 '第4次处理 与第3次间隔1m'
BaseMqMessageListener封装
- package com.zhjt.rocketmq.listener;
-
-
- import com.zhjt.rocketmq.constant.RocketMqSysConstant;
- import com.zhjt.rocketmq.domain.BaseMqMessage;
- import com.zhjt.rocketmq.template.RocketMqTemplate;
- import com.zhjt.rocketmq.utils.JsonUtil;
- import org.apache.rocketmq.client.producer.SendResult;
- import org.apache.rocketmq.client.producer.SendStatus;
- import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import org.slf4j.MDC;
-
- import javax.annotation.Resource;
- import java.time.Instant;
- import java.util.Objects;
-
- /**
- * 抽象消息监听器,封装了所有公共处理业务,如
- * 1、基础日志记录
- * 2、异常处理
- * 3、消息重试
- * 4、警告通知
- * 5、....
- *
- * @author 吴顺杰
- * @since 2024/6/17
- */
- public abstract class BaseMqMessageListener<T extends BaseMqMessage> {
- /**
- * 这里的日志记录器是哪个子类的就会被哪个子类的类进行初始化
- */
- protected final Logger logger = LoggerFactory.getLogger(this.getClass());
- @Resource
- private RocketMqTemplate rocketMqTemplate;
-
- /**
- * 消息者名称
- *
- * @return 消费者名称
- */
- protected abstract String consumerName();
-
- /**
- * 消息处理
- *
- * @param message 待处理消息
- * @throws Exception 消费异常
- */
- protected abstract void handleMessage(T message) throws Exception;
-
- /**
- * 超过重试次数消息,需要启用isRetry
- *
- * @param message 待处理消息
- */
- protected abstract void overMaxRetryTimesMessage(T message);
- /**
- * 是否过滤消息,例如某些
- *
- * @param message 待处理消息
- * @return true: 本次消息被过滤,false:不过滤
- */
- protected boolean isFilter(T message) {
- return false;
- }
-
- /**
- * 是否异常时重复发送
- *
- * @return true: 消息重试,false:不重试
- */
- protected abstract boolean isRetry();
-
- /**
- * 消费异常时是否抛出异常
- *
- * @return true: 抛出异常,false:消费异常(如果没有开启重试则消息会被自动ack)
- */
- protected abstract boolean isThrowException();
-
- /**
- * 最大重试此处
- *
- * @return 最大重试次数,默认10次
- */
- protected int maxRetryTimes() {
- return 10;
- }
-
- /**
- * isRetry开启时,重新入队延迟时间
- *
- * @return -1:立即入队重试
- */
- protected int retryDelayLevel() {
- return -1;
- }
-
- /**
- * 由父类来完成基础的日志和调配,下面的只是提供一个思路
- */
- public void dispatchMessage(T message) {
- MDC.put(RocketMqSysConstant.TRACE_ID, message.getTraceId());
- // 基础日志记录被父类处理了
- logger.info("[{}]消费者收到消息[{}]", consumerName(), JsonUtil.toJson(message));
- if (isFilter(message)) {
- logger.info("消息不满足消费条件,已过滤");
- return;
- }
- // 超过最大重试次数时调用子类方法处理
- if (message.getRetryTimes() > maxRetryTimes()) {
- overMaxRetryTimesMessage(message);
- return;
- }
- try {
- long start = Instant.now().toEpochMilli();
- handleMessage(message);
- long end = Instant.now().toEpochMilli();
- logger.info("消息消费成功,耗时[{}ms]", (end - start));
- } catch (Exception e) {
- logger.error("消息消费异常", e);
- // 是捕获异常还是抛出,由子类决定
- if (isThrowException()) {
- throw new RuntimeException(e);
- }
- if (isRetry()) {
- // 获取子类RocketMQMessageListener注解拿到topic和tag
- RocketMQMessageListener annotation = this.getClass().getAnnotation(RocketMQMessageListener.class);
- if (Objects.nonNull(annotation)) {
- message.setSource(message.getSource() + "消息重试");
- message.setRetryTimes(message.getRetryTimes() + 1);
- SendResult sendResult;
- try {
- // 如果消息发送不成功,则再次重新发送,如果发送异常则抛出由MQ再次处理(异常时不走延迟消息)
- // 此处捕获之后,相当于此条消息被消息完成然后重新发送新的消息
- sendResult = rocketMqTemplate.send(annotation.topic(), annotation.selectorExpression(), message, retryDelayLevel());
- } catch (Exception ex) {
- throw new RuntimeException(ex);
- }
- // 发送失败的处理就是不进行ACK,由RocketMQ重试
- if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
- throw new RuntimeException("重试消息发送失败");
- }
- }
- }
- }
- }
- }
IsolationConfigNew文件里面的方法:
- package com.zhjt.rocketmq.config;
-
- /**
- * @author: 吴顺杰
- * @create: 2024-06-18 10:01
- * @Description:
- */
-
- import org.apache.rocketmq.spring.support.DefaultRocketMQListenerContainer;
- import org.springframework.beans.BeansException;
- import org.springframework.beans.factory.annotation.Value;
- import org.springframework.beans.factory.config.BeanPostProcessor;
- import org.springframework.context.annotation.Configuration;
- import org.springframework.lang.NonNull;
- import org.springframework.util.StringUtils;
-
- /**
- * RocketMQ多环境隔离配置
- * 原理:对于每个配置的Bean在实例化前,拿到Bean的监听器注解把group或者topic改掉
- *
- * @author tianxincoord@163.com
- * @since 2022/5/18
- */
- @Configuration
- public class IsolationConfigNew implements BeanPostProcessor {
- @Value("${system.environment.isolation:true}")
- private boolean enabledIsolation;
- @Value("${system.environment.name:''}")
- private String environmentName;
-
- @Override
- public Object postProcessBeforeInitialization(@NonNull Object bean,
- @NonNull String beanName) throws BeansException {
- // DefaultRocketMQListenerContainer是监听器实现类
- if (bean instanceof DefaultRocketMQListenerContainer) {
- DefaultRocketMQListenerContainer container = (DefaultRocketMQListenerContainer) bean;
- // 开启消息隔离情况下获取隔离配置,隔离topic,根据自己的需求隔离group或者tag
- if (enabledIsolation && StringUtils.hasText(environmentName)) {
- container.setTopic(String.join("_", container.getTopic(), environmentName));
- container.setConsumerGroup(String.join("_", container.getConsumerGroup(), environmentName));
- }
- return container;
- }
- return bean;
- }
- }
消费者监听实现LogisticsAddDispatchMqComsumer2
- package com.logistics.business.rocketMq.comsumer;
-
- import com.logistics.common.rocketMq.constant.AddDispatchMqContant;
- import com.logistics.common.rocketMq.domain.AddDispatchMqMessage;
- import com.logistics.common.rocketMq.listener.BaseMqMessageListener;
- import lombok.extern.slf4j.Slf4j;
- import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
- import org.apache.rocketmq.spring.core.RocketMQListener;
- import org.springframework.stereotype.Component;
-
- /**
- * 新增调度任务消费者MQ队列
- */
- @Slf4j
- @Component
- @RocketMQMessageListener(
- topic = AddDispatchMqContant.ADD_DISPATCH_TOPIC,
- consumerGroup = AddDispatchMqContant.ADD_DISPATCH_GROUP,
- selectorExpression = AddDispatchMqContant.ADD_DISPATCH_GROUP,
- // 指定消费者线程数,默认64,生产中请注意配置,避免过大或者过小
- consumeThreadMax = 21
- )
- public class LogisticsAddDispatchMqComsumer2 extends BaseMqMessageListener<AddDispatchMqMessage>
- implements RocketMQListener<AddDispatchMqMessage> {
-
- /**
- * 此处只是说明封装的思想,更多还是要根据业务操作决定
- * 内功心法有了,无论什么招式都可以发挥最大威力
- */
- @Override
- protected String consumerName() {
- return "RocketMq监听消息";
- }
-
- @Override
- public void onMessage(AddDispatchMqMessage message) {
- // 注意,此时这里没有直接处理业务,而是先委派给父类做基础操作,然后父类做完基础操作后会调用子类的实际处理类型
- super.dispatchMessage(message);
- }
-
- @Override
- protected void handleMessage(AddDispatchMqMessage message) throws Exception {
- // 此时这里才是最终的业务处理,代码只需要处理资源类关闭异常,其他的可以交给父类重试
- // 业务异常直接抛出异常即可 否则捕获异常没有抛出 无法进行重试
- log.info("RocketMq监消息消费数据:{}", message);
- }
-
- @Override
- protected void overMaxRetryTimesMessage(AddDispatchMqMessage message) {
- // 当超过指定重试次数消息时此处方法会被调用
- // 生产中可以进行回退或其他业务操作
- }
-
- @Override
- protected boolean isRetry() {
- return true;
- }
-
- @Override
- protected int maxRetryTimes() {
- // 指定需要的重试次数,超过重试次数overMaxRetryTimesMessage会被调用
- return 5;
- }
-
- @Override
- protected boolean isThrowException() {
- // 是否抛出异常,到消费异常时是被父类拦截处理还是直接抛出异常
- return false;
- }
- }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。