当前位置:   article > 正文

RocketMq——Broker消息高性能存储分析_rmqbroker

rmqbroker

摘要

生产者调用MQProducer.send()方法会将消息发送到Broker,Broker是如何处理该请求,以及消息是如何存储的呢?

RocketMQ网络通信协议被封装成Java对象RemotingCommand,消息发送也是一个请求,对应的请求头为SendMessageRequestHeader,头信息里就标明了消息是由哪个Group生产的、要发到哪个Topic下、消息属性是什么等等,

  1. SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();
  2. requestHeader.setXXX();//设置消息的各种信息
  3. RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE, requestHeader);
  4. RemotingCommand response = remotingClient.invokeSync(addr, request, timeoutMillis);

Broker服务启动的同时,会启动Netty服务端来处理客户端的请求,当对应的客户端Channel有事件时,会通过NettyServerHandler进行处理,因为是处理客户端的请求命令,于是会调用processRequestCommand方法,根据客户端的RequestCode去匹配请求处理器NettyRequestProcessor,消息发送有两个RequestCode,分别是10和310,对应的常量为SEND_MESSAGE和SEND_MESSAGE_V2,从名字就可以看出来只是版本不同而已,V2针对属性名做了简化,FastJson序列化的性能会更好一些,区别不大,对应的Processor都是SendMessageProcessor。​找到SendMessageProcessor后,会调用其asyncProcessRequest方法异步的处理该请求,到此,才开始进入消息的核心处理逻辑。

一、高性能存储

RocketMQ的一大特点就是海量的消息积压能力,支持亿级别的消息积压,还能保证性能不受太大影响,这要归功于RocketMQ精心设计的存储系统。

要做到海量的消息积压,首先可以肯定的是,消息不能直接存储在内存中,一个是内存空间有限,其次是面对如此庞大的数据,不可能做到近实时的持久化,断电数据就丢了是不能被接受的。所以,RocketMQ直接把消息存储到磁盘。

磁盘IO效率慢的概念已经深入人心,每次消息发送都要写入磁盘,那岂不是性能极差?RocketMQ如何保证高性能呢

1.1 顺序写

其实,磁盘如果利用的好,它的效率比你想象的要快得多。磁盘随机写的效率确实很差,只有100KB每秒的写入速度,但是对于顺序写,在Page Cache的加持下,它的写入速度能达到600MB每秒,这已经超过了绝大多数网卡的读写速度了,所以只要能保证顺序写,磁盘IO并不是性能瓶颈。

这里稍微提下,Page Cache是Linux系统的高速页缓存,一个页大小是4KB,它是用来提高磁盘IO效率的。当你要从磁盘读数据时,它会从内存中分配一个Page,然后将磁盘数据读入Page Cache,当你下次再次读取时,就能命中缓存,不用再读磁盘了。写数据也是一样,先写到Page Cache,然后将该页置为「Dirty」脏的,然后由系统统一将这些脏页数据刷盘。因此,在Page Cache的加持下,磁盘顺序写的效率几乎等于操作内存。

RocketMQ存储消息,主要涉及到三大类文件,分别是:CommitLog、ConsumerQueue、Index。

CommitLog存储Broker上所有的消息,不管你是哪个Topic下的,全部写到CommitLog文件,因此它是完全的顺序写。

ConsumerQueue是RocketMQ用来加速消费者消费消息的索引文件,每个Topic是一个文件夹,下面再以QueueID分片存储,消息写入到CommitLog后,还要往对应的ConsumerQueue文件写入一个索引信息,它也是顺序写的。

Index是RocketMQ用来实现消息查询的索引文件,有了它就可以通过Key和时间范围快速查询消息,同样的,消息写入到CommitLog后,也会往Index中写入索引数据,也是顺序写的。

综上所属,RocketMQ消息存储涉及的主要文件,全是顺序写的,这便保证了Broker消息存储的高性能。

1.2 内存映射与零拷贝

以前,我们从磁盘读写数据时,均需要经过至少两次数据拷贝。

  • 读:磁盘 > 内核缓冲区 > JVM内存。
  • 写:JVM内存 > 内核缓冲区 磁盘。

而内存映射技术,不管是读还是写,均只需要一次数据拷贝。

  • 读:磁盘 > 内核缓冲区。
  • 写:内核缓冲区 > 磁盘。

用户空间直接拿应用程序的逻辑内存地址映射到Linux系统的内核缓冲区,这样应用程序看似读写的是自己的内存,其实读写的是内核缓冲区,数据不用在内核空间和用户空间来回拷贝了,不仅减少了内存复制的开销,还避免了因系统调用引起的软中断。

零拷贝是提升IO效率的终极利器,以前,如果我们需要把磁盘中的数据发送到网络,至少需要经过4次数据拷贝:磁盘 > 内核缓冲区 > JVM > Socket缓冲区 > 网卡。

利用内存映射,最多只需要三次数据拷贝,数据直接从内核缓冲区拷贝到Socket缓冲区就可以直接发送了。实际上,可能连内核缓冲区拷贝到Socket缓冲区的过程都没有了,内核缓冲区和Socket缓冲区也可以建立内存映射,这样就只剩下两次数据拷贝了。

零拷贝是站在内存的角度来看,数据没有在内存中发生复制行为。数据从磁盘拷贝到内核缓冲区,再拷贝到网卡,期间数据没有在内存中复制过,所以叫零拷贝。

综上所述,零拷贝的核心是内存映射,内存映射技术在Linux系统上对应的是mmap系统函数,在Java中对应的就MappedByteBuffer类。

RocketMQ消息存储涉及的主要文件,均是通过内存映射来完成读写的,代码在MappedFile的init方法中,如下:

  1. // 随机读写文件
  2. this.fileChannel = new RandomAccessFile(this.file, "rw").getChannel();
  3. // 对整个文件建立内存映射,对文件大小有限制
  4. this.mappedByteBuffer = this.fileChannel.map(MapMode.READ_WRITE, 0, fileSize);

 mmap函数有一个缺陷,就是对映射的文件大小有限制,因此CommitLog单个文件默认为1GB。

1.3 异步刷盘

RocketMQ高性能存储的最后一个利器就是异步刷盘,与之对应的就是同步刷盘。

同步刷盘:消息写入Page Cache后调用系统函数fsync将数据同步到磁盘才给客户端返回ACK响应,这种方式对数据的安全性很高,但是性能会有较大影响。​异步刷盘:充分利用Page Cache的优势,只要消息写入Page Cache就给客户端返回ACK响应,RocketMQ会在后台起一个线程异步刷盘,极大的提高了性能和吞吐量。

二、消息仓库设计

2.1  CommitLog

CommitLog用来存储消息主体和其元数据,虽然RocketMQ是基于Topic主题订阅模式的,但是对于Broker而言,所有消息全部写入CommitLog,不关心Topic,因此CommitLog是完全顺序写的。RocketMQ使用mmap来提升磁盘IO效率,利用NIO的FileChannel模型将磁盘上的物理文件直接映射到用户态的内存地址中,减少了数据在内核空间和用户空间来回复制的开销。但是mmap有一定的限制,映射的文件不能太大,而RocketMQ又要支持海量的消息积压,怎么办呢?

为了解决上述问题,RocketMQ将CommitLog文件进行了切分,将一个大文件切分成多个小文件,每个文件定长为1GB,文件名就是文件的起始偏移量Offset,固定为20位,不足的用0补齐。例如,第一个文件名为00000000000000000000代表起始偏移量为0,第二个文件名为00000000001073741824,起始偏移量为1073741824,因为1GB=1024*1024*1024。

当写入新的消息时,Broker会定位到最新的CommitLog文件,判断它是否可以容纳这条消息,如果文件写满了,会创建新的文件继续写。另外,消息被消费后,并不会立马删除,CommitLog的设计决定了RocketMQ不能针对单个消息进行删除,只能将过期的CommitLog文件删除。消息默认会保存3天,每天定时在一个时间点删除过期的文件。理论上,只要文件没有过期,你依然可以通过这些文件重复消费消息。

CommitLog文件结构非常的简单,没有头信息,只有Message,但是Message的长度是不固定的,消息被写入CommitLog的格式如下:

理论上来说,只要有了CommitLog文件,RocketMQ就可以正常工作了。ConsumerQueue和Index文件只是索引文件,用来加速消费者消费和查询用的。

2.1 ConsumerQueue

RocketMQ是基于Topic主题订阅模式的,消费者往往只对自己订阅的Topic感兴趣,如果每次消费都要去CommitLog中检索消息,效率是非常低的,于是才有了ConsumerQueue文件。

ConsumerQueue是消息消费队列,用来加速消息消费的性能,Consumer可以根据ConsumerQueue来快速定位要消费的消息。ConsumerQueue是一个逻辑队列,它仅保存消息在CommitLog文件中的偏移量Offset、消息大小size和消息Tag的哈希值。每个索引条目为20字节,单个ConsumerQueue文件由30万个条目组成,因此ConsumerQueue文件也是定长的,约5.72M。

ConsumerQueue文件存储路径为$HOME/store/consumequeue/{topic}/{queueId}/{fileName},每个Topic是一个文件夹,同一个Topic下可以有多个队列,每个队列又是一个文件夹,最后才是ConsumerQueue文件。ConsumerQueue索引格式如下:

2.2 Index设计

Index是索引文件,它的主要目的是通过Key和时间范围来快速检索消息。Index文件的存储路径$HOME/store/index/{timestamp},文件名以创建时的时间戳命名,对应的类为org.apache.rocketmq.store.index.IndexFile。

Index文件也是定长的,单个文件约400M,单个Index文件可以保存2000万个索引,底层存储结构借鉴了HashMap,用的是哈希索引。当发生哈希碰撞,索引的最后4字节指针用来链接其他索引,故用的是一个哈希+链表的结构。哈希槽存放的永远是最新的索引,因为对于MQ而言,关心的永远是最新的消息。

单个Index文件的构成:

Index文件是有头信息的,对应的类为org.apache.rocketmq.store.index.IndexHeader,头信息的构成:

索引条目的构成:

三、Broker高性能源码分析

前面已经说过,Producer发送消息,Broker对应的处理器为SendMessageProcessor,收到请求后Broker会调用它的asyncProcessRequest方法。因此要分析Broker如何存储消息,以此为入口就好了。Broker收到消息后先写入CommitLog,后台会起一个线程ReputMessageService,对CommitLog中的消息进行重放,重放的时候进行ConsumerQueue和Index的索引构建。

调用parseRequestHeader方法从请求对象中解析出请求头SendMessageRequestHeader,有了请求头Broker才知道消息要发到哪个Topic下。之前说过,Header信息在网络传输前,会将字段属性写入extFields,Broker解析时也是如此,反射创建Header对象,读取extFields属性,写入到Header。

SendMessageRequestHeader requestHeader = parseRequestHeader(request);

请求头解析完毕后,判断是否是批量消息,如果是执行asyncSendBatchMessage,否则执行asyncSendMessage,我们直接看普通消息。 

在接收消息前,会调用preSend方法做一些前置操作。包括:创建响应,写入Opaque,消息校验等操作。消息校验对应的方法为msgCheck,校验的事项如下:

  1. 确保Broker为Master,且有写权限。
  2. 校验Topic名称的合法性,不能过长,不能使用预定义的名称。
  3. 确保Topic存在。

前置操作准备好了,开始根据Topic查找TopicConfig。TopicConfig是Topic的配置对象,它记录了Topic下有多少读写队列数,是否是顺序队列,权限等数据

TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());

开始构建Broker内部消息对象MessageExtBrokerInner,并从请求对象中拷贝消息属性。Broker接收到的消息可能是消费者消费失败的重试消息,如果超过了最大重试次数,RocketMQ会认为消费者已经没有消费这条消息的能力了,为了保护消费者,Broker需要将这条消息扔到「死信队列」,对应的处理方法为handleRetryAndDLQ。

  1. private boolean handleRetryAndDLQ(SendMessageRequestHeader requestHeader, RemotingCommand response,
  2. RemotingCommand request,
  3. MessageExt msg, TopicConfig topicConfig) {
  4. String newTopic = requestHeader.getTopic();
  5. if (null != newTopic && newTopic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
  6. // 重试消息,Topic会被改写:%RETRY%+GroupName
  7. String groupName = newTopic.substring(MixAll.RETRY_GROUP_TOPIC_PREFIX.length());
  8. SubscriptionGroupConfig subscriptionGroupConfig =
  9. this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(groupName);
  10. if (null == subscriptionGroupConfig) {
  11. response.setCode(ResponseCode.SUBSCRIPTION_GROUP_NOT_EXIST);
  12. response.setRemark(
  13. "subscription group not exist, " + groupName + " " + FAQUrl.suggestTodo(FAQUrl.SUBSCRIPTION_GROUP_NOT_EXIST));
  14. return false;
  15. }
  16. // 最大重试次数
  17. int maxReconsumeTimes = subscriptionGroupConfig.getRetryMaxTimes();
  18. if (request.getVersion() >= MQVersion.Version.V3_4_9.ordinal()) {
  19. maxReconsumeTimes = requestHeader.getMaxReconsumeTimes();
  20. }
  21. int reconsumeTimes = requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes();
  22. if (reconsumeTimes >= maxReconsumeTimes) {
  23. // 超过最大重试次数,改写Topic,扔到死信队列
  24. newTopic = MixAll.getDLQTopic(groupName);
  25. int queueIdInt = Math.abs(this.random.nextInt() % 99999999) % DLQ_NUMS_PER_GROUP;
  26. topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(newTopic,
  27. DLQ_NUMS_PER_GROUP,
  28. PermName.PERM_WRITE, 0
  29. );
  30. msg.setTopic(newTopic);
  31. msg.setQueueId(queueIdInt);
  32. if (null == topicConfig) {
  33. response.setCode(ResponseCode.SYSTEM_ERROR);
  34. response.setRemark("topic[" + newTopic + "] not exist");
  35. return false;
  36. }
  37. }
  38. }
  39. int sysFlag = requestHeader.getSysFlag();
  40. if (TopicFilterType.MULTI_TAG == topicConfig.getTopicFilterType()) {
  41. sysFlag |= MessageSysFlag.MULTI_TAGS_FLAG;
  42. }
  43. msg.setSysFlag(sysFlag);
  44. return true;
  45. }

内部消息对象的赋值,包括:消息主体、标记、消息属性、消息的创建时间、生产者主机地址、存储主机地址、重试次数等。

  1. msgInner.setBody(body);
  2. msgInner.setFlag(requestHeader.getFlag());
  3. // 设置消息属性:消息发送时,将Map转字符串,这里再将字符串反转为Map
  4. MessageAccessor.setProperties(msgInner, MessageDecoder.string2messageProperties(requestHeader.getProperties()));
  5. // 属性字符串:Key和Value空格隔开
  6. msgInner.setPropertiesString(requestHeader.getProperties());
  7. // 消息产生时间,生产者设置的
  8. msgInner.setBornTimestamp(requestHeader.getBornTimestamp());
  9. // 生产者地址
  10. msgInner.setBornHost(ctx.channel().remoteAddress());
  11. // 消息存储地址
  12. msgInner.setStoreHost(this.getStoreHost());
  13. // 重试次数
  14. msgInner.setReconsumeTimes(requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes());
  15. String clusterName = this.brokerController.getBrokerConfig().getBrokerClusterName();
  16. // 集群名称设置到Properties
  17. MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_CLUSTER, clusterName);
  18. msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));

消息对象构建好以后,通过消息仓库MessageStore来存储消息,方法为asyncPutMessage。首先会调用checkStoreStatus来检查消息服务的状态,检查项为:

  1. 确保服务没有停止。
  2. 确保写入的是Master。
  3. 确保系统Page Cache空闲。

前两项很好理解,第三项如果Page Cache繁忙,说明机器负载较大,Broker会拒绝消息的写入,此时要考虑扩容机器了。再然后会校验消息本身,确保Topic长度不超过127,Header长度不超过32767。

前置检查做完,调用CommitLog.asyncPutMessage()开始将消息写入CommitLog。
消息写入前,会先设置一下存盘时间storeTimestamp,消息主体的CRC值,读取消息的时候可以判断消息是否损坏。

  1. msg.setStoreTimestamp(System.currentTimeMillis());
  2. msg.setBodyCRC(UtilAll.crc32(msg.getBody()));

再判断消息是否是定时消息,如果是就根据延迟级别去改写Topic和QueueID,这样消息就不会被扔到目标队列,而是被扔到一个特殊的队列SCHEDULE_TOPIC_XXXX,RocketMQ支持18个延迟级别,每个级别对应一个队列。

  1. // 延迟级别
  2. if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
  3. msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
  4. }
  5. // 如果是定时消息,改写Topic,统一扔到 SCHEDULE_TOPIC_XXXX
  6. topic = TopicValidator.RMQ_SYS_SCHEDULE_TOPIC;
  7. // 每个延迟级别对应一个MessageQueue,延迟级别转QueueId
  8. queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());
  9. // 真实的Topic和QueueID写到Properties,后续投递是要用到
  10. MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
  11. MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
  12. msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));
  13. msg.setTopic(topic);
  14. msg.setQueueId(queueId);

处理完定时消息后,开始定位最新的CommitLog文件,因为它总是顺序写的,对应的方法为MappedFileQueue.getLastMappedFile()。MappedFileQueue是MappedFile文件队列,Broker启动时,会遍历commitlog目录下的所有文件生成MappedFile对象并加入到MappedFileQueue中。

获取到最新的CommitLog文件后,加锁保证同步,默认是自旋锁,也可配置为ReentrantLock。竞争到锁后,判断文件是否写满,如果写满就创建新的文件继续写,最后调用MappedFile.appendMessage()将消息追加到文件。

在appendMessage方法中,先获取写指针wrotePosition,然后从CommitLog文件的这个位置开始写入消息。消息写入成功会更新wrotePosition,新的消息又可以继续往后追加了。

  1. public AppendMessageResult appendMessagesInner(final MessageExt messageExt, final AppendMessageCallback cb) {
  2. // 当前写指针
  3. int currentPos = this.wrotePosition.get();
  4. if (currentPos < this.fileSize) {
  5. /**
  6. * writeBuffer:是Direct Memory,先写到内存,再统一转存到文件。效率高,不安全。
  7. * mappedByteBuffer:是FileChannel,写入Page Cache,mmap写。
  8. */
  9. ByteBuffer byteBuffer = writeBuffer != null ? writeBuffer.slice() : this.mappedByteBuffer.slice();
  10. byteBuffer.position(currentPos);
  11. AppendMessageResult result;
  12. if (messageExt instanceof MessageExtBrokerInner) {
  13. // 普通消息写
  14. result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, (MessageExtBrokerInner) messageExt);
  15. } else if (messageExt instanceof MessageExtBatch) {
  16. // 批量消息写
  17. result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, (MessageExtBatch) messageExt);
  18. } else {
  19. return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
  20. }
  21. // 更新写指针位置
  22. this.wrotePosition.addAndGet(result.getWroteBytes());
  23. this.storeTimestamp = result.getStoreTimestamp();
  24. return result;
  25. }
  26. return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
  27. }

消息写入CommitLog文件的逻辑在DefaultAppendMessageCallback.doAppend()方法中,它先是计算消息的总长度,然后根据消息长度初始化一个ByteBuffer,然后按照消息格式往里面写数据,最终将ByteBuffer里的数据写入到与CommitLog文件内存映射的MappedByteBuffer。

  1. // 1 TOTALSIZE
  2. this.msgStoreItemMemory.putInt(msgLen);
  3. // 2 MAGICCODE
  4. this.msgStoreItemMemory.putInt(CommitLog.MESSAGE_MAGIC_CODE);
  5. // 3 BODYCRC
  6. this.msgStoreItemMemory.putInt(msgInner.getBodyCRC());
  7. // 4 QUEUEID
  8. this.msgStoreItemMemory.putInt(msgInner.getQueueId());
  9. // 5 FLAG
  10. this.msgStoreItemMemory.putInt(msgInner.getFlag());
  11. // 6 QUEUEOFFSET
  12. this.msgStoreItemMemory.putLong(queueOffset);
  13. System.err.println("queueOffset:" + queueOffset);
  14. // 7 PHYSICALOFFSET
  15. this.msgStoreItemMemory.putLong(fileFromOffset + byteBuffer.position());
  16. // 8 SYSFLAG
  17. this.msgStoreItemMemory.putInt(msgInner.getSysFlag());
  18. // 9 BORNTIMESTAMP
  19. this.msgStoreItemMemory.putLong(msgInner.getBornTimestamp());
  20. // 10 BORNHOST
  21. this.resetByteBuffer(bornHostHolder, bornHostLength);
  22. this.msgStoreItemMemory.put(msgInner.getBornHostBytes(bornHostHolder));
  23. // 11 STORETIMESTAMP
  24. this.msgStoreItemMemory.putLong(msgInner.getStoreTimestamp());
  25. // 12 STOREHOSTADDRESS
  26. this.resetByteBuffer(storeHostHolder, storeHostLength);
  27. this.msgStoreItemMemory.put(msgInner.getStoreHostBytes(storeHostHolder));
  28. // 13 RECONSUMETIMES
  29. this.msgStoreItemMemory.putInt(msgInner.getReconsumeTimes());
  30. // 14 Prepared Transaction Offset
  31. this.msgStoreItemMemory.putLong(msgInner.getPreparedTransactionOffset());
  32. // 15 BODY
  33. this.msgStoreItemMemory.putInt(bodyLength);
  34. if (bodyLength > 0)
  35. this.msgStoreItemMemory.put(msgInner.getBody());
  36. // 16 TOPIC
  37. this.msgStoreItemMemory.put((byte) topicLength);
  38. this.msgStoreItemMemory.put(topicData);
  39. // 17 PROPERTIES
  40. this.msgStoreItemMemory.putShort((short) propertiesLength);
  41. if (propertiesLength > 0)
  42. this.msgStoreItemMemory.put(propertiesData);
  43. final long beginTimeMills = CommitLog.this.defaultMessageStore.now();
  44. // Write messages to the queue buffer
  45. // 写入DM或Page Cache
  46. byteBuffer.put(this.msgStoreItemMemory.array(), 0, msgLen);

至此,消息就被Broker存储下来了,等待刷盘即可。后面的流程就是响应Response给客户端了。至于ConsumerQueue和Index文件索引的构建,是在ReputMessageService线程里对CommitLog文件进行消息重放时处理。

RocketMQ支持海量的消息积压,是因为它直接将消息写入到磁盘。同时,为了解决磁盘IO速度慢的问题,RocketMQ做了大量的优化。顺序写、零拷贝、异步刷盘是RocketMQ高性能存储的三大利器,消息生产时,CommitLog、ConsumerQueue、Index都是顺序写的,消息消费时,ConsumerQueue是顺序读的,但是CommitLog是随机读的。

因为mmap对文件大小有限制,所以RocketMQ的很多文件都做了定长切分处理。CommitLog单个文件定长为1G,如果写满会创建新的文件继续写。

从整体流程上看,消息的存储并不复杂,Broker接收到客户端的请求后,从请求体中读取数据并构建内部消息对象MessageExtBrokerInner,然后按照存储格式将消息写入到CommitLog。当然了,这中间还有大量的逻辑处理,包括:事务消息、定时消息、死信消息等等。

博文参考

https://blog.csdn.net/qq_32099833/category_11326262.html

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/菜鸟追梦旅行/article/detail/604006
推荐阅读
相关标签
  

闽ICP备14008679号