赞
踩
对于 Kafka 消费者,最重要的就是监控它们的消费进度,或者说监控它们消费的滞后程度(消费者 Lag 或 Consumer Lag)。
所谓滞后程度,就是指消费者当前落后于生产者的程度。Lag 的单位是消息数。我们一般是在主题这个级别上讨论 Lag 的,但实际上,Kafka 监控 Lag 的层级是在分区上的。如果要计算主题级别的,你需要手动汇总所有主题分区的 Lag,将它们累加起来,合并成最终的 Lag 值。
一个正常工作的消费者,它的 Lag 值应该很小,甚至是接近于 0 的,这表示该消费者能够及时地消费生产者生产出来的消息,滞后程度很小。如果消费者的速度无法匹及生产者的速度,极有可能导致它消费的数据已经不在操作系统的页缓存中了,那么这些数据就会失去享有 Zero Copy 技术的资格。如此,消费者就不得不从磁盘上读取它们,这就进一步拉大了与生产者的差距。
如何监控消费进度?
1.使用 Kafka 自带的命令行工具 kafka-consumer-groups 脚本。
2.使用 Kafka Java Consumer API 编程。
3.使用 Kafka 自带的 JMX 监控指标。
使用 Kafka 自带的命令行工具 bin/kafka-consumer-groups.sh(bat)。kafka-consumer-groups 脚本是 Kafka 提供的最直接的监控消费者消费进度的工具。
它不只是能操作和管理消费者组,也能够监控独立消费者(Standalone Consumer)的 Lag。独立消费者就是没有使用消费者组机制的消费者程序。和消费者组相同的是,它们也要配置 group.id 参数值,但和消费者组调用 KafkaConsumer.subscribe() 不同的是,独立消费者调用 KafkaConsumer.assign() 方法直接消费指定分区。
该脚本位于 Kafka 安装目录的 bin 子目录下,我们可以通过下面的命令来查看某个给定消费者的 Lag 值:
bin/kafka-consumer-groups.sh --bootstrap-server <Kafka broker 连接信息 > --describe --group <group 名称 >
Kafka 连接信息就是 < 主机名:端口 > 对,而 group 名称就是你的消费者程序中设置的 group.id 值。
在运行命令时,指定了 Kafka 集群的连接信息,即 localhost:9092。还设置了要查询的消费者组名:testgroup。
输出信息:
按照消费者组订阅主题的分区进行展示,每个分区一行数据;
每个分区当前最新生产的消息的位移值(即 LOG-END-OFFSET 列值),该消费者组当前最新消费消息的位移值(即 CURRENT-OFFSET 值)、LAG 值(前两者的差值)、消费者实例 ID、消费者连接 Broker 的主机名以及消费者的 CLIENT-ID 信息。
有的时候,你运行这个脚本可能会出现下面这种情况:
这是因为我们运行 kafka-consumer-groups 脚本时没有启动消费者程序。虽然这些列没有值,但 LAG 列依然是有效的,它依然能够正确地计算出此消费者组的 Lag 值。
用程序的方式自动化监控。kafka社区提供的 Java Consumer API 分别提供了查询当前分区最新消息位移和消费者组最新消费消息位移两组方法,使用它们相减就能计算出对应的 Lag。
public static Map<TopicPartition
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。