赞
踩
在项目中经常遇到一个springboot工程要连接多个rabbitmq。如果只连接一个,springboot可以零配置连接rabbitmq,这样不需要做额外的工作。但如果连接多个rabbitmq,就得做一些配置了。
1 配置多个rabbitmq的连接地址:
spring.rabbitmq.first.host=192.168.10.223
spring.rabbitmq.first.port=5672
spring.rabbitmq.first.username=admin
spring.rabbitmq.first.password=admin
spring.rabbitmq.second.host=192.168.10.224
spring.rabbitmq.second.port=5672
spring.rabbitmq.second.username=admin
spring.rabbitmq.second.password=admin
spring.rabbitmq.third.host=192.168.10.225
spring.rabbitmq.third.port=5672
spring.rabbitmq.third.username=admin
spring.rabbitmq.third.password=admin
2 编写配置类 /* 定义连接,rabbitMQ */ @Bean(name = "mainConnectionFactory") @Primary public ConnectionFactory mainConnectionFactory( @Value("${spring.rabbitmq.first.host}") String host, @Value("${spring.rabbitmq.first.port}") int port, @Value("${spring.rabbitmq.first.username}") String username, @Value("${spring.rabbitmq.first.password}") String password) { return connectionFactory(host, port, username, password); } /* 定义连接 */ @Bean(name = "secondConnectionFactory") public ConnectionFactory secondConnectionFactory( @Value("${spring.rabbitmq.second.host}") String host, @Value("${spring.rabbitmq.second.port}") int port, @Value("${spring.rabbitmq.second.username}") String username, @Value("${spring.rabbitmq.second.password}") String password) { return connectionFactory(host, port, username, password); } /* 定义连接 */ @Bean(name = "thirdConnectionFactory") public ConnectionFactory thirdConnectionFactory( @Value("${spring.rabbitmq.third.host}") String host, @Value("${spring.rabbitmq.third.port}") int port, @Value("${spring.rabbitmq.third.username}") String username, @Value("${spring.rabbitmq.third.password}") String password) { return connectionFactory(host, port, username, password); } public CachingConnectionFactory connectionFactory(String host, int port, String username, String password) { CachingConnectionFactory connectionFactory = new CachingConnectionFactory(); connectionFactory.setHost(host); connectionFactory.setPort(port); connectionFactory.setUsername(username); connectionFactory.setPassword(password); // connectionFactory.setVirtualHost(virtual_host); return connectionFactory; } @Bean(name = "mainRabbitTemplate") @Primary public RabbitTemplate mainRabbitTemplate(@Qualifier("mainConnectionFactory") ConnectionFactory connectionFactory) { RabbitTemplate mainRabbitTemplate = new RabbitTemplate(connectionFactory); mainRabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter()); return mainRabbitTemplate; } @Bean(name = "secondRabbitTemplate") public RabbitTemplate secondRabbitTemplate(@Qualifier("secondConnectionFactory") ConnectionFactory connectionFactory) { RabbitTemplate secondRabbitTemplate = new RabbitTemplate(connectionFactory); return secondRabbitTemplate; } @Bean(name = "thirdRabbitTemplate") public RabbitTemplate thridRabbitTemplate(@Qualifier("thirdConnectionFactory") ConnectionFactory connectionFactory) { RabbitTemplate thridRabbitTemplate = new RabbitTemplate(connectionFactory); return thridRabbitTemplate; } @Bean(name = "mainFactory") @Primary public SimpleRabbitListenerContainerFactory myFactory( SimpleRabbitListenerContainerFactoryConfigurer configurer, @Qualifier("mainConnectionFactory") ConnectionFactory connectionFactory) { SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); factory.setMessageConverter(new Jackson2JsonMessageConverter()); configurer.configure(factory, connectionFactory); return factory; } @Bean(name = "secondFactory") public SimpleRabbitListenerContainerFactory secondFactory( SimpleRabbitListenerContainerFactoryConfigurer configurer, @Qualifier("secondConnectionFactory") ConnectionFactory connectionFactory) { SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); factory.setConcurrentConsumers(10); factory.setMaxConcurrentConsumers(20); factory.setPrefetchCount(1); configurer.configure(factory, connectionFactory); return factory; } @Bean(name = "thirdFactory") public SimpleRabbitListenerContainerFactory thirdFactory( SimpleRabbitListenerContainerFactoryConfigurer configurer, @Qualifier("thirdConnectionFactory") ConnectionFactory connectionFactory) { SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); factory.setMaxConcurrentConsumers(20); factory.setConcurrentConsumers(10); factory.setPrefetchCount(1); configurer.configure(factory, connectionFactory); return factory; }
3 使用发送消息 @Resource(name = "mainRabbitTemplate") protected RabbitTemplate rabbitTemplate; public MessageProvider(RabbitTemplate rabbitTemplate) { this.rabbitTemplate = rabbitTemplate; this.rabbitTemplate.setMandatory(true); this.rabbitTemplate.setConfirmCallback(this); } public void sendMessage(RealBusBaseLineDTO realBusBaseLineDTO) { if (realBusBaseLineDTO != null) { String msg = JSON.toJSONString(realBusBaseLineDTO, SerializerFeature.DisableCircularReferenceDetect); rabbitTemplate.convertAndSend(QueueEnum.MESSAGE_REALBUS_QUEUE.getExchange(), QueueEnum.MESSAGE_REALBUS_QUEUE.getRouteKey(), msg); } else { log.warn("消息内容为空!!!!!"); } } 4 消费消息 @Resource private IRealBusDataService realBusDataService; @Value("${network.bus.isReadActiveQueue}") private Boolean isReadActiveQueue; @RabbitListener(queues = QueueContent.MESSAGE_REALBUS_DATA_QUEUE_NAME, containerFactory = "secondFactory") @RabbitHandler public void processRealBusDataQueue(String msg, Channel channel, Message message) throws IOException { log.warn("processRealBusDataQueue:{} ", msg); try { String bodyMessage = new String(msg); // 逻辑处理 } catch (Exception e) { log.error("确认消费异常", e); } } } 到此就配置好了
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。