当前位置:   article > 正文

confluent-kafka 和kafka-python操作kafka,并封装成一个类_python kafka封装

python kafka封装

为了向Kafka集群生产和消费消息,我们可以使用confluent-kafka库,它是Confluent为Python提供的官方Kafka客户端。以下是一个简化的示例,展示如何将Kafka的生产者和消费者操作封装到一个类中:

首先,确保你已经安装了所需的库:

pip install confluent-kafka

然后,你可以使用以下代码:

  1. from confluent_kafka import Producer, Consumer, KafkaError
  2. class KafkaManager:
  3. def __init__(self, bootstrap_servers):
  4. self.bootstrap_servers = bootstrap_servers
  5. def produce(self, topic, key, value):
  6. """生产消息到Kafka"""
  7. p = Producer({'bootstrap.servers': self.bootstrap_servers})
  8. def delivery_report(err, msg):
  9. """Called once for each message produced to indicate delivery result."""
  10. if err is not None:
  11. print('Message delivery failed: {}'.format(err))
  12. else:
  13. print('Message delivered to {} [{}]'.format(msg.topic(), msg.partition()))
  14. p.produce(topic, key=key, value=value, callback=delivery_report)
  15. p.flush()
  16. def consume(self, topic, group_id, timeout=1.0):
  17. """从Kafka消费消息"""
  18. c = Consumer({
  19. 'bootstrap.servers': self.bootstrap_servers,
  20. 'group.id': group_id,
  21. 'auto.offset.reset': 'earliest'
  22. })
  23. c.subscribe([topic])
  24. while True:
  25. msg = c.poll(timeout)
  26. if msg is None:
  27. continue
  28. if msg.error():
  29. if msg.error().code() == KafkaError._PARTITION_EOF:
  30. print('Reached end of partition')
  31. else:
  32. print('Error while consuming message: {}'.format(msg.error()))
  33. else:
  34. print('Received message: {}'.format(msg.value().decode('utf-8')))
  35. c.close()
  36. # 使用示例
  37. if __name__ == "__main__":
  38. manager = KafkaManager('localhost:9092')
  39. # 生产消息
  40. manager.produce('test_topic', 'key1', 'value1')
  41. # 消费消息
  42. manager.consume('test_topic', 'test_group')
pip install kafka-python
  1. from kafka import KafkaProducer, KafkaConsumer
  2. class KafkaManager:
  3. def __init__(self, bootstrap_servers):
  4. self.bootstrap_servers = bootstrap_servers
  5. def produce(self, topic, key, value):
  6. """生产消息到Kafka"""
  7. producer = KafkaProducer(bootstrap_servers=self.bootstrap_servers,
  8. key_serializer=str.encode,
  9. value_serializer=str.encode)
  10. producer.send(topic, key=key, value=value)
  11. producer.flush()
  12. producer.close()
  13. def consume(self, topic, group_id, timeout=10):
  14. """从Kafka消费消息"""
  15. consumer = KafkaConsumer(topic,
  16. bootstrap_servers=self.bootstrap_servers,
  17. group_id=group_id,
  18. auto_offset_reset='earliest',
  19. key_deserializer=bytes.decode,
  20. value_deserializer=bytes.decode)
  21. for message in consumer:
  22. print(f"Received message: {message.value}")
  23. consumer.close()
  24. # 使用示例
  25. if __name__ == "__main__":
  26. manager = KafkaManager('localhost:9092')
  27. # 生产消息
  28. manager.produce('test_topic', 'key1', 'value1')
  29. # 消费消息
  30. manager.consume('test_topic', 'test_group')

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/小小林熬夜学编程/article/detail/543425
推荐阅读
相关标签
  

闽ICP备14008679号