赞
踩
生产者调用MQProducer.send()
方法会将消息发送到Broker,Broker是如何处理该请求,以及消息是如何存储的呢?
RocketMQ网络通信协议被封装成Java对象RemotingCommand,消息发送也是一个请求,对应的请求头为SendMessageRequestHeader,头信息里就标明了消息是由哪个Group生产的、要发到哪个Topic下、消息属性是什么等等,
- SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();
- requestHeader.setXXX();//设置消息的各种信息
- RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE, requestHeader);
- RemotingCommand response = remotingClient.invokeSync(addr, request, timeoutMillis);
Broker服务启动的同时,会启动Netty服务端来处理客户端的请求,当对应的客户端Channel有事件时,会通过NettyServerHandler进行处理,因为是处理客户端的请求命令,于是会调用processRequestCommand方法,根据客户端的RequestCode去匹配请求处理器NettyRequestProcessor,消息发送有两个RequestCode,分别是10和310,对应的常量为SEND_MESSAGE和SEND_MESSAGE_V2,从名字就可以看出来只是版本不同而已,V2针对属性名做了简化,FastJson序列化的性能会更好一些,区别不大,对应的Processor都是SendMessageProcessor。找到SendMessageProcessor后,会调用其asyncProcessRequest
方法异步的处理该请求,到此,才开始进入消息的核心处理逻辑。
RocketMQ的一大特点就是海量的消息积压能力,支持亿级别的消息积压,还能保证性能不受太大影响,这要归功于RocketMQ精心设计的存储系统。
要做到海量的消息积压,首先可以肯定的是,消息不能直接存储在内存中,一个是内存空间有限,其次是面对如此庞大的数据,不可能做到近实时的持久化,断电数据就丢了是不能被接受的。所以,RocketMQ直接把消息存储到磁盘。
磁盘IO效率慢的概念已经深入人心,每次消息发送都要写入磁盘,那岂不是性能极差?RocketMQ如何保证高性能呢
其实,磁盘如果利用的好,它的效率比你想象的要快得多。磁盘随机写的效率确实很差,只有100KB每秒的写入速度,但是对于顺序写,在Page Cache的加持下,它的写入速度能达到600MB每秒,这已经超过了绝大多数网卡的读写速度了,所以只要能保证顺序写,磁盘IO并不是性能瓶颈。
这里稍微提下,Page Cache是Linux系统的高速页缓存,一个页大小是4KB,它是用来提高磁盘IO效率的。当你要从磁盘读数据时,它会从内存中分配一个Page,然后将磁盘数据读入Page Cache,当你下次再次读取时,就能命中缓存,不用再读磁盘了。写数据也是一样,先写到Page Cache,然后将该页置为「Dirty」脏的,然后由系统统一将这些脏页数据刷盘。因此,在Page Cache的加持下,磁盘顺序写的效率几乎等于操作内存。
RocketMQ存储消息,主要涉及到三大类文件,分别是:CommitLog、ConsumerQueue、Index。
CommitLog存储Broker上所有的消息,不管你是哪个Topic下的,全部写到CommitLog文件,因此它是完全的顺序写。
ConsumerQueue是RocketMQ用来加速消费者消费消息的索引文件,每个Topic是一个文件夹,下面再以QueueID分片存储,消息写入到CommitLog后,还要往对应的ConsumerQueue文件写入一个索引信息,它也是顺序写的。
Index是RocketMQ用来实现消息查询的索引文件,有了它就可以通过Key和时间范围快速查询消息,同样的,消息写入到CommitLog后,也会往Index中写入索引数据,也是顺序写的。
综上所属,RocketMQ消息存储涉及的主要文件,全是顺序写的,这便保证了Broker消息存储的高性能。
以前,我们从磁盘读写数据时,均需要经过至少两次数据拷贝。
而内存映射技术,不管是读还是写,均只需要一次数据拷贝。
用户空间直接拿应用程序的逻辑内存地址映射到Linux系统的内核缓冲区,这样应用程序看似读写的是自己的内存,其实读写的是内核缓冲区,数据不用在内核空间和用户空间来回拷贝了,不仅减少了内存复制的开销,还避免了因系统调用引起的软中断。
零拷贝是提升IO效率的终极利器,以前,如果我们需要把磁盘中的数据发送到网络,至少需要经过4次数据拷贝:磁盘 > 内核缓冲区 > JVM > Socket缓冲区 > 网卡。
利用内存映射,最多只需要三次数据拷贝,数据直接从内核缓冲区拷贝到Socket缓冲区就可以直接发送了。实际上,可能连内核缓冲区拷贝到Socket缓冲区的过程都没有了,内核缓冲区和Socket缓冲区也可以建立内存映射,这样就只剩下两次数据拷贝了。
零拷贝是站在内存的角度来看,数据没有在内存中发生复制行为。数据从磁盘拷贝到内核缓冲区,再拷贝到网卡,期间数据没有在内存中复制过,所以叫零拷贝。
综上所述,零拷贝的核心是内存映射,内存映射技术在Linux系统上对应的是mmap
系统函数,在Java中对应的就MappedByteBuffer类。
RocketMQ消息存储涉及的主要文件,均是通过内存映射来完成读写的,代码在MappedFile的init
方法中,如下:
- // 随机读写文件
- this.fileChannel = new RandomAccessFile(this.file, "rw").getChannel();
- // 对整个文件建立内存映射,对文件大小有限制
- this.mappedByteBuffer = this.fileChannel.map(MapMode.READ_WRITE, 0, fileSize);
mmap
函数有一个缺陷,就是对映射的文件大小有限制,因此CommitLog单个文件默认为1GB。
RocketMQ高性能存储的最后一个利器就是异步刷盘,与之对应的就是同步刷盘。
同步刷盘:消息写入Page Cache后调用系统函数fsync
将数据同步到磁盘才给客户端返回ACK响应,这种方式对数据的安全性很高,但是性能会有较大影响。异步刷盘:充分利用Page Cache的优势,只要消息写入Page Cache就给客户端返回ACK响应,RocketMQ会在后台起一个线程异步刷盘,极大的提高了性能和吞吐量。
2.1 CommitLog
CommitLog用来存储消息主体和其元数据,虽然RocketMQ是基于Topic主题订阅模式的,但是对于Broker而言,所有消息全部写入CommitLog,不关心Topic,因此CommitLog是完全顺序写的。RocketMQ使用mmap来提升磁盘IO效率,利用NIO的FileChannel模型将磁盘上的物理文件直接映射到用户态的内存地址中,减少了数据在内核空间和用户空间来回复制的开销。但是mmap有一定的限制,映射的文件不能太大,而RocketMQ又要支持海量的消息积压,怎么办呢?
为了解决上述问题,RocketMQ将CommitLog文件进行了切分,将一个大文件切分成多个小文件,每个文件定长为1GB,文件名就是文件的起始偏移量Offset,固定为20位,不足的用0补齐。例如,第一个文件名为00000000000000000000代表起始偏移量为0,第二个文件名为00000000001073741824,起始偏移量为1073741824,因为1GB=1024*1024*1024。
当写入新的消息时,Broker会定位到最新的CommitLog文件,判断它是否可以容纳这条消息,如果文件写满了,会创建新的文件继续写。另外,消息被消费后,并不会立马删除,CommitLog的设计决定了RocketMQ不能针对单个消息进行删除,只能将过期的CommitLog文件删除。消息默认会保存3天,每天定时在一个时间点删除过期的文件。理论上,只要文件没有过期,你依然可以通过这些文件重复消费消息。
CommitLog文件结构非常的简单,没有头信息,只有Message,但是Message的长度是不固定的,消息被写入CommitLog的格式如下:
理论上来说,只要有了CommitLog文件,RocketMQ就可以正常工作了。ConsumerQueue和Index文件只是索引文件,用来加速消费者消费和查询用的。
RocketMQ是基于Topic主题订阅模式的,消费者往往只对自己订阅的Topic感兴趣,如果每次消费都要去CommitLog中检索消息,效率是非常低的,于是才有了ConsumerQueue文件。
ConsumerQueue是消息消费队列,用来加速消息消费的性能,Consumer可以根据ConsumerQueue来快速定位要消费的消息。ConsumerQueue是一个逻辑队列,它仅保存消息在CommitLog文件中的偏移量Offset、消息大小size和消息Tag的哈希值。每个索引条目为20字节,单个ConsumerQueue文件由30万个条目组成,因此ConsumerQueue文件也是定长的,约5.72M。
ConsumerQueue文件存储路径为$HOME/store/consumequeue/{topic}/{queueId}/{fileName},每个Topic是一个文件夹,同一个Topic下可以有多个队列,每个队列又是一个文件夹,最后才是ConsumerQueue文件。ConsumerQueue索引格式如下:
Index是索引文件,它的主要目的是通过Key和时间范围来快速检索消息。Index文件的存储路径$HOME/store/index/{timestamp},文件名以创建时的时间戳命名,对应的类为org.apache.rocketmq.store.index.IndexFile。
Index文件也是定长的,单个文件约400M,单个Index文件可以保存2000万个索引,底层存储结构借鉴了HashMap,用的是哈希索引。当发生哈希碰撞,索引的最后4字节指针用来链接其他索引,故用的是一个哈希+链表的结构。哈希槽存放的永远是最新的索引,因为对于MQ而言,关心的永远是最新的消息。
单个Index文件的构成:
Index文件是有头信息的,对应的类为org.apache.rocketmq.store.index.IndexHeader
,头信息的构成:
索引条目的构成:
前面已经说过,Producer发送消息,Broker对应的处理器为SendMessageProcessor,收到请求后Broker会调用它的asyncProcessRequest方法。因此要分析Broker如何存储消息,以此为入口就好了。Broker收到消息后先写入CommitLog,后台会起一个线程ReputMessageService,对CommitLog中的消息进行重放,重放的时候进行ConsumerQueue和Index的索引构建。
调用parseRequestHeader方法从请求对象中解析出请求头SendMessageRequestHeader,有了请求头Broker才知道消息要发到哪个Topic下。之前说过,Header信息在网络传输前,会将字段属性写入extFields,Broker解析时也是如此,反射创建Header对象,读取extFields属性,写入到Header。
SendMessageRequestHeader requestHeader = parseRequestHeader(request);
请求头解析完毕后,判断是否是批量消息,如果是执行asyncSendBatchMessage
,否则执行asyncSendMessage
,我们直接看普通消息。
在接收消息前,会调用preSend方法做一些前置操作。包括:创建响应,写入Opaque,消息校验等操作。消息校验对应的方法为msgCheck,校验的事项如下:
前置操作准备好了,开始根据Topic查找TopicConfig。TopicConfig是Topic的配置对象,它记录了Topic下有多少读写队列数,是否是顺序队列,权限等数据
TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());
开始构建Broker内部消息对象MessageExtBrokerInner,并从请求对象中拷贝消息属性。Broker接收到的消息可能是消费者消费失败的重试消息,如果超过了最大重试次数,RocketMQ会认为消费者已经没有消费这条消息的能力了,为了保护消费者,Broker需要将这条消息扔到「死信队列」,对应的处理方法为handleRetryAndDLQ。
- private boolean handleRetryAndDLQ(SendMessageRequestHeader requestHeader, RemotingCommand response,
- RemotingCommand request,
- MessageExt msg, TopicConfig topicConfig) {
- String newTopic = requestHeader.getTopic();
- if (null != newTopic && newTopic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
- // 重试消息,Topic会被改写:%RETRY%+GroupName
- String groupName = newTopic.substring(MixAll.RETRY_GROUP_TOPIC_PREFIX.length());
- SubscriptionGroupConfig subscriptionGroupConfig =
- this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(groupName);
- if (null == subscriptionGroupConfig) {
- response.setCode(ResponseCode.SUBSCRIPTION_GROUP_NOT_EXIST);
- response.setRemark(
- "subscription group not exist, " + groupName + " " + FAQUrl.suggestTodo(FAQUrl.SUBSCRIPTION_GROUP_NOT_EXIST));
- return false;
- }
-
- // 最大重试次数
- int maxReconsumeTimes = subscriptionGroupConfig.getRetryMaxTimes();
- if (request.getVersion() >= MQVersion.Version.V3_4_9.ordinal()) {
- maxReconsumeTimes = requestHeader.getMaxReconsumeTimes();
- }
- int reconsumeTimes = requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes();
- if (reconsumeTimes >= maxReconsumeTimes) {
- // 超过最大重试次数,改写Topic,扔到死信队列
- newTopic = MixAll.getDLQTopic(groupName);
- int queueIdInt = Math.abs(this.random.nextInt() % 99999999) % DLQ_NUMS_PER_GROUP;
- topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(newTopic,
- DLQ_NUMS_PER_GROUP,
- PermName.PERM_WRITE, 0
- );
- msg.setTopic(newTopic);
- msg.setQueueId(queueIdInt);
- if (null == topicConfig) {
- response.setCode(ResponseCode.SYSTEM_ERROR);
- response.setRemark("topic[" + newTopic + "] not exist");
- return false;
- }
- }
- }
- int sysFlag = requestHeader.getSysFlag();
- if (TopicFilterType.MULTI_TAG == topicConfig.getTopicFilterType()) {
- sysFlag |= MessageSysFlag.MULTI_TAGS_FLAG;
- }
- msg.setSysFlag(sysFlag);
- return true;
- }
内部消息对象的赋值,包括:消息主体、标记、消息属性、消息的创建时间、生产者主机地址、存储主机地址、重试次数等。
- msgInner.setBody(body);
- msgInner.setFlag(requestHeader.getFlag());
- // 设置消息属性:消息发送时,将Map转字符串,这里再将字符串反转为Map
- MessageAccessor.setProperties(msgInner, MessageDecoder.string2messageProperties(requestHeader.getProperties()));
- // 属性字符串:Key和Value空格隔开
- msgInner.setPropertiesString(requestHeader.getProperties());
- // 消息产生时间,生产者设置的
- msgInner.setBornTimestamp(requestHeader.getBornTimestamp());
- // 生产者地址
- msgInner.setBornHost(ctx.channel().remoteAddress());
- // 消息存储地址
- msgInner.setStoreHost(this.getStoreHost());
- // 重试次数
- msgInner.setReconsumeTimes(requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes());
- String clusterName = this.brokerController.getBrokerConfig().getBrokerClusterName();
- // 集群名称设置到Properties
- MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_CLUSTER, clusterName);
- msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
消息对象构建好以后,通过消息仓库MessageStore来存储消息,方法为asyncPutMessage。首先会调用checkStoreStatus来检查消息服务的状态,检查项为:
前两项很好理解,第三项如果Page Cache繁忙,说明机器负载较大,Broker会拒绝消息的写入,此时要考虑扩容机器了。再然后会校验消息本身,确保Topic长度不超过127,Header长度不超过32767。
前置检查做完,调用CommitLog.asyncPutMessage()开始将消息写入CommitLog。
消息写入前,会先设置一下存盘时间storeTimestamp,消息主体的CRC值,读取消息的时候可以判断消息是否损坏。
- msg.setStoreTimestamp(System.currentTimeMillis());
- msg.setBodyCRC(UtilAll.crc32(msg.getBody()));
再判断消息是否是定时消息,如果是就根据延迟级别去改写Topic和QueueID,这样消息就不会被扔到目标队列,而是被扔到一个特殊的队列SCHEDULE_TOPIC_XXXX
,RocketMQ支持18个延迟级别,每个级别对应一个队列。
- // 延迟级别
- if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
- msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
- }
- // 如果是定时消息,改写Topic,统一扔到 SCHEDULE_TOPIC_XXXX
- topic = TopicValidator.RMQ_SYS_SCHEDULE_TOPIC;
- // 每个延迟级别对应一个MessageQueue,延迟级别转QueueId
- queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());
- // 真实的Topic和QueueID写到Properties,后续投递是要用到
- MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
- MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
- msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));
- msg.setTopic(topic);
- msg.setQueueId(queueId);
处理完定时消息后,开始定位最新的CommitLog文件,因为它总是顺序写的,对应的方法为MappedFileQueue.getLastMappedFile()。MappedFileQueue是MappedFile文件队列,Broker启动时,会遍历commitlog目录下的所有文件生成MappedFile对象并加入到MappedFileQueue中。
获取到最新的CommitLog文件后,加锁保证同步,默认是自旋锁,也可配置为ReentrantLock。竞争到锁后,判断文件是否写满,如果写满就创建新的文件继续写,最后调用MappedFile.appendMessage()将消息追加到文件。
在appendMessage方法中,先获取写指针wrotePosition,然后从CommitLog文件的这个位置开始写入消息。消息写入成功会更新wrotePosition,新的消息又可以继续往后追加了。
- public AppendMessageResult appendMessagesInner(final MessageExt messageExt, final AppendMessageCallback cb) {
- // 当前写指针
- int currentPos = this.wrotePosition.get();
- if (currentPos < this.fileSize) {
- /**
- * writeBuffer:是Direct Memory,先写到内存,再统一转存到文件。效率高,不安全。
- * mappedByteBuffer:是FileChannel,写入Page Cache,mmap写。
- */
- ByteBuffer byteBuffer = writeBuffer != null ? writeBuffer.slice() : this.mappedByteBuffer.slice();
- byteBuffer.position(currentPos);
- AppendMessageResult result;
- if (messageExt instanceof MessageExtBrokerInner) {
- // 普通消息写
- result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, (MessageExtBrokerInner) messageExt);
- } else if (messageExt instanceof MessageExtBatch) {
- // 批量消息写
- result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, (MessageExtBatch) messageExt);
- } else {
- return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
- }
- // 更新写指针位置
- this.wrotePosition.addAndGet(result.getWroteBytes());
- this.storeTimestamp = result.getStoreTimestamp();
- return result;
- }
- return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
- }
消息写入CommitLog文件的逻辑在DefaultAppendMessageCallback.doAppend()方法中,它先是计算消息的总长度,然后根据消息长度初始化一个ByteBuffer,然后按照消息格式往里面写数据,最终将ByteBuffer里的数据写入到与CommitLog文件内存映射的MappedByteBuffer。
- // 1 TOTALSIZE
- this.msgStoreItemMemory.putInt(msgLen);
- // 2 MAGICCODE
- this.msgStoreItemMemory.putInt(CommitLog.MESSAGE_MAGIC_CODE);
- // 3 BODYCRC
- this.msgStoreItemMemory.putInt(msgInner.getBodyCRC());
- // 4 QUEUEID
- this.msgStoreItemMemory.putInt(msgInner.getQueueId());
- // 5 FLAG
- this.msgStoreItemMemory.putInt(msgInner.getFlag());
- // 6 QUEUEOFFSET
- this.msgStoreItemMemory.putLong(queueOffset);
- System.err.println("queueOffset:" + queueOffset);
- // 7 PHYSICALOFFSET
- this.msgStoreItemMemory.putLong(fileFromOffset + byteBuffer.position());
- // 8 SYSFLAG
- this.msgStoreItemMemory.putInt(msgInner.getSysFlag());
- // 9 BORNTIMESTAMP
- this.msgStoreItemMemory.putLong(msgInner.getBornTimestamp());
- // 10 BORNHOST
- this.resetByteBuffer(bornHostHolder, bornHostLength);
- this.msgStoreItemMemory.put(msgInner.getBornHostBytes(bornHostHolder));
- // 11 STORETIMESTAMP
- this.msgStoreItemMemory.putLong(msgInner.getStoreTimestamp());
- // 12 STOREHOSTADDRESS
- this.resetByteBuffer(storeHostHolder, storeHostLength);
- this.msgStoreItemMemory.put(msgInner.getStoreHostBytes(storeHostHolder));
- // 13 RECONSUMETIMES
- this.msgStoreItemMemory.putInt(msgInner.getReconsumeTimes());
- // 14 Prepared Transaction Offset
- this.msgStoreItemMemory.putLong(msgInner.getPreparedTransactionOffset());
- // 15 BODY
- this.msgStoreItemMemory.putInt(bodyLength);
- if (bodyLength > 0)
- this.msgStoreItemMemory.put(msgInner.getBody());
- // 16 TOPIC
- this.msgStoreItemMemory.put((byte) topicLength);
- this.msgStoreItemMemory.put(topicData);
- // 17 PROPERTIES
- this.msgStoreItemMemory.putShort((short) propertiesLength);
- if (propertiesLength > 0)
- this.msgStoreItemMemory.put(propertiesData);
-
- final long beginTimeMills = CommitLog.this.defaultMessageStore.now();
- // Write messages to the queue buffer
- // 写入DM或Page Cache
- byteBuffer.put(this.msgStoreItemMemory.array(), 0, msgLen);
至此,消息就被Broker存储下来了,等待刷盘即可。后面的流程就是响应Response给客户端了。至于ConsumerQueue和Index文件索引的构建,是在ReputMessageService线程里对CommitLog文件进行消息重放时处理。
RocketMQ支持海量的消息积压,是因为它直接将消息写入到磁盘。同时,为了解决磁盘IO速度慢的问题,RocketMQ做了大量的优化。顺序写、零拷贝、异步刷盘是RocketMQ高性能存储的三大利器,消息生产时,CommitLog、ConsumerQueue、Index都是顺序写的,消息消费时,ConsumerQueue是顺序读的,但是CommitLog是随机读的。
因为mmap对文件大小有限制,所以RocketMQ的很多文件都做了定长切分处理。CommitLog单个文件定长为1G,如果写满会创建新的文件继续写。
从整体流程上看,消息的存储并不复杂,Broker接收到客户端的请求后,从请求体中读取数据并构建内部消息对象MessageExtBrokerInner,然后按照存储格式将消息写入到CommitLog。当然了,这中间还有大量的逻辑处理,包括:事务消息、定时消息、死信消息等等。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。