当前位置:   article > 正文

SpringBoot整合多个RabbitMQ_springboot 链接两个rabbitmq

springboot 链接两个rabbitmq

一、背景

​ 最近项目中需要用到了RabbitMQ来监听消息队列,监听的消息队列的 虚拟主机(virtualHost)和队列名(queueName)是不一致的,但是接收到的消息格式相同的。而且可能还存在程序不停机的情况下,动态的增加新的队列(queue)的监听,因此就需要我们自己在程序中实现一种方法实现动态配置RabbitMQ

二、需求

我们有2RabbitMQ的配置,在程序启动的时候,动态的配置好这2个RabbitMQ,实现消息的监听。

RabbitMQ的配置信息

hostportusernamepasswordvirtualHostqueueName
47.101.130.1645672rabbit-multi-01rabbit-multi-01/rabbit-multi-01queue-rabbit-multi-01
47.101.130.1645672rabbit-multi-02rabbit-multi-02/rabbit-multi-02queue-rabbit-multi-02

三、实现思路

1、动态配置RabbitMQ

包括 ConnectionFactory,RabbitAdmin,RabbitTemplate,SimpleMessageListenerContainer

2、将上方配置好的Bean注入到Spring容器中,之后可能需要用到

Spring容器中注入Bean的方法

DefaultListableBeanFactory#registerSingleton
  • 1
DefaultListableBeanFactory#registerBeanDefinition
  • 1

四、实现步骤

1、引入maven依赖

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <optional>true</optional>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20

2、创建RabbitProperties用来表示RabbitMQ的配置信息

@Data
@NoArgsConstructor
@AllArgsConstructor
@Builder
public class RabbitProperties {
    private String host;
    private Integer port;
    private String username;
    private String password;
    private String virtualHost;
    private String queueName;
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12

3、配置RabbitMQ

配置 ConnectionFactory,RabbitAdmin,RabbitTemplate,SimpleMessageListenerContainer等,并动态注入到Spring容器中

@Configuration
@RequiredArgsConstructor
@Slf4j
public class MultiRabbitMqConfig {

    private final DefaultListableBeanFactory defaultListableBeanFactory;

    private static Map<String, RabbitProperties> multiMqPropertiesMap = new HashMap<String, RabbitProperties>() {
        {
            put("first", RabbitProperties.builder()
                    .host("47.101.130.164")
                    .port(5672)
                    .username("rabbit-multi-01")
                    .password("rabbit-multi-01")
                    .virtualHost("/rabbit-multi-01")
                    .queueName("queue-rabbit-multi-01").build());
            put("second", RabbitProperties.builder()
                    .host("47.101.130.164")
                    .port(5672)
                    .username("rabbit-multi-02")
                    .password("rabbit-multi-02")
                    .virtualHost("/rabbit-multi-02")
                    .queueName("queue-rabbit-multi-02").build());
        }
    };

    @PostConstruct
    public void initRabbitmq() {
        multiMqPropertiesMap.forEach((key, rabbitProperties) -> {

            AbstractBeanDefinition beanDefinition = BeanDefinitionBuilder.genericBeanDefinition(CachingConnectionFactory.class)
                    .addPropertyValue("cacheMode", CachingConnectionFactory.CacheMode.CHANNEL)
                    .addPropertyValue("host", rabbitProperties.getHost())
                    .addPropertyValue("port", rabbitProperties.getPort())
                    .addPropertyValue("username", rabbitProperties.getUsername())
                    .addPropertyValue("password", rabbitProperties.getPassword())
                    .addPropertyValue("virtualHost", rabbitProperties.getVirtualHost())
                    .getBeanDefinition();
            String connectionFactoryName = String.format("%s%s", key, "ConnectionFactory");
            defaultListableBeanFactory.registerBeanDefinition(connectionFactoryName, beanDefinition);
            CachingConnectionFactory connectionFactory = defaultListableBeanFactory.getBean(connectionFactoryName, CachingConnectionFactory.class);

            String rabbitAdminName = String.format("%s%s", key, "RabbitAdmin");
            AbstractBeanDefinition rabbitAdminBeanDefinition = BeanDefinitionBuilder.genericBeanDefinition(RabbitAdmin.class)
                    .addConstructorArgValue(connectionFactory)
                    .addPropertyValue("autoStartup", true)
                    .getBeanDefinition();
            defaultListableBeanFactory.registerBeanDefinition(rabbitAdminName, rabbitAdminBeanDefinition);
            RabbitAdmin rabbitAdmin = defaultListableBeanFactory.getBean(rabbitAdminName, RabbitAdmin.class);
            log.info("rabbitAdmin:[{}]", rabbitAdmin);

            RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
            defaultListableBeanFactory.registerSingleton(String.format("%s%s", key, "RabbitTemplate"), rabbitTemplate);

            SimpleMessageListenerContainer simpleMessageListenerContainer = new SimpleMessageListenerContainer(connectionFactory);
            // 设置监听的队列
            simpleMessageListenerContainer.setQueueNames(rabbitProperties.getQueueName());
            // 指定要创建的并发使用者的数量,默认值是1,当并发高时可以增加这个的数值,同时下方max的数值也要增加
            simpleMessageListenerContainer.setConcurrentConsumers(3);
            // 最大的并发消费者
            simpleMessageListenerContainer.setMaxConcurrentConsumers(10);
            // 设置是否重回队列
            simpleMessageListenerContainer.setDefaultRequeueRejected(false);
            // 设置签收模式
            simpleMessageListenerContainer.setAcknowledgeMode(AcknowledgeMode.MANUAL);
            // 设置非独占模式
            simpleMessageListenerContainer.setExclusive(false);
            // 设置consumer未被 ack 的消息个数
            simpleMessageListenerContainer.setPrefetchCount(1);
            // 设置消息监听器
            simpleMessageListenerContainer.setMessageListener((ChannelAwareMessageListener) (message, channel) -> {
                try {
                    log.info("============> Thread:[{}] 接收到消息:[{}] ", Thread.currentThread().getName(), new String(message.getBody()));
                    log.info("====>connection:[{}]", channel.getConnection());
                    channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
                } catch (Exception e) {
                    log.error(e.getMessage(), e);
                    // 发生异常此处需要捕获到
                    channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
                }
            });
             /**
             * 1、simpleMessageListenerContainer.start();
             * 2、simpleMessageListenerContainer.stop();
             * 3、如果后期rabbitmq 的配置是从数据库中读取,即用户在页面上添加一个配置,就动态创建这个
             * 此时就需要调用 SimpleMessageListenerContainer#start 方法
             */
             defaultListableBeanFactory.registerSingleton(String.format("%s%s", key, "SimpleMessageListenerContainer"), simpleMessageListenerContainer);
        });
        new Thread(() -> {
            try {
                TimeUnit.SECONDS.sleep(3);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            RabbitTemplate firstRabbitTemplate = (RabbitTemplate) defaultListableBeanFactory.getBean("firstRabbitTemplate");
            firstRabbitTemplate.convertAndSend("exchange-rabbit-multi-01", "", "first queue message");
            log.info("over...");
        }).start();
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101

五、实现效果

Multi-RabbitMQ

六、代码

https://gitee.com/huan1993/rabbitmq/tree/master/rabbitmq-springboot-multi

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/Cpp五条/article/detail/617010
推荐阅读
相关标签
  

闽ICP备14008679号