赞
踩
上一遍博客介绍了使用spring-boot连接单个rabbitmq服务器发送和接收消息,但是在实际的项目中我们可能需要同时连接多个不同的rabbitmq服务器发送和接收消息。今天简单介绍下如何使用spring-boot连个多个rabbitmq服务器发送和接收消息。
1.配置文件中配置2个rabbitmq连接信息(这里我在2台虚拟机上安装rabbitmq)
server:
port: 8091
spring:
application:
name: rabbit-spring-boot
rabbitmq:
dev:
host: 192.168.98.22
port: 5672
username: root
password: root
test:
host: 192.168.98.23
port: 5672
username: root
password: root
2.创建rabbitmq连接配置类
@Configuration
public class RabbitMqConfig {
// 注意这里使用了primary注解
// 声明连接工厂连接开发服务器
@Primary
@Bean(name = "devConnectionFactory")
public ConnectionFactory devConnectionFactory(@Value("${spring.rabbitmq.dev.host}") String host,
@Value("${spring.rabbitmq.dev.port}") int port,
@Value("${spring.rabbitmq.dev.username}") String username,
@Value("${spring.rabbitmq.dev.password}") String password) {
// 使用@Value直接读取配置文件中的信息
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
connectionFactory.setHost(host);
connectionFactory.setPort(port);
connectionFactory.setUsername(username);
connectionFactory.setPassword(password);
connectionFactory.setVirtualHost("/");
connectionFactory.setPublisherConfirms(true);
connectionFactory.setPublisherReturns(true);
return connectionFactory;
}
// 声明连接工厂连接测试服务器
@Bean(name = "testConnectionFactory")
public ConnectionFactory testConnectionFactory(@Value("${spring.rabbitmq.test.host}") String host,
@Value("${spring.rabbitmq.test.port}") int port,
@Value("${spring.rabbitmq.test.username}") String username,
@Value("${spring.rabbitmq.test.password}") String password) {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
connectionFactory.setHost(host);
connectionFactory.setPort(port);
connectionFactory.setUsername(username);
connectionFactory.setPassword(password);
connectionFactory.setVirtualHost("/");
connectionFactory.setPublisherConfirms(true);
connectionFactory.setPublisherReturns(true);
return connectionFactory;
}
// 声明开发服务器rabbitTemplate
@Bean(name = "devRabbitTemplate")
public RabbitTemplate devRabbitTemplate(@Qualifier("devConnectionFactory") ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate=new RabbitTemplate(connectionFactory);
return rabbitTemplate;
}
// 声明测试服务器连接 rabbitTemplate
@Bean(name = "testRabbitTemplate")
public RabbitTemplate testRabbitTemplate(@Qualifier("testConnectionFactory") ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate=new RabbitTemplate(connectionFactory);
return rabbitTemplate;
}
/**
* 声明dev containerFactory
* @param rabbitListenerContainerFactoryConfigurer
* @param connectionFactory
* @return
*/
@Bean(name = "devContainerFactory")
public SimpleRabbitListenerContainerFactory devSimpleRabbitListenerContainerFactory(
SimpleRabbitListenerContainerFactoryConfigurer rabbitListenerContainerFactoryConfigurer,
@Qualifier("devConnectionFactory") ConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory containerFactory=new SimpleRabbitListenerContainerFactory();
rabbitListenerContainerFactoryConfigurer.configure(containerFactory,connectionFactory);
return containerFactory;
}
/**
* 声明 test containerFactory
* @param rabbitListenerContainerFactoryConfigurer
* @param connectionFactory
* @return
*/
@Bean(name = "testContainerFactory")
public SimpleRabbitListenerContainerFactory testSimpleRabbitListenerContainerFactory(
SimpleRabbitListenerContainerFactoryConfigurer rabbitListenerContainerFactoryConfigurer,
@Qualifier("testConnectionFactory") ConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory containerFactory=new SimpleRabbitListenerContainerFactory();
rabbitListenerContainerFactoryConfigurer.configure(containerFactory,connectionFactory);
return containerFactory;
}
@Bean
public RestTemplate restTemplate() {
return new RestTemplate();
}
}
3.创建消息生产者
@Component
public class RabbitMQProducer {
// 指定dev服务器rabbitTemplate
@Resource(name = "devRabbitTemplate")
AmqpTemplate devAmqpTemplate;
// 指定test服务器rabbitTemplate
@Resource(name = "testRabbitTemplate")
AmqpTemplate testAmqpTemplate;
@Value("${open-care.rabbitmq.exchange.name}")
String exchangeName;
@Value("${open-care.rabbitmq.queue1-name}")
String queueName;
public void send(Object object) {
try {
Message message= MessageBuilder.withBody(JSON.toJSONString(object).getBytes("UTF-8"))
.setContentType(MessageProperties.CONTENT_TYPE_JSON)
.setContentEncoding("UTF-8")
.setMessageId(UUID.randomUUID().toString()).build();
// 指定exchange交换器名称为fanout_exchange(fanout交换器没有路由键)
devAmqpTemplate.send("fanout_exchange","",message);
// 使用devAmqpTemplate将消息发送到开发服务器
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
}
public void sendMessage(Object object) {
try {
Message message= MessageBuilder.withBody(JSON.toJSONString(object).getBytes("UTF-8"))
.setContentType(MessageProperties.CONTENT_TYPE_JSON)
.setContentEncoding("UTF-8")
.setMessageId(UUID.randomUUID().toString()).build();
// 指定交换器名称为fanout_exchange(fanout交换器没有路由键)
testAmqpTemplate.send("fanout_exchange","",message);
// 使用testAmqpTeplate将消息发送到测试服务器
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
}
}
4.创建消息消费者
@Component
public class RabbitMQConsumer {
@RabbitHandler
@RabbitListener(queues = "q.test1",containerFactory = "devContainerFactory")// 指定队列名称和容器工厂,接收开发服务器队列q.test1中的消息
public void receive(Object object) {
Message message=(Message)object;
byte bytes[]=null;
if (message != null) {
bytes=message.getBody();
}
try {
String msg=new String(bytes,"UTF-8");
System.out.println("192.168.98.22 q.test1------------------------"+msg);
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
}
@RabbitHandler
@RabbitListener(queues = "q.test2",containerFactory = "devContainerFactory")// 指定队列名称和容器工厂,接收开发服务器队列q.test2中的消息
public void receiveQueue1(Object object) {
Message message=(Message)object;
byte bytes[]=null;
if (message != null) {
bytes=message.getBody();
}
try {
String msg=new String(bytes,"UTF-8");
System.out.println("192.168.98.22 q.test2------------------------"+msg);
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
}
@RabbitHandler
@RabbitListener(queues = "q.test3",containerFactory = "devContainerFactory")// 指定队列名称和容器工厂,接收开发服务器队列q.test3中的消息
public void receiveQueue2(Object object) {
Message message=(Message)object;
byte bytes[]=null;
if (message != null) {
bytes=message.getBody();
}
try {
String msg=new String(bytes,"UTF-8");
System.out.println("192.168.98.22 q.test3------------------------"+msg);
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
}
@RabbitHandler
@RabbitListener(queues = "q.test1",containerFactory = "testContainerFactory")
// 指定队列名称和容器工厂,接收测试服务器队列q.test1中的消息
public void receive23message(Object object) {
Message message=(Message)object;
byte bytes[]=null;
if (message != null) {
bytes=message.getBody();
}
try {
String msg=new String(bytes,"UTF-8");
System.out.println("192.168.98.23 q.test1------------------------"+msg);
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
}
@RabbitHandler
@RabbitListener(queues = "q.fanout1",containerFactory = "testContainerFactory")// 指定队列名称和容器工厂,接收测试服务器队列q.fanout1中的消息
public void receiveFanoutQueue(Object object) {
Message message=(Message)object;
byte bytes[]=null;
if (message != null) {
bytes=message.getBody();
}
try {
String msg=new String(bytes,"UTF-8");
System.out.println("192.168.98.23 q.fanout1------------------------"+msg);
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
}
}
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。