赞
踩
- rabbitmq:
- first:
- host:
- port:
- username:
- password:
- #虚拟host 可以不设置,使用server默认host
- virtual-host: /
- second:
- host:
- port:
- username:
- password:
- virtual-host: /
- import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
- import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
- import org.springframework.amqp.rabbit.connection.ConnectionFactory;
- import org.springframework.amqp.rabbit.core.RabbitTemplate;
- import org.springframework.beans.factory.annotation.Qualifier;
- import org.springframework.beans.factory.annotation.Value;
- import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
- import org.springframework.context.annotation.Primary;
-
- /**
- * RabbitMq多源配置
- *
- * @author lq
- */
- @Configuration
- public class RabbitConfig {
-
- @Bean(name = "firstConnectionFactory")
- @Primary
- public ConnectionFactory firstConnectionFactory(
- @Value("${spring.rabbitmq.first.host}") String host,
- @Value("${spring.rabbitmq.first.port}") int port,
- @Value("${spring.rabbitmq.first.username}") String username,
- @Value("${spring.rabbitmq.first.password}") String password,
- @Value("${spring.rabbitmq.first.virtual-host}") String virtualHost
- ) {
- CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
-
- connectionFactory.setHost(host);
- connectionFactory.setPort(port);
- connectionFactory.setUsername(username);
- connectionFactory.setPassword(password);
- connectionFactory.setVirtualHost(virtualHost);
- return connectionFactory;
- }
-
- @Bean(name = "secondConnectionFactory")
- public ConnectionFactory secondConnectionFactory(
- @Value("${spring.rabbitmq.second.host}") String host,
- @Value("${spring.rabbitmq.second.port}") int port,
- @Value("${spring.rabbitmq.second.username}") String username,
- @Value("${spring.rabbitmq.second.password}") String password,
- @Value("${spring.rabbitmq.second.virtual-host}") String virtualHost
- ) {
- CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
- connectionFactory.setHost(host);
- connectionFactory.setPort(port);
- connectionFactory.setUsername(username);
- connectionFactory.setPassword(password);
- connectionFactory.setVirtualHost(virtualHost);
- return connectionFactory;
- }
-
- @Bean(name = "firstRabbitTemplate")
- @Primary
- public RabbitTemplate firstRabbitTemplate(
- @Qualifier("firstConnectionFactory") ConnectionFactory connectionFactory
- ) {
- RabbitTemplate firstRabbitTemplate = new RabbitTemplate(connectionFactory);
- return firstRabbitTemplate;
- }
-
- @Bean(name = "secondRabbitTemplate")
- public RabbitTemplate secondRabbitTemplate(
- @Qualifier("secondConnectionFactory") ConnectionFactory connectionFactory
- ) {
- RabbitTemplate secondRabbitTemplate = new RabbitTemplate(connectionFactory);
- return secondRabbitTemplate;
- }
-
-
- @Bean(name = "firstFactory")
- public SimpleRabbitListenerContainerFactory firstFactory(
- SimpleRabbitListenerContainerFactoryConfigurer configurer,
- @Qualifier("firstConnectionFactory") ConnectionFactory connectionFactory
- ) {
- SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
- configurer.configure(factory, connectionFactory);
- return factory;
- }
-
- @Bean(name = "secondFactory")
- public SimpleRabbitListenerContainerFactory secondFactory(
- SimpleRabbitListenerContainerFactoryConfigurer configurer,
- @Qualifier("secondConnectionFactory") ConnectionFactory connectionFactory
- ) {
- SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
- configurer.configure(factory, connectionFactory);
- return factory;
- }
- }
- import org.springframework.amqp.rabbit.connection.ConnectionFactory;
- import org.springframework.beans.factory.annotation.Qualifier;
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
- import java.io.IOException;
-
- /**
- * 信道构建器
- *
- * @author liuqi
- */
- @Configuration
- public class CreateQueue {
-
- @Bean
- public String chargeQueue(@Qualifier("firstConnectionFactory") ConnectionFactory connectionFactory) {
- try {
- connectionFactory.createConnection().createChannel(false).queueDeclare(Constants.RABBITMQ_QUEUE_NAME, true, false, false, null);
- }catch (IOException e){
- e.printStackTrace();
- }
- return Constants.RABBITMQ_QUEUE_NAME;
- }
-
- @Bean
- public String chargeQueue2(@Qualifier("secondConnectionFactory") ConnectionFactory connectionFactory) {
- try {
- connectionFactory.createConnection().createChannel(false).queueDeclare(Constants.RABBITMQ_QUEUE_NAME2, true, false, false, null);
- }catch (IOException e){
- e.printStackTrace();
- }
- return Constants.RABBITMQ_QUEUE_NAME2;
- }
- }
- import com.alibaba.fastjson.JSONObject;
- import com.google.gson.JsonObject;
- import com.zlhy.websocket.util.constant.Constants;
- import lombok.extern.slf4j.Slf4j;
- import org.apache.tomcat.util.bcel.Const;
- import org.springframework.amqp.core.Message;
- import org.springframework.amqp.core.MessageProperties;
- import org.springframework.amqp.rabbit.core.RabbitTemplate;
- import org.springframework.stereotype.Service;
-
- import javax.annotation.Resource;
- import java.util.Map;
-
- /**
- * @author liuqi
- * @version 1.0
- * @description 向关联方的队列发送消息
- */
- @Slf4j
- @Service
- public class SendMessage {
-
- @Resource(name = "firstRabbitTemplate")
- private RabbitTemplate firstRabbitTemplate;
-
- @Resource(name = "secondRabbitTemplate")
- private RabbitTemplate secondRabbitTemplate;
-
- public void sendToOneMessage(JSONObject jsonObject) {
- MessageProperties messageProperties = new MessageProperties();
- messageProperties.setContentType("application/json");
- Message info = new Message(jsonObject.toString().getBytes(), messageProperties);
- firstRabbitTemplate.convertAndSend(Constants.RABBITMQ_QUEUE_NAME, info);
- }
-
- public void sendToTwoMessage(JSONObject jsonObject) {
- MessageProperties messageProperties = new MessageProperties();
- messageProperties.setContentType("application/json");
- Message info = new Message(jsonObject.toString().getBytes(), messageProperties);
- secondRabbitTemplate.convertAndSend(Constants.RABBITMQ_QUEUE_NAME2, info);
- }
- }
-
sendMessage.sendToOneMessage(jsonResult);
sendMessage.sendToTwoMessage(jsonResult);
- import com.rabbitmq.client.Channel;
- import lombok.extern.slf4j.Slf4j;
- import org.springframework.amqp.core.Message;
- import org.springframework.amqp.rabbit.annotation.RabbitListener;
- import org.springframework.stereotype.Service;
-
- import java.io.IOException;
- import java.nio.charset.StandardCharsets;
-
-
- @Slf4j
- @Service
- public class OneReceive {
-
- @RabbitListener(queues = Constants.RABBITMQ_QUEUE_NAME, containerFactory = "firstFactory")
- public void listenOne(Message message, Channel channel) throws IOException {
- //获取MQ返回的数据
- // channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
- String data = new String(message.getBody(), StandardCharsets.UTF_8);
- log.info("MQ1返回的数据:{}", data);
- //下面进行业务逻辑处理
- }
- }
- import com.rabbitmq.client.Channel;
- import lombok.extern.slf4j.Slf4j;
- import org.springframework.amqp.core.Message;
- import org.springframework.amqp.rabbit.annotation.RabbitListener;
- import org.springframework.stereotype.Service;
-
- import java.io.IOException;
- import java.nio.charset.StandardCharsets;
-
-
- @Slf4j
- @Service
- public class TwoReceive {
- @RabbitListener(queues = Constants.RABBITMQ_QUEUE_NAME2, containerFactory = "secondFactory")
- public void listenTwo(Message message, Channel channel) throws IOException {
- //获取MQ返回的数据
- //channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
- String data = new String(message.getBody(), StandardCharsets.UTF_8);
- log.info("MQ2返回的数据:{}", data);
- //下面进行业务逻辑处理
- }
-
- }
也可放到一个类中测试消费
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。