赞
踩
目前我们用的自己写的lag计算程序,会出现偶发性的增长,lag值会一直增长并不会降下来,但从Flink的监控上面看到并没有这么多的积压,把Flink任务重启一把后这个程序获取的lag值才会恢复正常,怀疑是计算程序出现问题,但这个问题不好复现,所以找到了这个工具。
https://github.com/seglo/kafka-lag-exporter
exporter在github上目前有500多颗星,一直在维护,有星就说明这个还行。
cluster_name:静态定义的Kafka群集名称
topic:Kafka主题
partition:Kafka分区
group:Kafka消费者组
member_host:运行分配给此分区的使用者组成员的计算机或容器的主机名或IP
client_id:使用者组成员的ID。通常由组协调器自动生成。
consumer_id:使用者组成员的全局唯一ID。通常是client_id和组协调器生成的GUID的组合。
标签: cluster_name, group, topic, partition, member_host, consumer_id, client_id
The offset of the last consumed offset for this partition in this topic partition for this group.
此分区在此组的此主题分区中上次消费的偏移量的偏移量。
标签: cluster_name, group, topic, partition, member_host, consumer_id, client_id
The difference between the last produced offset and the last consumed offset for this partition in this topic partition for this group.
在此组的此主题分区中,此分区的上次生成的偏移量与上次使用的偏移量之间的差异。(积压)
标签: cluster_name, group, topic, partition, member_host, consumer_id, client_id
The estimated lag in seconds. This metric correlates with lag in offsets.
估计的滞后时间(秒)
标签: cluster_name, group, is_simple_consumer
The highest (maximum) lag in offsets for a given consumer group.
给定消费者组的最高(最大)偏移滞后。
标签: cluster_name, group, is_simple_consumer
The highest (maximum) lag in time for a given consumer group.
给定消费者组的最高(最大)滞后时间。
标签: cluster_name, group
The sum of the difference between the last produced offset and the last consumed offset of all partitions for this group.
此组中所有分区的上次生成的偏移量与上次使用的偏移量之间的差值之和。
标签: cluster_name, group, topic
The sum of the difference between the last produced offset and the last consumed offset of all partitions in this topic for this group.
本主题中此组的所有分区的上次生成的偏移量与上次使用的偏移量之间的差值之和。
标签: cluster_name, topic, partition
The latest offset available for topic partition.
主题分区可用的最新偏移量。
标签: cluster_name, topic, partition
The earliest offset available for topic partition.
主题分区可用的最早偏移量。
标签: cluster_name
The time taken to poll (milli seconds) all the information from all consumer groups for every cluster.
轮询来自每个集群的所有使用者组的所有信息所用的时间(毫秒)。
它提供了很多方式来运行exporter
这个没试验,我们很少用k8s所以就没有研究,可以自己研究下
这里面提供了两种方法,分别说下
从这个链接下载https://github.com/seglo/kafka-lag-exporter/releases
我们大部分环境用的都是jdk8,就导致运行exporter程序时报错
这里提供一个对比表
要作为Java程序运行,jdk最低版本是11,看看你们环境支不支持
进到这个项目路径把这两个文件下载下来,或者把整个项目拉下来
这里提供输出到InfluxDB和Prometheus中的配置例子,我这里Kafka用的单节点的,按照自己实际修改
kafka-lag-exporter {
reporters.prometheus.port = 8000 //暴露在外的给Prometheus的端口
reporters.influxdb.endpoint = "http://192.168.43.205" // influxDB的地址
reporters.influxdb.port = 8086 // influxDB的端口号
reporters.influxdb.database = "KafkaMetrics" // 数据库名
reporters.influxdb.username = "admin" // 用户名
reporters.influxdb.password = "admin" // 密码
sinks = ["InfluxDBPusherSink","PrometheusEndpointSink"] // 输出端类型,默认是Prometheus
clusters = [ //Kafka集群配置
{
name = "test-cluster" // Kafka集群名称(自己起的)
bootstrap-brokers = "192.168.43.210:9092" // Kafka broker地址(我这是单节点的,集群要把所有机器填上)
}
]
}
// 其他具体配置细节看一下官方文档
因为java环境不支持,所以我选择了这种方式。
安装好docker之后直接运行
docker run -it --net=host \
-v $(pwd):/opt/soft/ \
seglo/kafka-lag-exporter:0.7.0 \
/opt/docker/bin/kafka-lag-exporter \
-Dconfig.file=/opt/soft/application.conf \
-Dlogback.configurationFile=/opt/soft/logback.xml
// 具体各个参数表达的什么意思可以自行搜索一下
我这里用的Grafana,这张图的数据源是InfluxDB,只截取了一部分指标,可以看到我们原来计算程序还是挺准的,但是不清楚啥时候就会出问题,就很烦。
官方提供的是Prometheus的监控图表,在这个位置
可以把这个JSON文件导入到Grafana中,具体怎么导入JSON可以参考一下我的另一篇文章,https://blog.csdn.net/weixin_44328192/article/details/126738284
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。