
Connecting Spring Boot to Multiple RabbitMQ Sources


Reposted from: SpringBoot连接多RabbitMQ源 - 掘金 (Juejin), https://juejin.cn/post/6844904039797243917


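In real-world development, many scenarios call for asynchronous processing, which is where RabbitMQ comes in, and as those scenarios multiply an application may need to connect to more than one RabbitMQ instance. Spring Boot's default configuration makes it quick to connect to RabbitMQ, but only to a single instance. When several are needed, the defaults no longer fit and each connection has to be written by hand.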

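In Spring Boot, the two classes we use most often are:

  • RabbitTemplate: produces and consumes messages;
  • RabbitAdmin: declares and deletes exchanges and queues, and binds and unbinds queues and exchanges.

So connecting to multiple RabbitMQ instances means creating each connection ourselves and building these two classes again for each one. The code is as follows: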

Configuration

The application.properties file needs to define two connections:


 

    server.port=8080

    # RabbitMQ connection v2
    v2.spring.rabbitmq.host=host
    v2.spring.rabbitmq.port=5672
    v2.spring.rabbitmq.username=username
    v2.spring.rabbitmq.password=password
    v2.spring.rabbitmq.virtual-host=virtual-host
    # Consumer side: manual ack
    v2.spring.rabbitmq.listener.simple.acknowledge-mode=manual
    # 1. With mandatory=true, if the exchange cannot find a suitable queue for the
    #    message based on its own type and the message's routing key, the broker
    #    calls basic.return to hand the message back to the producer.
    # 2. With mandatory=false, the broker simply discards the message in that case.
    #    Put simply, mandatory tells the broker to route the message to at least
    #    one queue, or else return it to the sender.
    v2.spring.rabbitmq.template.mandatory=true
    # Publisher confirms
    v2.spring.rabbitmq.publisher-confirms=true
    # Publisher returns. There are two failure cases:
    # 1. the message never reaches the exchange;
    # 2. the message reaches the exchange but is not routed to any queue, which
    #    triggers the return callback. Note: in case 2 the publisher confirm
    #    callback still reports ack=true.
    v2.spring.rabbitmq.publisher-returns=true
    v2.spring.rabbitmq.listener.simple.prefetch=5

    # RabbitMQ connection v1
    v1.spring.rabbitmq.host=host
    v1.spring.rabbitmq.port=5672
    v1.spring.rabbitmq.username=username
    v1.spring.rabbitmq.password=password
    v1.spring.rabbitmq.virtual-host=virtual-host
    # Consumer side: manual ack
    v1.spring.rabbitmq.listener.simple.acknowledge-mode=manual
    # 1. With mandatory=true, if the exchange cannot find a suitable queue for the
    #    message based on its own type and the message's routing key, the broker
    #    calls basic.return to hand the message back to the producer.
    # 2. With mandatory=false, the broker simply discards the message in that case.
    #    Put simply, mandatory tells the broker to route the message to at least
    #    one queue, or else return it to the sender.
    v1.spring.rabbitmq.template.mandatory=true
    # Publisher confirms
    v1.spring.rabbitmq.publisher-confirms=true
    # Publisher returns. There are two failure cases:
    # 1. the message never reaches the exchange;
    # 2. the message reaches the exchange but is not routed to any queue, which
    #    triggers the return callback. Note: in case 2 the publisher confirm
    #    callback still reports ack=true.
    v1.spring.rabbitmq.publisher-returns=true
    v1.spring.rabbitmq.listener.simple.prefetch=5
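The effect of the mandatory flag described in the comments above can be modelled without a broker. The following is only a toy sketch in plain Java, not RabbitMQ or Spring AMQP code: ToyBroker, publish and Outcome are hypothetical names, the exchange is assumed to exist, and routing is reduced to an exact match on the routing key.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy model of the mandatory flag; not real RabbitMQ client code. */
final class ToyBroker {

    /** What happens to one published message. */
    enum Outcome { ROUTED, RETURNED, DROPPED }

    // routing key -> queue name (exact match only, to keep the model small)
    private final Map<String, String> bindings = new HashMap<>();

    void bind(String routingKey, String queue) {
        bindings.put(routingKey, queue);
    }

    /**
     * Publishes to an exchange that is assumed to exist.
     * An unroutable message is returned to the publisher when mandatory is true
     * and silently dropped when it is false.
     */
    Outcome publish(String routingKey, boolean mandatory) {
        if (bindings.containsKey(routingKey)) {
            return Outcome.ROUTED;
        }
        return mandatory ? Outcome.RETURNED : Outcome.DROPPED;
    }

    public static void main(String[] args) {
        ToyBroker broker = new ToyBroker();
        broker.bind("routing.key.example.new", "queue.example.topic.new");

        System.out.println(broker.publish("routing.key.example.new", true)); // ROUTED
        System.out.println(broker.publish("no.such.key", true));             // RETURNED
        System.out.println(broker.publish("no.such.key", false));            // DROPPED
    }
}
```

In all three outcomes the exchange itself accepted the message, which is why a publisher confirm would still report ack=true for the RETURNED and DROPPED cases; only the return callback distinguishes an unroutable message.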

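Rewriting the connection factory

Note that with multiple sources, one of the connections must be annotated with @Primary to mark it as the primary connection, which is the one used by default.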

    package com.example.config.rabbitmq;

    import org.springframework.amqp.core.AcknowledgeMode;
    import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
    import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
    import org.springframework.amqp.rabbit.connection.ConnectionFactory;
    import org.springframework.amqp.rabbit.core.RabbitAdmin;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Qualifier;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.context.annotation.Primary;

    /**
     * Created by shuai on 2019/4/23.
     *
     * Note: setPublisherConfirms and setReturnCallback match the Spring AMQP
     * version this example was written for; later releases deprecate them in
     * favour of setPublisherConfirmType and setReturnsCallback.
     */
    @Configuration
    public class MultipleRabbitMQConfig {

        // ---- v2 connection ----

        @Bean(name = "v2ConnectionFactory")
        public CachingConnectionFactory hospSyncConnectionFactory(
                @Value("${v2.spring.rabbitmq.host}") String host,
                @Value("${v2.spring.rabbitmq.port}") int port,
                @Value("${v2.spring.rabbitmq.username}") String username,
                @Value("${v2.spring.rabbitmq.password}") String password,
                @Value("${v2.spring.rabbitmq.virtual-host}") String virtualHost,
                @Value("${v2.spring.rabbitmq.publisher-confirms}") Boolean publisherConfirms,
                @Value("${v2.spring.rabbitmq.publisher-returns}") Boolean publisherReturns) {
            CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
            connectionFactory.setHost(host);
            connectionFactory.setPort(port);
            connectionFactory.setUsername(username);
            connectionFactory.setPassword(password);
            connectionFactory.setVirtualHost(virtualHost);
            connectionFactory.setPublisherConfirms(publisherConfirms);
            connectionFactory.setPublisherReturns(publisherReturns);
            return connectionFactory;
        }

        @Bean(name = "v2RabbitTemplate")
        public RabbitTemplate firstRabbitTemplate(
                @Qualifier("v2ConnectionFactory") ConnectionFactory connectionFactory,
                @Value("${v2.spring.rabbitmq.template.mandatory}") Boolean mandatory) {
            RabbitTemplate v2RabbitTemplate = new RabbitTemplate(connectionFactory);
            v2RabbitTemplate.setMandatory(mandatory);
            v2RabbitTemplate.setConfirmCallback((correlationData, ack, s) -> {
                if (!ack) {
                    // LOGGER.info("{} RabbitMQ publish ack confirmation failed: [{}]", this.name, JSON.toJSONString(object));
                } else {
                    // LOGGER.info("{} RabbitMQ publish ack confirmation succeeded: [{}]", this.name, JSON.toJSONString(object));
                }
            });
            v2RabbitTemplate.setReturnCallback((message, code, s, exchange, routingKey) -> {
                // LOGGER.error("{} RabbitMQ returnedMessage: the exchange does not exist, or the message reached the exchange but no queue, message:[{}], code[{}], s[{}], exchange[{}], routingKey[{}]", new Object[]{this.name, JSON.toJSONString(message), JSON.toJSONString(code), JSON.toJSONString(s), JSON.toJSONString(exchange), JSON.toJSONString(routingKey)});
            });
            return v2RabbitTemplate;
        }

        @Bean(name = "v2ContainerFactory")
        public SimpleRabbitListenerContainerFactory hospSyncFactory(
                @Qualifier("v2ConnectionFactory") ConnectionFactory connectionFactory,
                @Value("${v2.spring.rabbitmq.listener.simple.acknowledge-mode}") String acknowledge,
                @Value("${v2.spring.rabbitmq.listener.simple.prefetch}") Integer prefetch) {
            SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
            factory.setConnectionFactory(connectionFactory);
            // The property value is lower case ("manual"); the enum constant is MANUAL.
            factory.setAcknowledgeMode(AcknowledgeMode.valueOf(acknowledge.toUpperCase()));
            factory.setPrefetchCount(prefetch);
            return factory;
        }

        @Bean(name = "v2RabbitAdmin")
        public RabbitAdmin iqianzhanRabbitAdmin(
                @Qualifier("v2ConnectionFactory") ConnectionFactory connectionFactory) {
            RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
            rabbitAdmin.setAutoStartup(true);
            return rabbitAdmin;
        }

        // ---- v1 connection: the primary MQ connection ----

        @Bean(name = "v1ConnectionFactory")
        @Primary
        public CachingConnectionFactory publicConnectionFactory(
                @Value("${v1.spring.rabbitmq.host}") String host,
                @Value("${v1.spring.rabbitmq.port}") int port,
                @Value("${v1.spring.rabbitmq.username}") String username,
                @Value("${v1.spring.rabbitmq.password}") String password,
                @Value("${v1.spring.rabbitmq.virtual-host}") String virtualHost,
                @Value("${v1.spring.rabbitmq.publisher-confirms}") Boolean publisherConfirms,
                @Value("${v1.spring.rabbitmq.publisher-returns}") Boolean publisherReturns) {
            CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
            connectionFactory.setHost(host);
            connectionFactory.setPort(port);
            connectionFactory.setUsername(username);
            connectionFactory.setPassword(password);
            connectionFactory.setVirtualHost(virtualHost);
            connectionFactory.setPublisherConfirms(publisherConfirms);
            connectionFactory.setPublisherReturns(publisherReturns);
            return connectionFactory;
        }

        @Bean(name = "v1RabbitTemplate")
        @Primary
        public RabbitTemplate publicRabbitTemplate(
                @Qualifier("v1ConnectionFactory") ConnectionFactory connectionFactory,
                @Value("${v1.spring.rabbitmq.template.mandatory}") Boolean mandatory) {
            RabbitTemplate v1RabbitTemplate = new RabbitTemplate(connectionFactory);
            v1RabbitTemplate.setMandatory(mandatory);
            v1RabbitTemplate.setConfirmCallback((correlationData, ack, s) -> {
                if (!ack) {
                    // LOGGER.info("{} RabbitMQ publish ack confirmation failed: [{}]", this.name, JSON.toJSONString(object));
                } else {
                    // LOGGER.info("{} RabbitMQ publish ack confirmation succeeded: [{}]", this.name, JSON.toJSONString(object));
                }
            });
            v1RabbitTemplate.setReturnCallback((message, code, s, exchange, routingKey) -> {
                // LOGGER.error("{} RabbitMQ returnedMessage: the exchange does not exist, or the message reached the exchange but no queue, message:[{}], code[{}], s[{}], exchange[{}], routingKey[{}]", new Object[]{this.name, JSON.toJSONString(message), JSON.toJSONString(code), JSON.toJSONString(s), JSON.toJSONString(exchange), JSON.toJSONString(routingKey)});
            });
            return v1RabbitTemplate;
        }

        @Bean(name = "v1ContainerFactory")
        @Primary
        public SimpleRabbitListenerContainerFactory insMessageListenerContainer(
                @Qualifier("v1ConnectionFactory") ConnectionFactory connectionFactory,
                @Value("${v1.spring.rabbitmq.listener.simple.acknowledge-mode}") String acknowledge,
                @Value("${v1.spring.rabbitmq.listener.simple.prefetch}") Integer prefetch) {
            SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
            factory.setConnectionFactory(connectionFactory);
            factory.setAcknowledgeMode(AcknowledgeMode.valueOf(acknowledge.toUpperCase()));
            factory.setPrefetchCount(prefetch);
            return factory;
        }

        @Bean(name = "v1RabbitAdmin")
        @Primary
        public RabbitAdmin publicRabbitAdmin(
                @Qualifier("v1ConnectionFactory") ConnectionFactory connectionFactory) {
            RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
            rabbitAdmin.setAutoStartup(true);
            return rabbitAdmin;
        }
    }
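To make the role of @Primary concrete, here is a toy registry in plain Java. It only models the lookup rule (the name decides when a qualifier is given, otherwise the primary candidate wins) and is not how Spring resolves beans internally; ToyRegistry and its methods are hypothetical names introduced for this illustration.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Toy model of "qualifier vs. primary" lookup; not Spring's implementation. */
final class ToyRegistry<T> {

    private final Map<String, T> candidates = new LinkedHashMap<>();
    private String primaryName;

    void register(String name, T bean, boolean primary) {
        candidates.put(name, bean);
        if (primary) {
            if (primaryName != null) {
                throw new IllegalStateException("more than one primary: " + primaryName + ", " + name);
            }
            primaryName = name;
        }
    }

    /** Lookup with a qualifier: the name decides. */
    T byName(String name) {
        T bean = candidates.get(name);
        if (bean == null) {
            throw new IllegalArgumentException("no candidate named " + name);
        }
        return bean;
    }

    /** Lookup without a qualifier: needs a single candidate or a primary one. */
    T byType() {
        if (candidates.isEmpty()) {
            throw new IllegalStateException("no candidates");
        }
        if (candidates.size() == 1) {
            return candidates.values().iterator().next();
        }
        if (primaryName == null) {
            throw new IllegalStateException("ambiguous: " + candidates.keySet());
        }
        return candidates.get(primaryName);
    }

    public static void main(String[] args) {
        ToyRegistry<String> templates = new ToyRegistry<>();
        templates.register("v2RabbitTemplate", "template for v2", false);
        templates.register("v1RabbitTemplate", "template for v1", true);

        System.out.println(templates.byType());                   // template for v1
        System.out.println(templates.byName("v2RabbitTemplate")); // template for v2
    }
}
```

This is why the configuration marks every v1 bean as primary: an injection point that names nothing falls back to v1, while v2 is reached only through an explicit name.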

 

Creating the Exchange and Queue and binding them

With RabbitAdmin in place, we use it to create the corresponding exchange and queue and to bind them together.

    package com.example.config.rabbitmq;

    import org.springframework.amqp.core.BindingBuilder;
    import org.springframework.amqp.core.Queue;
    import org.springframework.amqp.core.TopicExchange;
    import org.springframework.amqp.rabbit.core.RabbitAdmin;
    import org.springframework.context.annotation.Configuration;

    import javax.annotation.PostConstruct;
    import javax.annotation.Resource;

    /**
     * Creates the Queue and Exchange and binds them together.
     * Created by shuai on 2019/5/16.
     */
    @Configuration
    public class MyRabbitMQCreateConfig {

        @Resource(name = "v2RabbitAdmin")
        private RabbitAdmin v2RabbitAdmin;

        // Injected, but nothing is declared on the v1 connection in this example.
        @Resource(name = "v1RabbitAdmin")
        private RabbitAdmin v1RabbitAdmin;

        @PostConstruct
        public void rabbitInit() {
            // Durable, non-auto-delete topic exchange and a durable queue.
            TopicExchange exchange = new TopicExchange("exchange.topic.example.new", true, false);
            Queue queue = new Queue("queue.example.topic.new", true);

            v2RabbitAdmin.declareExchange(exchange);
            v2RabbitAdmin.declareQueue(queue);
            v2RabbitAdmin.declareBinding(
                    BindingBuilder
                            .bind(queue)                        // the queue to bind
                            .to(exchange)                       // the exchange to bind it to
                            .with("routing.key.example.new"));  // the routing key
        }
    }
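The binding above uses a literal routing key, but a topic exchange also accepts patterns in which * stands for exactly one dot-separated word and # for zero or more words. The sketch below illustrates that matching rule in plain Java; TopicMatcher and matches are hypothetical names, and this is an illustration of the rule rather than the broker's actual implementation.

```java
/** Illustrative matcher for topic binding patterns; not the broker's code. */
final class TopicMatcher {

    /** Returns true if the routing key satisfies the binding pattern. */
    static boolean matches(String pattern, String routingKey) {
        String[] p = pattern.isEmpty() ? new String[0] : pattern.split("\\.", -1);
        String[] k = routingKey.isEmpty() ? new String[0] : routingKey.split("\\.", -1);
        return match(p, 0, k, 0);
    }

    private static boolean match(String[] p, int i, String[] k, int j) {
        if (i == p.length) {
            return j == k.length; // pattern used up: the key must be used up too
        }
        if (p[i].equals("#")) {
            // '#' may absorb zero or more words: try every possible split point
            for (int next = j; next <= k.length; next++) {
                if (match(p, i + 1, k, next)) {
                    return true;
                }
            }
            return false;
        }
        if (j == k.length) {
            return false; // pattern still expects a word but the key has none left
        }
        if (p[i].equals("*") || p[i].equals(k[j])) {
            return match(p, i + 1, k, j + 1);
        }
        return false;
    }

    public static void main(String[] args) {
        String key = "routing.key.example.new";
        System.out.println(matches("routing.key.example.new", key)); // true
        System.out.println(matches("routing.key.*.new", key));       // true
        System.out.println(matches("routing.#", key));               // true
        System.out.println(matches("routing.*", key));               // false
    }
}
```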

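Producer

To verify later that each connection is established and can produce messages, the producer sends one message with each of the newly created RabbitTemplate instances.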

    package com.example.topic;

    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.stereotype.Component;

    import javax.annotation.Resource;

    @Component
    public class TopicProducer {

        @Resource(name = "v1RabbitTemplate")
        private RabbitTemplate v1RabbitTemplate;

        @Resource(name = "v2RabbitTemplate")
        private RabbitTemplate v2RabbitTemplate;

        public void sendMessageByTopic() {
            // Send one message over each connection.
            String content1 = "This is a topic type of the RabbitMQ message example from v1RabbitTemplate";
            v1RabbitTemplate.convertAndSend(
                    "exchange.topic.example.new",
                    "routing.key.example.new",
                    content1);

            String content2 = "This is a topic type of the RabbitMQ message example from v2RabbitTemplate";
            v2RabbitTemplate.convertAndSend(
                    "exchange.topic.example.new",
                    "routing.key.example.new",
                    content2);
        }
    }

Consumer

Note that when configuring the consumer for a queue, you must specify which containerFactory it uses.

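    package com.example.topic;

    import com.rabbitmq.client.Channel;
    import org.springframework.amqp.rabbit.annotation.RabbitHandler;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.amqp.support.AmqpHeaders;
    import org.springframework.messaging.handler.annotation.Header;
    import org.springframework.messaging.handler.annotation.Payload;
    import org.springframework.stereotype.Component;

    import java.io.IOException;

    @Component
    @RabbitListener(queues = "queue.example.topic.new", containerFactory = "v2ContainerFactory")
    public class TopicConsumer {

        @RabbitHandler
        public void consumer(@Payload String message, Channel channel,
                             @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) throws IOException {
            System.out.println(message);
            // v2ContainerFactory is configured with manual ack, so acknowledge explicitly.
            channel.basicAck(deliveryTag, false);
        }
    }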
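Both container factories in this example are configured with manual acknowledgement and a prefetch of 5, which means the broker stops delivering once five messages are outstanding without an ack. The toy model below shows that window in plain Java; ToyChannel and its methods are hypothetical and stand in for the real channel only for illustration.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Toy model of a prefetch window under manual ack; not the RabbitMQ client API. */
final class ToyChannel {

    private final int prefetch;
    private final Deque<String> ready = new ArrayDeque<>();
    private int unacked = 0;

    ToyChannel(int prefetch) {
        this.prefetch = prefetch;
    }

    void enqueue(String message) {
        ready.addLast(message);
    }

    /** Returns the next message, or null if the queue is empty or the window is full. */
    String deliver() {
        if (unacked >= prefetch || ready.isEmpty()) {
            return null;
        }
        unacked++;
        return ready.pollFirst();
    }

    /** Acknowledges one outstanding delivery, freeing one slot in the window. */
    void ack() {
        if (unacked == 0) {
            throw new IllegalStateException("nothing to acknowledge");
        }
        unacked--;
    }

    public static void main(String[] args) {
        ToyChannel channel = new ToyChannel(5);
        for (int i = 1; i <= 8; i++) {
            channel.enqueue("m" + i);
        }

        int delivered = 0;
        while (channel.deliver() != null) { // the consumer never acks here
            delivered++;
        }
        System.out.println("delivered without ack: " + delivered); // delivered without ack: 5

        channel.ack();
        System.out.println("after one ack: " + channel.deliver()); // after one ack: m6
    }
}
```

A listener on a manual-ack container therefore has to acknowledge (or reject) every delivery, otherwise consumption stalls after the first five messages.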

That completes the example of connecting Spring Boot to multiple RabbitMQ sources. Next, a short test to verify it.

Test and verification

    package com.example.test;

    import com.example.topic.TopicProducer;
    import org.junit.Test;
    import org.junit.runner.RunWith;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.test.context.SpringBootTest;
    import org.springframework.test.context.junit4.SpringRunner;

    @RunWith(SpringRunner.class)
    @SpringBootTest
    public class RabbitMQMultipleTest {

        @Autowired
        private TopicProducer topicProducer;

        @Test
        public void topicProducerTest() {
            topicProducer.sendMessageByTopic();
        }
    }

 

Running the test confirms that Spring Boot connects to multiple RabbitMQ sources successfully!

GitHub: Spring Boot tutorials, tech stack, and sample code

 
