赞
踩
默认情况一下,一个listener对应一个consumer,如果想对应多个,有两种方式。
在消费端,配置prefetch和concurrency参数便可以实现消费端MQ并发处理消息
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
listener:
simple:
# acknowledge-mode: manual # 手动确定(默认自动确认)
concurrency: 1 # 消费端的监听个数(即@RabbitListener开启几个线程去处理数据。)
max-concurrency: 10 # 消费端的监听最大个数
prefetch: 10
connection-timeout: 15000 # 超时时间
这个是个全局配置,应用里的任何队列对应的listener都至少有5个consumer,但是最好别这么做,因为一般情况下,一个listener对应一个consumer是够用的。只是针对部分场景,才需要一对多。
在注解中配置该参数
在RabbitListeners注解上加上concurrency属性
会覆盖配置文件中的参数
直接在@RabbitListener上配置
@Component
public class SpringBootMsqConsumer {
@RabbitListener(queues = "spring-boot-direct-queue",concurrency = "5-10")
public void receive(Message message) {
System.out.println("receive message:" + new String(message.getBody()));
}
}
//@RabbitListener(queues = {"kinson"}, concurrency = "2")
@RabbitListener(queues = {"kinson"}, concurency="min-max") // 最小并发数是5,最大并发是10
public void receiver(Message msg, Channel channel) throws InterruptedException {
// Thread.sleep(10000);
byte[] messageBytes = msg.getBody();
if (messageBytes != null && messageBytes.length > 0) {
//打印数据
String message = new String(msg.getBody(), StandardCharsets.UTF_8);
log.info("【消3】:{}", message);
}
}
@RabbitListener(
bindings = @QueueBinding(
value = @Queue(value = "myQueue1"),
exchange = @Exchange(value = "myExchange1"),
key = "routingKey1"
),
concurrency = "10"
)
public void process1(Message message) throws Exception {
// 执行程序
}
利用@RabbitListener中的concurrency属性进行指定就行,例如上面的concurrency = “5-10”
就表示最小5个,最大10个consumer。启动消费端应用后,找到spring-boot-direct-queue这个队列的consumer,会发现有5个。
这5个消费者都可以从spring-boot-direct-queue这个队列中获取消息,加快队列中消息的消费速度,提高吞吐量。
我们经过压测,来判断consumer的消费能力,如果单位时间内,consumer到达的消息太多,也可能把消费者压垮。
得到压测数据后,可以在@RabbitListener中配置prefetch count。
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class SpringBootMsqConsumer {
@RabbitListener(queues = "spring-boot-direct-queue",concurrency = "5-10",containerFactory = "mqConsumerlistenerContainer")
public void receive(Message message) {
System.out.println("receive message:" + new String(message.getBody()));
}
}
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory; import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class RabbitMqConfig { @Autowired private CachingConnectionFactory connectionFactory; @Bean(name = "mqConsumerlistenerContainer") public SimpleRabbitListenerContainerFactory mqConsumerlistenerContainer(){ SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); factory.setConnectionFactory(connectionFactory); factory.setPrefetchCount(50); return factory; } }
factory.setPrefetchCount(50);,就是用于设置prefetch count的,启动后,会在spring-boot-direct-queue队列的consumer中体现出来。
配置成功后,consumer单位时间内接收到消息就是50条。
@Autowired
CachingConnectionFactory cachingConnectionFactory;
@Bean(name="limitContainerFactory")
public SimpleRabbitListenerContainerFactory simpleRabbitListenerContainerFactory(){
SimpleRabbitListenerContainerFactory factory=new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(cachingConnectionFactory);
factory.setConcurrentConsumers(60); // 并发消费者数量
factory.setPrefetchCount(3); // 每次最多拿3个,等这三个处理完之后,再去队列中拿第二批,起到了限流的作用
return factory;
}
@Bean("limitContainerFactory")
public SimpleRabbitListenerContainerFactory pointTaskContainerFactory(SimpleRabbitListenerContainerFactoryConfigurer configurer, ConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setPrefetchCount(2); //每个消费者预取数量
factory.setConcurrentConsumers(60); //并发消费者数量
configurer.configure(factory, connectionFactory);
return factory;
}
@RabbitHandler
@RabbitListener(queues = QueueContent.MESSAGE_QUEUE_NAME,containerFactory = "limitContainerFactory")
public void handler(String msg,Channel channel, Message message) throws IOException {
......
}
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。