
RocketMQ Source Code Analysis: Concurrent Message Consumption in the Consumer

  At the end of the article 《RocketMQ源码分析之消息拉取流程》 (the message pull flow) a question was left open: after the consumer receives messages, how does it actually consume them? This article answers that question.
  In RocketMQ, message consumption is the job of ConsumeMessageService. It is an interface with two implementations, ConsumeMessageConcurrentlyService and ConsumeMessageOrderlyService, each corresponding to one consumption mode: ConsumeMessageConcurrentlyService performs concurrent consumption and ConsumeMessageOrderlyService performs orderly (sequential) consumption.
  As described in the pull-flow article, after the consumer receives the broker's response it executes the PullCallback; inside that callback the pulled messages are put into the processQueue and then submitted to the ConsumeMessageService. That is where this analysis starts, namely at the submitConsumeRequest method.

DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
    pullResult.getMsgFoundList(),
    processQueue,
    pullRequest.getMessageQueue(),
    dispatchToConsume);

  The submitConsumeRequest method submits a message-consumption request. Its logic is as follows:
  It first reads consumeBatchSize, the number of messages carried by a single ConsumeRequest task; the default value is 1. msgs.size() is at most 32 (the default pull batch size). If msgs.size() is less than or equal to consumeBatchSize, one ConsumeRequest is built directly from msgs, processQueue and messageQueue and submitted to consumeExecutor (the consumer thread pool); if the submission throws a RejectedExecutionException, the request is re-submitted 5 seconds later. If msgs.size() is greater than consumeBatchSize, msgs is split into several ConsumeRequests, each holding at most consumeBatchSize messages, which are submitted one by one. A minimal consumer setup showing where these two sizes are configured follows the code below.

public void submitConsumeRequest(
    final List<MessageExt> msgs,
    final ProcessQueue processQueue,
    final MessageQueue messageQueue,
    final boolean dispatchToConsume) {
    final int consumeBatchSize = this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();
    if (msgs.size() <= consumeBatchSize) {
        ConsumeRequest consumeRequest = new ConsumeRequest(msgs, processQueue, messageQueue);
        try {
            this.consumeExecutor.submit(consumeRequest);
        } catch (RejectedExecutionException e) {
            this.submitConsumeRequestLater(consumeRequest);
        }
    } else {
        for (int total = 0; total < msgs.size(); ) {
            List<MessageExt> msgThis = new ArrayList<MessageExt>(consumeBatchSize);
            for (int i = 0; i < consumeBatchSize; i++, total++) {
                if (total < msgs.size()) {
                    msgThis.add(msgs.get(total));
                } else {
                    break;
                }
            }

            ConsumeRequest consumeRequest = new ConsumeRequest(msgThis, processQueue, messageQueue);
            try {
                this.consumeExecutor.submit(consumeRequest);
            } catch (RejectedExecutionException e) {
                for (; total < msgs.size(); total++) {
                    msgThis.add(msgs.get(total));
                }

                this.submitConsumeRequestLater(consumeRequest);
            }
        }
    }
}
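
  For context, the two sizes mentioned above come straight from the consumer configuration. The following is a minimal push-consumer setup that exercises this path; the group name, topic and nameserver address are made-up placeholders:

// Minimal DefaultMQPushConsumer setup (illustrative values only).
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;

public class ConcurrentConsumerDemo {
    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("demo_consumer_group");
        consumer.setNamesrvAddr("127.0.0.1:9876");
        consumer.subscribe("DemoTopic", "*");
        // At most 32 messages per pull (default) and at most 1 message per ConsumeRequest (default).
        consumer.setPullBatchSize(32);
        consumer.setConsumeMessageBatchMaxSize(1);
        // Registering a MessageListenerConcurrently is what selects ConsumeMessageConcurrentlyService.
        consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
            // Business logic goes here; each call receives at most consumeMessageBatchMaxSize messages.
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        });
        consumer.start();
    }
}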

  After a ConsumeRequest has been submitted, the actual consumption happens in its run method. The logic is:
  1. Check whether the processQueue may still be consumed by inspecting its dropped flag; if it is true the queue has been dropped and the request returns immediately.
  2. Obtain the MessageListener and create the ConsumeConcurrentlyContext (the concurrent-consumption context) and the ConsumeConcurrentlyStatus (the concurrent-consumption result, which has two values: CONSUME_SUCCESS and RECONSUME_LATER).
  3. Call resetRetryAndNamespace: for retry messages, whose topic is the retry topic %RETRY% + consumerGroup, the original topic stored in the RETRY_TOPIC property is restored (and the namespace prefix, if one is configured, is stripped).
  4. If message tracing is enabled, build the consumeMessageContext here and execute the before-consumption hooks.
  5. Record the current time (after consumption finishes, the difference to this timestamp gives the time spent consuming).
  6. Iterate over the messages in the ConsumeRequest, set each message's consume-start timestamp to the current time, then invoke the consumeMessage method of the messageListener registered by the application code and record the result in status.
  7. Compute consumeRT, the time spent consuming.
  8. Evaluate status after the user-defined consumeMessage returns: if status is null and an exception was thrown during consumption, returnType is set to ConsumeReturnType.EXCEPTION; if status is null and no exception occurred, returnType is set to ConsumeReturnType.RETURNNULL; otherwise, if consumeRT has reached the consume timeout (getConsumeTimeout(), 15 minutes by default), returnType is set to ConsumeReturnType.TIME_OUT; if status is ConsumeConcurrentlyStatus.RECONSUME_LATER, returnType is set to ConsumeReturnType.FAILED; if status is ConsumeConcurrentlyStatus.CONSUME_SUCCESS, returnType is set to ConsumeReturnType.SUCCESS.
  9. If message tracing is enabled, put the returnType into the consumeMessageContext props under the ConsumeContextType key.
  10. If status is null, set it to ConsumeConcurrentlyStatus.RECONSUME_LATER.
  11. If message tracing is enabled, execute the after-consumption hooks here.
  12. Obtain the consumerStatsManager and add the consume time computed above to the statistics.
  13. If the processQueue's dropped flag is false, call processConsumeResult to handle the consumption result.

public void run() {
    if (this.processQueue.isDropped()) {
        log.info("the message queue not be able to consume, because it's dropped. group={} {}", ConsumeMessageConcurrentlyService.this.consumerGroup, this.messageQueue);
        return;
    }

    MessageListenerConcurrently listener = ConsumeMessageConcurrentlyService.this.messageListener;
    ConsumeConcurrentlyContext context = new ConsumeConcurrentlyContext(messageQueue);
    ConsumeConcurrentlyStatus status = null;
    defaultMQPushConsumerImpl.resetRetryAndNamespace(msgs, defaultMQPushConsumer.getConsumerGroup());

    ConsumeMessageContext consumeMessageContext = null;
    if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {
        consumeMessageContext = new ConsumeMessageContext();
        consumeMessageContext.setNamespace(defaultMQPushConsumer.getNamespace());
        consumeMessageContext.setConsumerGroup(defaultMQPushConsumer.getConsumerGroup());
        consumeMessageContext.setProps(new HashMap<String, String>());
        consumeMessageContext.setMq(messageQueue);
        consumeMessageContext.setMsgList(msgs);
        consumeMessageContext.setSuccess(false);
        ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.executeHookBefore(consumeMessageContext);
    }

    long beginTimestamp = System.currentTimeMillis();
    boolean hasException = false;
    ConsumeReturnType returnType = ConsumeReturnType.SUCCESS;
    try {
        if (msgs != null && !msgs.isEmpty()) {
            for (MessageExt msg : msgs) {
                MessageAccessor.setConsumeStartTimeStamp(msg, String.valueOf(System.currentTimeMillis()));
            }
        }
        status = listener.consumeMessage(Collections.unmodifiableList(msgs), context);
    } catch (Throwable e) {
        log.warn("consumeMessage exception: {} Group: {} Msgs: {} MQ: {}",
            RemotingHelper.exceptionSimpleDesc(e),
            ConsumeMessageConcurrentlyService.this.consumerGroup,
            msgs,
            messageQueue);
        hasException = true;
    }
    long consumeRT = System.currentTimeMillis() - beginTimestamp;
    if (null == status) {
        if (hasException) {
            returnType = ConsumeReturnType.EXCEPTION;
        } else {
            returnType = ConsumeReturnType.RETURNNULL;
        }
    } else if (consumeRT >= defaultMQPushConsumer.getConsumeTimeout() * 60 * 1000) {
        returnType = ConsumeReturnType.TIME_OUT;
    } else if (ConsumeConcurrentlyStatus.RECONSUME_LATER == status) {
        returnType = ConsumeReturnType.FAILED;
    } else if (ConsumeConcurrentlyStatus.CONSUME_SUCCESS == status) {
        returnType = ConsumeReturnType.SUCCESS;
    }

    if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {
        consumeMessageContext.getProps().put(MixAll.CONSUME_CONTEXT_TYPE, returnType.name());
    }

    if (null == status) {
        log.warn("consumeMessage return null, Group: {} Msgs: {} MQ: {}",
            ConsumeMessageConcurrentlyService.this.consumerGroup,
            msgs,
            messageQueue);
        status = ConsumeConcurrentlyStatus.RECONSUME_LATER;
    }

    if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {
        consumeMessageContext.setStatus(status.toString());
        consumeMessageContext.setSuccess(ConsumeConcurrentlyStatus.CONSUME_SUCCESS == status);
        ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.executeHookAfter(consumeMessageContext);
    }

    ConsumeMessageConcurrentlyService.this.getConsumerStatsManager()
        .incConsumeRT(ConsumeMessageConcurrentlyService.this.consumerGroup, messageQueue.getTopic(), consumeRT);

    if (!processQueue.isDropped()) {
        ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this);
    } else {
        log.warn("processQueue is dropped without process consume result. messageQueue={}, msgs={}", messageQueue, msgs);
    }
}

  Now let us look in detail at how the consumer handles the consumption result, i.e. the processConsumeResult method.
  1. Read the ackIndex from the concurrent-consumption context.
  2. If the msgs list in the consumeRequest is empty, return immediately without doing anything.
  3. Consumption returns a concurrent-consumption status, which is either CONSUME_SUCCESS or RECONSUME_LATER. If the status is CONSUME_SUCCESS, ackIndex is capped at msgs.size() - 1 and both the success and failure TPS of the consumer are recorded; if the status is RECONSUME_LATER, ackIndex is set to -1 and only the failure TPS is recorded.
  4. If the consumer runs in broadcasting mode and ackIndex is -1 (i.e. the status was RECONSUME_LATER), the failed messages are not re-consumed; they are only written to the log at warn level. If the consumer runs in clustering mode and ackIndex is -1, the messages in the consumeRequest are iterated and sendMessageBack is called for each one; every message for which sendMessageBack fails has its reconsumeTimes incremented by 1, the failed messages are wrapped into a new ConsumeRequest, and that request is re-submitted to consumeExecutor (the consumer thread pool) 5 seconds later.
  5. Remove this batch of messages from the consumeRequest's processQueue; removeMessage returns the smallest offset remaining after the removal, and that offset is used to update the consumption progress.
  A sketch showing how a listener supplies the ackIndex and the retry delay level is given after the processConsumeResult code below.

public void processConsumeResult(
    final ConsumeConcurrentlyStatus status,
    final ConsumeConcurrentlyContext context,
    final ConsumeRequest consumeRequest
) {
    int ackIndex = context.getAckIndex();

    if (consumeRequest.getMsgs().isEmpty())
        return;

    switch (status) {
        case CONSUME_SUCCESS:
            if (ackIndex >= consumeRequest.getMsgs().size()) {
                ackIndex = consumeRequest.getMsgs().size() - 1;
            }
            int ok = ackIndex + 1;
            int failed = consumeRequest.getMsgs().size() - ok;
            this.getConsumerStatsManager().incConsumeOKTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), ok);
            this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), failed);
            break;
        case RECONSUME_LATER:
            ackIndex = -1;
            this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(),
                consumeRequest.getMsgs().size());
            break;
        default:
            break;
    }

    switch (this.defaultMQPushConsumer.getMessageModel()) {
        case BROADCASTING:
            for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
                MessageExt msg = consumeRequest.getMsgs().get(i);
                log.warn("BROADCASTING, the message consume failed, drop it, {}", msg.toString());
            }
            break;
        case CLUSTERING:
            List<MessageExt> msgBackFailed = new ArrayList<MessageExt>(consumeRequest.getMsgs().size());
            for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
                MessageExt msg = consumeRequest.getMsgs().get(i);
                boolean result = this.sendMessageBack(msg, context);
                if (!result) {
                    msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);
                    msgBackFailed.add(msg);
                }
            }

            if (!msgBackFailed.isEmpty()) {
                consumeRequest.getMsgs().removeAll(msgBackFailed);

                this.submitConsumeRequestLater(msgBackFailed, consumeRequest.getProcessQueue(), consumeRequest.getMessageQueue());
            }
            break;
        default:
            break;
    }

    long offset = consumeRequest.getProcessQueue().removeMessage(consumeRequest.getMsgs());
    if (offset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {
        this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), offset, true);
    }
}
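
  The ackIndex consumed above and the delay level consumed later by sendMessageBack both come from the application through the ConsumeConcurrentlyContext. The following listener is only an illustration (the handle method is a hypothetical business call): it acknowledges the messages processed so far and lets the remaining ones be sent back with delay level 3 (about 10 seconds):

import java.util.List;

import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;

// Illustrative listener: partial acknowledgement via ackIndex plus an explicit retry delay level.
public class PartialAckListener implements MessageListenerConcurrently {

    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
        // Ask the broker to use delay level 3 (roughly 10 seconds) for the next retry of failed messages.
        context.setDelayLevelWhenNextConsume(3);
        for (int i = 0; i < msgs.size(); i++) {
            try {
                handle(msgs.get(i)); // hypothetical business method
            } catch (Exception e) {
                // Messages 0..i-1 count as consumed; processConsumeResult will send messages i..n-1 back.
                context.setAckIndex(i - 1);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        }
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }

    private void handle(MessageExt msg) {
        // business processing goes here (placeholder)
    }
}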

How does RocketMQ re-consume a message after the first consumption attempt fails?

  Next let us analyze the sendMessageBack method. It is called when the message listener returns RECONSUME_LATER and the consumer runs in clustering mode, and its job is to send the failed message back to the broker. The implementation:
  1. Read the message delay level from the concurrent-consumption context.
  2. Wrap the message topic with the namespace before sending it back to the broker.
  3. The concurrent-consumption context contains the message's MessageQueue, from which the brokerName is obtained; sendMessageBack(MessageExt msg, int delayLevel, final String brokerName) is then called to deliver the message to the broker.

public boolean sendMessageBack(final MessageExt msg, final ConsumeConcurrentlyContext context) {
    int delayLevel = context.getDelayLevelWhenNextConsume();

    // Wrap topic with namespace before sending back message.
    msg.setTopic(this.defaultMQPushConsumer.withNamespace(msg.getTopic()));
    try {
        this.defaultMQPushConsumerImpl.sendMessageBack(msg, delayLevel, context.getMessageQueue().getBrokerName());
        return true;
    } catch (Exception e) {
        log.error("sendMessageBack exception, group: " + this.consumerGroup + " msg: " + msg.toString(), e);
    }

    return false;
}

  The implementation of sendMessageBack(MessageExt msg, int delayLevel, final String brokerName) is as follows:
  1. Resolve the broker address from the brokerName.
  2. Call consumerSendMessageBack to send the message to the broker. From this method we can see that the request header is ConsumerSendMsgBackRequestHeader and the request code is RequestCode.CONSUMER_SEND_MSG_BACK; the request carries the consumerGroup, the message's original topic, its commitlog offset, its delay level, its original msgId and the maximum reconsume count. If this call fails, the catch block falls back to re-publishing the message to the retry topic %RETRY% + consumerGroup through the client's internal producer.

public void sendMessageBack(MessageExt msg, int delayLevel, final String brokerName)
    throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
    try {
        String brokerAddr = (null != brokerName) ? this.mQClientFactory.findBrokerAddressInPublish(brokerName)
            : RemotingHelper.parseSocketAddressAddr(msg.getStoreHost());
        this.mQClientFactory.getMQClientAPIImpl().consumerSendMessageBack(brokerAddr, msg,
            this.defaultMQPushConsumer.getConsumerGroup(), delayLevel, 5000, getMaxReconsumeTimes());
    } catch (Exception e) {
        log.error("sendMessageBack Exception, " + this.defaultMQPushConsumer.getConsumerGroup(), e);

        Message newMsg = new Message(MixAll.getRetryTopic(this.defaultMQPushConsumer.getConsumerGroup()), msg.getBody());

        String originMsgId = MessageAccessor.getOriginMessageId(msg);
        MessageAccessor.setOriginMessageId(newMsg, UtilAll.isBlank(originMsgId) ? msg.getMsgId() : originMsgId);

        newMsg.setFlag(msg.getFlag());
        MessageAccessor.setProperties(newMsg, msg.getProperties());
        MessageAccessor.putProperty(newMsg, MessageConst.PROPERTY_RETRY_TOPIC, msg.getTopic());
        MessageAccessor.setReconsumeTime(newMsg, String.valueOf(msg.getReconsumeTimes() + 1));
        MessageAccessor.setMaxReconsumeTimes(newMsg, String.valueOf(getMaxReconsumeTimes()));
        MessageAccessor.clearProperty(newMsg, MessageConst.PROPERTY_TRANSACTION_PREPARED);
        newMsg.setDelayTimeLevel(3 + msg.getReconsumeTimes());

        this.mQClientFactory.getDefaultMQProducer().send(newMsg);
    } finally {
        msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQPushConsumer.getNamespace()));
    }
}

  Now let us see how the broker handles a request of type RequestCode.CONSUMER_SEND_MSG_BACK. Searching the broker source for this request code shows that it is handled by the asyncConsumerSendMsgBack method of SendMessageProcessor, which works as follows:
  1. If consumeMessageHookList is not empty and the message's original msgId is not empty, build a ConsumeMessageContext instance and execute the hooks.
  2. Look up the subscription group config for the consumer group carried in the request, then check whether that config exists, whether the broker has write permission, and whether the group's retry queue number is greater than 0 (i.e. whether the group supports retries). If the config is missing or the broker is not writable, the error is reported through the response code and remark; if the retry queue number is less than or equal to 0, a successful response is returned, which simply means this consumer group does not retry messages.
  3. Create the retry topic and build its TopicConfig; the topic name is %RETRY% + consumer group name.
  4. Fetch the message from the commitlog using the commitlog offset carried in the request, and store the retry topic in the message's extended properties.
  5. If the message's reconsume count has reached the maximum reconsume count, or the delay level is less than 0, change the topic to the dead-letter topic %DLQ% + consumer group name and give that topic write-only permission; a message that enters the dead-letter queue is therefore no longer consumed. Otherwise, if the delay level is 0, set it to 3 plus the reconsume count.
  6. Build a new message from the original one; this retry message gets its own unique msgId, and it is stored in the commitlog.
  7. When the retry message is written to the commitlog, its topic and queueId are backed up into its extended properties under REAL_TOPIC and REAL_QID, its topic is rewritten to SCHEDULE_TOPIC_XXXX, and its queueId is rewritten to the delay level minus 1.
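
  The heart of steps 5 to 7 is a small decision about the target topic and the delay level. Below is a minimal sketch of that decision under the rules described above; it is not the actual SendMessageProcessor code, and the class, method and example values (for instance maxReconsumeTimes = 16) are only illustrative:

// Simplified sketch of the retry/DLQ decision made for a CONSUMER_SEND_MSG_BACK request.
// NOT the actual broker code; names and structure are illustrative only.
public class SendMsgBackDecisionSketch {

    static class Decision {
        final String topic;    // topic the message will be written back under
        final int delayLevel;  // 0 means no delay, otherwise an index into the delay-level table
        Decision(String topic, int delayLevel) {
            this.topic = topic;
            this.delayLevel = delayLevel;
        }
    }

    static Decision decide(String consumerGroup, int reconsumeTimes, int maxReconsumeTimes, int requestedDelayLevel) {
        if (reconsumeTimes >= maxReconsumeTimes || requestedDelayLevel < 0) {
            // Retried too often (or retry disabled): route the message to the dead-letter topic.
            return new Decision("%DLQ%" + consumerGroup, 0);
        }
        int delayLevel = requestedDelayLevel;
        if (delayLevel == 0) {
            // Broker picks the delay: 3 + reconsumeTimes, so each retry waits longer than the previous one.
            delayLevel = 3 + reconsumeTimes;
        }
        // The message is written back under the retry topic; the commitlog later replaces this topic with
        // SCHEDULE_TOPIC_XXXX and uses (delayLevel - 1) as the queueId until the delay expires.
        return new Decision("%RETRY%" + consumerGroup, delayLevel);
    }

    public static void main(String[] args) {
        Decision first = decide("demo_group", 0, 16, 0);
        System.out.println(first.topic + ", delayLevel=" + first.delayLevel); // %RETRY%demo_group, delayLevel=3
        Decision dead = decide("demo_group", 16, 16, 0);
        System.out.println(dead.topic + ", delayLevel=" + dead.delayLevel);   // %DLQ%demo_group, delayLevel=0
    }
}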

  After the retry message is stored in the commitlog it is not consumed right away; it only becomes consumable after the corresponding delay has passed, and the component that implements this is ScheduleMessageService. Let us analyze how ScheduleMessageService works.
  ScheduleMessageService is loaded while DefaultMessageStore is loaded and started while DefaultMessageStore is started. Loading mainly restores the pull progress of the delay-message queues and builds delayLevelTable (a map from each delay level to its delay time). Starting accomplishes two tasks:
  (1) For every delay level queue, create a DeliverDelayedMessageTimerTask. Its first execution is delayed by 1 second; afterwards the task keeps rescheduling itself at a short interval to scan its queue. The task finds the ConsumeQueue of the delayed-message topic, parses each entry into the commitlog offset, message length and tag hashcode of a delayed message, reads the message from the commitlog using that offset and length, and builds a new message from it; the new message differs in that it carries the original topic and queueId again. The new message is then stored in the commitlog so that consumers can consume it once more, and finally the pull progress of the delay queue is updated. A simplified sketch of this pattern is given after the start() method below.
  (2) Create a timer task that persists the consumption progress of the delay queues; it runs every 10 seconds.

public void start() {
    if (started.compareAndSet(false, true)) {
        this.timer = new Timer("ScheduleMessageTimerThread", true);
        for (Map.Entry<Integer, Long> entry : this.delayLevelTable.entrySet()) {
            Integer level = entry.getKey();
            Long timeDelay = entry.getValue();
            Long offset = this.offsetTable.get(level);
            if (null == offset) {
                offset = 0L;
            }

            if (timeDelay != null) {
                this.timer.schedule(new DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME);
            }
        }

        this.timer.scheduleAtFixedRate(new TimerTask() {

            @Override
            public void run() {
                try {
                    if (started.get()) ScheduleMessageService.this.persist();
                } catch (Throwable e) {
                    log.error("scheduleAtFixedRate flush exception", e);
                }
            }
        }, 10000, this.defaultMessageStore.getMessageStoreConfig().getFlushDelayOffsetInterval());
    }
}
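
  To make the pattern described in (1) concrete, here is a minimal, self-contained sketch of the "deliver when due, otherwise reschedule" loop using java.util.Timer, just as ScheduleMessageService does. It is not the real DeliverDelayedMessageTimerTask, and the table below only lists the first few default delay levels:

import java.util.HashMap;
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;

// Simplified sketch of the delayed-redelivery loop; NOT the actual DeliverDelayedMessageTimerTask.
public class DelayRedeliverySketch {

    // First few default delay levels: level 1 = 1s, 2 = 5s, 3 = 10s, 4 = 30s ...
    static final Map<Integer, Long> DELAY_LEVEL_TABLE = new HashMap<>();
    static {
        DELAY_LEVEL_TABLE.put(1, 1_000L);
        DELAY_LEVEL_TABLE.put(2, 5_000L);
        DELAY_LEVEL_TABLE.put(3, 10_000L);
        DELAY_LEVEL_TABLE.put(4, 30_000L);
    }

    static final Timer TIMER = new Timer("DelayRedeliverySketch", true);

    // One task per delay level, mirroring one DeliverDelayedMessageTimerTask per level.
    static class DeliverTask extends TimerTask {
        final int delayLevel;
        final long storeTimestamp; // stands in for the store timestamp read from the commitlog

        DeliverTask(int delayLevel, long storeTimestamp) {
            this.delayLevel = delayLevel;
            this.storeTimestamp = storeTimestamp;
        }

        @Override
        public void run() {
            long deliverTimestamp = storeTimestamp + DELAY_LEVEL_TABLE.get(delayLevel);
            long countdown = deliverTimestamp - System.currentTimeMillis();
            if (countdown <= 0) {
                // Due: in RocketMQ this is where the original topic/queueId are restored from
                // REAL_TOPIC/REAL_QID and the message is written back to the commitlog.
                System.out.println("level " + delayLevel + ": redeliver message to its original topic");
            } else {
                // Not due yet: schedule another check shortly (a TimerTask instance can only be scheduled once).
                TIMER.schedule(new DeliverTask(delayLevel, storeTimestamp), Math.min(countdown, 100L));
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        // First execution delayed by 1 second, mirroring FIRST_DELAY_TIME in ScheduleMessageService.
        TIMER.schedule(new DeliverTask(3, System.currentTimeMillis()), 1_000L);
        Thread.sleep(12_000L); // keep the daemon timer alive long enough to see the 10s delivery
    }
}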

  Finally, the flow chart below summarizes the process of concurrent message consumption on the consumer side:
[Figure: flow chart of the consumer's concurrent message consumption process]
