需求:开发一个监控程序,用于采集 Kafka 集群中各消费组的消息积压(lag)情况;当某个消费组的积压量超过设定的阈值时,程序主动发出告警。
<!-- Fixes: java.lang.NoSuchMethodError: org.apache.kafka.common.network.NetworkSend.<init>(Ljava/lang/String;[Ljava/nio/ByteBuffer;)V by declaring kafka-clients explicitly next to kafka_2.11. NOTE(review): both version elements are empty in this excerpt; fill them in, presumably with the same Kafka version for both artifacts. slf4j-log4j12 is excluded from both. --> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version></version> <exclusions> <exclusion> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> </exclusion> </exclusions> </dependency> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka_2.11</artifactId> <version></version> <exclusions> <exclusion> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> </exclusion> </exclusions> </dependency>
// Monitoring loop excerpt: the closing braces of both for-loops are not part of this snippet.
// NOTE(review): getAllGroupsByTopic walks every group's offsets on each call, so this loop
// issues (number of topics x number of groups) offset lookups per run.
// Fetch every topic in the cluster.
Set<String> topics = this.getAllTopic();
for (String topic : topics) {
// Find the consumer-group ids whose listed offsets include a partition of this topic.
Set<String> groupIds = this.getAllGroupsByTopic(topic);
// Append per-partition offset/lag details (KafkaInfoDto) for each group to `list`.
for (String groupId : groupIds) {
this.getGroupInfoFromTopic(url, port, topic, groupId,list);
/** * 获取kafka集群下的主题 * 注意:AdminClient是org.apache.kafka.clients.admin包下的 */ public Set<String> getAllTopic(){ Properties props = new Properties(); props.put("bootstrap.servers", servers); org.apache.kafka.clients.admin.AdminClient adminClient = org.apache.kafka.clients.admin.AdminClient.create(props); ListTopicsResult listTopicsResult = adminClient.listTopics(); Set<String> topics = new HashSet<>(); try { topics = listTopicsResult.names().get(); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } return topics; } /** * 获取指定主题下的消费组【group_id】 * @param topic * @return */ public Set<String> getAllGroupsByTopic(String topic){ String host = url + ":" + port; Set<String> groups; AdminClient client = AdminClient.createSimplePlaintext(host); try { Seq<GroupOverview> groupOverviewSeq = client.listAllGroupsFlattened().toSeq(); List<GroupOverview> allGroups = JavaConversions.seqAsJavaList(groupOverviewSeq); groups = new HashSet<>(); for (GroupOverview overview: allGroups) { String groupID = overview.groupId(); scala.collection.immutable.Map<TopicPartition, Object> map = client.listGroupOffsets(groupID); Map<TopicPartition, Object> offsets = JavaConversions.mapAsJavaMap(map); Set<TopicPartition> partitions = offsets.keySet(); for (TopicPartition tp: partitions) { if (tp.topic().equals(topic)) { groups.add(groupID); } } } } finally { client.close(); } return groups; } /** * @param url 集群服务器地址 * @param port 端口 * @param topic 主题 * @param groupId 消费组id * @param list 结果集合 */ private void getGroupInfoFromTopic(String url, Integer port, String topic, String groupId, List<KafkaInfoDto> list) { long sum = 0L; long sumOffset = 0L; long lag = 0L; //获取每个partation的元数据信息 TreeMap<Integer, PartitionMetadata> leader = this.findLeader(url, port, topic); List<TopicAndPartition> partitions = new ArrayList<>(); for (Map.Entry<Integer, PartitionMetadata> entry : leader.entrySet()) { int partition = entry.getKey(); TopicAndPartition testPartition = new 
TopicAndPartition(topic, partition); partitions.add(testPartition); } BlockingChannel channel = new BlockingChannel(url, port, BlockingChannel.UseDefaultBufferSize(), BlockingChannel.UseDefaultBufferSize(), 5000); // 获取具体的kafka消费实例信息 String server = url + ":" + port; KafkaConsumer<String, String> kafkaConsumer = this.getKafkaConsumer(server,groupId,topic); // 遍历 for (Map.Entry<Integer, PartitionMetadata> entry : leader.entrySet()) { KafkaInfoDto kafkaInfoDto = new KafkaInfoDto(); Integer partition = entry.getKey(); channel.connect(); OffsetFetchRequest fetchRequest = new OffsetFetchRequest(groupId, partitions, (short) 1, 0, null); channel.send(fetchRequest.underlying()); OffsetAndMetadata committed = kafkaConsumer.committed(new TopicPartition(topic, partition)); long partitionOffset = committed.offset(); sumOffset += partitionOffset; String leadUrl = entry.getValue().leader().host(); String clientName = "Client_" + topic + "_" + partition; SimpleConsumer consumer = new SimpleConsumer(leadUrl, port, 100000, 64 * 1024, clientName); // 获取该消费者组每个分区最后提交的偏移量 long readOffset = getLastOffset(consumer, topic, partition, kafka.api.OffsetRequest.LatestTime(), clientName); sum += readOffset; // 注意,得关闭不然会出现异常 consumer.close(); log.info("主题是:{},消费者组:{},积压的偏移量为: :{},分区为:{}",topic,groupId,lag,partition); lag = sum - sumOffset; kafkaInfoDto.setSumOffset(sumOffset); kafkaInfoDto.setSum(sum); kafkaInfoDto.setLag(lag); kafkaInfoDto.setGroupId(groupId); kafkaInfoDto.setTopic(topic); kafkaInfoDto.setPartition(partition); list.add(kafkaInfoDto); } } /** * 获取最主要的leader服务下的partation元数据信息 * * @param url 服务器 * @param port 端口号 * @param topic 主题名 * @return */ private TreeMap<Integer, PartitionMetadata> findLeader(String url, int port, String topic) { TreeMap<Integer, PartitionMetadata> map = new TreeMap<>(); SimpleConsumer consumer = null; try { consumer = new SimpleConsumer(url, port, 100000, 64 * 1024, "leaderLookup" + new Date().getTime()); List<String> topics = 
Collections.singletonList(topic); TopicMetadataRequest req = new TopicMetadataRequest(topics); TopicMetadataResponse resp = consumer.send(req); List<TopicMetadata> metaData = resp.topicsMetadata(); for (TopicMetadata item : metaData) { for (PartitionMetadata part : item.partitionsMetadata()) { map.put(part.partitionId(), part); } } } catch (Exception e) { System.out.println("Error communicating with url [" + url + "] to find Leader for [" + topic + ", ] Reason: " + e); } finally { if (consumer != null) consumer.close(); } return map; } /** * 获取该消费者组每个分区最后提交的偏移量 * * @param consumer 消费者组对象 * @param topic 主题 * @param partition 分区 * @param whichTime 最晚时间 * @param clientName 客户端名称 * @return 偏移量 */ private static long getLastOffset(SimpleConsumer consumer, String topic, int partition, long whichTime, String clientName) { TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition); Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<>(); requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1)); kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientName); OffsetResponse response = consumer.getOffsetsBefore(request); if (response.hasError()) { log.error("Error fetching data Offset Data the url. 
Reason: " + response.errorCode(topic, partition)); return 0; } long[] offsets = response.offsets(topic, partition); return offsets[0]; } /** * 获取Kafka消费者实例 * * group 消费者组 * topic 主题名 * servers 服务器列表 * @return KafkaConsumer<String, String> */ private KafkaConsumer<String, String> getKafkaConsumer(String servers, String group, String topic){ Properties props = new Properties(); props.put("bootstrap.servers", servers); props.put("group.id", group); props.put("enable.auto.commit", "true"); props.put("auto.commit.interval.ms", "1000"); props.put("max.poll.records", 100); props.put("session.timeout.ms", "30000"); props.put("auto.offset.reset", "earliest"); props.put("key.deserializer", StringDeserializer.class.getName()); props.put("value.deserializer", StringDeserializer.class.getName()); KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props); consumer.subscribe(Collections.singletonList(topic)); return consumer; }
// Kafka bootstrap address(es) in host:port form, passed as bootstrap.servers to the admin
// client in getAllTopic; several entries may be configured to reach a cluster.
private String servers;
// Single broker host. It is concatenated with `port` ("url:port") and passed directly to
// SimpleConsumer / AdminClient.createSimplePlaintext, so it must hold ONE host, not a list.
private String url;
// Port of the broker at `url`.
private Integer port;
# Kafka settings
# bootstrap-servers: xxx
bootstrap-servers: xxx
# Custom properties: a single broker host and its port (presumably bound to the url/port fields; not a list)
url: xxx
port: xxx
如果运行时出现 `java.lang.NoSuchMethodError: org.apache.kafka.common.network.NetworkSend.<init>(Ljava/lang/String;[Ljava/nio/ByteBuffer;)V` 异常,通常是 kafka-clients 与 kafka_2.11 的版本不一致所致。请按上文的 Maven 依赖配置显式引入 kafka-clients,并使其版本与 kafka_2.11 保持一致。
