赞
踩
原因:消费端由于没有确认消息,导致队列阻塞,这是RabbitMQ
的一种保护机制。防止当消息激增的时候,海量的消息进入consumer
而引发consumer
宕机。
RabbitMQ提供了一种QOS(服务质量保证)功能,即在开启手动确认消息的前提下,限制信道上的消费者所能保持的最大未确认的数量。可以通过设置PrefetchCount实现。
举例说明:可以理解为在consumer前面加了一个缓冲容器,容器能容纳最大的消息数量就是PrefetchCount。如果容器没有满RabbitMQ就会将消息投递到容器内,如果满了就不投递了。当consumer对消息进行ack以后就会将此消息移除,从而放入新的消息。
- // 最小并发数
- container.setConcurrentConsumers(1);
- // 最大并发数
- container.setMaxConcurrentConsumers(5);
- // 削峰限流:消费端限流,一次处理一条数据。
- container.setAcknowledgeMode(AcknowledgeMode.MANUAL); // RabbitMQ默认是自动确认,这里改为手动确认消息
- //一次处理的消息数量
- container.setPrefetchCount(2);
-
- //参数为0,轮询发送处理的消息,一次一条
- container.setPrefetchCount(0);
判断队列的阻塞风险
当ack模式为manual,并且线上出现了unacked消息,这个时候不用慌。由于QOS是限制信道channel上的消费者所能保持的最大未确认的数量。所以允许出现unacked的数量可以通过channelCount * prefetchCount * 节点数量 得出。
channlCount就是由concurrency,max-concurrency决定的。
- min = concurrency * prefetch * 节点数量
- max = max-concurrency * prefetch * 节点数量
- 由此可以的出结论
- unacked_msg_count < min 队列不会阻塞。但需要及时处理unacked的消息。
- unacked_msg_count >= min 可能会出现堵塞。
- unacked_msg_count >= max 队列一定阻塞。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。