赞
踩
Kafka的生产者(Producer)和消费者(Consumer)的关系,可以通过一个餐厅的例子来形象地说明。
想象一个忙碌的餐厅,这里有:
在这个餐厅中,有时候会有特别多的订单,厨师需要快速高效地准备菜肴。每当一道菜准备好,他们就会把它放到对应的部分在服务台上。服务台非常长,可以容纳很多菜肴,让不同的服务员能够同时服务多个顾客,提高效率。
假设一天晚上,餐厅接到了一个大型宴会的预订,需要同时准备多道菜。这时,厨师们(Producers)开始忙碌起来,每准备好一道菜,就会放到服务台(Topic)的指定位置(Partition)。服务员们(Consumers)各自负责一部分服务台,快速地将菜肴送到顾客手中。
在这个过程中,如果某一部分的菜准备得特别快,服务台上的这一部分就会堆积更多的菜肴。负责这一部分的服务员需要加快速度,以确保所有的菜肴都能及时送出。这就像在Kafka中,如果某个Partition的消息积压,负责这个Partition的消费者就需要更快地处理消息,以防止延迟。
通过这个例子,我们可以看到,Kafka的Producer和Consumer之间是如何通过Topic(服务台)和Partition(服务台的不同部分)协作的,以实现高效、可靠的消息处理。
from kafka import KafkaProducer, KafkaConsumer from kafka.errors import kafka_errors import traceback import json import time import logging logging.basicConfig(level=logging.INFO, format='%(asctime)s %(levelname)s %(message)s') def process(): # Kafka配置,需自行修改 bootstrap_servers = ['ip:port'] producer_topic = 'XXX_topic' # Kafka生产者 producer = KafkaProducer( bootstrap_servers=bootstrap_servers, value_serializer=lambda m: json.dumps(m).encode('utf-8'), api_version=(1,0,0) ) data = { "task_id": 1, "image_path": "XXX", "video_path": "XXX", "guidence_text": "XXX", } # Kafka请求监听 try: res = data # 发送结果到Kafka producer.send(producer_topic, res) logging.info(f"send data to {producer_topic}") time.sleep(3) except Exception as e: # 记录错误日志 logging.error(f"Error processing kafka request: {e}") if __name__ == "__main__": process()
from kafka import KafkaConsumer import json import logging logging.basicConfig(level=logging.INFO, format='%(asctime)s %(levelname)s %(message)s') def consume_messages(): # Kafka配置 bootstrap_servers = ['ip:port'] consumer_topic = 'XXX' consumer_group = 'XXX' # Kafka消费者 consumer = KafkaConsumer( consumer_topic, bootstrap_servers=bootstrap_servers, group_id=consumer_group, # auto_offset_reset='earliest', # 从最早的消息开始读取 auto_offset_reset= "latest", value_deserializer=lambda m: json.loads(m.decode('utf-8')) # 解码JSON格式的消息 ) logging.info(f"Started consuming messages from {consumer_topic}") # 消费消息 try: for message in consumer: msg = message.value logging.info(f"Received message: {msg}") print(f"msg:{msg}") except KeyboardInterrupt: logging.info("Stopping consumer...") except Exception as e: logging.error(f"Error while consuming messages: {e}") finally: consumer.close() if __name__ == "__main__": consume_messages()
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。