当前位置:   article > 正文

Kafka架构概述

Kafka架构概述

Kafka的体系结构

Kafka是由Apache软件基金会管理的一个开源的分布式数据流处理平台。Kafka具有支持消息的发布/订阅模式、高吞吐量与低延迟、持久化、支持水平扩展、高可用性等特点。可以将Kafka应用于大数据实时处理、高性能数据管道、流分析、数据集成和关键任务应用等场景。在使用Kafka时,也要充分考虑Kafka在某些方面的不足,如引入Kafka作为消息队列后,系统将强依赖Kafka,当Kafka不可用时,系统也会受到影响、引入Kafka后,会提升系统的复杂度、引入Kafka作为消息队列后,会带来分布式系统的一致性问题等。Kafka整体架构的逻辑视图如下所示:

请添加图片描述

一个典型的Kafka体系结构包括Producer(生产者)、Broker(服务代理节点)、Consumer(消费者)等三种角色。其中,Producer负责将消息发送到Broker,Broker负责将收到的消息存储到磁盘中,而Consumer负责从Broker订阅并消费消息。
在Kafka的早期版本,Kafka使用ZooKeeper来管理Kafka集群,如Broker注册与删除、Topic的注册与删除、根据当前的Partition数量及Consumer数量来实现动态负载均衡,等等。但是在Kafka 2.8之后,引入了基于Raft协议的KRaft模式,不再依赖Zookeeper,大大简化了Kafka的架构,可以以一种轻量级的方式来使用Kafka。同时,移除ZooKeeper可以进一步降低脑裂问题的出现概率,提升系统可靠性。注意,Kafka实现集群自治确实可以在一定程度上减少脑裂问题的风险,但并不能完全解决脑裂问题。此外,如果要使用KRaft模式的话,建议选择较高版本的Kafka,因为这个功能还在持续完善优化中。Kafka 3.3.1版本是第一个将KRaft(Kafka Raft)共识协议标记为生产就绪的版本。而在Kafka 3.5版本,ZooKeeper已经被标记为弃用。在后续的Kafka 4.0版本将不再支持ZooKeeper。
生产者是指Message(消息)的生成者,即将消息发送到Kafka存储的Topic(主题)中的生成者。Topic是一个逻辑概念,用于组织和存储消息。Topic通过分区的方式存储在Broker中。生产者可以通过特定的分区函数决定消息路由到Topic的某个分区。默认的分区函数是轮询策略,也即将消息均匀地分布在Topic所有的Partition上。消息的生成者发送消息有两种模式,分别为同步模式和异步模式。根据需要发送的消息数量,可以将消息分为单条发送消息和批量发送消息。
Broker接收到消息后,以一种有序、不可变、分段的消息存储结构将消息存储到磁盘。消息按分段的方式存储,每个分段包含1GB或一周的数据,以较小的判断标准为主。为了快速定位消息数据,Kafka在消息日志之上构建了索引结构。索引可以帮助快速定位某个消息的物理偏移量,从而加快消息的读取速度。为保证消息不丢失,Kafka为分区引入了多副本(Replica)机制,通过增加副本数量可以提升容灾能力。同一个分区的不同副本中保存的是相同的信息,副本之间是"一主多从"的关系。其中Leader负责处理读写请求(Producer和Consumer只与Leader交互),Follower只负责与Leader的消息同步。当Leader出现故障时,从Follower中重新选举新的Leader并对外提供服务。
消费者负责订阅Kafka中的Topic,并从订阅的主题上拉取消息。在Kafka的消费理念中还有一层消费者组(Consumer Group)的概念。一个消费者组可以包含一个或多个消费者。同一消费组中的消费者不会重复消费消息,同样的,不同消费组中的消费者消息消息时互不影响。Kafka通过消费组来实现消息的点对点(Point-to-Point,P2P)模式和广播模式。

Broker

Broker是Kafka Cluster中的一个核心组件,负责消息的存储、消息的转发、消息的多副本管理和集群管理等功能。
Broker的主要职责之一就是存储生产者发送的消息。Broker接收到消息后,会将其持久化到本地磁盘中。Kafka将消息组织成Topic,而每个Topic又被划分为一个或多个Partition。Broker负责将消息按照Partition存储到本地磁盘上,确保消息的持久化。Kafka采用顺序写入的方式来提高写入性能,消息在磁盘上是按照顺序进行存储的,这有助于减少磁盘的寻道时间。
除了存储消息外,Broker还负责将消息转发给消费者。当消费者需要读取消息时,它会与Broker建立连接,并请求获取某个Topic的特定分区的消息。Broker会根据消费者的请求,从相应的分区中读取消息并将其发送给消费者。这样,Broker就充当了消息的生产者和消费者之间的桥梁。
为了保证消息的可靠性和高可用性,Kafka采用了多副本机制。每个分区都有一个Leader副本和多个Follower副本。Leader副本负责处理读写请求,而Follower副本则用于同步数据,以确保数据的备份和容错。当Leader副本出现故障时,Kafka会自动从Follower副本中选举出新的Leader副本,以保证服务的连续性。
Broker之间通过分布式协调机制来维护集群的状态和一致性。Broker之间通过一种称为Controller的选举机制来选举出一个负责集群协调的Broker。这个Kafka Controller负责处理集群中的元数据更新、分区领导选举、集群中节点故障检测、元数据管理等集群管理操作。这使得Kafka集群能够自动处理节点故障、扩展和缩容等情况,保持整个系统的状态一致性、稳定性和可用性。
此外,Broker还负责处理与安全性相关的任务,如消息的加密、解密以及客户端的身份验证等。Kafka支持多种认证和授权机制,如SASL/SSL、OAuth等,以确保数据传输的安全性和访问控制的有效性。

接收来自生产者发送过来的消息
SocketServer
向消费者提供消息
Topic管理
OffsetManager

消息存储

Broker接收到从生产者发送过来的消息后,会将其持久化到本地磁盘中。Kafka将消息组织成Topic,而每个Topic又被划分为一个或多个Partition。每个Partition可以有一个或多个副本,每个副本对应一个日志文件,每个日志文件对应一个或多个日志分段(LogSegment)。Kafka中消息存储的逻辑视图如下所示:

请添加图片描述

Topic

Kafka中的Topic是消息的逻辑分类,可以将其理解为一个逻辑上的消息容器。生产者将消息发布到特定的Topic,而消费者可以从Topic中订阅并消费消息。
Topic的创建支持自动创建和手动创建。针对手动创建Topic的场景,比较推荐的方式是通过kafka-topics.sh脚本来创建Topic,对于云平台场景,可以通过可视化界面创建Topic。针对自动创建场景,首先需要确保Broker的配置参数auto.create.topics.enable设置为true(默认值就是true),那么当生产者向一个尚未创建的Topic发送消息时,会自动创建一个Partition数为num.partitions(默认值为1)、副本因子为default.replication.factor(默认值为1)的Topic。此外,当一个消费者开始从未知Topic中读取消息,或当任意一个客户端向未知Topic发送元数据请求时,都会创建一个相应的主题。需要说明的是,这种自动创建Topic的行为是非预期的。除非有特殊需求,否则不建议将Broker的配置参数auto.create.topics.enable设置为true,这会增加Topic的管理和维护的难度。
Topic不支持重命名。可能会有Topic重命名的场景,如基于业务规范的需求、Topic的合并或拆分需求。尽管上述需求看似合理,但是Kafka的设计哲学更倾向于使用不可变的Topic名称。这是因为Topic名称在Kafka中扮演着重要的角色,它们不仅是消息的标识符,还用于在集群中定位和管理数据。允许重命名Topic可能会导致一系列复杂性和潜在的问题,如消费者偏移量的管理、数据一致性的保证等。所以,为实现和Topic重命名类似的功能,可以通过创建一个新的Topic,将旧Topic的数据迁移到新Topic上,删除旧Topic的过程间接实现。
如果确定不再使用一个Topic,可以删除这个Topic。注意,必须将Broker的配置参数delete.topic.enable设置为true(默认值是true),才能够删除Topic。如果delete.topic.enable参数配置为false,那么删除Topic的操作将会被忽略。删除Topic是一个不可逆操作。一旦删除之后,与其相关的所有消息数据会被全部删除,要谨慎操作。

Partition

每个Topic又可进一步细分为一个或多个Partition。这样,Topic就可通过Partition实现水平扩展能力。在创建Topic时,可以指定Partition的数量。在设计Topic和Partition时,需要考虑主题的数据量、消息生产者和消费者的数量,以及消息处理的延迟等因素。增加Partition数,可以提高并发处理能力和系统的吞吐量,但也会增加存储和网络开销,因此需要根据实际需求和资源情况来选择合适的Partition数。
Kafka也支持在运行时动态调整Topic的Partition数量。但是,目前Kafka只支持增加Partition数,而不支持减少Partition数。可以通过kafka-topics.sh脚本来增加Partition,对于云平台场景,可以通过可视化界面增加Partition。需要注意的是,动态增加Partition并不会自动迁移原有Partition的数据到新的Partition中。也就是说,新增加的Partition一开始是不包含任何消息的。如果需要将原有数据分布到新的Partition中,需要自行实现数据迁移的逻辑。这可以通过编写自定义的迁移脚本或使用Kafka提供的工具来完成。
Kafka之所以不支持减少Partition数,主要是实现此功能需要考虑的因素比较多。如删除分区中的消息该如何处理。如果随着分区一起消息,则会带来数据丢失的问题;如果需要保留,还需考虑如何消费的问题。此外,还要考虑顺序性问题、性能问题等。如果需要实现该功能,可以重新创建一个Partition数较少的Topic,然后将现有Topic的数据迁移到新Topic上。
如何选择合适的Partition数,是很多Kafka的使用者经常面临的问题。对于这个问题没有非常统一的答案,只能从某些角度做具体分析,最终还是根据实际的业务场景来做具体的考量。如性能与Partition数有着必然的联系,在设定Partition数时,一般也需要考虑性能的因素。如不同的任务类型可能需要不同数量的分区如果正在使用Kafka作为日志收集系统,那么可能不需要太多的分区,因为这种情况下通常只需要顺序写入一组日志即可。然而,如果正在使用Kafka作为实时数据管道或流处理平台,那么更多的分区可能有助于支持更高的并发性和实时性要求。Partition数并不是越大越好,随着Partition数的增加,响应的吞吐量也跟着上涨。一旦分区数超过了某个阈值,整体的吞吐量不降反升。

Replica

每个Partition可以有一个或多个Replica。为保证消息不丢失,Kafka为分区引入了多副本(Replica)机制,通过增加副本数量可以提升容灾能力。同一个分区的不同副本中保存的是相同的信息,副本之间是"一主多从"的关系。其中Leader负责处理读写请求(Producer和Consumer只与Leader交互),Follower只负责与Leader的消息同步。当Leader出现故障时,从Follower中重新选举新的Leader并对外提供服务。更多Replica相关知识细节可以参考下文的消息的多副本管理一节。

Log和LogSegment

每个Replica对应一个Log文件。为防止Log文件过大,每个Log文件又可以分为一个或多个LogSegment,相当于一个巨型文件被平均分配为多个相对较小的文件。事实上,Log和LogSegment不是纯粹物理意义上的概念,Log在物理上只以文件夹的形式存储,而每个LogSegment对应于磁盘上的一个日志文件和两个索引文件,以及可能的其他文件。Log和LogSegment的逻辑关系如下图所示:

请添加图片描述

Broker接收到消息后,以一种有序、不可变、分段的消息存储结构将消息存储到磁盘。向Log中追加消息是顺序写入的,只有最后一个LogSegment才能执行写入操作,其他的LogSegment均不能写入数据(可以理解为写满)。当最后一个LogSegment满足一定条件时(每个分段包含1GB或一周的数据,以较小的判断标准为主),就需要创建新的LogSegment,之后新追加的消息就写入这个LogSegment上。
为了便于消息的检索,每个LogSegment的日志文件(以.log为文件后缀)都有对应的两个索引文件:偏移量索引文件(以.index为文件后缀)和时间戳索引文件(以.timeindex为文件后缀)。每个LogSegment都有一个基本偏移量BaseOffset,用来表示当前LogSegment中的第一条消息的Offset。偏移量是一个64位的长整数,日志文件和两个索引文件都是根据基准偏移量命名的,名称固定为20位整数,没有用到位数用0填充。Offset是消息在分区中的唯一标识,Kafka 通过它来保证消息在分区内的顺序性,不过Offset并不跨越分区。也就是说,Kafka保证的是分区有序性而不是Topic的有序性,即局部有序。
注意,每个LogSegment中不仅仅包含".log"、“.index”、“.timeindex"这3个文件,还可能包含"deleted”、“cleaned"等临时文件,以及可能的”.snapshot"、“txnindex"等文件。
为了减少消息占用的存储空间和传输带宽的消耗,可以将消息进行压缩。在Kafka中,消息压缩是在生产者端进行的。一般情况下,生成者发送的压缩数据在Broker中也是保持压缩状态存储的。消费者从Broker获取的也是压缩的消息,消费者在处理消息之前才会解压消息,这样保持了端到端的消息压缩。
Kafka日志中使用哪种压缩方式是通过参数compression.type来配置的,默认值为"producer”,表示保留生产者使用的压缩方式。如果参数compression.type配置为"uncompressed",则表示不压缩。
每个从生产者发出的消息的Offset都是从0开始的。对Offset的转换是在Broker中进行的。消费者在消费消息时,会从Broker获取该压缩消息的Absolute Offset。需要注意的是,Kafka的压缩机制是端到端的,即生产者负责压缩消息,消费者负责解压缩消息。因此,在使用压缩功能时,需要确保生产者和消费者都支持相同的压缩算法,并且配置了正确的压缩参数。
Kafka将消息存储在磁盘中,为了控制磁盘占用空间的不断增加,需要对消息做清理操作。Kafka本身没有提供消息的TTL功能。但是可以设置Topic中的消息的默认保存时限(默认是7天)。这个默认保存时间可以通过server.properties文件中的log.retention.hours属性进行修改。Kafka提供了两种日志清理策略:(1) 日志删除(Log Retention):按照一定的保留策略直接删除不符合条件的LogSegment;(2) 日志压缩(Log Compaction):针对每个消息的Key进行整合,对于有相同Key的不同Value,只保留最后一个版本。

消息转发

除了存储消息外,Broker还负责将消息转发给消费者。当消费者需要读取消息时,它会与Broker建立连接,并请求获取某个Topic的特定分区的消息。Broker会根据消费者的请求,从相应的分区中读取消息并将其发送给消费者。这样,Broker就充当了消息的生产者和消费者之间的桥梁。
为了加快消息的检索,Broker在消息日志之上构建了索引结构。索引可以帮助快速定位某个消息的物理偏移量,从而加快消息的读取速度。Broker提供了两种索引结构:偏移量索引时间戳索引。其中,偏移量索引用来建立消息偏移量(Offset)到物理地址之间的映射关系,方便快速定位消息所在的物理文件位置。时间戳索引则根据指定的时间戳(Timestamp)来查找对应的偏移量信息。
Kafka中的索引文件以稀疏索引的方式构造消息的索引,它并不保证每个消息在索引文件中都有对应的索引项。每当写入一定量的消息时,偏移量索引文件和时间戳索引文件分别增加一个偏移量索引项和时间戳索引项。
偏移量索引文件中的偏移量是单调递增的,查询指定偏移量时,使用二分查找法来快速定位偏移量的位置,如果指定的偏移量不再索引文件中,则会返回小于指定偏移量的最大偏移量。时间戳索引文件中的时间戳也保持严格的单调递增,查询指定时间戳时,也根据二分查找法来查找不大于该时间戳的最大偏移量,至于要找到对应的物理文件位置,还需要根据偏移量索引文件来进行再次定位。

消息的多副本管理

为保证消息不丢失,Kafka为Partition引入了多副本(Replica)机制,通过增加副本数量来提升容灾能力。同一个Partition的不同副本中保存的是相同的信息,副本之间是"一主多从"的关系。其中Leader负责处理读写请求(Producer和Consumer只与Leader交互),Follower只负责与Leader的消息同步。当Leader出现故障时,从Follower中重新选举新的Leader并对外提供服务。在创建Topic的时候,可以同步指定副本因子或使用默认的副本因子。具体来说,如果是手动创建Topic,通过设置replication.factor参数来指定副本数量;如果是自动创建Topic,且Broker的配置参数auto.create.topics.enable已经设置为true(默认值就是true),那么副本数量就是default.replication.factor(默认值为1)。以由4个Broker组成的Kafka Cluster为例,副本数为3,其副本分布如下图所示:

请添加图片描述

Kafka会确保Partition的每个副本被放在不同的Broker上,这样可以保证不会出现因一个Broker失效,导致多个副本同时失效的问题的发生。如果Kafka的副本数量不能大于Broker的数量,则至少有两个副本分布在同一个Broker上,那么Kafka会无法成功分配所有副本,从而导致Topic创建失败。因此,在配置Kafka集群时,需要确保Broker的数量足够以支持所需的副本数。如果Broker数量不足,可能需要增加Broker的数量,或者调整主题的副本数设置,以确保Kafka集群能够正常运行并满足数据可靠性和可用性的需求。一般业务使用三副本即可,对于金融系统等更推荐五副本。副本数的选择是一个多方面因素考虑的结果,要根据具体的应用,选择合适的数量。
对于Broker来说,为了让Leader副本平均分布,从而避免单个Broker分配过多Leader副本,带来负载失衡的问题,Kafka引入了优先副本原则。理想情况下,Kafka可以确保所有Topic的优先副本在Broker中均匀分布。这样就能保证所有Partition的Leader的均衡分布。但是,对于Broker来说,仍然无法解决单个Leader负载过重带来的数据倾斜问题。单个Leader负载过重问题的性能优化需要从端到端的角度解决,不在本文讨论的范围。常见的解决策略有:优化Producer配置、调整分区数、调整负载均衡策略、数据压缩,等等。

消息同步

在多副本管理时,一个重要的过程就是Leader和Follower间的消息同步。Kafka使用了ISR机制来保证Leader和Follower的数据同步。这里先介绍下ISR机制相关的关键概念:
Partition中的所有副本统称为AR(Assigned Replicas)。在AR中,将所有与Leader保持一定程度同步的副本(包含Leader在内)称为ISR(In-Sync Replicas),将与Leader同步滞后的副本(一定不包括Leader)称为OSR(Out-of-Sync Replicas)。所以AR=ISR+OSR。Leader负责维护和跟踪ISR集合中所含有Follower的滞后状态,当Follower落后太多或失效时,Leader会把它从ISR集合中剔除。当然,如果OSR集合中有Follower跟上Leader,那么Leader会把它从OSR集合转移到ISR集合。
在Leader和Follower进行数据同步时(Leader推送消息给Follower,Follower从Leader拉取消息均有可能使用到),可以选择多种复制方案,如同步、异步等,这主要通过生产者的消息确认模式的设置acks方式实现:
acks=0:生产者发送消息后不会等待任何来自Broker的响应,即只要消息通过网络发送出去,就认为消息已成功写入Kafka。这种情况下,如果Broker没有收到消息或消息丢失,生产者将无从得知,因此可能会导致数据丢失。但由于不需要等待服务器响应,这种设置可以以网络支持的最大速度发送消息,达到较高的吞吐量。
acks=1:生产者发送消息后,只要集群的Leader副本成功写入消息(并不一定要写入磁盘),就会收到一个成功响应的ack。一旦消息无法写入Leader分区副本(如由于网络原因或Leader副本所在节点崩溃),生产者会收到一个错误响应,并可以选择重发消息以避免数据丢失。这是消息可靠性和吞吐量之间的一个折中方案。
acks=all或acks=-1:生产者发送消息后,会等待所有同步副本(ISR,In-Sync Replicas)都收到消息后才认为写入成功。这种设置提供了更高的数据可靠性,但可能会降低吞吐量。从功能上来看,acks=-1还是acks=all没有差异。在实际应用中,选择acks=-1还是acks=all取决于个人的编码习惯。但是,为了避免潜在的混淆或误解,建议根据团队的编码规范或Kafka的最佳实践来选择一个明确的值,这里推荐使用-1,表示一种标识且与其他参数表现一致。需要说明的是,ISR集合中同步副本的数量是由Broker中的min.insync.replicas参数(默认值是1)设置。min.insync.replicas参数表示ISR集合中的最少副本数。它的主要目的是确保在ISR中至少有一定数量的副本已经同步了消息,从而增强数据的可靠性和持久性。这个参数只在acks设置为all或-1时才有效。这个参数作用于所有Topic,在使用时要注意。
在Leader副本和Follower副本进行消息同步时,为了避免消息同步对正常业务的影响,需要有一个限流的机制。根据数据同步的方式(pull模式或push模式),还可针对Leader副本和Follower副本单独进行设置。在Kafka中,可以为Broker设置复制速率,以确保所有Topic的副本复制不超过这个速率,也可对某个Topic设置复制速率。使用Kafka自带的命令行工具kafka-configs.sh来设置这个参数的示例如下:

# 设置Broker的复制速率,这里速率单位是字节/秒
kafka-configs.sh localhost:2181/kafka --entity-type brokers --entity-name 2 --alter --add-config follower.replication.throttled.rate=10000,leader.replication.throttled.rate=10000
# 设置某个Topic的复制速率
kafka-configs.sh localhost:2181/kafka --entity-type topics --entity-name my_topic --alter --add-config leader.replication.throttled.rate=10000
  • 1
  • 2
  • 3
  • 4
故障转移

在多副本管理时,另一个重要的过程就是Leader副本所在节点出现故障后,如何从Follower副本中选举出新的Leader副本,从而保证服务的可用性。
在Kafka中,基于KRaft算法从Broker集合中选出Kafka Controller后,接下来就由Kafka Controller完成Leader副本的选举及通知。简单来说,当Leader副本故障后,Kafka Controller会收集所有可用Follower副本的信息,并选择一个新的Leader副本。选举的过程通常基于一些优先级规则,如ISR(In-Sync Replicas)列表中的副本优先于非ISR列表中的副本,具有较低副本ID的副本优先于具有较高副本ID的副本等。一旦选定了新的Leader副本,Kafka Controller会向集群中的Broker发送更新集群元数据的请求。
在选举的过程中,Kafka Controller会按照如下的步骤来进行选举:
(1) 筛选出在线的ISR和在线的AR。
(2) 优先在在线的ISR中选择,如果在线的ISR不为空,则选择在线ISR列表中的第一个作为Leader副本,然后结束选举。
(3) 如果在线的ISR为空,则根据Kafka Broker中的unclean.leader.election参数(默认是true)来决定是否在在线的AR列表中选举。如果该参数设置为true,则选择在线的AR列表中的第一个作为Leader副本,然后选举结束,否则选举失败,选举也同样结束。注意,启用此选项可能会导致数据丢失,因为非ISR副本中的数据可能与已故障Leader的数据不一致。
以上选举策略仅适用于Partition中Leader故障的场景,对应的选举策略是"OfflinPartitionLeaderElectionStrategy",实际的Leader选举更复杂。针对不同的场景,使用的选择策略也不相同。如对于分区重分配场景,现有Leader并为故障,但仍需要重新选举Leader,这时就需采用其他选举策略,如PreferredReplicaPartitionLeaderSelector或ReassignedPartitionLeaderSelector。Leader选举不是本文的重点,有兴趣的同学还请自行学习。

集群管理

Broker之间通过一种称为Controller的选举机制(基于KRaft算法)来选举出一个负责集群协调的Broker。这个Kafka Controller负责处理集群中的元数据更新、分区领导选举、集群中节点故障检测、元数据管理等集群管理操作。
在Kafka Controller的选举过程中仍然存在脑裂问题或羊群效应,但发生频率较低,且引入了一系列处理手段。如通过引入Controller Epoch来确保只有一个有效的Controller在运作,避免应网络分区导致Controller重新选主而引发脑裂现象的发生。通过合理设置消费者的重试策略等方式,避免因Leader副本失效后,大量的消费者几乎同时尝试重新连接或请求其他可用的副本,从而导致集群中的其他Broker负载急剧增加而引发羊群效应。
Kafka Controller负责维护Kafka集群的元数据,包括Broker的存活状态、分区的分配情况以及副本的分配情况等。Kafka Controller监控集群中Broker的状态变化,并更新集群的元数据。当Kafka Controller检测到某个分区的ISR(In-Sync Replicas)集合发生变化时,它会负责通知所有Broker更新其元数据信息。
当有新的topic创建或者有新的Broker加入集群时,Kafka Controller会根据一定的策略来进行分区分配。它会根据集群的负载情况、副本的分布情况等因素,将分区均匀地分配给各个Broker。
Kafka Controller维护集群中所有分区的状态以及每个分区的Leader副本信息。如当动态增加分区时,Kafka会在后台创建新的分区文件,并将它们分配到集群中的Broker上。当Leader副本故障后,Kafka Controller会从Follower副本中,按照选举策略选出新的Leader副本。

Producer

生产者是消息的生成者,即将消息发送到Kafka存储的Topic中的生成者。生产者可以通过特定的分区函数决定消息路由到Topic的某个分区。默认的分区函数是轮询策略,也即将消息均匀地分布在Topic所有的Partition上。

生产者参数配置

生产者发送消息前,需要创建一个生产者实例。这里介绍下生产者实例创建所需的一些关键配置,并给出示例代码。

public Properties createProducerConfig(String brokerList, String producerId) {
    Properties props = new Properties();
    // 指定生产者连接Broker所需的地址清单,内容格式为host1:port1,host2:port2
    // 注意,这里并非需要所有的Broker地址,而是要通过暴露的Broker获取集群元数据信息
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
    // Broker接收的消息必须是字节数组,所以还需对消息的key和value做相应的序列化操作
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    // 设置acks为-1,意味着Producer会等待分区副本确认写入后才认为消息发送成功  
    props.put(ProducerConfig.ACKS_CONFIG, "-1");  
    // 显式设置Producer Id,命名时要有业务含义且遵循团队规范,方便后面的消息查找
    props.put(ProducerConfig.CLIENT_ID_CONFIG, producerId);
    return props;
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14

KafkaProducer是线程安全的,可以在多个线程中共享单个KafkaProducer示例,也可将KafkaProducer实例进行池化来供其他线程调用。一般情况下,每个客户端使用一个KafkaProducer即可。

消息发送模式

消息的生成者发送消息有两种模式,分别为同步模式和异步模式。在KafkaProducer中仅提供了异步方法send(),但是可以通过阻塞等待消息返回达到同步的效果。示例代码如下:

public void sendSync(Producer<String, String> producer, ProducerRecord<String, String> record) {
    // 发送消息
    Future<RecordMetadata> future = producer.send(record);

    // 等待消息发送完成,并获取元数据
    try {
        RecordMetadata metadata = future.get();
        System.out.println("消息已发送到分区 " + metadata.partition() + ",偏移量 " + metadata.offset());
    } catch (Exception e) {
        e.printStackTrace();
    }
}

public void sendAsync(Producer<String, String> producer, ProducerRecord<String, String> record) {
    // 异步发送消息
    try {
        producer.send(record, (metadata, exception) -> {
            if (exception != null) {
                // 发送失败的处理逻辑
                exception.printStackTrace();
                System.out.println("发送消息失败: " + exception.getMessage());
            } else {
                // 发送成功的处理逻辑
                System.out.println("消息已发送到分区 " + metadata.partition() + ",偏移量 " + metadata.offset());
            }
        });
    } catch (Exception e) {
        e.printStackTrace();
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30

单条发送和批量发送

根据需要发送的消息数量,可以将消息分为单条发送消息和批量发送消息。Kafka Producer并没有提供批量发送消息的接口,但是可以通过异步发送间接实现。在异步发送消息时,可以设置延迟发送的时间,来收集更多的消息,达到批量发送的效果。此外,为避免等待期间消息的数量多大,还可以通过设置batch.size来控制批量发送的size。对应生产者配置如下:

public Properties createProducerConfig(String brokerList, String producerId) {
    Properties props = new Properties();
    // 指定生产者连接Broker所需的地址清单,内容格式为host1:port1,host2:port2
    // 注意,这里并非需要所有的Broker地址,而是要通过暴露的Broker获取集群元数据信息
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
    // Broker接收的消息必须是字节数组,所以还需对消息的key和value做相应的序列化操作
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    // 设置acks为-1,意味着Producer会等待分区副本确认写入后才认为消息发送成功  
    props.put(ProducerConfig.ACKS_CONFIG, "-1");  
    // 显式设置Producer Id,命名时要有业务含义且遵循团队规范,方便后面的消息查找
    props.put(ProducerConfig.CLIENT_ID_CONFIG, producerId);
    // 设置批量提交的字节数,默认是16384字节,即16KB
    props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
    // 设置生产者发送请求前的等待时间,以毫秒为单位,默认是0,表示无需等待
    props.put(ProducerConfig.LINGER_MS_CONFIG, 10);
    return props;
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18

这样,再次使用异步发送代码发送消息时,消息不会立即发送,而是在linger.ms或batch.size的任一条件满足后,才发送消息,从而达到批量发送的效果。

消息返回确认

生产者将消息发送给Broker后,接下来就是通过ACK(Acknowledgment)机制来确认消息是否返回。ACK机制确保了消息在生产者和Broker之间的可靠传递,并在消息被认为已成功发送或处理之前防止其丢失。ACK机制的确认级别之前已有介绍,这里重复介绍下:
(1) acks=0:这是最快速的确认级别,也是最不可靠的。生产者发送消息后不会等待任何确认,直接将消息添加到分区的副本中,并认为消息已成功发送。在这种模式下,如果发生故障或错误,生产者将不会知道,也不会重试发送消息。这种模式通常用于不太关心消息可靠性的场景。
(2) acks=1(Leader确认):这是默认的确认级别。生产者发送消息后会等待分区的领导者(Leader副本)确认消息已成功写入到其本地日志。一旦领导者确认,生产者会认为消息已成功发送。然而,如果领导者成功写入消息但在复制给其他副本时发生故障,消息仍有可能会丢失。
(3)acks=-1(也称为all):这是最可靠的确认模式。生产者发送消息后会等待Leader和ISR集合中的Follower确认消息已写入,然后发送下一条消息。这种模式下,只有在所有副本都成功写入消息后,生产者才会收到确认,确保了消息的可靠性,但可能会导致更长的延迟。
生产者可以通过配置参数来控制其行为,以适应不同的业务需求。在选择确认机制时,需要权衡消息的可靠性和系统的吞吐量。对于需要高可靠性的场景,建议使用acks=-1;而对于对延迟敏感或不太关心消息可靠性的场景,可以使用acks=0或acks=1。

失败重试和错误处理

KafkaProducer中一般会发生两种类型的异常:可重试的异常和不可重试的异常。比如网络异常,这个有可能是网络抖动导致消息没有达到Broker,这时可以通过重试解决。但是,有时异常无法通过重试解决。如消息体过大的异常,这种异常就无法通过重试解决,只能对错误进行处理,如记录日志,重写现有代码保证消息体不会过大。
对于失败后可重试的异常,其重试参数可以通过生产者的retries参数配置。KafkaProducer会自动捕捉可重试的失败,无需编码。重试的次数默认值是10,配置实例代码如下:

props.put(ProducerConfig.RETRIES_CONFIG,10);
  • 1

对于消息发送无法处理的失败,则需要将其作为错误处理。这种失败处理机制和Java的异常处理机制类似。在Java中会将错误分为Error和Exception两类。对于Error是无法捕获,对于Exception则是可以捕获并处理。以下是一种常见的针对不可重试的异常的处理的示例代码:

try {
    producer.send(record, (metadata, exception) -> {
        if (exception != null) {
            // Broker返回的消息发送失败的异常
            exception.printStackTrace();
            System.out.println("发送消息失败: " + exception.getMessage());
        } else {
            // 发送成功的处理逻辑
            System.out.println("消息已发送到分区 " + metadata.partition() + ",偏移量 " + metadata.offset());
        }
    });
} catch (Throwable e) {
    // 捕获执行send方法产生的异常
    e.printStackTrace();
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15

Consumer

消费者负责订阅Kafka中的Topic,并从订阅的主题上拉取消息。在Kafka的消费理念中还有一层消费者组(Consumer Group)的概念。一个消费者组可以包含一个或多个消费者。同一消费组中的消费者不会重复消费消息,同样的,不同消费组中的消费者消息消息时互不影响。Kafka通过消费组来实现消息的点对点(Point-to-Point,P2P)模式和广播模式。点对点模式是基于队列的,消息生产者发送消息到队列,消息消费者从队列中接收消息。广播模式定义了如何向一个内容结点发送和订阅消息,这个内容节点称为Topic。Topic可以认为是消息传播的中介,消息发布者将消息发布到某个Topic,而消息订阅者从Topic中订阅消息。Topic使得消息的订阅者和发布者保持相对独立,不需要进行接触即可保证消息的传递。
接下来举例说明Topic、Partition、消费者组、消费者之间的映射关系。某个Topic共有4个Partition:P1、P2、P3、P4。有两个消费者组A和B都订阅了该Topic,消费者组A有4个消费者:C1、C2、C3、C4,消费者组B中有2个消费者:C21、C22。则上述实例中Topic、Partition、消费者组、消费者之间的映射关系可用下图表示:

请添加图片描述

一个Topic可以有一个或多个Partition。在创建Topic的时候就指定了Partition的数量。对于自动创建Topic,其Partition数量从num.partitions(默认值为1)获取,对于手动创建Topic,则根据指定的Partition数决定。
每个Partition只能被一个消费者组中的一个消费者所消费。如果Partition数量等于消费者组中消费者的数量,那么每个消费者只需要消费Topic的一个Partition。如果Partition数量大于消费者组中消费者的数量,那么每个消费者至少消费Topic的一个Partition,且至少有一个消费者消费两个Partition。如果Partition数量小于消费者组中消费者的数量,那么每个消费者至多消费Topic的一个Partition,且存在消费者不消费Partition的情况。
当在Kafka运行时动态的增加Partition数时,会带来Partition-Consumer的再分配。需要注意的,动态增加Partition并不会自动迁移原有Partition的数据到新的Partition中。也就是说,新增加的Partition一开始是不包含任何消息的。如果需要将原有数据分布到新的Partition中,需要自行实现数据迁移的逻辑。
消费者也支持动态的增加或减少,且也会带来Partition-Consumer的再分配。由于Partition此时并不会改变,所以只是更换了Partition对应的Consumer。注意,当Consumer数量等于Partition数量时,再次增加Consumer不会提高消费者端的消费能力,因为这个新的Consumer没有对应的Partition可以消费。
这里之所以不讨论Partition动态减少的情况,是因为目前Kafka并不支持该功能,这里不拓展讲解具体的原因,有兴趣的同学可以自行学习。

消费者参数配置

与生产者类似,消费者在接收消息前,需要创建一个消费者实例。这里介绍下消费者实例创建所需的一些关键配置,并给出示例代码。

public Properties createConsumerConfig(String brokerList, String groupId) {
    Properties props = new Properties();
    // 指定生产者连接Broker所需的地址清单,内容格式为host1:port1,host2:port2
    // 注意,这里并非需要所有的Broker地址,而是要通过暴露的Broker获取集群元数据信息
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
    // 每个消费者都对应一个消费者组
    props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
    // 从Broker接收的消息都是经过生产者序列化之后的,所以在消费的时候要进行反序列化处理
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    // 控制一次消息拉取的最大记录数。换句话说,它限制了消费者每次从Kafka中拉取消息时能够获取的最大消息数量
    props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "1");
    // 是否自动提交消费者的Offset,默认是true
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
    // 指定自动提交消费者的Offset的时间间隔,单位是毫秒,默认是5000ms,也即5s
    props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
    // 自动消费重置参数,消费者提交的偏移量不在Broker上,规范如何消费消息,默认是lastest,即如果分区中没有已提交的偏移量,消费者将不会消费任何已存在的数据,而是等待新数据的到来
    // 在消费者启动时,该参数起着关键作用,尤其是在消费者尝试读取一个分区时,该分区没有初始的偏移量或当前偏移量不再存在(例如,由于数据被删除)。该参数决定了消费者在这种情况下应该采取的行为。
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
    return props;
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21

与KafkaProducer不同,Kafka Consumer是非线程安全的。这意味着在多个线程中直接共享一个KafkaConsumer实例是不安全的,可能会导致数据不一致等问题。然而,这并不意味着Kafka Consumer只能在单线程中使用。为了在多线程环境中使用Kafka Consumer,可以采取一些策略,如每个线程维护一个Kafka Consumer实例,或者使用单Kafka Consumer实例加上多worker线程的模式。
尽管Kafka Consumer本身不是线程安全的,但Kafka提供了其他机制来确保数据的一致性和可靠性。如Kafka Consumer的提交偏移量操作是线程安全的,这意味着即使在多线程环境中,Kafka也能确保提交操作的原子性和顺序性,从而避免数据冲突或丢失。

单条消费和批量消费

根据需要接收的消息数量,可以将消息消费分为单条消费和批量消费。可以通过设置max.poll.records(默认值是500)为1来实现单条消费,一般情况下,不建议单条消费,尽量批量拉取、单条处理。在Kafka Consumer API中接提供connsumer.poll(Duration.ofMillis(timeout))方法来拉取消息。poll()方法返回的是一个消息集合,所以单条消费和多条消费可以使用相同的代码:

public void consumeMessage(Consumer<String, String> consumer) {
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
        for (ConsumerRecord<String, String> record : records) {
            // 如果拉取的消息只有一条,则为单条处理,如果拉取的消息是多条,则为批量处理
            System.out.println("Received message: " + record.value());
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

手写代码可能会存在这样那样的问题,对于使用Spring Boot开发的应用来说,推荐使用Spring Kafka三方件提供的单条消费和批量消费的接口。

自动提交偏移量和手动提交偏移量

Kafka Consumer在消费消息时,支持自动提交偏移量和手动提交偏移量。自动提交偏移量是Kafka Consumer的默认方式,可以简化客户端代码对偏移量的感知,手动提交偏移量方式,需要客户端代码主动提交偏移量。对于自动提交偏移量方式,如果在接收消息后出现错误,自动提交可能仍在进行,进而导致这部分数据丢失,也即实现至多一次的语义保证。而手动提交偏移量方式可以由客户端代码决定何时提交偏移量,可以在消息处理失败时不处理偏移量,从而避免消息丢失,也即实现"至少一次"或"恰好一次"的效果。
自动提交适合简单的用例,其中消息的丢失或重复处理不是关键问题。手动提交适用于需要更高控制级别和更强语义保证的场景,例如金融交易或关键业务逻辑。
自动提交偏移量方式是Kafka Consumer的默认实现,具体代码可以参考之前的示例,这里给出手动提交偏移量的示例代码:

public void consumeMessageByManual(Consumer<String, String> consumer) {
    try {
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records) {
                // 处理消息逻辑
                System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
                // 假设消息处理成功,则提交偏移量
                // 同步提交偏移量
                consumer.commitSync();
            }
        }
    } catch (Throwable e) {
        // 处理异常
        e.printStackTrace();
    } finally {
        consumer.close();
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19

注意,在编写手动提交偏移量的代码前,还需配置Kafka Consumer为手动提交模式,对应参数是enable.auto.commit,对应的值为false。在Kafka Consumer手动提交模式下,还支持同步提交偏移量和异步提交偏移量两种方式,commitSync()方法会一直阻塞,直到偏移量被成功提交或发生错误。如果希望提交操作是非阻塞的,可以使用commitAsync()方法,并为其提供一个回调来处理可能的异常。

可靠性保证

在将Kafka作为消息管理平台时,为保证系业务系统的可靠性,还需对消息处理的常见场景进行考量,以期实现一个符合业务需要的系统。常见的问题有消息丢失问题、消息重复消费问题、消息的顺序性问题、消息积压问题、消息TTL问题、消息回溯问题。

消息丢失问题

在Kafka中,消息从生产到存储到消费,都可能存在消息丢失的问题。在使用Kafka的时候,要根据业务需要,进行合理的配置。

生产者防止消息丢失

生产者使用ACK机制来确保了消息在生产者和Broker之间的可靠传递,并在消息被认为已成功发送或处理之前防止其丢失。其中:
(1) acks=0是最不可靠的,这个策略下,生产者发送消息后不会等待任何确认,直接将消息添加到分区的副本中,并认为消息已成功发送。在这种模式下,如果发生故障或错误,生产者将不会知道,也不会重试发送消息。这种模式通常用于不太关心消息可靠性的场景;
(2) acks=1是默认的确认级别。这个策略下,生产者发送消息后会等待分区的领导者(Leader副本)确认消息已成功写入到其本地日志。一旦领导者确认,生产者会认为消息已成功发送。然而,如果领导者成功写入消息但在复制给其他副本时发生故障,消息仍有可能会丢失;
(3) acks=-1(也称为all)是最可靠的确认模式。这个策略下,生产者发送消息后会等待Leader和ISR集合中的Follower确认消息已写入,然后发送下一条消息。这种模式下,只有在所有副本都成功写入消息后,生产者才会收到确认,确保了消息的可靠性,但可能会导致更长的延迟。生产者可以通过配置参数来控制其行为,以适应不同的业务需求。在选择确认机制时,需要权衡消息的可靠性和系统的吞吐量。对于需要高可靠性的场景,建议使用acks=-1;而对于对延迟敏感或不太关心消息可靠性的场景,可以使用acks=0或acks=1。
Kafka Producer中一般会发生两种类型的异常:可重试的异常和不可重试的异常。比如网络异常,这个有可能是网络抖动导致消息没有达到Broker,这时可以通过重试解决。但是,有时异常无法通过重试解决。如消息体过大的异常,这种异常就无法通过重试解决,只能对错误进行处理,如记录日志,重写现有代码保证消息体不会过大。
对于失败后可重试的异常,其重试参数可以通过生产者的retries参数配置。KafkaProducer会自动捕捉可重试的失败,无需编码。重试的次数默认值是10。对于消息发送无法处理的失败,则需要将其作为错误处理。这种失败处理机制和Java的异常处理机制类似。在Java中会将错误分为Error和Exception两类。对于Error是无法捕获,对于Exception则是可以捕获并处理。

Broker防止消息丢失

为保证消息不丢失,Kafka为Partition引入了多副本(Replica)机制,通过增加副本数量来提升容灾能力。同一个Partition的不同副本中保存的是相同的信息,副本之间是"一主多从"的关系。其中Leader负责处理读写请求(Producer和Consumer只与Leader交互),Follower只负责与Leader的消息同步。当Leader出现故障时,从Follower中重新选举新的Leader并对外提供服务。
此外,生产环境要给Broker设置监控和警报系统,及时发现和处理消息丢失的问题。这包括监控Kafka集群的状态、生产者和消费者的性能以及消息的处理情况等。

消费者防止消息丢失

消费者消费消息的过程中,无论是使用自动提交偏移量,还是手动提交偏移量都有可能导致消息丢失。特别是自动提交偏移量,如果消息处理错误但消息的偏移量已自动提交,则这部分消息将丢失。如果不能容忍这类场景下的消息丢失,则需使用手动提交偏移量方式,实现"至少一次"或"恰好一次"的效果。但是因为手动提交偏移量需要开发者自行编写代码,必须确保代码在充分验证的情况下,保证不会出现不必要的消息丢失问题。

消息重复消费问题

Kafka出现消息重复消费问题,主要是由于已经消费的消息的偏移量没有成功提交到Broker。出现以上问题的常见原因有:
(1) 消费者宕机或重启:当消费者宕机或重启时,如果之前已经消费但尚未提交Offset的数据,会在消费者恢复后再次被消费,导致重复。
(2) 自动提交Offset与Rebalance冲突:当消费者设置为自动提交Offset,并且在新的消费者加入或移除导致Rebalance发生时,尚未提交的Offset可能导致数据重复消费。
(3) 消息处理耗时过长:如果消费者处理消息的时间过长,超过了Kafka配置的最大轮询间隔(max.poll.interval.ms),Kafka会认为该消费者已经死亡,触发Rebalance,从而导致重复消费。
针对消息重复问题,可以考虑以下方案:
(1) 采用手动提交Offset:手动提交Offset,可以在消息被成功处理后,再提交Offset。这样可以确保只有在消息真正被处理后才更新Offset,减少重复消费的可能性。
(2) 优化消息处理逻辑:减少消息处理的时间,避免超过max.poll.interval.ms配置的时间。如果消息处理确实需要较长时间,可以考虑增加该配置的值。
(3) 幂等性设计:在设计应用时,保证消息的消费逻辑时幂等的,即无论同一个消息被消费多少次,都不会产生副作用。
(4) 监控和警报:设置监控和警报系统,及时发现和处理消息重复消费的问题。
结合以上解决方案,可以有效地减少Kafka消息重复消费的风险。同时,应根据具体的业务场景和需求,选择最适合的解决方案来应对消息重复消费问题。

消息的顺序性问题

所谓的消息顺序性,主要是指消息可以按照写入的逻辑顺序进行消费。在Kafka中,只能保证单个Partition的消息是有序的,无法保证多个Partition间消息的有序性。如果期望实现Kafka消息的顺序性,则必须保证这个Topic只有一个Partition,且只有一个消费者去消费。
如果需要在多个Partition之间保证顺序消费,可以采用一些额外策略,如分区设计时,根据一定的规则为业务标识分配一个唯一的标识符,并将相同标识符的消息发送到同一个分区中。

消息积压问题

所谓的消息积压是指消息大量堆积,消息的生产速率和消费速率无法匹配。常见的消息积压的原因有:(1) 生产者速度过快:当生产者发送消息的速度远超过消费者处理消息的速度时,消息会在Kafka中积压;(2) 消费者速度过慢:消费者处理消息的速度较慢,无法及时消费掉生产者发送的消息,导致消息积压;(3) 副本同步延迟:Kafka的副本同步延迟较高,导致副本之间的同步速度跟不上消息的写入速度,从而引发消息积压;(4) 网络故障:Kafka集群所在的网络出现故障,影响生产者和消费者的正常通信,导致消息积压。
针对产生消息堆积的不同原因,可以选择对应的解决方案:
(1) 优化生产者发送速度:如果生产者发送速度过快,可以考虑调整其发送策略,如增加消息发送的间隔,或使用流控机制来限制发送速度。
(2) 调整消费者消费速度:使用Kafka的消费者配置参数(如max.poll.records)来控制消费者每次拉取的最大记录数,以适应生产者的速度。
(3) 增加消费者数量:增加消费者的数量,提高处理消息的速度,减少单个消费者的负载压力。注意,如果消费者的数量已经等于Partition的数量,则增加消费者的数量无法提高消费速度。但是,对于生产应用来说,除了消息消费,还承载其他业务,增加消费者可以通过分担现网业务压力,间接提交消息处理速度。
(4) 增加Kafka的分区数(num.partitions),提高吞吐量。注意,分区数的增加会对现网业务产生影响,应在充分验证的前提下谨慎操作。
(5) 监控和警报。设置监控和警报系统,及时发现和处理消息积压问题。监控Kafka集群的状态、生产者和消费者的性能以及消息的处理情况等。对于现网业务来说,针对Kafka消息积压的监控和告警是必不可少的。
(6) 提高消费者的硬件配置,如增加内存或CPU资源,以加快消息处理速度。可以临时太高消费者的硬件配置,解决消息堆积问题。
解决Kafka消息积压问题需要综合考虑多个方面,并根据实际情况调整和优化相关配置和策略。

消息TTL

所谓的消息TTL(Time To Live),就是消息的可存活时间。Kafka本身没有提供消息的TTL功能。但是可以设置Topic中的消息的默认保存时限(默认是7天)。这个默认保存时间可以通过server.properties文件中的log.retention.hours属性进行修改。

消息回溯

Kafka中的消息回溯是指用户可以从消息队列中重新消费之前的消息。Kafka支持两种主要的消息回溯方式:
(1) 基于消息偏移量(Offset)的回溯:在Kafka的每个分区中,每条消息都有一个唯一的偏移量,用来表示消息在分区中的位置。消费者每次消费消息后,都会将消费的此条消息的偏移量提交到Broker,用于记录消费到分区中的位置。因此,基于消息偏移量回溯非常直接,只需要重置偏移量,消费者就会从该偏移量之后开始消费消息。
(2) 基于时间点的消息回溯:Kafka存储消息是以日志的形式,每个分区对应一个日志,但日志实际上是由多个文件组成的。当需要基于时间点回溯消息时,Kafka可以根据存储的日志文件和对应的时间戳来找到并重新消费之前的消息。
消息回溯可用于以下场景:
(1) 数据丢失或错误处理:当消费者处理消息时发生错误或数据丢失时,回溯机制允许消费者重新读取之前的消息,以便进行错误处理或重新处理数据。
(2) 版本升级:在Kafka集群进行版本升级时,回溯机制可以确保消费者能够与新版本的Kafka集群保持兼容,避免兼容性问题。
需要注意的是,虽然消息回溯提供了很大的灵活性,但在实际使用中需要谨慎处理,以避免对Kafka集群的性能和稳定性产生负面影响。同时,也应考虑消息幂等性问题,避免出现业务错误。

参考

《深入理解Kafka:核心设计与实践原理》 朱忠华 著
《Kafka权威指南》 Neha Narkhede Gwen Shapira Todd Palino 著 薛命灯译
《Kafka源码解析与实战》 王亮 著
https://blog.csdn.net/qq_32828253/article/details/110732652 Kafka 设计架构原理详细解析(超详细图解)
https://kafka.apache.org/documentation/#zk 6.9 ZooKeeper
https://iteritory.com/beginners-guide-apache-kafka-basic-architecture-components-concepts/ Apache Kafka Basic Architecture, Components, Concepts

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/你好赵伟/article/detail/357464
推荐阅读
相关标签
  

闽ICP备14008679号