当前位置:   article > 正文

Python中的Queue与多进程(multiprocessing)_multiprocessing.queue

multiprocessing.queue

一、Queue(队列对象)

在多进程中环境中,要使用进程安全的multiprocessing.Queue() ,而非Python标准库中的Queue

当一个队列为空的时候如果再用get取则会堵塞,所以取队列的时候一般是用到

get_nowait()方法,这种方法在向一个空队列取值的时候会抛一个Empty异常

所以更常用的方法是先判断一个队列是否为空,如果不为空则取值

队列中常用的方法

Queue.qsize() 返回队列的大小  
Queue.empty() 如果队列为空,返回True,反之False  
Queue.full() 如果队列满了,返回True,反之False 
Queue.get([block[, timeout]]) 获取队列,timeout等待时间  
Queue.get_nowait() 相当Queue.get(False) 
非阻塞 Queue.put(item) 写入队列,timeout等待时间  
Queue.put_nowait(item) 相当Queue.put(item, False)

二、multiprocessing中使用子进程概念

from multiprocessing import Process'
运行

可以通过Process来构造一个子进程

  1. p1 = Process(target=fun,args=(args))
  2. p2 = Process(target=fun, kwargs={'name': name, 'symbol': symbol})

同时,在这里说明一下,multiprocessing.Process() 也要在main所在py文件下运行

通过p.start()来启动子进程

再通过p.join()方法来使得子进程运行结束后再执行父进程

  1. from multiprocessing import Process
  2. import os
  3. # 子进程要执行的代码
  4. def run_proc(name):
  5. print 'Run child process %s (%s)...' % (name, os.getpid())
  6. if __name__=='__main__':
  7. print 'Parent process %s.' % os.getpid()
  8. p = Process(target=run_proc, args=('test',))
  9. print 'Process will start.'
  10. p.start()
  11. p.join()
  12. print 'Process end.'

 三、在multiprocessing中使用pool

如果需要多个子进程时可以考虑使用进程池(pool)来管理

from multiprocessing import Pool'
运行
  1. from multiprocessing import Pool
  2. import os, time
  3. def long_time_task(name):
  4. print 'Run task %s (%s)...' % (name, os.getpid())
  5. start = time.time()
  6. time.sleep(3)
  7. end = time.time()
  8. print 'Task %s runs %0.2f seconds.' % (name, (end - start))
  9. if __name__=='__main__':
  10. print 'Parent process %s.' % os.getpid()
  11. p = Pool()
  12. for i in range(5):
  13. p.apply_async(long_time_task, args=(i,))
  14. print 'Waiting for all subprocesses done...'
  15. p.close()
  16. p.join()
  17. print 'All subprocesses done.'

pool创建子进程的方法与Process不同,是通过

p.apply_async(func, args=(args))实现,一个池子里能同时运行的任务是取决你电脑的cpu数量,如我的电脑现在是有4个cpu,那会子进程task0,task1,task2,task3可以同时启动,task4则在之前的一个某个进程结束后才开始

上面的程序运行后的结果其实是按照上图中1,2,3分开进行的,先打印1,3秒后打印2,再3秒后打印3

代码中的p.close()是关掉进程池子,是不再向里面添加进程了,对Pool对象调用join()方法会等待所有子进程执行完毕,调用join()之前必须先调用close(),调用close()之后就不能继续添加新的Process了。

当时也可以是实例pool的时候给它定义一个进程的多少

如果上面的代码中p=Pool(5)那么所有的子进程就可以同时进行

四、多个子进程间的通信

多个子进程间的通信就要采用第一步中说到的Queue,比如有以下的需求,一个子进程向队列中写数据,另外一个进程从队列中取数据

  1. from multiprocessing import Process, Queue
  2. import os, time, random
  3. # 写数据进程执行的代码:
  4. def write(q):
  5. for value in ['A', 'B', 'C']:
  6. print 'Put %s to queue...' % value
  7. q.put(value)
  8. time.sleep(random.random())
  9. # 读数据进程执行的代码:
  10. def read(q):
  11. while True:
  12. if not q.empty():
  13. value = q.get(True)
  14. print 'Get %s from queue.' % value
  15. time.sleep(random.random())
  16. else:
  17. break
  18. if __name__=='__main__':
  19. # 父进程创建Queue,并传给各个子进程:
  20. q = Queue()
  21. pw = Process(target=write, args=(q,))
  22. pr = Process(target=read, args=(q,))
  23. # 启动子进程pw,写入:
  24. pw.start()
  25. # 等待pw结束:
  26. pw.join()
  27. # 启动子进程pr,读取:
  28. pr.start()
  29. pr.join()
  30. # pr进程里是死循环,无法等待其结束,只能强行终止:
  31. print
  32. print '所有数据都写入并且读完'

五、关于上面代码的几个有趣的问题

  1. if __name__=='__main__':
  2. # 父进程创建Queue,并传给各个子进程:
  3. q = Queue()
  4. p = Pool()
  5. pw = p.apply_async(write,args=(q,))
  6. pr = p.apply_async(read,args=(q,))
  7. p.close()
  8. p.join()
  9. print
  10. print '所有数据都写入并且读完'

如果main函数写成上面的样本,本来我想要的是将会得到一个队列,将其作为参数传入进程池子里的每个子进程,但是却得到

RuntimeError: Queue objects should only be shared between processes through inheritance

的错误,查了下,大意是队列对象不能在父进程与子进程间通信,这个如果想要使用进程池中使用队列则要使用multiprocess的Manager类

  1. if __name__=='__main__':
  2. manager = multiprocessing.Manager()
  3. # 父进程创建Queue,并传给各个子进程:
  4. q = manager.Queue()
  5. p = Pool()
  6. pw = p.apply_async(write,args=(q,))
  7. time.sleep(0.5)
  8. pr = p.apply_async(read,args=(q,))
  9. p.close()
  10. p.join()
  11. print
  12. print '所有数据都写入并且读完'

这样这个队列对象就可以在父进程与子进程间通信,不用池则不需要Manager

六、关于锁的应用

在不同程序间如果有同时对同一个队列操作的时候,为了避免错误,可以在某个函数操作队列的时候给它加把锁,这样在同一个时间内则只能有一个子进程对队列进行操作,锁也要在manager对象

中的锁

  1. from multiprocessing import Process,Queue,Pool
  2. import multiprocessing
  3. import os, time, random
  4. # 写数据进程执行的代码:
  5. def write(q,lock):
  6. lock.acquire() #加上锁
  7. for value in ['A', 'B', 'C']:
  8. print 'Put %s to queue...' % value
  9. q.put(value)
  10. lock.release() #释放锁
  11. # 读数据进程执行的代码:
  12. def read(q):
  13. while True:
  14. if not q.empty():
  15. value = q.get(False)
  16. print 'Get %s from queue.' % value
  17. time.sleep(random.random())
  18. else:
  19. break
  20. if __name__=='__main__':
  21. manager = multiprocessing.Manager()
  22. # 父进程创建Queue,并传给各个子进程:
  23. q = manager.Queue()
  24. lock = manager.Lock() #初始化一把锁
  25. p = Pool()
  26. pw = p.apply_async(write,args=(q,lock))
  27. pr = p.apply_async(read,args=(q,))
  28. p.close()
  29. p.join()
  30. print
  31. print '所有数据都写入并且读完'

七、关于如何结束子进程

简单粗暴的做法是使用:multiprocessing.Process.terminate()

在主进程里直接terminate子线程,是不推荐的做法,因为结束的时候不清楚子线程的运行状况,有很大可能性导致子线程在不恰当的时刻被结束。
在主进程里优雅的结束子进程,推荐的方法是,通过全局变量、互斥锁或信号量等进程间通信手段来达到。
方式1:使用multiprocessing.Value对象,来传递信息。通知子线程:“辛苦了,你可以休息了”,然后让子线程自身决定退出的时刻,可以选择一个适当的时刻来结束任务。
下面的代码,在外部修改alive.value的值,子进程得知后,选择在没有sleep的时候退出。

  1. from multiprocessing import Process, Value
  2. import time
  3. alive = Value('b', False)
  4. def worker(alive):
  5. while alive.value:
  6. time.sleep(0.1)
  7. print("running")
  8. if __name__ == '__main__':
  9. p = Process(target=worker, args=(alive,))
  10. alive.value = True
  11. p.start()
  12. time.sleep(1)
  13. alive.value = False
'
运行

方式2:更优雅的方式是使用信号Signal,具体使用参考:

python进程间通信--信号Signal - 持之以恒18 - 博客园

八、multiprocessing.freeze_support()

Python多进程multiprocessing在windows的控制台或者IDE下运行不了会报错
打包成exe包双击之后会一直打开exe,导致内存占满,在linux下没有问题。
经过各种查阅资料,终于解决了这个bug:
只要在main入口下添加 multiprocessing.freeze_support()就可以了

  1. if __name__ == "__main__":
  2. multiprocessing.freeze_support()

参考文章:

python Queue模块_Yatere的天平秤-CSDN博客

多进程 - 廖雪峰的官方网站

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/weixin_40725706/article/detail/860620
推荐阅读
相关标签
  

闽ICP备14008679号