赞
踩
Async IO是一种并发编程设计,在Python中得到了专门的支持,从Python 3.4到3.7,并且可能还会继续发展。
你可能会担心地想到:“并发、并行、线程、多进程。这已经够难理解的了。异步IO又是什么?"
本教程旨在帮助你回答这个问题,让你更好地理解Python中异步IO的方法。
以下是本教程涵盖的内容:
协程(特殊的生成器函数)是Python中异步IO的核心,我们将在后面详细介绍它们。
注意:在本文中,我使用术语"异步IO(async IO)"表示异步IO的与语言无关的设计,而"asyncio"则是指Python的包。
在开始之前,你需要确保已经设置好使用asyncio和本教程中涉及的其他库的环境。
为了完整地按照本文进行操作,你需要Python 3.7或更高版本,以及aiohttp和aiofiles包:
- $ python3.7 -m venv ./py37async
- $ source ./py37async/bin/activate # Windows系统:.\py37async\Scripts\activate.bat
- $ pip install --upgrade pip aiohttp aiofiles # 可选:aiodns
如果需要安装Python 3.7并设置虚拟环境的帮助,请查阅Python 3安装和设置指南或虚拟环境入门指南。
有了这些准备,让我们开始吧!
异步IO相对于其久经考验的兄弟多进程和多线程而言,鲜为人知。本节将为你提供关于异步IO是什么以及它如何适应其周围环境的更全面的了解。
并发性和并行性是广阔的主题,不容易涉足。虽然本文重点介绍Python中的异步IO及其实现,但值得花点时间将异步IO与其他相对应的内容进行比较,以了解异步IO如何融入更大、有时令人迷惑的谜题中。
并行性包括同时执行多个操作。多进程是实现并行性的一种方式,它将任务分布在计算机的中央处理单元(CPU核心)上。多进程非常适合CPU密集型任务:紧密绑定的循环和数学计算通常属于这一类别。
并发性比并行性的范畴要广一些。它表示多个任务可以以重叠的方式运行。(有一句话说,并发不意味着并行。)
线程是一种并发执行模型,多个线程轮流执行任务。一个进程可以包含多个线程。由于Python的全局解释器锁(GIL),多线程在Python中有复杂的关系,但超出了本文的范围。
关于多线程,重要的是要知道它更适用于IO密集型任务。CPU密集型任务的特点是计算机的核心从开始到结束一直在全力工作,而IO密集型任务主要是等待输入/输出完成。
总结一下,并发包括多进程(适用于CPU密集型任务)和多线程(适用于IO密集型任务)。多进程是并行性的一种形式,而并行性是并发性的一种特定类型(子集)。Python标准库通过multiprocessing、threading和concurrent.futures等包长期支持这两种方法。
现在是时候引入一个新成员了。在过去的几年里,一种单独的设计已经更全面地内置到CPython中:异步IO(async IO),通过标准库中的asyncio包和新的async和await语言关键字启用。需要明确的是,异步IO不是一个新发明的概念,它已经存在或正在被构建到其他语言和运行环境中,如Go、C#或Scala。
Python文档将asyncio包描述为一个编写并发代码的库。然而,异步IO(async IO)并不是线程,也不是多进程。它没有建立在这两者之上。
事实上,异步IO(async IO)是一种单线程、单进程的设计:它使用协作式多任务处理,这个术语在本教程的最后将会详细介绍。换句话说,虽然在单个进程的单个线程中运行,但异步IO给人一种并发的感觉。协程(异步IO的核心特性)可以并发调度,但它们本身并不是并发的。
再次强调,异步IO是一种并发编程风格,但它并不是并行性。它与线程更为相似,而不是多进程,但与这两者都有明显区别,并且作为并发编程中独立的成员存在。
这还有一个术语需要解释。什么是异步的意思?虽然这不是一个严格的定义,但在我们这里,我可以想到两个属性:
下面是一个图表将所有内容整合在一起。白色术语代表概念,绿色术语代表它们的实现或实施方式:
关于并发编程模型的比较,我将就此打住。本教程专注于异步IO这个子组件,介绍如何使用它以及围绕它出现的API。
异步IO乍看之下可能会令人感到违反直觉和自相矛盾。它是如何利用单线程和单个CPU核心来促进并发代码的呢?我从来不太擅长构思例子,所以我想借用Miguel Grinberg在2017年PyCon演讲中的一个例子,他用一个非常精妙的方式解释了一切:
国际象棋大师Judit Polgár在一次展览中与多名业余棋手对局。她有两种进行展览的方式:同步和异步。
假设条件:
同步版本:Judit一次只玩一场比赛,直到完成这场比赛后才开始下一场。每场比赛需要(55 + 5) * 30 == 1800秒,即30分钟。整个展览需要24 * 30 == 720分钟,即12小时。
异步版本:Judit在不同的桌子间移动,每张桌子只走一步。她离开桌子,在等待对手走下一步的时间里进行下一步。在24场比赛中每场比赛只需要Judit花费5 * 24 == 120秒,即2分钟。整个展览时间缩短到了120 * 30 == 3600秒,即1小时。
Judit Polgár只有一个人,只有两只手,一次只能走一步棋。但是通过异步对局,展览时间从12小时缩短到了1小时。因此,协作式多任务处理就是程序的事件循环(稍后会详细介绍)与多个任务进行通信,在最佳时间让每个任务轮流运行。
异步IO利用长时间的等待期间,允许其他功能在这段空闲时间内运行。(阻塞的函数会在开始执行时禁止其他函数运行,直到其返回为止。)
我听过这样的说法:"尽可能使用异步 IO(async IO );必要时使用线程。" 事实上,构建稳健的多线程代码可能很困难且容易出错。异步IO避免了在使用线程设计时可能遇到的一些潜在问题。
但这并不意味着Python中的异步IO很容易。请注意:当你深入一点时,异步编程也可能很困难!Python的异步模型是围绕回调、事件、传输、协议和期物等概念构建的,光是术语就可能令人生畏。此外,其API一直在不断变化,这使得情况更加复杂。
幸运的是,asyncio已经发展到一个阶段,其中大部分功能不再是试验性的,同时其文档也经过了重大改进,还出现了一些优质资源来帮助理解这一主题。
现在你已经对异步IO有了一些背景了解,让我们来探索Python的实现方式。Python的asyncio包(在Python 3.4中引入)以及它的两个关键字async和await,分别担当着不同的角色,协助你声明、构建、执行和管理异步代码。
一句话的忠告:小心在互联网上看到的内容。Python 的异步 IO API 在 Python 3.4 到 Python 3.7 之间快速演变。一些旧的模式不再使用,而一些最初被禁止的东西现在通过新的引入被允许使用。
异步 IO 的核心是协程。协程是 Python 生成器函数的一个特殊版本。让我们从一个基本定义开始,然后随着你的进展逐步构建:协程是一个函数,它在达到 return 之前可以暂停其执行,并且可以间接地将控制权传递给另一个协程一段时间。
稍后,你将更深入地了解传统生成器是如何改变成协程的。目前,最简单的学习协程工作原理的方法是开始编写一些协程。
我们采用沉浸式的方式编写一些异步 IO 代码。这段简短的程序是异步 IO 的 Hello World,但它在说明核心功能方面很有用:
- #!/usr/bin/env python3
- # countasync.py
-
- import asyncio
-
- async def count():
- print("One")
- await asyncio.sleep(1)
- print("Two")
-
- async def main():
- await asyncio.gather(count(), count(), count())
-
- if __name__ == "__main__":
- import time
- s = time.perf_counter()
- asyncio.run(main())
- elapsed = time.perf_counter() - s
- print(f"{__file__} executed in {elapsed:0.2f} seconds.")
当执行这个文件时,注意与使用 def 和 time.sleep() 定义函数时的区别:
- $ python3 countasync.py
- One
- One
- One
- Two
- Two
- Two
- countasync.py executed in 1.01 seconds.
这个输出的顺序是异步 IO 的核心。对于每次调用 count(),都有一个单独的事件循环或协调器在处理。当每个任务达到 await asyncio.sleep(1) 时,该函数会向事件循环发送信号,并将控制权交还给它,告诉它:“我要睡眠 1 秒钟,你可以在这段时间内做其他有意义的事情。”
与之相比,同步版本如下:
- #!/usr/bin/env python3
- # countsync.py
-
- import time
-
- def count():
- print("One")
- time.sleep(1)
- print("Two")
-
- def main():
- for _ in range(3):
- count()
-
- if __name__ == "__main__":
- s = time.perf_counter()
- main()
- elapsed = time.perf_counter() - s
- print(f"{__file__} executed in {elapsed:0.2f} seconds.")
执行时,顺序和执行时间发生了轻微但关键的变化:
- $ python3 countsync.py
- One
- Two
- One
- Two
- One
- Two
- countsync.py executed in 3.01 seconds.
虽然使用 time.sleep() 和 asyncio.sleep() 看起来可能很普通,但它们被用作代表任何耗时的等待过程的替代品。(你可以等待的最普通的事情是一个几乎不执行任何操作的 sleep() 调用。)也就是说,time.sleep() 可以代表任何耗时的阻塞函数调用,而 asyncio.sleep() 用于代表非阻塞调用(但也需要一些时间来完成)。
正如你在下一节中将看到的,等待某些事情(包括 asyncio.sleep())的好处在于,周围的函数可以暂时将控制权交给另一个能够立即执行某些操作的函数。相比之下,time.sleep() 或任何其他阻塞调用与异步 Python 代码不兼容,因为它会在休眠期间完全停止所有操作。
在这一点上,我们需要更正式地定义async、await以及它们创建的协程函数。这部分内容可能有些复杂,但理解async/await的工作原理非常重要,所以如果需要的话,可以随时回头查阅这些内容:
在代码中,第二个要点的大致写法如下:
- async def g():
- # 在这里暂停,直到 f() 准备就绪时返回 g()
- r = await f()
- return r
此外,还有一些关于何时以及如何使用async/await的严格规定。无论你是刚开始学习这些语法还是已经有一些经验,这些规定都很有用:
下面是一些简洁的示例,总结了上面的几条规定:
- async def f(x):
- y = await z(x) # 可以 - 协程中允许使用await和return
- return y
-
- async def g(x):
- yield x # 可以 - 这是一个异步生成器
-
- async def m(x):
- yield from gen(x) # 不可以 - 语法错误
-
- def m(x):
- y = await z(x) # 依然不可以 - 语法错误(没有async def关键字)
- return y
最后,当你使用await f()时,f()必须是一个可等待的对象。那么这个解释似乎并没有什么用处,对吗?目前,只需要知道可等待的对象要么是另一个协程,要么是定义了.__await__()魔术方法并返回一个迭代器的对象。对于大多数编程任务而言,你只需要关心第一种情况。
接下来是一个可能会出现的另一个技术区别:旧的将函数标记为协程的方法是使用@asyncio.coroutine装饰普通的def函数。结果就是一个基于生成器的协程。自从Python 3.5引入了async/await语法后,这种写法已经过时。
这两种协程本质上是等价的(都是可等待的),但第一种是基于生成器的,而第二种是原生协程:
- import asyncio
-
- @asyncio.coroutine
- def py34_coro():
- """基于生成器的协程,旧的语法"""
- yield from stuff()
-
- async def py35_coro():
- """原生协程,新的语法"""
- await stuff()
如果你自己编写任何代码,请为了明确而不是含糊不清的目的,更倾向于使用原生协程。基于生成器的协程将在Python 3.10中被移除。
在本教程的后半部分,我们将仅出于解释的目的涉及基于生成器的协程。引入async/await的原因是使协程成为Python的一个独立功能,可以很容易地与普通生成器函数区分开,从而减少歧义。
不要陷入基于生成器的协程中,它们已经被async/await故意过时。它们有自己的一小套规则(例如,不能在基于生成器的协程中使用await),但如果你坚持使用async/await语法,这些规则在很大程度上是无关紧要的。
废话不多说,让我们来看几个更复杂的例子。
下面是一个async IO如何减少等待时间的示例:假设有一个协程函数makerandom(),它不断生成范围在[0, 10]之间的随机整数,直到其中一个整数超过阈值。你希望多次调用该协程函数时不需要等待彼此按顺序完成。你可以基本上按照上面两个脚本的模式进行编写,稍作修改:
- #!/usr/bin/env python3
- # rand.py
-
- import asyncio
- import random
-
- # ANSI colors
- c = (
- "\033[0m", # End of color
- "\033[36m", # Cyan
- "\033[91m", # Red
- "\033[35m", # Magenta
- )
-
- async def makerandom(idx: int, threshold: int = 6) -> int:
- print(c[idx + 1] + f"Initiated makerandom({idx}).")
- i = random.randint(0, 10)
- while i <= threshold:
- print(c[idx + 1] + f"makerandom({idx}) == {i} too low; retrying.")
- await asyncio.sleep(idx + 1)
- i = random.randint(0, 10)
- print(c[idx + 1] + f"---> Finished: makerandom({idx}) == {i}" + c[0])
- return i
-
- async def main():
- res = await asyncio.gather(*(makerandom(i, 10 - i - 1) for i in range(3)))
- return res
-
- if __name__ == "__main__":
- random.seed(444)
- r1, r2, r3 = asyncio.run(main())
- print()
- print(f"r1: {r1}, r2: {r2}, r3: {r3}")
彩色的输出信息比我能讲的更多,并让你对脚本的执行方式有了一定的了解:
该程序使用一个主协程makerandom(),并在3个不同的输入上并发运行它。大多数程序将包含一些小型的模块化协程和一个包装函数,用于将这些小型协程连接在一起。然后,main()函数用于通过将中心协程映射到某个可迭代对象或池来收集任务(futures)。
在这个小例子中,池是range(3)。在稍后的更完整的示例中,它可以是一组需要并发请求、解析和处理的URL集合,而main()函数封装了每个URL的整个流程。
虽然“生成随机整数”(更多是CPU绑定)可能不是作为asyncio的最佳选择,但在示例中使用asyncio.sleep()的目的是模拟一个涉及不确定等待时间的IO限制进程。例如,asyncio.sleep()调用可以表示在消息应用程序中的两个客户端之间发送和接收不太随机的整数。
在本节中,我们将介绍Async IO具有的一组可能的脚本设计。
协程的一个关键特性是它们可以链接在一起(请记住,协程对象是可等待的,因此另一个协程可以await它)。这使你能够将程序拆分为更小、可管理和可重复使用的协程:
- #!/usr/bin/env python3
- # chained.py
-
- import asyncio
- import random
- import time
-
- async def part1(n: int) -> str:
- i = random.randint(0, 10)
- print(f"part1({n}) sleeping for {i} seconds.")
- await asyncio.sleep(i)
- result = f"result{n}-1"
- print(f"Returning part1({n}) == {result}.")
- return result
-
- async def part2(n: int, arg: str) -> str:
- i = random.randint(0, 10)
- print(f"part2{n, arg} sleeping for {i} seconds.")
- await asyncio.sleep(i)
- result = f"result{n}-2 derived from {arg}"
- print(f"Returning part2{n, arg} == {result}.")
- return result
-
- async def chain(n: int) -> None:
- start = time.perf_counter()
- p1 = await part1(n)
- p2 = await part2(n, p1)
- end = time.perf_counter() - start
- print(f"-->Chained result{n} => {p2} (took {end:0.2f} seconds).")
-
- async def main(*args):
- await asyncio.gather(*(chain(n) for n in args))
-
- if __name__ == "__main__":
- import sys
- random.seed(444)
- args = [1, 2, 3] if len(sys.argv) == 1 else map(int, sys.argv[1:])
- start = time.perf_counter()
- asyncio.run(main(*args))
- end = time.perf_counter() - start
- print(f"Program finished in {end:0.2f} seconds.")
请特别注意输出中,part1()会休眠不同的时间,而part2()则会在结果可用时开始处理:
- $ python3 chained.py 9 6 3
- part1(9) sleeping for 4 seconds.
- part1(6) sleeping for 4 seconds.
- part1(3) sleeping for 0 seconds.
- Returning part1(3) == result3-1.
- part2(3, 'result3-1') sleeping for 4 seconds.
- Returning part1(9) == result9-1.
- part2(9, 'result9-1') sleeping for 7 seconds.
- Returning part1(6) == result6-1.
- part2(6, 'result6-1') sleeping for 4 seconds.
- Returning part2(3, 'result3-1') == result3-2 derived from result3-1.
- -->Chained result3 => result3-2 derived from result3-1 (took 4.00 seconds).
- Returning part2(6, 'result6-1') == result6-2 derived from result6-1.
- -->Chained result6 => result6-2 derived from result6-1 (took 8.01 seconds).
- Returning part2(9, 'result9-1') == result9-2 derived from result9-1.
- -->Chained result9 => result9-2 derived from result9-1 (took 11.01 seconds).
- Program finished in 11.01 seconds.
在这个设置中,main()的运行时间将等于它收集并调度的任务的最长运行时间。
asyncio包提供了一组类似于queue模块中的类的队列类。到目前为止,我们在示例中并没有真正需要队列结构。在chained.py中,每个任务(future)由一组明确互相等待并逐个通过单个输入的协程组成。
还有一种替代结构也可以与Async IO一起使用:许多彼此不关联的生产者将项目添加到队列中。每个生产者可以在错开、随机且未通知的时间将多个项目添加到队列中。一组消费者会在物品出现时贪婪地从队列中拉取物品,而不需要等待其他信号。
在这种设计中,没有将任何个别消费者与生产者链接起来。消费者不知道生产者的数量,甚至不知道预先将添加到队列中的项目的总数。
每个生产者或消费者在从队列中放入或提取项目时需要不同的时间。队列充当一个通信通道,使得生产者和消费者可以直接进行通信而无需彼此直接交互。
注意:尽管由于queue.Queue()的线程安全性,队列通常在线程程序中使用,但在处理Async IO时无需担心线程安全性(除非将两者结合使用,但本教程中未涉及这种情况)。
队列的一个用例(正如本例所示)是作为生产者和消费者之间传输通道的队列,它们没有直接链接或关联。
这个程序的同步版本看起来相当糟糕:一组阻塞的生产者按顺序一个接一个地将项目添加到队列中。只有在所有生产者完成后,队列才能由一个消费者逐个处理。这种设计中存在大量的延迟。项目可能会闲置在队列中而不被立即接收和处理。
下面是异步版本的asyncq.py。这个工作流程的挑战在于需要向消费者发出生产完成的信号。否则,await q.get()将会无限期地挂起,因为队列已经完全处理,但是消费者却不知道生产已经完成。
(非常感谢一个StackOverflow用户提供的帮助,帮助修复了main()函数:关键在于使用await q.join(),它会阻塞直到队列中的所有项目都被接收和处理,然后取消消费者任务,否则它们将无休止地等待额外的队列项目。)
以下是完整的脚本:
- #!/usr/bin/env python3
- # asyncq.py
-
- import asyncio
- import itertools as it
- import os
- import random
- import time
-
- async def makeitem(size: int = 5) -> str:
- return os.urandom(size).hex()
-
- async def randsleep(caller=None) -> None:
- i = random.randint(0, 10)
- if caller:
- print(f"{caller} sleeping for {i} seconds.")
- await asyncio.sleep(i)
-
- async def produce(name: int, q: asyncio.Queue) -> None:
- n = random.randint(0, 10)
- for _ in it.repeat(None, n): # Synchronous loop for each single producer
- await randsleep(caller=f"Producer {name}")
- i = await makeitem()
- t = time.perf_counter()
- await q.put((i, t))
- print(f"Producer {name} added <{i}> to queue.")
-
- async def consume(name: int, q: asyncio.Queue) -> None:
- while True:
- await randsleep(caller=f"Consumer {name}")
- i, t = await q.get()
- now = time.perf_counter()
- print(f"Consumer {name} got element <{i}>"
- f" in {now-t:0.5f} seconds.")
- q.task_done()
-
- async def main(nprod: int, ncon: int):
- q = asyncio.Queue()
- producers = [asyncio.create_task(produce(n, q)) for n in range(nprod)]
- consumers = [asyncio.create_task(consume(n, q)) for n in range(ncon)]
- await asyncio.gather(*producers)
- await q.join() # Implicitly awaits consumers, too
- for c in consumers:
- c.cancel()
-
- if __name__ == "__main__":
- import argparse
- random.seed(444)
- parser = argparse.ArgumentParser()
- parser.add_argument("-p", "--nprod", type=int, default=5)
- parser.add_argument("-c", "--ncon", type=int, default=10)
- ns = parser.parse_args()
- start = time.perf_counter()
- asyncio.run(main(**ns.__dict__))
- elapsed = time.perf_counter() - start
- print(f"Program completed in {elapsed:0.5f} seconds.")
前几个协程是一些辅助函数,用于返回随机字符串、小数秒的性能计数器和随机整数。生产者将1到5个项目放入队列中。每个项目都是一个元组(i, t),其中i是一个随机字符串,t是生产者尝试将元组放入队列的时间。
当消费者取出一个项目时,它简单地使用项目放入队列的时间戳计算项目在队列中的等待时间。
请注意,asyncio.sleep()被用来模拟一些其他更复杂的协程,如果它是一个普通的阻塞函数,它将会占用时间并阻塞所有其他执行。
以下是使用两个生产者和五个消费者进行的测试运行:
- $ python3 asyncq.py -p 2 -c 5
- Producer 0 sleeping for 3 seconds.
- Producer 1 sleeping for 3 seconds.
- Consumer 0 sleeping for 4 seconds.
- Consumer 1 sleeping for 3 seconds.
- Consumer 2 sleeping for 3 seconds.
- Consumer 3 sleeping for 5 seconds.
- Consumer 4 sleeping for 4 seconds.
- Producer 0 added <377b1e8f82> to queue.
- Producer 0 sleeping for 5 seconds.
- Producer 1 added <413b8802f8> to queue.
- Consumer 1 got element <377b1e8f82> in 0.00013 seconds.
- Consumer 1 sleeping for 3 seconds.
- Consumer 2 got element <413b8802f8> in 0.00009 seconds.
- Consumer 2 sleeping for 4 seconds.
- Producer 0 added <06c055b3ab> to queue.
- Producer 0 sleeping for 1 seconds.
- Consumer 0 got element <06c055b3ab> in 0.00021 seconds.
- Consumer 0 sleeping for 4 seconds.
- Producer 0 added <17a8613276> to queue.
- Consumer 4 got element <17a8613276> in 0.00022 seconds.
- Consumer 4 sleeping for 5 seconds.
- Program completed in 9.00954 seconds.
在这种情况下,项目在几分之一秒内被处理。延迟可能有两个原因:
关于第二个原因,幸运的是,完全可以扩展到数百甚至数千个消费者。对于python3 asyncq.py -p 5 -c 100,你不应该有任何问题。关键在于,从理论上讲,你可以有不同的用户在不同的系统上控制生产者和消费者的管理,队列作为中央吞吐量。
到目前为止,你已经直接接触了三个使用async和await定义的协程的asyncio调用的相关示例。如果你还没有完全理解,或者只是想更深入地了解现代协程在Python中是如何出现的,那么你将从下一节。
之前,你看到了基于旧式生成器的协程的例子,这种协程已经被更显式的本地协程取代。这个例子值得用一个小小的改动重新展示:
- import asyncio
-
- @asyncio.coroutine
- def py34_coro():
- """Generator-based coroutine"""
- # No need to build these yourself, but be aware of what they are
- s = yield from stuff()
- return s
-
- async def py35_coro():
- """Native coroutine, modern syntax"""
- s = await stuff()
- return s
-
- async def stuff():
- return 0x10, 0x20, 0x30
作为一个实验,如果你单独调用`py34_coro()`或`py35_coro()`,没有使用`await`,也没有调用`asyncio.run()`或其他`asyncio`的"门面"函数,会发生什么呢?单独调用一个协程会返回一个协程对象:
- >>> py35_coro()
- <coroutine object py35_coro at 0x10126dcc8>
表面上看这并没有太大的意义。单独调用一个协程会返回一个可等待的协程对象。
现在来做一个问题:Python 的哪个特性看起来是这样的?(当它被单独调用时实际上并没有太多作用?)
希望你会想到生成器作为这个问题的答案,因为协程实际上是在生成器的基础上进行了增强。在这方面,它们的行为类似:
- >>> def gen():
- ... yield 0x10, 0x20, 0x30
- ...
- >>> g = gen()
- >>> g # Nothing much happens - need to iterate with `.__next__()`
- <generator object gen at 0x1012705e8>
- >>> next(g)
- (16, 32, 48)
生成器函数实际上是异步IO的基础(无论你是否使用新的`async def`语法而不是旧的`@asyncio.coroutine`装饰器声明协程)。技术上讲,`await`更接近于`yield from`而不是`yield`。(但请记住,`yield from x()`只是语法糖,用于替换`for i in x(): yield i`。)
与异步IO相关的生成器的一个关键特性是它们可以随时被有效地停止和重新启动。例如,你可以打断对生成器对象的迭代,然后稍后继续迭代剩余的值。当生成器函数到达`yield`时,它产生该值,但然后处于空闲状态,直到被告知产生下一个值。
我们可以通过一个例子来详细说明:
- async def counter():
- for i in range(3):
- print(i)
- await asyncio.sleep(1)
-
- async def main():
- task = asyncio.create_task(counter())
- await task
- print("Done")
-
- asyncio.run(main())
关键字`await`的行为类似,标记了一个断点,协程在这里暂停自身并让其他协程工作。在这种情况下,“暂停”意味着协程暂时放弃控制,但并未完全退出或完成。请记住,`yield`,以及其衍生的`yield from`和`await`,标记了生成器执行的断点。
这是函数和生成器之间的基本区别。函数是全有或全无的。一旦开始执行,它将不会停止,直到遇到`return`语句,并将值返回给调用者(调用该函数的函数)。而生成器则在每次遇到`yield`时暂停,并停在那里。它不仅可以将该值推送给调用堆栈,而且在通过调用`next()`恢复生成器时,可以保留其本地变量。
生成器还有一个不太为人知但非常重要的特性。你还可以通过`send()`方法将值发送到生成器中。这使得生成器(和协程)可以相互调用(等待),而无需阻塞。我不会进一步深入探讨此功能的细节,因为它主要适用于协程在幕后的实现,但你实际上不需要直接使用它。
让我们试着将上述所有文章浓缩成几句话:协程实际上是通过一种特殊而非传统的机制运行的。它们的结果是在调用其`send()`方法时抛出的异常对象的属性。所有这些都涉及一些更为复杂的细节,但在实践中,这些细节可能并不会对你使用该语言的这一部分产生太大帮助,所以我们继续进行下一步。
为了总结协程作为生成器的关键要点,以下是一些要点:
除了普通的 async/await,Python 还支持 async for,用于迭代异步迭代器。异步迭代器的目的是在每次迭代时能够调用异步代码。
这个概念的自然延伸是异步生成器。回想一下,你可以在原生协程中使用 await、return 或 yield。在 Python 3.6 中(通过 PEP 525),使用 yield 在协程内部成为可能,引入了异步生成器,允许在同一个协程函数体中同时使用 await 和 yield:
- >>> async def mygen(u: int = 10):
- ... """Yield powers of 2."""
- ... i = 0
- ... while i < u:
- ... yield 2 ** i
- ... i += 1
- ... await asyncio.sleep(0.1)
最后但同样重要的是,Python 还允许使用 async for 进行异步推导式。就像其同步版本一样,这主要是语法糖:
- >>> async def main():
- ... # This does *not* introduce concurrent execution
- ... # It is meant to show syntax only
- ... g = [i async for i in mygen()]
- ... f = [j async for j in mygen() if not (j // 3 % 5)]
- ... return g, f
- ...
- >>> g, f = asyncio.run(main())
- >>> g
- [1, 2, 4, 8, 16, 32, 64, 128, 256, 512]
- >>> f
- [1, 2, 16, 32, 256, 512]
这是一个关键的区别:异步生成器和推导式都不会使迭代并发执行。它们仅提供了与同步版本相似的外观和感觉,但允许相关的循环放弃控制权给事件循环以运行其他协程。
换句话说,异步迭代器和异步生成器并不旨在同时对序列或迭代器进行某个函数的映射。它们仅设计为允许封闭的协程让其他任务轮流执行。只有在使用普通的 for 或 with 语句可能“破坏”协程中的 await 特性时,才需要 async for 和 async with 语句。理解异步性和并发性之间的区别至关重要。
可以将事件循环类比为一个类似于 while True 的循环,它监视协程,根据空闲情况并寻找可执行的内容。当协程等待的内容可用时,它能唤醒处于空闲状态的协程。
到目前为止,事件循环的整个管理工作都是通过一个函数调用隐式处理的:
asyncio.run(main()) # Python 3.7+
`asyncio.run()` 是在 Python 3.7 中引入的函数,负责获取事件循环、运行任务直到它们被标记为完成,并关闭事件循环。
有一种更冗长的方式来管理 asyncio 事件循环,使用 `get_event_loop()`。典型的模式如下所示:
- loop = asyncio.get_event_loop()
- try:
- loop.run_until_complete(main())
- finally:
- loop.close()
在较旧的示例中,你可能会看到 `loop.get_event_loop()` 的用法,但除非你有特定需求需要对事件循环进行详细控制,否则对于大多数程序来说,`asyncio.run()` 应该足够了。
如果你确实需要在 Python 程序中与事件循环交互,`loop` 是一个具有传统 Python 对象特性的对象,你可以使用 `loop.is_running()` 和 `loop.is_closed()` 进行内省。如果需要更精细的控制,可以通过将循环作为参数传递来操作它,例如在安排回调时。
更重要的是,了解事件循环的一些内部机制。以下是关于事件循环值得强调的几点。
#1:协程本身在未绑定到事件循环时并不执行太多操作。
你之前在有关生成器的解释中已经提到了这一点,但值得再次强调。如果你有一个主协程等待其他协程,仅仅单独调用它几乎没有任何效果:
- >>> import asyncio
-
- >>> async def main():
- ... print("Hello ...")
- ... await asyncio.sleep(1)
- ... print("World!")
-
- >>> routine = main()
- >>> routine
- <coroutine object main at 0x1027a6150>
记得使用 `asyncio.run()` 来实际上强制执行,通过将 `main()` 协程(future 对象)调度到事件循环中执行:
- >>> asyncio.run(routine)
- Hello ...
- World!
其他协程可以使用 await
运行。通常情况下,只需将 main()
放在 asyncio.run()
中进行包装,然后从那里调用使用 await
链接协程。
#2:默认情况下,异步 IO 事件循环在单个线程和单个 CPU 核心上运行。通常情况下,在一个单线程的事件循环中运行在一个 CPU 核心上是足够的。也可以在多个核心上运行事件循环。
#3:事件循环是可插拔的。也就是说,如果你真的有需要,可以编写自己的事件循环实现,并以相同的方式运行任务。这在 uvloop 包中得到了很好的演示,它是使用 Cython 实现的事件循环。
这就是“可插拔事件循环”一词的含义:你可以使用任何有效的事件循环实现,而不必关心协程本身的结构。asyncio 包本身附带两种不同的事件循环实现,其中默认实现基于 selectors 模块。(第二种实现仅适用于 Windows。)
你已经走到了这一步,现在是时候进入有趣又轻松的部分了。在本节中,你将使用 aiohttp 构建一个网络抓取 URL 收集器,名为 areq.py,它是一个极快的异步 HTTP 客户端/服务器框架。(我们只需要客户端部分。)这样的工具可以用于绘制一组站点之间的连接,从而形成一个有向图。
注意:你可能想知道为什么 Python 的 requests 包不与异步 IO 兼容。requests 是构建在 urllib3 之上的,而 urllib3 又使用了 Python 的 http 和 socket 模块。
默认情况下,套接字操作是阻塞的。这意味着 Python 不支持 await requests.get(url),因为 .get() 不可等待。相比之下,aiohttp 中几乎所有的东西都是可等待的协程,比如 session.request() 和 response.text()。它是一个很棒的包,但在异步代码中使用 requests 就是对自己不利。
高级程序结构将如下所示:
以下是 urls.txt 的内容。它并不是很大,主要包含高流量的网站:
- $ cat urls.txt
- https://regex101.com/
- https://docs.python.org/3/this-url-will-404.html
- https://www.nytimes.com/guides/
- https://www.mediamatters.org/
- https://1.1.1.1/
- https://www.politico.com/tipsheets/morning-money
- https://www.bloomberg.com/markets/economics
- https://www.ietf.org/rfc/rfc2616.txt
列表中的第二个 URL 应该返回一个 404 响应,你需要优雅地处理它。如果你运行的是这个程序的扩展版本,你可能需要处理比这更复杂的问题,比如服务器断开连接和无限重定向。
请求本身应该使用单个会话进行,以利用会话的内部连接池的复用。
让我们来看看完整的程序。我们将逐步完成以下操作:
- #!/usr/bin/env python3
- # areq.py
-
- """Asynchronously get links embedded in multiple pages' HMTL."""
-
- import asyncio
- import logging
- import re
- import sys
- from typing import IO
- import urllib.error
- import urllib.parse
-
- import aiofiles
- import aiohttp
- from aiohttp import ClientSession
-
- logging.basicConfig(
- format="%(asctime)s %(levelname)s:%(name)s: %(message)s",
- level=logging.DEBUG,
- datefmt="%H:%M:%S",
- stream=sys.stderr,
- )
- logger = logging.getLogger("areq")
- logging.getLogger("chardet.charsetprober").disabled = True
-
- HREF_RE = re.compile(r'href="(.*?)"')
-
- async def fetch_html(url: str, session: ClientSession, **kwargs) -> str:
- """GET request wrapper to fetch page HTML.
- kwargs are passed to `session.request()`.
- """
-
- resp = await session.request(method="GET", url=url, **kwargs)
- resp.raise_for_status()
- logger.info("Got response [%s] for URL: %s", resp.status, url)
- html = await resp.text()
- return html
-
- async def parse(url: str, session: ClientSession, **kwargs) -> set:
- """Find HREFs in the HTML of `url`."""
- found = set()
- try:
- html = await fetch_html(url=url, session=session, **kwargs)
- except (
- aiohttp.ClientError,
- aiohttp.http_exceptions.HttpProcessingError,
- ) as e:
- logger.error(
- "aiohttp exception for %s [%s]: %s",
- url,
- getattr(e, "status", None),
- getattr(e, "message", None),
- )
- return found
- except Exception as e:
- logger.exception(
- "Non-aiohttp exception occured: %s", getattr(e, "__dict__", {})
- )
- return found
- else:
- for link in HREF_RE.findall(html):
- try:
- abslink = urllib.parse.urljoin(url, link)
- except (urllib.error.URLError, ValueError):
- logger.exception("Error parsing URL: %s", link)
- pass
- else:
- found.add(abslink)
- logger.info("Found %d links for %s", len(found), url)
- return found
-
- async def write_one(file: IO, url: str, **kwargs) -> None:
- """Write the found HREFs from `url` to `file`."""
- res = await parse(url=url, **kwargs)
- if not res:
- return None
- async with aiofiles.open(file, "a") as f:
- for p in res:
- await f.write(f"{url}\t{p}\n")
- logger.info("Wrote results for source URL: %s", url)
-
- async def bulk_crawl_and_write(file: IO, urls: set, **kwargs) -> None:
- """Crawl & write concurrently to `file` for multiple `urls`."""
- async with ClientSession() as session:
- tasks = []
- for url in urls:
- tasks.append(
- write_one(file=file, url=url, session=session, **kwargs)
- )
- await asyncio.gather(*tasks)
-
- if __name__ == "__main__":
- import pathlib
- import sys
-
- assert sys.version_info >= (3, 7), "Script requires Python 3.7+."
- here = pathlib.Path(__file__).parent
-
- with open(here.joinpath("urls.txt")) as infile:
- urls = set(map(str.strip, infile))
-
- outpath = here.joinpath("foundurls.txt")
- with open(outpath, "w") as outfile:
- outfile.write("source_url\tparsed_url\n")
-
- asyncio.run(bulk_crawl_and_write(file=outpath, urls=urls))
这个脚本比我们最初的玩具程序要长一些,所以让我们来逐步分析一下。
常量 HREF_RE 是一个正则表达式,用于提取我们最终要搜索的内容,即 HTML 中的 href 标签:
- >>> HREF_RE.search('Go to <a href="https://realpython.com/">Real Python</a>')
- <re.Match object; span=(15, 45), match='href="https://realpython.com/"'>
协程 fetch_html() 是一个包装器,用于执行 GET 请求、发送请求并解码生成的页面 HTML。它发送请求,等待响应,并在非 200 状态时立即引发异常:
- resp = await session.request(method="GET", url=url, **kwargs)
- resp.raise_for_status()
如果状态码正常,fetch_html() 将返回页面的 HTML(一个字符串)。需要注意的是,在这个函数中没有进行任何异常处理。逻辑是将该异常传播给调用者并在调用者处处理:
html = await resp.text()
我们使用 await 来等待 session.request() 和 resp.text(),因为它们是可等待的协程。否则,请求/响应的循环将成为应用程序中占用时间最长的部分,但是通过使用异步 IO,fetch_html() 可以让事件循环处理其他已准备好的工作,如解析和写入已获取的 URL。
协程链中的下一个函数是 parse(),它等待 fetch_html() 返回的指定 URL 的页面,并从该页面的 HTML 中提取所有的 href 标签,确保每个标签都是有效的,并将其格式化为绝对路径。
值得注意的是,parse() 函数的第二部分是阻塞的,但它只是一个快速的正则表达式匹配,并确保所发现的链接转换为绝对路径。在这个特定的情况下,这个同步代码应该很快且不显眼。但请记住,给定协程中的任何行都会阻塞其他协程,除非该行使用 yield、await 或 return。如果解析过程更加复杂,你可能需要考虑将这部分代码放在自己的进程中,并使用 loop.run_in_executor() 运行。
然后,协程 write() 接收一个文件对象和一个单独的 URL,等待 parse() 返回解析后的 URL 集合,并通过 aiofiles 包(用于异步文件 IO)将每个 URL 和其源 URL 异步写入文件。
最后,bulk_crawl_and_write() 函数是脚本中协程链的主要入口点。它使用一个单独的会话,并为从 urls.txt 文件中读取的每个 URL 创建一个任务。
以下是一些值得一提的附加说明:
这是它所有荣耀的执行,areq.py 在不到一秒的时间内获取、解析并保存了 9 个 URL 的结果:
- $ python3 areq.py
- 21:33:22 DEBUG:asyncio: Using selector: KqueueSelector
- 21:33:22 INFO:areq: Got response [200] for URL: https://www.mediamatters.org/
- 21:33:22 INFO:areq: Found 115 links for https://www.mediamatters.org/
- 21:33:22 INFO:areq: Got response [200] for URL: https://www.nytimes.com/guides/
- 21:33:22 INFO:areq: Got response [200] for URL: https://www.politico.com/tipsheets/morning-money
- 21:33:22 INFO:areq: Got response [200] for URL: https://www.ietf.org/rfc/rfc2616.txt
- 21:33:22 ERROR:areq: aiohttp exception for https://docs.python.org/3/this-url-will-404.html [404]: Not Found
- 21:33:22 INFO:areq: Found 120 links for https://www.nytimes.com/guides/
- 21:33:22 INFO:areq: Found 143 links for https://www.politico.com/tipsheets/morning-money
- 21:33:22 INFO:areq: Wrote results for source URL: https://www.mediamatters.org/
- 21:33:22 INFO:areq: Found 0 links for https://www.ietf.org/rfc/rfc2616.txt
- 21:33:22 INFO:areq: Got response [200] for URL: https://1.1.1.1/
- 21:33:22 INFO:areq: Wrote results for source URL: https://www.nytimes.com/guides/
- 21:33:22 INFO:areq: Wrote results for source URL: https://www.politico.com/tipsheets/morning-money
- 21:33:22 INFO:areq: Got response [200] for URL: https://www.bloomberg.com/markets/economics
- 21:33:22 INFO:areq: Found 3 links for https://www.bloomberg.com/markets/economics
- 21:33:22 INFO:areq: Wrote results for source URL: https://www.bloomberg.com/markets/economics
- 21:33:23 INFO:areq: Found 36 links for https://1.1.1.1/
- 21:33:23 INFO:areq: Got response [200] for URL: https://regex101.com/
- 21:33:23 INFO:areq: Found 23 links for https://regex101.com/
- 21:33:23 INFO:areq: Wrote results for source URL: https://regex101.com/
- 21:33:23 INFO:areq: Wrote results for source URL: https://1.1.1.1/
这不错!作为一个合理性检查,你可以检查输出的行数。在我的例子中,是 626 行,但要记住这个数字可能会有所波动:
- $ wc -l foundurls.txt
- 626 foundurls.txt
-
- $ head -n 3 foundurls.txt
- source_url parsed_url
- https://www.bloomberg.com/markets/economics https://www.bloomberg.com/feedback
- https://www.bloomberg.com/markets/economics https://www.bloomberg.com/notices/tos
后续步骤:如果你想进一步提高,可以让这个网络爬虫递归。你可以使用 aio-redis 来跟踪已在树中爬取的 URL,避免重复请求,并使用 Python 的 networkx 库连接链接。
记得要友好对待。向一个小而毫无防备的网站发送 1000 个并发请求是非常糟糕的,非常糟糕。有办法限制一次批处理中发出的并发请求数量,例如使用 asyncio 的 semaphore 对象或使用像下面这样的模式。如果不遵守这个警告,你可能会遇到大量的 TimeoutError 异常,最终只会伤害你自己的程序。
现在你已经看到了大量的代码,让我们退后一步,考虑一下何时异步 IO 是一个理想的选择,以及你如何进行比较,以得出这个结论或者选择其他并发模型。
本教程不是一个对比异步 IO、线程和多进程的详细论述之地。然而,了解异步 IO 在三者中可能是最佳选择的情况是有帮助的。
异步 IO 和多进程之间的对比并不真的是一场对决。实际上,它们可以相互配合使用。如果你有多个相对均匀的CPU密集型任务(一个很好的例子是在诸如scikit-learn或keras等库中进行网格搜索),多进程应该是一个显而易见的选择。
仅仅在每个函数之前添加async关键字是一个不好的主意,如果所有的函数都使用阻塞调用的话(这实际上会减慢你的代码)。但正如之前提到的,异步 IO 和多进程可以在某些情况下和谐共存。
异步 IO 和线程之间的比较稍微直接一些。我在介绍中提到过"线程很难"。完整的情况是,即使在看似容易实现线程的情况下,它仍然可能导致臭名昭著的难以追踪的错误,如竞争条件和内存使用等问题。
线程的扩展性也不如异步 IO 那样优雅,因为线程是一种有限可用性的系统资源。在许多计算机上创建成千上万个线程会失败,我不建议一开始就尝试。而创建成千上万个异步 IO 任务完全是可行的。
当你有多个以 IO 为主的任务时,异步 IO 显得十分出色,其中这些任务本来会被阻塞的 IO 等待时间所主导,例如:
不使用异步 IO 的最大原因是await仅支持一组特定对象,这些对象定义了一组特定方法。如果你想使用某个特定数据库管理系统进行异步读取操作,你不仅需要找到一个适用于该数据库管理系统的Python封装,还需要一个支持async/await语法的封装器。包含同步调用的协程会阻塞其他协程和任务的运行。
关于支持async/await的一些库的简要列表,请参见本教程末尾的列表。
本教程专注于异步 IO、async/await 语法以及使用 asyncio 进行事件循环管理和指定任务。asyncio 当然不是唯一的异步 IO 库。Nathaniel J. Smith 的这番观察很有意思:
"几年后,asyncio 可能会沦为像 urllib2 这样的标准库,被精明的开发人员所避免使用。
...
我在实际上所争论的是,asyncio 是其自身成功的牺牲品:在设计时,它采用了最佳的方法;但自那以后,受 asyncio 的启发,如添加 async/await,已经改变了局面,我们现在可以做得更好,而 asyncio 却受到早期承诺的制约。"
为此,有一些大牌的替代方案,它们与 asyncio 做的事情相同,尽管使用不同的 API 和方法,例如 curio 和 trio。就我个人而言,我认为如果你正在构建一个中等规模、简单的程序,仅仅使用 asyncio 就足够了,并且可以理解,这样你就不用添加另一个庞大的依赖库,超出了 Python 标准库之外。
但是,你可以查看一下 curio 和 trio,也许你会发现它们以对你来说更直观的方式完成同样的工作。这里介绍的许多与包无关的概念也应该适用于其他异步 IO 包。
在接下来的几个部分中,你将了解一些关于 asyncio 和 async/await 的其他细节,这些内容在本教程之前没有很好地涵盖,但对于构建和理解完整的程序仍然很重要。
除了asyncio.run()之外,你还见过其他一些包级函数,例如asyncio.create_task()和asyncio.gather()。
你可以使用create_task()来安排协程对象的执行,然后使用asyncio.run()来运行它:
- >>> import asyncio
-
- >>> async def coro(seq) -> list:
- ... """'IO' wait time is proportional to the max element."""
- ... await asyncio.sleep(max(seq))
- ... return list(reversed(seq))
- ...
- >>> async def main():
- ... # This is a bit redundant in the case of one task
- ... # We could use `await coro([3, 2, 1])` on its own
- ... t = asyncio.create_task(coro([3, 2, 1])) # Python 3.7+
- ... await t
- ... print(f't: type {type(t)}')
- ... print(f't done: {t.done()}')
- ...
- >>> t = asyncio.run(main())
- t: type <class '_asyncio.Task'>
- t done: True
这种模式有一个微妙之处:如果你在main()函数中没有使用await t,它可能在main()函数本身发出完成信号之前就已经完成了。因为asyncio.run(main())调用了loop.run_until_complete(main()),事件循环只关心main()函数是否完成(在没有await t的情况下),而不关心在main()函数内部创建的任务是否完成。如果没有await t,循环中的其他任务可能会被取消,可能会在它们完成之前被取消。如果你需要获取当前待处理任务的列表,你可以使用asyncio.Task.all_tasks()。
注意:asyncio.create_task()是在Python 3.7中引入的。在Python 3.6或更低版本中,可以使用asyncio.ensure_future()来替代create_task()。
另外,还有一个函数叫做asyncio.gather()。虽然它没有特别的功能,但它可以将一组协程(或期物)整合到一个单独的期物中。因此,它返回一个单一的期物对象,并且如果你使用await asyncio.gather()并指定多个任务或协程,你将等待它们全部完成。(这在某种程度上类似于我们之前的例子中的queue.join()。)gather()的结果将是各个输入的结果列表:
- >>> import time
- >>> async def main():
- ... t = asyncio.create_task(coro([3, 2, 1]))
- ... t2 = asyncio.create_task(coro([10, 5, 0])) # Python 3.7+
- ... print('Start:', time.strftime('%X'))
- ... a = await asyncio.gather(t, t2)
- ... print('End:', time.strftime('%X')) # Should be 10 seconds
- ... print(f'Both tasks done: {all((t.done(), t2.done()))}')
- ... return a
- ...
- >>> a = asyncio.run(main())
- Start: 16:20:11
- End: 16:20:21
- Both tasks done: True
- >>> a
- [[1, 2, 3], [0, 5, 10]]
你可能已经注意到,gather()会等待你传递给它的所有期物或协程的完整结果集。另外,你可以使用asyncio.as_completed()循环遍历已完成的任务,并按照完成的顺序获取它们。该函数返回一个迭代器,按照任务完成的顺序生成任务。在下面的例子中,coro([3, 2, 1])的结果将在coro([10, 5, 0])完成之前可用,这与使用gather()不同:
- >>> async def main():
- ... t = asyncio.create_task(coro([3, 2, 1]))
- ... t2 = asyncio.create_task(coro([10, 5, 0]))
- ... print('Start:', time.strftime('%X'))
- ... for res in asyncio.as_completed((t, t2)):
- ... compl = await res
- ... print(f'res: {compl} completed at {time.strftime("%X")}')
- ... print('End:', time.strftime('%X'))
- ... print(f'Both tasks done: {all((t.done(), t2.done()))}')
- ...
- >>> a = asyncio.run(main())
- Start: 09:49:07
- res: [1, 2, 3] completed at 09:49:10
- res: [0, 5, 10] completed at 09:49:17
- End: 09:49:17
- Both tasks done: True
最后,你可能也会看到asyncio.ensure_future()。你很少需要使用它,因为它是一个较低级别的API,而且在很大程度上已经被稍后引入的create_task()所取代。
await关键字的优先级比yield关键字高得多。这意味着,由于其绑定更紧密,yield from语句中需要使用括号的情况在对应的await语句中是不需要的。
现在你已经掌握了使用async/await以及基于它构建的库的技巧。以下是你所学到的内容的总结:
Python中的异步IO发展迅速,很难跟踪每个版本引入了什么。以下是与asyncio相关的Python次要版本更改和引入的列表:
如果你想要安全(并且能够使用asyncio.run()),请使用Python 3.7或更高版本以获取完整的功能集。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。