当前位置:   article > 正文

kafka消费者配置参数详解

kafka消费者配置
kafka消费者配置参数
参数配置必要性默认值参数说明
bootstrap.servers必须null指定生产者客户端连接Kafka集群所需的broker地址列表,格式为host1:port1,host2:port2
key.deserializer必须org.apache.kafka.common.serialization.ByteArraySerialize这个接口表示类将会采用何种方式序列化,它的作用是把对象转换为字节。实现了Serializer接口的类主要有ByteArraySerializerStringSerializerIntegerSerializer。这三种序列化器的主要区别在于处理的数据类型不同,具体使用哪种序列化器取决于你需要序列化的数据类型。
value.deserializer必须org.apache.kafka.common.serialization.ByteArraySerialize同上
fetch.min.bytes1Kafka消费者在拉取请求中能从Kafka中拉取的最小数据量。当Kafka在收到消费者的拉取请求时,如果返回给消费者的数据量小于这个设置,那么Kafka会等待直到有足够的数据才返回给消费者。
1、适当调整fetch.min.bytes的值可以提高吞吐量,但也会造成额外的延迟。对于写入量不高的主题,这个参数可以减少broker和消费者的压力。对于有大量消费者的主题,则可以明显减轻broker压力。
2、如果消费者的请求经常得不到满足(即返回的数据量小于fetch.min.bytes),那么可能需要增加此值以减少请求的空转时间。反之,如果消费者的请求经常得到过多的数据(即返回的数据量大于fetch.min.bytes),那么可能需要减少此值以减少不必要的网络传输和存储开销。

ps:fetch.min.bytes并非独立工作的参数,需要与其他相关参数如fetch.max.wait.ms、max.partition.fetch.bytes等协同调整以达到最佳效果。例如,增加fetch.min.bytes可能会导致broker压力增加,此时可以考虑增加fetch.max.wait.ms以延长等待时间从而减少broker的压力。
group.id必须Kafka消费者应用程序通常需要指定GroupId,以便Kafka Broker知道它们属于哪个消费者组。
heartbeat.interval.ms3000在 Kafka 中,Consumer 的 heartbeat interval 是用于向 Kafka 发送心跳以告知它消费者仍然活动的频率。如果不定期发送心跳,Kafka 会认为消费者已经死亡并触发重新平衡。
请注意,heartbeat interval 不能设置得太短,否则会发送太多不必要的 heartbeat,浪费网络带宽和 Kafka broker 的资源。通常建议设置为30秒到几分钟之间。
max.partition.fetch.bytes1048576 指定了服务器从每个分区返回给消费者的最大字节数。默认值是1MB。这意味着,KafkaConsumer.poll()方法从每个分区里返回的记录最多不超过max.partition.fetch.bytes指定的字节。

这个属性的值必须比broker能够接收的最大消息的字节数(通过max.message.size属性配置)大,否则消费者可能无法读取消息,导致消费者一直挂起重试。
session.timeout.ms45000 它用于定义消费者与Kafka集群之间的会话超时时间。

如果消费者在这个超时时间内未发送心跳到服务器,服务器将认为消费者不再活跃,并将其从消费者组中移除。
ps:消费者移除后,集群会进行rebalance,rebalance期间消费者组无法进行消费。
通过设置合适的session.timeout.ms,可以确保消费者在正常情况下能够及时响应,而且在网络或其他问题导致连接中断时,能够及时被检测到并从消费者组中移除,以避免不必要的资源浪费。
ssl.keystore.key根据安全协议配置nullkey store 文件中私钥的密码。这对于客户端来说是可选的。
ssl.keystore.location根据安全协议配置nullkey store 文件的位置。这对于客户端来说是可选的,可用于客户端的双向身份验证。
ssl.keystore.password根据安全协议配置nullkey store 文件的密码。这对于客户端是可选的,只有配置了 ssl.keystore.location 才需要配置该选项。
ssl.truststore.certificates根据安全协议配置
ssl.truststore.location根据安全协议配置nulltrust store 文件的位置。
ssl.truststore.password根据安全协议配置nulltrust store 文件的密码。如果一个密码没有设置到 trust store ,这个密码仍然是可用的,但是完整性检查是禁用的。
auto.offset.reset必须latest在 Kafka 中,auto.offset.reset 是一个消费组的配置选项,用于确定当 Kafka 消费者启动时,如果它没有初始的偏移量应该从哪里开始读取。
以下是 auto.offset.reset 的可用值:

none: 如果消费者没有初始偏移量,并且至少一个分区有未消费的消息,那么它将不会开始消费。
earliest: 如果消费者没有初始偏移量,它将从每个分区的最早的消息开始消费。
latest: 如果消费者没有初始偏移量,它将从每个分区的最新消息开始消费。
auto.offset.reset.strategy: 使用一个自定义的策略来决定如何重置偏移量。
connections.max.idle.ms540000在此配置指定的毫秒数后关闭空闲连接。
default.api.timeout.ms60000Kafka 客户端配置中的一个参数,用于指定客户端与 Kafka 服务器交互时的超时时间。这个参数决定了客户端等待 Kafka 服务器响应的最长时间。

具体来说,当客户端向 Kafka 服务器发送请求时,它会等待 default.api.timeout.ms 指定的时间,等待服务器在此期间内响应。如果服务器在超时时间内没有响应,客户端将认为请求失败并采取相应的措施,例如重试请求或报告错误。

通过合理地设置 default.api.timeout.ms,您可以确保客户端与 Kafka 服务器的通信不会因为网络延迟或其他问题而长时间等待。然而,设置太短的超时时间可能会导致频繁的请求失败,而设置太长的超时时间则可能导致客户端等待过长时间。因此,根据您的应用程序的需求和网络环境,选择合适的超时时间是很重要的。
enable.auto.commit必须TRUE控制是否自动提交消费的偏移量
默认情况下,enable.auto.commit 的值为 true,表示 Kafka 消费者会自动提交消费的偏移量。这意味着在每次从 Kafka 服务器拉取消息后,消费者会自动将当前消费的偏移量提交到服务器,以确保消息被成功消费。

如果将 enable.auto.commit 设置为 false,则消费者不会自动提交消费的偏移量。在这种情况下,您需要手动调用 commit() 方法来提交偏移量。

需要注意的是,即使您设置了 enable.auto.commit=false,Kafka 仍然会在每次 poll() 方法的调用中记录偏移量。但是,这些偏移量记录不会立即被提交,而是在您调用 commit() 方法时一起提交。
exclude.internal.topicsTRUE用于指定是否排除 Kafka 中的内部主题。
Kafka 有两个内部主题:__consumer_offsets 和 __transaction_state,这些主题用于存储消费者偏移量和事务状态。默认情况下,这些内部主题是被排除的,即 exclude.internal.topics 的值为 true。这意味着消费者不会自动订阅这些内部主题,并且如果尝试订阅这些主题,将会收到错误消息。
fetch.max.bytes5242880050M用于指定在一次拉取请求中从 Kafka 服务器获取的最大数据量。

这个参数决定了消费者在一次拉取操作中能够从 Kafka 服务器获取的最大数据量。如果某个主题的消息大小超过了该值,那么消费者可能需要执行多次拉取操作才能获取所有消息。
isolation.levelread_uncommitted控制事务的隔离级别
这个参数是用来配置消费者的事务隔离级别的,默认值为read_uncommitted,也就是消费者可以消费到未提交的事务。
如果将isolation.level设置为read_committed,那么只有已经提交的事务,消费者才能消费到。
max.poll.interval.ms300000用于控制消费者从 broker 拉取数据的最长等待时间。这个参数可以防止消费者在拉取数据时因为网络问题或其他原因导致的长时间等待。

具体来说,max.poll.interval.ms 参数定义了消费者在连续两次拉取数据之间的最大时间间隔。如果在这个时间内,消费者没有从 broker 拉取到数据,那么就会触发一个超时错误。
max.poll.records500用于控制一次 poll() 调用返回的记录数。这个参数可以用来控制消费者在拉取循环中的处理数据量。
需要注意的是,max.poll.records 参数和 max.poll.interval.ms 参数是相互关联的。如果 max.poll.records 参数设置得较大,那么 max.poll.interval.ms 参数应该设置得相对较小,以确保消费者能够及时处理数据。反之,如果 max.poll.records 参数设置得较小,那么 max.poll.interval.ms 参数可以设置得相对较大。
partition.assignment.strategyclass org.apache.kafka.clients.consumer.RangeAssignor用于指定分区分配策略。Kafka 提供了多种分区分配策略,包括 RangeAssignor、RoundRobinAssignor 和 StickyAssignor 等。

默认情况下,partition.assignment.strategy 的值为 RangeAssignor,即采用 RangeAssignor 分配策略。该策略按照消费者总数和分区总数进行整除运算来获得一个跨度,然后将分区按照跨度进行平均分配,以保证分区尽可能均匀地分配给所有的消费者。

除了 RangeAssignor,Kafka 还提供了 RoundRobinAssignor 和 StickyAssignor 两种分配策略。其中,RoundRobinAssignor 按照轮询方式将分区分配给消费者,而 StickyAssignor 则采用粘性分配策略,尽量将分区分配给上次的消费者。
receive.buffer.bytes65536用于设置 socket 接收缓冲区的大小。如果设置为 -1,则使用操作系统的默认值。

这个参数主要影响消费者从 broker 拉取数据时的性能。如果接收缓冲区的大小设置得较小,可能会导致消费者频繁地从 broker 拉取数据,这会增加网络开销。如果接收缓冲区的大小设置得较大,则可以减少拉取数据的次数,从而提高性能。

需要注意的是,receive.buffer.bytes 参数和 send.buffer.bytes 参数是相互关联的。如果 receive.buffer.bytes 参数设置得较大,那么 send.buffer.bytes 参数也应该相应地设置得较大,以确保网络传输的效率。反之,如果 receive.buffer.bytes 参数设置得较小,那么 send.buffer.bytes 参数可以设置得相对较小。
request.timeout.ms30000用于设置 Consumer 等待请求响应的最长时间。默认值为 30000 毫秒(30 秒)。

这个参数对于控制 Consumer 与 Kafka Broker 之间的交互非常关键。如果 Consumer 向 Broker 发送请求后,在 request.timeout.ms 时间内没有收到响应,那么 Consumer 会认为请求超时并采取相应的处理措施。
sasl.jaas.config根据安全协议配置
sasl.kerberos.service.name根据安全协议配置
sasl.mechanism根据安全协议配置GSSAPI
security.protocolPLAINTEXT
send.buffer.bytes131072
ssl.enabled.protocols根据安全协议配置TLSv1.2,TLSv1.1,TLSv1
ssl.keystore.type根据安全协议配置JKS
ssl.protocol根据安全协议配置TLS
ssl.provider根据安全协议配置null
ssl.truststore.type根据安全协议配置5000用于设置自动提交偏移量的时间间隔。默认值为 5000 毫秒(5 秒)。

在 Kafka 中,消费者通过拉取数据并处理,然后提交偏移量来保证消息的可靠消费。auto.commit.interval.ms 参数允许消费者自动提交偏移量,以避免手动提交的繁琐操作。

当消费者拉取数据后,它会检查是否达到了 auto.commit.interval.ms 设定的时间间隔。如果达到了时间间隔,消费者会自动将当前消费的偏移量提交给 Kafka,以确保之前消费的消息得到确认。
auto.commit.interval.ms60 * 1000
client.id
fetch.max.wait.ms500
allow.auto.create.topicsTRUE自动创建topic

本文内容由网友自发贡献,转载请注明出处:【wpsshop博客】
推荐阅读
相关标签
  

闽ICP备14008679号