赞
踩
统计当前 TOPIC 中的消息总数。
def count_topic_messages(consumer: Consumer, topic_name: str) -> int:
"""统计当前 TOPIC 中的消息总数
Parameters
----------
consumer : Consumer
Kafka 消费者对象
topic_name : str
TOPIC 名称
Returns
-------
n_messages : int
当前 TOPIC 中的总记录数
"""
topics = consumer.list_topics(topic_name).topics # 获取 topic_name 对应的 TOPIC 列表
if topic_name not in topics:
return 0 # 如果 TOPIC 不存在,则视为只有 0 条记录
n_messages = 0
for partition in topics[topic_name].partitions:
topic_partition = TopicPartition(topic_name, partition)
start_offset, end_offset = consumer.get_watermark_offsets(topic_partition)
n_messages += end_offset - start_offset
return n_messages
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。