赞
踩
今天在写项目的时候遇到新需求,一个mq的功能要使用我们公司的服务器的mq,一个mq的功能要使用部署的那边的服务器的mq,话不多说直接上代码。
配置文件application.yml:
- spring:
- rabbitmq:
- yjdpeservice:
- host: xxx.xxx.xxx.xxx
- port: 5672
- username: admin
- password: admin
- yjservice:
- host: xxx.xxx.xxx.xxx
- port: 5672
- username: admin
- password: admin
配置类:
- import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
- import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
- import org.springframework.amqp.rabbit.connection.ConnectionFactory;
- import org.springframework.amqp.rabbit.core.RabbitTemplate;
- import org.springframework.beans.factory.annotation.Qualifier;
- import org.springframework.beans.factory.annotation.Value;
- import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
- import org.springframework.context.annotation.Primary;
-
- @Configuration
- public class RabbitPlusConfig {
-
- @Bean(name="mergeConnectionFactory")
- @Primary
- public ConnectionFactory MergeConnectionFactory(
- @Value("${spring.rabbitmq.yjdpeservice.host}") String host,
- @Value("${spring.rabbitmq.yjdpeservice.port}") int port,
- @Value("${spring.rabbitmq.yjdpeservice.username}") String username,
- @Value("${spring.rabbitmq.yjdpeservice.password}") String password
- ){
- CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
- connectionFactory.setHost(host);
- connectionFactory.setPort(port);
- connectionFactory.setUsername(username);
- connectionFactory.setPassword(password);
- return connectionFactory;
- }
-
- @Bean(name="LocalConnectionFactory")
- public ConnectionFactory LocalConnectionFactory(
- @Value("${spring.rabbitmq.yjservice.host}") String host,
- @Value("${spring.rabbitmq.yjservice.port}") int port,
- @Value("${spring.rabbitmq.yjservice.username}") String username,
- @Value("${spring.rabbitmq.yjservice.password}") String password
- ){
- CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
- connectionFactory.setHost(host);
- connectionFactory.setPort(port);
- connectionFactory.setUsername(username);
- connectionFactory.setPassword(password);
- return connectionFactory;
- }
-
- @Bean(name="mergeRabbitTemplate")
- @Primary
- public RabbitTemplate mergeRabbitTemplate(
- @Qualifier("mergeConnectionFactory") ConnectionFactory connectionFactory
- ){
- RabbitTemplate yjdpRabbitTemplate = new RabbitTemplate(connectionFactory);
- return yjdpRabbitTemplate;
- }
-
- @Bean(name="LocalRabbitTemplate")
- public RabbitTemplate LocalRabbitTemplate(
- @Qualifier("LocalConnectionFactory") ConnectionFactory connectionFactory
- ){
- RabbitTemplate yjRabbitTemplate = new RabbitTemplate(connectionFactory);
- return yjRabbitTemplate;
- }
-
- @Bean(name="mergeFactory")
- public SimpleRabbitListenerContainerFactory mergeFactory(
- SimpleRabbitListenerContainerFactoryConfigurer configurer,
- @Qualifier("mergeConnectionFactory") ConnectionFactory connectionFactory
- ) {
- SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
- configurer.configure(factory, connectionFactory);
- return factory;
- }
-
- @Bean(name="LocalFactory")
- public SimpleRabbitListenerContainerFactory LocalFactory(
- SimpleRabbitListenerContainerFactoryConfigurer configurer,
- @Qualifier("LocalConnectionFactory") ConnectionFactory connectionFactory
- ) {
- SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
- configurer.configure(factory, connectionFactory);
- return factory;
- }
-
-
- }
发送消息类:
- import io.renren.common.utils.R;
- import io.swagger.annotations.Api;
- import io.swagger.annotations.ApiOperation;
- import org.springframework.amqp.AmqpException;
- import org.springframework.amqp.core.Message;
- import org.springframework.amqp.core.MessagePostProcessor;
- import org.springframework.amqp.rabbit.core.RabbitTemplate;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.beans.factory.annotation.Qualifier;
- import org.springframework.web.bind.annotation.*;
-
- @Api(tags = "测试双mq发送")
- @RestController
- @RequestMapping("rbt/mq")
- public class RbtMqController {
-
-
- @Autowired
- @Qualifier(value = "mergeRabbitTemplate")
- private RabbitTemplate mergerabbitTemplate;
-
- @Autowired
- @Qualifier(value = "LocalRabbitTemplate")
- private RabbitTemplate LocalrabbitTemplate;
-
-
- @ApiOperation("测试发送mq")
- @PostMapping("/PostMq/{mqone}/{mqtwo}")
- public Object PostMq(@RequestParam("token") String token, @PathVariable String mqone, @PathVariable String mqtwo){
-
- mergerabbitTemplate.convertAndSend("CeshiQueue", (Object) mqone, new MessagePostProcessor() {
- @Override
- public Message postProcessMessage(Message message) throws AmqpException {
- long l = 40000;
- //设置定时发布的时间发送到延时队列 到时间后转交给死信队列
- message.getMessageProperties().setExpiration(String.valueOf(l));
- return message;
-
- }
- });
-
- String msgTwo = "success";
- LocalrabbitTemplate.convertAndSend("CeshiQueue", (Object) mqtwo, new MessagePostProcessor() {
- @Override
- public Message postProcessMessage(Message message) throws AmqpException {
- long l = 40000;
- //设置定时发布的时间发送到延时队列 到时间后转交给死信队列
- message.getMessageProperties().setExpiration(String.valueOf(l));
- return message;
-
- }
- });
- return R.ok();
- }
-
- }
消费者端:
消费LocalFactory对应的mq中的my-dlx-queue-Ceshi
- @Component
- @RabbitListener(queues = "my-dlx-queue-Ceshi",containerFactory = "LocalFactory")
- @Log4j2
- public class locallistener {
-
- @RabbitHandler
- public void RegularlyAddAsCheckIn(String msg) throws Exception {
- log.info(new Date() + "::LocalFactory收到信息::" + msg);
- }
- }
消费mergeFactoryFactory对应的mq中的my-dlx-queue-Ceshi
- @Component
- @RabbitListener(queues = "my-dlx-queue-Ceshi",containerFactory = "mergeFactory")
- @Log4j2
- public class mergerlistener {
-
- @RabbitHandler
- public void RegularlyAddAsCheckIn(String msg) throws Exception {
- log.info(new Date() + "::mergeFactory收到信息::" + msg);
- }
-
- }
完事,实测有效。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。