赞
踩
本章内容主要介绍编写一个rabbitmq starter,能够通过配置文件进行配置交换机、队列以及绑定关系等等。项目引用该组件后能够自动初始化交换机和队列,并进行简单通信。
如若有其他需求,可自行扩展,例如消息消费的确认等
参考文章:SpringBoot日常:自定义实现SpringBoot Starter
下面直接进入主题,介绍整体用到的文件和逻辑内容
交换机枚举类,四种交换机类型,分别是直连交换机、主题交换机、扇出交换机和标题交换机
/**
* @Author 码至终章
* @Version 1.0
*/
public enum RabbitExchangeEnum {
DIRECT,
TOPIC,
FANOUT,
HEADERS;
}
初始化配置文件
import org.springframework.amqp.core.AmqpAdmin; import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * @Author 码至终章 * @Version 1.0 */ @Configuration public class RabbitConfig { /** * 通过yaml配置,创建队列、交换机初始化器 */ @Bean @ConditionalOnMissingBean public RabbitModuleInitializer rabbitModuleInitializer(AmqpAdmin amqpAdmin, RabbitProperties rabbitProperties) { return new RabbitModuleInitializer(amqpAdmin, rabbitProperties); } }
配置信息的映射的文件,用于接收配置文件中配置的交换机和队列属性
import cn.seczone.starter.rabbitmq.enums.RabbitExchangeEnum; import lombok.Data; import java.util.Map; /** * 队列和交换机机绑定关系实体对象 * * @Author 码至终章 * @Version 1.0 */ @Data public class RabbitModuleInfo { /** * 路由Key */ private String routingKey; /** * 队列信息 */ private Queue queue; /** * 交换机信息 */ private Exchange exchange; /** * 交换机信息类 */ @Data public static class Exchange { /** * 交换机类型 * 默认直连交换机 */ private RabbitExchangeEnum type = RabbitExchangeEnum.DIRECT; /** * 交换机名称 */ private String name; /** * 是否持久化 * 默认true持久化,重启消息不会丢失 */ private boolean durable = true; /** * 当所有队绑定列均不在使用时,是否自动删除交换机 * 默认false,不自动删除 */ private boolean autoDelete = false; /** * 交换机其他参数 */ private Map<String, Object> arguments; } /** * 队列信息类 */ @Data public static class Queue { /** * 队列名称 */ private String name; /** * 是否持久化 * 默认true持久化,重启消息不会丢失 */ private boolean durable = true; /** * 是否具有排他性 * 默认false,可多个消费者消费同一个队列 */ private boolean exclusive = false; /** * 当消费者均断开连接,是否自动删除队列 * 默认false,不自动删除,避免消费者断开队列丢弃消息 */ private boolean autoDelete = false; /** * 绑定死信队列的交换机名称 */ private String deadLetterExchange; /** * 绑定死信队列的路由key */ private String deadLetterRoutingKey; private Map<String, Object> arguments; } }
执行初始化逻辑详情文件,具体的逻辑为根据配置文件信息创建对应的交换机和队列,并设置其属性和绑定关系。
import cn.hutool.core.convert.Convert; import cn.seczone.starter.rabbitmq.enums.RabbitExchangeEnum; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.springframework.amqp.core.*; import org.springframework.beans.factory.SmartInitializingSingleton; import org.springframework.util.Assert; import org.springframework.util.CollectionUtils; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Properties; /** * @Author cys * @Date 2024/6/17 14:23 * @Version 1.0 */ @Slf4j public class RabbitModuleInitializer implements SmartInitializingSingleton { AmqpAdmin amqpAdmin; RabbitProperties rabbitProperties; public RabbitModuleInitializer(AmqpAdmin amqpAdmin, RabbitProperties rabbitProperties) { this.amqpAdmin = amqpAdmin; this.rabbitProperties = rabbitProperties; } @Override public void afterSingletonsInstantiated() { log.info("初始化rabbitmq交换机、队列----------------start"); declareRabbitModule(); log.info("初始化rabbitmq交换机、队列----------------end"); } /** * RabbitMQ 根据配置动态创建和绑定队列、交换机 */ private void declareRabbitModule() { List<RabbitModuleInfo> rabbitModuleInfos = rabbitProperties.getModules(); if (CollectionUtils.isEmpty(rabbitModuleInfos)) { return; } for (RabbitModuleInfo rabbitModuleInfo : rabbitModuleInfos) { configParamValidate(rabbitModuleInfo); // 队列 Queue queue = convertQueue(rabbitModuleInfo.getQueue()); // 交换机 Exchange exchange = convertExchange(rabbitModuleInfo.getExchange()); // 绑定关系 String routingKey = rabbitModuleInfo.getRoutingKey(); String queueName = rabbitModuleInfo.getQueue().getName(); String exchangeName = rabbitModuleInfo.getExchange().getName(); Binding binding = new Binding(queueName, Binding.DestinationType.QUEUE, exchangeName, routingKey, null); // 创建队列 if (!isExistQueue(queueName)) { amqpAdmin.declareQueue(queue); } // 创建交换机 amqpAdmin.declareExchange(exchange); // 队列 绑定 交换机 amqpAdmin.declareBinding(binding); } } /** * RabbitMQ动态配置参数校验 * * @param rabbitModuleInfo 队列和交换机机绑定关系 */ public void configParamValidate(RabbitModuleInfo rabbitModuleInfo) { String routingKey = rabbitModuleInfo.getRoutingKey(); Assert.isTrue(StringUtils.isNotBlank(routingKey), "RoutingKey 未配置"); Assert.isTrue(rabbitModuleInfo.getExchange() != null, String.format("routingKey:%s未配置exchange", routingKey)); Assert.isTrue(StringUtils.isNotBlank(rabbitModuleInfo.getExchange().getName()), String.format("routingKey:%s未配置exchange的name属性", routingKey)); Assert.isTrue(rabbitModuleInfo.getQueue() != null, String.format("routingKey:%s未配置queue", routingKey)); Assert.isTrue(StringUtils.isNotBlank(rabbitModuleInfo.getQueue().getName()), String.format("routingKey:%s未配置exchange的name属性", routingKey)); } /** * 转换生成RabbitMQ队列 * * @param queue 队列 * @return Queue */ public Queue convertQueue(RabbitModuleInfo.Queue queue) { Map<String, Object> arguments = queue.getArguments(); // 转换ttl的类型为long if (arguments != null && arguments.containsKey("x-message-ttl")) { arguments.put("x-message-ttl", Convert.toLong(arguments.get("x-message-ttl"))); } // 是否需要绑定死信队列 String deadLetterExchange = queue.getDeadLetterExchange(); String deadLetterRoutingKey = queue.getDeadLetterRoutingKey(); if (StringUtils.isNotBlank(deadLetterExchange) && StringUtils.isNotBlank(deadLetterRoutingKey)) { if (arguments == null) { arguments = new HashMap<>(4); } arguments.put("x-dead-letter-exchange", deadLetterExchange); arguments.put("x-dead-letter-routing-key", deadLetterRoutingKey); } return new Queue(queue.getName(), queue.isDurable(), queue.isExclusive(), queue.isAutoDelete(), arguments); } /** * 转换生成RabbitMQ交换机 * * @param exchangeInfo 交换机信息 * @return Exchange */ public Exchange convertExchange(RabbitModuleInfo.Exchange exchangeInfo) { AbstractExchange exchange = null; RabbitExchangeEnum exchangeType = exchangeInfo.getType(); String exchangeName = exchangeInfo.getName(); boolean isDurable = exchangeInfo.isDurable(); boolean isAutoDelete = exchangeInfo.isAutoDelete(); Map<String, Object> arguments = exchangeInfo.getArguments(); switch (exchangeType) { case DIRECT: // 直连交换机 exchange = new DirectExchange(exchangeName, isDurable, isAutoDelete, arguments); break; case TOPIC: // 主题交换机 exchange = new TopicExchange(exchangeName, isDurable, isAutoDelete, arguments); break; case FANOUT: //扇形交换机 exchange = new FanoutExchange(exchangeName, isDurable, isAutoDelete, arguments); break; case HEADERS: // 头交换机 exchange = new HeadersExchange(exchangeName, isDurable, isAutoDelete, arguments); break; } return exchange; } /** * 判断队列是否存在 * * @param queueName 队列名 * @return boolean */ private boolean isExistQueue(String queueName) { if (StringUtils.isBlank(queueName)) { throw new RuntimeException("队列名称为空"); } boolean flag = true; Properties queueProperties = amqpAdmin.getQueueProperties(queueName); if (queueProperties == null) { flag = false; } return flag; } }
import lombok.Data; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.stereotype.Component; import java.util.List; /** * @Author 码至终章 * @Version 1.0 */ @Component @ConfigurationProperties(prefix = "cys.rabbit") @Data public class RabbitProperties { private List<RabbitModuleInfo> modules; }
发送消息的生产者方法
public class RabbitProducerManager { private static final Logger log = LoggerFactory.getLogger(RabbitProducerManager.class); private final RabbitTemplate rabbitTemplate; public void sendMessage(String exchange, String rabbitRouting, Object message) { this.rabbitTemplate.convertAndSend(exchange, rabbitRouting, message); log.info("向路由:{}, 发送消息成功:{}", rabbitRouting, message); } public void sendMessage(String exchange, String rabbitRouting, Object message, CorrelationData correlationData) { this.rabbitTemplate.convertAndSend(exchange, rabbitRouting, message); log.info("向路由:{}, 发送消息成功:{}, correlationData:{}", new Object[]{rabbitRouting, message, correlationData}); } public RabbitProducerManager(final RabbitTemplate rabbitTemplate) { this.rabbitTemplate = rabbitTemplate; } }
<dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter</artifactId> <version>2.7.18</version> </dependency> <!--RabbitMQ 依赖--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> <version>2.7.18</version> </dependency> <dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-lang3</artifactId> <version>3.12.0</version> </dependency> <dependency> <groupId>cn.hutool</groupId> <artifactId>hutool-all</artifactId> <version>5.8.25</version> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <version>1.18.30</version> </dependency> </dependencies>
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
cn.mycomponent.starter.rabbitmq.config.RabbitConfig,\
cn.mycomponent.starter.rabbitmq.client.RabbitProducerManager,\
cn.mycomponent.starter.rabbitmq.config.RabbitProperties
spring: profiles: active: dev ## rabbitmq链接配置 rabbitmq: host: 192.168.199.199 port: 5672 username: test password: 123456789 virtual-host: test cys: rabbit: modules: - exchange: name: mytest #type为RabbitExchangeTypeEnum枚举中的值。不配置默认为Direct type: DIRECT queue: name: default.queue arguments: # 队列中所有消息的最大存活时间。单位毫秒。 1分钟 x-message-ttl: 60000 # routing-key可以为空 routing-key: default.queue.key
@TableName(value ="task",autoResultMap = true) @Data public class TaskEntity implements Serializable { /** * 主键 */ @TableId(type = IdType.AUTO) @TableField(value = "cust_id") private Long custId; } @RestController @RequestMapping("/mqtest") public class MqController { @Autowired RabbitProducerManager rabbitProducerManager; @Autowired MailService mailService; @GetMapping("/mqtest") public void test(){ TaskEntity taskEntity = new TaskEntity(); taskEntity.setCustId(211212L); rabbitProducerManager.sendMessage("mytest","default.queue.key", JSON.toJSONString(taskEntity)); } }
@Component
public class MyListener {
@RabbitListener(queues = "default.queue")
public void handMessage(String message){
TaskEntity taskEntity = JSON.parseObject(message, TaskEntity.class);
System.out.println("接收到的消息"+taskEntity);
}
}
请求接口/mqtest/mqtest
到这为止,关于封装rabbitmq starter就结束了。当然,本文只是介绍了最基础的部分,后续大家可以在这基础上实现扩展,比如统一接受消息再通过事件监听、同一队列设置多个消费者线程等等,说到这里,如果只是丰富的小伙伴可能会想到spring-cloud-starter-stream-rabbit,大家也可以参考参考这个是如何实现的。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。