当前位置:   article > 正文

multiprocessing.Queue 多个进程生产和多个进程消费怎么处理

multiprocessing.Queue 多个进程生产和多个进程消费怎么处理

在这个示例中,我们创建了一个队列 q,并通过 multiprocessing.Manager().Queue() 来确保队列可以在多个进程之间共享。我们定义了 consumerproducer 函数,分别用于从队列中获取数据和向队列中放入数据。

在主进程中,我们创建了多个消费者和生产者进程,并将它们启动。生产者进程将数据放入队列,消费者进程从队列中取出数据并处理。生产者进程完成后,我们向队列发送 None 作为结束信号,告知消费者没有更多数据。每个消费者在接收到 None 后会停止工作。

注意,我们在 consumer 函数中使用了 queue.task_done() 来标记任务完成。这是可选的,但在使用 join() 方法等待队列中的所有任务完成时很有用。

这个模式允许多个生产者并发地向队列中放入数据,同时多个消费者并发地从队列中取出并处理数据,直到所有生产者完成生产,消费者接收到结束信号。

当使用 multiprocessing.Queue 进行多个生产者和多个消费者的场景时,队列可以很好地协调这些进程。以下是一个示例,展示了如何创建多个生产者和多个消费者,它们共享同一个队列:

  1. # encoding:utf-8
  2. import multiprocessing
  3. import time
  4. import random
  5. def consumer(queue):
  6. """
  7. 作者:阙辉
  8. """
  9. while True:
  10. item = queue.get() # 从队列中获取数据
  11. if item is None:
  12. print(f"Consumer {multiprocessing.current_process().name} received end signal.")
  13. queue.task_done() # 标记任务完成
  14. break
  15. print(f"Consumer {multiprocessing.current_process().name} received {item}")
  16. time.sleep(random.uniform(0.5, 1.5)) # 模拟处理时间
  17. queue.task_done() # 标记任务完成
  18. def producer(queue, items):
  19. """
  20. 作者:阙辉
  21. """
  22. for item in items:
  23. print(f"Producer {multiprocessing.current_process().name} sent {item}")
  24. queue.put(item)
  25. time.sleep(random.uniform(0.5, 1.5)) # 模拟生产时间
  26. if __name__ == '__main__':
  27. manager = multiprocessing.Manager()
  28. q = manager.Queue() # 使用 Manager.Queue 来支持多个生产者和消费者模式
  29. # 创建多个消费者进程
  30. consumers = [multiprocessing.Process(target=consumer, args=(q,)) for _ in range(4)]
  31. # 创建多个生产者进程
  32. producers = [multiprocessing.Process(target=producer, args=(q, range(20))) for _ in range(4)]
  33. # 启动所有消费者进程
  34. for c in consumers:
  35. c.start()
  36. # 启动所有生产者进程
  37. for p in producers:
  38. p.start()
  39. # 等待所有生产者完成
  40. for p in producers:
  41. p.join()
  42. # 发送结束信号,告知所有消费者没有更多数据
  43. for _ in consumers:
  44. q.put(None)
  45. # 等待所有消费者完成
  46. for c in consumers:
  47. c.join()
  48. print("All tasks completed.")

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/爱喝兽奶帝天荒/article/detail/760408
推荐阅读
相关标签
  

闽ICP备14008679号