当前位置:   article > 正文

第二十一章 异步编程_async def

async def

异步编程的常规方法的问题是异步程序要么做完所有的事情,要么一件事也没有做完。重写所有的代码是为了保证程序不会阻塞,否则只是在浪费时间。 

-------Alvaro Videla & Jason J. W. Williams, RabbitMQ in Action

本章讨论三个密切相关的主要主题:

  • Python 的 async def、await、async with 和 async for 结构;
  • 支持这些结构的对象:原生协程以及上下文管理器、迭代器、生成器、推导式的异步变体;
  • asyncio 和其他异步库。

本章建立在可迭代对象和生成器(第 17 章,特别是“经典协程”)、上下文管理器(第 18 章)和并发编程的一般概念(第 19 章)的思想之上。

我们将研究类似于我们在第 20 章中看到的并发 HTTP 客户端,用原生协程和异步上下文管理器进行重写,使用的是和以前相同的 HTTPX 库,但现在的实现使用的是HTTPX异步 API。我们还将了解如何通过将慢速操作委托给一个特定的线程或进程执行器来避免阻塞事件循环。

在 HTTP 客户端示例之后,我们还将学习两个简单的异步服务器端应用程序,其中一个使用的是流行的 FastAPI 框架。然后我们将介绍 async/await 关键字支持的其他语法结构:异步生成器函数、异步推导式和异步生成器表达式。为了强调这些语言特性与 asyncio 并无关系,我们将看到一个使用 Curio框架重写的示例——Curio是由 David Beazley 开发的优雅创新的异步框架。

为了结束本章,我写了一个简短的部分,介绍异步编程的优点和缺点。

本章的内容覆盖面很大。我只演示了一些基本的用例,但这些用例将说明每个特性的最重要特征。

TIP

在 Yury Selivanov 重新组织之后,asyncio documentation 要好得多,将少数对应用程序开发人员有用的函数与适用于Web 框架和数据库驱动程序等包的低级 API 分开。

对于 asyncio 的整体介绍,我推荐 Caleb Hattingh 的 Using Asyncio in Python(O'Reilly,2020 年)。做一个披露:他是本书的技术评论家之一。

本章的新内容

当我编写 Fluent Python, First Edition 时,asyncio 库是临时的,并且还没有 async/await 关键字。因此,我不得不更新本章中的所有示例。我还创建了新示例:域探测脚本、一个FastAPI Web 服务以及 Python 新异步控制台模式的实验。

新部分涵盖了当时不存在的语言功能,例如原生协程、async with、async for 以及支持这些结构的对象。它们可能会为您省去很多麻烦——无论您使用的是 Python 还是 Node.js。

最后,我删除了关于 asyncio.Futures 的段落,现在这部分被认为是低级 asyncio API 的一部分。

几个重要的定义

在“经典协程”的开头,我们看到 Python 3.5 及更高版本提供了三种协程:

原生协程:

用 async def 定义的协程函数。您可以使用 await 关键字将一个原生协程委托给另一个原生协程,类似于经典协程使用 yield from 的方式。async def 语句始终定义了一个原生协程,即使其函数体中未使用 await 关键字。await 关键字不能在原生协程之外使用 。

经典协程:

一个生成器函数,它消费通过 my_coro.send(data) 调用发送给它的数据,并通过在表达式中使用 yield 读取该数据。一个经典协程可以使用 yield from 委托给另一个经典协程。经典协程不能由 await 驱动,并且 asyncio 也不再支持经典协程。

基于生成器的协程:

用 @types.coroutine 修饰的生成器函数——在 Python 3.5 中引入。该装饰器使生成器与新的 await 关键字兼容。

在本章中,我们关注原生协程以及异步生成器:

异步生成器:

使用 async def 定义并在其主体中使用 yield 的生成器函数。它返回一个异步生成器对象,该对象提供 __anext__方法,这是一种检索下一项的协程方法。


@ASYNCIO.COROUTINE 没有使用future

根据 issue43216,用于经典协程和基于生成器的协程的 @asyncio.coroutine 装饰器在 Python 3.8 中已弃用,并计划在 Python 3.11 中删除。相比之下, 根据  issue36921 ,@types.coroutine 应该保留。 asyncio 不再支持@types.coroutine,但在 Curio 和 Trio 异步框架的低级代码中使用了@types.coroutine。


一个Asyncio 示例:探测域名

想象一下,您将要注册一个关于 Python 的新博客,并且您计划使用 Python 关键字和 .DEV 后缀注册一个域名,例如:AWAIT.DEV。示例 21-1 是一个使用 asyncio 同时检查多个域名的脚本。这是它的输出结果:

  1. $ python3 blogdom.py
  2. with.dev
  3. + elif.dev
  4. + def.dev
  5. from.dev
  6. else.dev
  7. or.dev
  8. if.dev
  9. del.dev
  10. + as.dev
  11. none.dev
  12. pass.dev
  13. true.dev
  14. + in.dev
  15. + for.dev
  16. + is.dev
  17. + and.dev
  18. + try.dev
  19. + not.dev

请注意,域名看起来是无序的。如果您运行该脚本,您会看到它们一个接一个地显示出来,并且延迟也不一样。+ 号表示您的机器能够通过 DNS 解析域。否则,域名无法解析并且是可能可用的。

在 blogdom.py 中,DNS 探测是通过原生协程对象完成的。由于异步操作是交错的,因此检查 18 个域所需的时间比顺序检查要少得多。实际上,总时间实际上与单个最慢 DNS 响应的时间相同,而不是所有响应时间的总和。

这是 blogdom.py 的代码:

例 21-1。 blogdom.py:搜索 Python 博客的域

  1. #!/usr/bin/env python3
  2. import asyncio
  3. import socket
  4. from keyword import kwlist
  5. MAX_KEYWORD_LEN = 4 1
  6. async def probe(domain: str) -> tuple[str, bool]: 2
  7. loop = asyncio.get_running_loop() 3
  8. try:
  9. await loop.getaddrinfo(domain, None) 4
  10. except socket.gaierror:
  11. return (domain, False)
  12. return (domain, True)
  13. async def main() -> None: 5
  14. names = (kw for kw in kwlist if len(kw) <= MAX_KEYWORD_LEN) 6
  15. domains = (f'{name}.dev'.lower() for name in names) 7
  16. coros = [probe(domain) for domain in domains] 8
  17. for coro in asyncio.as_completed(coros): 9
  18. domain, found = await coro 10
  19. mark = '+' if found else ' '
  20. print(f'{mark} {domain}')
  21. if __name__ == '__main__':
  22. asyncio.run(main()) 11
  1. 为域设置关键字的最大长度,因为越短越好。
  2. probe返回一个包含域名和布尔值的元组; True 表示域已解析。返回域名将更容易显示结果。
  3. 获取对 asyncio 事件循环的引用,以便我们接下来可以使用它。
  4. loop.getaddrinfo(…) 协程方法返回一个五个部分组成的参数元组(five-part tuple of parameters) ,以供套接字使用连接到给定的地址。在这个例子中,我们不需要结果。如果我们得到结果,说明域名可以解析;反之亦然。
  5. main 必须是一个协程,以便我们可以在其中使用 await。
  6. 生成长度最大为 MAX_KEYWORD_LEN 的 Python 关键字的生成器。
  7. 生成带有 .dev 后缀的域名的生成器。
  8. 通过使用每个domain参数调用probe协程来构建协程对象列表。
  9. asyncio.as_completed 是一个生成协程的生成器,这些协程按照它们完成的顺序返回结果——而不是按照协程提交的顺序。这个方法类似于我们在第 20 章示例 20-4 中看到的 futures.as_completed。
  10. 此时,我们知道协程已经完成,因为 as_completed 就是这样工作的。因此,await 表达式不会阻塞,但我们需要它来从 coro 获取结果。如果 coro 抛出了一个未处理的异常,它会在这里重新抛出异常。
  11. asyncio.run 启动事件循环并只在事件循环退出时返回。这是使用 asyncio 的脚本的常见模式:将 main 实现为协程,并在 if __name__ == '__main__': 块中使用 asyncio.run 驱动这个协程。

TIP

asyncio.get_running_loop 函数是在 Python 3.7 中添加的,用于在协程中使用,如probe示例所示。如果没有运行循环,asyncio.get_running_loop 会抛出 RuntimeError。它的实现比 asyncio.get_event_loop 更简单、更快,如果需要,它还可以启动一个事件循环。从 Python 3.10 开始, asyncio.get_event_loop 已被弃用,最终将成为 asyncio.get_running_loop 的别名。

Guido 阅读异步代码的技巧

在 asyncio 中有很多新概念需要掌握,但如果您采用 Guido van Rossum 本人建议的技巧,则示例 21-1 的整体逻辑很容易理解:眯着眼睛假装 async 和 await 关键字不存在。如果你这样做,你会意识到协程读起来就像原来的顺序函数。

例如,想象一下这个协程的主体......

  1. async def probe(domain: str) -> tuple[str, bool]:
  2. loop = asyncio.get_running_loop()
  3. try:
  4. await loop.getaddrinfo(domain, None)
  5. except socket.gaierror:
  6. return (domain, False)
  7. return (domain, True)

...的工作方式类似于以下函数,只是它从不阻塞:

  1. def probe(domain: str) -> tuple[str, bool]: # no async
  2. loop = asyncio.get_running_loop()
  3. try:
  4. loop.getaddrinfo(domain, None) # no await
  5. except socket.gaierror:
  6. return (domain, False)
  7. return (domain, True)

由于 await 会挂起当前协程对象,使用语法 await loop.getaddrinfo(...) 可以避免阻塞。比如在probe('if.dev')协程的执行过程中,会通过getaddrinfo('if.dev', None)创建一个新的协程对象。await这个协程会执行低级的 addrinfo 查询并将控制权交还给事件循环,而不是交还给被挂起的 probe(‘if.dev’) 协程。然后事件循环可以驱动其他挂起的协程对象,例如probe('or.dev')。

当事件循环获得对 getaddrinfo('if.dev', None) 查询的响应结果时,该特定协程对象将恢复并将控制权返回给在await语句处暂停的probe('if.dev'),暂停的协程恢复并继续执行以处理可能抛出的异常并返回结果元组。

到目前为止,我们只看到将 asyncio.as_completed 和 await 应用于协程。实际上它们可以处理任何可等待(awaitable)的对象。接下来解释这个概念。

新概念:awaitable

for 关键字可以用于可迭代对象。 await 关键字可以用于可等待对象。

作为 asyncio 的终端用户,下面是经常见到的可等待对象:

  • 一个原生协程对象,你可以通过调用一个原生协程函数来获得它。
  • 一个 asyncio.Task,通常通过将协程对象传递给 asyncio.create_task() 来获得。

但是,终端用户代码并不总是需要await一个Task。我们使用 asyncio.create_task(one_coro()) 来调度 one_coro 并发执行,而不需要等待协程的返回。这就是我们对 spinner_async.py 中的 spinner 协程所做的事情(示例 19-4)。如果您不希望取消或等待任务,则无需保留从 create_task 返回的 Task 对象。创建这个Task就可以保证协程被安排运行。

相比之下,我们现在使用 await other_coro() 来运行 other_coro 并等待完成,因为我们需要协程的结果才能继续。在 spinner_async.py 中,supervisor协程执行 res = await slow() 以执行并获得slow结果。

在实现异步库或作为asyncio 库维护者时,您还可以处理这些较低级别的可等待对象:

  • 一个实现 __await__ 方法的对象,该方法返回一个迭代器;例如,asyncio.Future 实例(asyncio.Task 是 asyncio.Future 的子类)。
  • 实现 tp_as_async.am_await 函数的 Python/C API 以其他语言编写的对象,这个函数返回一个迭代器(类似于 __await__ 方法)。

现有的代码库可能还有一种额外的可等待对象:基于生成器的协程对象——即将被弃用。

Note:

PEP 492 声明 await 表达式“使带有验证参数的额外步骤的yield from实现“和”和“await 只接受可等待对象”。PEP 没有详细解释该实现,而是参考了 PEP 380,其中介绍了 yield from。我在 fluentpython.com 的 Classic Coroutines 中的“ The Meaning of yield from”一节中发布了详细说明。

现在让我们研究下载一组固定国旗图像的脚本的 asyncio 版本。

使用 asyncio 和 HTTPX 下载国旗

flags_asyncio.py 脚本从 fluentpython.com 下载一组固定的 20 个国旗。我们首先在“并发 Web 下载”中提到它,但现在我们将详细研究它,应用我们刚刚看到的概念。

从 Python 3.10 开始,asyncio 仅直接支持 TCP 和 UDP,标准库中没有异步 HTTP 客户端或服务器包。我在所有 HTTP 客户端示例中都使用了 HTTPX。

我们将自下而上探索 flags_asyncio.py,即首先查看示例 21-2 中设置操作的函数。

WARNING:

为了使代码更易于阅读,flags_asyncio.py 没有进行异常处理。当我们介绍 async/await 时,初学者使用“快乐路径”进行学习可以快速了解常规函数和协程在程序中的排列方式。从 “Enhancing the asyncio downloader”之后,示例就包括异常处理和更多功能。

本章和第 20 章中的 flags*.py 示例共享代码和数据,因此我将它们放在 example-code-2e/20-executors/getflags 目录中。

例 21-2。 flags_asyncio.py:启动函数

  1. def download_many(cc_list: list[str]) -> int: 1
  2. return asyncio.run(supervisor(cc_list)) 2
  3. async def supervisor(cc_list: list[str]) -> int:
  4. async with AsyncClient() as client: 3
  5. to_do = [download_one(client, cc)
  6. for cc in sorted(cc_list)] 4
  7. res = await asyncio.gather(*to_do) 5
  8. return len(res) 6
  9. if __name__ == '__main__':
  10. main(download_many)
  1. 这需要是一个普通函数——而不是协程——以便传递给 flags.py 模块中的main函数并由main函数调用(示例 20-2)。
  2. 执行驱动 supervisor(cc_list) 协程对象的事件循环,直到它返回。这将在事件循环运行时阻塞线程。该行的结果是返回supervisor返回的任何内容。
  3. httpx 中的异步 HTTP 客户端操作由AsyncClient 的方法支持,它也是一个异步上下文管理器:具有异步setup和teardown方法的上下文管理器(在“异步上下文管理器”中详细介绍了这一点)
  4. 通过为每个要检索的国旗调用 download_one 协程来构建协程对象列表。
  5. await asyncio.gather 协程,它接受一个或多个可等待对象作为参数并等待所有参数完成,按照提交顺序返回对应可等待对象参数的结果列表。
  6. supervisor 返回 asyncio.gather 返回的列表的长度。

现在让我们回顾一下 flags_asyncio.py 的顶部。我重新组织了协程,以便我们可以按照事件循环启动的顺序进行阅读。

例 21-3。 flags_asyncio.py:导入部分和下载函数

  1. import asyncio
  2. from httpx import AsyncClient 1
  3. from flags import BASE_URL, save_flag, main 2
  4. async def download_one(client: AsyncClient, cc: str): 3
  5. image = await get_flag(client, cc)
  6. save_flag(image, f'{cc}.gif')
  7. print(cc, end=' ', flush=True)
  8. return cc
  9. async def get_flag(client: AsyncClient, cc: str) -> bytes: 4
  10. url = f'{BASE_URL}/{cc}/{cc}.gif'.lower()
  11. resp = await client.get(url, timeout=6.1,
  12. follow_redirects=True) 5
  13. return resp.read() 6
  1. 必须先安装 httpx——它不在标准库中。
  2. 重用 flags.py 中的代码(示例 20-2)
  3. download_one 必须是原生协程,因此它可以await get_flag——这个函数执行 HTTP 请求。然后打印下载国旗的国家代码,并保存图像。
  4. get_flag 需要接收一个 AsyncClient 用以发送请求。
  5. httpx.AsyncClient 实例的 get 方法返回一个 ClientResponse 对象,它也是一个异步上下文管理器。
  6. 网络 I/O 操作是作为协程方法实现的,因此它们由 asyncio 事件循环异步驱动。

Note:

为了获得更好的性能,get_flag 中的 save_flag 调用应该是异步的,以避免阻塞事件循环。但是,asyncio 目前没有像 Node.js 那样提供异步文件系统 API。

“Using asyncio.as_completed and a thread”将展示如何将 save_flag 委托给一个线程。您的代码通过 await 或异步上下文管理器的特殊方法(例如 AsyncClient 和 ClientResponse)显式委托给 httpx 协程--正如我们将在“Asynchronous Context Managers”中看到的那样。

原生协程的秘密:Humble 生成器

我们在“经典协程”和 flags_asyncio.py 中看到的经典协程示例之间的主要区别在于后者没有使用 .send() 调用或 yield 表达式。您的代码位于 asyncio 库和您正在使用的异步库(例如 HTTPX)之间。这在图 21-1 中进行了说明。

在幕后,asyncio 事件循环进行 .send 调用以驱动您的协程,并且您的协程await其他协程,包括库协程。 如前所述,await 从 yield from借鉴了大部分实现,yield from也使用 .send 调用来驱动协程。

await 链最终到达一个低级的 可等待对象,这个可等待对象返回一个生成器,事件循环可以驱动这个生成器来响应诸如计时器或网络 I/O 之类的事件。这些 await 链末尾的低级 awaitable 和生成器在库的深处实现,他们不是库的 API 的一部分,但是可能是 Python/C 实现的扩展。

使用 asyncio.gather 和 asyncio.create_task 等函数,您可以启动多个并发等待通道,从而在单个线程中并发执行由单个事件循环驱动的多个 I/O 操作。

all-or-nothing 问题

请注意,在示例 21-3 中,我无法重用 flags.py(示例 20-2)中的 get_flag 函数。我不得不将它重写为协程才能使用 HTTPX 的异步 API。为了使用 asyncio 以获得最佳性能,我们必须将每个操作 I/O 的函数替换为使用 await 或 asyncio.create_task 激活的异步版本,以便在函数等待 I/O 时将控制权交还给事件循环。如果您不能将阻塞型函数重写为协程,则应该在单独的线程或进程中运行它,正如我们将在“Delegating tasks to executors”中看到的那样。

这就是我为本章选择题词的原因,其中包含以下建议:“你需要重写所有代码来避免代码阻塞,否则你只是在浪费时间。”

出于同样的原因,我也无法重用 flags_threadpool.py(示例 20-3)中的 download_one 函数。示例 21-3 中的代码使用 await 驱动 get_flag,因此 download_one 也必须是一个协程。对于每个请求,在 supervisor 中会创建一个 对应的download_one 协程对象,它们都由 asyncio.gather 协程驱动。

现在让我们研究出现在 supervisor(示例 21-2)和 get_flag(示例 21-3)中的 async with 语句。

异步上下文管理器

在“上下文管理器和 with 块”中,我们看到如果对象的类提供 __enter__ 和 __exit__ 方法,对象在 with 块的主体之前和之后运行代码。

现在,分析示例 21-4,来自 asyncpg 中可异步的 PostgreSQL 驱动documentation on transactions

示例 21-4。来自 asyncpg PostgreSQL 驱动程序文档的示例代码。

  1. tr = connection.transaction()
  2. await tr.start()
  3. try:
  4. await connection.execute("INSERT INTO mytable VALUES (1, 2, 3)")
  5. except:
  6. await tr.rollback()
  7. raise
  8. else:
  9. await tr.commit()

数据库事务非常适配于上下文管理器协议:必须先启动事务,使用 connection.execute 更改数据,然后必须发生回滚或提交,具体取决于更改的结果。

在像 asyncpg 这样的异步驱动程序中,set-up和包装函数需要是协程——以便其他操作可以并发进行。但是,经典 with 语句的实现不支持使用协程执行 __enter__ 或 __exit__ 。

这就是为什么 PEP 492—Coroutines with async and await syntax和 await 语法引入了 async with 语句,它可以与异步上下文管理器一起使用:以协程实现 __aenter__ 和 __aexit__ 方法的对象。

使用 async with,示例 21-4 可以像 asyncpg 文档中其他的代码片段一样编写:

  1. async with connection.transaction():
  2. await connection.execute("INSERT INTO mytable VALUES (1, 2, 3)")

在  asyncpg.Transaction 类中, __aenter__ 协程方法执行await self.start() 而 __aexit__ 协程awaite私有的 __rollback 或 __commit 协程方法,调用哪个协程取决于是否发生异常。使用协程将 Transaction 实现为异步上下文管理器允许 asyncpg 并发处理多事务。


ASYNCPG 上的 CALEB HatTINGH

关于 asyncpg 的另一个非常棒的事情是,它还通过为 Postgres 本身的内部连接实现一个连接池来解决 PostgreSQL 缺乏高并发支持(每个连接使用一个服务器端进程)的问题。

这意味着您不需要像 asyncpg 文档中的推荐的 pgbouncer 等其他工具。


回到 flags_asyncio.py,httpx 的 AsyncClient 类是一个异步上下文管理器,因此它可以在其 __aenter__ 和 __aexit__ 特殊协程方法中使用可等待对象。

Note:

“作为上下文管理器的异步生成器”展示了如何使用 Python 的 contextlib 来创建异步上下文管理器,而无需编写类。由于一个先决主题:“异步生成器函数”,本章稍后会进行介绍。

我们现在将使用进度条增强 asyncio 国旗下载示例,这将引导我们探索更多的 asyncio API。

增强 asyncio 下载器

回想一下“Downloads with Progress Display and Error Handling” ,flags2 示例集共享相同的命令行界面,并且在下载时它们会显示一个进度条。

Note
我鼓励您使用 flags2 示例来直观地了解并发 HTTP 客户端的执行方式。使用 -h 选项查看示例 20-10 中的帮助说明。使用 -a、-e 和 -l 命令行选项控制下载次数,使用 -m 选项设置并发下载次数。在 LOCAL、REMOTE、DELAY 和 ERROR 服务器上分别运行测试。发现并发下载的最佳数量,以最大限度地提高每台服务器的吞吐量。按照“Setting up test servers”中的说明调整测试服务器的选项。

例如,示例 21-5 显示了使用 100 个并发请求 (-m 100) 从 ERROR 服务器获取 100 个国旗 (-al 100) 的尝试。结果中的 48 个错误是 HTTP 418 或超时错误——slow_server.py 的预期(错误)行为。

示例 21-5。运行 flags2_asyncio.py

  1. $ python3 flags2_asyncio.py -s ERROR -al 100 -m 100
  2. ERROR site: http://localhost:8002/flags
  3. Searching for 100 flags: from AD to LK
  4. 100 concurrent connections will be used.
  5. 100%|████████████████████████████████████████████████████████| 100/100 [00:03<00:00, 30.48it/s]
  6. --------------------
  7. 52 flags downloaded.
  8. 48 errors.
  9. Elapsed time: 3.31s

在测试并发客户端时要采取正确的行为

即使线程和异步 HTTP 客户端之间的总体下载时间没有太大差异,异步可以更快地发送请求,因此服务器更有可能被怀疑 DOS 攻击。要真正全速运行这些并发客户端,请使用本地 HTTP 服务器进行测试,如 “Setting up test servers”中所述。


现在让我们看看 flags2_asyncio.py 是如何实现的。

使用 asyncio.as_completed 和一个单独线程

在示例 21-3 中,我们将几个协程传递给 asyncio.gather,它会按照提交的顺序返回一个包含协程结果的列表。这意味着 asyncio.gather 只能在所有可等待对象完成后返回。但是,要更新进度条,我们需要在完成时获取结果。

幸运的是,我们在带有进度条的线程池示例中使用的 as_completed 生成器函数在asyncio 中有一个 等效函数(示例 20-16)。

示例 21-6 显示了 flags2_asyncio.py 脚本的上半部分,其中定义了 get_flag 和 download_one 协程。示例 21-7 列出了其余的源代码,包括 supervisor 和 download_many。由于加入了异常处理,此脚本比 flags_asyncio.py 长。

示例 21-6。 flags2_asyncio.py:脚本的顶部;其余代码在示例 21-7 中

  1. import asyncio
  2. from collections import Counter
  3. from http import HTTPStatus
  4. from pathlib import Path
  5. import httpx
  6. import tqdm # type: ignore
  7. from flags2_common import main, DownloadStatus, save_flag
  8. # low concurrency default to avoid errors from remote site,
  9. # such as 503 - Service Temporarily Unavailable
  10. DEFAULT_CONCUR_REQ = 5
  11. MAX_CONCUR_REQ = 1000
  12. async def get_flag(client: httpx.AsyncClient, 1
  13. base_url: str,
  14. cc: str) -> bytes:
  15. url = f'{base_url}/{cc}/{cc}.gif'.lower()
  16. resp = await client.get(url, timeout=3.1, follow_redirects=True) 2
  17. resp.raise_for_status()
  18. return resp.content
  19. async def download_one(client: httpx.AsyncClient,
  20. cc: str,
  21. base_url: str,
  22. semaphore: asyncio.Semaphore,
  23. verbose: bool) -> DownloadStatus:
  24. try:
  25. async with semaphore: 3
  26. image = await get_flag(client, base_url, cc)
  27. except httpx.HTTPStatusError as exc: 4
  28. res = exc.response
  29. if res.status_code == HTTPStatus.NOT_FOUND:
  30. status = DownloadStatus.NOT_FOUND
  31. msg = f'not found: {res.url}'
  32. else:
  33. raise
  34. else:
  35. await asyncio.to_thread(save_flag, image, f'{cc}.gif') 5
  36. status = DownloadStatus.OK
  37. msg = 'OK'
  38. if verbose and msg:
  39. print(cc, msg)
  40. return status
  1. get_flag 与示例 20-14 中的顺序版本非常相似。第一个区别:它需要client参数。
  2. 第二和第三个区别:.get 是一个 AsyncClient 方法,它是一个协程,所以我们需要await这个协程。
  3. 将semaphore用作异步上下文管理器,这样整个程序就不会被阻塞:当semaphore计数器为零时,只有这个协程被挂起。在“Python’s Semaphores”中了解更多信息。
  4. 异常处理逻辑与示例 20-14 中的 download_one 相同。
  5. 保存图像是一个 I/O 操作。为避免阻塞事件循环,需要在线程中运行 save_flag。

所有网络 I/O 都是通过 asyncio 中的协程完成的,但是文件 I/O没有使用协程。然而,文件 I/O 也是“阻塞的”——从某种意义上说,读/写文件比读/写 RAM 花费的时间要长数千倍。如果您使用的是 Network-Attached Storage,它甚至可能涉及网络 I/O。

从 Python 3.9 开始,asyncio.to_thread 协程可以将文件 I/O 委托给 asyncio 提供的线程池。如果你需要支持 Python 3.7 或 3.8,“Delegating tasks to executors”展示了如何添加几行代码来实现这个特性。但首先让我们完成对 HTTP 客户端代码的研究。

使用semaphore(信号量)限制请求

像我们正在研究的网络客户端应该受到限制(例如,限制请求数)以避免过多的并发请求冲击服务器。

 semaphore是一个同步原始量,它比锁更灵活。一个semaphore可以由多个协程持有,最大数量是可配置的。这样就可以限制活动并发协同程序的数量。 “Python’s Semaphores”里面有更多说明。

在 flags2_threadpool.py(示例 20-16)中,通过在 download_many 函数中将所需的 max_workers 参数设置为 concur_req 来实例化 ThreadPoolExecutor 来完成限流。在 flags2_asyncio.py 中有一个由supervisor函数创建的 asyncio.Semaphore(如示例 21-7 所示)并作为示例 21-6 中的 download_one 的 semaphore 参数传递。


Python 的信号量

计算机科学家 Edsger W. Dijkstra 在 1960 年代初期发明了信号量。这是一个简单的想法,但它非常灵活,以至于大多数其他同步对象(例如锁和barrier)都可以构建在信号量之上。Python 的标准库中有三个 Semaphore 类:一个在threading中,另一个在multiprocessing中,第三个在 asyncio 中。在这里,我们将介绍最后一个。

asyncio.Semaphore 有一个内部计数器,每当我们等待 .acquire() 协程方法时,该计数器就会递减,当我们调用 .release() 方法时递增——.release()方法不是协程,因为它从不阻塞。实例化 Semaphore 时设置计数器的初始值:

semaphore = asyncio.Semaphore(concur_req)

当计数器大于零时,等待 .acquire() 不会有延迟, 但如果计数器为零,.acquire() 会挂起等待的协程,直到其他协程在同一Semaphore上调用 .release(),从而增加计数器。与其直接使用这些方法,不如使用Semaphore作为异步上下文管理器更安全,就像我在示例 21-6 中所做的那样,函数 download_one:

  1. async with semaphore:
  2. image = await get_flag(client, base_url, cc)

Semaphore.__aenter__ 协程方法await .acquire(),它的 __aexit__ 协程方法调用 .release()。 该片段保证在任何时候最多只有 concur_req 个 get_flags 协程实例处于活动状态。

标准库中的每个 Semaphore 类都有一个 BoundedSemaphore 子类,它强制执行一个额外的约束:当 .release() 多于 .acquire() 操作时,内部计数器永远不会大于初始值。


现在让我们看一下示例 21-7 中脚本的其余部分。

示例 21-7。 flags2_asyncio.py:示例 21-6 后面的脚本

  1. async def supervisor(cc_list: list[str],
  2. base_url: str,
  3. verbose: bool,
  4. concur_req: int) -> Counter[DownloadStatus]: 1
  5. counter: Counter[DownloadStatus] = Counter()
  6. semaphore = asyncio.Semaphore(concur_req) 2
  7. async with httpx.AsyncClient() as client:
  8. to_do = [download_one(client, cc, base_url, semaphore, verbose)
  9. for cc in sorted(cc_list)] 3
  10. to_do_iter = asyncio.as_completed(to_do) 4
  11. if not verbose:
  12. to_do_iter = tqdm.tqdm(to_do_iter, total=len(cc_list)) 5
  13. error: httpx.HTTPError | None = None 6
  14. for coro in to_do_iter: 7
  15. try:
  16. status = await coro 8
  17. except httpx.HTTPStatusError as exc:
  18. error_msg = 'HTTP error {resp.status_code} - {resp.reason_phrase}'
  19. error_msg = error_msg.format(resp=exc.response)
  20. error = exc 9
  21. except httpx.RequestError as exc:
  22. error_msg = f'{exc} {type(exc)}'.strip()
  23. error = exc 10
  24. except KeyboardInterrupt:
  25. break
  26. if error:
  27. status = DownloadStatus.ERROR 11
  28. if verbose:
  29. url = str(error.request.url) 12
  30. cc = Path(url).stem.upper() 13
  31. print(f'{cc} error: {error_msg}')
  32. counter[status] += 1
  33. return counter
  34. def download_many(cc_list: list[str],
  35. base_url: str,
  36. verbose: bool,
  37. concur_req: int) -> Counter[DownloadStatus]:
  38. coro = supervisor(cc_list, base_url, verbose, concur_req)
  39. counts = asyncio.run(coro) 14
  40. return counts
  41. if __name__ == '__main__':
  42. main(download_many, DEFAULT_CONCUR_REQ, MAX_CONCUR_REQ)
  1. supervisor 接收与 download_many 函数相同的参数,但它不能直接从 main 调用,因为它是一个协程,而不是像 download_many 这样的普通函数。
  2. 创建一个 asyncio.Semaphore,使用这个Semaphore的活动协程最多为 concur_req 个。concur_req 的值由 flags2_common.py 中的main函数计算,基于每个示例中设置的命令行选项和常量。
  3. 创建一个协程对象列表,每次调用一个 download_one 协程。
  4. 获取一个迭代器,它将在完成后返回协程对象。我没有将这个对 as_completed 的调用直接放在下面的 for 循环中,因为我可能需要用进度条的 tqdm 迭代器来包装它,这取决于用户选择的verbose。
  5. 使用 tqdm 生成器函数包装 as_completed 迭代器以显示进度。
  6. error是一个 httpx.HTTPError或者None,被初始化为None;如果抛出异常,则此变量将用于保存 try/except 语句之外的异常。
  7. 迭代已经完成的协程对象列表;此循环类似于示例 20-16 中 download_many 中的循环。
  8. 在协程上进行await以获取其结果。这不会阻塞,因为 as_completed生成的是已完成的协程。
  9. 这个赋值是必要的,因为 exc 变量作用域仅限于这个 except 子句,但我需要保留它的值以备后用。
  10. 和上步一样
  11. 如果error不为空,则将status设置为DownloadStatus.ERROR 
  12. 在详细模式下,从抛出的异常中提取 URL……
  13. …然后提取文件名以显示国家代码。
  14. download_many 实例化supervisor协程对象并通过 asyncio.run 将其传递给事件循环,收集事件循环结束时supervisor返回的计数器。

在例 21-7 中,我们不能使用例 20-16 中看到的从future到国家代码的映射,因为 asyncio.as_completed 返回的可等待对象与我们传递给 as_completed 调用的可等待对象相同。在内部,asyncio 机制可能会用其他awaitables替换传入 awaitables,替换的awaitables最终会产生相同的结果。

TIP

因为在失败的情况下我不能使用可等待对象作为键来从字典中检索出国家代码,所以我只能在异常中提取国家代码。为此,我将异常保存在error变量中,以便在 try/except 语句之外检索。诸如循环和 try/except 之类的语句不会在它们管理的块中创建本地作用域。但是,如果一个 except 子句将异常绑定到变量时——就像我们刚刚看到的 exc 变量一样——该绑定只存在于这个特定 except 子句内的块中。

这结束了我们对之前看到的 flags2_threadpool.py 的 功能等同的asyncio 示例的讨论。

下一个示例演示了使用协程按顺序执行一个异步任务的简单模式。这值得我们关注,因为任何有 JavaScript 经验的人都知道,按顺序运行一个异步函数是编写嵌套编码的原因,即所谓的末日金字塔。await 关键字使这个不好的模式消失了。这就是为什么 await 现在是 Python 和 JavaScript 的一部分。

在每次下载中进行多次请求

假设您想用国家名称和国家代码来保存每个国家的国旗,而不是只用国家代码命名。现在您需要一个国旗发出两个 HTTP 请求:一个获取国旗图像本身,另一个请求获取与图像位于同一目录中的 metadata.json 文件:这是记录国家名称的地方。

在线程脚本中协调同一任务中的多个请求很容易:只需先进行一次请求,然后再进行另一个请求,两次阻塞线程,并将两条数据(国家代码和名称)保存在局部变量中,以便在保存文件时使用。如果您需要在带有回调的异步脚本中执行相同的操作,您需要使用嵌套函数,以便国家代码和名称在它们的闭包中可用,直到您保存文件,因为每个回调在不同的局部作用域内运行。await 关键字提供了解决方案,允许您一个接一个地驱动异步请求,共享驱动协程的局部作用域。

TIP

如果您在现代 Python 中使用大量回调进行异步编程,那么您可能正在应用在现代 Python 中没有意义的旧模式,编写一个不支持协程的遗留代码或低级代码接口的库除外。不管是何种情况,StackOverflow 问答  What is the use case for future.add_done_callback()? 解释了为什么在低级代码中需要回调,但如今在 Python 应用程序级代码中并不是很有用。

asyncio 国旗下载脚本的第三个变体有一些变化:

get_country

        这个新的协程获取国家代码的 metadata.json 文件,并从中获取国家名称。

download_one

        这个协程现在使用 await 委托给 get_flag 和新的 get_country 协程,使用后者的结果来构建要保存的文件的名称。

让我们从 get_country 的代码开始。请注意,它与示例 21-6 中的 get_flag 非常相似。

示例 21-8。 flags3_asyncio.py:get_country 协程

  1. async def get_country(client: httpx.AsyncClient,
  2. base_url: str,
  3. cc: str) -> str: 1
  4. url = f'{base_url}/{cc}/metadata.json'.lower()
  5. resp = await client.get(url, timeout=3.1, follow_redirects=True)
  6. resp.raise_for_status()
  7. metadata = resp.json() 2
  8. return metadata['country'] 3
  1. 这个协程返回一个国家名称的字符串——如果一切顺利的话。
  2. metadata从响应的 JSON 内容构建 Python 字典。
  3. 返回国家名称

现在让我们看看修改后的 download_one,它与示例 21-6 中的同一个协程仅更改了几行

示例 21-9。 flags3_asyncio.py:download_one 协程

  1. async def download_one(client: httpx.AsyncClient,
  2. cc: str,
  3. base_url: str,
  4. semaphore: asyncio.Semaphore,
  5. verbose: bool) -> DownloadStatus:
  6. try:
  7. async with semaphore: 1
  8. image = await get_flag(client, base_url, cc)
  9. async with semaphore: 2
  10. country = await get_country(client, base_url, cc)
  11. except httpx.HTTPStatusError as exc:
  12. res = exc.response
  13. if res.status_code == HTTPStatus.NOT_FOUND:
  14. status = DownloadStatus.NOT_FOUND
  15. msg = f'not found: {res.url}'
  16. else:
  17. raise
  18. else:
  19. filename = country.replace(' ', '_') 3
  20. await asyncio.to_thread(save_flag, image, f'{filename}.gif')
  21. status = DownloadStatus.OK
  22. msg = 'OK'
  23. if verbose and msg:
  24. print(cc, msg)
  25. return status
  1. 持有semaphore以await get_flag
  2. 对get_country进行同样的操作
  3. 使用国家名称来创建文件名。作为命令行用户,我不想在文件名中看到空格

这比嵌套回调好多了!

我将 get_flag 和 get_country 的调用分别放在两个semaphore的块中,因为最佳实践是尽可能短地持有semaphore和锁。

我可以使用 asyncio.gather 并行编排get_flag 和 get_country,但如果 get_flag 抛出异常,则没有要保存的图像,再执行 get_country 毫无意义。但是在某些情况下,使用 asyncio.gather 同时访问多个 API 而不是在发出下一个请求之前等待一个响应是有意义的。

在 flags3_asyncio.py 中,await 语法出现了 6 次,async with 出现了 3 次。希望您应该掌握 Python 中异步编程的窍门。一个挑战是知道何时必须使用 await 以及何时不能使用它。原则上答案很简单,您只应该await协程和其他可等待对象,例如 asyncio.Task 实例。但是有些 API 很棘手,它们以看似随意的方式混合协程和普通函数,例如我们将在示例 21-14 中使用的 StreamWriter 类。

示例 21-9 总结了国旗的示例。现在让我们讨论在异步编程中线程或进程执行器的使用。

将任务委托给执行器

Node.js 在异步编程方面优于 Python 的一个重要优势是 Node.js 标准库,它为所有 I/O 提供异步 API,而python仅仅提供异步的网络 I/O。在 Python 中如果不注意的话,文件 I/O 会严重降低异步应用程序的性能,因为在主线程中读取和写入存储 阻塞了事件循环。

在示例 21-6 的 download_one 协程中,我使用一行代码将下载的图像保存到磁盘:

        await asyncio.to_thread(save_flag, image, f'{cc}.gif')

如前所述,asyncio.to_thread 是在 Python 3.9 中添加的。如果您需要支持 3.7 或 3.8,则需要替换为下面的代码:

示例 21-10。替代 await asyncio.to_thread的代码

  1. loop = asyncio.get_running_loop() 1
  2. loop.run_in_executor(None, save_flag, 2
  3. image, f'{cc}.gif') 3
  1. 获取事件循环的引用。
  2. 第一个参数是要使用的executor;传入 None 将使用 asyncio 事件循环中始终可用的默认的 ThreadPoolExecutor。
  3. 可以按位置参数传递给要运行的函数,但如果需要传递关键字参数,则需要使用 functool.partial,如 run_in_executor documentation中所述。

更新的 asyncio.to_thread 函数更易于使用且更灵活,因为它可以接受关键字参数。

asyncio 的实现在一些地方使用了 run_in_executor。例如,我们在示例 21-1 中看到的 loop.getaddrinfo(…) 协程是通过从 socket 模块调用 getaddrinfo 函数来实现的——这是一个阻塞函数,可能需要几秒钟才能返回,因为它取决于 DNS 解析的性能。

异步 API 中的一个常见模式是在内部使用 run_in_executor 将实现细节里的阻塞调用包装在协程中。这样,您就可以提供一个一致的协程接口来使用 await 驱动,并隐藏出于实用原因使用的线程。MongoDB 的 Motor 异步驱动程序有一个与 async/await 兼容的 API,它实际上是围绕与数据库服务器对话的线程核心的外观。Motor 的首席开发人员 A. Jesse Jiryu Davis 在 Response to “Asynchronous Python and Databases” 中解释了这样做的理由。剧透:Davis 发现在数据库驱动程序的特定用例中,线程池的性能更高——尽管大家都认为网络 I/O 的异步方法总是比线程快。

需要将显式 Executor 传递给 loop.run_in_executor 的主要原因是,如果要执行的函数是 CPU 密集型的,则使用 ProcessPoolExecutor,这样可以在多个Python 进程中运行,避免 GIL 争用。由于启动成本高,最好在supervisor中启动ProcessPoolExecutor,并传递给需要使用它的协程。

Caleb Hattingh — 《在 Python 中使用 Asyncio》(O' Reilly,2020 年)一书的作者 — 是本书的技术审稿人之一,并建议我添加以下关于 executors 和 asyncio 的警告。


CALEB’S WARNING ABOUT RUN_IN_EXECUTORS

使用 run_in_executor 可能会产生难以调试的问题,因为取消操作不会按照预期工作。使用执行器的协程只是假装取消:底层线程(如果它是 ThreadPoolExecutor)没有取消机制。例如,在 run_in_executor 调用中创建的长寿命线程可能会阻止asyncio 程序关闭:asyncio.run 将等待 executor 在返回之前完全关闭,如果 executor 任务没有以某种方式自行停止,协程将永远等待。我认为更合理的是将该函数命名为 run_in_executor_uncancellable。


我们现在将从客户端脚本转向使用 asyncio 编写服务端代码

编写异步服务器

TCP 服务器的经典玩具示例是回显服务器。我们将构建一些更有趣的玩具:服务器端 Unicode 字符搜索实用程序,首先使用带有 FastAPI 的 HTTP,然后仅使用带有 asyncio 的普通 TCP。

这些服务器允许用户根据我们在“Unicode 数据库”中讨论的 unicodedata 模块中的标准名称中的单词查询 Unicode 字符。以下屏幕截图显示了与 web_mojifinder.py 的会话,这是我们将构建的第一台服务器。

这些示例中的 Unicode 搜索逻辑位于 Fluent Python 2e code repository中 charindex.py 模块的 InvertedIndex 类中。那个小模块中没有使用并发,所以我只会在下面的可选框部分进行简要概述。您可以跳到“A FastAPI Web 服务”中的 HTTP 服务器实现。 


MEET THE INVERTED INDEX

倒排索引通常将单词映射到它们出现的文档。在 mojifinder 示例中,每个“文档”都是一个 Unicode 字符。charindex.InvertedIndex 类索引 Unicode 数据库中每个字符名称中出现的每个单词,并创建一个存储在 defaultdict 中的倒排索引。例如,要索引字符 U+0037——DIGIT SEVEN——InvertedIndex 初始化程序将字符“7”附加到键“DIGIT”和“SEVEN”对应的值中。在索引Python 3.9.1绑定的 Unicode 13.0.0 数据后,“DIGIT”映射到 868 个字符,“SEVEN”映射到 143 个字符,包括 U+1F556—CLOCK FACE SEVEN OCLOCK 和 U+2790—DINGBAT NEGATIVE CIRCLED SANS-SERIF DIGIT SEVEN(出现在本书的许多代码清单中)

下面是对“CAT”和“FACE”的演示,请参见图 21-3

InvertedIndex.search 方法将查询字符串分解为单词,并返回所有单词对应结果的交集。这就是为什么搜索“face”会找到 171 个结果,“cat”会找到 14 个结果,而“cat face”只会找到 10 个结果。 

这就是倒排索引背后的原理:信息检索的基本组成部分——搜索引擎背后的理论。请参阅英文 Wikipedia 文章  Inverted Index 了解更多信息。


FastAPI Web 服务

我使用 FastAPI 编写了下面示例 web_mojifinder.py: “ASGI—Asynchronous Server Gateway Interface”中提到的 Python ASGI Web 框架之一。图 21-2 是前端的屏幕截图。这是一个超级简单的 SPA(单页应用程序):在初始 HTML 下载后,客户端 JavaScript通过与服务器通信更新 UI。

FastAPI 用于实现 SPA 和移动应用程序的后端,这些后端主要由返回 JSON response(而不是服务器呈现的 HTML )的 Web API 端点组成。FastAPI 利用装饰器、类型提示和代码自省来消除 Web API 的大量样板代码,并且还自动发布交互式 OpenAPI——a.k.a. Swagger——我们创建的 API 的文档。图 21-4 显示了为 web_mojifinder.py 自动生成的 /docs 页面。

示例 21-11 是 web_mojifinder.py 的代码,但这只是后端代码。当您点击根 URL / 时,服务器会发送 form.html 文件,该文件有 81 行代码,包括 54 行 JavaScript 来与服务器通信并用结果填充表格。如果你有兴趣阅读没有使用框架生成的 JavaScript,请在Fluent Python 2e code repository中找到 21-async/mojifinder/static/form.html 

要运行 web_mojifinder.py,您需要安装两个包及其依赖项:FastAPI 和 uvicorn

这是在开发模式下使用 uvicorn 运行示例 21-11 的命令:

$ uvicorn web_mojifinder:app --reload

参数是:

web_mojifinder:app

包名、冒号和其中定义的 ASGI 应用程序的名称——app 惯例使用的名称。

--reload

使 uvicorn 监控对应用程序源文件的更改并自动重新加载它们。仅在开发期间有用。

现在让我们研究一下 web_mojifinder.py 的源代码。

示例 21-11。 web_mojifinder.py:完整源码

  1. from pathlib import Path
  2. from unicodedata import name
  3. from fastapi import FastAPI
  4. from fastapi.responses import HTMLResponse
  5. from pydantic import BaseModel
  6. from charindex import InvertedIndex
  7. STATIC_PATH = Path(__file__).parent.absolute() / 'static' 1
  8. app = FastAPI( 2
  9. title='Mojifinder Web',
  10. description='Search for Unicode characters by name.',
  11. )
  12. class CharName(BaseModel): 3
  13. char: str
  14. name: str
  15. def init(app): 4
  16. app.state.index = InvertedIndex()
  17. app.state.form = (STATIC_PATH / 'form.html').read_text()
  18. init(app) 5
  19. @app.get('/search', response_model=list[CharName]) 6
  20. async def search(q: str): 7
  21. chars = sorted(app.state.index.search(q))
  22. return ({'char': c, 'name': name(c)} for c in chars) 8
  23. @app.get('/', response_class=HTMLResponse, include_in_schema=False)
  24. def form(): 9
  25. return app.state.form
  26. # no main funcion 10
  1. 与本章主题无关,但值得注意的是:pathlib 非常优雅的重载了 / 运算符
  2. 这一行定义了 ASGI 应用程序。它可以像 app = FastAPI() 一样简单。参数是自动生成文档的元数据。
  3. 带有 char 和 name 字段的 JSON 响应的 pydantic 模式
  4. 构建index并加载静态 HTML 表单,将两者保存在 app.state 中。
  5. 当这个模块被 ASGI 服务器加载时运行 init函数。
  6. /search 端点的路由; response_model 使用该 CharName pydantic 模型来描述响应格式。
  7. FastAPI 假定出现在函数或协程签名中但不在路由路径中的任何参数都将在 HTTP 查询字符串中传递, e.g. /search?q=cat。由于 q 没有默认值,如果查询字符串中缺少 q,FastAPI 将返回 422(不可处理实体)状态。
  8. 返回与 response_model 模式兼容的字典的迭代器,这样可以 允许 FastAPI 根据 @app.get 装饰器中的 response_model 构建 JSON 响应。

  9. 常规函数(即非异步函数)也可用于产生响应。

  10. 该模块没有主要功能。它由 ASGI 服务器加载和驱动——本例中为 uvicorn。

示例 21-11 没有直接调用 asyncio。 FastAPI 建立在 Starlette ASGI 工具包之上,而后者使用了 asyncio。

另请注意,search的函数体没有使用 await、async with 或 async for,因此它也可以是一个普通函数。我将search定义为协程只是为了演示 FastAPI 知道如何处理协程。在真正的应用程序中,大多数端点将查询数据库或访问其他远程服务器,因此 FastAPI 和一般的 ASGI 框架相比的一个关键优势是FastAPI 支持可以利用异步库进行网络 I/O 的协程。

TIP:

我为加载和提供静态 HTML 表单而编写的 init 和 form 函数是一种使示例简短且易于运行的技巧。推荐的最佳实践是在 ASGI 服务器前面有一个代理/负载均衡器来处理所有静态资源,并尽可能使用 CDN(内容交付网络)。Traefik 就是一个这样的代理/负载平衡器,它自称为“边缘路由器”,“代表系统接收请求并找出哪些组件负责处理它们。”FastAPI 有project generation脚本,可以生成代码。

关注类型提示的读者可能已经注意到search和form中没有返回类型提示。相反,FastAPI 依赖于路由装饰器中的 response_model= 关键字参数。 FastAPI 文档中的响应模型页面是这样解释的:

        响应模型在参数中传入,而不是作为函数返回类型注释.因为 path 函数实际上可能不会返回那个响应模型,而是返回一个字典、数据库对象或其他一些模型,然后使用 response_model 来执行字段限制和序列化。

例如,在search函数中,我返回了一个 dict 的生成器,而不是 CharName 对象的列表,但这足以让 FastAPI 和 pydantic 验证我的数据并构建与 response_model=list[CharName] 兼容的正确的 JSON 响应。

我们现在将重点关注响应图 21-5 中查询的 tcp_mojifinder.py 脚本。

一个异步 TCP 服务器

tcp_mojifinder.py 程序使用纯 TCP 与 Telnet 或 Netcat 等客户端通信,因此我可以使用 asyncio 编写它,而无需外部依赖,也无需重新发明 HTTP。图 21-5 显示了基于文本的 UI。

代码的长度是 web_mojifinder.py 的两倍,因此我将演示文稿分为三个部分:示例 21-12、示例 21-14 和示例 21-15。 tcp_mojifinder.py 的顶部——包括 import 语句——在示例 21-14 中,但我将开始描述supervisor协程和驱动程序的main函数。

示例 21-12。 tcp_mojifinder.py:一个简单的 TCP 服务器;继续示例 21-14 。

  1. async def supervisor(index: InvertedIndex, host: str, port: int) -> None:
  2. server = await asyncio.start_server( 1
  3. functools.partial(finder, index), 2
  4. host, port) 3
  5. socket_list = cast(tuple[TransportSocket, ...], server.sockets) 4
  6. addr = socket_list[0].getsockname()
  7. print(f'Serving on {addr}. Hit CTRL-C to stop.') 5
  8. await server.serve_forever() 6
  9. def main(host: str = '127.0.0.1', port_arg: str = '2323'):
  10. port = int(port_arg)
  11. print('Building index.')
  12. index = InvertedIndex() 7
  13. try:
  14. asyncio.run(supervisor(index, host, port)) 8
  15. except KeyboardInterrupt: 9
  16. print('\nServer shut down.')
  17. if __name__ == '__main__':
  18. main(*sys.argv[1:])
  1. 这个 await 很快得到了一个 asyncio.Server 的实例,一个 TCP 套接字服务器。默认情况下,start_server 创建并启动服务器,因此它已准备好接收连接。
  2. start_server 的第一个参数是 client_connected_cb,一个新的客户端连接开始时执行的回调函数。回调函数可以是普通函数或协程,但它必须只能传入两个参数:asyncio.StreamReader 和 asyncio.StreamWriter。但是,我的 finder 协程也需要接收一个index,所以我使用 functools.partial 来绑定该参数并获取一个可调用的读取器和写入器。使用户函数适配回调 API 是 functools.partial 最常见的用例。
  3. host 和 port 是 start_server 的第二个和第三个参数。请参阅 asyncio documentation中的完整签名。
  4. 之所以需要这种转换,是因为 typeshed 对 Server 类的 sockets 属性有一个过时的类型提示——截至 2021 年 5 月。请参阅关于 typeshed 的 issue #5535
  5. 显示服务器第一个套接字的地址和端口。
  6. 虽然 start_server 已经将服务器作为并发任务启动,但我需要在 server_forever 方法上等待,使supervisor协程在这里暂停。如果没有这一行,supervisor 将立即返回,结束以 asyncio.run(supervisor(…)) 开始的循环,并退出程序。documentation for Server.serve_forever说:“如果服务器已经接受连接,则可以调用此方法。”
  7. 建立倒排索引
  8. 启动事件循环运行supervisor协程
  9. 当我在运行它的终端上使用 CTRL-C 停止服务器时,捕获 KeyboardInterrupt 以避免分散注意力的回溯。

如果您研究 tcp_mojifinder.py 在服务器控制台上生成的输出(如示例 21-13 中所列),您可能会发现更容易理解控制流程是如何进行的。

示例 21-13。 tcp_mojifinder.py:这是图 21-5 中描述的会话的服务器端

  1. $ python3 tcp_mojifinder.py
  2. Building index. 1
  3. Serving on ('127.0.0.1', 2323). Hit CTRL-C to stop. 2
  4. From ('127.0.0.1', 58192): 'cat face' 3
  5. To ('127.0.0.1', 58192): 10 results.
  6. From ('127.0.0.1', 58192): 'fire' 4
  7. To ('127.0.0.1', 58192): 11 results.
  8. From ('127.0.0.1', 58192): '\x00' 5
  9. Close ('127.0.0.1', 58192). 6
  10. ^C 7
  11. Server shut down. 8
  12. $
  1. main的输出。在下一行出现之前,我看到我的机器在建立索引时有 0.6 秒的延迟。
  2. supervisor的输出。
  3. finder 中 while 循环的第一次迭代。 TCP/IP 堆栈将端口 58192 分配给我的 Telnet 客户端。如果有多个客户端连接到服务器,可以输出中看到不同的端口。
  4. finder 中 while 循环的第二次迭代。
  5. 我在客户端上按了CTRL-C; finder 中的 while 循环退出。
  6. finder 协程显示此消息然后退出。与此同时,服务器仍在运行,准备为另一个客户端提供服务
  7. 我在服务器终端上按了 CTRL-C; server.serve_forever 被取消,supervisor和事件循环结束。
  8. main的输出。

在 main 建立索引并启动事件循环后,supervisor 快速显示 Serving on... 消息并在 await server.serve_forever() 行暂停。此时,控制流进入事件循环并停留在那里,偶尔会返回到 finder 协程,每当它需要等待网络发送或接收数据时,它会将控制权交还给事件循环。

当事件循环处于活动状态时,将为每个连接到服务器的客户端启动一个新的 finder 协程实例。这样,这个简单的服务器就可以同时处理多个客户端。服务直到服务器上发生 KeyboardInterrupt 或其进程被操作系统杀死才会停止。

现在让我们看看 tcp_mojifinder.py 的上面部分,以及 finder 协程。

示例 21-14。 tcp_mojifinder.py:接示例 21-12。

  1. import asyncio
  2. import functools
  3. import sys
  4. from asyncio.trsock import TransportSocket
  5. from typing import cast
  6. from charindex import InvertedIndex, format_results 1
  7. CRLF = b'\r\n'
  8. PROMPT = b'?> '
  9. async def finder(index: InvertedIndex, 2
  10. reader: asyncio.StreamReader,
  11. writer: asyncio.StreamWriter) -> None:
  12. client = writer.get_extra_info('peername') 3
  13. while True: 4
  14. writer.write(PROMPT) # can't await! 5
  15. await writer.drain() # must await! 6
  16. data = await reader.readline() 7
  17. if not data: 8
  18. break
  19. try:
  20. query = data.decode().strip() 9
  21. except UnicodeDecodeError: 10
  22. query = '\x00'
  23. print(f' From {client}: {query!r}') 11
  24. if query:
  25. if ord(query[:1]) < 32: 12
  26. break
  27. results = await search(query, index, writer) 13
  28. print(f' To {client}: {results} results.') 14
  29. writer.close() 15
  30. await writer.wait_closed() 16
  31. print(f'Close {client}.') 17
  1. format_results 可用于在基于文本的 UI(如命令行或 Telnet 会话)中显示 InvertedIndex.search 的结果。
  2. 为了将 finder 传递给 asyncio.start_server,我用 functools.partial 将它包装起来,因为服务器需要一个只接受 reader 和 writer 参数的协程或函数。
  3. 获取套接字连接的远程客户端地址。
  4. 此循环处理一个对话,该对话一直持续到从客户端接收到控制字符。
  5. StreamWriter.write 方法不是协程,只是一个普通的函数;此行发送 ?> 提示。
  6. StreamWriter.drain 刷新写入器缓冲区;它是一个协程,所以必须用 await 驱动。
  7. StreamWriter.readline 是一个返回字节序列的协程。
  8. 如果没有收到任何字节,客户端关闭连接,因此退出循环。
  9. 使用默认的 UTF-8 编码将字节解码为 str。
  10. 当用户按下 CTRL-C 并且 Telnet 客户端发送控制字节时,可能会发生 UnicodeDecodeError;如果发生这种情况,为简单起见,会将查询替换为空字符。
  11. 在服务器控制台打印查询记录。
  12. 如果接收到控制字符或空字符,则退出循环。
  13. 进行真正的search搜索;下面会讲解search函数代码。
  14. 服务器控制台打印响应的记录。
  15. 关闭 StreamWriter。
  16. 等待 StreamWriter 关闭。这是 .close() method documentation的推荐做法。
  17. 在服务器控制台打印本次客户端会话的结束记录

这个例子的最后一部分是search协程:

示例 21-15。 tcp_mojifinder.py:搜索协程

  1. async def search(query: str, 1
  2. index: InvertedIndex,
  3. writer: asyncio.StreamWriter) -> int:
  4. chars = index.search(query) 2
  5. lines = (line.encode() + CRLF for line 3
  6. in format_results(chars))
  7. writer.writelines(lines) 4
  8. await writer.drain() 5
  9. status_line = f'{"─" * 66} {len(chars)} found' 6
  10. writer.write(status_line.encode() + CRLF)
  11. await writer.drain()
  12. return len(chars)
  1. search 必须是协程,因为它写入 StreamWriter 并且必须使用其 .drain() 协程方法
  2. 查询倒排索引。
  3. 此生成器表达式将生成以 UTF-8 编码的字节字符串,其中包含 Unicode 代码点、实际字符、其名称和 CRLF 序列——例如b'U+0039\t9\tDIGIT 9\r\n')。
  4. 写入lines。令人意外的是,writer.writelines 不是协程。
  5. 但是 writer.drain() 是一个协程。不要忘记await关键字!
  6. 构建状态行,然后将它写入

请注意,tcp_mojifinder.py 中的所有网络 I/O 都以字节流为单位:我们需要对从网络接收到的字节流进行解码,并在将字符串发送出去之前对其进行编码。在 Python 3 中,默认编码是 UTF-8,这就是我在本示例中的所有编码和解码调用中默认隐式使用的编码。

WARNING

请注意,有些 I/O 方法是协程,必须由 await 驱动,而另一些则是简单的函数。例如,StreamWriter.write 是一个普通函数,因为它写入的是缓冲区。另一方面,StreamWriter.drain——它刷新缓冲区并执行网络 I/O——是一个协程,StreamReader.readline 也是如此——但是 StreamWriter.writelines不是协程!在我编写本书的第一版时,clearly labeling coroutines as such asyncio API 文档正在改进:clearly labeling coroutines as such

tcp_mojifinder.py 代码利用了提供现成服务器的高级 asyncio Streams API,因此您只需要实现一个处理函数,它可以是一个普通的回调函数或协程。还有一个较低级别的传输和协议 API,其灵感来自 Twisted 框架中的传输和协议抽象。有关更多信息,请参阅 asyncio 文档,包括使用该较低级别 API 实现的 TCP and UDP echo servers and clients 。

我们的下一个主题是 async for 和适用的对象。

异步迭代和异步可迭代对象

我们在“异步上下文管理器”中看到 async with 如何与实现 __aenter__ 和 __aexit__ 方法的对象一起工作,其中__aexit__返回一个可等待对象——通常以协程对象的形式。

同样,async for 适用于异步可迭代对象:实现 __aiter__ 的对象。但是,__aiter__ 必须是常规方法——而不是协程方法——并且它必须返回一个异步迭代器。

一个异步迭代器提供了一个 __anext__ 协程方法,它返回一个可等待的对象——通常是一个协程对象。这个定义也佐证了我们在 “Don’t make the iterable an iterator for itself”中讨论的可迭代对象和迭代器的重要区别。

aiopg 异步 PostgreSQL 驱动程序文档有一个示例,演示了如何使用 async for 迭代数据库游标的行

  1. async def go():
  2. pool = await aiopg.create_pool(dsn)
  3. async with pool.acquire() as conn:
  4. async with conn.cursor() as cur:
  5. await cur.execute("SELECT 1")
  6. ret = []
  7. async for row in cur:
  8. ret.append(row)
  9. assert ret == [(1,)]

在此示例中,查询将返回单行,但在实际场景中, SELECT 查询的结果可能有数千行。对于大量的返回结果,游标不会在一个批次中加载所有行。因此,重要的是 async for row in cur: 在游标可能正在等待其他行时不会阻塞事件循环。通过将游标实现为异步迭代器,aiopg 可以在每次 __anext__ 调用时将控制权交还给事件循环,并在以后有更多行从 PostgreSQL 到达时恢复。

异步生成器函数

您可以通过使用 __anext__ 和 __aiter__ 编写一个类来实现异步迭代器,但还有一种更简单的方法:编写一个使用 async def 声明的函数并在函数体中使用 yield。这类似于生成器函数如何简化经典的迭代器模式。

让我们研究一个使用 async for 并实现异步生成器的简单示例。在示例 21-1 中,我们看到的 blogdom.py,这是一个探测域名的脚本。现在假设我们找到了我们在那里定义的探测协程的其他用途,并决定将它放入一个新模块——domainlib.py——以及一个新的 multi_probe 异步生成器,该生成器接收一个域名列表并在探测它们时产生结果。

我们很快就会看到 domainlib.py 的实现,但首先让我们看看它是如何与 Python 的新异步控制台一起使用的。

体验 Python 的异步控制台

从 Python 3.8 开始,您可以使用 -m asyncio 命令行选项运行解释器以获得“异步 REPL”:一个 Python 控制台,它导入 asyncio,提供一个正在运行的事件循环,并在顶级提示符处接受 await、async for 和 async with——否则在原生协程外使用这些关键字时会出现语法错误。

要试验 domainlib.py,请转到 Fluent Python 2e 代码存储库的本地副本中的 Fluent Python 2e code repository目录。然后运行:

$ python -m asyncio

您将看到控制台启动,就会得到类似于以下内容:

  1. asyncio REPL 3.9.1 (v3.9.1:1e5d33e9b9, Dec 7 2020, 12:10:52)
  2. [Clang 6.0 (clang-600.0.57)] on darwin
  3. Use "await" directly instead of "asyncio.run()".
  4. Type "help", "copyright", "credits" or "license" for more information.
  5. >>> import asyncio
  6. >>>

请注意标题声明可以使用 await 而不是 asyncio.run() 来驱动协程和其他可等待对象。另外:我没有输入 import asyncio。 asyncio 模块会自动导入,并且这行使用户清楚这个情况。

现在让我们导入 domainlib.py 并使用它的两个协程:probe 和 multi_probe。

示例 21-16。运行 python3 -m asyncio 后试验 domainlib.py。

  1. >>> await asyncio.sleep(3, 'Rise and shine!') 1
  2. 'Rise and shine!'
  3. >>> from domainlib import *
  4. >>> await probe('python.org') 2
  5. Result(domain='python.org', found=True) 3
  6. >>> names = 'python.org rust-lang.org golang.org no-lang.invalid'.split() 4
  7. >>> async for result in multi_probe(names): 5
  8. ... print(*result, sep='\t')
  9. ...
  10. golang.org True 6
  11. no-lang.invalid False
  12. python.org True
  13. rust-lang.org True
  14. >>>
  1. 尝试一个简单的 await 来查看异步控制台的运行情况。提示: asyncio.sleep() 接受一个可选的第二个参数,当您await它时返回该参数。
  2. 驱动probe协程。
  3. domainlib 版本的 probe 返回Result命名元组。
  4. 制作域列表。 .invalid 顶级域保留用于测试;此类域的 DNS 查询总是从 DNS 服务器获得 NXDOMAIN 响应,这意味着“该域不存在”
  5. 在 multi_probe 异步生成器上使用 async for进行迭代以显示结果。
  6. 请注意,结果与域被分配给 multiprobe 的顺序不同。结果在每个 DNS 响应返回时打印。

示例 21-16 显示 multi_probe 是一个异步生成器,因为它与 async for 兼容。现在让我们再做一些实验,从之前的例子继续。

示例 21-17。更多实验,从示例 21-16 继续。

  1. >>> probe('python.org') 1
  2. <coroutine object probe at 0x10e313740>
  3. >>> multi_probe(names) 2
  4. <async_generator object multi_probe at 0x10e246b80>
  5. >>> for r in multi_probe(names): 3
  6. ... print(r)
  7. ...
  8. Traceback (most recent call last):
  9. ...
  10. TypeError: 'async_generator' object is not iterable
  1. 调用原生协程返回一个协程对象。
  2. 调用异步生成器返回一个 async_generator 对象。
  3. 我们不能使用带有异步生成器的常规 for 循环,因为它们实现的是 __aiter__ 而不是 __iter__。

异步生成器由 async for 驱动,它可以是块语句(如示例 21-16 所示),它也出现在异步推导式中,我们将很快介绍。

实现异步生成器

现在让我们使用 multi_probe 异步生成器研究 domainlib.py 的代码。

示例 21-18。 domainlib.py:用于探测域的函数

  1. import asyncio
  2. import socket
  3. from collections.abc import Iterable, AsyncIterator
  4. from typing import NamedTuple, Optional
  5. class Result(NamedTuple): 1
  6. domain: str
  7. found: bool
  8. OptionalLoop = Optional[asyncio.AbstractEventLoop] 2
  9. async def probe(domain: str, loop: OptionalLoop = None) -> Result: 3
  10. if loop is None:
  11. loop = asyncio.get_running_loop()
  12. try:
  13. await loop.getaddrinfo(domain, None)
  14. except socket.gaierror:
  15. return Result(domain, False)
  16. return Result(domain, True)
  17. async def multi_probe(domains: Iterable[str]) -> AsyncIterator[Result]: 4
  18. loop = asyncio.get_running_loop()
  19. coros = [probe(domain, loop) for domain in domains] 5
  20. for coro in asyncio.as_completed(coros): 6
  21. result = await coro 7
  22. yield result 8
  1. NamedTuple 使probe的结果更易于阅读和调试。
  2. 这种类型的别名是为了避免下一行对于书单来说太长。
  3. probe 现在接受一个可选的loop参数,以避免在这个协程由 multi_probe 驱动时重复调用 get_running_loop。

  4. 异步生成器函数生成异步生成器对象,可以将其注解为 AsyncIterator[SomeType]。

  5. 构建probe协程对象的列表,每个对象都有不同的域。

  6. 这里不使用async for,因为 asyncio.as_completed 返回的结果是一个经典的生成器。

  7. 在协程对象上等待以检索结果。

  8. 产出result。这一行使 multi_probe 成为异步生成器。

Note:

示例 21-18 中的 for 循环可以更简洁:

  1. for coro in asyncio.as_completed(coros):
  2. yield await coro

Python 将其解析为 yield (await coro),所以这样写没有问题。

我认为在本书的第一个异步生成器示例中使用这种快捷方式可能会造成混淆,因此我将其分成两行。

根据这版 domainlib.py,我们可以在 domaincheck.py 中演示 multi_probe 异步生成器的使用:一个脚本,它接收一个域名后缀并搜索由短 Python 关键字组成的域。这是 domaincheck.py 的示例输出:

  1. $ ./domaincheck.py net
  2. FOUND NOT FOUND
  3. ===== =========
  4. in.net
  5. del.net
  6. true.net
  7. for.net
  8. is.net
  9. none.net
  10. try.net
  11. from.net
  12. and.net
  13. or.net
  14. else.net
  15. with.net
  16. if.net
  17. as.net
  18. elif.net
  19. pass.net
  20. not.net
  21. def.net

多亏了 domainlib,domaincheck.py 的代码很简单。

示例 21-19。 domaincheck.py:使用 domainlib 探测域的实体程序

  1. #!/usr/bin/env python3
  2. import asyncio
  3. import sys
  4. from keyword import kwlist
  5. from domainlib import multi_probe
  6. async def main(tld: str) -> None:
  7. tld = tld.strip('.')
  8. names = (kw for kw in kwlist if len(kw) <= 4) 1
  9. domains = (f'{name}.{tld}'.lower() for name in names) 2
  10. print('FOUND\t\tNOT FOUND') 3
  11. print('=====\t\t=========')
  12. async for domain, found in multi_probe(domains): 4
  13. indent = '' if found else '\t\t' 5
  14. print(f'{indent}{domain}')
  15. if __name__ == '__main__':
  16. if len(sys.argv) == 2:
  17. asyncio.run(main(sys.argv[1])) 6
  18. else:
  19. print('Please provide a TLD.', f'Example: {sys.argv[0]} COM.BR')
  1. 生成长度不超过 4 的关键字生成器。
  2. 根据TLD生成域名生成器
  3. 格式化输出:表格的标题。
  4. 异步迭代 multi_probe(domains)。
  5. 将indent设置为''或两个制表符以将结果放在正确的列中。
  6. 使用传入的命令行参数运行main协程。

生成器还有一个与迭代无关的额外用途:它们可以用作上下文管理器。这也适用于异步生成器。

作为上下文管理器的异步生成器

编写自定义的异步上下文管理器并不是频率较高的用例,但如果您需要编写,请考虑使用 Python 3.7 中添加到 contextlib 模块中的 @asynccontextmanager 装饰器。这与我们在“使用@contextmanager”中学习的@contextmanager 装饰器非常相似。

将@asynccontextmanager 与 loop.run_in_executor 结合的一个有趣示例出现在 Caleb Hattingh 的《在 Python 中使用 Asyncio》一书中。示例 21-20 是 Caleb 的代码 — 在本文中进行简单的更改并添加了标注。

示例 21-20。使用 @asynccontextmanager 和 loop.run_in_executor 的示例

  1. from contextlib import asynccontextmanager
  2. @asynccontextmanager
  3. async def web_page(url): 1
  4. loop = asyncio.get_running_loop() 2
  5. data = await loop.run_in_executor( 3
  6. None, download_webpage, url)
  7. yield data 4
  8. await loop.run_in_executor(None, update_stats, url) 5
  9. async with web_page('google.com') as data: 6
  10. process(data)
  1. 装饰器函数必须是异步生成器。

  2. 对Caleb 代码的小更新:使用轻量级的 get_running_loop 代替 get_event_loop。

  3. 假设 download_webpage 是一个使用 requests 库的阻塞函数;我们在一个单独的线程中运行它以避免阻塞事件循环。

  4. 这个 yield 表达式之前的所有行都将成为装饰器构建的异步上下文管理器的 __aenter__ 协程方法。data 的值将绑定到下面 async with 语句中的 as 子句之后的 data 变量。

  5. yield 之后的行将成为 __aexit__ 协程方法。这里另一个阻塞调用被委托给线程执行器。

  6. 将 web_page 与 async with 一起使用。

这与顺序 @contextmanager 装饰器非常相似。请参阅“Using @contextmanager” 了解更多详细信息,包括yield行的异常处理。有关@asynccontextmanager 的另一个示例,请参阅contextlib documentation

现在让我们通过将异步生成器函数与原生协程进行对比来总结我们对异步生成器函数的介绍。

异步生成器对比原生协程

以下是原生协程和异步生成器函数之间的一些关键相似点和不同点:

  • 两者都使用 async def 声明。
  • 异步生成器的主体中总是有一个 yield 表达式——这就是它成为生成器的原因。原生协程不包含 yield。
  • 原生协程可能会return None 以外的一些值。异步生成器只能使用空的return语句。
  • 原生协程是可等待的:它们可以由 await 表达式驱动或传递给许多接收可等待对象参数的 asyncio 函数之一,例如 create_task。异步生成器是不可等待的。它们是异步可迭代对象,由 async for 或异步推导式驱动。

是时候谈谈异步推导式了。

异步推导式和异步生成器表达式

PEP 530-Asynchronous Comprehensions 从 Python 3.6 开始在推导式和生成器表达式的语法中引入了 async for 和 await 的使用。

PEP 530 定义的async for和await唯一可以出现在 async def 函数体之外的结构是异步生成器表达式。

定义和使用异步生成器表达式

给定示例 21-18 中的 multi_probe 异步生成器,我们可以编写另一个异步生成器,仅返回找到的域的名称。下面是使用 -m asyncio 启动的异步控制台:

  1. >>> from domainlib import multi_probe
  2. >>> names = 'python.org rust-lang.org golang.org no-lang.invalid'.split()
  3. >>> gen_found = (domain async for domain, found in multi_probe(names) if found) 1
  4. >>> gen_found
  5. <async_generator object <genexpr> at 0x10a8f9700> 2
  6. >>> async for name in gen_found: 3
  7. ... print(name)
  8. ...
  9. golang.org
  10. python.org
  11. rust-lang.org
  1. 使用 async for 使其成为异步生成器表达式。它可以在 Python 模块中的任何位置定义。
  2. 异步生成器表达式构建了一个 async_generator 对象——与 multi_probe 等异步生成器函数返回的对象类型完全相同。
  3. 异步生成器对象由 async for 语句驱动——而该语句只能出现在 async def 的函数体中——或者我在本例中使用的神奇异步控制台中。

总结一下:异步生成器表达式可以在程序的任何地方定义,但它只能在原生协程或异步生成器函数中使用。

PEP 530 引入的其余结构只能在原生协程或异步生成器函数中定义和使用。

异步推导式

Yury Selivanov——PEP 530 的作者——证明了异步理解的必要性,接下来复制了三个简短的代码片段。

我们可以将下面这段代码进行改写:

  1. result = []
  2. async for i in aiter():
  3. if i % 2:
  4. result.append(i)

改写为:

result = [i async for i in aiter() if i % 2]

另外,如果是原生协程的fun,我们应该可以这样写:

result = [await fun() for fun in funcs]

TIP:

在列表推导中使用 await 类似于使用 asyncio.gather。但是由于它的可选参数 return_exceptions ,gather 能够更好地控制异常处理。Caleb Hattingh 建议始终设置 return_exceptions=True(默认为 False)。请参阅 asyncio.gather documentation 了解更多信息。

回到神奇的异步控制台:

  1. >>> names = 'python.org rust-lang.org golang.org no-lang.invalid'.split()
  2. >>> names = sorted(names)
  3. >>> coros = [probe(name) for name in names]
  4. >>> await asyncio.gather(*coros)
  5. [Result(domain='golang.org', found=True), Result(domain='no-lang.invalid', found=False),
  6. Result(domain='python.org', found=True), Result(domain='rust-lang.org', found=True)]
  7. >>> [await probe(name) for name in names]
  8. [Result(domain='golang.org', found=True), Result(domain='no-lang.invalid', found=False),
  9. Result(domain='python.org', found=True), Result(domain='rust-lang.org', found=True)]
  10. >>>

请注意,我对名称列表进行了排序,以表明在这两种情况下,结果都是按照提交的顺序输出显示。

PEP 530 允许在列表推导以及 dict 和集合推导中使用 async for 和 await。例如,这里是存储 multi_probe 的结果一个字典推导式,在异步控制台中是这样的:

  1. >>> {name: found async for name, found in multi_probe(names)}
  2. {'golang.org': True, 'python.org': True, 'no-lang.invalid': False,
  3. 'rust-lang.org': True}

我们可以在 for 或 async for 子句之前的表达式中使用 await 关键字,也可以在 if 子句之后的表达式中使用。这是异步控制台中的集合推导式,仅收集找到的域名:

  1. >>> {name for name in names if (await probe(name)).found}
  2. {'rust-lang.org', 'python.org', 'golang.org'}

由于 __getattr__ 运算符(.)的优先级更高,需要在 await 表达式周围加上额外的括号.

同样,所有这些推导式只能出现在 async def函数体中或异步控制台中。

现在让我们谈谈异步语句、异步表达式以及它们创建的对象的一个​​非常重要的特性。这些结构通常与 asyncio 一起使用,但它们实际上是独立于库的。

超越 asyncio库 的异步库:Curio

Python 的 async/await 语言结构不依赖于任何特定的事件循环或库。由于是通过特殊方法提供的可扩展 的API,所有人都可以编写自己的异步运行时环境和框架来驱动原生协程、异步生成器等。

这就是 David Beazley 在他的 Curio 项目中所做的。他有兴趣重新思考如何在从零开始构建的框架中使用这些新的语言特性。回想一下,asyncio 是在 Python 3.4 中发布的,它使用 yield from 而不是 await,因此它的 API 无法利用异步上下文管理器、异步迭代器以及 async/await 关键字驱动的其他功能。因此,与 asyncio 相比,Curio 具有更简洁的 API 和更简单的实现。

示例 21-21 显示了重写为使用 Curio 库的 blogdom.py 脚本(示例 21-1)。

示例 21-21。 blogdom.py:示例 21-1,现在使用 Curio实现。

  1. #!/usr/bin/env python3
  2. from curio import run, TaskGroup
  3. import curio.socket as socket
  4. from keyword import kwlist
  5. MAX_KEYWORD_LEN = 4
  6. async def probe(domain: str) -> tuple[str, bool]: 1
  7. try:
  8. await socket.getaddrinfo(domain, None) 2
  9. except socket.gaierror:
  10. return (domain, False)
  11. return (domain, True)
  12. async def main() -> None:
  13. names = (kw for kw in kwlist if len(kw) <= MAX_KEYWORD_LEN)
  14. domains = (f'{name}.dev'.lower() for name in names)
  15. async with TaskGroup() as group: 3
  16. for domain in domains:
  17. await group.spawn(probe, domain) 4
  18. async for task in group: 5
  19. domain, found = task.result
  20. mark = '+' if found else ' '
  21. print(f'{mark} {domain}')
  22. if __name__ == '__main__':
  23. run(main()) 6
  1. probe 不需要获取事件循环,因为……
  2. getaddrinfo 是 curio.socket 的顶级函数,而不是loop对象的方法——它在 asyncio 中是这样的。
  3. TaskGroup 是 Curio 中的一个核心概念,用于监视和控制多个协程,并确保它们都被执行和清理。
  4. TaskGroup.spawn 是启动协程的方式,由特定的 TaskGroup 实例管理。协程被包装为Task。
  5. 在 TaskGroup 上使用 async for进行迭代会在每个完成时产生 Task 实例。这对应于示例 21-1 中使用 for ... as_completed(...): 的行。
  6. Curio 开创了这种在 Python 中启动异步程序的明智方法。

扩展最后一点:如果您查看 Fluent Python 第一版的 asyncio 代码示例,您会看到类似这样的行,一遍又一遍地重复:

  1. loop = asyncio.get_event_loop()
  2. loop.run_until_complete(main())
  3. loop.close()

Curio TaskGroup 是一个异步上下文管理器,它替换了 asyncio 中的几个 ad-hoc API 和编码模式。我们刚刚看到了通过迭代TaskGroup替代 asyncio.as_completed(...) 函数。

另一个例子:Task Groups docs 的这个代码片段没有使用特殊的gather函数,而是收集组中所有任务的结果:

  1. async with TaskGroup(wait=all) as g:
  2. await g.spawn(coro1)
  3. await g.spawn(coro2)
  4. await g.spawn(coro3)
  5. print('Results:', g.results)

任务组支持structured concurrency:一种并发编程形式,它将一组异步任务的所有活动限制在一个入口和出口点。这类似于结构化编程,它避开 GOTO 命令并引入块语句来限制循环和子程序的入口和出口点。当用作异步上下文管理器时,TaskGroup 确保在退出封闭块时完成或取消内部产生的所有任务,并抛出期间的任何异常。

Note:

在即将发布的 Python 版本中,asyncio 可能会采用结构化并发。在PEP 654–Exception Groups and except*中出现了一个强烈的迹象,它已被批准用于 Python 3.11。

 Motivation部分提到了 Trio 的“nurseries”,这是他们对任务组的名称:“在 asyncio 中实现更好的任务生成 API,受 Trio Nurseries 的启发,是这个 PEP 的主要动机。”

Curio 的另一个重要特性是更好地支持在同一代码库中使用协程和线程进行编程——这是大多数重要异步程序的必要条件。使用 await spawn_thread(func, ...) 启动一个线程会返回一个具有类似任务接口的 AsyncThread 对象。线程可以调用协程,这要归功于一个特殊的 AWAIT(coro) 函数——以全大写命名,因为 await 现在是一个关键字。

Curio 还提供了一个 UniversalQueue,可用于协调线程、Curio 协程和 asyncio 协程之间的工作。没错,Curio 具有允许它在一个线程中运行,以及在另一个线程中的运行Asyncio的特性,在同一个进程中,他们通过 UniversalQueue 和 UniversalEvent 进行通信。这些“通用”类的 API 在协程序列内部和外部是相同的,但在协程中,您需要在调用前加上 await。

当我在 2021 年 10 月写这篇文章时,HTTPX 是第一个与 Curio 兼容的 HTTP 客户端库,但我还不知道有任何异步数据库库支持它。在 Curio 存储库中有一组令人印象深刻的网络编程示例,包括一个使用 WebSocket 的示例,另一个实现 RFC 8305——Happy Eyeballs 并发算法,用于连接到 IPv6 端点,并在需要时快速回退到 IPv4。

Curio的设计一直很有影响力。由 Nathaniel J. Smith 创立的 Trio 框架深受 Curio 的启发。Curio 还可能促使 Python 贡献者提高 asyncio API 的可用性。例如,在其最早的版本中,asyncio 用户经常不得不获取和传递一个loop对象,因为一些基本函数要么是loop方法,要么需要一个loop参数。在最近的 Python 版本中,不需要经常直接访问loop,事实上,一些接受可选loop参数的函数现在正在废弃该参数。

异步类型的类型注解是我们的下一个主题。

异步对象的类型提示

原生协程的返回类型描述了当你在那个协程上等待时你得到什么,它是出现在原生协程函数体的返回语句中的对象的类型。

本章提供了许多使用注解的原生协程示例,包括示例 21-21 中的probe:

  1. async def probe(domain: str) -> tuple[str, bool]:
  2. try:
  3. await socket.getaddrinfo(domain, None)
  4. except socket.gaierror:
  5. return (domain, False)
  6. return (domain, True)

如果需要注解协程对象的参数,则泛型类型为:

  1. class typing.Coroutine(Awaitable[V_co], Generic[T_co, T_contra, V_co]):
  2. ...

Python 3.5 和 3.6 中引入了这个类型以及以下类型来注解异步对象: 

  1. class typing.AsyncContextManager(Generic[T_co]):
  2. ...
  3. class typing.AsyncIterable(Generic[T_co]):
  4. ...
  5. class typing.AsyncIterator(AsyncIterable[T_co]):
  6. ...
  7. class typing.AsyncGenerator(AsyncIterator[T_co], Generic[T_co, T_contra]):
  8. ...
  9. class typing.Awaitable(Generic[T_co]):
  10. ...

对于 Python ≥ 3.9,请使用上述的 collections.abc 等效项。

我想强调这些泛型类型的三个方面。

第一:它们在第一个类型参数上都是协变的,这是从这些对象产生的项的类型。回顾“Variance Rules of Thumb”的规则 #1:

        如果正式类型参数定义了来自对象的数据的类型,则它可以是协变的。

第二:AsyncGenerator 和 Coroutine 在倒数第二个参数上是逆变的。这是事件循环调用以驱动异步生成器和协程的低级 .send() 方法的参数类型。因此,它是一种“输入”类型。因此,根据经验法则#2,它可以是逆变的:

        如果形式类型参数定义了在初始构造后进入对象的数据的类型,则它可以是逆变的。

第三:AsyncGenerator 没有返回类型,这与我们在“经典协程的通用类型提示”中看到的 typing.Generator 形成对比。正如我们在“经典协程”中看到的那样,通过抛出 StopIteration(value) 返回一个值是使生成器能够作为协程运行并支持 yield from 的技巧之一。异步对象之间没有这样的重叠:AsyncGenerators 对象不返回值,并且与使用 typing.Coroutine 注解的原生协程对象完全分开。

最后,让我们简要讨论一下异步编程的优势和挑战。

异步编程的优势和挑战

本章结束的部分讨论了有关异步编程的高级思想,无论您使用哪种语言或库。

让我们首先解释异步编程吸引人的第一个原因,然后是一个流行的神话,以及如何处理它

执行阻塞型调用的循环

Node.js 的发明者 Ryan Dahl 在介绍他的项目理念时说:“我们在用错误的方式执行I/O",他将阻塞型函数定义为执行文件或网络 I/O 的函数,并认为我们不能像对待非阻塞函数那样对待它们。为了解释原因,他在表 21-1 的第二列中给出了数字进行佐证。

表 21-1。现代计算机从不同设备读取数据的延迟;第三列以人类感官的时间比例进行显示

DeviceCPU cyclesProportional “human” scale

L1 cache

3

3 seconds

L2 cache

14

14 seconds

RAM

250

250 seconds

disk

41,000,000

1.3 years

network

240,000,000

7.6 years

为了理解表 21-1,请记住具有 GHz 时钟的现代 CPU 每秒运行数十亿个周期。假设一个 CPU 每秒运行 10 亿个周期。该 CPU 在一秒钟内可以进行超过 3.33 亿次一级缓存读取,或者同时进行 4(四次!)网络读取。表 21-1 的第三列通过将第二列乘以一个常数因子来透视这些数字。因此,在另一个宇宙中,如果从 L1 缓存读取一次需要 3 秒,那么网络读取将需要 7.6 年!

表 21-1 解释了为什么严格的异步编程方法可以带来高性能服务器。挑战在于实现这一纪律。第一步是认识到“I/O 密集型系统”是一种幻想。

I/O 密集型系统的神话

一个常见的重复模因是异步编程有利于实现“I/O 密集型系统”。我艰难地了解到没有“I/O 密集型系统”。您可能有 I/O 密集型函数。也许您系统中的绝大多数功能都受 I/O 限制,即系统花费更多时间等待 I/O,而不是处理数据。在等待时,他们将控制权交给事件循环,然后可以驱动其他一些待处理的任务。但不可避免的是,任何重要的系统都会有一些受 CPU 限制的部分。即使是不重要的系统也会在压力下证明这一点。在“Soapbox”中,我讲述了两个异步程序的故事, 他们被迫使用CPU 密集型函数,减慢了事件循环并严重影响了性能。

鉴于任何重要的系统都有CPU 密集型函数,因此处理它们是异步编程成功的关键。

避免 CPU 密集型陷阱

如果您正在大规模使用 Python,您应该有一些专门设计的自动化测试,用于在性能回归时进行测试。这对于异步代码至关重要,但也与Python线程代码相关——因为 GIL。如果你等到性能下降才和开发团队沟通,那就太迟了。修复可能需要进行一些重大改造。

当您确定CPU 是性能瓶颈时,可以使用以下方法:

  • 将任务委托给 Python 进程池;
  • 将任务委托给外部任务队列;
  • 用 Cython、C、Rust 或其他可编译为机器代码并与 Python/C API 接口的其他语言重写相关代码,最好能够释放 GIL;
  • 确定你可以承受性能损失并且什么都不做——但记录下这个决定,以便以后更容易恢复它。

外部任务队列应该在项目一开始就尽快选择和集成,这样团队就可以拿来即用。

最后一个选项——什么都不做——属于technical debt范畴。

并发编程是一个引人入胜的话题,我想写更多关于它的内容。但这不是本书的主要重点,而且这已经是最长的章节之一,所以让我们把它总结一下。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/菜鸟追梦旅行/article/detail/102515
推荐阅读
相关标签
  

闽ICP备14008679号