赞
踩
目录
项目中有关rocketMq及相关类的Maven依赖
- <dependency>
- <groupId>org.apache.rocketmq</groupId>
- <artifactId>rocketmq-spring-boot-starter</artifactId>
- <version>2.1.1</version>
- </dependency>
- <dependency>
- <groupId>org.apache.rocketmq</groupId>
- <artifactId>rocketmq-client</artifactId>
- <version>4.3.0</version>
- </dependency>
- <dependency>
- <groupId>com.google.guava</groupId>
- <artifactId>guava</artifactId>
- <version>30.1.1-jre</version>
- </dependency>
并发消费场景下的生产者代码,并发消息无法保证消息一定是按照顺序消费,在绝大多数场景下不需要过问消息的消费顺序,可通过此方式进行mq消息的发送:
- package com.fss.project.mq.producer;
-
- import com.alibaba.fastjson.JSON;
- import com.fss.project.mq.enums.DelayLevelEnum;
- import lombok.extern.slf4j.Slf4j;
- import org.apache.rocketmq.client.exception.MQBrokerException;
- import org.apache.rocketmq.client.exception.MQClientException;
- import org.apache.rocketmq.client.producer.DefaultMQProducer;
- import org.apache.rocketmq.client.producer.SendResult;
- import org.apache.rocketmq.client.producer.SendStatus;
- import org.apache.rocketmq.common.message.Message;
- import org.apache.rocketmq.remoting.exception.RemotingException;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import org.springframework.stereotype.Component;
-
- import javax.annotation.Resource;
- import java.nio.charset.StandardCharsets;
- import java.util.Objects;
-
- @Slf4j
- @Component
- public class RocketMqProductComponent {
-
- private Logger logger = LoggerFactory.getLogger(RocketMqProductComponent.class);
-
- @Resource
- private DefaultMQProducer defaultMQProducer;
-
- /**
- * 发送消息
- *
- * @param dto 具体数据
- * @param topicName
- * @param tagName
- * @param key
- * @return 执行状态
- */
- public boolean sendMessage(Object dto, String topicName, String tagName, String key) {
- if (Objects.isNull(dto) || Objects.isNull(topicName) || Objects.isNull(tagName)) {
- return false;
- }
-
- boolean result = false;
- // 构造消息body
- String body = builderBody(dto);
-
- try {
- Message message = new Message(topicName, tagName, key, body.getBytes(StandardCharsets.UTF_8));
- SendResult send = defaultMQProducer.send(message);
-
- logger.info("发送者,发送消息:" + JSON.toJSONString(send));
- if (Objects.nonNull(send) && SendStatus.SEND_OK.equals(send.getSendStatus())) {
- result = true;
- } else {
- logger.warn("消息发送失败,send={},body={}", JSON.toJSONString(send), body);
- }
- } catch (MQClientException | RemotingException | MQBrokerException e) {
- logger.warn("发送消息失败:{}", e);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- }
- return result;
- }
-
- /**
- * 发送延时消息
- *
- * @param dto
- * @param topicName
- * @param tagName
- * @param delayLevelEnum 延时等级
- * @return
- */
- public boolean sendDelayMessage(Object dto, String topicName, String tagName, String key, DelayLevelEnum delayLevelEnum) {
- if (Objects.isNull(dto) || Objects.isNull(topicName) || Objects.isNull(tagName) || Objects.isNull(delayLevelEnum)) {
- return false;
- }
-
- boolean result = false;
- // 构造消息body
- String body = builderBody(dto);
-
- try {
- Message message = new Message(topicName, tagName, key, body.getBytes(StandardCharsets.UTF_8));
- message.setDelayTimeLevel(delayLevelEnum.getDelayLevel());
- logger.warn("发送延时消息 message:{}", JSON.toJSONString(message));
- SendResult send = defaultMQProducer.send(message);
- if (Objects.nonNull(send) && SendStatus.SEND_OK.equals(send.getSendStatus())) {
- result = true;
- } else {
- logger.warn("延时消息发送失败,send={},body={}", JSON.toJSONString(send), body);
- }
- } catch (MQClientException | RemotingException | MQBrokerException e) {
- logger.warn("发送延时消息失败:{}", e);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- }
- return result;
- }
-
- /**
- * 构造消息body
- *
- * @param dto
- * @return
- */
- public String builderBody(Object dto) {
- // 构造消息body
- String body = null;
- if (dto instanceof String) {
- body = (String) dto;
- } else {
- body = JSON.toJSONString(dto);
- }
- return body;
- }
- }
顺序消费场景下的生产者代码(局部顺序),在特定场景下需要消息按照顺序消费时,可通过此方式进行发送
- package com.fss.project.mq.producer;
-
- import com.alibaba.fastjson.JSON;
- import com.fss.project.mq.enums.DelayLevelEnum;
- import lombok.extern.slf4j.Slf4j;
- import org.apache.rocketmq.client.exception.MQBrokerException;
- import org.apache.rocketmq.client.exception.MQClientException;
- import org.apache.rocketmq.client.producer.DefaultMQProducer;
- import org.apache.rocketmq.client.producer.MessageQueueSelector;
- import org.apache.rocketmq.client.producer.SendResult;
- import org.apache.rocketmq.client.producer.SendStatus;
- import org.apache.rocketmq.common.message.Message;
- import org.apache.rocketmq.common.message.MessageQueue;
- import org.apache.rocketmq.remoting.exception.RemotingException;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import org.springframework.stereotype.Component;
-
- import javax.annotation.Resource;
- import java.nio.charset.StandardCharsets;
- import java.util.List;
- import java.util.Objects;
-
- @Slf4j
- @Component
- public class RocketMqProductOrderLyComponent {
-
- private Logger logger = LoggerFactory.getLogger(RocketMqProductOrderLyComponent.class);
-
- @Resource
- private DefaultMQProducer defaultMQProducer;
-
- /**
- * 发送消息
- *
- * @param dto 具体数据
- * @param topicName
- * @param tagName
- * @return 执行状态
- */
- public boolean sendMessage(Object dto, String topicName, String tagName, String key) {
- if (Objects.isNull(dto) || Objects.isNull(topicName) || Objects.isNull(tagName) || Objects.isNull(key)) {
- return false;
- }
-
- boolean result = false;
-
- // 构造消息body
- String body = builderBody(dto);
-
- try {
- Message message = new Message(topicName, tagName, key, body.getBytes(StandardCharsets.UTF_8));
- /**
- * 局部的顺序消息
- * message:消息信息
- * arg:选择队列的业务标识
- */
- SendResult send = defaultMQProducer.send(message, new MessageQueueSelector() {
- @Override
- public MessageQueue select(List<MessageQueue> list, Message message, Object o) {
- Long key = Long.parseLong((String) o);
- int index = (int) (key % list.size());
- return list.get(index);
- }
- }, key);
-
- System.err.println("发送者,发送消息:" + JSON.toJSONString(send));
- if (Objects.nonNull(send) && SendStatus.SEND_OK.equals(send.getSendStatus())) {
- result = true;
- } else {
- logger.warn("消息发送失败,send={},body={}", JSON.toJSONString(send), body);
- }
- } catch (MQClientException | RemotingException | MQBrokerException e) {
- logger.warn("发送消息失败:{}", e);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- }
- return result;
- }
-
- /**
- * 发送延时消息
- *
- * @param dto
- * @param topicName
- * @param tagName
- * @param delayLevelEnum 延时等级
- * @return
- */
- public boolean sendDelayMessage(Object dto, String topicName, String tagName, String key, DelayLevelEnum delayLevelEnum) {
- if (Objects.isNull(dto) || Objects.isNull(topicName) || Objects.isNull(tagName) || Objects.isNull(key) || Objects.isNull(delayLevelEnum)) {
- return false;
- }
-
- boolean result = false;
-
- // 构造消息body
- String body = builderBody(dto);
- try {
- Message message = new Message(topicName, tagName, key, body.getBytes(StandardCharsets.UTF_8));
- message.setDelayTimeLevel(delayLevelEnum.getDelayLevel());
-
- SendResult send = defaultMQProducer.send(message, new MessageQueueSelector() {
- @Override
- public MessageQueue select(List<MessageQueue> list, Message message, Object o) {
- Long key = (Long) o;
- int index = (int) (key % list.size());
- return list.get(index);
- }
- }, key);
- if (Objects.nonNull(send) && SendStatus.SEND_OK.equals(send.getSendStatus())) {
- result = true;
- } else {
- logger.warn("延时消息发送失败,send={},body={}", JSON.toJSONString(send), body);
- }
- } catch (MQClientException | RemotingException | MQBrokerException e) {
- logger.warn("发送延时消息失败:{}", e);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- }
- return result;
- }
-
- /**
- * 构造消息body
- *
- * @param dto
- * @return
- */
- public String builderBody(Object dto) {
- // 构造消息body
- String body = null;
- if (dto instanceof String) {
- body = (String) dto;
- } else {
- body = JSON.toJSONString(dto);
- }
- return body;
- }
- }
消息有序指的是可以按照消息的发送顺序来消费(FIFO)。RocketMQ可以严格的保证消息有序,可以分为分区有序或者全局有序。
顺序消费的原理解析,在默认的情况下消息发送会采取Round Robin轮询方式把消息发送到不同的queue(分区队列);而消费消息的时候从多个queue上拉取消息,这种情况发送和消费是不能保证顺序。但是如果控制发送的顺序消息只依次发送到同一个queue中,消费的时候只从这个queue上依次拉取,则就保证了顺序。当发送和消费参与的queue只有一个,则是全局有序;如果多个queue参与,则为分区有序,即相对每个queue,消息都是有序的。
通常局部有序已经我完全可以满足需求,并且效率上会更高,故通常是使用局部消费
所以在代码中,要发送顺序消息时,须指定key,即作为message的key,也作为key对队列长度取余来选择某个queue,从而实现局部顺序,key可以传入业务的唯一ID,例:订单ID、退款ID等。
消费者是通过模板方法的方式,来让开发者更多的关注业务逻辑,在监听器中对消息已经有了统一的处理。
消费者Bean配置
- package com.fss.project.mq.consume;
-
- import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
- import org.apache.rocketmq.client.exception.MQClientException;
- import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
- import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.beans.factory.annotation.Value;
- import org.springframework.boot.SpringBootConfiguration;
- import org.springframework.context.annotation.Bean;
- import org.springframework.util.StringUtils;
- import java.util.List;
-
- /**
- * 消费者配置
- */
- @SpringBootConfiguration
- public class MQConsumerConfiguration {
- public static final Logger LOGGER = LoggerFactory.getLogger(MQConsumerConfiguration.class);
-
-
- @Value("${rocketmq.consumer.namesrvAddr:127.0.0.1:9876}")
- private String namesrvAddr;
-
- @Value("${rocketmq.consumer.groupName:producer-group}")
- private String groupName;
-
- /**
- * 并发消费topic
- */
- @Value("#{'${rocketmq.consumer.topics:DEMO_TEST_TOPIC}'.split(',')}")
- private List<String> topicList;
-
- /**
- * 顺序消费topic
- */
- @Value("#{'${rocketmq.consumer.topics:DEMO_ORDERLY_TOPIC}'.split(',')}")
- private List<String> orderLyTopicList;
-
- @Value("${rocketmq.consumer.consumeMessageBatchMaxSize:1}")
- private int consumeMessageBatchMaxSize;
-
- /**
- * 并发消费监听器
- */
- @Autowired
- private RocketMqMessageListener registerMessageListener;
-
- /**
- * 顺序消费监听器
- */
- @Autowired
- private RocketMqMessageOrderLyListener rocketMqMessageOrderLyListener;
-
- @Bean
- public DefaultMQPushConsumer getRocketMQConsumer() throws RuntimeException {
- if (StringUtils.isEmpty(groupName)){
- throw new RuntimeException("groupName is null !!!");
- }
- if (StringUtils.isEmpty(namesrvAddr)){
- throw new RuntimeException("namesrvAddr is null !!!");
- }
- if(StringUtils.isEmpty(topicList)){
- throw new RuntimeException("topics is null !!!");
- }
- DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(groupName);
- consumer.setNamesrvAddr(namesrvAddr);
- consumer.registerMessageListener(registerMessageListener);
- // consumer.registerMessageListener(rocketMqMessageOrderLyListener);
- /**
- * 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费
- * 如果非第一次启动,那么按照上次消费的位置继续消费
- */
- consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
- /**
- * 设置消费模型,集群还是广播,默认为集群
- */
- consumer.setMessageModel(MessageModel.CLUSTERING);
- /**
- * 设置一次消费消息的条数,默认为1条
- */
- consumer.setConsumeMessageBatchMaxSize(consumeMessageBatchMaxSize);
- try {
- /**
- * 设置该消费者订阅的主题和tag,如果是订阅该主题下的所有tag,则tag使用*;如果需要指定订阅该主题下的某些tag,则使用||分割,例如tag1||tag2||tag3
- */
- topicList.forEach(topic->{
- try {
- consumer.subscribe(topic,"*");
- } catch (MQClientException e) {
- e.printStackTrace();
- }
- });
- // orderLyTopicList.forEach(topic->{
- // try {
- // consumer.subscribe(topic,"*");
- // } catch (MQClientException e) {
- // e.printStackTrace();
- // }
- // });
- consumer.start();
- LOGGER.info("consumer is start !!! groupName:{},topics:{},namesrvAddr:{}",groupName,topicList,namesrvAddr);
- }catch (MQClientException e){
- LOGGER.error("consumer is start !!! groupName:{},topics:{},namesrvAddr:{}",groupName,topicList,namesrvAddr,e);
- throw new RuntimeException(e);
- }
- return consumer;
- }
-
- }
通过DefaultMQPushConsumer设置监听器的实现类,来将消费逻辑转移给监听器RocketMqMessageListener 。
MessageHandler:定义处理消息的接口handle及处理的消息topic、tag的类型。
- public interface MessageHandler {
-
- void handle(String body);
-
- List<String> tags();
-
- String topic();
- }
AbstractMessageHandler:MessageHandler的子类,主要是自动将body的数据转成对应的dto
- package com.fss.project.mq.consume;
-
- import com.alibaba.fastjson.JSON;
-
- import java.lang.reflect.ParameterizedType;
- import java.lang.reflect.Type;
-
- public abstract class AbstractMessageHandler<T> implements MessageHandler{
-
- private Class<T> paramClass;
-
- public AbstractMessageHandler() {
- Type genericSuperclass = this.getClass().getGenericSuperclass();
- if(genericSuperclass instanceof ParameterizedType){
- ParameterizedType parameterizedType = (ParameterizedType) genericSuperclass;
- Type[] actualTypeArguments = parameterizedType.getActualTypeArguments();
- paramClass = (Class<T>) actualTypeArguments[0];
- }
- }
-
- @Override
- public void handle(String body) {
- T param = JSON.parseObject(body, paramClass);
- if (checkDo(param)) {
- handler(param);
- }
- }
-
- /**
- * 执行消费的具体业务逻辑
- * @param param
- */
- public abstract void handler(T param);
-
- /**
- * 检查是否需要执行
- */
- boolean checkDo(T param){
- return true;
- }
- }
RocketMqMessageListener :通过topic、tag可以确定AbstractMessageHandler的子类(某个topic、tag消费者),若传入的有key,则可以根据key来防止重复消费消息(key通常作为唯一业务),如没有传入,在具体的消费业务代码中根据业务ID来特定处理也可以。
- package com.fss.project.mq.consume;
-
- import com.alibaba.fastjson.JSON;
- import com.google.common.collect.HashBasedTable;
- import com.google.common.collect.Table;
- import org.apache.commons.collections.CollectionUtils;
- import org.apache.commons.lang3.StringUtils;
- import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
- import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
- import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
- import org.apache.rocketmq.common.message.MessageExt;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import org.springframework.beans.BeansException;
- import org.springframework.beans.factory.annotation.Value;
- import org.springframework.context.ApplicationContext;
- import org.springframework.context.ApplicationContextAware;
- import org.springframework.stereotype.Component;
-
- import javax.annotation.PostConstruct;
- import java.nio.charset.StandardCharsets;
- import java.util.*;
-
- @Component
- public class RocketMqMessageListener implements MessageListenerConcurrently, ApplicationContextAware {
-
- private Logger logger = LoggerFactory.getLogger(RocketMqMessageListener.class);
-
- private ApplicationContext context;
-
- @Value("#{'${rocketmq.consumer.topics:DEMO_TEST_TOPIC}'.split(',')}")
- private List<String> topicList;
-
- private Table<String, String, List<MessageHandler>> messageHandlerTable = HashBasedTable.create();
-
- @PostConstruct
- public void init() {
- Map<String, MessageHandler> consumers = context.getBeansOfType(MessageHandler.class);
- consumers.values().forEach(
- messageHandler -> {
- String topic = messageHandler.topic();
- for (String tagName : messageHandler.tags()) {
- List<MessageHandler> messageHandlers = messageHandlerTable.get(topic, tagName);
- if (messageHandlers == null) {
- messageHandlers = new ArrayList<>(3);
- }
- messageHandlers.add(messageHandler);
- messageHandlerTable.put(topic, tagName, messageHandlers);
- }
- }
- );
- }
-
- private List<MessageHandler> getHandler(String topic, String tag) {
- if (StringUtils.isBlank(topic) || StringUtils.isBlank(tag)) {
- return Collections.emptyList();
- }
-
- return messageHandlerTable.get(topic, tag);
- }
-
- @Override
- public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
- try {
- for (MessageExt messageExt : list) {
- handlerMessageExt(messageExt);
- logger.info("ThreadName:{},messageExt:{},消费成功", Thread.currentThread().getName(), messageExt);
- }
- return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
- } catch (Exception e) {
- logger.warn("消息消费异常:e:{}", e);
- return ConsumeConcurrentlyStatus.RECONSUME_LATER;
- }
- }
-
- private void handlerMessageExt(MessageExt messageExt) {
- String topic = messageExt.getTopic();
- String tag = messageExt.getTags();
- String key = messageExt.getKeys();
-
- // 若传入key,则做唯一性校验
- if (Objects.nonNull(key) && checkMessageKey(key)) {
-
- }
-
- String body = new String(messageExt.getBody(), StandardCharsets.UTF_8);
- logger.info("handlerMessageExt,topic:{},tag:{},body:{}", topic, tag, body);
- if (!topicList.contains(topic)) {
- return;
- }
-
- List<MessageHandler> messageHandlerList = getHandler(topic, tag);
- if (CollectionUtils.isNotEmpty(messageHandlerList)) {
- messageHandlerList.forEach(
- messageHandler -> {
- messageHandler.handle(body);
- }
- );
- }
-
- }
-
- /**
- * 判断是否重复消费
- */
- private boolean checkMessageKey(String key) {
- // TODO: 2021/9/4 可根据redis、数据库保证消息不重复消费
- return false;
- }
-
- @Override
- public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
- context = applicationContext;
- }
- }
消费业务demo
- package com.fss.project.mq.consume.component;
-
- import com.fss.project.mq.consume.AbstractMessageHandler;
- import com.fss.project.mq.consume.RocketMqTagEnum;
- import com.fss.project.mq.consume.TopicEnum;
- import com.fss.project.mq.dto.OrderDto;
- import org.springframework.stereotype.Component;
- import java.util.Collections;
- import java.util.List;
-
- @Component
- public class OrderConsumer extends AbstractMessageHandler<OrderDto> {
-
- @Override
- public void handler(OrderDto param) {
- if(checkConsumer()){
- return;
- }
- System.err.println("成功消费:"+param);
- }
-
- /**
- * 若message未设置key,则可以根据body的数据(例orderId、refundId),根据redis、数据库中当前数据的状态来判断是否要消费
- * 若数据没有唯一ID来区分,则可以认为该消息不重要不作为唯一校验
- * @return
- */
- private boolean checkConsumer() {
- return false;
- }
-
- @Override
- public List<String> tags() {
- return Collections.singletonList(RocketMqTagEnum.TEST_TAG.getTagName());
- }
-
- @Override
- public String topic() {
- return TopicEnum.TEST_TOPIC.getTopicName();
- }
- }
若要顺序消费,消费者监听类需要实现MessageListenerOrderly来实现,例:RocketMqMessageOrderLyListener
- package com.fss.project.mq.consume;
-
- import com.google.common.collect.HashBasedTable;
- import com.google.common.collect.Table;
- import org.apache.commons.collections.CollectionUtils;
- import org.apache.commons.lang3.StringUtils;
- import org.apache.rocketmq.client.consumer.listener.*;
- import org.apache.rocketmq.common.message.MessageExt;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import org.springframework.beans.BeansException;
- import org.springframework.beans.factory.annotation.Value;
- import org.springframework.context.ApplicationContext;
- import org.springframework.context.ApplicationContextAware;
- import org.springframework.stereotype.Component;
-
- import javax.annotation.PostConstruct;
- import java.nio.charset.StandardCharsets;
- import java.util.*;
-
- @Component
- public class RocketMqMessageOrderLyListener implements MessageListenerOrderly, ApplicationContextAware {
-
- private Logger logger = LoggerFactory.getLogger(RocketMqMessageOrderLyListener.class);
-
- private ApplicationContext context;
-
- @Value("#{'${rocketmq.consumer.orderly.topics:DEMO_ORDERLY_TOPIC}'.split(',')}")
- private List<String> topicList;
-
- private Table<String, String, List<MessageHandler>> messageHandlerTable = HashBasedTable.create();
-
- @PostConstruct
- public void init() {
- Map<String, MessageHandler> consumers = context.getBeansOfType(MessageHandler.class);
- consumers.values().forEach(
- messageHandler -> {
- String topic = messageHandler.topic();
- for (String tagName : messageHandler.tags()) {
- List<MessageHandler> messageHandlers = messageHandlerTable.get(topic, tagName);
- if (messageHandlers == null) {
- messageHandlers = new ArrayList<>(3);
- }
- messageHandlers.add(messageHandler);
- messageHandlerTable.put(topic, tagName, messageHandlers);
- }
- }
- );
- }
-
- private List<MessageHandler> getHandler(String topic, String tag) {
- if (StringUtils.isBlank(topic) || StringUtils.isBlank(tag)) {
- return Collections.emptyList();
- }
-
- return messageHandlerTable.get(topic, tag);
- }
-
- private void handlerMessageExt(MessageExt messageExt) {
- String topic = messageExt.getTopic();
- String tag = messageExt.getTags();
- String key = messageExt.getKeys();
-
- // 若传入key,则做唯一性校验
- if (Objects.nonNull(key) && checkMessageKey(key)) {
-
- }
-
- String body = new String(messageExt.getBody(), StandardCharsets.UTF_8);
- logger.info("handlerMessageExt,topic:{},tag:{},body:{}", topic, tag, body);
- if (!topicList.contains(topic)) {
- return;
- }
-
- List<MessageHandler> messageHandlerList = getHandler(topic, tag);
- if (CollectionUtils.isNotEmpty(messageHandlerList)) {
- messageHandlerList.forEach(
- messageHandler -> {
- messageHandler.handle(body);
- }
- );
- }
-
- }
-
- /**
- * 判断是否重复消费
- */
- private boolean checkMessageKey(String key) {
- // TODO: 2021/9/4 可根据redis、数据库保证消息不重复消费
- return false;
- }
-
- @Override
- public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
- context = applicationContext;
- }
-
- @Override
- public ConsumeOrderlyStatus consumeMessage(List<MessageExt> list, ConsumeOrderlyContext consumeOrderlyContext) {
- try {
- for (MessageExt messageExt : list) {
- handlerMessageExt(messageExt);
- logger.info("ThreadName:{},messageExt:{},消费成功", Thread.currentThread().getName(), messageExt);
- }
- return ConsumeOrderlyStatus.SUCCESS;
- } catch (Exception e) {
- logger.warn("消息消费异常:e:{}", e);
- return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
- }
- }
- }
Producer的send方法本身支持内部重试,重试逻辑如下:
以上策略也是在一定程度上保证了消息可以发送成功。如果业务对消息可靠性要求比较高,建议应用增加相应的重试逻辑:比如调用send同步方法发送失败时,则尝试将消息存储到db,然后由后台线程定时重试,确保消息一定到达Broker。
RocketMQ无法避免消息重复(Exactly-Once),所以如果业务对消费重复非常敏感,务必要在业务层面进行去重处理。可以借助关系数据库进行去重。首先需要确定消息的唯一键,可以是msgId,也可以是消息内容中的唯一标识字段,例如订单Id等。在消费之前判断唯一键是否在关系数据库中存在。如果不存在则插入,并消费,否则跳过。(实际过程要考虑原子性问题,判断是否存在可以尝试插入,如果报主键冲突,则插入失败,直接跳过)
msgId一定是全局唯一标识符,但是实际使用中,可能会存在相同的消息有两个不同msgId的情况(消费者主动重发、因客户端重投机制导致的重复等),这种情况就需要使业务字段进行重复消费。
1 提高消费并行度
绝大部分消息消费行为都属于 IO 密集型,即可能是操作数据库,或者调用 RPC,这类消费行为的消费速度在于后端数据库或者外系统的吞吐量,通过增加消费并行度,可以提高总的消费吞吐量,但是并行度增加到一定程度,反而会下降。所以,应用必须要设置合理的并行度。 如下有几种修改消费并行度的方法:
2 批量方式消费
某些业务流程如果支持批量方式消费,则可以很大程度上提高消费吞吐量,例如订单扣款类应用,一次处理一个订单耗时 1 s,一次处理 10 个订单可能也只耗时 2 s,这样即可大幅度提高消费的吞吐量,通过设置 consumer的 consumeMessageBatchMaxSize 返个参数,默认是 1,即一次只消费一条消息,例如设置为 N,那么每次消费的消息数小于等于 N。
3 优化每条消息消费过程
举例如下,某条消息的消费过程如下:
这条消息的消费过程中有4次与 DB的 交互,如果按照每次 5ms 计算,那么总共耗时 20ms,假设业务计算耗时 5ms,那么总过耗时 25ms,所以如果能把 4 次 DB 交互优化为 2 次,那么总耗时就可以优化到 15ms,即总体性能提高了 40%。所以应用如果对时延敏感的话,可以把DB部署在SSD硬盘,相比于SCSI磁盘,前者的RT会小很多。
消费慢的两种类型:CPU内部计算代码、外部I/O操作代码
通常是外部IO操作导致的:
1、数据库操作慢
2、缓存数据库IO操作慢
3、下游系统PRC请求,响应慢
消息堆积:下游服务异常、达到DBMS的容量限制
Broker 角色分为 ASYNC_MASTER(异步主机)、SYNC_MASTER(同步主机)以及SLAVE(从机)。如果对消息的可靠性要求比较严格,可以采用 SYNC_MASTER加SLAVE的部署方式。如果对消息可靠性要求不高,可以采用ASYNC_MASTER加SLAVE的部署方式。如果只是测试方便,则可以选择仅ASYNC_MASTER或仅SYNC_MASTER的部署方式。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。