当前位置:   article > 正文

SpringBoot配置多个RabbitMq_springboot 配置多个rabbitmq

springboot 配置多个rabbitmq
  • YML
  1. rabbitmq:
  2. first:
  3. username: ${app.appkey}
  4. password: ${app.appkey}
  5. virtual-host: ${app.appid}
  6. addresses: x.x.x.x:5672,x.x.x.x:5672 #集群
  7. second:
  8. username: guest
  9. password: guest
  10. virtual-host: /
  11. host: 127.0.0.1
  12. port: 5672
  • 配置源
  1. import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
  2. import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
  3. import org.springframework.amqp.rabbit.connection.ConnectionFactory;
  4. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  5. import org.springframework.beans.factory.annotation.Qualifier;
  6. import org.springframework.beans.factory.annotation.Value;
  7. import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
  8. import org.springframework.context.annotation.Bean;
  9. import org.springframework.context.annotation.Configuration;
  10. import org.springframework.context.annotation.Primary;
  11. /**
  12. * RabbitMq多源配置
  13. *
  14. * @author Lenovo
  15. */
  16. @Configuration
  17. public class RabbitConfig {
  18. @Bean(name = "firstConnectionFactory")
  19. @Primary
  20. public ConnectionFactory firstConnectionFactory(
  21. @Value("${spring.rabbitmq.first.addresses}") String addresses,
  22. @Value("${spring.rabbitmq.first.username}") String username,
  23. @Value("${spring.rabbitmq.first.password}") String password,
  24. @Value("${spring.rabbitmq.first.virtual-host}") String virtualHost
  25. ) {
  26. CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
  27. connectionFactory.setAddresses(addresses);
  28. connectionFactory.setUsername(username);
  29. connectionFactory.setPassword(password);
  30. connectionFactory.setVirtualHost(virtualHost);
  31. return connectionFactory;
  32. }
  33. @Bean(name = "secondConnectionFactory")
  34. public ConnectionFactory secondConnectionFactory(
  35. @Value("${spring.rabbitmq.second.host}") String host,
  36. @Value("${spring.rabbitmq.second.port}") int port,
  37. @Value("${spring.rabbitmq.second.username}") String username,
  38. @Value("${spring.rabbitmq.second.password}") String password,
  39. @Value("${spring.rabbitmq.second.virtual-host}") String virtualHost
  40. ) {
  41. CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
  42. connectionFactory.setHost(host);
  43. connectionFactory.setPort(port);
  44. connectionFactory.setUsername(username);
  45. connectionFactory.setPassword(password);
  46. connectionFactory.setVirtualHost(virtualHost);
  47. return connectionFactory;
  48. }
  49. @Bean(name = "firstRabbitTemplate")
  50. @Primary
  51. public RabbitTemplate firstRabbitTemplate(
  52. @Qualifier("firstConnectionFactory") ConnectionFactory connectionFactory
  53. ) {
  54. RabbitTemplate firstRabbitTemplate = new RabbitTemplate(connectionFactory);
  55. return firstRabbitTemplate;
  56. }
  57. @Bean(name = "secondRabbitTemplate")
  58. public RabbitTemplate secondRabbitTemplate(
  59. @Qualifier("secondConnectionFactory") ConnectionFactory connectionFactory
  60. ) {
  61. RabbitTemplate secondRabbitTemplate = new RabbitTemplate(connectionFactory);
  62. return secondRabbitTemplate;
  63. }
  64. @Bean(name = "firstFactory")
  65. public SimpleRabbitListenerContainerFactory firstFactory(
  66. SimpleRabbitListenerContainerFactoryConfigurer configurer,
  67. @Qualifier("firstConnectionFactory") ConnectionFactory connectionFactory
  68. ) {
  69. SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
  70. configurer.configure(factory, connectionFactory);
  71. return factory;
  72. }
  73. @Bean(name = "secondFactory")
  74. public SimpleRabbitListenerContainerFactory secondFactory(
  75. SimpleRabbitListenerContainerFactoryConfigurer configurer,
  76. @Qualifier("secondConnectionFactory") ConnectionFactory connectionFactory
  77. ) {
  78. SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
  79. configurer.configure(factory, connectionFactory);
  80. return factory;
  81. }
  82. }
  • 信道构建器
  1. import org.springframework.amqp.rabbit.connection.ConnectionFactory;
  2. import org.springframework.beans.factory.annotation.Qualifier;
  3. import org.springframework.context.annotation.Bean;
  4. import org.springframework.context.annotation.Configuration;
  5. import java.io.IOException;
  6. /**
  7. * 信道构建器
  8. *
  9. * @author Lenovo
  10. */
  11. @Configuration
  12. public class CreateQueue {
  13. @Bean
  14. public String chargeQueue(@Qualifier("secondConnectionFactory") ConnectionFactory connectionFactory) {
  15. try {
  16. connectionFactory.createConnection().createChannel(false).queueDeclare("test.add", true, false, false, null);
  17. }catch (IOException e){
  18. e.printStackTrace();
  19. }
  20. return "test.add";
  21. }
  22. }
  • 信道监听器
  1. package com.ciih.authcenter.client.mq;
  2. import com.ciih.authcenter.manager.entity.Permission;
  3. import com.rabbitmq.client.Channel;
  4. import lombok.SneakyThrows;
  5. import lombok.extern.slf4j.Slf4j;
  6. import org.springframework.amqp.core.Message;
  7. import org.springframework.amqp.rabbit.annotation.RabbitHandler;
  8. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  9. import org.springframework.stereotype.Component;
  10. import java.io.IOException;
  11. import java.util.List;
  12. /**
  13. * 信道监听器
  14. *
  15. * @author Lenovo
  16. */
  17. @Slf4j
  18. @Component
  19. public class ListeningHandle {
  20. public static final String ENCODING = "UTF-8";
  21. @RabbitHandler
  22. @RabbitListener(queues = {RabbitConfig.USERS_ADD}, containerFactory = "firstFactory")
  23. @SneakyThrows
  24. public void onMessageUserAdd(Message message, Channel channel) {
  25. log.info("[listenerManualAck 监听的消息userAdd] - [{}]", new String(message.getBody(), ENCODING));
  26. try {
  27. channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
  28. } catch (
  29. IOException e) {
  30. }
  31. }
  32. @RabbitHandler
  33. @RabbitListener(queues = {RabbitConfig.USERS_UPDATE}, containerFactory = "firstFactory")
  34. @SneakyThrows
  35. public void onMessageUserUpdate(Message message, Channel channel) {
  36. log.info("[listenerManualAck 监听的消息userUpdate] - [{}]", new String(message.getBody(), ENCODING));
  37. try {
  38. channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
  39. } catch (
  40. IOException e) {
  41. }
  42. }
  43. @RabbitHandler
  44. @RabbitListener(queues = {RabbitConfig.USERS_DELETE}, containerFactory = "firstFactory")
  45. @SneakyThrows
  46. public void onMessageUserDelete(Message message, Channel channel) {
  47. log.info("[listenerManualAck 监听的消息userDelete] - [{}]", new String(message.getBody(), ENCODING));
  48. try {
  49. channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
  50. } catch (
  51. IOException e) {
  52. }
  53. }
  54. @RabbitHandler
  55. @RabbitListener(queues = {RabbitConfig.ORGS_ADD}, containerFactory = "firstFactory")
  56. @SneakyThrows
  57. public void onMessageOrgsAdd(Message message, Channel channel) {
  58. log.info("[listenerManualAck 监听的消息orgsAdd] - [{}]", new String(message.getBody(), ENCODING));
  59. try {
  60. channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
  61. } catch (
  62. IOException e) {
  63. }
  64. }
  65. @RabbitHandler
  66. @RabbitListener(queues = {RabbitConfig.ORGS_UPDATE}, containerFactory = "firstFactory")
  67. @SneakyThrows
  68. public void onMessageOrgsUpdate(Message message, Channel channel) {
  69. log.info("[listenerManualAck 监听的消息orgsUpdate] - [{}]", new String(message.getBody(), ENCODING));
  70. try {
  71. channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
  72. } catch (
  73. IOException e) {
  74. }
  75. }
  76. @RabbitHandler
  77. @RabbitListener(queues = {RabbitConfig.ORGS_DELETE}, containerFactory = "firstFactory")
  78. @SneakyThrows
  79. public void onMessageOrgsDelete(Message message, Channel channel) {
  80. log.info("[listenerManualAck 监听的消息orgsDelete] - [{}]", new String(message.getBody(), ENCODING));
  81. try {
  82. channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
  83. } catch (
  84. IOException e) {
  85. }
  86. }
  87. @RabbitListener(queues = {"test.add"}, containerFactory = "secondFactory")
  88. @SneakyThrows
  89. public void hospitalAdd(List<Permission> permissions, Message message, Channel channel) {
  90. System.out.println(permissions);
  91. }
  92. }
  • 发送消息
  1. import com.ciih.authcenter.manager.entity.Permission;
  2. import com.ciih.authcenter.manager.service.PermissionService;
  3. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  4. import org.springframework.web.bind.annotation.GetMapping;
  5. import org.springframework.web.bind.annotation.RestController;
  6. import javax.annotation.Resource;
  7. import java.util.List;
  8. @RestController
  9. public class Sender {
  10. @Resource
  11. PermissionService permissionService;
  12. @Resource(name = "secondRabbitTemplate")
  13. private RabbitTemplate secondRabbitTemplate;
  14. @GetMapping("test1")
  15. public void send1() {
  16. List<Permission> list = permissionService.lambdaQuery().last("limit 0, 10").list();
  17. this.secondRabbitTemplate.convertAndSend("test.add", list);
  18. }
  19. }
  • 依赖
    1. <dependency>
    2. <groupId>org.springframework.boot</groupId>
    3. <artifactId>spring-boot-starter-amqp</artifactId>
    4. </dependency>

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/从前慢现在也慢/article/detail/617005
推荐阅读
相关标签
  

闽ICP备14008679号