赞
踩
查询是否有未消费的消息的方式有三种:
kafka的bin目录下,执行命令:
kafka-consumer-groups.sh --bootstrap-server node1:9092 --describe --group mylcy
可以看到当前的消费进度(CURRENT-OFFSET)、消息进度(LOG-END-OFFSET)、落后量(LAG):
public static Map<TopicPartition, Long> lagOf(String groupID, String bootstrapServers) throws TimeoutException { Properties props = new Properties(); props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); try (AdminClient client = AdminClient.create(props)) { ListConsumerGroupOffsetsResult result = client.listConsumerGroupOffsets(groupID); try { Map<TopicPartition, OffsetAndMetadata> consumedOffsets = result.partitionsToOffsetAndMetadata().get(10, TimeUnit.SECONDS); props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); // 禁止自动提交位移 props.put(ConsumerConfig.GROUP_ID_CONFIG, groupID); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); try (final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) { Map<TopicPartition, Long> endOffsets = consumer.endOffsets(consumedOffsets.keySet()); return endOffsets.entrySet().stream().collect(Collectors.toMap(entry -> entry.getKey(), entry -> entry.getValue() - consumedOffsets.get(entry.getKey()).offset())); } } catch (InterruptedException e) { Thread.currentThread().interrupt(); // 处理中断异常 // ... return Collections.emptyMap(); } catch (ExecutionException e) { // 处理ExecutionException // ... return Collections.emptyMap(); } catch (TimeoutException e) { throw new TimeoutException("Timed out when getting lag for consumer group " + groupID); } } }
JConsole 工具监控此 JMX 指标的截图。从这张图片中,可以看到,client-id 为 consumer-1 的消费者在给定的测量周期内最大的 Lag 值为 714202,最小的 Lead 值是 83,这说明此消费者有很大的消费滞后性。
监控到 Lag 越来越大,说明消费者程序变得越来越慢了,至少是追不上生产者程序了。
一旦你监测到 Lead 越来越小,甚至是快接近于 0 了,你就一定要小心了,这可能预示着消费者端要丢消息了(因为消息只保存七天,说明消息已经接近7天未被消费)。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。