赞
踩
最近一个粉丝朋友在面试时被面试官一个问题问懵了,看你简历上写着你深入研读过RocketMQ的源码,那请问一下:RocketMQ一个消费组内订阅同一个主题不同的TAG为什么会丢消息。
RocketMQ支持tag级别的消息过滤机制,其具体实现原理主要从存储、拉取模型两个部分进行展开。
RocketMQ专门按照Topic为每一个topic建立索引,方便消费端按照topic进行消费,其具体实现为消息队列。
在RocketMQ中,ConsumeQueue的引入并不是为了提高消息写入的性能,而是为消费服务的。
消息消费队列中的每一个条目是一个定长的,设计极具技巧性,其每个条目使用固定长度(8字节commitlog物理偏移量、4字节消息长度、8字节tag hashcode),这里不是存储tag的原始字符串,而是存储hashcode。
目的就是确保每个条目的长度固定,可以使用访问类似数组下标的方式来快速定位条目,极大的提高了ConsumeQueue文件的读取性能,这样根据消费进度去访问消息的方法为使用逻辑偏移量logicOffset * 20即可找到该条目的起始偏移量(consumequeue文件中的偏移量),然后读取该偏移量后20个字节即得到了一个条目,无需遍历consumequeue文件。
消费端队列存储的是 tag 的 hashcode,众所周知,不同的字符串得到的hashcode值可能一样,故在服务端是无法精确对消息进行过滤的,所以在RocketMQ中会进行两次消息过滤。
当客户端向服务端拉取消息时,服务端在返回消息之前,会先根据hashcode进行过滤,然后客户端收到服务端的消息后,再根据消息的tag字符串进行精确过滤。
上面的原理很好理解呀,那为什么会丢失消息呢?这其实和消息队列负载机制有关.
在RocketMQ中使用集群模式消费时,同一个消费组中的多个消费者共同完成主题中的队列的消费,即一个消费者只会分配到其中某几个队列,并且同一时间,一个队列只会分配给一个消费者,这样结合上面的的过滤机制,就会明显有问题,请看示例图:
其问题的核心关键是,同一个tag会分布在不同的队列中,但消费者C1分配到的队列为q0,q1,q0,q1中有taga和tagb的消息,但tagb的消息会被消费者C1过滤,但这部分消息却不会被C2消费,造成了消息丢失。
所以在RocketMQ中,一个消费组内的所有消费这,其订阅关系必须保持一致。
你的点赞,关注、收藏是最对我最大的认可与帮助,一起加油吧。
掌握一到两门java主流中间件,是敲开BAT等大厂必备的技能,送给大家一个Java中间件学习路线,助力大家实现职场的蜕变。
最后分享笔者一个硬核的RocketMQ电子书,您将获得千亿级消息流转的运维经验,助你轻松打造自己的职场亮点。
获取方式:RocketMQ电子书。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。