赞
踩
Process 类用来描述一个进程对象。创建子进程的时候,只需要传入一个执行函数和函数的参数即可完成 Process 示例的创建。
join() 方法实现进程间的同步,
等待所有进程退出。multiprocessing.Process(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None)
以 tuple 的形式传入
示例:
- import multiprocessing
- import os
-
- def run_proc(name):
- print('Child process {0} {1} Running '.format(name, os.getpid()))
-
- if __name__ == '__main__':
- print('Parent process {0} is Running'.format(os.getpid()))
- for i in range(5):
- p = multiprocessing.Process(target=run_proc, args=(str(i),))
- print('process start')
- p.start()
- p.join()
- print('Process close')
Pool 可以提供指定数量的进程供用户使用,默认是 CPU 核数。当有新的请求提交到 Poll 的时候,如果池子没有满,会创建一个进程来执行,否则就会让该请求等待。
- Pool 对象调用 join 方法会等待所有的子进程执行完毕
- 调用 join 方法之前,必须调用 close
- 调用 close 之后就不能继续添加新的 Process 了
apply_async 方法用来同步执行进程,允许多个进程同时进入池子。
- import multiprocessing
- import os
- import time
-
- def run_task(name):
- print('Task {0} pid {1} is running, parent id is {2}'.format(name, os.getpid(), os.getppid()))
- time.sleep(1)
- print('Task {0} end.'.format(name))
-
- if __name__ == '__main__':
- print('current process {0}'.format(os.getpid()))
- p = multiprocessing.Pool(processes=3)
- for i in range(6):
- p.apply_async(run_task, args=(i,))
- print('Waiting for all subprocesses done...')
- p.close()
- p.join()
- print('All processes done!')
apply(func[, args[, kwds]])
该方法只能允许一个进程进入池子,在一个进程结束之后,另外一个进程才可以进入池子。
- import multiprocessing
- import os
- import time
-
- def run_task(name):
- print('Task {0} pid {1} is running, parent id is {2}'.format(name, os.getpid(), os.getppid()))
- time.sleep(1)
- print('Task {0} end.'.format(name))
-
- if __name__ == '__main__':
- print('current process {0}'.format(os.getpid()))
- p = multiprocessing.Pool(processes=3)
- for i in range(6):
- p.apply(run_task, args=(i,))
- print('Waiting for all subprocesses done...')
- p.close()
- p.join()
- print('All processes done!')
上面我们是通过一个循环往进程池添加任务,Pool提供了更优雅的map方法来管理任务的提交。
- # coding: utf-8
-
- from multiprocessing import Pool
- import time
-
-
- def task(msg):
- print 'hello, %s' % msg
- time.sleep(1)
- return 'msg: %s' % msg
-
-
- if __name__ == '__main__':
- pool = Pool(processes=4)
-
- results = []
- msgs = [x for x in range(10)]
- results = pool.map(task, msgs)
-
- pool.close()
- pool.join()
-
- print 'processes done, result:'
-
- for x in results:
- print x
Queue 用来在多个进程间通信。Queue 有两个方法,get 和 put。
Put 方法用来插入数据到队列中,有两个可选参数,blocked 和 timeout。
- blocked = True(默认值),timeout 为正
该方法会阻塞 timeout 指定的时间,直到该队列有剩余空间。如果超时,抛出 Queue.Full 异常。
如果 Queue 已满,
立刻
抛出 Queue.Full 异常
get 方法用来从队列中读取并删除
一个元素。有两个参数可选,blocked 和 timeout
- blocked = False (默认),timeout 正值
等待时间内,没有取到任何元素,会抛出 Queue.Empty 异常。
Queue 有一个值可用,立刻返回改值;Queue 没有任何元素,
- from multiprocessing import Process, Queue
- import os, time, random
- # 写数据进程执行的代码:
- def proc_write(q,urls):
- print('Process(%s) is writing...' % os.getpid())
- for url in urls:
- q.put(url)
- print('Put %s to queue...' % url)
- time.sleep(random.random())
- # 读数据进程执行的代码:
- def proc_read(q):
- print('Process(%s) is reading...' % os.getpid())
- while True:
- url = q.get(True)
- print('Get %s from queue.' % url)
- if __name__=='__main__':
- # 父进程创建Queue,并传给各个子进程:
- q = Queue()
- proc_writer1 = Process(target=proc_write, args=(q,['url_1', 'url_2', 'url_3']))
- proc_writer2 = Process(target=proc_write, args=(q,['url_4','url_5','url_6']))
- proc_reader = Process(target=proc_read, args=(q,))
- # 启动子进程proc_writer,写入:
- proc_writer1.start()
- proc_writer2.start()
- # 启动子进程proc_reader,读取:
- proc_reader.start()
- # 等待proc_writer结束:
- proc_writer1.join()
- proc_writer2.join()
- # proc_reader进程里是死循环,无法等待其结束,只能强行终止:
- proc_reader.terminate()
常用来在两个进程间通信,两个进程分别位于管道的两端。
multiprocessing.Pipe([duplex])
示例一和示例二,也是网上找的别人的例子,尝试理解并增加了注释而已。网上的例子,大多是例子一和例子二在一起的,这里分开来看,比较容易理解。
示例一:
- from multiprocessing import Process, Pipe
-
- def send(pipe):
- pipe.send(['spam'] + [42, 'egg']) # send 传输一个列表
- pipe.close()
-
- if __name__ == '__main__':
- (con1, con2) = Pipe() # 创建两个 Pipe 实例
- sender = Process(target=send, args=(con1, )) # 函数的参数,args 一定是实例化之后的 Pip 变量,不能直接写 args=(Pip(),)
- sender.start() # Process 类启动进程
- print("con2 got: %s" % con2.recv()) # 管道的另一端 con2 从send收到消息
- con2.close() # 关闭管道
示例二:
- from multiprocessing import Process, Pipe
-
- def talk(pipe):
- pipe.send(dict(name='Bob', spam=42)) # 传输一个字典
- reply = pipe.recv() # 接收传输的数据
- print('talker got:', reply)
-
- if __name__ == '__main__':
- (parentEnd, childEnd) = Pipe() # 创建两个 Pipe() 实例,也可以改成 conf1, conf2
- child = Process(target=talk, args=(childEnd,)) # 创建一个 Process 进程,名称为 child
- child.start() # 启动进程
- print('parent got:', parentEnd.recv()) # parentEnd 是一个 Pip() 管道,可以接收 child Process 进程传输的数据
- parentEnd.send({x * 2 for x in 'spam'}) # parentEnd 是一个 Pip() 管道,可以使用 send 方法来传输数据
- child.join() # 传输的数据被 talk 函数内的 pip 管道接收,并赋值给 reply
- print('parent exit')
当多个进程需要访问共享资源的时候,Lock可以用来避免访问的冲突
- # coding: utf-8
-
- from multiprocessing import Lock, Process
- import time
-
-
- def task1(lock, f):
- with lock:
- f = open(f, 'w+')
- f.write('hello ')
- time.sleep(1)
- f.close()
-
-
- def task2(lock, f):
- lock.acquire()
- try:
- f = open(f, 'a+')
- time.sleep(1)
- f.write('world!')
- except Exception as e:
- print(e)
- finally:
- f.close()
- lock.release()
-
-
- if __name__ == '__main__':
- lock = Lock()
- fn = './file.txt'
-
- start = time.time()
- p1 = Process(target=task1, args=(lock, fn,))
- p2 = Process(target=task2, args=(lock, fn,))
-
- p1.start()
- p2.start()
-
- p1.join()
- p2.join()
-
- end = time.time()
- print 'time cost: %s seconds' % (end - start)
-
- with open(fn, 'r') as f:
- for x in f.readlines():
- print x
因为要访问共享文件,先获得锁的进程会阻塞后面的进程,因此程序运行耗时约2s。
Semaphore 和 Lock 稍有不同,Semaphore 相当于 N 把锁,获取其中一把就可以执行了。 信号量的总数 N 在构造时传入,s = Semaphore(N)。 和 Lock 一样,如果信号量为0,则进程堵塞,直到信号大于0。Semaphore可用来控制对共享资源的访问数量,例如池的最大连接数。
- # coding: utf-8
-
- from multiprocessing import Semaphore, Process
- import time
-
-
- def task(s, msg):
- s.acquire()
- print 'hello, %s' % msg
- time.sleep(1)
- s.release()
-
-
- if __name__ == '__main__':
- s = Semaphore(2)
-
- processes = []
- for x in range(8):
- p = Process(target=task, args=(s, x,))
- processes.append(p)
-
- start = time.time()
- for p in processes:
- p.start()
-
- for p in processes:
- p.join()
-
- end = time.time()
-
- print '8 process takes %s seconds' % (end - start)
信号量同步基于内部计数器,每调用一次acquire(),计数器减1;每调用一次release(),计数器加1.当计数器为0时,acquire()调用被阻塞。这是Dijkstra信号量概念P()和V()的Python实现。信号量同步机制适用于访问像服务器、文件这样的有限资源。
Event
用来实现进程间同步通信。
- # coding: utf-8
-
- from multiprocessing import Process, Event
- import time
-
-
- def task1(e, msg):
- print 'task1 is waitting...'
- e.wait()
- time.sleep(1)
- print 'hello, %s, e.is_set(): %s' % (msg, e.is_set())
-
-
- def task2(e, msg):
- print 'task2 is waitting...'
- e.wait(msg)
- print 'hello, %s, e.is_set(): %s' % (msg, e.is_set())
-
-
- if __name__ == '__main__':
- e = Event()
-
- p1 = Process(target=task1, args=(e, 1))
- p2 = Process(target=task2, args=(e, 2))
-
- p1.start()
- p2.start()
-
- time.sleep(3)
-
- e.set()
- print 'main: event is set'
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。