当前位置:   article > 正文

Python编程之多进程(multiprocessing)详解_multiprocessing.process

multiprocessing.process

引言

multiprocessing是一个用于产生多进程的包,与threading模块的API类似。multiprocessing既可以实现本地的多进程,也可以实现远程的多进程。通过使用多个子进程而非线程可以绕开Python的全局解释器锁(GIL),同时允许在多种系统平台使用。

1. Process 模块

1.1 Process介绍

Process模块是一个创建进程的模块,可以通过Process直接创建进程。

  1. multiprocessing.Process(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None)
  2. """
  3. 参数介绍:
  4. 1. group默认为None(目前未使用)
  5. 2. target代表调用对象,即子进程执行的任务
  6. 3. name为进程名称
  7. 4. args调用对象的位置参数元组,args=(value1, value2, ...)
  8. 5. kwargs调用对象的字典,kwargs={key1:value1, key2:value2, ...}
  9. 6. daemon表示进程是否为守护进程,布尔值
  10.     
  11. 方法介绍:
  12.   Process.start() 启动进程,并调用子进程中的run()方法
  13.   Process.run() 进程启动时运行的方法,在自定义时必须要实现该方法
  14.   Process.terminate() 强制终止进程,不进行清理操作,如果Process创建了子进程,会导致该进程变成僵尸进程
  15.   Process.join() 阻塞进程使主进程等待该进程终止
  16.   Process.kill() 与terminate()相同
  17.   Process.is_alive() 判断进程是否还存活,如果存活,返回True
  18.   Process.close() 关闭进程对象,并清理资源,如果进程仍在运行则返回错误
  19.   
  20. """

注意:

  • 在Windows中,由于没有fork(Linux中创建进程的机制),在创建进程的时候会import启动该文件,而在import文件的时候又会再次运行整个文件,如果把Process()放在 if __name__ == '__main__' 判断之外,则Process()在被import的时候也会被运行,导致无限递归创建子进程导致报错,所以在Windows系统下,必须把Process()放在 if __name__ == '__main__' 的判断保护之下。
  • 在子进程中不能使用input,因为输入台只显示在主进程中,故如果在子进程中使用input,会导致报错。

1.2 Process实例

  1. from multiprocessing import Process
  2. def main(name):
  3. print(f'{name}: Hello World')
  4. if __name__ == '__main__':
  5. # 创建子进程
  6. p = Process(target=main, args=('LovefishO',))
  7. # 开始进程
  8. p.start()
  9. # 阻塞进程
  10. p.join()
'
运行

1.3 Process类实现

  1. from multiprocessing import Process
  2. class NewProcess(Process):
  3. def __init__(self, name):
  4. # 执行父类的init()
  5. super().__init__()
  6. # 创建新参数
  7. self.name = name
  8. # 在自定义Process类时,必须实现run()方法
  9. def run(self):
  10. print(f'{self.name}: Hello World')
  11. if __name__ == '__main__':
  12. # 创建一个新的子进程,并传入参数
  13. np = NewProcess('LovefishO')
  14. # 开始子进程
  15. np.start()
  16. # 加入阻塞,保证主进程在子进程之后结束
  17. np.join()
  18. print('主进程结束')
  19. # LovefishO: Hello World
  20. # 主进程结束

1.4 守护进程

正常情况下,当子进程和主进程都结束时,程序才会结束。但是当我们需要在主进程结束时,由该主进程创建的子进程也必须跟着结束时,就需要使用守护进程。当一个子进程为守护进程时,在主进程结束时,该子进程也会跟着结束。

  1. from multiprocessing import Process
  2. def main(name):
  3. print(f'{name}: Hello World')
  4. if __name__ == '__main__':
  5. # 创建守护进程, 设置daemon = True
  6. p = Process(target=main, daemon=True, args=('LovefishO',))
  7. # 开始进程
  8. p.start()
  9. # 阻塞进程
  10. p.join()

2. Pool 模块

2.1 Pool介绍

Pool模块控制着一个进程池,池中是可以执行很多任务的进程

  1. multiprocessing.Pool(processes=None, initializer=None, initargs=(), maxtasksperchild=None, context=None)
  2. """
  3. 参数介绍:
  4. processes: 设置要使用的进程数量,如果 processes 为 None,则使用 os.cpu_count() 返回的值
  5. initializer: 是每个工作进程启动时要执行的可调用对象,默认为None
  6. maxtasksperchild: 工作进程退出之前可以完成的任务数,完成后用一个新的工作进程来替代原进程,为了释放闲置资源
  7. context: 可被用于指定启动的工作进程的上下文
  8. """

2.2 Pool中分派任务的方式

  • apply(func[, args[, kwds]])方法是阻塞,意味着当前的进程没有执行完的话,后续的进程需要等待该进程执行结束才能执行,实际上该方法是串行。

  • apply_async(func[, args[, kwds[, callback[, error_callback]]]])方法是异步非阻塞的,意味着不用等待当前进程执行完成,即可根据系统的调度切换进程,该方法是并行。

  • map(funciterable[, chunksize])方法将iterable对象分成一些块,作为单独的任务提交给进程池。 这些块的(近似)大小可以通过将chunksize设置为正整数来指定, 并且该方法是阻塞的。如果可迭代对象很多时,会消耗较大的内存,可以考虑使用imap或imap_unordered。

  • map_async(funciterable[, chunksize[, callback[, error_callback]]])方法是map的变种,是非阻塞的。

  • imap(funciterable[, chunksize])该方法和map一样,只不过该方法适用于对大量数据的遍历,返回的结果顺序和输入相同。

  • imap_unordered(funciterable[, chunksize])与imap()一样,只不过输出的顺序是任意的

2.3 Pool实例

  1. from multiprocessing import Pool
  2. def main(name, num):
  3. print(f'{num} {name}: Hello World')
  4. if __name__ == '__main__':
  5. # 创建进程池
  6. p = Pool()
  7. for i in range(5):
  8. p.apply(func=main, args=('LovefishO', i, ))
  9. # 关闭进程池
  10. p.close()
  11. # 阻塞进程, 等待子进程执行结束
  12. p.join()
  13. print('主进程结束')
  14. # 0 LovefishO: Hello World
  15. # 1 LovefishO: Hello World
  16. # 2 LovefishO: Hello World
  17. # 3 LovefishO: Hello World
  18. # 4 LovefishO: Hello World
  19. # 主进程结束
  20. Pool例子

3. Queue 模块

3.1 Queue介绍

 由于进程彼此之间互相隔离,要实现进程间通信,multiprocessing提供了两种形式:队列(queue)和管道。队列可以简单的理解为一种特殊的列表,可以设置固定的长度,从左边插入数据,从右边获取数据,并满足先进先出。并且队列时进程安全的,即同一时刻只有一个进程能够对队列进行操作。

  1. multiprocessing.Queue(maxsize)
  2. """
  3. 参数介绍:
  4. maxsize:设置队列长度
  5. 方法介绍:
  6. qsize():返回队列长度,该结果是不可靠的,因为在使用该方法时,队列中的数据仍在进行删除增加
  7. empty():如果队列为空,则返回True,反之亦然,该状态不可靠
  8. full():如果队列满了,则返回True,反之亦然,该状态不可靠
  9. put(obj[, block[, timeout]]):将obj放入队列
  10. get([block[, timeout]]):从队列中取出并返回对象
  11. close(): 当前进程不会在队列中放入对象
  12. """

3.2 Queue实例

  1. from multiprocessing import Process, Queue
  2. def product(queue, num):
  3. # 把obj插入队列
  4. queue.put(num)
  5. print(f'Product {num}')
  6. def consumer(queue):
  7. # 从队列中获取数据
  8. temp = queue.get()
  9. print(f'consumer consumed {temp} product')
  10. if __name__ == '__main__':
  11. # 创建队列
  12. q = Queue()
  13. # 生产商品
  14. for i in range(5):
  15. p1 = Process(target=product, args=(q, i, ))
  16. p1.start()
  17. # 消费生产的商品
  18. for i in range(5):
  19. p2 = Process(target=consumer, args=(q,))
  20. p2.start()
  21. Queue例子

注意:

  • 在使用队列时(Queue)如果要使用进程池则不能使用multiprocessing.Pool,而是应用使用multiprocessing.Manager().Pool()

3.3 Queue + Pool实例

  1. import multiprocessing
  2. def product(queue, num):
  3. # 把obj插入队列
  4. queue.put(num)
  5. print(f'Product {num}')
  6. def consumer(queue):
  7. # 从队列中获取obj
  8. num = queue.get()
  9. print(f'consumer consumed {num} product')
  10. if __name__ == '__main__':
  11. # 创建队列
  12. q = multiprocessing.Manager().Queue()
  13. # 创建进程池
  14. p = multiprocessing.Pool()
  15. # 生产商品
  16. for i in range(5):
  17. p.apply(func=product, args=(q, i,))
  18. # 消费生产的商品
  19. for i in range(5):
  20. p.apply(func=consumer, args=(q,))
  21. # 关闭进程池
  22. p.close()
  23. # 阻塞进程
  24. p.join()
  25. print('主进程结束')
  26. # Product 0
  27. # Product 1
  28. # Product 2
  29. # Product 3
  30. # Product 4
  31. # consumer consumed 0 product
  32. # consumer consumed 1 product
  33. # consumer consumed 2 product
  34. # consumer consumed 3 product
  35. # consumer consumed 4 product
  36. # 主进程结束
  37. Queue + Pool例子

4. Reference

Multiprocessing官方文档

参考博客

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/Li_阴宅/article/detail/860623
推荐阅读
相关标签
  

闽ICP备14008679号