赞
踩
高可用性是指系统无间断地执行其功能的能力,代表系统的可用性程度。Kafka提供了高可用机制,可保障一个或多个Broker宕机后,其他Broker及所有Partition都能继续提供服务,且存储的消息不丢失。
对于分布式系统来说,当集群规模上升到一定程度后,一台或多台机器宕机的可能性大大增加,Kafka采用多机备份和消息应答确认的方式来解决数据丢失问题,并通过一套失败恢复机制解决服务不可用的问题。
上一篇文章提到,Kafka的每个分区(Partition)都有一个副本集合AR,每个副本集合包含一个Leader副本,及0个以上的Follower副本。生产者将消息发送对应Partition的Leader副本,Follower副本从Leader副本同步消息,Kafka的Leader机制在保障数据一致性的同时,也降低了消息备份的复杂度。
同一个Partition的副本不会存储在同一个Broker上,Kafka会尽量将所有的Partition以及其各个副本均匀地分配在整个集群中,这样既做好了负载均衡,又提高了容错能力。
所有与Leader副本保持一定程度同步的副本(包括Leader副本)组成副本列表 ISR(In-Sync Replicas),实际存储的是副本所在Broker的BrokerId。这里的保持同步并不是Follower副本与Leader副本数据完全一致,只需要在一定时间内保持有效连接即可,这个时间由参数replica.lag.time.max.ms设定,默认值为10s。
Follower会周期性的向Leader发送FetchRequest请求获取要存储的消息,发送的时间间隔由参数replica.fetch.wait.max.ms设定,默认值为500ms。
在消息同步期间,Follower副本相对于Leader副本具有一定程度的滞后,Leader副本负责维护和跟踪ISR集合中所有Follower副本的滞后状态,并将ISR的变更同步至ZooKeeper。当Follower副本落后太多或失效时,Leader副本会将它从ISR集合中剔除,被移除ISR的Follower可以继续发送FetchRequest请求,尝试再次跟上Leader并重新进入ISR。追赶上Leader副本的判定标准是,此Follower副本的LEO不小于Leader副本的HW。
通常只有在ISR集合中的副本才有资格被选举为新的Leader。当Kafka中的unclean.leader.election.enable(是否可以从非ISR集合中选举leader副本,默认值为 false)配置为true,并且ISR中所有副本都宕机的情况下,才允许ISR外的副本(即OSR,上一篇文章有介绍)被选举为Leader,由于OSR副本不保证和Leader副本同步,可能造成数据丢失。
同一个分区,同一个Broker节点中不允许出现多个副本,当分区的Leader节点发生功能故障时,其中一个Follower节点就会成为新的Leader节点。
分区Leader副本的选举由Kafka Controller负责具体实施。当创建分区(创建主题或增加分区都有创建分区的动作)或分区上线(比如分区中原先的Leader副本下线,此时分区需要选举一个新的Leader上线来对外提供服务)的时候都需要执行Leader的选举动作。
选举的基本思路是按照AR集合中副本的顺序查找第一个存活的副本,并且这个副本在ISR集合中。一个分区的AR集合在分配的时候就被指定,并且只要不发生重分配的情况,集合内部副本的顺序是保持不变的,而分区的ISR集合中副本的顺序可能会改变。注意这里是根据AR的顺序而不是ISR的顺序进行选举的。
分区进行重分配(reassign)的时候也需要执行leader的选举动作。同样也是从重分配的AR列表中找到第一个存活的副本,且这个副本在目前的ISR列表中。分区重分配指的是,当集群中新增Broker节点时,只有新创建的主题分区才有可能被分配到这个节点上,之前的主题分区是不会自动分配到新节点上的,就会出现新旧节点的负载不均衡,此时需要进行分区重分配,达到分区平衡。
优先副本的选举逻辑相同,直接将优先副本设置为Leader即可,AR集合中的第一个副本即为优先副本。
生产者发送消息中包含acks字段,该字段代表Leader应答生产者前副本的写入数量
acks=0
生产者无需等待服务端任何确认,消息被添加到生产者套接字缓存区后即视为已发送,因此不保证服务端已收到消息
acks=1
Leader将消息写入本地日志后,无需等待Follower的消息确认就做出应答,如果Leader在响应生产者消息已接收之后立即宕机,其他Follower均为完成消息的复制,这批消息就会丢失
acks=-1/all
Leader将等待ISR中所有副本都写入完成,并应答给Leader后,Leader才会响应生产者消息已接收。因此只有ISR中任意一个副本还存活,这些消息就不会丢失
Broker端的配置参数min.insync.replicas表示分区ISR集合中至少要有多少个副本,默认值为1。当ISR中的副本数量小于min.insync.replicas时,Leader停止写入生产者的消息,并向生产者抛出NotEnoughReplicas异常,阻塞等待更多的Follower赶上并重新进入ISR。
被Leader写入的消息都至少有min.insync.replicas个副本,因此容忍min.insync.replicas - 1个副本同时宕机。
为了保证不丢失消息,可以配置生产者acks=all 并且 min.insync.replicas >= 2
上一篇文章介绍了消息的追加过程,提到了日志偏移量,Kafka的每个副本对象有两个重要的偏移量属性:
所有高水位线以下的消息都是备份过的,消费者仅可以消费各个分区Leader高水位线以下的消息,所以Leader的HW值是由ISR中所有备份的LEO最小值决定的
Kafka使用ZooKeeper存储Broker、Topic等状态数据,Kafka集群中的Controller和Broker会在ZooKeeper指定节点上个注册Watcher(时间监听器),以便在特定事件触发时,由ZooKeeper将事件通知到对应的Broker。
由于Broker0和ZooKeeper还在正常连接中,因此ZooKeeper认为Broker0依然存活,则对于两个分区有不同的操作:
Partition0
Broker的副本为Partition0的Leader副本,当Broker0超过replica.lag.time.max.ms(默认值为10s)没有收到Broker1、Broker2的FetchRequest请求后,Broker0选择将Partition0的ISR收缩到仅剩Broker0本身,并将ISR的变更同步到ZooKeeper。Broker0根据min.insync.replicas的配置值决定是否继续接收生产者的消息
Partition1
超过replica.lag.time.max.ms后,Broker1会将Broker0的副本从Partition1的ISR中移除。如果后续Broker0恢复与其他Broker的连接,相应的Follower副本赶上Broker1,还会将其重新加入ISR中
ZooKeeper会认为Broker0已经宕机,会删除Broker0的节点,对于两个分区也有不同的操作:
Partition0
ZooKeeper删除节点后该节点上注册的Watcher会通知控制器(Controller),控制器会发现Broker0是Partition0的Leader,于是从当前存活的ISR中选择Broker2作为Partition0的新Leader。控制器将Leader的变更通知Broker1、Broker2,Broker1改向Broker2发送FetchRequest请求数据。
生产者每隔60s从bootstrap.servers中的Broker获取最新的元数据(metadata),当发现Partition0的Leader节点发生变更后,会改向新的Leader发送消息。
Broker0由于收不到ZooKeeper的通知,依然认为自己是Partition0的Leader,当发现Broker1、Broker2不再向自己发送FetchRequest请求数据,缺失了ISR应答的Broker0停止写入acks=all的消息,但可以写入acks=1的消息。在replica.lag.time.max.ms时间之后,Broker0尝试向ZooKeeper发送ISR变更发现无法正常连接,便不再接收新的生产者的消息。
当Broker0与ZooKeeper恢复连接后,发现自己不再是Partition0的Leader,会向Broker2发送FetchRequest请求数据,并且将自己的本地日志阶段,为了与Leader数据一致。在失联开始与生产者重新向Broker2发送消息这段时间内的消息会丢失。
Partition1
由于Broker0与Broker1依然保持连接,因此Broker0依然会向Broker1发送FetchRequest请求数据。只要Broker0能继续保持同步,Broker1也不会向ZooKeeper变更ISR。
Controller无法通知Broker0,会认为Broker0已经宕机,会执行上述1.3.1.2中的场景
ZooKeeper会将Controller临时节点删除,进行重新选举新的Controller。Kafka中的控制器选举依赖于ZooKeeper,成功竞选为控制器的broker会在ZooKeeper中创建/controller这个临时节点。
选举成功后,控制器会读取ZooKeeper中各个节点的数据来初始化上下文信息(ControllerContext),并且需要管理这些上下文信息。
选举过程如下:
选举触发时机:
Kafka虽然采用的是磁盘存储,却有着高性能、高吞吐的特点,吞吐量可以达到上百万。
Kafka的架构如图所示,Producer生产消息,以Partition为维度,按照一定的路由策略,提交消息到Broker集群中各个Partition的Leader节点。Consumer同样以Partition为维度,从Broker中的Leader节点拉取并消费消息。
Producer生产消息后发送给Broker会涉及大量的消息网络传输,所以Kafka采用了批量发送的方式。Broker在持久化消息、读取消息的时候,如果采用传统IO读写方式,会严重影响性能,所以Kafka采用了顺序写+零拷贝的方式。
生产者架构如图所示,消息会被追加到消息累加器(RecordAccumulator)的各个Partition的双端队列中的批记录(ProducerBatch),然后Sender线程从队列中读取消息并发送数据到Broker中
发送到Broker需满足消息大小达到阈值(由batch.size参数来指定,默认值为16KB),或者消息等待发送时间达到阈值(由linger.ms参数来指定,默认值为0,即有消息立刻发送)
Sender线程发送具体过程如下:
Kafka消息是存储在磁盘上的,以日志(Log)的形式存储,每个Partition的的每个副本都有日志,为了防止日志过大,引入了日志分段(LogSegment),将Log切分成多个LogSegment,便于维护和清理。
每个LogSegment又有日志文件(.log)、偏移量索引文件(.index)、时间戳索引文件(.timeindex)、其他文件
索引文件是以稀疏索引(sparse index)的方式构造消息的索引,每当写入一定量(由参数log.index.interval.bytes指定,默认值为4KB),偏移量索引文件和时间戳索引文件分别增加一个偏移量索引项和时间戳索引项。稀疏索引通过MappedByteBuffer将索引文件映射到内存中,加快索引查询速度。
日志分段的切分条件,满足任意一个即可:
偏移量索引以Key-Value的形式存储,Key为相对偏移量(relativeOffset),表示消息相对于基准偏移量(baseOffset)的偏移量,占用4个字节,当前索引文件的名称即为baseOffset的值,使用相对偏移量是为了减小索引文件占用的时间。Value表示为物理地址(position),占用4个字节,也就是消息在日志分段文件中对应的物理位置。
时间戳索引以Key-Value的形式存储,Key为时间戳(timestamp),占用8个字节,表示当前日志分段的最大时间戳。Value为相对偏移量(relativeOffset),占用4个字节,表示时间戳所对应消息的相对偏移量。
每个要追加的时间戳索引项的timestamp必须大于之前追加的索引项的timestamp,都在不予追加。
Kafka提供了两种日志清理策略:
日志删除,默认策略,删除依据有:
日志文件的保留时间是否超过阈值,默认值为7天
日志文件的大小是否超过设定的阈值,默认值为无穷大
日志压缩
日志压缩类似于Redis的RDB持久化,用的较少,暂不详述
Kafka采用顺序写入磁盘的方式提高吞吐量,通过文件追加的方式写入消息,即只能在日志文件的尾部追加新的消息,并且不允许修改已写入的消息。
当需要从磁盘读取数据时,系统将数据逻辑地址传给磁盘,磁盘的控制电路将逻辑地址翻译成物理地址,即确定要读取数据的磁道和扇区,流程如下
随机IO
如果数据是随机分散在磁盘的不同盘片的不同扇区中,就需要不断的取寻找磁道和磁盘旋转,最后找到相应的扇区,读取数据,读取速度较慢。
顺序IO
数据在同一片扇区或者相邻的扇区,找到第一块数据之后,无需重新寻址,依次获取数据。减少了大量的磁盘寻址过程,提高了查询效率。
写操作
用户调用fwrite把数据写入C库标准IObuffer后返回,即写操作通常是异步操作。写入IObuffer后,不会立即刷新到磁盘,会将多次小数据量相邻写操作先缓存起来合并,最终调用write函数一次性写入(或者将大块数据分解多次write调用)页缓存。数据到达页缓存后也不会立即刷新到磁盘,内核有pdflush线程在不停地检测脏页,判断是否写入磁盘,需要写入的则发起磁盘I/O请求。
读操作
读操作是同步的,用户调用fread到C库标准IObuffer中读取数据,如果成功则返回。否则继续到页缓存中读取数据,如果成功则返回。否则继续发起I/O请求,从磁盘中读取,读取到数据后将数据缓存到页缓存和C库标准IObuffer并返回。
磁盘I/O请求
通用块层根据I/O请求构造一个或多个bio结构并提交给调度层。调度器将bio结构进行排序、合并(将一个或多个进程的读操作合并,将一个或多个进程的写操作合并)到请求队列中,然后调度器再将这些请求提交给驱动,由驱动开始从磁盘读取。
页缓存(pagecache)是操作系统实现的一种主要的磁盘缓存,以此来减少对磁盘I/O的操作,把磁盘中的数据缓存到内存中,把对磁盘的访问变为对内存的访问。
当一个进程准备读取磁盘上的文件内容时,操作系统会先查看待读取的数据所在的页(page)是否在页缓存中,如果存在(命中)则直接返回数据,从而避免对物理磁盘的I/O操作。如果没有命中,则从磁盘中读取并将数据页存入页缓存,之后再将数据返回给进程。
Kafka中大量使用了页缓存,先将消息写入页缓存,然后由操作系统负责具体的刷盘任务。
零拷贝是指将数据直接数据从磁盘文件复制到网卡设备中,而不需要经过应用程序。零拷贝大大提高了应用程序的性能,减少了内核态和用户态之间的上下文切换。
以将磁盘文件传送给用户举例,先将文件复制出来放到内存buf中,然后再将这个buf通过套接字(Socket)传输给用户,如图所示,文件经历了4次复制过程:
内核态和用户态的上下文切换也发生了4次
零拷贝技术依赖于Linux底层的sendfile()方法实现,如图所示,数据通过DMA拷贝到内核态Reader Buffer后,直接通过DMA拷贝到网卡设备(NIC Buffer),无需CPU拷贝,这也是零拷贝叫法的来源。
这里数据只经历2次复制就从磁盘中传送出去了,并且上下文切换也变成了2次。
Kafka的数据传输通过TransportLayer来完成,最终通过Java NIO的transferTo()和transferFrom()方法实现零拷贝,但是不能保证一定能使用零拷贝,还需要看操作系统是否提供了sendfile这样的零拷贝调用。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。