赞
踩
技术老男孩 2024-01-07 16:00 发表于广东
技术老男孩
分享技术路上的点滴,专注于后端技术,助力开发者成长,欢迎关注。
55篇原创内容
公众号
首先无论是通过Guava还是Spring Boot自身提供的监听注解来实现的事件驱动他都是处于同一进程中的,意思就是当前事件推送后只有当前的进程可以进行消费。
通过MQ可以实现将事件推送到进程外的Broker中,在多实例/分布式环境下,其他的服务在订阅同一事件(Topic)时,可以在各自的服务中进行消费,最大化空闲服务的利用。
源码地址:
https://gitee.com/sparkle3021/springboot3-study
JDK 17
Spring Boot 3.2.0
RocketMQ-Client 5.0.4
RocketMQ-Starter 2.2.0
Spring Boot 3.0+ 取消了对spring.factories的支持。所以在导入时需要手动引入RocketMQ的配置类。
- <dependency>
- <groupId>org.apache.rocketmq</groupId>
- <artifactId>rocketmq-client-java</artifactId>
- <version>5.0.4</version>
- </dependency>
- <dependency>
- <groupId>org.apache.rocketmq</groupId>
- <artifactId>rocketmq-spring-boot-starter</artifactId>
- <version>2.2.0</version>
- </dependency>
rocketmq-spring-boot-starter:2.2.2版本中:
- # RocketMQ 配置
- rocketmq:
- name-server: 127.0.0.1:9876
- consumer:
- group: event-mq-group
- # 一次拉取消息最大值,注意是拉取消息的最大值而非消费最大值
- pull-batch-size: 1
- producer:
- # 发送同一类消息的设置为同一个group,保证唯一
- group: event-mq-group
- # 发送消息超时时间,默认3000
- sendMessageTimeout: 10000
- # 发送消息失败重试次数,默认2
- retryTimesWhenSendFailed: 2
- # 异步消息重试此处,默认2
- retryTimesWhenSendAsyncFailed: 2
- # 消息最大长度,默认1024 * 1024 * 4(默认4M)
- maxMessageSize: 4096
- # 压缩消息阈值,默认4k(1024 * 4)
- compressMessageBodyThreshold: 4096
- # 是否在内部发送失败时重试另一个broker,默认false
- retryNextServer: false
参考Issue
方法一 :通过@Import(RocketMQAutoConfiguration.class)
在配置类中引入
方法二:在resources资源目录下创建文件夹及文件META-INF/spring,org.springframework.boot.autoconfigure.AutoConfiguration.imports
。
文件内容为RocketMQ自动配置类路径:org.apache.rocketmq.spring.autoconfigure.RocketMQAutoConfiguration
- import org.apache.rocketmq.spring.autoconfigure.RocketMQAutoConfiguration;
- import org.springframework.boot.SpringApplication;
- import org.springframework.boot.autoconfigure.SpringBootApplication;
- import org.springframework.context.annotation.Import;
-
- /**
- * 启动类
- */
- @Import(RocketMQAutoConfiguration.class)
- @SpringBootApplication
- public class MQEventApplication {
- public static void main(String[] args) {
- SpringApplication.run(MQEventApplication.class, args);
- }
- }
RocketMQ Message实体
- import cn.hutool.core.util.IdUtil;
- import jakarta.validation.constraints.NotBlank;
- import lombok.AllArgsConstructor;
- import lombok.Builder;
- import lombok.Data;
- import lombok.NoArgsConstructor;
- import org.apache.commons.collections.CollectionUtils;
- import org.apache.commons.lang3.ObjectUtils;
- import org.springframework.messaging.Message;
- import org.springframework.messaging.support.MessageBuilder;
-
- import java.io.Serializable;
- import java.util.List;
-
- /**
- * RocketMQ 消息
- */
- @Data
- @Builder
- @AllArgsConstructor
- @NoArgsConstructor
- public class RocketMQMessage<T> implements Serializable {
-
- /**
- * 消息队列主题
- */
- @NotBlank(message = "MQ Topic 不能为空")
- private String topic;
-
- /**
- * 延迟级别
- */
- @Builder.Default
- private DelayLevel delayLevel = DelayLevel.OFF;
-
- /**
- * 消息体
- */
- private T message;
-
- /**
- * 消息体
- */
- private List<T> messages;
-
- /**
- * 使用有序消息发送时,指定发送到队列
- */
- private String hashKey;
-
- /**
- * 任务Id,用于日志打印相关信息
- */
- @Builder.Default
- private String taskId = IdUtil.fastSimpleUUID();
- }
RocketMQTemplate 二次封装
- import com.yiyan.study.domain.RocketMQMessage;
- import jakarta.annotation.Resource;
- import lombok.extern.slf4j.Slf4j;
- import org.apache.commons.lang3.StringUtils;
- import org.apache.rocketmq.client.producer.SendCallback;
- import org.apache.rocketmq.client.producer.SendResult;
- import org.apache.rocketmq.spring.core.RocketMQTemplate;
- import org.springframework.beans.factory.annotation.Value;
- import org.springframework.stereotype.Component;
-
- /**
- * RocketMQ 消息工具类
- */
- @Slf4j
- @Component
- public class RocketMQService {
-
- @Resource
- private RocketMQTemplate rocketMQTemplate;
-
- @Value("${rocketmq.producer.sendMessageTimeout}")
- private int sendMessageTimeout;
-
- /**
- * 异步发送消息回调
- *
- * @param taskId 任务Id
- * @param topic 消息主题
- * @return the send callback
- */
- private static SendCallback asyncSendCallback(String taskId, String topic) {
- return new SendCallback() {
- @Override
- public void onSuccess(SendResult sendResult) {
- log.info("ROCKETMQ 异步消息发送成功 : [TaskId:{}] - [Topic:{}] - [SendStatus:{}]", taskId, topic, sendResult.getSendStatus());
- }
-
- @Override
- public void onException(Throwable throwable) {
- log.error("ROCKETMQ 异步消息发送失败 : [TaskId:{}] - [Topic:{}] - [ErrorMessage:{}]", taskId, topic, throwable.getMessage());
- }
- };
- }
-
- /**
- * 发送同步消息,使用有序发送请设置HashKey
- *
- * @param message 消息参数
- */
- public <T> void syncSend(RocketMQMessage<T> message) {
- log.info("ROCKETMQ 同步消息发送 : [TaskId:{}] - [Topic:{}]", message.getTaskId(), message.getTopic());
- SendResult sendResult;
- if (StringUtils.isNotBlank(message.getHashKey())) {
- sendResult = rocketMQTemplate.syncSendOrderly(message.getTopic(), message.getMessage(), message.getHashKey());
- } else {
- sendResult = rocketMQTemplate.syncSend(message.getTopic(), message.getMessage(), sendMessageTimeout, message.getDelayLevel().getLevel());
- }
- log.info("ROCKETMQ 同步消息发送结果 : [TaskId:{}] - [Topic:{}] - [MessageId:{}] - [SendStatus:{}]",
- message.getTaskId(), message.getTopic(), sendResult.getMsgId(), sendResult.getSendStatus());
- }
-
- /**
- * 批量发送同步消息
- *
- * @param message 消息参数
- */
- public <T> void syncSendBatch(RocketMQMessage<T> message) {
- log.info("ROCKETMQ 同步消息-批量发送 : [TaskId:{}] - [Topic:{}] - [MessageCount:{}]",
- message.getTaskId(), message.getTopic(), message.getMessages().size());
- SendResult sendResult;
- if (StringUtils.isNotBlank(message.getHashKey())) {
- sendResult = rocketMQTemplate.syncSendOrderly(message.getTopic(), message.getMessages(), message.getHashKey());
- } else {
- sendResult = rocketMQTemplate.syncSend(message.getTopic(), message.getMessages());
- }
- log.info("ROCKETMQ 同步消息-批量发送结果 : [TaskId:{}] - [Topic:{}] - [MessageId:{}] - [SendStatus:{}]",
- message.getTaskId(), message.getTopic(), sendResult.getMsgId(), sendResult.getSendStatus());
- }
-
- /**
- * 异步发送消息,异步返回消息结果
- *
- * @param message 消息参数
- */
- public <T> void asyncSend(RocketMQMessage<T> message) {
- log.info("ROCKETMQ 异步消息发送 : [TaskId:{}] - [Topic:{}]", message.getTaskId(), message.getTopic());
- if (StringUtils.isNotBlank(message.getHashKey())) {
- rocketMQTemplate.asyncSendOrderly(message.getTopic(), message.getMessage(), message.getHashKey(),
- asyncSendCallback(message.getTaskId(), message.getTopic()));
- } else {
- rocketMQTemplate.asyncSend(message.getTopic(), message.getMessage(),
- asyncSendCallback(message.getTaskId(), message.getTopic()), sendMessageTimeout, message.getDelayLevel().getLevel());
- }
- }
-
- /**
- * 批量异步发送消息
- *
- * @param message 消息参数
- */
- public <T> void asyncSendBatch(RocketMQMessage<T> message) {
- log.info("ROCKETMQ 异步消息-批量发送 : [TaskId:{}] - [Topic:{}] - [MessageCount:{}]",
- message.getTaskId(), message.getTopic(), message.getMessages().size());
- if (StringUtils.isNotBlank(message.getHashKey())) {
- rocketMQTemplate.asyncSendOrderly(message.getTopic(), message.getMessages(), message.getHashKey(),
- asyncSendCallback(message.getTaskId(), message.getTopic()));
- } else {
- rocketMQTemplate.asyncSend(message.getTopic(), message.getMessages(),
- asyncSendCallback(message.getTaskId(), message.getTopic()));
- }
- }
-
- /**
- * 单向发送消息,不关心返回结果,容易消息丢失,适合日志收集、不精确统计等消息发送;
- *
- * @param message 消息参数
- */
- public <T> void sendOneWay(RocketMQMessage<T> message) {
- sendOneWay(message, false);
- }
-
- /**
- * 单向消息 - 批量发送
- *
- * @param message 消息体
- * @param batch 是否为批量操作
- */
- public <T> void sendOneWay(RocketMQMessage<T> message, boolean batch) {
- log.info((batch ? "ROCKETMQ 单向消息发送 : [TaskId:{}] - [Topic:{}]"
- : "ROCKETMQ 单向消息-批量发送 : [TaskId:{}] - [Topic:{}] - [MessageCount{}]"),
- message.getTaskId(), message.getTopic(), message.getMessages().size());
- if (StringUtils.isNotBlank(message.getHashKey())) {
- if (batch) {
- message.getMessages().
- forEach(msg -> rocketMQTemplate.sendOneWayOrderly(message.getTopic(), msg, message.getHashKey()));
- } else {
- rocketMQTemplate.sendOneWayOrderly(message.getTopic(), message.getMessage(), message.getHashKey());
- }
- } else {
- if (batch) {
- message.getMessages().forEach(msg -> rocketMQTemplate.sendOneWay(message.getTopic(), msg));
- } else {
- rocketMQTemplate.sendOneWay(message.getTopic(), message.getMessage());
- }
- }
- }
- }
- import com.yiyan.study.constants.MQConfig;
- import lombok.extern.slf4j.Slf4j;
- import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
- import org.apache.rocketmq.spring.core.RocketMQListener;
- import org.springframework.stereotype.Component;
-
- /**
- * MQ消息监听
- */
- @Component
- @Slf4j
- @RocketMQMessageListener(topic = MQConfig.EVENT_TOPIC,
- consumerGroup = MQConfig.EVENT_CONSUMER_GROUP)
- public class MQListener implements RocketMQListener<String> {
- @Override
- public void onMessage(String message) {
- log.info("MQListener 接收消息 : {}", message);
- }
- }
- import cn.hutool.core.thread.ThreadUtil;
- import com.yiyan.study.constants.MQConfig;
- import com.yiyan.study.domain.RocketMQMessage;
- import com.yiyan.study.utils.RocketMQService;
- import jakarta.annotation.Resource;
- import org.junit.jupiter.api.Test;
- import org.springframework.boot.test.context.SpringBootTest;
-
- /**
- * MQ测试
- */
- @SpringBootTest
- public class MQTest {
-
- @Resource
- private RocketMQService rocketMQService;
-
- @Test
- public void sendMessage() {
- int count = 1;
- while (count <= 50) {
- rocketMQService.syncSend(RocketMQMessage.builder()
- .topic(MQConfig.EVENT_TOPIC)
- .message(count++)
- .build());
- }
- // 休眠等待消费消息
- ThreadUtil.sleep(2000L);
- }
- }
原文:
blog.csdn.net/m0_55712478/
article/details/135242345
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。