当前位置:   article > 正文

Python多进程(process)(一)进程和进程池_python 进程池

python 进程池

Python实用教程_spiritx的博客-CSDN博客

在Python中因为有GIL的原因,线程能提供的并发效果并不理想,Python在多核上如果要释放并发的性能,更多的依靠多进程,我们会比线程更深入的去学习多进程。

进程(Process),顾名思义,就是进行中的程序。有一句话说得好:程序是一个没有生命的实体,只有处理器赋予程序生命时,它才能成为一个活动的实体。进程是资源分配的最小单元,也就是说每个进程都有其单独的内存空间。

Unix/Linux系统通过fork系统调用创建一个进程,但是在Windows中并没有fork调用。但是别担心,Python中内置的multiprocessing模块是跨平台的,我们可以通过对multiprocess模块中的Process类进行实例化创建一个进程对象,如:

  1. import os
  2. from multiprocessing import Process
  3. def run_a_sub_proc(name):
  4. print(f'子进程:{name}{os.getpid()})开始...')
  5. if __name__ == '__main__':
  6. print(f'主进程({os.getpid()})开始...')
  7. # 通过对Process类进行实例化创建一个子进程
  8. p = Process(target=run_a_sub_proc, args=('测试进程', ))
  9. p.start()
  10. p.join()
'
运行

Process对象

定义:

class multiprocessing.Process(group=Nonetarget=Nonename=Noneargs=()kwargs={}*daemon=None)

进程对象表示在单独进程中运行的活动。

Process 类拥有和 threading.Thread 等价的大部分方法。

  • 应始终使用关键字参数调用构造函数。
  • group 应该始终是 None ,它仅用于兼容 threading.Thread 。
  • target 是由 run() 方法调用的可调用对象,可以是一个函数,也可以是一个可调用的函数对象,默认为 None ,None意味着什么都没有被调用。
  • name 是进程名称。
  • args 是目标调用的参数元组,元组中的每个元素对应一个函数的参数。
  • kwargs 是目标调用的关键字参数字典。
  • daemon 将进程 daemon 标志设置为 True 或 False 。如果是 None (默认值),则该标志将从创建的进程继承。

在默认情况下,不会将任何参数传递给 target。 args 参数默认值为 (),可被用来指定要传递给 target 的参数列表或元组。

属性和方法

属性和方法名说明
name进程的名称。该名称是一个字符串,仅用于识别目的。它没有语义。可以为多个进程指定相同的名称。
初始名称由构造器设定。 如果没有为构造器提供显式名称,则会构造一个形式为 'Process-N1:N2:...:Nk' 的名称,其中每个 Nk 是其父亲的第 N 个孩子。
run()Process对象最重要的方法,表示进程活动的方法,可以在子类中重载此方法。
start()启动进程活动。这个方法每个进程对象最多只能调用一次。它会将对象的 run() 方法安排在一个单独的进程中调用。
join([timeout])如果可选参数 timeout 是 None (默认值),则该方法将阻塞,直到调用 join() 方法的进程终止。如果 timeout 是一个正数,它最多会阻塞 timeout 秒。请注意,如果进程终止或方法超时,则该方法返回 None 。检查进程的 exitcode 以确定它是否终止。
一个进程可以被 join 多次。
进程无法join自身,因为这会导致死锁。尝试在启动进程之前join进程是错误的。
is_alive()返回进程是否还活着。粗略地说,从 start() 方法返回到子进程终止之前,进程对象仍处于活动状态。
daemon进程的守护标志,一个布尔值。这必须在 start() 被调用之前设置。
初始值继承自创建进程。
当进程退出时,它会尝试终止其所有守护进程子进程。
请注意,不允许在守护进程中创建子进程。这是因为当守护进程由于父进程退出而中断时,其子进程会变成孤儿进程。 另外,这些 不是 Unix 守护进程或服务,它们是正常进程,如果非守护进程已经退出,它们将被终止(并且不被合并)。
pid返回进程ID。在生成该进程之前,这将是 None 。
exitcode子进程的退出代码。如果该进程尚未终止则为 None 。
如果子进程的 run() 方法正常返回,退出代码将是 0 。 如果它通过 sys.exit() 终止,并有一个整数参数 N ,退出代码将是 N 。
如果子进程由于在 run() 内的未捕获异常而终止,退出代码将是 1 。 如果它是由信号 N 终止的,退出代码将是负值 -N 。
authkey进程的身份验证密钥(字节字符串)。
当 multiprocessing 初始化时,主进程使用 os.urandom() 分配一个随机字符串。
当创建 Process 对象时,它将继承其父进程的身份验证密钥,可以通过将 authkey 设置为另一个字节字符串来更改。
sentinel系统对象的数字句柄
terminate()终止进程,在UNIX系统中,使用SIGTERM进行终止,在windows系统,调用TerminateProcess进行终止,请注意,进程的后代进程将不会被终止 —— 它们将简单地变成孤立的。
kill()在UNIX系统中,使用SIGTERM终止进程
close()关闭 Process 对象,释放与之关联的所有资源。如果底层进程仍在运行,则会引发 ValueError 。一旦 close() 成功返回, Process 对象的大多数其他方法和属性将引发 ValueError 。

注意 start() 、 join() 、 is_alive() 、 terminate() 和 exitcode 方法只能由创建进程对象的进程调用。

子进程的创建

与线程一样,子进程也有两种创建方式,一种是直接使用process对象来拉起一个对象,另外一种是使用process的子类。

通过Process创建子进程

和使用 thread 类创建子线程的方式非常类似,使用 Process 类创建实例化对象,其本质是调用该类的构造方法创建新进程。Process 类的构造方法格式如下:

def __init__(self,group=None,target=None,name=None,args=(),kwargs={})

其中,各个参数的含义为:

  • group:该参数未进行实现,不需要传参;
  • target:为新建进程指定执行任务,也就是指定一个函数或者一个可执行对象;
  • name:为新建进程设置名称;
  • args:为 target 参数指定的参数传递非关键字参数;
  • kwargs:为 target 参数指定的参数传递关键字参数。

  1. import os, time
  2. from multiprocessing import Process
  3. URLS=['https://blog.csdn.net/spiritx/article/details/132783171',
  4. 'https://blog.csdn.net/spiritx/article/details/132782806',
  5. 'https://blog.csdn.net/spiritx/article/details/132778558',
  6. 'https://blog.csdn.net/spiritx/article/details/132773698']
  7. def downloadUrl(url, nsecs):
  8. time.sleep(1)
  9. print(f'进程ID={os.getpid()}, {__name__=} download: {url}')
  10. # p1 = Process(target=downloadUrl, args=(URLS[0], 3)) #RuntimeError
  11. # p1.start()
  12. # downloadUrl(URLS[1], 4)
  13. # p1.join()
  14. if __name__ == '__main__':
  15. p1 = Process(target=downloadUrl, args=(URLS[0], 3))
  16. p2 = Process(target=downloadUrl, args=(URLS[1], 4))
  17. p1.start()
  18. p2.start()
  19. downloadUrl(URLS[3], 4)
  20. p1.join()
  21. p2.join()
  22. ‘’'
  23. 进程ID=64482, __name__='__main__' download: https://blog.csdn.net/spiritx/article/details/132773698
  24. 进程ID=64484, __name__='__mp_main__' download: https://blog.csdn.net/spiritx/article/details/132783171
  25. 进程ID=64485, __name__='__mp_main__' download: https://blog.csdn.net/spiritx/article/details/132782806
  26. ‘''

注意:因为进程和子进程复用同一套代码,一定要写if __name__ == '__main__':这段代码,否则会出现RuntimeError的异常。

从输出来看,子进程和父进程的__name__并不是一个,父进程是__main__,而子进程是__mp_main__

通过Process子类创建子进程

和使用 thread 子类创建线程的方式类似,除了直接使用 Process 类创建进程,还可以通过创建 Process 的子类来创建进程。
需要注意的是,在创建 Process 的子类时,需在子类重写 run() 方法。实际上,该方法所起到的作用,就如同第一种创建方式中 target 参数执行的函数。
另外,通过 Process 子类创建进程,和使用 Process 类一样,先创建该类的实例对象,然后调用 start() 方法启动该进程。下面程序演示如何通过 Process 子类创建一个进程。

  1. import os, time
  2. import multiprocessing
  3. URLS=['https://blog.csdn.net/spiritx/article/details/132783171',
  4. 'https://blog.csdn.net/spiritx/article/details/132782806',
  5. 'https://blog.csdn.net/spiritx/article/details/132778558',
  6. 'https://blog.csdn.net/spiritx/article/details/132773698']
  7. def downloadUrl(url='', secs=1):
  8. time.sleep(secs)
  9. print(f'进程ID={os.getpid()}, {__name__=} download: {url}')
  10. class MyChildProcess(multiprocessing.Process):
  11. def __init__(self, url, secs):
  12. #super.__init__(self) #TypeError: descriptor '__init__' requires a 'super' object but received a 'MyChildProcess'
  13. multiprocessing.Process.__init__(self)
  14. self.url = url
  15. self.secs = secs
  16. def run(self):
  17. downloadUrl(self.url, self.secs)
  18. if __name__ == '__main__':
  19. p1 = MyChildProcess(URLS[0], 3)
  20. p2 = MyChildProcess(URLS[1], 4)
  21. p1.start()
  22. p2.start()
  23. downloadUrl(URLS[3], 4)
  24. p1.join()
  25. p2.join()
  26. ‘’'
  27. 进程ID=65688, __name__='__mp_main__' download: https://blog.csdn.net/spiritx/article/details/132783171
  28. 进程ID=65686, __name__='__main__' download: https://blog.csdn.net/spiritx/article/details/132773698
  29. 进程ID=65689, __name__='__mp_main__' download: https://blog.csdn.net/spiritx/article/details/132782806
  30. ‘''

有几个地方要特别注意:

  • 注意一定要重写__init__()和run()方法,如果不重写__init__()方法,参数要和Process一致。
  • 在子类的__init__()方法中一定要调用Process.__init__()方法,写super.__init__()方法会报TypeError的异常。

进程池

当需要创建的子进程数量不多时,可以直接利用multiprocessing中的Process动态成生多个进程,但如果是上百甚至上千个目标,手动的去创建进程的工作量巨大,此时就可以用到multiprocessing模块提供的Pool方法。

class multiprocessing.Pool([processes[, initializer[, initargs[, maxtasksperchild[, context]]]]])

  • processes — 进程池中进程数量,如果为 None,则使用 os.cpu_count() 返回的值
  • initializer — 如果该参数不为 None,则所有进程池中的进程启动时都会先执行 initializer(*initargs)
  • maxtasksperchild — 如果该参数不为 None,则进程在执行 maxtasksperchild 次任务后会被自动销毁、重启
  • context — 用于指定进程池中进程运行的上下文

初始化Pool时,可以指定一个最大进程数,当有新的请求提交到Pool中时,如果池还没有满,那么就会创建一个新的进程用来执行该请求;但如果池中的进程数已经达到指定的最大值,那么该请求就会等待,直到池中有进程他的任务结束,新的任务会交给这个空闲进程去处理。

  1. import os, time
  2. import multiprocessing
  3. import random
  4. URLS=['https://blog.csdn.net/spiritx/article/details/132783171',
  5. 'https://blog.csdn.net/spiritx/article/details/132782806',
  6. 'https://blog.csdn.net/spiritx/article/details/132778558',
  7. 'https://blog.csdn.net/spiritx/article/details/132773698',
  8. 'https://blog.csdn.net/spiritx/article/details/5',
  9. 'https://blog.csdn.net/spiritx/article/details/6',
  10. 'https://blog.csdn.net/spiritx/article/details/7',
  11. 'https://blog.csdn.net/spiritx/article/details/8',
  12. 'https://blog.csdn.net/spiritx/article/details/9']
  13. INDEXS=[1,2,3,4,5,6,7,8,9]
  14. PARAMS=list(zip(URLS, INDEXS)) #通过zip()方法将多个参数压缩成list对象
  15. def downloadUrl(url, index):
  16. #url, index = param # 对多个参数进行解包
  17. print(f'进程ID={os.getpid()}, {__name__=} start download: {url=},{index=}')
  18. time.sleep(random.randint(1,3))
  19. print(f'进程ID={os.getpid()}, {__name__=} end download: {url=},{index=}')
  20. return index
  21. if __name__ == '__main__':
  22. pool = multiprocessing.Pool(4)
  23. results = [pool.apply_async(downloadUrl, param) for param in PARAMS]
  24. pool.close()
  25. pool.join()
  26. for result in results:
  27. print(f'{result.get()=}')
  28. ‘’'
  29. 进程ID=75676, __name__='__mp_main__' start download: url='https://blog.csdn.net/spiritx/article/details/132783171',index=1
  30. 进程ID=75677, __name__='__mp_main__' start download: url='https://blog.csdn.net/spiritx/article/details/132782806',index=2
  31. 进程ID=75675, __name__='__mp_main__' start download: url='https://blog.csdn.net/spiritx/article/details/132778558',index=3
  32. 进程ID=75678, __name__='__mp_main__' start download: url='https://blog.csdn.net/spiritx/article/details/132773698',index=4
  33. 进程ID=75677, __name__='__mp_main__' end download: url='https://blog.csdn.net/spiritx/article/details/132782806',index=2
  34. 进程ID=75677, __name__='__mp_main__' start download: url='https://blog.csdn.net/spiritx/article/details/5',index=5
  35. 进程ID=75675, __name__='__mp_main__' end download: url='https://blog.csdn.net/spiritx/article/details/132778558',index=3
  36. 进程ID=75675, __name__='__mp_main__' start download: url='https://blog.csdn.net/spiritx/article/details/6',index=6
  37. 进程ID=75678, __name__='__mp_main__' end download: url='https://blog.csdn.net/spiritx/article/details/132773698',index=4
  38. 进程ID=75678, __name__='__mp_main__' start download: url='https://blog.csdn.net/spiritx/article/details/7',index=7
  39. 进程ID=75676, __name__='__mp_main__' end download: url='https://blog.csdn.net/spiritx/article/details/132783171',index=1
  40. 进程ID=75676, __name__='__mp_main__' start download: url='https://blog.csdn.net/spiritx/article/details/8',index=8
  41. 进程ID=75677, __name__='__mp_main__' end download: url='https://blog.csdn.net/spiritx/article/details/5',index=5
  42. 进程ID=75677, __name__='__mp_main__' start download: url='https://blog.csdn.net/spiritx/article/details/9',index=9
  43. 进程ID=75676, __name__='__mp_main__' end download: url='https://blog.csdn.net/spiritx/article/details/8',index=8
  44. 进程ID=75675, __name__='__mp_main__' end download: url='https://blog.csdn.net/spiritx/article/details/6',index=6
  45. 进程ID=75678, __name__='__mp_main__' end download: url='https://blog.csdn.net/spiritx/article/details/7',index=7
  46. 进程ID=75677, __name__='__mp_main__' end download: url='https://blog.csdn.net/spiritx/article/details/9',index=9
  47. result.get()=1
  48. result.get()=2
  49. result.get()=3
  50. result.get()=4
  51. result.get()=5
  52. result.get()=6
  53. result.get()=7
  54. result.get()=8
  55. result.get()=9
  56. ‘''

从上面的结果可以看出,75677进程在处理任务结束后,立即就得到了一个新的任务进行处理。

属性和方法

属性和方法名说明
apply(unc[, args=()[, kwds={}]])同步执行,很少用。使用 args 参数以及 kwds 命名参数调用 func , 它会返回结果前阻塞。不建议使用,并且3.x以后不再出现
apply_async(func[, args[, kwds[, callback[, error_callback]]]])异步执行。apply() 方法的一个变种,返回一个 AsyncResult 对象。
如果指定了 callback , 它必须是一个接受单个参数的可调用对象。当执行成功时, callback 会被用于处理执行后的返回结果,否则,调用 error_callback 。
如果指定了 error_callback , 它必须是一个接受单个参数的可调用对象。当目标函数执行失败时, 会将抛出的异常对象作为参数传递给 error_callback 执行。
回调函数应该立即执行完成,否则会阻塞负责处理结果的进程。
map(funciterable[, chunksize])内置 map() 函数的并行版本 。 它会保持阻塞直到获得结果。
这个方法会将可迭代对象分割为许多块,然后提交给进程池。可以将 chunksize 设置为一个正整数从而(近似)指定每个块的大小可以。
注意对于很长的迭代对象,可能消耗很多内存。可以考虑使用 imap() 或 imap_unordered() 并且显式指定 chunksize 以提升效率。
map_async(funciterable[, chunksize[, callback[, error_callback]]])map() 方法的一个变种,返回一个 AsyncResult 对象。
如果指定了 callback , 它必须是一个接受单个参数的可调用对象。当执行成功时, callback 会被用于处理执行后的返回结果,否则,调用 error_callback 。
如果指定了 error_callback , 它必须是一个接受单个参数的可调用对象。当目标函数执行失败时, 会将抛出的异常对象作为参数传递给 error_callback 执行。
回调函数应该立即执行完成,否则会阻塞负责处理结果的进程。
imap(funciterable[, chunksize])map() 的延迟执行版本。
chunksize 参数的作用和 map() 方法的一样。对于很长的迭代器,给 chunksize 设置一个很大的值会比默认值 1 极大 地加快执行速度。
同样,如果 chunksize 是 1 , 那么 imap() 方法所返回的迭代器的 next() 方法拥有一个可选的 timeout 参数: 如果无法在 timeout 秒内执行得到结果,则 next(timeout) 会抛出 multiprocessing.TimeoutError 异常。
imap_unordered(funciterable[, chunksize])和 imap() 相同,只不过通过迭代器返回的结果是任意的。(当进程池中只有一个工作进程的时候,返回结果的顺序才能认为是"有序"的)
starmap(funciterable[, chunksize])和 map() 类似,不过 iterable 中的每一项会被解包再作为函数参数。
比如可迭代对象 [(1,2), (3, 4)] 会转化为等价于 [func(1,2), func(3,4)] 的调用。
starmap_async(funciterable[, chunksize[, callback[, error_callback]]])相当于 starmap() 与 map_async() 的结合,迭代 iterable 的每一项,解包作为 func 的参数并执行,返回用于获取结果的对象。
close()阻止后续任务提交到进程池,当所有任务执行完成后,工作进程会退出。
terminate()不必等待未完成的任务,立即停止工作进程。当进程池对象被垃圾回收时,会立即调用 terminate()。
join()等待工作进程结束。调用 join() 前必须先调用 close() 或者 terminate() 

进程池提交任务的方法

进程池有多种方式可以提交任务,这里举例说明几种常用的方法。

一般方法apply()和apply_async()

apply(func [,args [,kwds ] ] )

使用参数args和关键字参数kwds调用func。它会阻塞,直到结果准备就绪。鉴于此块,更适合并行执行工作。此外,func 仅在池中的一个工作程序中执行。

  1. import os
  2. from multiprocessing import Pool
  3. import time
  4. def foo(p):
  5. print(f'进程ID{os.getpid()},{p=}')
  6. time.sleep(0.1)
  7. if __name__=="__main__":
  8. pool = Pool(processes=3)
  9. for i in range(10):
  10. pool.apply(foo, args=(i,))
  11. print('子进程提交完成')
  12. pool.close()
  13. pool.join()
  14. ‘’'
  15. 进程ID69407,p=0
  16. 进程ID69406,p=1
  17. 进程ID69409,p=2
  18. 进程ID69407,p=3
  19. 进程ID69406,p=4
  20. 进程ID69409,p=5
  21. 进程ID69407,p=6
  22. 进程ID69406,p=7
  23. 进程ID69409,p=8
  24. 进程ID69407,p=9
  25. 子进程提交完成
  26. ‘''

  1. import os
  2. from multiprocessing import Pool
  3. import time
  4. def foo(p):
  5. print(f'进程ID{os.getpid()} 开始....{p=}')
  6. time.sleep(0.1)
  7. print(f'进程ID{os.getpid()} 结束.{p=}')
  8. if __name__=="__main__":
  9. pool = Pool(processes=3)
  10. for i in range(10):
  11. pool.apply(foo, args=(i,)) #维持执行的进程总数为10,当一个进程执行完后启动一个新进程.
  12. print(f'进程{i}启动...')
  13. print('子进程提交完成')
  14. pool.close()
  15. pool.join()
  16. ‘’'
  17. 进程ID69541 开始....p=0
  18. 进程ID69541 结束.p=0
  19. 进程0启动...
  20. 进程ID69542 开始....p=1
  21. 进程ID69542 结束.p=1
  22. 进程1启动...
  23. 进程ID69543 开始....p=2
  24. 进程ID69543 结束.p=2
  25. 进程2启动...
  26. 进程ID69541 开始....p=3
  27. 进程ID69541 结束.p=3
  28. 进程3启动...
  29. 进程ID69542 开始....p=4
  30. 进程ID69542 结束.p=4
  31. 进程4启动...
  32. 进程ID69543 开始....p=5
  33. 进程ID69543 结束.p=5
  34. 进程5启动...
  35. 进程ID69541 开始....p=6
  36. 进程ID69541 结束.p=6
  37. 进程6启动...
  38. 进程ID69542 开始....p=7
  39. 进程ID69542 结束.p=7
  40. 进程7启动...
  41. 进程ID69543 开始....p=8
  42. 进程ID69543 结束.p=8
  43. 进程8启动...
  44. 进程ID69541 开始....p=9
  45. 进程ID69541 结束.p=9
  46. 进程9启动...
  47. 子进程提交完成
  48. ‘''

从上面的例子可以看到,apply()是顺序启动进程的,在被启动的进程执行完后,再启动下一个进程,实际是没有任何并发执行。

apply_async(func[, args[, kwds[, callback[, error_callback]]]])

异步进程池(非阻塞),返回结果对象的方法的变体。如果指定了回调,则它应该是可调用的,它接受单个参数。当结果变为就绪时,将对其应用回调,即除非调用失败,在这种情况下将应用error_callback。如果指定了error_callback,那么它应该是一个可调用的,它接受一个参数。如果目标函数失败,则使用异常实例调用error_callback。回调应立即完成,否则处理结果的线程将被阻止。

  1. import os
  2. from multiprocessing import Pool
  3. import time
  4. def foo(p):
  5. print(f'进程ID{os.getpid()} 开始....{p=}')
  6. time.sleep(0.1)
  7. print(f'进程ID{os.getpid()} 结束.{p=}')
  8. if __name__=="__main__":
  9. pool = Pool(processes=3)
  10. for i in range(10):
  11. pool.apply_async(foo, args=(i,))
  12. print(f'进程{i}启动...')
  13. print('子进程提交完成')
  14. pool.close()
  15. pool.join()
  16. ’’’
  17. 进程0启动...
  18. 进程1启动...
  19. 进程2启动...
  20. 进程3启动...
  21. 进程4启动...
  22. 进程5启动...
  23. 进程6启动...
  24. 进程7启动...
  25. 进程8启动...
  26. 进程9启动...
  27. 子进程提交完成
  28. 进程ID69599 开始....p=0
  29. 进程ID69598 开始....p=1
  30. 进程ID69600 开始....p=2
  31. 进程ID69599 结束.p=0
  32. 进程ID69599 开始....p=3
  33. 进程ID69598 结束.p=1
  34. 进程ID69598 开始....p=4
  35. 进程ID69600 结束.p=2
  36. 进程ID69600 开始....p=5
  37. 进程ID69599 结束.p=3
  38. 进程ID69599 开始....p=6
  39. 进程ID69598 结束.p=4
  40. 进程ID69598 开始....p=7
  41. 进程ID69600 结束.p=5
  42. 进程ID69600 开始....p=8
  43. 进程ID69599 结束.p=6
  44. 进程ID69599 开始....p=9
  45. 进程ID69598 结束.p=7
  46. 进程ID69600 结束.p=8
  47. 进程ID69599 结束.p=9
  48. ‘’‘

从上面的例子可以看到,apply_async()是异步启动的,子进程还未完全启动,apply_async()就已经返回。

AsyncResult对象

AsyncResult是Pool.apply_async() 和 Pool.map_async() 返回的对象所属的类。

主要方法有

方法说明
get([timeout])用于获取执行结果。如果 timeout 不是 None 并且在 timeout 秒内仍然没有执行完得到结果,则抛出 multiprocessing.TimeoutError 异常。如果远程调用发生异常,这个异常会通过 get() 重新抛出。
wait([timeout])阻塞,直到返回结果,或者 timeout 秒后超时。
ready()返回执行状态,是否已经完成。
successful()判断调用是否已经完成并且未引发异常。 如果还未获得结果则将引发 ValueError。
  1. import os, time
  2. import multiprocessing
  3. URLS=['https://blog.csdn.net/spiritx/article/details/132783171',
  4. 'https://blog.csdn.net/spiritx/article/details/132782806',
  5. 'https://blog.csdn.net/spiritx/article/details/132778558',
  6. 'https://blog.csdn.net/spiritx/article/details/132773698']
  7. def downloadUrl(url, nsecs):
  8. time.sleep(1)
  9. print(f'进程ID={os.getpid()}, {__name__=} download: {url}')
  10. return os.getpid()
  11. if __name__ == '__main__':
  12. pool = multiprocessing.Pool(4)
  13. results = [pool.apply_async(downloadUrl, (url, 3)) for url in URLS]
  14. pool.close()
  15. pool.join()
  16. for result in results:
  17. print(f'{result.get()=}')
  18. ‘’'
  19. 进程ID=72390, __name__='__mp_main__' download: https://blog.csdn.net/spiritx/article/details/132783171
  20. 进程ID=72391, __name__='__mp_main__' download: https://blog.csdn.net/spiritx/article/details/132778558
  21. 进程ID=72392, __name__='__mp_main__' download: https://blog.csdn.net/spiritx/article/details/132782806
  22. 进程ID=72393, __name__='__mp_main__' download: https://blog.csdn.net/spiritx/article/details/132773698
  23. result.get()=72390
  24. result.get()=72392
  25. result.get()=72391
  26. result.get()=72393
  27. ‘''

多个参数的可调用对象

apply()和apply_async()支持多参数的可调用对象,但是在传入参数的时候,需要将多个参数封装为tuple进行传入,apply()和apply_async()会自动解包传给可调用对象。

如下:

  1. import os, time
  2. import multiprocessing
  3. import random
  4. URLS=['https://blog.csdn.net/spiritx/article/details/132783171',
  5. 'https://blog.csdn.net/spiritx/article/details/132782806',
  6. 'https://blog.csdn.net/spiritx/article/details/132778558',
  7. 'https://blog.csdn.net/spiritx/article/details/132773698']
  8. INDEXS=[1,2,3,4,5,6,7,8,9]
  9. PARAMS=list(zip(URLS, INDEXS)) #通过zip()方法将多个参数压缩成list对象
  10. def downloadUrl(url, index): #使用*param也是可以的,需要在函数里解包
  11. #url, index = param # 对多个参数进行解包
  12. print(f'进程ID={os.getpid()}, {__name__=} start download: {url=},{index=}')
  13. time.sleep(random.randint(1,3))
  14. print(f'进程ID={os.getpid()}, {__name__=} end download: {url=},{index=}')
  15. return index
  16. if __name__ == '__main__':
  17. pool = multiprocessing.Pool(4)
  18. results = [pool.apply_async(downloadUrl, param) for param in PARAMS]
  19. pool.close()
  20. pool.join()
  21. for result in results:
  22. print(f'{result.get()=}')
  23. ’’’
  24. 进程ID=72786, __name__='__mp_main__' start download: url='https://blog.csdn.net/spiritx/article/details/132783171',index=1
  25. 进程ID=72785, __name__='__mp_main__' start download: url='https://blog.csdn.net/spiritx/article/details/132782806',index=2
  26. 进程ID=72784, __name__='__mp_main__' start download: url='https://blog.csdn.net/spiritx/article/details/132778558',index=3
  27. 进程ID=72787, __name__='__mp_main__' start download: url='https://blog.csdn.net/spiritx/article/details/132773698',index=4
  28. 进程ID=72787, __name__='__mp_main__' end download: url='https://blog.csdn.net/spiritx/article/details/132773698',index=4
  29. 进程ID=72786, __name__='__mp_main__' end download: url='https://blog.csdn.net/spiritx/article/details/132783171',index=1
  30. 进程ID=72785, __name__='__mp_main__' end download: url='https://blog.csdn.net/spiritx/article/details/132782806',index=2
  31. 进程ID=72784, __name__='__mp_main__' end download: url='https://blog.csdn.net/spiritx/article/details/132778558',index=3
  32. result.get()=1
  33. result.get()=2
  34. result.get()=3
  35. result.get()=4
  36. ''

注意:apply_async的可调用对象接收的是多个参数。

map()和map_async()

内置 map() 函数的并行版本 。 它会保持阻塞直到获得结果。

map(funciterable[, chunksize])

这个方法会将可迭代对象分割为许多块,然后提交给进程池。可以将 chunksize 设置为一个正整数从而(近似)指定每个块的大小可以。

注意,map()和map_async()的可调用对象不支持多个参数,需要将多参数包装为tuple,或者使用starmap()和starmap_async()。

  1. import os, time
  2. import multiprocessing
  3. import random
  4. URLS=['https://blog.csdn.net/spiritx/article/details/1',
  5. 'https://blog.csdn.net/spiritx/article/details/2',
  6. 'https://blog.csdn.net/spiritx/article/details/3',
  7. 'https://blog.csdn.net/spiritx/article/details/4',
  8. 'https://blog.csdn.net/spiritx/article/details/5',
  9. 'https://blog.csdn.net/spiritx/article/details/6',
  10. 'https://blog.csdn.net/spiritx/article/details/7',
  11. 'https://blog.csdn.net/spiritx/article/details/8',
  12. 'https://blog.csdn.net/spiritx/article/details/9']
  13. INDEXS=[1,2,3,4,5,6,7,8,9]
  14. PARAMS=list(zip(URLS, INDEXS)) #通过zip()方法将多个参数压缩成list对象
  15. def downloadUrl(param): #不能接受多个参数
  16. url,index = param #对多个参数进行解包
  17. print(f'进程ID={os.getpid()}, {__name__=} start download: {url=},{index=}')
  18. time.sleep(random.randint(1,3))
  19. print(f'进程ID={os.getpid()}, {__name__=} end download: {url=},{index=}')
  20. return index
  21. if __name__ == '__main__':
  22. pool = multiprocessing.Pool(4)
  23. pool.map(downloadUrl, PARAMS)
  24. print('pool.map submit ok.')
  25. pool.close()
  26. pool.join()
  27. ‘’'
  28. 进程ID=72929, __name__='__mp_main__' start download: url='https://blog.csdn.net/spiritx/article/details/1',index=1
  29. 进程ID=72930, __name__='__mp_main__' start download: url='https://blog.csdn.net/spiritx/article/details/2',index=2
  30. 进程ID=72932, __name__='__mp_main__' start download: url='https://blog.csdn.net/spiritx/article/details/3',index=3
  31. 进程ID=72931, __name__='__mp_main__' start download: url='https://blog.csdn.net/spiritx/article/details/4',index=4
  32. 进程ID=72929, __name__='__mp_main__' end download: url='https://blog.csdn.net/spiritx/article/details/1',index=1
  33. 进程ID=72929, __name__='__mp_main__' start download: url='https://blog.csdn.net/spiritx/article/details/5',index=5
  34. 进程ID=72930, __name__='__mp_main__' end download: url='https://blog.csdn.net/spiritx/article/details/2',index=2
  35. 进程ID=72930, __name__='__mp_main__' start download: url='https://blog.csdn.net/spiritx/article/details/6',index=6
  36. 进程ID=72930, __name__='__mp_main__' end download: url='https://blog.csdn.net/spiritx/article/details/6',index=6
  37. 进程ID=72929, __name__='__mp_main__' end download: url='https://blog.csdn.net/spiritx/article/details/5',index=5
  38. 进程ID=72930, __name__='__mp_main__' start download: url='https://blog.csdn.net/spiritx/article/details/7',index=7
  39. 进程ID=72929, __name__='__mp_main__' start download: url='https://blog.csdn.net/spiritx/article/details/8',index=8
  40. 进程ID=72932, __name__='__mp_main__' end download: url='https://blog.csdn.net/spiritx/article/details/3',index=3
  41. 进程ID=72932, __name__='__mp_main__' start download: url='https://blog.csdn.net/spiritx/article/details/9',index=9
  42. 进程ID=72931, __name__='__mp_main__' end download: url='https://blog.csdn.net/spiritx/article/details/4',index=4
  43. 进程ID=72929, __name__='__mp_main__' end download: url='https://blog.csdn.net/spiritx/article/details/8',index=8
  44. 进程ID=72930, __name__='__mp_main__' end download: url='https://blog.csdn.net/spiritx/article/details/7',index=7
  45. 进程ID=72932, __name__='__mp_main__' end download: url='https://blog.csdn.net/spiritx/article/details/9',index=9
  46. pool.map submit ok.
  47. ‘''

从上面的输出可以看出,pool.map submit ok是在最后输出的,start download是按顺序输出的,但end download没有按顺序输出,说明:

  • pool.map是按顺序启动的子进程
  • 子进程是并行执行的
  • 主进程在子进程执行完后,才会执行map之后的代码。

map_async(funciterable[, chunksize[, callback[, error_callback]]])

map_async()是map() 方法的一个变种,返回一个 AsyncResult 对象。
如果指定了 callback , 它必须是一个接受单个参数的可调用对象。当执行成功时, callback 会被用于处理执行后的返回结果,否则,调用 error_callback 。
如果指定了 error_callback , 它必须是一个接受单个参数的可调用对象。当目标函数执行失败时, 会将抛出的异常对象作为参数传递给 error_callback 执行。
回调函数应该立即执行完成,否则会阻塞负责处理结果的进程。

  1. import os, time
  2. import multiprocessing
  3. import random
  4. URLS = ['https://blog.csdn.net/spiritx/article/details/1',
  5. 'https://blog.csdn.net/spiritx/article/details/2',
  6. 'https://blog.csdn.net/spiritx/article/details/3',
  7. 'https://blog.csdn.net/spiritx/article/details/4',
  8. 'https://blog.csdn.net/spiritx/article/details/5',
  9. 'https://blog.csdn.net/spiritx/article/details/6',
  10. 'https://blog.csdn.net/spiritx/article/details/7',
  11. 'https://blog.csdn.net/spiritx/article/details/8',
  12. 'https://blog.csdn.net/spiritx/article/details/9']
  13. INDEXS = [1, 2, 3, 4, 5, 6, 7, 8, 9]
  14. PARAMS = list(zip(URLS, INDEXS)) # 通过zip()方法将多个参数压缩成list对象
  15. def downloadUrl(param): # 只支持一个参数
  16. url, index = param # 对多个参数进行解包
  17. print(f'进程ID={os.getpid()}, {__name__=} start download: {url=},{index=}')
  18. time.sleep(random.randint(1, 3))
  19. print(f'进程ID={os.getpid()}, {__name__=} end download: {url=},{index=}')
  20. return index
  21. if __name__ == '__main__':
  22. pool = multiprocessing.Pool(4)
  23. pool.map_async(downloadUrl, PARAMS)
  24. print('pool.map submit ok.')
  25. pool.close()
  26. pool.join()
  27. ‘’'
  28. pool.map submit ok.
  29. 进程ID=73087, __name__='__mp_main__' start download: url='https://blog.csdn.net/spiritx/article/details/1',index=1
  30. 进程ID=73088, __name__='__mp_main__' start download: url='https://blog.csdn.net/spiritx/article/details/2',index=2
  31. 进程ID=73089, __name__='__mp_main__' start download: url='https://blog.csdn.net/spiritx/article/details/3',index=3
  32. 进程ID=73090, __name__='__mp_main__' start download: url='https://blog.csdn.net/spiritx/article/details/4',index=4
  33. 进程ID=73087, __name__='__mp_main__' end download: url='https://blog.csdn.net/spiritx/article/details/1',index=1
  34. 进程ID=73087, __name__='__mp_main__' start download: url='https://blog.csdn.net/spiritx/article/details/5',index=5
  35. 进程ID=73088, __name__='__mp_main__' end download: url='https://blog.csdn.net/spiritx/article/details/2',index=2
  36. 进程ID=73088, __name__='__mp_main__' start download: url='https://blog.csdn.net/spiritx/article/details/6',index=6
  37. 进程ID=73089, __name__='__mp_main__' end download: url='https://blog.csdn.net/spiritx/article/details/3',index=3
  38. 进程ID=73089, __name__='__mp_main__' start download: url='https://blog.csdn.net/spiritx/article/details/7',index=7
  39. 进程ID=73087, __name__='__mp_main__' end download: url='https://blog.csdn.net/spiritx/article/details/5',index=5
  40. 进程ID=73087, __name__='__mp_main__' start download: url='https://blog.csdn.net/spiritx/article/details/8',index=8
  41. 进程ID=73090, __name__='__mp_main__' end download: url='https://blog.csdn.net/spiritx/article/details/4',index=4
  42. 进程ID=73090, __name__='__mp_main__' start download: url='https://blog.csdn.net/spiritx/article/details/9',index=9
  43. 进程ID=73087, __name__='__mp_main__' end download: url='https://blog.csdn.net/spiritx/article/details/8',index=8
  44. 进程ID=73088, __name__='__mp_main__' end download: url='https://blog.csdn.net/spiritx/article/details/6',index=6
  45. 进程ID=73089, __name__='__mp_main__' end download: url='https://blog.csdn.net/spiritx/article/details/7',index=7
  46. 进程ID=73090, __name__='__mp_main__' end download: url='https://blog.csdn.net/spiritx/article/details/9',index=9
  47. ‘''

从上面的例子可以看出,pool.map submit打印在最前面,start download是顺序启动的:

  • map_async在启动子进程时,不等子进程执行完,立即就执行主进程后面的语句
  • 子进程是按序启动的,并且都是并发的
  • 子进程与主进程也是并发的

imap()和imap_unordered()

imap(funciterable[, chunksize])

map() 的延迟执行版本。
chunksize 参数的作用和 map() 方法的一样。对于很长的迭代器,给 chunksize 设置一个很大的值会比默认值 1 极大 地加快执行速度。
同样,如果 chunksize 是 1 , 那么 imap() 方法所返回的迭代器的 next() 方法拥有一个可选的 timeout 参数: 如果无法在 timeout 秒内执行得到结果,则 next(timeout) 会抛出 multiprocessing.TimeoutError 异常。

所谓延迟执行版本,就是并不是所有的子进程都执行完了,才能获得结果,可以在部分进程执行完后,就拿到结果。

下面是map()与imap()拿结果的对比

map版本:

  1. import os, time
  2. import multiprocessing
  3. import random
  4. URLS = ['https://blog.csdn.net/spiritx/article/details/1',
  5. 'https://blog.csdn.net/spiritx/article/details/2',
  6. 'https://blog.csdn.net/spiritx/article/details/3',
  7. 'https://blog.csdn.net/spiritx/article/details/4',
  8. 'https://blog.csdn.net/spiritx/article/details/5',
  9. 'https://blog.csdn.net/spiritx/article/details/6',
  10. 'https://blog.csdn.net/spiritx/article/details/7',
  11. 'https://blog.csdn.net/spiritx/article/details/8',
  12. 'https://blog.csdn.net/spiritx/article/details/9']
  13. INDEXS = [1, 2, 3, 4, 5, 6, 7, 8, 9]
  14. PARAMS = list(zip(URLS, INDEXS)) # 通过zip()方法将多个参数压缩成list对象
  15. def downloadUrl(param): # 只支持一个参数
  16. url, index = param # 对多个参数进行解包
  17. print(f'进程ID={os.getpid()}, {__name__=} start download: {url=},{index=}')
  18. #time.sleep(random.randint(1, 3))
  19. time.sleep(9 - index)
  20. print(f'进程ID={os.getpid()}, {__name__=} end download: {url=},{index=}')
  21. return 'result:', index
  22. if __name__ == '__main__':
  23. pool = multiprocessing.Pool(4)
  24. its = pool.map(downloadUrl, PARAMS)
  25. print('pool.map submit ok.')
  26. for it in its:
  27. print(it)
  28. pool.close()
  29. pool.join()
  30. ‘’’
  31. 进程ID=73691, __name__='__mp_main__' start download: url='https://blog.csdn.net/spiritx/article/details/1',index=1
  32. 进程ID=73693, __name__='__mp_main__' start download: url='https://blog.csdn.net/spiritx/article/details/2',index=2
  33. 进程ID=73690, __name__='__mp_main__' start download: url='https://blog.csdn.net/spiritx/article/details/3',index=3
  34. 进程ID=73692, __name__='__mp_main__' start download: url='https://blog.csdn.net/spiritx/article/details/4',index=4
  35. 进程ID=73692, __name__='__mp_main__' end download: url='https://blog.csdn.net/spiritx/article/details/4',index=4
  36. 进程ID=73692, __name__='__mp_main__' start download: url='https://blog.csdn.net/spiritx/article/details/5',index=5
  37. 进程ID=73690, __name__='__mp_main__' end download: url='https://blog.csdn.net/spiritx/article/details/3',index=3
  38. 进程ID=73690, __name__='__mp_main__' start download: url='https://blog.csdn.net/spiritx/article/details/6',index=6
  39. 进程ID=73693, __name__='__mp_main__' end download: url='https://blog.csdn.net/spiritx/article/details/2',index=2
  40. 进程ID=73693, __name__='__mp_main__' start download: url='https://blog.csdn.net/spiritx/article/details/7',index=7
  41. 进程ID=73691, __name__='__mp_main__' end download: url='https://blog.csdn.net/spiritx/article/details/1',index=1
  42. 进程ID=73691, __name__='__mp_main__' start download: url='https://blog.csdn.net/spiritx/article/details/8',index=8
  43. 进程ID=73691, __name__='__mp_main__' end download: url='https://blog.csdn.net/spiritx/article/details/8',index=8
  44. 进程ID=73691, __name__='__mp_main__' start download: url='https://blog.csdn.net/spiritx/article/details/9',index=9
  45. 进程ID=73691, __name__='__mp_main__' end download: url='https://blog.csdn.net/spiritx/article/details/9',index=9
  46. 进程ID=73693, __name__='__mp_main__' end download: url='https://blog.csdn.net/spiritx/article/details/7',index=7
  47. 进程ID=73690, __name__='__mp_main__' end download: url='https://blog.csdn.net/spiritx/article/details/6',index=6
  48. 进程ID=73692, __name__='__mp_main__' end download: url='https://blog.csdn.net/spiritx/article/details/5',index=5
  49. pool.map submit ok.
  50. ('result:', 1)
  51. ('result:', 2)
  52. ('result:', 3)
  53. ('result:', 4)
  54. ('result:', 5)
  55. ('result:', 6)
  56. ('result:', 7)
  57. ('result:', 8)
  58. ('result:', 9)
  59. ’‘’

map()版本的返回值都是要在所有进程都执行完后才能获得。

imap()版本:

  1. import os, time
  2. import multiprocessing
  3. import random
  4. URLS = ['https://blog.csdn.net/spiritx/article/details/1',
  5. 'https://blog.csdn.net/spiritx/article/details/2',
  6. 'https://blog.csdn.net/spiritx/article/details/3',
  7. 'https://blog.csdn.net/spiritx/article/details/4',
  8. 'https://blog.csdn.net/spiritx/article/details/5',
  9. 'https://blog.csdn.net/spiritx/article/details/6',
  10. 'https://blog.csdn.net/spiritx/article/details/7',
  11. 'https://blog.csdn.net/spiritx/article/details/8',
  12. 'https://blog.csdn.net/spiritx/article/details/9']
  13. INDEXS = [1, 2, 3, 4, 5, 6, 7, 8, 9]
  14. PARAMS = list(zip(URLS, INDEXS)) # 通过zip()方法将多个参数压缩成list对象
  15. def downloadUrl(param): # 只支持一个参数
  16. url, index = param # 对多个参数进行解包
  17. print(f'进程ID={os.getpid()}, {__name__=} start download: {url=},{index=}')
  18. #time.sleep(random.randint(1, 3))
  19. time.sleep(9 - index)
  20. print(f'进程ID={os.getpid()}, {__name__=} end download: {url=},{index=}')
  21. return 'result:', index
  22. if __name__ == '__main__':
  23. pool = multiprocessing.Pool(4)
  24. its = pool.imap(downloadUrl, PARAMS)
  25. print('pool.map submit ok.')
  26. for it in its:
  27. print(it)
  28. pool.close()
  29. pool.join()
  30. ‘’'
  31. pool.map submit ok.
  32. 进程ID=73772, __name__='__mp_main__' start download: url='https://blog.csdn.net/spiritx/article/details/1',index=1
  33. 进程ID=73773, __name__='__mp_main__' start download: url='https://blog.csdn.net/spiritx/article/details/2',index=2
  34. 进程ID=73776, __name__='__mp_main__' start download: url='https://blog.csdn.net/spiritx/article/details/3',index=3
  35. 进程ID=73775, __name__='__mp_main__' start download: url='https://blog.csdn.net/spiritx/article/details/4',index=4
  36. 进程ID=73775, __name__='__mp_main__' end download: url='https://blog.csdn.net/spiritx/article/details/4',index=4
  37. 进程ID=73775, __name__='__mp_main__' start download: url='https://blog.csdn.net/spiritx/article/details/5',index=5
  38. 进程ID=73776, __name__='__mp_main__' end download: url='https://blog.csdn.net/spiritx/article/details/3',index=3
  39. 进程ID=73776, __name__='__mp_main__' start download: url='https://blog.csdn.net/spiritx/article/details/6',index=6
  40. 进程ID=73773, __name__='__mp_main__' end download: url='https://blog.csdn.net/spiritx/article/details/2',index=2
  41. 进程ID=73773, __name__='__mp_main__' start download: url='https://blog.csdn.net/spiritx/article/details/7',index=7
  42. 进程ID=73772, __name__='__mp_main__' end download: url='https://blog.csdn.net/spiritx/article/details/1',index=1
  43. 进程ID=73772, __name__='__mp_main__' start download: url='https://blog.csdn.net/spiritx/article/details/8',index=8
  44. ('result:', 1)
  45. ('result:', 2)
  46. ('result:', 3)
  47. ('result:', 4)
  48. 进程ID=73772, __name__='__mp_main__' end download: url='https://blog.csdn.net/spiritx/article/details/8',index=8
  49. 进程ID=73772, __name__='__mp_main__' start download: url='https://blog.csdn.net/spiritx/article/details/9',index=9
  50. 进程ID=73772, __name__='__mp_main__' end download: url='https://blog.csdn.net/spiritx/article/details/9',index=9
  51. 进程ID=73773, __name__='__mp_main__' end download: url='https://blog.csdn.net/spiritx/article/details/7',index=7
  52. 进程ID=73776, __name__='__mp_main__' end download: url='https://blog.csdn.net/spiritx/article/details/6',index=6
  53. 进程ID=73775, __name__='__mp_main__' end download: url='https://blog.csdn.net/spiritx/article/details/5',index=5
  54. ('result:', 5)
  55. ('result:', 6)
  56. ('result:', 7)
  57. ('result:', 8)
  58. ('result:', 9)
  59. ‘''

pool.map submit在首行就打印出来,说明imap的子进程与父进程也是并行执行的。

结果数据打印夹杂在子进程输出的结果打印中,是在一项任务执行完后打印出来。但仍可以看出来,结果数据是按任务的进入顺序打印的。(为了演示,我们实际的任务执行时间是,越早提交的任务运行的时间越长。)

imap_unordered(funciterable[, chunksize])

和 imap() 相同,只不过通过迭代器返回的结果是任意的。(当进程池中只有一个工作进程的时候,返回结果的顺序才能认为是"有序"的)

  1. import os, time
  2. import multiprocessing
  3. import random
  4. URLS = ['https://blog.csdn.net/spiritx/article/details/1',
  5. 'https://blog.csdn.net/spiritx/article/details/2',
  6. 'https://blog.csdn.net/spiritx/article/details/3',
  7. 'https://blog.csdn.net/spiritx/article/details/4',
  8. 'https://blog.csdn.net/spiritx/article/details/5',
  9. 'https://blog.csdn.net/spiritx/article/details/6',
  10. 'https://blog.csdn.net/spiritx/article/details/7',
  11. 'https://blog.csdn.net/spiritx/article/details/8',
  12. 'https://blog.csdn.net/spiritx/article/details/9']
  13. INDEXS = [1, 2, 3, 4, 5, 6, 7, 8, 9]
  14. PARAMS = list(zip(URLS, INDEXS)) # 通过zip()方法将多个参数压缩成list对象
  15. def downloadUrl(param): # 只支持一个参数
  16. url, index = param # 对多个参数进行解包
  17. print(f'进程ID={os.getpid()}, {__name__=} start download: {url=},{index=}')
  18. #time.sleep(random.randint(1, 3))
  19. time.sleep(9 - index)
  20. print(f'进程ID={os.getpid()}, {__name__=} end download: {url=},{index=}')
  21. return 'result:', index
  22. if __name__ == '__main__':
  23. pool = multiprocessing.Pool(4)
  24. its = pool.imap_unordered(downloadUrl, PARAMS)
  25. print('pool.map submit ok.')
  26. for it in its:
  27. print(it)
  28. pool.close()
  29. pool.join()
  30. ‘’’
  31. pool.map submit ok.
  32. 进程ID=73862, __name__='__mp_main__' start download: url='https://blog.csdn.net/spiritx/article/details/1',index=1
  33. 进程ID=73864, __name__='__mp_main__' start download: url='https://blog.csdn.net/spiritx/article/details/2',index=2
  34. 进程ID=73863, __name__='__mp_main__' start download: url='https://blog.csdn.net/spiritx/article/details/3',index=3
  35. 进程ID=73865, __name__='__mp_main__' start download: url='https://blog.csdn.net/spiritx/article/details/4',index=4
  36. 进程ID=73865, __name__='__mp_main__' end download: url='https://blog.csdn.net/spiritx/article/details/4',index=4
  37. 进程ID=73865, __name__='__mp_main__' start download: url='https://blog.csdn.net/spiritx/article/details/5',index=5
  38. ('result:', 4)
  39. 进程ID=73863, __name__='__mp_main__' end download: url='https://blog.csdn.net/spiritx/article/details/3',index=3
  40. 进程ID=73863, __name__='__mp_main__' start download: url='https://blog.csdn.net/spiritx/article/details/6',index=6
  41. ('result:', 3)
  42. 进程ID=73864, __name__='__mp_main__' end download: url='https://blog.csdn.net/spiritx/article/details/2',index=2
  43. 进程ID=73864, __name__='__mp_main__' start download: url='https://blog.csdn.net/spiritx/article/details/7',index=7
  44. ('result:', 2)
  45. 进程ID=73862, __name__='__mp_main__' end download: url='https://blog.csdn.net/spiritx/article/details/1',index=1
  46. 进程ID=73862, __name__='__mp_main__' start download: url='https://blog.csdn.net/spiritx/article/details/8',index=8
  47. ('result:', 1)
  48. 进程ID=73862, __name__='__mp_main__' end download: url='https://blog.csdn.net/spiritx/article/details/8',index=8
  49. 进程ID=73862, __name__='__mp_main__' start download: url='https://blog.csdn.net/spiritx/article/details/9',index=9
  50. 进程ID=73862, __name__='__mp_main__' end download: url='https://blog.csdn.net/spiritx/article/details/9',index=9
  51. ('result:', 8)
  52. ('result:', 9)
  53. 进程ID=73864, __name__='__mp_main__' end download: url='https://blog.csdn.net/spiritx/article/details/7',index=7
  54. ('result:', 7)
  55. 进程ID=73863, __name__='__mp_main__' end download: url='https://blog.csdn.net/spiritx/article/details/6',index=6
  56. ('result:', 6)
  57. 进程ID=73865, __name__='__mp_main__' end download: url='https://blog.csdn.net/spiritx/article/details/5',index=5
  58. ('result:', 5)
  59. ’‘’

imap_unordered()与imap()几乎一样,只是结果数据是谁先完成谁就能先获得。

starmap()和starmap_async()

map()和map_async()不能支持多参数的可调用对象,starmap()和starmap_async()看上去就像是打的补丁,让他们可以支持多参数的可调用对象。

starmap(func, iterable[, chunksize])
和 map() 类似,不过 iterable 中的每一项会被解包再作为函数参数。
比如可迭代对象 [(1,2), (3, 4)] 会转化为等价于 [func(1,2), func(3,4)] 的调用。

  1. import os, time
  2. import multiprocessing
  3. import random
  4. URLS=['https://blog.csdn.net/spiritx/article/details/1',
  5. 'https://blog.csdn.net/spiritx/article/details/2',
  6. 'https://blog.csdn.net/spiritx/article/details/3',
  7. 'https://blog.csdn.net/spiritx/article/details/4',
  8. 'https://blog.csdn.net/spiritx/article/details/5',
  9. 'https://blog.csdn.net/spiritx/article/details/6',
  10. 'https://blog.csdn.net/spiritx/article/details/7',
  11. 'https://blog.csdn.net/spiritx/article/details/8',
  12. 'https://blog.csdn.net/spiritx/article/details/9']
  13. INDEXS=[1,2,3,4,5,6,7,8,9]
  14. PARAMS=list(zip(URLS, INDEXS)) #通过zip()方法将多个参数压缩成list对象
  15. def downloadUrl(url, index): #可以支持多个参数
  16. #url,index = param #对多个参数进行解包
  17. print(f'进程ID={os.getpid()}, {__name__=} start download: {url=},{index=}')
  18. time.sleep(random.randint(1,3))
  19. print(f'进程ID={os.getpid()}, {__name__=} end download: {url=},{index=}')
  20. return index
  21. if __name__ == '__main__':
  22. pool = multiprocessing.Pool(4)
  23. pool.starmap(downloadUrl, PARAMS)
  24. print('pool.map submit ok.')
  25. pool.close()
  26. pool.join()
  27. ‘’'
  28. 进程ID=73960, __name__='__mp_main__' start download: url='https://blog.csdn.net/spiritx/article/details/1',index=1
  29. 进程ID=73962, __name__='__mp_main__' start download: url='https://blog.csdn.net/spiritx/article/details/2',index=2
  30. 进程ID=73961, __name__='__mp_main__' start download: url='https://blog.csdn.net/spiritx/article/details/3',index=3
  31. 进程ID=73963, __name__='__mp_main__' start download: url='https://blog.csdn.net/spiritx/article/details/4',index=4
  32. 进程ID=73960, __name__='__mp_main__' end download: url='https://blog.csdn.net/spiritx/article/details/1',index=1
  33. 进程ID=73960, __name__='__mp_main__' start download: url='https://blog.csdn.net/spiritx/article/details/5',index=5
  34. 进程ID=73962, __name__='__mp_main__' end download: url='https://blog.csdn.net/spiritx/article/details/2',index=2
  35. 进程ID=73962, __name__='__mp_main__' start download: url='https://blog.csdn.net/spiritx/article/details/6',index=6
  36. 进程ID=73961, __name__='__mp_main__' end download: url='https://blog.csdn.net/spiritx/article/details/3',index=3
  37. 进程ID=73961, __name__='__mp_main__' start download: url='https://blog.csdn.net/spiritx/article/details/7',index=7
  38. 进程ID=73960, __name__='__mp_main__' end download: url='https://blog.csdn.net/spiritx/article/details/5',index=5
  39. 进程ID=73960, __name__='__mp_main__' start download: url='https://blog.csdn.net/spiritx/article/details/8',index=8
  40. 进程ID=73963, __name__='__mp_main__' end download: url='https://blog.csdn.net/spiritx/article/details/4',index=4
  41. 进程ID=73963, __name__='__mp_main__' start download: url='https://blog.csdn.net/spiritx/article/details/9',index=9
  42. 进程ID=73962, __name__='__mp_main__' end download: url='https://blog.csdn.net/spiritx/article/details/6',index=6
  43. 进程ID=73961, __name__='__mp_main__' end download: url='https://blog.csdn.net/spiritx/article/details/7',index=7
  44. 进程ID=73963, __name__='__mp_main__' end download: url='https://blog.csdn.net/spiritx/article/details/9',index=9
  45. 进程ID=73960, __name__='__mp_main__' end download: url='https://blog.csdn.net/spiritx/article/details/8',index=8
  46. pool.map submit ok.
  47. ‘''

starmap_async(func, iterable[, chunksize[, callback[, error_callback]]])
相当于 starmap() 与 map_async() 的结合,迭代 iterable 的每一项,解包作为 func 的参数并执行,返回用于获取结果的对象。

  1. import os, time
  2. import multiprocessing
  3. import random
  4. URLS = ['https://blog.csdn.net/spiritx/article/details/1',
  5. 'https://blog.csdn.net/spiritx/article/details/2',
  6. 'https://blog.csdn.net/spiritx/article/details/3',
  7. 'https://blog.csdn.net/spiritx/article/details/4',
  8. 'https://blog.csdn.net/spiritx/article/details/5',
  9. 'https://blog.csdn.net/spiritx/article/details/6',
  10. 'https://blog.csdn.net/spiritx/article/details/7',
  11. 'https://blog.csdn.net/spiritx/article/details/8',
  12. 'https://blog.csdn.net/spiritx/article/details/9']
  13. INDEXS = [1, 2, 3, 4, 5, 6, 7, 8, 9]
  14. PARAMS = list(zip(URLS, INDEXS)) # 通过zip()方法将多个参数压缩成list对象
  15. def downloadUrl(url, index): # 只支持一个参数
  16. #url, index = param # 对多个参数进行解包
  17. print(f'进程ID={os.getpid()}, {__name__=} start download: {url=},{index=}')
  18. time.sleep(random.randint(1, 3))
  19. print(f'进程ID={os.getpid()}, {__name__=} end download: {url=},{index=}')
  20. return index
  21. if __name__ == '__main__':
  22. pool = multiprocessing.Pool(4)
  23. pool.starmap_async(downloadUrl, PARAMS)
  24. print('pool.map submit ok.')
  25. pool.close()
  26. pool.join()
  27. ‘’'
  28. pool.map submit ok.
  29. 进程ID=73991, __name__='__mp_main__' start download: url='https://blog.csdn.net/spiritx/article/details/1',index=1
  30. 进程ID=73990, __name__='__mp_main__' start download: url='https://blog.csdn.net/spiritx/article/details/2',index=2
  31. 进程ID=73993, __name__='__mp_main__' start download: url='https://blog.csdn.net/spiritx/article/details/3',index=3
  32. 进程ID=73992, __name__='__mp_main__' start download: url='https://blog.csdn.net/spiritx/article/details/4',index=4
  33. 进程ID=73993, __name__='__mp_main__' end download: url='https://blog.csdn.net/spiritx/article/details/3',index=3
  34. 进程ID=73993, __name__='__mp_main__' start download: url='https://blog.csdn.net/spiritx/article/details/5',index=5
  35. 进程ID=73992, __name__='__mp_main__' end download: url='https://blog.csdn.net/spiritx/article/details/4',index=4
  36. 进程ID=73992, __name__='__mp_main__' start download: url='https://blog.csdn.net/spiritx/article/details/6',index=6
  37. 进程ID=73991, __name__='__mp_main__' end download: url='https://blog.csdn.net/spiritx/article/details/1',index=1
  38. 进程ID=73991, __name__='__mp_main__' start download: url='https://blog.csdn.net/spiritx/article/details/7',index=7
  39. 进程ID=73990, __name__='__mp_main__' end download: url='https://blog.csdn.net/spiritx/article/details/2',index=2
  40. 进程ID=73990, __name__='__mp_main__' start download: url='https://blog.csdn.net/spiritx/article/details/8',index=8
  41. 进程ID=73993, __name__='__mp_main__' end download: url='https://blog.csdn.net/spiritx/article/details/5',index=5
  42. 进程ID=73993, __name__='__mp_main__' start download: url='https://blog.csdn.net/spiritx/article/details/9',index=9
  43. 进程ID=73991, __name__='__mp_main__' end download: url='https://blog.csdn.net/spiritx/article/details/7',index=7
  44. 进程ID=73992, __name__='__mp_main__' end download: url='https://blog.csdn.net/spiritx/article/details/6',index=6
  45. 进程ID=73990, __name__='__mp_main__' end download: url='https://blog.csdn.net/spiritx/article/details/8',index=8
  46. 进程ID=73993, __name__='__mp_main__' end download: url='https://blog.csdn.net/spiritx/article/details/9',index=9
  47. ‘''

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/天景科技苑/article/detail/860754
推荐阅读
相关标签
  

闽ICP备14008679号