当前位置:   article > 正文

python异步编程asyncio原理分析

asyncio原理

一、asyncio介绍


如果你是python开发工程师,你不学asyncio,真的是你的遗憾。asyncio 是用来编写并发代码的库,
asyncio是基于协程的并发,是在一个单线程内实现并发,使用 async/await 语法。asyncio 被用作
多个提供高性能 Python 异步框架的基础,包括网络和网站服务,数据库连接库,分布式任务队列等等。
asyncio 往往是构建IO密集型和高层级结构化网络代码的最佳选择,在看本文章的时候,最好去了解下
asynio 的使用,了解常用的方法。本文主要讲以下几点:
	
	1. asyncio Handle的原理
	2. asyncio Future的原理
	3. asyncio Task的原理
	4. asyncio 网络底层封装
	
源码可以参考:https://github.com/zhaojiangbing/asyncio。后续文章会可能会讲解asyncio在
后端常用主件的使用	
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14

二、python技术知识点复习


在了解asyncio之前我们先回顾下python的一些知识点,如果你是高手就不用看这小节了。
  • 1
  • 2

1、 生成器


python异步编程我想都离不开 yieldyield from 了吧,python中当普通函数中出现 
yieldyield from,那么该函数就变成一个生成器了,通过 send、next 让生成器执行,
当遇到 yieldyield from 语句就会停止执行下面的语句,需要再次调用 next、send 让
生成器从 yieldyield from 语句处的下一条语句开始运行,send 相对于 next 会向生
成器传入值,下面代码中 send_value 就是 send 传入的值,yield from 语句就是起传递作用。


def fib(number):  # 斐波那契数列实现
    n, a, b = 0, 0, 1
    while n < number:
        send_value = yield b  # 这里返回b不会继续向下运行了,
        					  # 直到下一次next或者send从下面一行代码开始运行
        print("send_value: ", send_value)  # 打印send过来的值
        a, b = b, a + b
        n = n + 1
    return 'OK!'

fib_func = fib(10)  # 创建生成器fib_func

def print_num():
    ok = yield from fib_func  # 这里只是把send或者next过来的信号传递给生成器fib_func,
    						  # 并且把生成器fib_func yield处的值传回
    return ok  # 这里返回的是生成器(fib_func) return返回的值,
    		   # 不要理解成是生成器(fib_func) yield处返回的b

print_func = print_num()  # 创建生成器print_func

a = print_func.send(None)
print("生成器返回值:", a)
a = print_func.send(1212)
print("生成器返回值:", a)
a = next(print_func)
print("生成器返回值:", a)
a = next(print_func)
print("生成器返回值:", a)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36

2、迭代器


迭代是Python最强大的功能之一,是访问集合元素的一种方式,迭代器对象从集合的第一个元素开始访问,
直到所有的元素被访问完结束。迭代器只能往前不会后退,迭代器有两个基本的方法:iter()next(),
字符串,列表或元组对象都可用于创建迭代器。


list=[1,2,3,4]
it = iter(list)    # 创建迭代器对象
for x in it:
    print (x, end=" ")  # 输出 1 2 3 4
    					# 当遍历完的时候会抛出StopIteration异常


上面可以看出迭代器 it 是可以被循环迭代的,下面我们用 __iter__()、 __next__()方法实现类迭代器。


class MyNumbers:
    def __iter__(self):
        print("我是iter函数")
        self.a = 1
        return self

    def __next__(self):
        print("我是next函数")
        x = self.a
        self.a += 1
        return x


myclass = MyNumbers()


myiter = iter(myclass)  # 我是iter函数
print(next(myiter))  # 我是next函数
                     # 1
print(next(myiter))  # 我是next函数
                     # 2
print(next(myiter))  # 我是next函数
                     # 3


输出结果为:

我是iter函数
我是next函数
1
我是next函数
2
我是next函数
3


上面可以看到要用 next() 函数迭代之前,需要调用 iter() 函数,其实 iter() 函数执行了 MyNumbers 类的
__iter__() 方法,而 next() 函数执行了 MyNumbers 类的 __next__() 方法,下面我们循环遍历迭代器。


class MyNumbers:
    def __iter__(self):
        print("我是iter函数")
        self.a = 1
        return self


    def __next__(self):
        if self.a <= 5:  # 大于5就结束迭代器
            x = self.a
            self.a += 1
            return x
        else:
            raise StopIteration


myclass = MyNumbers()
for x in myclass:
    print(x)


输出结果为:

我是iter函数
1
2
3
4
5


可以看到循环遍历迭代器对象会先后调用 MyNumbers 类的 __iter__()、__next__() 方法,
并且还会处理 StopIteration 异常,我们还可以用 yield from 遍历迭代器。
	

class MyNumbers:  
    def __iter__(self):
        print("我是iter函数")
        self.a = 1
        return self

    def __next__(self):
        if self.a <= 5:  # 大于5就结束迭代器
            x = self.a
            self.a += 1
            return x
        else:
            raise StopIteration


myclass = MyNumbers()  # 声明一个迭代器对象


def yield_from():
    yield from myclass  # 用yield from遍历迭代器
    return
yield_obj = yield_from()


for i in yield_obj:
    print(i)


输出结果是:

我是iter函数
1
2
3
4
5
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  • 112
  • 113
  • 114
  • 115
  • 116
  • 117
  • 118
  • 119
  • 120
  • 121
  • 122
  • 123
  • 124
  • 125
  • 126
  • 127

3、local(线程局部存储)线程安全


在常规的多线程编程中定义一个全局变量,如果某一个线程修改了全局变量,那么其他的线程读取
的可能就是修改了的值。那有没有一种方法定义了全局变量,各个线程读写全局变量互不影响呢?
python就刚好提供了local(线程局部存储)来做线程安全,我们直接看下面代码吧。


import threading  # 这个是python多线程常用的包

local = threading.local()  # 定义一个全局local
local.tname = "main"  # 设置local全局变量的值
tname = "main"  # 定义一个普通全局变量

def func(info):
    local.tname = info
    global tname  # 表示使用全局变量
    tname = info
    print(local.tname, tname)

t1 = threading.Thread(target=func, args=['funcA'])  # 创建线程t1
t2 = threading.Thread(target=func, args=['funcB'])  # 创建线程t2

t1.start()  # 启动线程t1
t1.join()  # 主线程等待线程t1

t2.start()  # 启动线程t2
t2.join()  # 主线程等待线程t2

print(local.tname, tname)  # 只有当线程t1, t2结束了才会打印


输出结果为:

funcA funcA
funcB funcB
main funcB


可以看到两个线程分别修改了全局变量 local 的 tname 属性,但是呢主线程并没有改变,说明全局
local 对象的属性在多线程中并不共享,而普通全局变量 tname 在主线程的值却是最后一个线程修改的值,
从结果可以知道变量 tname 最后修改的线程为t2。
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40

4、aysnc await关键字


python3.5开始出现 asyncawait 关键字,asyncawait 往往是配套使用的。asyncio中 async 
放在函数定义的前面,那么这个函数就变成了协程,如果函数中涉及到阻塞,那么就需要在涉及到阻塞的
语句前面加 await,常见的网络阻塞操作都是由asyncio封装了的,aiohttp,aio_pika、aiomysql等
涉及到tcp传输协议的的操作,最终都会走asyncio提供的tcp协议api。

上面提到了网络阻塞,其实就是socket的阻塞,但是在asyncio中,所有的socket操作都设置成了非阻塞,
而对于涉及到socket的阻塞,都会用 future 代替,future 其实就是一个可迭代对象,实现了
 __iter__() 方法,这个方法其实也是一个生成器,所以在函数中出现 yield from future 的时候就
会让函数阻塞,切换到其他协程。


from asyncio import Future, coroutine


future = Future


@coroutine
def worker():
	yield from future

async def a_worker():
	await future


其实上面的 worker()、a_worker() 两个函数是等价的,所以asyncio中要把一个生成器(函数)变成协程(函数)
用 coroutine 装饰器就可以了,coroutine 原理其实也很简单,就是把生成器(函数)封装成含有 __await__()
方法的对象,这样才能满足 asyncawait 语法。
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29

三、asyncio简单使用

1、hello world简明使用


import asyncio


async def hello():  # async表示该函数已经被定义成协程(函数)
    print("我是hello开始")
    await asyncio.sleep(2)  # asyncio.sleep代替阻塞请求,此时会等待,让出cpu
    print("我是hello结束")

async def world():
    print("我是world开始")
    await asyncio.sleep(2)  # 此时会等待,让出cpu
    print("我是world结束")

loop = asyncio.get_event_loop()  # 创建一个事件循环
loop.create_task(hello())  #  把协程(函数)添加到事件循环
loop.create_task(world())  #  把协程(函数)添加到事件循环
loop.run_forever()  # 运行事件循环
# loop.run_until_complete()


从上面可以看到要想运行协程(函数),首先用 asyncio.get_event_loop() 函数创建事件循环(loop),
把协程(函数)通过 asyncio.create_task() 函数创建成任务,最后通过 loop.run_forever() 函数
运行事件循环,处理每一个任务。

asyncio 实现异步是需要一个事件循环的,什么是事件循环,你是不是觉得有点抽象,其实理解很简单,
假如有一个事件循环的类(Loop),创建该类的实例(loop),loop 有一个就绪队列(_ready)属性,当
有任务的时候就通过 loop 添加到队列(_ready)里面,loop 还有一个运行事件循环的方法(run_forever),
方法里写一个死循环,遍历队列(_ready)里的任务,最后处理任务,具体是怎么个流程后续章节详解。
后面没特别说明,事件循环类就用 Loop 表示。
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30

四、asyncio Handle的原理


前面我们有对什么是事件循环(loop)解释过了,当我们要把函数加入 loop 的时候,函数必须封装成 Handle,
然后在把 Handle 添加到 loop, Handle 添加到 loop 是怎么回事呢? 这里要从 loop 的两个属性(_ready、
_scheduled)说起,这两个属性其实就是两个队列,添加 Handle 到 loop,其实就是添加到这两个队列里面,
_ready 存放的是立刻要执行的 Handle,我就叫它就绪队列;_scheduled 存放的是延时执行的 TimerHandle,
我就叫它计划列表。asyncio底层实现了 Handle、TimerHandle 两个类,TimerHandle 继承 Handle,
源码参考:(https://github.com/zhaojiangbing/asyncio/blob/master/events.py)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

1、Handle类


定义:handle 是 Handle 的实例,下面看看这个类的省略代码:


class Handle:

    def __init__(self, callback, args, loop):
        self._loop = loop  # 这就是事件循环
        self._callback = callback
        self._args = args
        self._cancelled = False
        self._repr = None
        if self._loop.get_debug():
            self._source_traceback = extract_stack(sys._getframe(1))
        else:
            self._source_traceback = None

    def cancel(self):  # 取消handle
        if not self._cancelled:
            self._cancelled = True
            if self._loop.get_debug():
                self._repr = repr(self)
            self._callback = None
            self._args = None

    def _run(self):
        try:
            self._callback(*self._args)
        except Exception as exc:
            cb = _format_callback_source(self._callback, self._args)
            msg = 'Exception in callback {}'.format(cb)
            context = {
                'message': msg,
                'exception': exc,
                'handle': self,
            }
            if self._source_traceback:
                context['source_traceback'] = self._source_traceback
            self._loop.call_exception_handler(context)
        self = None  # Needed to break cycles when an exception occurs.


从代码中可以看出 handle 有个 _run() 方法,_run() 方法中会调用 _callback 属性。

定义:loop 是 Loop 的实例,下面我们来看看 Handle 在事件循中的角色代码:


import collections


class Loop(object):  # 事件循环类
    def __init__(self):

        self._ready = collections.deque()  # 就绪队列,存放马上就要运行的handle
        self._scheduled = []  # 计划列表,存放延时的handle
        self._stopping = False  # 如果True,事件循环运行结束

    def _add_callback(self, handle):
    	"""添加handle到就绪队列_ready。"""
		
        assert isinstance(handle, Handle), '我去,你都不是我包养的类型'
        if handle._cancelled:
            return
        self._ready.append(handle)

    def run_forever(self): 
    	"""运行事件循环。"""
    	
        while True:
            ntodo = len(self._ready)
            for i in range(ntodo):
                handle = self._ready.popleft()
                if handle._cancelled:  # 如果handle被取消了就不会运行
                    continue
                handle._run()
            if self._stopping:  # 结束事件循环
                break


def worker():
    print("我是被Handle包养了的鸭子")


if __name__ == "__main__":
    
    loop = Loop()  # 创建一个事件循环loop
    handle = Handle(worker)  # 把worker函数包装成handle
    loop._add_callback(handle)  # 添加handle到事件循环
    loop.run_forever()  # 运行事件循环


其实上面的注释很清晰了,创建事件循环(loop),函数 worker() 封装成 handle,worker() 存放在
handle._callback 属性上,用 loop._add_callback() 方法把 handle 添加到事件循环就绪队列(_ready),
调用 loop.run_forever() 方法运行事件循环,loop.run_forever() 会遍历出 handle,如果
handle._cancelled 属性不等于True,就会调用 handle._run() 方法了,handle._run() 方法会调用
self._callback 属性,其实就是调用 worker() 函数。停止事件循环,只需要把 loop._stopping 属性
设置为True。这里使用的是 Loop,是为了让读者明白asyncio底层实现,原理想通。
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97

2、TimerHandle类


定义:handle 是 TimerHandle 的实例,下面看看这个类的省略代码:


class TimerHandle(Handle):

    __slots__ = ['_scheduled', '_when']  # __slots__ 存放暴露外面的属性

    def __init__(self, when, callback, args, loop):
        assert when is not None
        super().__init__(callback, args, loop)
        if self._source_traceback:
            del self._source_traceback[-1]
        self._when = when
        self._scheduled = False


TimerHandle 在 Handle 基础上增加了 _when 属性,实现了定时器等功能,在工作中肯定会遇到让某个任务延时运行,比如30秒后运行,下面我们来看看 TimerHandle 在事件循中的角色代码:


import collections
import time


class Loop(object):
    def __init__(self):

        self._ready = self._ready = collections.deque()  # 就绪队列
        self._scheduled = []  # 计划列表
        self._stopping = False  # 如果True,事件循环运行结束

    def call_later(self, delay, callback, *args):  # 添加延时任务
        """添加延时任务到事件循环

        :param delay: int,延时的时间
        :param callback: 回调函数
        :param args: 回调函数的参数
        :return:
        """

        execute_time = time.time() + delay  # 获取callback执行的时间
        # execute_time在TimerHandle初始化的时候赋值给handle._when
        handle = TimerHandle(execute_time, callback, args)  # 封装成TimerHandle
        self._scheduled.append(handle)  # 添加到计划列表

    def run_forever(self):
		"""运行事件循环。"""
		
        while True:
            while self._scheduled:
                handle = self._scheduled.pop(0)  # 从计划列表移除
                if handle._cancelled:
                    continue
                if handle._when >= time.time():  # 只有当_when大等于当前时间才会放入就绪队列
                    self._ready.append(handle)  # 添加到就绪队列
                
            ntodo = len(self._ready)
            for i in range(ntodo):
                handle = self._ready.popleft()  # 移除handle
                if handle._cancelled:  # 如果handle被取消了就不会运行
                    continue
                handle._run()
            if self._stopping:  # 结束事件循环
                break


def worker():
    print("TimerHandle包养我30秒")


if __name__ == "__main__":

    loop = Loop()  # 创建一个事件循环 loop
    loop.call_later(30, worker)  # 30秒后调用worker
    loop.run_forever()  # 运行事件循环


创建一个事件循环(loop),函数 worker() 通过 loop.call_later() 添加到 loop,注意这里不是添加到
就绪队列(_ready),而是添加到计划列表(_scheduled),在添加之前会用 TimerHandle 封装 worker() 
函数, 并把 worker() 函数执行的时间赋值给 TimerHandle 的 _when 属性,loop.run_forever() 每
一轮循环中,只有当 handle._when 大于当前时间才会把 handle 添加到就绪队列(_ready),
从而达到延时执行的效果。
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82

五、asyncio Future的原理


Future 我们叫它未来对象,Future 有三个状态,分别是 _PENDING(就绪)、_CANCELLED(取消)、_FINISHED(完成)。
future 是一个可迭代对象,如果某个函数里出现 yeild from future 语句,那么这个函数就是生成器,
当生成器运行到 yeild from future 语句时会阻塞,那这个生成器要怎么才能继续运行下去呢? 条件有两点,
I:future 状态是 _CANCELLED(取消) 或者 _FINISHED(完成), II:必须有一个东西来 send、next 这个
生成器,这个东西就是 Task。本章节只讲解第I点,第II点在下一章节讲解,那为什么第I点是必要条件?
下面看 asyncio.Future 类的省略代码来找答案。Future 类的源码参考
(https://github.com/zhaojiangbing/asyncio/blob/master/futures.py)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

1、Future类省略代码


定义:future 是 Future 的实例,下面看看这个类的省略代码:


class Future:
    _state = _PENDING  # Future的状态
    _result = None  # 
    _exception = None

    def __init__(self, *, loop=None):  # loop就是要传入的事件循环
        if loop is None:
            self._loop = events.get_event_loop()
        else:
            self._loop = loop
        self._callbacks = []
        if self._loop.get_debug():
            self._source_traceback = events.extract_stack(sys._getframe(1))

    def cancel(self):  # 取消future
        self._log_traceback = False
        if self._state != _PENDING:
            return False
        self._state = _CANCELLED  # 更改为取消状态
        self._schedule_callbacks()
        return True

    def _schedule_callbacks(self):  # 把future的回调函数添加到事件循环
        callbacks = self._callbacks[:]
        if not callbacks:
            return
        self._callbacks[:] = []
        for callback in callbacks:
        	# 添加到事件循环,call_soon里面会封装callback为Handle,然后添加到loop._ready
            self._loop.call_soon(callback, self)
				
    def cancelled(self):  # 判断future是否取消
        return self._state == _CANCELLED

    def done(self):
        return self._state != _PENDING  # 判断future是否取消或者完成

    def result(self):  # 获取future结果
        if self._state == _CANCELLED:
            raise CancelledError
        if self._state != _FINISHED:
            raise InvalidStateError('Result is not ready.')
        self._log_traceback = False
        if self._exception is not None:
            raise self._exception
        return self._result

    def add_done_callback(self, fn):  # future添加回调函数
        if self._state != _PENDING:  # 如果不等于就绪
            self._loop.call_soon(fn, self)  # 添加到事件循环
        else:
            self._callbacks.append(fn)  # 添加回调函数到回调列表

    def remove_done_callback(self, fn):  # 移除future回调函数
        filtered_callbacks = [f for f in self._callbacks if f != fn]
        removed_count = len(self._callbacks) - len(filtered_callbacks)
        if removed_count:
            self._callbacks[:] = filtered_callbacks
        return removed_count

    def set_result(self, result):  # 设置future结果
        if self._state != _PENDING:
            raise InvalidStateError('{}: {!r}'.format(self._state, self))
        self._result = result
        self._state = _FINISHED  # 更改状态为完成
        self._schedule_callbacks()

    def __iter__(self):  # future是一个可迭代对象
        if not self.done():  # future没有完成或者取消的时候,会再次返回future
            self._asyncio_future_blocking = True
            yield self
        assert self.done(), "yield from wasn't used with future"
        # future完成或者取消的时候,会返回future的结果
        return self.result()

    if compat.PY35:
        __await__ = __iter__
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81

2、Future类总结


看了 Future 类的代码总结如下:

a、future 三个状态的分别是 _PENDING(就绪)、_CANCELLED(取消)、_FINISHED(完成),状态存放在
future._state 属性, 在 future 创建的时候 future._state = _PENDING,通过 future.done()
方法判断是否取消或完成,通过 future.cancelled() 方法判断是否取消。

b、future 可以通过 future.add_done_callback()、future.remove_done_callback() 方法添加、
移除回调函数。添加回调的时候,当future._state != _PENDING的时候,回调函数就会通过 
loop.call_soon() 方法添加到事件循环就绪队列(_ready),否则就添加到回调列表 future._callbacks里。

c、future 是可以通过 future.set_result() 设置结果,结果存放到 future._result 属性,通过 
future.result() 方法获取结果。在设置结果的时候 future._state 会变为 _FINISHED,future 
的所有回调函数会添加到事件循环就绪队列(_ready)。

d、future 是可以通过 future.cancel() 方法取消的,取消的时候,结果默认设置为None,future._state 
会变为 _CANCELLED,future 的所有回调函数会添加到事件循环就绪队列(_ready)。

e、future 实现了 __iter__() 方法,该方法还是个生成器,future 就是可迭代对象了,那么在函数中使用
yield from future 语句的时候就会调用  __iter__() 方法,可以看到这个语句是会让函数阻塞而让出cpu
运行其他的函数,从而达到函数切换的目的。

上面提到,如果函数里面有 yield from future 语句的时候,这个函数就变成了生成器,由于 future 实现了
__iter__() 方法,所以才能在函数中用 yield from future。只有当 future.done() 方法返回True的时
候,也就是 future 的状态变为非就绪状态(!_PENDING),future.__iter__() 函数才会返回 future 的结
果,否则就返回 future 自己(self),当返回self的时候,事件循环会重复上面第II点,这里也就回答了第I点
是生成器继续运行的必要条件。从 Future 类省略代码我们可以看到,要想让 future 的状态变成非就绪状态(!_PENDING),
可以设置 future 的结果或者取消 future,这两种操作也会让 future 回调函数得执行。
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28

3、future的使用


future 在 asyncio 中的使用:


import time
import asyncio

from asyncio import Future, coroutine


def call_back(future):
    print("我是 future 的回调函数", time.time())

@coroutine  # 把生成器转换成协程(函数)
def worker(future):
    print("我在等待 future", time.time())  # 打印时间
    result = yield from future  # 这里会等待10秒,其他的协程函数会运行,等future设置了结果,就会继续运行
    print("future 结果:", result, time.time())  # 打印future结果

async def corout_worker():  # 这个函数等价于worker()函数
    await future  # 这里和yiled from future等价,

async def set_result(future):  # 协程函数
    await asyncio.sleep(5)  # 睡眠10秒后设置future的结果
    print("future设置结果前的状态:", future._state)
    future.set_result("我是future")
    print("future设置结果后的状态:", future._state)


if __name__ == "__main__":
    future = Future()  # 创建future
    future.add_done_callback(call_back)  # 添加回调函数
    loop = asyncio.get_event_loop()  # 获取事件循环
    loop.create_task(worker(future))  # 把协程(函数)添加到事件循环
    loop.create_task(set_result(future))  # 把协程(函数)添加到事件循环
    loop.run_forever()  # 运行事件循环


执行结果如下:-----------------------

我在等待 future 1603595207.5414805
future设置结果前的状态: PENDING
future设置结果后的状态: FINISHED
我是 future 的回调函数 1603595212.5418484
future 结果: 我是future 1603595212.5418484


上面代码首先创建一个全局的 future,future 把 call_back() 函数添加为回调函数,回调函数第一个参数就是 future
自己(self),然后创建一个事件循环,注意这里不再是上面举例的事件循环(Loop),worker() 函数是个生成器,由于 
asyncio 提倡使用 async await 语法,所以用 @coroutine 把生成器函数 worker() 装饰成 async await 类型的函
数,也就是协程(函数),这里就知道了生成器函数怎么变协程(函数), 对于 async await 可以参考python技术知识点复习
章节,worker() 函数其实和 corout_worker() 函数是等价的。通过 loop.create_task() 方法把 worker()、
set_result() 两个协程(函数)添加到事件循环,最后运行事件循环。从上面运行结果可以知道,worker() 协程在遇到 
yield from future 的时候会阻塞,而 set_result() 协程5秒后会设置 future 的结果,这样 worker() 协程5秒后就继续运行了。
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54

六、asyncio Task的原理


上一章讲解了生成器怎么变协程(函数),由于协程(函数)里面底层有 yield from future,要想协程(函数)运行,
需要 send、next 去催动,其实就是通过 task._step() 方法去 send 协程(函数),那么首先就要把协程函数封
装成 Task。源码参考:(https://github.com/zhaojiangbing/asyncio/blob/master/tasks.py)
  • 1
  • 2
  • 3
  • 4

1、Task类省略代码


定义:task是 Task 的实例,下面看看这个类的省略代码:


class Task(futures.Future):

	def __init__(self, coro, *, loop=None):  # 这里的loop就是事件循环
        assert coroutines.iscoroutine(coro), repr(coro)
        super().__init__(loop=loop)
        if self._source_traceback:
            del self._source_traceback[-1]
        self._coro = coro # 存放协程(函数)
        self._fut_waiter = None
        self._must_cancel = False
        self._loop.call_soon(self._step)  # 添加_step()方法到事件循环
        self.__class__._all_tasks.add(self)

    def _step(self, exc=None):
        assert not self.done(), \
            '_step(): already done: {!r}, {!r}'.format(self, exc)
        if self._must_cancel:
            if not isinstance(exc, futures.CancelledError):
                exc = futures.CancelledError()
            self._must_cancel = False
        coro = self._coro  # 其实这就是存放协程函数的属性
        self._fut_waiter = None

        self.__class__._current_tasks[self._loop] = self
        # Call either coro.throw(exc) or coro.send(None).
        try:
            if exc is None:
                # We use the `send` method directly, because coroutines
                # don't have `__iter__` and `__next__` methods.
                result = coro.send(None)
            else:
                result = coro.throw(exc)
        except StopIteration as exc:
            if self._must_cancel:
                # Task is cancelled right before coro stops.
                self._must_cancel = False
                self.set_exception(futures.CancelledError())
            else:
                self.set_result(exc.value)
        except futures.CancelledError:
            super().cancel()  # I.e., Future.cancel(self).
        except Exception as exc:
            self.set_exception(exc)
        except BaseException as exc:
            self.set_exception(exc)
            raise
        else:
            blocking = getattr(result, '_asyncio_future_blocking', None)
            if blocking is not None:
                # Yielded Future must come from Future.__iter__().
                if result._loop is not self._loop:
                    self._loop.call_soon(
                        self._step,
                        RuntimeError(
                            'Task {!r} got Future {!r} attached to a '
                            'different loop'.format(self, result)))
                elif blocking:
                    if result is self:
                        self._loop.call_soon(
                            self._step,
                            RuntimeError(
                                'Task cannot await on itself: {!r}'.format(
                                    self)))
                    else:
                        result._asyncio_future_blocking = False
                        result.add_done_callback(self._wakeup)
                        self._fut_waiter = result
                        if self._must_cancel:
                            if self._fut_waiter.cancel():
                                self._must_cancel = False
                else:
                    self._loop.call_soon(
                        self._step,
                        RuntimeError(
                            'yield was used instead of yield from '
                            'in task {!r} with {!r}'.format(self, result)))
            elif result is None:
                # Bare yield relinquishes control for one event loop iteration.
                self._loop.call_soon(self._step)
            elif inspect.isgenerator(result):
                self._loop.call_soon(
                    self._step,
                    RuntimeError(
                        'yield was used instead of yield from for '
                        'generator in task {!r} with {!r}'.format(
                            self, result)))
            else:
                # Yielding something else is an error.
                self._loop.call_soon(
                    self._step,
                    RuntimeError(
                        'Task got bad yield: {!r}'.format(result)))
        finally:
            self.__class__._current_tasks.pop(self._loop)
            self = None  # Needed to break cycles when an exception occurs.


从 Task 省略代码可以看出,Task 继承了 Future,那么 Task 有 Future 的所有功能,前面章节我们说
loop.create_task(worker()) 把 worker() 协程添加到事件循环,其实并不是直接添加,那是怎么个流程? 
我们先看看 asyncio 中真实的事件循环,其实 loop 就是 BaseEventLoop 及其子类的实例,
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104

1、事件循环部分方法


下面看看这个类的省略代码:


class BaseEventLoop(events.AbstractEventLoop):

	def call_soon(self, callback, *args):
        self._check_closed()
        if self._debug:
            self._check_thread()
            self._check_callback(callback, 'call_soon')
        handle = self._call_soon(callback, args)
        if handle._source_traceback:
            del handle._source_traceback[-1]
        return handle
	
	def _call_soon(self, callback, args):
        handle = events.Handle(callback, args, self)
        if handle._source_traceback:
            del handle._source_traceback[-1]
        self._ready.append(handle)
        return handle

	def create_task(self, coro):
        self._check_closed()
        if self._task_factory is None:
            task = tasks.Task(coro, loop=self)  # 创建一个task
            if task._source_traceback:
                del task._source_traceback[-1]
        else:
            task = self._task_factory(self, coro)
        return task

	def run_forever(self):  # self就是loop
        try:
            while True:  # 写的一个死循环
                self._run_once()  # 这就是每一轮循环处理的事情。
                if self._stopping:  # 当事件循环暂停的时候就退出循环。
                    break
        finally:
            pass
	
	def _run_once(self):
        event_list = self._selector.select(timeout)  # 获取就绪fd列表,返回就绪列表
        self._process_events(event_list)  # _process_events函数里面调用了_add_callback方法,
        								  # _add_callback方法会把fd的回调函数加入 self._ready 就绪队列里面

        end_time = self.time() + self._clock_resolution
        while self._scheduled:  # 遍历 _scheduled 计划列表
            handle = self._scheduled[0]
            if handle._when >= end_time:
                break
            handle = heapq.heappop(self._scheduled)
            handle._scheduled = False
            self._ready.append(handle)  # 添加到 self._ready 就绪队列里面

        ntodo = len(self._ready)
        for i in range(ntodo):  # 遍历 self._ready 就绪队列
            handle = self._ready.popleft()  # 移除handle,handle就是协程函数的封装,
            handle._run()  # handle._run方法
        handle = None  # Needed to break cycles when an exception occurs.
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61

2、协程的执行流程


看了上面代码可以知道协程(worker())被执行的流程如下:

a、loop.create_task(worker()) 把 worker() 封装成 task。----> 
b、task.__init__() 把 worker() 赋值给了 task._coro,同时调用 loop.call_soon(task._step) ---->
c、loop.call_soon() 里面调用 loop._call_soon(),会把 task._step 封装成 Handle,最后把 Handle 添加的就绪队列(loop._ready) ---->
d、task.run_forever() 运行事件循环,循环调用 loop._run_once() ---->
e、loop._run_once() 是事件循环每一轮循环要做的事,如下:
	i、会遍历计划列表(self._scheduled),把超时的 TimerHandle 添加到就绪队列(loop._ready) ---->
	ii、遍历就绪的io,也就是被监听起来的文件描述符,每个文件描述符都有对应的 Handle,把 Handle 添加到就绪队列(loop._ready) ---->
	iii、遍历就绪队列(loop._ready),处理 handle ---->
f、处理 Handle 其实就是调用 handle.run(),最终就是调用 task._step() ---->
g、task._step() 函数通过 send 去催动 task._coro,也就是催动 worker() 函数,有2中结果:
    i、如果 worker() 返回的是 future,再把 task._step 添加到就绪队列(loop._ready),这样又可以在下一轮循环继续催动 worker() 函数---->
    ii、如果 worker() 返回的不是 future,就会抛出生成器异常,就把异常设置 future 的结果。

看了上面流程,就知道协程(函数)是怎么被事件循环调度了,可以发现,平时的网络io阻塞操作,都变成了非阻塞io,调用io
操作的协程(函数)不用等待io就绪,而是变成等待 future 完成或者取消,当协程(函数)里涉及的io就绪的时候,那么协程
(函数)里面的 future 也会完成或者取消,同时会在每一轮循环中 send 协程(函数),让协程(函数)继续运行。
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19

七、asyncio 网络底层封装

上面章节讲了,网络请求的socket阻塞都变成了非阻塞,对网络io的等待,变成了对 future 的完成或者取消,
那 asyncio 底层是怎么实现这个转变的呢?本章就来回答这个问题,底层源码自行浏览,这里主要讲解下流程

1、当有 http 请求时,协程会创建一个 socket,把 socket 设置为非阻塞,socket 进行远端连接,把 socket fd 当成-写事件-注册
到io多路复用 selector 监听起来。由于是非阻塞,连接函数就不用等待网络阻塞,创建一个 future,等待 future 设置结果,并
且让出cpu,future 设置回调函数,回调函数就是把 socket fd -写事件-从 selector 注销。当连接成功的时候,事件循环会设置
future 的结果,从而让协程继续运行,最后还会调用回调函数。

2、当协程连接创建了以后,会创建传输工具(transport)和协议(protocol)。创建 transport 的时候,又会把socket fd
当成-读事件-注册到 selector 监听起来,并且会把 transport 的 _read_ready 方法添加到 selector, _read_ready 方
法会调用 socket 的读取函数来读取请求结果,并且把结果放入到 protocol 的 buffer 里面,在数据读取完后会把 socket fd -读
事件注销掉,注意这里没有调用 _read_ready 方法,而是把它当 selector 回调,等 socket fd -读事件-就绪以后才会调用
_read_ready 方法。

3、当协程传输工具(transport)和协议(protocol)创建以后,就会通过 transport 发送数据,又会把 socket fd 当成
-写事件-注册到 selector 监听起来,并且会把 transport 的 _write_ready 方法添加到 selector,_write_ready
 方法会调用socket的发送函数来发送transport buffer里的数据,在数据发送完后会把socket fd-读事件注销掉,注意这
 里没有调用 _write_ready 方法,而是把它当selector回调,等socket fd -写事件-就绪以后才会调用 _write_ready 
 方法。

4、上面第2点讲了,事件循环协程数据放到了 protocol buffer 里面,那协程只要返回 buffer 里面的数据就可以响应请求了,
你可能会想到,协程是怎么等待 buffer 里有数据的呢,其实还是借用了 future 的功能,协程在读取数据数据的时候,还是会创
建一个 future,第2点中的 _read_ready 函数在读取数据完成后会设置 future 的结果。
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/很楠不爱3/article/detail/74983
推荐阅读
相关标签
  

闽ICP备14008679号