赞
踩
如果你是python开发工程师,你不学asyncio,真的是你的遗憾。asyncio 是用来编写并发代码的库,
asyncio是基于协程的并发,是在一个单线程内实现并发,使用 async/await 语法。asyncio 被用作
多个提供高性能 Python 异步框架的基础,包括网络和网站服务,数据库连接库,分布式任务队列等等。
asyncio 往往是构建IO密集型和高层级结构化网络代码的最佳选择,在看本文章的时候,最好去了解下
asynio 的使用,了解常用的方法。本文主要讲以下几点:
1. asyncio Handle的原理
2. asyncio Future的原理
3. asyncio Task的原理
4. asyncio 网络底层封装
源码可以参考:https://github.com/zhaojiangbing/asyncio。后续文章会可能会讲解asyncio在
后端常用主件的使用
在了解asyncio之前我们先回顾下python的一些知识点,如果你是高手就不用看这小节了。
python异步编程我想都离不开 yield、yield from 了吧,python中当普通函数中出现
yield、yield from,那么该函数就变成一个生成器了,通过 send、next 让生成器执行,
当遇到 yield、yield from 语句就会停止执行下面的语句,需要再次调用 next、send 让
生成器从 yield 、yield from 语句处的下一条语句开始运行,send 相对于 next 会向生
成器传入值,下面代码中 send_value 就是 send 传入的值,yield from 语句就是起传递作用。
def fib(number): # 斐波那契数列实现
n, a, b = 0, 0, 1
while n < number:
send_value = yield b # 这里返回b不会继续向下运行了,
# 直到下一次next或者send从下面一行代码开始运行
print("send_value: ", send_value) # 打印send过来的值
a, b = b, a + b
n = n + 1
return 'OK!'
fib_func = fib(10) # 创建生成器fib_func
def print_num():
ok = yield from fib_func # 这里只是把send或者next过来的信号传递给生成器fib_func,
# 并且把生成器fib_func yield处的值传回
return ok # 这里返回的是生成器(fib_func) return返回的值,
# 不要理解成是生成器(fib_func) yield处返回的b
print_func = print_num() # 创建生成器print_func
a = print_func.send(None)
print("生成器返回值:", a)
a = print_func.send(1212)
print("生成器返回值:", a)
a = next(print_func)
print("生成器返回值:", a)
a = next(print_func)
print("生成器返回值:", a)
迭代是Python最强大的功能之一,是访问集合元素的一种方式,迭代器对象从集合的第一个元素开始访问,
直到所有的元素被访问完结束。迭代器只能往前不会后退,迭代器有两个基本的方法:iter() 和 next(),
字符串,列表或元组对象都可用于创建迭代器。
list=[1,2,3,4]
it = iter(list) # 创建迭代器对象
for x in it:
print (x, end=" ") # 输出 1 2 3 4
# 当遍历完的时候会抛出StopIteration异常
上面可以看出迭代器 it 是可以被循环迭代的,下面我们用 __iter__()、 __next__()方法实现类迭代器。
class MyNumbers:
def __iter__(self):
print("我是iter函数")
self.a = 1
return self
def __next__(self):
print("我是next函数")
x = self.a
self.a += 1
return x
myclass = MyNumbers()
myiter = iter(myclass) # 我是iter函数
print(next(myiter)) # 我是next函数
# 1
print(next(myiter)) # 我是next函数
# 2
print(next(myiter)) # 我是next函数
# 3
输出结果为:
我是iter函数
我是next函数
1
我是next函数
2
我是next函数
3
上面可以看到要用 next() 函数迭代之前,需要调用 iter() 函数,其实 iter() 函数执行了 MyNumbers 类的
__iter__() 方法,而 next() 函数执行了 MyNumbers 类的 __next__() 方法,下面我们循环遍历迭代器。
class MyNumbers:
def __iter__(self):
print("我是iter函数")
self.a = 1
return self
def __next__(self):
if self.a <= 5: # 大于5就结束迭代器
x = self.a
self.a += 1
return x
else:
raise StopIteration
myclass = MyNumbers()
for x in myclass:
print(x)
输出结果为:
我是iter函数
1
2
3
4
5
可以看到循环遍历迭代器对象会先后调用 MyNumbers 类的 __iter__()、__next__() 方法,
并且还会处理 StopIteration 异常,我们还可以用 yield from 遍历迭代器。
class MyNumbers:
def __iter__(self):
print("我是iter函数")
self.a = 1
return self
def __next__(self):
if self.a <= 5: # 大于5就结束迭代器
x = self.a
self.a += 1
return x
else:
raise StopIteration
myclass = MyNumbers() # 声明一个迭代器对象
def yield_from():
yield from myclass # 用yield from遍历迭代器
return
yield_obj = yield_from()
for i in yield_obj:
print(i)
输出结果是:
我是iter函数
1
2
3
4
5
在常规的多线程编程中定义一个全局变量,如果某一个线程修改了全局变量,那么其他的线程读取
的可能就是修改了的值。那有没有一种方法定义了全局变量,各个线程读写全局变量互不影响呢?
python就刚好提供了local(线程局部存储)来做线程安全,我们直接看下面代码吧。
import threading # 这个是python多线程常用的包
local = threading.local() # 定义一个全局local
local.tname = "main" # 设置local全局变量的值
tname = "main" # 定义一个普通全局变量
def func(info):
local.tname = info
global tname # 表示使用全局变量
tname = info
print(local.tname, tname)
t1 = threading.Thread(target=func, args=['funcA']) # 创建线程t1
t2 = threading.Thread(target=func, args=['funcB']) # 创建线程t2
t1.start() # 启动线程t1
t1.join() # 主线程等待线程t1
t2.start() # 启动线程t2
t2.join() # 主线程等待线程t2
print(local.tname, tname) # 只有当线程t1, t2结束了才会打印
输出结果为:
funcA funcA
funcB funcB
main funcB
可以看到两个线程分别修改了全局变量 local 的 tname 属性,但是呢主线程并没有改变,说明全局
local 对象的属性在多线程中并不共享,而普通全局变量 tname 在主线程的值却是最后一个线程修改的值,
从结果可以知道变量 tname 最后修改的线程为t2。
python3.5开始出现 async、await 关键字,async、await 往往是配套使用的。asyncio中 async
放在函数定义的前面,那么这个函数就变成了协程,如果函数中涉及到阻塞,那么就需要在涉及到阻塞的
语句前面加 await,常见的网络阻塞操作都是由asyncio封装了的,aiohttp,aio_pika、aiomysql等
涉及到tcp传输协议的的操作,最终都会走asyncio提供的tcp协议api。
上面提到了网络阻塞,其实就是socket的阻塞,但是在asyncio中,所有的socket操作都设置成了非阻塞,
而对于涉及到socket的阻塞,都会用 future 代替,future 其实就是一个可迭代对象,实现了
__iter__() 方法,这个方法其实也是一个生成器,所以在函数中出现 yield from future 的时候就
会让函数阻塞,切换到其他协程。
from asyncio import Future, coroutine
future = Future
@coroutine
def worker():
yield from future
async def a_worker():
await future
其实上面的 worker()、a_worker() 两个函数是等价的,所以asyncio中要把一个生成器(函数)变成协程(函数)
用 coroutine 装饰器就可以了,coroutine 原理其实也很简单,就是把生成器(函数)封装成含有 __await__()
方法的对象,这样才能满足 async、await 语法。
import asyncio
async def hello(): # async表示该函数已经被定义成协程(函数)
print("我是hello开始")
await asyncio.sleep(2) # asyncio.sleep代替阻塞请求,此时会等待,让出cpu
print("我是hello结束")
async def world():
print("我是world开始")
await asyncio.sleep(2) # 此时会等待,让出cpu
print("我是world结束")
loop = asyncio.get_event_loop() # 创建一个事件循环
loop.create_task(hello()) # 把协程(函数)添加到事件循环
loop.create_task(world()) # 把协程(函数)添加到事件循环
loop.run_forever() # 运行事件循环
# loop.run_until_complete()
从上面可以看到要想运行协程(函数),首先用 asyncio.get_event_loop() 函数创建事件循环(loop),
把协程(函数)通过 asyncio.create_task() 函数创建成任务,最后通过 loop.run_forever() 函数
运行事件循环,处理每一个任务。
asyncio 实现异步是需要一个事件循环的,什么是事件循环,你是不是觉得有点抽象,其实理解很简单,
假如有一个事件循环的类(Loop),创建该类的实例(loop),loop 有一个就绪队列(_ready)属性,当
有任务的时候就通过 loop 添加到队列(_ready)里面,loop 还有一个运行事件循环的方法(run_forever),
方法里写一个死循环,遍历队列(_ready)里的任务,最后处理任务,具体是怎么个流程后续章节详解。
后面没特别说明,事件循环类就用 Loop 表示。
前面我们有对什么是事件循环(loop)解释过了,当我们要把函数加入 loop 的时候,函数必须封装成 Handle,
然后在把 Handle 添加到 loop, Handle 添加到 loop 是怎么回事呢? 这里要从 loop 的两个属性(_ready、
_scheduled)说起,这两个属性其实就是两个队列,添加 Handle 到 loop,其实就是添加到这两个队列里面,
_ready 存放的是立刻要执行的 Handle,我就叫它就绪队列;_scheduled 存放的是延时执行的 TimerHandle,
我就叫它计划列表。asyncio底层实现了 Handle、TimerHandle 两个类,TimerHandle 继承 Handle,
源码参考:(https://github.com/zhaojiangbing/asyncio/blob/master/events.py)。
定义:handle 是 Handle 的实例,下面看看这个类的省略代码:
class Handle:
def __init__(self, callback, args, loop):
self._loop = loop # 这就是事件循环
self._callback = callback
self._args = args
self._cancelled = False
self._repr = None
if self._loop.get_debug():
self._source_traceback = extract_stack(sys._getframe(1))
else:
self._source_traceback = None
def cancel(self): # 取消handle
if not self._cancelled:
self._cancelled = True
if self._loop.get_debug():
self._repr = repr(self)
self._callback = None
self._args = None
def _run(self):
try:
self._callback(*self._args)
except Exception as exc:
cb = _format_callback_source(self._callback, self._args)
msg = 'Exception in callback {}'.format(cb)
context = {
'message': msg,
'exception': exc,
'handle': self,
}
if self._source_traceback:
context['source_traceback'] = self._source_traceback
self._loop.call_exception_handler(context)
self = None # Needed to break cycles when an exception occurs.
从代码中可以看出 handle 有个 _run() 方法,_run() 方法中会调用 _callback 属性。
定义:loop 是 Loop 的实例,下面我们来看看 Handle 在事件循中的角色代码:
import collections
class Loop(object): # 事件循环类
def __init__(self):
self._ready = collections.deque() # 就绪队列,存放马上就要运行的handle
self._scheduled = [] # 计划列表,存放延时的handle
self._stopping = False # 如果True,事件循环运行结束
def _add_callback(self, handle):
"""添加handle到就绪队列_ready。"""
assert isinstance(handle, Handle), '我去,你都不是我包养的类型'
if handle._cancelled:
return
self._ready.append(handle)
def run_forever(self):
"""运行事件循环。"""
while True:
ntodo = len(self._ready)
for i in range(ntodo):
handle = self._ready.popleft()
if handle._cancelled: # 如果handle被取消了就不会运行
continue
handle._run()
if self._stopping: # 结束事件循环
break
def worker():
print("我是被Handle包养了的鸭子")
if __name__ == "__main__":
loop = Loop() # 创建一个事件循环loop
handle = Handle(worker) # 把worker函数包装成handle
loop._add_callback(handle) # 添加handle到事件循环
loop.run_forever() # 运行事件循环
其实上面的注释很清晰了,创建事件循环(loop),函数 worker() 封装成 handle,worker() 存放在
handle._callback 属性上,用 loop._add_callback() 方法把 handle 添加到事件循环就绪队列(_ready),
调用 loop.run_forever() 方法运行事件循环,loop.run_forever() 会遍历出 handle,如果
handle._cancelled 属性不等于True,就会调用 handle._run() 方法了,handle._run() 方法会调用
self._callback 属性,其实就是调用 worker() 函数。停止事件循环,只需要把 loop._stopping 属性
设置为True。这里使用的是 Loop,是为了让读者明白asyncio底层实现,原理想通。
定义:handle 是 TimerHandle 的实例,下面看看这个类的省略代码:
class TimerHandle(Handle):
__slots__ = ['_scheduled', '_when'] # __slots__ 存放暴露外面的属性
def __init__(self, when, callback, args, loop):
assert when is not None
super().__init__(callback, args, loop)
if self._source_traceback:
del self._source_traceback[-1]
self._when = when
self._scheduled = False
TimerHandle 在 Handle 基础上增加了 _when 属性,实现了定时器等功能,在工作中肯定会遇到让某个任务延时运行,比如30秒后运行,下面我们来看看 TimerHandle 在事件循中的角色代码:
import collections
import time
class Loop(object):
def __init__(self):
self._ready = self._ready = collections.deque() # 就绪队列
self._scheduled = [] # 计划列表
self._stopping = False # 如果True,事件循环运行结束
def call_later(self, delay, callback, *args): # 添加延时任务
"""添加延时任务到事件循环
:param delay: int,延时的时间
:param callback: 回调函数
:param args: 回调函数的参数
:return:
"""
execute_time = time.time() + delay # 获取callback执行的时间
# execute_time在TimerHandle初始化的时候赋值给handle._when
handle = TimerHandle(execute_time, callback, args) # 封装成TimerHandle
self._scheduled.append(handle) # 添加到计划列表
def run_forever(self):
"""运行事件循环。"""
while True:
while self._scheduled:
handle = self._scheduled.pop(0) # 从计划列表移除
if handle._cancelled:
continue
if handle._when >= time.time(): # 只有当_when大等于当前时间才会放入就绪队列
self._ready.append(handle) # 添加到就绪队列
ntodo = len(self._ready)
for i in range(ntodo):
handle = self._ready.popleft() # 移除handle
if handle._cancelled: # 如果handle被取消了就不会运行
continue
handle._run()
if self._stopping: # 结束事件循环
break
def worker():
print("TimerHandle包养我30秒")
if __name__ == "__main__":
loop = Loop() # 创建一个事件循环 loop
loop.call_later(30, worker) # 30秒后调用worker
loop.run_forever() # 运行事件循环
创建一个事件循环(loop),函数 worker() 通过 loop.call_later() 添加到 loop,注意这里不是添加到
就绪队列(_ready),而是添加到计划列表(_scheduled),在添加之前会用 TimerHandle 封装 worker()
函数, 并把 worker() 函数执行的时间赋值给 TimerHandle 的 _when 属性,loop.run_forever() 每
一轮循环中,只有当 handle._when 大于当前时间才会把 handle 添加到就绪队列(_ready),
从而达到延时执行的效果。
Future 我们叫它未来对象,Future 有三个状态,分别是 _PENDING(就绪)、_CANCELLED(取消)、_FINISHED(完成)。
future 是一个可迭代对象,如果某个函数里出现 yeild from future 语句,那么这个函数就是生成器,
当生成器运行到 yeild from future 语句时会阻塞,那这个生成器要怎么才能继续运行下去呢? 条件有两点,
I:future 状态是 _CANCELLED(取消) 或者 _FINISHED(完成), II:必须有一个东西来 send、next 这个
生成器,这个东西就是 Task。本章节只讲解第I点,第II点在下一章节讲解,那为什么第I点是必要条件?
下面看 asyncio.Future 类的省略代码来找答案。Future 类的源码参考
(https://github.com/zhaojiangbing/asyncio/blob/master/futures.py)
定义:future 是 Future 的实例,下面看看这个类的省略代码:
class Future:
_state = _PENDING # Future的状态
_result = None #
_exception = None
def __init__(self, *, loop=None): # loop就是要传入的事件循环
if loop is None:
self._loop = events.get_event_loop()
else:
self._loop = loop
self._callbacks = []
if self._loop.get_debug():
self._source_traceback = events.extract_stack(sys._getframe(1))
def cancel(self): # 取消future
self._log_traceback = False
if self._state != _PENDING:
return False
self._state = _CANCELLED # 更改为取消状态
self._schedule_callbacks()
return True
def _schedule_callbacks(self): # 把future的回调函数添加到事件循环
callbacks = self._callbacks[:]
if not callbacks:
return
self._callbacks[:] = []
for callback in callbacks:
# 添加到事件循环,call_soon里面会封装callback为Handle,然后添加到loop._ready
self._loop.call_soon(callback, self)
def cancelled(self): # 判断future是否取消
return self._state == _CANCELLED
def done(self):
return self._state != _PENDING # 判断future是否取消或者完成
def result(self): # 获取future结果
if self._state == _CANCELLED:
raise CancelledError
if self._state != _FINISHED:
raise InvalidStateError('Result is not ready.')
self._log_traceback = False
if self._exception is not None:
raise self._exception
return self._result
def add_done_callback(self, fn): # future添加回调函数
if self._state != _PENDING: # 如果不等于就绪
self._loop.call_soon(fn, self) # 添加到事件循环
else:
self._callbacks.append(fn) # 添加回调函数到回调列表
def remove_done_callback(self, fn): # 移除future回调函数
filtered_callbacks = [f for f in self._callbacks if f != fn]
removed_count = len(self._callbacks) - len(filtered_callbacks)
if removed_count:
self._callbacks[:] = filtered_callbacks
return removed_count
def set_result(self, result): # 设置future结果
if self._state != _PENDING:
raise InvalidStateError('{}: {!r}'.format(self._state, self))
self._result = result
self._state = _FINISHED # 更改状态为完成
self._schedule_callbacks()
def __iter__(self): # future是一个可迭代对象
if not self.done(): # future没有完成或者取消的时候,会再次返回future
self._asyncio_future_blocking = True
yield self
assert self.done(), "yield from wasn't used with future"
# future完成或者取消的时候,会返回future的结果
return self.result()
if compat.PY35:
__await__ = __iter__
看了 Future 类的代码总结如下:
a、future 三个状态的分别是 _PENDING(就绪)、_CANCELLED(取消)、_FINISHED(完成),状态存放在
future._state 属性, 在 future 创建的时候 future._state = _PENDING,通过 future.done()
方法判断是否取消或完成,通过 future.cancelled() 方法判断是否取消。
b、future 可以通过 future.add_done_callback()、future.remove_done_callback() 方法添加、
移除回调函数。添加回调的时候,当future._state != _PENDING的时候,回调函数就会通过
loop.call_soon() 方法添加到事件循环就绪队列(_ready),否则就添加到回调列表 future._callbacks里。
c、future 是可以通过 future.set_result() 设置结果,结果存放到 future._result 属性,通过
future.result() 方法获取结果。在设置结果的时候 future._state 会变为 _FINISHED,future
的所有回调函数会添加到事件循环就绪队列(_ready)。
d、future 是可以通过 future.cancel() 方法取消的,取消的时候,结果默认设置为None,future._state
会变为 _CANCELLED,future 的所有回调函数会添加到事件循环就绪队列(_ready)。
e、future 实现了 __iter__() 方法,该方法还是个生成器,future 就是可迭代对象了,那么在函数中使用
yield from future 语句的时候就会调用 __iter__() 方法,可以看到这个语句是会让函数阻塞而让出cpu
运行其他的函数,从而达到函数切换的目的。
上面提到,如果函数里面有 yield from future 语句的时候,这个函数就变成了生成器,由于 future 实现了
__iter__() 方法,所以才能在函数中用 yield from future。只有当 future.done() 方法返回True的时
候,也就是 future 的状态变为非就绪状态(!_PENDING),future.__iter__() 函数才会返回 future 的结
果,否则就返回 future 自己(self),当返回self的时候,事件循环会重复上面第II点,这里也就回答了第I点
是生成器继续运行的必要条件。从 Future 类省略代码我们可以看到,要想让 future 的状态变成非就绪状态(!_PENDING),
可以设置 future 的结果或者取消 future,这两种操作也会让 future 回调函数得执行。
future 在 asyncio 中的使用:
import time
import asyncio
from asyncio import Future, coroutine
def call_back(future):
print("我是 future 的回调函数", time.time())
@coroutine # 把生成器转换成协程(函数)
def worker(future):
print("我在等待 future", time.time()) # 打印时间
result = yield from future # 这里会等待10秒,其他的协程函数会运行,等future设置了结果,就会继续运行
print("future 结果:", result, time.time()) # 打印future结果
async def corout_worker(): # 这个函数等价于worker()函数
await future # 这里和yiled from future等价,
async def set_result(future): # 协程函数
await asyncio.sleep(5) # 睡眠10秒后设置future的结果
print("future设置结果前的状态:", future._state)
future.set_result("我是future")
print("future设置结果后的状态:", future._state)
if __name__ == "__main__":
future = Future() # 创建future
future.add_done_callback(call_back) # 添加回调函数
loop = asyncio.get_event_loop() # 获取事件循环
loop.create_task(worker(future)) # 把协程(函数)添加到事件循环
loop.create_task(set_result(future)) # 把协程(函数)添加到事件循环
loop.run_forever() # 运行事件循环
执行结果如下:-----------------------
我在等待 future 1603595207.5414805
future设置结果前的状态: PENDING
future设置结果后的状态: FINISHED
我是 future 的回调函数 1603595212.5418484
future 结果: 我是future 1603595212.5418484
上面代码首先创建一个全局的 future,future 把 call_back() 函数添加为回调函数,回调函数第一个参数就是 future
自己(self),然后创建一个事件循环,注意这里不再是上面举例的事件循环(Loop),worker() 函数是个生成器,由于
asyncio 提倡使用 async await 语法,所以用 @coroutine 把生成器函数 worker() 装饰成 async await 类型的函
数,也就是协程(函数),这里就知道了生成器函数怎么变协程(函数), 对于 async await 可以参考python技术知识点复习
章节,worker() 函数其实和 corout_worker() 函数是等价的。通过 loop.create_task() 方法把 worker()、
set_result() 两个协程(函数)添加到事件循环,最后运行事件循环。从上面运行结果可以知道,worker() 协程在遇到
yield from future 的时候会阻塞,而 set_result() 协程5秒后会设置 future 的结果,这样 worker() 协程5秒后就继续运行了。
上一章讲解了生成器怎么变协程(函数),由于协程(函数)里面底层有 yield from future,要想协程(函数)运行,
需要 send、next 去催动,其实就是通过 task._step() 方法去 send 协程(函数),那么首先就要把协程函数封
装成 Task。源码参考:(https://github.com/zhaojiangbing/asyncio/blob/master/tasks.py)
定义:task是 Task 的实例,下面看看这个类的省略代码:
class Task(futures.Future):
def __init__(self, coro, *, loop=None): # 这里的loop就是事件循环
assert coroutines.iscoroutine(coro), repr(coro)
super().__init__(loop=loop)
if self._source_traceback:
del self._source_traceback[-1]
self._coro = coro # 存放协程(函数)
self._fut_waiter = None
self._must_cancel = False
self._loop.call_soon(self._step) # 添加_step()方法到事件循环
self.__class__._all_tasks.add(self)
def _step(self, exc=None):
assert not self.done(), \
'_step(): already done: {!r}, {!r}'.format(self, exc)
if self._must_cancel:
if not isinstance(exc, futures.CancelledError):
exc = futures.CancelledError()
self._must_cancel = False
coro = self._coro # 其实这就是存放协程函数的属性
self._fut_waiter = None
self.__class__._current_tasks[self._loop] = self
# Call either coro.throw(exc) or coro.send(None).
try:
if exc is None:
# We use the `send` method directly, because coroutines
# don't have `__iter__` and `__next__` methods.
result = coro.send(None)
else:
result = coro.throw(exc)
except StopIteration as exc:
if self._must_cancel:
# Task is cancelled right before coro stops.
self._must_cancel = False
self.set_exception(futures.CancelledError())
else:
self.set_result(exc.value)
except futures.CancelledError:
super().cancel() # I.e., Future.cancel(self).
except Exception as exc:
self.set_exception(exc)
except BaseException as exc:
self.set_exception(exc)
raise
else:
blocking = getattr(result, '_asyncio_future_blocking', None)
if blocking is not None:
# Yielded Future must come from Future.__iter__().
if result._loop is not self._loop:
self._loop.call_soon(
self._step,
RuntimeError(
'Task {!r} got Future {!r} attached to a '
'different loop'.format(self, result)))
elif blocking:
if result is self:
self._loop.call_soon(
self._step,
RuntimeError(
'Task cannot await on itself: {!r}'.format(
self)))
else:
result._asyncio_future_blocking = False
result.add_done_callback(self._wakeup)
self._fut_waiter = result
if self._must_cancel:
if self._fut_waiter.cancel():
self._must_cancel = False
else:
self._loop.call_soon(
self._step,
RuntimeError(
'yield was used instead of yield from '
'in task {!r} with {!r}'.format(self, result)))
elif result is None:
# Bare yield relinquishes control for one event loop iteration.
self._loop.call_soon(self._step)
elif inspect.isgenerator(result):
self._loop.call_soon(
self._step,
RuntimeError(
'yield was used instead of yield from for '
'generator in task {!r} with {!r}'.format(
self, result)))
else:
# Yielding something else is an error.
self._loop.call_soon(
self._step,
RuntimeError(
'Task got bad yield: {!r}'.format(result)))
finally:
self.__class__._current_tasks.pop(self._loop)
self = None # Needed to break cycles when an exception occurs.
从 Task 省略代码可以看出,Task 继承了 Future,那么 Task 有 Future 的所有功能,前面章节我们说
loop.create_task(worker()) 把 worker() 协程添加到事件循环,其实并不是直接添加,那是怎么个流程?
我们先看看 asyncio 中真实的事件循环,其实 loop 就是 BaseEventLoop 及其子类的实例,
下面看看这个类的省略代码:
class BaseEventLoop(events.AbstractEventLoop):
def call_soon(self, callback, *args):
self._check_closed()
if self._debug:
self._check_thread()
self._check_callback(callback, 'call_soon')
handle = self._call_soon(callback, args)
if handle._source_traceback:
del handle._source_traceback[-1]
return handle
def _call_soon(self, callback, args):
handle = events.Handle(callback, args, self)
if handle._source_traceback:
del handle._source_traceback[-1]
self._ready.append(handle)
return handle
def create_task(self, coro):
self._check_closed()
if self._task_factory is None:
task = tasks.Task(coro, loop=self) # 创建一个task
if task._source_traceback:
del task._source_traceback[-1]
else:
task = self._task_factory(self, coro)
return task
def run_forever(self): # self就是loop
try:
while True: # 写的一个死循环
self._run_once() # 这就是每一轮循环处理的事情。
if self._stopping: # 当事件循环暂停的时候就退出循环。
break
finally:
pass
def _run_once(self):
event_list = self._selector.select(timeout) # 获取就绪fd列表,返回就绪列表
self._process_events(event_list) # _process_events函数里面调用了_add_callback方法,
# _add_callback方法会把fd的回调函数加入 self._ready 就绪队列里面
end_time = self.time() + self._clock_resolution
while self._scheduled: # 遍历 _scheduled 计划列表
handle = self._scheduled[0]
if handle._when >= end_time:
break
handle = heapq.heappop(self._scheduled)
handle._scheduled = False
self._ready.append(handle) # 添加到 self._ready 就绪队列里面
ntodo = len(self._ready)
for i in range(ntodo): # 遍历 self._ready 就绪队列
handle = self._ready.popleft() # 移除handle,handle就是协程函数的封装,
handle._run() # handle._run方法
handle = None # Needed to break cycles when an exception occurs.
看了上面代码可以知道协程(worker())被执行的流程如下:
a、loop.create_task(worker()) 把 worker() 封装成 task。---->
b、task.__init__() 把 worker() 赋值给了 task._coro,同时调用 loop.call_soon(task._step) ---->
c、loop.call_soon() 里面调用 loop._call_soon(),会把 task._step 封装成 Handle,最后把 Handle 添加的就绪队列(loop._ready) ---->
d、task.run_forever() 运行事件循环,循环调用 loop._run_once() ---->
e、loop._run_once() 是事件循环每一轮循环要做的事,如下:
i、会遍历计划列表(self._scheduled),把超时的 TimerHandle 添加到就绪队列(loop._ready) ---->
ii、遍历就绪的io,也就是被监听起来的文件描述符,每个文件描述符都有对应的 Handle,把 Handle 添加到就绪队列(loop._ready) ---->
iii、遍历就绪队列(loop._ready),处理 handle ---->
f、处理 Handle 其实就是调用 handle.run(),最终就是调用 task._step() ---->
g、task._step() 函数通过 send 去催动 task._coro,也就是催动 worker() 函数,有2中结果:
i、如果 worker() 返回的是 future,再把 task._step 添加到就绪队列(loop._ready),这样又可以在下一轮循环继续催动 worker() 函数---->
ii、如果 worker() 返回的不是 future,就会抛出生成器异常,就把异常设置 future 的结果。
看了上面流程,就知道协程(函数)是怎么被事件循环调度了,可以发现,平时的网络io阻塞操作,都变成了非阻塞io,调用io
操作的协程(函数)不用等待io就绪,而是变成等待 future 完成或者取消,当协程(函数)里涉及的io就绪的时候,那么协程
(函数)里面的 future 也会完成或者取消,同时会在每一轮循环中 send 协程(函数),让协程(函数)继续运行。
上面章节讲了,网络请求的socket阻塞都变成了非阻塞,对网络io的等待,变成了对 future 的完成或者取消,
那 asyncio 底层是怎么实现这个转变的呢?本章就来回答这个问题,底层源码自行浏览,这里主要讲解下流程
1、当有 http 请求时,协程会创建一个 socket,把 socket 设置为非阻塞,socket 进行远端连接,把 socket fd 当成-写事件-注册
到io多路复用 selector 监听起来。由于是非阻塞,连接函数就不用等待网络阻塞,创建一个 future,等待 future 设置结果,并
且让出cpu,future 设置回调函数,回调函数就是把 socket fd -写事件-从 selector 注销。当连接成功的时候,事件循环会设置
future 的结果,从而让协程继续运行,最后还会调用回调函数。
2、当协程连接创建了以后,会创建传输工具(transport)和协议(protocol)。创建 transport 的时候,又会把socket fd
当成-读事件-注册到 selector 监听起来,并且会把 transport 的 _read_ready 方法添加到 selector, _read_ready 方
法会调用 socket 的读取函数来读取请求结果,并且把结果放入到 protocol 的 buffer 里面,在数据读取完后会把 socket fd -读
事件注销掉,注意这里没有调用 _read_ready 方法,而是把它当 selector 回调,等 socket fd -读事件-就绪以后才会调用
_read_ready 方法。
3、当协程传输工具(transport)和协议(protocol)创建以后,就会通过 transport 发送数据,又会把 socket fd 当成
-写事件-注册到 selector 监听起来,并且会把 transport 的 _write_ready 方法添加到 selector,_write_ready
方法会调用socket的发送函数来发送transport buffer里的数据,在数据发送完后会把socket fd-读事件注销掉,注意这
里没有调用 _write_ready 方法,而是把它当selector回调,等socket fd -写事件-就绪以后才会调用 _write_ready
方法。
4、上面第2点讲了,事件循环协程数据放到了 protocol buffer 里面,那协程只要返回 buffer 里面的数据就可以响应请求了,
你可能会想到,协程是怎么等待 buffer 里有数据的呢,其实还是借用了 future 的功能,协程在读取数据数据的时候,还是会创
建一个 future,第2点中的 _read_ready 函数在读取数据完成后会设置 future 的结果。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。