当前位置:   article > 正文

【python】【multiprocessing】【Pool、pool.Pool、pool.ThreadPool】apply 和apply_async多进程有关时间的比较分析_multiprocessing pool和apply

multiprocessing pool和apply

every blog every motto: Light tomorrow with today.

0. 0 前言

因为最近在写有关多进程返回值的问题,涉及到这方面,索性进行简单的小结
说明: 其中有关函数计时用到了装饰器,可参考装饰器及文末的参考文章


说明:

  1. 下表为对同一段程序进行10次运行以后的平均时间(单位:秒)。

  2. 多进程方法均在multiprocessing中,

方法Poolpool.Poolpool.ThreadPool
apply3.7223.7741.748
apply_async1.8681.9260.863

注:

  • 未使用多进程时间为0.329(非平均,仅一次),并不能说明使用多进程使程序运行更慢,这应该与我的测试代码有关,为使文章完整,附上未使用多进程的测试时间。
  • 读者仅需比较不同进程方法下的apply和apply_async就可。

0.1 补充

补充时间:2020.8.10.16:47
在实际运行用中发现有意思的现象,准确说和本文得出的结论恰恰相反,遂记之。

方法Poolpool.Poolpool.ThreadPool
apply98~99~98~
apply_async33~34~98~

说明:

  • 实际中,反而是Pool和pool.Pool的apply_async方法更快
  • 上述时间非完全准确时间,细微差距读者不必深究,“~”表明大约之意。

同样的是,又同段程序测试了如下多进程方法:
时间约为33秒。

process_li = []
# 1. 数据处理,及保存
for file in files:

    # 多进程
    t = Process(target=preprocessing_data, args=(file,))
    t.start()
    process_li.append(t)
for i in process_li:
    i.join()
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

0.2 补充

补充时间:2020.12.20
直接上结果:

方法Poolpool.Poolpool.ThreadPool
apply30/29/2638/28/2622/21/22
apply_async22/23/2221/59/2010/13/15

说明:

  1. 所要进行多进程测试的程序,一段对图片进行数据增强(深度学习中对图片扩充的一种方法,包含但不限于:旋转、翻转、裁剪等后 保存图片
  2. 未使用多进程的时间为:20/37/26/22
  3. pool.ThreadPool下的apply_async速度最快,为此多测了几次时间分别为:18/14/20/10

1. 正文

1.0 未使用多进程

import time

loop_number = 30000


def count_time(func):
    """装饰器:计算程序运行时间"""

    def wrapper(*args, **kwargs):
        t1 = time.time()
        func(*args, **kwargs)
        t2 = time.time()

        # 计算时间
        time_consumed = t2 - t1
        print('{}函数一共花费了{}秒'.format(func.__name__, time_consumed))

    return wrapper

def fun(k):
    """被测试函数"""
    print('-----fun函数内部,参数为{}----'.format(k))
    m = k + 10
    return m

@count_time
def call_fun():
    """没有使用多线程的情况"""
    number = 0

    for i in range(loop_number):
        print('number:{}'.format(number))
        number = fun(number)

def main():
    """主程序"""

    # 1. 没有使用多线程
    call_fun()

if __name__ == '__main__':
    main()
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42

1.1 multiprocessing.Pool下的两种方法

导入模块

from multiprocessing import Pool
  • 1

1.1.1 apply

描述: apply方法是阻塞的,即等待子进程执行完毕后,再执行下一个子进程。


def fun(k):
    """被测试函数"""
    print('-----fun函数内部,参数为{}----'.format(k))
    m = k + 10
    return m  

@count_time
def my_process():
    """多进程"""

    # 方法一:apply/apply_async
    pool = Pool(4)  # 创建4个进程
    k = 0
    for i in range(loop_number):
        pool.apply(fun, args=(k,))

def main():
    """主程序"""

    # 3. 使用多进程
    my_process()


if __name__ == '__main__':
    main()
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26

结果:
在这里插入图片描述
分别记录10次的时间: 3.717 ,3.689,3.716,3.725,3.653,3.696,3.841,3.698,3.672,3.808
平均时间为:3.722

1.1.2 apply_async

描述: apply_async是异步非阻塞的,即不用等待当前进程执行完毕,随时根据系统调度来进行进程切换。

import multiprocessing

def fun(k):
    """被测试函数"""
    print('-----fun函数内部,参数为{}----'.format(k))
    m = k + 10
    return m  

@count_time
def my_process():
    """多进程"""

    # 方法一:apply/apply_async
    pool = Pool(4)  # 创建4个进程
    k = 0
    for i in range(loop_number):
        pool.apply_async(fun, args=(k,))
    # ----------------------------
    # apply_async为非阻塞式,要加close和join   
    # 行号:007
    pool.close()
    pool.join()
    # ------------------------------
	print('---888---') # 查看主进程运行情况
	
def main():
    """主程序"""

    # 3. 使用多进程
    my_process()


if __name__ == '__main__':
    main()
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34

结果:
在这里插入图片描述

  1. 代码中 行号:007 注释时:
    分别记录10次的时间: 0.518,0.515,0.498,0.549,0.490,0.510,0.493,0.511,0.493,0.495
    平均时间: 0.507
  2. 代码中 行号:007 未注释时:
    分别记录10次的时间: 1.857,1.798,1.791,1.887,2.004,1.793,1.867,1.982,1.910,1.790
    平均时间: 1.868
    说明:
  • 第1种情况为错误情况,因为未加colse和join,子进程还没跑完就运行后面的,测试出来的时间也是不准确的(程序还没运行完,提前计算了时间),上面的”----888----会提前打印出来,可以佐证这一说法。
  • 上面第2中情况应该为正确情况,要等子进程跑完,主进程才能运行后面的。具体参考文献3

小结:

  1. apply是阻塞式,即:一个子进程结束以后才能进行下一个,等到所有进程结束才切换到主进程,运行剩余部分。可以理解为串行。
  2. 在主进程和多个子进程之间切换,为了防止子进程还没运行完,主进程已经运行完的情况,需要将代码中“007”的注释部分取消注释,即上面的第2种情况。

1.2 multiprocessing.pool下的两种方法

1.2.1 Pool

导入模块

from multiprocessing.pool import Pool
  • 1
1.2.1.1 apply

其余代码一样,为了查看主进程的运行效果,加了一行打印“----888----",位置位于:

@count_time
def my_process():
    """多进程"""

    # 方法一:apply/apply_async
    pool =Pool(4)  # 创建4个进程
    k = 0
    for i in range(loop_number):
        pool.apply(fun, args=(k,))
    # pool.close()
    # pool.join()
    print('---888---')
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12

说明:因为apply是阻塞式,不需要加以下代码

pool.close()
pool.join()
  • 1
  • 2

结果:
在这里插入图片描述

分别记录10次的时间: 4.024,3.772,3.617,3.673,3.906,3.890,3.712,3.719,3.668,3.838
平均时间: 3.774

1.2.1.2 apply_async

说明: apply_async为非阻塞式,需要加close和join

@count_time
def my_process():
    """多进程"""

    # 方法一:apply/apply_async
    pool =Pool(4)  # 创建4个进程
    k = 0
    for i in range(loop_number):
        pool.apply_async(fun, args=(k,))
    pool.close()
    pool.join()
    print('---888---')
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12

结果:
在这里插入图片描述
分别记录10次的时间: 1.866,2.082,1.801,1.858,2.115,1.846,1.857,2.088,1.880,1.868
平均时间: 1.926

1.2.2 ThreadPool

仅导入模块和创建进程部分不同,为了方便读者进行测试,此处贴完整代码

from multiprocessing.pool import ThreadPool
  • 1
1.2.2.1 apply
def fun(k):
    """被测试函数"""
    print('-----fun函数内部,参数为{}----'.format(k))
    m = k + 10
    return m
 
 @count_time
def my_process():
    """多进程"""

    # 方法一:apply/apply_async
    pool =ThreadPool(4)  # 创建4个进程
    k = 0
    for i in range(loop_number):
        pool.apply(fun, args=(k,))

    print('---888---')
   
def main():
	"""主程序"""
		
	# 3. 使用多进程
	my_process()


if __name__ == '__main__':
 main()
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27

结果:
在这里插入图片描述
分别记录10次的时间: 1.715,1.716,1.715,1.714,1.839,1.815,1.714,1.715,1.826,1.713
平均时间: 1.748

1.2.2.2 apply_async
def fun(k):
    """被测试函数"""
    print('-----fun函数内部,参数为{}----'.format(k))
    m = k + 10
    return m
 
 @count_time
def my_process():
    """多进程"""

    # 方法一:apply/apply_async
    pool =ThreadPool(4)  # 创建4个进程
    k = 0
    for i in range(loop_number):
        pool.apply_async(fun, args=(k,))
    pool.close()
    pool.join()
    print('---888---')
   
def main():
	"""主程序"""
		
	# 3. 使用多进程
	my_process()


if __name__ == '__main__':
 main()
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28

结果:
在这里插入图片描述
分别记录10次的时间: 0.815,0.818,0.927,0.838,0.923,0.918,0.923,0.824,0.817,0.824
平均时间: 0.863

1.3 分析

多进程方法均在multiprocessing中,

方法Poolpool.Poolpool.ThreadPool
apply3.7223.7741.748
apply_async1.8681.9260.863

由以上表格发现,运行时间相差2倍

  1. 阻塞式apply: 只有等子进程结束后,才能切换到另外一个子进程,直到所有子进程运行完毕,然后切换到主进程。
  2. 异步非阻塞式apply_async: 在多个子进程之间来回切换,为防止子进程还没运行完,主进程就运行了后面的代码,需要加close和join
  3. multiprocessing下的Pool和pool.Pool方法二者并没有区别,细微差别可能和计算机本身原因有关。
  4. multiprocessing下的pool.ThreadPool时间最短,具体原因暂时未知。

建议:使用multiprocessing下的pool.ThreadPool的apply_async方法


知识无价,如果帮助到你,不妨请喝一杯奶茶~~

在这里插入图片描述在这里插入图片描述

参考文献

[1] https://blog.csdn.net/weixin_39190382/article/details/107107980
[2] https://blog.csdn.net/htuhxf/article/details/101221768
[3] https://www.cnblogs.com/wr13640959765/p/9428245.html
[4] https://blog.csdn.net/weixin_43283397/article/details/104294890

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/繁依Fanyi0/article/detail/151523
推荐阅读
相关标签
  

闽ICP备14008679号