赞
踩
最近工作上需要在一个项目里面使用两个RabbitMQ作为数据源,记录一下大概的实现过程,加深印象。
一般情况下,都是依赖单个RabbitMQ作为单个数据源,以SpringBoot官网示例为例,在SpringBoot项目的application.properties中配置rabbitmq的连接信息即可,示例代码如下:
package com.example.messagingrabbitmq;

import java.nio.charset.StandardCharsets;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;

/**
 * Single-broker example: declares one queue, one topic exchange and their binding,
 * and consumes messages from the queue named by the {@code spring.queue} property.
 */
@SpringBootApplication
public class MessagingRabbitmqApplication {

    static final String topicExchangeName = "spring-boot-exchange";

    static final String queueName = "spring-boot";

    /** Declares the non-durable queue {@value #queueName}. */
    @Bean
    Queue queue() {
        return new Queue(queueName, false);
    }

    /** Declares the topic exchange {@value #topicExchangeName}. */
    @Bean
    TopicExchange exchange() {
        return new TopicExchange(topicExchangeName);
    }

    /** Binds the queue to the exchange for every routing key matching {@code foo.bar.#}. */
    @Bean
    Binding binding(Queue queue, TopicExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with("foo.bar.#");
    }

    /**
     * Listener entry point for the queue configured under {@code spring.queue}.
     *
     * @param message the raw AMQP message delivered by the listener container
     */
    @RabbitListener(queues = "${spring.queue}")
    public void receiveQueueFromXalarm(Message message) {
        handlerMessage(message);
    }

    /**
     * Placeholder handler so the example is self-contained; replace with real business logic.
     *
     * @param message the message to process
     */
    private void handlerMessage(Message message) {
        // NOTE(review): assumes the payload is UTF-8 text - confirm against the producer.
        System.out.println("Received <" + new String(message.getBody(), StandardCharsets.UTF_8) + ">");
    }

    /**
     * Starts the application. The context is intentionally NOT closed right after startup:
     * closing it would stop the listener container before any message could be consumed.
     */
    public static void main(String[] args) throws InterruptedException {
        SpringApplication.run(MessagingRabbitmqApplication.class, args);
    }
}
从单个RabbitMQ的数据源,我们了解到SpringBoot使用约定优于配置的方式,简化了RabbitMQ开发。在spring-boot-starter-amqp中,根据application.properties中的rabbitmq连接信息,自动创建了连接信息,创建@Queue,@Exchange,@Binding。但是要在一个项目里面使用多个RabbitMQ数据源,需要手动创建其他的RabbitMQ数据源的连接工作,大概的步骤如下:
# Connection settings for the "main" RabbitMQ broker (the @Primary connection factory).
spring.rabbitmq.main.host=192.168.10.223
spring.rabbitmq.main.port=5672
spring.rabbitmq.main.username=admin
spring.rabbitmq.main.password=admin
# Switch for the second broker: beans annotated with
# @ConditionalOnProperty(prefix = "spring.rabbitmq.second", value = "used", havingValue = "true")
# are only created when this is true.
spring.rabbitmq.second.used=true
# Connection settings for the "second" RabbitMQ broker.
spring.rabbitmq.second.host=192.168.10.224
spring.rabbitmq.second.port=5672
spring.rabbitmq.second.username=admin
spring.rabbitmq.second.password=admin
@Configuration public class RabbitMqConfig { @Value("${direct.exchange.alarm}") private String directExchangeAlarm; @Value("${queue.alarm}") private String queueAlarm; @Value("${routing.key.alarm}") private String routingKeyAlarm; @Value("${direct.exchange.alarm.second}") private String directExchangeAlarmSecond; @Value("${queue.alarm.second}") private String queueAlarmSecond; @Value("${routing.key.alarm.second}") private String routingKeyAlarmSecond; @Value("${spring.rabbitmq.main.ip}") private String mainHost; @Value("${spring.rabbitmq.main.port}") private int mainPort; @Value("${spring.rabbitmq.main.username}") private String mainUserName; @Value("${spring.rabbitmq.main.password}") private String mainPassword; @Value("${spring.rabbitmq.second.ip}") private String secondHost; @Value("${spring.rabbitmq.second.port}") private int secondPort; @Value("${spring.rabbitmq.second.username}") private String secondUserName; @Value("${spring.rabbitmq.second.password}") private String secondPassword; @Bean(name = "mainConnectionFactory") @Primary public ConnectionFactory mainConnectionFactory(){ return connectionFactory(mainHost, mainPort, mainUserName, mainPassword); } @Bean(name = "secondConnectionFactory") @ConditionalOnProperty(prefix = "spring.rabbitmq.second", value = "used", havingValue = "true") public ConnectionFactory secondConnectionFactory(){ return connectionFactory(secondHost, secondPort, secondUserName, secondPassword); } @Bean(name = "mainFactory") @Primary public SimpleRabbitListenerContainerFactory mainFactory(SimpleRabbitListenerContainerFactoryConfigurer configurer, @Qualifier("mainConnectionFactory") ConnectionFactory connectionFactory){ SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); configurer.configure(factory, connectionFactory); return factory; } @Bean(name = "secondFactory") @ConditionalOnProperty(prefix = "spring.rabbitmq.second", value = "used", havingValue = "true") public 
SimpleRabbitListenerContainerFactory secondFactory(SimpleRabbitListenerContainerFactoryConfigurer configurer, @Qualifier("secondConnectionFactory") ConnectionFactory connectionFactory){ SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); configurer.configure(factory, connectionFactory); return factory; } //xalarm行为分析 @Bean public Queue alarmQueue() { return new Queue(queueAlarm); } @Bean DirectExchange alarmExchange() { return new DirectExchange(directExchangeAlarm); } @Bean Binding alarmBinding() { return BindingBuilder.bind(alarmQueue()).to(alarmExchange()).with(routingKeyAlarm); } @Bean @ConditionalOnProperty(prefix = "spring.rabbitmq.second", value = "used", havingValue = "true") public String alarmTQueueSecond(@Qualifier("secondConnectionFactory") ConnectionFactory connectionFactory) throws IOException { HashMap<String, Object> map = new HashMap<>(); map.put("x-expires", 86400000); // 当队列在指定的时间没有被访问(consume, basicGet, queueDeclare…)就会被删除,Features=Exp map.put("x-max-length", 100000); // 限定队列的消息的最大值长度,超过指定长度将会把最早的几条删除掉 map.put("x-max-length-bytes", 209715200); // 限定队列最大占用的空间大小, 一般受限于内存、磁盘的大小, Features=Lim B, 设置为200M connectionFactory.createConnection().createChannel(false).queueDeclare(queueAlarmSecond, true, false, false, map); connectionFactory.createConnection().createChannel(false).queueBind(queueAlarmSecond, directExchangeAlarmSecond, routingKeyAlarmSecond); return "alarmTQueueSecond"; } private CachingConnectionFactory connectionFactory(String host, int port, String username, String password){ CachingConnectionFactory connectionFactory = new CachingConnectionFactory(); connectionFactory.setHost(host); connectionFactory.setPort(port); connectionFactory.setUsername(username); connectionFactory.setPassword(password); return connectionFactory; } }
将两个数据源消息处理的公共部分抽取到抽象类中,具体如下所示:
public abstract class AbstractAlarmConsumer { @Autowired private XXXService xxxService; protected void handlerMessage(Message message) { try { String strFromRMQ = new String(message.getBody()); // TODO 公共业务逻辑 transformsMessage(analysisResult); // TODO 公共业务逻辑 } catch (Exception e) { throw new ImmediateAcknowledgeAmqpException(e); } } protected abstract void transformsMessage(AnalysisResult analysisResult) throws ExecutionException; }
/**
 * Consumer for the alarm queue of the main broker. No {@code containerFactory} is named on
 * the listener, so the default listener container factory is used.
 */
@Component
public class XXXAlarmConsumer extends AbstractAlarmConsumer {

    /**
     * Broker-specific handling of an analysis result from the main broker.
     *
     * @param analysisResult the analysis result produced by the shared pipeline
     * @throws ExecutionException if processing fails
     */
    @Override
    protected void transformsMessage(AnalysisResult analysisResult) throws ExecutionException {
        // TODO
    }

    /**
     * Listener entry point. The queue name comes from {@code queue.alarm}, the same property
     * the configuration class uses to declare the queue, so the two cannot drift apart.
     *
     * @param message the raw AMQP message
     */
    @RabbitListener(queues = "${queue.alarm}")
    public void receiveQueueFromXalarm(Message message) {
        handlerMessage(message);
    }
}
/**
 * Consumer for the alarm queue of the second broker. Only registered when
 * {@code spring.rabbitmq.second.used=true}, matching the condition on the second
 * broker's connection and listener container factories.
 */
@Component
@ConditionalOnProperty(prefix = "spring.rabbitmq.second", value = "used", havingValue = "true")
public class ThirdAiBehaviorConsumer extends AbstractAlarmConsumer {

    /**
     * Listener entry point bound to the second broker through the {@code secondFactory}
     * container factory. The queue name comes from {@code queue.alarm.second}, the same
     * property used to declare the queue on the second broker.
     *
     * @param message the raw AMQP message
     */
    @RabbitListener(queues = "${queue.alarm.second}", containerFactory = "secondFactory")
    public void receiveSecondQueueFromXalarm(Message message) {
        handlerMessage(message);
    }

    /**
     * Broker-specific handling of an analysis result from the second broker.
     *
     * @param analysisResult the analysis result produced by the shared pipeline
     * @throws ExecutionException if processing fails
     */
    @Override
    protected void transformsMessage(AnalysisResult analysisResult) throws ExecutionException {
        // TODO
    }
}
queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments);
Arguments方式是指在上述queueDeclare方法中设置arguments参数,主要用于设置单个queue,主要类型如下:
Policy(策略)方式主要用来设置一组queue,属于全局设置的参数,主要包括:
rabbitmqctl set_policy Main-policy ".*" '{"max-length-bytes":10485760000,"max-length":500000,"message-ttl":259200000}' --apply-to queues
Main-policy:策略名称
".*":正则表达式,匹配所有队列
max-length-bytes:队列最大堆积上限(配置为约10G,即10485760000字节)
max-length:最大堆积数量(配置的为50w条)
message-ttl:消息过期时间(默认3天 259200000)
该策略配置后,消息堆积以最先到达条件为准(10G或者50w条),若到达消息堆积上限,默认会将最早的消息丢弃(若到达50w条后,队列中就只会堆积50w条,再有消息进来老的消息会被丢弃)。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。