We ran into a message backlog in production. The project uses RabbitMQ, so I dug into what causes message backlogs in RabbitMQ and how to deal with them.
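The main causes of a message backlog are:
(1) Consumers are much slower than producers, and the rate mismatch makes messages pile up.
(2) Consumer instances are severely blocked on I/O, or the server hosting the consumer goes down.
(3) Exceptions in the consumer's business logic make messages pile up.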
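To make cause (1) concrete, here is a back-of-the-envelope sketch. It assumes constant rates and a queue that starts empty; the class and method names (BacklogEstimate, backlogAfter, minConsumers) are hypothetical, and the numbers are illustrative, not taken from our incident.

```java
// Hypothetical sketch: constant rates, empty queue at t = 0, illustrative numbers only.
final class BacklogEstimate {

    /** Queue depth after `seconds` when messages arrive faster than they are consumed. */
    static long backlogAfter(long producerPerSec, long perConsumerPerSec, int consumers, long seconds) {
        long consumedPerSec = perConsumerPerSec * consumers;              // aggregate drain rate
        long growthPerSec = Math.max(0, producerPerSec - consumedPerSec); // depth never goes negative
        return growthPerSec * seconds;
    }

    /** Smallest number of consumers whose combined rate keeps up with the producer. */
    static int minConsumers(long producerPerSec, long perConsumerPerSec) {
        return (int) ((producerPerSec + perConsumerPerSec - 1) / perConsumerPerSec); // ceiling division
    }

    public static void main(String[] args) {
        // 100 msg/s produced, one consumer handling 30 msg/s, observed for one minute
        System.out.println(backlogAfter(100, 30, 1, 60));
        System.out.println(minConsumers(100, 30));
    }
}
```

With 100 msg/s produced and one consumer handling 30 msg/s, the queue grows by 70 messages per second (4200 after a minute), and at least four such consumers are needed to keep up. That gap is what raising the consumer count is meant to close.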
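Most MQ consumers in the project are written with the @RabbitListener annotation, which listens on the specified queues. If containerFactory is not specified, the default SimpleRabbitListenerContainerFactory is used to create the message listener container (a SimpleMessageListenerContainer).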
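By default, a RabbitMQ listener consumes on a single thread, one message at a time, which matches the FIFO nature of a queue. In the source of SimpleMessageListenerContainer, the concurrent-consumption property is initialized to concurrentConsumers = 1. As the name suggests, this field holds the number of concurrent consumers, so by default each listener gets exactly one consumer.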
```java
private volatile int concurrentConsumers = 1;
```
When the container starts, it creates N BlockingQueueConsumer instances (N consumers, each with its own local queue) according to concurrentConsumers:
```java
protected int initializeConsumers() {
    int count = 0;
    synchronized (this.consumersMonitor) {
        if (this.consumers == null) {
            this.cancellationLock.reset();
            this.consumers = new HashSet<>(this.concurrentConsumers);
            // one BlockingQueueConsumer per configured concurrent consumer
            for (int i = 0; i < this.concurrentConsumers; ++i) {
                BlockingQueueConsumer consumer = this.createBlockingQueueConsumer();
                this.consumers.add(consumer);
                ++count;
            }
        }
        return count;
    }
}
```
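Next, look at the constructor of its parent abstract class, AbstractMessageListenerContainer. The prefetchCount set there is the other key property for concurrent consumption: it is the number of messages a consumer takes from the broker in one go, more precisely the maximum number of unacknowledged messages the broker will push to that consumer. It defaults to 250 (in Spring AMQP versions before 2.0 the default was 1).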
```java
public AbstractMessageListenerContainer() {
    this.proxy = this.delegate;
    this.shutdownTimeout = 5000L;
    this.transactionAttribute = new DefaultTransactionAttribute();
    this.taskExecutor = new SimpleAsyncTaskExecutor();
    this.recoveryBackOff = new FixedBackOff(5000L, Long.MAX_VALUE);
    this.messagePropertiesConverter = new DefaultMessagePropertiesConverter();
    this.missingQueuesFatal = true;
    this.possibleAuthenticationFailureFatal = true;
    this.autoDeclare = true;
    this.mismatchedQueuesFatal = false;
    this.failedDeclarationRetryInterval = 5000L;
    this.autoStartup = true;
    this.phase = Integer.MAX_VALUE;
    this.active = false;
    this.running = false;
    this.lifecycleMonitor = new Object();
    this.queueNames = new CopyOnWriteArrayList<>();
    this.errorHandler = new ConditionalRejectingErrorHandler();
    this.exposeListenerChannel = true;
    this.acknowledgeMode = AcknowledgeMode.AUTO;
    this.deBatchingEnabled = true;
    this.adviceChain = new Advice[0];
    this.defaultRequeueRejected = true;
    this.prefetchCount = 250; // default prefetch: up to 250 unacked messages per consumer
    this.lastReceive = System.currentTimeMillis();
    this.statefulRetryFatalWithNullMessageId = true;
    this.exclusiveConsumerExceptionLogger = new AbstractMessageListenerContainer.DefaultExclusiveConsumerLogger();
    this.lookupKeyQualifier = "";
    this.forceCloseChannel = true;
}
```
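We have seen that the container creates N consumers based on concurrentConsumers. The last line of the constructor below shows that each consumer maintains a blocking queue whose capacity is set by prefetchCount.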
```java
public BlockingQueueConsumer(ConnectionFactory connectionFactory,
        MessagePropertiesConverter messagePropertiesConverter,
        ActiveObjectCounter<BlockingQueueConsumer> activeObjectCounter,
        AcknowledgeMode acknowledgeMode, boolean transactional, int prefetchCount,
        boolean defaultRequeueRejected, Map<String, Object> consumerArgs,
        boolean noLocal, boolean exclusive, String... queues) {
    this.cancelled = new AtomicBoolean(false);
    this.consumerArgs = new HashMap<>();
    this.deliveryTags = new LinkedHashSet<>();
    this.consumerTags = new ConcurrentHashMap<>();
    this.missingQueues = Collections.synchronizedSet(new HashSet<>());
    this.retryDeclarationInterval = 60000L;
    this.failedDeclarationRetryInterval = 5000L;
    this.declarationRetries = 3;
    this.connectionFactory = connectionFactory;
    this.messagePropertiesConverter = messagePropertiesConverter;
    this.activeObjectCounter = activeObjectCounter;
    this.acknowledgeMode = acknowledgeMode;
    this.transactional = transactional;
    this.prefetchCount = prefetchCount;
    this.defaultRequeueRejected = defaultRequeueRejected;
    if (consumerArgs != null && consumerArgs.size() > 0) {
        this.consumerArgs.putAll(consumerArgs);
    }
    this.noLocal = noLocal;
    this.exclusive = exclusive;
    this.queues = Arrays.copyOf(queues, queues.length);
    // local buffer for delivered messages, bounded by prefetchCount
    this.queue = new LinkedBlockingQueue<>(prefetchCount);
}
```
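The relationship between concurrentConsumers, prefetchCount and these local queues can be mimicked with plain java.util.concurrent types. This is a simplified model, not Spring AMQP code: PrefetchBufferModel, createConsumers and fill are hypothetical names. In the real container, further deliveries are stopped by the broker-side prefetch limit (basic.qos), and the LinkedBlockingQueue is simply sized to match it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Simplified model (hypothetical names): each "consumer" owns a local buffer bounded by prefetchCount.
final class PrefetchBufferModel {

    static List<BlockingQueue<String>> createConsumers(int concurrentConsumers, int prefetchCount) {
        List<BlockingQueue<String>> consumers = new ArrayList<>(concurrentConsumers);
        for (int i = 0; i < concurrentConsumers; i++) {
            consumers.add(new LinkedBlockingQueue<>(prefetchCount)); // capacity == prefetchCount
        }
        return consumers;
    }

    /** Offers messages to one buffer until it refuses one; returns how many were accepted. */
    static int fill(BlockingQueue<String> buffer, int messages) {
        int accepted = 0;
        for (int i = 0; i < messages; i++) {
            if (!buffer.offer("msg-" + i)) { // offer() returns false instead of blocking when full
                break;
            }
            accepted++;
        }
        return accepted;
    }

    public static void main(String[] args) {
        List<BlockingQueue<String>> consumers = createConsumers(1, 250);
        System.out.println(fill(consumers.get(0), 1000));
        System.out.println(consumers.get(0).remainingCapacity());
    }
}
```

Offering 1000 messages to a buffer of capacity 250 accepts exactly 250; in the real system the remainder stays in the broker until the consumer acknowledges and frees up room.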
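A consumer works through its local queue one message at a time, so if one message blocks, every message queued behind it is blocked as well. With the defaults, concurrentConsumers = 1 and prefetchCount = 250, a single stuck message therefore holds up all the messages after it. To prevent this, we can raise concurrentConsumers so that several consumers consume in parallel.

How should prefetchCount be set, then? Suppose concurrentConsumers = 2 and prefetchCount keeps its default. The container starts two concurrent consumers, each with a local queue of 250. The broker may push up to 250 unacknowledged messages to each consumer, so if consumer1 gets stuck, the messages already delivered to it (up to 250) sit in its local queue and cannot be picked up by consumer2, even while consumer2 is idle. For those messages, concurrent consumption degrades to a single stuck consumer, and the larger prefetchCount is, the more messages can be trapped this way.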
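The effect can be simulated. The sketch below assumes a simplified broker that dispatches round-robin, skipping any consumer that already holds prefetchCount unacknowledged messages; consumer 0 is stuck and never acknowledges, while the others acknowledge instantly. StuckConsumerSimulation and strandedOnStuckConsumer are hypothetical names, and real RabbitMQ scheduling has more nuance than this.

```java
// Simplified simulation under stated assumptions, not RabbitMQ's actual dispatcher.
final class StuckConsumerSimulation {

    /**
     * Dispatches `messages` round-robin to consumers whose unacked count is below prefetchCount.
     * Consumer 0 is stuck and never acks; all other consumers ack instantly.
     * Returns how many messages end up waiting (unacked) on consumer 0.
     */
    static int strandedOnStuckConsumer(int consumers, int prefetchCount, int messages) {
        int[] unacked = new int[consumers];
        int next = 0;
        for (int m = 0; m < messages; m++) {
            // find the next consumer, starting at `next`, that still has prefetch room
            int chosen = -1;
            for (int step = 0; step < consumers; step++) {
                int candidate = (next + step) % consumers;
                if (unacked[candidate] < prefetchCount) {
                    chosen = candidate;
                    break;
                }
            }
            if (chosen == -1) {
                break; // every consumer is at its limit: remaining messages stay in the broker
            }
            next = (chosen + 1) % consumers;
            if (chosen == 0) {
                unacked[0]++; // stuck consumer: the message waits in its local queue
            }
            // healthy consumers ack immediately, so their unacked count stays at 0
        }
        return unacked[0];
    }

    public static void main(String[] args) {
        System.out.println(strandedOnStuckConsumer(2, 250, 1000));
        System.out.println(strandedOnStuckConsumer(2, 1, 1000));
    }
}
```

With two consumers and 1000 messages, a prefetchCount of 250 leaves 250 messages trapped behind the stuck consumer, while a prefetchCount of 1 traps only the blocked message itself; the healthy consumer takes everything else.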
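Now for the concurrency setting itself. It accepts two forms: a single fixed value such as concurrency=4, or a range such as concurrency=1-4. The first needs no explanation; the second is the interesting one. The source is as follows: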
```java
public void setConcurrency(String concurrency) {
    try {
        int separatorIndex = concurrency.indexOf(45); // 45 is the character code of '-'
        if (separatorIndex != -1) {
            // "min-max": lower bound goes to concurrentConsumers, upper bound to maxConcurrentConsumers
            this.setConcurrentConsumers(Integer.parseInt(concurrency.substring(0, separatorIndex)));
            this.setMaxConcurrentConsumers(Integer.parseInt(concurrency.substring(separatorIndex + 1, concurrency.length())));
        } else {
            // single fixed value
            this.setConcurrentConsumers(Integer.parseInt(concurrency));
        }
    } catch (NumberFormatException e) {
        throw new IllegalArgumentException("Invalid concurrency value [" + concurrency + "]: only single fixed integer (e.g. \"5\") and minimum-maximum combo (e.g. \"3-5\") supported.");
    }
}
```
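To experiment with the two accepted formats outside of Spring, here is a standalone re-implementation of the same parsing rule. ConcurrencyParser and parse are hypothetical names; the method returns {min, max}, leaving max null for a fixed value, which corresponds to maxConcurrentConsumers not being set by that call.

```java
import java.util.Arrays;

// Standalone re-implementation of the parsing rule, for experimentation only (hypothetical names).
final class ConcurrencyParser {

    /** Returns {min, max}; max is null when a single fixed value is given. */
    static Integer[] parse(String concurrency) {
        try {
            int separatorIndex = concurrency.indexOf('-'); // same check as indexOf(45)
            if (separatorIndex != -1) {
                int min = Integer.parseInt(concurrency.substring(0, separatorIndex));
                int max = Integer.parseInt(concurrency.substring(separatorIndex + 1));
                return new Integer[] {min, max};
            }
            return new Integer[] {Integer.parseInt(concurrency), null};
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException("Invalid concurrency value [" + concurrency + "]", e);
        }
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(parse("1-4"))); // [1, 4]
        System.out.println(Arrays.toString(parse("4")));   // [4, null]
    }
}
```

A malformed value such as "1-" fails with IllegalArgumentException, just like the container's setter.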
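concurrency is the value we configure, and 45 is the ASCII code of '-'. For a range, the container first sets concurrentConsumers (the lower bound) and then maxConcurrentConsumers. maxConcurrentConsumers is the maximum number of consumers that may be started under heavy load; the pair is analogous to a thread pool's core and maximum pool sizes.

In each iteration of the consumer loop, the container checks whether maxConcurrentConsumers is set, and if so, whether to scale the number of consumers up or down. The counters consecutiveMessages and consecutiveIdles drive the decision to add or remove a consumer; they are compared against consecutiveActiveTrigger and consecutiveIdleTrigger respectively, both of which default to 10.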
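```java
// Simplified excerpt from the consumer loop; receivedOk = at least one message was received
if (SimpleMessageListenerContainer.this.maxConcurrentConsumers != null) {
    if (receivedOk) {
        consecutiveIdles = 0;
        if (consecutiveMessages++ > SimpleMessageListenerContainer.this.consecutiveActiveTrigger) {
            SimpleMessageListenerContainer.this.considerAddingAConsumer();
            consecutiveMessages = 0;
        }
    } else {
        consecutiveMessages = 0;
        if (consecutiveIdles++ > SimpleMessageListenerContainer.this.consecutiveIdleTrigger) {
            SimpleMessageListenerContainer.this.considerStoppingAConsumer(this.consumer);
            consecutiveIdles = 0;
        }
    }
}
```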
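Once a single consumer has received more than consecutiveActiveTrigger (10) messages in a row, it calls considerAddingAConsumer, which adds a consumer only if enough time has passed since the last one was started (startConsumerMinInterval, 10 seconds by default). Conversely, after more than consecutiveIdleTrigger (10) consecutive empty receives, considerStoppingAConsumer decides whether to stop a consumer.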
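The counter logic can be replayed without a broker. The model below is a simplified assumption that keeps only the counters from the excerpt above and merely counts how often the real methods would be called; ScalingTriggerModel, onReceive, addRequests and stopRequests are hypothetical names.

```java
// Simplified model of the scaling counters (hypothetical names, not Spring AMQP code).
final class ScalingTriggerModel {

    static final int CONSECUTIVE_ACTIVE_TRIGGER = 10; // default consecutiveActiveTrigger
    static final int CONSECUTIVE_IDLE_TRIGGER = 10;   // default consecutiveIdleTrigger

    private int consecutiveMessages;
    private int consecutiveIdles;
    int addRequests;  // how often considerAddingAConsumer() would be called
    int stopRequests; // how often considerStoppingAConsumer() would be called

    /** One pass of the consumer loop; receivedOk = at least one message arrived within the receive timeout. */
    void onReceive(boolean receivedOk) {
        if (receivedOk) {
            consecutiveIdles = 0;
            if (consecutiveMessages++ > CONSECUTIVE_ACTIVE_TRIGGER) { // post-increment, as in the excerpt
                addRequests++;
                consecutiveMessages = 0;
            }
        } else {
            consecutiveMessages = 0;
            if (consecutiveIdles++ > CONSECUTIVE_IDLE_TRIGGER) {
                stopRequests++;
                consecutiveIdles = 0;
            }
        }
    }

    public static void main(String[] args) {
        ScalingTriggerModel model = new ScalingTriggerModel();
        for (int i = 0; i < 12; i++) {
            model.onReceive(true);
        }
        System.out.println(model.addRequests);
    }
}
```

Because the comparison uses a post-increment, the add request fires on the 12th consecutive successful receive, not the 10th. Any empty receive in between resets the count, so alternating busy and idle polls never trigger scaling in either direction.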
```java
private void considerAddingAConsumer() {
    synchronized (this.consumersMonitor) {
        // only scale up while below maxConcurrentConsumers
        if (this.consumers != null && this.maxConcurrentConsumers != null
                && this.consumers.size() < this.maxConcurrentConsumers) {
            long now = System.currentTimeMillis();
            // rate limit: at most one new consumer per startConsumerMinInterval
            if (this.lastConsumerStarted + this.startConsumerMinInterval < now) {
                this.addAndStartConsumers(1);
                this.lastConsumerStarted = now;
            }
        }
    }
}
```
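The time check in considerAddingAConsumer() can be modeled with an injectable clock to see how quickly a container can ramp up. ScaleUpGate and its methods are hypothetical; the interval is passed in explicitly, and 10000 ms matches Spring AMQP's default startConsumerMinInterval.

```java
// Simplified model of considerAddingAConsumer() with an injectable clock (hypothetical names).
final class ScaleUpGate {

    private final int maxConcurrentConsumers;
    private final long startConsumerMinIntervalMs;
    private int consumers;
    private long lastConsumerStarted; // 0 until the first consumer is added

    ScaleUpGate(int initialConsumers, int maxConcurrentConsumers, long startConsumerMinIntervalMs) {
        this.consumers = initialConsumers;
        this.maxConcurrentConsumers = maxConcurrentConsumers;
        this.startConsumerMinIntervalMs = startConsumerMinIntervalMs;
    }

    /** Mirrors the two checks in considerAddingAConsumer(); returns true if a consumer was added. */
    synchronized boolean considerAddingAConsumer(long nowMs) {
        if (consumers < maxConcurrentConsumers
                && lastConsumerStarted + startConsumerMinIntervalMs < nowMs) {
            consumers++; // stands in for addAndStartConsumers(1)
            lastConsumerStarted = nowMs;
            return true;
        }
        return false;
    }

    synchronized int consumers() {
        return consumers;
    }

    public static void main(String[] args) {
        ScaleUpGate gate = new ScaleUpGate(1, 4, 10_000);
        // pretend the active trigger fires every second for 40 seconds
        for (long t = 0; t <= 40_000; t += 1_000) {
            gate.considerAddingAConsumer(t);
        }
        System.out.println(gate.consumers());
    }
}
```

Each addition must wait more than startConsumerMinInterval after the previous one, so going from one to four consumers spans more than 20 seconds from the first addition, on top of the consecutive receives each trigger needs.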
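With concurrency=1-4, if a message blocks, the single consumer never receives 10 messages in a row, so considerAddingAConsumer is never called, no extra consumers are started, and the goal of concurrent consumption is not achieved. When the problem hit our notification-center project, there were only two consumers (one listener on each of two servers).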
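To sum up, concurrent consumption in RabbitMQ hinges on two parameters, prefetchCount and concurrentConsumers:
concurrentConsumers sets the number of concurrent consumers and can be scaled dynamically between an initial and a maximum value. More concurrent consumers increase consumption capacity and help prevent backlogs.
prefetchCount is the number of messages each consumer fetches from the broker at a time. Raising it does not make consumption concurrent; it only saves network round trips, and a large value means more messages can get stuck behind a single blocked one.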
Copyright © 2003-2013 www.wpsshop.cn. All rights reserved.