赞
踩
如下图所示,我们通过控制台查看RocketMQ默认创建的Topic配置时,发现Topic属于哪个Broker、读写队列数量等。perm:2不能订阅不能写 4 能订阅 不能写 6:能订阅能写
我们可以通过${user_home}/store/config/Topics.json文件查看到Topic的配置
"TopicTest":{
"order":false,
"perm":6,
"readQueueNums":4,
"topicFilterType":"SINGLE_TAG",
"topicName":"TopicTest",
"topicSysFlag":0,
"writeQueueNums":4
}
队列介绍:
写队列是会真实创建存储文件的,负责数据的写入。读队列会记录consumer消费的offset,负责消息的读取,这其实是一种读写分离思想,然后MessageQueue会根据路由策略选择对应的队列进行写和读,所以读写队列的数量是比较关键的。
通常情况下读队列和写队列是一一对应的,因为在往写队列里写Message时,会同步写入到一个对应的读队列中。
RocketMQ的消息是直接是在磁盘上保存的,我们可以通过默认路径${user_home}/store下查看都有哪些文件,并且简单介绍一下这些文件分别有什么作用
这里存储文件主要分为三个:
Commitlog:主要存储消息的元数据,commitlog内会有多个大小为1G的文件组成,所有生成的消息都会按照顺序存入到commitlog文件中,这样的好处就是可以减少查找目标文件的时间,消息落盘非常快。对比kafka文件,还需要寻找消息所属partition文件,然后在完成写入,但是当Topic比较多时,这样的寻找partition就会浪费比较长时间。
然后因为每次消息的大小不是固定的,所以每次写入commitlog时都会判断当前commitlog 文件空间是否足够,如果不够就会重新创建一个commitlog文件,文件名为当前消息的偏移 量。
Consumerqueue:存储消息在commitlog的索引,可以由下图可以看出文件内部存储的是每个Topic下每个队列的信息,主要记录这些Messagequeue被哪些消费者组消费到了哪一条commitlog。但是消费者的消费进入是存储在config/consumeroffset.json文件中
文件结构是由30w个大小为20byte的数据块组成,每个数据块内容有msgPhyoffset(消息在文件中的其实位置)+msgsize(消息在文件中的长度)+msgTagCode(消息tag的hash值)
index:提供了一种通过key或者时间区间来查询消息的方法
另外几个文件主要起到辅助作用:
Checkpoint:主要记录commitlog、consumequeue、index这些文件最后一次刷盘的时间戳
Config:主要记录一些关键配置,例如上面介绍Topic的配置,还有消费者组的配置、消费者组偏移量offset等信息
abort:存在的意义就是判断程序是否是正常关闭的(例如:服务器宕机等),如果是这不存在,如果不是这存在。后续可以通过这个文件来判断上次如果是非正常关闭的,就会做一些数据恢复的操作。
整体消息存储结构
构建消息文件:
RocketMQ会启动一个定时任务ReputMessageService定时调用(间隔1ms)来生成consume queue和index
此处有一个关键参数:reputFromOffset
消息允许重复:reputFromOffset = commitlog的提交指针
消息不允许重复:reputFromOffset = commitlog中内存的最大偏移量
我们从上面可以看出消息是最终写入到commitlog中,那么RocketMQ文件写入相对于普通的文件写入做了哪些优化呢?
零拷贝是操作系统为了加速文件读写提供的一种操作机制,使用零拷贝确实可以提升IO操作的性能,在Java语言层面总共有mmap和sendFile两种方式。
在讲解mmap方式我们要先介绍一下CPU拷贝和DMA拷贝。
CPU拷贝
在操作系统层面,内存空间可以分为用户态和内核态,用户态是无法直接操作操作系统硬件资源的(网卡、磁盘),需要进行用户态到内核态的转换的。但是进行转换后,那么用户态当时的数据就需要复制到内核态,那这些操作就是有CPU来进行任务的分配和调度的,所以当发生大量数据读写时CPU的占用率就会比较高。
DMA拷贝
由上图所示,为了防止IO操作过多,让CPU调度被占用。所以就引入了DMA(直接存储器操作),由DMA来负责这些频繁IO操作,DMA是一套操作指令值,不会占用CPU的计算资源,这样CPU就不用参与具体的数组复制操作,只需要管理DMA的权限就行。这样可以极大的释放CPU的性能,这样的拷贝速度会比CPU拷贝快很多。
并且DMA也在不断的优化,因为引入DMA拷贝后,虽然CPU不需要参与具体的数据复制工作,直接由DMA就可以完成,但是复制过程中需要数据总线(DMA控制器和内存之间、以及DMA控制器和外设之间传输数据的通道),过了反之占用过多的数据总线,就引入了channel通道的方式。channel是一个完全独立的处理器,专门负责IO操作,channel有自己的IO指令与CPU无关,所以channel更适合IO操作,性能更高。
Java中的零拷贝相关的操作都是通过channel的子类来实现的,而所谓的零拷贝并不是不拷贝,而是减少了copy的次数。
由上文我们知道所谓的零拷贝就是减少文件copy的次数,而mmap文件映射就是减少copy次数的一种方式。主要是通过nio包下java.nio.channels.FileChannel的map方法完成映射。
JAVA中可通过FileChannel的map方法创建内存映射文件。在Linux服务器中由该方法创建的文件使用的是操作系统的pagecache,即页缓存。
普通读写
我们可以从下图可以虽然内核态到磁盘的copy虽然优化成了DMA copy,但是用户态到内核态的copy还是CPU拷贝。那么零拷贝考虑的优化就是用户态到内核态的copy
mmap映射
上面cpu copy每次复制的都是存储的文件内容的copy,而mmap的copy只是保存文件的映射(文件的内存地址、文件大小),所以真实的数据也不需要在用户态进行留存,直接通过操作映射在内核态完成数据复制
暂时无法在飞书文档外展示此内容
这个过程是在操作系统完成的,在Java代码层是无法直观看到的,我们可以通过看JDK源码,在JDK的nio包里,java.nio.HeapByteBuffer映射的就是一块JVM内存(会通过byte[]数组缓存数据),所有的读写都是通过直接操作byte[],这个是没有使用零拷贝的普通读写。
public ByteBuffer put(int i, byte x) {
hb[ix(checkIndex(i))] = x;
return this;
}
而在JDK的nio包里,另一个java.nio.DirectByteBuffer包里,则映射的一块堆外内存,在DirectByteBuffer里,并没有用一个数据结构来保存数据内容,而是保存了一个内存地址,所有的读写操作都是通过unsafe魔法值直接交于内核完成,这个就是mmap的读写机制。这种机制适合小文件,如果文件过大映射信息也会过多,容易造成问题。
public ByteBuffer put(int i, byte x) {
try {
UNSAFE.putByte(ix(checkIndex(i)), ((x)));
} finally {
Reference.reachabilityFence(this);
}
return this;
}
早期的sendFile方式是通过cpu copy来进行页缓存与socket缓存区的数据拷贝,后期的优化中并不会直接copy文件内容,而是通过拷贝一个带有位置信息和长度新的文件描述符FD,这样就减少了需要拷贝的数据,而真是的数据通过DMA,从页缓存打包异步发送到socket。在Linux2.6.33版本以前out_fd只能是一个socket,现在版本已经没有了这个限制。
在操作系统中通常不会直接写入到磁盘中,而是会先写入到一个缓存pageCache中。但是这些操作对于应用程序来说相当于已经落盘成功了,可以进行修改、查看、复制等操作,但是page cache依旧是内存状态,因为还没有落盘。如果在还没落盘前,如果操作系统发生宕机,那么这些信息就可能丢失。而将数据从pageCache写入到磁盘中的过程被称为刷盘。
操作系统会在某些特定的情况下才会把数据写入到磁盘,例如我们正常关机。平常pageCache中被修改过的页被称为Dirty(脏页),当脏页达到一定阈值后也会触发刷盘。我们可以通过 proc/meminfo文件查看PageCache状态。
RocketMQ刷盘机制有两种,同步刷盘,异步刷盘。
如果broker是以集群方式进行部署的话,那么主从节点的消息是怎么复制的。在RocketMQ中,通过会配置conf下的2m-2s-async等文件下文件,具体选择可以通过想配置的集群类型来选择。
消息既然可以持久化,那么肯定要有对应的删除机制,下图是RocketMQ安装目录下 conf/broker.conf配置信息
producer发送消息时,默认会轮询目标Topic下所有的messagequeue,并采用递增取模的方式往不同的message queue发送消息,以达到消息可以平均落到不同的message queue上。
consumer可以选择集群模式和广播模式,在不同的模式下选择也是不一样的
集群模式下一条消息只需要被一个消费者组consumer group中的一个consumer实例消费,在采用拉取方式时,消费者需要指定拉取哪个message queue下的消息。
当consumer实例的数量有变化后,就会触发一次负载均衡,就会按照message queue数量和consumer数量从新分配。把message queue和消费者进行排序后,在根据不同的算法进行分配,算法总共有六种。
//源码在下面路径下,然后不同实现就是不同的策略 org.apache.rocketmq.client.consumer public interface AllocateMessageQueueStrategy { /** * Allocating by consumer id * * @param consumerGroup current consumer group * @param currentCID current consumer id * @param mqAll message queue set in current topic * @param cidAll consumer set in current consumer group * @return The allocate result of given strategy */ List<MessageQueue> allocate( final String consumerGroup, final String currentCID, final List<MessageQueue> mqAll, final List<String> cidAll ); /** * Algorithm name * * @return The strategy name */ String getName(); }
因为广播模式下消息会发送到每一个订阅了Topic的consumer实例上,所以也没有对应的均衡模式。广播模式的关键是将消费者的消息偏移量不再保存在broker上,而是保存客户端中,由客户端自己维护自己的偏移量
那么消费者进行消息处理时是不能保证消息100%被消费成功,那么消费失败,consumer会怎么处理呢?消费失败我们通过设置返回状态达到重试的效果。但是广播模式下是不支持重试的,即使消费失败了消息也不会重新发送,而是会继续消费新的消息。
重试消息会进入到一个“%RETRY%”+ConsumerGroup的队列中,然后默认的重试次数是为16次
messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
如果消息重试次数超过了最大的重试次数,那么RocketMQ就会把这个消息丢到死信队列中。RocketMQ默认的重试次数是16次。见源码 org.apache.rocketmq.common.subscription.SubscriptionGroupConfig中的retryMaxTimes属性。 这个重试次数可以在消费者端进行配置。 例如 DefaultMQPushConsumer实例中有个setMaxReconsumeTimes方法 指定重试次数。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。