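An earlier post on this blog introduced parallelism and concurrency. In Python there are two main ways to achieve them: multithreading and multiprocessing.
In Python (CPython, because of the global interpreter lock), multithreading is really concurrency and does not take full advantage of multiple cores. Which approach to use depends on the workload: compute-bound tasks are usually run in parallel (multiple processes), while IO-bound tasks are usually run concurrently (multiple threads). The main consideration is the cost of switching between tasks.
Next, we look at one way of doing multiprocessing: the Pool class. A Pool provides a specified number of worker processes for the caller to use. When a new task is submitted to the Pool and a worker is free, the task is executed right away. If all workers are busy, the task waits until one of them finishes its current task and can pick it up.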
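The queueing behaviour described above can be observed by recording which process handles each task. The following is only a minimal sketch: the names worker_pid and distinct_worker_pids are made up for this illustration, and the short sleep only simulates work.

```python
import os
import time
from multiprocessing import Pool


def worker_pid(task_id):
    """Return the task id together with the PID of the worker that ran it."""
    time.sleep(0.1)  # simulated work, so tasks spread across the workers
    return task_id, os.getpid()


def distinct_worker_pids(n_workers=2, n_tasks=6):
    """Run n_tasks on a pool of n_workers and collect the PIDs that were used."""
    with Pool(processes=n_workers) as pool:
        results = pool.map(worker_pid, range(n_tasks))
    return {pid for _, pid in results}


if __name__ == "__main__":
    pids = distinct_worker_pids()
    print("tasks: 6, distinct worker processes:", len(pids))
```

However many tasks are submitted, the number of distinct PIDs should not exceed the pool size, because extra tasks wait for a free worker.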
class multiprocessing.pool.Pool([processes[, initializer[, initargs[, maxtasksperchild[, context]]]]])
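# Parameters
- processes: the number of worker processes to use. If it is not given, os.cpu_count() processes are used (that is, all cores)
- initializer, initargs: if initializer is not None, each worker process calls initializer(*initargs) when it starts
- maxtasksperchild: the number of tasks a worker process can complete before it exits and is replaced by a fresh worker process, so that unused resources are released. The default is None, which means worker processes live as long as the pool
- context: can be used to specify the context used for starting the worker processes. A pool is usually created with the function multiprocessing.Pool() or the Pool() method of a context object. In both cases the context is set appropriately
# In most cases only processes is set, and the other parameters keep their default values.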
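As an illustration of initializer and initargs, the sketch below gives every worker a per-process value when it starts. The names init_worker, tag and run_tagged, and the "job" prefix, are assumptions made for this example only.

```python
from multiprocessing import Pool

_prefix = None  # per-process global, filled in by the initializer


def init_worker(prefix):
    """Called once in every worker process when it starts."""
    global _prefix
    _prefix = prefix


def tag(x):
    """Use the value that the initializer stored in this worker."""
    return "{}-{}".format(_prefix, x)


def run_tagged(values):
    with Pool(processes=2, initializer=init_worker, initargs=("job",)) as pool:
        return pool.map(tag, values)


if __name__ == "__main__":
    print(run_tagged(range(4)))
```

This pattern is commonly used for per-worker setup (opening a connection, loading a model) that should not be repeated for every task.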
# Non-blocking example
import multiprocessing
import time


def func(txt):
    print("txt: ", txt)
    time.sleep(4)
    print("end")


if __name__ == '__main__':
    with multiprocessing.Pool(processes=3) as pool:
        for i in range(4):
            txt = "TXT {}".format(i)
            pool.apply_async(func, (txt,))  # non-blocking: returns immediately
        print("*" * 18)
        pool.close()  # no more tasks will be submitted
        pool.join()   # wait for all worker processes to finish
    print("Sub-process done.")
Output
******************
txt: TXT 2
end
txt: TXT 0
end
txt: TXT 1
end
txt: TXT 3
end
Sub-process done.
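Explanation: a process pool is created with 3 worker processes. range(4) yields 0, 1, 2, 3 in turn, and the four resulting tasks are submitted to the pool. Because the pool has only 3 processes, tasks 0, 1 and 2 start right away, and task 3 is handled only once one of the workers becomes free. Because apply_async is non-blocking, the main program carries on without waiting for the subprocesses: as soon as the for loop finishes it prints the line of asterisks, and then waits at pool.join() for all subprocesses to finish.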
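To make the non-blocking behaviour measurable, the sketch below times only the submission step. The names slow_square and submit_and_collect are hypothetical, and the 0.5 second sleep is an arbitrary stand-in for real work.

```python
import time
from multiprocessing import Pool


def slow_square(x):
    time.sleep(0.5)  # stand-in for real work
    return x * x


def submit_and_collect(values):
    """Submit all tasks without blocking, then wait for the results."""
    with Pool(processes=3) as pool:
        start = time.time()
        handles = [pool.apply_async(slow_square, (v,)) for v in values]
        submit_elapsed = time.time() - start  # time spent on submitting only
        results = [h.get() for h in handles]  # get() blocks until each result is ready
    return submit_elapsed, results


if __name__ == "__main__":
    elapsed, results = submit_and_collect([1, 2, 3, 4])
    print("submitting took {:.4f}s, results: {}".format(elapsed, results))
```

Submitting should take far less time than a single task, since apply_async only queues the work and returns an AsyncResult object.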
# Blocking example
import multiprocessing
import time


def func(txt):
    print("txt: ", txt)
    time.sleep(5)
    print("end")


if __name__ == '__main__':
    with multiprocessing.Pool(processes=3) as pool:
        for i in range(4):
            txt = "TXT {}".format(i)
            pool.apply(func, (txt,))  # blocking: waits until func has returned
        print("*" * 18)
        pool.close()  # no more tasks will be submitted
        pool.join()   # wait for all worker processes to finish
    print("Sub-process done.")
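Since apply waits for each call to return, the tasks in the blocking example effectively run one after another. The sketch below compares the two styles on the same pool. The names slow_task and compare_apply_styles and the 0.3 second delay are assumptions chosen for illustration.

```python
import time
from multiprocessing import Pool


def slow_task(x):
    time.sleep(0.3)  # stand-in for real work
    return x + 1


def compare_apply_styles(values):
    """Run the same tasks with apply (blocking) and apply_async (non-blocking)."""
    with Pool(processes=3) as pool:
        start = time.time()
        blocking = [pool.apply(slow_task, (v,)) for v in values]  # one at a time
        t_apply = time.time() - start

        start = time.time()
        handles = [pool.apply_async(slow_task, (v,)) for v in values]
        non_blocking = [h.get() for h in handles]  # tasks overlap in the workers
        t_async = time.time() - start
    return blocking, t_apply, non_blocking, t_async


if __name__ == "__main__":
    blocking, t_apply, non_blocking, t_async = compare_apply_styles([1, 2, 3])
    print("apply:       {} in {:.2f}s".format(blocking, t_apply))
    print("apply_async: {} in {:.2f}s".format(non_blocking, t_async))
```

Both styles return the same values. The difference is that apply gives back the result directly, while apply_async gives back a handle whose get() method must be called.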
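Function prototype: map(func, iterable[, chunksize=None])
The map method of the Pool class behaves much like the built-in map function (although it accepts only one iterable), and it blocks until the result is ready.
Note: although the second argument can be any iterable, in practice the whole iterable is consumed before any task is handed to the worker processes.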
from multiprocessing import Pool
import time


def run(fn):
    time.sleep(1)
    print(fn * fn)


if __name__ == '__main__':
    tl = [1, 2, 3, 4, 5, 6]

    # Sequential run, for comparison
    print("ordered")
    s = time.time()
    for i in tl:
        run(i)
    t1 = time.time()
    print("ordered: {}".format(t1 - s))

    # The same work spread over 3 worker processes
    print("Multi")
    s2 = time.time()
    pool = Pool(processes=3)
    pool.map(run, tl)  # blocks until every item has been processed
    pool.close()
    pool.join()
    t2 = time.time()
    print("Multi: {}".format(t2 - s2))
Output:
ordered
1
4
9
16
25
36
ordered: 6.057081460952759
Multi
1
36
9
16
4
25
Multi: 2.1325228214263916
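The print order above varies because each worker prints on its own schedule, but the list returned by map keeps the order of the input. A minimal sketch, assuming hypothetical helper names square and squares, with chunksize passed through only to show where it goes:

```python
from multiprocessing import Pool


def square(x):
    return x * x


def squares(values, chunksize=None):
    """Return the squares in input order, whichever worker computed them."""
    with Pool(processes=3) as pool:
        return pool.map(square, values, chunksize)


if __name__ == "__main__":
    print(squares([1, 2, 3, 4, 5, 6]))
    # A generator also works: map consumes it in full before dispatching.
    print(squares((n for n in range(1, 7)), chunksize=2))
```

Returning values from the worker function, rather than printing inside it, is usually the easier way to get results back in a predictable order.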
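Function prototype: map_async(func, iterable[, chunksize[, callback[, error_callback]]])
The map_async method is used in the same way as map, but it is non-blocking: it returns an AsyncResult object immediately instead of waiting for the results.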
from multiprocessing import Pool
import time


def run(fn):
    time.sleep(1)
    print(fn * fn)


if __name__ == '__main__':
    tl = [1, 2, 3, 4, 5, 6]

    # Sequential run, for comparison
    print("ordered")
    s = time.time()
    for i in tl:
        run(i)
    t1 = time.time()
    print("ordered: {}".format(t1 - s))

    # The same work spread over 3 worker processes
    print("Multi")
    s2 = time.time()
    pool = Pool(processes=3)
    pool.map_async(run, tl)  # returns immediately
    pool.close()
    pool.join()              # the waiting happens here instead
    t2 = time.time()
    print("Multi: {}".format(t2 - s2))
Output:
ordered
1
4
9
16
25
36
ordered: 6.066484451293945
Multi
4
25
1
16
9
36
Multi: 2.137917995452881
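The example above discards the object returned by map_async. The sketch below keeps it and also passes a callback. The names square and squares_async are hypothetical, and the 10 second timeout is an arbitrary safety margin.

```python
from multiprocessing import Pool


def square(x):
    return x * x


def squares_async(values):
    """Start a map without blocking, then fetch the result when needed."""
    collected = []  # filled by the callback, which runs in the parent process
    with Pool(processes=3) as pool:
        async_result = pool.map_async(square, values, callback=collected.append)
        # The main process is free to do other work here.
        results = async_result.get(timeout=10)  # blocks until the map is done
    return results, collected


if __name__ == "__main__":
    results, collected = squares_async([1, 2, 3])
    print(results, collected)
```

Note that the map_async callback is called once, with the complete result list, rather than once per item.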
# apply_async with a callback
from multiprocessing import Pool
import time


def fun_01(i):
    time.sleep(2)
    print('start_time:', time.ctime())
    return i + 100


def fun_02(arg):
    print('end_time:', arg, time.ctime())


if __name__ == '__main__':
    pool = Pool(3)
    for i in range(4):
        # fun_02 receives the return value of fun_01 as its argument
        pool.apply_async(func=fun_01, args=(i,), callback=fun_02)
        # pool.apply_async(func=fun_01, args=(i,))
    pool.close()
    pool.join()
    print('done')
Output:
start_time: Thu Nov 14 16:31:41 2019
end_time: 100 Thu Nov 14 16:31:41 2019
start_time: Thu Nov 14 16:31:41 2019
end_time: 101 Thu Nov 14 16:31:41 2019
start_time: Thu Nov 14 16:31:41 2019
end_time: 102 Thu Nov 14 16:31:41 2019
start_time: Thu Nov 14 16:31:43 2019
end_time: 103 Thu Nov 14 16:31:43 2019
done
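A callback only fires when the task succeeds. For failures, apply_async also accepts an error_callback, which receives the exception instance. A minimal sketch, in which the function risky, the failing input 2 and the helper run_with_callbacks are all made up for this illustration:

```python
from multiprocessing import Pool


def risky(x):
    """Fail for one specific input so that both callbacks get exercised."""
    if x == 2:
        raise ValueError("bad input: {}".format(x))
    return x + 100


def run_with_callbacks(values):
    successes, failures = [], []
    pool = Pool(3)
    for v in values:
        pool.apply_async(
            risky,
            (v,),
            callback=successes.append,       # called with the return value
            error_callback=failures.append,  # called with the raised exception
        )
    pool.close()
    pool.join()  # all callbacks have run once join() returns
    return sorted(successes), [str(e) for e in failures]


if __name__ == "__main__":
    print(run_with_callbacks(range(4)))
```

Both callbacks run in the parent process, so they should return quickly. A slow callback delays the handling of other results.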
# Collecting return values from apply_async
import multiprocessing
import time


def func(msg):
    print("msg:", msg)
    time.sleep(3)
    print("end")
    return "done " + msg


if __name__ == '__main__':
    with multiprocessing.Pool(processes=4) as pool:
        result = []
        for i in range(3):
            msg = "hello {}".format(i)
            # Keep the AsyncResult so the return value can be read later
            result.append(pool.apply_async(func, (msg,)))
        pool.close()
        pool.join()
        for res in result:
            print(":::", res.get())
    print("Sub-process done.")
Output:
msg: hello 2
end
msg: hello 1
end
msg: hello 0
end
::: done hello 0
::: done hello 1
::: done hello 2
Sub-process done.
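Besides get(), the object returned by apply_async offers wait(), ready() and successful(), and get() accepts a timeout. The sketch below exercises them. The names slow_echo and poll_result and the chosen delays are assumptions for this example.

```python
import multiprocessing
import time


def slow_echo(msg, delay):
    time.sleep(delay)
    return "done " + msg


def poll_result():
    """Show get(timeout=...), wait(), ready() and successful() on one task."""
    with multiprocessing.Pool(processes=1) as pool:
        res = pool.apply_async(slow_echo, ("hello", 1.0))
        try:
            res.get(timeout=0.1)  # the task needs about 1s, so this times out
            timed_out = False
        except multiprocessing.TimeoutError:
            timed_out = True
        res.wait()  # block until the task has finished
        return timed_out, res.ready(), res.successful(), res.get()


if __name__ == "__main__":
    print(poll_result())
```

A timeout on get() does not cancel the task. It only stops the caller from waiting, and the result can still be fetched later.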
# Running several different functions in one pool
import multiprocessing
import time
import os


def Lee():
    print('\nRun task Lee--%s******ppid:%s' % (os.getpid(), os.getppid()), '~~~~', time.ctime())
    start = time.time()
    time.sleep(5)
    end = time.time()
    print('Task Lee,runs %0.2f seconds.' % (end - start), '~~~~', time.ctime())


def Marlon():
    print("\nRun task Marlon-%s******ppid:%s" % (os.getpid(), os.getppid()), '~~~~', time.ctime())
    start = time.time()
    time.sleep(10)
    end = time.time()
    print('Task Marlon runs %0.2f seconds.' % (end - start), '~~~~', time.ctime())


def Allen():
    print("\nRun task Allen-%s******ppid:%s" % (os.getpid(), os.getppid()), '~~~~', time.ctime())
    start = time.time()
    time.sleep(15)
    end = time.time()
    print('Task Allen runs %0.2f seconds.' % (end - start), '~~~~', time.ctime())


def Frank():
    print("\nRun task Frank-%s******ppid:%s" % (os.getpid(), os.getppid()), '~~~~', time.ctime())
    start = time.time()
    time.sleep(20)
    end = time.time()
    print('Task Frank runs %0.2f seconds.' % (end - start), '~~~~', time.ctime())


if __name__ == '__main__':
    func_list = [Lee, Marlon, Allen, Frank]
    print('parent process id %s' % os.getpid())
    pool = multiprocessing.Pool(4)
    for func in func_list:
        pool.apply_async(func)  # each function runs in its own worker process
    print('Waiting for all subprocesses done...')
    pool.close()
    pool.join()
    print('All subprocesses done.')