赞
踩
在这个示例中,我们创建了一个队列 q
,并通过 multiprocessing.Manager().Queue()
来确保队列可以在多个进程之间共享。我们定义了 consumer
和 producer
函数,分别用于从队列中获取数据和向队列中放入数据。
在主进程中,我们创建了多个消费者和生产者进程,并将它们启动。生产者进程将数据放入队列,消费者进程从队列中取出数据并处理。生产者进程完成后,我们向队列发送 None
作为结束信号,告知消费者没有更多数据。每个消费者在接收到 None
后会停止工作。
注意,我们在 consumer
函数中使用了 queue.task_done()
来标记任务完成。这是可选的,但在使用 join()
方法等待队列中的所有任务完成时很有用。
这个模式允许多个生产者并发地向队列中放入数据,同时多个消费者并发地从队列中取出并处理数据,直到所有生产者完成生产,消费者接收到结束信号。
当使用 multiprocessing.Queue
进行多个生产者和多个消费者的场景时,队列可以很好地协调这些进程。以下是一个示例,展示了如何创建多个生产者和多个消费者,它们共享同一个队列:
- # encoding:utf-8
- import multiprocessing
- import time
- import random
-
- def consumer(queue):
- """
- 作者:阙辉
- """
- while True:
- item = queue.get() # 从队列中获取数据
- if item is None:
- print(f"Consumer {multiprocessing.current_process().name} received end signal.")
- queue.task_done() # 标记任务完成
- break
- print(f"Consumer {multiprocessing.current_process().name} received {item}")
- time.sleep(random.uniform(0.5, 1.5)) # 模拟处理时间
- queue.task_done() # 标记任务完成
-
- def producer(queue, items):
- """
- 作者:阙辉
- """
- for item in items:
- print(f"Producer {multiprocessing.current_process().name} sent {item}")
- queue.put(item)
- time.sleep(random.uniform(0.5, 1.5)) # 模拟生产时间
-
- if __name__ == '__main__':
- manager = multiprocessing.Manager()
- q = manager.Queue() # 使用 Manager.Queue 来支持多个生产者和消费者模式
-
- # 创建多个消费者进程
- consumers = [multiprocessing.Process(target=consumer, args=(q,)) for _ in range(4)]
-
- # 创建多个生产者进程
- producers = [multiprocessing.Process(target=producer, args=(q, range(20))) for _ in range(4)]
-
- # 启动所有消费者进程
- for c in consumers:
- c.start()
-
- # 启动所有生产者进程
- for p in producers:
- p.start()
-
- # 等待所有生产者完成
- for p in producers:
- p.join()
-
- # 发送结束信号,告知所有消费者没有更多数据
- for _ in consumers:
- q.put(None)
-
- # 等待所有消费者完成
- for c in consumers:
- c.join()
-
- print("All tasks completed.")
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。