赞
踩
1.并行处理
并行处理是一种在同一台计算机的多个处理器中同时运行任务的工作模式。这种工作模式的目的就是减少总的任务处理时间,但是进程之间的通信会有额外的开销,因此对小的任务而言,总的任务时间会有所增加而不是是减少。
在Python语言中,multiprocessing模块通过使用子进程(而不是线程)来运行独立的并行进程。它可以让您利用机器上的多个处理器(Windows和Unix),也就是说,多个进程可以完全独立的在内存中运行
2.自己的设备最多可以进行多少个并行处理
一次可以运行的最大进程数受计算机中处理器数量的限制。可以使用multiprocessing模块中的cpu_count()函数进行显示
import multiprocessing as mp print("Number of processers: ", mp.cpu_count())像我电脑只有四个:
3.同步执行和异步执行
并行处理中,有两种执行类型: 同步和异步
同步执行就是各个进程按照启动的先后顺序完成。这是通过锁定主程序直到相应的进程执行完毕来实现的。
异步执行,换句话说,进程的执行不涉及锁定,这样做的结果就是,进程结果返回的顺序可能混淆,但通常情况下,异步执行会更快完成。
multiprocessing模块中有两个对象是用来实现函数并行执行的:Pool类和Process类
4.解决实际问题实例 : 计算每行中给定数值范围内的元素个数
给定一个二维矩阵(或者列表和多维列表),计算每行中给定数值范围内的元素个数
import numpy as np from time import time # RandomState()是一个伪随机数生成器 np.random.RandomState(100) # 0, 10 : 生成0到10的随机整数 # size=[200000, 5] 即生成200000行,一列的 ndarray(二维矩阵的形式,每个里面5个元素) arr = np.random.randint(0, 10, size=[200000, 5]) data = arr.tolist() # 将numpy.ndarray 转化为list # 因为是随机的,所以每次的数字不确定 data = data[:5] print("数据为:", data) """ 运行结果: 数据为: [[5, 6, 7, 0, 9], [4, 0, 6, 7, 4], [7, 3, 8, 3, 9], [2, 1, 9, 3, 2], [0, 0, 9, 5, 2]] """4.1 不使用并行处理的参考代码
函数howmany_within_range()进行重复以检查在范围内的数有多少个病返回计数
"""不使用并行处理""" def howmany_within_range(row, minimum, maximum): count = 0 for n in row: if minimum <= n <= maximum: count += 1 return count result = [] for row in data: result.append(howmany_within_range(row, minimum=4, maximum=8)) print("给定数值范围中的元素个数:", result[:10]) """ 注意:以下只是参考输出,因为输入序列是随机的,每次输出结果并不固定 运行结果: 给定数值范围中的元素 [3, 2, 3, 4, 2, 3, 3, 2, 2, 2] """4.2 对函数进行并行化处理
对代码进行并行处理通常的做法是取出其中可以多次运行的特定函数,将其放在不同的处理器上并运行,要做到这一点,就需要使用Pool类对数目为n的处理器进行初始化,之后将想要并运行的函数传递给Pool类中并行方法。
multipprocessing.Pool()中提供了apply(),map()和starmap()等方法对传入的函数并行运行。
apply()和map() 之间又有什么区别呢?
apply()和map()都是把要进行并行化的函数作为主要参数,但是不同的是,apply() 接收args参数,通过args将各个参数传送给被并行化处理的函数,而map仅将一个迭代器作为参数。
因此使用,对于简单的可迭代的操作,使用map()进行并行处理更适合,而且能更快完成工作
4.2.1 Pool.apply()进行并行化处理
if __name__ == '__main__': # 1.初始化 multiprocessing.Pool() pool = mp.Pool(mp.cpu_count()) # 2.使用apply(), 将函数howmany_within_range作为主参传进去 results = [pool.apply(howmany_within_range, args=(row, 4, 8)) for row in data] # 3. 不要忘记关闭进程 pool.close() print(results[:10])注意: 使用 if __name__ == '__main__': 将你的代码放到下面去执行,不然会报错
The "freeze_support()" line can be omitted if the program
is not going to be frozen to produce an executable.我们如果在这段程序之外打印,会发现,程序会有个并行进行运行,也就多打印程序外的内容多次
4.2.2 Parallelizing using Pool.map()
Pool.map()仅接受一个迭代器参数。对howmany_within_range()函数进行简单的修改,修改为howmany_within_range_rowonly()把minimum和maximum设置为固定值,即为 只接受行数据列表迭代器作为输入,不是最好的办法,但清楚的显示了它与apply()的不同之处
import multiprocessing as mp def howmany_within_range_rowonly(row, minimum=4, maximum=8): count = 0 for n in row: if minimum <= n <= maximum: count += 1 return count pool = mp.Pool(mp.cpu_count()) results = pool.map(howmany_within_range_rowonly,[row for row in data]) pool.close() print(results[:10])4.2.3 使用Pool.starmap()进行并行化
与Pool.map()一样,Pool.starmap()也只仅接受一个迭代器参数,但在starmap()中,迭代器中的每一个元件也是一个迭代器。你可以通过这个内部迭代器向被并行化处理的函数传递参数,在执行时再顺序解开,只要传递和解开的顺序一致就行
实际上,Pool.starmap()就像是一个接受参数的Pool.map()版本
import multiprocessing as mp pool = mp.Pool(mp.cpu_count()) results = pool.starmap(howmany_within_range, [(row, 4, 8) for row in data]) pool.close() print(results[:10])5.异步并行处理
和同步并行处理对等的异步并行处理函数apply_async(),map_async()和starmap_async()允许以异步方式并行执行进程,即下一个进程可以在前一个进程完成时立即启动,而不考虑启动顺序。因此,无法保证结果与输入的顺序相同
6. 使用Pool.apply_async()进行并行化
持续更新
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。