当前位置:   article > 正文

Spring Boot 通过监听器方式整合 RocketMq(基于模板方法的并发消费、局部顺序消息消费)_springboot rocketmq 消费者监听

springboot rocketmq 消费者监听

目录

生产者发送消息

消费者消费消息

常见问题

 1、消息发送失败处理方式

2、消费过程幂等

3、消费速度慢的处理方式

 Broker 角色


项目中有关rocketMq及相关类的Maven依赖

  1. <dependency>
  2. <groupId>org.apache.rocketmq</groupId>
  3. <artifactId>rocketmq-spring-boot-starter</artifactId>
  4. <version>2.1.1</version>
  5. </dependency>
  6. <dependency>
  7. <groupId>org.apache.rocketmq</groupId>
  8. <artifactId>rocketmq-client</artifactId>
  9. <version>4.3.0</version>
  10. </dependency>
  11. <dependency>
  12. <groupId>com.google.guava</groupId>
  13. <artifactId>guava</artifactId>
  14. <version>30.1.1-jre</version>
  15. </dependency>

生产者发送消息

并发消费场景下的生产者代码,并发消息无法保证消息一定是按照顺序消费,在绝大多数场景下不需要过问消息的消费顺序,可通过此方式进行mq消息的发送:

  1. package com.fss.project.mq.producer;
  2. import com.alibaba.fastjson.JSON;
  3. import com.fss.project.mq.enums.DelayLevelEnum;
  4. import lombok.extern.slf4j.Slf4j;
  5. import org.apache.rocketmq.client.exception.MQBrokerException;
  6. import org.apache.rocketmq.client.exception.MQClientException;
  7. import org.apache.rocketmq.client.producer.DefaultMQProducer;
  8. import org.apache.rocketmq.client.producer.SendResult;
  9. import org.apache.rocketmq.client.producer.SendStatus;
  10. import org.apache.rocketmq.common.message.Message;
  11. import org.apache.rocketmq.remoting.exception.RemotingException;
  12. import org.slf4j.Logger;
  13. import org.slf4j.LoggerFactory;
  14. import org.springframework.stereotype.Component;
  15. import javax.annotation.Resource;
  16. import java.nio.charset.StandardCharsets;
  17. import java.util.Objects;
  18. @Slf4j
  19. @Component
  20. public class RocketMqProductComponent {
  21. private Logger logger = LoggerFactory.getLogger(RocketMqProductComponent.class);
  22. @Resource
  23. private DefaultMQProducer defaultMQProducer;
  24. /**
  25. * 发送消息
  26. *
  27. * @param dto 具体数据
  28. * @param topicName
  29. * @param tagName
  30. * @param key
  31. * @return 执行状态
  32. */
  33. public boolean sendMessage(Object dto, String topicName, String tagName, String key) {
  34. if (Objects.isNull(dto) || Objects.isNull(topicName) || Objects.isNull(tagName)) {
  35. return false;
  36. }
  37. boolean result = false;
  38. // 构造消息body
  39. String body = builderBody(dto);
  40. try {
  41. Message message = new Message(topicName, tagName, key, body.getBytes(StandardCharsets.UTF_8));
  42. SendResult send = defaultMQProducer.send(message);
  43. logger.info("发送者,发送消息:" + JSON.toJSONString(send));
  44. if (Objects.nonNull(send) && SendStatus.SEND_OK.equals(send.getSendStatus())) {
  45. result = true;
  46. } else {
  47. logger.warn("消息发送失败,send={},body={}", JSON.toJSONString(send), body);
  48. }
  49. } catch (MQClientException | RemotingException | MQBrokerException e) {
  50. logger.warn("发送消息失败:{}", e);
  51. } catch (InterruptedException e) {
  52. Thread.currentThread().interrupt();
  53. }
  54. return result;
  55. }
  56. /**
  57. * 发送延时消息
  58. *
  59. * @param dto
  60. * @param topicName
  61. * @param tagName
  62. * @param delayLevelEnum 延时等级
  63. * @return
  64. */
  65. public boolean sendDelayMessage(Object dto, String topicName, String tagName, String key, DelayLevelEnum delayLevelEnum) {
  66. if (Objects.isNull(dto) || Objects.isNull(topicName) || Objects.isNull(tagName) || Objects.isNull(delayLevelEnum)) {
  67. return false;
  68. }
  69. boolean result = false;
  70. // 构造消息body
  71. String body = builderBody(dto);
  72. try {
  73. Message message = new Message(topicName, tagName, key, body.getBytes(StandardCharsets.UTF_8));
  74. message.setDelayTimeLevel(delayLevelEnum.getDelayLevel());
  75. logger.warn("发送延时消息 message:{}", JSON.toJSONString(message));
  76. SendResult send = defaultMQProducer.send(message);
  77. if (Objects.nonNull(send) && SendStatus.SEND_OK.equals(send.getSendStatus())) {
  78. result = true;
  79. } else {
  80. logger.warn("延时消息发送失败,send={},body={}", JSON.toJSONString(send), body);
  81. }
  82. } catch (MQClientException | RemotingException | MQBrokerException e) {
  83. logger.warn("发送延时消息失败:{}", e);
  84. } catch (InterruptedException e) {
  85. Thread.currentThread().interrupt();
  86. }
  87. return result;
  88. }
  89. /**
  90. * 构造消息body
  91. *
  92. * @param dto
  93. * @return
  94. */
  95. public String builderBody(Object dto) {
  96. // 构造消息body
  97. String body = null;
  98. if (dto instanceof String) {
  99. body = (String) dto;
  100. } else {
  101. body = JSON.toJSONString(dto);
  102. }
  103. return body;
  104. }
  105. }

顺序消费场景下的生产者代码(局部顺序),在特定场景下需要消息按照顺序消费时,可通过此方式进行发送

  1. package com.fss.project.mq.producer;
  2. import com.alibaba.fastjson.JSON;
  3. import com.fss.project.mq.enums.DelayLevelEnum;
  4. import lombok.extern.slf4j.Slf4j;
  5. import org.apache.rocketmq.client.exception.MQBrokerException;
  6. import org.apache.rocketmq.client.exception.MQClientException;
  7. import org.apache.rocketmq.client.producer.DefaultMQProducer;
  8. import org.apache.rocketmq.client.producer.MessageQueueSelector;
  9. import org.apache.rocketmq.client.producer.SendResult;
  10. import org.apache.rocketmq.client.producer.SendStatus;
  11. import org.apache.rocketmq.common.message.Message;
  12. import org.apache.rocketmq.common.message.MessageQueue;
  13. import org.apache.rocketmq.remoting.exception.RemotingException;
  14. import org.slf4j.Logger;
  15. import org.slf4j.LoggerFactory;
  16. import org.springframework.stereotype.Component;
  17. import javax.annotation.Resource;
  18. import java.nio.charset.StandardCharsets;
  19. import java.util.List;
  20. import java.util.Objects;
  21. @Slf4j
  22. @Component
  23. public class RocketMqProductOrderLyComponent {
  24. private Logger logger = LoggerFactory.getLogger(RocketMqProductOrderLyComponent.class);
  25. @Resource
  26. private DefaultMQProducer defaultMQProducer;
  27. /**
  28. * 发送消息
  29. *
  30. * @param dto 具体数据
  31. * @param topicName
  32. * @param tagName
  33. * @return 执行状态
  34. */
  35. public boolean sendMessage(Object dto, String topicName, String tagName, String key) {
  36. if (Objects.isNull(dto) || Objects.isNull(topicName) || Objects.isNull(tagName) || Objects.isNull(key)) {
  37. return false;
  38. }
  39. boolean result = false;
  40. // 构造消息body
  41. String body = builderBody(dto);
  42. try {
  43. Message message = new Message(topicName, tagName, key, body.getBytes(StandardCharsets.UTF_8));
  44. /**
  45. * 局部的顺序消息
  46. * message:消息信息
  47. * arg:选择队列的业务标识
  48. */
  49. SendResult send = defaultMQProducer.send(message, new MessageQueueSelector() {
  50. @Override
  51. public MessageQueue select(List<MessageQueue> list, Message message, Object o) {
  52. Long key = Long.parseLong((String) o);
  53. int index = (int) (key % list.size());
  54. return list.get(index);
  55. }
  56. }, key);
  57. System.err.println("发送者,发送消息:" + JSON.toJSONString(send));
  58. if (Objects.nonNull(send) && SendStatus.SEND_OK.equals(send.getSendStatus())) {
  59. result = true;
  60. } else {
  61. logger.warn("消息发送失败,send={},body={}", JSON.toJSONString(send), body);
  62. }
  63. } catch (MQClientException | RemotingException | MQBrokerException e) {
  64. logger.warn("发送消息失败:{}", e);
  65. } catch (InterruptedException e) {
  66. Thread.currentThread().interrupt();
  67. }
  68. return result;
  69. }
  70. /**
  71. * 发送延时消息
  72. *
  73. * @param dto
  74. * @param topicName
  75. * @param tagName
  76. * @param delayLevelEnum 延时等级
  77. * @return
  78. */
  79. public boolean sendDelayMessage(Object dto, String topicName, String tagName, String key, DelayLevelEnum delayLevelEnum) {
  80. if (Objects.isNull(dto) || Objects.isNull(topicName) || Objects.isNull(tagName) || Objects.isNull(key) || Objects.isNull(delayLevelEnum)) {
  81. return false;
  82. }
  83. boolean result = false;
  84. // 构造消息body
  85. String body = builderBody(dto);
  86. try {
  87. Message message = new Message(topicName, tagName, key, body.getBytes(StandardCharsets.UTF_8));
  88. message.setDelayTimeLevel(delayLevelEnum.getDelayLevel());
  89. SendResult send = defaultMQProducer.send(message, new MessageQueueSelector() {
  90. @Override
  91. public MessageQueue select(List<MessageQueue> list, Message message, Object o) {
  92. Long key = (Long) o;
  93. int index = (int) (key % list.size());
  94. return list.get(index);
  95. }
  96. }, key);
  97. if (Objects.nonNull(send) && SendStatus.SEND_OK.equals(send.getSendStatus())) {
  98. result = true;
  99. } else {
  100. logger.warn("延时消息发送失败,send={},body={}", JSON.toJSONString(send), body);
  101. }
  102. } catch (MQClientException | RemotingException | MQBrokerException e) {
  103. logger.warn("发送延时消息失败:{}", e);
  104. } catch (InterruptedException e) {
  105. Thread.currentThread().interrupt();
  106. }
  107. return result;
  108. }
  109. /**
  110. * 构造消息body
  111. *
  112. * @param dto
  113. * @return
  114. */
  115. public String builderBody(Object dto) {
  116. // 构造消息body
  117. String body = null;
  118. if (dto instanceof String) {
  119. body = (String) dto;
  120. } else {
  121. body = JSON.toJSONString(dto);
  122. }
  123. return body;
  124. }
  125. }

消息有序指的是可以按照消息的发送顺序来消费(FIFO)。RocketMQ可以严格的保证消息有序,可以分为分区有序或者全局有序。

顺序消费的原理解析,在默认的情况下消息发送会采取Round Robin轮询方式把消息发送到不同的queue(分区队列);而消费消息的时候从多个queue上拉取消息,这种情况发送和消费是不能保证顺序。但是如果控制发送的顺序消息只依次发送到同一个queue中,消费的时候只从这个queue上依次拉取,则就保证了顺序。当发送和消费参与的queue只有一个,则是全局有序;如果多个queue参与,则为分区有序,即相对每个queue,消息都是有序的。

通常局部有序已经我完全可以满足需求,并且效率上会更高,故通常是使用局部消费

所以在代码中,要发送顺序消息时,须指定key,即作为message的key,也作为key对队列长度取余来选择某个queue,从而实现局部顺序,key可以传入业务的唯一ID,例:订单ID、退款ID等。

消费者消费消息

消费者是通过模板方法的方式,来让开发者更多的关注业务逻辑,在监听器中对消息已经有了统一的处理。

消费者Bean配置

  1. package com.fss.project.mq.consume;
  2. import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
  3. import org.apache.rocketmq.client.exception.MQClientException;
  4. import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
  5. import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
  6. import org.slf4j.Logger;
  7. import org.slf4j.LoggerFactory;
  8. import org.springframework.beans.factory.annotation.Autowired;
  9. import org.springframework.beans.factory.annotation.Value;
  10. import org.springframework.boot.SpringBootConfiguration;
  11. import org.springframework.context.annotation.Bean;
  12. import org.springframework.util.StringUtils;
  13. import java.util.List;
  14. /**
  15. * 消费者配置
  16. */
  17. @SpringBootConfiguration
  18. public class MQConsumerConfiguration {
  19. public static final Logger LOGGER = LoggerFactory.getLogger(MQConsumerConfiguration.class);
  20. @Value("${rocketmq.consumer.namesrvAddr:127.0.0.1:9876}")
  21. private String namesrvAddr;
  22. @Value("${rocketmq.consumer.groupName:producer-group}")
  23. private String groupName;
  24. /**
  25. * 并发消费topic
  26. */
  27. @Value("#{'${rocketmq.consumer.topics:DEMO_TEST_TOPIC}'.split(',')}")
  28. private List<String> topicList;
  29. /**
  30. * 顺序消费topic
  31. */
  32. @Value("#{'${rocketmq.consumer.topics:DEMO_ORDERLY_TOPIC}'.split(',')}")
  33. private List<String> orderLyTopicList;
  34. @Value("${rocketmq.consumer.consumeMessageBatchMaxSize:1}")
  35. private int consumeMessageBatchMaxSize;
  36. /**
  37. * 并发消费监听器
  38. */
  39. @Autowired
  40. private RocketMqMessageListener registerMessageListener;
  41. /**
  42. * 顺序消费监听器
  43. */
  44. @Autowired
  45. private RocketMqMessageOrderLyListener rocketMqMessageOrderLyListener;
  46. @Bean
  47. public DefaultMQPushConsumer getRocketMQConsumer() throws RuntimeException {
  48. if (StringUtils.isEmpty(groupName)){
  49. throw new RuntimeException("groupName is null !!!");
  50. }
  51. if (StringUtils.isEmpty(namesrvAddr)){
  52. throw new RuntimeException("namesrvAddr is null !!!");
  53. }
  54. if(StringUtils.isEmpty(topicList)){
  55. throw new RuntimeException("topics is null !!!");
  56. }
  57. DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(groupName);
  58. consumer.setNamesrvAddr(namesrvAddr);
  59. consumer.registerMessageListener(registerMessageListener);
  60. // consumer.registerMessageListener(rocketMqMessageOrderLyListener);
  61. /**
  62. * 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费
  63. * 如果非第一次启动,那么按照上次消费的位置继续消费
  64. */
  65. consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
  66. /**
  67. * 设置消费模型,集群还是广播,默认为集群
  68. */
  69. consumer.setMessageModel(MessageModel.CLUSTERING);
  70. /**
  71. * 设置一次消费消息的条数,默认为1条
  72. */
  73. consumer.setConsumeMessageBatchMaxSize(consumeMessageBatchMaxSize);
  74. try {
  75. /**
  76. * 设置该消费者订阅的主题和tag,如果是订阅该主题下的所有tag,则tag使用*;如果需要指定订阅该主题下的某些tag,则使用||分割,例如tag1||tag2||tag3
  77. */
  78. topicList.forEach(topic->{
  79. try {
  80. consumer.subscribe(topic,"*");
  81. } catch (MQClientException e) {
  82. e.printStackTrace();
  83. }
  84. });
  85. // orderLyTopicList.forEach(topic->{
  86. // try {
  87. // consumer.subscribe(topic,"*");
  88. // } catch (MQClientException e) {
  89. // e.printStackTrace();
  90. // }
  91. // });
  92. consumer.start();
  93. LOGGER.info("consumer is start !!! groupName:{},topics:{},namesrvAddr:{}",groupName,topicList,namesrvAddr);
  94. }catch (MQClientException e){
  95. LOGGER.error("consumer is start !!! groupName:{},topics:{},namesrvAddr:{}",groupName,topicList,namesrvAddr,e);
  96. throw new RuntimeException(e);
  97. }
  98. return consumer;
  99. }
  100. }

通过DefaultMQPushConsumer设置监听器的实现类,来将消费逻辑转移给监听器RocketMqMessageListener 。

MessageHandler:定义处理消息的接口handle及处理的消息topic、tag的类型。

  1. public interface MessageHandler {
  2. void handle(String body);
  3. List<String> tags();
  4. String topic();
  5. }
AbstractMessageHandler:MessageHandler的子类,主要是自动将body的数据转成对应的dto
  1. package com.fss.project.mq.consume;
  2. import com.alibaba.fastjson.JSON;
  3. import java.lang.reflect.ParameterizedType;
  4. import java.lang.reflect.Type;
  5. public abstract class AbstractMessageHandler<T> implements MessageHandler{
  6. private Class<T> paramClass;
  7. public AbstractMessageHandler() {
  8. Type genericSuperclass = this.getClass().getGenericSuperclass();
  9. if(genericSuperclass instanceof ParameterizedType){
  10. ParameterizedType parameterizedType = (ParameterizedType) genericSuperclass;
  11. Type[] actualTypeArguments = parameterizedType.getActualTypeArguments();
  12. paramClass = (Class<T>) actualTypeArguments[0];
  13. }
  14. }
  15. @Override
  16. public void handle(String body) {
  17. T param = JSON.parseObject(body, paramClass);
  18. if (checkDo(param)) {
  19. handler(param);
  20. }
  21. }
  22. /**
  23. * 执行消费的具体业务逻辑
  24. * @param param
  25. */
  26. public abstract void handler(T param);
  27. /**
  28. * 检查是否需要执行
  29. */
  30. boolean checkDo(T param){
  31. return true;
  32. }
  33. }

RocketMqMessageListener :通过topic、tag可以确定AbstractMessageHandler的子类(某个topic、tag消费者),若传入的有key,则可以根据key来防止重复消费消息(key通常作为唯一业务),如没有传入,在具体的消费业务代码中根据业务ID来特定处理也可以。

  1. package com.fss.project.mq.consume;
  2. import com.alibaba.fastjson.JSON;
  3. import com.google.common.collect.HashBasedTable;
  4. import com.google.common.collect.Table;
  5. import org.apache.commons.collections.CollectionUtils;
  6. import org.apache.commons.lang3.StringUtils;
  7. import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
  8. import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
  9. import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
  10. import org.apache.rocketmq.common.message.MessageExt;
  11. import org.slf4j.Logger;
  12. import org.slf4j.LoggerFactory;
  13. import org.springframework.beans.BeansException;
  14. import org.springframework.beans.factory.annotation.Value;
  15. import org.springframework.context.ApplicationContext;
  16. import org.springframework.context.ApplicationContextAware;
  17. import org.springframework.stereotype.Component;
  18. import javax.annotation.PostConstruct;
  19. import java.nio.charset.StandardCharsets;
  20. import java.util.*;
  21. @Component
  22. public class RocketMqMessageListener implements MessageListenerConcurrently, ApplicationContextAware {
  23. private Logger logger = LoggerFactory.getLogger(RocketMqMessageListener.class);
  24. private ApplicationContext context;
  25. @Value("#{'${rocketmq.consumer.topics:DEMO_TEST_TOPIC}'.split(',')}")
  26. private List<String> topicList;
  27. private Table<String, String, List<MessageHandler>> messageHandlerTable = HashBasedTable.create();
  28. @PostConstruct
  29. public void init() {
  30. Map<String, MessageHandler> consumers = context.getBeansOfType(MessageHandler.class);
  31. consumers.values().forEach(
  32. messageHandler -> {
  33. String topic = messageHandler.topic();
  34. for (String tagName : messageHandler.tags()) {
  35. List<MessageHandler> messageHandlers = messageHandlerTable.get(topic, tagName);
  36. if (messageHandlers == null) {
  37. messageHandlers = new ArrayList<>(3);
  38. }
  39. messageHandlers.add(messageHandler);
  40. messageHandlerTable.put(topic, tagName, messageHandlers);
  41. }
  42. }
  43. );
  44. }
  45. private List<MessageHandler> getHandler(String topic, String tag) {
  46. if (StringUtils.isBlank(topic) || StringUtils.isBlank(tag)) {
  47. return Collections.emptyList();
  48. }
  49. return messageHandlerTable.get(topic, tag);
  50. }
  51. @Override
  52. public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
  53. try {
  54. for (MessageExt messageExt : list) {
  55. handlerMessageExt(messageExt);
  56. logger.info("ThreadName:{},messageExt:{},消费成功", Thread.currentThread().getName(), messageExt);
  57. }
  58. return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
  59. } catch (Exception e) {
  60. logger.warn("消息消费异常:e:{}", e);
  61. return ConsumeConcurrentlyStatus.RECONSUME_LATER;
  62. }
  63. }
  64. private void handlerMessageExt(MessageExt messageExt) {
  65. String topic = messageExt.getTopic();
  66. String tag = messageExt.getTags();
  67. String key = messageExt.getKeys();
  68. // 若传入key,则做唯一性校验
  69. if (Objects.nonNull(key) && checkMessageKey(key)) {
  70. }
  71. String body = new String(messageExt.getBody(), StandardCharsets.UTF_8);
  72. logger.info("handlerMessageExt,topic:{},tag:{},body:{}", topic, tag, body);
  73. if (!topicList.contains(topic)) {
  74. return;
  75. }
  76. List<MessageHandler> messageHandlerList = getHandler(topic, tag);
  77. if (CollectionUtils.isNotEmpty(messageHandlerList)) {
  78. messageHandlerList.forEach(
  79. messageHandler -> {
  80. messageHandler.handle(body);
  81. }
  82. );
  83. }
  84. }
  85. /**
  86. * 判断是否重复消费
  87. */
  88. private boolean checkMessageKey(String key) {
  89. // TODO: 2021/9/4 可根据redis、数据库保证消息不重复消费
  90. return false;
  91. }
  92. @Override
  93. public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
  94. context = applicationContext;
  95. }
  96. }

消费业务demo

  1. package com.fss.project.mq.consume.component;
  2. import com.fss.project.mq.consume.AbstractMessageHandler;
  3. import com.fss.project.mq.consume.RocketMqTagEnum;
  4. import com.fss.project.mq.consume.TopicEnum;
  5. import com.fss.project.mq.dto.OrderDto;
  6. import org.springframework.stereotype.Component;
  7. import java.util.Collections;
  8. import java.util.List;
  9. @Component
  10. public class OrderConsumer extends AbstractMessageHandler<OrderDto> {
  11. @Override
  12. public void handler(OrderDto param) {
  13. if(checkConsumer()){
  14. return;
  15. }
  16. System.err.println("成功消费:"+param);
  17. }
  18. /**
  19. * 若message未设置key,则可以根据body的数据(例orderId、refundId),根据redis、数据库中当前数据的状态来判断是否要消费
  20. * 若数据没有唯一ID来区分,则可以认为该消息不重要不作为唯一校验
  21. * @return
  22. */
  23. private boolean checkConsumer() {
  24. return false;
  25. }
  26. @Override
  27. public List<String> tags() {
  28. return Collections.singletonList(RocketMqTagEnum.TEST_TAG.getTagName());
  29. }
  30. @Override
  31. public String topic() {
  32. return TopicEnum.TEST_TOPIC.getTopicName();
  33. }
  34. }

若要顺序消费,消费者监听类需要实现MessageListenerOrderly来实现,例:RocketMqMessageOrderLyListener

  1. package com.fss.project.mq.consume;
  2. import com.google.common.collect.HashBasedTable;
  3. import com.google.common.collect.Table;
  4. import org.apache.commons.collections.CollectionUtils;
  5. import org.apache.commons.lang3.StringUtils;
  6. import org.apache.rocketmq.client.consumer.listener.*;
  7. import org.apache.rocketmq.common.message.MessageExt;
  8. import org.slf4j.Logger;
  9. import org.slf4j.LoggerFactory;
  10. import org.springframework.beans.BeansException;
  11. import org.springframework.beans.factory.annotation.Value;
  12. import org.springframework.context.ApplicationContext;
  13. import org.springframework.context.ApplicationContextAware;
  14. import org.springframework.stereotype.Component;
  15. import javax.annotation.PostConstruct;
  16. import java.nio.charset.StandardCharsets;
  17. import java.util.*;
  18. @Component
  19. public class RocketMqMessageOrderLyListener implements MessageListenerOrderly, ApplicationContextAware {
  20. private Logger logger = LoggerFactory.getLogger(RocketMqMessageOrderLyListener.class);
  21. private ApplicationContext context;
  22. @Value("#{'${rocketmq.consumer.orderly.topics:DEMO_ORDERLY_TOPIC}'.split(',')}")
  23. private List<String> topicList;
  24. private Table<String, String, List<MessageHandler>> messageHandlerTable = HashBasedTable.create();
  25. @PostConstruct
  26. public void init() {
  27. Map<String, MessageHandler> consumers = context.getBeansOfType(MessageHandler.class);
  28. consumers.values().forEach(
  29. messageHandler -> {
  30. String topic = messageHandler.topic();
  31. for (String tagName : messageHandler.tags()) {
  32. List<MessageHandler> messageHandlers = messageHandlerTable.get(topic, tagName);
  33. if (messageHandlers == null) {
  34. messageHandlers = new ArrayList<>(3);
  35. }
  36. messageHandlers.add(messageHandler);
  37. messageHandlerTable.put(topic, tagName, messageHandlers);
  38. }
  39. }
  40. );
  41. }
  42. private List<MessageHandler> getHandler(String topic, String tag) {
  43. if (StringUtils.isBlank(topic) || StringUtils.isBlank(tag)) {
  44. return Collections.emptyList();
  45. }
  46. return messageHandlerTable.get(topic, tag);
  47. }
  48. private void handlerMessageExt(MessageExt messageExt) {
  49. String topic = messageExt.getTopic();
  50. String tag = messageExt.getTags();
  51. String key = messageExt.getKeys();
  52. // 若传入key,则做唯一性校验
  53. if (Objects.nonNull(key) && checkMessageKey(key)) {
  54. }
  55. String body = new String(messageExt.getBody(), StandardCharsets.UTF_8);
  56. logger.info("handlerMessageExt,topic:{},tag:{},body:{}", topic, tag, body);
  57. if (!topicList.contains(topic)) {
  58. return;
  59. }
  60. List<MessageHandler> messageHandlerList = getHandler(topic, tag);
  61. if (CollectionUtils.isNotEmpty(messageHandlerList)) {
  62. messageHandlerList.forEach(
  63. messageHandler -> {
  64. messageHandler.handle(body);
  65. }
  66. );
  67. }
  68. }
  69. /**
  70. * 判断是否重复消费
  71. */
  72. private boolean checkMessageKey(String key) {
  73. // TODO: 2021/9/4 可根据redis、数据库保证消息不重复消费
  74. return false;
  75. }
  76. @Override
  77. public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
  78. context = applicationContext;
  79. }
  80. @Override
  81. public ConsumeOrderlyStatus consumeMessage(List<MessageExt> list, ConsumeOrderlyContext consumeOrderlyContext) {
  82. try {
  83. for (MessageExt messageExt : list) {
  84. handlerMessageExt(messageExt);
  85. logger.info("ThreadName:{},messageExt:{},消费成功", Thread.currentThread().getName(), messageExt);
  86. }
  87. return ConsumeOrderlyStatus.SUCCESS;
  88. } catch (Exception e) {
  89. logger.warn("消息消费异常:e:{}", e);
  90. return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
  91. }
  92. }
  93. }

常见问题

 1、消息发送失败处理方式

Producer的send方法本身支持内部重试,重试逻辑如下:

  • 至多重试2次。
  • 如果同步模式发送失败,则轮转到下一个Broker,如果异步模式发送失败,则只会在当前Broker进行重试。这个方法的总耗时时间不超过sendMsgTimeout设置的值,默认10s。
  • 如果本身向broker发送消息产生超时异常,就不会再重试。

以上策略也是在一定程度上保证了消息可以发送成功。如果业务对消息可靠性要求比较高,建议应用增加相应的重试逻辑:比如调用send同步方法发送失败时,则尝试将消息存储到db,然后由后台线程定时重试,确保消息一定到达Broker。

2、消费过程幂等

RocketMQ无法避免消息重复(Exactly-Once),所以如果业务对消费重复非常敏感,务必要在业务层面进行去重处理。可以借助关系数据库进行去重。首先需要确定消息的唯一键,可以是msgId,也可以是消息内容中的唯一标识字段,例如订单Id等。在消费之前判断唯一键是否在关系数据库中存在。如果不存在则插入,并消费,否则跳过。(实际过程要考虑原子性问题,判断是否存在可以尝试插入,如果报主键冲突,则插入失败,直接跳过)

msgId一定是全局唯一标识符,但是实际使用中,可能会存在相同的消息有两个不同msgId的情况(消费者主动重发、因客户端重投机制导致的重复等),这种情况就需要使业务字段进行重复消费。

3、消费速度慢的处理方式

1 提高消费并行度

绝大部分消息消费行为都属于 IO 密集型,即可能是操作数据库,或者调用 RPC,这类消费行为的消费速度在于后端数据库或者外系统的吞吐量,通过增加消费并行度,可以提高总的消费吞吐量,但是并行度增加到一定程度,反而会下降。所以,应用必须要设置合理的并行度。 如下有几种修改消费并行度的方法:

  • 同一个 ConsumerGroup 下,通过增加 Consumer 实例数量来提高并行度(需要注意的是超过订阅队列数的 Consumer 实例无效)。可以通过加机器,或者在已有机器启动多个进程的方式。
  • 提高单个 Consumer 的消费并行线程,通过修改参数 consumeThreadMin、consumeThreadMax实现。

2 批量方式消费

某些业务流程如果支持批量方式消费,则可以很大程度上提高消费吞吐量,例如订单扣款类应用,一次处理一个订单耗时 1 s,一次处理 10 个订单可能也只耗时 2 s,这样即可大幅度提高消费的吞吐量,通过设置 consumer的 consumeMessageBatchMaxSize 返个参数,默认是 1,即一次只消费一条消息,例如设置为 N,那么每次消费的消息数小于等于 N。

3 优化每条消息消费过程

举例如下,某条消息的消费过程如下:

  • 根据消息从 DB 查询【数据 1】
  • 根据消息从 DB 查询【数据 2】
  • 复杂的业务计算
  • 向 DB 插入【数据 3】
  • 向 DB 插入【数据 4】

这条消息的消费过程中有4次与 DB的 交互,如果按照每次 5ms 计算,那么总共耗时 20ms,假设业务计算耗时 5ms,那么总过耗时 25ms,所以如果能把 4 次 DB 交互优化为 2 次,那么总耗时就可以优化到 15ms,即总体性能提高了 40%。所以应用如果对时延敏感的话,可以把DB部署在SSD硬盘,相比于SCSI磁盘,前者的RT会小很多。

消费慢的两种类型:CPU内部计算代码、外部I/O操作代码

通常是外部IO操作导致的:

1、数据库操作慢

2、缓存数据库IO操作慢

3、下游系统PRC请求,响应慢

消息堆积:下游服务异常、达到DBMS的容量限制

 Broker 角色

​ Broker 角色分为 ASYNC_MASTER(异步主机)、SYNC_MASTER(同步主机)以及SLAVE(从机)。如果对消息的可靠性要求比较严格,可以采用 SYNC_MASTER加SLAVE的部署方式。如果对消息可靠性要求不高,可以采用ASYNC_MASTER加SLAVE的部署方式。如果只是测试方便,则可以选择仅ASYNC_MASTER或仅SYNC_MASTER的部署方式。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/AllinToyou/article/detail/713089?site
推荐阅读
相关标签
  

闽ICP备14008679号