赞
踩
asyncio (Asynchronous I/O)模块是一个异步代码库,它提供了一种基于协程(coroutine)和事件循环(event loop)的编程模型。是很多python异步架构的基础,多用于处理高并发网络请求方面的问题
协程(coroutine) 也被称为轻量级线程,是一种可以在单线程中实现多任务的编程方式(在用户态切换上下文)。它是一种特殊的函数或者方法,可以在执行过程中暂停,将控制权交还给调用者。
这使得协程能够在执行过程中等待某些事件发生,比如等待 IO 操作完成,而不会阻塞其他任务的执行
注意:下面使用的例子都是基于 Python3.5+ 来实现的,不适用于之前的 Python 版本。
在学习异步之前,先看如下两个例子的对比,可以更直观的看出使用异步的优势。
例子一:
import time
def func1():
print('1')
time.sleep(1)
print('2')
def func2():
print('3')
time.sleep(1)
print('4')
func1()
func2()
输出如下:这里立即输出了 1,等待了 1s 后,又输出了 2 和 3,然后又等待了 1s, 最后输出了 4
1 # 这里立马输出 1,等待 1s 后才输出 2
2
3 # 2 和 3 基本同时输出,然后等待 1s 后才输出 4
4
例子二:
import asyncio
async def func1():
print('1')
await asyncio.sleep(1)
print('2')
async def func2():
print('3')
await asyncio.sleep(1)
print('4')
task_list = [func1(), func2()]
asyncio.run( asyncio.wait(task_list))
输出如下:这里立即输出了 3 和 1,等待了 1s 后,输出了 4 和 2
3 # 这里 3 和 1基本同时输出,等待 1s 后,4 和 2
1
4
2
第一个例子想必都能看懂,函数依次调用,遇到 time.sleep() 函数,则等待指定时间,然后继续执行,第一个例子大概用时 2s
第二个例子则是使用了 asyncio 异步库,遇到 asyncio.sleep() 函数,则切换任务执行(先暂停执行这个函数,切换到别的函数执行,当指定等待的时间间隔过去后,在继续执行这个函数),第二个例子大概用时 1s
可以简单的理解为一个 while True 循环(死循环),它会循环检测执行某些代码。
只有当任务列表中的所有任务全部执行完,才会退出循环.
import asyncio
async def func():
print("123456")
# 获取一个事件循环的函数,异步事件循环可以管理和调度异步任务的执行
loop = asyncio.get_event_loop()
# 将任务放到 '任务列表',它会一直运行事件循环,直到 result 被完成为止
# 这里的 result 应该是一个可等待对象(协程对象 或者 Future对象)
# loop.run_until_complete(result) # 这行是伪代码,真正的代码示例如下
loop.run_until_complete(func())
# 输出如下:
123456
协程函数:使用 async def
语法定义,例如:async def func():
协程对象:调用 协程函数 返回一个协程对象.
# 这是一个协程函数
async def func():
pass
# result 是一个协程对象 (注意:函数内部代码不会执行,只是返回一个协程对象)
result = func()
如果想要运行协程函数内部代码,必须要将协程对象交给事件循环来处理
import asyncio
async def func():
print("执行协程函数 func")
result = func()
"""
loop = asyncio.get_event_loop() # 获取事件循环
loop.run_until_comlete( result ) # 运行事件循环
"""
# 在 python3.7 之后,可以省略上面两行,直接执行下面一行函数即可
asyncio.run(result)
await
关键字会暂停当前协程的执行(这个关键字只能在协程函数中使用
),让出事件循环的控制权,以便其它协程或异步任务可以执行,一旦被等待的对象完成,await 表达式将返回对象的结果,并且当前协程将继续执行。
await
+ 可等待的对象(协程对象、Task对象、Future对象)
同一个协程任务中,多个await
, 会依次等待可等待对象执行完成;不同协程任务中,遇到await会交替执行。
例子一:
import asyncio async def func(): print("func() start") await asyncio.sleep(2) # 这里模拟IO操作 print("func() end") return "我是 func()" async def main(): print("执行协程函数 main()") # 遇到IO操作挂起当前协程(任务),等待IO操作完成之后在继续往下执行,当前协程挂起,事件循环可以去执行其他的协程(任务) result = await func() print("io请求结果:", result) asyncio.run(main())
执行结果如下:
执行协程函数 main()
func() start
func() end
io请求结果: 我是 func()
例子二:
一个协程函数中可以有多个 await
,但是 await
是等待对应的函数得到结果后,才继续向下执行(同一协程任务中)
import asyncio async def func(flags): print("func() start") await asyncio.sleep(2) # 这里模拟IO操作 print("func() end") return "我是 func(" + flags + ")" async def main(): print("执行协程函数 main()") # 遇到IO操作挂起当前协程(任务),等待IO操作完成之后在继续往下执行,当前协程挂起,事件循环可以去执行其他的协程(任务) result = await func("1") print("IO第一次的请求结果:", result) result = await func("2") print("IO第二次的请求结果:", result) asyncio.run(main())
执行结果如下:
执行协程函数 main()
func() start
func() end
IO第一次的请求结果: 我是 func(1)
func() start
func() end
IO第二次的请求结果: 我是 func(2)
Task 对象用于表示一个可并发执行的异步任务。它是 asyncio 中的一个重要概念,用于管理协程的执行。Task对象用于包装协程。
说简单点就是让协程加入事件循环中等待被调度执行,或者说在事件循环中添加任务等待被调度执行。事件循环会在适当的时机执行协程,并在协程遇到阻塞操作时挂起,转而执行其他可运行的任务,当阻塞操作完成,事件循环会恢复挂起的协程执行。
asyncio.create_task() 函数是将协程对象包装成Task对象,该任务将被添加到默认的事件循环中,如果当前没有活动的事件循环,则会引发 RuntimeError 异常。
除了asyncio.create_task() 函数,还可以用低层级的 loop.create_task() 或 asyncio.ensure_future() 函数。
● asyncio.create_task() 是一个全局函数,不依赖特定的事件循环。且它在Python3.7 中才被引入
● loop.create_task() 是事件循环的方法,需要先获取事件循环的引用
● asyncio.ensure_future() 虽然也是一个全局函数,但是它返回的是Future对象。python3.7 之前使用这个函数
例子一:
这里和 3.4 await 关键字: 例子二 中不同的是,这里通过 asyncio.create_task()
函数将多个协程任务添加到事件循环中,这里事件循环中是不同的协程任务,所以这里会交替执行。
import asyncio async def func(arg): print(1) await asyncio.sleep(2) print(2) return "返回值: " + arg async def main(): print("开始创建Task对象") # 创建 Task 对象, 将当前执行 func 函数任务添加到事件循环 task1 = asyncio.create_task(func("task1")) task2 = asyncio.create_task(func("task2")) print("创建Task对象结束") # 当执行某协程遇到IO操作时, 会自动切换到其他任务 # 此处的 await 是等待Task对象全都执行完毕并获取结果 ret1 = await task1 ret2 = await task2 print(ret1, ret2) asyncio.run(main())
执行结果如下:
开始创建Task对象
创建Task对象结束
1
1
2
2
返回值: task1 返回值: task2
例子二:
asyncio.wait()
是asyncio
模块中的一个函数,用于等待一组可等待对象完成,它返回一个Future
对象,表示等待任务完成的结果。可以将任务列表(或者其他可迭代对象)作为参数传递给asyncio.wait()
函数。
import asyncio async def func(arg): print(1) await asyncio.sleep(2) print(2) return "返回值" + arg async def main(): print("开始创建Task对象") # 创建任务列表 task_list,列表中的每个元素都是一个Task对象 task_list = [ asyncio.create_task(func("task1")), asyncio.create_task(func("task2")) ] print("创建Task对象结束") # done: 是完成的 Task 对象, # pending: 是未完成的 Task 对象 done, pending = await asyncio.wait(task_list, timeout=None) print(done) print(pending) asyncio.run( main() )
执行结果如下
开始创建Task对象
创建Task对象结束
1
1
2
2
# 这是已经完成的Task对象
{<Task finished name='Task-2' coro=<func() done, defined at D:\PycharmProjects\asyncio\main.py:3> result='返回值task1'>, <Task finished name='Task-3' coro=<func() done, defined at D:\PycharmProjects\asyncio\main.py:3> result='返回值task2'>}
# 这是未完成的Task对象,set() 表示集合为空(没有未完成的任务)
set()
asyncio.create_task()
函数有一个 name 的字段可以为创建的任务指定一个可选的名称,这样可以更容易和的识别和追踪特定的任务。
import asyncio async def func(): print(1) await asyncio.sleep(2) print(2) return "返回值" async def main(): print("开始创建Task对象") # 创建 Task 对象, 将当前执行 func 函数任务添加到事件循环 # name 给对象任务列表起名字,方便区分哪个任务结束 task_list = [ asyncio.create_task(func(), name="n1"), asyncio.create_task(func(), name="n2") ] print("创建Task对象结束") # done 是完成的 Task 对象, # pending 是未完成的 Task 对象 done, pending = await asyncio.wait(task_list, timeout=None) print(done) asyncio.run( main() ) # 输出如下: 开始创建Task对象 创建Task对象结束 1 1 2 2 {<Task finished name='n1' coro=<func() done, defined at D:\PycharmProjects\asyncio\main.py:3> result='返回值'>, <Task finished name='n2' coro=<func() done, defined at D:\PycharmProjects\asyncio\main.py:3> result='返回值'>}
例子三:
例子三 和 例子二 相比,task_list 中直接写任务名称,没有通过 asyncio.create_task()
创建Task对象,这是因为create_task()
函数创建任务之后会自动添加到事件循环中。这里定义的 task_list 列表还没有创建事件循环。并且 task_list 列表中的任务也并不需要手动调用 asyncio.create_task()
函数,因为 asyncio.task()
函数会隐式的将协程对象转换为任务(Task对象)。
在asyncio.wait()
的参数中,可以直接包含协程对象,而不必显示的将其转换为任务,它会自动的将协程对象封装成任务,并进行等待和管理。
import asyncio async def func(arg): print(1) await asyncio.sleep(2) print(2) return "返回值" + arg task_list = [ func("task1"), func("task2") ] """ 这里这样写是错误的,因为asyncio.create_task()是将协程对象包装成任务,并添加到 事件循环中,但是这里事件循环还没有创建(asyncio.run() 才会创建事件循环),所以会报错 task_list = [ asyncio.create_task(func("task1")), asyncio.create_task(func("task2")) ] """ done, pending = asyncio.run( asyncio.wait(task_list)) print(done)
执行结果如下:
1
1
2
2
{<Task finished name='Task-2' coro=<func() done, defined at D:\PycharmProjects\asyncio\main.py:3> result='返回值task2'>, <Task finished name='Task-3' coro=<func() done, defined at D:\PycharmProjects\asyncio\main.py:3> result='返回值task1'>}
asyncio
中的 Future
对象是一个更偏向底层的可等待对象,代表异步任务的最终结果。通常不会直接用到这个对象,而是直接使用Task对象来完成任务的创建和状态的追踪。
它是 Task 对象的基类,Task 继承 Future, Task 对象内部 await
结果的处理基 于Future
对象来的。
import asyncio
async def main():
# 获取当前事件循环
loop = asyncio.get_running_loop()
# 创建一个任务(Future对象),这个任务什么都不做
fut = loop.create_future()
# 等待任务结果(Future 对象),没结果会一直等待下去
await fut
asyncio.run( main() )
例子一:
import asyncio async def set_after(fut: asyncio.Future): await asyncio.sleep(2) # fut.set_result("success") async def main(): # 获取当前事件循环 loop = asyncio.get_running_loop() # 创建一个任务(Future对象),没有绑定任何行为,这个任务永远不知道什么时候结束 fut = loop.create_future() # 创建一个任务(Task 对象),绑定了 set_after 函数,函数内部在等待 2s 后,会给 fut 赋值 # 即手动设置 Future 任务的最终结果,那么 fut 就可以结束了 await loop.create_task(set_after(fut)) # 等待 Future 对象获取最终结果,否则一直等待下去 # 这里等待 Future 对象的时候,遇到阻塞操作(因为 Future 对象没有结果会一直等待下去), 所以会切换到 set_after 任务去执行 # 而 set_after 任务在等待 2s 后,会给 Future 对象赋值,所以此时 Future 对象返回, 代码执行结束 data = await fut print(data) asyncio.run( main() )
例子二:
在 asyncio
模块中,还可以使用 asyncio.Future()
函数来创建一个 Future
对象。它与 loop.create_future()
创建的 Future
对象不同的是,loop.create_future()
创建的对象与特定的事件循环相关联,这意味着,该 Future
对象只能在创建它的事件循环中使用,而 asyncio.Future()
创建的 Future
对象没有特定事件循环想关联,所以可以在任何事件循环中使用。
import asyncio async def task1(future): await asyncio.sleep(2) future.set_result('Task 1 completed') async def task2(future): await asyncio.sleep(3) future.set_result('Task 2 completed') async def main(): loop = asyncio.get_running_loop() fut = loop.create_future() # 启动两个任务,并共享同一个 Future 对象 asyncio.create_task(task1(fut)) asyncio.create_task(task2(fut)) # 等待 Future 对象的结果 result = await fut print(result) asyncio.run(main())
在上述示例中,我们定义了两个任务函数 task1() 和 task2(),它们分别会在一段时间后设置共享的Future对象的结果。在main()函数中,我们获取当前的事件循环对象,并使用loop.create_future()创建了一个 Future 对象 fut。然后,通过 asyncio.create_task() 函数启动了两个任务,它们共享同一个 Future 对象 fut。最后,使用 await 关键字等待 fut 的结果,并输出结果。
在这个示例中,loop.create_future() 的优势是可以将一个 Future 对象传递给多个协程,使得多个协程可以共享同一个 Future 对象,并在不同的时间点设置该对象的结果。这样可以实现更灵活的协同处理,例如在一个协程中等待多个任务完成后再继续执行。
首先,这个对象和 asyncio.Future 对象没有任何关系,它是使用线程池、进程池实现异步操作时用到的对象.
import time from concurrent.futures import Future from concurrent.futures.thread import ThreadPoolExecutor from concurrent.futures.process import ProcessPoolExecutor def func(value): time.sleep(1) print(value) # 创建线程池 pool = ThreadPoolExecutor(max_workers=5) # 创建进程池 # pool = ProcessPoolExecutor(max_workers=5) for i in range(10): fut = pool.submit(func, i) print(fut)
他们为不同的应用场景设计,但是 Python 提供了一个将 futures.Future
对象包装成 asyncio.Future
对象的函数 asyncio.warp_future
。
import time import asyncio import concurrent.futures def func1(): # 某个耗时操作 time.sleep(2) return "123" async def main(): loop = asyncio.get_running_loop() # 1. Run in the default loop's executor ( 默认ThreadPoolExecutor ) # 第一步:内部会先调用 ThreadPoolExecutor 的 submit 方法去线程池中申请一个线程去执行func1函数,并返回一个concurrent.futures.Future对象 # 第二步:调用asyncio.wrap_future将concurrent.futures.Future对象包装为asycio.Future对象。 # 因为concurrent.futures.Future对象不支持await语法,所以需要包装为 asycio.Future对象 才能使用。 fut = loop.run_in_executor(None, func1) result = await fut print('default thread pool', result) # 2. Run in a custom thread pool: # with concurrent.futures.ThreadPoolExecutor() as pool: # result = await loop.run_in_executor( # pool, func1) # print('custom thread pool', result) # 3. Run in a custom process pool: # with concurrent.futures.ProcessPoolExecutor() as pool: # result = await loop.run_in_executor( # pool, func1) # print('custom process pool', result) asyncio.run(main())
loop.run_in_executor()
函数上面注释已经说清楚了,其实就是内部调用了 线程池或者进程池的submit
方法,然后调用了 asyncio.wrap_future()
,将起包装成了一个 asyncio 的 Future 对象。
loop.run_in_executor()
的作用就是将阻塞的函数委托给线程或进程池,在异步环境中运行它,以避免阻塞事件循环。它返回一个asyncio.Future
对象,表示在执行器中运行的函数的结果。
注意:我通过查看代码,发现里面调用的是 futures.wrap_future() 函数,而不是 asyncio.wrap_future()函数,可能不同版本之间不一样吧,但是这两个函数的功能是一样的,都是将concurrent.futures.Future对象包装为asyncio.Future对象,以便在 asyncio 的事件循环中进行异步处理,两者之间的主要区别在于模块的不同,一个是concurrent.futures模块中的函数,另一个是asyncio模块中的函数。
下面这个例子是 asyncio 模块加上不支持异步模块的混合使用。
import requests import asyncio import os async def download_images(url): # 发送网络请求,下载图片,遇到网络IO,自动切换其他任务 print("开始下载:", url) loop = asyncio.get_event_loop() # requests 模块默认不支持异步操作,所以使用线程池来配合实现 future = loop.run_in_executor(None, requests.get, url) response = await future print("下载完成") file_name = url.rsplit("_")[-1] with open(file_name, mode = 'wb') as f: f.write(response.content) if __name__ == '__main__': url_list = [ "https://www3.autoimg.cn/newsdfs/g26/M04/B7/07/1488x0_1_autohomecar__CjIFVmR2EjmAV2uOAAJsOzOueIg341.jpg", "https://car2.autoimg.cn/cardfs/product/g24/M06/76/C9/1488x0_1_autohomecar__Chtk3WQQgK-AA6DDACIn_NIMSLw990.jpg", "https://www2.autoimg.cn/newsdfs/g26/M08/82/C6/1488x0_1_autohomecar__ChxkjmR2BUuADxtRAAVgtryBmXQ532.jpg" ] tasks = [ download_images(url) for url in url_list] loop = asyncio.get_event_loop() loop.run_until_complete(asyncio.wait(tasks))
异步迭代器: 实现了 __aiter__()
和 __anext__()
方法的对象。__anext__
必须返回一个 awaitable
对象。async for
会处理异步迭代器的__anext__()
方法所返回的可等待对象,直到其引发一个 StopAsyncIteration
异常。由 PEP 492 引入。
awaitable
对象 是指在异步编程中可以使用 await 关键字进行等待操作的对象
异步可迭代对象: 可在 async for
语句中被使用的对象。必须通过它的 __aiter__()
方法返回一个 asynchronous iterator
。由 PEP 492 引入。
以上的概念从该链接复制:python 术语对照表
import asyncio class Reader(object): """ 自定义异步迭代器, 同时也是异步可迭代对象""" def __init__(self): self.count = 0 async def readline(self): self.count += 1 if self.count == 100: return None return self.count def __aiter__(self): return self async def __anext__(self): val = await self.readline() if val is None: raise StopAsyncIteration return val async def func(): obj = Reader() # async for 语句必须写在协程函数里面 async for item in obj: print(item) asyncio.run(func())
上下文管理器 是Python 中用于管理资源的一种机制,它提供了一种方便的方式来管理资源的获取和释放,无论是在正常情况下还是发生异常的情况。它可以使用 with 语句来创建一个上下文,并确保在离开该上下文时正确处理资源.
异步上下文管理器 它通过定义 __aenter__()
和 __aexit__()
方法来对 async with
语句中的环境进行控制。由 PEP 492 引入。
import asyncio class AsyncContextManger: def __init__(self, conn): self.conn = conn async def do_something(self): # 异步操作数据库 return 666 async def __aenter__(self): # 异步链接数据库 self.conn = await asyncio.sleep(1) return self async def __aexit__(self, exc_type, exc, tb): # 异步关闭数据库链接 await asyncio.sleep(1) # async with 方法需要在异步函数内进行使用 # 当使用是会执行当前类中得__aenter__ 这个方法返回什么那么f就是什么[可以进行设置数据库链接] # 当上下文完成后 就会自动使用__aexit__方法[关闭数据库链接] async def func(): async with AsyncContextManger() as f: result = await f.do_something() print (result) asyncio.run(func())
uvloop
是 asyncio
的事件循环的替代方案。是第三方的人员写的。它的事件循环效率是大于默认 asyncio
的事件循环的, 性能更高.
注意:windows 下面并没有 uvloop,在 Linux 上面是可以安装成功的
pip3 install uvloop
import asyncio
import uvloop
# TODO:将 asyncio 里面的事件循环替换为 uvloop
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
# 编写 asyncio 的代码
# 内部循环事件会自动化变为 uvloop
asyncio.run( ... )
在一些情况下,我们可能需要将 asyncio
与其他异步框架一起使用,asyncio
支持与其它库一起使用,例如aioredis、aiomysql、aiohttp
等,这些库都实现了 asyncio
的协议,并且能够与 asyncio
无缝的协作。
pip3 install aioredis # 异步操作redis的库
pip3 install aiomysql # 异步操作mysql的库
pip3 install aiohttp # 异步编写http的库
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。