赞
踩
最近在使用python做一些数据处理,使用单核实在慢得难以忍受,准备使用多线程试试,发现很多资料说在cpython解释器下,python多线程只是一个美梦,实际实现了一下确实如此,至于具体原因不是特别清楚。想要并行处理,还是得用多进程,于是乎就寻找多进程资料,并自己做了些测试,亲身感受了一下,多进程确实让并行处理提速不少,我把这些测试记录一下:
实验1:
- import multiprocessing as mp
- import time
-
- num=1000000
- corenum=8
-
-
def job(iternum, q=None):
    """Sum the integers 0..iternum-1 and report the scaled result.

    Args:
        iternum: upper bound (exclusive) of the summation range.
        q: optional multiprocessing-style queue. When given, the result is
           put on the queue (for use as a Process target) and the function
           returns None; otherwise the result is returned directly.

    Returns:
        The sum divided by 100000 (floor), or None when q is provided.
    """
    res = 0
    for i in range(iternum):
        res += i
    # Use exact integer floor division instead of int(res / 100000):
    # float division can lose precision once res exceeds 2**53.
    if q is not None:
        q.put(res // 100000)
    else:
        return res // 100000
-
-
def normal():
    """Single-process baseline: run job() corenum times sequentially."""
    return sum(job(num) for _ in range(corenum))
-
-
def process_queue():
    """Run job() in corenum raw Process workers, collecting via a Queue.

    Returns:
        The sum of the corenum worker results.
    """
    q = mp.Queue()
    workers = []
    for _ in range(corenum):
        p = mp.Process(target=job, args=(num, q))
        p.start()
        workers.append(p)

    # Drain the queue BEFORE joining the workers: a child process that has
    # put data on a queue will not terminate until that data is flushed to
    # the consumer, so join()-before-get() can deadlock (see the
    # multiprocessing "programming guidelines" in the Python docs).
    res = 0
    for _ in range(corenum):
        res += q.get()

    for p in workers:
        p.join()

    return res
-
-
def pool_map():
    """Run job() across a Pool via map() and return the summed results."""
    pool = mp.Pool(processes=corenum)
    results = pool.map(job, (num,) * corenum)
    pool.close()
    pool.join()
    return sum(results)
-
-
-
def pool_apply_async():
    """Run job() across a Pool via apply_async() and sum the results."""
    pool = mp.Pool(processes=corenum)
    handles = [pool.apply_async(job, (num,)) for _ in range(corenum)]
    pool.close()
    pool.join()
    return sum(h.get() for h in handles)
-
-
def pool_apply():
    """Run job() via Pool.apply(), which blocks per call — effectively serial."""
    pool = mp.Pool(processes=corenum)
    # apply() waits for each result before submitting the next task,
    # so this gains nothing over the sequential baseline.
    values = [pool.apply(job, (num,)) for _ in range(corenum)]
    pool.close()
    pool.join()
    return sum(values)
-
-
-
if __name__ == '__main__':
    # Time each strategy and print its result in a uniform format.
    for label, strategy in (('normal', normal),
                            ('process_queue', process_queue),
                            ('pool_map', pool_map),
                            ('pool_apply_async', pool_apply_async),
                            ('pool_apply', pool_apply)):
        prest = time.time()
        sumval = strategy()
        curst = time.time()
        print('{}:sumval={},time={}'.format(label, sumval, curst - prest))
- 测试结果如下:
- normal: sumval = 39999960, time = 0.2951953411102295
- process_queue: sumval = 39999960, time = 0.0895075798034668
- pool_map: sumval = 39999960, time = 0.07907938957214355
- pool_apply_async: sumval = 39999960, time = 0.0779426097869873
- pool_apply: sumval = 39999960, time = 0.30626559257507324
-
- process_queue、pool_map和pool_apply_async耗时相近,总体比另外两个效率高;normal和pool_apply耗时相近
-
- 实验2:
- import multiprocessing as mp
- import time
- import math
-
- num = 1000000000
- corenum = 16
-
-
def add(s, e, q=None):
    """Sum the integers in range(s, e).

    When q is given, the total is put on the queue (Process-target mode)
    and None is returned; otherwise the total is returned directly.
    """
    total = 0
    for value in range(s, e):
        total += value
    if q is None:
        return total
    q.put(total)
-
-
def normal():
    """Single-process baseline: sum 0..num-1 in one call."""
    return add(0, num, q=None)
-
-
def pool_apply_async():
    """Split [0, num) into corenum chunks and sum them in parallel."""
    pool = mp.Pool()
    chunk = math.ceil(num / corenum)

    handles = []
    for i in range(corenum):
        lo = chunk * i
        # Clamp the last chunk so it never runs past num.
        hi = min(chunk * (i + 1), num)
        handles.append(pool.apply_async(add, (lo, hi)))

    pool.close()
    pool.join()

    return sum(h.get() for h in handles)
-
-
if __name__ == '__main__':
    # Time the sequential baseline against the pooled version.
    for label, strategy in (('normal', normal),
                            ('pool_apply_async', pool_apply_async)):
        prest = time.time()
        sumval = strategy()
        curst = time.time()
        print('{}:sumval={},time={}'.format(label, sumval, curst - prest))
测试结果:
normal: sumval = 499999999500000000, time = 260.55177640914917
pool_apply_async: sumval = 499999999500000000, time = 67.02703285217285
- import multiprocessing as mp
- import time
- import math
-
- num = 1000000000
- corenum = 16
-
class mathopt:
    """The same summation experiment wrapped in a class, to check that a
    bound method (self.add) can be dispatched through Pool.apply_async."""

    def add(self, s, e, q=None):
        """Sum range(s, e); put on q if given, else return the total."""
        total = 0
        for value in range(s, e):
            total += value
        if q is None:
            return total
        q.put(total)

    def normal(self):
        """Single-process baseline: one sequential pass over [0, num)."""
        return self.add(0, num, q=None)

    def pool_apply_async(self):
        """Split [0, num) into corenum chunks and sum them in parallel."""
        pool = mp.Pool()
        chunk = math.ceil(num / corenum)

        handles = []
        for i in range(corenum):
            lo = chunk * i
            # Clamp the final chunk so it never runs past num.
            hi = min(chunk * (i + 1), num)
            handles.append(pool.apply_async(self.add, (lo, hi)))

        pool.close()
        pool.join()

        return sum(h.get() for h in handles)
-
-
if __name__ == '__main__':
    # A fresh mathopt instance per run, matching the original two calls.
    for label in ('normal', 'pool_apply_async'):
        prest = time.time()
        sumval = getattr(mathopt(), label)()
        curst = time.time()
        print('{}:sumval={},time={}'.format(label, sumval, curst - prest))
结果:
normal:sumval=499999999500000000,time=60.43701696395874
pool_apply_async:sumval=499999999500000000,time=4.397899150848389
在学习过程中参考的优秀博客一并放在此处,感谢这些无私的奉献者:
1、关于python进程池先close再join的疑惑
https://blog.csdn.net/budong282712018/article/details/79958592
2、python多进程任务拆分之apply_async()和map_async()
https://blog.csdn.net/S_o_l_o_n/article/details/86066704
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。