当前位置:   article > 正文

asyncio模块(python3.11)

asyncio模块

前言

从python3.4的装饰器,到python3.4的async/await语法糖,再到python3.7中加入的一些高层级API,asyncio模块发生了很多更新变化。

本文主要介绍的是几个高层级API在python3.11中的用法。在官方文档中也是推荐开发者使用高层级的API,而不是使用低层级的API来手动任务,或者创建和关闭事件循环。


1 基本知识

1.1 协程函数/async关键字

协程函数指返回一个coroutine对象的函数。

协程函数可通过async def语句来定义,并且其中可能包含awaitasync forasync with关键字。

1.2 可等待对象/await关键字

await的作用是挂起协程(coroutine)的执行以等待一个可等待(awaitable)对象。

如果一个对象可以在await语句中使用,那么它就是 可等待 对象。许多asyncio API都被设计为接受可等待对象。

asyncio中,有下面这些可等待对象:

  • Coroutine(协程)
  • Task(任务)
  • Future
import asyncio

# async声明一个协程函数main()
async def main():
    # await相当于一个标记,告诉程序这里有一个阻塞操作,可以将协程挂起并等待
    await asyncio.sleep(1)
    print('hello')


asyncio.run(main())
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

1.3 asyncio中的网络请求

在协程函数中可能会涉及到一些网络请求的操作,传统的requests模块并不能在协程函数中使用。

由于网络请求是阻塞操作,在遇到阻塞操作时协程是需要被挂起等待的,否则不能实现异步效果,而需要等待的对象都需要用await关键字声明。

但是只有可等待对象可以在await中使用,而requests模块中并没有可等待对象,在这里需要使用第三方模块aiohttp代替requests模块。

安装aiohttp模块:

pip3 install aiohttp
  • 1

用法示例:

# 下面是某个async def定义的函数内部代码
# 获取session对象
async with aiohttp.ClientSession() as req:
    # 发起网络请求,这是一个阻塞操作,需要用await声明
    async with await req.get(url, headers=headers) as response:
        # 获取响应内容,read()是二进制格式,text()是文本格式
        # 这也是一个阻塞操作,需要用await声明
        pic_data = await response.read()
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

2 高层级API

2.1 run()

官方文档:
应用开发者通常应当使用高层级的 asyncio 函数,例如 asyncio.run(),应当很少有必要引用循环对象或调用其方法。

运行传入的 coroutine ,创建一个新的事件循环并在结束时关闭之。

负责管理asyncio事件循环,终结异步生成器,并关闭线程池。

Execute the coroutine and return the result

asyncio.run(coro, *, debug=None)
  • 1

要点:

  1. 只能传入coroutine,不支持传入多个。
  2. 当有其他 asyncio 事件循环在同一线程中运行时,此函数不能被调用。
  3. 它应当被用作 asyncio 程序的主入口点,理想情况下应当只被调用一次。
  4. 返回的结果是所运行的协程的返回值

2.2 wait()

并发地运行 fs 可迭代对象中的 Future 和 Task 实例并进入阻塞状态直到满足 return_when 所指定的条件。

asyncio.wait(fs, *, timeout=None, return_when=ALL_COMPLETED)
  • 1

要点:

  1. 传入的是可迭代对象,例如:一个由多个task元素组成的列表。
  2. python3.8后,只能传入Task或者Future,不可以再传入coroutine
  3. 当超时发生时尚未完成的FutureTask会在设定的timeout秒数后被直接返回。
  4. return_when指定此函数应在何时返回。它必须为以下常量之一:
    • FIRST_COMPLETED:函数将在任意可等待对象结束或取消时返回。
    • FIRST_EXCEPTION:函数将在任意可等待对象因引发异常而结束时返回。
    • ALL_COMPLETED:函数将在所有可等待对象结束或取消时返回。

此外:

  1. 可迭代对象 fs 不能为空,并且不接受产生任务的生成器,如create_task()。

  2. 返回两个 Task/Future 集合: (done, pending)

    done, pending = await asyncio.wait(fs)
    
    • 1

源码(python3.11):

async def wait(fs, *, timeout=None, return_when=ALL_COMPLETED):
    """Wait for the Futures or Tasks given by fs to complete.
    
    The fs iterable must not be empty.

    Coroutines will be wrapped in Tasks.

    Returns two sets of Future: (done, pending).

    Usage:

        done, pending = await asyncio.wait(fs)

    Note: This does not raise TimeoutError! Futures that aren't done
    when the timeout occurs are returned in the second set.
    """
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16

2.3 gather()

并发运行传入的序列中的全部可等待对象(协程、任务、Futrue)

gather(*aws, return_exceptions=False)
  • 1

要点:

  1. 参数中的*aws是不定长参数,可以一次性传入多个可等待对象
  2. 如果 aws 中的某个可等待对象为协程,它将自动被作为一个任务调度。
  3. 如果所有可等待对象都成功完成,结果将是一个由所有返回值聚合而成的列表。结果值的顺序与 aws 中可等待对象的顺序一致。

return_exceptions参数:

  • False(默认):

    引发的首个异常会立即传播给等待 gather() 的任务。序列中的其他可等待对象不会被取消并将继续运行。

  • True:

    异常会和成功的结果一样处理,并且聚合至结果列表。

注意:

  • 如果 gather() 被取消,所有被提交 (尚未完成) 的可等待对象也会 被取消
  • 如果 aws 序列中的任一 Task 或 Future 对象 被取消,它将被当作引发了 CancelledError 一样处理 – 在此情况下 gather() 调用 不会 被取消。

2.4 create_task()

官方文档:
create_task() 函数,它是创建新任务的首选途径。

将协程封装成一个Task并且调度其执行,返回Task对象。

asyncio.create_task(coro, *, name=None, context=None):
  • 1

要点:

  1. 只能传入coroutine,返回Task对象
  2. name可以设置task的名称,会在返回的结果中体现。
  3. 该任务会在 get_running_loop() 返回的循环中执行,如果当前线程没有在运行的循环则会引发 RuntimeError

context参数(3.11后增加):

  • 允许指定自定义的contectvars.Context共协程运行,未指定时将创建当前上下文的副本。

源代码(python3.11):

def create_task(coro, *, name=None, context=None):
    """Schedule the execution of a coroutine object in a spawn task.

    Return a Task object.
    """
    loop = events.get_running_loop()
    if context is None:
        # Use legacy API if context is not needed
        task = loop.create_task(coro)
    else:
        task = loop.create_task(coro, context=context)

    _set_task_name(task, name)
    return task
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14

2.5 一个错误调用API的分析

async def func1():
    ...

task = asyncio.create_task(func1())
asyncio.run(task)
# RuntimeError: no running event loop
# sys:1: RuntimeWarning: coroutine 'func1' was never awaited
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

原因如下:

在create_task()中,协程在封装成任务后会在loop中执行,但是此时并没有在运行的循环,所以引发RuntimeError: no running event loop

解决:

在调用create_task()前必须先获取循环对象,使程序中有正在运行的循环。

三种方式

  1. 使用低层级API:get_event_loop()run_until_complete()

    import asyncio
    
    # 协程1
    async def func1():
        print(1)
        await asyncio.sleep(2)
        print(2)
    
    # 协程2
    async def func2():
        print(3)
        await asyncio.sleep(2)
        print(4)
    
    # 获取事件循环对象,使得程序中有正在运行的循环
    loop = asyncio.get_event_loop()
    
    # 创建任务列表
    tasks = [
        # 注意,在这里create_task应该从loop创建
        loop.create_task(func1()),
        loop.create_task(func2())
    ]
    # 运行事件循环
    loop.run_until_complete(asyncio.wait(tasks))
    print("All Done!")
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
  2. 使用高层级API:run(),但是需要另外定义一个协程函数作为入口,以满足调用顺序。

    import asyncio
    
    # 协程1
    async def func1():
        print(1)
        await asyncio.sleep(2)
        print(2)
    
    # 协程2
    async def func2():
        print(3)
        await asyncio.sleep(2)
        print(4)
    
    # 另外定义一个main()函数作为事件循环的入口
    async def main():
        tasks = [
            asyncio.create_task(func1()),
            asyncio.create_task(func2())
        ]
        # 由于两个协程中都存在阻塞操作,在这里也应该用await声明一下阻塞操作
        await asyncio.wait(tasks)
        print("All Done!")
    
    # 获取循环对象并运行循环,这里是整个程序的入口,执行run()后程序中便有了运行中的循环
    asyncio.run(main())
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
  3. 使用高层级API:gather()

    这种方式的好处就是可以直接传入协程(Coroutine),在其内部会自动创建任务(Task)

    缺点是如果需要绑定回调函数的话,还是需要先创建任务再绑定,因为add_done_callback()只能给Task或Future绑定

    import asyncio
    
    
    async def func1():
        print(1)
        await asyncio.sleep(2)
        print(2)
    
    
    async def func2():
        print(3)
        await asyncio.sleep(2)
        print(4)
    
    
    async def main():
        # 并发执行传入的两个协程,会自动封装成task并且循环运行
        await asyncio.gather(
            func1(),
            func2()
        )
        print("All Done!")
    
    # 程序入口
    asyncio.run(main())
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/笔触狂放9/article/detail/832752
推荐阅读
相关标签
  

闽ICP备14008679号