赞
踩
""" 测试multiprocessing模块中 Pool和dummy.Pool 的4个方法的使用区别和返回值 阻塞方法: apply() 、 map() 非阻塞方法: apply_async() 、 imap() 进程池映射模式:map()、 imap() """ import random import time import os from multiprocessing import Pool, Process from multiprocessing.dummy import Pool as dummy_Pool # 任务函数 def exe_func(msg): start_time = time.time() print(f"{msg} 任务开始执行, 子进程号为{os.getpid()}") time.sleep(random.random() * 2) end_time = time.time() print(f"{msg} 执行完毕, 子进程号为{os.getpid()} --执行时间{end_time - start_time}", "") return msg # 执行任务的参数列表 msgs = [f"no.{i}" for i in range(1, 11)] # Pool测试函数 def test_Pool(pool_obj, model, main_process_delay=True, wait_subprocess=True): print("-------- multiprocessing 模块中的Pool 和 dummy.Pool 对比测试---------") pool = pool_obj(3) # apply() : def test_apply(): print("****** apply()测试 *******") for msg in msgs: print(f"分派任务{msg}") result = pool.apply(exe_func, (msg,)) print(f">>>>>>>>>>> 返回值为: {result} <<<<<<<<<<") # apply_async() def test_apply_async(): print("****** apply_async()测试 *******") for msg in msgs: print(f"分派任务{msg}") result = pool.apply_async(exe_func, (msg,)) print(f">>>>>>>>>>> 返回值为: {result} <<<<<<<<<<") # map() def test_map(): print("****** map()测试 *******") print(f"分派任务{msgs}") result = pool.map(exe_func, msgs) print(f">>>>>>>>>>> 返回值为: {result} <<<<<<<<<<") # imap() def test_imap(): print("****** imap()测试 *******") print(f"分派任务{msgs}") result = pool.imap(exe_func, msgs) print(f">>>>>>>>>>> 返回值为: {result} <<<<<<<<<<") test_mode = { "apply": test_apply, "apply_async": test_apply_async, "map": test_map, "imap": test_imap, } start_time = time.time() test_mode[model]() print("----- start : 主进程任务已分派------") # 设置主进程延时 if main_process_delay: print(f"【任务分派完成,主进程{os.getpid()}延时10秒,查看子进程是否等待主进程!】") time.sleep(10) # 利用join() 让主进程等待子进程执行完毕 if wait_subprocess: print("【主进程开始等待子进程执行完毕】") pool.close() pool.join() print("【主进程结束等待】") end_time = time.time() print(f"【主进程执行时间为:{end_time - start_time} 秒】") print("------ end :主进程结束------") if __name__ == '__main__': """ 进程池4个工作模式: apply、apply_async、map、imap """ # ================================ 测试Pool ================================ # apply模式: 等待执行的任务依次按顺序执行,进程任务非并行执行 # 关闭主进程延时和主进程join等待;进程池中的进程依次执行,子进程执行完毕才会继续执行主进程 test_Pool(Pool, "apply", main_process_delay=False, wait_subprocess=False) # apply_async模式: 进程池中的进程和主进程并行执行, 进程间相互独立,不会相互等待(加入join例外) # 开启主进程延时,子进程并不会等待主进程 # test_Pool(Pool, "apply_async", main_process_delay=True) # 关闭主进程延时和主进程join等待;主进程不会等待子进程 # test_Pool(Pool, "apply_async", main_process_delay=False, wait_subprocess=False) # map模式: 可以同时分配多个任务; 执行方式与 apply模式 类似,等待执行的任务依次按顺序执行, 已经在进程池中的任务并行执行; # 关闭主进程延时和主进程join等待;进程池中的进程依次执行,子进程执行完毕才会继续执行主进程 # test_Pool(Pool, "map", main_process_delay=False, wait_subprocess=False) # imap模式: 可以同时分配多个任务; 执行方式与 apply_async模式 类似,进程池中的进程和主进程并行执行, 进程间相互独立,不会相互等待(加入join例外) # 开启主进程延时,子进程并不会等待主进程 # test_Pool(Pool, "imap", main_process_delay=True) # 关闭主进程延时和主进程join等待;主进程不会等待子进程 # test_Pool(Pool, "imap", main_process_delay=False, wait_subprocess=False) # ================================ 测试dummy.Pool ================================ # apply模式: 等待执行的任务依次按顺序执行,进程任务非并行执行 # 关闭主进程延时和主进程join等待;进程池中的进程依次执行,子进程执行完毕才会继续执行主进程 # test_Pool(dummy_Pool, "apply", main_process_delay=False, wait_subprocess=False) # apply_async模式: 进程池中的进程和主进程并行执行, 进程间相互独立,不会相互等待(加入join例外) # 开启主进程延时,子进程并不会等待主进程 # test_Pool(dummy_Pool, "apply_async", main_process_delay=True) # 关闭主进程延时和主进程join等待;主进程不会等待子进程 # test_Pool(dummy_Pool, "apply_async", main_process_delay=False, wait_subprocess=False) # map模式: 可以同时分配多个任务; 执行方式与 apply模式 类似,等待执行的任务依次按顺序执行, 已经在进程池中的任务并行执行; # 关闭主进程延时和主进程join等待;进程池中的进程依次执行,子进程执行完毕才会继续执行主进程 # test_Pool(dummy_Pool, "map", main_process_delay=False, wait_subprocess=False) # imap模式: 可以同时分配多个任务; 执行方式与 apply_async模式 类似,进程池中的进程和主进程并行执行, 进程间相互独立,不会相互等待(加入join例外) # 开启主进程延时,子进程并不会等待主进程 # test_Pool(dummy_Pool, "imap", main_process_delay=True) # 关闭主进程延时和主进程join等待;主进程不会等待子进程 # test_Pool(dummy_Pool, "imap", main_process_delay=False, wait_subprocess=False)
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。