赞
踩
ISR:In-Sync Replicas 副本同步队列
AR:Assigned Replicas 所有副本
ISR是由leader维护,follower从leader同步数据有一些延迟(包括延迟时间replica.lag.time.max.ms和延迟条数replica.lag.max.messages两个维度, 当前最新的版本0.10.x中只支持replica.lag.time.max.ms这个维度),任意一个超过阈值都会把follower剔除出ISR, 存入OSR(Outof-Sync Replicas)列表,新加入的follower也会先存放在OSR中。AR=ISR+OSR。
LEO(Log End Offset):每个副本的最后一个offset
HW(High Watermark):所有副本的最小LEO
follower故障时:follower会被踢出isr,恢复后,follower读取本地磁盘记录的上次的HW,并将log文件高于HW的部分截取掉,从HW开始向leader进行同步,等follower的LEO大于等于该partition的HW后,就可以重新加入isr。
leader故障时:leader 发生故障之后,会从 ISR 中选出一个新的 leader,之后,为保证多个副本之间的, 数据一致性, 其余的 follower 会先将各自的 log 文件高于 HW 的部分截掉,然后从新的 leader同步数据。
kafka每个partition中的消息在写入时都是有序的,消费时,每个partition只能被每一个group中的一个消费者消费,保证了消费时也是有序的。
整个topic不保证有序。如果为了保证topic整个有序,那么将partition调整为1.
**分区器:**可以给消息指定传入分区
**序列化器:**将生产者消息序列化,可以被网络传输
**拦截器:**可以在消息传入Kafka前和producer回调函数返回前对消息进行简单的处理
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-uUxlqKGQ-1586164239914)(1586155090138.png)]
main线程负责:拦截器—>序列化器—>分区器
sender线程负责:将分区后的数据发送给对于分区
正确,这样会浪费资源。
offset+1
重复消费:先处理数据,再提交offset
消息漏消费:先提交offset,再处理数据
Kafka消息消费有两个consumer接口,Low-level API和High-level API:
Low-level API:消费者自己维护offset等值,可以实现对Kafka的完全控制;
High-level API:封装了对parition和offset的管理,使用简单;
如果使用高级接口High-level API,可能存在一个问题就是当消息消费者从集群中把消息取出来、并提交了新的消息offset值后,还没来得及消费就挂掉了,那么下次再消费时之前没消费成功的消息就“诡异”的消失了;
消息的重复消费,即要解决消费时的幂等性。解决思路:
生产时的幂等性如何解决?
分区数可以增加,不可以减少
按照Kafka现有的代码逻辑而言,此功能完全可以实现,不过也会使得代码的复杂度急剧增大。实现此功能需要考虑的因素很多,比如删除掉的分区中的消息该作何处理?如果随着分区一起消失则消息的可靠性得不到保障;如果需要保留则又需要考虑如何保留。直接存储到现有分区的尾部,消息的时间戳就不会递增,如此对于Spark、Flink这类需要消息时间戳(事件时间)的组件将会受到影响;如果分散插入到现有的分区中,那么在消息量很大的时候,内部的数据复制会占用很大的资源,而且在复制期间,此主题的可用性又如何得到保障?与此同时,顺序性问题、事务性问题、以及分区和副本的状态机切换问题都是不得不面对的。反观这个功能的收益点却是很低,如果真的需要实现此类的功能,完全可以重新创建一个分区数较小的主题,然后将现有主题中的消息按照既定的逻辑复制过去即可。
虽然分区数不可以减少,但是分区对应的副本数是可以减少的,这个其实很好理解,你关闭一个副本时就相当于副本数减少了。不过正规的做法是使用kafka-reassign-partition.sh脚本来实现,具体用法可以自行搜索。
consumer 默认将 offset 提交到Kafka 一个内置的 topic 中,该 topic 为__consumer_offsets。
Kafka有两种分配策略。
RangeAssignor,默认
Range 范围分区策略是对每个 topic 而言的。首先对同一个 topic 里面的分区按照序号进行排序,并对消费者按照字母顺序进行排序。假如现在有 10 个分区,3 个消费者,排序后的分区将会是0,1,2,3,4,5,6,7,8,9;消费者排序完之后将会是C1-0,C2-0,C3-0。通过 partitions数/consumer数来决定每个消费者应该消费几个分区。如果除不尽,那么前面几个消费者将会多消费 1 个分区。
Range 范围分区的弊端:
针对 1 个 topic 而言,C1-0消费者多消费1个分区影响不是很大。如果有 N 多个 topic,那么针对每个 topic,消费者 C1-0 都将多消费 1 个分区,topic越多,C1-0 消费的分区会比其他消费者明显多消费 N 个分区。这就是 Range 范围分区的一个很明显的弊端了
RoundRobinAssignor
RoundRobin 轮询分区策略,是把所有的 partition 和所有的 consumer 都列出来,然后按照 hashcode 进行排序,最后通过轮询算法来分配 partition 给到各个消费者。
如果想要使用RoundRobin 轮询分区策略,必须满足如下两个条件:
什么时候触发分区分配策略
当出现以下几种情况时,Kafka 会进行一次分区分配操作,即 Kafka 消费者端的 Rebalance 操作
Kafka 消费端的 Rebalance 机制,规定了一个 Consumer group 下的所有 consumer 如何达成一致来分配订阅 topic 的每一个分区。而具体如何执行分区策略,就是上面提到的 Range 范围分区 和 RoundRobin 轮询分区 两种内置的分区策略。
Kafka 对于分区分配策略这块,也提供了可插拔式的实现方式,除了上面两种分区分配策略外,我们也可以创建满足自己使用的分区分配策略,即:自定义分区策略。
.index .log
通过二分查找找到对应的index文件,通过扫描index文件找到对应数据在log文件中的偏移量
Controller这个角色是在kafka 0.8以后添加的,它负责的功能很多,包括Topic的创始, Partition leader的选取, Partition的增加, PartitionReassigned, PreferredReplicaElection, Topic的删除等。
Kafka Rebalance 重平衡的时候,提过一个群组协调器,负责协调群组间的关系,那么 broker 之间也有一个控制器组件(Controller),它是 Kafka 的核心组件。它的主要作用是**在 ZooKeeper 的帮助下管理和协调整个 Kafka 集群,**在 Kafka 集群启动后,只有一个 broker 会成为 Controller 。
Zookeeper简介
ZooKeeper 的数据是保存在节点上的,每个节点也被称为znode,znode 节点是一种树形的文件结构,它很像 Linux 操作系统的文件路径,ZooKeeper 的根节点是 /。
znode 根据数据的持久化方式可分为临时节点和持久性节点。持久性节点不会因为 ZooKeeper 状态的变化而消失,但是临时节点会随着 ZooKeeper 的重启而自动消失。
znode 节点有一个 Watcher 机制:当数据发生变化的时候, ZooKeeper 会产生一个 Watcher 事件,并且会发送到客户端。Watcher 监听机制是 Zookeeper 中非常重要的特性,我们基于 Zookeeper 上创建的节点,可以对这些节点绑定监听事件,比如可以监听节点数据变更、节点删除、子节点状态变更等事件,通过这个事件机制,可以基于 ZooKeeper 实现分布式锁、集群管理等功能。
Controller的选举
Kafka 当前选举控制器的规则是:Kafka 集群中第一个启动的 broker 通过在 ZooKeeper 里创建一个临时节点 /controller 让自己成为 controller 控制器。其他 broker 在启动时也会尝试创建这个节点,但是由于这个节点已存在,所以后面想要创建 /controller 节点时就会收到一个节点已存在 的异常。然后其他 broker 会在这个控制器上注册一个 ZooKeeper 的 watch 对象,/controller 节点发生变化时,其他 broker 就会收到节点变更通知。这种方式可以确保只有一个控制器存在。那么只有单独的节点一定是有个问题的,那就是单点问题。
如果控制器关闭或者与 ZooKeeper 断开链接,ZooKeeper 上的临时节点就会消失。集群中的其他节点收到 watch 对象发送控制器下线的消息后,其他 broker 节点都会尝试让自己去成为新的控制器。其他节点的创建规则和第一个节点的创建原则一致,都是第一个在 ZooKeeper 里成功创建控制器节点的 broker 会成为新的控制器,那么其他节点就会收到节点已存在的异常,然后在新的控制器节点上再次创建 watch 对象进行监听。
Controller的作用
Kafka 被设计为一种模拟状态机的多线程控制器,它可以作用有下面这几点:
具体分为如下 5 点
当控制器发现一个 broker 离开集群(通过观察相关 ZooKeeper 路径),控制器会收到消息:这个 broker 所管理的那些分区需要一个新的 Leader。控制器会依次遍历每个分区,确定谁能够作为新的 Leader,然后向所有包含新 Leader 或现有 Follower 的分区发送消息,该请求消息包含谁是新的 Leader 以及谁是 Follower 的信息。随后,新的 Leader 开始处理来自生产者和消费者的请求,Follower 用于从新的 Leader 那里进行复制。
当控制器发现一个 broker 加入集群时,它会使用 broker ID 来检查新加入的 broker 是否包含现有分区的副本。如果有控制器就会把消息发送给新加入的 broker 和 现有的 broker。
broker controller 数据存储
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-kp73aeio-1586164239916)(e91113796014b106f22df0d.jpeg)]
上面我们介绍到 broker controller 会提供数据服务,用于保存大量的 Kafka 集群数据。主要分为三类:
broker controller 故障转移
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-VrXgUTcI-1586164239917)(05e4e250bf7d308.jpeg)]
broker controller 只有一个,那么必然会存在单点失效问题。kafka 为考虑到这种情况提供了故障转移功能,也就是 Fail Over。
最一开始,broker1 会抢先注册成功成为 controller,然后由于网络抖动或者其他原因致使 broker1 掉线,ZooKeeper 通过 Watch 机制觉察到 broker1 的掉线,之后所有存活的 brokers 开始竞争成为 controller,这时 broker3 抢先注册成功,此时 ZooKeeper 存储的 controller 信息由 broker1 -> broker3,之后,broker3 会从 ZooKeeper 中读取元数据信息,并初始化到自己的缓存中。
注意:ZooKeeper 中存储的不是缓存信息,broker 中存储的才是缓存信息。
broker controller 存在的问题
在 Kafka 0.11 版本之前,控制器的设计是相当繁琐的。我们上面提到过一句话:Kafka controller 被设计为一种模拟状态机的多线程控制器,这种设计其实是存在一些问题的。controller 状态的更改由不同的监听器并发执行,因此需要进行很复杂的同步,并且容易出错而且难以调试。状态传播不同步,broker 可能在时间不确定的情况下出现多种状态,这会导致不必要的额外的数据丢失。controller 控制器还会为主题删除创建额外的 I/O 线程,导致性能损耗。controller 的多线程设计还会访问共享数据,我们知道,多线程访问共享数据是线程同步最麻烦的地方,为了保护数据安全性,控制器不得不在代码中大量使用ReentrantLock 同步机制,这就进一步拖慢了整个控制器的处理速度。
broker controller 内部设计原理
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-IwxYQH2z-1586164239918)(e7cd343d.jpeg)]
在 Kafka 0.11 之后,Kafka controller 采用了新的设计,把多线程的方案改成了单线程加事件队列的方案。主要所做的改变有下面这几点:
第一个改进是增加了一个 Event Executor Thread,事件执行线程,从图中可以看出,不管是 Event Queue 事件队列还是 Controller context 控制器上下文都会交给事件执行线程进行处理。将原来执行的操作全部建模成一个个独立的事件,发送到专属的事件队列中,供此线程消费。
第二个改进是将之前同步的 ZooKeeper 全部改为异步操作。ZooKeeper API 提供了两种读写的方式:同步和异步。之前控制器操作 ZooKeeper 都是采用的同步方式,这次把同步方式改为异步,据测试,效率提升了10倍。
第三个改进是根据优先级处理请求,之前的设计是 broker 会公平性的处理所有 controller 发送的请求。什么意思呢?公平性难道还不好吗?在某些情况下是的,比如 broker 在排队处理 produce 请求,这时候 controller 发出了一个 StopReplica 的请求,你会怎么办?还在继续处理 produce 请求吗?这个 produce 请求还有用吗?此时最合理的处理顺序应该是,赋予 StopReplica 请求更高的优先级,使它能够得到抢占式的处理。
Kafka中的选举大致可以分为三大类:控制器的选举、分区leader的选举以及消费者相关的选举,这里还可以具体细分为7个小类。我们一一来过一下,本文只是简单罗列下大致的内容,至于内部具体的细节逻辑就需要靠读者自己去探索啦。虐人还是被虐就靠你的自驱力了。
在Kafka集群中会有一个或多个broker,其中有一个broker会被选举为控制器(Kafka Controller),它负责管理整个集群中所有分区和副本的状态等工作。比如当某个分区的leader副本出现故障时,由控制器负责为该分区选举新的leader副本。再比如当检测到某个分区的ISR集合发生变化时,由控制器负责通知所有broker更新其元数据信息。
Kafka Controller的选举是依赖Zookeeper来实现的,在Kafka集群中哪个broker能够成功创建/controller这个临时(EPHEMERAL)节点他就可以成为Kafka Controller。
这里需要说明一下的是Kafka Controller的实现还是相当复杂的,涉及到各个方面的内容,如果你掌握了Kafka Controller,你就掌握了Kafka的“半壁江山”。篇幅所限,这里就不一一展开了,有兴趣的读者可以查阅一下《深入理解Kafka》中第6章的相关内容。
这里不说什么一致性协议(PacificA)相关的内容,只讲述具体的选举内容。
分区leader副本的选举由Kafka Controller 负责具体实施。当创建分区(创建主题或增加分区都有创建分区的动作)或分区上线(比如分区中原先的leader副本下线,此时分区需要选举一个新的leader上线来对外提供服务)的时候都需要执行leader的选举动作。
基本思路是按照AR集合中副本的顺序查找第一个存活的副本,并且这个副本在ISR集合中。一个分区的AR集合在分配的时候就被指定,并且只要不发生重分配的情况,集合内部副本的顺序是保持不变的,而分区的ISR集合中副本的顺序可能会改变。注意这里是根据AR的顺序而不是ISR的顺序进行选举的。这个说起来比较抽象,有兴趣的读者可以手动关闭/开启某个集群中的broker来观察一下具体的变化。
还有一些情况也会发生分区leader的选举,比如当分区进行重分配(reassign)的时候也需要执行leader的选举动作。这个思路比较简单:从重分配的AR列表中找到第一个存活的副本,且这个副本在目前的ISR列表中。
再比如当发生优先副本(preferred replica partition leader election)的选举时,直接将优先副本设置为leader即可,AR集合中的第一个副本即为优先副本。
还有一种情况就是当某节点被优雅地关闭(也就是执行ControlledShutdown)时,位于这个节点上的leader副本都会下线,所以与此对应的分区需要执行leader的选举。这里的具体思路为:从AR列表中找到第一个存活的副本,且这个副本在目前的ISR列表中,与此同时还要确保这个副本不处于正在被关闭的节点上。
组协调器GroupCoordinator需要为消费组内的消费者选举出一个消费组的leader,这个选举的算法也很简单,分两种情况分析。如果消费组内还没有leader,那么第一个加入消费组的消费者即为消费组的leader。如果某一时刻leader消费者由于某些原因退出了消费组,那么会重新选举一个新的leader,这个重新选举leader的过程又更“随意”了,相关代码如下:
//scala code
private val members = new mutable.HashMap[String, MemberMetadata]
var leaderId = members.keys.head
解释一下这2行代码:在GroupCoordinator中消费者的信息是以HashMap的形式存储的,其中key为消费者的member_id,而value是消费者相关的元数据信息。leaderId表示leader消费者的member_id,它的取值为HashMap中的第一个键值对的key,这种选举的方式基本上和随机无异。总体上来说,消费组的leader选举过程是很随意的。
批量处理
传统消息中间件的消息发送和消费整体上是针对单条的。对于生产者而言,它先发一条消息,然后broker返回ACK表示已接收,这里产生2次rpc;对于消费者而言,它先请求接受消息,然后broker返回消息,最后发送ACK表示已消费,这里产生了3次rpc(有些消息中间件会优化一下,broker返回的时候返回多条消息)。而Kafka采用了批量处理:生产者聚合了一批消息,然后再做2次rpc将消息存入broker,这原本是需要很多次的rpc才能完成的操作。假设需要发送1000条消息,每条消息大小1KB,那么传统的消息中间件需要2000次rpc,而Kafka可能会把这1000条消息包装成1个1MB的消息,采用2次rpc就完成了任务。这一改进举措一度被认为是一种“作弊”的行为,然而在微批次理念盛行的今日,其它消息中间件也开始纷纷效仿。
客户端优化
这里接着批量处理的概念继续来说,新版生产者客户端摒弃了以往的单线程,而采用了双线程:主线程和Sender线程。主线程负责将消息置入客户端缓存,Sender线程负责从缓存中发送消息,而这个缓存会聚合多个消息为一个批次。有些消息中间件会把消息直接扔到broker。
消息压缩
Kafka支持多种消息压缩方式(gzip、snappy、lz4)。对消息进行压缩可以极大地减少网络传输 量、降低网络 I/O,从而提高整体的性能。消息压缩是一种使用时间换空间的优化方式,如果对时延有一定的要求,则不推荐对消息进行压缩。
建立索引,方便快速定位查询
每个日志分段文件对应了两个索引文件,主要用来提高查找消息的效率,这也是提升性能的一种方式。
分布式架构
很多人会忽略掉这个因素,其实分区也是提升性能的一种非常有效的方式,这种方式所带来的效果会比前面所说的日志编码、消息压缩等更加的明显。分区在其他分布式组件中也有大量涉及,至于为什么分区能够提升性能这种基本知识在这里就不在赘述了。
顺序写盘
操作系统可以针对线性读写做深层次的优化,比如预读(read-ahead,提前将一个比较大的磁盘块读入内存) 和后写(write-behind,将很多小的逻辑写操作合并起来组成一个大的物理写操作)技术。Kafka 在设计时采用了文件追加的方式来写入消息,即只能在日志文件的尾部追加新的消息,并且也不允许修改已写入的消息,这种方式属于典型的顺序写盘的操作,所以就算 Kafka 使用磁盘作为存储介质,它所能承载的吞吐量也不容小觑。
页缓存
为什么Kafka性能这么高?当遇到这个问题的时候很多人都会想到上面的顺序写盘这一点。其实在顺序斜盘前面还有页缓存(PageCache)这一层的优化。
页缓存是操作系统实现的一种主要的磁盘缓存,以此用来减少对磁盘 I/O 的操作。具体 来说,就是把磁盘中的数据缓存到内存中,把对磁盘的访问变为对内存的访问。为了弥补性 能上的差异,现代操作系统越来越“激进地”将内存作为磁盘缓存,甚至会非常乐意将所有可用的内存用作磁盘缓存,这样当内存回收时也几乎没有性能损失,所有对于磁盘的读写也 将经由统一的缓存。
当一个进程准备读取磁盘上的文件内容时,操作系统会先查看待读取的数据所在的页 (page)是否在页缓存(pagecache)中,如果存在(命中)则直接返回数据,从而避免了对物理磁盘的 I/O 操作;如果没有命中,则操作系统会向磁盘发起读取请求并将读取的数据页存入页缓存,之后再将数据返回给进程。同样,如果一个进程需要将数据写入磁盘,那么操作系统也会检测数据对应的页是否在页缓存中,如果不存在,则会先在页缓存中添加相应的页,最后将数据写入对应的页。被修改过后的页也就变成了脏页,操作系统会在合适的时间把脏页中的 数据写入磁盘,以保持数据的一致性。
对一个进程而言,它会在进程内部缓存处理所需的数据,然而这些数据有可能还缓存在操 作系统的页缓存中,因此同一份数据有可能被缓存了两次。并且,除非使用 Direct I/O 的方式, 否则页缓存很难被禁止。此外,用过 Java 的人一般都知道两点事实:对象的内存开销非常大, 通常会是真实数据大小的几倍甚至更多,空间使用率低下;Java 的垃圾回收会随着堆内数据的增多而变得越来越慢。基于这些因素,使用文件系统并依赖于页缓存的做法明显要优于维护一 个进程内缓存或其他结构,至少我们可以省去了一份进程内部的缓存消耗,同时还可以通过结构紧凑的字节码来替代使用对象的方式以节省更多的空间。如此,我们可以在 32GB 的机器上使用 28GB 至 30GB 的内存而不用担心 GC 所带来的性能问题。此外,即使 Kafka 服务重启, 页缓存还是会保持有效,然而进程内的缓存却需要重建。这样也极大地简化了代码逻辑,因为 维护页缓存和文件之间的一致性交由操作系统来负责,这样会比进程内维护更加安全有效。
Kafka 中大量使用了页缓存,这是 Kafka 实现高吞吐的重要因素之一。虽然消息都是先被写入页缓存,然后由操作系统负责具体的刷盘任务的。
零拷贝
Kafka使用了Zero Copy技术提升了消费的效率。前面所说的Kafka将消息先写入页缓存,如果消费者在读取消息的时候如果在页缓存中可以命中,那么可以直接从页缓存中读取,这样又节省了一次从磁盘到页缓存的copy开销。另外对于读写的概念可以进一步了解一下什么是写放大和读放大。
————————————————
版权声明:本文为CSDN博主「朱小厮」的原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接及本声明。
原文链接:https://blog.csdn.net/u013256816/article/details/93772377
Kafka的特点和他的存储机制
Kafka高性能的原因——零拷贝机制
Kafka生产者:数据可靠性策略与幂等性
Kafka消费者:分区分配策略,协调器,offset
Kafka:整理的一些面试题
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。