赞
踩
Kafka 目前总共演进了 7 个大版本,分别是 0.7、0.8、0.9、0.10、0.11、1.0 和 2.0,其中的小版本和 Patch 版本很多。
Kafka 有主题(Topic)的概念,它是承载真实数据的逻辑容器,而在主题之下还分为若干个分区,也就是说 Kafka 的消息组织方式实际上是三级结构:主题 - 分区 - 消息。主题下的每条消息只会保存在某一个分区中,而不会在多个分区中被保存多份。
其实分区的作用就是提供负载均衡的能力,或者说对数据进行分区的主要原因,就是为了实现系统的高伸缩性(Scalability)。
值得注意的是,不同的分布式系统对分区的叫法也不尽相同。比如在 Kafka 中叫分区,在 MongoDB 和 Elasticsearch 中就叫分片 Shard,而在 HBase 中则叫 Region,在 Cassandra 中又被称作 vnode。从表面看起来它们实现原理可能不尽相同,但对底层分区(Partitioning)的整体思想却从未改变。
目前 Kafka 共有两大类消息格式,社区分别称之为 V1 版本和 V2 版本。V2 版本是 Kafka 0.11.0.0 中正式引入的。
不论是哪个版本,Kafka 的消息层次都分为两层:消息集合(message set)以及消息(message)。一个消息集合中包含若干条日志项(record item),而日志项才是真正封装消息的地方。Kafka 底层的消息日志由一系列消息集合日志项组成。Kafka 通常不会直接操作具体的一条条消息,它总是在消息集合这个层面上进行写入操作。
在 Kafka 中,压缩可能发生在两个地方:生产者端和 Broker 端。
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "all");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// 开启 GZIP 压缩
props.put("compression.type", "gzip");
Producer<String, String> producer = new KafkaProducer<>(props);
有两种例外情况就可能让 Broker 重新压缩消息。
除了在 Consumer 端解压缩,Broker 端也会进行解压缩。每个压缩过的消息集合在 Broker 端写入时都要发生解压缩操作,目的就是为了对消息执行各种验证。
在 Kafka 2.1.0 版本之前,Kafka 支持 3 种压缩算法:GZIP、Snappy 和 LZ4。从 2.1.0 开始,Kafka 正式支持 Zstandard 算法(简写为 zstd)。
对于 Kafka 而言,在吞吐量方面:LZ4 > Snappy > zstd 和 GZIP;而在压缩比方面,zstd > LZ4 > GZIP > Snappy。
Kafka 只对“已提交”的消息(committed message)做有限度的持久化保证。
Producer 的参数:
Broker 端的参数:
确保消息消费完成再提交。Consumer 端有个参数 enable.auto.commit,最好把它设置成 false,并采用手动提交位移的方式。
Kafka 拦截器分为生产者拦截器和消费者拦截器。
Kafka 拦截器可以应用于包括客户端监控、端到端系统性能检测、消息审计等多种功能在内的场景。
Apache Kafka 的所有通信都是基于 TCP 的,无论是生产者、消费者,还是 Broker 之间的通信都是如此。
Producer producer = new KafkaProducer(props);
在创建 KafkaProducer 实例时,生产者应用会在后台创建并启动一个名为 Sender 的线程,该 Sender 线程开始运行时首先会创建与 Broker 的连接。即 TCP 连接是在创建 KafkaProducer 实例时建立的。
TCP 连接还可能在两个地方被创建:一个是在更新元数据后,另一个是在消息发送时。当 Producer 更新了集群的元数据信息之后,如果发现与某些 Broker 当前没有连接,那么它就会创建一个 TCP 连接。同样地,当要发送消息时,Producer 发现尚不存在与目标 Broker 的连接,也会创建一个。
Producer 端关闭 TCP 连接的方式有两种:一种是用户主动关闭;一种是 Kafka 自动关闭。
在 Kafka 中,Producer 默认不是幂等性的,但我们可以创建幂等性 Producer。它其实是 0.11.0.0 版本引入的新功能。
在 0.11 之后,指定 Producer 幂等性的方法很简单,仅需要设置一个参数即可,即 props.put(“enable.idempotence”, ture)。底层具体的原理很简单,就是经典的用空间去换时间的优化思路,即在 Broker 端多保存一些字段。当 Producer 发送了具有相同字段值的消息后,Broker 能够自动知晓这些消息已经重复了,于是可以在后台默默地把它们“丢弃”掉。
但幂等性 Producer 只能保证单分区上的幂等性,即一个幂等性 Producer 能够保证某个主题的一个分区上不出现重复消息,它无法实现多个分区的幂等性。其次,它只能实现单会话上的幂等性,不能实现跨会话的幂等性。
Kafka 自 0.11 版本开始也提供了对事务的支持。
事务型 Producer 能够保证将消息原子性地写入到多个分区中。这批消息要么全部写入成功,要么全部失败。
Consumer Group 是 Kafka 提供的可扩展且具有容错性的消费者机制。
老版本的 Consumer Group 把位移保存在 ZooKeeper 中。新版本的 Consumer Group 将位移保存在 Broker 端的内部主题中。
Rebalance 本质上是一种协议,规定了一个 Consumer Group 下的所有 Consumer 如何达成一致,来分配订阅 Topic 的每个分区。
举个简单的例子,假设目前某个 Consumer Group 下有两个 Consumer,比如 A 和 B,当第三个成员 C 加入时,Kafka 会触发 Rebalance,并根据默认的分配策略重新为 A、B 和 C 分配分区,如下图所示:
但在 Rebalance 过程中,所有 Consumer 实例都会停止消费,等待 Rebalance 完成。
__consumer_offsets
在 Kafka 源码中有个更为正式的名字,叫位移主题,即 Offsets Topic。
位移主题的 Key 中应该保存 3 部分内容:<Group ID,主题名,分区号 >
当 Kafka 集群中的第一个 Consumer 程序启动时,Kafka 会自动创建位移主题。如果位移主题是 Kafka 自动创建的,那么该主题的分区数是 50,副本数是 3。
Kafka 使用 Compact 策略来删除位移主题中的过期消息,避免该主题无限期膨胀。
Kafka 提供了专门的后台线程定期地巡检待 Compact 的主题,看看是否存在满足条件的可删除数据。这个后台线程叫 Log Cleaner。
将很多元数据以消息的方式存入 Kafka 内部主题的做法越来越流行。除了 Consumer 位移管理,Kafka 事务也是利用了这个方法,当然那是另外的一个内部主题了。
所谓协调者,在 Kafka 中对应的术语是 Coordinator,它专门为 Consumer Group 服务,负责为 Group 执行 Rebalance 以及提供位移管理和组成员管理等。
如果 Consumer Group 下的 Consumer 实例数量发生变化,就一定会引发 Rebalance。这是 Rebalance 发生的最常见的原因。我碰到的 99% 的 Rebalance,都是这个原因导致的。
哪些 Rebalance 是“不必要的”:
Consumer 需要向 Kafka 汇报自己的位移数据,这个汇报过程被称为提交位移(Committing Offsets)。因为 Consumer 能够同时消费多个分区的数据,所以位移的提交实际上是在分区粒度上进行的,即 Consumer 需要为分配给它的每个分区提交各自的位移数据。
提交位移主要是为了表征 Consumer 的消费进度。
所谓 CommitFailedException,顾名思义就是 Consumer 客户端在提交位移时出现了错误或异常,而且还是那种不可恢复的严重异常。
场景一:当消息处理的总时间超过预设的 max.poll.interval.ms 参数值时,Kafka Consumer 端会抛出 CommitFailedException 异常。
防止这种场景下抛出异常:
场景二:应用中同时出现了设置相同 group.id 值的消费者组程序和独立消费者程序,那么当独立消费者程序手动提交位移时,Kafka 就会立即抛出 CommitFailedException 异常,因为 Kafka 无法识别这个具有相同 group.id 的消费者实例,于是就向它返回一个错误,表明它不是消费者组内合法的成员。
早期 KafkaConsumer 是单线程的设计,从 Kafka 0.10.1.0 版本开始,KafkaConsumer 就变为了双线程的设计,即用户主线程和心跳线程。
引入这个心跳线程还有一个目的,那就是期望它能将心跳频率与主线程调用 KafkaConsumer.poll 方法的频率分开,从而解耦真实的消息处理逻辑与消费者组成员存活性管理。
KafkaConsumer 类不是线程安全的 (thread-safe)。简单来说,就是你不能在多个线程中共享同一个 KafkaConsumer 实例,否则程序会抛出 ConcurrentModificationException 异常。
和生产者不同的是,构建 KafkaConsumer 实例时是不会创建任何 TCP 连接的,TCP 连接是在调用 KafkaConsumer.poll 方法时被创建的。
Kafka 有主题概念的,每个主题又进一步划分成若干个分区,每个分区配置有若干个副本。Kafka 副本机制,同一个分区下的所有副本保存有相同的消息序列,这些副本分散保存在不同的 Broker 上,从而能够对抗部分 Broker 宕机带来的数据不可用。
基于领导者(Leader-based)的副本机制
In-sync Replicas(ISR)
如果一个 Follower 副本落后 Leader 副本的时间不连续超过 10 秒,那么 Kafka 就认为该 Follower 副本与 Leader 是同步的,即使此时 Follower 副本中保存的消息明显少于 Leader 副本中的消息。
Kafka 使用 Reactor 模式处理请求。
Kafka 的 Broker 端有个 SocketServer 组件,类似于 Reactor 模式中的 Dispatcher。
Acceptor 线程采用轮询的方式将入站请求公平地发到所有网络线程中。
当网络线程拿到请求后,它不是自己处理,而是将请求放入到一个共享请求队列中。Broker 端还有个 IO 线程池,负责从该队列中取出请求,执行真正的处理。如果是 PRODUCE 生产请求,则将消息写入到底层的磁盘日志中;如果是 FETCH 请求,则从磁盘或页缓存中读取消息。
IO 线程池处中的线程才是执行请求逻辑的线程。目前默认值是 8,表示每台 Broker 启动后自动创建 8 个 IO 线程处理请求。
请求队列是所有网络线程共享的,而响应队列则是每个网络线程专属的。
Purgatory 的组件是用来缓存延时请求(Delayed Request)的。所谓延时请求,就是那些一时未满足条件不能立刻处理的请求。比如设置了 acks=all 的 PRODUCE 请求。
Kafka 社区把 PRODUCE 和 FETCH 这类请求称为数据类请求,把 LeaderAndIsr、StopReplica 这类请求称为控制类请求。Kafka Broker 启动后,会在后台分别两套创建网络线程池和 IO 线程池,它们分别处理数据类请求和控制类请求。
消费者组的重平衡是让组内所有的消费者实例就消费哪些主题分区达成一致。重平衡需要借助 Kafka Broker 端的 Coordinator 组件。
重平衡的 3 个触发条件:
重平衡过程是靠消费者端的心跳线程(Heartbeat Thread)来通知到其他消费者实例的。为此,Kafka 设计了一套消费者组状态机(State Machine),来帮助协调者完成整个重平衡流程。
重平衡的完整流程需要消费者端和协调者组件共同参与才能完成。
在消费者端,重平衡分为两个步骤:加入组(JoinGroup 请求)和等待领导者消费者分配方案(SyncGroup 请求)。通常情况下,第一个发送 JoinGroup 请求的成员自动成为领导者。领导者消费者的任务是收集所有成员的订阅信息,然后根据这些信息,制定具体的分区消费分配方案。
JoinGroup 请求的处理过程:
JoinGroup 请求的主要作用是将组成员订阅信息发送给领导者消费者,待领导者制定好分配方案后,重平衡流程进入到 SyncGroup 请求阶段。
SyncGroup 请求的处理流程:
SyncGroup 请求的主要目的是让协调者把领导者制定的分配方案下发给各个组内成员。当所有成员都成功接收到分配方案后,消费者组进入到 Stable 状态,即开始正常的消费工作。
场景一:新成员入组
当协调者收到新的 JoinGroup 请求后,它会通过心跳请求响应的方式通知组内现有的所有成员,强制它们开启新一轮的重平衡。
场景二:组成员主动离组
场景三:组成员崩溃离组
崩溃离组是指消费者实例出现严重故障,突然宕机导致的离组。
场景四:重平衡时协调者对组内成员提交位移的处理
控制器组件(Controller),是 Apache Kafka 的核心组件。它的主要作用是在 Apache ZooKeeper 的帮助下管理和协调整个 Kafka 集群。集群中任意一台 Broker 都能充当控制器的角色,但是,在运行过程中,只能有一个 Broker 成为控制器,行使其管理和协调的职责。换句话说,每个正常运转的 Kafka 集群,在任意时刻都有且只有一个控制器。
Kafka 控制器重度依赖 ZooKeeper,且大量使用 Watch 功能实现对集群的协调管理。
实际上,Broker 在启动时,会尝试去 ZooKeeper 中创建 /controller 节点,第一个成功创建 /controller 节点的 Broker 会被指定为控制器。
控制器故障转移(Failover)
故障转移指的是,当运行中的控制器突然宕机或意外终止时,Kafka 能够快速地感知到,并立即启用备用控制器来代替之前失败的控制器。这个过程就被称为 Failover,该过程是自动完成的,无需你手动干预。
控制器内部设计原理
在 Kafka 0.11 版本之前,控制器是多线程的设计,会在内部创建很多个线程,而这些线程还会访问共享的控制器缓存数据,为了保护数据安全性,控制器不得不在代码中大量使用 ReentrantLock 同步机制。
社区于 0.11 版本重构了控制器的底层设计,最大的改进就是,把多线程的方案改成了单线程加事件队列的方案。
Kafka 的水位是用来表征消息位移的。
在分区高水位以下的消息被认为是已提交消息,反之就是未提交消息。消费者只能消费已提交消息,即图中位移小于 8 的所有消息。
Log End Offset 表示副本写入下一条消息的位移值。注意,数字 15 所在的方框是虚线,这就说明,这个副本当前只有 15 条消息,位移值是从 0 到 14,下一条新消息的位移是 15。显然,介于高水位和 LEO 之间的消息就属于未提交消息。
Kafka 所有副本都有对应的高水位和 LEO 值,Kafka 使用 Leader 副本的高水位来定义所在分区的高水位。
高水位更新机制
在 Leader 副本所在的 Broker 上,还保存了其他 Follower 副本的 LEO 值。
Kafka 把 Broker 0 上保存的这些 Follower 副本称为远程副本(Remote Replica)。Kafka 副本机制在运行过程中,会更新 Broker 1 上 Follower 副本的高水位和 LEO 值,同时也会更新 Broker 0 上 Leader 副本的高水位和 LEO 以及所有远程副本的 LEO,但它不会更新远程副本的高水位值。
副本同步机制解析
当生产者给主题分区发送一条消息后,Leader 副本成功将消息写入了本地磁盘,故 LEO 值被更新为 1。Follower 从 Leader 拉取消息,Follower 副本也成功地更新 LEO 为 1。此时,Leader 和 Follower 副本的 LEO 副本的 LEO 都是 1,但各自的高水位依然是 0,它们在下一轮的拉取中被更新。
Kafka 就是利用这样的机制,实现了 Leader 和 Follower 副本之间的同步。
Leader Epoch
Leader 副本高水位更新和 Follower 副本高水位更新在时间上是存在错配的。这种错配是很多“数据丢失”或“数据不一致”问题的根源。基于此,社区在 0.11 版本正式引入了 Leader Epoch 概念,来规避因高水位更新错配导致的各种不一致问题。
假设 Leader 和 Follower 都写入了这两条消息,而且 Leader 副本的高水位也已经更新了,但 Follower 副本高水位还未更新。若此时副本 B 所在的 Broker 宕机,当它重启回来后,副本 B 会执行日志截断操作,将 LEO 值调整为之前的高水位值,也就是 1。
当执行完截断操作后,副本 B 开始从 A 拉取消息,执行正常的消息同步。如果就在这个节骨眼上,副本 A 所在的 Broker 宕机了,那么 Kafka 让副本 B 成为新的 Leader,此时,当 A 回来后,需要执行相同的日志截断操作,即将高水位调整为与 B 相同的值,也就是 1。这样操作之后,位移值为 1 的那条消息就从这两个副本中被永远地抹掉了。这就是下图展示的数据丢失场景。
而通过 Leader Epoch 机制,Kafka 完美地规避了这种数据丢失场景。
所谓授权,一般是指对与信息安全或计算机安全相关的资源授予访问权限,特别是存取控制。
具体到权限模型,常见的有四种。
Kafka 没有使用 RBAC 模型,它用的是 ACL 模型。
当前在 Kafka 中,配置授权的方法是通过 kafka-acls 脚本。
对 Kafka 而言,性能一般是指吞吐量和延时。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。