当前位置:   article > 正文

SpringBoot实现多rabbitmq连接_springboot 连接多个rabbitmq

springboot 连接多个rabbitmq

一、配置

1. 配置文件

  1. rabbitmq:
  2. first:
  3. host:
  4. port:
  5. username:
  6. password:
  7. #虚拟host 可以不设置,使用server默认host
  8. virtual-host: /
  9. second:
  10. host:
  11. port:
  12. username:
  13. password:
  14. virtual-host: /

2. 配置类

  1. import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
  2. import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
  3. import org.springframework.amqp.rabbit.connection.ConnectionFactory;
  4. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  5. import org.springframework.beans.factory.annotation.Qualifier;
  6. import org.springframework.beans.factory.annotation.Value;
  7. import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
  8. import org.springframework.context.annotation.Bean;
  9. import org.springframework.context.annotation.Configuration;
  10. import org.springframework.context.annotation.Primary;
  11. /**
  12. * RabbitMq多源配置
  13. *
  14. * @author lq
  15. */
  16. @Configuration
  17. public class RabbitConfig {
  18. @Bean(name = "firstConnectionFactory")
  19. @Primary
  20. public ConnectionFactory firstConnectionFactory(
  21. @Value("${spring.rabbitmq.first.host}") String host,
  22. @Value("${spring.rabbitmq.first.port}") int port,
  23. @Value("${spring.rabbitmq.first.username}") String username,
  24. @Value("${spring.rabbitmq.first.password}") String password,
  25. @Value("${spring.rabbitmq.first.virtual-host}") String virtualHost
  26. ) {
  27. CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
  28. connectionFactory.setHost(host);
  29. connectionFactory.setPort(port);
  30. connectionFactory.setUsername(username);
  31. connectionFactory.setPassword(password);
  32. connectionFactory.setVirtualHost(virtualHost);
  33. return connectionFactory;
  34. }
  35. @Bean(name = "secondConnectionFactory")
  36. public ConnectionFactory secondConnectionFactory(
  37. @Value("${spring.rabbitmq.second.host}") String host,
  38. @Value("${spring.rabbitmq.second.port}") int port,
  39. @Value("${spring.rabbitmq.second.username}") String username,
  40. @Value("${spring.rabbitmq.second.password}") String password,
  41. @Value("${spring.rabbitmq.second.virtual-host}") String virtualHost
  42. ) {
  43. CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
  44. connectionFactory.setHost(host);
  45. connectionFactory.setPort(port);
  46. connectionFactory.setUsername(username);
  47. connectionFactory.setPassword(password);
  48. connectionFactory.setVirtualHost(virtualHost);
  49. return connectionFactory;
  50. }
  51. @Bean(name = "firstRabbitTemplate")
  52. @Primary
  53. public RabbitTemplate firstRabbitTemplate(
  54. @Qualifier("firstConnectionFactory") ConnectionFactory connectionFactory
  55. ) {
  56. RabbitTemplate firstRabbitTemplate = new RabbitTemplate(connectionFactory);
  57. return firstRabbitTemplate;
  58. }
  59. @Bean(name = "secondRabbitTemplate")
  60. public RabbitTemplate secondRabbitTemplate(
  61. @Qualifier("secondConnectionFactory") ConnectionFactory connectionFactory
  62. ) {
  63. RabbitTemplate secondRabbitTemplate = new RabbitTemplate(connectionFactory);
  64. return secondRabbitTemplate;
  65. }
  66. @Bean(name = "firstFactory")
  67. public SimpleRabbitListenerContainerFactory firstFactory(
  68. SimpleRabbitListenerContainerFactoryConfigurer configurer,
  69. @Qualifier("firstConnectionFactory") ConnectionFactory connectionFactory
  70. ) {
  71. SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
  72. configurer.configure(factory, connectionFactory);
  73. return factory;
  74. }
  75. @Bean(name = "secondFactory")
  76. public SimpleRabbitListenerContainerFactory secondFactory(
  77. SimpleRabbitListenerContainerFactoryConfigurer configurer,
  78. @Qualifier("secondConnectionFactory") ConnectionFactory connectionFactory
  79. ) {
  80. SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
  81. configurer.configure(factory, connectionFactory);
  82. return factory;
  83. }
  84. }

 3.信道构建器

  1. import org.springframework.amqp.rabbit.connection.ConnectionFactory;
  2. import org.springframework.beans.factory.annotation.Qualifier;
  3. import org.springframework.context.annotation.Bean;
  4. import org.springframework.context.annotation.Configuration;
  5. import java.io.IOException;
  6. /**
  7. * 信道构建器
  8. *
  9. * @author liuqi
  10. */
  11. @Configuration
  12. public class CreateQueue {
  13. @Bean
  14. public String chargeQueue(@Qualifier("firstConnectionFactory") ConnectionFactory connectionFactory) {
  15. try {
  16. connectionFactory.createConnection().createChannel(false).queueDeclare(Constants.RABBITMQ_QUEUE_NAME, true, false, false, null);
  17. }catch (IOException e){
  18. e.printStackTrace();
  19. }
  20. return Constants.RABBITMQ_QUEUE_NAME;
  21. }
  22. @Bean
  23. public String chargeQueue2(@Qualifier("secondConnectionFactory") ConnectionFactory connectionFactory) {
  24. try {
  25. connectionFactory.createConnection().createChannel(false).queueDeclare(Constants.RABBITMQ_QUEUE_NAME2, true, false, false, null);
  26. }catch (IOException e){
  27. e.printStackTrace();
  28. }
  29. return Constants.RABBITMQ_QUEUE_NAME2;
  30. }
  31. }

二、发送

1. 创建发送类

  1. import com.alibaba.fastjson.JSONObject;
  2. import com.google.gson.JsonObject;
  3. import com.zlhy.websocket.util.constant.Constants;
  4. import lombok.extern.slf4j.Slf4j;
  5. import org.apache.tomcat.util.bcel.Const;
  6. import org.springframework.amqp.core.Message;
  7. import org.springframework.amqp.core.MessageProperties;
  8. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  9. import org.springframework.stereotype.Service;
  10. import javax.annotation.Resource;
  11. import java.util.Map;
  12. /**
  13. * @author liuqi
  14. * @version 1.0
  15. * @description 向关联方的队列发送消息
  16. */
  17. @Slf4j
  18. @Service
  19. public class SendMessage {
  20. @Resource(name = "firstRabbitTemplate")
  21. private RabbitTemplate firstRabbitTemplate;
  22. @Resource(name = "secondRabbitTemplate")
  23. private RabbitTemplate secondRabbitTemplate;
  24. public void sendToOneMessage(JSONObject jsonObject) {
  25. MessageProperties messageProperties = new MessageProperties();
  26. messageProperties.setContentType("application/json");
  27. Message info = new Message(jsonObject.toString().getBytes(), messageProperties);
  28. firstRabbitTemplate.convertAndSend(Constants.RABBITMQ_QUEUE_NAME, info);
  29. }
  30. public void sendToTwoMessage(JSONObject jsonObject) {
  31. MessageProperties messageProperties = new MessageProperties();
  32. messageProperties.setContentType("application/json");
  33. Message info = new Message(jsonObject.toString().getBytes(), messageProperties);
  34. secondRabbitTemplate.convertAndSend(Constants.RABBITMQ_QUEUE_NAME2, info);
  35. }
  36. }

2. 调用方法发送数据

sendMessage.sendToOneMessage(jsonResult);

sendMessage.sendToTwoMessage(jsonResult);

三、消费测试

  1. import com.rabbitmq.client.Channel;
  2. import lombok.extern.slf4j.Slf4j;
  3. import org.springframework.amqp.core.Message;
  4. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  5. import org.springframework.stereotype.Service;
  6. import java.io.IOException;
  7. import java.nio.charset.StandardCharsets;
  8. @Slf4j
  9. @Service
  10. public class OneReceive {
  11. @RabbitListener(queues = Constants.RABBITMQ_QUEUE_NAME, containerFactory = "firstFactory")
  12. public void listenOne(Message message, Channel channel) throws IOException {
  13. //获取MQ返回的数据
  14. // channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
  15. String data = new String(message.getBody(), StandardCharsets.UTF_8);
  16. log.info("MQ1返回的数据:{}", data);
  17. //下面进行业务逻辑处理
  18. }
  19. }
  1. import com.rabbitmq.client.Channel;
  2. import lombok.extern.slf4j.Slf4j;
  3. import org.springframework.amqp.core.Message;
  4. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  5. import org.springframework.stereotype.Service;
  6. import java.io.IOException;
  7. import java.nio.charset.StandardCharsets;
  8. @Slf4j
  9. @Service
  10. public class TwoReceive {
  11. @RabbitListener(queues = Constants.RABBITMQ_QUEUE_NAME2, containerFactory = "secondFactory")
  12. public void listenTwo(Message message, Channel channel) throws IOException {
  13. //获取MQ返回的数据
  14. //channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
  15. String data = new String(message.getBody(), StandardCharsets.UTF_8);
  16. log.info("MQ2返回的数据:{}", data);
  17. //下面进行业务逻辑处理
  18. }
  19. }

也可放到一个类中测试消费

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/很楠不爱3/article/detail/616990
推荐阅读
相关标签
  

闽ICP备14008679号