赞
踩
from confluent_kafka import Consumer
class Customer:
def __init__(self) -> None:
self.consumer = Consumer({'bootstrap.servers': 'localhost:9092','group.id':"customertest",
'enable.auto.commit': False,
"heartbeat.interval.ms":3000,
# 'session.timeout.ms': 30000,
# 'max.poll.interval.ms':30000,
# 'auto.offset.reset': 'latest'}
'auto.offset.reset': 'earliest',
'message.max.bytes': 8388608,
'compression.type': 'gzip'})
self.consumer.subscribe(['filelack'])
def getMessage(self):
try:
while 1:
part = self.consumer.assignment()
msg = self.consumer.poll(1)
if msg is None:
continue
else:
if not msg.error() is None:
print (msg.error())
else:
message = msg.value()
print(msg.partition(), msg.offset())
except Exception as e:
...
finally:
self.close()
def close(self):
self.consumer.close()
https://docs.confluent.io/clients-confluent-kafka-python/current/overview.html#id1
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。