赞
踩
在探究 Kafka 核心知识之前,我们先思考一个问题:什么场景会促使我们使用Kafka? 说到这里,我们头脑中或多或少会蹦出异步解耦和削峰填谷等字样,是的,这就是 Kafka 最重要的落地场景。
异步解耦:同步调用转换成异步消息通知,实现生产者和消费者的解耦。想象一个场景,在商品交易时,在订单创建完成之后,需要触发一系列其他的操作,比如进行用户订单数据的统计、给用户发送短信、给用户发送邮件等等。如果所有操作都采用同步方式实现,将严重影响系统性能。针对此场景,我们可以利用消息中间件解耦订单创建操作和其他后续行为。
削峰填谷:利用 Broker 缓冲上游生产者瞬时突发的流量,使消费者消费流量整体平滑。对于发送能力很强的上游系统,如果没有消息中间件的保护,下游系统可能会直接被压垮导致全链路服务雪崩。想象秒杀业务场景,上游业务发起下单请求,下游业务执行秒杀业务(库存检查,库存冻结,余额冻结,生成订单等等),下游业务处理的逻辑是相当复杂的,并发能力有限,如果上游服务不做限流策略,瞬时可能把下游服务压垮。针对此场景,我们可以利用 MQ 来做削峰填谷,让高峰流量填充低谷空闲资源,达到系统资源的合理利用。
通过上述例子可以发现交易、支付等场景常需要异步解耦和削峰填谷功能解决问题,而交易、支付等场景对性能、可靠性要求特别高。那么,我们本文的主角Kafka能否满足相应要求呢?下面我们来探讨下。
在探究 Kafka 的高性能、高可靠性之前,我们从宏观上来看下 Kafka 的系统架构
如上图所示,Kafka 由 Producer、Broker、Consumer 以及负责集群管理的 ZooKeeper 组成,各部分功能如下:
上图消息流转过程中,还有几个特别重要的概念—主题(Topic)、分区(Partition)、分段(Segment)、位移(Offset)。
在对 Kafka 的整体系统框架及相关概念简单了解后,下面我们来进一步深入探讨下高可靠性、高性能实现原理。
Kafka 高可靠性的核心是保证消息在传递过程中不丢失,涉及如下核心环节:
为了保障消息从生产者可靠地发送至 Broker,我们需要确保两点
针对问题1,Kafka 为我们提供了三种 Ack 策略,
为了实现强可靠的 Kafka 系统,我们需要设置Request.required.acks= -1,同时还会设置集群中处于正常同步状态的副本 Follower 数量 min.insync.replicas>2,另外,设置unclean.leader.election.enable=false 使得集群中 ISR 的 Follower 才可变成新的 Leader,避免特殊情况下消息截断的出现。
针对问题2,Kafka 提供两类消息发送方式:同步(Sync)发送和异步(Async)发送,相关参数如下:
参数 | 含义 | 默认值 | 推荐值 |
---|---|---|---|
async | 是否进行异步生产,可选值: 0:同步生产,默认值 1:异步生产 | 0 | 一般情况下使用0,如果使用异步生产,需要通过channel捕捉消息生产失败的情况,并进行异步修复处理,逻辑会相对复杂。 对于延时比较敏感,避免生产消息导致耗时过高的场景可以考虑异步。 |
以 Sarama 实现为例,在消息发送的过程中,无论是同步发送还是异步发送都会涉及到两个协程–负责消息发送的主协程和负责消息分发的 Dispatcher 协程。
对于异步发送(Ack != 0 场景,等于0时不关心写 Kafka 结果,后文详细讲解)而言,其流程大概如下
同步发送(Ack != 0 场景)是在异步发送的基础上加以条件限制实现的。同步消息发送在newSyncProducerFromAsyncProducer 中开启两个异步协程处理消息成功与失败的“回调”,并使用 WaitGroup 进行等待,从而将异步操作转变为同步操作。其流程大概如下
通过上述分析可以发现,Kafka 消息发送本质上都是异步的,不过同步发送通过 WaitGroup 将异步操作转变为同步操作。同步发送在一定程度上确保了我们在跨网络向 Broker 传输消息时,消息一定可以可靠地传输到 Broker。因为在同步发送场景我们可以明确感知消息是否发送至 Broker,若因网络抖动、机器宕机等故障导致消息发送失败或结果不明,可通过重试等手段确保消息至少一次(at least once) 发送到Broker。另外,Kafka(0.11.0.0版本后)还为 Producer 提供两种机制来实现精确一次(exactly once) 消息发送:幂等性(Idempotence)和事务(Transaction)。
类别 | 开启 | 特征 | 实现原理 | 注意事项 |
---|---|---|---|---|
幂等性 Producer | enable.idempotence=true | 1. 单分区幂等性:只能保证单分区上幂等性,无法实现多个分区的幂等。 2. 单会话幂等性:只能实现单会话上的冥等性,当 Producer 重启后,这种幂等性保证就失效了。 | 为每个 Producer 设置唯一的 PID,Broker 发送消息时会带上 PID 并为每个消息生产一个 Seq number,Broker 端根据 PID 和 Seq number 去重 | Broker 端不会重复写入同一 PID 的 Producer 发送的相同的消息,底层日志中只会持久化一次。 |
事务型 Producer | enable.idempotence=true,生产者设置 Transcational.id;若要实现 consumer-transform-producer,还需设置 Consumer 端参数 Isolation.level=read_committed | 1. 跨分区事务:能够保证将消息原子性地写入到多个分区中。2. 跨会话的事务恢复:如果一个应用实例挂了,启动的下一个实例依然可以保证上一个事务完成(Commit 或者 Abort)。 | 1. Kafka 引入了一个协调者组件 TransactionCoordinator 来管理 Transaction。Producer 在开始一个事务时,告诉协调者事务开始,然后开始向多个 Topic-Partition 写数据,只有这批数据全部写完(中间没有出现异常),Producer 会调用 Commit 接口进行 Commit,然后事务真正提交,否则如果中间出现异常,那么事务将会被 Abort(Producer 通过 Abort 接口告诉协调者执行 Abort 操作)。2. 引入一个全局唯一的 Transaction ID,并将 Producer 获得的 PID 和 Transaction ID 绑定。这样当 Producer 重启后就可以通过正在进行的Transaction ID 获得原来的 PID。3. TransactionCoordinator 还负责将事务状态写入 Kafka 的一个内部 Topic: __transaction_state 中,这样即使整个服务重启,由于事务状态得到保存,进行中的事务状态可以得到恢复,从而继续进行。 | 1. 开启事务对性能影响很大,在使用时要充分考虑。2. 当具有相同 Transaction ID 的新的 Producer 实例被创建且工作时,旧的且拥有相同 Transaction ID 的 Producer 将不再工作。 |
通过 Ack 策略配置、同步发送、事务消息组合能力,我们可以实现 exactly once 语意跨网络向 Broker 传输消息。但是,Producer 收到 Broker 的成功 Ack,消息一定不会丢失吗?为了搞清这个问题,我们首先要搞明白 Broker 在接收到消息后做了哪些处理。
为了确保 Producer 收到 Broker 的成功 Ack 后,消息一定不在 Broker 环节丢失,我们核心要关注以下几点:
Kafka 为了获得更高吞吐,Broker 接收到消息后只是将数据写入 PageCache 后便认为消息已写入成功,而 PageCache 中的数据通过 Linux 的 Flusher 程序进行异步刷盘(刷盘触发条:主动调用 Sync 或 Fsync 函数、可用内存低于阀值、Dirty Data 时间达到阀值),将数据顺序写到磁盘。消息处理示意图如下:
由于消息是写入到 PageCache,单机场景,如果还没刷盘 Broker 就宕机了,那么 Producer 产生的这部分数据就可能丢失。为了解决单机故障可能带来的数据丢失问题,Kafka 为分区引入了副本机制。
Kafka 每组分区通常有多个副本,同组分区的不同副本分布在不同的 Broker 上,保存相同的消息(可能有滞后)。副本之间是“一主多从”的关系,其中 Leader 副本负责处理读写请求,Follower 副本负责从 Leader 拉取消息进行同步。分区的所有副本统称为 AR(Assigned Replicas),其中所有与 Leader 副本保持一定同步的副本(包括 Leader 副本在内)组成 ISR(In-Sync Replicas),与 Leader 同步滞后过多的副本组成 OSR(Out-of-Sync Replicas),由此可见,AR=ISR+OSR。Follower 副本是否与 Leader 同步的判断标准取决于 Broker 端参数 replica.lag.time.max.ms(默认为10秒),Follower 默认每隔500ms向 Leader Fetch 一次数据,只要一个 Follower 副本落后 Leader 副本的时间不连续超过10秒,那么 Kafka 就认为该 Follower 副本与 Leader 是同步的。在正常情况下,所有的 Follower 副本都应该与 Leader 副本保持一定程度的同步,即 AR=ISR,OSR 集合为空。
当 Leader 副本所在 Broker 宕机时,Kafka 会借助ZK从 Follower 副本中选举新的 Leader 继续对外提供服务,实现故障的自动转移,保证服务可用。为了使选举的新 Leader 和旧 Leader 数据尽可能一致,当 Leader 副本发生故障时,默认情况下只有在 ISR 集合中的副本才有资格被选举为新的 Leader,而在 OSR 集合中的副本则没有任何机会(可通过设置 unclean.leader.election.enable 改变)。
当 Kafka 通过多副本机制解决单机故障问题时,同时也带来了多副本间数据同步一致性问题。Kafka 通过高水位更新机制、副本同步机制、 Leader Epoch 等多种措施解决了多副本间数据同步一致性问题,下面我们来依次看下这几大措施。
首先,我们来看下两个和 Kafka 中日志相关的重要概念 HW 和 LEO:
如上图所示,它代表一个日志文件,这个日志文件中有8条消息,0至5之间的消息为已提交消息,5至7的消息为未提交消息。日志文件的 HW 为6,表示消费者只能拉取到5之前的消息,而 Offset 为5的消息对消费者而言是不可见的。日志文件的 LEO 为8,下一条消息将在此处写入。
注意:所有副本都有对应的 HW 和 LEO,只不过 Leader 副本比较特殊,Kafka 使用 Leader 副本的高水位来定义所在分区的高水位。换句话说,分区的高水位就是其 Leader 副本的高水位。Leader 副本和 Follower 副本的HW有如下特点:
注意:为方便描述,下面 Leader HW 简记为 HW L ,Follower HW 简记为 HW F ,Leader LEO 简记为 LEOL ,Follower LEO 简记为 LEO F。
下面我们演示一次完整的 HW / LEO 更新流程:
HW L=0,LEO L=0,HW F=0,LEO F=0。
上述更新流程中 Follower 和 Leader 的 HW 更新有时间 GAP。如果 Leader 节点在此期间发生故障,则 Follower 的 HW 和 Leader 的 HW 可能会处于不一致状态,如果 Follower 被选为新的 Leader 并且以自己的 HW 为准对外提供服务,则可能带来数据丢失或数据错乱问题。
数据丢失
第1步:
第2步:
此时,如果没有异常,A 会收到 B 的回复,得知目前的 HW 为2,然后更新自身的 HW 为2。但在此时 A 重启了,没有来得及收到 B 的回复,此时 B 仍然是 Leader。A 重启之后会以 HW 为标准截断自己的日志,因为 A 作为 Follower 不知道多出的日志是否是被提交过的,防止数据不一致从而截断多余的数据并尝试从 Leader 那里重新同步
第3步:
B 崩溃了,min.isr 设置的是1,所以 Zookeeper 会从 ISR 中再选择一个作为 Leader,也就是 A,但是 A 的数据不是完整的,从而出现了数据丢失现象。
问题在哪里?在于 A 重启之后以 HW 为标准截断了多余的日志。不截断行不行?不行,因为这个日志可能没被提交过(也就是没有被 ISR 中的所有节点写入过),如果保留会导致日志错乱。
数据错乱
在分析日志错乱的问题之前,我们需要了解到 Kafka 的副本可靠性保证有一个前提:在 ISR 中至少有一个节点。如果节点均宕机的情况下,是不保证可靠性的,在这种情况会出现数据丢失,数据丢失是可接受的。这里我们分析的问题比数据丢失更加槽糕,会引发日志错乱甚至导致整个系统异常,而这是不可接受的。
第1步:
第2步:
由于 A 和 B 均宕机,而 min.isr=1 并且 unclean.leader.election.enable=true(关闭 Unclean 选择策略),所以 Kafka 会等到第一个 ISR 中的节点恢复并选为 Leader,这里不幸的是 B 被选为 Leader,而且还接收到 Producer 发来的新消息 m3。注意,这里丢失 m2 消息是可接受的,毕竟所有节点都宕机了。
第3步:
A 恢复重启后发现自己是 Follower,而且 HW 为2,并没有多余的数据需要截断,所以开始和 B 进行新一轮的同步。但此时 A 和 B 均没有意识到,Offset 为1的消息不一致了。
问题在哪里?在于日志的写入是异步的,上面也提到 Kafka 的副本策略的一个设计是消息的持久化是异步的,这就会导致在场景二的情况下被选出的 Leader 不一定包含所有数据,从而引发日志错乱的问题。
为了解决上述缺陷,Kafka 引入了 Leader Epoch 的概念。Leader Epoch 和 Raft 中的任期号的概念很类似,每次重新选择 Leader 的时候,用一个严格单调递增的 ID 来标志,可以让所有 Follower 意识到 Leader 的变化。而 Follower 也不再以 HW 为准,每次奔溃重启后都需要去 Leader 那边确认下当前 Leader 的日志是从哪个 Offset 开始的。下面看下 Leader Epoch 是如何解决上面两个问题的。
数据丢失解决
这里的关键点在于副本 A 重启后作为 Follower,不是忙着以 HW 为准截断自己的日志,而是先发起 LeaderEpochRequest 询问副本 B 第0代的最新的偏移量是多少,副本 B 会返回自己的 LEO 为2给副本 A,A 此时就知道消息 m2 不能被截断,所以 m2 得到了保留。当 A 选为 Leader 的时候就保留了所有已提交的日志,日志丢失的问题得到解决。
如果发起 LeaderEpochRequest 的时候就已经挂了怎么办?这种场景下,不会出现日志丢失,因为副本 A 被选为 Leader 后不会截断自己的日志,日志截断只会发生在 Follower 身上。
数据错乱解决
这里的关键点还是在第3步,副本 A 重启作为 Follower 的第一步还是需要发起 LeaderEpochRequest 询问 Leader 当前第0代最新的偏移量是多少,由于副本 B 已经经过换代,所以会返回给 A 第1代的起始偏移(也就是1),A 发现冲突后会截断自己偏移量为1的日志,并重新开始和 Leader 同步。副本 A 和副本 B 的日志达到了一致,解决了日志错乱。
Broker 接收到消息后只是将数据写入 PageCache 后便认为消息已写入成功,但是,通过副本机制并结合 ACK 策略可以大概率规避单机宕机带来的数据丢失问题,并通过 HW、副本同步机制、 Leader Epoch 等多种措施解决了多副本间数据同步一致性问题,最终实现了 Broker 数据的可靠持久化。
Consumer 在消费消息的过程中需要向 Kafka 汇报自己的位移数据,只有当 Consumer 向 Kafka 汇报了消息位移,该条消息才会被 Broker 认为已经被消费。因此,Consumer 端消息的可靠性主要和 Offset 提交方式有关,Kafka消费端提供了两种消息提交方式:
提交方式 | 参数设置 | 功能特点 | 注意事项 |
---|---|---|---|
自动提交 | enable.auto.commit = true,auto.commit.interval.ms(提交间隔) | Kafka Consumer 在后台默默地为你提交位移,开发者无需在代码中显示提交 | Consumer 收到消息就返回正确给 Brocker,如果业务逻辑没有走完中断了,即消息没有消费成功,则相关消息可能丢失。这种场景适用于可靠性要求不高的业务。 |
手动提交 | enable.auto.commit = false | 开发者需要在代码中自己提交位移,Kafka Consumer 压根不管 | 该模式下,业务开发者需要在消息被处理完成后进行手动提交。如果消息处理完成后还没来得及提价位移,系统发生重启,则之前消费到未提交的消息会重新消费到,即消息会投递多次。因此,消费侧需要做幂等保障。 |
正常情况下我们很难实现 exactly once 语意的消息,通常是通过手动提交+幂等实现消息的可靠消费。
Kafka 高性能的核心是保障系统低延迟、高吞吐地处理消息,为此,Kafaka 采用了许多精妙的设计:
如上文所述,Kafka 提供了异步和同步两种消息发送方式。在异步发送中,整个流程都是异步的。调用异步发送方法后,消息会被写入 Channel,然后立即返回成功。Dispatcher 协程会从 Channel 轮询消息,将其发送到 Broker,同时会有另一个异步协程负责处理 Broker 返回的结果。同步发送本质上也是异步的,但是在处理结果时,同步发送通过 WaitGroup 将异步操作转换为同步。使用异步发送可以最大化提高消息发送的吞吐能力。
Kafka 支持批量发送消息,将多个消息打包成一个批次进行发送,从而减少网络传输的开销,提高网络传输的效率和吞吐量。
Kafka 的批量发送消息是通过以下两个参数来控制的:
在 Kafka 的生产者客户端中,当发送消息时,如果启用了批量发送,Kafka 会将消息缓存到缓冲区中。当缓冲区中的消息大小达到 Batch.size 或者等待时间到达 Linger.ms 时,Kafka 会将缓冲区中的消息打包成一个批次进行发送。如果在等待时间内没有达到 Batch.size,Kafka 也会将缓冲区中的消息发送出去,从而避免消息积压。
Kafka 支持压缩技术,通过将消息进行压缩后再进行传输,从而减少网络传输的开销(压缩和解压缩的过程会消耗一定的 CPU 资源,因此需要根据实际情况进行调整。),提高网络传输的效率和吞吐量。Kafka 支持多种压缩算法,在 Kafka2.1.0 版本之前,仅支持 GZIP,Snappy 和 LZ4,2.1.0 后还支持 Zstandard 算法(Facebook 开源,能够提供超高压缩比)。这些压缩算法性能对比(两指标都是越高越好)如下:
在 Kafka 中,压缩技术是通过以下两个参数来控制的:
在 Kafka 的生产者客户端中,当发送消息时,如果启用了压缩技术,Kafka 会将消息进行压缩后再进行传输。在消费者客户端中,如果消息进行了压缩,Kafka 会在消费消息时将其解压缩。注意:Broker 如果设置了和生产者不通的压缩算法,接收消息后会解压后重新压缩保存。Broker 如果存在消息版本兼容也会触发解压后再压缩。
Kafka 为了提升系统吞吐、降低时延,Broker 接收到消息后只是将数据写入 PageCache 后便认为消息已写入成功,而 PageCache 中的数据通过 Linux 的 Flusher 程序进行异步刷盘(避免了同步刷盘的巨大系统开销),将数据顺序追加写到磁盘日志文件中。由于 Pagecache 是在内存中进行缓存,因此读写速度非常快,可以大大提高读写效率。顺序追加写充分利用顺序 I/O 写操作,避免了缓慢的随机 I/O 操作,可有效提升 Kafka 吞吐。
如上图所示,消息被顺序追加到每个分区日志文件的尾部。
Kafka 中存在大量的网络数据持久化到磁盘(Producer 到 Broker)和磁盘文件通过网络发送(Broker 到 Consumer)的过程,这一过程的性能直接影响 Kafka 的整体吞吐量。传统的 IO 操作存在多次数据拷贝和上下文切换,性能比较低。Kafka 利用零拷贝技术提升上述过程性能,其中网络数据持久化磁盘主要用 mmap 技术,网络数据传输环节主要使用 Sendfile 技术。
传统模式下,数据从网络传输到文件需要 4 次数据拷贝、4 次上下文切换和两次系统调用。如下图所示
为了减少上下文切换以及数据拷贝带来的性能开销,Broker 在对 Producer 传来的网络数据进行持久化时使用了 mmap 技术。通过这种技术手段,Broker 读取到 Socket Buffer 的网络数据,可以直接在内核空间完成落盘,没有必要将 Socket Buffer 的网络数据读取到应用进程缓冲区。
传统方式实现:先读取磁盘、再用 Socket 发送,实际也是进过四次 Copy。如下图所示:
为了减少上下文切换以及数据拷贝带来的性能开销,Kafka 在 Consumer 从 Broker 读数据过程中使用了 Sendfile 技术。具体在这里采用的方案是通过 NIO 的 transferTo/transferFrom
调用操作系统的 Sendfile 实现零拷贝。总共发生 2 次内核数据拷贝、2 次上下文切换和一次系统调用,消除了 CPU 数据拷贝,如下:
为了方便对日志进行检索和过期清理,Kafka 日志文件除了有用于存储日志的.log文件,还有一个位移索引文件.index 和一个时间戳索引文件.timeindex 文件,并且三文件的名字完全相同,如下
Kafka 的索引文件是按照稀疏索引的思想进行设计的。稀疏索引的核心是不会为每个记录都保存索引,而是写入一定的记录之后才会增加一个索引值,具体这个间隔有多大则通过 log.index.interval.bytes 参数进行控制,默认大小为 4 KB,意味着 Kafka 至少写入 4KB 消息数据之后,才会在索引文件中增加一个索引项。可见,单条消息大小会影响 Kakfa 索引的插入频率,因此 log.index.interval.bytes 也是 Kafka 调优一个重要参数值。由于索引文件也是按照消息的顺序性进行增加索引项的,因此 Kafka 可以利用二分查找算法来搜索目标索引项,把时间复杂度降到了 O(lgN),大大减少了查找的时间。
位移索引文件.index
位移索引文件的索引项结构如下:
相对位移:保存于索引文件名字上面的起始位移的差值,假设一个索引文件为:00000000000000000100.index,那么起始位移值即 100,当存储位移为 150 的消息索引时,在索引文件中的相对位移则为 150 - 100 = 50,这么做的好处是使用 4 字节保存位移即可,可以节省非常多的磁盘空间。
文件物理位置:消息在 log 文件中保存的位置,也就是说 Kafka 可根据消息位移,通过位移索引文件快速找到消息在 Log 文件中的物理位置,有了该物理位置的值,我们就可以快速地从 log 文件中找到对应的消息了。
下面我用图来表示 Kafka 是如何快速检索消息:
假设 Kafka 需要找出位移为 3550 的消息,那么 Kafka 首先会使用二分查找算法找到小于 3550 的最大索引项:[3528, 2310272],得到索引项之后,Kafka 会根据该索引项的文件物理位置在 Log 文件中从位置 2310272 开始顺序查找,直至找到位移为 3550 的消息记录为止。
时间戳索引文件.timeindex
Kafka 在 0.10.0.0 以后的版本当中,消息中增加了时间戳信息,为了满足用户需要根据时间戳查询消息记录,Kafka 增加了时间戳索引文件,时间戳索引文件的索引项结构如下:
时间戳索引文件的检索与位移索引文件类似,如下快速检索消息示意图:
Kafka 集群包含多个 Broker。一个 Topic 下通常有多个 Partition,Partition 分布在不同的 Broker 上,用于存储 Topic 的消息,这使 Kafka 可以在多台机器上处理、存储消息,给 Kafka 提供给了并行的消息处理能力和横向扩容能力。
多 Reactor 多线程网络模型 是一种高效的网络通信模型,可以充分利用多核 CPU 的性能,提高系统的吞吐量和响应速度。Kafka 为了提升系统的吞吐,在 Broker 端处理消息时采用了该模型,示意如下:
SocketServer 和 KafkaRequestHandlerPool 是其中最重要的两个组件:
整个服务端处理请求的流程大致分为以下几个步骤:
Kafka 生产端的负载均衡主要指如何将消息发送到合适的分区。Kafka 生产者生产消息时,根据分区器将消息投递到指定的分区中,所以 Kafka 的负载均衡很大程度上依赖于分区器。Kafka 默认的分区器是 Kafka 提供的 DefaultPartitioner。它的分区策略是根据 Key 值进行分区分配的:
在 Kafka 中,每个分区(Partition)只能由一个消费者组中的一个消费者消费。当消费者组中有多个消费者时,Kafka 会自动进行负载均衡,将分区均匀地分配给每个消费者。在 Kafka 中,消费者负载均衡算法可以通过设置消费者组的 Partition.assignment.strategy 参数来选择。目前主流的分区分配策略以下几种:
Kafka 借助 ZooKeeper 进行集群管理。Kafka 中很多信息都在ZK中维护,如 Broker 集群信息、Consumer 集群信息、 Topic 相关信息、 Partition 信息等。Kafka 的很多功能也是基于 ZK 实现的,如 Partition 选主、Broker 集群管理、Consumer 负载均衡等,限于篇幅本文将不展开陈述,这里先附一张网上截图大家感受下:
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。