赞
踩
在《RocketMQ源码分析之消息拉取流程》文章最后留下了一个问题:consumer端在接收到消息后是如何消费信息呢?本篇文章就来回答这个问题。
在RocketMQ中ConsumeMessageService是负责消息消费的,它其实是一个接口,实现该接口的是ConsumeMessageConcurrentlyService和ConsumeMessageOrderlyService,这两个服务分别对应一个消费模式,ConsumeMessageConcurrentlyService是并发消费,ConsumeMessageOrderlyService是顺序消费。
在《RocketMQ源码分析之消息拉取流程》中consumer在收到broker返回的response后会执行回调pullCallback,在回调函数中会将拉取到的消息放入processQueue中,然后再将消息提交到ConsumeMessageQueue,我们就从这里开始分析,即submitConsumeRequest方法。
DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
pullResult.getMsgFoundList(),
processQueue,
pullRequest.getMessageQueue(),
dispatchToConsume);
submitConsumeRequest方法主要完成的提交信息消费请求,其实现逻辑如下:
获取consumeBatchSize,它表示一次消费任务ConsumeRequest中包含的消息条数,默认值是1。msgs.size()最大是32,如果msgs.size()小于consumeBatchSize则直接由msgs、processQueue和messageQueue构建ConsumeRequest,并将ConsumeRequest提交到consumeExecutor(消费者线程池),如果在提交的过程中出现RejectedExecutionException异常则延迟5秒再提交。如果msgs.size()大于consumeBatchSize则将msgs进行拆分,创建多个ConsumeRequest并进行提交,每个ConsumeRequest中包含consumeBatchSize条消息。
public void submitConsumeRequest( final List<MessageExt> msgs, final ProcessQueue processQueue, final MessageQueue messageQueue, final boolean dispatchToConsume) { final int consumeBatchSize = this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize(); if (msgs.size() <= consumeBatchSize) { ConsumeRequest consumeRequest = new ConsumeRequest(msgs, processQueue, messageQueue); try { this.consumeExecutor.submit(consumeRequest); } catch (RejectedExecutionException e) { this.submitConsumeRequestLater(consumeRequest); } } else { for (int total = 0; total < msgs.size(); ) { List<MessageExt> msgThis = new ArrayList<MessageExt>(consumeBatchSize); for (int i = 0; i < consumeBatchSize; i++, total++) { if (total < msgs.size()) { msgThis.add(msgs.get(total)); } else { break; } } ConsumeRequest consumeRequest = new ConsumeRequest(msgThis, processQueue, messageQueue); try { this.consumeExecutor.submit(consumeRequest); } catch (RejectedExecutionException e) { for (; total < msgs.size(); total++) { msgThis.add(msgs.get(total)); } this.submitConsumeRequestLater(consumeRequest); } } } }
提交完ConsumeRequest后就是消息消费了,具体是执行ConsumeRequest中的run方法,消费的具体逻辑是:
1.判断processQueue允许被消费,具体是检查其dropped属性,如果为true则表示不能被消费
2.初始化MessageListener、ConsumeConcurrentlyContext(consumer并发消费上下文)和ConsumeConcurrentlyStatus(consumer并发消费状态,其状态分为两种:CONSUME_SUCCESS和RECONSUME_LATER)
3.执行resetRetryAndNamespace方法,如果消息来自延迟队列则设置其topic为%RETRY_TOPIC%+consumerGroup
4.如果开启了消息轨迹功能会在此处设置消费信息上下文consumeMessageContext并执行钩子函数
5.记录当前的时间(在消息消费完成后与此时间做差用来统计消费消息锁花费的时间)
6.遍历ConsumeRequest中的每条消息,将每条消息中开始消费时间属性设置为当前的时间,然后使用应用代码中注册的回调messageListener中的consumeMessage方法来消费消息并将消费结果记录在status中
7.计算消费消息所花费的时间consumeRT
8.判断执行完consumer中自定义的consumeMessage方法后的status,如果status为空且在消费过程中出现异常则将returnType设置为ConsumeReturnType.EXCEPTION;如果status为空且没有出现异常则将returnType设置为ConsumeReturnType.RETURNNULL;如果consumeRT大于15分钟则将returnType设置为ConsumeReturnType.TIME_OUT;如果status的值为ConsumeConcurrentlyStatus.RECONSUME_LATER则将returnType设置为ConsumeReturnType.FAILED;如果status的值为ConsumeConcurrentlyStatus.CONSUME_SUCCESS则将returnType设置为ConsumeReturnType.SUCCESS
9.如果开启了消息轨迹功能则将消费消息信息上下文consumeMessageContext中添加ConsumeContextType属性并将该属性的值设置为returnType
10.判断status是否为空,如果为空则将status设置为ConsumeConcurrentlyStatus.RECONSUME_LATER
11.如果开启了消息轨迹功能则在此处会执行钩子函数
12.获取consumerStatsManager对象并将刚刚计算的消费消息花费时间添加到统计信息中
13.判断processQueue的属性是否为false,如果是false则调用processConsumeResult方法对消费信息的结果进行处理
public void run() { if (this.processQueue.isDropped()) { log.info("the message queue not be able to consume, because it's dropped. group={} {}", ConsumeMessageConcurrentlyService.this.consumerGroup, this.messageQueue); return; } MessageListenerConcurrently listener = ConsumeMessageConcurrentlyService.this.messageListener; ConsumeConcurrentlyContext context = new ConsumeConcurrentlyContext(messageQueue); ConsumeConcurrentlyStatus status = null; defaultMQPushConsumerImpl.resetRetryAndNamespace(msgs, defaultMQPushConsumer.getConsumerGroup()); ConsumeMessageContext consumeMessageContext = null; if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) { consumeMessageContext = new ConsumeMessageContext(); consumeMessageContext.setNamespace(defaultMQPushConsumer.getNamespace()); consumeMessageContext.setConsumerGroup(defaultMQPushConsumer.getConsumerGroup()); consumeMessageContext.setProps(new HashMap<String, String>()); consumeMessageContext.setMq(messageQueue); consumeMessageContext.setMsgList(msgs); consumeMessageContext.setSuccess(false); ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.executeHookBefore(consumeMessageContext); } long beginTimestamp = System.currentTimeMillis(); boolean hasException = false; ConsumeReturnType returnType = ConsumeReturnType.SUCCESS; try { if (msgs != null && !msgs.isEmpty()) { for (MessageExt msg : msgs) { MessageAccessor.setConsumeStartTimeStamp(msg, String.valueOf(System.currentTimeMillis())); } } status = listener.consumeMessage(Collections.unmodifiableList(msgs), context); } catch (Throwable e) { log.warn("consumeMessage exception: {} Group: {} Msgs: {} MQ: {}", RemotingHelper.exceptionSimpleDesc(e), ConsumeMessageConcurrentlyService.this.consumerGroup, msgs, messageQueue); hasException = true; } long consumeRT = System.currentTimeMillis() - beginTimestamp; if (null == status) { if (hasException) { returnType = ConsumeReturnType.EXCEPTION; } else { returnType = ConsumeReturnType.RETURNNULL; } } else if (consumeRT >= defaultMQPushConsumer.getConsumeTimeout() * 60 * 1000) { returnType = ConsumeReturnType.TIME_OUT; } else if (ConsumeConcurrentlyStatus.RECONSUME_LATER == status) { returnType = ConsumeReturnType.FAILED; } else if (ConsumeConcurrentlyStatus.CONSUME_SUCCESS == status) { returnType = ConsumeReturnType.SUCCESS; } if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) { consumeMessageContext.getProps().put(MixAll.CONSUME_CONTEXT_TYPE, returnType.name()); } if (null == status) { log.warn("consumeMessage return null, Group: {} Msgs: {} MQ: {}", ConsumeMessageConcurrentlyService.this.consumerGroup, msgs, messageQueue); status = ConsumeConcurrentlyStatus.RECONSUME_LATER; } if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) { consumeMessageContext.setStatus(status.toString()); consumeMessageContext.setSuccess(ConsumeConcurrentlyStatus.CONSUME_SUCCESS == status); ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.executeHookAfter(consumeMessageContext); } ConsumeMessageConcurrentlyService.this.getConsumerStatsManager() .incConsumeRT(ConsumeMessageConcurrentlyService.this.consumerGroup, messageQueue.getTopic(), consumeRT); if (!processQueue.isDropped()) { ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this); } else { log.warn("processQueue is dropped without process consume result. messageQueue={}, msgs={}", messageQueue, msgs); } }
下面详细分析consumer端对消费结果是如何处理的,也就是processConsumeResult方法。
1.从并发消费上下文中获取ackIndex
2.判断consumeRequest中的msgs是否为空,如果为空则不做任何处理直接返回
3.消息在消费后会返回并发消费的状态status,status分为两种分别是CONSUME_SUCCESS和RECONSUME_LATER,如果其消费的status是CONSUME_SUCCESS则将ackIndex设置为msgs.size()-1,并在此时统计consumer消费成功及失败的TPS;如果status为RECONSUME_LATER则将ackIndex设置为-1并统计consumer消费失败的TPS
4.如果consumer的消费模式是广播模式且ackIndex为-1则表示consumer消费完信息后的状态是RECONSUME_LATER,此时消息不会重新消费只是以警告级别将信息输出到日志中;如果consumer的消费模式是集群模式且ackIndex为-1则表示consumer消费完信息后的状态是RECONSUME_LATER,此时会遍历consumeRequest中的消息并执行sendMessageBack方法,如果sendMessageBack方法执行失败了会将消息的reconsumeTimes属性加1,然后将该消息再次封装成ConsumeRequest,最后延迟5秒将请求提交到consumeExecutor(消费者线程池)再消费
5.从consumeRequest的processQueue中删除这批消息,然后返回删除这批消息后最小的偏移量,最后用这个偏移量更新消息消费进度
public void processConsumeResult( final ConsumeConcurrentlyStatus status, final ConsumeConcurrentlyContext context, final ConsumeRequest consumeRequest ) { int ackIndex = context.getAckIndex(); if (consumeRequest.getMsgs().isEmpty()) return; switch (status) { case CONSUME_SUCCESS: if (ackIndex >= consumeRequest.getMsgs().size()) { ackIndex = consumeRequest.getMsgs().size() - 1; } int ok = ackIndex + 1; int failed = consumeRequest.getMsgs().size() - ok; this.getConsumerStatsManager().incConsumeOKTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), ok); this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), failed); break; case RECONSUME_LATER: ackIndex = -1; this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), consumeRequest.getMsgs().size()); break; default: break; } switch (this.defaultMQPushConsumer.getMessageModel()) { case BROADCASTING: for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) { MessageExt msg = consumeRequest.getMsgs().get(i); log.warn("BROADCASTING, the message consume failed, drop it, {}", msg.toString()); } break; case CLUSTERING: List<MessageExt> msgBackFailed = new ArrayList<MessageExt>(consumeRequest.getMsgs().size()); for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) { MessageExt msg = consumeRequest.getMsgs().get(i); boolean result = this.sendMessageBack(msg, context); if (!result) { msg.setReconsumeTimes(msg.getReconsumeTimes() + 1); msgBackFailed.add(msg); } } if (!msgBackFailed.isEmpty()) { consumeRequest.getMsgs().removeAll(msgBackFailed); this.submitConsumeRequestLater(msgBackFailed, consumeRequest.getProcessQueue(), consumeRequest.getMessageQueue()); } break; default: break; } long offset = consumeRequest.getProcessQueue().removeMessage(consumeRequest.getMsgs()); if (offset >= 0 && !consumeRequest.getProcessQueue().isDropped()) { this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), offset, true); } }
下面分析sendMessageBack方法,这个方法是在消息监听器返回的消费结果是RECONSUME_LATER且消费模式是集群模式的情况调用的,这个方法实现的功能是将消费失败的消息发送给broker。具体实现如下:
1.从并发消费的上下文中获取消息延迟级别
2.在重新将消息发送到broker前重新设置消息的topic
3.在并发消费上下文中有消息的MessageQueue信息,根据MessageQueue可以知道brokerName,调用sendMessageBack(MessageExt msg, int delayLevel, final String brokerName)将消息发给broker
public boolean sendMessageBack(final MessageExt msg, final ConsumeConcurrentlyContext context) {
int delayLevel = context.getDelayLevelWhenNextConsume();
// Wrap topic with namespace before sending back message.
msg.setTopic(this.defaultMQPushConsumer.withNamespace(msg.getTopic()));
try {
this.defaultMQPushConsumerImpl.sendMessageBack(msg, delayLevel, context.getMessageQueue().getBrokerName());
return true;
} catch (Exception e) {
log.error("sendMessageBack exception, group: " + this.consumerGroup + " msg: " + msg.toString(), e);
}
return false;
}
sendMessageBack(MessageExt msg, int delayLevel, final String brokerName)的实现如下:
1.根据brokerName获取broker的地址
2.调用consumerSendMessageBack方法将消息发送给broker,从这个方法中可以知道发送给broker的请求是ConsumerSendMsgBackRequestHeader,请求类型是RequestCode.CONSUMER_SEND_MSG_BACK,在这个请求中包含了consumerGroup、消息原始的topic、消息的commitlog offset、消息的延迟级别、消息原来的msgId和消息最大重新消费次数。
public void sendMessageBack(MessageExt msg, int delayLevel, final String brokerName) throws RemotingException, MQBrokerException, InterruptedException, MQClientException { try { String brokerAddr = (null != brokerName) ? this.mQClientFactory.findBrokerAddressInPublish(brokerName) : RemotingHelper.parseSocketAddressAddr(msg.getStoreHost()); this.mQClientFactory.getMQClientAPIImpl().consumerSendMessageBack(brokerAddr, msg, this.defaultMQPushConsumer.getConsumerGroup(), delayLevel, 5000, getMaxReconsumeTimes()); } catch (Exception e) { log.error("sendMessageBack Exception, " + this.defaultMQPushConsumer.getConsumerGroup(), e); Message newMsg = new Message(MixAll.getRetryTopic(this.defaultMQPushConsumer.getConsumerGroup()), msg.getBody()); String originMsgId = MessageAccessor.getOriginMessageId(msg); MessageAccessor.setOriginMessageId(newMsg, UtilAll.isBlank(originMsgId) ? msg.getMsgId() : originMsgId); newMsg.setFlag(msg.getFlag()); MessageAccessor.setProperties(newMsg, msg.getProperties()); MessageAccessor.putProperty(newMsg, MessageConst.PROPERTY_RETRY_TOPIC, msg.getTopic()); MessageAccessor.setReconsumeTime(newMsg, String.valueOf(msg.getReconsumeTimes() + 1)); MessageAccessor.setMaxReconsumeTimes(newMsg, String.valueOf(getMaxReconsumeTimes())); MessageAccessor.clearProperty(newMsg, MessageConst.PROPERTY_TRANSACTION_PREPARED); newMsg.setDelayTimeLevel(3 + msg.getReconsumeTimes()); this.mQClientFactory.getDefaultMQProducer().send(newMsg); } finally { msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQPushConsumer.getNamespace())); } }
下面看看broker是如何处理RequestCode.CONSUMER_SEND_MSG_BACK类型的请求,根据请求类型在broker源码中进行搜索可以知道处理该请求的是SendMessageProcessor的asyncConsumerSendMsgBack方法,其实现如下:
1.判断consumeMessageHookList是否为空以及消息原来的msgId是都为空,如果都不为空则构造ConsumeMessageContext实例context并执行钩子函数
2.根据请求中的消费者组获取其订阅配置信息,判断订阅配置信息是否为空、broker是否有写的权限、消费组的重试队列数是否小于等于0(即该消费组是否支持重试),如果配置信息为空、broker没有写的权限则会在返回的response的code及remark属性反馈错误,如果重试队列数小于等于0则返回的response是成功的,这仅仅表示消费组不支持重试
3.创建重试主题并构建TopicConfig对象,其主题名称为,%RETRY%+消费组名称
4.根据请求中的commitlog offset获取消息,将消息的重试topic存入消息的扩展属性中
5.如果消息的消费重试次数大于最大重新消费次数或则延迟级别小于0,修改消息的topic名称为%DLQ%,设置该主题的权限为只写,说明消息进入死信队列后将不再被消费;如果消息的延迟级别为0则将消息的延迟级别设置为3与重新消费次数的和
6.根据原来的消息创建一个新的消息对象,该重试消息会拥有与原来消息不一样且唯一的msgId,将重试消息存储在commitlog中
7.将重试消息存入commitlog中时会将重试消息的topic和queueId备份到其扩展属性中,属性名称分别是REAL_TOPIC和REAL_QID,然后将其topic重置为SCHEDULE_TOPIC_XXXX,queueId重置为消息延迟级别减一
重试消息存储在commitlog后并不会被consumer消费而是等到特定的时间后才能被消费,而实现此功能的是ScheduleMessageService,下面详细分析ScheduleMessageService的工作原理。
ScheduleMessageService是在DefaultMessageStore加载的过程中加载,在DefaultMessageStore启动的过程中启动。ScheduleMessageService加载过程主要实现的是加载延迟消息队列拉取进度与构建delayLevelTable(这个map中记录了消息延迟级别及该级别对应的延迟时间)。ScheduleMessageService启动过程主要完成两个任务:
(1)为每个延迟级别队列创建定时任务DeliverDelayedMessageTimerTask,该定时任务第一次启动时延迟1秒执行,第二次开始使用对应延迟时间执行,定时任务完成的任务是:找到延迟消息topic的ConsumeQueue,然后从ConsumeQueue中解析出延迟消息的commitlog offset、消息长度及tag hashcode,根据延迟消息的commitlog offset与消息长度从commitlog中读取消息,读取消息后根据该消息构建一个新的消息,这两个消息的区别在于新消息的topic和queueId是之前消息原来的topic和queueId,然后将构建的新消息存储到commitlog中让consumer再次消息消息,最后更新延迟队列拉取进度
(2)创建定时任务完成持久化延迟队列的消息消费进度的任务,该定时任务每10秒执行一次
public void start() { if (started.compareAndSet(false, true)) { this.timer = new Timer("ScheduleMessageTimerThread", true); for (Map.Entry<Integer, Long> entry : this.delayLevelTable.entrySet()) { Integer level = entry.getKey(); Long timeDelay = entry.getValue(); Long offset = this.offsetTable.get(level); if (null == offset) { offset = 0L; } if (timeDelay != null) { this.timer.schedule(new DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME); } } this.timer.scheduleAtFixedRate(new TimerTask() { @Override public void run() { try { if (started.get()) ScheduleMessageService.this.persist(); } catch (Throwable e) { log.error("scheduleAtFixedRate flush exception", e); } } }, 10000, this.defaultMessageStore.getMessageStoreConfig().getFlushDelayOffsetInterval()); } }
最后通过以下流程图总结下consumer并发消费消息的过程:
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。