赞
踩
最近再看Kafka性能调优,找到了两篇文档这里整合一下供大家查阅
Kafka 监控和调优:如何监控和调优 Kafka 的性能 - 掘金
Apache Kafka 是一个高性能、高可靠性的分布式消息队列,但在实际应用中,为了保证 Kafka 的稳定性和可靠性,需要进行监控和调优。本文将介绍如何监控和调优 Kafka 的性能。
Kafka 提供了多种监控指标,可以帮助我们了解 Kafka 的运行情况和性能瓶颈。下面是一些常用的 Kafka 监控指标:
Broker 监控指标
Topic 监控指标
Consumer 监控指标
Kafka 可以通过 JMX、Kafka Manager、Grafana 等工具进行监控。下面是一个使用 JMX 进行监控的示例代码:
JMXConnector connector = JMXConnectorFactory.connect(new JMXServiceURL("service:jmx:rmi:///jndi/rmi://localhost:9999/jmxrmi")); MBeanServerConnection connection = connector.getMBeanServerConnection(); ObjectName brokerObjectName = new ObjectName("kafka.server:type=KafkaServer,name=BrokerState"); String brokerState = (String) connection.getAttribute(brokerObjectName, "Value"); System.out.println("Broker state: " + brokerState);
上面的代码中,首先创建了一个 JMXConnector 对象,用于连接到 Kafka 的 JMX 端口(默认为 9999)。然后,获取 MBeanServerConnection 对象,用于获取 BrokerState MBean 的属性。最后,获取 BrokerState 的属性值,即 Broker 的状态信息。
Kafka 的性能受多个因素影响,如磁盘、内存、网络等。下面是一些常用的 Kafka 调优技巧:
调整 Kafka 配置参数
Kafka 提供了多个配置参数,可以用于调整 Kafka 的性能。如下表所示:
配置参数 | 描述 |
---|---|
num.io.threads | 每个 Broker 的 I/O 线程数,默认为 8 |
num.network.threads | 每个 Broker 的网络线程数,默认为 3 |
log.retention.bytes | 每个分区保留的日志文件大小,默认为 -1(表示无限制) |
log.retention.hours | 每个分区保留的日志文件时间,默认为 168 小时(7 天) |
log.segment.bytes | 日志文件分段的大小,默认为 1GB |
log.flush.interval.messages | 每隔多少条消息进行一次日志刷盘操作,默认为 10000 |
log.flush.interval.ms | 每隔多少毫秒进行一次日志刷盘操作,默认为 1000 |
replica.fetch.max.bytes | 每个副本从 Leader 中获取消息的最大字节数,默认为 1MB |
replica.fetch.wait.max.ms | 副本等待 Leader 返回消息的最大时间,默认为 500 |
选择合适的硬件配置
Kafka 的性能受硬件配置的影响较大,选择合适的硬件配置可以提高 Kafka 的性能。如下表所示:
硬件配置 | 描述 |
---|---|
CPU | Kafka 的性能受 CPU 的影响较大,选择高性能的 CPU 可以提高 Kafka 的性能。 |
内存 | Kafka 会将消息缓存到内存中,选择足够的内存可以提高 Kafka 的性能。 |
磁盘 | Kafka 的性能受磁盘的影响较大,选择高性能的磁盘可以提高 Kafka 的性能。 |
网络 | Kafka 的性能受网络的影响较大,选择高性能的网络设备可以提高 Kafka 的性能。 |
分区数量和副本数量的选择
分区数量和副本数量的选择会影响 Kafka 的性能和可靠性。通常情况下,应该选择足够的分区数量和副本数量,以提高 Kafka 的性能和可靠性。但是,分区数量和副本数量过多会增加 Kafka 的负担,降低 Kafka 的性能。
在实际应用中,需要根据具体的业务需求和性能要求选择合适的 Kafka 配置参数和硬件配置,以提高 Kafka 的性能和可靠性。
常规参数
为了更好的使用Kafka Producer,首先介绍一下几个基本参数。
bootstrap.servers
配置Kafka broker的服务器地址列表,多个用英文逗号分开,可以不必写全,Kafka内部有自动感知Kafka broker的机制。
client.dns.lookup
客户端寻找bootstrap地址的方式,支持如下两种方式:
resolve_canonical_bootstrap_servers_only
这种方式,会依据bootstrap.servers提供的主机名(hostname),根据主机上的名称服务返回其IP地址的数组(InetAddress.getAllByName),然后依次获取inetAddress.getCanonicalHostName(),再建立tcp连接。
一个主机可配置多个网卡,如果启用该功能,应该可以有效利用多网卡的优势,降低Broker的网络端负载压力。
use_all_dns_ips
这种方式会直接使用bootstrap.servers中提供的hostname、port创建tcp连接,默认选项。
compression.type
消息的压缩算法,目前可选值:none、gzip、snappy、lz4、zstd,默认不压缩,建议与Kafka服务器配置的一样,当然Kafka服务端可以配置的压缩类型为 producer,即采用与发送方配置的压缩类型。发送方与Broker 服务器采用相同的压缩类型,可有效避免在Broker服务端进行消息的压缩与解压缩,大大降低Broker的CPU使用压力。
client.id
客户端ID,如果不设置默认为producer-递增,强烈建议设置该值,尽量包含ip,port,pid。
send.buffer.bytes
网络通道(TCP)的发送缓存区大小,默认为128K。
receive.buffer.bytes
网络通道(TCP)的接收缓存区大小,默认为32K。
reconnect.backoff.ms
重新建立链接的等待时长,默认为50ms,属于底层网络参数,基本无需关注。
reconnect.backoff.max.ms
重新建立链接的最大等待时长,默认为1s,连续两次对同一个连接建立重连,等待时间会在reconnect.backoff.ms的初始值上成指数级递增,但超过max后,将不再指数级递增。
key.serializer
消息key的序列化策略,为org.apache.kafka.common.serialization接口的实现类。
value.serializer
消息体的序列化策略
partitioner.class
消息发送队列负载算法,其默 DefaultPartitioner,路由算法如下:
如果指定了 key ,则使用 key 的 hashcode 与分区数取模。
如果未指定 key,则轮询所有的分区。
interceptor.classes
拦截器列表,kafka运行在消息真正发送到broker之前对消息进行拦截加工。
enable.idempotence
是否开启发送端的幂等,这个机制后续会重点剖析其实现原理,默认为false。
transaction.timeout.ms
事务协调器等待客户端的事务状态反馈的最大超时时间,默认为60s。
transactional.id
事务id,用于在一个事务中唯一标识一个客户端。
5.1 核心参数一览
工作机制相关参数,涉及到消息发送是如何工作的,本节首先将罗列参数,做简单说明,然后再给出运作图,进一步阐述其工作机制。
buffer.memory
用于设置一个生产者(KafkaProducer)中缓存池的内存大小,默认为32M。
max.block.ms
当消息发送者申请空闲内存时,如果可用内存不足的等待时长,默认为60s,如果在指定时间内未申请到内存,消息发送端会直接报TimeoutException,这个时间包含了发送端用于查找元信息的时间。
retries
重试次数,Kafka Sender线程从缓存区尝试发送到Broker端的重试次数,默认为Integer.MAX_VALUE,为了避免无限重试,只针对可恢复的异常,例如Leader选举中这种异常就是可恢复的,重试最终是能解决问题的。
acks
用来定义消息“已提交”的条件(标准),就是 Broker 端向客户端承偌已提交的条件,可选值如下:
0
表示生产者不关心该条消息在 broker 端的处理结果,只要调用 KafkaProducer 的 send 方法返回后即认为成功,显然这种方式是最不安全的,因为 Broker 端可能压根都没有收到该条消息或存储失败。
all 或 -1
表示消息不仅需要 Leader 节点已存储该消息,并且要求其副本(准确的来说是 ISR 中的节点)全部存储才认为已提交,才向客户端返回提交成功。这是最严格的持久化保障,当然性能也最低。
1
表示消息只需要写入 Leader 节点后就可以向客户端返回提交成功。
batch.size
在消息发送端Kafka引入了批的概念,发送到服务端的消息通常不是一条一条发送,而是一批一批发送,该值用于设置每一个批次的内存大小,一个批次对应源码层级为ProducerBatch对象,默认为16K。
linger.ms
该参数与batch.size配合使用。Kafka希望一个批次一个批次去发送到Broker,应用程序往KafkaProducer中发送一条消息,首先会进入到内部缓冲区,具体是会进入到某一个批次中(ProducerBatch),等待该批次堆满后一次发送到Broker,这样能提高消息的吞吐量,但其消息发送的延迟也会相应提高,试想一下,如果在某一个时间端,应用端发送到broker的消息太少,不足以填满一个批次,那岂不是消息一直无法发送到Broker端吗?
为了解决该问题,linger.ms参数应运而生。它的作用是控制在缓存区中未积满时来控制消息发送线程的行为。如果linger.ms 设置为 0表示立即发送,如果设置为大于0,则消息发送线程会等待这个值后才会向broker发送。有点类似于 TCP 领域的 Nagle 算法。
delivery.timeout.ms
消息在客户端缓存中的过期时间,在Kafka的消息发送模型中,消息先进入到消息发送端的双端缓存队列中,然后单独一个线程将缓存区中的消息发送到Broker,该参数控制在双端队列中的过期时间,默认为120s,从进入双端队列开始计时,超过该值后会返回超时异常(TimeoutException)。
request.timeout.ms
请求的超时时间,主要是Kafka消息发送线程(Sender)与Broker端的网络通讯的请求超时时间。
max.request.size
Send线程一次发送的最大字节数量,也就是Send线程向服务端一次消息发送请求的最大传输数据,默认为1M。
max.in.flight.requests.per.connection
设置每一个客户端与服务端连接,在应用层一个通道的积压消息数量,默认为5,有点类似Netty用高低水位线控制发送缓冲区中积压的多少,避免内存溢出。
上面的核心参数在表述上可能不够直观,接下来我想简单通过两张图阐述一下Kafka消息发送相关的核心原理。
首先,我们来看一下消息发送者相关的数据结构:
Kafka的每一个消息发送者,也就是KafkaProducer对象内部会有一块缓存区,其总大小由buffer.memory指定,默认为32M,但内存的组织会按照topic+parition构建双端队列,队列中的每一个元素为一个ProducerBatch对象,表示一个消息发送批次,但发送线程将消息发送到Broker端时,一次可以包含多个批次。一次允许发送的消息总大小受max.request.size控制,默认为1M。
在了解了核心数据结构后,我们再看一下各个核心参数在消息发送的各个阶段是如何工作的。
从Kafka Producer 的工作原理来看,在客户端所谓的性能优化,其实就是延迟、吞吐率、数据完整性的一个权衡。在具体的实践中通常可以调整的参数主要如下:
acks 这个只能是根据业务的特点,对数据丢失的容忍度,通常该参数在实践过程中遇到性能瓶颈后,调整该参数的可能性几乎没有,因为需要牺牲数据的完整性,此举并不是一个好的方案。
batch.size 与 linger.ms
通常可以适当修改batch.size与linger.ms的值,特别是linger.ms值,牺牲一定的延时,方便更多数据进入到Batch,从而提高Sender线程一次发送的数据大小,提高带宽,显著提高吞吐率,但牺牲延时。当然如果是想提高响应延迟,则采取的手段则恰恰相反。
buffer.memory、max.request.size
如果需要进一步提高吞吐量,可以适当提高buffer.memory的大小,让客户端能缓存更多数据,并且调高max.request.size,进一步提高单次消息发送的消息量。
5.5 备忘录
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。