赞
踩
很多时候,当我们需要使用Python来处理大量的数据的时候,为了缩短处理的时间,我们会使用多线程
或多进程
来并行处理任务。
由于Python全局解释器锁
的存在,导致在执行多线程的时候实际上只有一个线程在运行,这使得多核CPU无法发挥它真正的效率。而多进程就可以很好的解决这个问题。如果你打开多进程的姿势不对,会导致它比单进程更慢,下面我们就来看看如何正确的打开多进程。
这个示例是基于Python对图片做一个预处理
import numpy as np
import multiprocessing as mp
import cv2,os,time
def preprocess(image_bytes,output_shape=(512,512)):
img_array = np.array(bytearray(image_bytes), dtype=np.uint8)
img_bgr = cv2.imdecode(img_array, cv2.IMREAD_COLOR)
img = cv2.resize(img_bgr, (output_shape[1], output_shape[0]))
img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
img = img.transpose((2, 0, 1)).astype(np.float32)
img /= 255.0
return img
data_dir = "/data/images"
img_name_list = os.listdir(data_dir)
img_bytes_list = []
for img_name in img_name_list:
img_path = os.path.join(data_dir,img_name)
with open(img_path,"rb") as rf:
img_bytes = rf.read()
img_bytes_list.append(img_bytes)
这里我们直接通过循环调用图片的预处理函数,其实也就是单进程。处理了1349张
图片,一共花了将近10s
。这里我为了方便就没有采用多次调用来取平均值了,如果大家想要计算的更加准确,可以采用取平均时间。
start_time = time.time()
res_img_list = []
for img_bytes in img_bytes_list:
img = preprocess(img_bytes)
res_img_list.append(img)
print("img_num:%d,consume time:%.2f"%(len(img_name_list),time.time()-start_time))
#img_num:1349,consume time:9.86
start_time = time.time()
pool = mp.Pool(4)
#异步调用
res = pool.map_async(preprocess,img_bytes_list)
res_img_list = []
#获取处理结果
for res_img in res.get():
res_img_list.append(res_img)
print("img_num:%d,consume time:%.2f"%(len(img_name_list),time.time()-start_time))
#img_num:1349,consume time:12.45
使用4个进程居然花了将近13s
,按道理来说这不科学呀?4个进程的处理速度应该要快于单个进程,现在看来居然还更慢。也就是说,我们花了更多的硬件资源,居然还花费了更多的时间。这是为什么呢?
接下来看看,我们使用Queue
来改进使用多进程对图片进行预处理
start_time = time.time() process_num = 4 img_num = len(img_bytes_list) chunk_size = img_num // process_num res_queue = mp.Queue() img_index_list = range(img_num) def chunk_preprocess(start_index,end_index): for img_byte,img_index in zip(img_bytes_list[start_index:end_index], img_index_list[start_index:end_index]): pre_img = preprocess(img_byte) res_queue.put((img_index,pre_img)) task_list = [] for i in range(process_num): s_index = i * chunk_size if i == process_num - 1: e_index = img_num else: e_index = s_index + chunk_size p_task = mp.Process(target=chunk_preprocess,args=(s_index,e_index)) task_list.append(p_task) p_task.start() res_img_list = [""] * img_num get_queue_num = 0 while True: res_index,res_img = res_queue.get() res_img_list[res_index] = res_img get_queue_num += 1 if get_queue_num == img_num: break for task in task_list: task.join() print("img_num:%d,consume time:%.2f"%(len(img_name_list),time.time()-start_time)) #img_num:1349,consume time:6.90
惊讶的发现,当我们将进程池改为根据进程的个数来分发任务时,居然速度要快将近一倍左右。
特别注意
:这里其实使用多线程来处理会比多进程的速度更快,而且消耗的资源也要少点。举这个例子只是为了说明,影响多进程速度的原因。
进程池速度慢可能有下面几个原因:
Lock
处理共享的数据os.fork()
在上面的例子中,其实影响多进程速度的主要原因是因为调用preprocess
函数每次都会返回一个image array
占用的内存比较大,如果你将返回值由image array
改为一个字符串
你会发现最终它们的速度会差不多。
那为什么使用Queue
的速度会比进程池快那么多呢?这里主要也是因为进程池在保存数据与Queue
的差异导致的。
虽然说,我们在使用进程池的时候采用的也是异步调用的方式。但是,进程池在接受返回结果的时候使用了
self.wait(timeout)
,而进程池最终返回结果的顺序也和调用的时候保持一致。而Queue
在保存数据的时候,会通过后台的线程来写数据,所以它最终保存的结果是乱序的,相对来说它的速度会更快点。
参考:
4. https://docs.python.org/zh-cn/3/library/multiprocessing.html
5. https://stackoverflow.com/questions/20727375/multiprocessing-pool-slower-than-just-using-ordinary-functions
6. https://gist.github.com/MichaelCurrie/314d664ccaaadde8a7e8
7. https://stackoverflow.com/questions/43439194/python-multiprocessing-queue-vs-multiprocessing-manager-queue
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。