赞
踩
前言
SpringBoot 集成 RabbitMQ 公司老大觉得使用注解太繁琐了,而且不能动态生成队列所以让我研究是否可以动态绑定,所以就有了这个事情。打工人就是命苦没办法,硬着头皮直接就上了,接下来进入主题吧。
根据老大的需求,大致分为使用配置文件进行配置,然后代码动态产生队列,交换机,生产者,消费者,以及如果配置了死信队列则动态绑定死信队列。由此得出所有的这些都是根据配置进行操作。然后百度有无代码创建就完事了。
问百度 RabbItMQ 支持代码创建队列,交换机,以及两者之间绑定的代码,根据这些资料得出以下配置,下面示例配置只给出常用配置,其他配置后面会有个配置类
spring: rabbitmq: # 动态创建和绑定队列、交换机的配置 modules: - routing-key: 路由KEY producer: 生产者 consumer: 消费者 autoAck: 是否自动ACK queue: 队列 name: 队列名称 dead-letter-exchange: 死信队列交换机 dead-letter-routing-key: 死信队列路由KEY arguments: 队列其他参数,此配置支持RabbitMQ的扩展配置 # 1分钟(测试),单位毫秒 x-message-ttl: 3000 # 延迟队列 exchange: 交换机 name: 交换机名称 ..... 省略其他配置
从这里开始就是定义核心代码模块需要的类,可以从这里开始跳过,直接看核心配置逻辑和核心代码,后续需要了解具体类的功能再回来看
配置有了,接下来就是创建Java对象把配置对象化了,由于支持多个所以用的是集合接收配置
/** * 绑定配置基础类 * @author FJW * @version 1.0 * @date 2023年04月11日 14:58 */ @Data @Configuration @ConfigurationProperties("spring.rabbitmq") public class RabbitModuleProperties { /** * 模块配置 */ List<ModuleProperties> modules; }
对应YML的配置类
/** * YML配置类 * @author FJW * @version 1.0 * @date 2023年04月11日 17:16 */ @Data public class ModuleProperties { /** * 路由Key */ private String routingKey; /** * 生产者 */ private String producer; /** * 消费者 */ private String consumer; /** * 自动确认 */ private Boolean autoAck = true; /** * 队列信息 */ private Queue queue; /** * 交换机信息 */ private Exchange exchange; /** * 交换机信息类 */ @Data public static class Exchange { /** * 交换机类型 * 默认主题交换机 */ private RabbitExchangeTypeEnum type = RabbitExchangeTypeEnum.TOPIC; /** * 交换机名称 */ private String name; /** * 是否持久化 * 默认true持久化,重启消息不会丢失 */ private boolean durable = true; /** * 当所有队绑定列均不在使用时,是否自动删除交换机 * 默认false,不自动删除 */ private boolean autoDelete = false; /** * 交换机其他参数 */ private Map<String, Object> arguments; } /** * 队列信息类 */ @Data public static class Queue { /** * 队列名称 */ private String name; /** * 是否持久化 */ private boolean durable = true; // 默认true持久化,重启消息不会丢失 /** * 是否具有排他性 */ private boolean exclusive = false; // 默认false,可多个消费者消费同一个队列 /** * 当消费者均断开连接,是否自动删除队列 */ private boolean autoDelete = false; // 默认false,不自动删除,避免消费者断开队列丢弃消息 /** * 绑定死信队列的交换机名称 */ private String deadLetterExchange; /** * 绑定死信队列的路由key */ private String deadLetterRoutingKey; /** * 交换机其他参数 */ private Map<String, Object> arguments; } }
生产者
/**
* 生产者接口
* @author FJW
* @version 1.0
* @date 2023年04月11日 13:52
*/
public interface ProducerService {
/**
* 发送消息
* @param message
*/
void send(Object message);
}
消费者, 这里需要继承 RabbitMQ 的消费者接口,后续会直接把此接口给动态绑定到 RabbitMQ 中
/**
* 消费者接口
* @author FJW
* @version 1.0
* @date 2023年04月11日 13:52
*/
public interface ConsumerService extends ChannelAwareMessageListener {
}
重试处理器
/** * 重试处理器 * @author FJW * @version 1.0 * @date 2023年04月19日 16:40 */ public interface CustomRetryListener { /** * 最后一次重试失败回调 * @param context * @param callback * @param throwable * @param <E> * @param <T> */ public <E extends Throwable, T> void lastRetry(RetryContext context, RetryCallback<T,E> callback, Throwable throwable); /** * 每次失败的回调 * @param context * @param callback * @param throwable * @param <E> * @param <T> */ public <E extends Throwable, T> void onRetry(RetryContext context, RetryCallback<T,E> callback, Throwable throwable); }
交换机类型枚举
/** * 交换机类型枚举 * @author FJW * @version 1.0 * @date 2023年04月11日 15:19 */ public enum RabbitExchangeTypeEnum { /** * 直连交换机 * <p> * 根据routing-key精准匹配队列(最常使用) */ DIRECT, /** * 主题交换机 * <p> * 根据routing-key模糊匹配队列,*匹配任意一个字符,#匹配0个或多个字符 */ TOPIC, /** * 扇形交换机 * <p> * 直接分发给所有绑定的队列,忽略routing-key,用于广播消息 */ FANOUT, /** * 头交换机 * <p> * 类似直连交换机,不同于直连交换机的路由规则建立在头属性上而不是routing-key(使用较少) */ HEADERS; }
队列,交换机,路由 常量枚举
/** * 队列,交换机。路由 常量枚举 * @author FJW * @version 1.0 * @date 2023年04月18日 16:39 */ public enum RabbitEnum { QUEUE("xxx.{}.queue", "队列名称"), EXCHANGE("xxx.{}.exchange", "交换机名称"), ROUTER_KEY("xxx.{}.key", "路由名称"), ; RabbitEnum(String value, String desc) { this.value = value; this.desc = desc; } @Getter private String value; @Getter private String desc; }
生产者实现类封装 /** * 生产者实现类 * @author FJW * @version 1.0 * @date 2023年04月18日 14:32 */ @Slf4j public class AbsProducerService implements ProducerService { @Resource private RabbitTemplate rabbitTemplate; /** * 交换机 */ private String exchange; /** * 路由 */ private String routingKey; @Override public void send(Object msg) { MessagePostProcessor messagePostProcessor = (message) -> { MessageProperties messageProperties = message.getMessageProperties(); messageProperties.setMessageId(IdUtil.randomUUID()); messageProperties.setTimestamp(new Date()); return message; }; MessageProperties messageProperties = new MessageProperties(); messageProperties.setContentEncoding("UTF-8"); messageProperties.setContentType("text/plain"); String data = JSONUtil.toJsonStr(msg); Message message = new Message(data.getBytes(StandardCharsets.UTF_8), messageProperties); rabbitTemplate.convertAndSend(this.exchange, this.routingKey, message, messagePostProcessor); } public void setExchange(String exchange) { this.exchange = exchange; } public void setRoutingKey(String routingKey) { this.routingKey = routingKey; } } 消费者实现类封装 ```java /** * @author FJW * @version 1.0 * @date 2023年04月18日 17:53 */ @Slf4j public abstract class AbsConsumerService<T> implements ConsumerService { private Class<T> clazz = (Class<T>) new TypeToken<T>(getClass()) {}.getRawType(); /** * 消息 */ private Message message; /** * 通道 */ private Channel channel; @Override public void onMessage(Message message, Channel channel) throws Exception { this.message = message; this.channel = channel; String body = new String(message.getBody()); onConsumer(genObject(body)); } /** * 根据反射获取泛型 * @param body * @return */ private T genObject(String body) throws JsonProcessingException, IllegalAccessException, InstantiationException { try { ObjectMapper mapper = new ObjectMapper(); return mapper.readValue(body, clazz); }catch (Exception e) { log.error("MQ转发层错误,请检查泛型是否与实际类型匹配, 指定的泛型是: {}", clazz.getName(), e); } return clazz.newInstance(); } /** * 扩展消费方法,对消息进行封装 * @param data * @throws IOException */ public void onConsumer(T data) throws IOException { log.error("未对此方法进行实现: {}", data); } /** * 确认消息 */ protected void ack() throws IOException { ack(Boolean.FALSE); } /** * 拒绝消息 */ protected void nack() throws IOException { nack(Boolean.FALSE, Boolean.FALSE); } /** * 拒绝消息 */ protected void basicReject() throws IOException { basicReject(Boolean.FALSE); } /** * 拒绝消息 * @param multiple 当前 DeliveryTag 的消息是否确认所有 true 是, false 否 */ protected void basicReject(Boolean multiple) throws IOException { this.channel.basicReject(this.message.getMessageProperties().getDeliveryTag(), multiple); } /** * 是否自动确认 * @param multiple 当前 DeliveryTag 的消息是否确认所有 true 是, false 否 */ protected void ack(Boolean multiple) throws IOException { this.channel.basicAck(this.message.getMessageProperties().getDeliveryTag(), multiple); } /** * 拒绝消息 * @param multiple 当前 DeliveryTag 的消息是否确认所有 true 是, false 否 * @param requeue 当前 DeliveryTag 消息是否重回队列 true 是 false 否 */ protected void nack(Boolean multiple, Boolean requeue) throws IOException { this.channel.basicNack(this.message.getMessageProperties().getDeliveryTag(), multiple, requeue); } }
消息监听工厂类实现,此实现非常重要,此处的代码就是绑定消费者的核心代码
/** * MQ具体消息监听器工厂 * @author FJW * @version 1.0 * @date 2023年04月18日 10:48 */ @Data @Slf4j @Builder public class ConsumerContainerFactory implements FactoryBean<SimpleMessageListenerContainer> { /** * MQ连接工厂 */ private ConnectionFactory connectionFactory; /** * 操作MQ管理器 */ private AmqpAdmin amqpAdmin; /** * 队列 */ private Queue queue; /** * 交换机 */ private Exchange exchange; /** * 消费者 */ private ConsumerService consumer; /** * 重试回调 */ private CustomRetryListener retryListener; /** * 最大重试次数 */ private final Integer maxAttempts = 5; /** * 是否自动确认 */ private Boolean autoAck; @Override public SimpleMessageListenerContainer getObject() throws Exception { SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(); container.setAmqpAdmin(amqpAdmin); container.setConnectionFactory(connectionFactory); container.setQueues(queue); container.setPrefetchCount(20); container.setConcurrentConsumers(20); container.setMaxConcurrentConsumers(100); container.setDefaultRequeueRejected(Boolean.FALSE); container.setAdviceChain(createRetry()); container.setAcknowledgeMode(autoAck ? AcknowledgeMode.AUTO : AcknowledgeMode.MANUAL); if (Objects.nonNull(consumer)) { container.setMessageListener(consumer); } return container; } /** * 配置重试 * @return */ private Advice createRetry() { RetryTemplate retryTemplate = new RetryTemplate(); retryTemplate.registerListener(new RetryListener() { @Override public <T, E extends Throwable> boolean open(RetryContext context, RetryCallback<T, E> callback) { // 第一次重试调用 return true; } @Override public <T, E extends Throwable> void close(RetryContext context, RetryCallback<T, E> callback, Throwable throwable) { } @Override public <T, E extends Throwable> void onError(RetryContext context, RetryCallback<T, E> callback, Throwable throwable) { if (Objects.nonNull(retryListener)) { retryListener.onRetry(context, callback, throwable); if (maxAttempts.equals(context.getRetryCount())) { retryListener.lastRetry(context, callback, throwable); } } } }); retryTemplate.setRetryPolicy(new SimpleRetryPolicy(maxAttempts)); retryTemplate.setBackOffPolicy(genExponentialBackOffPolicy()); return RetryInterceptorBuilder.stateless() .retryOperations(retryTemplate).recoverer(new RejectAndDontRequeueRecoverer()).build(); } /** * 设置过期时间 * @return */ private BackOffPolicy genExponentialBackOffPolicy() { ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy(); // 重试间隔基数(秒) backOffPolicy.setInitialInterval(5000); // 从重试的第一次至最后一次,最大时间间隔(秒) backOffPolicy.setMaxInterval(86400000); // 重试指数 backOffPolicy.setMultiplier(1); return backOffPolicy; } @Override public Class<?> getObjectType() { return SimpleMessageListenerContainer.class; } }
核心配置类
/** * RabbitMQ 全局配置,SpringBoot 启动后会回调此类 * @author FJW * @version 1.0 * @date 2023年04月11日 13:55 */ @Slf4j public class RabbitMqConfig implements SmartInitializingSingleton { /** * MQ链接工厂 */ private ConnectionFactory connectionFactory; /** * MQ操作管理器 */ private AmqpAdmin amqpAdmin; /** * YML配置 */ private RabbitModuleProperties rabbitModuleProperties; @Autowired public RabbitMqConfig(AmqpAdmin amqpAdmin, RabbitModuleProperties rabbitModuleProperties, ConnectionFactory connectionFactory) { this.amqpAdmin = amqpAdmin; this.rabbitModuleProperties = rabbitModuleProperties; this.connectionFactory = connectionFactory; } @Override public void afterSingletonsInstantiated() { StopWatch stopWatch = StopWatch.create("MQ"); stopWatch.start(); log.debug("初始化MQ配置"); List<ModuleProperties> modules = rabbitModuleProperties.getModules(); if (CollUtil.isEmpty(modules)) { log.warn("未配置MQ"); return; } for (ModuleProperties module : modules) { try { Queue queue = genQueue(module); Exchange exchange = genQueueExchange(module); queueBindExchange(queue, exchange, module); bindProducer(module); bindConsumer(queue, exchange, module); } catch (Exception e) { log.error("初始化失败", e); } } stopWatch.stop(); log.info("初始化MQ配置成功耗时: {}ms", stopWatch.getTotal(TimeUnit.MILLISECONDS)); } /** * 绑定生产者 * @param module */ private void bindProducer(ModuleProperties module) { try { AbsProducerService producerService = SpringUtil.getBean(module.getProducer()); producerService.setExchange(module.getExchange().getName()); producerService.setRoutingKey(module.getRoutingKey()); log.debug("绑定生产者: {}", module.getProducer()); } catch (Exception e) { log.warn("无法在容器中找到该生产者[{}],若需要此生产者则需要做具体实现", module.getConsumer()); } } /** * 绑定消费者 * @param queue * @param exchange * @param module */ private void bindConsumer(Queue queue, Exchange exchange, ModuleProperties module) { CustomRetryListener customRetryListener = null; try { customRetryListener = SpringUtil.getBean(module.getRetry()); }catch (Exception e) { log.debug("无法在容器中找到该重试类[{}],若需要重试则需要做具体实现", module.getRetry()); } try { ConsumerContainerFactory factory = ConsumerContainerFactory.builder() .connectionFactory(connectionFactory) .queue(queue) .exchange(exchange) .consumer(SpringUtil.getBean(module.getConsumer())) .retryListener(customRetryListener) .autoAck(module.getAutoAck()) .amqpAdmin(amqpAdmin) .build(); SimpleMessageListenerContainer container = factory.getObject(); if (Objects.nonNull(container)) { container.start(); } log.debug("绑定消费者: {}", module.getConsumer()); } catch (Exception e) { log.warn("无法在容器中找到该消费者[{}],若需要此消费者则需要做具体实现", module.getConsumer()); } } /** * 队列绑定交换机 * @param queue * @param exchange * @param module */ private void queueBindExchange(Queue queue, Exchange exchange, ModuleProperties module) { log.debug("初始化交换机: {}", module.getExchange().getName()); String queueName = module.getQueue().getName(); String exchangeName = module.getExchange().getName(); module.setRoutingKey(StrUtil.format(RabbitEnum.ROUTER_KEY.getValue(), module.getRoutingKey())); String routingKey = module.getRoutingKey(); Binding binding = new Binding(queueName, Binding.DestinationType.QUEUE, exchangeName, routingKey, null); amqpAdmin.declareQueue(queue); amqpAdmin.declareExchange(exchange); amqpAdmin.declareBinding(binding); log.debug("队列绑定交换机: 队列: {}, 交换机: {}", queueName, exchangeName); } /** * 创建交换机 * @param module * @return */ private Exchange genQueueExchange(ModuleProperties module) { ModuleProperties.Exchange exchange = module.getExchange(); RabbitExchangeTypeEnum exchangeType = exchange.getType(); exchange.setName(StrUtil.format(RabbitEnum.EXCHANGE.getValue(), exchange.getName())); String exchangeName = exchange.getName(); Boolean isDurable = exchange.isDurable(); Boolean isAutoDelete = exchange.isAutoDelete(); Map<String, Object> arguments = exchange.getArguments(); return getExchangeByType(exchangeType, exchangeName, isDurable, isAutoDelete, arguments); } /** * 根据类型生成交换机 * @param exchangeType * @param exchangeName * @param isDurable * @param isAutoDelete * @param arguments * @return */ private Exchange getExchangeByType(RabbitExchangeTypeEnum exchangeType, String exchangeName, Boolean isDurable, Boolean isAutoDelete, Map<String, Object> arguments) { AbstractExchange exchange = null; switch (exchangeType) { // 直连交换机 case DIRECT: exchange = new DirectExchange(exchangeName, isDurable, isAutoDelete, arguments); break; // 主题交换机 case TOPIC: exchange = new TopicExchange(exchangeName, isDurable, isAutoDelete, arguments); break; //扇形交换机 case FANOUT: exchange = new FanoutExchange(exchangeName, isDurable, isAutoDelete, arguments); break; // 头交换机 case HEADERS: exchange = new HeadersExchange(exchangeName, isDurable, isAutoDelete, arguments); break; default: log.warn("未匹配到交换机类型"); break; } return exchange; } /** * 创建队列 * @param module * @return */ private Queue genQueue(ModuleProperties module) { ModuleProperties.Queue queue = module.getQueue(); queue.setName(StrUtil.format(RabbitEnum.QUEUE.getValue(), queue.getName())); log.debug("初始化队列: {}", queue.getName()); Map<String, Object> arguments = queue.getArguments(); if (MapUtil.isEmpty(arguments)) { arguments = new HashMap<>(); } // 转换ttl的类型为long if (arguments.containsKey("x-message-ttl")) { arguments.put("x-message-ttl", Convert.toLong(arguments.get("x-message-ttl"))); } // 绑定死信队列 String deadLetterExchange = queue.getDeadLetterExchange(); String deadLetterRoutingKey = queue.getDeadLetterRoutingKey(); if (StrUtil.isNotBlank(deadLetterExchange) && StrUtil.isNotBlank(deadLetterRoutingKey)) { deadLetterExchange = StrUtil.format(RabbitEnum.EXCHANGE.getValue(), deadLetterExchange); deadLetterRoutingKey = StrUtil.format(RabbitEnum.ROUTER_KEY.getValue(), deadLetterRoutingKey); arguments.put("x-dead-letter-exchange", deadLetterExchange); arguments.put("x-dead-letter-routing-key", deadLetterRoutingKey); log.debug("绑定死信队列: 交换机: {}, 路由: {}", deadLetterExchange, deadLetterRoutingKey); } return new Queue(queue.getName(), queue.isDurable(), queue.isExclusive(), queue.isAutoDelete(), arguments); } }
以上配置完成后,开始RUN代码
这里配置了延迟队列和死信队列,开RUN
spring: rabbitmq: # 动态创建和绑定队列、交换机的配置 modules: # 正常队列 - routing-key: test consumer: testConsumerService producer: testProducerService autoAck: false queue: name: test dead-letter-exchange: dead dead-letter-routing-key: dead arguments: # 1分钟(测试),单位毫秒 x-message-ttl: 3000 exchange: name: test # 死信队列 - routing-key: dead consumer: deadConsumerService producer: deadProducerService autoAck: false queue: name: dead exchange: name: dead
项目启动部分日志
MQ管理端查看是否绑定成功
交换机
队列
由此可以得出代码是正常的,都有
生产者发送消息
PostMan
消费者,由于测试死信队列,所以这里拒绝消费了
消费者消费信息
搞定收工,接下来还有什么问题等到具体生产上去发现了,反正需求实现了,如果你们有方面需求此文章还是个不错的参考,各位看客记得留下赞哦
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。