赞
踩
从python3.4的装饰器,到python3.4的async/await语法糖,再到python3.7中加入的一些高层级API,asyncio模块发生了很多更新变化。
本文主要介绍的是几个高层级API在python3.11中的用法。在官方文档中也是推荐开发者使用高层级的API,而不是使用低层级的API来手动任务,或者创建和关闭事件循环。
协程函数指返回一个coroutine对象的函数。
协程函数可通过async def
语句来定义,并且其中可能包含await
、async for
和async with
关键字。
await的作用是挂起协程(coroutine)的执行以等待一个可等待(awaitable)对象。
如果一个对象可以在await
语句中使用,那么它就是 可等待 对象。许多asyncio API
都被设计为接受可等待对象。
在asyncio
中,有下面这些可等待对象:
import asyncio
# async声明一个协程函数main()
async def main():
# await相当于一个标记,告诉程序这里有一个阻塞操作,可以将协程挂起并等待
await asyncio.sleep(1)
print('hello')
asyncio.run(main())
在协程函数中可能会涉及到一些网络请求的操作,传统的requests
模块并不能在协程函数中使用。
由于网络请求是阻塞操作,在遇到阻塞操作时协程是需要被挂起等待的,否则不能实现异步效果,而需要等待的对象都需要用await
关键字声明。
但是只有可等待对象可以在await
中使用,而requests
模块中并没有可等待对象,在这里需要使用第三方模块aiohttp
代替requests
模块。
安装aiohttp
模块:
pip3 install aiohttp
用法示例:
# 下面是某个async def定义的函数内部代码
# 获取session对象
async with aiohttp.ClientSession() as req:
# 发起网络请求,这是一个阻塞操作,需要用await声明
async with await req.get(url, headers=headers) as response:
# 获取响应内容,read()是二进制格式,text()是文本格式
# 这也是一个阻塞操作,需要用await声明
pic_data = await response.read()
官方文档:
应用开发者通常应当使用高层级的 asyncio 函数,例如 asyncio.run(),应当很少有必要引用循环对象或调用其方法。
运行传入的 coroutine ,创建一个新的事件循环并在结束时关闭之。
负责管理asyncio
事件循环,终结异步生成器,并关闭线程池。
Execute the coroutine and return the result
asyncio.run(coro, *, debug=None)
要点:
并发地运行 fs 可迭代对象中的 Future 和 Task 实例并进入阻塞状态直到满足 return_when 所指定的条件。
asyncio.wait(fs, *, timeout=None, return_when=ALL_COMPLETED)
要点:
Task
或者Future
,不可以再传入coroutine
。Future
或Task
会在设定的timeout
秒数后被直接返回。return_when
指定此函数应在何时返回。它必须为以下常量之一:
此外:
可迭代对象 fs 不能为空,并且不接受产生任务的生成器,如create_task()。
返回两个 Task/Future 集合: (done, pending)
。
done, pending = await asyncio.wait(fs)
源码(python3.11):
async def wait(fs, *, timeout=None, return_when=ALL_COMPLETED): """Wait for the Futures or Tasks given by fs to complete. The fs iterable must not be empty. Coroutines will be wrapped in Tasks. Returns two sets of Future: (done, pending). Usage: done, pending = await asyncio.wait(fs) Note: This does not raise TimeoutError! Futures that aren't done when the timeout occurs are returned in the second set. """
并发运行传入的序列中的全部可等待对象(协程、任务、Futrue)。
gather(*aws, return_exceptions=False)
要点:
*aws
是不定长参数,可以一次性传入多个可等待对象。return_exceptions参数:
False(默认):
引发的首个异常会立即传播给等待 gather()
的任务。序列中的其他可等待对象不会被取消并将继续运行。
True:
异常会和成功的结果一样处理,并且聚合至结果列表。
注意:
gather()
被取消,所有被提交 (尚未完成) 的可等待对象也会 被取消。CancelledError
一样处理 – 在此情况下 gather()
调用 不会 被取消。官方文档:
create_task() 函数,它是创建新任务的首选途径。
将协程封装成一个Task并且调度其执行,返回Task对象。
asyncio.create_task(coro, *, name=None, context=None):
要点:
RuntimeError
。context参数(3.11后增加):
源代码(python3.11):
def create_task(coro, *, name=None, context=None):
"""Schedule the execution of a coroutine object in a spawn task.
Return a Task object.
"""
loop = events.get_running_loop()
if context is None:
# Use legacy API if context is not needed
task = loop.create_task(coro)
else:
task = loop.create_task(coro, context=context)
_set_task_name(task, name)
return task
async def func1():
...
task = asyncio.create_task(func1())
asyncio.run(task)
# RuntimeError: no running event loop
# sys:1: RuntimeWarning: coroutine 'func1' was never awaited
原因如下:
在create_task()中,协程在封装成任务后会在loop中执行,但是此时并没有在运行的循环,所以引发RuntimeError: no running event loop
。
解决:
在调用create_task()前必须先获取循环对象,使程序中有正在运行的循环。
有三种方式:
使用低层级API:get_event_loop()
和run_until_complete()
import asyncio # 协程1 async def func1(): print(1) await asyncio.sleep(2) print(2) # 协程2 async def func2(): print(3) await asyncio.sleep(2) print(4) # 获取事件循环对象,使得程序中有正在运行的循环 loop = asyncio.get_event_loop() # 创建任务列表 tasks = [ # 注意,在这里create_task应该从loop创建 loop.create_task(func1()), loop.create_task(func2()) ] # 运行事件循环 loop.run_until_complete(asyncio.wait(tasks)) print("All Done!")
使用高层级API:run()
,但是需要另外定义一个协程函数作为入口,以满足调用顺序。
import asyncio # 协程1 async def func1(): print(1) await asyncio.sleep(2) print(2) # 协程2 async def func2(): print(3) await asyncio.sleep(2) print(4) # 另外定义一个main()函数作为事件循环的入口 async def main(): tasks = [ asyncio.create_task(func1()), asyncio.create_task(func2()) ] # 由于两个协程中都存在阻塞操作,在这里也应该用await声明一下阻塞操作 await asyncio.wait(tasks) print("All Done!") # 获取循环对象并运行循环,这里是整个程序的入口,执行run()后程序中便有了运行中的循环 asyncio.run(main())
使用高层级API:gather()
这种方式的好处就是可以直接传入协程(Coroutine),在其内部会自动创建任务(Task)
缺点是如果需要绑定回调函数的话,还是需要先创建任务再绑定,因为add_done_callback()
只能给Task或Future绑定
import asyncio async def func1(): print(1) await asyncio.sleep(2) print(2) async def func2(): print(3) await asyncio.sleep(2) print(4) async def main(): # 并发执行传入的两个协程,会自动封装成task并且循环运行 await asyncio.gather( func1(), func2() ) print("All Done!") # 程序入口 asyncio.run(main())
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。