赞
踩
Producer投送消息的过程简单来说,是首先找到这个Topic对应的所有Broker的Queue,并按照投放策略,
把消息投放到对应的broker上的Queue上。
同一个消费者组,里面的消费者,在进行消费的时候,比如消费的Topic一共有5个Queue,q1,q2,q3,q4,q5
这个消费者组里面有3个消费者c1,c2,c3,负载均衡下五个queue会分轮训分配给这3个消费者,
c1会监听消费q1,q4;c2监听消费q2,q5;c3监听消费q3; 如果此时挂掉一个消费者,
那么它监听消费的Queue会重新负载均衡,分配给剩下的两个消费者。这有一个什么问题呢?
也就是说如果一个消费者组里面的消费者的个数超过了这个Topic的Queue的个数,那么就会有消费者浪费
不能分配到Queue进行监听消费。
而且还有一个很关键的点,就是同一个消费者组里面,不同的消费者指定的Topic和Tag不同的话,
会按照最后一个消费者组里面最后一个连接到MQ上的消费者的Topic和Tag为准,进行消费。
导致部分消费者无法消费到消息。
除了记录消息本身的属性(消息长度、消息体、Topic 长度、Topic、消息属性长度和消息属性),
CommitLog 同时记录了消息所在消费队列的信息(消费队列 ID 和偏移量)。由于存储条目具备不定长的特性,
当 CommitLog 剩余空间无法满足消息时,CommitLog 在尾部追加一个 MAGIC CODE 等于 BLANK_MAGIC_CODE
的存储条目作为结束标记,并将消息存储至下一个 CommitLog 文件
这个文件就是Broker持久化消息的文件,为了保证写入的速度,RocketMQ通过顺序写的方式将消息写入这个文件,
即:这个Broker会将收到的任何Topic的任何Queue上的消息都存储到这个文件上面。而不是一个Topic一个文件,
因为当Topic 的多起来以后,不同的Topic来回切换访问,会导致磁盘读取不断的切换位置,去加载对应Topic的文件。
所以这里RocketMq避免了来回切换,就用一个文件存储所有的消息,这样写入的时候就可以一直写下去。避免了
磁盘读取不断切换。但是这样就导致一个问题,原本写入不同Topic 不同Queue的消息怎么查询呢?
这就用到了ConsumeQueue文件了。
ConsumeQueue 的存储条目采用定长存储结构,如下图所示。为了实现定长存储,ConsumeQueue
存储了消息 Tag 的 Hash Code,在进行 Broker 端消息过滤时,通过比较 Consumer 订阅 Tag 的 HashCode
和存储条目中的 Tag Hash Code 是否一致来决定是否消费消息
RocketMQ 引入 Index 的目的是为消息建立索引方便问题排查:在给定消息 Topic 和 Key 的前提下,
快速定位消息。Index 的文件存储结构如下图所示。Index 的整体设计思想类似持久化在磁盘的 HashMap,
同样使用链式地址法解决哈希冲突:每个 Hash Slot 关联一个 Message Index 链表,
多个 Message Index 通过 preIndexOffset 连接
了解到这里,我们继续说上面说到的同一个消费者组不同消费者的Topic和Tag不同的时候,
会导致什么?下面说一下消费者消费的过程:
所以上面说的同一个消费者组consumer里面的c1监听q1,q2; c2监听q3,q4;但是c1消费 topic1 tag1,c2消费topic2 tag2,且c2最后注册到MQ,那么此时消费者组consumer的消费监听的就是topic2 tag2.上面的三个步骤就是按照topic2 tag2去操作的,这个时候就会导致尽管q1.q2上面有对应的topic1 tag1,和topic2 tag2的消息,仍然无法消费。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。