当前位置:   article > 正文

Python3多进程multiprocess学习

multiprocess

由于Python存在GIL锁,对于多线程来说,这只是部分代码可以使用多CPU的优势,对于想全部使用多CPU的性能,让每一个任务都充分地使用CPU,那么使用多进程就是达到此目的,因为每个进程在Python里单独的GIL锁,这样就不会在不同进程之间进行了阻塞。因此,如果是需要使用大量CPU计算资源的需要,就应该使用多进程的方式。

什么是全局解释器锁GIL

Python代码的执行由Python 虚拟机(也叫解释器主循环,CPython版本)来控制,Python 在设计之初就考虑到要在解释器的主循环中,同时只有一个线程在执行,即在任意时刻,只有一个线程在解释器中运行。对Python 虚拟机的访问由全局解释器锁(GIL)来控制,正是这个锁能保证同一时刻只有一个线程在运行。
在多线程环境中,Python 虚拟机按以下方式执行:
1. 设置GIL
2. 切换到一个线程去运行
3. 运行:
     a. 指定数量的字节码指令,或者
    b. 线程主动让出控制(可以调用time.sleep(0))
4. 把线程设置为睡眠状态
5. 解锁GIL
6. 再次重复以上所有步骤

在调用外部代码(如C/C++扩展函数)的时候,GIL 将会被锁定,直到这个函数结束为止(由于在这期间没有Python 的字节码被运行,所以不会做线程切换)。

全局解释器锁GIL设计理念与限制

GIL的设计简化了CPython的实现,使得对象模型,包括关键的内建类型如字典,都是隐含可以并发访问的。锁住全局解释器使得比较容易的实现对多线程的支持,但也损失了多处理器主机的并行计算能力。
但是,不论标准的,还是第三方的扩展模块,都被设计成在进行密集计算任务是,释放GIL。
还有,就是在做I/O操作时,GIL总是会被释放。对所有面向I/O 的(会调用内建的操作系统C 代码的)程序来说,GIL 会在这个I/O 调用之前被释放,以允许其它的线程在这个线程等待I/O 的时候运行。如果是纯计算的程序,没有 I/O 操作,解释器会每隔 100 次操作就释放这把锁,让别的线程有机会执行(这个次数可以通过 sys.setcheckinterval 来调整)如果某线程并未使用很多I/O 操作,它会在自己的时间片内一直占用处理器(和GIL)。也就是说,I/O 密集型的Python 程序比计算密集型的程序更能充分利用多线程环境的好处。

多进程 Multiprocessing 模块

Process 类

Process 类用来描述一个进程对象。创建子进程的时候,只需要传入一个执行函数和函数的参数即可完成 Process 示例的创建。

  • star() 方法启动进程,
  • join() 方法实现进程间的同步,等待所有进程退出。
  • close() 用来阻止多余的进程涌入进程池 Pool 造成进程阻塞。
multiprocessing.Process(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None)
  • 1
  • target 是函数名字,需要调用的函数
  • args 函数需要的参数,以 tuple 的形式传入

示例:

  1. import multiprocessing
  2. import os
  3. def run_proc(name):
  4. print('Child process {0} {1} Running '.format(name, os.getpid()))
  5. if __name__ == '__main__':
  6. print('Parent process {0} is Running'.format(os.getpid()))
  7. for i in range(5):
  8. p = multiprocessing.Process(target=run_proc, args=(str(i),))
  9. print('process start')
  10. p.start()
  11. p.join()
  12. print('Process close')
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14

Pool

Pool 可以提供指定数量的进程供用户使用,默认是 CPU 核数。当有新的请求提交到 Poll 的时候,如果池子没有满,会创建一个进程来执行,否则就会让该请求等待。 
- Pool 对象调用 join 方法会等待所有的子进程执行完毕 
- 调用 join 方法之前,必须调用 close 
- 调用 close 之后就不能继续添加新的 Process 了

pool.apply_async

apply_async 方法用来同步执行进程,允许多个进程同时进入池子。

  1. import multiprocessing
  2. import os
  3. import time
  4. def run_task(name):
  5. print('Task {0} pid {1} is running, parent id is {2}'.format(name, os.getpid(), os.getppid()))
  6. time.sleep(1)
  7. print('Task {0} end.'.format(name))
  8. if __name__ == '__main__':
  9. print('current process {0}'.format(os.getpid()))
  10. p = multiprocessing.Pool(processes=3)
  11. for i in range(6):
  12. p.apply_async(run_task, args=(i,))
  13. print('Waiting for all subprocesses done...')
  14. p.close()
  15. p.join()
  16. print('All processes done!')
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18

 pool.apply

apply(func[, args[, kwds]])
  • 1

该方法只能允许一个进程进入池子,在一个进程结束之后,另外一个进程才可以进入池子。

  1. import multiprocessing
  2. import os
  3. import time
  4. def run_task(name):
  5. print('Task {0} pid {1} is running, parent id is {2}'.format(name, os.getpid(), os.getppid()))
  6. time.sleep(1)
  7. print('Task {0} end.'.format(name))
  8. if __name__ == '__main__':
  9. print('current process {0}'.format(os.getpid()))
  10. p = multiprocessing.Pool(processes=3)
  11. for i in range(6):
  12. p.apply(run_task, args=(i,))
  13. print('Waiting for all subprocesses done...')
  14. p.close()
  15. p.join()
  16. print('All processes done!')
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18

 pool.map方法

上面我们是通过一个循环往进程池添加任务,Pool提供了更优雅的map方法来管理任务的提交。

  1. # coding: utf-8
  2. from multiprocessing import Pool
  3. import time
  4. def task(msg):
  5. print 'hello, %s' % msg
  6. time.sleep(1)
  7. return 'msg: %s' % msg
  8. if __name__ == '__main__':
  9. pool = Pool(processes=4)
  10. results = []
  11. msgs = [x for x in range(10)]
  12. results = pool.map(task, msgs)
  13. pool.close()
  14. pool.join()
  15. print 'processes done, result:'
  16. for x in results:
  17. print x

Queue 进程间通信

Queue 用来在多个进程间通信。Queue 有两个方法,get 和 put。

put 方法

Put 方法用来插入数据到队列中,有两个可选参数,blocked 和 timeout。 
- blocked = True(默认值),timeout 为正

该方法会阻塞 timeout 指定的时间,直到该队列有剩余空间。如果超时,抛出 Queue.Full 异常。


  • blocked = False 

如果 Queue 已满, 立刻抛出 Queue.Full 异常

get 方法

get 方法用来从队列中读取并删除一个元素。有两个参数可选,blocked 和 timeout 
- blocked = False (默认),timeout 正值

等待时间内,没有取到任何元素,会抛出 Queue.Empty 异常。


  • blocked = True 

Queue 有一个值可用,立刻返回改值;Queue 没有任何元素,
  1. from multiprocessing import Process, Queue
  2. import os, time, random
  3. # 写数据进程执行的代码:
  4. def proc_write(q,urls):
  5. print('Process(%s) is writing...' % os.getpid())
  6. for url in urls:
  7. q.put(url)
  8. print('Put %s to queue...' % url)
  9. time.sleep(random.random())
  10. # 读数据进程执行的代码:
  11. def proc_read(q):
  12. print('Process(%s) is reading...' % os.getpid())
  13. while True:
  14. url = q.get(True)
  15. print('Get %s from queue.' % url)
  16. if __name__=='__main__':
  17. # 父进程创建Queue,并传给各个子进程:
  18. q = Queue()
  19. proc_writer1 = Process(target=proc_write, args=(q,['url_1', 'url_2', 'url_3']))
  20. proc_writer2 = Process(target=proc_write, args=(q,['url_4','url_5','url_6']))
  21. proc_reader = Process(target=proc_read, args=(q,))
  22. # 启动子进程proc_writer,写入:
  23. proc_writer1.start()
  24. proc_writer2.start()
  25. # 启动子进程proc_reader,读取:
  26. proc_reader.start()
  27. # 等待proc_writer结束:
  28. proc_writer1.join()
  29. proc_writer2.join()
  30. # proc_reader进程里是死循环,无法等待其结束,只能强行终止:
  31. proc_reader.terminate()
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31

Pipe 进程间通信

常用来在两个进程间通信,两个进程分别位于管道的两端。

multiprocessing.Pipe([duplex])
  • 1

示例一和示例二,也是网上找的别人的例子,尝试理解并增加了注释而已。网上的例子,大多是例子一和例子二在一起的,这里分开来看,比较容易理解。

示例一:

  1. from multiprocessing import Process, Pipe
  2. def send(pipe):
  3. pipe.send(['spam'] + [42, 'egg']) # send 传输一个列表
  4. pipe.close()
  5. if __name__ == '__main__':
  6. (con1, con2) = Pipe() # 创建两个 Pipe 实例
  7. sender = Process(target=send, args=(con1, )) # 函数的参数,args 一定是实例化之后的 Pip 变量,不能直接写 args=(Pip(),)
  8. sender.start() # Process 类启动进程
  9. print("con2 got: %s" % con2.recv()) # 管道的另一端 con2 从send收到消息
  10. con2.close() # 关闭管道
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12

示例二:

  1. from multiprocessing import Process, Pipe
  2. def talk(pipe):
  3. pipe.send(dict(name='Bob', spam=42)) # 传输一个字典
  4. reply = pipe.recv() # 接收传输的数据
  5. print('talker got:', reply)
  6. if __name__ == '__main__':
  7. (parentEnd, childEnd) = Pipe() # 创建两个 Pipe() 实例,也可以改成 conf1, conf2
  8. child = Process(target=talk, args=(childEnd,)) # 创建一个 Process 进程,名称为 child
  9. child.start() # 启动进程
  10. print('parent got:', parentEnd.recv()) # parentEnd 是一个 Pip() 管道,可以接收 child Process 进程传输的数据
  11. parentEnd.send({x * 2 for x in 'spam'}) # parentEnd 是一个 Pip() 管道,可以使用 send 方法来传输数据
  12. child.join() # 传输的数据被 talk 函数内的 pip 管道接收,并赋值给 reply
  13. print('parent exit')
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15

Lock

当多个进程需要访问共享资源的时候,Lock可以用来避免访问的冲突

  1. # coding: utf-8
  2. from multiprocessing import Lock, Process
  3. import time
  4. def task1(lock, f):
  5. with lock:
  6. f = open(f, 'w+')
  7. f.write('hello ')
  8. time.sleep(1)
  9. f.close()
  10. def task2(lock, f):
  11. lock.acquire()
  12. try:
  13. f = open(f, 'a+')
  14. time.sleep(1)
  15. f.write('world!')
  16. except Exception as e:
  17. print(e)
  18. finally:
  19. f.close()
  20. lock.release()
  21. if __name__ == '__main__':
  22. lock = Lock()
  23. fn = './file.txt'
  24. start = time.time()
  25. p1 = Process(target=task1, args=(lock, fn,))
  26. p2 = Process(target=task2, args=(lock, fn,))
  27. p1.start()
  28. p2.start()
  29. p1.join()
  30. p2.join()
  31. end = time.time()
  32. print 'time cost: %s seconds' % (end - start)
  33. with open(fn, 'r') as f:
  34. for x in f.readlines():
  35. print x
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48

因为要访问共享文件,先获得锁的进程会阻塞后面的进程,因此程序运行耗时约2s。

Semaphore

Semaphore 和 Lock 稍有不同,Semaphore 相当于 N 把锁,获取其中一把就可以执行了。 信号量的总数 N 在构造时传入,s = Semaphore(N)。 和 Lock 一样,如果信号量为0,则进程堵塞,直到信号大于0。Semaphore可用来控制对共享资源的访问数量,例如池的最大连接数。

  1. # coding: utf-8
  2. from multiprocessing import Semaphore, Process
  3. import time
  4. def task(s, msg):
  5. s.acquire()
  6. print 'hello, %s' % msg
  7. time.sleep(1)
  8. s.release()
  9. if __name__ == '__main__':
  10. s = Semaphore(2)
  11. processes = []
  12. for x in range(8):
  13. p = Process(target=task, args=(s, x,))
  14. processes.append(p)
  15. start = time.time()
  16. for p in processes:
  17. p.start()
  18. for p in processes:
  19. p.join()
  20. end = time.time()
  21. print '8 process takes %s seconds' % (end - start)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31

信号量同步基于内部计数器,每调用一次acquire(),计数器减1;每调用一次release(),计数器加1.当计数器为0时,acquire()调用被阻塞。这是Dijkstra信号量概念P()和V()的Python实现。信号量同步机制适用于访问像服务器、文件这样的有限资源。

Event

Event用来实现进程间同步通信。

  1. # coding: utf-8
  2. from multiprocessing import Process, Event
  3. import time
  4. def task1(e, msg):
  5. print 'task1 is waitting...'
  6. e.wait()
  7. time.sleep(1)
  8. print 'hello, %s, e.is_set(): %s' % (msg, e.is_set())
  9. def task2(e, msg):
  10. print 'task2 is waitting...'
  11. e.wait(msg)
  12. print 'hello, %s, e.is_set(): %s' % (msg, e.is_set())
  13. if __name__ == '__main__':
  14. e = Event()
  15. p1 = Process(target=task1, args=(e, 1))
  16. p2 = Process(target=task2, args=(e, 2))
  17. p1.start()
  18. p2.start()
  19. time.sleep(3)
  20. e.set()
  21. print 'main: event is set'


声明:本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:【wpsshop博客】
推荐阅读
相关标签
  

闽ICP备14008679号