http://kafka.apachecn.org/documentation.html#configuration
Kafka使用property文件格式的键值对来配置程序。这些键值对配置既可以来自property文件,也可以来自程序内部。
核心基础配置如下:
broker.id
log.dirs
zookeeper.connect
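例如,一个只包含这三项核心配置的最小 `server.properties` 可能如下(目录和地址均为示例值):

```
# 本broker在集群中唯一的整数id
broker.id=0
# 日志(消息)数据的存放目录,多个目录用逗号分隔
log.dirs=/var/lib/kafka/data
# ZooKeeper连接串,格式为 host:port,可附加chroot路径
zookeeper.connect=localhost:2181
```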
Topic-level配置及其默认值在下面有更详尽的讨论。
名称 | 描述 | 类型 | 默认值 | 有效值 | 重要性 |
---|---|---|---|---|---|
zookeeper.connect | Zookeeper主机地址 | string | 高 | ||
advertised.host.name | 不建议:仅在未设置`advertised.listeners`或`listeners`时使用,请改用`advertised.listeners`。 发布到ZooKeeper供客户端使用的主机名。在IaaS环境中,这可能需要与broker绑定的接口不同。如果未设置,将使用`host.name`的值(如果已配置);否则,它将使用java.net.InetAddress.getCanonicalHostName()的返回值。 | string | null | 高 | |
advertised.listeners | 发布到ZooKeeper供客户端使用的监听器列表,在与`listeners`配置不同时使用。在IaaS环境中,这可能需要与broker绑定的接口不同。如果没有设置,将使用`listeners`的值。与`listeners`不同的是,发布0.0.0.0这个元地址是无效的。 | string | null | 高 | |
advertised.port | 不建议:仅在未设置`advertised.listeners`或`listeners`时使用,请改用`advertised.listeners`。 发布到ZooKeeper供客户端使用的端口。在IaaS环境中,这可能需要与broker绑定的端口不同。如果没有设置,将发布broker绑定的端口。 | int | null | 高 | |
auto.create.topics.enable | 是否允许在服务器上自动创建topic | boolean | true | 高 | |
auto.leader.rebalance.enable | 是否允许leader平衡。后台线程会定期检查并触发leader平衡。 | boolean | true | 高 | |
background.threads | 用于处理各种后台任务的线程数量 | int | 10 | [1,...] | 高 |
broker.id | 该服务器的broker id。如果未设置,将自动生成一个唯一的broker id。为了避免ZooKeeper自动生成的id与用户配置的broker id相冲突,自动生成的id从reserved.broker.max.id + 1开始。 | int | -1 | 高 | |
compression.type | 为特定的topic指定最终的压缩类型。此配置接受标准的压缩编码('gzip', 'snappy', 'lz4');此外,'uncompressed'相当于不压缩,'producer'则表示保留producer设置的原始压缩编码。 | string | producer | 高 | |
delete.topic.enable | 是否允许删除topic。如果关闭此配置,通过管理工具删除topic将不再生效。 | boolean | true | 高 | |
host.name | 不建议:仅在未设置`listeners`时使用,请改用`listeners`。 broker的主机名。如果设置了,broker将只绑定到这个地址;如果未设置,将绑定到所有接口。 | string | "" | 高 | |
leader.imbalance.check.interval.seconds | 由控制器触发分区重新平衡检查的频率设置 | long | 300 | 高 | |
leader.imbalance.per.broker.percentage | 每个broker允许的不平衡的leader的百分比,如果高于这个比值将触发leader进行平衡。这个值用百分比来指定。 | int | 10 | 高 | |
listeners | 监听器列表 - 使用逗号分隔的URI列表和监听器名称。如果侦听器名称不是安全协议,则还必须设置listener.security.protocol.map。指定主机名为0.0.0.0可绑定到所有接口,留空则绑定到默认接口。合法监听器列表的示例:PLAINTEXT://myhost:9092,SSL://:9091 CLIENT://0.0.0.0:9092,REPLICATION://localhost:9093 | string | null | 高 | |
log.dir | 保存日志数据的目录(对log.dirs属性的补充) | string | /tmp/kafka-logs | 高 | |
log.dirs | 保存日志数据的目录,如果未设置将使用log.dir的配置。 | string | null | 高 | |
log.flush.interval.messages | 在将消息刷新到磁盘之前,在日志分区上累积的消息数量。 | long | 9223372036854775807 | [1,...] | 高 |
log.flush.interval.ms | 在刷新到磁盘之前,任何topic中的消息保留在内存中的最长时间(以毫秒为单位)。如果未设置,则使用log.flush.scheduler.interval.ms中的值。 | long | null | 高 | |
log.flush.offset.checkpoint.interval.ms | 日志恢复点的最后一次持久化刷新记录的频率 | int | 60000 | [0,...] | 高 |
log.flush.scheduler.interval.ms | 日志刷新器检查是否需要将所有日志刷新到磁盘的频率(以毫秒为单位) | long | 9223372036854775807 | 高 | |
log.flush.start.offset.checkpoint.interval.ms | 我们更新日志持久化记录开始offset的频率 | int | 60000 | [0,...] | 高 |
log.retention.bytes | 日志删除的大小阈值 | long | -1 | 高 | |
log.retention.hours | 日志删除的时间阈值(小时为单位) | int | 168 | 高 | |
log.retention.minutes | 日志删除的时间阈值(分钟为单位),如果未设置,将使用log.retention.hours的值 | int | null | 高 | |
log.retention.ms | 日志删除的时间阈值(毫秒为单位),如果未设置,将使用log.retention.minutes的值 | long | null | 高 | |
log.roll.hours | 新日志段轮转时间间隔(小时为单位),次要配置为log.roll.ms | int | 168 | [1,...] | 高 |
log.roll.jitter.hours | 从logRollTimeMillis(以小时计)中减去的最大随机抖动,次要配置为log.roll.jitter.ms | int | 0 | [0,...] | 高 |
log.roll.jitter.ms | 从logRollTimeMillis(以毫秒计)中减去的最大随机抖动,如果未设置,则使用log.roll.jitter.hours的配置 | long | null | 高 | |
log.roll.ms | 新日志段轮转时间间隔(毫秒为单位),如果未设置,则使用log.roll.hours配置 | long | null | 高 | |
log.segment.bytes | 单个日志段文件最大大小 | int | 1073741824 | [14,...] | 高 |
log.segment.delete.delay.ms | 从文件系统中删除一个日志段文件前的保留时间 | long | 60000 | [0,...] | 高 |
message.max.bytes | kafka允许的最大的一个批次的消息大小。 如果这个数字增加,且有0.10.2版本以下的consumer,那么consumer的提取大小也必须增加,以便他们可以取得这么大的记录批次。 在最新的消息格式版本中,记录总是被组合到一个批次以提高效率。 在以前的消息格式版本中,未压缩的记录不会分组到批次中,并且此限制仅适用于该情况下的单个记录。 可以使用topic设置`max.message.bytes`来设置每个topic。 | int | 1000012 | [0,...] | 高 |
min.insync.replicas | 当producer将ack设置为“全部”(或“-1”)时,min.insync.replicas指定了被认为写入成功的最小副本数。如果这个最小值不能满足,那么producer将会引发一个异常(NotEnoughReplicas或NotEnoughReplicasAfterAppend)。当一起使用时,min.insync.replicas和acks允许您强制更大的耐久性保证。 一个经典的情况是创建一个复本数为3的topic,将min.insync.replicas设置为2,并且producer使用“all”选项。 这将确保如果大多数副本没有写入producer则抛出异常。 | int | 1 | [1,...] | 高 |
num.io.threads | 服务器用于处理请求的线程数,可能包括磁盘I/O | int | 8 | [1,...] | 高 |
num.network.threads | 服务器用于从接收网络请求并发送网络响应的线程数 | int | 3 | [1,...] | 高 |
num.recovery.threads.per.data.dir | 每个数据目录,用于启动时日志恢复和关闭时刷新的线程数 | int | 1 | [1,...] | 高 |
num.replica.fetchers | 用于从源broker复制消息的fetcher线程数。增大这个值可以提高follower broker的I/O并行度。 | int | 1 | 高 | |
offset.metadata.max.bytes | 与offset提交相关联的元数据条目的最大大小 | int | 4096 | 高 | |
offsets.commit.required.acks | 在offset提交可以接受之前,需要设置acks的数目,一般不需要更改,默认值为-1。 | short | -1 | 高 | |
offsets.commit.timeout.ms | offset提交将延迟到topic所有副本收到提交或超时。这与producer请求超时类似。 | int | 5000 | [1,...] | 高 |
offsets.load.buffer.size | 每次从offset段文件往缓存加载时,批量读取的数据大小 | int | 5242880 | [1,...] | 高 |
offsets.retention.check.interval.ms | 检查失效offset的频率 | long | 600000 | [1,...] | 高 |
offsets.retention.minutes | 超过这个保留期限未提交的offset将被丢弃 | int | 1440 | [1,...] | 高 |
offsets.topic.compression.codec | 用于offsets topic的压缩编解码器 - 压缩可用于实现“原子”提交 | int | 0 | 高 | |
offsets.topic.num.partitions | Offsets topic的分区数量(部署后不应更改) | int | 50 | [1,...] | 高 |
offsets.topic.replication.factor | offset topic的副本数(设置的越大,可用性越高)。内部topic创建将失败,直到集群大小满足此副本数要求。 | short | 3 | [1,...] | 高 |
offsets.topic.segment.bytes | 为了便于更快的日志压缩和缓存加载,offset topic段字节应该保持相对较小 | int | 104857600 | [1,...] | 高 |
port | 不建议: 仅在未设置“listener”时使用。使用`listeners`来代替。端口用来来监听和接受连接 | int | 9092 | 高 | |
queued.max.requests | 网络线程阻塞前队列允许的最大请求数 | int | 500 | [1,...] | 高 |
quota.consumer.default | 不建议:仅在ZooKeeper中未配置动态默认配额时使用。任何由clientId区分的consumer,如果它每秒拉取的字节数多于这个值,就会被限流 | long | 9223372036854775807 | [1,...] | 高 |
quota.producer.default | 不建议:仅在ZooKeeper中未配置动态默认配额时使用。任何由clientId区分的producer,如果它每秒产生的字节数多于这个值,就会被限流 | long | 9223372036854775807 | [1,...] | 高 |
replica.fetch.min.bytes | 复制数据过程中,replica收到的每个fetch响应,期望的最小的字节数,如果没有收到足够的字节数,就会等待更多的数据,直到达到replicaMaxWaitTimeMs(复制数据超时时间) | int | 1 | 高 | |
replica.fetch.wait.max.ms | follower副本向leader发出的每个fetch请求的最大等待时间。此值应始终小于replica.lag.time.max.ms,以防止低吞吐量topic的ISR频繁收缩 | int | 500 | 高 | |
replica.high.watermark.checkpoint.interval.ms | high watermark被保存到磁盘的频率,用来标记日后的恢复点 | long | 5000 | 高 | |
replica.lag.time.max.ms | 如果一个follower在这个时间内没有发送fetch请求或消费leader日志到结束的offset,leader将从ISR中移除这个follower,并认为这个follower已经挂了 | long | 10000 | 高 | |
replica.socket.receive.buffer.bytes | socket接收网络请求的缓存大小 | int | 65536 | 高 | |
replica.socket.timeout.ms | 副本复制数据过程中,发送网络请求的socket超时时间。这个值应该大于replica.fetch.wait.max.ms的值 | int | 30000 | 高 | |
request.timeout.ms | 该配置控制客户端等待请求响应的最长时间。如果在超时之前未收到响应,则客户端将在必要时重新发送请求,如果重试仍然失败,则请求失败。 | int | 30000 | 高 | |
socket.receive.buffer.bytes | 服务端用来处理socket连接的SO_RCVBUFF缓冲大小。如果值为-1,则使用系统默认值。 | int | 102400 | 高 | |
socket.request.max.bytes | socket请求允许的最大字节数,用于防止server耗尽内存,其值不应大于Java堆大小。 | int | 104857600 | [1,...] | 高 |
socket.send.buffer.bytes | 服务端用来处理socket连接的SO_SNDBUF缓冲大小。如果值为-1,则使用系统默认值。 | int | 102400 | 高 | |
transaction.max.timeout.ms | 事务允许的最大超时时间。如果客户端请求的事务超时超过此值,broker将在InitProducerIdRequest中返回一个错误。这样可以防止客户端设置过大的超时时间,从而阻碍consumer读取该事务涉及的topic。 | int | 900000 | [1,...] | 高 |
transaction.state.log.load.buffer.size | 将producer ID和事务加载到高速缓存中时,从事务日志段(the transaction log segments)中批量读取的大小。 | int | 5242880 | [1,...] | 高 |
transaction.state.log.min.isr | 覆盖事务topic的min.insync.replicas配置 | int | 2 | [1,...] | 高 |
transaction.state.log.num.partitions | 事务topic的分区数(部署后不应该修改) | int | 50 | [1,...] | 高 |
transaction.state.log.replication.factor | 事务topic的副本数(设置的越大,可用性越高)。内部topic在集群数满足副本数之前,将会一直创建失败。 | short | 3 | [1,...] | 高 |
transaction.state.log.segment.bytes | 事务topic段应保持相对较小,以便于更快的日志压缩和缓存负载。 | int | 104857600 | [1,...] | 高 |
transactional.id.expiration.ms | 事务协调器在未收到任何事务状态更新之前,主动设置producer的事务标识为过期之前将等待的最长时间(以毫秒为单位) | int | 604800000 | [1,...] | 高 |
unclean.leader.election.enable | 指定不在ISR中的副本是否可以被选举为leader,即使这样做可能会丢失数据 | boolean | false | 高 | |
zookeeper.connection.timeout.ms | 与ZK server建立连接的超时时间,没有配置就使用zookeeper.session.timeout.ms | int | null | 高 | |
zookeeper.session.timeout.ms | ZooKeeper的session的超时时间 | int | 6000 | 高 | |
zookeeper.set.acl | ZooKeeper客户端连接是否使用安全ACL | boolean | false | 高 | |
broker.id.generation.enable | 是否允许服务器自动生成broker.id。如果允许则产生的值会交由reserved.broker.max.id审核 | boolean | true | 中 | |
broker.rack | broker的机架位置。 这将在机架感知副本分配中用于容错。例如:RACK1,us-east-1 | string | null | 中 | |
connections.max.idle.ms | 连接空闲超时:服务器socket处理线程空闲超时关闭时间 | long | 600000 | 中 | |
controlled.shutdown.enable | 是否启用broker的受控关闭(controlled shutdown) | boolean | true | 中 | |
controlled.shutdown.max.retries | 受控关闭可能因多种原因失败,此配置决定发生失败时的重试次数 | int | 3 | 中 | |
controlled.shutdown.retry.backoff.ms | 在每次重试关闭之前,系统需要时间从上次故障状态(控制器故障切换,副本延迟等)中恢复。 这个配置决定了重试之前等待的时间。 | long | 5000 | 中 | |
controller.socket.timeout.ms | 控制器到broker通道的socket超时时间 | int | 30000 | 中 | |
default.replication.factor | 自动创建topic时的默认副本个数 | int | 1 | 中 | |
delete.records.purgatory.purge.interval.requests | 删除purgatory中请求的清理间隔时间(purgatory:broker对于无法立即处理的请求,将会放在purgatory中,当请求完成后,并不会立即清除,还会继续在purgatory中占用资源,直到下一次delete.records.purgatory.purge.interval.requests) | int | 1 | 中 | |
fetch.purgatory.purge.interval.requests | fetch请求purgatory的清理间隔(以请求数为单位) | int | 1000 | 中 | |
group.initial.rebalance.delay.ms | 在执行第一次重新平衡之前,group协调器将等待更多consumer加入group的时间。延迟时间越长意味着重新平衡的工作可能越小,但是等待处理开始的时间增加。 | int | 3000 | 中 | |
group.max.session.timeout.ms | consumer注册允许的最大会话超时时间。超时时间越长,consumer在两次心跳之间有更多时间处理消息,但检测故障所需的时间也越长。 | int | 300000 | 中 | |
group.min.session.timeout.ms | consumer注册允许的最小会话超时时间。超时时间越短,处理心跳越频繁从而使故障检测更快,但会导致broker被抢占更多的资源。 | int | 6000 | 中 | |
inter.broker.listener.name | broker间通讯的监听器名称。如果未设置,则侦听器名称由security.inter.broker.protocol定义。 同时设置此项和security.inter.broker.protocol属性是错误的,只设置一个。 | string | null | 中 | |
inter.broker.protocol.version | 指定broker间通信所使用的协议版本。通常在所有broker都升级到新版本之后才调高此值。有效值示例:0.8.0, 0.8.1, 0.8.1.1, 0.8.2, 0.8.2.0, 0.8.2.1, 0.9.0.0, 0.9.0.1,完整列表请查看ApiVersion。 | string | 1.0-IV0 | 中 | |
log.cleaner.backoff.ms | 检查log是否需要清除的时间间隔。 | long | 15000 | [0,...] | 中 |
log.cleaner.dedupe.buffer.size | 日志去重清理线程所需要的内存 | long | 134217728 | 中 | |
log.cleaner.delete.retention.ms | 日志记录保留时间 | long | 86400000 | 中 | |
log.cleaner.enable | 在服务器上启用日志清理器进程。如果任何topic都使用cleanup.policy = compact,包括内部topic offset,则建议开启。如果被禁用的话,这些topic将不会被压缩,而且会不断增长。 | boolean | true | 中 | |
log.cleaner.io.buffer.load.factor | 日志清理器去重缓冲区的负载因子,即去重缓冲区允许被填满的比例。数值越高,一次可以清理更多的日志,但会导致更多的hash冲突 | double | 0.9 | 中 | |
log.cleaner.io.buffer.size | 所有清理线程的日志清理I/O缓存区所需要的内存 | int | 524288 | [0,...] | 中 |
log.cleaner.io.max.bytes.per.second | 对日志清理器进行限流,使其读写I/O之和平均不超过这个值 | double | 1.7976931348623157E308 | 中 | |
log.cleaner.min.cleanable.ratio | 日志中脏数据清理比例 | double | 0.5 | 中 | |
log.cleaner.min.compaction.lag.ms | 消息在日志中保持未压缩的最短时间。 仅适用于正在压缩的日志。 | long | 0 | 中 | |
log.cleaner.threads | 用于日志清理的后台线程的数量 | int | 1 | [0,...] | 中 |
log.cleanup.policy | 超出保留窗口期的日志段的默认清理策略。用逗号隔开有效策略列表。有效策略:“delete”和“compact” | list | delete | [compact, delete] | 中 |
log.index.interval.bytes | 向offset索引中添加一个条目的字节间隔(间隔越小,索引越密,定位消息越精确,但索引文件也越大) | int | 4096 | [0,...] | 中 |
log.index.size.max.bytes | offset索引的最大字节数 | int | 10485760 | [4,...] | 中 |
log.message.format.version | 指定broker用于将消息附加到日志的消息格式版本。应该是一个有效的apiversion值。例如:0.8.2,0.9.0.0,0.10.0,详情去看apiversion。通过设置特定的消息格式版本,用户得保证磁盘上的所有现有消息的版本小于或等于指定的版本。不正确地设置这个值会导致旧版本的用户出错,因为他们将接收到他们无法处理的格式消息。 | string | 1.0-IV0 | 中 | |
log.message.timestamp.difference.max.ms | broker收到消息时的时间戳和消息中指定的时间戳之间允许的最大差异。当log.message.timestamp.type=CreateTime,如果时间差超过这个阈值,消息将被拒绝。如果log.message.timestamp.type = logappendtime,则该配置将被忽略。允许的最大时间戳差值,不应大于log.retention.ms,以避免不必要的频繁日志滚动。 | long | 9223372036854775807 | 中 | |
log.message.timestamp.type | 定义消息中的时间戳是消息创建时间还是日志追加时间。 该值应该是“createtime”或“logappendtime”。 | string | CreateTime | [CreateTime, LogAppendTime] | 中 |
log.preallocate | 创建新的日志段时是否预先分配文件。如果你在Windows上使用Kafka,可能需要打开这个选项 | boolean | false | 中 | |
log.retention.check.interval.ms | 日志清理器检查是否有日志符合删除的频率(以毫秒为单位) | long | 300000 | [1,...] | 中 |
max.connections.per.ip | 每个IP允许的最大连接数 | int | 2147483647 | [1,...] | 中 |
max.connections.per.ip.overrides | 每个IP或主机名将覆盖默认的最大连接数 | string | "" | 中 | |
num.partitions | 每个topic的默认日志分区数 | int | 1 | [1,...] | 中 |
principal.builder.class | 实现KafkaPrincipalBuilder接口的类的全名,该接口用于构建授权期间使用的KafkaPrincipal对象。此配置还支持此前用于SSL客户端身份验证、现已弃用的PrincipalBuilder接口。如果未定义principal builder,则默认行为取决于所使用的安全协议:对于SSL身份验证,如果提供了客户端证书,principal名称将是客户端证书的专有名称,否则(不需要客户端身份验证时)principal为ANONYMOUS;对于SASL身份验证,如果使用GSSAPI,将按照sasl.kerberos.principal.to.local.rules定义的规则生成principal,其他SASL机制则使用SASL认证ID;对于PLAINTEXT,principal为ANONYMOUS。 | class | null | 中 | |
producer.purgatory.purge.interval.requests | producer请求purgatory的清除间隔(请求数量) | int | 1000 | 中 | |
queued.max.request.bytes | 在停止读取新请求之前,请求队列中允许的最大字节数 | long | -1 | 中 | |
replica.fetch.backoff.ms | 当拉取分区发生错误时,睡眠的时间。 | int | 1000 | [0,...] | 中 |
replica.fetch.max.bytes | 尝试为每个分区提取的消息字节数。这并不是绝对最大值,如果fetch的第一个非空分区中的第一个批量记录大于这个值,该批次仍将被返回,以确保可以继续进行下去。broker接受的最大批量记录大小通过message.max.bytes(broker配置)或max.message.bytes(topic配置)设置。 | int | 1048576 | [0,...] | 中 |
replica.fetch.response.max.bytes | 预计整个获取响应的最大字节数。记录被批量取回时,如果取第一个非空分区的第一个批量记录大于此值,记录的批处理仍将被执行并返回以确保可以进行下去。因此,这不是绝对的最大值。 broker接受的最大批量记录大小通过message.max.bytes (broker配置)或max.message.bytes (topic配置)进行配置。 | int | 10485760 | [0,...] | 中 |
reserved.broker.max.id | 可以用于broker.id的最大数量 | int | 1000 | [0,...] | 中 |
sasl.enabled.mechanisms | kafka服务器中启用的sasl机制的列表。 该列表可能包含安全提供程序可用的任何机制。默认情况下只有gssapi是启用的。 | list | GSSAPI | 中 | |
sasl.kerberos.kinit.cmd | Kerberos kinit 命令路径。 | string | /usr/bin/kinit | 中 | |
sasl.kerberos.min.time.before.relogin | 登录线程在两次刷新尝试之间的休眠时间。 | long | 60000 | 中 | |
sasl.kerberos.principal.to.local.rules | 主体名称到简称映射的规则列表(通常是操作系统用户名)。按顺序,使用与principal名称匹配的第一个规则将其映射到简称。列表中的任何后续规则都将被忽略。 默认情况下,{username} / {hostname} @ {realm}形式的主体名称映射到{username}。 有关格式的更多细节,请参阅安全授权和acls。 请注意,如果由principal.builder.class配置提供了kafkaprincipalbuilder的扩展,则忽略此配置。 | list | DEFAULT | 中 | |
sasl.kerberos.service.name | kafka运行的kerberos的主体名称。 这可以在kafka的JAAS配置或在kafka的配置中定义。 | string | null | 中 | |
sasl.kerberos.ticket.renew.jitter | 添加到更新时间的随机抖动的百分比 | double | 0.05 | 中 | |
sasl.kerberos.ticket.renew.window.factor | 登录线程将休眠,直到从上次刷新到ticket的到期的时间到达(指定窗口因子),在此期间它将尝试更新ticket。 | double | 0.8 | 中 | |
sasl.mechanism.inter.broker.protocol | SASL机制,用于broker之间的通讯,默认是GSSAPI。 | string | GSSAPI | 中 | |
security.inter.broker.protocol | broker之间的安全通讯协议,有效值有:PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL。同时设置此配置和inter.broker.listener.name属性会出错 | string | PLAINTEXT | 中 | |
ssl.cipher.suites | 密码套件列表。 这是一种用于使用tls或ssl网络协议来协商网络连接的安全设置的认证,加密,mac和密钥交换算法的命名组合。 默认情况下,所有可用的密码套件都受支持。 | list | null | 中 | |
ssl.client.auth | 配置broker是否要求客户端认证。常见的设置:`required` 表示必须进行客户端认证;`requested` 表示客户端认证是可选的,与required不同,客户端可以选择不提供自己的认证信息;`none` 表示不需要客户端认证。 | string | none | [required, requested, none] | 中 |
ssl.enabled.protocols | 已启用的SSL连接协议列表。 | list | TLSv1.2,TLSv1.1,TLSv1 | 中 | |
ssl.key.password | 秘钥库文件中的私钥密码。对客户端是可选的。 | password | null | 中 | |
ssl.keymanager.algorithm | 用于SSL连接的密钥管理工厂算法。默认值是为Java虚拟机配置的密钥管理器工厂算法。 | string | SunX509 | 中 | |
ssl.keystore.location | 密钥仓库文件的位置。客户端可选,并可用于客户端的双向认证。 | string | null | 中 | |
ssl.keystore.password | 密钥仓库文件的仓库密码。客户端可选,只有ssl.keystore.location配置了才需要。 | password | null | 中 | |
ssl.keystore.type | 密钥仓库文件的格式。客户端可选。 | string | JKS | 中 | |
ssl.protocol | 用于生成SSLContext的SSL协议。默认是TLS,适用于大多数情况。最近的JVM版本允许的值为TLS、TLSv1.1和TLSv1.2。较老的JVM可能支持SSL、SSLv2和SSLv3,但由于存在已知的安全漏洞,不建议使用。 | string | TLS | 中 | |
ssl.provider | 用于SSL连接的安全提供程序的名称。默认值由JVM的安全程序提供。 | string | null | 中 | |
ssl.trustmanager.algorithm | 信任管理工厂用于SSL连接的算法。默认为Java虚拟机配置的信任算法。 | string | PKIX | 中 | |
ssl.truststore.location | 信任文件的存储位置。 | string | null | 中 | |
ssl.truststore.password | 信任存储文件的密码。 如果密码未设置,则仍然可以访问信任库,但完整性检查将被禁用。 | password | null | 中 | |
ssl.truststore.type | 信任存储文件的文件格式。 | string | JKS | 中 | |
alter.config.policy.class.name | 应该用于验证的alter configs策略类。 该类应该实现org.apache.kafka.server.policy.alterconfigpolicy接口。 | class | null | 低 | |
authorizer.class.name | 用于认证授权的程序类 | string | "" | 低 | |
create.topic.policy.class.name | 用于验证的创建topic策略类。 该类应该实现org.apache.kafka.server.policy.createtopicpolicy接口。 | class | null | 低 | |
listener.security.protocol.map | 侦听器名称和安全协议之间的映射。如果要让同一种安全协议用于多个端口或IP,则必须定义此映射。例如,即使两者都需要SSL,也可以将内部和外部流量分开:用户可以定义名为INTERNAL和EXTERNAL的侦听器,并将此属性设置为:INTERNAL:SSL,EXTERNAL:SSL。如上所示,键和值由冒号分隔,映射条目之间以逗号分隔。每个侦听器名称只能在映射中出现一次。通过在配置名称前添加规范化前缀(侦听器名称小写),可以为每个侦听器单独配置安全(SSL和SASL)设置,例如,要为INTERNAL侦听器设置不同的密钥仓库,可以设置名为`listener.name.internal.ssl.keystore.location`的配置。如果没有设置侦听器级别的配置,则回退到通用配置(即`ssl.keystore.location`)。 | string | PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL | 低 | |
metric.reporters | 用作度量报告器的类列表。实现org.apache.kafka.common.metrics.MetricsReporter接口的类可以在新度量创建时得到通知。JmxReporter总是被包含,用于注册JMX统计信息。 | list | "" | 低 | |
metrics.num.samples | 维持计算度量的样本数 | int | 2 | [1,...] | 低 |
metrics.recording.level | 指标的最高记录级别 | string | INFO | 低 | |
metrics.sample.window.ms | 计算度量样本的时间窗口 | long | 30000 | [1,...] | 低 |
quota.window.num | 在内存中保留客户端限额的样本数 | int | 11 | [1,...] | 低 |
quota.window.size.seconds | 每个客户端限额的样本时间跨度 | int | 1 | [1,...] | 低 |
replication.quota.window.num | 在内存中保留副本限额的样本数 | int | 11 | [1,...] | 低 |
replication.quota.window.size.seconds | 每个副本限额样本数的时间跨度 | int | 1 | [1,...] | 低 |
ssl.endpoint.identification.algorithm | 端点身份标识算法,使用服务器证书验证服务器主机名 | string | null | 低 | |
ssl.secure.random.implementation | 用于SSL加密操作的SecureRandom PRNG实现 | string | null | 低 | |
transaction.abort.timed.out.transaction.cleanup.interval.ms | 回滚已超时的事务的时间间隔 | int | 60000 | [1,...] | 低 |
transaction.remove.expired.transaction.cleanup.interval.ms | 删除因超过transactional.id.expiration.ms而过期的事务的时间间隔 | int | 3600000 | [1,...] | 低 |
zookeeper.sync.time.ms | ZK follower可以落后ZK leader多长时间 | int | 2000 | 低 |
More details about broker configuration can be found in the scala class kafka.server.KafkaConfig.
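作为对照,下面给出一个把上表中几个与可靠性相关的配置组合在 `server.properties` 中的假设片段(数值仅为示意,需按实际集群规模调整):

```
# 自动创建topic时的默认副本数
default.replication.factor=3
# 当producer使用acks=all时,认为写入成功所需的最小同步副本数
min.insync.replicas=2
# 不允许非ISR副本被选举为leader,避免丢数据
unclean.leader.election.enable=false
# 禁止自动创建topic,要求显式创建
auto.create.topics.enable=false
```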
与Topic相关的配置既包含服务器默认值,也包含可选的每个Topic覆盖值。如果没有给出某个Topic的覆盖配置,就会使用服务器默认值。通过提供一个或多个 --config 选项,可以在创建Topic时设置覆盖值。本示例使用自定义的最大消息大小和刷新率创建了一个名为 my-topic 的topic:
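假设 ZooKeeper 运行在 localhost:2181,并从 Kafka 安装目录执行,命令大致如下:

```
> bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic my-topic --partitions 1 --replication-factor 1 \
    --config max.message.bytes=64000 --config flush.messages=1
```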
之后也可以使用 alter configs 命令更改或设置覆盖值。本示例重置 my-topic 的最大消息大小:
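同样假设 ZooKeeper 在 localhost:2181,命令大致如下:

```
> bin/kafka-configs.sh --zookeeper localhost:2181 --entity-type topics --entity-name my-topic \
    --alter --add-config max.message.bytes=128000
```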
您可以执行如下操作来检查topic设置的覆盖值
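命令大致如下(参数含义同上):

```
> bin/kafka-configs.sh --zookeeper localhost:2181 --entity-type topics --entity-name my-topic --describe
```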
您可以执行如下操作来删除一个覆盖值
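例如删除之前设置的 max.message.bytes 覆盖值,命令大致如下:

```
> bin/kafka-configs.sh --zookeeper localhost:2181 --entity-type topics --entity-name my-topic \
    --alter --delete-config max.message.bytes
```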
以下是Topic级别配置。 “服务器默认属性”列是该属性的默认配置。 一个Topic如果没有给出一个明确的覆盖值,相应的服务器默认配置将会生效。
名称 | 描述 | 类型 | 默认值 | 有效值 | 服务器默认属性 | 重要性 |
---|---|---|---|---|---|---|
cleanup.policy | 该配置项可以是 "delete" 或 "compact"。 它指定在旧日志段上使用的保留策略。 默认策略 ("delete") 将在达到保留时间或大小限制时丢弃旧段。 "compact" 设置将启用该topic的日志压缩 。 | list | delete | [compact, delete] | log.cleanup.policy | medium |
compression.type | 为给定的topic指定最终压缩类型。这个配置接受标准的压缩编解码器 ('gzip', 'snappy', lz4) 。它为'uncompressed'时意味着不压缩,当为'producer'时,这意味着保留producer设置的原始压缩编解码器。 | string | producer | [uncompressed, snappy, lz4, gzip, producer] | compression.type | medium |
delete.retention.ms | 保留 日志压缩 topics的删除墓碑标记的时间。此设置还对consumer从偏移量0开始时必须完成读取的时间进行限制,以确保它们获得最后阶段的有效快照(否则,在完成扫描之前可能会收集到删除墓碑)。 | long | 86400000 | [0,...] | log.cleaner.delete.retention.ms | medium |
file.delete.delay.ms | 删除文件系统上的一个文件之前所需等待的时间。 | long | 60000 | [0,...] | log.segment.delete.delay.ms | medium |
flush.messages | 这个设置允许指定一个时间间隔n,每隔n个消息我们会强制把数据fsync到log。例如,如果设置为1,我们会在每条消息之后同步。如果是5,我们会在每五个消息之后进行fsync。一般来说,我们建议您不要设置它,而是通过使用replication机制来持久化数据,和允许更高效的操作系统后台刷新功能。这个设置可以针对每个topic的情况自定义 (请参阅 topic的配置部分). | long | 9223372036854775807 | [0,...] | log.flush.interval.messages | medium |
flush.ms | 这个设置允许指定一个时间间隔,每隔一段时间我们将强制把数据fsync到log。例如,如果这个设置为1000,我们将在1000 ms后执行fsync。一般来说,我们建议您不要设置它,而是通过使用replication机制来持久化数据,和允许更高效的操作系统后台刷新功能。 | long | 9223372036854775807 | [0,...] | log.flush.interval.ms | medium |
follower.replication.throttled.replicas | 应该在follower侧限制日志复制的副本列表。该列表应以[PartitionId]:[BrokerId],[PartitionId]:[BrokerId]:...的形式描述一组副本,或者也可以使用通配符“*”来限制该topic的所有副本。 | list | "" | [partitionId],[brokerId]:[partitionId],[brokerId]:... | follower.replication.throttled.replicas | medium |
index.interval.bytes | 此设置控制Kafka向其偏移索引添加索引条目的频率。默认设置确保我们大约每4096个字节索引一条消息。更多的索引允许读取更接近日志中的确切位置,但这会使索引更大。您可能不需要改变该值。 | int | 4096 | [0,...] | log.index.interval.bytes | medium |
leader.replication.throttled.replicas | 应该在leader侧限制日志复制的副本列表。该列表应以[PartitionId]:[BrokerId],[PartitionId]:[BrokerId]:...的形式描述一组副本,或者也可以使用通配符“*”来限制该topic的所有副本。 | list | "" | [partitionId],[brokerId]:[partitionId],[brokerId]:... | leader.replication.throttled.replicas | medium |
max.message.bytes | Kafka允许的最大记录批次大小。如果这个参数被增加了且consumers是早于0.10.2版本,那么consumers的fetch size必须增加到该值,以便他们可以取得这么大的记录批次。 在最新的消息格式版本中,记录总是分组成多个批次以提高效率。在以前的消息格式版本中,未压缩的记录不会分组到多个批次,并且限制在该情况下只能应用单条记录。 | int | 1000012 | [0,...] | message.max.bytes | medium |
message.format.version | 指定broker将消息追加到日志时所使用的消息格式版本。该值应该是一个有效的ApiVersion,例如:0.8.2,0.9.0.0,0.10.0,详情请查看ApiVersion。通过设置特定的消息格式版本,用户需要保证磁盘上所有现有消息的格式版本都小于或等于指定的版本。错误地设置此值将导致旧版本的consumer出错,因为它们会收到无法理解的格式的消息。 | string | 1.0-IV0 | log.message.format.version | medium | |
message.timestamp.difference.max.ms | broker接收消息时所允许的时间戳与消息中指定的时间戳之间的最大差异。如果message.timestamp.type=CreateTime,则如果时间戳的差异超过此阈值,则将拒绝消息。如果message.timestamp.type=LogAppendTime,则忽略此配置。 | long | 9223372036854775807 | [0,...] | log.message.timestamp.difference.max.ms | medium |
message.timestamp.type | 定义消息中的时间戳是消息创建时间还是日志附加时间。值应该是“CreateTime”或“LogAppendTime” | string | CreateTime | log.message.timestamp.type | medium | |
min.cleanable.dirty.ratio | 此配置控制日志压缩器(log compactor)尝试清理日志的频率(假设启用了log compaction)。默认情况下,如果日志中已压缩的部分超过50%,将避免再次清理。这个比率限制了重复消息在日志中浪费的最大空间(在50%时,日志中最多有50%可能是重复的)。更高的比率意味着更少、更高效的清理,但也意味着日志中浪费更多的空间。 | double | 0.5 | [0,...,1] | log.cleaner.min.cleanable.ratio | medium |
min.compaction.lag.ms | 消息在日志中保持未压缩的最短时间。仅适用于被合并的日志。 | long | 0 | [0,...] | log.cleaner.min.compaction.lag.ms | medium |
min.insync.replicas | 当producer将acks设置为"all"(或"-1")时,此配置指定必须确认写入才能认为写入成功的最小副本数。如果无法满足这个最小值,producer将引发一个异常(NotEnoughReplicas或NotEnoughReplicasAfterAppend)。配合使用时,min.insync.replicas和acks可以提供更强的持久性保证。一个典型的场景是创建一个复制因子为3的topic,将min.insync.replicas设置为2,并让producer使用acks="all"。这样,当大多数副本没有收到写入时,producer将引发异常。 | int | 1 | [1,...] | min.insync.replicas | medium |
preallocate | 如果在创建新的日志段时应该预先分配磁盘上的文件,则为True。 | boolean | false | log.preallocate | medium | |
retention.bytes | 如果使用“delete”保留策略,此配置控制分区(由日志段组成)在放弃旧日志段以释放空间之前的最大大小。默认情况下,没有大小限制,只有时间限制。由于此限制是在分区级别强制执行的,因此,将其乘以分区数,计算出topic保留值,以字节为单位。 | long | -1 | log.retention.bytes | medium | |
retention.ms | 如果使用“delete”保留策略,此配置控制保留日志的最长时间,然后将旧日志段丢弃以释放空间。这代表了用户读取数据的速度的SLA。 | long | 604800000 | log.retention.ms | medium | |
segment.bytes | 此配置控制日志的段文件大小。保留和清理总是一次完成一个文件,所以更大的段大小意味着更少的文件,但对保留的粒度控制更少。 | int | 1073741824 | [14,...] | log.segment.bytes | medium |
segment.index.bytes | 此配置控制将偏移量映射到文件位置的索引大小。我们预先分配这个索引文件并且只在日志滚动后收缩它。您通常不需要更改此设置。 | int | 10485760 | [0,...] | log.index.size.max.bytes | medium |
segment.jitter.ms | 从预定的分段滚动时间减去最大随机抖动,以避免段滚动产生惊群效应。 | long | 0 | [0,...] | log.roll.jitter.ms | medium |
segment.ms | 这个配置控制在一段时间后,Kafka将强制日志滚动,即使段文件没有满,以确保保留空间可以删除或合并旧数据。 | long | 604800000 | [0,...] | log.roll.ms | medium |
unclean.leader.election.enable | 是否允许将不在ISR集合中的副本作为最后手段选举为leader,即使这样做可能导致数据丢失。 | boolean | false | unclean.leader.election.enable | medium |
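下面是一个假设的例子,演示在创建topic时组合使用上表中与日志压缩相关的几个覆盖配置(topic名称、分区数和具体数值均为示意):

```
> bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic user-profiles --partitions 6 --replication-factor 3 \
    --config cleanup.policy=compact \
    --config min.cleanable.dirty.ratio=0.3 \
    --config delete.retention.ms=3600000
```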
以下是JAVA生产者的配置:
NAME | DESCRIPTION | TYPE | DEFAULT | VALID VALUES | IMPORTANCE |
---|---|---|---|---|---|
bootstrap.servers | 这是一个用于建立初始连接到kafka集群的"主机/端口对"配置列表。不论这个参数配置了哪些服务器来初始化连接,客户端都会使用集群中的所有服务器,这份清单仅影响用于发现整个集群的初始主机。配置格式: host1:port1,host2:port2,... 。由于这些主机仅用于初始连接,以发现完整的集群成员(集群是会动态变化的),因此这个配置清单不需要包含整个集群的服务器(当然,为了避免单节点风险,这个清单最好配置多台主机)。 | list | high | ||
key.serializer | 关键字的序列化类,实现以下接口: org.apache.kafka.common.serialization.Serializer 接口。 | class | high | ||
value.serializer | 值的序列化类,实现以下接口: org.apache.kafka.common.serialization.Serializer 接口。 | class | high | ||
acks | 此配置是 Producer 在确认一个请求发送完成之前需要收到的反馈信息的数量。这个参数是为了保证发送请求的可靠性。以下配置方式是允许的:acks=0 表示 Producer 不会等待服务器的任何确认,记录被追加到socket缓冲区后即被视为已发送,此时无法保证服务器已收到记录,retries 配置也不会生效,每条记录返回的offset总是被设为-1;acks=1 表示 leader 将记录写入其本地日志后立即响应,而不等待所有follower的确认,如果leader在确认后、follower复制之前发生故障,记录将会丢失;acks=all(或 -1)表示 leader 将等待全部同步副本确认后才响应,只要至少有一个同步副本存活,记录就不会丢失,这是最强的可靠性保证。 | string | 1 | [all, -1, 0, 1] | high |
buffer.memory | Producer 用来缓冲等待被发送到服务器的记录的总字节数。如果记录发送的速度比发送到服务器的速度快, Producer 就会阻塞,如果阻塞的时间超过 max.block.ms 配置的时长,则会抛出一个异常。 这个配置与 Producer 的可用总内存有一定的对应关系,但并不是完全等价的关系,因为 Producer 的可用内存并不是全部都用来缓存。一些额外的内存可能会用于压缩(如果启用了压缩),以及维护正在运行的请求。 | long | 33554432 | [0,...] | high |
compression.type | Producer 生成数据时可使用的压缩类型。默认值是none(即不压缩)。可配置的压缩类型包括:none , gzip , snappy , 或者 lz4 。压缩是针对批处理的所有数据,所以批处理的效果也会影响压缩比(更多的批处理意味着更好的压缩)。 | string | none | high | |
retries | 若设置大于0的值,则客户端会将发送失败的记录重新发送,尽管这些记录有可能是暂时性的错误。请注意,这种 retry 与客户端收到错误信息之后重新发送记录并无区别。允许 retries 并且没有设置max.in.flight.requests.per.connection 为1时,记录的顺序可能会被改变。比如:当两个批次都被发送到同一个 partition ,第一个批次发生错误并发生 retries 而第二个批次已经成功,则第二个批次的记录就会先于第一个批次出现。 | int | 0 | [0,...,2147483647] | high |
ssl.key.password | key store 文件中私钥的密码。这对于客户端来说是可选的。 | password | null | high | |
ssl.keystore.location | key store 文件的位置。这对于客户端来说是可选的,可用于客户端的双向身份验证。 | string | null | high | |
ssl.keystore.password | key store 文件的密码。这对于客户端是可选的,只有配置了 ssl.keystore.location 才需要配置该选项。 | password | null | high | |
ssl.truststore.location | trust store 文件的位置。 | string | null | high | |
ssl.truststore.password | trust store 文件的密码。如果一个密码没有设置到 trust store ,这个密码仍然是可用的,但是完整性检查是禁用的。 | password | null | high | |
batch.size | 当将多个记录被发送到同一个分区时, Producer 将尝试将记录组合到更少的请求中。这有助于提升客户端和服务器端的性能。这个配置控制一个批次的默认大小(以字节为单位)。 当记录的大小超过了配置的字节数, Producer 将不再尝试往批次增加记录。 发送到 broker 的请求会包含多个批次的数据,每个批次对应一个 partition 的可用数据 小的 batch.size 将减少批处理,并且可能会降低吞吐量(如果 batch.size = 0的话将完全禁用批处理)。 很大的 batch.size 可能造成内存浪费,因为我们一般会在 batch.size 的基础上分配一部分缓存以应付额外的记录。 | int | 16384 | [0,...] | medium |
client.id | 发出请求时传递给服务器的 ID 字符串。这样做的目的是为了在服务端的请求日志中能够通过逻辑应用名称来跟踪请求的来源,而不是只能通过IP和端口号跟进。 | string | "" | medium | |
connections.max.idle.ms | 在此配置指定的毫秒数之后,关闭空闲连接。 | long | 540000 | medium | |
linger.ms | producer 会将两个请求发送时间间隔内到达的记录合并到一个单独的批处理请求中。通常只有当记录到达的速度超过了发送的速度时才会出现这种情况。然而,在某些场景下,即使处于可接受的负载下,客户端也希望能减少请求的数量。这个设置是通过添加少量的人为延迟来实现的—即,与其立即发送记录, producer 将等待给定的延迟时间,以便将在等待过程中到达的其他记录能合并到本批次的处理中。这可以认为是与 TCP 中的 Nagle 算法类似。这个设置为批处理的延迟提供了上限:一旦我们接受到记录超过了分区的 batch.size ,Producer 会忽略这个参数,立刻发送数据。但是如果累积的字节数少于 batch.size ,那么我们将在指定的时间内“逗留”(linger),以等待更多的记录出现。这个设置默认为0(即没有延迟)。例如:如果设置linger.ms=5 ,则发送的请求会减少并降低部分负载,但同时会增加5毫秒的延迟。 | long | 0 | [0,...] | medium |
max.block.ms | 该配置控制 KafkaProducer.send() 和KafkaProducer.partitionsFor() 允许被阻塞的时长。这些方法可能因为缓冲区满了或者元数据不可用而被阻塞。用户提供的序列化程序或分区程序的阻塞将不会被计算到这个超时。 | long | 60000 | [0,...] | medium |
max.request.size | 请求的最大字节数。这个设置将限制 Producer 在单个请求中发送的记录批量的数量,以避免发送巨大的请求。这实际上也等同于批次的最大记录数的限制。请注意,服务器对批次的大小有自己的限制,这可能与此不同。 | int | 1048576 | [0,...] | medium |
partitioner.class | 指定计算分区的类,实现 org.apache.kafka.clients.producer.Partitioner 接口。 | class | org.apache.kafka.clients.producer.internals.DefaultPartitioner | medium | |
receive.buffer.bytes | 定义读取数据时 TCP 接收缓冲区(SO_RCVBUF)的大小,如果设置为-1,则使用系统默认值。 | int | 32768 | [-1,...] | medium |
request.timeout.ms | 客户端等待请求响应的最大时长。如果超时未收到响应,则客户端将在必要时重新发送请求,如果重试的次数达到允许的最大重试次数,则请求失败。这个参数应该比 replica.lag.time.max.ms (Broker 的一个参数)更大,以降低由于不必要的重试而导致的消息重复的可能性。 | int | 30000 | [0,...] | medium |
sasl.jaas.config | SASL 连接使用的 JAAS 登录上下文参数,以 JAAS 配置文件的格式进行配置。 JAAS 配置文件格式可参考这里。值的格式: 'loginModuleClass controlFlag (optionName=optionValue)*;' | password | null | medium | |
sasl.kerberos.service.name | Kafka 运行时的 Kerberos 主体名称。可以在 Kafka 的 JAAS 配置文件或者 Kafka 的配置文件中配置。 | string | null | medium | |
sasl.mechanism | 用于客户端连接的 SASL 机制。可以是任意安全可靠的机制。默认是 GSSAPI 机制。 | string | GSSAPI | medium | |
security.protocol | 与 brokers 通讯的协议。可配置的值有: PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL. | string | PLAINTEXT | medium | |
send.buffer.bytes | 定义发送数据时的 TCP 发送缓冲区(SO_SNDBUF)的大小。如果设置为-1,则使用系统默认值。 | int | 131072 | [-1,...] | medium |
ssl.enabled.protocols | 可用于 SSL 连接的协议列表。 | list | TLSv1.2,TLSv1.1,TLSv1 | medium | |
ssl.keystore.type | key store 文件的文件格类型。这对于客户端来说是可选的。 | string | JKS | medium | |
ssl.protocol | 用于生成SSLContext的SSL协议。默认设置是TLS,大多数情况下不会有问题。在最近的jvm版本中,允许的值是TLS、tlsv1.1和TLSv1.2。在旧的jvm中可能会支持SSL、SSLv2和SSLv3,但是由于存在已知的安全漏洞,因此不建议使用。 | string | TLS | medium | |
ssl.provider | 用于 SSL 连接security provider 。默认值是当前 JVM 版本的默认 security provider 。 | string | null | medium | |
ssl.truststore.type | trust store 的文件类型。 | string | JKS | medium | |
enable.idempotence | 当设置为true时, Producer 将确保每个消息在 Stream 中只写入一个副本。如果为false,由于 Broker 故障导致 Producer 进行重试之类的情况可能会导致消息重复写入到 Stream 中。请注意,启用幂等性需要确保 max.in.flight.requests.per.connection 小于或等于5,retries 大于等于0,并且ack 必须设置为all 。如果这些值不是由用户明确设置的,那么将自动选择合适的值。如果设置了不兼容的值,则将抛出一个ConfigException的异常。 | boolean | false | low | |
interceptor.classes | 配置 interceptor 类的列表。实现 org.apache.kafka.clients.producer.ProducerInterceptor 接口之后可以拦截(并可能改变)那些 Producer 还没有发送到 kafka 集群的记录。默认情况下,没有 interceptor 。 | list | null | low | |
max.in.flight.requests.per.connection | 在发生阻塞之前,客户端的一个连接上允许出现未确认请求的最大数量。注意,如果这个设置大于1,并且有失败的发送,则消息可能会由于重试而导致重新排序(如果重试是启用的话)。 | int | 5 | [1,...] | low |
metadata.max.age.ms | 刷新元数据的时间间隔,单位毫秒。即使没有发现任何分区的 leadership 发生变更也会强制刷新以便能主动发现新的 Broker 或者新的分区。 | long | 300000 | [0,...] | low |
metric.reporters | 用于指标监控报表的类清单。实现org.apache.kafka.common.metrics.MetricsReporter 接口之后允许插入能够通知新的创建度量的类。JmxReporter 总是包含在注册的 JMX 统计信息中。 | list | "" | low | |
metrics.num.samples | 计算 metrics 所需要维持的样本数量。 | int | 2 | [1,...] | low |
metrics.recording.level | metrics 的最高纪录级别。 | string | INFO | [INFO, DEBUG] | low |
metrics.sample.window.ms | 计算 metrics 样本的时间窗口。 | long | 30000 | [0,...] | low |
reconnect.backoff.max.ms | 当重新连接到一台多次连接失败的 Broker 时允许等待的最大毫秒数。如果配置该参数,则每台主机的 backoff 将随连续的连接失败呈指数级增长,直到达到配置的最大值。在计算出 backoff 的增量之后,会再加入20%的随机抖动,以避免大量连接同时重连。 | long | 1000 | [0,...] | low |
reconnect.backoff.ms | 在尝试重新连接到给定的主机之前,需要等待的基本时间。这避免了在一个紧凑的循环中反复连接到同一个主机。这个 backoff 机制应用于所有客户端尝试连接到 Broker 的请求。 | long | 50 | [0,...] | low |
retry.backoff.ms | 在尝试将一个失败的请求重试到给定的 topic 分区之前需要等待的时间。这避免在某些失败场景下在紧凑的循环中重复发送请求。 | long | 100 | [0,...] | low |
sasl.kerberos.kinit.cmd | Kerberos kinit 命令的路径。 | string | /usr/bin/kinit | low | |
sasl.kerberos.min.time.before.relogin | 重新尝试登陆之前,登录线程的休眠时间。 | long | 60000 | low | |
sasl.kerberos.ticket.renew.jitter | 随机抖动增加到更新时间的百分比。 | double | 0.05 | low | |
sasl.kerberos.ticket.renew.window.factor | 登录线程将持续休眠直到上一次刷新到 ticket 的过期时间窗口,在此时间窗口它将尝试更新 ticket 。 | double | 0.8 | low | |
ssl.cipher.suites | 密码套件列表。密码套件是利用 TLS 或 SSL 网络协议来实现网络连接的安全设置,是一个涵盖认证,加密,MAC和密钥交换算法的组合。默认情况下,支持所有可用的密码套件。 | list | null | low | |
ssl.endpoint.identification.algorithm | 使用服务器证书验证服务器主机名的 endpoint 识别算法。 | string | null | low | |
ssl.keymanager.algorithm | key manager factory 用于 SSL 连接的算法。默认值是Java虚拟机配置的 key manager factory 算法。 | string | SunX509 | low | |
ssl.secure.random.implementation | 用于 SSL 加密操作的 SecureRandom PRNG 实现。 | string | null | low | |
ssl.trustmanager.algorithm | trust manager factory 用于SSL连接的算法。默认值是Java虚拟机配置的 trust manager factory 算法。 | string | PKIX | low | |
transaction.timeout.ms | 主动中止进行中的事务之前,事务协调器等待 Producer 更新事务状态的最长时间(以毫秒为单位)。如果此值大于 Broker 中的 max.transaction.timeout.ms 设置的时长,则请求将失败并提示"InvalidTransactionTimeout"错误。 | int | 60000 | low | |
transactional.id | 用于事务交付的 TransactionalId。 这使跨越多个生产者会话的可靠性语义成为可能,因为它可以保证客户在开始任何新的事务之前,使用相同的 TransactionalId 的事务都已经完成。 如果没有提供 TransactionalId ,则 Producer 被限制为幂等递送。 请注意,如果配置了 TransactionalId,则必须启用 enable.idempotence 。 缺省值为空,这意味着无法使用事务。 | string | null | non-empty string | low |
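下面是一个使用上表中若干配置项构造 KafkaProducer 并发送一条消息的最小示例(bootstrap 地址、topic 名均为假设值,仅作演示):

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class SimpleProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        // 仅用于初始连接、发现整个集群的broker列表
        props.put("bootstrap.servers", "host1:9092,host2:9092");
        // 等待全部同步副本确认,配合broker端的min.insync.replicas获得更强的持久性
        props.put("acks", "all");
        // 发送失败时的重试次数
        props.put("retries", 3);
        // 批次大小与逗留时间,用于在吞吐量和延迟之间权衡
        props.put("batch.size", 16384);
        props.put("linger.ms", 5);
        // key/value 的序列化类
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<>(props);
        producer.send(new ProducerRecord<>("my-topic", "key", "value"));
        producer.close();
    }
}
```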
如果对老的Scala版本的 Producer 配置感兴趣,请点击 这里.
In 0.9.0.0 we introduced the new Java consumer as a replacement for the older Scala-based simple and high-level consumers. The configs for both new and old consumers are described below.
Below is the configuration for the new consumer:
NAME | DESCRIPTION | TYPE | DEFAULT | VALID VALUES | IMPORTANCE |
---|---|---|---|---|---|
bootstrap.servers | A list of host/port pairs to use for establishing the initial connection to the Kafka cluster. The client will make use of all servers irrespective of which servers are specified here for bootstrapping—this list only impacts the initial hosts used to discover the full set of servers. This list should be in the form host1:port1,host2:port2,... . Since these servers are just used for the initial connection to discover the full cluster membership (which may change dynamically), this list need not contain the full set of servers (you may want more than one, though, in case a server is down). | list | high | ||
key.deserializer | Deserializer class for key that implements the org.apache.kafka.common.serialization.Deserializer interface. | class | high | ||
value.deserializer | Deserializer class for value that implements the org.apache.kafka.common.serialization.Deserializer interface. | class | high | ||
fetch.min.bytes | The minimum amount of data the server should return for a fetch request. If insufficient data is available the request will wait for that much data to accumulate before answering the request. The default setting of 1 byte means that fetch requests are answered as soon as a single byte of data is available or the fetch request times out waiting for data to arrive. Setting this to something greater than 1 will cause the server to wait for larger amounts of data to accumulate which can improve server throughput a bit at the cost of some additional latency. | int | 1 | [0,...] | high |
group.id | A unique string that identifies the consumer group this consumer belongs to. This property is required if the consumer uses either the group management functionality by using subscribe(topic) or the Kafka-based offset management strategy. | string | "" | high | |
heartbeat.interval.ms | The expected time between heartbeats to the consumer coordinator when using Kafka's group management facilities. Heartbeats are used to ensure that the consumer's session stays active and to facilitate rebalancing when new consumers join or leave the group. The value must be set lower than session.timeout.ms , but typically should be set no higher than 1/3 of that value. It can be adjusted even lower to control the expected time for normal rebalances. | int | 3000 | high | |
max.partition.fetch.bytes | The maximum amount of data per-partition the server will return. Records are fetched in batches by the consumer. If the first record batch in the first non-empty partition of the fetch is larger than this limit, the batch will still be returned to ensure that the consumer can make progress. The maximum record batch size accepted by the broker is defined via message.max.bytes (broker config) or max.message.bytes (topic config). See fetch.max.bytes for limiting the consumer request size. | int | 1048576 | [0,...] | high |
session.timeout.ms | The timeout used to detect consumer failures when using Kafka's group management facility. The consumer sends periodic heartbeats to indicate its liveness to the broker. If no heartbeats are received by the broker before the expiration of this session timeout, then the broker will remove this consumer from the group and initiate a rebalance. Note that the value must be in the allowable range as configured in the broker configuration by group.min.session.timeout.ms and group.max.session.timeout.ms . | int | 10000 | high | |
ssl.key.password | The password of the private key in the key store file. This is optional for client. | password | null | high | |
ssl.keystore.location | The location of the key store file. This is optional for client and can be used for two-way authentication for client. | string | null | high | |
ssl.keystore.password | The store password for the key store file. This is optional for client and only needed if ssl.keystore.location is configured. | password | null | high | |
ssl.truststore.location | The location of the trust store file. | string | null | high | |
ssl.truststore.password | The password for the trust store file. If a password is not set access to the truststore is still available, but integrity checking is disabled. | password | null | high | |
auto.offset.reset | What to do when there is no initial offset in Kafka or if the current offset does not exist any more on the server (e.g. because that data has been deleted): earliest: automatically reset the offset to the earliest offset; latest: automatically reset the offset to the latest offset; none: throw exception to the consumer if no previous offset is found for the consumer's group; anything else: throw exception to the consumer. | string | latest | [latest, earliest, none] | medium |
connections.max.idle.ms | Close idle connections after the number of milliseconds specified by this config. | long | 540000 | medium | |
enable.auto.commit | If true the consumer's offset will be periodically committed in the background. | boolean | true | medium | |
exclude.internal.topics | Whether records from internal topics (such as offsets) should be exposed to the consumer. If set to true the only way to receive records from an internal topic is subscribing to it. | boolean | true | medium | |
fetch.max.bytes | The maximum amount of data the server should return for a fetch request. Records are fetched in batches by the consumer, and if the first record batch in the first non-empty partition of the fetch is larger than this value, the record batch will still be returned to ensure that the consumer can make progress. As such, this is not a absolute maximum. The maximum record batch size accepted by the broker is defined via message.max.bytes (broker config) or max.message.bytes (topic config). Note that the consumer performs multiple fetches in parallel. | int | 52428800 | [0,...] | medium |
isolation.level | Controls how to read messages written transactionally. If set to read_committed, consumer.poll() will only return transactional messages which have been committed. If set to read_uncommitted (the default), consumer.poll() will return all messages, even transactional messages which have been aborted. Non-transactional messages will be returned unconditionally in either mode. Messages will always be returned in offset order. Hence, in read_committed mode, consumer.poll() will only return messages up to the last stable offset (LSO), which is the one less than the offset of the first open transaction. In particular any messages appearing after messages belonging to ongoing transactions will be withheld until the relevant transaction has been completed. As a result, read_committed consumers will not be able to read up to the high watermark when there are in flight transactions. Further, when in read_committed mode the seekToEnd method will return the LSO. | string | read_uncommitted | [read_committed, read_uncommitted] | medium |
max.poll.interval.ms | The maximum delay between invocations of poll() when using consumer group management. This places an upper bound on the amount of time that the consumer can be idle before fetching more records. If poll() is not called before expiration of this timeout, then the consumer is considered failed and the group will rebalance in order to reassign the partitions to another member. | int | 300000 | [1,...] | medium |
max.poll.records | The maximum number of records returned in a single call to poll(). | int | 500 | [1,...] | medium |
partition.assignment.strategy | The class name of the partition assignment strategy that the client will use to distribute partition ownership amongst consumer instances when group management is used | list | class org.apache.kafka.clients.consumer.RangeAssignor | medium | |
receive.buffer.bytes | The size of the TCP receive buffer (SO_RCVBUF) to use when reading data. If the value is -1, the OS default will be used. | int | 65536 | [-1,...] | medium |
request.timeout.ms | The configuration controls the maximum amount of time the client will wait for the response of a request. If the response is not received before the timeout elapses the client will resend the request if necessary or fail the request if retries are exhausted. | int | 305000 | [0,...] | medium |
sasl.jaas.config | JAAS login context parameters for SASL connections in the format used by JAAS configuration files. JAAS configuration file format is described here. The format for the value is: 'loginModuleClass controlFlag (optionName=optionValue)*;' | password | null | medium | |
sasl.kerberos.service.name | The Kerberos principal name that Kafka runs as. This can be defined either in Kafka's JAAS config or in Kafka's config. | string | null | medium | |
sasl.mechanism | SASL mechanism used for client connections. This may be any mechanism for which a security provider is available. GSSAPI is the default mechanism. | string | GSSAPI | medium | |
security.protocol | Protocol used to communicate with brokers. Valid values are: PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL. | string | PLAINTEXT | medium | |
send.buffer.bytes | The size of the TCP send buffer (SO_SNDBUF) to use when sending data. If the value is -1, the OS default will be used. | int | 131072 | [-1,...] | medium |
ssl.enabled.protocols | The list of protocols enabled for SSL connections. | list | TLSv1.2,TLSv1.1,TLSv1 | medium | |
ssl.keystore.type | The file format of the key store file. This is optional for client. | string | JKS | medium | |
ssl.protocol | The SSL protocol used to generate the SSLContext. Default setting is TLS, which is fine for most cases. Allowed values in recent JVMs are TLS, TLSv1.1 and TLSv1.2. SSL, SSLv2 and SSLv3 may be supported in older JVMs, but their usage is discouraged due to known security vulnerabilities. | string | TLS | medium | |
ssl.provider | The name of the security provider used for SSL connections. Default value is the default security provider of the JVM. | string | null | medium | |
ssl.truststore.type | The file format of the trust store file. | string | JKS | medium | |
auto.commit.interval.ms | The frequency in milliseconds that the consumer offsets are auto-committed to Kafka if enable.auto.commit is set to true . | int | 5000 | [0,...] | low |
check.crcs | Automatically check the CRC32 of the records consumed. This ensures no on-the-wire or on-disk corruption to the messages occurred. This check adds some overhead, so it may be disabled in cases seeking extreme performance. | boolean | true | low | |
client.id | An id string to pass to the server when making requests. The purpose of this is to be able to track the source of requests beyond just ip/port by allowing a logical application name to be included in server-side request logging. | string | "" | low | |
fetch.max.wait.ms | The maximum amount of time the server will block before answering the fetch request if there isn't sufficient data to immediately satisfy the requirement given by fetch.min.bytes. | int | 500 | [0,...] | low |
interceptor.classes | A list of classes to use as interceptors. Implementing the org.apache.kafka.clients.consumer.ConsumerInterceptor interface allows you to intercept (and possibly mutate) records received by the consumer. By default, there are no interceptors. | list | null | low | |
metadata.max.age.ms | The period of time in milliseconds after which we force a refresh of metadata even if we haven't seen any partition leadership changes to proactively discover any new brokers or partitions. | long | 300000 | [0,...] | low |
metric.reporters | A list of classes to use as metrics reporters. Implementing the org.apache.kafka.common.metrics.MetricsReporter interface allows plugging in classes that will be notified of new metric creation. The JmxReporter is always included to register JMX statistics. | list | "" | low | |
metrics.num.samples | The number of samples maintained to compute metrics. | int | 2 | [1,...] | low |
metrics.recording.level | The highest recording level for metrics. | string | INFO | [INFO, DEBUG] | low |
metrics.sample.window.ms | The window of time a metrics sample is computed over. | long | 30000 | [0,...] | low |
reconnect.backoff.max.ms | The maximum amount of time in milliseconds to wait when reconnecting to a broker that has repeatedly failed to connect. If provided, the backoff per host will increase exponentially for each consecutive connection failure, up to this maximum. After calculating the backoff increase, 20% random jitter is added to avoid connection storms. | long | 1000 | [0,...] | low |
reconnect.backoff.ms | The base amount of time to wait before attempting to reconnect to a given host. This avoids repeatedly connecting to a host in a tight loop. This backoff applies to all connection attempts by the client to a broker. | long | 50 | [0,...] | low |
retry.backoff.ms | The amount of time to wait before attempting to retry a failed request to a given topic partition. This avoids repeatedly sending requests in a tight loop under some failure scenarios. | long | 100 | [0,...] | low |
sasl.kerberos.kinit.cmd | Kerberos kinit command path. | string | /usr/bin/kinit | low | |
sasl.kerberos.min.time.before.relogin | Login thread sleep time between refresh attempts. | long | 60000 | low | |
sasl.kerberos.ticket.renew.jitter | Percentage of random jitter added to the renewal time. | double | 0.05 | low | |
sasl.kerberos.ticket.renew.window.factor | Login thread will sleep until the specified window factor of time from last refresh to ticket's expiry has been reached, at which time it will try to renew the ticket. | double | 0.8 | low | |
ssl.cipher.suites | A list of cipher suites. This is a named combination of authentication, encryption, MAC and key exchange algorithm used to negotiate the security settings for a network connection using TLS or SSL network protocol. By default all the available cipher suites are supported. | list | null | low | |
ssl.endpoint.identification.algorithm | The endpoint identification algorithm to validate server hostname using server certificate. | string | null | low | |
ssl.keymanager.algorithm | The algorithm used by key manager factory for SSL connections. Default value is the key manager factory algorithm configured for the Java Virtual Machine. | string | SunX509 | low | |
ssl.secure.random.implementation | The SecureRandom PRNG implementation to use for SSL cryptography operations. | string | null | low | |
ssl.trustmanager.algorithm | The algorithm used by trust manager factory for SSL connections. Default value is the trust manager factory algorithm configured for the Java Virtual Machine. | string | PKIX | low |
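As a minimal illustration of the settings above, the sketch below builds a KafkaConsumer and polls a topic in a loop (broker addresses, group id and topic name are placeholder values):

```java
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class SimpleConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Initial brokers, used only to discover the full cluster
        props.put("bootstrap.servers", "host1:9092,host2:9092");
        // Consumer group used for offset management and rebalancing
        props.put("group.id", "example-group");
        // Commit offsets automatically in the background
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "5000");
        // Start from the earliest offset when the group has no committed offset
        props.put("auto.offset.reset", "earliest");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("my-topic"));
        while (true) {
            // Poll with a 100 ms timeout; returns whatever records are currently available
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset=%d, key=%s, value=%s%n", record.offset(), record.key(), record.value());
            }
        }
    }
}
```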
The essential old consumer configurations are the following:
group.id
zookeeper.connect
PROPERTY | DEFAULT | DESCRIPTION |
---|---|---|
group.id | A string that uniquely identifies the group of consumer processes to which this consumer belongs. By setting the same group id multiple processes indicate that they are all part of the same consumer group. | |
zookeeper.connect | Specifies the ZooKeeper connection string in the form hostname:port where host and port are the host and port of a ZooKeeper server. To allow connecting through other ZooKeeper nodes when that ZooKeeper machine is down you can also specify multiple hosts in the form hostname1:port1,hostname2:port2,hostname3:port3 . The server may also have a ZooKeeper chroot path as part of its ZooKeeper connection string which puts its data under some path in the global ZooKeeper namespace. If so the consumer should use the same chroot path in its connection string. For example to give a chroot path of /chroot/path you would give the connection string as hostname1:port1,hostname2:port2,hostname3:port3/chroot/path. | |
consumer.id | null | Generated automatically if not set. |
socket.timeout.ms | 30 * 1000 | The socket timeout for network requests. The actual timeout set will be max.fetch.wait + socket.timeout.ms. |
socket.receive.buffer.bytes | 64 * 1024 | The socket receive buffer for network requests |
fetch.message.max.bytes | 1024 * 1024 | The number of bytes of messages to attempt to fetch for each topic-partition in each fetch request. These bytes will be read into memory for each partition, so this helps control the memory used by the consumer. The fetch request size must be at least as large as the maximum message size the server allows or else it is possible for the producer to send messages larger than the consumer can fetch. |
num.consumer.fetchers | 1 | The number fetcher threads used to fetch data. |
auto.commit.enable | true | If true, periodically commit to ZooKeeper the offset of messages already fetched by the consumer. This committed offset will be used when the process fails as the position from which the new consumer will begin. |
auto.commit.interval.ms | 60 * 1000 | The frequency in ms that the consumer offsets are committed to zookeeper. |
queued.max.message.chunks | 2 | Max number of message chunks buffered for consumption. Each chunk can be up to fetch.message.max.bytes. |
rebalance.max.retries | 4 | When a new consumer joins a consumer group the set of consumers attempt to "rebalance" the load to assign partitions to each consumer. If the set of consumers changes while this assignment is taking place the rebalance will fail and retry. This setting controls the maximum number of attempts before giving up. |
fetch.min.bytes | 1 | The minimum amount of data the server should return for a fetch request. If insufficient data is available the request will wait for that much data to accumulate before answering the request. |
fetch.wait.max.ms | 100 | The maximum amount of time the server will block before answering the fetch request if there isn't sufficient data to immediately satisfy fetch.min.bytes |
rebalance.backoff.ms | 2000 | Backoff time between retries during rebalance. If not set explicitly, the value in zookeeper.sync.time.ms is used. |
refresh.leader.backoff.ms | 200 | Backoff time to wait before trying to determine the leader of a partition that has just lost its leader. |
auto.offset.reset | largest | What to do when there is no initial offset in ZooKeeper or if an offset is out of range: smallest : automatically reset the offset to the smallest offset; largest : automatically reset the offset to the largest offset; anything else: throw exception to the consumer |
consumer.timeout.ms | -1 | Throw a timeout exception to the consumer if no message is available for consumption after the specified interval |
exclude.internal.topics | true | Whether messages from internal topics (such as offsets) should be exposed to the consumer. |
client.id | group id value | The client id is a user-specified string sent in each request to help trace calls. It should logically identify the application making the request. |
zookeeper.session.timeout.ms | 6000 | ZooKeeper session timeout. If the consumer fails to heartbeat to ZooKeeper for this period of time it is considered dead and a rebalance will occur. |
zookeeper.connection.timeout.ms | 6000 | The max time that the client waits while establishing a connection to zookeeper. |
zookeeper.sync.time.ms | 2000 | How far a ZK follower can be behind a ZK leader |
offsets.storage | zookeeper | Select where offsets should be stored (zookeeper or kafka). |
offsets.channel.backoff.ms | 1000 | The backoff period when reconnecting the offsets channel or retrying failed offset fetch/commit requests. |
offsets.channel.socket.timeout.ms | 10000 | Socket timeout when reading responses for offset fetch/commit requests. This timeout is also used for ConsumerMetadata requests that are used to query for the offset manager. |
offsets.commit.max.retries | 5 | Retry the offset commit up to this many times on failure. This retry count only applies to offset commits during shut-down. It does not apply to commits originating from the auto-commit thread. It also does not apply to attempts to query for the offset coordinator before committing offsets. i.e., if a consumer metadata request fails for any reason, it will be retried and that retry does not count toward this limit. |
dual.commit.enabled | true | If you are using "kafka" as offsets.storage, you can dual commit offsets to ZooKeeper (in addition to Kafka). This is required during migration from zookeeper-based offset storage to kafka-based offset storage. With respect to any given consumer group, it is safe to turn this off after all instances within that group have been migrated to the new version that commits offsets to the broker (instead of directly to ZooKeeper). |
partition.assignment.strategy | range | Select between the "range" or "roundrobin" strategy for assigning partitions to consumer streams. The round-robin partition assignor lays out all the available partitions and all the available consumer threads. It then proceeds to do a round-robin assignment from partition to consumer thread. If the subscriptions of all consumer instances are identical, then the partitions will be uniformly distributed. (i.e., the partition ownership counts will be within a delta of exactly one across all consumer threads.) Round-robin assignment is permitted only if: (a) Every topic has the same number of streams within a consumer instance (b) The set of subscribed topics is identical for every consumer instance within the group. Range partitioning works on a per-topic basis. For each topic, we lay out the available partitions in numeric order and the consumer threads in lexicographic order. We then divide the number of partitions by the total number of consumer streams (threads) to determine the number of partitions to assign to each consumer. If it does not evenly divide, then the first few consumers will have one extra partition. |
More details about consumer configuration can be found in the scala class kafka.consumer.ConsumerConfig
.
Below is the configuration of the Kafka Connect framework.
NAME | DESCRIPTION | TYPE | DEFAULT | VALID VALUES | IMPORTANCE |
---|---|---|---|---|---|
config.storage.topic | The name of the Kafka topic where connector configurations are stored | string | high | ||
group.id | A unique string that identifies the Connect cluster group this worker belongs to. | string | high | ||
key.converter | Converter class used to convert between Kafka Connect format and the serialized form that is written to Kafka. This controls the format of the keys in messages written to or read from Kafka, and since this is independent of connectors it allows any connector to work with any serialization format. Examples of common formats include JSON and Avro. | class | high | ||
offset.storage.topic | The name of the Kafka topic where connector offsets are stored | string | high | ||
status.storage.topic | The name of the Kafka topic where connector and task status are stored | string | high | ||
value.converter | Converter class used to convert between Kafka Connect format and the serialized form that is written to Kafka. This controls the format of the values in messages written to or read from Kafka, and since this is independent of connectors it allows any connector to work with any serialization format. Examples of common formats include JSON and Avro. | class | high | ||
internal.key.converter | Converter class used to convert between Kafka Connect format and the serialized form that is written to Kafka. This controls the format of the keys in messages written to or read from Kafka, and since this is independent of connectors it allows any connector to work with any serialization format. Examples of common formats include JSON and Avro. This setting controls the format used for internal bookkeeping data used by the framework, such as configs and offsets, so users can typically use any functioning Converter implementation. | class | low | ||
internal.value.converter | Converter class used to convert between Kafka Connect format and the serialized form that is written to Kafka. This controls the format of the values in messages written to or read from Kafka, and since this is independent of connectors it allows any connector to work with any serialization format. Examples of common formats include JSON and Avro. This setting controls the format used for internal bookkeeping data used by the framework, such as configs and offsets, so users can typically use any functioning Converter implementation. | class | low | ||
bootstrap.servers | A list of host/port pairs to use for establishing the initial connection to the Kafka cluster. The client will make use of all servers irrespective of which servers are specified here for bootstrapping—this list only impacts the initial hosts used to discover the full set of servers. This list should be in the form host1:port1,host2:port2,... . Since these servers are just used for the initial connection to discover the full cluster membership (which may change dynamically), this list need not contain the full set of servers (you may want more than one, though, in case a server is down). | list | localhost:9092 | high | |
heartbeat.interval.ms | The expected time between heartbeats to the group coordinator when using Kafka's group management facilities. Heartbeats are used to ensure that the worker's session stays active and to facilitate rebalancing when new members join or leave the group. The value must be set lower than session.timeout.ms , but typically should be set no higher than 1/3 of that value. It can be adjusted even lower to control the expected time for normal rebalances. | int | 3000 | high | |
rebalance.timeout.ms | The maximum allowed time for each worker to join the group once a rebalance has begun. This is basically a limit on the amount of time needed for all tasks to flush any pending data and commit offsets. If the timeout is exceeded, then the worker will be removed from the group, which will cause offset commit failures. | int | 60000 | high | |
session.timeout.ms | The timeout used to detect worker failures. The worker sends periodic heartbeats to indicate its liveness to the broker. If no heartbeats are received by the broker before the expiration of this session timeout, then the broker will remove the worker from the group and initiate a rebalance. Note that the value must be in the allowable range as configured in the broker configuration by group.min.session.timeout.ms and group.max.session.timeout.ms . | int | 10000 | high | |
ssl.key.password | The password of the private key in the key store file. This is optional for client. | password | null | high | |
ssl.keystore.location | The location of the key store file. This is optional for client and can be used for two-way authentication for client. | string | null | high | |
ssl.keystore.password | The store password for the key store file. This is optional for client and only needed if ssl.keystore.location is configured. | password | null | high | |
ssl.truststore.location | The location of the trust store file. | string | null | high | |
ssl.truststore.password | The password for the trust store file. If a password is not set access to the truststore is still available, but integrity checking is disabled. | password | null | high | |
connections.max.idle.ms | Close idle connections after the number of milliseconds specified by this config. | long | 540000 | medium | |
receive.buffer.bytes | The size of the TCP receive buffer (SO_RCVBUF) to use when reading data. If the value is -1, the OS default will be used. | int | 32768 | [0,...] | medium |
request.timeout.ms | The configuration controls the maximum amount of time the client will wait for the response of a request. If the response is not received before the timeout elapses the client will resend the request if necessary or fail the request if retries are exhausted. | int | 40000 | [0,...] | medium |
sasl.jaas.config | JAAS login context parameters for SASL connections in the format used by JAAS configuration files. JAAS configuration file format is described here. The format for the value is: 'loginModuleClass controlFlag (optionName=optionValue)*;' | password | null | medium |
sasl.kerberos.service.name | The Kerberos principal name that Kafka runs as. This can be defined either in Kafka's JAAS config or in Kafka's config. | string | null | medium | |
sasl.mechanism | SASL mechanism used for client connections. This may be any mechanism for which a security provider is available. GSSAPI is the default mechanism. | string | GSSAPI | medium | |
security.protocol | Protocol used to communicate with brokers. Valid values are: PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL. | string | PLAINTEXT | medium | |
send.buffer.bytes | The size of the TCP send buffer (SO_SNDBUF) to use when sending data. If the value is -1, the OS default will be used. | int | 131072 | [0,...] | medium |
ssl.enabled.protocols | The list of protocols enabled for SSL connections. | list | TLSv1.2,TLSv1.1,TLSv1 | medium | |
ssl.keystore.type | The file format of the key store file. This is optional for client. | string | JKS | medium | |
ssl.protocol | The SSL protocol used to generate the SSLContext. Default setting is TLS, which is fine for most cases. Allowed values in recent JVMs are TLS, TLSv1.1 and TLSv1.2. SSL, SSLv2 and SSLv3 may be supported in older JVMs, but their usage is discouraged due to known security vulnerabilities. | string | TLS | medium | |
ssl.provider | The name of the security provider used for SSL connections. Default value is the default security provider of the JVM. | string | null | medium | |
ssl.truststore.type | The file format of the trust store file. | string | JKS | medium | |
worker.sync.timeout.ms | When the worker is out of sync with other workers and needs to resynchronize configurations, wait up to this amount of time before giving up, leaving the group, and waiting a backoff period before rejoining. | int | 3000 | medium | |
worker.unsync.backoff.ms | When the worker is out of sync with other workers and fails to catch up within worker.sync.timeout.ms, leave the Connect cluster for this long before rejoining. | int | 300000 | medium | |
access.control.allow.methods | Sets the methods supported for cross origin requests by setting the Access-Control-Allow-Methods header. The default value of the Access-Control-Allow-Methods header allows cross origin requests for GET, POST and HEAD. | string | "" | low | |
access.control.allow.origin | Value to set the Access-Control-Allow-Origin header to for REST API requests. To enable cross origin access, set this to the domain of the application that should be permitted to access the API, or '*' to allow access from any domain. The default value only allows access from the domain of the REST API. | string | "" | low |
client.id | An id string to pass to the server when making requests. The purpose of this is to be able to track the source of requests beyond just ip/port by allowing a logical application name to be included in server-side request logging. | string | "" | low | |
config.storage.replication.factor | Replication factor used when creating the configuration storage topic | short | 3 | [1,...] | low |
metadata.max.age.ms | The period of time in milliseconds after which we force a refresh of metadata even if we haven't seen any partition leadership changes to proactively discover any new brokers or partitions. | long | 300000 | [0,...] | low |
metric.reporters | A list of classes to use as metrics reporters. Implementing the org.apache.kafka.common.metrics.MetricsReporter interface allows plugging in classes that will be notified of new metric creation. The JmxReporter is always included to register JMX statistics. | list | "" | low | |
metrics.num.samples | The number of samples maintained to compute metrics. | int | 2 | [1,...] | low |
metrics.recording.level | The highest recording level for metrics. | string | INFO | [INFO, DEBUG] | low |
metrics.sample.window.ms | The window of time a metrics sample is computed over. | long | 30000 | [0,...] | low |
offset.flush.interval.ms | Interval at which to try committing offsets for tasks. | long | 60000 | low | |
offset.flush.timeout.ms | Maximum number of milliseconds to wait for records to flush and partition offset data to be committed to offset storage before cancelling the process and restoring the offset data to be committed in a future attempt. | long | 5000 | low | |
offset.storage.partitions | The number of partitions used when creating the offset storage topic | int | 25 | [1,...] | low |
offset.storage.replication.factor | Replication factor used when creating the offset storage topic | short | 3 | [1,...] | low |
plugin.path | List of paths separated by commas (,) that contain plugins (connectors, converters, transformations). The list should consist of top level directories that include any combination of: a) directories immediately containing jars with plugins and their dependencies b) uber-jars with plugins and their dependencies c) directories immediately containing the package directory structure of classes of plugins and their dependencies Note: symlinks will be followed to discover dependencies or plugins. Examples: plugin.path=/usr/local/share/java,/usr/local/share/kafka/plugins,/opt/connectors | list | null | low | |
reconnect.backoff.max.ms | The maximum amount of time in milliseconds to wait when reconnecting to a broker that has repeatedly failed to connect. If provided, the backoff per host will increase exponentially for each consecutive connection failure, up to this maximum. After calculating the backoff increase, 20% random jitter is added to avoid connection storms. | long | 1000 | [0,...] | low |
reconnect.backoff.ms | The base amount of time to wait before attempting to reconnect to a given host. This avoids repeatedly connecting to a host in a tight loop. This backoff applies to all connection attempts by the client to a broker. | long | 50 | [0,...] | low |
rest.advertised.host.name | If this is set, this is the hostname that will be given out to other workers to connect to. | string | null | low | |
rest.advertised.port | If this is set, this is the port that will be given out to other workers to connect to. | int | null | low | |
rest.host.name | Hostname for the REST API. If this is set, it will only bind to this interface. | string | null | low | |
rest.port | Port for the REST API to listen on. | int | 8083 | low | |
retry.backoff.ms | The amount of time to wait before attempting to retry a failed request to a given topic partition. This avoids repeatedly sending requests in a tight loop under some failure scenarios. | long | 100 | [0,...] | low |
sasl.kerberos.kinit.cmd | Kerberos kinit command path. | string | /usr/bin/kinit | low | |
sasl.kerberos.min.time.before.relogin | Login thread sleep time between refresh attempts. | long | 60000 | low | |
sasl.kerberos.ticket.renew.jitter | Percentage of random jitter added to the renewal time. | double | 0.05 | low | |
sasl.kerberos.ticket.renew.window.factor | Login thread will sleep until the specified window factor of time from last refresh to ticket's expiry has been reached, at which time it will try to renew the ticket. | double | 0.8 | low | |
ssl.cipher.suites | A list of cipher suites. This is a named combination of authentication, encryption, MAC and key exchange algorithm used to negotiate the security settings for a network connection using TLS or SSL network protocol. By default all the available cipher suites are supported. | list | null | low | |
ssl.endpoint.identification.algorithm | The endpoint identification algorithm to validate server hostname using server certificate. | string | null | low | |
ssl.keymanager.algorithm | The algorithm used by key manager factory for SSL connections. Default value is the key manager factory algorithm configured for the Java Virtual Machine. | string | SunX509 | low | |
ssl.secure.random.implementation | The SecureRandom PRNG implementation to use for SSL cryptography operations. | string | null | low | |
ssl.trustmanager.algorithm | The algorithm used by trust manager factory for SSL connections. Default value is the trust manager factory algorithm configured for the Java Virtual Machine. | string | PKIX | low | |
status.storage.partitions | The number of partitions used when creating the status storage topic | int | 5 | [1,...] | low |
status.storage.replication.factor | Replication factor used when creating the status storage topic | short | 3 | [1,...] | low |
task.shutdown.graceful.timeout.ms | Amount of time to wait for tasks to shut down gracefully. This is the total amount of time, not per task. Shutdown is triggered for all tasks, then they are waited on sequentially. | long | 5000 | low |
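The sketch below shows how a few of the worker settings from the table above fit together. It is illustrative only: the broker addresses, output file name, and class name are placeholders, and a complete distributed worker configuration would also need group.id, the key/value converters, and the internal config/offset/status topic names. The resulting properties file is the kind of file typically passed to bin/connect-distributed.sh.

```java
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.Properties;

/**
 * Illustrative sketch: assembles a minimal set of distributed Connect worker
 * settings using keys from the table above and writes them to a properties
 * file. Values and the output path are placeholders, not recommendations.
 */
public class ConnectWorkerConfigSketch {
    public static void main(String[] args) throws IOException {
        Properties worker = new Properties();

        // Initial brokers used to discover the full cluster (table default: localhost:9092).
        worker.setProperty("bootstrap.servers", "broker1:9092,broker2:9092");

        // Replication factors for Connect's internal topics (table defaults: 3).
        worker.setProperty("config.storage.replication.factor", "3");
        worker.setProperty("offset.storage.replication.factor", "3");
        worker.setProperty("status.storage.replication.factor", "3");

        // How often task offsets are committed (table default: 60000 ms).
        worker.setProperty("offset.flush.interval.ms", "60000");

        // REST interface used to manage connectors (table default port: 8083).
        worker.setProperty("rest.port", "8083");

        // Comma-separated directories scanned for connector plugins.
        worker.setProperty("plugin.path", "/usr/local/share/kafka/plugins");

        try (OutputStream out = new FileOutputStream("connect-distributed-example.properties")) {
            worker.store(out, "Example distributed worker configuration (illustrative values)");
        }
    }
}
```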
Below is the configuration of the Kafka Streams client library.
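As a minimal, hedged illustration of how these settings are supplied in practice (assuming the standard org.apache.kafka.streams.StreamsConfig API; the application id and broker list below are placeholders), a Streams application is configured through a Properties object whose two required entries are application.id and bootstrap.servers:

```java
import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsConfig;

/**
 * Illustrative only: a minimal Kafka Streams configuration built with the
 * standard StreamsConfig keys.
 */
public class StreamsConfigSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Required: identifies the application and prefixes its internal topic names.
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "example-streams-app");
        // Required: initial brokers used to discover the cluster.
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // Optional defaults for record key/value serdes.
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());

        // Validates the values against the known Streams config definitions.
        StreamsConfig config = new StreamsConfig(props);
        System.out.println(config.originals());
    }
}
```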
Below is the configuration of the Kafka Admin client library.
NAME | DESCRIPTION | TYPE | DEFAULT | VALID VALUES | IMPORTANCE |
---|---|---|---|---|---|
bootstrap.servers | A list of host/port pairs to use for establishing the initial connection to the Kafka cluster. The client will make use of all servers irrespective of which servers are specified here for bootstrapping—this list only impacts the initial hosts used to discover the full set of servers. This list should be in the form host1:port1,host2:port2,... . Since these servers are just used for the initial connection to discover the full cluster membership (which may change dynamically), this list need not contain the full set of servers (you may want more than one, though, in case a server is down). | list | high | ||
ssl.key.password | The password of the private key in the key store file. This is optional for client. | password | null | high | |
ssl.keystore.location | The location of the key store file. This is optional for client and can be used for two-way authentication for client. | string | null | high | |
ssl.keystore.password | The store password for the key store file. This is optional for client and only needed if ssl.keystore.location is configured. | password | null | high | |
ssl.truststore.location | The location of the trust store file. | string | null | high | |
ssl.truststore.password | The password for the trust store file. If a password is not set access to the truststore is still available, but integrity checking is disabled. | password | null | high | |
client.id | An id string to pass to the server when making requests. The purpose of this is to be able to track the source of requests beyond just ip/port by allowing a logical application name to be included in server-side request logging. | string | "" | medium | |
connections.max.idle.ms | Close idle connections after the number of milliseconds specified by this config. | long | 300000 | medium | |
receive.buffer.bytes | The size of the TCP receive buffer (SO_RCVBUF) to use when reading data. If the value is -1, the OS default will be used. | int | 65536 | [-1,...] | medium |
request.timeout.ms | The configuration controls the maximum amount of time the client will wait for the response of a request. If the response is not received before the timeout elapses the client will resend the request if necessary or fail the request if retries are exhausted. | int | 120000 | [0,...] | medium |
sasl.jaas.config | JAAS login context parameters for SASL connections in the format used by JAAS configuration files. JAAS configuration file format is described here. The format for the value is: 'loginModuleClass controlFlag (optionName=optionValue)*;' | password | null | medium |
sasl.kerberos.service.name | The Kerberos principal name that Kafka runs as. This can be defined either in Kafka's JAAS config or in Kafka's config. | string | null | medium | |
sasl.mechanism | SASL mechanism used for client connections. This may be any mechanism for which a security provider is available. GSSAPI is the default mechanism. | string | GSSAPI | medium | |
security.protocol | Protocol used to communicate with brokers. Valid values are: PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL. | string | PLAINTEXT | medium | |
send.buffer.bytes | The size of the TCP send buffer (SO_SNDBUF) to use when sending data. If the value is -1, the OS default will be used. | int | 131072 | [-1,...] | medium |
ssl.enabled.protocols | The list of protocols enabled for SSL connections. | list | TLSv1.2,TLSv1.1,TLSv1 | medium | |
ssl.keystore.type | The file format of the key store file. This is optional for client. | string | JKS | medium | |
ssl.protocol | The SSL protocol used to generate the SSLContext. Default setting is TLS, which is fine for most cases. Allowed values in recent JVMs are TLS, TLSv1.1 and TLSv1.2. SSL, SSLv2 and SSLv3 may be supported in older JVMs, but their usage is discouraged due to known security vulnerabilities. | string | TLS | medium | |
ssl.provider | The name of the security provider used for SSL connections. Default value is the default security provider of the JVM. | string | null | medium | |
ssl.truststore.type | The file format of the trust store file. | string | JKS | medium | |
metadata.max.age.ms | The period of time in milliseconds after which we force a refresh of metadata even if we haven't seen any partition leadership changes to proactively discover any new brokers or partitions. | long | 300000 | [0,...] | low |
metric.reporters | A list of classes to use as metrics reporters. Implementing the org.apache.kafka.common.metrics.MetricsReporter interface allows plugging in classes that will be notified of new metric creation. The JmxReporter is always included to register JMX statistics. | list | "" | low | |
metrics.num.samples | The number of samples maintained to compute metrics. | int | 2 | [1,...] | low |
metrics.recording.level | The highest recording level for metrics. | string | INFO | [INFO, DEBUG] | low |
metrics.sample.window.ms | The window of time a metrics sample is computed over. | long | 30000 | [0,...] | low |
reconnect.backoff.max.ms | The maximum amount of time in milliseconds to wait when reconnecting to a broker that has repeatedly failed to connect. If provided, the backoff per host will increase exponentially for each consecutive connection failure, up to this maximum. After calculating the backoff increase, 20% random jitter is added to avoid connection storms. | long | 1000 | [0,...] | low |
reconnect.backoff.ms | The base amount of time to wait before attempting to reconnect to a given host. This avoids repeatedly connecting to a host in a tight loop. This backoff applies to all connection attempts by the client to a broker. | long | 50 | [0,...] | low |
retries | The maximum number of times to retry a call before failing it. | int | 5 | [0,...] | low |
retry.backoff.ms | The amount of time to wait before attempting to retry a failed request. This avoids repeatedly sending requests in a tight loop under some failure scenarios. | long | 100 | [0,...] | low |
sasl.kerberos.kinit.cmd | Kerberos kinit command path. | string | /usr/bin/kinit | low | |
sasl.kerberos.min.time.before.relogin | Login thread sleep time between refresh attempts. | long | 60000 | low | |
sasl.kerberos.ticket.renew.jitter | Percentage of random jitter added to the renewal time. | double | 0.05 | low | |
sasl.kerberos.ticket.renew.window.factor | Login thread will sleep until the specified window factor of time from last refresh to ticket's expiry has been reached, at which time it will try to renew the ticket. | double | 0.8 | low | |
ssl.cipher.suites | A list of cipher suites. This is a named combination of authentication, encryption, MAC and key exchange algorithm used to negotiate the security settings for a network connection using TLS or SSL network protocol. By default all the available cipher suites are supported. | list | null | low | |
ssl.endpoint.identification.algorithm | The endpoint identification algorithm to validate server hostname using server certificate. | string | null | low | |
ssl.keymanager.algorithm | The algorithm used by key manager factory for SSL connections. Default value is the key manager factory algorithm configured for the Java Virtual Machine. | string | SunX509 | low | |
ssl.secure.random.implementation | The SecureRandom PRNG implementation to use for SSL cryptography operations. | string | null | low | |
ssl.trustmanager.algorithm | The algorithm used by trust manager factory for SSL connections. Default value is the trust manager factory algorithm configured for the Java Virtual Machine. | string | PKIX | low |
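For reference, the sketch below creates an AdminClient with a few of the settings from the table above. The broker addresses and client id are placeholders; security settings (ssl.\*, sasl.\*) would be added to the same Properties object when needed.

```java
import java.util.Properties;
import java.util.Set;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;

/**
 * Illustrative sketch: creating an AdminClient with settings drawn from the
 * admin config table. Values shown are the table defaults or placeholders.
 */
public class AdminClientSketch {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        // Initial brokers used to discover the full cluster membership.
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092,broker2:9092");
        // Maximum time to wait for a single request (table default: 120000 ms).
        props.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "120000");
        // Number of times a failed call is retried before giving up (table default: 5).
        props.put(AdminClientConfig.RETRIES_CONFIG, "5");
        // Logical name recorded in broker-side request logs.
        props.put(AdminClientConfig.CLIENT_ID_CONFIG, "example-admin-client");

        try (AdminClient admin = AdminClient.create(props)) {
            // Simple smoke test: list the topics visible to this client.
            Set<String> topics = admin.listTopics().names().get();
            System.out.println("Topics: " + topics);
        }
    }
}
```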