赞
踩
先说结论,需要考虑的因素有以下几个:
每天的消息数
每条消息大小
副本数
消息保留时间
是否启用压缩
建议安装 Kafka 的服务器节点的内存至少大于等于 8G。
Kafka 的内存由堆内存+页缓存组成。
① 堆内存配置
需要在kafka-server-start.sh
进行修改
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
export KAFKA_HEAP_OPTS="-Xmx10G -Xms10G"
fi
通过 jps 命令查看 Kafka 的进程号
根据 Kafka 进程号,查看 Kafka 的堆内存
jmap -heap 2321
④ Kafka 页缓存
页缓存是 Linux 系统服务器的内存。我们只需要保证 1 个 segment(默认值为 1G)中 25%的数据在内存中就好。
官网提示:以下三个参数必配:
参数名 | 备注 | 默认值 |
---|---|---|
broker.id | 此参数必配,每台服务器都不相同且唯一。 | -1 |
log.dirs | 此参数必配,而且强烈建议多目录。而且最好多个目录分属于不同的物理磁盘。 好处1:提高读写性能,多块物理磁盘能使读写具有更高的IO量; 好处2:实现故障转移,如果磁盘坏了,可以自动将数据转移到正常磁盘上,且Broker不会宕机。 如果这个参数没配,则使用 log.dir 中的参数配置,默认为/tmp/kafka-logs | null |
log.dir | 如果log.dirs 没配,则使用log.dir | null |
zookeeper.connect | 此参数必配。 格式为: hostname1:port1,hostname2:port2,hostname3:port3/chroot/path 。例如我们经常配置的: hadoop102:2181,hadoop103:2181,hadoop104:2181/kafka ,切记不要写成: hadoop102:2181/kafka,hadoop103:2181/kafka,hadoop104:2181/kafka | null |
参数名 | 备注 | 默认值 |
---|---|---|
auto.create.topics.enable | 测试环境中,这个参数通常开启,设置为true ,但是生产环境中一般关闭,设置为false。我们会发现一个现象,如果我们不创建主题,直接向名为“test”的主题发送消息,则kafka中会使用默认主题配置自动创建名为test的topic,但是这种情况一般在生产环境中不允许。这种创建主题的方式是非预期的,增加了主题管理和维护的难度。生产环境建议将该参数设置为false 。 | true |
delete.topic.enable | 是否允许删除主题,默认为true ,一般不修改。一般会通过权限管理来限制部分用户删除主题。个别权限控制严格的公司可能会修改为false,但这样就无法删除topic,生产中建议不修改! | true |
参数名 | 备注 | 默认值 |
---|---|---|
background.threads | 默认为10,后台处理任务的线程数。如果不出问题,默认不修改。 | 10 |
num.io.threads | 服务器用于处理请求的线程数,其可能包括磁盘I/O | 8 |
num.network.threads | 服务器用于接收来自网络的请求并向网络发送响应的线程数 | 3 |
num.recovery.threads.per.data.dir | 在启动时用于日志恢复和在关闭时刷新的每个数据目录的线程数 | 1 |
num.replica.alter.log.dirs.threads | 可以在日志目录之间移动副本的线程数,这可能包括磁盘I/O,不算常驻线程,无需设置。 | null |
num.replica.fetchers | 用于复制来自源代理的消息线程的数量。增加该值可以增加follower broker中的I/O平行度的程度。 | 1 |
log.cleaner.threads | 用于日志清理的后台线程数 | 1 |
生产中,一般Kafka会配置压缩以减少磁盘占用。
参数名 | 备注 | 默认值 |
---|---|---|
compression.type | 配置消息压缩格式,有以下几种:'gzip', 'snappy', 'lz4', 'zstd' ,生产环境中需要按照实际是否需要压缩进行配置。还可以配置 'uncompressed' 意为不压缩,还可以配置'producer' 意为和producer端的压缩格式保持一致。 | producer |
ZK相关的参数一般不做修改。默认即可。
参数名 | 备注 | 默认值 |
---|---|---|
controller.quorum.election.backoff.max.ms | 开始新的选举前的最长时间(毫秒),防止选举陷入僵局。默认1000ms,不修改。 | 1000 |
controller.quorum.election.timeout.ms | 没有leader后触发选举的等待市场,默认1000ms,不修改 | 1000 |
controller.quorum.fetch.timeout.ms | 在成为候选者和触发投票选举之前从当前leader处拉取数据失败等待的最大时长,默认2000ms。不修改。 | 2000 |
controller.quorum.voters | 参与投票的服务器列表,逗号分割。需要填入非空的集合 | “” |
zookeeper.connection.timeout.ms | 客户端等待与Zookeeper建立连接的最大时间。 如果没有设置,则使用 zookeeper.session.timeout.ms 中的值 | null |
zookeeper.max.in.flight.requests | 客户端在阻塞前发送给Zookeeper的未确认请求的最大数量。 | 10 |
zookeeper.session.timeout.ms | ZK的会话超时时间 | 18000 |
重平衡相关参数需要根据实际需求进行调整,原理类似于HDFS中的重平衡。
参数名 | 备注 | 默认值 |
---|---|---|
unclean.leader.election.enable | 是否将未设置在ISR中的副本作为最后的手段来选举为leader,即使这样做可能会导致数据丢失。 这个参数默认为false。意为关闭“不干净的”leader选举。 Kafka有多个副本,每个分区都有多个副本,但只有一个leader副本对外提供服务。不是所有的副本都有资格竞选leader,只有保存数据量多的副本才有资格竞选,即所谓的ISR中的才能竞选。未在ISR中的副本被称为unclean的。 如果此参数设置为true,则未在ISR中的副本也可参与竞选,这样就会有丢失数据的风险。 生产环境中建议设置此值为false! | false |
auto.leader.rebalance.enable | 请注意,此参数在生产环境中极易被忽视。 设置为true则表示允许Kafka定期进行对一些topic的leader的重新选举。即便是之前的leader没有任何问题,也有可能在满足选举条件之后换leader。 但是在生产中换leader的成本极高,且没有性能收益,所以在生产中建议设置为false! | true |
leader.imbalance.check.interval.seconds | 检查各个分区是否平衡的频率,默认300s,不修改 | 300 |
leader.imbalance.per.broker.percentage | 触发重平衡的阈值百分比,默认为10。 举例,如果一个broker上有10个分区,有两个分区的leader不是preferred leader,比例超过10%,则会触发重平衡。 | 10 |
参数名 | 备注 | 默认值 |
---|---|---|
log.flush.interval.messages | 刷写数据到磁盘的消息条数间隔,默认值为Long的最大值9223372036854775807,不建议修改! | 9223372036854775807 |
log.flush.interval.ms | 消息在刷新到磁盘之前保存在内存中的最大时间(以ms为单位)。如果没有设置,则使用log.flush.scheduler.interval.ms 中的值,默认null | null |
log.flush.scheduler.interval.ms | 日志刷新器检查是否有日志需要刷新到磁盘的频率(以ms为单位),默认值Long的最大值9223372036854775807,不建议修改! | 9223372036854775807 |
log.flush.offset.checkpoint.interval.ms | 更新作为日志恢复点的上次刷新的持久记录的频率,默认60000ms(1min)。 | 60000 |
log.flush.start.offset.checkpoint.interval.ms | 更新日志开始偏移量的持久记录的频率,默认60000ms(1min)。 | 60000 |
日志保留相关参数需要根据具体的生产实际及磁盘容量与数据量进行调整。
参数名 | 备注 | 默认值 |
---|---|---|
log.retention.bytes | 删除日志前的日志大小,此参数一般不用,默认值为-1。表示在当前Broker上保存的数据容量不受限制。 如果你所在的公司是一个对外提供云服务的公司,需要做租户管理,那么这个参数就有设置的必要性,可以为单租户设置Kafka保存消息的容量上限。 | -1 |
log.retention.hours | 日志保留的时长,默认168小时,7天(建议根据实际情况修改) | 168 |
log.retention.minutes | 日志保留的分长,默认null | null |
log.retention.ms | 日志保留的毫秒值。默认null,如果设置为-1,那么日志无限期保留。 以上三个参数的优先级为: ms > minutes > hours ,如果没有配ms,找minutes,如果没有minutes找hours | null |
log.retention.check.interval.ms | 清理器检查日志符合被删除条件的轮询时间 | 5m |
log.cleaner.delete.retention.ms | 标记为delete的segment日志的保留时长 | 1day |
log.cleaner.backoff.ms | 标记为delete的segment日志清理线程的工作周期 | 15s |
日志滚动与切片参数建议根据生产实际进行调整。
参数名 | 备注 | 默认值 |
---|---|---|
log.roll.hours | 优先级小于log.roll.ms ,日志滚动的最大时长,默认值168hours,也就是7days | 168 |
log.roll.ms | 日志滚动的最大时长,默认值null,这个值未设置会去找log.roll.hours | null |
log.roll.jitter.hours | 给日志段的切分加一个扰动值,避免大量日志段在同一时间进行切分操作。 如果发现kafka有周期性的磁盘I/O打满情况,建议设置此值。 | 0 |
log.roll.jitter.ms | 同上一个参数,优先级高于log.roll.jitter.hours | null |
log.segment.bytes | 单个log文件的最大大小,默认1073741824,也就是1G | 1073741824 |
log.segment.delete.delay.ms | 从文件系统中删除文件之前等待的时间 | 60000 (1 minute) |
副本相关参数一般不做调整。
参数名 | 备注 | 默认值 |
---|---|---|
min.insync.replicas | 当生产者将ack设置为“all”(或“-1”)时,min.insync.replicas 指定必须确认写入成功的最小副本数量。如果无法满足此最小值,则生产者将引发异常(NotEnoughReplicas 或NotEnoughReplicasApend )此参数和ack设置配合使用可以允许您强制执行更大的耐用性保证。典型的方案是创建一个主题,复制因子为3,设置 min.insync.replicas 为2,并使用ack级别为all,如果大多数副本没有收到写入,这将确保生产者提出异常。 | 1 |
replica.fetch.min.bytes | 每次读取响应所需的最小字节数。如果没有足够的字节,则等待下面的replica.fetch.wait.max.ms 参数 | 1 |
replica.fetch.wait.max.ms | follower副本发出的每个获取器请求的最大等待时间。该值在任何时候都应该小于replica.lag.time.max.ms ,以防止低吞吐量主题的ISR频繁收缩 | 500 |
replica.high.watermark.checkpoint.interval.ms | 将高水位保存到磁盘的频率 | 5000 |
replica.lag.time.max.ms | 如果follower没有发送任何fetch请求,或者没有消耗leader日志结束偏移量,leader将从ISR中删除follower | 30000 |
replica.socket.receive.buffer.bytes | 用于网络请求的套接字接收缓冲区,64K | 65536 |
replica.socket.timeout.ms | 网络请求的套接字超时。它的值至少应该是replica.fetch.wait.max.ms | 30000 |
参数名 | 备注 | 默认值 |
---|---|---|
message.max.bytes | Kafka允许的最大消息批次大小,如果启用了压缩,那么计算压缩之后的大小。 在老版本的Kafka中,如果消息未压缩,则不会进行批处理,则此值设置的为单条消息的大小上限。 最新官方文档中默认数值为1048588,刚刚比1M多出来12Bytes。 需要注意,如果在实际生产中,你的单条消息超过了1M,则必须增大此值。 一般地,在生产环境中,尤其是有接收较大单条数据的场景中,为了防止该值过小造成接收数据失败,均需将此值调大。 | 1048588 |
Topic级别配置详解
topic级别的参数,一般都在broker中对应有默认配置,但是也可以对单独的topic进行设置,可以在topic创建之初使用--config
来进行指定,也可以在创建完成之后再进行修改。
参数名 | 备注 | 默认值 | 未指定时参考配置 |
---|---|---|---|
cleanup.policy | 指定过期日志使用的保留策略,默认为delete将删除过期日志段,设置为compact将会进行压缩。可选值为delete或者compact | delete | log.cleanup.policy |
compression.type | 参考broker设置中的压缩,可选值为[uncompressed, zstd, lz4, snappy, gzip, producer] | producer | compression.type |
delete.retention.ms | 配此配置专门针对tombstone类型的消息进行设置,默认为86400000,也就是1天,这个tombstone在当次compact完成后并不会被清理,在下次compact时候,最后的修改时间加上此配置时间值大于当前时间才会被删除。 | 86400000 | log.cleaner.delete.retention.ms |
file.delete.delay.ms | 从文件系统中删除文件前的等待时间 | 60000 | log.segment.delete.delay.ms |
min.cleanable.dirty.ratio | 默认值为0.5,此配置项控制日志压缩的比率,比率越高,则压缩的日志越少。 | 0.5 | log.cleaner.min.cleanable.ratio |
min.compaction.lag.ms | 此参数配合上面的min.cleanable.dirty.ratio 使用。配置该值后,如果上述压缩比率满足,且日志在该值持续时间内有未压缩记录,则日志符合压缩条件。 | 0 | log.cleaner.min.compaction.lag.ms |
max.compaction.lag.ms | 消息在日志中不符合压缩条件的最长时间。仅适用于正在压缩的日志。配置该值后,如果日志在该值持续时间内有未压缩记录,则日志符合压缩条件。 | 9223372036854775807 | log.cleaner.max.compaction.lag.ms |
参数名 | 备注 | 默认值 | 未指定时参考配置 |
---|---|---|---|
flush.messages | 强制刷写的消息条数,建议不要设置此值!! 相关说明参考broker端配置。 | 9223372036854775807 | log.flush.interval.messages |
flush.ms | 强制刷写的时间间隔,建议不要设置此值!! 相关说明参考broker端配置。 | 9223372036854775807 | log.flush.interval.ms |
参数名 | 备注 | 默认值 | 未指定时参考配置 |
---|---|---|---|
index.interval.bytes | 索引的间隔,我们知道Kafka为稀疏索引,默认值为4096,如果设置较小的值,则索引间隔变短,索引的容量会变大,但能更快命中。建议不修改,默认即可。 | 4096 | log.index.interval.bytes |
参数名 | 备注 | 默认值 | 未指定时参考配置 |
---|---|---|---|
max.message.bytes | 参考broker端的message.max.bytes 配置项,设置单条消息的大小上限。建议在生产中调大! | 1048588 | message.max.bytes |
message.timestamp.type | 定义消息中的时间戳取创建时间还是日志追加时间,可选值为[CreateTime, LogAppendTime] ,默认为CreateTime ,无需修改。 | CreateTime | log.message.timestamp.type |
min.insync.replicas | 具体解释参考broker端配置项min.insync.replicas | 1 | min.insync.replicas |
unclean.leader.election.enable | 具体解释参考broker端配置项unclean.leader.election.enable | false | unclean.leader.election.enable |
参数名 | 备注 | 默认值 | 未指定时参考配置 |
---|---|---|---|
retention.bytes | 具体解释参考broker端配置项log.retention.bytes | -1 | log.retention.bytes |
retention.ms | 具体解释参考broker端配置项log.retention.ms | 604800000 | log.retention.ms |
segment.bytes | 具体解释参考broker端配置项log.segment.bytes | 1073741824 | log.segment.bytes |
segment.index.bytes | 具体解释参考broker端配置项log.index.size.max.bytes | 10485760 | log.index.size.max.bytes |
segment.jitter.ms | 具体解释参考broker端配置项log.roll.jitter.ms | 0 | log.roll.jitter.ms |
segment.ms | 具体解释参考broker端配置项log.roll.ms | 604800000 | log.roll.ms |
Producer端配置详解
Producer负责向服务器发送数据,在实际生产中,更多的是使用API作为Producer端进行数据发送。
以下是Producer端中比较重要的配置参数。
参数名 | 备注 | 默认值 |
---|---|---|
key.serializer | 需填写key的序列化器,实现org.apache.kafka.common.serialization.Serializer 接口 | null |
value.serializer | 需填写value的序列化器,实现org.apache.kafka.common.serialization.Serializer 接口 | null |
partitioner.class | 分区器的全类名。可以通过实现org.apache.kafka.clients.producer.Partitioner 接口自定义分区器。 | org.apache.kafka.clients.producer.internals.DefaultPartitioner |
interceptor.classes | 拦截器全类名,可以通过实现org.apache.kafka.clients.producer.ProducerInterceptor 接口自定义拦截器 | null |
参数名 | 备注 | 默认值 |
---|---|---|
bootstrap.servers | 须填写Kafka集群的地址与端口,如果包含多个broker,可以只填写其中一个,但是为预防填写单个broker产生宕机,建议填写全部可连接broker地址 | null |
compression.type | 压缩格式,默认为none。因为broker端压缩配置默认为producer,意为与producer保持一致。可选值为[none, zstd, lz4, snappy, gzip] 。设置压缩可以提高消息发送效率。 | none |
参数名 | 备注 | 默认值 |
---|---|---|
buffer.memory | 生产者用于将消息发送至服务器的总缓冲区大小,如果消息发送的速度超过了传送到服务器的速度,生产者将会阻塞max.block.ms 时间,之后抛出异常。默认值为33554432,也就是32M。 生产实际中 ,如果单条消息过大,需要调整该参数。 | 33554432 |
batch.size | 此参数十分重要! 默认16K。 如果消息发往同一个分区,producer端将会把消息以批处理的形式发送,一个批次的大小由该参数控制! 如果消息大小大于该值,则消息不会以批处理形式发送。 该值不宜设置过大,太大的批会浪费内存空间。 如果一批数据未达到该值,则该批数据会等待 linger.ms 时间后发送。为了提升处理效率,Kafka将从缓冲区中预先分配 batch.size 大小的内存空间。生产中一般会根据实际消息大小,将 buffer.memory 和batch.size 配合调整。 | 16384 |
linger.ms | 一批消息发送之前的等待时长,默认为0。合理设置此时间将有助于有效填满批次,提高效率。但是如果每次一批数据都能攒满发送,设置此值将会造成整体消息延迟。生产中需要根据具体需求设置。 | 0 |
max.request.size | 单个请求的最大值。如果单个消息过大,需要调整该值。 | 1048576 |
参数名 | 备注 | 默认值 |
---|---|---|
receive.buffer.bytes | TCP接收缓冲大小,默认不修改 | 32768 |
send.buffer.bytes | TCP发送缓冲大小,默认不修改 | 131072 |
参数名 | 备注 | 默认值 |
---|---|---|
retries | 此值大于0,客户端将会重新发送失败的消息。如果max.in.flight.requests.per.connection 的值不为1,则重试有可能会改变消息的有序性。一般这个值默认即可,不需要调整,重试由delivery.timeout.ms 参数控制。 | 2147483647 |
acks | 重要参数!ack级别决定了一个请求完成之前,生产者要求leader返回的确认数。 可选值有 [all, -1, 0, 1] 。acks为0表示生产者不会等待服务器确认,这样不能保证服务器已经收到消息。 acks为1表示leader已经确认,但是未等待follower确认。 ack为all或者-1需要等待ISR中的所有副本确认,保证数据不丢失。 | all |
enable.idempotence | 默认为true。生产者将确保每个消息精准写入一次。如果设置为false,可能因为broker端失败而重复写入消息。如果要启用幂等,需要满足:max.in.flight.requests.per.connection 小于等于5,retries 大于0,acks 为all。 | true |
max.in.flight.requests.per.connection | 客户端在单个连接上发送的未确认请求的最大数量。如果此值大于1,且enable.idempotence 为false,则消息有可能因重试而产生乱序。 | 5 |
参数名 | 备注 | 默认值 |
---|---|---|
transactional.id | 如果需要启用事务,需要分配该值。默认为null | null |
Consumer对数据进行消费,一般通过流处理框架对Kafka中的数据进行消费处理,因此,Consumer端是否消费正常对于数据处理显得尤为重要。
以下是Consumer端的重要参数。
参数名 | 备注 | 默认值 |
---|---|---|
bootstrap.servers | 须填写Kafka集群的地址与端口,如果包含多个broker,可以只填写其中一个,但是为预防填写单个broker产生宕机,建议填写全部可连接broker地址 | null |
group.id | 规定消费者属于哪个消费者组。如果需要使用Topic,Partition和Group来限定消费offset,则此值必须设置! | null |
auto.offset.reset | Kafka中没有初始偏移量的时候,或者当前偏移量在服务器上没有时候的消费策略。 可选值: [latest, earliest, none] 。1.earliest会重置到最早偏移量。 2.latest重置为最近偏移量。 3.设置为none时候,如果消费者所在的组没有找到之前的偏移量,则抛出异常。 | latest |
参数名 | 备注 | 默认值 |
---|---|---|
key.deserializer | key的反序列化器,需要实现org.apache.kafka.common.serialization.Deserializer 接口 | null |
value.deserializer | value的反序列化器,需要实现org.apache.kafka.common.serialization.Deserializer 接口 | null |
interceptor.classes | 拦截器的类。可通过实现org.apache.kafka.clients.consumer.ConsumerInterceptor 进行自定义。 | null |
参数名 | 备注 | 默认值 |
---|---|---|
enable.auto.commit | 设置是否周期性自动提交offset。如果在代码中手动维护偏移量,则需要将此值设置为false。 | true |
auto.commit.interval.ms | 如果设置了自动提交enable.auto.commit 为true,那么该参数生效,提交间隔默认为5秒。 | 5000 |
参数名 | 备注 | 默认值 |
---|---|---|
fetch.min.bytes | 服务器在获取请求时返回的最小数据量,默认为1,意味着只要有一个字节的数据可用,就会立即相应请求,调大该值将会让服务器等待接受更多数据,但是会略微增加延迟。 | 1 |
fetch.max.bytes | 服务器在获取请求时返回的最大数据量。默认50M。 如果Kafka发送的每条消息都比这个数值要大,那只会返回非空分区中的第一条数据。 Kafka中所能接收的最大消息大小受两个参数控制: 1.broker端参数 message.max.bytes 2.topic级别参数 max.message.bytes | 52428800 |
fetch.max.wait.ms | 如果没有足够的数据满足fetch.min.bytes ,服务器将等待的最大时长。 | 500 |
max.partition.fetch.bytes | 默认1M。 服务器将返回每个分区的最大数据量。 | 1048576 |
参数名 | 备注 | 默认值 |
---|---|---|
partition.assignment.strategy | 消费者的分区分配策略。有以下几种:org.apache.kafka.clients.consumer.RangeAssignor ,org.apache.kafka.clients.consumer.RoundRobinAssignor ,org.apache.kafka.clients.consumer.StickyAssignor ,org.apache.kafka.clients.consumer.CooperativeStickyAssignor 。第一种按照消费者总数和分区总数进行均分, 第二种为轮循分配, 第三种为黏性分区, 第四种与第三种类似,但是多了再平衡。 默认为第一种。 也可以自定义,只需要实现 org.apache.kafka.clients.consumer.ConsumerPartitionAssignor 接口 | class org.apache.kafka.clients.consumer.RangeAssignor, class org.apache.kafka.clients.consumer.CooperativeStickyAssignor |
receive.buffer.bytes | TCP接收缓冲大小,默认不修改 | 65536 |
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。