当前位置:   article > 正文

springboot对接rabbitmq并且实现动态创建队列和消费_javaspringbootmq消息队列

javaspringbootmq消息队列

背景

1、对接多个节点上的MQ(如master-MQ,slave-MQ),若读者需要自己模拟出两个MQ,可以部署多个VM然后参考 docker 安装rabbitmq_Steven-Russell的博客-CSDN博客

2、队列名称不是固定的,需要接受外部参数,并且通过模板进行格式化,才能够得到队列名称

3、需要在master-MQ上延迟一段时间,然后将消息再转发给slave-MQ

问题

1、采用springboot的自动注入bean需要事先知道队列的名称,但是队列名称是动态的情况下,无法实现自动注入

2、mq弱依赖,在没有master-mq或者slave-mq时,不能影响到现有能力

解决方案

1、由于mq的队列创建、exchange创建以及队列和exchange的绑定关系是可重入的,所以采用connectFactory进行手动声明

2、增加自定义条件OnMqCondition,防止不必要的bean创建

总体流程

实施过程

搭建springboot项目

参考 搭建最简单的SpringBoot项目_Steven-Russell的博客-CSDN博客

引入amqp依赖

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

引入后续会用到的工具类依赖

<dependency>
    <groupId>commons-io</groupId>
    <artifactId>commons-io</artifactId>
    <version>2.11.0</version>
</dependency>
<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <version>1.18.28</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>com.alibaba.fastjson2</groupId>
    <artifactId>fastjson2</artifactId>
    <version>2.0.40</version>
</dependency>

创建配置文件

在application.yml中增加如下配置

mq:
  master:
    addresses: 192.168.30.128:5672
    username: guest
    password: guest
    vhost: /
  slave:
    addresses: 192.168.30.131:5672
    username: guest
    password: guest
    vhost: /

创建自定义Condition注解和注解实现

  1. package com.wd.config.condition;
  2. import org.springframework.context.annotation.Conditional;
  3. import java.lang.annotation.*;
  4. @Target({ElementType.TYPE, ElementType.METHOD})
  5. @Retention(RetentionPolicy.RUNTIME)
  6. @Documented
  7. @Conditional(OnMqCondition.class)
  8. public @interface MqConditional {
  9. String[] keys();
  10. }
  1. package com.wd.config.condition;
  2. import org.springframework.context.annotation.Condition;
  3. import org.springframework.context.annotation.ConditionContext;
  4. import org.springframework.core.type.AnnotatedTypeMetadata;
  5. import org.springframework.lang.NonNull;
  6. import org.springframework.util.ObjectUtils;
  7. import java.util.Map;
  8. public class OnMqCondition implements Condition {
  9. @Override
  10. public boolean matches(@NonNull ConditionContext context, @NonNull AnnotatedTypeMetadata metadata) {
  11. Map<String, Object> annotationAttributes = metadata.getAnnotationAttributes(MqConditional.class.getName());
  12. if (annotationAttributes == null || annotationAttributes.isEmpty()) {
  13. // 为空则不进行校验了
  14. return true;
  15. }
  16. String[] keys = (String[])annotationAttributes.get("keys");
  17. for (String key : keys) {
  18. String property = context.getEnvironment().getProperty(key);
  19. if (ObjectUtils.isEmpty(property)) {
  20. return false;
  21. }
  22. }
  23. return true;
  24. }
  25. }

创建多个链接工厂connectFactory

  1. package com.wd.config;
  2. import com.wd.config.condition.MqConditional;
  3. import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
  4. import org.springframework.amqp.rabbit.connection.ConnectionFactory;
  5. import org.springframework.beans.factory.annotation.Value;
  6. import org.springframework.context.annotation.Bean;
  7. import org.springframework.context.annotation.Configuration;
  8. import org.springframework.context.annotation.Primary;
  9. @Configuration
  10. public class MqConnectionFactory {
  11. @Value("${mq.master.addresses}")
  12. private String masterAddresses;
  13. @Value("${mq.master.username}")
  14. private String masterUsername;
  15. @Value("${mq.master.password}")
  16. private String masterPassword;
  17. @Value("${mq.master.vhost}")
  18. private String masterVhost;
  19. @Value("${mq.slave.addresses}")
  20. private String slaveAddresses;
  21. @Value("${mq.slave.username}")
  22. private String slaveUsername;
  23. @Value("${mq.slave.password}")
  24. private String slavePassword;
  25. @Value("${mq.slave.vhost}")
  26. private String slaveVhost;
  27. @Bean
  28. @Primary
  29. @MqConditional(keys = {"mq.master.addresses", "mq.master.vhost", "mq.master.username", "mq.master.password"})
  30. public ConnectionFactory masterConnectionFactory() {
  31. return doCreateConnectionFactory(masterAddresses, masterUsername, masterPassword, masterVhost);
  32. }
  33. @Bean
  34. @MqConditional(keys = {"mq.slave.addresses", "mq.slave.vhost", "mq.slave.username", "mq.slave.password"})
  35. public ConnectionFactory slaveConnectionFactory() {
  36. return doCreateConnectionFactory(slaveAddresses, slaveUsername, slavePassword, slaveVhost);
  37. }
  38. private ConnectionFactory doCreateConnectionFactory(String addresses,
  39. String username,
  40. String password,
  41. String vhost) {
  42. CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory();
  43. cachingConnectionFactory.setAddresses(addresses);
  44. cachingConnectionFactory.setUsername(username);
  45. cachingConnectionFactory.setPassword(password);
  46. cachingConnectionFactory.setVirtualHost(vhost);
  47. return cachingConnectionFactory;
  48. }
  49. }

创建交换机名称枚举 DeclareQueueExchange

  1. package com.wd.config;
  2. public enum DeclareQueueExchange {
  3. EXCHANGE("exchange"),
  4. DEAD_EXCHANGE("deadExchange"),
  5. DELAY_EXCHANGE("delayExchange");
  6. private final String exchangeName;
  7. DeclareQueueExchange(String exchangeName) {
  8. this.exchangeName = exchangeName;
  9. }
  10. public String getExchangeName() {
  11. return exchangeName;
  12. }
  13. }

创建消息队列模板枚举 DeclareQueueName

  1. package com.wd.config;
  2. public enum DeclareQueueName {
  3. DELAY_QUEUE_NAME_SUFFIX("_delay"),
  4. DEAD_QUEUE_NAME_SUFFIX("_dead"),
  5. QUEUE_NAME_TEMPLATE("wd.simple.queue.{0}");
  6. private final String queueName;
  7. DeclareQueueName(String queueName) {
  8. this.queueName = queueName;
  9. }
  10. public String getQueueName() {
  11. return queueName;
  12. }
  13. }

创建消息VO和消息

  1. package com.wd.controller.vo;
  2. import com.wd.pojo.Phone;
  3. import lombok.Data;
  4. @Data
  5. public class DelayMsgVo {
  6. private String queueId;
  7. private Phone phone;
  8. }
  1. package com.wd.pojo;
  2. import lombok.AllArgsConstructor;
  3. import lombok.Data;
  4. import lombok.NoArgsConstructor;
  5. import java.io.Serializable;
  6. import java.util.Date;
  7. import java.util.List;
  8. @Data
  9. @AllArgsConstructor
  10. @NoArgsConstructor
  11. public class Phone implements Serializable {
  12. private static final long serialVersionUID = -1L;
  13. private String id;
  14. private String name;
  15. private Date createTime;
  16. private List<User> userList;
  17. }
  1. package com.wd.pojo;
  2. import lombok.AllArgsConstructor;
  3. import lombok.Data;
  4. import lombok.NoArgsConstructor;
  5. import java.io.Serializable;
  6. import java.util.Date;
  7. @Data
  8. @AllArgsConstructor
  9. @NoArgsConstructor
  10. public class User implements Serializable {
  11. private static final long serialVersionUID = -1L;
  12. private String username;
  13. private Date create;
  14. }

定义队列id列表缓存,用于替换三方缓存,用于队列名称模板初始化

  1. package com.wd.config;
  2. import java.util.ArrayList;
  3. import java.util.List;
  4. public interface QueueIdListConfig {
  5. /**
  6. * 先用本地缓存维护队列id
  7. */
  8. List<Integer> QUEUE_ID_LIST = new ArrayList<Integer>() {{
  9. add(111);
  10. add(222);
  11. add(333);
  12. }};
  13. }

创建消息接受入口 controller

注意:此处就以web用户输入为入口,所以创建controller

  1. package com.wd.controller;
  2. import com.alibaba.fastjson2.JSONObject;
  3. import com.rabbitmq.client.*;
  4. import com.wd.config.DeclareQueueExchange;
  5. import com.wd.config.DeclareQueueName;
  6. import com.wd.controller.vo.DelayMsgVo;
  7. import org.springframework.amqp.rabbit.connection.Connection;
  8. import org.springframework.amqp.rabbit.connection.ConnectionFactory;
  9. import org.springframework.beans.factory.annotation.Qualifier;
  10. import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
  11. import org.springframework.web.bind.annotation.*;
  12. import java.io.IOException;
  13. import java.nio.charset.StandardCharsets;
  14. import java.text.MessageFormat;
  15. import java.util.HashMap;
  16. import java.util.Map;
  17. import java.util.concurrent.TimeoutException;
  18. @RestController
  19. @ConditionalOnBean(value = ConnectionFactory.class, name = "masterConnectionFactory")
  20. public class DynamicCreateQueueController {
  21. private final ConnectionFactory masterConnectionFactory;
  22. public DynamicCreateQueueController(@Qualifier(value = "masterConnectionFactory") ConnectionFactory masterConnectionFactory) {
  23. this.masterConnectionFactory = masterConnectionFactory;
  24. }
  25. @PostMapping(value = "sendDelayMsg")
  26. public String sendMsg2DelayQueue(@RequestBody DelayMsgVo delayMsgVo) throws IOException, TimeoutException {
  27. doSendMsg2DelayQueue(delayMsgVo);
  28. return "success";
  29. }
  30. private void doSendMsg2DelayQueue(DelayMsgVo delayMsgVo) throws IOException, TimeoutException {
  31. // 根据id 动态生成队列名称
  32. String queueNameTemplate = DeclareQueueName.QUEUE_NAME_TEMPLATE.getQueueName();
  33. String queueName = MessageFormat.format(queueNameTemplate, delayMsgVo.getQueueId());
  34. String delayQueueName = queueName + DeclareQueueName.DELAY_QUEUE_NAME_SUFFIX.getQueueName();
  35. String deadQueueName = queueName + DeclareQueueName.DEAD_QUEUE_NAME_SUFFIX.getQueueName();
  36. // 注意:下述声明交换机和队列的操作是可以重入的,MQ并不会报错
  37. try (Connection connection = masterConnectionFactory.createConnection();
  38. Channel channel = connection.createChannel(false)){
  39. // 声明死信交换机
  40. channel.exchangeDeclare(DeclareQueueExchange.DEAD_EXCHANGE.getExchangeName(), BuiltinExchangeType.DIRECT);
  41. // 声明死信队列
  42. AMQP.Queue.DeclareOk deadQueueDeclareOk = channel.queueDeclare(deadQueueName,
  43. true, false, false, null);
  44. // 定时任务 绑定消费者,避免出现多个消费者以及重启后无法消费存量消息的问题
  45. // 注意:因为需要保证消费顺序,所以此处仅声明一个消费者
  46. // 死信队列和交换机绑定
  47. channel.queueBind(deadQueueName, DeclareQueueExchange.DEAD_EXCHANGE.getExchangeName(), deadQueueName);
  48. // 声明延迟队列
  49. Map<String, Object> args = new HashMap<>();
  50. //设置延迟队列绑定的死信交换机
  51. args.put("x-dead-letter-exchange", DeclareQueueExchange.DEAD_EXCHANGE.getExchangeName());
  52. //设置延迟队列绑定的死信路由键
  53. args.put("x-dead-letter-routing-key", deadQueueName);
  54. //设置延迟队列的 TTL 消息存活时间
  55. args.put("x-message-ttl", 10 * 1000);
  56. channel.queueDeclare(delayQueueName, true, false, false, args);
  57. channel.exchangeDeclare(DeclareQueueExchange.DELAY_EXCHANGE.getExchangeName(), BuiltinExchangeType.DIRECT);
  58. channel.queueBind(delayQueueName, DeclareQueueExchange.DELAY_EXCHANGE.getExchangeName(), delayQueueName);
  59. // 发送消息到延迟队列
  60. channel.basicPublish(DeclareQueueExchange.DELAY_EXCHANGE.getExchangeName(), delayQueueName, null,
  61. JSONObject.toJSONString(delayMsgVo.getPhone()).getBytes(StandardCharsets.UTF_8));
  62. }
  63. }
  64. }

创建master延迟消息消费者

  1. package com.wd.mq.consumer;
  2. import com.rabbitmq.client.*;
  3. import com.wd.config.DeclareQueueExchange;
  4. import com.wd.config.DeclareQueueName;
  5. import org.springframework.amqp.rabbit.connection.Connection;
  6. import org.springframework.amqp.rabbit.connection.ConnectionFactory;
  7. import java.io.IOException;
  8. import java.util.concurrent.TimeoutException;
  9. /**
  10. * 死信消费者,消费消息转发给targetConnectionFactory对应的目标MQ
  11. */
  12. public class MasterDeadQueueConsumer extends DefaultConsumer {
  13. private final ConnectionFactory targetConnectionFactory;
  14. public MasterDeadQueueConsumer(Channel channel, ConnectionFactory targetConnectionFactory) {
  15. super(channel);
  16. this.targetConnectionFactory = targetConnectionFactory;
  17. }
  18. @Override
  19. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
  20. // 从死信队列的名称中截取队列名称,作为后续队列的名称
  21. String routingKey = envelope.getRoutingKey();
  22. String targetQueueName = routingKey.substring(0, routingKey.length() - DeclareQueueName.DEAD_QUEUE_NAME_SUFFIX.getQueueName().length());
  23. try (Connection targetConnection = targetConnectionFactory.createConnection();
  24. Channel targetChannel = targetConnection.createChannel(false)){
  25. // 声明交换机和队列
  26. targetChannel.exchangeDeclare(DeclareQueueExchange.EXCHANGE.getExchangeName(), BuiltinExchangeType.DIRECT);
  27. targetChannel.queueDeclare(targetQueueName, true, false, false, null);
  28. targetChannel.queueBind(targetQueueName, DeclareQueueExchange.EXCHANGE.getExchangeName(), targetQueueName);
  29. // 转发消息
  30. targetChannel.basicPublish(DeclareQueueExchange.EXCHANGE.getExchangeName(), targetQueueName, properties, body);
  31. } catch (TimeoutException e) {
  32. e.printStackTrace();
  33. // 注意此处获取的源队列的channel
  34. getChannel().basicNack(envelope.getDeliveryTag(), false, true);
  35. }
  36. // 注意此处获取的源队列的channel
  37. getChannel().basicAck(envelope.getDeliveryTag(), false);
  38. }
  39. }

创建slave队列消息消费者

  1. package com.wd.mq.consumer;
  2. import com.alibaba.fastjson2.JSONObject;
  3. import com.rabbitmq.client.AMQP;
  4. import com.rabbitmq.client.Channel;
  5. import com.rabbitmq.client.DefaultConsumer;
  6. import com.rabbitmq.client.Envelope;
  7. import com.wd.pojo.Phone;
  8. import java.io.IOException;
  9. public class SlaveQueueConsumer extends DefaultConsumer {
  10. public SlaveQueueConsumer(Channel channel) {
  11. super(channel);
  12. }
  13. @Override
  14. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
  15. Phone phone = JSONObject.parseObject(new String(body), Phone.class);
  16. System.out.println("SlaveQueueConsumer consume ==> " + phone);
  17. getChannel().basicAck(envelope.getDeliveryTag(), false);
  18. }
  19. }

创建定时任务,消费延迟消息

注意:因为采用的是死信队列的方式实现的延迟效果,此处只需要消费对应的死信队列即可

  1. package com.wd.mq.quartz;
  2. import com.rabbitmq.client.AMQP;
  3. import com.rabbitmq.client.BuiltinExchangeType;
  4. import com.rabbitmq.client.Channel;
  5. import com.wd.config.DeclareQueueExchange;
  6. import com.wd.config.DeclareQueueName;
  7. import com.wd.config.QueueIdListConfig;
  8. import com.wd.mq.consumer.MasterDeadQueueConsumer;
  9. import org.springframework.amqp.rabbit.connection.Connection;
  10. import org.springframework.amqp.rabbit.connection.ConnectionFactory;
  11. import org.springframework.beans.factory.annotation.Qualifier;
  12. import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
  13. import org.springframework.context.annotation.Configuration;
  14. import org.springframework.scheduling.annotation.Scheduled;
  15. import java.io.IOException;
  16. import java.text.MessageFormat;
  17. import java.util.concurrent.TimeoutException;
  18. @Configuration
  19. @ConditionalOnBean(value = ConnectionFactory.class, name = {"slaveConnectionFactory", "masterConnectionFactory"})
  20. public class MasterDeadQueueSubscribeProcessor {
  21. private final ConnectionFactory masterConnectionFactory;
  22. private final ConnectionFactory slaveConnectionFactory;
  23. public MasterDeadQueueSubscribeProcessor(@Qualifier(value = "masterConnectionFactory") ConnectionFactory masterConnectionFactory,
  24. @Qualifier(value = "slaveConnectionFactory") ConnectionFactory slaveConnectionFactory) {
  25. this.masterConnectionFactory = masterConnectionFactory;
  26. this.slaveConnectionFactory = slaveConnectionFactory;
  27. }
  28. /**
  29. * 消费死信队列信息,并且转发到其他mq
  30. */
  31. @Scheduled(fixedDelay = 10 * 1000)
  32. public void subscribeMasterDeadQueue() throws IOException, TimeoutException {
  33. // 根据id 动态生成队列名称
  34. // 此处的queueIdList可以从第三方缓存查询得到,并且和sendDelayMsg接口保持同步刷新,此处先用本地缓存代替,id同步刷新机制不是重点,此处暂不讨论
  35. for (Integer id : QueueIdListConfig.QUEUE_ID_LIST) {
  36. String queueNameTemplate = DeclareQueueName.QUEUE_NAME_TEMPLATE.getQueueName();
  37. String deadQueueName = MessageFormat.format(queueNameTemplate, id) + DeclareQueueName.DEAD_QUEUE_NAME_SUFFIX.getQueueName();
  38. try (Connection connection = masterConnectionFactory.createConnection();
  39. Channel channel = connection.createChannel(false)){
  40. AMQP.Queue.DeclareOk queueDeclare = channel.queueDeclare(deadQueueName, true, false, false, null);
  41. if (queueDeclare.getConsumerCount() == 0) {
  42. channel.exchangeDeclare(DeclareQueueExchange.DEAD_EXCHANGE.getExchangeName(), BuiltinExchangeType.DIRECT);
  43. }
  44. channel.queueBind(deadQueueName, DeclareQueueExchange.DEAD_EXCHANGE.getExchangeName(), deadQueueName);
  45. channel.basicConsume(deadQueueName, false, new MasterDeadQueueConsumer(channel, slaveConnectionFactory));
  46. }
  47. }
  48. }
  49. }

创建定时任务,消费slave队列的消息

  1. package com.wd.mq.quartz;
  2. import com.rabbitmq.client.AMQP;
  3. import com.rabbitmq.client.BuiltinExchangeType;
  4. import com.rabbitmq.client.Channel;
  5. import com.wd.config.DeclareQueueExchange;
  6. import com.wd.config.DeclareQueueName;
  7. import com.wd.config.QueueIdListConfig;
  8. import com.wd.mq.consumer.SlaveQueueConsumer;
  9. import org.springframework.amqp.rabbit.connection.Connection;
  10. import org.springframework.amqp.rabbit.connection.ConnectionFactory;
  11. import org.springframework.beans.factory.annotation.Qualifier;
  12. import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
  13. import org.springframework.context.annotation.Configuration;
  14. import org.springframework.scheduling.annotation.Scheduled;
  15. import java.io.IOException;
  16. import java.text.MessageFormat;
  17. import java.util.concurrent.TimeoutException;
  18. @Configuration
  19. @ConditionalOnBean(value = ConnectionFactory.class, name = "slaveConnectionFactory")
  20. public class SlaveQueueSubscribeProcessor {
  21. private final ConnectionFactory slaveConnectionFactory;
  22. public SlaveQueueSubscribeProcessor(@Qualifier(value = "slaveConnectionFactory") ConnectionFactory slaveConnectionFactory) {
  23. this.slaveConnectionFactory = slaveConnectionFactory;
  24. }
  25. /**
  26. * 消费队列信息
  27. */
  28. @Scheduled(fixedDelay = 10 * 1000)
  29. public void subscribeSlaveDeadQueue() throws IOException, TimeoutException {
  30. // 根据id 动态生成队列名称
  31. // 此处的queueIdList可以从第三方缓存查询得到,并且和sendDelayMsg接口保持同步刷新,此处先用本地缓存代替
  32. for (Integer id : QueueIdListConfig.QUEUE_ID_LIST) {
  33. String queueNameTemplate = DeclareQueueName.QUEUE_NAME_TEMPLATE.getQueueName();
  34. String queueName = MessageFormat.format(queueNameTemplate, id);
  35. try (Connection connection = slaveConnectionFactory.createConnection();
  36. Channel channel = connection.createChannel(false)){
  37. AMQP.Queue.DeclareOk queueDeclare = channel.queueDeclare(queueName, true, false, false, null);
  38. if (queueDeclare.getConsumerCount() == 0) {
  39. channel.basicConsume(queueName, false, new SlaveQueueConsumer(channel));
  40. }
  41. channel.exchangeDeclare(DeclareQueueExchange.EXCHANGE.getExchangeName(), BuiltinExchangeType.DIRECT);
  42. channel.queueBind(queueName, DeclareQueueExchange.EXCHANGE.getExchangeName(), queueName);
  43. }
  44. }
  45. }
  46. }

启动项目

请求接口发送消息 http://localhost:8080/sendDelayMsg

检查消息传递过程

先在master-mq延迟队列发现消息

再到master-mq死信队列中发现消息

再到slave-mq中发现消息

检查日志打印

发现SlaveQueueConsumer打印如下日志:

结论

消息传递流程如下,验证通过

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/从前慢现在也慢/article/detail/420175
推荐阅读
相关标签
  

闽ICP备14008679号