当前位置:   article > 正文

springboot集成多个rabbitmq

spring boot 多台rabbitmq rabbitlistener指定containerfactory

springboot版本:1.5.19.RELEASE

Gradle版本:4.10

 

Gradle的build.gradle中增加引入

Java代码 

compile('org.springframework.boot:spring-boot-starter-amqp')  

 

application.yaml中增加配置

Yaml代码 

  1. buddie:
  2. rabbitmq:
  3. consume:
  4. host: 127.0.0.1
  5. port: 5672
  6. username: admin
  7. password: admin
  8. produce:
  9. host: 127.0.0.1
  10. port: 5674
  11. username: admin
  12. password: admin

 

增加配置类,配置我们的两个rabbitMQ:

Java代码 

  1. import org.springframework.amqp.core.*;
  2. import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
  3. import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
  4. import org.springframework.amqp.rabbit.connection.ConnectionFactory;
  5. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  6. import org.springframework.beans.factory.annotation.Qualifier;
  7. import org.springframework.beans.factory.annotation.Value;
  8. import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
  9. import org.springframework.context.annotation.Bean;
  10. import org.springframework.context.annotation.Configuration;
  11. import org.springframework.context.annotation.Primary;
  12. @Configuration
  13. public class RabbitConfig {
  14. public ConnectionFactory rabbitConfiguration(String host, int port, String username, String password) {
  15. CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
  16. connectionFactory.setHost(host);
  17. connectionFactory.setPort(port);
  18. connectionFactory.setUsername(username);
  19. connectionFactory.setPassword(password);
  20. return connectionFactory;
  21. }
  22. @Bean("consumeRabbitConnectionFactory")
  23. @Primary
  24. public ConnectionFactory innerRabbitConfiguration(@Value("${buddie.rabbitmq.consume.host}") String host,
  25. @Value("${buddie.rabbitmq.consume.port}") int port,
  26. @Value("${buddie.rabbitmq.consume.username}") String username,
  27. @Value("${buddie.rabbitmq.consume.password}") String password) {
  28. return this.rabbitConfiguration(host, port, username, password);
  29. }
  30. @Bean("consumeRabbitTemplate")
  31. @Primary
  32. public RabbitTemplate consumeRabbitTemplate(
  33. @Qualifier("consumeRabbitConnectionFactory") ConnectionFactory connectionFactory
  34. ) {
  35. return new RabbitTemplate(connectionFactory);
  36. }
  37. public SimpleRabbitListenerContainerFactory rabbitFactory(SimpleRabbitListenerContainerFactoryConfigurer configurer, ConnectionFactory connectionFactory) {
  38. SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
  39. configurer.configure(factory, connectionFactory);
  40. return factory;
  41. }
  42. @Bean("consumeRabbitFactory")
  43. public SimpleRabbitListenerContainerFactory consumeRabbitFactory(
  44. SimpleRabbitListenerContainerFactoryConfigurer configurer,
  45. @Qualifier("consumeRabbitConnectionFactory") ConnectionFactory connectionFactory
  46. ) {
  47. return this.rabbitFactory(configurer, connectionFactory);
  48. }
  49. @Bean("produceRabbitConnectionFactory")
  50. public ConnectionFactory outerRabbitConfiguration(@Value("${buddie.rabbitmq.produce.host}") String host,
  51. @Value("${buddie.rabbitmq.produce.port}") int port,
  52. @Value("${buddie.rabbitmq.produce.username}") String username,
  53. @Value("${buddie.rabbitmq.produce.password}") String password) {
  54. return this.rabbitConfiguration(host, port, username, password);
  55. }
  56. @Bean("produceRabbitTemplate")
  57. public RabbitTemplate produceRabbitTemplate(
  58. @Qualifier("produceRabbitConnectionFactory") ConnectionFactory connectionFactory
  59. ) {
  60. return new RabbitTemplate(connectionFactory);
  61. }
  62. @Bean("produceRabbitFactory")
  63. public SimpleRabbitListenerContainerFactory outerRabbitFactory(
  64. SimpleRabbitListenerContainerFactoryConfigurer configurer,
  65. @Qualifier("produceRabbitConnectionFactory") ConnectionFactory connectionFactory
  66. ) {
  67. return this.rabbitFactory(configurer, connectionFactory);
  68. }
  69. @Bean
  70. public Queue topicQueueCreate() {
  71. return new Queue("topic.task");
  72. }
  73. @Bean
  74. public TopicExchange topicExchange() {
  75. return new TopicExchange("topic.exchange");
  76. }
  77. @Bean
  78. public Binding topicBindingCreate() {
  79. return BindingBuilder.bind(this.topicQueueCreate()).to(this.topicExchange()).with("topic.task.#");
  80. }
  81. }

注意事项:

做为生产者,在启服时,并不会对连接rabbitMQ,更不会去创建Topic,Queue及绑定。

而作为消费者,在启服后,会连接rabbitMQ,并检查Queue是否有消息可消费。

所以应该将消费的rabbitMQ配置,加上@Primary,否则在rabbitMQ上没有对应的Queue时,报错,无法启动服务器

 

使用:

  1. import com.fasterxml.jackson.databind.ObjectMapper;
  2. import org.slf4j.Logger;
  3. import org.slf4j.LoggerFactory;
  4. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  5. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  6. import org.springframework.beans.factory.annotation.Autowired;
  7. import org.springframework.stereotype.Component;
  8. import javax.annotation.Resource;
  9. import java.io.IOException;
  10. @Component
  11. public class MessageConsumeService {
  12. private static final Logger logger = LoggerFactory.getLogger(MessageConsumeService.class);
  13. @Resource(name = "produceRabbitTemplate")
  14. private RabbitTemplate outerRabbitTemplate;
  15. @Autowired
  16. private ObjectMapper objectMapper;
  17. @RabbitListener(queues = PropConstants.TOPIC_TASK, containerFactory = "consumeRabbitFactory")
  18. public void receiveTopicCreate(String srcMessage) {
  19. try {
  20. outerRabbitTemplate.convertAndSend("topic.exchange", "topic.task.#", srcMessage);
  21. } catch (Exception e) {
  22. logger.error("MessageConsumeService.receiveTopicCreate error", e);
  23. }
  24. }
  25. }

 

转载于:https://my.oschina.net/buddie/blog/3015421

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/2023面试高手/article/detail/617061
推荐阅读
相关标签
  

闽ICP备14008679号