赞
踩
Kafka由LinkedIn采用Scala语言开发的一个多分区、多副本且基于ZooKeeper协调的分布式消息系统。目前Kafka定位为一个分布式流式处理平台,它以高吞吐、可持久化、可水平扩展、支持流数据处理等多种特性而被广泛使用。
Kafka扮演三大角色:
Kafka 体系架构包括若干 Producer、若干 Broker、若干 Consumer,以及一个ZooKeeper集群,如图所示。ZooKeeper是Kafka用来负责集群元数据的管理、控制器的选举等操作的。Producer将消息发送到Broker,Broker负责将收到的消息存储到磁盘中,而Consumer负责从Broker订阅并消费消息。
Kafka的消息以主题为单位进行归类,生产者负责将消息发送到特定的主题,消费者负责订阅主题并进行消费。
主题是一个逻辑上的概念,一个主题可以划为多个分区,一个分区只属于单个主题。同一主题下的不同分区包含的消息是不同的,分区在存储层面可以看作一个可追加的日志(Log)文件,消息在被追加到分区日志文件的时候都会分配一个偏移量(offset)。offset是消息在分区中的唯一标识,Kafka通过它来保证消息在分区内的顺序性,不过offset并不跨越分区,也就是说,Kafka保证的是分区有序而不是主题有序。
Kafka中的分区可以分布在不同的服务器(broker)上,即一个主题可以横跨多个broker,以此来提供比单个broker更强大的性能。
每一条消息会根据分区规则选择存储到哪个具体的分区。如果一个主题只对应一个文件,那么这个文件所在的机器 I/O 将会成为这个主题的性能瓶颈,而分区解决了这个问题。在创建主题的时候可以通过指定的参数来设置分区的个数,当然也可以在主题创建完成之后去修改分区的数量,通过增加分区的数量可以实现水平扩展。
Kafka 为分区引入多副本(Replica)机制,通过增加副本数量可以提升容灾能力。同一分区的不同副本中保存的是相同的消息(在同一时刻,副本之间并非完全一样),副本之间是“一主多从”的关系,其中leader副本负责处理读写请求,follower副本只负责与leader副本的消息同步。副本处于不同的broker中,当leader副本出现故障时,从follower副本中重新选举新的leader副本对外提供服务。Kafka通过多副本机制实现了故障的自动转移,当Kafka集群中某个broker失效时仍然能保证服务可用。
Kafka 消费端也具备一定的容灾能力。Consumer 使用拉(Pull)模式从服务端拉取消息,并且保存消费的具体位置,当消费者宕机后恢复上线时可以根据之前保存的消费位置重新拉取需要的消息进行消费,这样就不会造成消息丢失。
分区中的所有副本统称为AR(Assigned Replicas)。所有与leader副本保持一定程度同步的副本(包括leader副本在内)组成ISR(In-Sync Replicas)。与leader副本同步滞后过多的副本(不包括leader副本)组成OSR(Out-of-Sync Replicas)。
leader副本负责维护和跟踪ISR集合中所有follower副本的滞后状态,当follower副本落后太多或失效时,leader副本会把它从ISR集合中剔除。如果OSR集合中有follower副本“追上”了leader副本,那么leader副本会把它从OSR集合转移至ISR集合。默认情况下,当leader副本发生故障时,只有在ISR集合中的副本才有资格被选举为新的leader。
HW是High Watermark的缩写,俗称高水位,它标识了一个特定的消息偏移量(offset),消费者只能拉取到这个offset之前的消息。LEO是Log End Offset的缩写,它标识当前日志文件中下一条待写入消息的offset,LEO的大小相当于当前日志分区中最后一条消息的offset值加1。ISR集合中最小的LEO即为分区的HW。
往Kafka中写入消息,首先要创建一个生产者客户端实例并设置一些配置参数,然后构建消息的ProducerRecord对象,其中必须包含所要发往的主题及消息的消息体,进而再通过生产者客户端实例将消息发出,最后可以通过 close()方法来关闭生产者客户端实例并回收相应的资源。
代码清单1-1 生产者客户端示例代码
代码清单1-2 消费者客户端示例代码
服务端参数都配置在$KAFKA_HOME/config/server.properties文件中,下面挑选一些重要的服务端参数来做细致的说明。
该参数指明broker要连接的ZooKeeper集群的服务地址(包含端口号),没有默认值,且此参数为必填项。
该参数指明broker监听客户端连接的地址列表,即为客户端要连接broker的入口地址列表。
该参数用来指定Kafka集群中broker的唯一标识,默认值为-1。如果没有设置,那么Kafka会自动生成一个。
Kafka 把所有的消息都保存在磁盘上,这两个参数用来配置 Kafka 日志文件存放的根目录。一般情况下,log.dir 用来配置单个根目录,而 log.dirs 用来配置多个根目录(以逗号分隔)。
该参数用来指定broker所能接收消息的最大值,默认值为1000012(B),约等于976.6KB。
一个正常的生产逻辑需要具备以下几个步骤:
ProducerRecord类的定义如下(只截取成员变量):
该参数用来指定生产者客户端连接Kafka集群所需的broker地址清单。
broker 端接收的消息必须以字节数组(byte[])的形式存在,key.serializer和value.serializer这两个参数分别用来指定key和value序列化操作的序列化器,这两个参数无默认值。
KafkaProducer是线程安全的,可以在多个线程中共享单个KafkaProducer实例,也可以将KafkaProducer实例进行池化来供其他线程调用。
在创建完生产者实例之后,接下来的工作就是构建消息,即创建ProducerRecord对象。
发送消息主要有三种模式:发后即忘(fire-and-forget)、同步(sync)及异步(async)。
KafkaProducer 的 send()方法并非是 void 类型,而是 Future<RecordMetadata>类型,send()方法有2个重载方法,具体定义如下:
实际上send()方法本身就是异步的,send()方法返回的Future对象可以使调用方稍后获得发送的结果。在执行send()方法之后直接链式调用了get()方法来阻塞等待Kafka的响应实现同步效果,直到消息发送成功,或者发生异常。如果发生异常,那么就需要捕获异常并交由外层逻辑处理。
KafkaProducer中一般会发生两种类型的异常:可重试的异常和不可重试的异常。
对于可重试的异常,如果配置了 retries 参数,那么只要在规定的重试次数内自行恢复了,就不会抛出异常。retries参数的默认值为0。
生产者需要用序列化器(Serializer)把对象转换成字节数组才能通过网络发送给Kafka。消费者需要用反序列化器(Deserializer)把从 Kafka 中收到的字节数组转换成相应的对象。
自带的org.apache.kafka.common.serialization.StringSerializer,除了用于String类型的序列化器,还有ByteArray、ByteBuffer、Bytes、Double、Integer、Long这几种类型,它们都实现了org.apache.kafka.common.serialization.Serializer接口,此接口有3个方法:
消息在通过send()方法发往broker的过程中,有可能需要经过拦截器(Interceptor)、序列化器(Serializer)和分区器(Partitioner)的一系列作用之后才能被真正地发往 broker。
消息经过序列化之后就需要确定它发往的分区,如果消息ProducerRecord中指定了partition字段,那么就不需要分区器的作用,因为partition代表的就是所要发往的分区号。如果消息ProducerRecord中没有指定partition字段,那么就需要依赖分区器,根据key这个字段来计算partition的值。分区器的作用就是为消息分配分区。
Kafka中提供的默认分区器是org.apache.kafka.clients.producer.internals.DefaultPartitioner,它实现了org.apache.kafka.clients.producer.Partitioner接口,这个接口中定义了2个方法,具体如下所示。
在不改变主题分区数量的情况下,key与分区之间的映射可以保持不变。不过,一旦主题中增加了分区,那么就难以保证key与分区之间的映射关系了。
Kafka一共有两种拦截器:生产者拦截器和消费者拦截器。
生产者拦截器既可以用来在消息发送前做一些准备工作,比如按照某个规则过滤不符合要求的消息、修改消息的内容等,也可以用来在发送回调逻辑前做一些定制化的需求,比如统计类工作。
生产者拦截器的使用也很方便,主要是自定义实现org.apache.kafka.clients.producer.ProducerInterceptor接口。ProducerInterceptor接口中包含3个方法:
KafkaProducer在将消息序列化和计算分区之前会调用生产者拦截器的onSend()方法来对消息进行相应的定制化操作。
KafkaProducer 会在消息被应答(Acknowledgement)之前或消息发送失败时调用生产者拦截器的onAcknowledgement()方法,优先于用户设定的 Callback 之前执行。这个方法运行在Producer的 I/O 线程中,所以这个方法中实现的代码逻辑越简单越好,否则会影响消息的发送速度。
close()方法主要用于在关闭拦截器时执行一些资源的清理工作。
在这 3 个方法中抛出的异常都会被捕获并记录到日志中,但并不会再向上传递。
KafkaProducer可以指定多个拦截器以形成拦截链。拦截链会按照interceptor.classes 参数配置的拦截器的顺序来一一执行(配置的时候,各个拦截器之间使用逗号隔开)。
生产者客户端的整体架构,如图2-1所示。
RecordAccumulator 缓存消息以便 Sender 线程可以批量发送,进而减少网络传输的资源消耗以提升性能。RecordAccumulator 缓存的大小可以通过生产者客户端参数buffer.memory 配置,默认值为 33554432B,即 32MB。如果生产者发送消息的速度超过发送到服务器的速度,则会导致生产者空间不足,这个时候KafkaProducer的send()方法调用要么被阻塞,要么抛出异常,这个取决于参数max.block.ms的配置,此参数的默认值为60000,即60秒。
主线程中发送过来的消息都会被追加到RecordAccumulator的某个双端队列(Deque)中,在RecordAccumulator 的内部为每个分区都维护了一个双端队列,队列中的内容就是ProducerBatch,即 Deque<ProducerBatch>。将较小的ProducerRecord拼凑成一个较大的ProducerBatch,也可以减少网络请求的次数以提升整体的吞吐量。
消息在网络上都是以字节(Byte)的形式传输的,在发送之前需要创建一块内存区域来保存对应的消息。在Kafka生产者客户端中,通过java.io.ByteBuffer实现消息内存的创建和释放。不过频繁的创建和释放是比较耗费资源的,在RecordAccumulator的内部还有一个BufferPool,它主要用来实现ByteBuffer的复用,以实现缓存的高效利用。不过BufferPool只针对特定大小的ByteBuffer进行管理,而其他大小的ByteBuffer不会缓存进BufferPool中,这个特定的大小由batch.size参数来指定,默认值为16384B,即16KB。
Sender 从 RecordAccumulator 中获取缓存的消息之后,会进一步将原本<分区,Deque<ProducerBatch>>的保存形式转变成<Node,List< ProducerBatch>的形式,其中Node表示Kafka集群的broker节点。然后进一步封装成<Node,Request>的形式,这样就可以将Request请求发往各个Node。
请求在从Sender线程发往Kafka之前还会保存到InFlightRequests中,InFlightRequests保存对象的具体形式为 Map<NodeId,Deque<Request>>,它的主要作用是缓存了已经发出去但还没有收到响应的请求(NodeId 是一个 String 类型,表示节点的 id 编号)。通过配置参数max.in.flight.requests.per.connection,默认值为 5,即每个连接最多只能缓存 5 个未响应的请求,超过该数值之后就不能再向这个连接发送更多的请求了,除非有缓存的请求收到了响应(Response)。通过比较Deque<Request>的size与这个参数的大小来判断对应的Node中是否已经堆积了很多未响应的消息,如果真是如此,那么说明这个Node 节点负载较大或网络连接有问题,再继续向其发送请求会增大请求超时的可能。
InFlightRequests还可以获得leastLoadedNode,即所有Node中负载最小的那一个。这里的负载最小是通过每个Node在InFlightRequests中还未确认的请求决定的,未确认的请求越多则认为负载越大。
元数据是指Kafka集群的元数据,这些元数据具体记录了集群中有哪些主题,这些主题有哪些分区,每个分区的leader副本分配在哪个节点上,follower副本分配在哪些节点上,哪些副本在AR、ISR等集合中,集群中有哪些节点,控制器节点又是哪一个等信息。
元数据的更新操作是在客户端内部进行的,对客户端的外部使用者不可见。当需要更新元数据时,会先挑选出leastLoadedNode,然后向这个Node发送MetadataRequest请求来获取具体的元数据信息。这个更新操作是由Sender线程发起的,在创建完MetadataRequest之后同样会存入InFlightRequests,之后的步骤就和发送消息时的类似。元数据虽然由Sender线程负责更新,但是主线程也需要读取这些信息,这里的数据同步通过synchronized和final关键字来保障。
这个参数用来指定分区中必须要有多少个副本收到这条消息,之后生产者才会认为这条消息是成功写入的。
acks="1"。默认值即为1。生产者发送消息之后,只要分区的leader副本成功写入消息,那么它就会收到来自服务端的成功响应。 acks设置为1,是消息可靠性和吞吐量之间的折中方案。· acks="0"。生产者发送消息之后不需要等待任何服务端的响应。acks 设置为 0 可以达到最大的吞吐量。
acks="-1"或acks="all"。生产者在消息发送之后,需要等待ISR中的所有副本都成功写入消息之后才能够收到来自服务端的成功响应。acks 设置为-1(all)可以达到最强的可靠性。
这个参数用来限制生产者客户端能发送的消息的最大值,默认值为 1048576B,即 1MB。
retries参数用来配置生产者重试的次数,默认值为0,即在发生异常的时候不进行任何重试动作。retry.backoff.ms有关,这个参数的默认值为100,它用来设定两次重试之间的时间间隔,避免无效的频繁重试。
Kafka 可以保证同一个分区中的消息是有序的,在需要保证消息顺序的场合建议把参数max.in.flight.requests.per.connection配置为1,而不是把acks配置为0,不过这样也会影响整体的吞吐。
这个参数用来指定消息的压缩方式,默认值为“none”,即默认情况下,消息不会被压缩。该参数还可以配置为“gzip”“snappy”和“lz4”。
这个参数用来指定在多久之后关闭限制的连接,默认值是540000(ms),即9分钟。
这个参数用来指定生产者发送 ProducerBatch 之前等待更多消息(ProducerRecord)加入ProducerBatch 的时间,默认值为 0。
这个参数用来设置Socket接收消息缓冲区(SO_RECBUF)的大小,默认值为32768(B),即32KB。如果设置为-1,则使用操作系统的默认值。
这个参数用来设置Socket发送消息缓冲区(SO_SNDBUF)的大小,默认值为131072(B),即128KB。与receive.buffer.bytes参数一样,如果设置为-1,则使用操作系统的默认值。
这个参数用来配置Producer等待请求响应的最长时间,默认值为30000(ms)。请求超时之后可以选择进行重试。
消费者(Consumer)负责订阅Kafka中的主题(Topic),并且从订阅的主题上拉取消息。每个消费者都有一个对应的消费组。当消息发布到主题后,只会被投递给订阅它的每个消费组中的一个消费者。每一个分区只能被一个消费组中的一个消费者所消费。
消费者与消费组这种模型可以让整体的消费能力具备横向伸缩性,我们可以增加(或减少)消费者的个数来提高(或降低)整体的消费能力。对于分区数固定的情况,一味地增加消费者并不会让消费能力一直得到提升,如果消费者过多,出现了消费者的个数大于分区个数的情况,就会有消费者分配不到任何分区。
可以通过消费者客户端参数partition.assignment.strategy 来设置消费者与订阅主题之间的分区分配策略,有关分区分配的更多细节可以参考7.1节。
可以通过消费者客户端参数group.id来配置,默认值为空字符串。
一个正常的消费逻辑需要具备以下几个步骤:(1)配置消费者客户端参数及创建相应的消费者实例。(2)订阅主题。(3)拉取消息并消费。(4)提交消费位移。(5)关闭消费者实例。
该参数的释义和生产者客户端 KafkaProducer 中的相同,用来 指 定 连 接Kafka 集 群 所 需 的 broker 地 址 清 单,此参数的默认值为“”。
消费者隶属的消费组的名称,默认值为“”。
与生产者客户端 KafkaProducer中的key.serializer和value.serializer参数对应.
一个消费者可以订阅一个或多个主题,使用subscribe()方法订阅了一个主题,对于这个方法而言,既可以以集合的形式订阅多个主题,也可以以正则表达式的形式订阅特定模式的主题。subscribe的几个重载方法如下:
ConsumerRebalanceListener,这个是用来设置相应的再均衡监听器的。
消费者不仅可以通过KafkaConsumer.subscribe()方法订阅主题,还可以直接订阅某些主题的特定分区,在KafkaConsumer中还提供了一个assign()方法来实现这些功能,此方法的具体定义如下:
通过 subscribe()方法订阅主题具有消费者自动再均衡的功能,在多个消费者的情况下可以根据分区分配策略来自动分配各个消费者与分区的关系。当消费组内的消费者增加或减少时,分区分配关系会自动调整,以实现消费负载均衡及故障自动转移。而通过assign()方法订阅分区时,是不具备消费者自动均衡的功能的,其实这一点从assign()方法的参数中就可以看出端倪,两种类型的subscribe()都有ConsumerRebalanceListener类型参数的方法,而assign()方法却没有。
Kafka所提供的反序列化器有ByteBufferDeserializer、ByteArrayDeserializer、BytesDeserializer、DoubleDeserializer、FloatDeserializer、IntegerDeserializer、LongDeserializer、ShortDeserializer、StringDeserializer,它们分别用于ByteBuffer、ByteArray、Bytes、Double、Float、Integer、Long、Short 及String类型的反序列化,这些序列化器也都实现了 Deserializer 接口,与KafkaProducer中提及的Serializer接口一样,Deserializer接口也有三个方法。
Kafka中的消费是基于拉模式的。Kafka中的消息消费是一个不断轮询的过程,消费者所要做的就是重复地调用poll()方法,而poll()方法返回的是所订阅的主题(分区)上的一组消息。
对于Kafka中的分区而言,它的每条消息都有唯一的offset,用来表示消息在分区中对应的位置。对于消费者而言,它也有一个offset的概念,消费者使用offset来表示消费到分区中某个消息所在的位置。
在旧消费者客户端中,消费位移是存储在ZooKeeper中的。而在新消费者客户端中,消费位移存储在Kafka内部的主题__consumer_offsets中。
在消费者中还有一个committed offset的概念,它表示已经提交过的消费位移。
KafkaConsumer 类提供了 position(TopicPartition)和 committed(TopicPartition)两个方法来分别获取上面所说的position和committed offset的值。
在 Kafka 中默认的消费位移的提交方式是自动提交,这个由消费者客户端参数enable.auto.commit配置,默认值为 true。当然这个默认的自动提交是定期提交,这个定期的周期时间由客户端参数auto.commit.interval.ms配置,默认值为5秒,此参数生效的前提是enable.auto.commit参数为true。
开启手动提交功能的前提是消费者客户端参数enable.auto.commit配置为false,手动提交可以细分为同步提交和异步提交,对应于 KafkaConsumer 中的 commitSync()和commitAsync()两种类型的方法。
KafkaConsumer中使用pause()和resume()方法来分别实现暂停某些分区在拉取操作时返回数据给客户端和恢复某些分区向客户端返回数据的操作。
在 Kafka 中每当消费者查找不到所记录的消费位移时,就会根据消费者客户端参数auto.offset.reset的配置来决定从何处开始进行消费。
“latest”,默认,表示从分区末尾开始消费消息。
“earliest”,那么消费者会从起始处,也就是0开始消费。
“none”,配置为此值就意味着出现查到不到消费位移的时候,既不从最新的消息位置处开始消费,也不从最早的消息位置处开始消费,此时会报出NoOffsetForPartitionException异常
KafkaConsumer 中的 seek()方法可以指定offerset进行消费。
再均衡是指分区的所属权从一个消费者转移到另一消费者的行为,它为消费组具备高可用性和伸缩性提供保障,使我们可以既方便又安全地删除消费组内的消费者或往消费组内添加消费者。
在再均衡发生期间,消费组内的消费者是无法读取消息的。当一个分区被重新分配给另一个消费者时,消费者当前的状态也会丢失。一般情况下,应尽量避免不必要的再均衡的发生。
再均衡监听器用来设定发生再均衡动作前后的一些准备或收尾的动作。ConsumerRebalanceListener 是一个接口,包含2 个方法,具体的释义如下:
(1)void onPartitionsRevoked(Collection<TopicPartition>partitions)
这个方法会在再均衡开始之前和消费者停止读取消息之后被调用。可以通过这个回调方法来处理消费位移的提交,以此来避免一些不必要的重复消费现象的发生。参数partitions表示再均衡前所分配到的分区。
(2)void onPartitionsAssigned(Collection<TopicPartition>partitions)这个方法会在重新分配分区之后和消费者开始读取消费之前被调用。参数partitions表示再均衡后所分配到的分区。
Rebalance讲解:
(1)Rebalance触发条件
(2)Rebalance分区分配策略
按分区总数与消费者总数进行整除运算来获得一个跨度,然后将分区按照跨度进行平均分配,以保证分区尽可能平均的分配给所有的消费者。
将消费者组内所有主题的分区按照字典序排序,然后通过轮询的方式逐个将分区一次分配给每个消费者。
分区的分配要尽可能均匀;分区的分配尽可能与上次分配的保持相同。
消费者拦截器主要在消费到消息或在提交消费位移时进行一些定制化的操作。
消费者拦截器需要自定义实现org.apache.kafka.clients.consumer.ConsumerInterceptor接口。ConsumerInterceptor接口包含3个方法:
KafkaProducer是线程安全的,然而KafkaConsumer却是非线程安全的。KafkaConsumer中定义了一个 acquire()方法,用来检测当前是否只有一个线程在操作,若有其他线程正在操作则会抛出ConcurrentModifcationException异常。
若生产者发送消息的速度大于消费者处理消息的速度,那么就会有越来越多的消息得不到及时的消费,造成了一定的延迟。除此之外,由于Kafka 中消息保留机制的作用,有些消息有可能在被消费之前就被清理了,从而造成消息的丢失。可以通过多线程的方式来实现消息消费,多线程的目的就是为了提高整体的消费能力。
多线程的实现方式有多种:
一个消费线程可以消费一个或多个分区中的消息,所有的消费线程都隶属于同一个消费组。这种实现方式的并发度受限于分区的实际个数,当消费线程的个数大于分区数时,就有部分消费线程一直处于空闲的状态。
内部类KafkaConsumerThread代表消费线程,其内部包裹着一个独立的KafkaConsumer实例。通过外部类的main()方法来启动多个消费线程,消费线程的数量由consumerThreadNum变量指定。一般一个主题的分区数事先可以知晓,可以将consumerThreadNum设置成不大于分区数的值,如果不知道主题的分区数,那么也可以通过KafkaConsumer类的partitionsFor()方法来间接获取,进而再设置合理的consumerThreadNum值。
该参数用来配置Consumer在一次拉取请求(调用poll()方法)中能从Kafka中拉取的最小数据量,默认值为1(B)。
该参数用来配置Consumer在一次拉取请求中从Kafka中拉取的最大数据量,默认值为 52428800(B),也就是 50MB。
该参数用于指定Kafka的等待时间,默认值为500(ms)。
这个参数用来配置从每个分区里返回给Consumer的最大数据量,默认值为1048576(B),即1MB。
这个参数用来指定在多久之后关闭限制的连接,默认值是540000(ms),即9分钟。
kafka中有两个内部的主题:__consumer_offsets和__transaction_state。exclude.internal.topics用来指定Kafka中的内部主题是否可以向消费者公开,默认值为true。
主题作为消息的归类,可以再细分为一个或多个分区。每个分区可以有一至多个副本,每个副本对应一个日志文件,每个日志文件对应一至多个日志分段(LogSegment),每个日志分段还可以细分为索引文件、日志存储文件和快照文件等。分区的划分不仅为Kafka提供了可伸缩性、水平扩展的功能,还通过多副本机制来为Kafka提供数据冗余以提高数据可靠性。
主题的管理包括创建主题、查看主题信息、修改主题和删除主题等操作。可以通过 Kafka提供的kafka-topics.sh 脚本来执行这些操作,这个脚本位于$KAFKA_HOME/bin/目录下。其实质上是调用了kafka.admin.TopicCommand类来执行主题管理的操作。
还可以通过KafkaAdminClient 的方式实现。
为什么不支持减少分区?
优先副本是指在 AR 集合列表中的第一个副本。理想情况下,优先副本就是该分区的leader副本,所以也可以称之为preferred leader。Kafka要确保所有主题的优先副本在Kafka集群中均匀分布,这样就保证了所有分区的leader均衡分布。如果leader分布过于集中,就会造成集群负载不均衡。
所谓的优先副本的选举是指通过一定的方式促使优先副本选举为leader副本,以此来促进集群的负载均衡,这一行为也可以称为“分区平衡”。
建议将分区数设定为集群中broker的倍数,即假定集群中有3个broker节点,可以设定分区数为3、6、9等,至于倍数的选定可以参考预估的吞吐量。
下图描绘了主题、分区、副本、Log和LogSegment的关系。
向Log 中追加消息时是顺序写入的,只有最后一个 LogSegment 才能执行写入操作,在此之前所有的 LogSegment 都不能写入数据。将最后一个 LogSegment 称为“activeSegment”,即表示当前活跃的日志分段。随着消息的不断写入,当activeSegment满足一定的条件时,就需要创建新的activeSegment,之后追加的消息将写入新的activeSegment。
为了便于消息的检索,每个LogSegment中的日志文件(以“.log”为文件后缀)都有对应的两个索引文件:偏移量索引文件(以“.index”为文件后缀)和时间戳索引文件(以“.timeindex”为文件后缀)。每个 LogSegment 都有一个基准偏移量 baseOffset,用来表示当前 LogSegment中第一条消息的offset。
Kafka 中的文件目录布局如下图所示:
每个日志分段文件对应了两个索引文件,主要用来提高查找消息的效率。
偏移量索引文件用来建立消息偏移量(offset)到物理地址之间的映射关系,方便快速定位消息所在的物理文件位置;时间戳索引文件则根据指定的时间戳(timestamp)来查找对应的偏移量信息。
Kafka 中的索引文件以稀疏索引(sparse index)的方式构造消息的索引,它并不保证每个消息在索引文件中都有对应的索引项。每当写入一定量(由 broker 端参数 log.index.interval.bytes指定,默认值为4096,即4KB)的消息时,偏移量索引文件和时间戳索引文件分别增加一个偏移量索引项和时间戳索引项,增大或减小log.index.interval.bytes的值,对应地可以增加或缩小索引项的密度。
稀疏索引通过MappedByteBuffer将索引文件映射到内存中,以加快索引的查询速度。偏移量索引文件中的偏移量是单调递增的,查询指定偏移量时,使用二分查找法来快速定位偏移量的位置,如果指定的偏移量不在索引文件中,则会返回小于指定偏移量的最大偏移量。时间戳索引文件中的时间戳也保持严格的单调递增,查询指定时间戳时,也根据二分查找法来查找不大于该时间戳的最大偏移量,至于要找到对应的物理文件位置还需要根据偏移量索引文件来进行再次定位。稀疏索引的方式是在磁盘空间、内存空间、查找时间等多方面之间的一个折中。
Kafka 将消息存储在磁盘中,为了控制磁盘占用空间的不断增加就需要对消息做一定的清理操作。
Kafka提供了两种日志清理策略。
可以通过broker端参数log.cleanup.policy来设置日志清理策略,此参数的默认值为“delete”,即采用日志删除的清理策略。如果要采用日志压缩的清理策略,就需要将log.cleanup.policy设置为“compact”,并且还需要将log.cleaner.enable(默认值为true)设定为true。
在Kafka的日志管理器中会有一个专门的日志删除任务来周期性地检测和删除不符合保留条件的日志分段文件,这个周期可以通过broker端参数log.retention.check.interval.ms来配置,默认值为300000,即5分钟。
当前日志分段的保留策略有3种:基于时间的保留策略、基于日志大小的保留策略和基于日志起始偏移量的保留策略。
对于有相同key的不同value值,只保留最后一个版本。如果应用只关心key对应的最新value值,则可以开启Kafka的日志清理功能,Kafka会定期将相同key的消息进行合并,只保留最新的value值。
Kafka中的每个日志清理线程会使用一个名为“SkimpyOffsetMap”的对象来构建 key与offset 的映射关系的哈希表。日志清理需要遍历两次日志文件,第一次遍历把每个key的哈希值和最后出现的offset都保存在SkimpyOffsetMap中,映射模型如图5-18所示。第二次遍历会检查每个消息是否符合保留条件,如果符合就保留下来,否则就会被清理。
Kafka 在设计时采用了文件追加的方式来写入消息,即只能在日志文件的尾部追加新的消息,并且也不允许修改已写入的消息,这种方式属于典型的顺序写盘的操作,所以就算 Kafka使用磁盘作为存储介质,它所能承载的吞吐量也不容小觑。
页缓存是操作系统实现的一种主要的磁盘缓存,以此用来减少对磁盘 I/O 的操作。具体来说,就是把磁盘中的数据缓存到内存中,把对磁盘的访问变为对内存的访问。
Kafka 中大量使用了页缓存,这是 Kafka 实现高吞吐的重要因素之一。虽然消息都是先被写入页缓存,然后由操作系统负责具体的刷盘任务的,但在Kafka中同样提供了同步刷盘及间断性强制刷盘(fsync)的功能,这些功能可以通过 log.flush.interval.messages、log.flush.interval.ms 等参数来控制。同步刷盘可以提高消息的可靠性,防止由于机器掉电等异常造成处于页缓存而没有及时写入磁盘的消息丢失。不过笔者并不建议这么做,刷盘任务就应交由操作系统去调配,消息的可靠性应该由多副本机制来保障,而不是由同步刷盘这种严重影响性能的行为来保障。
除了消息顺序追加、页缓存等技术,Kafka还使用零拷贝(Zero-Copy)技术来进一步提升性能。
零拷贝是指将数据直接从磁盘文件复制到网卡设备中,而不需要经由应用程序之手。零拷贝大大提高了应用程序的性能,减少了内核和用户模式之间的上下文切换。
对 Linux操作系统而言,零拷贝技术依赖于底层的 sendfile()方法实现。对应于 Java 语言,FileChannal.transferTo()方法的底层实现就是sendfile()方法。
零拷贝技术通过DMA(Direct Memory Access)技术将文件内容复制到内核模式下的Read Buffer中。不过没有数据被复制到 Socket Buffer,相反只有包含数据的位置和长度的信息的文件描述符被加到Socket Buffer中。DMA引擎直接将数据从内核模式中传递到网卡设备(协议引擎)。这里数据只经历了2次复制就从磁盘中传送出去了,并且上下文切换也变成了2次。零拷贝是针对内核模式而言的,数据在内核模式下实现了零拷贝。
Kafka自定义了一组基于TCP的二进制协议,只要遵守这组协议的格式,就可以向Kafka发送消息,也可以从Kafka中拉取消息,或者做一些其他的事情,比如提交消费位移等。
在目前的 Kafka 2.0.0 中,一共包含了 43 种协议类型,每种协议类型都有对应的请求(Request)和响应(Response),它们都遵守特定的协议模式。
每种类型的Request都包含相同结构的协议请求头(RequestHeader)和不同结构的协议请求体(RequestBody)。
每种类型的Response也包含相同结构的协议响应头(ResponseHeader)和不同结构的响应体(ResponseBody)
Kafka中存在大量的延时操作,比如延时生产、延时拉取和延时删除等。Kafka并没有使用JDK自带的Timer或DelayQueue来实现延时的功能,而是基于时间轮的概念自定义实现了一个用于延时功能的定时器(SystemTimer)。JDK中Timer和DelayQueue的插入和删除操作的平均时间复杂度为O(nlogn)并不能满足Kafka的高性能要求,而基于时间轮可以将插入和删除操作的时间复杂度都降为O(1)。
Kafka中的时间轮(TimingWheel)是一个存储定时任务的环形队列,底层采用数组实现,数组中的每个元素可以存放一个定时任务列表(TimerTaskList)。
Kafka引入了事务的概念,对于消费者或follower副本而言,其默认的事务隔离级别为“read_uncommitted”。不过消费者可以通过客户端参数isolation.level将事务隔离级别设置为“read_committed”(注意:follower副本不可以将事务隔离级别修改为这个值),这样消费者拉取不到生产者已经写入却尚未提交的消息。
在 Kafka 集群中会有一个或多个 broker,其中有一个 broker 会被选举为控制器(KafkaController),它负责管理整个集群中所有分区和副本的状态。当某个分区的leader副本出现故障时,由控制器负责为该分区选举新的leader副本。当检测到某个分区的ISR集合发生变化时,由控制器负责通知所有broker更新其元数据信息。
Kafka中的控制器选举工作依赖于ZooKeeper,成功竞选为控制器的broker会在ZooKeeper中创建/controller这个临时(EPHEMERAL)节点,此临时节点的内容参考如下:
其中,version在目前版本中固定为1,brokerid表示成为控制器的broker的id编号,timestamp表示竞选成为控制器时的时间戳。
在任意时刻集群中有且仅有一个控制器。每个 broker 启动的时候会去尝试读取/controller节点的brokerid的值,如果读取到brokerid的值不为-1,则表示已经有其他 broker 节点成功竞选为控制器,所以当前 broker 就会放弃竞选;如果 ZooKeeper 中不存在/controller节点,或者这个节点中的数据异常,那么就会尝试去创建/controller节点。每个broker都会在内存中保存当前控制器的brokerid值,这个值可以标识为activeControllerId。
ZooKeeper 中的/controller_epoch 持久节点,节点中存放的是一个整型的controller_epoch值。controller_epoch用于记录控制器发生变更的次数,即记录当前的控制器是第几代控制器,我们也可以称之为“控制器的纪元”。
controller_epoch的初始值为1,即集群中第一个控制器的纪元为1,当控制器发生变更时,每选出一个新的控制器就将该字段值加1。每个和控制器交互的请求都会携带controller_epoch这个字段,如果请求的controller_epoch值小于内存中的controller_epoch值,则认为这个请求是向已经过期的控制器所发送的请求,那么这个请求会被认定为无效的请求。如果请求的controller_epoch值大于内存中的controller_epoch值,那么说明已经有新的控制器当选了。由此可见,Kafka 通过controller_epoch 来保证控制器的唯一性,进而保证相关操作的一致性。
控制器职责如下:
分区leader副本的选举由控制器负责具体实施。
Kafka提供了消费者客户端参数partition.assignment.strategy来设置消费者与订阅主题之间的分区分配策略。默认情况下为org.apache.kafka.clients.consumer.RangeAssignor,即采用RangeAssignor分配策略。除此之外还有 RoundRobinAssignor 和 StickyAssignor。
自定义的分配策略必须要实现 org.apache.kafka.clients.consumer.internals.PartitionAssignor接口。PartitionAssignor接口的定义如下:
每个消费者对ZooKeeper的相关路径分别进行监听,当触发再均衡操作时,一个消费组下的所有消费者会同时进行再均衡操作,而消费者之间并不知道彼此操作的结果,这样可能导致Kafka工作在一个不正确的状态
严重依赖于ZooKeeper集群的做法还有两个比较严重的问题。(1)羊群效应(Herd Effect):所谓的羊群效应是指ZooKeeper中一个被监听的节点变化,大量的Watcher 通知被发送到客户端,导致在通知期间的其他操作延迟,也有可能发生类似死锁的情况。(2)脑裂问题(Split Brain):消费者进行再均衡操作时每个消费者都与ZooKeeper进行通信以判断消费者或broker变化的情况,由于ZooKeeper本身的特性,可能导致在同一时刻各个消费者获取的状态不一致,这样会导致异常问题发生。
GroupCoordinator是Kafka服务端中用于管理消费组的组件。而消费者客户端中的ConsumerCoordinator组件负责与GroupCoordinator进行交互。
ConsumerCoordinator与GroupCoordinator之间最重要的职责就是负责执行消费者再均衡的操作,包括前面提及的分区分配的工作也是在再均衡期间完成的。
触发再均衡的操作:
第一阶段(FIND_COORDINATOR)寻找组协调器
消费者需要确定它所属的消费组对应的GroupCoordinator所在的broker,并创建与该broker相互通信的网络连接。如果消费者已经保存了与消费组对应的 GroupCoordinator 节点的信息,并且与它之间的网络连接是正常的,那么就可以进入第二阶段。否则,就需要向集群中的某个节点发送FindCoordinatorRequest请求来查找对应的GroupCoordinator,这里的“某个节点”并非是集群中的任意节点,而是负载最小的节点。
第二阶段(JOIN_GROUP) 加入消费者组
在成功找到消费组所对应的 GroupCoordinator 之后就进入加入消费组的阶段,在此阶段的消费者会向GroupCoordinator发送JoinGroupRequest请求,并处理响应。
选举消费组的leader
GroupCoordinator需要为消费组内的消费者选举出一个消费组的leader,分两种情况分析。如果消费组内还没有 leader,那么第一个加入消费组的消费者即为消费组的leader。如果某一时刻 leader 消费者由于某些原因退出了消费组,那么会重新选举一个新的leader,这个重新选举leader的过程又更“随意”了,相关代码如下:
选举分区分配策略
选举的分配策略基本上可以看作被各个消费者支持的最多的策略,具体的选举过程如下:(1)收集各个消费者支持的所有分配策略,组成候选集candidates。
(2)每个消费者从候选集candidates中找出第一个自身支持的策略,为这个策略投上一票。
(3)计算候选集中各个策略的选票数,选票数最多的策略即为当前消费组的分配策略。
第三阶段(SYNC_GROUP)
leader 消费者根据在第二阶段中选举出来的分区分配策略来实施具体的分区分配,在此之后需要将分配的方案同步给各个消费者,此时leader消费者并不是直接和其余的普通消费者同步分配方案,而是通过 GroupCoordinator 这个“中间人”来负责转发同步分配方案的。在第三阶段,也就是同步阶段,各个消费者会向GroupCoordinator发送SyncGroupRequest请求来同步分配方案
第四阶段(HEARTBEAT)
进入这个阶段之后,消费组中的所有消费者就会处于正常工作状态。在正式消费之前,消费者还需要确定拉取消息的起始位置。假设之前已经将最后的消费位移提交到了GroupCoordinator,并且GroupCoordinator将其保存到了Kafka内部的__consumer_offsets主题中,此时消费者可以通过OffsetFetchRequest请求获取上次提交的消费位移并从此处继续消费。
位移提交的内容最终会保存到Kafka的内部主题__consumer_offsets中。
客户端提交消费位移是使用 OffsetCommitRequest 请求实现的。在处理完消费位移之后,Kafka返回OffsetCommitResponse给客户端。
消息中间件的消息传输保障有3个层级,分别如下。
(1)at most once:至多一次。消息可能会丢失,但绝对不会重复传输。
(2)at least once:最少一次。消息绝不会丢失,但可能会重复传输。
(3)exactly once:恰好一次。每条消息肯定会被传输一次且仅传输一次。
幂等就是对接口的多次调用所产生的结果和调用一次是一致的。
生产者在进行重试的时候有可能会重复写入消息,而使用Kafka的幂等性功能之后就可以避免这种情况。开启幂等性功能的方式很简单,只需要显式地将生产者客户端参数enable.idempotence设置为true即可(这个参数的默认值为false),
为了实现生产者的幂等性,Kafka引入了producer id(以下简称PID)和序列号(sequence number)这两个概念。
每个新的生产者实例在初始化的时候都会被分配一个PID,这个PID对用户而言是完全透明的。对于每个PID,消息发送到的每一个分区都有对应的序列号,这些序列号从0开始单调递增。生产者每发送一条消息就会将<PID,分区>对应的序列号的值加1。
broker端会在内存中为每一对<PID,分区>维护一个序列号。对于收到的每一条消息,只有当它的序列号的值(SN_new)比broker端中维护的对应的序列号的值(SN_old)大1(即SN_new=SN_old+1)时,broker才会接收它。如果SN_new<SN_old+1,那么说明消息被重复写入,broker可以直接将其丢弃。如果SN_new>SN_old+1,那么说明中间有数据尚未写入,出现了乱序,暗示可能有消息丢失,对应的生产者会抛出OutOfOrderSequenceException,这个异常是一个严重的异常,后续的诸如 send()、beginTransaction()、commitTransaction()等方法的调用都会抛出IllegalStateException的异常。
引入序列号来实现幂等也只是针对每一对<PID,分区>而言的,也就是说,Kafka的幂等只能保证单个生产者会话(session)中单分区的幂等。
幂等性并不能跨多个分区运作,而事务可以弥补这个缺陷。事务可以保证对多个分区写入操作的原子性。
为了实现事务,应用程序必须提供唯一的 transactionalId,这个 transactionalId 通过客户端参数transactional.id来显式设置。transactionalId与PID一一对应,两者之间所不同的是transactionalId由用户显式设置,而PID是由Kafka内部分配的。
从消费者的角度分析,事务能保证的语义相对偏弱。出于以下原因,Kafka 并不能保证已提交的事务中的所有消息都能够被消费:
为了实现事务的功能,Kafka还引入了事务协调器(TransactionCoordinator)来负责处理事务,这一点可以类比一下组协调器(GroupCoordinator)。每一个生产者都会被指派一个特定的TransactionCoordinator,所有的事务逻辑包括分派 PID 等都是由 TransactionCoordinator 来负责实施的。TransactionCoordinator 会将事务状态持久化到内部主题__transaction_state 中
Kafka源码注释中说明了一般有两种情况会导致副本失效:
Kafka 只支持主写主读有几个优点:可以简化代码的实现逻辑,减少出错的可能;将负载粒度细化均摊,与主写从读相比,不仅负载效能更好,而且对用户可控;没有延时的影响;在副本稳定的情况下,不会出现数据不一致的情况。
用消息的timestamp字段和拦截器ConsumerInterceptor接口的onConsume()方法,配合消息中的headers字段来,实现过期时间。可以将消息的TTL的设定值以键值对的形式保存在消息的 headers 字段中,消费者消费到这条消息的时候可以在拦截器中根据 headers字段设定的超时时间来判断此条消息是否超时
使用Kafka 提供的实现类 org.apache.kafka.common.header.internals.RecordHeaders 和org.apache.kafka.common.header.internals.RecordHeader。
“延时消息”是指消息被发送以后,并不想让消费者立刻获取,而是等待特定的时间后,消费者才能获取这个消息进行消费,延时队列一般也被称为“延迟队列”。
注意延时与TTL的区别,延时的消息达到目标延时时间后才能被消费,而TTL的消息达到目标超时时间后会被丢弃。
方案一:
在发送延时消息的时候并不是先投递到要发送的真实主题(real_topic)中,而是先投递到一些 Kafka 内部的主题(delay_topic)中,这些内部主题对用户不可见,然后通过一个自定义的服务拉取这些内部主题中的消息,并将满足条件的消息再投递到要发送的真实的主题中,消费者所订阅的还是真实的主题。
按照不同的延时等级来划分的,比如设定5s、10s、30s、1min、2min、5min、10min、20min、30min、45min、1hour、2hour这些按延时时间递增的延时等级,延时的消息按照延时时间投递到不同等级的主题中。
方案二:
由于某些原因消息无法被正确地投递,为了确保消息不会被无故地丢弃,一般将其置于一个特殊角色的队列,这个队列一般称为死信队列。
重试队列其实可以看作一种回退队列,具体指消费端消费消息失败时,为了防止消息无故丢失而重新将消息回滚到broker中。
Kafka默认按照主题进行路由,也就是说,消息发往主题之后会被订阅的消费者全盘接收,这里没有类似消息路由的功能来将消息进行二级路由,这一点从逻辑概念上来说并无任何问题。
具体的实现方式可以在消息的 headers 字段中加入一个键为“routingkey”、值为特定业务标识的Header,然后在消费端中使用拦截器挑选出特定业务标识的消息。Kafka 中消息路由的实现架构如图11-11所示。
消息轨迹指的是一条消息从生产者发出,经由broker存储,再到消费者消费的整个过程中,各个相关节点的状态、时间、地点等数据汇聚而成的完整链路信息。
对消息轨迹而言,最常见的实现方式是封装客户端,在保证正常生产消费的同时添加相应的轨迹信息埋点逻辑。无论生产,还是消费,在执行之后都会有相应的轨迹信息,需要将这些信息保存起来。这里可以参考Kafka中的做法,它将消费位移信息保存在主题__consumer_offset中。对应地,我们同样可以将轨迹信息保存到Kafka的某个主题中,比如图11-12中的主题trace_topic。
对消息轨迹而言,最常见的实现方式是封装客户端,在保证正常生产消费的同时添加相应的轨迹信息埋点逻辑。无论生产,还是消费,在执行之后都会有相应的轨迹信息,我们需要将这些信息保存起来。这里可以参考Kafka中的做法,它将消费位移信息保存在主题__consumer_offset中。对应地,我们同样可以将轨迹信息保存到Kafka的某个主题中,比如图11-12中的主题trace_topic。
消息审计是指在消息生产、存储和消费的整个过程之间对消息个数及延迟的审计,以此来检测是否有数据丢失、是否有数据重复、端到端的延迟又是多少等内容。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。