当前位置:   article > 正文

Python 多进程:multiprocessing、aiomultiprocess(异步多进程)

Python 多进程:multiprocessing、aiomultiprocess(异步多进程)

multiprocessing 文档:https://docs.python.org/zh-cn/3.10/library/multiprocessing.html

Process、Lock、Semaphore、Queue、Pipe、Pool:https://cuiqingcai.com/3335.html

1、进程、线程、并发、并行

进程、线程

  • 进程:可以看作是线程的集合,由一个或多个线程构成。同时进程也是系统进行资源分配和调度的最小独立单位,所以各进程之间的数据是无法共享的,如多个进程无法共享一个全局变量,进程之间的数据共享需要由单独的机制来实现。
  • 线程:是操作系统进行运算调度的最小单位,是进程中的最小运行单元。也是CPU调度的最小单元

并发、并行

计算机中运行一个程序,底层是通过处理器运行一条条指今来实现的。处理器同一时刻只能执行一条指令,

  • 并发(concurrency):是指多个线程对应的多条指令被快速轮换地执行。例如:一个处理器,它先执行线程A的指令一段时间,再执行线程B的指令一段时间,然后再切回线程A执行一段时间。处理器执行指令的速度和切换线程的速度都非常快,人完全感知不到计算机在这个过程中还切换了多个线程的上下文,这使得多个线程从宏观上看起来是同时在运行。从微观上看,处理器连续不断地在多个线程之间切换和执行,每个线程的执行都一定会占用这个处理器的一个时间片段,因此同一时刻其实只有一个线程被执行。
  • 并行(parallel):指同一时刻有多条指令在多个处理器上同时执行,这意着并行必须依赖多个处理器。不论是从宏观还是微观上看,多个线程都是在同一时刻一起执行的。并行只能存在于多处理器系统中,因此如果计算机处理器只有一个核,就不可能实现并行。而并发在单处理器和多处理器系统中都可以存在,因为仅靠一个核,就可以实现并发。例如。系统处理器需要同时运行多个线程。如果系统处理器只有一个核,那它只能通过并发的方式来运行这些线程。而如果系统处理器有多个核,那么在一个核执行一个线程的同时,另一个核可以执行另一个线程,这样这两个线程就实现了并行执行。当然,其他线程也可能和另外的线程在同一个核上执行,它们之间就是并发执行。具体的执行方式,取决于操作系统如何调度。

Python 并发执行

Python 标准库:https://docs.python.org/zh-cn/3.12/library/index.html

2、Python "多线程、多进程"

GIL 对 Python 多线程 影响

GIL (全称为 GlobalInterpreter Lock 意思是全局解释器锁) 设计之初是对数据安全的考虑。在 Python多线程,每个线程的执行方式分如下三步:" 获取 GIL; 执行对应线程的代码; 释放 GIL; ",可见,某个线程要想执行,必须先拿到 GIL,可以把 GIL看作通行证,并且在一个Python进程中,GIL 只有一个。线程要是拿不到通行证,就不允许执行。这样会导致即使在多核条件下,一个Python进程中的多个线程在同一时刻也只能执行一个。

总结:

  • 多线程 场景中,Python因为 GIL 的限制,不论是单核还是多核下,同一时刻都只能运行一个线程,这使得 Python 多线程无法发挥多核并行的优势。
  • 多进程 场景中 "每个进程都有属于自己的GIL" 所以在多核处理器下,多进程的运行是不会受GL影响的。所以多进程能够更好地发挥多核优势。但是使用多进程时,要注意 进程间共享数据、进程同步问题。

Python 单线程

就是 Python 代码按照顺序依次执行。执行过程中,不会中断去执行其他代码

  1. import time
  2. import datetime
  3. def func_music(argv):
  4. for i in range(2):
  5. print(f"听音乐 {argv}. {datetime.datetime.now()}")
  6. time.sleep(1)
  7. def func_movie(argv):
  8. for i in range(2):
  9. print(f"看电影 {argv}. {datetime.datetime.now()}")
  10. time.sleep(5)
  11. if __name__ == '__main__':
  12. func_music('trouble is a friend')
  13. func_movie('变形金刚')
  14. print(f"结束: {datetime.datetime.now()}")

Python 多线程

Python 多线程的用法:https://setup.scrape.center/python-threading

Python 中使用线程有两种方式:

  • 通过 类继承 threading.Thread 创建一个线程对象
  • 通过使用 start_new_thread 函数

方法 1:继承 创建线程

通过继承 threading.Thread 并重写 run 方法,来实现多线程。threading.Thread 方法说明

  • t.start()       激活线程,   
  • t.getName()     获取线程的名称   
  • t.setName()     设置线程的名称   
  • t.name          获取或设置线程的名称   
  • t.is_alive()    判断线程是否为激活状态   
  • t.isAlive()     判断线程是否为激活状态   
  • t.setDaemon()   设置为后台线程或前台线程(默认:False)必须在执行start()方法之后;
  • t.isDaemon()    判断是否为守护线程   
  • t.ident         获取线程的标识符。线程标识符是一个非零整数,只有在调用了start()方法之后该属性才有效,否则它只返回None。   
  • t.join()        逐个执行每个线程,执行完毕后继续往下执行,该方法使得多线程变得无意义   
  • t.run()         线程被cpu调度后自动执行线程对象的run方法

示例代码:

  1. import time
  2. import threading
  3. from datetime import datetime
  4. exitFlag = 0
  5. def print_time(thread_name, delay, counter):
  6. while counter:
  7. if exitFlag:
  8. thread_name.exit()
  9. time.sleep(delay)
  10. print(f"{thread_name}: {datetime.now()}")
  11. counter -= 1
  12. class MyThread(threading.Thread):
  13. def __init__(self, thread_id, name, delay):
  14. super().__init__()
  15. self.thread_id = thread_id
  16. self.name = name
  17. self.delay = delay
  18. def run(self):
  19. print("开始线程:" + self.name)
  20. print_time(self.name, self.delay, 5)
  21. print("退出线程:" + self.name)
  22. # 创建新线程
  23. thread1 = MyThread(1, "Thread-1", 1)
  24. thread2 = MyThread(2, "Thread-2", 2)
  25. # 开启新线程
  26. thread1.start()
  27. thread2.start()
  28. thread1.join()
  29. thread2.join()
  30. print("退出主线程")

_thread 和 threading 模块( 强烈建议直接使用 threading )。python 提供了两个模块来实现多线程。 _thread 有一些缺点,在 threading 得到了弥补。

方法 2:函数式 创建线程

函数式:调用 _thread 模块中的 start_new_thread() 函数来产生新线程

语法如下:

  • _thread.start_new_thread ( function, args[, kwargs] )

参数说明:

  • function - 线程函数。
  • args - 传递给线程函数的参数,必须是个tuple类型。
  • kwargs - 可选参数。

使用示例:

  1. import time
  2. import _thread
  3. # 为线程定义一个函数
  4. def print_time(thread_name, delay):
  5. count = 0
  6. while count < 5:
  7. time.sleep(delay)
  8. count += 1
  9. print("%s: %s" % (thread_name, time.ctime(time.time())))
  10. if __name__ == '__main__':
  11. try:
  12. # 创建两个线程
  13. _thread.start_new_thread(print_time, ("Thread-1", 2,))
  14. _thread.start_new_thread(print_time, ("Thread-2", 4,))
  15. except BaseException as be:
  16. print(f"Error: 无法启动线程 ---> {be}")
  17. while True:
  18. pass

守护线程 ( 精灵线程 )

通过 setDaemon(True) 将线程声明为守护线程,必须在 start() 方法调用之前设置,如果不设置为守护线程程序会被无限挂起。子线程启动后,父线程也继续执行下去,当父线程执行完最后一条语句 print("父进程退出") 后,没有等待子线程,直接就退出了,同时子线程也一同结束。

  1. # -*- coding: utf-8 -*-
  2. import time
  3. import threading
  4. import datetime
  5. def music(music_name):
  6. for i in range(2):
  7. print(f"[{datetime.datetime.now().replace(microsecond=0)}] 听音乐 {music_name}")
  8. time.sleep(1)
  9. def movie(movie_name):
  10. for i in range(2):
  11. print(f"[{datetime.datetime.now().replace(microsecond=0)}] 看电影 {movie_name}")
  12. time.sleep(5)
  13. tid_list = [
  14. threading.Thread(target=music, args=('trouble is a friend',)),
  15. threading.Thread(target=movie, args=('变形金刚',))
  16. ]
  17. if __name__ == '__main__':
  18. for tid in tid_list:
  19. tid.setDaemon(True)
  20. tid.start()
  21. print("父进程退出")
  22. pass

join() 等待线程终止

join() 的作用:在子线程完成运行之前,这个子线程的父线程将一直被阻塞。注意:上面程序中 join() 方法的位置是在 for 循环外的,也就是说必须等待for循环里的两个进程都结束后,才去执行主进程。

  1. # -*- coding: utf-8 -*-
  2. import time
  3. import threading
  4. import datetime
  5. def music(music_name):
  6. for i in range(2):
  7. print(f"[{datetime.datetime.now().replace(microsecond=0)}] 听音乐 {music_name}")
  8. time.sleep(1)
  9. def movie(movie_name):
  10. for i in range(2):
  11. print(f"[{datetime.datetime.now().replace(microsecond=0)}] 看电影 {movie_name}")
  12. time.sleep(5)
  13. tid_list = [
  14. threading.Thread(target=music, args=('trouble is a friend',)),
  15. threading.Thread(target=movie, args=('变形金刚',))
  16. ]
  17. if __name__ == '__main__':
  18. for tid in tid_list:
  19. tid.start()
  20. for tid in tid_list:
  21. tid.join()
  22. print("父进程退出")

线程 事件

Python 线程的事件用于主线程控制其他线程的执行。

threading.Event 事件主要提供了三个方法

  • set 。将 “Flag” 设置为 False
  • wait 。将 “Flag” 设置为 True
  • clear 。判断标识位是否为Ture。

事件处理的机制:全局定义了一个 “Flag”,如果 “Flag” 值为 False,那么当程序执行 event.wait 方法时就会阻塞,如果 “Flag” 值为True,那么 event.wait 方法时便不再阻塞。

  1. import threading
  2. def do(event):
  3. print('start')
  4. event.wait()
  5. print('execute')
  6. event_obj = threading.Event()
  7. for i in range(10):
  8. t = threading.Thread(target=do, args=(event_obj,))
  9. t.start()
  10. event_obj.clear()
  11. # inp = input('input:')
  12. inp = raw_input('input:')
  13. if inp == 'true':
  14. event_obj.set()

当线程执行的时候,如果 flag 为False,则线程会阻塞,当 flag 为True 的时候,线程不会阻塞。它提供了 本地 和 远程 的并发性。

线程同步:线程锁

如果多个线程共同对某个数据修改,则可能出现不可预料的结果,为了保证数据的正确性,需要对多个线程进行同步。

多线程的优势在于可以同时运行多个任务(至少感觉起来是这样)。但是当线程需要共享数据时,可能存在数据不同步的问题。考虑这样一种情况:一个列表里所有元素都是 0,线程 "set" 从后向前把所有元素改成1,而线程 "print" 负责从前往后读取列表并打印。那么,可能线程 "set" 开始改的时候,线程 "print" 便来打印列表了,输出就成了一半 0 一半 1,这就是 数据的不同步。
为了避免这种情况,引入了
的概念。锁有两种状态:锁定未锁定
每当一个线程比如 "set" 要访问共享数据时,必须先获得锁定;如果已经有别的线程比如 "print" 获得锁定了,那么就让线程 "set" 暂停,也就是同步阻塞;等到线程 "print" 访问完毕,释放锁以后,再让线程 "set" 继续。经过这样的处理,打印列表时要么全部输出0,要么全部输出1,不会再出现一半0一半1的尴尬场面。

线程锁

threading.RLockthreading.Lock 可以实现简单的线程同步,这两个对象都有 acquire方法 release方法。对于那些需要每次只允许一个线程操作的数据,可以将其操作放到 acquire 和release 方法之间。

threading.RLockthreading.Lock 的区别:

  • RLock 允许在同一线程中被多次 acquire。而 Lock 却不允许这种情况。
  • RLock 时 acquire 和 release 必须成对出现,即调用了 n 次 acquire,必须调用 n 次的release 才能真正释放所占用的琐。

示例代码

  1. # -*- coding: utf-8 -*-
  2. import threading
  3. lock = threading.Lock() # Lock对象
  4. rLock = threading.RLock() # RLock对象
  5. def main_1():
  6. lock.acquire()
  7. lock.acquire() # 产生了死琐。
  8. lock.release()
  9. lock.release()
  10. def main_2():
  11. rLock.acquire()
  12. rLock.acquire() # 在同一线程内,程序不会堵塞。
  13. rLock.release()
  14. rLock.release()

示例:

  1. # -*- coding: utf-8 -*-
  2. import time
  3. import datetime
  4. import threading
  5. # 定义一个 "线程锁"
  6. threadLock = threading.Lock()
  7. count = 20
  8. class MyThread(threading.Thread):
  9. def __init__(self, name, delay_second):
  10. threading.Thread.__init__(self)
  11. self.name = name
  12. self.delay_second = delay_second
  13. def run(self):
  14. print("Starting " + self.name)
  15. result = print_time()
  16. while result > 0:
  17. print(f"[{datetime.datetime.now().replace(microsecond=0)}] {self.name} ---> {result}")
  18. result = print_time()
  19. time.sleep(self.delay_second)
  20. def print_time():
  21. # 获得锁,成功获得锁定后返回 True
  22. # 可选的 timeout 参数不填时将一直阻塞直到获得锁定
  23. # 否则超时后将返回 False
  24. threadLock.acquire()
  25. global count
  26. count -= 1
  27. # 释放锁
  28. threadLock.release()
  29. return count
  30. if __name__ == '__main__':
  31. tid_list = [
  32. MyThread("thread_1", 1),
  33. MyThread("thread_2", 1)
  34. ]
  35. for tid in tid_list:
  36. tid.start()
  37. for tid in tid_list:
  38. tid.join()
  39. print("主线程退出")
  40. pass

示例:

  1. import threading
  2. import time
  3. globals_num = 0
  4. lock = threading.RLock()
  5. def func():
  6. lock.acquire() # 获得锁
  7. global globals_num
  8. globals_num += 1
  9. time.sleep(1)
  10. print(globals_num)
  11. lock.release() # 释放锁
  12. for i in range(10):
  13. t = threading.Thread(target=func)
  14. t.start()
  15. pass

线程同步:条件变量

一个  threading.Condition 变量总是与某些类型的锁相联系,当几个condition变量必须共享和同一个锁的时候是很有用的。锁 是 conditon 对象的一部分:没有必要分别跟踪。

Condition 类实现了一个 conditon 变量。这个 conditiaon 变量允许一个或多个线程等待,直到他们被另一个线程通知。

  • 如果 lock 参数非空,那么他必须是一个 lock 或者 Rlock 对象,它用来做底层锁。
  • 如果 lock 参数为空,则会创建一个新的 Rlock 对象,用来做底层锁。

condition 变量服从上下文管理协议:with 语句块封闭之前可以获取与锁的联系。
acquire() 和 release() 会调用与锁相关联的相应的方法。
其他和锁关联的方法必须被调用,wait()方法会释放锁,
当另外一个线程使用 notify() or notify_all()唤醒它之前会一直阻塞。一旦被唤醒,wait()会重新获得锁并返回,
wait(timeout=None) :等待通知,或者等到设定的超时时间。
当调用这wait()方法时,如果调用它的线程没有得到锁,那么会抛出一个RuntimeError异常。
wati()释放锁以后,在被调用相同条件的另一个进程用notify() or notify_all() 叫醒之前会一直阻塞。
wait()还可以指定一个超时时间。 如果有等待的线程,notify()方法会唤醒一个在等待conditon变量的线程。notify_all() 则会唤醒所有在等待conditon变量的线程。

注意: notify()和notify_all()不会释放锁,也就是说,线程被唤醒后不会立刻返回他们的wait() 调用。
除非线程调用notify()和notify_all()之后放弃了锁的所有权。
在典型的设计风格里,利用condition变量用锁去通许访问一些共享状态,线程在获取到它想得到的状态前,会反复调用wait()。
修改状态的线程在他们状态改变时调用 notify() or notify_all(),用这种方式,线程会尽可能的获取到想要的一个等待者状态。

例子:生产者-消费者模型

  1. import threading
  2. import time
  3. def consumer(cond):
  4. with cond:
  5. print("consumer before wait")
  6. cond.wait()
  7. print("consumer after wait")
  8. def producer(cond):
  9. with cond:
  10. print("producer before notifyAll")
  11. cond.notifyAll()
  12. print("producer after notifyAll")
  13. condition = threading.Condition()
  14. consumer_1 = threading.Thread(name="c1", target=consumer, args=(condition,))
  15. consumer_2 = threading.Thread(name="c2", target=consumer, args=(condition,))
  16. producer = threading.Thread(name="p", target=producer, args=(condition,))
  17. consumer_1.start()
  18. time.sleep(2)
  19. consumer_2.start()
  20. time.sleep(2)
  21. producer.start()

线程同步:线程安全的 queue

使用 "线程锁" 可以实现线程同步,

同样,使用 "线程安全的 queue (队列)" 也可以达到线程同步。

Python 的 queue ( 线程安全 )

https://docs.python.org/zh-cn/3.10/search.html?q=queue+&check_keywords=yes&area=default

Python 的 queue 模块中提供了同步的、线程安全的队列类。包括

  • FIFO(先入先出) 队列
  • LIFO(后入先出)队列
  • 优先级队列 PriorityQueue

这些队列都实现了 锁原语,能够在多线程中直接使用。可以使用队列来实现线程间的同步。

 Queue 模块中的常用方法:

  1. Queue.qsize() 返回队列的大小
  2. Queue.empty() 如果队列为空,返回True,反之False
  3. Queue.full() 如果队列满了,返回True,反之False
  4. Queue.full 与 maxsize 大小对应
  5. Queue.get([block[, timeout]]) 获取队列,timeout是等待时间
  6. Queue.get_nowait() 相当Queue.get(False)
  7. Queue.put(item) 写入队列,timeout是等待时间
  8. Queue.put_nowait(item) 相当Queue.put(item, False)
  9. Queue.task_done() 在完成一项工作之后,Queue.task_done()函数向任务已经完成的队列发送一个信号
  10. Queue.join() 实际上意味着等到队列为空,再执行别的操作

示例:

  1. # -*- coding: utf-8 -*-
  2. import time
  3. import queue
  4. import threading
  5. task_queue = queue.Queue()
  6. def produce():
  7. while True:
  8. for num in range(100):
  9. task_queue.put(num)
  10. time.sleep(0.1)
  11. def consume():
  12. while True:
  13. if task_queue.empty():
  14. print('队列为空')
  15. continue
  16. num = task_queue.get()
  17. print(num)
  18. time.sleep(1)
  19. if __name__ == '__main__':
  20. thread_list = []
  21. t1 = threading.Thread(target=produce)
  22. thread_list.append(t1)
  23. for i in range(3):
  24. t_id = threading.Thread(target=consume)
  25. thread_list.append(t_id)
  26. for index in thread_list:
  27. index.start()
  28. for index in thread_list:
  29. index.join()

Python 多进程

Python 多进程的用法:https://setup.scrape.center/python-multiprocessingo

把一个 "多线程" 改成 "多进程",主要有下面几种方法:

  • subprocess
  • signal
  • multiprocessing
  • concurrent.futures 标准库。

concurrent.futures 标准库

"concurrent.futures 标准库" 提供了 ThreadPoolExecutor 和 ProcessPoolExecutor 两个类,实现了对 threading multiprocessing 的更高级的抽象,对编写 " 线程池 / 进程池 "  提供了直接的支持。

使用示例代码:

  1. import redis
  2. from redis import WatchError
  3. from concurrent.futures import ProcessPoolExecutor
  4. r = redis.Redis(host='127.0.0.1', port=6379)
  5. # 减库存函数, 循环直到减库存完成
  6. # 库存充足, 减库存成功, 返回True
  7. # 库存不足, 减库存失败, 返回False
  8. def reduce_stock():
  9. # python中redis事务是通过pipeline的封装实现的
  10. with r.pipeline() as pipe:
  11. while True:
  12. try:
  13. # watch库存键, multi后如果该key被其他客户端改变,
  14. # 事务操作会抛出WatchError异常
  15. pipe.watch('stock:count')
  16. count = int(pipe.get('stock:count'))
  17. if count > 0: # 有库存
  18. # 事务开始
  19. pipe.multi()
  20. pipe.decr('stock:count')
  21. # 把命令推送过去
  22. # execute返回命令执行结果列表, 这里只有一个decr返回当前值
  23. print(pipe.execute()[0])
  24. return True
  25. else:
  26. return False
  27. except WatchError as ex:
  28. # 打印WatchError异常, 观察被watch锁住的情况
  29. print(ex)
  30. pipe.unwatch()
  31. def worker():
  32. while True:
  33. # 没有库存就退出
  34. if not reduce_stock():
  35. break
  36. def main():
  37. # 设置库存为100
  38. r.set("stock:count", 100)
  39. # 多进程模拟多个客户端提交
  40. with ProcessPoolExecutor() as pool:
  41. for _ in range(10):
  42. pool.submit(worker)
  43. if __name__ == "__main__":
  44. main()
  45. pass


 

3、multiprocessing  模块

文档:https://docs.python.org/zh-cn/3.13/library/multiprocessing.html

概述

由于 Python(这里指 CPython) 限制(GLI) 最多只能用满1个CPU核心。但是 multiprocessing 使用 "子进程 而非线程" 有效地绕过了 全局解释器锁,从而可以使用多核处理器。

multiprocessing 包中也有 Lock / Event / Semaphore / Condition类,用来同步进程,其用法也与 threading 包中的同名类一样。multiprocessing 的很大一部份与 threading 使用同一套 API,只不过换到了多进程的场景。但在使用这些共享API的时候,我们要注意以下几点:

  • 在 UNIX 平台上,当某个进程终结之后,该进程需要被其父进程调用 wait,否则进程成为僵尸进程(Zombie)。所以,有必要对每个Process对象调用join()方法 (实际上等同于wait)。对于多线程来说,由于只有一个进程,所以不存在此必要性。
  • multiprocessing 提供了 threading 包中没有的 IPC ( 比如:Pipe 和 Queue ),效率上更高。应优先考虑 Pipe 和 Queue,避免使用 Lock / Event / Semaphore / Condition 等同步方式 (因为它们占据的不是用户进程的资源 )。
  • 多进程应该避免共享资源。在多线程中,我们可以比较容易地共享资源,比如 使用全局变量或者传递参数。在多进程情况下,由于每个进程有自己独立的内存空间,以上方法并不合适。此时我们可以通过 "共享内存" 和 "Manager 的方法" 来共享资源。但这样做提高了程序的复杂度,并因为同步的需要而降低了程序的效率。
  • Process.PID中保存有PID,如果进程还没有start(),则PID为None。

multiprocessing 的 __init__.py通过 __all__ 可以看到所有可用的 对象或者属性

Process

Process 类的构造方法:

参数说明:
    group:进程所属组。基本不用
    target:表示调用对象。
    args:表示调用对象的位置参数元组。
    name:别名
    kwargs:表示调用对象的字典。

利用 multiprocessing.Process 可以创建一个 Process 对象,该对象与 Thread对象 的用法相同,拥有如下方法:

  • is_alive()
  • join([timeout])
  • run()
  • start()
  • terminate()
  • ........

属性有:

  • authkey
  • daemon(要通过start()设置)、
  • exitcode(进程在运行时为None、如果为–N,表示被信号N结束)、
  • name
  • pid
  • ........

直接 创建 进程

  1. # -*- coding: utf-8 -*-
  2. from multiprocessing import Process, current_process
  3. def worker_func(num):
  4. print(f'worker_func [进程号:{current_process().ident}] ---> {num}')
  5. def main():
  6. pid_list = []
  7. for i in range(5):
  8. pid = Process(target=worker_func, args=(i,))
  9. pid_list.append(pid)
  10. for pid in pid_list:
  11. pid.start()
  12. for pid in pid_list:
  13. pid.join()
  14. if __name__ == '__main__':
  15. main()

继承 multiprocessing.Process 创建 进程

利用 class 来创建进程,定制子类

  1. from multiprocessing import Process, current_process
  2. class Worker(Process):
  3. def run(self):
  4. print(f'进程号:{current_process().ident} ---> 进程名:{self.name}')
  5. return
  6. if __name__ == '__main__':
  7. jobs = []
  8. for i in range(5):
  9. p = Worker()
  10. jobs.append(p)
  11. p.start()
  12. for j in jobs:
  13. j.join()

守护进程

守护进程就是主程序退出后,子进程不退出,仍然继续执行。

mutilprocess.setDaemon(True) 设置 "精灵进程"

也可以等待守护进程退出,要加上 join,join 可以传入浮点数值,等待n久就不等了

  1. import multiprocessing
  2. import time
  3. import sys
  4. def daemon():
  5. name = multiprocessing.current_process().name
  6. print('Starting:', name)
  7. time.sleep(2)
  8. print('Exiting :', name)
  9. def non_daemon():
  10. name = multiprocessing.current_process().name
  11. print('Starting:', name)
  12. print('Exiting :', name)
  13. if __name__ == '__main__':
  14. pid_1 = multiprocessing.Process(name='daemon', target=daemon)
  15. pid_1.daemon = True
  16. pid_2 = multiprocessing.Process(name='non-daemon', target=non_daemon)
  17. pid_2.daemon = False
  18. pid_1.start()
  19. pid_2.start()
  20. pid_1.join(1)
  21. print(f'pid_2.is_alive() ---> {pid_2.is_alive()}')
  22. pid_2.join()

终止进程

最好使用 poison pill,强制的使用 terminate()。注意 terminate 之后要 join,使其可以更新状态

  1. import multiprocessing
  2. import time
  3. def slow_worker():
  4. print('Starting worker')
  5. time.sleep(0.1)
  6. print('Finished worker')
  7. if __name__ == '__main__':
  8. p = multiprocessing.Process(target=slow_worker)
  9. print('BEFORE:', p, p.is_alive())
  10. p.start()
  11. print('DURING:', p, p.is_alive())
  12. p.terminate()
  13. print('TERMINATED:', p, p.is_alive())
  14. p.join()
  15. print('JOINED:', p, p.is_alive())

进程的退出状态

  •  == 0     未生成任何错误
  •  0           进程有一个错误,并以该错误码退出
  •  < 0       进程由一个-1 * exitcode信号结束
  1. import multiprocessing
  2. import sys
  3. import time
  4. def exit_error():
  5. sys.exit(1)
  6. def exit_ok():
  7. return
  8. def return_value():
  9. return 1
  10. def raises():
  11. raise RuntimeError('There was an error!')
  12. def terminated():
  13. time.sleep(3)
  14. if __name__ == '__main__':
  15. jobs = []
  16. for f in [exit_error, exit_ok, return_value, raises, terminated]:
  17. print('Starting process for', f.func_name)
  18. j = multiprocessing.Process(target=f, name=f.func_name)
  19. jobs.append(j)
  20. j.start()
  21. jobs[-1].terminate()
  22. for j in jobs:
  23. j.join()
  24. print('%15s.exitcode = %s' % (j.name, j.exitcode))

异常

进程间通信 (IPC):管道、队列

multiprocessing 提供了 threading 包中没有的 IPC ( 比如:Pipe 和 Queue ),效率上更高。应优先考虑 Pipe 和 Queue,避免使用 Lock / Event / Semaphore / Condition 等同步方式 (因为它们占据的不是用户进程的资源 )。Pipe 和 Queue 可以用来传送常见的对象。

管道

Pipe 可以是单向(half-duplex),也可以是双向(duplex)。

通过mutiprocessing.Pipe(duplex=False) 创建单向管道 (默认为双向)。一个进程从 PIPE 一端输入对象,然后被 PIPE 另一端的进程接收,单向管道只允许管道一端的进程输入,而双向管道则允许从两端输入。下面的程序展示了 Pipe 的使用:( 这里的 Pipe 是双向的。 )

  1. import multiprocessing as mul
  2. def proc1(pipe=None):
  3. pipe.send('hello')
  4. print('proc1 rec:', pipe.recv())
  5. def proc2(pipe=None):
  6. print('proc2 rec:', pipe.recv())
  7. pipe.send('hello, too')
  8. # Build a pipe
  9. pipe = mul.Pipe()
  10. # Pass an end of the pipe to process 1
  11. p1 = mul.Process(target=proc1, args=(pipe[0],))
  12. # Pass the other end of the pipe to process 2
  13. p2 = mul.Process(target=proc2, args=(pipe[1],))
  14. p1.start()
  15. p2.start()
  16. p1.join()
  17. p2.join()

Pipe 对象建立的时候,返回一个含有两个元素的表,每个元素代表 Pipe 的一端(Connection对象)。对 Pipe 的某一端调用 send() 方法来传送对象,在另一端使用 recv() 来接收。

队列 Queue

mutiprocessing.Queue 与 Pipe 相类似,都是先进先出的结构。但 mutiprocessing.Queue允许多个进程放入,多个进程从队列取出对象。下面的程序展示了 Queue 的使用:

  1. import os
  2. import multiprocessing
  3. import time
  4. # input worker
  5. def input_queue(queue=None):
  6. info = str(os.getpid()) + '(put):' + str(time.time())
  7. queue.put(info)
  8. # output worker
  9. def output_queue(queue=None, lock):
  10. info = queue.get()
  11. lock.acquire()
  12. print(str(os.getpid()) + '(get):' + info)
  13. lock.release()
  14. # ===================
  15. # Main
  16. record1 = [] # store input processes
  17. record2 = [] # store output processes
  18. lock = multiprocessing.Lock() # To prevent messy print
  19. queue = multiprocessing.Queue(3)
  20. # input processes
  21. for i in range(10):
  22. process = multiprocessing.Process(target=input_queue, args=(queue,))
  23. process.start()
  24. record1.append(process)
  25. # output processes
  26. for i in range(10):
  27. process = multiprocessing.Process(target=output_queue, args=(queue, lock))
  28. process.start()
  29. record2.append(process)
  30. for p in record1:
  31. p.join()
  32. queue.close() # No more object will come, close the queue
  33. for p in record2:
  34. p.join()

一些进程使用 put() 在 Queue 中放入字符串,这个字符串中包含 PID 和时间。另一些进程从Queue 中取出,并打印自己的 PID 以及 get() 的字符串

简单队列 SimpleQueue

JoinableQueue

示例:

 Python 进程间传递消息,一般的情况是 Queue 来传递。

  • multiprocessing.Queue "进程安全的队列" 。multiprocessing.Queue 在 "多线程环境中并不适用",因为它无法提供线程安全的特性。
  • queue.Queue(位于标准库的 queue 模块中)提供了 "线程安全的队列",并且可供多个线程同时使用。
  1. import multiprocessing
  2. class MyFancyClass(object):
  3. def __init__(self, name):
  4. self.name = name
  5. def do_something(self):
  6. proc_name = multiprocessing.current_process().name
  7. print(f"{proc_name} ---> {self.name}")
  8. def worker(q):
  9. obj = q.get()
  10. obj.do_something()
  11. if __name__ == '__main__':
  12. queue = multiprocessing.Queue()
  13. p = multiprocessing.Process(target=worker, args=(queue,))
  14. p.start()
  15. queue.put(MyFancyClass('Fancy Dan'))
  16. # Wait for the worker to finish
  17. queue.close()
  18. queue.join_thread()
  19. p.join()

示例:

  1. import time
  2. import multiprocessing
  3. class Consumer(multiprocessing.Process):
  4. def __init__(self, task_queue, result_queue):
  5. super(Consumer, self).__init__()
  6. self.task_queue = task_queue
  7. self.result_queue = result_queue
  8. def run(self):
  9. proc_name = self.name
  10. while True:
  11. next_task = self.task_queue.get()
  12. if next_task is None:
  13. # Poison pill means shutdown
  14. print(f"退出进程 ---> {proc_name}")
  15. self.task_queue.task_done()
  16. break
  17. print(f'{proc_name} 下一个任务 ---> {next_task}')
  18. answer = next_task()
  19. self.task_queue.task_done()
  20. self.result_queue.put(answer)
  21. return
  22. class Task(object):
  23. def __init__(self, a, b):
  24. self.a = a
  25. self.b = b
  26. def __call__(self):
  27. time.sleep(0.1) # pretend to take some time to do the work
  28. return f'{self.a} * {self.b} = {self.a * self.b}'
  29. def __str__(self):
  30. return str(self.a * self.a)
  31. if __name__ == '__main__':
  32. # Establish communication queues
  33. tasks = multiprocessing.JoinableQueue()
  34. results = multiprocessing.Queue()
  35. # Start consumers
  36. num_consumers = multiprocessing.cpu_count() * 2
  37. print(f"创建 {num_consumers} 消费者")
  38. consumers = [Consumer(tasks, results) for i in range(num_consumers)]
  39. for w in consumers:
  40. w.start()
  41. # Enqueue jobs
  42. num_jobs = 10
  43. for i in range(num_jobs):
  44. tasks.put(Task(i, i))
  45. # Add a poison pill for each consumer
  46. for i in range(num_consumers):
  47. tasks.put(None)
  48. # Wait for all tasks to finish
  49. tasks.join()
  50. # Start printing results
  51. while num_jobs:
  52. result = results.get()
  53. print(f"结果 ---> {result}")
  54. num_jobs -= 1

示例:队列

  1. from multiprocessing import Process, Queue
  2. def f(q):
  3. q.put([42, None, 'hello'])
  4. if __name__ == '__main__':
  5. q = Queue()
  6. p = Process(target=f, args=(q,))
  7. p.start()
  8. print(q.get()) # prints "[42, None, 'hello']"
  9. p.join()

示例:管道

  1. from multiprocessing import Process, Pipe
  2. def f(conn):
  3. conn.send([42, None, 'hello'])
  4. conn.close()
  5. if __name__ == '__main__':
  6. parent_conn, child_conn = Pipe()
  7. p = Process(target=f, args=(child_conn,))
  8. p.start()
  9. print(parent_conn.recv()) # prints "[42, None, 'hello']"
  10. p.join()

杂项

连接对象(Connection)

进程  同步

同步原语

通常来说同步原语在多进程环境中并不像它们在多线程环境中那么必要。参考 threading 模块的文档。multiprocessing 包含来自 threading 的所有同步原语的等价物。也可以使用管理器对象创建同步原语,参考 管理器 。

例如,可以使用锁来确保一次只有一个进程打印到标准输出:

  1. from multiprocessing import Process, Lock
  2. def f(l, i):
  3. l.acquire()
  4. try:
  5. print('hello world', i)
  6. finally:
  7. l.release()
  8. if __name__ == '__main__':
  9. lock = Lock()
  10. for num in range(10):
  11. Process(target=f, args=(lock, num)).start()

不使用锁的情况下,来自于多进程的输出很容易产生混淆。

我们可以从下面的程序中看到 Thread 对象和 Process对象 在使用上的相似性与结果上的不同。各个线程和进程都做一件事:打印PID。但问题是,所有的任务在打印的时候都会向同一个标准输出(stdout)输出。这样输出的字符会混合在一起,无法阅读。使用 Lock 同步,在一个任务输出完成之后,再允许另一个任务输出,可以避免多个任务同时向终端输出。

  1. import os
  2. import threading
  3. import multiprocessing
  4. # worker function
  5. def worker(sign=None, t_lock=None):
  6. t_lock.acquire()
  7. print(sign, os.getpid())
  8. t_lock.release()
  9. # Main
  10. print('Main:', os.getpid())
  11. # Multi-thread
  12. record = []
  13. threading_lock = threading.Lock()
  14. for i in range(5):
  15. thread = threading.Thread(target=worker, args=('thread', threading_lock))
  16. thread.start()
  17. record.append(thread)
  18. for thread in record:
  19. thread.join()
  20. # Multi-process
  21. record = []
  22. process_lock = multiprocessing.Lock()
  23. for i in range(5):
  24. process = multiprocessing.Process(target=worker, args=('process', process_lock))
  25. process.start()
  26. record.append(process)
  27. for process in record:
  28. process.join()

所有 Thread 的 PID 都与主程序相同,而每个 Process 都有一个不同的 PID。

Event 提供一种简单的方法,可以在进程间传递状态信息。事件可以切换设置和未设置状态。通过使用一个可选的超时值,时间对象的用户可以等待其状态从未设置变为设置。

  1. import multiprocessing
  2. import time
  3. def wait_for_event(e):
  4. """Wait for the event to be set before doing anything"""
  5. print('wait_for_event: starting')
  6. e.wait()
  7. print('wait_for_event: e.is_set()->', e.is_set())
  8. def wait_for_event_timeout(e, t):
  9. """Wait t seconds and then timeout"""
  10. print('wait_for_event_timeout: starting')
  11. e.wait(t)
  12. print('wait_for_event_timeout: e.is_set()->', e.is_set())
  13. if __name__ == '__main__':
  14. e = multiprocessing.Event()
  15. w1 = multiprocessing.Process(name='block', target=wait_for_event, args=(e,))
  16. w1.start()
  17. w2 = multiprocessing.Process(name='nonblock', target=wait_for_event_timeout, args=(e, 2))
  18. w2.start()
  19. print('main: waiting before calling Event.set()')
  20. time.sleep(3)
  21. e.set()
  22. print('main: event is set')

共享内存:共享 ctypes 对象

应该尽量避免多进程共享资源。多进程共享资源必然会带来进程间相互竞争。而这种竞争又会造成race condition,我们的结果有可能被竞争的不确定性所影响。但如果需要,依然可以通过 共享内存Manager对象 这么做。

在共享内存上创建可被子进程继承的共享对象时是可行的。

可以使用 Value 或 Array 将数据存储在共享内存映射中。例如,以下代码:

  1. from multiprocessing import Process, Value, Array
  2. def f(n, a):
  3. n.value = 3.1415927
  4. for i in range(len(a)):
  5. a[i] = -a[i]
  6. if __name__ == '__main__':
  7. num = Value('d', 0.0)
  8. arr = Array('i', range(10))
  9. p = Process(target=f, args=(num, arr))
  10. p.start()
  11. p.join()
  12. print(num.value)
  13. print(arr[:])

将打印
3.1415927
[0, -1, -2, -3, -4, -5, -6, -7, -8, -9]
创建 num 和 arr 时使用的 'd' 和 'i' 参数是 array 模块使用的类型的 typecode : 'd' 表示双精度浮点数, 'i' 表示有符号整数。这些共享对象将是进程和线程安全的。

为了更灵活地使用共享内存,可以使用 multiprocessing.sharedctypes 模块,该模块支持创建从共享内存分配的任意 ctypes 对象。

例子:

  1. import multiprocessing
  2. def f(n, a):
  3. n.value = 3.14
  4. a[0] = 5
  5. num = multiprocessing.Value('d', 0.0)
  6. arr = multiprocessing.Array('i', range(10))
  7. p = multiprocessing.Process(target=f, args=(num, arr))
  8. p.start()
  9. p.join()
  10. print(num.value)
  11. print(arr[:])

这里我们实际上只有主进程和Process对象代表的进程。我们在主进程的内存空间中创建共享的内存,也就是Value和Array两个对象。对象Value被设置成为双精度数(d), 并初始化为0.0。而Array则类似于C中的数组,有固定的类型(i, 也就是整数)。在Process进程中,我们修改了Value和Array对象。回到主程序,打印出结果,主程序也看到了两个对象的改变,说明资源确实在两个进程之间共享。

管理器 Manager()

管理器 Manager() 提供了一种创建共享数据的方法,从而可以在不同进程中共享,甚至可以通过网络跨机器共享数据。管理器维护一个用于管理 共享对象 的服务。其他进程可以通过代理访问这些共享对象。

由 Manager() 返回的管理器对象控制一个服务进程,该进程保存Python对象并允许其他进程使用代理操作它们。Manager()返回的管理器支持类型: list 、 dict 、 Namespace 、 Lock 、 RLock 、 Semaphore 、 BoundedSemaphore 、 Condition 、 Event 、 Barrier 、 Queue 、 Value 和 Array 。

管理器的特点:

服务器进程管理器比使用共享内存对象更灵活,它们支持二进制对象类型。

同时,一个单独的manager可以被网络上的不同计算机的进程共享。

缺点是比使用shared memory慢。

代理对象

共享资源 --- Manager ( 分布式 进程 )

Manager 对象类似于 服务器 与 客户 之间的通信 (server-client)。

  • 用一个进程作为服务器,建立 Manager 来真正存放资源。
  • 其它的进程可以通过 "网络传递参数" 的方式访问 Manager,建立连接后,操作服务器上的资源。即 Manager 可以运用于多计算机,实现分布式。

Python 中 multiprocessing 的子模块 managers 支持把多进程分布到多台机器上,一个服务进程可以作为调度者来将任务分布到其它的多个进程当中,并依靠网络进行互相通信。由于managers的模块封装好了,所以在Python中我们调用它时可以不需要了解网络通信的底层细节,就可以直接进行分布式多进程程序的编写。

Manager() 返回一个 manager 对象。它控制一个服务器进程,这个进程会管理Python对象并允许其他进程通过代理的方式来操作这些对象。

Manager 的使用类似于 shared memory,但可以共享更丰富的对象类型。

  1. import multiprocessing
  2. def f(x, arr, l):
  3. x.value = 3.14
  4. arr[0] = 5
  5. l.append('Hello')
  6. server = multiprocessing.Manager()
  7. x = server.Value('d', 0.0)
  8. arr = server.Array('i', range(10))
  9. l = server.list()
  10. proc = multiprocessing.Process(target=f, args=(x, arr, l))
  11. proc.start()
  12. proc.join()
  13. print(x.value)
  14. print(arr)
  15. print(l)

Manager 利用 list() 方法提供了表的共享方式。实际上你可以利用 dict() 来共享词典,Lock() 来共享 threading.Lock ( 注意,我们共享的是 threading.Lock,而不是进程的 mutiprocessing.Lock。后者本身已经实现了进程共享) 等。 这样 Manager 就允许我们共享更多样的对象。

多台主机 分布式 进程

服务端

3.8版本增加了freeze_support()函数。主要是为了支持windows可执行文件。毕竟multiprocessing可用于分布式进程。所以必须引入freeze_support:

  1. import random, time, queue
  2. from multiprocessing.managers import BaseManager
  3. from multiprocessing import freeze_support
  4. # 建立2个队列,一个发送,一个接收
  5. task_queue = queue.Queue()
  6. result_queue = queue.Queue()
  7. def get_task():
  8. return task_queue
  9. def get_result():
  10. return result_queue
  11. class QueueManager(BaseManager):
  12. pass
  13. # 服务器的管理器上注册2个共享队列
  14. QueueManager.register('get_task', callable=get_task)
  15. QueueManager.register('get_result', callable=get_result)
  16. # 设置端口,地址默认为空。验证码authkey需要设定。
  17. manager = QueueManager(address=('127.0.0.1', 5000), authkey=b'abc')
  18. def manager_run():
  19. manager.start()
  20. # 通过管理器访问共享队列。
  21. task = manager.get_task()
  22. result = manager.get_result()
  23. # 对队列进行操作, 往task队列放进任务。
  24. for value in range(10):
  25. n = random.randint(0, 100)
  26. print('Put task %d' % n)
  27. task.put(n)
  28. # 从result队列取出结果
  29. print('Try get result...')
  30. try:
  31. for value in range(10):
  32. r = result.get(timeout=10)
  33. print('Result: %s' % r)
  34. except queue.Empty:
  35. print('result is empty')
  36. # 关闭管理器。
  37. manager.shutdown()
  38. print('master exit.')
  39. if __name__ == '__main__':
  40. freeze_support()
  41. manager_run()

客户端

  1. import time, sys, queue
  2. from multiprocessing.managers import BaseManager
  3. class QueueManager(BaseManager):
  4. pass
  5. # 从网络上的服务器上获取Queue,所以注册时只提供服务器上管理器注册的队列的名字:
  6. QueueManager.register('get_task')
  7. QueueManager.register('get_result')
  8. server_addr = '127.0.0.1'
  9. print('Connect to server %s...' % server_addr)
  10. # b'abc'相当于'abc'.encode('ascii'),类型是bytes
  11. m = QueueManager(address=(server_addr, 5000), authkey=b'abc')
  12. # 连接服务器
  13. m.connect()
  14. # 获得服务器上的队列对象
  15. task = m.get_task()
  16. result = m.get_result()
  17. for value in range(10):
  18. try:
  19. n = task.get(timeout=1)
  20. print('run task %d * %d...' % (n, n))
  21. r = '%d * %d = %d' % (n, n, n * n)
  22. time.sleep(1)
  23. result.put(r)
  24. except queue.Empty:
  25. print('task queue is empty')
  26. print('worker exit.')

进程池

直接使用 Process类动态的生成多个进程,十几个还好,但是如果上百个甚至更多,那手动去限制进程数量就显得特别的繁琐,此时 进程池 就派上用场了。进程池 (Process Pool) 可以创建多个进程。这些进程就像是随时待命的士兵,准备执行任务(程序)。一个进程池中可以容纳多个待命的士兵。

Pool 类可以提供指定数量的进程供用户调用,当有新的请求提交到Pool中时,如果池还没有满,就会创建一个新的进程来执行请求。如果池满,请求就会告知先等待,直到池中有进程结束,才会创建新的进程来执行这些请求。

Pool 类描述了一个工作进程池,他有几种不同的方法让任务卸载工作进程。 进程池内部维护一个进程序列,当使用时,则去进程池中获取一个进程,如果进程池序列中没有可供使用的进程,那么程序就会等待,直到进程池中有可用进程为止。我们可以用 Pool 类创建一个进程池,展开提交的任务给进程池。

一个进程池对象可以控制工作进程池的哪些工作可以被提交,它支持 超时 和 回调的异步结果,有一个类似 map 的实现。

参数

  • processes :进程的数量,如果 processes 是 None 那么使用 os.cpu_count() 返回的数量。
  • initializer:如果是 None,那么每一个工作进程在开始的时候会调 initializer(*initargs)。 
  • maxtasksperchild:工作进程退出之前可以完成的任务数,完成后用一个新的工作进程来替代原进程,来让闲置的资源被释放。maxtasksperchild 默认是 None,意味着只要 Pool 存在工作进程就会一直存活。 
  • context:用在制定工作进程启动时的上下文,一般使用 multiprocessing.Pool() 或者 一个context 对象的 Pool() 方法来创建一个池,两种方法都适当的设置了context

进程池的方法 

  • apply(func[, args[, kwds]]) :调用 func 函数并传递 args 和 kwds,不建议使用,并且 3.x 以后不在出现。结果返回前主进程会被阻塞直到函数执行结束由于这个原因,apply_async() 更适合并发执行。
  • apply_async(func[, args[, kwds[, callback[, error_callback]]]]) : apply() 方法的一个变体,会返回一个结果对象。 如果 callback 被指定,那么 callback 可以接收一个参数然后被调用,当结果准备好回调时会调用 callback,调用失败时,则用 error_callback 替换 callback。 Callbacks 应被立即完成,否则处理结果的线程会被阻塞。   非阻塞 版本
  • close() :阻止更多的任务提交到 pool,待任务完成后,工作进程会退出。
  • terminate() :不管任务是否完成,立即停止工作进程。在对 pool 对象进程垃圾回收的时候,会立即调用 terminate()。
  • join():wait 工作线程的退出,在调用 join() 前,必须调用 close() 或者 terminate()。这样是因为被终止的进程需要被父进程调用 wait(join等价与wait),否则进程会成为僵尸进程。( 主进程阻塞,等待子进程的退出, join 方法要在 close 或 terminate 之后使用。 )
  • map(func, iterable[, chunksize])   
  • map_async(func, iterable[, chunksize[, callback[, error_callback]]])
  • imap(func, iterable[, chunksize])
  • imap_unordered(func, iterable[, chunksize])   
  • starmap(func, iterable[, chunksize])
  • starmap_async(func, iterable[, chunksize[, callback[, error_back]]])

Pool 类表示一个工作进程池。它具有允许以几种不同方式将任务分配到工作进程的方法。

  1. from multiprocessing import Pool, TimeoutError
  2. import time
  3. import os
  4. def f(x):
  5. return x * x
  6. if __name__ == '__main__':
  7. # start 4 worker processes
  8. with Pool(processes=4) as pool:
  9. # print "[0, 1, 4,..., 81]"
  10. print(pool.map(f, range(10)))
  11. # print same numbers in arbitrary order
  12. for i in pool.imap_unordered(f, range(10)):
  13. print(i)
  14. # evaluate "f(20)" asynchronously
  15. res = pool.apply_async(f, (20,)) # runs in *only* one process
  16. print(res.get(timeout=1)) # prints "400"
  17. # evaluate "os.getpid()" asynchronously
  18. res = pool.apply_async(os.getpid, ()) # runs in *only* one process
  19. print(res.get(timeout=1)) # prints the PID of that process
  20. # launching multiple evaluations asynchronously *may* use more processes
  21. multiple_results = [pool.apply_async(os.getpid, ()) for i in range(4)]
  22. print([res.get(timeout=1) for res in multiple_results])
  23. # make a single worker sleep for 10 secs
  24. res = pool.apply_async(time.sleep, (10,))
  25. try:
  26. print(res.get(timeout=1))
  27. except TimeoutError:
  28. print("We lacked patience and got a multiprocessing.TimeoutError")
  29. print("For the moment, the pool remains available for more work")
  30. # exiting the 'with'-block has stopped the pool
  31. print("Now the pool is closed and no longer available")

非阻塞 进程池 --- apply_async

注意:是 "进程池",不是 "线程池"。它可以让你跑满多核CPU,而且使用方法非常简单。

注意要用 apply_async,如果省略了 async,就变成阻塞版本了。

使用 进程池( 非阻塞 版本)

  1. # -*- coding: utf-8 -*-
  2. import time
  3. import multiprocessing
  4. def worker_func(process_name=None, arg=None):
  5. for i in range(3):
  6. print(f"{process_name} ---> {arg}")
  7. time.sleep(1)
  8. if __name__ == "__main__":
  9. cpu_count = multiprocessing.cpu_count()
  10. pool = multiprocessing.Pool(processes=cpu_count)
  11. for index in range(10):
  12. data = f"hello {index}"
  13. pool.apply_async(worker_func, (f"process_name_{index}", data))
  14. pool.close()
  15. pool.join()
  16. print("主线程退出")

示例:

  1. # -*- coding: utf-8 -*-
  2. import os
  3. import time
  4. import random
  5. import multiprocessing
  6. def func_1():
  7. # os.getpid()获取当前的进程的ID
  8. print(f"func_1 ---> {os.getpid()}")
  9. start = time.time()
  10. time.sleep(random.random() * 10)
  11. end = time.time()
  12. print(f'func_1 ---> 运行 {end - start}秒')
  13. def func_2():
  14. print(f"func_2 ---> {os.getpid()}")
  15. start = time.time()
  16. time.sleep(random.random() * 20)
  17. end = time.time()
  18. print(f'func_2 ---> 运行 {end - start}秒')
  19. def func_3():
  20. print(f"func_3 ---> {os.getpid()}")
  21. start = time.time()
  22. time.sleep(random.random() * 30)
  23. end = time.time()
  24. print(f'func_3 ---> 运行 {end - start}秒')
  25. if __name__ == '__main__':
  26. func_list = [func_1, func_2, func_3]
  27. print(f"父进程 ---> {os.getpid()}")
  28. pool = multiprocessing.Pool(4)
  29. for func in func_list:
  30. # Pool执行函数,当有一个进程执行完毕后,会添加一个新的进程到pool中
  31. pool.apply_async(func)
  32. print('等待字进程...')
  33. pool.close()
  34. # 调用join之前,一定要先调用close() 函数,否则会出错
  35. # close()执行后不会有新的进程加入到 pool, join 函数等待素有子进程结束
  36. pool.join()
  37. print('所有进程都完成')
  38. pass

阻塞 进程池 --- apply

  1. # coding: utf-8
  2. import time
  3. import multiprocessing
  4. def worker_func(arg):
  5. print(f"worker_func ---> {arg}")
  6. time.sleep(3)
  7. if __name__ == "__main__":
  8. pool = multiprocessing.Pool(processes=3)
  9. for i in range(4):
  10. data = f"hello {i}"
  11. # 维持执行的进程总数为processes,当一个进程执行完毕后会添加新的进程进去
  12. pool.apply(worker_func, (data,))
  13. pool.close()
  14. pool.join()

map()

函数原型:map(func, iterable[, chunksize=None])

Pool 类中的 map 方法,与内置的 map 函数用法行为基本一致,它会使进程阻塞直到返回结果。
注意,虽然第二个参数是一个迭代器,但在实际使用中,必须在整个队列都就绪后,程序才会运行子进程。

import time
import multiprocessing


def worker_func(arg=None):
    time.sleep(1)
    print('arg * arg')
    return arg * arg


if __name__ == "__main__":
    temp_list = [1, 2, 3, 4, 5, 6,7,8,9]

    start_time = time.time()
    for item in temp_list:
        worker_func(item)
    end_time = time.time()
    print("顺序执行时间:", int(end_time - start_time))

    pool = multiprocessing.Pool(5)  # 创建拥有5个进程数量的进程池
    start_time = time.time()
    result = pool.map(worker_func, temp_list)  # 使进程阻塞直到返回结果
    pool.close()  # 关闭进程池,不再接受新的进程
    pool.join()   # 主进程阻塞等待子进程的退出
    end_time = time.time()
    print("并行执行时间:", int(end_time - start_time))
    print(f'map 的所有子进程返回的结果列表: {result}')

上例是一个创建多个进程并发处理与顺序执行处理同一数据,所用时间的差别。从结果可以看出,并发执行的时间明显比顺序执行要快很多,但是进程是要耗资源的,所以平时工作中,进程数也不能开太大。程序中的 result 表示全部进程执行结束后全部的返回结果集run 函数有返回值,所以一个进程对应一个返回结果,这个结果存在一个列表中。

对 Pool对象调用 join() 方法会等待所有子进程执行完毕,调用 join() 之前必须先调用 close(),让其不再接受新的 Process。

close()

关闭进程池(pool),使其不在接受新的任务。

terminate()

结束工作进程,不在处理未处理的任务。

join()

主进程阻塞等待子进程的退出,join 方法必须在 close 或 terminate 之后使用。

使用 Pool,并需要关注结果

更多的时候,我们不仅需要多进程执行,还需要关注每个进程的执行结果,如下:

  1. # -*- coding: utf-8 -*-
  2. import multiprocessing
  3. import time
  4. def func(msg):
  5. for i in range(3):
  6. print(msg)
  7. time.sleep(1)
  8. return "done " + msg
  9. if __name__ == "__main__":
  10. pool = multiprocessing.Pool(processes=4)
  11. result = []
  12. for index in range(10):
  13. msg = f"hello {index}"
  14. result.append(pool.apply_async(func, (msg,)))
  15. pool.close()
  16. pool.join()
  17. for res in result:
  18. print(res.get())
  19. print("Sub-process(es) done.")

示例 

  1. import multiprocessing
  2. def do_calculation(data):
  3. return data * 2
  4. def start_process():
  5. print('Starting', multiprocessing.current_process().name)
  6. if __name__ == '__main__':
  7. inputs = list(range(10))
  8. print('Inputs :', inputs)
  9. builtin_output = list(map(do_calculation, inputs))
  10. print('Build-In :', builtin_output)
  11. pool_size = multiprocessing.cpu_count() * 2
  12. pool = multiprocessing.Pool(processes=pool_size, initializer=start_process, )
  13. # 默认情况下,Pool会创建固定数目的工作进程,并向这些工作进程传递作业,直到再没有更多作业为止。
  14. # maxtasksperchild 参数为每个进程执行 task 的最大数目,
  15. # 设置 maxtasksperchild参数可以告诉池在完成一定数量任务之后重新启动一个工作进程,
  16. # 来避免运行时间很长的工作进程消耗太多的系统资源。
  17. # pool = multiprocessing.Pool(processes=pool_size, initializer=start_process, maxtasksperchild=2)
  18. print('-' * 20)
  19. pool_outputs = pool.map(do_calculation, inputs)
  20. pool.close()
  21. pool.join()
  22. print('Pool :', pool_outputs)

监听器及客户端

认证密码

日志记录

线程池

multiprocessing.dummy 模块

编程指导

使用 multiprocessing 时,应遵循一些指导原则和习惯用法。

下面这些适用于所有start方法。

避免共享状态

应该尽可能避免在进程间传递大量数据,越少越好。

最好坚持使用队列或者管道进行进程间通信,而不是底层的同步原语。

可序列化

保证所代理的方法的参数是可以序列化的。

代理的线程安全性

不要在多线程中同时使用一个代理对象,除非你用锁保护它。

(而在不同进程中使用 相同 的代理对象却没有问题。)

使用 Join 避免僵尸进程

在 POSIX 上当一个进程结束但没有被合并则它将变成僵尸进程。 这样的进程应该不会很多因为每次启动新进程(或 active_children() 被调用)时所有尚未被合并的已完成进程都将被合并。 而且调用一个已结束进程的 Process.is_alive 也会合并这个进程。 虽然如此但显式地合并你所启动的所有进程仍然是个好习惯。

继承优于序列化、反序列化

当使用 spawn 或者 forkserver 的启动方式时,multiprocessing 中的许多类型都必须是可序列化的,这样子进程才能使用它们。但是通常我们都应该避免使用管道和队列发送共享对象到另外一个进程,而是重新组织代码,对于其他进程创建出来的共享对象,让那些需要访问这些对象的子进程可以直接将这些对象从父进程继承过来。

避免杀死进程

通过 Process.terminate 停止一个进程很容易导致这个进程正在使用的共享资源(如锁、信号量、管道和队列)损坏或者变得不可用,无法在其他进程中继续使用。

所以,最好只对那些从来不使用共享资源的进程调用 Process.terminate 。

Join 使用队列的进程

记住,往队列放入数据的进程会一直等待直到队列中所有项被"feeder" 线程传给底层管道。(子进程可以调用队列的 Queue.cancel_join_thread 方法禁止这种行为)

这意味着,任何使用队列的时候,你都要确保在进程join之前,所有存放到队列中的项将会被其他进程、线程完全消费。否则不能保证这个写过队列的进程可以正常终止。记住非精灵进程会自动 join 。

死锁的例子:交换最后两行可以修复这个问题(或者直接删掉 p.join())。

  1. from multiprocessing import Process, Queue
  2. def f(q):
  3. q.put('X' * 1000000)
  4. if __name__ == '__main__':
  5. queue = Queue()
  6. p = Process(target=f, args=(queue,))
  7. p.start()
  8. p.join() # this deadlocks
  9. obj = queue.get()

显式传递资源给子进程

在 POSIX 上使用 fork 启动方法,子进程将能够访问使用全局资源在父进程中创建的共享资源。 但是,更好的做法是将对象作为子进程构造器的参数来传入。

除了(部分原因)让代码兼容 Windows 以及其他的进程启动方式外,这种形式还保证了在子进程生命期这个对象是不会被父进程垃圾回收的。如果父进程中的某些对象被垃圾回收会导致资源释放,这就变得很重要。

示例:

  1. from multiprocessing import Process, Lock
  2. def f(l):
  3. ... do something using "l" ...
  4. if __name__ == '__main__':
  5. lock = Lock()
  6. for i in range(10):
  7. Process(target=f, args=(lock,)).start()

spawn 和 forkserver 启动方式相对于 fork 启动方式,有一些额外的限制。

更依赖序列化

Process.__init__() 的所有参数都必须可序列化。同样的,当你继承 Process 时,需要保证当调用 Process.start 方法时,实例可以被序列化。

全局变量

记住,如果子进程中的代码尝试访问一个全局变量,它所看到的值(如果有)可能和父进程中执行 Process.start 那一刻的值不一样。

当全局变量只是模块级别的常量时,是不会有问题的。

安全导入主模块

确保新的 Python 解释器可以安全地导入主模块,而不会导致意想不到的副作用(如启动新进程)。

例如,使用 spawn 或 forkserver 启动方式执行下面的模块,会引发 RuntimeError 异常而失败。

  1. from multiprocessing import Process
  2. def foo():
  3. print('hello')
  4. p = Process(target=foo)
  5. p.start()

应该通过下面的方法使用 if __name__ == '__main__': ,从而保护程序"入口点":

  1. from multiprocessing import Process, freeze_support, set_start_method
  2. def foo():
  3. print('hello')
  4. if __name__ == '__main__':
  5. freeze_support()
  6. set_start_method('spawn')
  7. p = Process(target=foo)
  8. p.start()

(如果程序将正常运行而不是冻结,则可以省略 freeze_support() 行)

这允许新启动的 Python 解释器安全导入模块然后运行模块中的 foo() 函数。

如果主模块中创建了进程池或者管理器,这个规则也适用。

multiprocessing 示例

示例:创建和使用自定义管理器、代理

  1. from multiprocessing import freeze_support
  2. from multiprocessing.managers import BaseManager, BaseProxy
  3. import operator
  4. ##
  5. class Foo:
  6. def f(self):
  7. print('you called Foo.f()')
  8. def g(self):
  9. print('you called Foo.g()')
  10. def _h(self):
  11. print('you called Foo._h()')
  12. # A simple generator function
  13. def baz():
  14. for i in range(10):
  15. yield i * i
  16. # Proxy type for generator objects
  17. class GeneratorProxy(BaseProxy):
  18. _exposed_ = ['__next__']
  19. def __iter__(self):
  20. return self
  21. def __next__(self):
  22. return self._callmethod('__next__')
  23. # Function to return the operator module
  24. def get_operator_module():
  25. return operator
  26. ##
  27. class MyManager(BaseManager):
  28. pass
  29. # register the Foo class; make `f()` and `g()` accessible via proxy
  30. MyManager.register('Foo1', Foo)
  31. # register the Foo class; make `g()` and `_h()` accessible via proxy
  32. MyManager.register('Foo2', Foo, exposed=('g', '_h'))
  33. # register the generator function baz; use `GeneratorProxy` to make proxies
  34. MyManager.register('baz', baz, proxytype=GeneratorProxy)
  35. # register get_operator_module(); make public functions accessible via proxy
  36. MyManager.register('operator', get_operator_module)
  37. ##
  38. def test():
  39. manager = MyManager()
  40. manager.start()
  41. print('-' * 20)
  42. f1 = manager.Foo1()
  43. f1.f()
  44. f1.g()
  45. assert not hasattr(f1, '_h')
  46. assert sorted(f1._exposed_) == sorted(['f', 'g'])
  47. print('-' * 20)
  48. f2 = manager.Foo2()
  49. f2.g()
  50. f2._h()
  51. assert not hasattr(f2, 'f')
  52. assert sorted(f2._exposed_) == sorted(['g', '_h'])
  53. print('-' * 20)
  54. it = manager.baz()
  55. for i in it:
  56. print('<%d>' % i, end=' ')
  57. print()
  58. print('-' * 20)
  59. op = manager.operator()
  60. print('op.add(23, 45) =', op.add(23, 45))
  61. print('op.pow(2, 94) =', op.pow(2, 94))
  62. print('op._exposed_ =', op._exposed_)
  63. ##
  64. if __name__ == '__main__':
  65. freeze_support()
  66. test()

示例:使用 Pool

  1. import multiprocessing
  2. import time
  3. import random
  4. import sys
  5. #
  6. # Functions used by test code
  7. #
  8. def calculate(func, args):
  9. result = func(*args)
  10. return '%s says that %s%s = %s' % (
  11. multiprocessing.current_process().name,
  12. func.__name__, args, result
  13. )
  14. def calculatestar(args):
  15. return calculate(*args)
  16. def mul(a, b):
  17. time.sleep(0.5 * random.random())
  18. return a * b
  19. def plus(a, b):
  20. time.sleep(0.5 * random.random())
  21. return a + b
  22. def f(x):
  23. return 1.0 / (x - 5.0)
  24. def pow3(x):
  25. return x ** 3
  26. def noop(x):
  27. pass
  28. #
  29. # Test code
  30. #
  31. def test():
  32. PROCESSES = 4
  33. print('Creating pool with %d processes\n' % PROCESSES)
  34. with multiprocessing.Pool(PROCESSES) as pool:
  35. #
  36. # Tests
  37. #
  38. TASKS = [(mul, (i, 7)) for i in range(10)] + \
  39. [(plus, (i, 8)) for i in range(10)]
  40. results = [pool.apply_async(calculate, t) for t in TASKS]
  41. imap_it = pool.imap(calculatestar, TASKS)
  42. imap_unordered_it = pool.imap_unordered(calculatestar, TASKS)
  43. print('Ordered results using pool.apply_async():')
  44. for r in results:
  45. print('\t', r.get())
  46. print()
  47. print('Ordered results using pool.imap():')
  48. for x in imap_it:
  49. print('\t', x)
  50. print()
  51. print('Unordered results using pool.imap_unordered():')
  52. for x in imap_unordered_it:
  53. print('\t', x)
  54. print()
  55. print('Ordered results using pool.map() --- will block till complete:')
  56. for x in pool.map(calculatestar, TASKS):
  57. print('\t', x)
  58. print()
  59. #
  60. # Test error handling
  61. #
  62. print('Testing error handling:')
  63. try:
  64. print(pool.apply(f, (5,)))
  65. except ZeroDivisionError:
  66. print('\tGot ZeroDivisionError as expected from pool.apply()')
  67. else:
  68. raise AssertionError('expected ZeroDivisionError')
  69. try:
  70. print(pool.map(f, list(range(10))))
  71. except ZeroDivisionError:
  72. print('\tGot ZeroDivisionError as expected from pool.map()')
  73. else:
  74. raise AssertionError('expected ZeroDivisionError')
  75. try:
  76. print(list(pool.imap(f, list(range(10)))))
  77. except ZeroDivisionError:
  78. print('\tGot ZeroDivisionError as expected from list(pool.imap())')
  79. else:
  80. raise AssertionError('expected ZeroDivisionError')
  81. it = pool.imap(f, list(range(10)))
  82. for i in range(10):
  83. try:
  84. x = next(it)
  85. except ZeroDivisionError:
  86. if i == 5:
  87. pass
  88. except StopIteration:
  89. break
  90. else:
  91. if i == 5:
  92. raise AssertionError('expected ZeroDivisionError')
  93. assert i == 9
  94. print('\tGot ZeroDivisionError as expected from IMapIterator.next()')
  95. print()
  96. #
  97. # Testing timeouts
  98. #
  99. print('Testing ApplyResult.get() with timeout:', end=' ')
  100. res = pool.apply_async(calculate, TASKS[0])
  101. while 1:
  102. sys.stdout.flush()
  103. try:
  104. sys.stdout.write('\n\t%s' % res.get(0.02))
  105. break
  106. except multiprocessing.TimeoutError:
  107. sys.stdout.write('.')
  108. print()
  109. print()
  110. print('Testing IMapIterator.next() with timeout:', end=' ')
  111. it = pool.imap(calculatestar, TASKS)
  112. while 1:
  113. sys.stdout.flush()
  114. try:
  115. sys.stdout.write('\n\t%s' % it.next(0.02))
  116. except StopIteration:
  117. break
  118. except multiprocessing.TimeoutError:
  119. sys.stdout.write('.')
  120. print()
  121. print()
  122. if __name__ == '__main__':
  123. multiprocessing.freeze_support()
  124. test()

示例:使用队列来向一组工作进程提供任务并收集结果

  1. import time
  2. import random
  3. from multiprocessing import Process, Queue, current_process, freeze_support
  4. #
  5. # Function run by worker processes
  6. #
  7. def worker(input, output):
  8. for func, args in iter(input.get, 'STOP'):
  9. result = calculate(func, args)
  10. output.put(result)
  11. #
  12. # Function used to calculate result
  13. #
  14. def calculate(func, args):
  15. result = func(*args)
  16. return '%s says that %s%s = %s' % \
  17. (current_process().name, func.__name__, args, result)
  18. #
  19. # Functions referenced by tasks
  20. #
  21. def mul(a, b):
  22. time.sleep(0.5*random.random())
  23. return a * b
  24. def plus(a, b):
  25. time.sleep(0.5*random.random())
  26. return a + b
  27. #
  28. #
  29. #
  30. def test():
  31. NUMBER_OF_PROCESSES = 4
  32. TASKS1 = [(mul, (i, 7)) for i in range(20)]
  33. TASKS2 = [(plus, (i, 8)) for i in range(10)]
  34. # Create queues
  35. task_queue = Queue()
  36. done_queue = Queue()
  37. # Submit tasks
  38. for task in TASKS1:
  39. task_queue.put(task)
  40. # Start worker processes
  41. for i in range(NUMBER_OF_PROCESSES):
  42. Process(target=worker, args=(task_queue, done_queue)).start()
  43. # Get and print results
  44. print('Unordered results:')
  45. for i in range(len(TASKS1)):
  46. print('\t', done_queue.get())
  47. # Add more tasks using `put()`
  48. for task in TASKS2:
  49. task_queue.put(task)
  50. # Get and print some more results
  51. for i in range(len(TASKS2)):
  52. print('\t', done_queue.get())
  53. # Tell child processes to stop
  54. for i in range(NUMBER_OF_PROCESSES):
  55. task_queue.put('STOP')
  56. if __name__ == '__main__':
  57. freeze_support()
  58. test()

4、Python 协程

协程 英文名 Coroutine。 关于协程,可以参考 greenlet、stackless、gevent、eventlet 等的实现。

并发编程目前有四种方式:多进程,多线程,异步,和协程。

多线程编程 Python 中有 Thread 和 threading,在 linux 下所谓的线程,实际上是 LWP 轻量级进程,其在内核中具有和进程相同的调度方式,有关 LWP,COW(写时拷贝),fork,vfork,clone等的资料较多,不再赘述。异步在 linux 下主要有三种实现 select,poll,epoll 。

协程的好处:

  • 无需线程上下文切换的开销,协程的切换是由程序自身控制,因此,没有线程切换的开销,和多线程比,线程数量越多,协程的性能优势就越明显。
  • 不需要多线程的锁机制。因为只有一个线程,也不存在同时写变量冲突,
  • 方便切换控制流,简化编程模型
  • 高并发+高扩展性+低成本:一个CPU支持上万的协程都不是问题。所以很适合用于高并发处理。

如何利用多核资源:

  • 协程的本质是个单线程,它不能同时利用 CPU 多核资源。
  • 协程需要和进程配合才能运行在多CPU上。"异步多进程"  可以解决利用多核资源。多进程 + 协程,既充分利用多核,又充分发挥协程的高效率,可获得极高的性能。
  • 进行阻塞(Blocking)操作(如IO时)会阻塞掉整个程序

传统的 "生产者 --- 消费者" 模型是一个线程写消息,一个线程取消息,通过锁机制控制队列和等待,但一不小心就可能死锁。如果改用协程,生产者生产消息后,直接通过 yield 跳转到消费者开始执行,待消费者执行完毕后,切换回生产者继续生产,效率极高:

  1. import time
  2. def consumer():
  3. r = ''
  4. while True:
  5. n = yield r
  6. if not n:
  7. return
  8. print('[CONSUMER] Consuming %s...' % n)
  9. time.sleep(1)
  10. r = '200 OK'
  11. def produce(c):
  12. c.next()
  13. n = 0
  14. while n < 5:
  15. n = n + 1
  16. print('[PRODUCER] Producing %s...' % n)
  17. r = c.send(n)
  18. print('[PRODUCER] Consumer return: %s' % r)
  19. c.close()
  20. if __name__=='__main__':
  21. c = consumer()
  22. produce(c)

执行结果:

[PRODUCER] Producing 1...
[CONSUMER] Consuming 1...
[PRODUCER] Consumer return: 200 OK
[PRODUCER] Producing 2...
[CONSUMER] Consuming 2...
[PRODUCER] Consumer return: 200 OK
[PRODUCER] Producing 3...
[CONSUMER] Consuming 3...
[PRODUCER] Consumer return: 200 OK
[PRODUCER] Producing 4...
[CONSUMER] Consuming 4...
[PRODUCER] Consumer return: 200 OK
[PRODUCER] Producing 5...
[CONSUMER] Consuming 5...
[PRODUCER] Consumer return: 200 OK

注意到consumer函数是一个generator(生成器),把一个consumer传入produce后:
        1. 首先调用c.next()启动生成器;
        2. 然后,一旦生产了东西,通过c.send(n)切换到consumer执行;
        3. consumer通过yield拿到消息,处理,又通过yield把结果传回;
        4. produce拿到consumer处理的结果,继续生产下一条消息;
        5. produce决定不生产了,通过c.close()关闭consumer,整个过程结束。

整个流程无锁,由一个线程执行,produce和consumer协作完成任务,所以称为“协程”,而非线程的抢占式多任务。最后套用Donald Knuth的一句话总结协程的特点:“子程序就是协程的一种特例

线程、进程 的操作是由程序触发系统接口,最后的执行者是系统;

协程 的操作则是程序员。

协程示例:

import asyncio


async def cor1():
    print("COR1 start")
    await cor2()
    print("COR1 end")


async def cor2():
    print("COR2")


asyncio.run(cor1())

greenlet、Eventlet,gevent ( 推荐 )、

  • greenlet :是使用生成器实现的协程,调度起来很麻烦,而且不是正在意义上的协程,只是实现的代码执行过程中的挂起,唤醒操作。Greenlet没有自己的调度过程,所以一般不会直接使用。greenlet:http://greenlet.readthedocs.org/en/latest/
  • eventlet:是在 greenlet 的基础上实现了自己的 GreenThread,实际上就是 greenlet 类的扩展封装,而与Greenlet的不同是,Eventlet实现了自己调度器称为Hub,Hub类似于Tornado的IOLoop,是单实例的。在Hub中有一个event loop,根据不同的事件来切换到对应的GreenThread。同时 Eventlet 还实现了一系列的补丁来使 Python 标准库中的 socket 等等module 来支持 GreenThread 的切换。Eventlet 的 Hub 可以被定制来实现自己调度过程。eventlet 目前支持 CPython 2.7 和 3.4+ 并将在未来删除,仅保留 CPython 3.5+ 支持。
  • gevent:基于 libev 与 Greenlet 实现。不同于 Eventlet 的用 python 实现的 hub 调度,Gevent 通过 Cython 调用 libev 来实现一个高效的 event loop 调度循环。同时类似于 Event,Gevent 也有自己的 monkey_patch,在打了补丁后,完全可以使用 python 线程的方式来无感知的使用协程,减少了开发成本。

在 Python 的世界里由于 GIL 的存在,线程一直都不是很好用,所以就有了各种协程的 hack。gevnet 是当前使用起来最方便的协程,但是由于依赖于 libev 所以不能在 pypy上 跑,如果需要在pypy上使用协程,Eventlet 是最好的选择。

gevent 属于第三方模块需要下载安装包
        pip3 install --upgrade pip3
        pip3 install gevent

示例:

  1. import gevent
  2. def fun1():
  3. print("www.baidu.com") # 第一步
  4. gevent.sleep(0)
  5. print("end the baidu.com") # 第三步
  6. def fun2():
  7. print("www.zhihu.com") # 第二步
  8. gevent.sleep(0)
  9. print("end th zhihu.com") # 第四步
  10. gevent.joinall([
  11. gevent.spawn(fun1),
  12. gevent.spawn(fun2),
  13. ])

示例:遇到 IO 操作自动切换:

  1. import gevent
  2. from gevent import monkey
  3. from datetime import datetime
  4. monkey.patch_all()
  5. import requests
  6. def func(url):
  7. print(f"[{datetime.now().replace(microsecond=0)}] 开始请求 {url}")
  8. gevent.sleep(0)
  9. proxies = {
  10. "http": "http://172.17.18.80:8080",
  11. "https": "http://172.17.18.80:8080",
  12. }
  13. proxies = None
  14. resp = requests.get(url, proxies=proxies)
  15. print(f"[{datetime.now().replace(microsecond=0)}] {{resp.url}} ---> {len(resp.content)}")
  16. gevent.joinall([
  17. gevent.spawn(func, 'https://www.baidu.com/'),
  18. gevent.spawn(func, 'https://www.sina.com.cn/'),
  19. gevent.spawn(func, 'https://www.qq.com/'),
  20. ])

多进程 + 协程:

  1. import gevent
  2. from gevent import monkey
  3. monkey.patch_all()
  4. import sys
  5. import requests
  6. from multiprocessing import Process
  7. from importlib import reload
  8. def fetch(url):
  9. try:
  10. s = requests.Session()
  11. r = s.get(url, timeout=1) # 在这里抓取页面
  12. except BaseException as be:
  13. print(be)
  14. return None
  15. def setup_process(url_list):
  16. task_list = []
  17. for url in url_list:
  18. task_list.append(gevent.spawn(fetch, url))
  19. gevent.joinall(task_list) # 使用协程来执行
  20. def main(filepath, per_task=100000): # 每10W条url启动一个进程
  21. with open(filepath, 'r') as f: # 从给定的文件中读取url
  22. url_list = f.readlines()
  23. url_list = [temp.strip() for temp in url_list]
  24. url_len = len(url_list)
  25. process_count = int(url_len / per_task) if url_len % per_task == 0 else int(url_len / per_task) + 1
  26. for index in range(process_count):
  27. task_list = url_list[index * per_task: (index + 1) * per_task]
  28. p = Process(target=setup_process, args=(url_list,))
  29. p.start()
  30. if __name__ == '__main__':
  31. main('./test_data.txt') # 读取指定文件

例子中隐藏了一个问题:进程的数量会随着 url 数量的增加而不断增加,我们在这里不使用进程池multiprocessing.Pool 来控制进程数量的原因是 multiprocessing.Pool 和 gevent 有冲突不能同时使用,但是有兴趣的同学可以研究一下 gevent.pool 这个协程池。

  1. """
  2. 对于有些人来说Gevent和multiprocessing组合在一起使用算是个又高大上又奇葩的工作模式.
  3. Python的多线程受制于GIL全局锁的特性,Gevent身为协程也是线程的一种,只是io调度上自己说了算而已。
  4. 那么如何使用多个cpu核心?
  5. 可以利用多进程 mutliprocessing 来进行多核并行工作,
  6. 在多进程里面使用gevent协程框架可以更好的做io调度,相比线程来说减少了无谓的上下文切换.
  7. """
  8. import datetime
  9. from multiprocessing import Process, cpu_count, Queue, JoinableQueue
  10. import gevent
  11. from gevent import monkey
  12. monkey.patch_all()
  13. class Consumer(object):
  14. def __init__(self, task_queue, task_list, consumer_name):
  15. self.task_queue = task_queue
  16. self.task_list = task_list
  17. self.consumer_name = consumer_name
  18. self.__run_gevent()
  19. def __run_gevent(self):
  20. jobs = [gevent.spawn(self.__print_value) for x in range(self.task_list)]
  21. gevent.joinall(jobs)
  22. def __print_value(self):
  23. while True:
  24. value = self.task_queue.get()
  25. if value is None:
  26. self.task_queue.task_done()
  27. break
  28. else:
  29. print(f"[{datetime.datetime.now()}] {self.consumer_name} ---> value: {value}")
  30. return
  31. class Producer(object):
  32. def __init__(self, task_queue, task_list, producer_name, consumers_tasks):
  33. self.task_queue = task_queue
  34. self.task_list = task_list
  35. self.producer_name = producer_name
  36. self.consumer_tasks = consumers_tasks
  37. self.__run_gevent()
  38. def __run_gevent(self):
  39. jobs = [gevent.spawn(self.produce) for x in range(self.task_list)]
  40. gevent.joinall(jobs)
  41. for x in range(self.consumer_tasks):
  42. self.task_queue.put_nowait(None)
  43. self.task_queue.close()
  44. def produce(self):
  45. for no in range(10000):
  46. print(no)
  47. self.task_queue.put(no, block=False)
  48. return
  49. def main():
  50. worker_count = cpu_count() * 2
  51. task_queue = JoinableQueue()
  52. producer_gevent = 10
  53. consumer_gevent = 7
  54. pid_list = []
  55. for index in range(worker_count):
  56. if not index % 2:
  57. pid = Process(
  58. target=Producer,
  59. args=(task_queue, 10, f"producer_{index}", consumer_gevent)
  60. )
  61. pid.start()
  62. pid_list.append(pid)
  63. else:
  64. pid = Process(
  65. target=Consumer,
  66. args=(task_queue, consumer_gevent, f"consumer_{index}")
  67. )
  68. pid.start()
  69. pid_list.append(pid)
  70. for pid in pid_list:
  71. pid.join()
  72. if __name__ == '__main__':
  73. main()

5、异步多进程 aiomultiprocess 

使用格式

  1. import asyncio
  2. from aiomultiprocess import Pool
  3. async def async_worker(data):
  4. # 异步任务的逻辑
  5. await do_something_async(data)
  6. def do_something_async(data):
  7. # 在此处编写异步任务的逻辑
  8. ...
  9. def process_worker(data):
  10. # 同步任务的逻辑
  11. do_something(data)
  12. def do_something(data):
  13. # 在此处编写同步任务的逻辑
  14. ...
  15. async def main():
  16. # 创建进程池
  17. pool = await Pool()
  18. # 准备数据
  19. data_list = [1, 2, 3, 4, 5]
  20. # 异步任务
  21. async_tasks = [async_worker(data) for data in data_list]
  22. await asyncio.gather(*async_tasks)
  23. # 多进程同步任务
  24. await pool.map(process_worker, data_list)
  25. pool.close()
  26. await pool.join()
  27. # 运行主程序
  28. if __name__ == '__main__':
  29. asyncio.run(main())

示例

  1. import json
  2. import asyncio
  3. import aiomultiprocess
  4. from loguru import logger
  5. from ichrome import AsyncChromeDaemon
  6. from ichrome.async_utils import Chrome
  7. async def startup_chrome(dp_port=None):
  8. """
  9. 设置 chrome 参数,然后启动 chrome
  10. :param dp_port: 自定义 debug port
  11. :return:
  12. """
  13. logger.info(f'dp_port ---> {dp_port}')
  14. timeout = 5
  15. # 也可以给 Chrome 添加代理
  16. proxy = '127.0.0.1:8080'
  17. udd= f'c:/chrome_user_data_dir_{dp_port}'
  18. async with AsyncChromeDaemon(port=dp_port, proxy=proxy, user_data_dir=udd) as cd:
  19. async with cd.connect_tab(index=0) as tab:
  20. url = 'https://space.bilibili.com/1904149/'
  21. await tab.set_url(url, timeout=timeout)
  22. await asyncio.sleep(5)
  23. cookie = await tab.get_cookies(url, timeout=timeout)
  24. cookie_string = json.dumps(cookie, ensure_ascii=False)
  25. logger.info(f'cookie_string ---> {cookie_string}')
  26. async def main():
  27. db_list = [9301 + offset for offset in range(5)]
  28. async with aiomultiprocess.Pool() as aio_pool:
  29. await aio_pool.map(startup_chrome, db_list)
  30. await aio_pool.join()
  31. if __name__ == "__main__":
  32. asyncio.run(main())
  33. pass


 

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/2023面试高手/article/detail/130803
推荐阅读
相关标签
  

闽ICP备14008679号