当前位置:   article > 正文

使用Golang获取kafka数据积压量(消费者组offset/kafka中数据量)_go 获取kafka 消费状态

go 获取kafka 消费状态

本需求是工作过程中,使用kafka中间件的分析任务速度过慢,排查问题原因时产生,需要获取kafka中的数据积压量(已经数据量-消费者组已消费数据量)。废话不多说,直接上代码。

  1. package main
  2. import (
  3. "fmt"
  4. "os"
  5. "strings"
  6. "github.com/IBM/sarama"
  7. )
  8. func Usage() {
  9. println("输入有误,使用格式为: ./backlog kafkaAddress topic groupId")
  10. println("示例: ./backlog 10.67.54.24:9092,10.67.54.25:9093 test_in consumer_test_in")
  11. }
  12. func main() {
  13. if len(os.Args) != 4 {
  14. Usage()
  15. os.Exit(1)
  16. }
  17. kafkaAddress := os.Args[1] // kafka 地址
  18. topic := os.Args[2] // 主题
  19. groupId := os.Args[3] // 消费者组
  20. broker := strings.Split(kafkaAddress, ",")
  21. // 获取分区数
  22. config := sarama.NewConfig()
  23. client, err := sarama.NewClient(broker, config)
  24. if err != nil {
  25. panic(err.Error())
  26. }
  27. defer client.Close()
  28. partitions, err := client.Partitions(topic)
  29. if err != nil {
  30. panic(err.Error())
  31. }
  32. om, err := sarama.NewOffsetManagerFromClient(groupId, client)
  33. if err != nil {
  34. panic(err.Error())
  35. }
  36. var totalBacklog int64 = 0
  37. for _, partition := range partitions {
  38. // 调用 sarama.Client的GetOffset方法。返回kafka中下一条数据的offset。
  39. // 如果该分区没有过数据,返回值将为0。
  40. offset, err := client.GetOffset(topic, partition, -1)
  41. if err != nil {
  42. panic(err.Error())
  43. }
  44. pom, err := om.ManagePartition(topic, partition) // 创建 sarama.PartitionOffsetManager
  45. if err != nil {
  46. panic(err.Error())
  47. }
  48. var backlog int64
  49. // 调用 sarama.PartitionOffsetManager的NextOffset方法。返回下一次要消费的offset
  50. // 如果消费者组未消费过该partation的数据,返回值将为-1
  51. n, string := pom.NextOffset()
  52. if string != "" {
  53. panic(string)
  54. }
  55. if n == -1 {
  56. backlog = offset
  57. } else {
  58. backlog = offset - n
  59. }
  60. totalBacklog += backlog
  61. fmt.Printf("partation %d, Kafka中下一条数据offset: %d, 消费者下次提交的offset: %d, 数据积压量: %d\n", partition, offset, n, backlog)
  62. }
  63. fmt.Printf("\n%s 的 %s主题上,消费者组 %s 的数据积压量总数为 %d\n\n", kafkaAddress, topic, groupId, totalBacklog)
  64. fmt.Println("【注意】Kafka中下一条数据offset为0表示该Partation无数据, 消费者下次提交的offset为-1表示消费者组未消费过该Partation")
  65. }

本文内容由网友自发贡献,转载请注明出处:https://www.wpsshop.cn/w/小桥流水78/article/detail/735013
推荐阅读
相关标签
  

闽ICP备14008679号