
A Small Test of Python3 multiprocessing (pool.map and Friends)

I have recently been doing some data processing in Python, and running on a single core was unbearably slow, so I decided to try multithreading. It turns out that, under the CPython interpreter, multithreading CPU-bound Python code is just a dream, and my own experiments confirmed it. The reason is CPython's global interpreter lock (GIL): only one thread can execute Python bytecode at a time, so threads cannot run compute-heavy code in parallel. For real parallelism you need multiple processes. So I went looking for multiprocessing material and ran some tests of my own. Multiprocessing really does speed up parallel work considerably, and I am recording those tests here.
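To see the GIL effect directly, here is a minimal sketch (my own illustration, not one of the timed experiments below) that runs the same CPU-bound loop first with threads and then with processes; the loop size and worker count are arbitrary choices:

import time
import threading
import multiprocessing as mp

def burn(n):
    # Pure CPU work; under CPython only one thread at a time can execute this.
    total = 0
    for i in range(n):
        total += i
    return total

def run_threads(n, workers):
    ts = [threading.Thread(target=burn, args=(n,)) for _ in range(workers)]
    for t in ts:
        t.start()
    for t in ts:
        t.join()

def run_processes(n, workers):
    ps = [mp.Process(target=burn, args=(n,)) for _ in range(workers)]
    for p in ps:
        p.start()
    for p in ps:
        p.join()

if __name__ == '__main__':
    n, workers = 10000000, 4
    for label, fn in (('threads', run_threads), ('processes', run_processes)):
        t0 = time.time()
        fn(n, workers)
        # threads land near the serial time; processes near serial time / workers
        print('{}: {:.2f}s'.format(label, time.time() - t0))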

Experiment 1:

import multiprocessing as mp
import time

num = 1000000
corenum = 8

def job(iternum, q=None):
    # CPU-bound work: sum 0..iternum-1. The result goes into the queue when
    # one is supplied (Process workers), otherwise it is returned (Pool workers).
    res = 0
    for i in range(iternum):
        res += i
    if q is not None:
        q.put(int(res / 100000))
    else:
        return int(res / 100000)

def normal():
    # Serial baseline: run the job corenum times in the main process.
    res = 0
    for _ in range(corenum):
        res += job(num)
    return res

def process_queue():
    # Raw mp.Process workers, collecting results through a shared queue.
    q = mp.Queue()
    ps = []
    for i in range(corenum):
        p = mp.Process(target=job, args=(num, q))
        p.start()
        ps.append(p)
    for i in range(corenum):
        ps[i].join()
    # Joining before draining the queue is safe here because each result is a
    # small int; with large results, get() them first or join() may deadlock.
    res = 0
    for i in range(corenum):
        res += q.get()
    return res

def pool_map():
    # pool.map hands one argument to each call and returns results in order.
    pool = mp.Pool(processes=corenum)
    a = (num,) * corenum
    res = pool.map(job, a)
    pool.close()
    pool.join()
    sres = 0
    for i in range(corenum):
        sres += res[i]
    return sres

def pool_apply_async():
    # apply_async submits calls without blocking; .get() collects each result.
    pool = mp.Pool(processes=corenum)
    ress = []
    for i in range(corenum):
        ress.append(pool.apply_async(job, (num,)))
    pool.close()
    pool.join()
    sres = 0
    for i in range(corenum):
        sres += ress[i].get()
    return sres

def pool_apply():
    # apply blocks until each call finishes, so the calls run one after another.
    pool = mp.Pool(processes=corenum)
    ress = []
    for i in range(corenum):
        ress.append(pool.apply(job, (num,)))
    pool.close()
    pool.join()
    sres = 0
    for i in range(corenum):
        sres += ress[i]
    return sres

if __name__ == '__main__':
    prest = time.time()
    sumval = normal()
    curst = time.time()
    print('normal:sumval={},time={}'.format(sumval, curst - prest))
    prest = time.time()
    sumval = process_queue()
    curst = time.time()
    print('process_queue:sumval={},time={}'.format(sumval, curst - prest))
    prest = time.time()
    sumval = pool_map()
    curst = time.time()
    print('pool_map:sumval={},time={}'.format(sumval, curst - prest))
    prest = time.time()
    sumval = pool_apply_async()
    curst = time.time()
    print('pool_apply_async:sumval={},time={}'.format(sumval, curst - prest))
    prest = time.time()
    sumval = pool_apply()
    curst = time.time()
    print('pool_apply:sumval={},time={}'.format(sumval, curst - prest))
The test results are as follows:

normal:           sumval=39999960, time=0.2951953411102295
process_queue:    sumval=39999960, time=0.0895075798034668
pool_map:         sumval=39999960, time=0.07907938957214355
pool_apply_async: sumval=39999960, time=0.0779426097869873
pool_apply:       sumval=39999960, time=0.30626559257507324

process_queue, pool_map, and pool_apply_async perform about the same and are clearly faster than the other two, while pool_apply is about as slow as the serial normal run. That is expected: pool.apply blocks until each call completes, so the eight calls execute one at a time even though a pool of workers is available.
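To make the blocking behaviour of pool.apply concrete, here is a small sketch (my own illustration, not part of the measured results) using sleep-based tasks; with four workers, apply takes roughly 4 x 1s while apply_async finishes in about 1s:

import time
import multiprocessing as mp

def nap(seconds):
    time.sleep(seconds)
    return seconds

if __name__ == '__main__':
    with mp.Pool(processes=4) as pool:
        t0 = time.time()
        for _ in range(4):
            pool.apply(nap, (1,))          # blocks: tasks run back to back
        print('apply:       {:.1f}s'.format(time.time() - t0))

        t0 = time.time()
        handles = [pool.apply_async(nap, (1,)) for _ in range(4)]
        for h in handles:
            h.get()                        # all four naps overlap
        print('apply_async: {:.1f}s'.format(time.time() - t0))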
Experiment 2:
import multiprocessing as mp
import time
import math

num = 1000000000
corenum = 16

def add(s, e, q=None):
    # Sum the integers in [s, e).
    res = 0
    for i in range(s, e):
        res += i
    if q is not None:
        q.put(res)
    else:
        return res

def normal():
    # Serial baseline over the whole range.
    res = add(0, num, q=None)
    return res

def pool_apply_async():
    # Split [0, num) into corenum chunks and sum them in parallel.
    pool = mp.Pool()
    pergroupnum = math.ceil(num / corenum)
    ress = []
    for i in range(corenum):
        s, e = pergroupnum * i, pergroupnum * (i + 1)
        if e > num:
            e = num
        ress.append(pool.apply_async(add, (s, e)))
    pool.close()
    pool.join()
    sres = 0
    for i in range(corenum):
        sres += ress[i].get()
    return sres

if __name__ == '__main__':
    prest = time.time()
    sumval = normal()
    curst = time.time()
    print('normal:sumval={},time={}'.format(sumval, curst - prest))
    prest = time.time()
    sumval = pool_apply_async()
    curst = time.time()
    print('pool_apply_async:sumval={},time={}'.format(sumval, curst - prest))

Test results:

normal:           sumval=499999999500000000, time=260.55177640914917
pool_apply_async: sumval=499999999500000000, time=67.02703285217285
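The hand-rolled range splitting above can also be written with pool.starmap, which unpacks each argument tuple into one call. This is a sketch of mine under the same num and corenum settings, not part of the measured results:

import math
import multiprocessing as mp

def add(s, e):
    # Sum the integers in [s, e).
    return sum(range(s, e))

def chunks(total, parts):
    # Yield (start, end) pairs that cover [0, total) in `parts` pieces.
    step = math.ceil(total / parts)
    for i in range(parts):
        yield step * i, min(step * (i + 1), total)

if __name__ == '__main__':
    num, corenum = 1000000000, 16
    with mp.Pool(processes=corenum) as pool:
        # starmap(add, [(s0, e0), (s1, e1), ...]) runs add(s0, e0), add(s1, e1), ...
        print(sum(pool.starmap(add, chunks(num, corenum))))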

Experiment 3 repeats Experiment 2 with the same logic wrapped inside a class, mainly to check that a bound method can be handed to a Pool (in Python 3, bound methods are picklable, so this works):

import multiprocessing as mp
import time
import math

num = 1000000000
corenum = 16

class mathopt:
    def add(self, s, e, q=None):
        # Sum the integers in [s, e).
        res = 0
        for i in range(s, e):
            res += i
        if q is not None:
            q.put(res)
        else:
            return res

    def normal(self):
        res = self.add(0, num, q=None)
        return res

    def pool_apply_async(self):
        pool = mp.Pool()
        pergroupnum = math.ceil(num / corenum)
        ress = []
        for i in range(corenum):
            s, e = pergroupnum * i, pergroupnum * (i + 1)
            if e > num:
                e = num
            # self.add is a bound method; in Python 3 it pickles fine.
            ress.append(pool.apply_async(self.add, (s, e)))
        pool.close()
        pool.join()
        sres = 0
        for i in range(corenum):
            sres += ress[i].get()
        return sres

if __name__ == '__main__':
    prest = time.time()
    sumval = mathopt().normal()
    curst = time.time()
    print('normal:sumval={},time={}'.format(sumval, curst - prest))
    prest = time.time()
    sumval = mathopt().pool_apply_async()
    curst = time.time()
    print('pool_apply_async:sumval={},time={}'.format(sumval, curst - prest))

Results:

normal:           sumval=499999999500000000, time=60.43701696395874
pool_apply_async: sumval=499999999500000000, time=4.397899150848389
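For comparison, the same fan-out can be expressed with concurrent.futures.ProcessPoolExecutor, the higher-level standard-library interface. This sketch is my addition and was not part of the original timings:

import math
from concurrent.futures import ProcessPoolExecutor

def add(s, e):
    # Sum the integers in [s, e).
    return sum(range(s, e))

if __name__ == '__main__':
    num, corenum = 1000000000, 16
    step = math.ceil(num / corenum)
    starts = [step * i for i in range(corenum)]
    ends = [min(step * (i + 1), num) for i in range(corenum)]
    with ProcessPoolExecutor(max_workers=corenum) as ex:
        # executor.map, like pool.map, preserves input order in its results.
        print(sum(ex.map(add, starts, ends)))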

Along the way I found some good blog posts, which I collect here with thanks to their generous authors:

1. On the puzzle of why a Python process pool calls close() before join() (see the sketch after this list)

https://blog.csdn.net/budong282712018/article/details/79958592

2. Splitting multiprocessing tasks with apply_async() and map_async()

https://blog.csdn.net/S_o_l_o_n/article/details/86066704
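On the first link: calling join() on a Pool that is still accepting work raises ValueError, which is why close() (or terminate()) must come first. A minimal sketch of my own:

import multiprocessing as mp

def square(x):
    return x * x

if __name__ == '__main__':
    pool = mp.Pool(processes=2)
    results = [pool.apply_async(square, (i,)) for i in range(4)]
    # pool.join() here would raise ValueError: the pool is still running
    pool.close()   # stop accepting new tasks
    pool.join()    # wait for the worker processes to finish and exit
    print([r.get() for r in results])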

 
