当前位置:   article > 正文

Spring Boot+RocketMQ 实现多实例分布式环境下的事件驱动_springboot应用内部用事件监听还是mq

springboot应用内部用事件监听还是mq

技术老男孩 2024-01-07 16:00 发表于广东

技术老男孩

分享技术路上的点滴,专注于后端技术,助力开发者成长,欢迎关注。

55篇原创内容

公众号

1、为什么要使用MQ?

在Spring Boot Event这篇文章中已经通过Guava或者SpringBoot自身的Listener实现了事件驱动,已经做到了对业务的解耦。为什么还要用到MQ来进行业务解耦呢?

首先无论是通过Guava还是Spring Boot自身提供的监听注解来实现的事件驱动他都是处于同一进程中的,意思就是当前事件推送后只有当前的进程可以进行消费。

通过MQ可以实现将事件推送到进程外的Broker中,在多实例/分布式环境下,其他的服务在订阅同一事件(Topic)时,可以在各自的服务中进行消费,最大化空闲服务的利用。

图片

源码地址:

https://gitee.com/sparkle3021/springboot3-study

2、整合RocketMQ

依赖版本
  • JDK 17

  • Spring Boot 3.2.0

  • RocketMQ-Client 5.0.4

  • RocketMQ-Starter 2.2.0

Spring Boot 3.0+ 取消了对spring.factories的支持。所以在导入时需要手动引入RocketMQ的配置类。

引入RocketMQ依赖
  1. <dependency>
  2.     <groupId>org.apache.rocketmq</groupId>
  3.     <artifactId>rocketmq-client-java</artifactId>
  4.     <version>5.0.4</version>
  5. </dependency>
  6. <dependency>
  7.     <groupId>org.apache.rocketmq</groupId>
  8.     <artifactId>rocketmq-spring-boot-starter</artifactId>
  9.     <version>2.2.0</version>
  10. </dependency>
解决Spring Boot3+不兼容 spring.factories

rocketmq-spring-boot-starter:2.2.2版本中:

图片

参考配置文件
  1. # RocketMQ 配置
  2. rocketmq:
  3.   name-server: 127.0.0.1:9876
  4.   consumer:
  5.     group: event-mq-group
  6.     # 一次拉取消息最大值,注意是拉取消息的最大值而非消费最大值
  7.     pull-batch-size: 1
  8.   producer:
  9.     # 发送同一类消息的设置为同一个group,保证唯一
  10.     group: event-mq-group
  11.     # 发送消息超时时间,默认3000
  12.     sendMessageTimeout: 10000
  13.     # 发送消息失败重试次数,默认2
  14.     retryTimesWhenSendFailed: 2
  15.     # 异步消息重试此处,默认2
  16.     retryTimesWhenSendAsyncFailed: 2
  17.     # 消息最大长度,默认1024 * 1024 * 4(默认4M)
  18.     maxMessageSize: 4096
  19.     # 压缩消息阈值,默认4k(1024 * 4)
  20.     compressMessageBodyThreshold: 4096
  21.     # 是否在内部发送失败时重试另一个broker,默认false
  22.     retryNextServer: false

参考Issue

  • 方法一 :通过@Import(RocketMQAutoConfiguration.class)在配置类中引入

  • 方法二:在resources资源目录下创建文件夹及文件META-INF/spring,org.springframework.boot.autoconfigure.AutoConfiguration.imports

文件内容为RocketMQ自动配置类路径:org.apache.rocketmq.spring.autoconfigure.RocketMQAutoConfiguration

3、RocketMQ 使用

解决Spring Boot3+不支持spring.factories的问题
  1. import org.apache.rocketmq.spring.autoconfigure.RocketMQAutoConfiguration;
  2. import org.springframework.boot.SpringApplication;
  3. import org.springframework.boot.autoconfigure.SpringBootApplication;
  4. import org.springframework.context.annotation.Import;
  5. /**
  6.  * 启动类
  7.  */
  8. @Import(RocketMQAutoConfiguration.class)
  9. @SpringBootApplication
  10. public class MQEventApplication {
  11.     public static void main(String[] args) {
  12.         SpringApplication.run(MQEventApplication.class, args);
  13.     }
  14. }
RocketMQ操作工具

RocketMQ Message实体

  1. import cn.hutool.core.util.IdUtil;
  2. import jakarta.validation.constraints.NotBlank;
  3. import lombok.AllArgsConstructor;
  4. import lombok.Builder;
  5. import lombok.Data;
  6. import lombok.NoArgsConstructor;
  7. import org.apache.commons.collections.CollectionUtils;
  8. import org.apache.commons.lang3.ObjectUtils;
  9. import org.springframework.messaging.Message;
  10. import org.springframework.messaging.support.MessageBuilder;
  11. import java.io.Serializable;
  12. import java.util.List;
  13. /**
  14.  * RocketMQ 消息
  15.  */
  16. @Data
  17. @Builder
  18. @AllArgsConstructor
  19. @NoArgsConstructor
  20. public class RocketMQMessage<T> implements Serializable {
  21.     /**
  22.      * 消息队列主题
  23.      */
  24.     @NotBlank(message = "MQ Topic 不能为空")
  25.     private String topic;
  26.     /**
  27.      * 延迟级别
  28.      */
  29.     @Builder.Default
  30.     private DelayLevel delayLevel = DelayLevel.OFF;
  31.     /**
  32.      * 消息体
  33.      */
  34.     private T message;
  35.     /**
  36.      * 消息体
  37.      */
  38.     private List<T> messages;
  39.     /**
  40.      * 使用有序消息发送时,指定发送到队列
  41.      */
  42.     private String hashKey;
  43.     /**
  44.      * 任务Id,用于日志打印相关信息
  45.      */
  46.     @Builder.Default
  47.     private String taskId = IdUtil.fastSimpleUUID();
  48. }

RocketMQTemplate 二次封装

  1. import com.yiyan.study.domain.RocketMQMessage;
  2. import jakarta.annotation.Resource;
  3. import lombok.extern.slf4j.Slf4j;
  4. import org.apache.commons.lang3.StringUtils;
  5. import org.apache.rocketmq.client.producer.SendCallback;
  6. import org.apache.rocketmq.client.producer.SendResult;
  7. import org.apache.rocketmq.spring.core.RocketMQTemplate;
  8. import org.springframework.beans.factory.annotation.Value;
  9. import org.springframework.stereotype.Component;
  10. /**
  11.  * RocketMQ 消息工具类
  12.  */
  13. @Slf4j
  14. @Component
  15. public class RocketMQService {
  16.     @Resource
  17.     private RocketMQTemplate rocketMQTemplate;
  18.     @Value("${rocketmq.producer.sendMessageTimeout}")
  19.     private int sendMessageTimeout;
  20.     /**
  21.      * 异步发送消息回调
  22.      *
  23.      * @param taskId 任务Id
  24.      * @param topic  消息主题
  25.      * @return the send callback
  26.      */
  27.     private static SendCallback asyncSendCallback(String taskId, String topic) {
  28.         return new SendCallback() {
  29.             @Override
  30.             public void onSuccess(SendResult sendResult) {
  31.                 log.info("ROCKETMQ 异步消息发送成功 : [TaskId:{}] - [Topic:{}] - [SendStatus:{}]", taskId, topic, sendResult.getSendStatus());
  32.             }
  33.             @Override
  34.             public void onException(Throwable throwable) {
  35.                 log.error("ROCKETMQ 异步消息发送失败 : [TaskId:{}] - [Topic:{}] - [ErrorMessage:{}]", taskId, topic, throwable.getMessage());
  36.             }
  37.         };
  38.     }
  39.     /**
  40.      * 发送同步消息,使用有序发送请设置HashKey
  41.      *
  42.      * @param message 消息参数
  43.      */
  44.     public <T> void syncSend(RocketMQMessage<T> message) {
  45.         log.info("ROCKETMQ 同步消息发送 : [TaskId:{}] - [Topic:{}]", message.getTaskId(), message.getTopic());
  46.         SendResult sendResult;
  47.         if (StringUtils.isNotBlank(message.getHashKey())) {
  48.             sendResult = rocketMQTemplate.syncSendOrderly(message.getTopic(), message.getMessage(), message.getHashKey());
  49.         } else {
  50.             sendResult = rocketMQTemplate.syncSend(message.getTopic(), message.getMessage(), sendMessageTimeout, message.getDelayLevel().getLevel());
  51.         }
  52.         log.info("ROCKETMQ 同步消息发送结果 : [TaskId:{}] - [Topic:{}] - [MessageId:{}] - [SendStatus:{}]",
  53.                 message.getTaskId(), message.getTopic(), sendResult.getMsgId(), sendResult.getSendStatus());
  54.     }
  55.     /**
  56.      * 批量发送同步消息
  57.      *
  58.      * @param message 消息参数
  59.      */
  60.     public <T> void syncSendBatch(RocketMQMessage<T> message) {
  61.         log.info("ROCKETMQ 同步消息-批量发送 : [TaskId:{}] - [Topic:{}] - [MessageCount:{}]",
  62.                 message.getTaskId(), message.getTopic(), message.getMessages().size());
  63.         SendResult sendResult;
  64.         if (StringUtils.isNotBlank(message.getHashKey())) {
  65.             sendResult = rocketMQTemplate.syncSendOrderly(message.getTopic(), message.getMessages(), message.getHashKey());
  66.         } else {
  67.             sendResult = rocketMQTemplate.syncSend(message.getTopic(), message.getMessages());
  68.         }
  69.         log.info("ROCKETMQ 同步消息-批量发送结果 : [TaskId:{}] - [Topic:{}] - [MessageId:{}] - [SendStatus:{}]",
  70.                 message.getTaskId(), message.getTopic(), sendResult.getMsgId(), sendResult.getSendStatus());
  71.     }
  72.     /**
  73.      * 异步发送消息,异步返回消息结果
  74.      *
  75.      * @param message 消息参数
  76.      */
  77.     public <T> void asyncSend(RocketMQMessage<T> message) {
  78.         log.info("ROCKETMQ 异步消息发送 : [TaskId:{}] - [Topic:{}]", message.getTaskId(), message.getTopic());
  79.         if (StringUtils.isNotBlank(message.getHashKey())) {
  80.             rocketMQTemplate.asyncSendOrderly(message.getTopic(), message.getMessage(), message.getHashKey(),
  81.                     asyncSendCallback(message.getTaskId(), message.getTopic()));
  82.         } else {
  83.             rocketMQTemplate.asyncSend(message.getTopic(), message.getMessage(),
  84.                     asyncSendCallback(message.getTaskId(), message.getTopic()), sendMessageTimeout, message.getDelayLevel().getLevel());
  85.         }
  86.     }
  87.     /**
  88.      * 批量异步发送消息
  89.      *
  90.      * @param message 消息参数
  91.      */
  92.     public <T> void asyncSendBatch(RocketMQMessage<T> message) {
  93.         log.info("ROCKETMQ 异步消息-批量发送 : [TaskId:{}] - [Topic:{}] - [MessageCount:{}]",
  94.                 message.getTaskId(), message.getTopic(), message.getMessages().size());
  95.         if (StringUtils.isNotBlank(message.getHashKey())) {
  96.             rocketMQTemplate.asyncSendOrderly(message.getTopic(), message.getMessages(), message.getHashKey(),
  97.                     asyncSendCallback(message.getTaskId(), message.getTopic()));
  98.         } else {
  99.             rocketMQTemplate.asyncSend(message.getTopic(), message.getMessages(),
  100.                     asyncSendCallback(message.getTaskId(), message.getTopic()));
  101.         }
  102.     }
  103.     /**
  104.      * 单向发送消息,不关心返回结果,容易消息丢失,适合日志收集、不精确统计等消息发送;
  105.      *
  106.      * @param message 消息参数
  107.      */
  108.     public <T> void sendOneWay(RocketMQMessage<T> message) {
  109.         sendOneWay(message, false);
  110.     }
  111.     /**
  112.      * 单向消息 - 批量发送
  113.      *
  114.      * @param message 消息体
  115.      * @param batch   是否为批量操作
  116.      */
  117.     public <T> void sendOneWay(RocketMQMessage<T> message, boolean batch) {
  118.         log.info((batch ? "ROCKETMQ 单向消息发送 : [TaskId:{}] - [Topic:{}]"
  119.                         : "ROCKETMQ 单向消息-批量发送 : [TaskId:{}] - [Topic:{}] - [MessageCount{}]"),
  120.                 message.getTaskId(), message.getTopic(), message.getMessages().size());
  121.         if (StringUtils.isNotBlank(message.getHashKey())) {
  122.             if (batch) {
  123.                 message.getMessages().
  124.                         forEach(msg -> rocketMQTemplate.sendOneWayOrderly(message.getTopic(), msg, message.getHashKey()));
  125.             } else {
  126.                 rocketMQTemplate.sendOneWayOrderly(message.getTopic(), message.getMessage(), message.getHashKey());
  127.             }
  128.         } else {
  129.             if (batch) {
  130.                 message.getMessages().forEach(msg -> rocketMQTemplate.sendOneWay(message.getTopic(), msg));
  131.             } else {
  132.                 rocketMQTemplate.sendOneWay(message.getTopic(), message.getMessage());
  133.             }
  134.         }
  135.     }
  136. }
定义RocketMQ消费者
  1. import com.yiyan.study.constants.MQConfig;
  2. import lombok.extern.slf4j.Slf4j;
  3. import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
  4. import org.apache.rocketmq.spring.core.RocketMQListener;
  5. import org.springframework.stereotype.Component;
  6. /**
  7.  * MQ消息监听
  8.  */
  9. @Component
  10. @Slf4j
  11. @RocketMQMessageListener(topic = MQConfig.EVENT_TOPIC,
  12.         consumerGroup = MQConfig.EVENT_CONSUMER_GROUP)
  13. public class MQListener implements RocketMQListener<String> {
  14.     @Override
  15.     public void onMessage(String message) {
  16.         log.info("MQListener 接收消息 : {}", message);
  17.     }
  18. }
定义测试类发送消息
  1. import cn.hutool.core.thread.ThreadUtil;
  2. import com.yiyan.study.constants.MQConfig;
  3. import com.yiyan.study.domain.RocketMQMessage;
  4. import com.yiyan.study.utils.RocketMQService;
  5. import jakarta.annotation.Resource;
  6. import org.junit.jupiter.api.Test;
  7. import org.springframework.boot.test.context.SpringBootTest;
  8. /**
  9.  * MQ测试
  10.  */
  11. @SpringBootTest
  12. public class MQTest {
  13.     @Resource
  14.     private RocketMQService rocketMQService;
  15.     @Test
  16.     public void sendMessage() {
  17.         int count = 1;
  18.         while (count <= 50) {
  19.             rocketMQService.syncSend(RocketMQMessage.builder()
  20.                     .topic(MQConfig.EVENT_TOPIC)
  21.                     .message(count++)
  22.                     .build());
  23.         }
  24.         // 休眠等待消费消息
  25.         ThreadUtil.sleep(2000L);
  26.     }
  27. }

4、测试

图片

原文:

blog.csdn.net/m0_55712478/

article/details/135242345

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/盐析白兔/article/detail/567714
推荐阅读
相关标签
  

闽ICP备14008679号