赞
踩
文章引自 “nMask's Blog” https://thief.one/2016/11/23/Python-multiprocessing/
multiprocessing是Python的标准模块,它既可以用来编写多进程,也可以用来编写多线程。如果是多线程的话,用multiprocessing.dummy即可,用法与multiprocessing基本相同,这里主要介绍多进程的用法
(一)Multiprocessing介绍
为什么要使用python多进程?
因为python使用全局解释器锁(GIL),他会将进程中的线程序列化,也就是多核cpu实际上并不能达到并行提高速度的目的,而使用多进程则是不受限的,所以实际应用中都是推荐多进程的。
如果每个子进程执行需要消耗的时间非常短(执行+1操作等),这不必使用多进程,因为进程的启动关闭也会耗费资源。
当然使用多进程往往是用来处理CPU密集型(科学计算)的需求,如果是IO密集型(文件读取,爬虫等)则可以使用多线程去处理。
multiprocessing常用组件及功能
创建管理进程模块:
同步子进程模块:
(二)Multiprocessing进程管理模块
Process模块
Process模块用来创建子进程,是Multiprocessing核心模块,使用方式与Threading类似,可以实现多进程的创建,启动,关闭等操作。
利用multiprocessing.Process对象可以创建一个进程,该Process对象与Thread对象的用法相同,也有start(), run(), join()等方法。Process类适合简单的进程创建,如需资源共享可以结合multiprocessing.Queue使用;如果想要控制进程数量,则建议使用进程池Pool类。
Process介绍
构造方法:
实例方法:
属性:
创建多进程的两种方法
Process类中,可以使用两种方法创建子进程。
使用Process创建子进程
说明:用法与Threading相似
- from multiprocessing import Process #导入Process模块
- import os
- def test(name):
- '''
- 函数输出当前进程ID,以及其父进程ID。
- 此代码应在Linux下运行,因为windows下os模块不支持getppid() ,在windows中可以使用return 把想要的值返回。
- '''
- print "Process ID: %s" % (os.getpid())
- print "Parent Process ID: %s" % (os.getppid())
- if __name__ == "__main__":
- '''
- windows下,创建进程的代码一下要放在main函数里面
- '''
- proc = Process(target=test, args=('nmask',))
- proc.start()
- proc.join()
使用Process类继承创建子进程
说明:通过继承Process类,修改run函数代码。跟Thread 实例化也差不多
- from multiprocessing import Process
- import time
- class MyProcess(Process):
- '''
- 继承Process类,类似threading.Thread
- '''
- def __init__(self, arg):
- super(MyProcess, self).__init__()
- #multiprocessing.Process.__init__(self)
- self.arg = arg
- def run(self):
- '''
- 重构run函数
- '''
- print 'nMask', self.arg
- time.sleep(1)
- if __name__ == '__main__':
- for i in range(10):
- p = MyProcess(i)
- p.start()
- for i in range(10):
- p.join()
Pool模块
Pool模块是用来创建管理进程池的,当子进程非常多且需要控制子进程数量时可以使用此模块。
Multiprocessing.Pool可以提供指定数量的进程供用户调用,当有新的请求提交到pool中时,如果池还没有满,那么就会创建一个新的进程用来执行该请求;但如果池中的进程数已经达到规定最大值,那么该请求就会等待,直到池中有进程结束,才会创建新的进程来执行它。在共享资源时,只能使用Multiprocessing.Manager类,而不能使用Queue或者Array。
用途
Pool类用于需要执行的目标很多,而手动限制进程数量又太繁琐时,如果目标少且不用控制进程数量则可以用Process类。
构造方法
实例方法
Pool使用方法
Pool+map函数
说明:此写法缺点在于只能通过map向函数传递一个参数。
- from multiprocessing import Pool
- def test(i):
- print(i) # 使用Spyder 能够实现,可以采用return的方式 pycharm没问题
- if __name__=="__main__":
- lists=[1,2,3]
- pool=Pool(processes=2) #定义最大的进程数
- pool.map(test,lists) #p必须是一个可迭代变量。
- pool.close()
- pool.join()
异步进程池(非阻塞)
- from multiprocessing import Pool
- def test(i):
- print(i)
- if __name__=="__main__":
- pool = Pool(processes=10)
- for i in range(500):
- '''
- For循环中执行步骤:
- (1)循环遍历,将500个子进程添加到进程池(相对父进程会阻塞)
- (2)每次执行10个子进程,等一个子进程执行完后,立马启动新的子进程。(相对父进程不阻塞)
-
- apply_async为异步进程池写法。
- 异步指的是启动子进程的过程,与父进程本身的执行(print)是异步的,而For循环中往进程池添加子进程的过程,与父进程本身的执行却是同步的。
- '''
- pool.apply_async(test, args=(i,)) #维持执行的进程总数为10,当一个进程执行完后启动一个新进程.
- print(“test”)
- pool.close()
- pool.join()
执行顺序:For循环内执行了2个步骤,第一步:将500个对象放入进程池(阻塞)。第二步:同时执行10个子进程(非阻塞),有结束的就立即添加,维持10个子进程运行。(apply_async方法的会在执行完for循环的添加步骤后,直接执行后面的print语句,而apply方法会等所有进程池中的子进程运行完以后再执行后面的print语句(test 一般会打印在最前))
注意:调用join之前,先调用close或者terminate方法,否则会出错。执行完close后不会有新的进程加入到pool,join函数等待所有子进程结束。
同步进程池(阻塞)
- from multiprocessing import Pool
- import time
-
- def test(p):
- print(p)
- time.sleep(0.5)
- if __name__=="__main__":
- pool = Pool(processes=3)
- for i in range(6):
- pool.apply(test, args=(i,)) #维持执行的进程总数为3,当一个进程执行完后启动一个新进程.
- print('test')
- pool.close()
- pool.join()
- '''
- 实际测试发现,for循环内部执行步骤:
- (1)遍历500个可迭代对象,往进程池放一个子进程
- (2)执行这个子进程,等子进程执行完毕,再往进程池放一个子进程,再执行。(同时只执行一个子进程)
- for循环执行完毕,再执行print函数。
- '''
输出结果:
说明:for循环内执行的步骤顺序,往进程池中添加一个子进程,执行子进程,等待执行完毕再添加一个子进程…..等500个子进程都执行完了,再执行print “test”。(从结果来看,并没有多进程并发) 可以通过len(multiprocessing.active_children()) 来查看进程的数量
Queue模块
Queue模块用来控制进程安全,与线程中的Queue用法一样。
Pipe模块
Pipe模块用来管道操作。
Manager模块
Manager模块常与Pool模块一起使用,作用是共享资源。
(三)Multiprocessing同步进程模块
Lock模块
作用:当多个进程需要访问共享资源的时候,Lock可以用来避免访问的冲突。
具体场景:所有的任务在打印的时候都会向同一个标准输出(stdout)输出。这样输出的字符会混合在一起,无法阅读。使用Lock同步,在一个任务输出完成之后,再允许另一个任务输出,可以避免多个任务同时向终端输出。
代码实现:
- from multiprocessing import Process, Lock # 在pycharm、idle下根本测试不出来
- def l(lock, num):
- lock.acquire()
- print "Hello Num: %s" % (num)
- lock.release()
- if __name__ == '__main__':
- lock = Lock() #这个一定要定义为全局
- for num in range(20):
- Process(target=l, args=(lock, num)).start()
Semaphore模块
作用:用来控制对共享资源的访问数量,例如池的最大连接数。
Event模块 (跟多进程Threading里面的用法差不多)
作用:用来实现进程间同步通信。
(四)Multiprocessing.dummy多线程
Multiprocessing.dummy用法与Multiprocessing用法基本相同,只不过是用来创建多线程。
(五)使用Multiprocessing疑问
解答:windows系统下,想要启动一个子进程,必须加上if name==”main“:,linux则不需要。
解答:不行,因为每个进程享有独立的内存数据,如果想要共享资源,可以使用Manage类,或者Queue等模块。
解答:此需求可以稍作修改:所有的子进程都是为了完成一件事情,而当某个子进程完成该事情后,父进程就该结束所有子进程,请问该怎么做?此时结束所有子进程的操作可以交给父进程去做,因为子进程想要结束另外的子进程比较难实现。
那么问题就又变成了父进程什么时候该结束所有进程?
其中一个思路是获取每个子进程的返回值,一旦有返回True(结束的标记),则立马结束所有进程;
另外一种思路是使用共享资源,父进程可以一直去判断这个公共资源,一旦子进程将它改变,则结束所有子进程。(推荐使用前者,因为多进程中不推荐使用资源共享)(后期尝试一下)
解答:可以,子进程可以再创建进程,线程中也可以创建进程。
(六)多进程资源共享问题
多进程中不推荐使用资源共享,如果非要使用,可以参考以下链接。具体介绍请参考:多进程资源共享问题
(七)获取子进程返回值问题
多进程中往往会碰到获取子进程返回值的问题,如果遇到问题可以参考以下链接。
具体介绍请参考:获取子进程返回值问题 如下:(做了改动)
在实际使用多进程的时候,可能需要获取到子进程运行的返回值。如果只是用来存储,则可以将返回值保存到一个数据结构中;如果需要判断此返回值,从而决定是否继续执行所有子进程,则会相对比较复杂。另外在Multiprocessing中,可以利用Process与Pool创建子进程,这两种用法在获取子进程返回值上的写法上也不相同。这篇中,我们直接上代码,分析多进程中获取子进程返回值的不同用法,以及优缺点。
初级用法(Pool)
目的:存储子进程返回值
说明:如果只是单纯的存储子进程返回值,则可以使用Pool的apply_async异步进程池;当然也可以使用Process,用法与threading中的相同,这里只介绍前者。
实例:当进程池中所有子进程执行完毕后,输出每个子进程的返回值。
- from multiprocessing import Pool
- import time
- def test(p):
- time.sleep(0.01)
- return p
- if __name__=="__main__":
- start=time.time()
- pool = Pool(processes=10)
- result=[]
- for i in range(500):
- '''
- for循环执行流程:
- (1)添加子进程到pool,并将这个对象(子进程)添加到result这个列表中。(此时子进程并没有运行)
- (2)执行子进程(同时执行10个)
- '''
- result.append(pool.apply_async(test, args=(i,)))#维持执行的进程总数为10,当一个进程执行完后添加新进程.
- pool.close()
- pool.join()
- '''
- 遍历result列表,取出子进程对象,访问get()方法,获取返回值。(此时所有子进程已执行完毕)
- '''
- for i in result:
- print(i.get())
- print('总共耗时为:%0.3f' % (time.time()-start))
错误写法:
- for i in xrange(50000):
- t=pool.apply_async(test, args=(i,)))
- print(t.get())
说明:这样会造成阻塞,因为get()方法只能等子进程运行完毕后才能调用成功,否则会一直阻塞等待。如果写在for循环内容,相当于变成了同步,执行效率将会非常低。
高级用法(Pool)
目的:父进程实时获取子进程返回值,以此为标记结束所有进程。
实例(一)
执行子进程的过程中,不断获取返回值并校验,如果返回值为True则结果所有进程。(Queue模块已经发生了改变)
- from multiprocessing import Pool
- from asyncio import Queue
- import time
- def test(p):
- time.sleep(0.1)
- if p==100000:
- return True
- else:
- return False
- if __name__=="__main__":
- start=time.time()
- pool = Pool(processes=10)
- q=Queue()
- for i in range(500000):
-
- # 将子进程对象存入队列中。
- q.put(pool.apply_async(test, args=(i,)))#维持执行的进程总数为10,当一个进程执行完后添加新进程.
- # 因为这里使用的为pool.apply_async异步方法,因此子进程执行的过程中,父进程会执行while,获取返回值并校验。
- while 1:
- if q.get():
- pool.terminate() #结束进程池中的所有子进程。
- print('所有进程已经结束')
- break
- pool.join()
- print('总共耗时:%0.3f' % (time.time() - start))
输出为:
说明:总共要执行50000个子进程(并发数量为10),当其中一个子进程返回True时,结束进程池。因为使用了apply_async为异步进程,因此在执行完for循环的添加子进程操作后(只是添加并没有执行完所有的子进程),可以直接执行while代码,实时判断子进程返回值是否有True,有的话结束所有进程。
优点:不必等到所有子进程结束再结束程序,只要得到想要的结果就可以提前结束,节省资源。
不足:当需要执行的子进程非常大时,不适用,因为for循环在添加子进程时,要花费很长的时间,虽然是异步,但是也需要等待for循环添加子进程操作结束才能执行while代码,因此会比较慢。
实例(二)
多线程+多进程,添加执行子进程的过程中,不断获取返回值并校验,如果返回值为True则结果所有进程。
- import threading
-
- def test(p):
- time.sleep(0.1)
- if p==100000:
- return True
- else:
- return False
- if __name__=="__main__":
- start = time.time()
- result=Queue() #队列
- pool = Pool()
- def pool_th():
- for i in range(500000): ##这里需要创建执行的子进程非常多
- try:
- result.put(pool.apply_async(test, args=(i,)))
- except:
- break
- def result_th():
- while 1:
- a=result.get() #获取子进程返回值
- if a:
- pool.terminate() #结束所有子进程
- print("已经结束所有子进程")
- break
- '''
- 利用多线程,同时运行Pool函数创建执行子进程,以及运行获取子进程返回值函数。
- '''
- t1=threading.Thread(target=pool_th)
- t2=threading.Thread(target=result_th)
- t1.start()
- t2.start()
- t1.join()
- t2.join()
- pool.join()
- print('总共耗时为:%0.3f' % (time.time() - start))
输出结果:(速度杠杠滴)
执行流程:利用多线程,创建一个执行pool_th函数线程,一个执行result_th函数线程,pool_th函数用来添加进程池,开启进程执行功能函数并将子进程对象存入队列,而result_th()函数用来不停地从队列中取子进程对象,调用get()方法获取返回值。等发现其中存在子进程的返回值为True时,结束所有进程,最后结束线程。
优点:弥补了实例(一)的不足,即使for循环的子进程数量很多,也能提高性能,因为for循环与判断子进程返回值同时进行。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。