赞
踩
上一篇博客介绍了使用spring-boot连接单个rabbitmq服务器发送和接收消息,但是在实际的项目中我们可能需要同时连接多个不同的rabbitmq服务器发送和接收消息。今天简单介绍下如何使用spring-boot连接多个rabbitmq服务器发送和接收消息。
1.配置文件中配置2个rabbitmq连接信息(这里我在2台虚拟机上安装rabbitmq)
server:
  port: 8091
spring:
  application:
    name: rabbit-spring-boot
  rabbitmq:
    dev:
      host: 192.168.98.22
      port: 5672
      username: root
      password: root
    test:
      host: 192.168.98.23
      port: 5672
      username: root
      password: root
2.创建rabbitmq连接配置类
@Configuration public class RabbitMqConfig { // 注意这里使用了primary注解 // 声明连接工厂连接开发服务器 @Primary @Bean(name = "devConnectionFactory") public ConnectionFactory devConnectionFactory(@Value("${spring.rabbitmq.dev.host}") String host, @Value("${spring.rabbitmq.dev.port}") int port, @Value("${spring.rabbitmq.dev.username}") String username, @Value("${spring.rabbitmq.dev.password}") String password) { // 使用@Value直接读取配置文件中的信息 CachingConnectionFactory connectionFactory = new CachingConnectionFactory(); connectionFactory.setHost(host); connectionFactory.setPort(port); connectionFactory.setUsername(username); connectionFactory.setPassword(password); connectionFactory.setVirtualHost("/"); connectionFactory.setPublisherConfirms(true); connectionFactory.setPublisherReturns(true); return connectionFactory; } // 声明连接工厂连接测试服务器 @Bean(name = "testConnectionFactory") public ConnectionFactory testConnectionFactory(@Value("${spring.rabbitmq.test.host}") String host, @Value("${spring.rabbitmq.test.port}") int port, @Value("${spring.rabbitmq.test.username}") String username, @Value("${spring.rabbitmq.test.password}") String password) { CachingConnectionFactory connectionFactory = new CachingConnectionFactory(); connectionFactory.setHost(host); connectionFactory.setPort(port); connectionFactory.setUsername(username); connectionFactory.setPassword(password); connectionFactory.setVirtualHost("/"); connectionFactory.setPublisherConfirms(true); connectionFactory.setPublisherReturns(true); return connectionFactory; } // 声明开发服务器rabbitTemplate @Bean(name = "devRabbitTemplate") public RabbitTemplate devRabbitTemplate(@Qualifier("devConnectionFactory") ConnectionFactory connectionFactory) { RabbitTemplate rabbitTemplate=new RabbitTemplate(connectionFactory); return rabbitTemplate; } // 声明测试服务器连接 rabbitTemplate @Bean(name = "testRabbitTemplate") public RabbitTemplate testRabbitTemplate(@Qualifier("testConnectionFactory") ConnectionFactory connectionFactory) { RabbitTemplate rabbitTemplate=new 
RabbitTemplate(connectionFactory); return rabbitTemplate; } /** * 声明dev containerFactory * @param rabbitListenerContainerFactoryConfigurer * @param connectionFactory * @return */ @Bean(name = "devContainerFactory") public SimpleRabbitListenerContainerFactory devSimpleRabbitListenerContainerFactory( SimpleRabbitListenerContainerFactoryConfigurer rabbitListenerContainerFactoryConfigurer, @Qualifier("devConnectionFactory") ConnectionFactory connectionFactory) { SimpleRabbitListenerContainerFactory containerFactory=new SimpleRabbitListenerContainerFactory(); rabbitListenerContainerFactoryConfigurer.configure(containerFactory,connectionFactory); return containerFactory; } /** * 声明 test containerFactory * @param rabbitListenerContainerFactoryConfigurer * @param connectionFactory * @return */ @Bean(name = "testContainerFactory") public SimpleRabbitListenerContainerFactory testSimpleRabbitListenerContainerFactory( SimpleRabbitListenerContainerFactoryConfigurer rabbitListenerContainerFactoryConfigurer, @Qualifier("testConnectionFactory") ConnectionFactory connectionFactory) { SimpleRabbitListenerContainerFactory containerFactory=new SimpleRabbitListenerContainerFactory(); rabbitListenerContainerFactoryConfigurer.configure(containerFactory,connectionFactory); return containerFactory; } @Bean public RestTemplate restTemplate() { return new RestTemplate(); } }
3.创建消息生产者
@Component public class RabbitMQProducer { // 指定dev服务器rabbitTemplate @Resource(name = "devRabbitTemplate") AmqpTemplate devAmqpTemplate; // 指定test服务器rabbitTemplate @Resource(name = "testRabbitTemplate") AmqpTemplate testAmqpTemplate; @Value("${open-care.rabbitmq.exchange.name}") String exchangeName; @Value("${open-care.rabbitmq.queue1-name}") String queueName; public void send(Object object) { try { Message message= MessageBuilder.withBody(JSON.toJSONString(object).getBytes("UTF-8")) .setContentType(MessageProperties.CONTENT_TYPE_JSON) .setContentEncoding("UTF-8") .setMessageId(UUID.randomUUID().toString()).build(); // 指定exchange交换器名称为fanout_exchange(fanout交换器没有路由键) devAmqpTemplate.send("fanout_exchange","",message); // 使用devAmqpTemplate将消息发送到开发服务器 } catch (UnsupportedEncodingException e) { e.printStackTrace(); } } public void sendMessage(Object object) { try { Message message= MessageBuilder.withBody(JSON.toJSONString(object).getBytes("UTF-8")) .setContentType(MessageProperties.CONTENT_TYPE_JSON) .setContentEncoding("UTF-8") .setMessageId(UUID.randomUUID().toString()).build(); // 指定交换器名称为fanout_exchange(fanout交换器没有路由键) testAmqpTemplate.send("fanout_exchange","",message); // 使用testAmqpTeplate将消息发送到测试服务器 } catch (UnsupportedEncodingException e) { e.printStackTrace(); } } }
4.创建消息消费者
@Component public class RabbitMQConsumer { @RabbitHandler @RabbitListener(queues = "q.test1",containerFactory = "devContainerFactory")// 指定队列名称和容器工厂,接收开发服务器队列q.test1中的消息 public void receive(Object object) { Message message=(Message)object; byte bytes[]=null; if (message != null) { bytes=message.getBody(); } try { String msg=new String(bytes,"UTF-8"); System.out.println("192.168.98.22 q.test1------------------------"+msg); } catch (UnsupportedEncodingException e) { e.printStackTrace(); } } @RabbitHandler @RabbitListener(queues = "q.test2",containerFactory = "devContainerFactory")// 指定队列名称和容器工厂,接收开发服务器队列q.test2中的消息 public void receiveQueue1(Object object) { Message message=(Message)object; byte bytes[]=null; if (message != null) { bytes=message.getBody(); } try { String msg=new String(bytes,"UTF-8"); System.out.println("192.168.98.22 q.test2------------------------"+msg); } catch (UnsupportedEncodingException e) { e.printStackTrace(); } } @RabbitHandler @RabbitListener(queues = "q.test3",containerFactory = "devContainerFactory")// 指定队列名称和容器工厂,接收开发服务器队列q.test3中的消息 public void receiveQueue2(Object object) { Message message=(Message)object; byte bytes[]=null; if (message != null) { bytes=message.getBody(); } try { String msg=new String(bytes,"UTF-8"); System.out.println("192.168.98.22 q.test3------------------------"+msg); } catch (UnsupportedEncodingException e) { e.printStackTrace(); } } @RabbitHandler @RabbitListener(queues = "q.test1",containerFactory = "testContainerFactory") // 指定队列名称和容器工厂,接收测试服务器队列q.test1中的消息 public void receive23message(Object object) { Message message=(Message)object; byte bytes[]=null; if (message != null) { bytes=message.getBody(); } try { String msg=new String(bytes,"UTF-8"); System.out.println("192.168.98.23 q.test1------------------------"+msg); } catch (UnsupportedEncodingException e) { e.printStackTrace(); } } @RabbitHandler @RabbitListener(queues = "q.fanout1",containerFactory = "testContainerFactory")// 
指定队列名称和容器工厂,接收测试服务器队列q.fanout1中的消息 public void receiveFanoutQueue(Object object) { Message message=(Message)object; byte bytes[]=null; if (message != null) { bytes=message.getBody(); } try { String msg=new String(bytes,"UTF-8"); System.out.println("192.168.98.23 q.fanout1------------------------"+msg); } catch (UnsupportedEncodingException e) { e.printStackTrace(); } } }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。