
Java: monitoring the message count of Kafka topics — the Summed Recent Offsets of each topic


This post uses Kafka's Java API to monitor the message-count growth of selected topics. Query each topic's total offset on a schedule, then take the difference between consecutive readings to get the per-interval increment for each topic. Kafka monitoring usually tracks throughput, i.e. the volume of data in bytes, and does not care about the message count; here the count is exactly what we want to track. The per-topic total is the sum of the latest offsets of all of its partitions — the "Summed Recent Offsets". (Note that a partition's latest offset reflects every message ever produced to it, so this total keeps growing even after old segments are removed by retention.)
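
The KafkaMonitorTools class shown later only returns the current totals, so the differencing has to be done by the caller. Below is a minimal sketch of that polling loop; the broker address, topic names, and one-minute interval are placeholder assumptions, not values from the original post:

    import com.google.common.collect.Lists;

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;

    /** Polls the per-topic offset totals on a fixed schedule and prints the increment per interval. */
    public class OffsetDeltaMonitor {
        private static final Map<String, Long> lastTotals = new ConcurrentHashMap<>();

        public static void main(String[] args) {
            ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
            scheduler.scheduleAtFixedRate(() -> {
                // monitor() returns topic -> sum of the latest offsets of all its partitions
                Map<String, Long> totals = KafkaMonitorTools.monitor(
                        Lists.newArrayList("broker1:9092"),       // placeholder broker
                        Lists.newArrayList("topicA", "topicB"));  // placeholder topics
                if (totals == null) {
                    return;
                }
                totals.forEach((topic, total) -> {
                    Long previous = lastTotals.put(topic, total);
                    if (previous != null) {
                        System.out.println("topic:" + topic + " \tincrement:" + (total - previous));
                    }
                });
            }, 0, 1, TimeUnit.MINUTES);
        }
    }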

Jar dependency used (Gradle):

    compile group: 'org.apache.kafka', name: 'kafka_2.10', version: '0.8.0'

Java code:

    import com.google.common.collect.Lists;
    import com.google.common.collect.Maps;
    import kafka.api.PartitionOffsetRequestInfo;
    import kafka.common.TopicAndPartition;
    import kafka.javaapi.OffsetResponse;
    import kafka.javaapi.PartitionMetadata;
    import kafka.javaapi.TopicMetadata;
    import kafka.javaapi.TopicMetadataRequest;
    import kafka.javaapi.consumer.SimpleConsumer;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    import java.util.Date;
    import java.util.List;
    import java.util.Map;

    /**
     * Kafka monitoring: message-count status of topics
     *
     * @author LiXuekai on 2020/9/16
     */
    public class KafkaMonitorTools {
        private static final Logger LOGGER = LoggerFactory.getLogger(KafkaMonitorTools.class);

        /** Fetches one offset of the partition at the given time; LatestTime() yields the log-end offset. */
        public static long getLastOffset(SimpleConsumer consumer, String topic, int partition, long whichTime, String clientName) {
            TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);
            Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = Maps.newHashMap();
            requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1));
            kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientName);
            OffsetResponse response = consumer.getOffsetsBefore(request);
            if (response.hasError()) {
                LOGGER.error("Error fetching offset data from the broker. Reason: " + response.errorCode(topic, partition));
                return 0;
            }
            long[] offsets = response.offsets(topic, partition);
            return offsets[0];
        }

        /**
         * @param brokers broker addresses ("host:port")
         * @param topic   topic name
         * @return map<partition id, partition metadata>
         */
        public static Map<Integer, PartitionMetadata> findLeader(List<String> brokers, String topic) {
            Map<Integer, PartitionMetadata> map = Maps.newHashMap();
            for (String broker : brokers) {
                SimpleConsumer consumer = null;
                try {
                    String[] hostAndPort = broker.split(":");
                    consumer = new SimpleConsumer(hostAndPort[0], Integer.parseInt(hostAndPort[1]), 100000, 64 * 1024, "leaderLookup" + new Date().getTime());
                    List<String> topics = Lists.newArrayList(topic);
                    TopicMetadataRequest req = new TopicMetadataRequest(topics);
                    kafka.javaapi.TopicMetadataResponse resp = consumer.send(req);
                    List<TopicMetadata> metaData = resp.topicsMetadata();
                    for (TopicMetadata item : metaData) {
                        for (PartitionMetadata part : item.partitionsMetadata()) {
                            map.put(part.partitionId(), part);
                        }
                    }
                } catch (Exception e) {
                    LOGGER.error("Error communicating with broker [" + broker + "] to find leader for [" + topic + "]. Reason: " + e);
                } finally {
                    if (consumer != null)
                        consumer.close();
                }
            }
            return map;
        }

        public static Map<String, Long> monitor(List<String> brokers, List<String> topics) {
            if (brokers == null || brokers.isEmpty()) {
                return null;
            }
            if (topics == null || topics.isEmpty()) {
                return null;
            }
            Map<String, Long> map = Maps.newTreeMap();
            for (String topicName : topics) {
                Map<Integer, PartitionMetadata> metadata = findLeader(brokers, topicName);
                long size = 0L;
                for (Map.Entry<Integer, PartitionMetadata> entry : metadata.entrySet()) {
                    int partition = entry.getKey();
                    String leadBroker = entry.getValue().leader().host();
                    String clientName = "Client_" + topicName + "_" + partition;
                    SimpleConsumer consumer = new SimpleConsumer(leadBroker, entry.getValue().leader().port(), 100000, 64 * 1024, clientName);
                    long readOffset = getLastOffset(consumer, topicName, partition, kafka.api.OffsetRequest.LatestTime(), clientName);
                    size += readOffset;
                    consumer.close();
                }
                map.put(topicName, size);
            }
            return map;
        }
    }

Test code:

    private final String topics = "a,b,c,d,e,f";
    private final String server = "1.1.1.11:92";

    @Test
    public void monitor() {
        Map<String, Long> monitor = KafkaMonitorTools.monitor(Lists.newArrayList(server), Lists.newArrayList(topics.split(",")));
        monitor.forEach((k, v) -> System.out.println("topic:" + k + " \tSummed Recent Offsets:" + v));
    }
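
The kafka_2.10 0.8.0 SimpleConsumer API used above has long been deprecated. As an alternative to the article's approach, on clusters reachable by the modern kafka-clients library the same totals can be read with KafkaConsumer.endOffsets. A minimal sketch, assuming a placeholder broker address and topic name:

    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.serialization.ByteArrayDeserializer;

    import java.util.List;
    import java.util.Map;
    import java.util.Properties;
    import java.util.stream.Collectors;

    public class ModernOffsetSum {
        /** Sums the log-end offsets of all partitions of one topic via the modern consumer API. */
        public static long summedRecentOffsets(String bootstrapServers, String topic) {
            Properties props = new Properties();
            props.put("bootstrap.servers", bootstrapServers);
            props.put("key.deserializer", ByteArrayDeserializer.class.getName());
            props.put("value.deserializer", ByteArrayDeserializer.class.getName());
            try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
                // List the topic's partitions, then fetch each partition's log-end offset
                List<TopicPartition> partitions = consumer.partitionsFor(topic).stream()
                        .map(info -> new TopicPartition(info.topic(), info.partition()))
                        .collect(Collectors.toList());
                Map<TopicPartition, Long> endOffsets = consumer.endOffsets(partitions);
                return endOffsets.values().stream().mapToLong(Long::longValue).sum();
            }
        }

        public static void main(String[] args) {
            System.out.println(summedRecentOffsets("broker1:9092", "topicA")); // placeholders
        }
    }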

Screenshot of the Kafka version used (image omitted).
