当前位置:   article > 正文

RocketMQ源码(二十一)之延迟消息_【rocketmq】第一次执行和最后一次执行间隔时间

【rocketmq】第一次执行和最后一次执行间隔时间

版本

  1. 基于rocketmq-all-4.3.1版本

延迟消息

  1. RocketMQ支持发送延迟消息,但不支持任意时间的延迟消息的设置,仅支持内置预设值的延迟时间间隔的延迟消息。Broker内部使用SCHEDULE_TOPIC_XXXX主题所有的延迟消息,根据延迟的level的个数,创建对应数量的ConsumeQueue,在创建ConsumeQueue时将其tagCode保存消息需要投递的时间。通过定时任务扫描ConsumeQueue,将满足条件的消息重新投递到原始的Topic中,这样消费者就可以消费了

  2. 预设值的延迟时间间隔为:1s、 5s、 10s、 30s、 1m、 2m、 3m、 4m、 5m、 6m、 7m、 8m、 9m、 10m、 20m、 30m、 1h、 2h

  3. 延迟消息的ConsumeQueue存储的tagsCode与普通消息不同

    • 延时消息的tagCode:存储的是消息到期的时间
    • 非延时消息的tagCode: tags字符串的hashCode
  4. 延迟消息整体交互图
    在这里插入图片描述

发送延迟消息

  1. Producer发送延迟消息与普通发送没有太大区别,只需要设置延迟一个级别即可。延迟级别并不是时间,只是一个数字,如果超过最大值,则会被重置为最大值

    Message message = new Message(topic, tag, keys, msg.getBytes(RemotingHelper.DEFAULT_CHARSET));
    //messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
    //这里就表示10s
    message.setDelayTimeLevel(3);
    
    • 1
    • 2
    • 3
    • 4
  2. Message#setDelayTimeLevel的实现可以看到,是向消息扩展属性中添加一个DELAY属性

    public static final String PROPERTY_DELAY_TIME_LEVEL = "DELAY";
    public void setDelayTimeLevel(int level) {
        this.putProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL, String.valueOf(level));
    }
    
    • 1
    • 2
    • 3
    • 4
  3. Consumer提供消息重试的,在并发模式消费消费失败的情况下,可以返回一个枚举值 ConsumeConcurrentlyStatus.RECONSUME_LATER,那么消息之后将会进行重试。默认会进行重试16次,消息重试。Consumer发送的延迟时间间隔为:10s、 30s、 1m、 2m、 3m、 4m、 5m、 6m、 7m、 8m、 9m、 10m、 20m、 30m、 1h、 2h。即消息重试的16个级别,即重试16次。

    SendMessageProcessor#consumerSendMsgBack
      
    if (0 == delayLevel) {
      	//从10ms开始
         delayLevel = 3 + msgExt.getReconsumeTimes();
     }
     msgExt.setDelayTimeLevel(delayLevel);
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

Broker处理延迟消息

  1. Broker端接收处理与普通消息没有区别,只是在存储的时候有一些不同。CommitLog#putMessage对于延迟消息做了相关处理

    • 将消息的Topic更改为延迟消息特定的主题SCHEDULE_TOPIC_XXXX,根据延迟级别获取queueId(等于delayLevel-1
    • 将原始Topic、Queue备份在消息的扩展属性中(为了后续恢复原始,能被消费)
    • 保存消息到CommitLog中,异步生成ConsumeQueue和indexFile,这个和普通消息没什么区别
    final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
    // 事务prepare消息不支持延迟消息
    if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE
        || tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {
        // Delay Delivery 0表示不延迟,大于0表示特定的延迟级别
        if (msg.getDelayTimeLevel() > 0) {
            if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
                msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
            }
            //延迟投递消息的topic
            topic = ScheduleMessageService.SCHEDULE_TOPIC;
            //根据延迟级别获取queueId(等于delayLevel - 1)
            queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());
    
            // Backup real topic, queueId
            //存入真实的topic和queueId存入消息属性中
            MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
            MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
            msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));
    				//更改Topic和queueId
            msg.setTopic(topic);
            msg.setQueueId(queueId);
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
  2. 执行异步转发创建ConsumeQueue时,会对延迟消息进行单独处理。CommitLog#checkMessageAndReturnSize对延迟消息的特殊处理如下。这里将具体的时间保存在ConsumeQueue的TagCode(不再是Tag的hash,而是投递时间ms)好处是不需要再去检查CommitLog文件,定时任务只需要检查ConsumeQueue即可,这样可以大大提高效率。如果满足条件,再去查询CommitLog将消息投递出去

     // Timing message processing
     {
         String t = propertiesMap.get(MessageConst.PROPERTY_DELAY_TIME_LEVEL);
         if (ScheduleMessageService.SCHEDULE_TOPIC.equals(topic) && t != null) {
             int delayLevel = Integer.parseInt(t);
    				 // 如果延迟级别边界溢出,则重置为最大
             if (delayLevel > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
                 delayLevel = this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel();
             }
    
             if (delayLevel > 0) {
                 // 计算具体的投递时间,并将改时间保存在ConsumeQueue的tagCode中
               	// 投递时间=消息存储时间(storeTimestamp) + 延迟级别对应的时间
                 tagsCode = this.defaultMessageStore.getScheduleMessageService().computeDeliverTimestamp(delayLevel,
                     storeTimestamp);
             }
         }
     }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18

Broker延迟消息投递

  1. RocketMQ通过ScheduleMessageService定时扫描ConsumeQueue来判断消息是否需要被投递

    public class ScheduleMessageService extends ConfigManager {
        private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
        // 内置的Topic,用于保存所有的定时消息
        public static final String SCHEDULE_TOPIC = "SCHEDULE_TOPIC_XXXX";
        // 第一次执行定时任务的延迟时间
        private static final long FIRST_DELAY_TIME = 1000L;
        // 第一次以后定时任务的检查间隔时间,默认100ms
        private static final long DELAY_FOR_A_WHILE = 100L;
        // 延迟消息投递失败,默认10s后再次重新投递
        private static final long DELAY_FOR_A_PERIOD = 10000L;
        // 延迟级别和延迟时间的映射关系
        private final ConcurrentMap<Integer /* level */, Long/* delay timeMillis */> delayLevelTable =
            new ConcurrentHashMap<Integer, Long>(32);
        // 延迟级别与消费偏移量的关系
        private final ConcurrentMap<Integer /* level */, Long/* offset */> offsetTable =
            new ConcurrentHashMap<Integer, Long>(32);
        // 定时线程
        private final Timer timer = new Timer("ScheduleMessageTimerThread", true);
    
      	...省略...
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
  2. DefaultMessageStore#start 启动时会调用ScheduleMessageService#start,启动延迟消息投递线程以及延迟消息偏移量持久化线程

    public void start() {
        // 每个延迟级别都有一个独立的定时任务
        for (Map.Entry<Integer, Long> entry : this.delayLevelTable.entrySet()) {
            // 延迟级别
            Integer level = entry.getKey();
            // 延迟时间
            Long timeDelay = entry.getValue();
            // 延迟级别对应的ConsumeQueue的偏移量,从此点开始扫描
            Long offset = this.offsetTable.get(level);
            if (null == offset) {
                offset = 0L;
            }
            // 延迟时间
            if (timeDelay != null) {
              	 // 从现在起过FIRST_DELAY_TIME毫秒(1000)仅执行一次
                this.timer.schedule(new DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME);
            }
        }
    
        this.timer.scheduleAtFixedRate(new TimerTask() {
    
            @Override
            public void run() {
                try {
                    ScheduleMessageService.this.persist();
                } catch (Throwable e) {
                    log.error("scheduleAtFixedRate flush exception", e);
                }
            }
        }, 10000, this.defaultMessageStore.getMessageStoreConfig().getFlushDelayOffsetInterval());
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
  3. ScheduleMessageService.DeliverDelayedMessageTimerTask#DeliverDelayedMessageTimerTask检查队列中没有投递的第一条消息,如果这条消息没有到期,则之后所有的消息都不会进行检查。如果到期了,则投递,并继续检查下一条消息。如果投递失败,则10s后重新投递。如果延迟消息量比较大,可能会造成消息到期后需要很久才能被消费。

    public void executeOnTimeup() {
        // 延迟级别与queueId是一对一关系,可以相互转换
        ConsumeQueue cq =
            ScheduleMessageService.this.defaultMessageStore.findConsumeQueue(SCHEDULE_TOPIC,
                delayLevel2QueueId(delayLevel));
    
        long failScheduleOffset = offset;
    
        if (cq != null) {
            SelectMappedBufferResult bufferCQ = cq.getIndexBuffer(this.offset);
            if (bufferCQ != null) {
                try {
                    long nextOffset = offset;
                    int i = 0;
                    ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit();
                    for (; i < bufferCQ.getSize(); i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {
                        long offsetPy = bufferCQ.getByteBuffer().getLong();
                        int sizePy = bufferCQ.getByteBuffer().getInt();
                        // tagsCode存储的是投递时间
                        long tagsCode = bufferCQ.getByteBuffer().getLong();
    
                        if (cq.isExtAddr(tagsCode)) {
                            if (cq.getExt(tagsCode, cqExtUnit)) {
                                tagsCode = cqExtUnit.getTagsCode();
                            } else {
                                //can't find ext content.So re compute tags code.
                                log.error("[BUG] can't find consume queue extend file content!addr={}, offsetPy={}, sizePy={}",
                                    tagsCode, offsetPy, sizePy);
                                long msgStoreTime = defaultMessageStore.getCommitLog().pickupStoreTimestamp(offsetPy, sizePy);
                                tagsCode = computeDeliverTimestamp(delayLevel, msgStoreTime);
                            }
                        }
    
                        long now = System.currentTimeMillis();
                        // tagsCode保存的是超时时间,纠正当前消息的真正投递时间
                        long deliverTimestamp = this.correctDeliverTimestamp(now, tagsCode);
    
                        nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
                        // 判断延迟消息是否过期
                        long countdown = deliverTimestamp - now;
    
                        if (countdown <= 0) {
                            MessageExt msgExt =
                                ScheduleMessageService.this.defaultMessageStore.lookMessageByOffset(
                                    offsetPy, sizePy);
    
                            if (msgExt != null) {
                                try {
                                    // 恢复原始消息,清除延迟属性。重新投递消息到原始的Topic和queueId中
                                    MessageExtBrokerInner msgInner = this.messageTimeup(msgExt);
                                    PutMessageResult putMessageResult =
                                        ScheduleMessageService.this.defaultMessageStore
                                            .putMessage(msgInner);
    
                                    if (putMessageResult != null
                                        && putMessageResult.getPutMessageStatus() == PutMessageStatus.PUT_OK) {
                                        continue;
                                    } else {
                                        // XXX: warn and notify me
                                        log.error(
                                            "ScheduleMessageService, a message time up, but reput it failed, topic: {} msgId {}",
                                            msgExt.getTopic(), msgExt.getMsgId());
                                        // 如果投递失败,则重新投递,并且更新偏移量
                                        ScheduleMessageService.this.timer.schedule(
                                            new DeliverDelayedMessageTimerTask(this.delayLevel,
                                                nextOffset), DELAY_FOR_A_PERIOD);
                                        ScheduleMessageService.this.updateOffset(this.delayLevel,
                                            nextOffset);
                                        return;
                                    }
                                } catch (Exception e) {
                                    /*
                                     * XXX: warn and notify me
                                     */
                                    log.error(
                                        "ScheduleMessageService, messageTimeup execute error, drop it. msgExt="
                                            + msgExt + ", nextOffset=" + nextOffset + ",offsetPy="
                                            + offsetPy + ",sizePy=" + sizePy, e);
                                }
                            }
                        } else {
                            // 重新投递
                            ScheduleMessageService.this.timer.schedule(
                                new DeliverDelayedMessageTimerTask(this.delayLevel, nextOffset),
                                countdown);
                            ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);
                            return;
                        }
                    } // end of for
    
                    // 继续调度下一个,更新消费偏移量
                    nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
                    ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(
                        this.delayLevel, nextOffset), DELAY_FOR_A_WHILE);
                    ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);
                    return;
                } finally {
    
                    bufferCQ.release();
                }
            } // end of if (bufferCQ != null)
            else {
                // 如果偏移量不正确,打印错误日志
                long cqMinOffset = cq.getMinOffsetInQueue();
                if (offset < cqMinOffset) {
                    failScheduleOffset = cqMinOffset;
                    log.error("schedule CQ offset invalid. offset=" + offset + ", cqMinOffset="
                        + cqMinOffset + ", queueId=" + cq.getQueueId());
                }
            }
        } // end of if (cq != null)
    
        ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel,
            failScheduleOffset), DELAY_FOR_A_WHILE);
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/我家自动化/article/detail/603763
推荐阅读
相关标签
  

闽ICP备14008679号