赞
踩
本需求是工作过程中,使用kafka中间件的分析任务速度过慢,排查问题原因时产生,需要获取kafka中的数据积压量(已经数据量-消费者组已消费数据量)。废话不多说,直接上代码。
- package main
-
- import (
- "fmt"
- "os"
- "strings"
-
- "github.com/IBM/sarama"
- )
-
- func Usage() {
- println("输入有误,使用格式为: ./backlog kafkaAddress topic groupId")
- println("示例: ./backlog 10.67.54.24:9092,10.67.54.25:9093 test_in consumer_test_in")
- }
- func main() {
- if len(os.Args) != 4 {
- Usage()
- os.Exit(1)
- }
- kafkaAddress := os.Args[1] // kafka 地址
- topic := os.Args[2] // 主题
- groupId := os.Args[3] // 消费者组
- broker := strings.Split(kafkaAddress, ",")
- // 获取分区数
- config := sarama.NewConfig()
- client, err := sarama.NewClient(broker, config)
- if err != nil {
- panic(err.Error())
- }
- defer client.Close()
- partitions, err := client.Partitions(topic)
- if err != nil {
- panic(err.Error())
- }
-
- om, err := sarama.NewOffsetManagerFromClient(groupId, client)
- if err != nil {
- panic(err.Error())
- }
- var totalBacklog int64 = 0
- for _, partition := range partitions {
- // 调用 sarama.Client的GetOffset方法。返回kafka中下一条数据的offset。
- // 如果该分区没有过数据,返回值将为0。
- offset, err := client.GetOffset(topic, partition, -1)
- if err != nil {
- panic(err.Error())
- }
- pom, err := om.ManagePartition(topic, partition) // 创建 sarama.PartitionOffsetManager
- if err != nil {
- panic(err.Error())
- }
- var backlog int64
- // 调用 sarama.PartitionOffsetManager的NextOffset方法。返回下一次要消费的offset
- // 如果消费者组未消费过该partation的数据,返回值将为-1
- n, string := pom.NextOffset()
- if string != "" {
- panic(string)
- }
- if n == -1 {
- backlog = offset
- } else {
- backlog = offset - n
- }
- totalBacklog += backlog
- fmt.Printf("partation %d, Kafka中下一条数据offset: %d, 消费者下次提交的offset: %d, 数据积压量: %d\n", partition, offset, n, backlog)
- }
- fmt.Printf("\n%s 的 %s主题上,消费者组 %s 的数据积压量总数为 %d\n\n", kafkaAddress, topic, groupId, totalBacklog)
-
- fmt.Println("【注意】Kafka中下一条数据offset为0表示该Partation无数据, 消费者下次提交的offset为-1表示消费者组未消费过该Partation")
- }

Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。