赞
踩
这几天看asyncio相关的pycon视频又重温了asyncio 的官方文档,收获很多。之前asyncio被吐槽的一点就是文档写的不好,Python 3.7 时 asyncio 的官方文档被 Andrew Svetlov 以及 Yury Selivanov 等核心开发者重写了,新的版本我觉得已经好很多了。在这里记录一下我对asyncio的一些理解。
asyncio里面主要有4个需要关注的基本概念
Eventloop可以说是asyncio应用的核心,是中央总控。Eventloop实例提供了注册、取消和执行任务和回调的方法。
把一些异步函数(就是任务,Task,一会就会说到)注册到这个事件循环上,事件循环会循环执行这些函数(但同时只能执行一个),当执行到某个函数时,如果它正在等待I/O返回,事件循环会暂停它的执行去执行其他的函数;当某个函数完成I/O后会恢复,下次循环到它的时候继续执行。因此,这些异步函数可以协同(Cooperative)运行:这就是事件循环的目标。
协程(Coroutine)本质上是一个函数,特点是在代码块中可以将执行权交给其他协程:
- import asyncio
-
-
- async def a():
- print('Suspending a')
- await asyncio.sleep(0)
- print('Resuming a')
-
-
- async def b():
- print('In b')
-
-
- async def main():
- await asyncio.gather(a(), b())
-
-
- if __name__ == '__main__':
- asyncio.run(main())
这里面有4个重要关键点:
async def
声明,Python 3.5时的装饰器写法已经过时,我就不列出来了。await asyncio.sleep(0)
,await表示调用协程,sleep 0并不会真的sleep(因为时间为0),但是却可以把控制权交出去了。- loop = asyncio.get_event_loop()
- loop.run_until_complete(main())
- loop.close()
好了,我们先运行一下看看:
- ❯ python coro1.py
- Suspending a
- In b
- Resuming a
看到了吧,在并发执行中,协程a被挂起又恢复过。
接着说Future,它代表了一个「未来」对象,异步操作结束后会把最终结果设置到这个Future对象上。Future是对协程的封装,不过日常开发基本是不需要直接用这个底层Future类的。我在这里只是演示一下:
- In : def c():
- ...: print('Inner C')
- ...: return 12
- ...:
-
- In : future = loop.run_in_executor(None, c) # 这里没用await,None 表示默认的 executor
- Inner C
-
- In : future # 虽然c已经执行了,但是状态还是 pending。
- Out: <Future pending cb=[_chain_future.<locals>._call_check_cancel() at /usr/local/lib/python3.7/asyncio/futures.py:348]>
-
- In : future.done() # 还没有完成
- Out: False
-
- In : for a in dir(future):
- ...: if not a.startswith('_'):
- ...: print(a)
- ...:
- add_done_callback
- cancel
- cancelled
- done
- exception
- get_loop
- remove_done_callback
- result
- set_exception
- set_result
可以对这个Future实例添加完成后的回调(add_done_callback)、取消任务(cancel)、设置最终结果(set_result)、设置异常(如果有的话,set_exception)等。现在我们让Future完成:
- In : await future
- Out: 12
-
- In : future
- Out: <Future finished result=12>
-
- In : future.done()
- Out: True
-
- In : future.result()
- Out: 12
看到了吧,await之后状态成了finished。这里顺便说一下,一个对象怎么样就可以被await(或者说怎么样就成了一个awaitable对象)呢?给类实现一个__await__方法,Python版本的Future的实现大概如下:
- def __await_(self):
- if not self.done():
- self._asyncio_future_blocking = True
- yield self
- if not self.done():
- raise RuntimeError("await wasn't used with future")
- return self.result()
这样就可以await future
了,那为什么await future
后Future的状态就能改变呢,这是因为用loop.run_in_executor
创建的Future注册了一个回调(通过asyncio.futures.wrap_future
,加了一个_call_set_state
回调, 有兴趣的可以通过延伸阅读链接2找上下文)。
__await__
里面的yield self
不要奇怪,主要是为了兼容__iter__
,给旧的yield from
用:
- In : future = loop.run_in_executor(None, c)
- Inner C
-
- In : future
- Out: <Future pending cb=[_chain_future.<locals>._call_check_cancel() at /usr/local/lib/python3.7/asyncio/futures.py:348]>
-
- In : def spam():
- ...: yield from future
- ...:
-
- In : s = spam()
-
- In : next(s)
- Out: <Future pending cb=[_chain_future.<locals>._call_check_cancel() at /usr/local/lib/python3.7/asyncio/futures.py:348]>
新的替代yield from
的用法await必须在异步函数(用 async def申明)中使用:
- In : def spam():
- ...: await future
- ...:
- File "cell_name", line 5
- SyntaxError: 'await' outside async function
Eventloop除了支持协程,还支持注册Future和Task2种类型的对象,那为什么要存在Future和Task这2种类型呢?
先回忆前面的例子,Future是协程的封装,Future对象提供了很多任务方法(如完成后的回调、取消、设置任务结果等等),但是开发者并不需要直接操作Future这种底层对象,而是用Future的子类Task协同的调度协程以实现并发。
Task非常容易创建和使用:
- # 或者用task = loop.create_task(a())
- In : task = asyncio.ensure_future(a())
-
- In : task
- Out: <Task pending coro=<a() running at /Users/dongwm/mp/2019-05-22/coro1.py:4>>
-
- In : task.done()
- Out: False
-
- In : await task
- Suspending a
- Resuming a
-
- In : task
- Out: <Task finished coro=<a() done, defined at /Users/dongwm/mp/2019-05-22/coro1.py:4> result=None>
-
- In : task.done()
- Out: True
在代码中使用async/await是不是就能发挥asyncio的并发优势么,其实是不对的,我们先看个例子:
- async def a():
- print('Suspending a')
- await asyncio.sleep(3)
- print('Resuming a')
-
-
- async def b():
- print('Suspending b')
- await asyncio.sleep(1)
- print('Resuming b')
-
-
- async def s1():
- await a()
- await b()
有2个协程a和b,分别sleep1秒和3秒,如果协程可以并发执行,那么执行时间应该是sleep最大的那个值(3秒),现在它们都在s1协程里面被调用。大家先猜一下s1会运行几秒?
我们写个小程序验证一下:
- def show_perf(func):
- print('*' * 20)
- start = time.perf_counter()
- asyncio.run(func())
- print(f'{func.__name__} Cost: {time.perf_counter() - start}')
大家注意我这个时间计数用的方法,没有用time.time,而是用了Python 3.3新增的time.perf_counter它是现在推荐的用法。我们在IPython里面验证下:
- In : from coro2 import *
-
- In : show_perf(s1)
- ********************
- Suspending a
- Resuming a
- Suspending b
- Resuming b
- s1 Cost: 4.009796932999961
看到了吧,4秒!!!,相当于串行的执行了(sleep 3 + 1)。这是错误的用法,应该怎么用呢,前面的asyncio.gather就可以:
- async def c1():
- await asyncio.gather(a(), b())
-
- In : show_perf(c1)
- ********************
- Suspending a
- Suspending b
- Resuming b
- Resuming a
- c1 Cost: 3.002452698999832
看到了吧,3秒!另外一个是asyncio.wait:
- async def c2():
- await asyncio.wait([a(), b()])
-
- In : show_perf(c2)
- ...
- c2 Cost: 3.0066957049998564
同样是3秒。先别着急,gather和wait下篇文章还会继续对比。还有一个方案就是用asyncio.create_task:
- async def c3():
- task1 = asyncio.create_task(a())
- task2 = asyncio.create_task(b())
- await task1
- await task2
-
-
- async def c4():
- task = asyncio.create_task(b())
- await a()
- await task
-
- In : show_perf(c3)
- ...
- c3 Cost: 3.002332438999929
-
- In : show_perf(c4)
- ...
- c4 Cost: 3.002270970000154
都是3秒。asyncio.create_task相当于把协程封装成Task。不过大家要注意一个错误的用法:
- async def s2():
- await asyncio.create_task(a())
- await asyncio.create_task(b())
-
- In : show_perf(s2)
- ...
- s2 Cost: 4.004671427999938
**直接await task不会对并发有帮助。asyncio.create_task是Python 3.7新增的高阶API,*是推荐的用法,其实你还可以用asyncio.ensure_future和loop.create_task:
- async def c5():
- task = asyncio.ensure_future(b())
- await a()
- await task
-
-
- async def c6():
- loop = asyncio.get_event_loop()
- task = loop.create_task(b())
- await a()
- await task
-
- In : show_perf(c5)
- ...
- c5 Cost: 3.0033873750003295
-
- In : show_perf(c6)
- ...
- c6 Cost: 3.006120122000084
到这里,我们一共看到2种错误的,6种正确的写法。你学到了么?
本文代码可以在 mp项目 找到
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。