赞
踩
Prior releases: 0.7.x, 0.8.0, 0.8.1.X, 0.8.2.X, 0.9.0.X, 0.10.0.X, 0.10.1.X, 0.10.2.X, 0.11.0.X.
我们知道流处理平台有以下三种特性:
Kafka适合什么样的场景?
它可以用于两大类别的应用:
为了理解Kafka是如何做到以上所说的功能,从下面开始,我们将深入探索Kafka的特性。.
首先是一些概念:
Kafka有四个核心的API:
在Kafka中,客户端和服务器使用一个简单、高性能、支持多语言的 TCP 协议.此协议版本化并且向下兼容老版本, 我们为Kafka提供了Java客户端,也支持许多其他语言的客户端。
让我们首先深入了解下Kafka的核心概念:提供一串流式的记录— topic 。
Topic 就是数据主题,是数据记录发布的地方,可以用来区分业务系统。Kafka中的Topics总是多订阅者模式,一个topic可以拥有一个或者多个消费者来订阅它的数据。
对于每一个topic, Kafka集群都会维持一个分区日志,如下所示:
每个分区都是有序且顺序不可变的记录集,并且不断地追加到结构化的commit log文件。分区中的每一个记录都会分配一个id号来表示顺序,我们称之为offset,offset用来唯一的标识分区中每一条记录。
Kafka 集群保留所有发布的记录—无论他们是否已被消费—并通过一个可配置的参数——保留期限来控制. 举个例子, 如果保留策略设置为2天,一条记录发布后两天内,可以随时被消费,两天过后这条记录会被抛弃并释放磁盘空间。Kafka的性能和数据大小无关,所以长时间存储数据没有什么问题.
事实上,在每一个消费者中唯一保存的元数据是offset(偏移量)即消费在log中的位置.偏移量由消费者所控制:通常在读取记录后,消费者会以线性的方式增加偏移量,但是实际上,由于这个位置由消费者控制,所以消费者可以采用任何顺序来消费记录。例如,一个消费者可以重置到一个旧的偏移量,从而重新处理过去的数据;也可以跳过最近的记录,从"现在"开始消费。
这些细节说明Kafka 消费者是非常廉价的—消费者的增加和减少,对集群或者其他消费者没有多大的影响。比如,你可以使用命令行工具,对一些topic内容执行 tail操作,并不会影响已存在的消费者消费数据。
日志中的 partition(分区)有以下几个用途。第一,当日志大小超过了单台服务器的限制,允许日志进行扩展。每个单独的分区都必须受限于主机的文件限制,不过一个主题可能有多个分区,因此可以处理无限量的数据。第二,可以作为并行的单元集—关于这一点,更多细节如下
日志的分区partition (分布)在Kafka集群的服务器上。每个服务器在处理数据和请求时,共享这些分区。每一个分区都会在已配置的服务器上进行备份,确保容错性.
每个分区都有一台 server 作为 “leader”,零台或者多台server作为 follwers 。leader server 处理一切对 partition (分区)的读写请求,而follwers只需被动的同步leader上的数据。当leader宕机了,followers 中的一台服务器会自动成为新的 leader。每台 server 都会成为某些分区的 leader 和某些分区的 follower,因此集群的负载是平衡的。
生产者可以将数据发布到所选择的topic(主题)中。生产者负责将记录分配到topic的哪一个 partition(分区)中。可以使用循环的方式来简单地实现负载均衡,也可以根据某些语义分区函数(例如:记录中的key)来完成。下面会介绍更多关于分区的使用。
消费者使用一个 消费组 名称来进行标识,发布到topic中的每条记录被分配给订阅消费组中的一个消费者实例.消费者实例可以分布在多个进程中或者多个机器上。
如果所有的消费者实例在同一消费组中,消息记录会负载平衡到每一个消费者实例.
如果所有的消费者实例在不同的消费组中,每条消息记录会广播到所有的消费者进程.
如图,这个 Kafka 集群有两台 server 的,四个分区(p0-p3)和两个消费者组。消费组A有两个消费者,消费组B有四个消费者。
通常情况下,每个 topic 都会有一些消费组,一个消费组对应一个"逻辑订阅者"。一个消费组由许多消费者实例组成,便于扩展和容错。这就是发布和订阅的概念,只不过订阅者是一组消费者而不是单个的进程。
在Kafka中实现消费的方式是将日志中的分区划分到每一个消费者实例上,以便在任何时间,每个实例都是分区唯一的消费者。维护消费组中的消费关系由Kafka协议动态处理。如果新的实例加入组,他们将从组中其他成员处接管一些 partition 分区;如果一个实例消失,拥有的分区将被分发到剩余的实例。
Kafka 只保证分区内的记录是有序的,而不保证主题中不同分区的顺序。每个 partition 分区按照key值排序足以满足大多数应用程序的需求。但如果你需要总记录在所有记录的上面,可使用仅有一个分区的主题来实现,这意味着每个消费者组只有一个消费者进程。
high-level Kafka给予以下保证:
关于保证的更多细节可以看文档的设计部分。
Kafka streams的概念与传统的企业消息系统相比如何?
传统的消息系统有两个模块: 队列 和 发布-订阅。 在队列中,消费者池从server读取数据,每条记录被池子中的一个消费者消费; 在发布订阅中,记录被广播到所有的消费者。两者均有优缺点。 队列的优点在于它允许你将处理数据的过程分给多个消费者实例,使你可以扩展处理过程。 不好的是,队列不是多订阅者模式的—一旦一个进程读取了数据,数据就会被丢弃。 而发布-订阅系统允许你广播数据到多个进程,但是无法进行扩展处理,因为每条消息都会发送给所有的订阅者。
消费组在Kafka有两层概念。在队列中,消费组允许你将处理过程分发给一系列进程(消费组中的成员)。 在发布订阅中,Kafka允许你将消息广播给多个消费组。
Kafka的优势在于每个topic都有以下特性—可以扩展处理并且允许多订阅者模式—不需要只选择其中一个.
Kafka相比于传统消息队列还具有更严格的顺序保证
传统队列在服务器上保存有序的记录,如果多个消费者消费队列中的数据, 服务器将按照存储顺序输出记录。 虽然服务器按顺序输出记录,但是记录被异步传递给消费者, 因此记录可能会无序的到达不同的消费者。这意味着在并行消耗的情况下, 记录的顺序是丢失的。因此消息系统通常使用“唯一消费者”的概念,即只让一个进程从队列中消费, 但这就意味着不能够并行地处理数据。
Kafka 设计的更好。topic中的partition是一个并行的概念。 Kafka能够为一个消费者池提供顺序保证和负载平衡,是通过将topic中的partition分配给消费者组中的消费者来实现的, 以便每个分区由消费组中的一个消费者消耗。通过这样,我们能够确保消费者是该分区的唯一读者,并按顺序消费数据。 众多分区保证了多个消费者实例间的负载均衡。但请注意,消费者组中的消费者实例个数不能超过分区的数量。
Kafka 作为存储系统
许多消息队列可以发布消息,除了消费消息之外还可以充当中间数据的存储系统。那么Kafka作为一个优秀的存储系统有什么不同呢?
数据写入Kafka后被写到磁盘,并且进行备份以便容错。直到完全备份,Kafka才让生产者认为完成写入,即使写入失败Kafka也会确保继续写入
Kafka使用磁盘结构,具有很好的扩展性—50kb和50TB的数据在server上表现一致。
可以存储大量数据,并且可通过客户端控制它读取数据的位置,您可认为Kafka是一种高性能、低延迟、具备日志存储、备份和传播功能的分布式文件系统。
关于Kafka提交日志存储和备份设计的更多细节,可以阅读 这页 。
Kafka用做流处理
Kafka 流处理不仅仅用来读写和存储流式数据,它最终的目的是为了能够进行实时的流处理。
在Kafka中,流处理器不断地从输入的topic获取流数据,处理数据后,再不断生产流数据到输出的topic中去。
例如,零售应用程序可能会接收销售和出货的输入流,经过价格调整计算后,再输出一串流式数据。
简单的数据处理可以直接用生产者和消费者的API。对于复杂的数据变换,Kafka提供了Streams API。 Stream API 允许应用做一些复杂的处理,比如将流数据聚合或者join。
这一功能有助于解决以下这种应用程序所面临的问题:处理无序数据,当消费端代码变更后重新处理输入,执行有状态计算等。
Streams API建立在Kafka的核心之上:它使用Producer和Consumer API作为输入,使用Kafka进行有状态的存储, 并在流处理器实例之间使用相同的消费组机制来实现容错。
批处理
将消息、存储和流处理结合起来,使得Kafka看上去不一般,但这是它作为流平台所备的.
像HDFS这样的分布式文件系统可以存储用于批处理的静态文件。 一个系统如果可以存储和处理历史数据是非常不错的。
传统的企业消息系统允许处理订阅后到达的数据。以这种方式来构建应用程序,并用它来处理即将到达的数据。
Kafka结合了上面所说的两种特性。作为一个流应用程序平台或者流数据管道,这两个特性,对于Kafka 来说是至关重要的。
通过组合存储和低延迟订阅,流式应用程序可以以同样的方式处理过去和未来的数据。 一个单一的应用程序可以处理历史记录的数据,并且可以持续不断地处理以后到达的数据,而不是在到达最后一条记录时结束进程。 这是一个广泛的流处理概念,其中包含批处理以及消息驱动应用程序
同样,作为流数据管道,能够订阅实时事件使得Kafk具有非常低的延迟; 同时Kafka还具有可靠存储数据的特性,可用来存储重要的支付数据, 或者与离线系统进行交互,系统可间歇性地加载数据,也可在停机维护后再次加载数据。流处理功能使得数据可以在到达时转换数据。
有关Kafka提供的保证、API和功能的更多信息,请看文档的剩余部分。
以下描述了一些 ApacheKafka ®的流行用例。有关这些领域的概述,请参阅 此博客中的文章。
Kafka 很好地替代了传统的message broker(消息代理)。 Message brokers 可用于各种场合(如将数据生成器与数据处理解耦,缓冲未处理的消息等)。 与大多数消息系统相比,Kafka拥有更好的吞吐量、内置分区、具有复制和容错的功能,这使它成为一个非常理想的大型消息处理应用。
根据我们的经验,通常消息传递使用较低的吞吐量,但可能要求较低的端到端延迟,Kafka提供强大的持久性来满足这一要求。
在这方面,Kafka 可以与传统的消息传递系统(ActiveMQ 和 RabbitMQ)相媲美。
Kafka 的初始用例是将用户活动跟踪管道重建为一组实时发布-订阅源。 这意味着网站活动(浏览网页、搜索或其他的用户操作)将被发布到中心topic,其中每个活动类型有一个topic。 这些订阅源提供一系列用例,包括实时处理、实时监视、对加载到Hadoop或离线数据仓库系统的数据进行离线处理和报告等。
每个用户浏览网页时都生成了许多活动信息,因此活动跟踪的数据量通常非常大
Kafka 通常用于监控数据。这涉及到从分布式应用程序中汇总数据,然后生成可操作的集中数据源。
许多人使用Kafka来替代日志聚合解决方案。 日志聚合系统通常从服务器收集物理日志文件,并将其置于一个中心系统(可能是文件服务器或HDFS)进行处理。 Kafka 从这些日志文件中提取信息,并将其抽象为一个更加清晰的消息流。 这样可以实现更低的延迟处理且易于支持多个数据源及分布式数据的消耗。 与Scribe或Flume等以日志为中心的系统相比,Kafka具备同样出色的性能、更强的耐用性(因为复制功能)和更低的端到端延迟。
许多Kafka用户通过管道来处理数据,有多个阶段: 从Kafka topic中消费原始输入数据,然后聚合,修饰或通过其他方式转化为新的topic, 以供进一步消费或处理。 例如,一个推荐新闻文章的处理管道可以从RSS订阅源抓取文章内容并将其发布到“文章”topic; 然后对这个内容进行标准化或者重复的内容, 并将处理完的文章内容发布到新的topic; 最终它会尝试将这些内容推荐给用户。 这种处理管道基于各个topic创建实时数据流图。从0.10.0.0开始,在Apache Kafka中,Kafka Streams 可以用来执行上述的数据处理,它是一个轻量但功能强大的流处理库。除Kafka Streams外,可供替代的开源流处理工具还包括Apache Storm 和Apache Samza.
Event sourcing是一种应用程序设计风格,按时间来记录状态的更改。 Kafka 可以存储非常多的日志数据,为基于 event sourcing 的应用程序提供强有力的支持。
Kafka 可以从外部为分布式系统提供日志提交功能。 日志有助于记录节点和行为间的数据,采用重新同步机制可以从失败节点恢复数据。 Kafka的日志压缩 功能支持这一用法。 这一点与Apache BookKeeper 项目类似。
本教程假定您是一只小白,没有Kafka 或ZooKeeper 方面的经验。 Kafka控制脚本在Unix和Windows平台有所不同,在Windows平台,请使用 bin\windows\
而不是bin/
, 并将脚本扩展名改为.bat
.
下载 1.0.0版本并解压缩。.
1 2 |
|
Kafka 使用 ZooKeeper 如果你还没有ZooKeeper服务器,你需要先启动一个ZooKeeper服务器。 您可以通过与kafka打包在一起的便捷脚本来快速简单地创建一个单节点ZooKeeper实例。
1 2 3 |
|
现在启动Kafka服务器:
1 2 3 4 |
|
让我们创建一个名为“test”的topic,它有一个分区和一个副本:
1 |
|
现在我们可以运行list(列表)命令来查看这个topic:
1 2 |
|
或者,您也可将代理配置为:在发布的topic不存在时,自动创建topic,而不是手动创建。
Kafka自带一个命令行客户端,它从文件或标准输入中获取输入,并将其作为message(消息)发送到Kafka集群。默认情况下,每行将作为单独的message发送。
运行 producer,然后在控制台输入一些消息以发送到服务器。
1 2 3 |
|
Kafka 还有一个命令行consumer(消费者),将消息转储到标准输出。
1 2 3 |
|
如果您将上述命令在不同的终端中运行,那么现在就可以将消息输入到生产者终端中,并将它们在消费终端中显示出来。
所有的命令行工具都有其他选项;运行不带任何参数的命令将显示更加详细的使用信息。
到目前为止,我们一直在使用单个代理,这并不好玩。对 Kafka来说,单个代理只是一个大小为一的集群,除了启动更多的代理实例外,没有什么变化。 为了深入了解它,让我们把集群扩展到三个节点(仍然在本地机器上)。
首先,为每个代理创建一个配置文件 (在Windows上使用copy
命令来代替):
1 2 |
|
现在编辑这些新文件并设置如下属性:
1 2 3 4 5 6 7 8 9 |
|
broker.id
属性是集群中每个节点的名称,这一名称是唯一且永久的。我们必须重写端口和日志目录,因为我们在同一台机器上运行这些,我们不希望所有的代理尝试在同一个端口注册,或者覆盖彼此的数据。
我们已经建立Zookeeper和一个单节点了,现在我们只需要启动两个新的节点:
1 2 3 4 |
|
现在创建一个副本为3的新topic:
1 |
|
Good,现在我们有一个集群,但是我们怎么才能知道那些代理在做什么呢?运行"describe topics"命令来查看:
1 2 3 |
|
以下是对输出信息的解释。第一行给出了所有分区的摘要,下面的每行都给出了一个分区的信息。因为我们只有一个分区,所以只有一行。
请注意,在示例中,节点1是该主题中唯一分区的领导者。
我们可以在已创建的原始主题上运行相同的命令来查看它的位置:
1 2 3 |
|
这没什么大不了,原来的主题没有副本且在服务器0上。我们创建集群时,这是唯一的服务器。
让我们发表一些信息给我们的新topic:
1 2 3 4 5 |
|
现在我们来消费这些消息:
1 2 3 4 5 |
|
让我们来测试一下容错性。 Broker 1 现在是 leader,让我们来杀了它:
1 2 3 |
|
在 Windows 上用:
1 2 3 4 |
|
领导权已经切换到一个从属节点,而且节点1也不在同步副本集中了:
1 2 3 |
|
不过,即便原先写入消息的leader已经不在,这些消息仍可用于消费:
1 2 3 4 5 |
|
Step 7: 使用Kafka Connect来导入/导出数据
从控制台读出数据并将其写回是十分方便操作的,但你可能需要使用其他来源的数据或将数据从Kafka导出到其他系统。针对这些系统, 你可以使用Kafka Connect来导入或导出数据,而不是写自定义的集成代码。
Kafka Connect是Kafka的一个工具,它可以将数据导入和导出到Kafka。它是一种可扩展工具,通过运行connectors(连接器), 使用自定义逻辑来实现与外部系统的交互。 在本文中,我们将看到如何使用简单的connectors来运行Kafka Connect,这些connectors 将文件中的数据导入到Kafka topic中,并从中导出数据到一个文件。
首先,我们将创建一些种子数据来进行测试:
1 |
|
在Windows系统使用:
1 2 |
|
接下来,我们将启动两个standalone(独立)运行的连接器,这意味着它们各自运行在一个单独的本地专用 进程上。 我们提供三个配置文件。首先是Kafka Connect的配置文件,包含常用的配置,如Kafka brokers连接方式和数据的序列化格式。 其余的配置文件均指定一个要创建的连接器。这些文件包括连接器的唯一名称,类的实例,以及其他连接器所需的配置。
1 |
|
这些包含在Kafka中的示例配置文件使用您之前启动的默认本地群集配置,并创建两个连接器: 第一个是源连接器,用于从输入文件读取行,并将其输入到 Kafka topic。 第二个是接收器连接器,它从Kafka topic中读取消息,并在输出文件中生成一行。
在启动过程中,你会看到一些日志消息,包括一些连接器正在实例化的指示。 一旦Kafka Connect进程启动,源连接器就开始从 test.txt
读取行并且 将它们生产到主题 connect-test
中,同时接收器连接器也开始从主题 connect-test
中读取消息, 并将它们写入文件 test.sink.txt
中。我们可以通过检查输出文件的内容来验证数据是否已通过整个pipeline进行交付:
1 2 3 |
|
请注意,数据存储在Kafka topic connect-test
中,因此我们也可以运行一个console consumer(控制台消费者)来查看 topic 中的数据(或使用custom consumer(自定义消费者)代码进行处理):
1 2 3 4 |
|
连接器一直在处理数据,所以我们可以将数据添加到文件中,并看到它在pipeline 中移动:
1 |
|
您应该可以看到这一行出现在控制台用户输出和接收器文件中。
Kafka Streams是用于构建实时关键应用程序和微服务的客户端库,输入与输出数据存储在Kafka集群中。 Kafka Streams把客户端能够轻便地编写部署标准Java和Scala应用程序的优势与Kafka服务器端集群技术相结合,使这些应用程序具有高度伸缩性、弹性、容错性、分布式等特性。 本快速入门示例将演示如何运行一个基于该库编程的流式应用程序。
在主发行版之外,有大量的工具与 Kafka 集成。在 生态圈 里列出了许多内容,有流处理系统、Hadoop集成、监视和部署工具。
从 0.8.x, 0.9.x, 0.10.0.x, 0.10.1.x, 0.10.2.x, 0.11.0.x 升级到1.0.0
Kafka 1.0.0 介绍了通信协议方面的改变。 遵循下面的滚动升级计划,可以保证您在升级过程中不用停机。 在升级之前,请先查看1.0.0版本中显著的变化。
滚动升级计划:
inter.broker.protocol.version
并将其设置为1.0。其他升级说明:
delete.topic.enable
设置为false。请记住,在topic中删除数据的操作是不可逆的(即没有“撤销删除”操作)。inter.broker.protocol.version
是1.0或更高版本,即使存在脱机日志目录,代理也会一直保持联机,并在实时日志目录上提交副本。由硬件故障导致的IOException,日志目录可能会变为脱机状态。用户需要监控每个代理度量标准offlineLogDirectoryCount
来检查是否存在离线日志目录。handleError
方法已经从以下过时类中除去kafka.api
:包FetchRequest
,GroupCoordinatorRequest
,OffsetCommitRequest
, OffsetFetchRequest
,OffsetRequest
,ProducerRequest
,和TopicMetadataRequest
。这只是为了在代理上使用,但是实际上它已经不再被使用了,实现也没有被维护。只是因为二进制兼容性,保留了一个存根。kafka-consumer-offset-checker.sh
工具已被弃用。使用kafka-consumer-groups.sh
来得到consumer group 的详细信息AuthenticationException
中的一个子类向客户端报告身份验证失败日志。如果客户端连接失败,并不会重新进行验证 。SaslServer
实现可能会向客户端抛出 SaslAuthenticationException
来提供有关身份验证失败的错误信息。同时应注意在异常信息中,不要向未授权的客户泄露任何安全方面的关键信息。app-info
将被弃用,由提供这些属性的metrics(度量)来替换。org.apache.kafka.common.Metric#value()
已被弃用并返回0.0
以最大限度地减少用户读取每个客户端值时系统断开的概率(用户调用 MetricsReporter
或metrics()
来读取)。org.apache.kafka.common.Metric#metricValue()
用来检索数字和非数字的度量值-total
方便后续处理。 例如, records-consumed-rate
对应的度量标准是 records-consumed-total
。kafka_mx4jenable
设置为 true
时,Mx4j才会启用。以前它是默认启用的,如果 kafka_mx4jenable
设置为 true
,则禁用Mx4j。org.apache.kafka.common.security.auth
包现在是公有的,已被添加到javadocs中。这个包中的内部类已经移到其他地方了。is_new
。offline_replicas
。offline_replicas
。ProcessorContext#schedule()
、Processor#punctuate()
、KStreamBuilder
和TopologyBuilder
正在被新的API取代。我们建议进行相应的代码更改,在升级时这些改变是细微的,因为新的API看起来非常相似。从 0.8.x, 0.9.x, 0.10.0.x, 0.10.1.x 或 0.10.2.x 升级到 0.11.0.0
Kafka 0.11.0.0 引入了一个新的消息格式版本,在有线协议方面也有变化。 遵循下面的滚动升级计划,可以保证您在升级过程中不用停机。在升级之前,请先查看0.11.0.0版本中显著的变化。
从0.10.2 版本开始,Java客户端(生产者和消费者)已经可以与旧代理进行通信,0.11.0版本客户可以与0.10.0及其以上的代理进行通信。但如果代理版本大于0.10.0,则须先升级Kafka集群中的所有代理,然后再升级客户端。0.11.0版本的代理支持0.8.x及其以上的客户端。
对于滚动升级:
inter.broker.protocol.version
为0.11.0,但不要更改log.message.format.version
。其他升级说明:
log.message.format.version
之前,也可以使用主题管理工具(bin/kafka-topics.sh
)在各个topic上启用0.11.0消息格式。key.serde
,value.serde
和 timestamp.extractor
,建议使用替换的配置参数,因为这些配置已被弃用。unclean.leader.election.enable
改为为true
。block.on.buffer.full
,metadata.fetch.timeout.ms
和timeout.ms
已被删除。他们在 0.9.0.0版本中就被弃用。offsets.topic.replication.factor
配置现在被限制由 topic 自动生成。当群集大小不满足复制因子要求时,topic 内部自动生成将失败并返回 GROUP_COORDINATOR_NOT_AVAILABLE 错误。max.message.bytes
现在指批量消息的大小。之前将其应用于批量压缩的消息,或单独应用于未压缩的消息。批量消息可能只包含单个消息,因此大多数情况下,单个消息的大小只能通过批量格式的上限来控制。不过,消息格式转换有一些微妙的含义(详见 below for more detail)。请注意,代理以前会确保在每个提取请求中至少返回一条消息(无论总分区级别和分区级别的提取大小),但这一行为现在适用于批量消息。Headers headers()
方法调用新的Headers API。group.initial.rebalance.delay.ms
,该配置指定时间以毫秒为单位。GroupCoordinator
将延迟初始消费者以实现再平衡。当有新成员加入group时,将根据group.initial.rebalance.delay.ms
的值进行平衡,延迟的时间最高可达max.poll.interval.ms
(默认为3秒)。在开发和测试中,为了不延迟执行的时间,可能需要将其设置为0。org.apache.kafka.common.Cluster#partitionsForTopic
、 partitionsForNode
和availablePartitionsForTopic
方法会返回一个空列表,而不是null
(这被认为是不好的做法)。timestamp.extractor
、key.serde
和 value.serde
分别被default.timestamp.extractor
、default.key.serde
和default.value.serde
替代。commitAsync
API中的偏移提交失败,我们不再公布底层原因。更多详细信息,请参阅KAFKA-5052。log_start_offset
。log_start_offset
。key
字段和value
字段的header
。key
字段和value
字段的header
。在生产者方面,Kafka 0.11.0 支持幂等和事务性能力。幂等传递确保消息在单个生产者的生命周期内仅给特定的主题分区传递一次。事务交付允许生产者给多个分区发送数据,这样所有的消息都会被传递成功或失败。这些功能使Kafka符合“恰好一次语义”。有关这些功能的更多详细信息,请参阅用户指南。下面我们将指出一些有关升级群集过程中的特定注意事项。请注意,启用EoS不是必需的,如未使用,不会影响broker的行为。
__transaction_state
中。在首次使用事务性请求API时才创建此主题。同样地,消费者偏移主题也有几个配置设置用来控制主题。如transaction.state.log.min.isr
控制主题的最小ISR。请参阅用户指南中的配置部分以获取完整的选项列表。bin/kafka-acls.sh
工具打开。为了更好地支持生产者的交付语义(见KIP-98)以及提升复制容错能力(参见KIP-101),0.11.0消息格式增强了几个主要的功能。虽然新格式包含了更多信息以实现这些改进,但我们已经使批处理格式更有效率。只要每批消息的数量大于2,就可以降低整体开销。然而,对于单批次,可能会有一些轻微的性能影响。请参阅这里 以便了解我们对新消息格式初始性能的分析结果。您也可以在KIP-98方案中找到更多有关信息格式的细节。
新消息格式中,一个显著的差异是:未压缩的消息会被存储为一个批次。这会对代理配置max.message.bytes
(它限制单个批次的大小)有一些影响。首先,如果一个旧版的客户端使用旧格式生产消息到主题分区,且每个消息都比max.message.bytes
小,那么通过上述转换,合并成单批次后,代理仍可能会拒绝它们。通常,这可能发生在单个消息的聚合大小大于max.message.bytes
时。旧的消费者阅读从新格式转换来的消息时也有类似的影响:如果提取大小未被设置为 max.message.bytes
,即使单个未压缩的消息小于已配置的获取大小,消费者也可能无法取得进展。此行为不影响0.10.1.0及更高版本的Java客户端,因为它的获取协议是新的,该协议保证即使超过获取大小也能返回至少一条消息。为了解决这些问题,你应该确保:1)生产者的批量大小没有大于max.message.bytes
,并且2)消费者的获取大小为max.message.bytes
。
大多数关于升级到0.10.0消息格式对性能影响的讨论,仍然与0.11.0升级有关。这主要影响不使用TLS保护的群集,因为在这种情况下“零复制”传输是不可行的。为了避免下变换的成本,您应确保客户应用程序升级到最新的0.11.0版本。值得注意的是,由于旧消费者在0.11.0.0已经被弃用,它不支持新的消息格式。您必须升级才能使用新消费者及新的消息格式,这不需要下转换的成本。请注意,0.11.0消费者向后兼容0.10.0及更高版本的代理,所以可以先在升级代理之前升级客户端。
从0.8.x,0.9.x,0.10.0.x或0.10.1.x升级到0.10.2.0
0.10.2.0在线协议方面有变化。遵循下面的滚动升级计划,可以保证您在升级过程中不用停机。请在升级之前,请查看0.10.2.0中的显著更改。
从0.10.2版本开始,Java客户端(生产者和消费者)获得了与旧代理进行通信的能力。0.10.2版本客户端可以与0.10.0版本及其以上的代理进行通信。但是,如果您的代理低于0.10.0版本,则必须先升级Kafka集群中的所有代理,然后再升级您的客户端。0.10.2版代理支持0.8.x及以上的客户端。
对于滚动升级:
注意:如果你可以接受停机,那么你可以把所有的broker关闭,更新代码并重启。系统将默认启动新的协议。
注意:在升级broker后,可以随时更新协议版本并重启。这不需要在升级broker后立即进行。
TimestampExtractor
界面已更改。StreamsMetric
界面已更改。retries
的默认值从0更改为10,max.poll.interval.ms
的默认值从300000改为Integer.MAX_VALUE
。InterruptException
。请参阅KafkaConsumer
Javadoc以获得更深入的解释。KafkaConsumer
来控制最大等待时间。topics
数组设置为null
, OffsetFetchRequest v2 将支持检索所有主题的偏移量,error_code
。listener_name
字段作为end_points
数组的元素。validate_only
字段。error_message
字段作为topic_errors
数组的元素。从0.8.x,0.9.x或0.10.0.X升级到0.10.1.0
0.10.1.0 在线协议方面有变化。遵循下面的滚动升级计划,可以保证您在升级过程中不用停机。请在升级之前,请查看0.10.1.0中的显著更改。
注意:由于引入了新协议,在升级客户端之前先升级您的Kafka群集是非常重要的(即,0.10.1.x客户端仅支持0.10.1.x或更高版本的代理,而0.10.1.x代理支持较旧的客户端)。
对于滚动升级:
注意:如果你可以接受停机,那么你可以把所有的broker关闭,更新代码并重启。系统将默认启动新的协议。
注意:在升级broker后,可以随时更新协议版本并重启。这不需要在升级broker后立即进行。
--new-consumer
/--new.consumer
转换已不再需要MirrorMaker和消费者控制台之类的工具。只需要通过Kafka broker来连接,而不必使用ZooKeeper。此外,旧消费者控制台已被弃用,并将在以后的版本中删除。max.poll.interval.ms
可以控制用户主动离开组前轮询调用的最大时间(默认为5分钟)(在用户主动离开组前)。配置request.timeout.ms
的值必须总是大于max.poll.interval.ms
,因为这是JoinGroup请求平衡服务器消费的最大时间,所以我们已经将其默认值更改为5分钟以上。session.timeout.ms
的默认值已经调整到10秒,max.poll.records
的默认值已经改为500。kafka.api.FetchRequest
和kafka.javaapi.FetchRequest
且可以让调用者指定分区的顺序(因为在v3中顺序是重要的)。原先的构造函数已被弃用,在发送请求之前将对分区进行洗牌以避免饥饿问题。0.10.0.0 版本有 潜在的重大变化(请在升级之前查看)并可能有 升级后的性能影响。请遵循以下建议的滚动升级计划,可确保在升级过程中和升级完成后不会出现停机和性能影响。
注意:由于引入了新的协议,在升级客户端之前,升级您的Kafka集群是非常重要的。
针对0.9.0.0版本客户的说明:由于0.9.0.0版本中的一个错误,依赖于ZooKeeper的客户端(使用旧Scala高级Consumer和MirrorMaker的客户端)不能与0.10.0.x代理一起工作。因此,在将代理升级到0.10.0.x 之前,应将0.9.0.0客户端升级到0.9.0.1。对于0.8.X或0.9.0.1版本,这一步不是必需的。
对于滚动升级:
注意: 如果你可以接受停机,那么你可以把所有的broker关闭,更新代码并重启。系统将默认启动新的协议。
注意: 在升级broker后,可以随时更新协议版本并重启。这不需要在升级broker后立即进行。
0.10.0中的消息格式包含一个新的时间戳字段,并使用压缩消息的相对偏移量。磁盘上的消息格式可以通过server.properties文件中的log.message.format.version来配置。磁盘上默认的消息格式是0.10.0。如果消费者客户端使用的是0.10.0.0之前的版本,则只能适用0.10.0之前的消息格式。在这种情况下,代理可以将消息从0.10.0格式转换为较早的格式,然后将响应发送给旧版本的消费者。但在这种情况下,代理不能使用零复制转移。Kafka社区中关于性能影响的报告显示,在升级之后,CPU使用率从20%上升到100%,这会迫使所有客户端立即升级,以使性能恢复正常。为避免这种消息转换,可以在代理升级到0.10.0.0时,将log.message.format.version设置为0.8.2或0.9.0。这样,代理仍可使用零拷贝将数据发送给以前的消费者。消费者升级之后,可以将代理上的消息格式更改为0.10.0,就可以使用新时间戳和改进压缩的新消息格式。Kafka支持这一转换以确保兼容性也有利于一些尚未更新到较新客户端的应用程序。但即使在过度配置的群集上也不支持所有消费者间的通信。因此,在代理升级后,尽量避免信息转换是非常重要的,但大多数客户端还没有。
对于升级到0.10.0.0的客户端,不会影响性能。
注意:通过设置消息格式版本,可以让所有现有消息都在该版本及其以下。且10.0.0之前的消费者可能会中断。尤其是,把消息格式设置为0.10.0后,不应将其更改回以前的格式,因为它可能会中断0.10.0.0版本前的使用者。
注意:由于在每条消息中额外地引入了时间戳,由于增加开销,发送小消息的生产者可能会产生消息吞吐量下降。同样,复制里现在每个消息额外传输8个字节。如果您运行的集群接近网络容量,则可能会压垮网卡,并由过载而导致故障和性能问题。
注意:如果您在生产者上启用了压缩,您可能会注意到某些情况下,生产者吞吐量或代理的压缩率会降低。在接收压缩消息时,0.10.0的代理会避免重新压缩消息,通常会减少延迟并提高吞吐量。然而,在某些情况下,这可能会减少生产者的批量大小,导致更差的吞吐量。如果发生这种情况,用户可以调整生产者的linger.ms和batch.size以获得更好的吞吐量。另外,如果用snappy压缩消息的生产者缓冲区比代理缓冲区小,可能会不利于磁盘消息的压缩率。我们打算在更高版本的Kafka中进行配置。
def writeTo(key: Array[Byte], value: Array[Byte], output: PrintStream)
改为def writeTo(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]], output: PrintStream)
def readMessage(): KeyedMessage[Array[Byte], Array[Byte]]
改为 def readMessage(): ProducerRecord[Array[Byte], Array[Byte]]
kafka.tools
改为kafka.common
kafka.tools
改为kafka.common
handle(record: MessageAndMetadata[Array[Byte], Array[Byte]])
方法,因为它从未被调用过。java.util.Collection
序列类型作为方法的参数。现有的代码可能需要更新才能使用0.10.0客户端库。exclude.internal.topics
以防止内部主题(例如消费者偏移主题)意外地被符合正则表达式的订阅源订阅。默认情况下,该设置启用。从0.8.0,0.8.1.X或0.8.2.X升级到0.9.0.0
0.9.0.0有潜在的重大变化(请在升级之前查看),而且代理协议也有所改变。这意味着升级的代理和客户端可能与旧版本不兼容。在升级客户端之前升级您的Kafka集群是非常重要的。如果您正在使用MirrorMaker,则应先升级下游群集。
对于滚动升级:
注意:如果你可以接受停机,那么你可以把所有的broker关闭,更新代码并重启。系统将默认启动新的协议。
注意:在升级broker后,可以随时更新协议版本并重启。这不需要在升级broker后立即进行。
0.9.0.0中弃用的部分
0.8.2与0.8.1完全兼容。升级代理可以通过将其关闭,更新代码并重启来完成。
0.8.1与0.8完全兼容。0.8.1与0.8完全兼容。
0.7版本与新版本不兼容。新版本对API、ZooKeeper数据结构、协议以及配置进行了重大更改,以便添加副本(在0.7中没有)。从0.7升级到更高版本需要专门的迁移工具。可以在不停机的情况下完成迁移。
Kafka includes five core apis:
Kafka exposes all its functionality over a language independent protocol which has clients available in many programming languages. However only the Java clients are maintained as part of the main Kafka project, the others are available as independent open source projects. A list of non-Java clients is available here.
The Producer API allows applications to send streams of data to topics in the Kafka cluster.
Examples showing how to use the producer are given in the javadocs.
To use the producer, you can use the following maven dependency:
1 2 3 4 5 6 |
|
The Consumer API allows applications to read streams of data from topics in the Kafka cluster.
Examples showing how to use the consumer are given in the javadocs.
To use the consumer, you can use the following maven dependency:
1 2 3 4 5 6 |
|
The Streams API allows transforming streams of data from input topics to output topics.
Examples showing how to use this library are given in the javadocs
Additional documentation on using the Streams API is available here.
To use Kafka Streams you can use the following maven dependency:
1 2 3 4 5 6 |
|
The Connect API allows implementing connectors that continually pull from some source data system into Kafka or push from Kafka into some sink data system.
Many users of Connect won't need to use this API directly, though, they can use pre-built connectors without needing to write any code. Additional information on using Connect is available here.
Those who want to implement custom connectors can see the javadoc.
The AdminClient API supports managing and inspecting topics, brokers, acls, and other Kafka objects.
To use the AdminClient API, add the following Maven dependency:
1 2 3 4 5 6 |
|
For more information about the AdminClient APIs, see the javadoc.
A more limited legacy producer and consumer api is also included in Kafka. These old Scala APIs are deprecated and only still available for compatibility purposes. Information on them can be found here here.
Kafkas使用property文件格式的键值对来配置程序。这些键值对配置既可以来自property文件也可以来程序内部。
核心基础配置如下:
broker.id
log.dirs
zookeeper.connect
Topic-level配置及其默认值在下面有更详尽的讨论。
名称 | 描述 | 类型 | 默认值 | 有效值 | 重要性 |
---|---|---|---|---|---|
zookeeper.connect | Zookeeper主机地址 | string | 高 | ||
advertised.host.name | 不建议:仅在未设置`advertised.listeners` 或 `listeners`时使用。用`advertised.listeners`替换。 主机名发布到zookeeper供客户端使用。在IaaS环境,这可能需要与broker绑定不通的端口。如果未设置,将使用`host.name`的值(如果已经配置)。否则,他将使用java.net.InetAddress.getCanonicalHostName()返回的值。 | string | null | 高 | |
advertised.listeners | 监听器发布到ZooKeeper供客户端使用,如果与`listeners`配置不同。在IaaS环境,这可能需要与broker绑定不通的接口。如果没有设置,将使用`listeners`的配置。与`listeners`不同的是,配置0.0.0.0元地址是无效的。 | string | null | 高 | |
advertised.port | 不建议:仅在未设置“advertised.listeners”或“listeners”时使用。使用`advertised.listeners`代替。 这个端口发布到ZooKeeper供客户端使用。在IaaS环境,这可能需要与broker绑定不通的端口。如果没有设置,它将绑定和broker相同的端口。 | int | null | 高 | |
auto.create.topics.enable | 是否允许在服务器上自动创建topic | boolean | true | 高 | |
auto.leader.rebalance.enable | 是否允许leader平衡。后台线程会定期检查并触发leader平衡。 | boolean | true | 高 | |
background.threads | 用于处理各种后台任务的线程数量 | int | 10 | [1,...] | 高 |
broker.id | 用于服务的broker id。如果没设置,将生存一个唯一broker id。为了避免ZooKeeper生成的id和用户配置的broker id相冲突,生成的id将在reserved.broker.max.id的值基础上加1。 | int | -1 | 高 | |
compression.type | 为特点的topic指定一个最终压缩类型。此配置接受的标准压缩编码方式有('gzip', 'snappy', 'lz4')。此外还有'uncompressed'相当于不压缩;'producer'意味着压缩类型由'producer'决定。 | string | producer | 高 | |
delete.topic.enable | 是否允许删除topic。如果关闭此配置,通过管理工具删除topic将不再生效。 | boolean | true | 高 | |
host.name | 不建议: 仅在未设置`listeners`时使用。使用`listeners`来代替。 如果设置了broker主机名,则他只会当定到这个地址。如果没设置,将绑定到所有接口。 | string | "" | 高 | |
leader.imbalance.check.interval.seconds | 由控制器触发分区重新平衡检查的频率设置 | long | 300 | 高 | |
leader.imbalance.per.broker.percentage | 每个broker允许的不平衡的leader的百分比,如果高于这个比值将触发leader进行平衡。这个值用百分比来指定。 | int | 10 | 高 | |
listeners | 监听器列表 - 使用逗号分隔URI列表和监听器名称。如果侦听器名称不是安全协议,则还必须设置listener.security.protocol.map。指定主机名为0.0.0.0来绑定到所有接口。留空则绑定到默认接口上。合法监听器列表的示例:PLAINTEXT:// myhost:9092,SSL://:9091 CLIENT://0.0.0.0:9092,REPLICATION:// localhost:9093 | string | null | 高 | |
log.dir | 保存日志数据的目录(对log.dirs属性的补充) | string | /tmp/kafka-logs | 高 | |
log.dirs | 保存日志数据的目录,如果未设置将使用log.dir的配置。 | string | null | 高 | |
log.flush.interval.messages | 在将消息刷新到磁盘之前,在日志分区上累积的消息数量。 | long | 9223372036854775807 | [1,...] | 高 |
log.flush.interval.ms | 在刷新到磁盘之前,任何topic中的消息保留在内存中的最长时间(以毫秒为单位)。如果未设置,则使用log.flush.scheduler.interval.ms中的值。 | long | null | 高 | |
log.flush.offset.checkpoint.interval.ms | 日志恢复点的最后一次持久化刷新记录的频率 | int | 60000 | [0,...] | 高 |
log.flush.scheduler.interval.ms | 日志刷新器检查是否需要将所有日志刷新到磁盘的频率(以毫秒为单位) | long | 9223372036854775807 | 高 | |
log.flush.start.offset.checkpoint.interval.ms | 我们更新日志持久化记录开始offset的频率 | int | 60000 | [0,...] | 高 |
log.retention.bytes | 日志删除的大小阈值 | long | -1 | 高 | |
log.retention.hours | 日志删除的时间阈值(小时为单位) | int | 168 | 高 | |
log.retention.minutes | 日志删除的时间阈值(分钟为单位),如果未设置,将使用log.retention.hours的值 | int | null | 高 | |
log.retention.ms | 日志删除的时间阈值(毫秒为单位),如果未设置,将使用log.retention.minutes的值 | long | null | 高 | |
log.roll.hours | 新日志段轮转时间间隔(小时为单位),次要配置为log.roll.ms | int | 168 | [1,...] | 高 |
log.roll.jitter.hours | 从logrolltimemillis(以小时计)中减去的最大抖动,次要配置log.roll.jitter.ms | int | 0 | [0,...] | 高 |
log.roll.jitter.ms | 从logrolltimemillis(以毫秒计)中减去的最大抖动,如果未设置,则使用log.roll.jitter.hours的配置 | long | null | 高 | |
log.roll.ms | 新日志段轮转时间间隔(毫秒为单位),如果未设置,则使用log.roll.hours配置 | long | null | 高 | |
log.segment.bytes | 单个日志段文件最大大小 | int | 1073741824 | [14,...] | 高 |
log.segment.delete.delay.ms | 从文件系统中删除一个日志段文件前的保留时间 | long | 60000 | [0,...] | 高 |
message.max.bytes | kafka允许的最大的一个批次的消息大小。 如果这个数字增加,且有0.10.2版本以下的consumer,那么consumer的提取大小也必须增加,以便他们可以取得这么大的记录批次。 在最新的消息格式版本中,记录总是被组合到一个批次以提高效率。 在以前的消息格式版本中,未压缩的记录不会分组到批次中,并且此限制仅适用于该情况下的单个记录。 可以使用topic设置`max.message.bytes`来设置每个topic。 | int | 1000012 | [0,...] | 高 |
min.insync.replicas | 当producer将ack设置为“全部”(或“-1”)时,min.insync.replicas指定了被认为写入成功的最小副本数。如果这个最小值不能满足,那么producer将会引发一个异常(NotEnoughReplicas或NotEnoughReplicasAfterAppend)。当一起使用时,min.insync.replicas和acks允许您强制更大的耐久性保证。 一个经典的情况是创建一个复本数为3的topic,将min.insync.replicas设置为2,并且producer使用“all”选项。 这将确保如果大多数副本没有写入producer则抛出异常。 | int | 1 | [1,...] | 高 |
num.io.threads | 服务器用于处理请求的线程数,可能包括磁盘I/O | int | 8 | [1,...] | 高 |
num.network.threads | 服务器用于从接收网络请求并发送网络响应的线程数 | int | 3 | [1,...] | 高 |
num.recovery.threads.per.data.dir | 每个数据目录,用于启动时日志恢复和关闭时刷新的线程数 | int | 1 | [1,...] | 高 |
num.replica.fetchers | 从源broker复制消息的拉取器的线程数。增加这个值可以增加follow broker的I/O并行度。 | int | 1 | 高 | |
offset.metadata.max.bytes | 与offset提交相关联的元数据条目的最大大小 | int | 4096 | 高 | |
offsets.commit.required.acks | 在offset提交可以接受之前,需要设置acks的数目,一般不需要更改,默认值为-1。 | short | -1 | 高 | |
offsets.commit.timeout.ms | offset提交将延迟到topic所有副本收到提交或超时。这与producer请求超时类似。 | int | 5000 | [1,...] | 高 |
offsets.load.buffer.size | 每次从offset段文件往缓存加载时,批量读取的数据大小 | int | 5242880 | [1,...] | 高 |
offsets.retention.check.interval.ms | 检查失效offset的频率 | long | 600000 | [1,...] | 高 |
offsets.retention.minutes | 超过这个保留期限未提交的offset将被丢弃 | int | 1440 | [1,...] | 高 |
offsets.topic.compression.codec | 用于offsets topic的压缩编解码器 - 压缩可用于实现“原子”提交 | int | 0 | 高 | |
offsets.topic.num.partitions | Offsets topic的分区数量(部署后不应更改) | int | 50 | [1,...] | 高 |
offsets.topic.replication.factor | offset topic的副本数(设置的越大,可用性越高)。内部topic创建将失败,直到集群大小满足此副本数要求。 | short | 3 | [1,...] | 高 |
offsets.topic.segment.bytes | 为了便于更快的日志压缩和缓存加载,offset topic段字节应该保持相对较小 | int | 104857600 | [1,...] | 高 |
port | 不建议: 仅在未设置“listener”时使用。使用`listeners`来代替。端口用来来监听和接受连接 | int | 9092 | 高 | |
queued.max.requests | 网络线程阻塞前队列允许的最大请求数 | int | 500 | [1,...] | 高 |
quota.consumer.default | 不建议:仅在动态默认配额未配置或在zookeeper中使用。任何由clientid区分开来的consumer,如果它每秒产生的字节数多于这个值,就会受到限制 | long | 9223372036854775807 | [1,...] | 高 |
quota.producer.default | 不建议:仅在动态默认配额未配置或在zookeeper中使用。任何由clientid区分开来的producer,如果它每秒产生的字节数多于这个值,就会受到限制 | long | 9223372036854775807 | [1,...] | 高 |
replica.fetch.min.bytes | 复制数据过程中,replica收到的每个fetch响应,期望的最小的字节数,如果没有收到足够的字节数,就会等待更多的数据,直到达到replicaMaxWaitTimeMs(复制数据超时时间) | int | 1 | 高 | |
replica.fetch.wait.max.ms | 副本follow同leader之间通信的最大等待时间,失败了会重试。 此值始终应始终小于replica.lag.time.max.ms,以防止针对低吞吐量topic频繁收缩ISR | int | 500 | 高 | |
replica.high.watermark.checkpoint.interval.ms | high watermark被保存到磁盘的频率,用来标记日后恢复点/td> | long | 5000 | 高 | |
replica.lag.time.max.ms | 如果一个follower在这个时间内没有发送fetch请求或消费leader日志到结束的offset,leader将从ISR中移除这个follower,并认为这个follower已经挂了 | long | 10000 | 高 | |
replica.socket.receive.buffer.bytes | socket接收网络请求的缓存大小 | int | 65536 | 高 | |
replica.socket.timeout.ms | 副本复制数据过程中,发送网络请求的socket超时时间。这个值应该大于replica.fetch.wait.max.ms的值 | int | 30000 | 高 | |
request.timeout.ms | 该配置控制客户端等待请求响应的最长时间。如果在超时之前未收到响应,则客户端将在必要时重新发送请求,如果重试仍然失败,则请求失败。 | int | 30000 | 高 | |
socket.receive.buffer.bytes | 服务端用来处理socket连接的SO_RCVBUFF缓冲大小。如果值为-1,则使用系统默认值。 | int | 102400 | 高 | |
socket.request.max.bytes | socket请求的最大大小,这是为了防止server跑光内存,不能大于Java堆的大小。 | int | 104857600 | [1,...] | 高 |
socket.send.buffer.bytes | 服务端用来处理socket连接的SO_SNDBUF缓冲大小。如果值为-1,则使用系统默认值。 | int | 102400 | 高 | |
transaction.max.timeout.ms | 事务允许的最大超时时间。如果客户请求的事务超时,那么broker将在InitProducerIdRequest中返回一错误。 这样可以防止客户超时时间过长,从而阻碍consumers读取事务中包含的topic。 | int | 900000 | [1,...] | 高 |
transaction.state.log.load.buffer.size | 将producer ID和事务加载到高速缓存中时,从事务日志段(the transaction log segments)中批量读取的大小。 | int | 5242880 | [1,...] | 高 |
transaction.state.log.min.isr | 覆盖事务topic的min.insync.replicas配置 | int | 2 | [1,...] | 高 |
transaction.state.log.num.partitions | 事务topic的分区数(部署后不应该修改) | int | 50 | [1,...] | 高 |
transaction.state.log.replication.factor | 事务topic的副本数(设置的越大,可用性越高)。内部topic在集群数满足副本数之前,将会一直创建失败。 | short | 3 | [1,...] | 高 |
transaction.state.log.segment.bytes | 事务topic段应保持相对较小,以便于更快的日志压缩和缓存负载。 | int | 104857600 | [1,...] | 高 |
transactional.id.expiration.ms | 事务协调器在未收到任何事务状态更新之前,主动设置producer的事务标识为过期之前将等待的最长时间(以毫秒为单位) | int | 604800000 | [1,...] | 高 |
unclean.leader.election.enable | 指定副本是否能够不再ISR中被选举为leader,即使这样可能会丢数据 | boolean | false | 高 | |
zookeeper.connection.timeout.ms | 与ZK server建立连接的超时时间,没有配置就使用zookeeper.session.timeout.ms | int | null | 高 | |
zookeeper.session.timeout.ms | ZooKeeper的session的超时时间 | int | 6000 | 高 | |
zookeeper.set.acl | ZooKeeper客户端连接是否设置ACL安全y安装 | boolean | false | 高 | |
broker.id.generation.enable | 是否允许服务器自动生成broker.id。如果允许则产生的值会交由reserved.broker.max.id审核 | boolean | true | 中 | |
broker.rack | broker的机架位置。 这将在机架感知副本分配中用于容错。例如:RACK1,us-east-1 | string | null | 中 | |
connections.max.idle.ms | 连接空闲超时:服务器socket处理线程空闲超时关闭时间 | long | 600000 | 中 | |
controlled.shutdown.enable | 是否允许服务器关闭broker服务 | boolean | true | 中 | |
controlled.shutdown.max.retries | 当发生失败故障时,由于各种原因导致关闭服务的次数 | int | 3 | 中 | |
controlled.shutdown.retry.backoff.ms | 在每次重试关闭之前,系统需要时间从上次故障状态(控制器故障切换,副本延迟等)中恢复。 这个配置决定了重试之前等待的时间。 | long | 5000 | 中 | |
controller.socket.timeout.ms | 控制器到broker通道的socket超时时间 | int | 30000 | 中 | |
default.replication.factor | 自动创建topic时的默认副本个数 | int | 1 | 中 | |
delete.records.purgatory.purge.interval.requests | 删除purgatory中请求的清理间隔时间(purgatory:broker对于无法立即处理的请求,将会放在purgatory中,当请求完成后,并不会立即清除,还会继续在purgatory中占用资源,直到下一次delete.records.purgatory.purge.interval.requests) | int | 1 | 中 | |
fetch.purgatory.purge.interval.requests | 提取purgatory中请求的间隔时间 | int | 1000 | 中 | |
group.initial.rebalance.delay.ms | 在执行第一次重新平衡之前,group协调器将等待更多consumer加入group的时间。延迟时间越长意味着重新平衡的工作可能越小,但是等待处理开始的时间增加。 | int | 3000 | 中 | |
group.max.session.timeout.ms | consumer注册允许的最大会话超时时间。超时时间越短,处理心跳越频繁从而使故障检测更快,但会导致broker被抢占更多的资源。 | int | 300000 | medium | |
group.min.session.timeout.ms | consumer注册允许的最小会话超时时间。超时时间越短,处理心跳越频繁从而使故障检测更快,但会导致broker被抢占更多的资源。 | int | 6000 | 中 | |
inter.broker.listener.name | broker间通讯的监听器名称。如果未设置,则侦听器名称由security.inter.broker.protocol定义。 同时设置此项和security.inter.broker.protocol属性是错误的,只设置一个。 | string | null | 中 | |
inter.broker.protocol.version | 指定使用哪个版本的 inter-broker 协议。 在所有broker升级到新版本之后,这通常会有冲突。一些有效的例子是:0.8.0, 0.8.1, 0.8.1.1, 0.8.2, 0.8.2.0, 0.8.2.1, 0.9.0.0, 0.9.0.1,详情可以检查apiversion的完整列表 | string | 1.0-IV0 | 中 | |
log.cleaner.backoff.ms | 检查log是否需要清除的时间间隔。 | long | 15000 | [0,...] | 中 |
log.cleaner.dedupe.buffer.size | 日志去重清理线程所需要的内存 | long | 134217728 | 中 | |
log.cleaner.delete.retention.ms | 日志记录保留时间 | long | 86400000 | 中 | |
log.cleaner.enable | 在服务器上启用日志清理器进程。如果任何topic都使用cleanup.policy = compact,包括内部topic offset,则建议开启。如果被禁用的话,这些topic将不会被压缩,而且会不断增长。 | boolean | true | 中 | |
log.cleaner.io.buffer.load.factor | 日志清理器去重的缓存负载数。完全重复数据的缓存比例可以改变。数值越高,清理的越多,但会导致更多的hash冲突 | double | 0.9 | 中 | |
log.cleaner.io.buffer.size | 所有清理线程的日志清理I/O缓存区所需要的内存 | int | 524288 | [0,...] | 中 |
log.cleaner.io.max.bytes.per.second | 日志清理器受到的大小限制数,因此它的I/O读写总和将小于平均值 | double | 1.7976931348623157E308 | 中 | |
log.cleaner.min.cleanable.ratio | 日志中脏数据清理比例 | double | 0.5 | 中 | |
log.cleaner.min.compaction.lag.ms | 消息在日志中保持未压缩的最短时间。 仅适用于正在压缩的日志。 | long | 0 | 中 | |
log.cleaner.threads | 用于日志清理的后台线程的数量 | int | 1 | [0,...] | 中 |
log.cleanup.policy | 超出保留窗口期的日志段的默认清理策略。用逗号隔开有效策略列表。有效策略:“delete”和“compact” | list | delete | [compact, delete] | 中 |
log.index.interval.bytes | 添加offset索引字段大小间隔(设置越大,代表扫描速度越快,但是也更耗内存) | int | 4096 | [0,...] | 中 |
log.index.size.max.bytes | offset索引的最大字节数 | int | 10485760 | [4,...] | 中 |
log.message.format.version | 指定broker用于将消息附加到日志的消息格式版本。应该是一个有效的apiversion值。例如:0.8.2,0.9.0.0,0.10.0,详情去看apiversion。通过设置特定的消息格式版本,用户得保证磁盘上的所有现有消息的版本小于或等于指定的版本。不正确地设置这个值会导致旧版本的用户出错,因为他们将接收到他们无法处理的格式消息。 | string | 1.0-IV0 | 中 | |
log.message.timestamp.difference.max.ms | broker收到消息时的时间戳和消息中指定的时间戳之间允许的最大差异。当log.message.timestamp.type=CreateTime,如果时间差超过这个阈值,消息将被拒绝。如果log.message.timestamp.type = logappendtime,则该配置将被忽略。允许的最大时间戳差值,不应大于log.retention.ms,以避免不必要的频繁日志滚动。 | long | 9223372036854775807 | 中 | |
log.message.timestamp.type | 定义消息中的时间戳是消息创建时间还是日志追加时间。 该值应该是“createtime”或“logappendtime”。 | string | CreateTime | [CreateTime, LogAppendTime] | 中 |
log.preallocate | 创建新的日志段前是否应该预先分配文件?如果你在windows上使用kafka,你可能需要打开个这个选项 | boolean | false | 中 | |
log.retention.check.interval.ms | 日志清理器检查是否有日志符合删除的频率(以毫秒为单位) | long | 300000 | [1,...] | 中 |
max.connections.per.ip | 每个IP允许的最大连接数 | int | 2147483647 | [1,...] | 中 |
max.connections.per.ip.overrides | 每个IP或主机名将覆盖默认的最大连接数 | string | "" | 中 | |
num.partitions | 每个topic的默认日志分区数 | int | 1 | [1,...] | 中 |
principal.builder.class | 实现kafkaprincipalbuilder接口类的全名,该接口用于构建授权期间使用的kafkaprincipal对象。此配置还支持以前已弃用的用于ssl客户端身份验证的principalbuilder接口。如果未定义主体构建器,则默认采用所使用的安全协议。对于ssl身份验证,如果提供了一个主体名称,主体名称将是客户端证书的专有名称;否则,如果不需要客户端身份验证,则主体名称将是匿名的。对于sasl身份验证,如果使用gssapi,则将使用由sasl.kerberos.principal.to.local.rules 定义的规则来生成主体,而使用其他机制的sasl身份验证ID。若果用明文,委托人将是匿名的。 | class | null | 中 | |
producer.purgatory.purge.interval.requests | producer请求purgatory的清除间隔(请求数量) | int | 1000 | 中 | |
queued.max.request.bytes | 在不再读取请求之前队列的字节数 | long | -1 | 中 | |
replica.fetch.backoff.ms | 当拉取分区发生错误时,睡眠的时间。 | int | 1000 | [0,...] | 中 |
replica.fetch.max.bytes | 尝试提取每个分区的消息的字节数。这并不是绝对最大值,如果第一个非空分区的第一个批量记录大于这个值,那么批处理仍将被执行并返回,以确保进度可以正常进行下去。broker接受的最大批量记录大小通过message.max.bytes (broker配置)或max.message.bytes (topic配置)进行配置。 | int | 1048576 | [0,...] | medium |
replica.fetch.response.max.bytes | 预计整个获取响应的最大字节数。记录被批量取回时,如果取第一个非空分区的第一个批量记录大于此值,记录的批处理仍将被执行并返回以确保可以进行下去。因此,这不是绝对的最大值。 broker接受的最大批量记录大小通过message.max.bytes (broker配置)或max.message.bytes (topic配置)进行配置。 | int | 10485760 | [0,...] | 中 |
reserved.broker.max.id | 可以用于broker.id的最大数量 | int | 1000 | [0,...] | 中 |
sasl.enabled.mechanisms | kafka服务器中启用的sasl机制的列表。 该列表可能包含安全提供程序可用的任何机制。默认情况下只有gssapi是启用的。 | list | GSSAPI | 中 | |
sasl.kerberos.kinit.cmd | Kerberos kinit 命令路径。 | string | /usr/bin/kinit | 中 | |
sasl.kerberos.min.time.before.relogin | 登录线程在尝试刷新间隔内的休眠时间。 | long | 60000 | 中 | |
sasl.kerberos.principal.to.local.rules | 主体名称到简称映射的规则列表(通常是操作系统用户名)。按顺序,使用与principal名称匹配的第一个规则将其映射到简称。列表中的任何后续规则都将被忽略。 默认情况下,{username} / {hostname} @ {realm}形式的主体名称映射到{username}。 有关格式的更多细节,请参阅安全授权和acls。 请注意,如果由principal.builder.class配置提供了kafkaprincipalbuilder的扩展,则忽略此配置。 | list | DEFAULT | 中 | |
sasl.kerberos.service.name | kafka运行的kerberos的主体名称。 这可以在kafka的JAAS配置或在kafka的配置中定义。 | string | null | 中 | |
sasl.kerberos.ticket.renew.jitter | 添加到更新时间的随机抖动的百分比 | double | 0.05 | 中 | |
sasl.kerberos.ticket.renew.window.factor | 登录线程将休眠,直到从上次刷新到ticket的到期的时间到达(指定窗口因子),在此期间它将尝试更新ticket。 | double | 0.8 | 中 | |
sasl.mechanism.inter.broker.protocol | SASL机制,用于broker之间的通讯,默认是GSSAPI。 | string | GSSAPI | 中 | |
security.inter.broker.protocol | broker之间的安全通讯协议,有效值有:PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL。同时设置此配置和inter.broker.listener.name属性会出错 | string | PLAINTEXT | 中 | |
ssl.cipher.suites | 密码套件列表。 这是一种用于使用tls或ssl网络协议来协商网络连接的安全设置的认证,加密,mac和密钥交换算法的命名组合。 默认情况下,所有可用的密码套件都受支持。 | list | null | 中 | |
ssl.client.auth | 配置请求客户端的broker认证。常见的设置:
| string | none | [required, requested, none] | 中 |
ssl.enabled.protocols | 已启用的SSL连接协议列表。 | list | TLSv1.2,TLSv1.1,TLSv1 | 中 | |
ssl.key.password | 秘钥库文件中的私钥密码。对客户端是可选的。 | password | null | 中 | |
ssl.keymanager.algorithm | 用于SSL连接的密钥管理工厂算法。默认值是为Java虚拟机配置的密钥管理器工厂算法。 | string | SunX509 | 中 | |
ssl.keystore.location | 密钥仓库文件的位置。客户端可选,并可用于客户端的双向认证。 | string | null | 中 | |
ssl.keystore.password | 密钥仓库文件的仓库密码。客户端可选,只有ssl.keystore.location配置了才需要。 | password | null | 中 | |
ssl.keystore.type | 密钥仓库文件的格式。客户端可选。 | string | JKS | 中 | |
ssl.protocol | 用于生成SSLContext,默认是TLS,适用于大多数情况。允许使用最新的JVM,LS, TLSv1.1 和TLSv1.2。 SSL,SSLv2和SSLv3 老的JVM也可能支持,但由于有已知的安全漏洞,不建议使用。 | string | TLS | ||
ssl.provider | 用于SSL连接的安全提供程序的名称。默认值由JVM的安全程序提供。 | string | null | 中 | |
ssl.trustmanager.algorithm | 信任管理工厂用于SSL连接的算法。默认为Java虚拟机配置的信任算法。 | string | PKIX | 中 | |
ssl.truststore.location | 信任文件的存储位置。 | string | null | 中 | |
ssl.truststore.password | 信任存储文件的密码。 如果密码未设置,则仍然可以访问信任库,但完整性检查将被禁用。 | password | null | 中 | |
ssl.truststore.type | 信任存储文件的文件格式。 | string | JKS | 中 | |
alter.config.policy.class.name | 应该用于验证的alter configs策略类。 该类应该实现org.apache.kafka.server.policy.alterconfigpolicy接口。 | class | null | 低 | |
authorizer.class.name | 用于认证授权的程序类 | string | "" | 低 | |
create.topic.policy.class.name | 用于验证的创建topic策略类。 该类应该实现org.apache.kafka.server.policy.createtopicpolicy接口。 | class | null | 低 | |
listener.security.protocol.map | 侦听器名称和安全协议之间的映射。必须定义为相同的安全协议可用于多个端口或IP。例如,即使两者都需要ssl,内部和外部流量也可以分开。具体的说,用户可以定义名字为INTERNAL和EXTERNAL的侦听器,这个属性为:internal:ssl,external:ssl。 如图所示,键和值由冒号分隔,映射条目以逗号分隔。 每个监听者名字只能在映射表上出现一次。 通过向配置名称添加规范化前缀(侦听器名称小写),可以为每个侦听器配置不同的安全性(ssl和sasl)设置。 例如,为内部监听器设置不同的密钥仓库,将会设置名称为“listener.name.internal.ssl.keystore.location”的配置。 如果没有设置侦听器名称的配置,配置将回退到通用配置(即`ssl.keystore.location`)。 | string | PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL | 低 | |
metric.reporters | 度量报告的类列表,通过实现MetricReporter 接口,允许插入新度量标准类。JmxReporter包含注册JVM统计。 | list | "" | 低 | |
metrics.num.samples | 维持计算度量的样本数 | int | 2 | [1,...] | 低 |
metrics.recording.level | 指标的最高记录级别 | string | INFO | 低 | |
metrics.sample.window.ms | 计算度量样本的时间窗口 | long | 30000 | [1,...] | 低 |
quota.window.num | 在内存中保留客户端限额的样本数 | int | 11 | [1,...] | 低 |
quota.window.size.seconds | 每个客户端限额的样本时间跨度 | int | 1 | [1,...] | 低 |
replication.quota.window.num | 在内存中保留副本限额的样本数 | int | 11 | [1,...] | 低 |
replication.quota.window.size.seconds | 每个副本限额样本数的时间跨度 | int | 1 | [1,...] | 低 |
ssl.endpoint.identification.algorithm | 端点身份标识算法,使用服务器证书验证服务器主机名 | string | null | 低 | |
ssl.secure.random.implementation | 用于SSL加密操作的SecureRandom PRNG实现 | string | null | 低 | |
transaction.abort.timed.out.transaction.cleanup.interval.ms | 回滚已超时的事务的时间间隔 | int | 60000 | [1,...] | 低 |
transaction.remove.expired.transaction.cleanup.interval.ms | 删除由于transactional.id.expiration.ms传递过程而过期的事务的时间间隔 | int | 3600000 | [1,...] | low |
zookeeper.sync.time.ms | ZK follower同步可落后leader多久/td> | int | 2000 | 低 |
More details about broker configuration can be found in the scala class kafka.server.KafkaConfig
.
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。