赞
踩
目录
消息队列
2. Kafka、ActiveMq、RabbitMq、RocketMq优缺点对比
Kafka
1.5.1.为什么Kafka不像MySQL和Redis那样允许follwer副本对外提供读服务呢?
3.3.1. 那么Consumer Group 何时进行Rebalance 呢?触发条件有3 个
3.3.2. Coordinator 会在什么情况下认为某个Consumer 实例已挂从而要被“踢出”组呢?
其实就是问问你消息队列都有哪些使用场景,然后你项目里具体是什么场景,说说你在这个场景里用消息队列是什么?
解耦、异步、削峰
“削峰填谷”就是指缓冲上下游瞬时突发的流量,使其更平滑。对于发送能力很强的上游系统,如果没有消息引擎的保护,下游系统可能会直接被压垮导致全链路服务雪崩,消息引擎可以在很大程度上避免流量的震荡。消息引擎系统的另外一大好处在于发送方和接收方的松耦合,减少系统间不必要的交互。
优点上面已经说了,就是在特殊场景下有其对应的好处,解耦、异步、削峰。
① 系统可用性降低
系统引入的外部依赖越多,越容易挂掉。本来你就是 A 系统调用 BCD 三个系统的接口就好了,ABCD 四个系统还好好的,没啥问题,你偏加个 MQ 进来,万一 MQ 挂了咋整?MQ 一挂,整套系统崩溃,你不就完了?如何保证消息队列的高可用,可以点击这里查看。
② 系统复杂度提高
硬生生加个 MQ 进来,你怎么保证消息没有重复消费?怎么处理消息丢失的情况?怎么保证消息传递的顺序性?头大头大,问题一大堆,痛苦不已。
③ 一致性问题
A 系统处理完了直接返回成功了,人都以为你这个请求就成功了;但是问题是,要是 BCD 三个系统那里,BD 两个系统写库成功了,结果 C 系统写库失败了,咋整?你这数据就不一致了。
特性 | ActiveMQ | RabbitMQ | RocketMQ | Kafka |
---|---|---|---|---|
单机吞吐量 | 万级,比 RocketMQ、Kafka 低一个数量级 | 同 ActiveMQ | 10 万级,支撑高吞吐 | 10 万级,高吞吐,一般配合大数据类的系统来进行实时数据计算、日志采集等场景 |
topic 数量对吞吐量的影响 | topic 可以达到几百/几千的级别,吞吐量会有较小幅度的下降,这是 RocketMQ 的一大优势,在同等机器下,可以支撑大量的 topic | topic 从几十到几百个时候,吞吐量会大幅度下降,在同等机器下,Kafka 尽量保证 topic 数量不要过多,如果要支撑大规模的 topic,需要增加更多的机器资源 | ||
时效性 | ms 级 | 微秒级,这是 RabbitMQ 的一大特点,延迟最低 | ms 级 | 延迟在 ms 级以内 |
可用性 | 高,基于主从架构实现高可用 | 同 ActiveMQ | 非常高,分布式架构 | 非常高,分布式,一个数据多个副本,少数机器宕机,不会丢失数据,不会导致不可用 |
消息可靠性 | 有较低的概率丢失数据 | 基本不丢 | 经过参数优化配置,可以做到 0 丢失 | 同 RocketMQ |
功能支持 | MQ 领域的功能极其完备 | 基于 erlang 开发,并发能力很强,性能极好,延时很低 | MQ 功能较为完善,还是分布式的,扩展性好 | 功能较为简单,主要支持简单的 MQ 功能,在大数据领域的实时计算以及日志采集被大规模使用 |
综上,各种对比之后,有如下建议:
一般的业务系统要引入 MQ,最早大家都用 ActiveMQ,但是现在确实大家用的不多了,没经过大规模吞吐量场景的验证,社区也不是很活跃,所以大家还是算了吧,我个人不推荐用这个了。
后来大家开始用 RabbitMQ,但是确实 erlang 语言阻止了大量的 Java 工程师去深入研究和掌控它,对公司而言,几乎处于不可控的状态,但是确实人家是开源的,比较稳定的支持,活跃度也高。
不过现在确实越来越多的公司会去用 RocketMQ,确实很不错,毕竟是阿里出品,但社区可能有突然黄掉的风险(目前 RocketMQ 已捐给Apache,但 GitHub 上的活跃度其实不算高)对自己公司技术实力有绝对自信的,推荐用 RocketMQ,否则回去老老实实用 RabbitMQ 吧,人家有活跃的开源社区,绝对不会黄。
所以中小型公司,技术实力较为一般,技术挑战不是特别高,用 RabbitMQ 是不错的选择;大型公司,基础架构研发实力较强,用 RocketMQ 是很好的选择。
如果是大数据领域的实时计算、日志采集等场景,用 Kafka 是业内标准的,绝对没问题,社区活跃度很高,绝对不会黄,何况几乎是全世界这个领域的事实性规范。
kafka和RocketMQ的总体区别
- kafka设计初衷是用于日志传输
- RocketMQ的设计用于解决各类应用可靠的消息传输,阿里云官网承诺RocketMQ数据可靠性为10个9,服务可靠性为99.95%。
kafka相比RocketMQ的优势
1、单机吞吐量TPS可上百万,远高于RocketMQ的TPS7万每秒,适用于日志类消息
2、kafka支持多语言的客户端RocketMQ相比kafka的优势
1、保证消息不丢( 数据可靠性达10个9)
2、可严格保证消息有序
3、支持分布式事务消息
4、支持按时间做消息回溯(可精确到毫秒级)
5、支持按标识和内容查询消息,用于排查丢消息
6、支持消费失败重试
7、可支持更多的partition, 即更多的消费线程数
总结:
概括:
总结:
kafka的单机TPS能跑到每秒上百万,是因为Producer端将多个小消息合并,批量发向broker。
那么RocketMQ为什么没有这样做呢?
发送消息的Producer通常是用Java语言,缓存过多消息,GC是个很严重的问题。(问题:难道kafka用scala不需要GC?)
Producer发送消息到broker, 若消息发送出去后,未达到broker,就通知业务消息发送成功,若此时Broker宕机,则会导致消息丢失,从而导致业务出错。
Producer通常为分布式系统,且每台机器都是多线程发送,通常来说线上单Producer产生的消息数量不会过万。
消息合并功能完全可由上层业务来做。
一句话概括:RocketMQ写入性能上不如kafka, 主要因为kafka主要应用于日志场景,而RocketMQ应用于业务场景,为了保证消息必达牺牲了性能,且基于线上真实场景没有在RocketMQ层做消息合并,推荐在业务层自己做。
队列多有什么好处呢?
概括
概括
概括
概括
首先,比如 RabbitMQ、RocketMQ、Kafka,都有可能会出现消息重复消费的问题,正常。因为这问题通常不是 MQ 自己保证的,是由我们开发来保证的。挑一个 Kafka 来举个例子,说说怎么重复消费吧。
Kafka 实际上有个 offset 的概念,就是每个消息写进去,都有一个 offset,代表消息的序号,然后 consumer 消费了数据之后,每隔一段时间(定时定期),会把自己消费过的消息的 offset 提交一下,表示“我已经消费过了,下次我要是重启啥的,你就让我继续从上次消费到的 offset 来继续消费吧”。
但是凡事总有意外,比如我们之前生产经常遇到的,就是你有时候重启系统,看你怎么重启了,如果碰到点着急的,直接 kill 进程了,再重启。这会导致 consumer 有些消息处理了,但是没来得及提交 offset,尴尬了。重启之后,没被提交offset的消息会再次消费一次。
其实重复消费不可怕,可怕的是你没考虑到重复消费之后,怎么保证幂等性。
其实还是得结合业务来思考,我这里给几个思路:
这个是肯定的,用 MQ 有个基本原则,就是数据不能多一条,也不能少一条
手动提交消息
唯一可能导致消费者弄丢数据的情况,就是说,你消费到了这个消息,然后消费者那边自动提交了 offset,让 Kafka 以为你已经消费好了这个消息,但其实你才刚准备处理这个消息,你还没处理,你自己就挂了,此时这条消息就丢咯。
那么,只要关闭自动提交 offset,在处理完之后自己手动提交 offset,就可以保证数据不会丢。
但是此时确实还是可能会有重复消费,比如你刚处理完,还没提交 offset,结果自己挂了,此时肯定会重复消费一次,自己保证幂等性就好了。
这块比较常见的一个场景,就是 Kafka 某个 broker 宕机,然后重新选举 partition 的 leader。大家想想,要是此时其他的 follower 刚好还有些数据没有同步,结果此时 leader 挂了,然后选举某个 follower 成 leader 之后,不就少了一些数据?这就丢了一些数据啊。
生产环境也遇到过,我们也是,之前 Kafka 的 leader 机器宕机了,将 follower 切换为 leader 之后,就会发现说这个数据就丢了。
我们生产环境就是按照上述要求配置的,这样配置之后,至少在 Kafka broker 端就可以保证在 leader 所在 broker 发生故障,进行 leader 切换时,数据不会丢失。
所以此时一般是要求起码设置如下 4 个参数:
replication.factor
参数:这个值必须大于 1,要求每个 partition 必须有至少 2 个副本。min.insync.replicas
参数:这个值必须大于 1,这个是要求一个 leader 至少感知到有至少一个 follower 还跟自己保持联系,没掉队,这样才能确保 leader 挂了还有一个 follower 吧。acks=all
:这个是要求每条数据,必须是写入所有 replica 之后,才能认为是写成功了。retries=MAX
(很大很大很大的一个值,无限次重试的意思):这个是要求一旦写入失败,就无限重试,卡在这里了。解决此问题的方法非常简单:Producer 永远要使用带有回调通知的发送API,也就是说不要使用producer.send(msg),而要使用producer.send(msg, callback),它能准确地告诉你消息是否真的提交成功了。一旦出现消息提交失败的情况,你就可以有针对性地进行处理。
要想实现消息有序,需要从Producer和Consumer两方面来考虑。
如果对Kafka不了解的话,可以先看这篇博客《一文快速了解Kafka》。
针对消息有序的业务需求,还分为全局有序和局部有序。
全局有序:一个Topic下的所有消息都需要按照生产顺序消费。
局部有序:一个Topic下的消息,只需要满足同一业务字段的要按照生产顺序消费。例如:Topic消息是订单的流水表,包含订单orderId,业务要求同一个orderId的消息需要按照生产顺序进行消费。
由于Kafka的一个Topic可以分为了多个Partition,Producer发送消息的时候,是分散在不同 Partition的。当Producer按顺序发消息给Broker,但进入Kafka之后,这些消息就不一定进到哪个Partition,会导致顺序是乱的。
因此,要满足全局有序:
1. 需要1个Topic只能对应1个Partition。
2. 而且对应的consumer也要使用单线程或者保证消费顺序的线程模型,否则消费端造成的消费乱序。
生产者和消费者,指定Partition Key,保证局部有序
消费者进程优化:在不增加partition数量的情况下想提高消费速度,每个消费者进程可以进行再次优化(实现以下功能)
如何解决消息队列的延时以及过期失效问题?消息队列满了以后该怎么处理?有几百万消息持续积压几小时,说说怎么解决?
你看这问法,其实本质针对的场景是:
接着就坑爹了,可能你的消息队列集群的磁盘都快写满了,都没人消费,这个时候怎么办?或者是这整个就积压了几个小时,你这个时候怎么办?或者是你积压的时间太长了,导致比如 RabbitMQ 设置了消息过期时间后就没了怎么办?
消息堆积可能原因如下:
关于这个事儿,我们一个一个来梳理吧,先假设一个场景,我们现在消费端出故障了,然后大量消息在 mq 里积压,现在出事故了,慌了。
几千万条数据在 MQ 里积压了七八个小时,从下午 4 点多,积压到了晚上 11 点多。这个是我们真实遇到过的一个场景,确实是线上故障了,这个时候要不然就是修复 consumer 的问题,让它恢复消费速度,然后傻傻的等待几个小时消费完毕。这个肯定不能在面试的时候说吧。
分析速度:一个消费者一秒是 1000 条,一秒 3 个消费者是 3000 条,一分钟就是 18 万条。所以如果你积压了几百万到上千万的数据,即使消费者恢复了,也需要大概 1 小时的时间才能恢复过来。
一般这个时候,只能临时紧急扩容了,具体操作步骤和思路如下:
8.2.1.事前处理机制:避免
在应用上线之前,对大致的流量是有预估的,并且采用压测,探测「生产者产生消息的速率、消费者消费消息的速率」
如果发现消费者速度较慢,可以:①对消费者程序进行优化 ②消费者扩容
8.2.2.事中处理机制
场景描述:运营在调整活动时,导致了流量激增,导致生产者生产速率超过了我们预估的消费者速率,导致消息积压
解决方案:消费者扩容
8.2.3.事后处理机制
绝大多数的消费者程序都是IO密集型,一般是操作数据库、调用RPC
- 提高消费并行度:比如,并发调用下游服务
- 批量方式消费
- 跳过非重要消息
- 优化每条消息的消费过程
push
pull
一个典型的Kafka 体系架构包括若干Producer、若干Broker、若干Consumer,以及一个ZooKeeper 集群,如下图所示:
kafka的消息,以主题为单位进行归类:生产者负责将消息发送到特定的主题(发送到Kafka 集群中的每一条消息都要指定一个主题),而消费者负责订阅主题并进行消费。
主题topic在kafka中是⼀个逻辑的概念,kafka通过topic将消息进⾏分类。不同的topic会被订阅该topic的消费者消费。
但是有⼀个问题,如果说这个topic中的消息⾮常⾮常多,多到需要⼏T来存,因为消息是会被保存到log⽇志⽂件中的。为了解决这个⽂件过⼤的问题,kafka提出了Partition分区的概念
主题是一个逻辑上的概念,它还可以细分为多个分区,一个分区只属于单个主题,很多时候也会把分区称为主题分区(Topic-Partition)。
分区是物理上的概念,分区在存储层面可以看作一个可追加的日志(Log)文件,消息在被追加到分区日志文件的时候都会分配一个特定的偏移量(offset)。
offset 是消息在分区中的唯一标识,是一个单调递增且不变的值。Kafka 通过它来保证消息在分区内的顺序性,不过offset 并不跨越分区,也就是说,Kafka 保证的是分区有序而不是主题有序。
案例:如下图所示,主题中有4个分区,消息被顺序追加到每个分区日志文件的尾部。
每一条消息被发送到 broker 之前,会根据分区规则选择存储到哪个具体的分区。如果分区规则设定得合理,所有的消息都可以均匀地分配到不同的分区中。如果一个主题只对应一个文件,那么这个文件所在的机器 I/O 将会成为这个主题的性能瓶颈,而分区解决了这个问题。在创建主题的时候可以通过指定的参数来设置分区的个数,当然也可以在主题创建完成之后去修改分区的数量,通过增加分区的数量可以实现水平扩展。
不考虑多副本的情况,一个分区对应一个日志(Log)。为了防止 Log 过大,Kafka 又引入了日志分段(LogSegment)的概念,将 Log 切分为多个 LogSegment,相当于一个巨型文件被平均分配为多个相对较小的文件,这样也便于消息的维护和清理。事实上,Log 和 LogSegment也不是纯粹物理意义上的概念,Log 在物理上只以文件夹的形式存储,而每个 LogSegment 对应于磁盘上的一个日志文件和两个索引文件,以及可能的其他文件(比如以“.txnindex”为后缀的事务索引文件)。下图描绘了主题、分区、副本、Log 以及 LogSegment 之间的关系。
Kafka 为分区引入了多副本(Replica)机制,通过增加副本数量可以提升容灾能力。备份的思想,就是把相同的数据拷贝到多台机器上,而这些相同的数据拷贝在 Kafka 中被称为副本(Replica)。
同一分区的不同副本中保存的是相同的消息(在同一时刻,副本之间并非完全一样),副本之间是“一主多从”的关系,其中,leader 副本负责处理读写请求,follower 副本只负责与leader 副本的消息同步。副本处于不同的broker 中,当leader 副本出现故障时,从follower 副本中重新选举新的leader 副本对外提供服务。Kafka 通过多副本机制实现了故障的自动转移,当Kafka 集群中某个broker 失效时仍然能保证服务可用。
使用场景
Kafka副本机制使用的是异步消息拉取,因此存在leader和follower之间的不一致性。如果要采用读写分离,必然要处理副本lag引入的一致性问题,比如如何实现read-your-writes、如何保证单调读(monotonic reads)以及处理消息因果顺序颠倒的问题。相反地,如果不采用读写分离,所有客户端读写请求都只在Leader上处理也就没有这些问题了——当然最后全局消息顺序颠倒的问题在Kafka中依然存在,常见的解决办法是使用单分区,其他的方案还有version vector,但是目前Kafka没有提供。
主写从读无非就是为了减轻leader节点的压力,将读请求的负载均衡到follower节点,如果Kafka的分区相对均匀地分散到各个broker上,同样可以达到负载均衡的效果,没必要刻意实现主写从读增加代码实现的复杂程度。
如上图所示,Kafka 集群中有4个broker,某个主题中有3个分区,且副本因子(即副本个数)也为3。每个分区便有1个leader 副本和2个follower 副本。生产者和消费者只与leader 副本进行交互,而follower 副本只负责消息的同步,很多时候follower 副本中的消息相对leader 副本而言会有一定的滞后。
分区中的所有副本统称为AR(Assigned Replicas)。AR=ISR+OSR
满足这两个条件,叫作“正在同步中”(in-sync)
ISR 与HW 和LEO 也有紧密的关系。
HW 是High Watermark 的缩写,俗称高水位,它标识了一个特定的消息偏移量(offset),消费者只能拉取到这个offset 之前的消息。
LEO 是Log End Offset 的缩写,它标识当前日志文件中下一条待写入消息的offset
如上图所示,它代表一个日志文件,这个日志文件中有9条消息,第一条消息的offset(LogStartOffset)为0,最后一条消息的offset 为8,offset 为9的消息用虚线框表示,代表下一条待写入的消息。日志文件的HW 为6,表示消费者只能拉取到offset 在0至5之间的消息,而offset 为6的消息对消费者而言是不可见的。
上图中offset 为9的位置即为当前日志文件的LEO,LEO 的大小相当于当前日志分区中最后一条消息的offset 值加1。分区ISR 集合中的每个副本都会维护自身的LEO,而ISR 集合中最小的LEO 即为分区的HW,对消费者而言只能消费HW 之前的消息。
分区的作用就是提供负载均衡的能力,或者说对数据进行分区的主要原因,就是为了实现系统的高伸缩性(Scalability)。
不同的分区能够被放置到不同节点的机器上,而数据的读写操作也都是针对分区这个粒度而进行的,这样每个节点的机器都能独立地执行各自分区的读写请求处理。并且,我们还可以通过添加新的节点机器来增加整体系统的吞吐量。
分区是实现负载均衡以及高吞吐量的关键,故在生产者这一端就要仔细盘算合适的分区策略,避免造成消息数据的“倾斜”,使得某些分区成为性能瓶颈,这样极易引发下游数据消费的性能下降。
Kafka生产者的分区策略是决定生产者将消息发送到哪个分区的算法。Kafka提供默认的分区策略,同时它也支持自定义分区策略。常见的分区策略如下:
也称Round-robin 策略,即顺序分配。比如一个主题下有3 个分区,那么第一条消息被发送到分区0,第二条被发送到分区1,第三条被发送到分区2,以此类推。当生产第4 条消息时又会重新开始,即将其分配到分区0。轮询策略有非常优秀的负载均衡表现,它总是能保证消息最大限度地被平均分配到所有分区上,故默认情况下它是最合理的分区策略,也是我们最常用的分区策略之一。
也称Randomness 策略,所谓随机就是我们随意地将消息放置到任意一个分区上。本质上看随机策略也是力求将数据均匀地打散到各个分区,但从实际表现来看,它要逊于轮询策略,所以如果追求数据的均匀分布,还是使用轮询策略比较好。
Kafka 允许为每条消息定义消息键,简称为Key。这个Key 的作用非常大,它可以是一个有着明确业务含义的字符串,比如客户代码、部门编号或是业务ID 等;也可以用来表征消息元数据。一旦消息被定义了Key,那么你就可以保证同一个Key 的所有消息都进入到相同的分区里面,由于每个分区下的消息处理都是有顺序的,故这个策略被称为按消息键保序策略。Kafka的主题会有多个分区,分区作为并行任务的最小单位,为消息选择分区要根据消息是否含有键来判断。
1. 消费者采用拉取模型带来的优点有哪些?
2. 为什么要约定“同一个分区只可被一个消费者处理”?
3. 消费者如何拉取数据
4. 消费者如何消费消息
5. 消费者提交分区偏移量
6. 消费者组再平衡操作
7. 消费者组是什么
8. 消费者组的协调者
消息由生产者发布到Kafka集群后,会被消费者消费。消息的消费模型有两种:推送模型(push)和拉取模型(pull)。
基于推送模型的消息系统,由broker记录消费者的消费状态。broker在将消息推送到消费者后,标记这条消息为已消费,这种方式无法很好地保证消息的处理语义。比如,broker把消息发送出去后,当消费进程挂掉或者由于网络原因没有收到这条消息时,就有可能造成消息丢失(因为消息代理已经把这条消息标记为己消费了,但实际上这条消息并没有被实际处理)。如果要保证消息的处理语义,broker发送完消息后,要设置状态为“已发送”,只有收到消费者的确认请求后才更新为“已消费”,这就需要在消息代理中记录所有消息的消费状态,这种方式需要在客户端和服务端做一些复杂的状态一致性保证,比较复杂。
因此,kafka采用拉取模型,由消费者自己记录消费状态,每个消费者互相独立地顺序读取每个分区的消息。这种由消费者控制偏移量的优点是消费者可以按照任意的顺序消费消息,比如,消费者可以重置到旧的偏移量,重新处理之前已经消费过的消息;或者直接跳到最近的位置,从当前时刻开始消费。broker是无状态的,它不需要标记哪些消息被消费者处理过,也不需要保证一条消息只会被一个消费者处理。而且,不同的消费者可以按照自己最大的处理能力来拉取数据,即使有时候某个消费者的处理速度稍微落后,它也不会影响其他的消费者,并且在这个消费者恢复处理速度后,仍然可以追赶之前落后的数据。
Consumer Group 是Kafka 提供的可扩展且具有容错性的消费者机制。
那么一个 Group 下该有多少个 Consumer 实例呢?理想情况下,「Consumer 实例的数量」应该等于「Group 订阅主题的分区总数」
Rebalance 就是让一个Consumer Group 下所有的Consumer实例就如何消费订阅主题的所有分区达成共识的过程。
在 Rebalance 过程中,所有Consumer 实例共同参与,在协调者组件(Coordinator)的帮助下,完成订阅主题分区的分配。
当Consumer Group 完成 Rebalance 之后,每个Consumer 实例都会定期地向Coordinator 发送心跳请求,表明它还存活着。
如果某个Consumer 实例不能及时地发送这些心跳请求,Coordinator 就会认为该Consumer 已经“死”了,从而将其从Group 中移除,然后开启新一轮Rebalance。Rebalance 发生时,Group 下所有的Consumer 实例都会协调在一起共同参与。
首先,Rebalance 过程对 Consumer Group 消费过程有极大的影响。如果你了解 JVM 的垃圾回收机制,你一定听过万物静止的收集方式,即著名的 stop the world,简称 STW。在 STW期间,所有应用线程都会停止工作,表现为整个应用程序僵在那边一动不动。Rebalance 过程也和这个类似,在 Rebalance 过程中,所有 Consumer 实例都会停止消费,等待 Rebalance 完成。这是 Rebalance 为人诟病的一个方面。
其次,目前 Rebalance 的设计是所有 Consumer 实例共同参与,全部重新分配所有分区。其实更高效的做法是尽量减少分配方案的变动。例如实例 A 之前负责消费分区 1、2、3,那么Rebalance 之后,如果可能的话,最好还是让实例 A 继续消费分区 1、2、3,而不是被重新分配其他的分区。这样的话,实例 A 连接这些分区所在 Broker 的 TCP 连接就可以继续用,不用重新创建连接其他 Broker 的 Socket 资源。
最后,Rebalance 实在是太慢了。
所以,我们尽量避免一些非必要的 Rebalance。
消费者组的重平衡流程,它的作用是让组内所有的消费者实例就消费哪些主题分区达成一致。重平衡需要借助「 Kafka Broker 端的 Coordinator 组件」,在 Coordinator 的帮助下完成整个消费者组的分区重分配。
1. 触发与通知
2. 消费者组状态机
重平衡一旦开启,Broker 端的协调者组件Coordinator就要开始忙了,主要涉及到控制消费者组的状态流转。
Kafka 设计了一套消费者组状态机(State Machine),帮助协调者完成整个重平衡流程。
a) Kafka 消费者组状态
b) 状态机的各个状态流转图如下:
一个消费者组最开始是 Empty 状态,当重平衡过程开启后,它会被置于 PreparingRebalance状态等待成员加入,之后变更到 CompletingRebalance 状态等待分配方案,最后流转到 Stable状态完成重平衡。当有新成员加入或已有成员退出时,消费者组的状态从 Stable 直接跳到PreparingRebalance 状态,此时,所有现存成员就必须重新申请加入组。当所有成员都退出组后,消费者组状态变更为 Empty。Kafka 定期自动删除过期位移的条件就是,组要处于 Empty 状态。如果消费者组停了很长时间(超过 7 天),那么 Kafka 很可能就把该组的位移数据删除了。
重平衡的完整流程需要消费者端和协调者组件共同参与才能完成。在消费者端,重平衡分为以下两个步骤:
1) 加入组:JoinGroup 请求
2) 等待领导者消费者分配方案:SyncGroup 请求
当组内成员加入组时,他会向协调者发送 JoinGroup 请求。在该请求中,每个成员都要将自己订阅的主题上报,这样协调者就能收集到所有成员的订阅信息。一旦收集了全部成员的 JoinGroup请求后,协调者会从这些成员中选择一个担任这个消费者组的领导者。通常情况下,第一个发送JoinGroup 请求的成员自动成为领导者。注意区分这里的领导者和之前介绍的领导者副本,不是一个概念。这里的领导者是具体的消费者实例,它既不是副本,也不是协调者。领导者消费者的任务是收集所有成员的订阅信息,然后根据这些信息,制定具体的分区消费分配方案。
选出领导者之后,协调者会把消费者组订阅信息封装进 JoinGroup 请求的响应中,然后发给领导者,由领导者统一做出分配方案后,进入下一步:发送 SyncGroup 请求。在这一步中,领导者向协调者发送 SyncGroup 请求,将刚刚做出的分配方案发给协调者。值得注意的是,其他成员也会向协调者发送 SyncGroup 请求,只是请求体中并没有实际内容。这一步的目的是让协调者接收分配方案,然后统一以 SyncGroup 响应的方式发给所有成员,这样组内成员就都知道自己该消费哪些分区了。
以上是 JoinGroup 请求的处理过程。就像前面说的,JoinGroup 请求的主要作用是将组成员订阅信息发送给领导者消费者,待领导者制定好分配方案后,重平衡流程进入到 SyncGroup 请求阶段。下面这张图是 SyncGroup 请求的处理流程。
SyncGroup 请求的主要目的,就是让协调者把领导者制定的分配方案下发给各个组内成员。当所有成员都成功接收到分配方案后,消费者组进入到 Stable 状态,即开始正常的消费工作。
1. 副本机制如何工作,故障发生时,怎么确保数据不会丢失?
2. 消息成功提交的定义是什么?
3. Kafka 的消息提交机制如何保证消费者看到的数据是一致的?(ISR)
控制器组件(Controller),是 Apache Kafka 的核心组件。它的主要作用是在 Apache Zookeeper 的帮助下管理和协调整个 Kafka 集群。集群中任意一台 Broker 都能充当控制器的角色,但在运行过程中,只能有一个 Broker 成为控制器,行使其管理和协调的职责。
1. 自动检测 新增 Broker、Broker 主动关闭及被动宕机。
这种自动检测是依赖于前面提到的 Watch功能和 ZooKeeper 临时节点组合实现的。
控制器组件会利用 watch 机制检查 Zookeeper 的/brokers/ids 节点下的子节点数量变更。当有新Broker 启动后,它会在/brokers 下创建专属的 znode 节点。一旦创建完毕,Zookeeper 会通过Watch 机制将消息通知推送给控制器,这样,控制器就能自动地感知到这个变化。进而开启后续新增 Broker 作业。
侦测 Broker 存活性则是依赖于刚刚提到的另一个机制:临时节点。每个 Broker 启动后,会在/brokers/ids 下创建一个临时的 znode。当 Broker 宕机或主机关闭后,该 Broker 与 Zookeeper的会话结束,这个 znode 会被自动删除。同理,Zookeeper 的 Watch 机制将这一变更推送给控制器,这样控制器就能知道有 Broker 关闭或宕机了,从而进行善后。
2. 控制器保存的数据
控制器中保存的这些数据在 Zookeeper 中也保存了一份。每当控制器初始化时,它都会从Zookeeper 上读取对应的元数据并填充到自己的缓存中。这里面比较重要的数据有:
先问自己几个小问题:
- Kafka的主题与分区内部是如何存储的,有什么特点?
- 如何利用操作系统的优化技术来高效地持久化日志文件和加快数据传输效率?page cache和zero copy的技术
我们来说说Kafka Broker 是如何持久化数据的。总的来说,Kafka 使用消息日志(Log)来保存数据,一个日志就是磁盘上一个只能追加写(Append-only)消息的物理文件。因为只能追加写入,故避免了缓慢的随机I/O 操作,用性能较好的顺序I/O 写操作,这也是实现Kafka 高吞吐量特性的一个重要手段。
向Kafka 发送数据并不是真要等数据被写入磁盘才会认为成功,而是只要数据被写入到操作系统的页缓存(Page Cache)上就可以了,随后操作系统根据LRU 算法会定期将页缓存上的“脏”数据落盘到物理磁盘上。这个定期就是由提交时间来确定的,默认是5 秒。一般情况下我们会认为这个时间太频繁了,可以适当地增加提交间隔来降低物理磁盘的写操作。当然你可能会有这样的疑问:如果在页缓存中的数据在写入到磁盘前机器宕机了,那岂不是数据就丢失了。的确,这种情况数据确实就丢失了,但鉴于Kafka 在软件层面已经提供了多副本的冗余机制,因此这里稍微拉大提交间隔去换取性能还是一个合理的做法。
图中所有的数据都写入文件系统的持久化日志文件,但不进行刷新数据的任何调用。数据会首先被传输到磁盘缓存,操作系统随后会将这些数据定期自动刷新到物理磁盘。
消息系统内的消息从生产者保存到服务端,消费者再从服务端读取出来,数据的传输效率决定了生产者和消费者的性能。生产者如果每发送一条消息都直接通过网络发送到服务端,势必会造成过多的网络请求。如果我们能够将多条消息按照分区进行分组,并采用批量的方式一次发送一个消息集,并且对消息集进行压缩,就可以减少网络传输的带宽,进一步提高数据的传输效率。
消费者要读取服务端的数据,需要将服务端的磁盘文件通过网络发送到消费者进程,而网络发送通常涉及不同的网络节点。如下图(左)所示,传统读取磁盘文件的数据在每次发送到网络时,都需要将页面缓存先保存到用户缓存,然后在读取消息时再将其复制到内核空间,具体步骤如下:
结合Kafka 的消息有多个订阅者的使用场景,生产者发布的消息一般会被不同的消费者消费多次。如下图(右)所示,使用零拷贝技术(zero-copy )只需将磁盘文件的数据复制到页面缓存中一次,然后将数据从页面缓存直接发送到网络中(发送给不同的使用者时,都可以重复使用同一个页面缓存),避免了重复的复制操作。这样,消息使用的速度基本上等同于网络连接的速度了。
Kafka 作为一个消息队列,涉及到磁盘 I/O 主要有两个操作:
- Provider 向 Kakfa 发送消息,Kakfa 负责将消息以日志的方式持久化落盘(应用层数据 ===> 磁盘)
- Consumer 向 Kakfa 进行拉取消息,Kafka 负责从磁盘中读取一批日志消息,然后再通过网卡发送(磁盘数据 ===> 网络)
Kakfa 服务端接收 Provider 的消息并持久化的场景下使用 mmap 机制[6],能够基于顺序磁盘 I/O 提供高效的持久化能力,使用的 Java 类为 java.nio.MappedByteBuffer。
Kakfa 服务端向 Consumer 发送消息的场景下使用 sendfile 机制[7],这种机制主要两个好处:
- sendfile 避免了内核空间到用户空间的 CPU 全程负责的数据移动;
- sendfile 基于 Page Cache 实现,因此如果有多个 Consumer 在同时消费一个主题的消息,那么由于消息一直在 page cache 中进行了缓存,因此只需一次磁盘 I/O,就可以服务于多个 Consumer;
使用 mmap 来对接收到的数据进行持久化,使用 sendfile 从持久化介质中读取数据然后对外发送是一对常用的组合。但是注意,你无法利用 sendfile 来持久化数据,利用 mmap 来实现 CPU 全程不参与数据搬运的数据拷贝。
最多一次(at most once):消息可能会丢失,但绝不会被重复发送
至少一次(at least once):消息不会丢失,但有可能被重复发送
精确一次(exactly once):消息不会丢失,也不会被重复发送
Kafka 是怎么做到精确一次的呢?简单来说,这是通过两种机制:幂等性、事务
事务型 Producer 的功能
- 能够保证将消息原子性地写入到多个分区中。这批消息要么全部写入成功,要么全部失败。
- 事务型 Producer 也不惧进程的重启。Producer 重启回来后,Kafka 依然保证它们发送消息的精确一次处理。
和普通 Producer 代码相比,事务型 Producer 的显著特点是调用了一些事务 API,如 initTransaction、beginTransaction、commitTransaction 和 abortTransaction,它们分别对应事务的初始化、事务开始、事务提交以及事务终止。
事务消息是由 producer、事务协调器、broker、组协调器、consumer 共同参与实现的[1] producer
- 为 producer 指定固定的 TransactionalId,可以穿越 producer 的多次会话(producer 重启/断线重连)中,持续标识 producer 的身份
- 使用 epoch 标识 producer 的每一次"重生",防止同一 producer 存在多个会话
- producer 遵从幂等消息的行为,并在发送的 BatchRecord 中增加事务 id 和 epoch
[2] 事务协调器(Transaction Coordinator)
- 引入事务协调器,以两阶段提交2PC的方式,实现消息的事务提交。
- 事务协调器使用一个特殊的 topic:transaction,来做事务提交日志。
- 事务控制器通过 RPC 调用,协调 broker 和 consumer coordinator 实现事务的两阶段提交。
- 每一个 broker 都会启动一个事务协调器,使用 hash(TransactionalId)确定 producer 对应的事务协调器,使得整个集群的负载均衡。
[3] broker
- broker 处理事务协调器的 commit/abort 控制消息,把控制消息向正常消息一样写入 topic(和正常消息交织在一起,用来确认事务提交的日志偏移),并向前推进消息提交偏移 hw。
[4] 组协调器
- 如果在事务过程中,提交了消费偏移,组协调器在 offset log 中写入事务消费偏移。当事务提交时,在 offset log 中写入事务 offset 确认消息。
[5] consumer
- consumer 过滤未提交消息和事务控制消息,使这些消息对用户不可见。 有两种实现方式,
- consumer 缓存方式:设置 isolation.level=read_uncommitted,此时 topic 的所有消息对 consumer 都可见。 consumer 缓存这些消息,直到收到事务控制消息。若事务 commit,则对外发布这些消息;若事务 abort,则丢弃这些消息
- broker 过滤方式:设置 isolation.level=read_committed,此时 topic 中未提交的消息对 consumer 不可见,只有在事务结束后,消息才对 consumer 可见。broker 给 consumer 的 BatchRecord 消息中,会包含以列表,指明哪些是"abort"事务,consumer 丢弃 abort 事务的消息即可
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。