赞
踩
在RocketMQ控制台创建topic时就需要设置读队列和写队列。写队列负责消息的写入,读队列负责consumer的的消息读取。这其实是一种读写分离的思想
perm字段表示Topic的权限。有三个可选项。 2:禁写禁订阅,4:可订阅,不能写,6:可写可订阅
我们设置时默认,写队列=读队列。如果写队列>读队列:那么会有一部分写队列数据无法写到读队列,也就无法被消费会出现消息丢失。如果写队列<读队列,那么就有一部分读队列上是没有数据的会造成资源浪费。
这里有一种情景是读队列!=写队列的。
要对Topic的MessageQueue进行缩减的时候,例如原来四个队列,现在要缩减成两个队列。如果立即缩减读写队列,那么被缩减的MessageQueue上没有被消费的消息,就会丢失。这时,可以先缩减写队列,待空出来的读队列上的消息都被消费完了之后,再来缩减读队列,这样就可以比较平稳的实现队列缩减了。
RocketMQ消息直接采用磁盘文件保存消息,默认路径在${user_home}/store目录。这些存储目录可以在broker.conf中自行指定。
有三个比较重要的文件:
另外,还有几个辅助的存储文件:
整体消息存储结构如图:
流程图解释:
1.所有生产者发送的消息都存储在Commitlog
2.消费者在消费消息时根据ConsumerQueue中的记录的偏移量单元,就可以定位到具体存储在Commitlog上的消息。
3.通过MeessageId或者MessageKey来查找消息时,会借助IndexFile文件,找到消息在Commitlog的具体偏移位置。
消息既然要持久化,就必须有对应的删除机制。RocketMQ内置了一套过期文件的删除机制。
在broker.conf中配置的fileReservedTime属性就是文件保留时间。文件超过了这个时间就认为是过期文件。
在broker.conf中deleteWhen属性指定文件删除时间。默认是凌晨四点,ocketMQ内部有一个定时任务,对文件进行扫描,并且触发文件删除的操作
我们知道RocketMQ的可以抗住很高的并发,并且在高并发场景下也可以保证消息写到文件存储。那么是怎么做到的呢?
零拷贝(zero-copy)是操作系统层面提供的一种加速文件读写的操作机制,非常多的开源软件都在大量使用零拷贝,来提升IO操作的性能。对于Java应用层,对应着mmap和sendFile两种方式。接下来,咱们深入操作系统来详细理解一下零拷贝。
而所谓的零拷贝技术,其实并不是不拷贝,而是要尽量减少CPU拷贝。
mmap文件映射机制是怎么回事
在一次文件拷贝过程中,操作系统层面的拷贝已经由CPU拷贝优化成了DMA拷贝。而内核态与用户态之间的拷贝依然是CPU拷贝。所以,在这个场景下,零拷贝技术优化的重点,就是内核态与用户态之间的这两次拷贝。
mmap文件映射的方式,就是在用户态不再保存文件的内容,而只保存文件的映射,包括文件的内存起始地址,文件大小等。真实的数据,也不需要在用户态留存,可以直接通过操作映射,在内核态完成数据复制。
最后,这种mmap的映射机制由于还是需要用户态保存文件的映射信息,数据复制的过程也需要用户态的参与,这其中的变数还是非常多的。所以,mmap机制适合操作小文件,如果文件太大,映射信息也会过大,容易造成很多问题。通常mmap机制建议的映射文件大小不要超过2G 。而RocketMQ做大的CommitLog文件保持在1G固定大小,也是为了方便文件映射。
sendFile机制是怎么运行的
早期的sendfile实现机制其实还是依靠CPU进行页缓存与socket缓存区之间的数据拷贝。但是,在后期的不断改进过程中,sendfile优化了实现机制,在拷贝过程中,并不直接拷贝文件的内容,而是只拷贝一个带有文件位置和长度等信息的文件描述符FD,这样就大大减少了需要传递的数据。而真实的数据内容,会交由DMA控制器,从页缓存中打包异步发送到socket中。
最后,sendfile机制在内核态直接完成了数据的复制,不需要用户态的参与,所以这种机制的传输效率是非常稳定的。sendfile机制非常适合大数据的复制转移。
通常应用程序往磁盘写文件时,由于磁盘空间不是连续的,会有很多碎片。所以我们去写一个文件时,也就无法把一个文件写在一块连续的磁盘空间中,而需要在磁盘多个扇区之间进行大量的随机写。这个过程中有大量的寻址操作,会严重影响写数据的性能。而顺序写机制是在磁盘中提前申请一块连续的磁盘空间,每次写数据时,就可以避免这些寻址操作,直接在之前写入的地址后面接着写就行。
在操作系统层次,当应用程序写入一个文件时,文件内容并不会直接写到硬件中。而是会先写到操作系统的PageCache中,这个时候如果机器宕机了那么这部分数据就会丢失。因此,操作系统也提供了一个系统调用,应用程序可以自行调用这个系统调用,完成PageCache的强制刷盘。
RocketMQ中有同步刷盘和异步刷盘两种方式。
Broker以一个集群的方式部署,消息需要从Master复制到Slave上。而消息复制的方式分为同步复制和异步复制。
Producer发送消息时,默认会轮询目标Topic下的所有MessageQueue,并采用递增取模的方式往不同的MessageQueue上发送消息,以达到让消息平均落在不同的queue上的目的。而由于MessageQueue是分布在不同的Broker上的,所以消息也会发送到不同的broker上。
Consumer也是以MessageQueue为单位来进行负载均衡。分为集群模式和广播模式。
集群模式
Producer发送消息会均匀的分配到所有MessageQueue上,集群模式下每个consumer都会被均匀的分配一个或者多个MessageQueue(默认采用平均分配策略),这样保证Consumer负载均衡
广播模式
广播模式下,每一条消息都会投递给订阅了Topic的所有消费者实例,所以也就没有消息分配这一说。而在实现上,就是在Consumer分配Queue时,所有Consumer都分到所有的Queue。
当一条消息消费失败,RocketMQ就会自动进行消息重试。而如果消息超过最大重试次数,RocketMQ就会认为这个消息有问题。但是此时,RocketMQ不会立刻将这个有问题的消息丢弃,而会将其发送到这个消费者组对应的一种特殊队列:死信队列。
RocketMQ默认的重试次数是16次。见源码org.apache.rocketmq.common.subscription.SubscriptionGroupConfig中的retryMaxTimes属性。
这个重试次数可以在消费者端进行配置。 例如 DefaultMQPushConsumer实例中有个setMaxReconsumeTimes方法指定重试次数。
死信队列的名称是%DLQ%+ConsumGroup
死信队列的特征:
通常,一条消息进入了死信队列,意味着消息在消费处理的过程中出现了比较严重的错误,并且无法自行恢复。此时,一般需要人工去查看死信队列中的消息,对错误原因进行排查。然后对死信消息进行处理,比如转发到正常的Topic重新进行消费,或者丢弃。
默认创建出来的死信队列,他里面的消息是无法读取的,在控制台和消费者中都无法读取。这是因为这些默认的死信队列,他们的权限perm被设置成了2:禁读(这个权限有三种 2:禁读,4:禁写,6:可读可写)。需要手动将死信队列的权限配置成6,才能被消费(可以通过mqadmin指定或者web控制台)。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。