赞
踩
场景:需要监听 同一个RabbitMQ下的 不同的虚拟主机下的 队列 来完成数据流转
my.rabbitmq.host=你的ip
my.rabbitmq.port=你的port
my.rabbitmq.username=你的username
my.rabbitmq.password=你的password
my.rabbitmq.first.virtual-host=firstVHost
my.rabbitmq.second.virtual-host=secondVHost
import org.springframework.amqp.core.AcknowledgeMode; import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory; import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Primary; /** * @author: zdy * date: 2020/7/22 16:26 * version: 1.0.0 * description: RabbitMQ配置类 */ @Configuration public class RabbitMqConfig { /** * first 连接工厂 */ @Bean(name = "firstConnectionFactory") public ConnectionFactory firstConnectionFactory(@Value("${my.rabbitmq.host}") String host, @Value("${my.rabbitmq.port}") int port, @Value("${my.rabbitmq.username}") String username, @Value("${my.rabbitmq.password}") String password, @Value("${my.rabbitmq.first.virtual-host}") String vHost) { CachingConnectionFactory connectionFactory = new CachingConnectionFactory(); connectionFactory.setHost(host); connectionFactory.setPort(port); connectionFactory.setUsername(username); connectionFactory.setPassword(password); connectionFactory.setVirtualHost(vHost); return connectionFactory; } /** * second 连接工厂 */ @Bean(name = "secondConnectionFactory") //这个标签是必须的,因为加上后才能有默认值,不会在项目加载时产生冲突 @Primary public ConnectionFactory secondConnectionFactory(@Value("${my.rabbitmq.host}") String host, @Value("${my.rabbitmq.port}") int port, @Value("${my.rabbitmq.username}") String username, @Value("${my.rabbitmq.password}") String password, @Value("${my.rabbitmq.second.virtual-host}") String vHost) { CachingConnectionFactory connectionFactory = new CachingConnectionFactory(); connectionFactory.setHost(host); connectionFactory.setPort(port); connectionFactory.setUsername(username); connectionFactory.setPassword(password); connectionFactory.setVirtualHost(vHost); return connectionFactory; } /** * first生产者 */ @Bean(name = "firstRabbitTemplate") public RabbitTemplate firstRabbitTemplate(@Qualifier("firstConnectionFactory") ConnectionFactory connectionFactory) { return new RabbitTemplate(connectionFactory); } /** * second生产者 */ @Bean(name = "secondRabbitTemplate") public RabbitTemplate secondRabbitTemplate(@Qualifier("secondConnectionFactory") ConnectionFactory connectionFactory) { return new RabbitTemplate(connectionFactory); } /** * first 消费者 */ @Bean(name = "firstListenerFactory") public SimpleRabbitListenerContainerFactory firstListenerFactory( SimpleRabbitListenerContainerFactoryConfigurer configurer, @Qualifier("firstConnectionFactory") ConnectionFactory connectionFactory) { SimpleRabbitListenerContainerFactory listenerContainerFactory = new SimpleRabbitListenerContainerFactory(); //设置手动ack listenerContainerFactory.setAcknowledgeMode(AcknowledgeMode.MANUAL); configurer.configure(listenerContainerFactory, connectionFactory); return listenerContainerFactory; } /** * second 消费者 */ @Bean(name = "secondListenerFactory") public SimpleRabbitListenerContainerFactory secondListenerFactory( SimpleRabbitListenerContainerFactoryConfigurer configurer, @Qualifier("secondConnectionFactory") ConnectionFactory connectionFactory) { SimpleRabbitListenerContainerFactory listenerContainerFactory = new SimpleRabbitListenerContainerFactory(); listenerContainerFactory.setAcknowledgeMode(AcknowledgeMode.MANUAL); configurer.configure(listenerContainerFactory, connectionFactory); return listenerContainerFactory; }
@RabbitListener(queues = "你的队列名", containerFactory = "firstListenerFactory") public void dealDeclareMessage(Channel channel, Message message) throws Exception { try { //消息类型 String routingKey = message.getMessageProperties().getReceivedRoutingKey(); log.info(routingKey); //消息内容 JSONObject jsonObject = JSON.parseObject(new String(message.getBody(), StandardCharsets.UTF_8)); //处理你的消息 saveOrUpdate(jsonObject); } catch (Exception e) { log.info("你的队列名"+"异常信息:" + e.getMessage()); } finally { //手动ack //TODO 也需要异常处理 channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。