当前位置:   article > 正文

RocketMQ源码解析:消息消费失败后的重试逻辑_consumer消费消息失败,则将消息重新发送回broker

consumer消费消息失败,则将消息重新发送回broker

在这里插入图片描述

消息消费失败重新投递的流程

请添加图片描述
我们接着消息消费的逻辑分析,当消费完成后会调用ConsumeMessageConcurrentlyService#processConsumeResult的方法处理消费的结果,如果消费失败,则会将消息再次发送到这条消息原来存储的broker

为什么要将消息发送到原来存储的broker呢?
因为重试的消息只有消息的基本信息,没有具体的消息体,需要重新从commitLog中获取原来的消息获得消息体

// MQClientAPIImpl#consumerSendMessageBack
ConsumerSendMsgBackRequestHeader requestHeader = new ConsumerSendMsgBackRequestHeader();
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.CONSUMER_SEND_MSG_BACK, requestHeader);

// 只发送消息的基本信息,等发送到broker重新从commitLog文件取消息
requestHeader.setGroup(consumerGroup);
requestHeader.setOriginTopic(msg.getTopic());
requestHeader.setOffset(msg.getCommitLogOffset());
requestHeader.setDelayLevel(delayLevel);
requestHeader.setOriginMsgId(msg.getMsgId());
requestHeader.setMaxReconsumeTimes(maxConsumeRetryTimes);

// 同步发送重试消息
RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),
    request, timeoutMillis);
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15

当消息到达broker端的时候,会根据偏移量从commitLog中找到原来的消息,然后构建新的消息,消息体和原来的消息体一样,消息的topic变更为%RETRY% + consumerGroup,并且将消息的delayLevel+1,如果是第一次重试,则直接将delayLevel设置为3,此时这条消息的存储和消费逻辑就会按照定时消息来处理,定时消息的处理逻辑在后一节分析

源码解析

Consumer端处理消费结果

// ConsumeMessageConcurrentlyService#processConsumeResult
public void processConsumeResult(
    final ConsumeConcurrentlyStatus status,
    final ConsumeConcurrentlyContext context,
    final ConsumeRequest consumeRequest
) {
    // 通过ackIndex控制消息是否要进行重试
    int ackIndex = context.getAckIndex();

    if (consumeRequest.getMsgs().isEmpty())
        return;

    switch (status) {
        case CONSUME_SUCCESS:
            // 消费成功
            if (ackIndex >= consumeRequest.getMsgs().size()) {
                ackIndex = consumeRequest.getMsgs().size() - 1;
            }
            int ok = ackIndex + 1;
            int failed = consumeRequest.getMsgs().size() - ok;
            this.getConsumerStatsManager().incConsumeOKTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), ok);
            this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), failed);
            break;
        case RECONSUME_LATER:
            // 需要重试消息
            ackIndex = -1;
            this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(),
                consumeRequest.getMsgs().size());
            break;
        default:
            break;
    }

    switch (this.defaultMQPushConsumer.getMessageModel()) {
        case BROADCASTING:
            // 广播模式不会重试消息,失败就丢弃了
            for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
                MessageExt msg = consumeRequest.getMsgs().get(i);
                log.warn("BROADCASTING, the message consume failed, drop it, {}", msg.toString());
            }
            break;
        case CLUSTERING:
            List<MessageExt> msgBackFailed = new ArrayList<MessageExt>(consumeRequest.getMsgs().size());
            for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
                MessageExt msg = consumeRequest.getMsgs().get(i);
                // 发送重试消息
                boolean result = this.sendMessageBack(msg, context);
                if (!result) {
                    msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);
                    msgBackFailed.add(msg);
                }
            }

            // 重发消息失败,5s后再重新消费
            if (!msgBackFailed.isEmpty()) {
                consumeRequest.getMsgs().removeAll(msgBackFailed);

                this.submitConsumeRequestLater(msgBackFailed, consumeRequest.getProcessQueue(), consumeRequest.getMessageQueue());
            }
            break;
        default:
            break;
    }

    // 不管消息消费成功与否,将消费过的消息从 ProcessQueue 中删除
    long offset = consumeRequest.getProcessQueue().removeMessage(consumeRequest.getMsgs());
    if (offset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {
        // 不管消息消费成功与否,都会更新消费进度,将消费进度先暂存在本地,后台定时任务会定时将消费进度同步到broker中
        this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), offset, true);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71

从这段代码中可以看到当客户端的消费状态为RECONSUME_LATER时,会发送重试消息,当消费者的消费状态为如下三种时会进行重试

  1. 返回ConsumeConcurrentlyStatus.RECONSUME_LATER
  2. 返回null
  3. 主动或被动抛出异常

Consumer端消费重试消息

Consumer端在启动的时候会订阅普通的topic和重试的topic
请添加图片描述

// DefaultMQPushConsumerImpl#copySubscription
switch (this.defaultMQPushConsumer.getMessageModel()) {
    case BROADCASTING:
        break;
    case CLUSTERING:
        // 订阅重试主题
        final String retryTopic = MixAll.getRetryTopic(this.defaultMQPushConsumer.getConsumerGroup());
        SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(retryTopic, SubscriptionData.SUB_ALL);
        this.rebalanceImpl.getSubscriptionInner().put(retryTopic, subscriptionData);
        break;
    default:
        break;
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13

Broker端存储消息

Broker端存储消息的流程在SendMessageProcessor#asyncConsumerSendMsgBack,挑重要的源码分析一下

// 超过最大消费次数16次,或者 delayLevel < 0
// 则将消息投递到死信队列
if (msgExt.getReconsumeTimes() >= maxReconsumeTimes
    || delayLevel < 0) {
    newTopic = MixAll.getDLQTopic(requestHeader.getGroup());
    queueIdInt = ThreadLocalRandom.current().nextInt(99999999) % DLQ_NUMS_PER_GROUP;

    topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(newTopic,
            DLQ_NUMS_PER_GROUP,
            PermName.PERM_WRITE | PermName.PERM_READ, 0);

    if (null == topicConfig) {
        response.setCode(ResponseCode.SYSTEM_ERROR);
        response.setRemark("topic[" + newTopic + "] not exist");
        return CompletableFuture.completedFuture(response);
    }
    msgExt.setDelayTimeLevel(0);
} else {
    if (0 == delayLevel) {
        delayLevel = 3 + msgExt.getReconsumeTimes();
    }
    msgExt.setDelayTimeLevel(delayLevel);
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23

当消息超过最大消费次数16次,或者 delayLevel < 0时,会将消息投递到死信队列中,死信队列的topic名为%DLQ% + consumerGroup,否则将topic名字改为%RETRY% + consumerGroup,放入重试队列

String newTopic = MixAll.getRetryTopic(requestHeader.getGroup());
  • 1

重试消息设置了delayLevel后,这个消息就变成了延时消息,到达投递时间后,Consumer就又能消费到这条消息了(虽然消息的topic此时已经变为%RETRY% + consumerGroup了,但是Consumer在启动的时候已经订阅这个topic了哈)

参考博客

[1]https://blog.csdn.net/hsz2568952354/article/details/120422714
好文
[2]https://gitbook.cn/books/5d340810c43fe20aeadc88db/index.html
[3]https://www.baiyp.ren/RocketMQ%E6%B6%88%E8%B4%B9%E8%80%85%E4%BF%9D%E9%9A%9C.html

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/羊村懒王/article/detail/715398
推荐阅读
相关标签
  

闽ICP备14008679号