当前位置:   article > 正文

multiprocessing.pool详解

multiprocessing.pool

由于python有全局锁限制,如果想利用多核,就需要使用多进程模块,但该模块有很多坑,本篇文章记录一下其用法以及踩过的坑。

一、map、apply、apply_async对比

先贴一个对比图,引自multiprocessin.pool

                  | Multi-args   Concurrence    Blocking     Ordered-results
---------------------------------------------------------------------
Pool.map          | no           yes            yes          yes
Pool.apply        | yes          no             yes          yes
Pool.apply_async  | yes          yes            no           no
  • 1
  • 2
  • 3
  • 4
  • 5

Multi-args意思是task可否传入不同的function
Ordered-results意识是结果是否有序。

具体看下使用方法:

apply()
import multiprocessing
import os
import time,datetime

# task
def square(n):
    print(f'process id is {os.getpid()}')
    if n == 5:
        time.sleep(5)
    else:
    	time.sleep(1)
    return n*n

def _apply():
    pool = multiprocessing.Pool()
    for i in l:
        res = pool.apply(square, args=(i,))
        print(res)

if __name__ == '__main__':
    start_time = datetime.datetime.now()
    l = [5, 0, 1, 2, 3, 4]
    print(f'main process id is {os.getpid()}')
    _apply()
    end_time = datetime.datetime.now()
    print('用时: ',end_time-start_time)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26

输出:

main process id is 585033
child process id is 585034
25
child process id is 585035
0
child process id is 585036
1
child process id is 585037
4
child process id is 585038
9
child process id is 585039
16
用时:  0:00:11.024689
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14

整个过程整整用了11s,与顺序执行的时间差不多,且计算结果与传参顺序保持一致,于是我们可以得到结论:

  • pool.apply()是阻塞的,在所有子进程返回之前,会阻塞主进程
  • 多个子进程是顺序执行的

进一步,我们可以推出结论:

  • pool.apply()无法实现并发。原因就在于,在同一个时刻,只有一个子进程在真正跑任务。所以,这个函数实在是鸡肋,想不到在什么场景下会应用到它
apply_async()
def division(n):
    print(f'child process id is {os.getpid()}')
    time.sleep(1)
    res = 10/n
    return res
    
def _apply_async():
    # 必须close+join,否则主进程跑完了,子进程还没完,就会报错
    pool = multiprocessing.Pool()
    for i in l:
        # proc_lst.append(pool.apply_async(square, args=(i,)))
        pool.apply_async(division, args=(i,), callback=print)
    pool.close()
    pool.join()
    
  start_time = datetime.datetime.now()
    l = [5, 0, 1, 2, 3, 4]
    print(f'main process id is {os.getpid()}')
    # _apply()
    _apply_async()
    end_time = datetime.datetime.now()
    print('用时: ',end_time-start_time)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22

输出:

main process id is 586731
child process id is 586732
child process id is 586733
child process id is 586734
child process id is 586735
child process id is 586736
child process id is 586737
10.0
2.0
5.0
3.3333333333333335
2.5
用时:  0:00:01.016798
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13

乍一看,总的用时1s钟,说明确实实现了并发。仔细观察下会发现,l中一共是6个参数,但输出为什么少了一个结果?这就是apply_async()的坑所在,深入研究下发现该函数有以下特性:

  • 从名称可以看出来,是异步的。而所谓异步,比较的对象是主进程,即主进程无需等待子进程的结果,可以继续往下执行,该特性是通过将apply_async()函数设计为非阻塞实现的,当调用apply_async()时,立马返回一个子进程对象,此时子进程可能还没有真正跑完,但不影响主进程继续执行。
  • apply_async()中的callback参数表示的是,当子进程执行完毕后,自动调用apply_async()代表的函数,上面实例中是print,因此会将结果打印出来。从这个例子中也可以理解回调这个概念了吧。而如果没有显示地传递callback参数,想要得到结果怎么办?那就需要调用apply_async().get()了,但是该函数是阻塞的,即在子进程结束前会一直阻塞主进程,因此如果你想实现并发,最好是在所有子进程启动后,再去get结果。
  • pool.close()pool.join()有什么用?前者表示将进程池关闭(不接收新的进程,但原有进程不影响),后者表示阻塞等待所有子进程结束。为什么一定要join?正如前所述,apply_async()是非阻塞的,如果不join,有可能主进程跑完了子进程还没跑完,那那些子进程就无法回收了,程序会报错,所以一定要有join。有的小伙伴还有疑问,那为什么join之前一定要close?这个其实是标准写法,这俩一定要配合使用。
  • 与传参顺序相比,结果是无序的。
  • 最后一个问题,为什么上面例子中的结果中少了一个?仔细观察发现,少了0对应的结果,因为10/0非法会弹出异常。但为什么没看到报错呢?这就是其中一个坑,apply_async()函数起的子进程中的异常,主进程是无感的。所以,在调试代码时,不要看到没报错就觉得万事大吉,说不定有隐藏的坑等着你呢!
map()
def _map():
    pool = multiprocessing.Pool()
    res = pool.map(square, l)
    print(res)

if __name__ == '__main__':
    start_time = datetime.datetime.now()
    l = [5, 0, 1, 2, 3, 4]
    print(f'main process id is {os.getpid()}')
    # _apply()
    # _apply_async()
    _map()
    end_time = datetime.datetime.now()
    print('用时: ',end_time-start_time)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14

输出:

main process id is 588059
child process id is 588060
child process id is 588061
child process id is 588062
child process id is 588063
child process id is 588064
child process id is 588065
[25, 0, 1, 4, 9, 16]
用时:  0:00:06.018487
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

用时6s左右,且结果是一次性出来的,可以得出以下结论:

  • map是一次性启动与可迭代对象数量相等的子进程,因此是可以实现并发的
  • 该函数是阻塞的,即要等到所有子进程全都执行完毕,主进程才能继续往下执行
  • 结果是有序的。

二、多进程共享数据之Manager

此Manager又是一个大坑,慎用!有时间再填坑

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/我家自动化/article/detail/860725
推荐阅读
相关标签
  

闽ICP备14008679号