赞
踩
1、对接多个节点上的MQ(如master-MQ,slave-MQ),若读者需要自己模拟出两个MQ,可以部署多个VM然后参考 docker 安装rabbitmq_Steven-Russell的博客-CSDN博客
2、队列名称不是固定的,需要接受外部参数,并且通过模板进行格式化,才能够得到队列名称
3、需要在master-MQ上延迟一段时间,然后将消息再转发给slave-MQ
1、采用springboot的自动注入bean需要事先知道队列的名称,但是队列名称是动态的情况下,无法实现自动注入
2、mq弱依赖,在没有master-mq或者slave-mq时,不能影响到现有能力
1、由于mq的队列创建、exchange创建以及队列和exchange的绑定关系是可重入的,所以采用connectFactory进行手动声明
2、增加自定义条件OnMqCondition,防止不必要的bean创建
参考 搭建最简单的SpringBoot项目_Steven-Russell的博客-CSDN博客
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
<dependency> <groupId>commons-io</groupId> <artifactId>commons-io</artifactId> <version>2.11.0</version> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <version>1.18.28</version> <scope>provided</scope> </dependency> <dependency> <groupId>com.alibaba.fastjson2</groupId> <artifactId>fastjson2</artifactId> <version>2.0.40</version> </dependency>
在application.yml中增加如下配置
mq: master: addresses: 192.168.30.128:5672 username: guest password: guest vhost: / slave: addresses: 192.168.30.131:5672 username: guest password: guest vhost: /
- package com.wd.config.condition;
-
- import org.springframework.context.annotation.Conditional;
-
- import java.lang.annotation.*;
-
- @Target({ElementType.TYPE, ElementType.METHOD})
- @Retention(RetentionPolicy.RUNTIME)
- @Documented
- @Conditional(OnMqCondition.class)
- public @interface MqConditional {
-
- String[] keys();
-
- }
- package com.wd.config.condition;
-
- import org.springframework.context.annotation.Condition;
- import org.springframework.context.annotation.ConditionContext;
- import org.springframework.core.type.AnnotatedTypeMetadata;
- import org.springframework.lang.NonNull;
- import org.springframework.util.ObjectUtils;
-
- import java.util.Map;
-
- public class OnMqCondition implements Condition {
-
- @Override
- public boolean matches(@NonNull ConditionContext context, @NonNull AnnotatedTypeMetadata metadata) {
- Map<String, Object> annotationAttributes = metadata.getAnnotationAttributes(MqConditional.class.getName());
- if (annotationAttributes == null || annotationAttributes.isEmpty()) {
- // 为空则不进行校验了
- return true;
- }
- String[] keys = (String[])annotationAttributes.get("keys");
- for (String key : keys) {
- String property = context.getEnvironment().getProperty(key);
- if (ObjectUtils.isEmpty(property)) {
- return false;
- }
- }
-
- return true;
- }
- }

- package com.wd.config;
-
- import com.wd.config.condition.MqConditional;
- import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
- import org.springframework.amqp.rabbit.connection.ConnectionFactory;
- import org.springframework.beans.factory.annotation.Value;
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
- import org.springframework.context.annotation.Primary;
-
- @Configuration
- public class MqConnectionFactory {
-
- @Value("${mq.master.addresses}")
- private String masterAddresses;
-
- @Value("${mq.master.username}")
- private String masterUsername;
-
- @Value("${mq.master.password}")
- private String masterPassword;
-
- @Value("${mq.master.vhost}")
- private String masterVhost;
-
- @Value("${mq.slave.addresses}")
- private String slaveAddresses;
-
- @Value("${mq.slave.username}")
- private String slaveUsername;
-
- @Value("${mq.slave.password}")
- private String slavePassword;
-
- @Value("${mq.slave.vhost}")
- private String slaveVhost;
-
- @Bean
- @Primary
- @MqConditional(keys = {"mq.master.addresses", "mq.master.vhost", "mq.master.username", "mq.master.password"})
- public ConnectionFactory masterConnectionFactory() {
- return doCreateConnectionFactory(masterAddresses, masterUsername, masterPassword, masterVhost);
- }
-
- @Bean
- @MqConditional(keys = {"mq.slave.addresses", "mq.slave.vhost", "mq.slave.username", "mq.slave.password"})
- public ConnectionFactory slaveConnectionFactory() {
- return doCreateConnectionFactory(slaveAddresses, slaveUsername, slavePassword, slaveVhost);
- }
-
- private ConnectionFactory doCreateConnectionFactory(String addresses,
- String username,
- String password,
- String vhost) {
- CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory();
- cachingConnectionFactory.setAddresses(addresses);
- cachingConnectionFactory.setUsername(username);
- cachingConnectionFactory.setPassword(password);
- cachingConnectionFactory.setVirtualHost(vhost);
- return cachingConnectionFactory;
- }
-
- }

- package com.wd.config;
-
- public enum DeclareQueueExchange {
- EXCHANGE("exchange"),
-
- DEAD_EXCHANGE("deadExchange"),
-
- DELAY_EXCHANGE("delayExchange");
-
- private final String exchangeName;
-
- DeclareQueueExchange(String exchangeName) {
- this.exchangeName = exchangeName;
- }
-
- public String getExchangeName() {
- return exchangeName;
- }
- }

- package com.wd.config;
-
- public enum DeclareQueueName {
- DELAY_QUEUE_NAME_SUFFIX("_delay"),
-
- DEAD_QUEUE_NAME_SUFFIX("_dead"),
-
- QUEUE_NAME_TEMPLATE("wd.simple.queue.{0}");
-
- private final String queueName;
-
- DeclareQueueName(String queueName) {
- this.queueName = queueName;
- }
-
- public String getQueueName() {
- return queueName;
- }
- }

- package com.wd.controller.vo;
-
- import com.wd.pojo.Phone;
- import lombok.Data;
-
- @Data
- public class DelayMsgVo {
- private String queueId;
-
- private Phone phone;
- }
- package com.wd.pojo;
-
- import lombok.AllArgsConstructor;
- import lombok.Data;
- import lombok.NoArgsConstructor;
-
- import java.io.Serializable;
- import java.util.Date;
- import java.util.List;
-
- @Data
- @AllArgsConstructor
- @NoArgsConstructor
- public class Phone implements Serializable {
- private static final long serialVersionUID = -1L;
-
- private String id;
-
- private String name;
-
- private Date createTime;
-
- private List<User> userList;
-
- }

- package com.wd.pojo;
-
- import lombok.AllArgsConstructor;
- import lombok.Data;
- import lombok.NoArgsConstructor;
-
- import java.io.Serializable;
- import java.util.Date;
-
- @Data
- @AllArgsConstructor
- @NoArgsConstructor
- public class User implements Serializable {
- private static final long serialVersionUID = -1L;
-
- private String username;
-
- private Date create;
- }

- package com.wd.config;
-
- import java.util.ArrayList;
- import java.util.List;
-
- public interface QueueIdListConfig {
-
- /**
- * 先用本地缓存维护队列id
- */
- List<Integer> QUEUE_ID_LIST = new ArrayList<Integer>() {{
- add(111);
- add(222);
- add(333);
- }};
- }

注意:此处就以web用户输入为入口,所以创建controller
- package com.wd.controller;
-
- import com.alibaba.fastjson2.JSONObject;
- import com.rabbitmq.client.*;
- import com.wd.config.DeclareQueueExchange;
- import com.wd.config.DeclareQueueName;
- import com.wd.controller.vo.DelayMsgVo;
- import org.springframework.amqp.rabbit.connection.Connection;
- import org.springframework.amqp.rabbit.connection.ConnectionFactory;
- import org.springframework.beans.factory.annotation.Qualifier;
- import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
- import org.springframework.web.bind.annotation.*;
-
- import java.io.IOException;
- import java.nio.charset.StandardCharsets;
- import java.text.MessageFormat;
- import java.util.HashMap;
- import java.util.Map;
- import java.util.concurrent.TimeoutException;
-
- @RestController
- @ConditionalOnBean(value = ConnectionFactory.class, name = "masterConnectionFactory")
- public class DynamicCreateQueueController {
-
- private final ConnectionFactory masterConnectionFactory;
-
- public DynamicCreateQueueController(@Qualifier(value = "masterConnectionFactory") ConnectionFactory masterConnectionFactory) {
- this.masterConnectionFactory = masterConnectionFactory;
- }
-
- @PostMapping(value = "sendDelayMsg")
- public String sendMsg2DelayQueue(@RequestBody DelayMsgVo delayMsgVo) throws IOException, TimeoutException {
- doSendMsg2DelayQueue(delayMsgVo);
- return "success";
- }
-
- private void doSendMsg2DelayQueue(DelayMsgVo delayMsgVo) throws IOException, TimeoutException {
- // 根据id 动态生成队列名称
- String queueNameTemplate = DeclareQueueName.QUEUE_NAME_TEMPLATE.getQueueName();
- String queueName = MessageFormat.format(queueNameTemplate, delayMsgVo.getQueueId());
- String delayQueueName = queueName + DeclareQueueName.DELAY_QUEUE_NAME_SUFFIX.getQueueName();
- String deadQueueName = queueName + DeclareQueueName.DEAD_QUEUE_NAME_SUFFIX.getQueueName();
- // 注意:下述声明交换机和队列的操作是可以重入的,MQ并不会报错
- try (Connection connection = masterConnectionFactory.createConnection();
- Channel channel = connection.createChannel(false)){
- // 声明死信交换机
- channel.exchangeDeclare(DeclareQueueExchange.DEAD_EXCHANGE.getExchangeName(), BuiltinExchangeType.DIRECT);
- // 声明死信队列
- AMQP.Queue.DeclareOk deadQueueDeclareOk = channel.queueDeclare(deadQueueName,
- true, false, false, null);
- // 定时任务 绑定消费者,避免出现多个消费者以及重启后无法消费存量消息的问题
- // 注意:因为需要保证消费顺序,所以此处仅声明一个消费者
- // 死信队列和交换机绑定
- channel.queueBind(deadQueueName, DeclareQueueExchange.DEAD_EXCHANGE.getExchangeName(), deadQueueName);
-
- // 声明延迟队列
- Map<String, Object> args = new HashMap<>();
- //设置延迟队列绑定的死信交换机
- args.put("x-dead-letter-exchange", DeclareQueueExchange.DEAD_EXCHANGE.getExchangeName());
- //设置延迟队列绑定的死信路由键
- args.put("x-dead-letter-routing-key", deadQueueName);
- //设置延迟队列的 TTL 消息存活时间
- args.put("x-message-ttl", 10 * 1000);
- channel.queueDeclare(delayQueueName, true, false, false, args);
- channel.exchangeDeclare(DeclareQueueExchange.DELAY_EXCHANGE.getExchangeName(), BuiltinExchangeType.DIRECT);
- channel.queueBind(delayQueueName, DeclareQueueExchange.DELAY_EXCHANGE.getExchangeName(), delayQueueName);
-
- // 发送消息到延迟队列
- channel.basicPublish(DeclareQueueExchange.DELAY_EXCHANGE.getExchangeName(), delayQueueName, null,
- JSONObject.toJSONString(delayMsgVo.getPhone()).getBytes(StandardCharsets.UTF_8));
- }
-
- }
-
- }

- package com.wd.mq.consumer;
-
- import com.rabbitmq.client.*;
- import com.wd.config.DeclareQueueExchange;
- import com.wd.config.DeclareQueueName;
- import org.springframework.amqp.rabbit.connection.Connection;
- import org.springframework.amqp.rabbit.connection.ConnectionFactory;
-
- import java.io.IOException;
- import java.util.concurrent.TimeoutException;
-
- /**
- * 死信消费者,消费消息转发给targetConnectionFactory对应的目标MQ
- */
- public class MasterDeadQueueConsumer extends DefaultConsumer {
-
- private final ConnectionFactory targetConnectionFactory;
-
- public MasterDeadQueueConsumer(Channel channel, ConnectionFactory targetConnectionFactory) {
- super(channel);
- this.targetConnectionFactory = targetConnectionFactory;
- }
-
- @Override
- public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
- // 从死信队列的名称中截取队列名称,作为后续队列的名称
- String routingKey = envelope.getRoutingKey();
- String targetQueueName = routingKey.substring(0, routingKey.length() - DeclareQueueName.DEAD_QUEUE_NAME_SUFFIX.getQueueName().length());
- try (Connection targetConnection = targetConnectionFactory.createConnection();
- Channel targetChannel = targetConnection.createChannel(false)){
- // 声明交换机和队列
- targetChannel.exchangeDeclare(DeclareQueueExchange.EXCHANGE.getExchangeName(), BuiltinExchangeType.DIRECT);
- targetChannel.queueDeclare(targetQueueName, true, false, false, null);
- targetChannel.queueBind(targetQueueName, DeclareQueueExchange.EXCHANGE.getExchangeName(), targetQueueName);
- // 转发消息
- targetChannel.basicPublish(DeclareQueueExchange.EXCHANGE.getExchangeName(), targetQueueName, properties, body);
- } catch (TimeoutException e) {
- e.printStackTrace();
- // 注意此处获取的源队列的channel
- getChannel().basicNack(envelope.getDeliveryTag(), false, true);
- }
- // 注意此处获取的源队列的channel
- getChannel().basicAck(envelope.getDeliveryTag(), false);
- }
- }

- package com.wd.mq.consumer;
-
- import com.alibaba.fastjson2.JSONObject;
- import com.rabbitmq.client.AMQP;
- import com.rabbitmq.client.Channel;
- import com.rabbitmq.client.DefaultConsumer;
- import com.rabbitmq.client.Envelope;
- import com.wd.pojo.Phone;
-
- import java.io.IOException;
-
- public class SlaveQueueConsumer extends DefaultConsumer {
-
-
- public SlaveQueueConsumer(Channel channel) {
- super(channel);
- }
-
- @Override
- public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
- Phone phone = JSONObject.parseObject(new String(body), Phone.class);
- System.out.println("SlaveQueueConsumer consume ==> " + phone);
- getChannel().basicAck(envelope.getDeliveryTag(), false);
- }
- }

注意:因为采用的是死信队列的方式实现的延迟效果,此处只需要消费对应的死信队列即可
- package com.wd.mq.quartz;
-
- import com.rabbitmq.client.AMQP;
- import com.rabbitmq.client.BuiltinExchangeType;
- import com.rabbitmq.client.Channel;
- import com.wd.config.DeclareQueueExchange;
- import com.wd.config.DeclareQueueName;
- import com.wd.config.QueueIdListConfig;
- import com.wd.mq.consumer.MasterDeadQueueConsumer;
- import org.springframework.amqp.rabbit.connection.Connection;
- import org.springframework.amqp.rabbit.connection.ConnectionFactory;
- import org.springframework.beans.factory.annotation.Qualifier;
- import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
- import org.springframework.context.annotation.Configuration;
- import org.springframework.scheduling.annotation.Scheduled;
-
- import java.io.IOException;
- import java.text.MessageFormat;
- import java.util.concurrent.TimeoutException;
-
- @Configuration
- @ConditionalOnBean(value = ConnectionFactory.class, name = {"slaveConnectionFactory", "masterConnectionFactory"})
- public class MasterDeadQueueSubscribeProcessor {
-
- private final ConnectionFactory masterConnectionFactory;
-
- private final ConnectionFactory slaveConnectionFactory;
-
- public MasterDeadQueueSubscribeProcessor(@Qualifier(value = "masterConnectionFactory") ConnectionFactory masterConnectionFactory,
- @Qualifier(value = "slaveConnectionFactory") ConnectionFactory slaveConnectionFactory) {
- this.masterConnectionFactory = masterConnectionFactory;
- this.slaveConnectionFactory = slaveConnectionFactory;
- }
-
- /**
- * 消费死信队列信息,并且转发到其他mq
- */
- @Scheduled(fixedDelay = 10 * 1000)
- public void subscribeMasterDeadQueue() throws IOException, TimeoutException {
- // 根据id 动态生成队列名称
- // 此处的queueIdList可以从第三方缓存查询得到,并且和sendDelayMsg接口保持同步刷新,此处先用本地缓存代替,id同步刷新机制不是重点,此处暂不讨论
- for (Integer id : QueueIdListConfig.QUEUE_ID_LIST) {
- String queueNameTemplate = DeclareQueueName.QUEUE_NAME_TEMPLATE.getQueueName();
- String deadQueueName = MessageFormat.format(queueNameTemplate, id) + DeclareQueueName.DEAD_QUEUE_NAME_SUFFIX.getQueueName();
-
- try (Connection connection = masterConnectionFactory.createConnection();
- Channel channel = connection.createChannel(false)){
- AMQP.Queue.DeclareOk queueDeclare = channel.queueDeclare(deadQueueName, true, false, false, null);
- if (queueDeclare.getConsumerCount() == 0) {
- channel.exchangeDeclare(DeclareQueueExchange.DEAD_EXCHANGE.getExchangeName(), BuiltinExchangeType.DIRECT);
- }
- channel.queueBind(deadQueueName, DeclareQueueExchange.DEAD_EXCHANGE.getExchangeName(), deadQueueName);
- channel.basicConsume(deadQueueName, false, new MasterDeadQueueConsumer(channel, slaveConnectionFactory));
- }
- }
- }
-
- }

- package com.wd.mq.quartz;
-
-
- import com.rabbitmq.client.AMQP;
- import com.rabbitmq.client.BuiltinExchangeType;
- import com.rabbitmq.client.Channel;
- import com.wd.config.DeclareQueueExchange;
- import com.wd.config.DeclareQueueName;
- import com.wd.config.QueueIdListConfig;
- import com.wd.mq.consumer.SlaveQueueConsumer;
- import org.springframework.amqp.rabbit.connection.Connection;
- import org.springframework.amqp.rabbit.connection.ConnectionFactory;
- import org.springframework.beans.factory.annotation.Qualifier;
- import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
- import org.springframework.context.annotation.Configuration;
- import org.springframework.scheduling.annotation.Scheduled;
-
- import java.io.IOException;
- import java.text.MessageFormat;
- import java.util.concurrent.TimeoutException;
-
- @Configuration
- @ConditionalOnBean(value = ConnectionFactory.class, name = "slaveConnectionFactory")
- public class SlaveQueueSubscribeProcessor {
-
- private final ConnectionFactory slaveConnectionFactory;
-
- public SlaveQueueSubscribeProcessor(@Qualifier(value = "slaveConnectionFactory") ConnectionFactory slaveConnectionFactory) {
- this.slaveConnectionFactory = slaveConnectionFactory;
- }
-
- /**
- * 消费队列信息
- */
- @Scheduled(fixedDelay = 10 * 1000)
- public void subscribeSlaveDeadQueue() throws IOException, TimeoutException {
- // 根据id 动态生成队列名称
- // 此处的queueIdList可以从第三方缓存查询得到,并且和sendDelayMsg接口保持同步刷新,此处先用本地缓存代替
- for (Integer id : QueueIdListConfig.QUEUE_ID_LIST) {
- String queueNameTemplate = DeclareQueueName.QUEUE_NAME_TEMPLATE.getQueueName();
- String queueName = MessageFormat.format(queueNameTemplate, id);
- try (Connection connection = slaveConnectionFactory.createConnection();
- Channel channel = connection.createChannel(false)){
- AMQP.Queue.DeclareOk queueDeclare = channel.queueDeclare(queueName, true, false, false, null);
- if (queueDeclare.getConsumerCount() == 0) {
- channel.basicConsume(queueName, false, new SlaveQueueConsumer(channel));
- }
- channel.exchangeDeclare(DeclareQueueExchange.EXCHANGE.getExchangeName(), BuiltinExchangeType.DIRECT);
- channel.queueBind(queueName, DeclareQueueExchange.EXCHANGE.getExchangeName(), queueName);
- }
- }
-
- }
-
- }

发现SlaveQueueConsumer打印如下日志:
消息传递流程如下,验证通过
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。