赞
踩
为了向Kafka集群生产和消费消息,我们可以使用confluent-kafka
库,它是Confluent为Python提供的官方Kafka客户端。以下是一个简化的示例,展示如何将Kafka的生产者和消费者操作封装到一个类中:
首先,确保你已经安装了所需的库:
pip install confluent-kafka
然后,你可以使用以下代码:
- from confluent_kafka import Producer, Consumer, KafkaError
-
- class KafkaManager:
- def __init__(self, bootstrap_servers):
- self.bootstrap_servers = bootstrap_servers
-
- def produce(self, topic, key, value):
- """生产消息到Kafka"""
- p = Producer({'bootstrap.servers': self.bootstrap_servers})
-
- def delivery_report(err, msg):
- """Called once for each message produced to indicate delivery result."""
- if err is not None:
- print('Message delivery failed: {}'.format(err))
- else:
- print('Message delivered to {} [{}]'.format(msg.topic(), msg.partition()))
-
- p.produce(topic, key=key, value=value, callback=delivery_report)
- p.flush()
-
- def consume(self, topic, group_id, timeout=1.0):
- """从Kafka消费消息"""
- c = Consumer({
- 'bootstrap.servers': self.bootstrap_servers,
- 'group.id': group_id,
- 'auto.offset.reset': 'earliest'
- })
-
- c.subscribe([topic])
-
- while True:
- msg = c.poll(timeout)
- if msg is None:
- continue
- if msg.error():
- if msg.error().code() == KafkaError._PARTITION_EOF:
- print('Reached end of partition')
- else:
- print('Error while consuming message: {}'.format(msg.error()))
- else:
- print('Received message: {}'.format(msg.value().decode('utf-8')))
-
- c.close()
-
- # 使用示例
- if __name__ == "__main__":
- manager = KafkaManager('localhost:9092')
-
- # 生产消息
- manager.produce('test_topic', 'key1', 'value1')
-
- # 消费消息
- manager.consume('test_topic', 'test_group')
-

pip install kafka-python
- from kafka import KafkaProducer, KafkaConsumer
-
- class KafkaManager:
- def __init__(self, bootstrap_servers):
- self.bootstrap_servers = bootstrap_servers
-
- def produce(self, topic, key, value):
- """生产消息到Kafka"""
- producer = KafkaProducer(bootstrap_servers=self.bootstrap_servers,
- key_serializer=str.encode,
- value_serializer=str.encode)
-
- producer.send(topic, key=key, value=value)
- producer.flush()
- producer.close()
-
- def consume(self, topic, group_id, timeout=10):
- """从Kafka消费消息"""
- consumer = KafkaConsumer(topic,
- bootstrap_servers=self.bootstrap_servers,
- group_id=group_id,
- auto_offset_reset='earliest',
- key_deserializer=bytes.decode,
- value_deserializer=bytes.decode)
-
- for message in consumer:
- print(f"Received message: {message.value}")
-
- consumer.close()
-
- # 使用示例
- if __name__ == "__main__":
- manager = KafkaManager('localhost:9092')
-
- # 生产消息
- manager.produce('test_topic', 'key1', 'value1')
-
- # 消费消息
- manager.consume('test_topic', 'test_group')

Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。