赞
踩
RocketMQ 相较于其它消息队列产品的一个特性是支持延时消息,也就是说消息发送到 Broker 不会立马投递给消费者,要等待一个指定的延迟时间再投递,适用场景例如:下单后多长时间没付款系统自动关闭订单。
RocketMQ 4.x 版本的延时消息存在一定的局限性,实现原理是:Broker 内置了名称为SCHEDULE_TOPIC_XXXX
的Topic,包含 18 个对应延时级别的队列,每个延时级别的时间是固定的。Broker 收到消息后,如果发现是延时消息就会改写消息的 Topic 和 queueID,再通过专门的线程定时扫描这些队列,把延迟时间已到的消息重新投递到真正的 Topic 里面。
这种实现方案相对简单,但也存在局限性。消息的延迟时间不能随意设置,只能按照给定的延迟级别来设置;最长的延迟级别是两小时,如果用户需要更久的延时,就只能自行改造方案。
RocketMQ 5.0 意识到了这个问题,终于延时消息迎来升级,废弃了之前延迟级别的设计,消费者可以设置 24 小时内的任意时间,这个限制其实还可以增加,不过堆积太多的消息会带来其它问题。
延时消息的设计是存在一些难点的,下面逐个分析。
1、任意延迟时间如何扫描?
延迟消息肯定需要一个数据结构来存储,为了不影响消息发送的吞吐量,还必须保证写入的高性能。然后还需要有线程定时的扫描这个数据结构,把延迟时间已到的消息投递给消费者。
RocketMQ 4.x 为了保证高性能写,还是把延时消息正常写 CommitLog,只不过换个 Topic。不同延迟时长的消息写入不同队列,这样就能保证每个队列前面的消息肯定比后面的消息先投递,线程扫描的时候顺序扫,只要扫到第一个延迟时间还没到的消息,后面的消息就可以跳过了,避免全局扫描。
2、消息清理问题
RocketMQ 本身可以看作是一个缓冲区,是用来做削峰填谷的,消息不可能一直保留,所以要有定时清理机制。CommitLog 默认会保存三天,如果支持太久的延迟时间,万一 CommitLog 被清理了,延迟时间投递的时候就无法读取出原始消息内容了。
3、大量消息同时到期
大量消息同时到期也是延迟消息比较头疼的问题,毕竟是单线程扫描投递,如果大量消息同时到期,短时间内投递的消息太多,就会导致消息延迟,不过这个问题可以业务上加个随机时间解决。
RocketMQ 5.0 引入了时间轮算法 (TimingWheel) 来支持任意时间的延时消息。
先看一下算法的思想,如图所示,圆环就是一个时间轮,它共有 8 个刻度,假设每个刻度代表一秒钟。延时任务会根据延迟时间添加到时间轮对应的刻度上。Data1、Data2 延迟时间都是一秒,所以被添加到刻度1上;Data4 延迟时间 14 秒,饶了一圈后被添加到刻度6上。同时,会有一个指向当前时间刻度的指针,沿着时间轮顺时针旋转,指针每秒前进一个刻度,并把当前刻度上所有的延迟时间已到的延时任务全部执行一遍。
基于时间轮算法,就可以实现任意时间的延时任务的调度执行,如果你觉得“秒”的精度不够,可以把刻度再拆分的更精细化一些,定时任务跑的频率更高一些即可。
RocketMQ 是怎么实现时间轮算法的呢?
RocketMQ 用 TimerWheel 类来封装时间轮,它实际对应磁盘上一个固定大小的文件,默认文件路径是${StoreHome}/timerwheel
,默认大小约 37 MB。
TimerWheel 由若干个刻度组成,一个刻度就代表一个时间单位,RocketMQ 用Slot
类来描述刻度,默认一个 Slot 代表一秒。TimerWheel 默认有 14 天内以秒为单位的所有 Slot,即 1209600 个 Slot。
每个 Slot 占用固定的 32 字节,格式如下:
字段 | 长度(字节) | 说明 |
---|---|---|
timeMs | 8 | 延迟时间 |
firstPos | 8 | 第一个消息的位置 |
lastPos | 8 | 最后一个消息的位置 |
num | 4 | 消息数量 |
magic | 4 | 魔数(废弃) |
每个 Slot 都有对应的延迟时间,相同延迟时间的消息才会被添加进来,多个延时消息会串联成链表,Slot 用两个指针分别指向了链表的首尾地址。现在的问题是,延时消息存到哪里呢?
为了保证消息写入的性能,延时消息会顺序写入到timerlog
文件里,它有点类似 CommitLog,区别是 timerlog 不会存储完整的消息,因为太浪费空间了。延时消息会被转换成固定的 52 字节的 TimerLog 写入。
写入的 TimerLog 格式如下:
字段 | 长度(字节) | 说明 |
---|---|---|
size | 4 | log 长度,固定值 52 |
prev pos | 8 | 前一条 log 的位置 |
magic | 4 | 魔数 |
curr write time | 8 | 写入时间 |
delayed time | 4 | 延迟时间 |
offsetPy | 8 | 实际消息在 CommitLog 偏移量 |
sizePy | 4 | 实际消息大小 |
hash code of real topic | 4 | 真实 Topic 哈希码 |
reserved value | 8 | 保留位 未使用 |
当有延时消息要被添加到 TimerWheel,首先要根据消息的延迟时间定位到 Slot,然后转换成 52 字节的 TimerLog,顺序写入 timerlog 文件,同时更新 Slot。
如图所示,现在要在 1号 Slot 上再添加一条延时消息1-4
,需要先把1-4
写入 timerlog,1-4
的 prevPos 指针会指向1-3
串联成链表,再把 Slot -> lastPos 指向1-4
。
延时消息存储起来了,接下就是线程定时扫描时间轮上的 Slot,判断消息如果到期就投递到原始 Topic 里面让消费者开始消费,如果消息没到期就重新投递进延时队列,进入下一期的时间轮。
不管是延时消息写入 timerlog、还是从 timerlog 取出消息重新投递,这些工作都是通过起单独的线程定时执行的,这里列举下相关的线程 Service:
结合流程图再回顾一下整体流程,首先是延时消息发到 Broker 被写入 CommitLog:
然后是 TimerMessageStore 启动线程把延时消息写入 timerlog、再定时扫描到期消息重新投递的过程。
客户端 SDK 发送延时消息前,会把延迟时间放在 Message -> system_properties 属性里面,Broker 收到消息准备 putMessage 前,会触发PutMessageHook
预留的钩子函数,其中一个叫handleScheduleMessage
的钩子就是专门处理延时消息的。
最终会调用HookUtils#handleScheduleMessage
方法,如果判断 Broker 启用了时间轮算法,且接收到的是延时消息,就会对消息进行转换。
public static PutMessageResult handleScheduleMessage(BrokerController brokerController, final MessageExtBrokerInner msg) { final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag()); if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE || tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) { if (!isRolledTimerMessage(msg)) { if (checkIfTimerMessage(msg)) { if (!brokerController.getMessageStoreConfig().isTimerWheelEnable()) { return new PutMessageResult(PutMessageStatus.WHEEL_TIMER_NOT_ENABLE, null); } // 是延时消息 且启用时间轮算法 消息转换 PutMessageResult tranformRes = transformTimerMessage(brokerController, msg); if (null != tranformRes) { return tranformRes; } } } // Delay Delivery if (msg.getDelayTimeLevel() > 0) { transformDelayLevelMessage(brokerController, msg); } } return null; }
转换的过程就是解析出延迟时间,然后把延迟时间和真实 Topic、QueueID 写入 properties,最后改写 Topic:
private static PutMessageResult transformTimerMessage(BrokerController brokerController, MessageExtBrokerInner msg) { //do transform int delayLevel = msg.getDelayTimeLevel(); long deliverMs; try { // 从properties取出延迟时间 if (msg.getProperty(MessageConst.PROPERTY_TIMER_DELAY_SEC) != null) { deliverMs = System.currentTimeMillis() + Long.parseLong(msg.getProperty(MessageConst.PROPERTY_TIMER_DELAY_SEC)) * 1000; } else if (msg.getProperty(MessageConst.PROPERTY_TIMER_DELAY_MS) != null) { deliverMs = System.currentTimeMillis() + Long.parseLong(msg.getProperty(MessageConst.PROPERTY_TIMER_DELAY_MS)); } else { deliverMs = Long.parseLong(msg.getProperty(MessageConst.PROPERTY_TIMER_DELIVER_MS)); } } catch (Exception e) { return new PutMessageResult(PutMessageStatus.WHEEL_TIMER_MSG_ILLEGAL, null); } if (deliverMs > System.currentTimeMillis()) { if (delayLevel <= 0 && deliverMs - System.currentTimeMillis() > brokerController.getMessageStoreConfig().getTimerMaxDelaySec() * 1000) { return new PutMessageResult(PutMessageStatus.WHEEL_TIMER_MSG_ILLEGAL, null); } // 处理精度 int timerPrecisionMs = brokerController.getMessageStoreConfig().getTimerPrecisionMs(); if (deliverMs % timerPrecisionMs == 0) { deliverMs -= timerPrecisionMs; } else { deliverMs = deliverMs / timerPrecisionMs * timerPrecisionMs; } if (brokerController.getTimerMessageStore().isReject(deliverMs)) { return new PutMessageResult(PutMessageStatus.WHEEL_TIMER_FLOW_CONTROL, null); } // 改写Topic,把真实Topic写入properties MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TIMER_OUT_MS, deliverMs + ""); MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic()); MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId())); msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties())); msg.setTopic(TimerMessageStore.TIMER_TOPIC); msg.setQueueId(0); } else if (null != msg.getProperty(MessageConst.PROPERTY_TIMER_DEL_UNIQKEY)) { return new PutMessageResult(PutMessageStatus.WHEEL_TIMER_MSG_ILLEGAL, null); } return null; }
后续就是把消息写入 CommitLog,因为改写了 Topic,所以消费者现在是没办法消费延时消息的。
接着就是 TimerEnqueueGetService 线程消费rmq_sys_wheel_timer
队列,取出延时消息,构建 TimerRequest 放到 enqueuePutQueue。
public boolean enqueue(int queueId) { ...... ConsumeQueue cq = (ConsumeQueue) this.messageStore.getConsumeQueue(TIMER_TOPIC, queueId); SelectMappedBufferResult bufferCQ = cq.getIndexBuffer(offset); for (; i < bufferCQ.getSize(); i += ConsumeQueue.CQ_STORE_UNIT_SIZE) { // 取出原始消息 MessageExt msgExt = getMessageByCommitOffset(offsetPy, sizePy); // 延迟时间 long delayedTime = Long.parseLong(msgExt.getProperty(TIMER_OUT_MS)); // use CQ offset, not offset in Message msgExt.setQueueOffset(offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE)); TimerRequest timerRequest = new TimerRequest(offsetPy, sizePy, delayedTime, System.currentTimeMillis(), MAGIC_DEFAULT, msgExt); while (true) { // 延时消息封装成TimerRequest入队 if (enqueuePutQueue.offer(timerRequest, 3, TimeUnit.SECONDS)) { break; } if (!isRunningEnqueue()) { return false; } } } ...... }
再是 TimerEnqueuePutService 线程从 enqueuePutQueue 取出 先前构建好的TimerRequest,写入 timerlog。
public void run() {
......
while (!this.isStopped() || enqueuePutQueue.size() != 0) {
TimerRequest firstReq = enqueuePutQueue.poll(10, TimeUnit.MILLISECONDS);
// 延迟时间小于当前时间,入队dequeuePutQueue
if (shouldRunningDequeue && req.getDelayTime() < currWriteTimeMs) {
dequeuePutQueue.put(req);
} else {
// 写入timerlog
boolean doEnqueueRes = doEnqueue(req.getOffsetPy(), req.getSizePy(), req.getDelayTime(), req.getMsg());
req.idempotentRelease(doEnqueueRes || storeConfig.isTimerSkipUnknownError());
}
}
......
}
首先需要定位到 Slot,再顺序写 timerlog,最后更新 Slot 信息。
public boolean doEnqueue(long offsetPy, int sizePy, long delayedTime, MessageExt messageExt) { LOGGER.debug("Do enqueue [{}] [{}]", new Timestamp(delayedTime), messageExt); long tmpWriteTimeMs = currWriteTimeMs; // 超过一轮时间周期,到期后需要重新进入下一期时间轮等待 boolean needRoll = delayedTime - tmpWriteTimeMs >= timerRollWindowSlots * precisionMs; int magic = MAGIC_DEFAULT; if (needRoll) { magic = magic | MAGIC_ROLL; if (delayedTime - tmpWriteTimeMs - timerRollWindowSlots * precisionMs < timerRollWindowSlots / 3 * precisionMs) { //give enough time to next roll delayedTime = tmpWriteTimeMs + (timerRollWindowSlots / 2) * precisionMs; } else { delayedTime = tmpWriteTimeMs + timerRollWindowSlots * precisionMs; } } boolean isDelete = messageExt.getProperty(TIMER_DELETE_UNIQKEY) != null; if (isDelete) { magic = magic | MAGIC_DELETE; } String realTopic = messageExt.getProperty(MessageConst.PROPERTY_REAL_TOPIC); // 定位Slot,顺序写timerLog Slot slot = timerWheel.getSlot(delayedTime); ByteBuffer tmpBuffer = timerLogBuffer; tmpBuffer.clear(); tmpBuffer.putInt(TimerLog.UNIT_SIZE); //size tmpBuffer.putLong(slot.lastPos); //prev pos tmpBuffer.putInt(magic); //magic tmpBuffer.putLong(tmpWriteTimeMs); //currWriteTime tmpBuffer.putInt((int) (delayedTime - tmpWriteTimeMs)); //delayTime tmpBuffer.putLong(offsetPy); //offset tmpBuffer.putInt(sizePy); //size tmpBuffer.putInt(hashTopicForMetrics(realTopic)); //hashcode of real topic tmpBuffer.putLong(0); //reserved value, just set to 0 now long ret = timerLog.append(tmpBuffer.array(), 0, TimerLog.UNIT_SIZE); if (-1 != ret) { // 更新timerWheel对应的Slot timerWheel.putSlot(delayedTime, slot.firstPos == -1 ? ret : slot.firstPos, ret, isDelete ? slot.num - 1 : slot.num + 1, slot.magic); addMetric(messageExt, isDelete ? -1 : 1); } return -1 != ret; }
不管是 timerlog 还是 timerWheel 文件,都是需要频繁写的,为了提高性能,RocketMQ 均使用 mmap 技术写,然后定时 flush 到磁盘。
然后是 TimerDequeueGetService 线程,定时扫描时间轮,取出到期的TimerRequest,放入dequeueGetQueue
public int dequeue() throws Exception { // 定位到Slot Slot slot = timerWheel.getSlot(currReadTimeMs); if (-1 == slot.timeMs) {// Slot是空的 moveReadTime(); return 0; } long currOffsetPy = slot.lastPos; while (currOffsetPy != -1) { // 定位timerlog timeSbr = timerLog.getWholeBuffer(currOffsetPy); int position = (int) (currOffsetPy % timerLogFileSize); timeSbr.getByteBuffer().position(position); timeSbr.getByteBuffer().getInt(); //size prevPos = timeSbr.getByteBuffer().getLong(); int magic = timeSbr.getByteBuffer().getInt(); long enqueueTime = timeSbr.getByteBuffer().getLong(); long delayedTime = timeSbr.getByteBuffer().getInt() + enqueueTime; long offsetPy = timeSbr.getByteBuffer().getLong(); int sizePy = timeSbr.getByteBuffer().getInt(); // timerlog再转换成TimerRequest TimerRequest timerRequest = new TimerRequest(offsetPy, sizePy, delayedTime, enqueueTime, magic); timerRequest.setDeleteList(deleteUniqKeys); for (List<TimerRequest> normalList : splitIntoLists(normalMsgStack)) { for (TimerRequest tr : normalList) { tr.setLatch(normalLatch); } // TimerRequest入队dequeueGetQueue dequeueGetQueue.put(normalList); } } ...... }
再是 TimerDequeueGetMessageService 线程,从 dequeueGetQueue 取出 TimerRequest,从 CommitLog 查询完整消息,放入 dequeuePutQueue。
public void run() { while (!this.isStopped()) { List<TimerRequest> trs = dequeueGetQueue.poll(100 * precisionMs / 1000, TimeUnit.MILLISECONDS); for (int i = 0; i < trs.size(); ) { // CommitLog查询完整消息 MessageExt msgExt = getMessageByCommitOffset(tr.getOffsetPy(), tr.getSizePy()); String uniqkey = MessageClientIDSetter.getUniqID(msgExt); if (null != uniqkey && tr.getDeleteList() != null && tr.getDeleteList().size() > 0 && tr.getDeleteList().contains(uniqkey)) { doRes = true; tr.idempotentRelease(); perfs.getCounter("dequeue_delete").flow(1); } else { tr.setMsg(msgExt); while (!isStopped() && !doRes) { // 入队 doRes = dequeuePutQueue.offer(tr, 3, TimeUnit.SECONDS); } } } } }
最后是 TimerDequeuePutMessageService 线程,从 dequeuePutQueue 取出 TimerRequest,判断消息是否到期,到期直接投递到真实的 Topic,没到期进入下一期时间轮。
public void run() { while (!this.isStopped() || dequeuePutQueue.size() != 0) { TimerRequest tr = dequeuePutQueue.poll(10, TimeUnit.MILLISECONDS); // 消息转换 如果到期则复原Topic queueID MessageExtBrokerInner msg = convert(tr.getMsg(), tr.getEnqueueTime(), needRoll(tr.getMagic())); doRes = PUT_NEED_RETRY != doPut(msg, needRoll(tr.getMagic())); while (!doRes && !isStopped()) { if (!isRunningDequeue()) { dequeueStatusChangeFlag = true; tmpDequeueChangeFlag = true; break; } // 重写写入CommitLog doRes = PUT_NEED_RETRY != doPut(msg, needRoll(tr.getMagic())); Thread.sleep(500 * precisionMs / 1000); } } ....... }
时间轮对应的类是 TimerWheel,它对应磁盘上的一个文件,由若干个 Slot 组成,因为要随机读写,所以 RocketMQ 使用 RandomAccessFile 来读写文件。
public class TimerWheel { // 总的Slot数量 public final int slotsTotal; // 时间精度 public final int precisionMs; // 文件名 private String fileName; private final RandomAccessFile randomAccessFile; private final FileChannel fileChannel; // mmap private final MappedByteBuffer mappedByteBuffer; private final ByteBuffer byteBuffer; private final ThreadLocal<ByteBuffer> localBuffer = new ThreadLocal<ByteBuffer>() { @Override protected ByteBuffer initialValue() { return byteBuffer.duplicate(); } }; // 时间轮文件大小 private final int wheelLength; }
Slot 类的属性:
public class Slot {
public static final short SIZE = 32;
public final long timeMs; // 延迟时间
public final long firstPos; // 第一个消息的位置
public final long lastPos; // 最后一个消息的位置
public final int num; // 消息数量
public final int magic; //no use now, just keep it
}
最后是 TimerLog,它底层对应一组文件,单个文件限制在 100MB 大小,如果写满了就创建新的文件继续写。因为是顺序写的,所以效率很高。
public class TimerLog { private static InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME); public final static int BLANK_MAGIC_CODE = 0xBBCCDDEE ^ 1880681586 + 8; private final static int MIN_BLANK_LEN = 4 + 8 + 4; public final static int UNIT_SIZE = 4 //size + 8 //prev pos + 4 //magic value + 8 //curr write time, for trace + 4 //delayed time, for check + 8 //offsetPy + 4 //sizePy + 4 //hash code of real topic + 8; //reserved value, just in case of public final static int UNIT_PRE_SIZE_FOR_MSG = 28; public final static int UNIT_PRE_SIZE_FOR_METRIC = 40; private final MappedFileQueue mappedFileQueue; private final int fileSize; }
RocketMQ 5.0 解除了 4.x 版本延时消息延迟级别的时间限制,现在生产者可以设置任意延迟时间了,功能上更加强大,实现上也更加复杂。RocketMQ 引入了新的时间轮算法,简单理解就是把时间按照精度划分成 N 个Slot,消息会按照延迟时间加入到对应的 Slot,然后线程定时扫描时间轮,把 Slot 对应的到期消息重新投递即可。新的算法不仅实现更复杂,RocketMQ 还需要额外写 timerwheel 和 timerlog 文件,这两个文件也是要持久化定期刷盘的。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。