当前位置:   article > 正文

python文字处理dummy_python中multiprocessing、multiprocessing.dummy和threading用法笔记

multiprocessing.dummy python

一、multiprocessing

用法参考地址:multiprocessing用法

首先解释一个误区:

进程池的大小是每次同时执行的进程数,但是并不会影响主进程申请进程的数量。主进程申请多进程量不等于池子大小。

1、子进程无返回值

# -*- coding:utf-8 -*-

from multiprocessing import Pool as Pool

import time

def func(msg):

print 'msg:', msg

time.sleep(2)

print 'end:'

pool = Pool(processes=3)

for i in xrange(1, 5):

msg = 'hello %d' % (i)

pool.apply_async(func,(msg,)) # 非阻塞

# pool.apply(func,(msg,)) # 阻塞,apply()源自内建函数,用于间接的调用函数,并且按位置把元祖或字典作为参数传入。

# pool.imap(func,[msg,]) # 非阻塞, 注意与apply传的参数的区别

# pool.map(func, [msg, ]) # 阻塞

print 'Mark~~~~~~~~~~~~~~~'

pool.close()

pool.join() # 调用join之前,先调用close函数,否则会出错。执行完close后不会有新的进程加入到pool,join函数等待所有子进程结束

print 'sub-process done'

非阻塞方法

multiprocessing.Pool.apply_async() 和 multiprocessing.Pool.imap()

进程并发执行

阻塞方法

multiprocessing.Pool.apply()和 multiprocessing.Pool.map()

进程顺序执行

2、子进程有返回值

只有apply_async可以有返回值,apply,map,imap不可以设置返回值.

# -*- coding:utf-8 -*-

from multiprocessing import Pool as Pool

import time

def func(msg):

print 'msg:', msg

time.sleep(2)

print 'end:'

return msg

pool = Pool(processes=3)

result = []

for i in xrange(1, 5):

msg = 'hello %d' % (i)

res = pool.apply_async(func,(msg,)) # 非阻塞 只有apply_async可以有返回值,apply,map,imap不可以设置返回值

result.append(res)

print 'Mark~~~~~~~~~~~~~~~'

pool.close()

pool.join() # 调用join之前,先调用close函数,否则会出错。执行完close后不会有新的进程加入到pool,join函数等待所有子进程结束

for res in result:

print "sub_process return: ", res.get()

print 'sub-process done'

一定要注意res.get()方法是堵塞的。只有子进程执行完毕并返回数据时 res.get()方法才会执行,否则主进程堵塞,并等待。

看下面这个程序: 如何高效处理子进程有返回值的多进程任务

from multiprocessing import Pool

import Queue

import time

def test(p):

time.sleep(0.5)

if p == 100:

return (p,True)

else:

return (p,False)

if __name__ == "__main__":

pool = Pool(processes=10)

q = Queue.Queue()

for i in xrange(500):

# 将子进程对象存入队列中。

q.put( pool.apply_async(test, args=(i,)) ) # 维持执行的进程总数为10,当一个进程执行完后添加新进程.

print(i)

'''

因为这里使用的为pool.apply_async异步方法,因此子进程执行的过程中,父进程会执行while,获取返回值并校验。

'''

print("======", q.qsize())

while 1:

a = q.get().get();

print(a)

if a[1]:

pool.terminate() # 结束进程池中的所有子进程。

break

pool.join()

该程序瞬间执行到 print("======", q.qsize()) 行,并且每次执行 a = q.get().get()代码时,如果对应进程没有执行完,即没有返回输出值时,该行代码导致主进程堵塞等待。

如果需要申请庞大的进程数量时,就会很浪费资源比如下面:

for i in xrange(500000000):

# 将子进程对象存入队列中。

q.put( pool.apply_async(test, args=(i,)) ) # 维持执行的进程总数为10,当一个进程执行完后添加新进程.

print(i)

我们可以开启2个线程,一个线程申请进程,另一个线程判断结束所有子进程的进程是否已经到达。

如下:

from multiprocessing import Pool

import Queue

import threading

import time

def test(p):

time.sleep(0.001)

if p == 10000:

return True

else:

return False

if __name__ == "__main__":

result = Queue.Queue() # 队列

pool = Pool()

def pool_th():

for i in xrange(50000000000): ##这里需要创建执行的子进程非常多

try:

result.put(pool.apply_async(test, args=(i,)))

except:

break

def result_th():

while 1:

a = result.get().get() # 获取子进程返回值

if a:

pool.terminate() # 结束所有子进程

break

'''

利用多线程,同时运行Pool函数创建执行子进程,以及运行获取子进程返回值函数。

'''

t1 = threading.Thread(target=pool_th)

t2 = threading.Thread(target=result_th)

t1.start()

t2.start()

t1.join()

t2.join()

pool.join()

3、多进程共享资源

申请进程有两种方式一种是multiprocessing.Process(),另一种是multiprocessing.Pool(process=3).apply_async().

multiprocessing提供三种多进程之间共享数据的数据结构: Queue, Array 和Manager.

from multiprocessing import Queue, Array, Manager

Queue、和Array只适用Process类申请的多进程共享资源。

Manager可以适用Pool和Process类申请的多进程共享资源。

import time

from multiprocessing import Manager, Pool

lists = Manager().list() # 定义可被子进程共享的全局变量lists

def func(i):

# time.sleep(1)

lists.append(i)

print i

pool = Pool(processes=3)

for i in xrange(10000000):

if len(lists) <= 0:

pool.apply_async(func, args=(i,))

else:

break

pool.close()

pool.join()

print(lists)

输出结果为:且i最大值不定。主进程申请多进程量不等于池子大小。

二、多线程 Multiprocessing.dummy

1、子进程无返回值

Multiprocessing.dummy.Pool() 与Multiprocessing.Pool() 的用法一样

非阻塞方法

multiprocessing.dummy.Pool.apply_async() 和 multiprocessing.dummy.Pool.imap()

线程并发执行

阻塞方法

multiprocessing.dummy.Pool.apply()和 multiprocessing.dummy.Pool.map()

线程顺序执行

from multiprocessing.dummy import Pool as Pool

import time

def func(msg):

print('msg:', msg)

time.sleep(2)

print('end:')

pool = Pool(processes=3)

for i in range(1, 5):

msg = 'hello %d' % (i)

pool.apply_async(func, (msg,)) # 非阻塞

# pool.apply(func,(msg,)) # 阻塞,apply()源自内建函数,用于间接的调用函数,并且按位置把元祖或字典作为参数传入。

# pool.imap(func,[msg,]) # 非阻塞, 注意与apply传的参数的区别

# pool.map(func, [msg, ]) # 阻塞

print('Mark~~~~~~~~~~~~~~~')

pool.close()

pool.join() # 调用join之前,先调用close函数,否则会出错。执行完close后不会有新的进程加入到pool,join函数等待所有子进程结束

print('sub-process done')

2、子进程有返回值

与多进程一样,只有multiprocessing.dummy.Pool.apply_async()可以有返回值,apply,map,imap不可以设置返回值.

3、多进程共享资源

三、多线程 Threading

1、创建方法

直接使用Thread类

from threading import Thread

import time

def run(a = None, b = None) :

print a, b

time.sleep(1)

t = Thread(target = run, args = ("this is a", "thread"))

#此时线程是新建状态

print t.getName()#获得线程对象名称

print t.isAlive()#判断线程是否还活着。

t.start()#启动线程

t.join()#等待其他线程运行结束

继承Thread类

from threading import Thread

import time

class MyThread(Thread) :

def __init__(self, a) :

super(MyThread, self).__init__()

#调用父类的构造方法

self.a = a

def run(self) :

print "sleep :", self.a

time.sleep(self.a)

t1 = MyThread(2)

t2 = MyThread(4)

t1.start()

t2.start()

t1.join()

t2.join()

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/weixin_40725706/article/detail/74910
推荐阅读
相关标签
  

闽ICP备14008679号