当前位置:   article > 正文

KafkaQ - 好用的 Kafka Linux 命令行可视化工具_kafka命令行工具

kafka命令行工具

软件效果前瞻 ~

鉴于并没有在网上找到比较好的linux平台的kafka可视化工具,今天为大家介绍一下自己开发的在 Linux 平台上使用的可视化工具KafkaQ

虽然简陋,主要可以实现下面的这些功能:

1)查看当前topic的分片数量和副本数量

2)查看当前topic下面每个分片的最大offset

3)查看当前topic某个分片下面指定offset范围的数据

4)搜索当前topic指定关键词的message

+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++

KafkaQ分为普通版本和搜索版本:

* 普通版本支持上述3种查询

* 搜索版本支持上述3种查询之外,增加关键词搜索,即在分片中搜索指定关键词的message

一、普通版 KafkaQ.sh

使用方法:

  1. Usage: KafkaQ.sh --topic<topic> [--partition<partition>] [--offset<offset>] [--limit<limit>]
  2. --topic 话题名称
  3. --partition 分片索引(可选)
  4. --offset 从第k个offset开始检索(可选)
  5. --limit 从第k个offset开始检索X条结果(可选)

显示的效果如下,十分简洁,分片数据里面左边一列是消息入库的时间,右边是message内容:

KafkaQ 源码如下:

  1. #!/bin/bash
  2. # 默认值
  3. PARTITION=${2:-0}
  4. OFFSET=${3:-0}
  5. LIMIT=${4:-0}
  6. # 检查参数
  7. if [ -z "$1" ]; then
  8. echo "Usage: $0 --topic<topic> [--partition<partition>] [--offset<offset>] [--limit<limit>]"
  9. exit 1
  10. fi
  11. TOPIC="$1"
  12. # 检查Kafka命令是否存在
  13. if ! command -v /usr/local/kafka/bin/kafka-topics.sh >/dev/null 2>&1; then
  14. echo "Kafka not found at /usr/local/kafka/bin/"
  15. exit 1
  16. fi
  17. # 获取Topic信息
  18. echo -e "\033[0;31m* 话题: $TOPIC\033[0m"
  19. # 获取分区数和副本数
  20. PARTITION_INFO=$(/usr/local/kafka/bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic "$TOPIC")
  21. PARTITION_COUNT=$(echo "$PARTITION_INFO" | awk '/Partition:/ {print $2}' | wc -l)
  22. REPLICA_COUNT=$(echo "$PARTITION_INFO" | grep -oP 'ReplicationFactor: \K\d+')
  23. echo "* 分片: $PARTITION_COUNT, 副本: $REPLICA_COUNT"
  24. # 获取分片a和分片b的最大偏移量
  25. MAX_OFFSET=$(/usr/local/kafka/bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic "$TOPIC" | awk -F: '{ printf " 分片: %s,MaxOffset: %s\n", $2, $3 }')
  26. echo "$MAX_OFFSET"
  27. # 获取分片数据
  28. if [ "$LIMIT" -gt 0 ]; then
  29. echo -e "\033[0;33mFetching messages from partition $PARTITION with offset $OFFSET and limit $LIMIT ...\033[0m"
  30. MESSAGES=$(/usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic "$TOPIC" --partition "$PARTITION" --offset "$OFFSET" --max-messages "$LIMIT" --property print.key=true --property print.value=true --property print.timestamp=true --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer --property value.deserializer=org.apache.kafka.common.serialization.StringDeserializer)
  31. # 格式化输出消息
  32. echo "$MESSAGES" | awk -F'\t' 'BEGIN {
  33. print "* 分片数据:"
  34. }
  35. {
  36. if ($3 != "null") {
  37. timestamp = substr($1, 12) / 1000 # 从第10个字符开始提取时间戳,并除以1000以转换为秒级时间戳
  38. value = $3
  39. printf "\033[0;33m%s\033[0m %s\n", strftime("%Y-%m-%d %H:%M:%S", timestamp), value
  40. }
  41. }'
  42. fi

二、搜索版 KafkaQ-Search.sh

使用方法:

  1. Usage: KafkaQ-Search.sh --topic<topic> [--partition<partition>] [--offset<offset>] [--limit<limit>] [--search<keyword>]
  2. --topic 话题名称
  3. --partition 分片索引(可选)
  4. --offset 从第k个offset开始检索(可选)
  5. --limit 从第k个offset开始检索X条结果(可选)
  6. --search 搜索字符串

示例(所有参数是必选的哦):

sh KafkaQ-Search.sh --topic log --partition 0 --offset 0 --limit 18480 --search '9fea9c52-c0fe-4429-81e1-d045f35f9be9'

显示效果如下:

KafkaQ-Search.sh 源码如下:

  1. #!/bin/bash
  2. # 默认值
  3. PARTITION=${2:-0}
  4. OFFSET=${3:-0}
  5. LIMIT=${4:-0}
  6. SEARCH=${5:-""}
  7. # 检查参数
  8. if [ -z "$1" ]; then
  9. echo "Usage: $0 --topic<topic> [--partition<partition>] [--offset<offset>] [--limit<limit>] [--search<keyword>]"
  10. exit 1
  11. fi
  12. while [[ $# -gt 0 ]]; do
  13. case "$1" in
  14. --topic)
  15. TOPIC="$2"
  16. shift 2
  17. ;;
  18. --partition)
  19. PARTITION="$2"
  20. shift 2
  21. ;;
  22. --offset)
  23. OFFSET="$2"
  24. shift 2
  25. ;;
  26. --limit)
  27. LIMIT="$2"
  28. shift 2
  29. ;;
  30. --search)
  31. SEARCH="$2"
  32. shift 2
  33. ;;
  34. *)
  35. echo "Unknown parameter: $1"
  36. exit 1
  37. ;;
  38. esac
  39. done
  40. # 检查Kafka命令是否存在
  41. if ! command -v /usr/local/kafka/bin/kafka-topics.sh >/dev/null 2>&1; then
  42. echo "Kafka not found at /usr/local/kafka/bin/"
  43. exit 1
  44. fi
  45. # 获取Topic信息
  46. echo -e "\033[0;31m* 话题: $TOPIC\033[0m"
  47. # 获取分区数和副本数
  48. PARTITION_INFO=$(/usr/local/kafka/bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic "$TOPIC")
  49. PARTITION_COUNT=$(echo "$PARTITION_INFO" | awk '/Partition:/ {print $2}' | wc -l)
  50. REPLICA_COUNT=$(echo "$PARTITION_INFO" | grep -oP 'ReplicationFactor: \K\d+')
  51. echo "* 分片: $PARTITION_COUNT, 副本: $REPLICA_COUNT"
  52. # 获取分片a和分片b的最大偏移量
  53. MAX_OFFSET=$(/usr/local/kafka/bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic "$TOPIC" | awk -F: '{ printf " 分片: %s,MaxOffset: %s\n", $2, $3 }')
  54. echo "$MAX_OFFSET"
  55. # 获取分片数据
  56. if [ "$LIMIT" -gt 0 ]; then
  57. echo -e "\033[0;33mFetching messages from partition $PARTITION with offset $OFFSET and limit $LIMIT ...\033[0m"
  58. MESSAGES=$(/usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic "$TOPIC" --partition "$PARTITION" --offset "$OFFSET" --max-messages "$LIMIT" --property print.key=true --property print.value=true --property print.timestamp=true --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer --property value.deserializer=org.apache.kafka.common.serialization.StringDeserializer)
  59. # 搜索关键词并输出结果
  60. if [[ ! -z $SEARCH ]]; then
  61. echo -e "\033[0;32m* 搜索条件:$SEARCH\033[0m"
  62. echo " 搜索结果:"
  63. echo "$MESSAGES" | grep --color=auto "$SEARCH" | awk -F'\t' '{
  64. timestamp = substr($1, 12) / 1000 # Extract timestamp starting from the 12th character and convert to seconds
  65. value = $3
  66. # Print with timestamp formatted and value highlighted in yellow (ANSI escape code 0;33m) if matches $SEARCH
  67. printf "\033[0;33m%s\033[0m ", strftime("%Y-%m-%d %H:%M:%S", timestamp)
  68. gsub("'"$SEARCH"'", "\033[0;33m&\033[0m", value)
  69. print value
  70. }'
  71. fi
  72. fi

 * (附注)参考的shell如下

1、获取kafka的topic 分区数量

/usr/local/kafka/bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic <topic>

2、获取kafka每个分片最大的offset

/usr/local/kafka/bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic <topic>

3、获取kafka分片指定offset范围的具体信息

/usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic <topic> --partition <partition> --offset <offset> --max-messages <max-message> --property print.key=true --property print.value=true --property print.timestamp=true --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer --property value.deserializer=org.apache.kafka.common.serialization.StringDeserializer

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/寸_铁/article/detail/911998
推荐阅读
相关标签
  

闽ICP备14008679号