赞
踩
本文试图搞明白Python协程编程
通常我们认为线程是轻量级的进程,因此我们也把协程理解为轻量级的线程即微线程
这是几个姊妹篇:
在学习的时候,发现并行和并发在好些地方搞混了,这是两个概念,得先明确下
Erlang 之父 Joe Armstrong 画了一张很可爱的图来解释这两个概念:
两个词很好的说明了并发和并行的区别:
那么并发并行和多进程多线程的关系呢?
场景:
总结下:
1、进程
2、线程
3、协程
又称微线程,纤程,英文名Coroutine。协程的作用是在执行函数A时可以随时中断去执行函数B,然后中断函数B继续执行函数A(可以自由切换)。但这一过程并不是函数调用,是线程里的并发
拥有自己的寄存器上下文和栈。协程调度切换时,将寄存器上下文和栈保存到其他地方(非CPU),在切回来的时候,恢复先前保存的寄存器上下文和栈。因此:协程能保留上一次调用时的状态(即所有局部状态的一个特定组合),每次过程重入时,就相当于进入上一次调用的状态,CPU感觉不到协程的存在,协程是用户自己控制的
优点:
无需线程上下文切换的开销
无需数据操作锁定及同步的开销
方便切换控制流,简化编程模型
高并发+高扩展性+低成本:一个CPU支持上万的协程都不是问题。所以很适合用于高并发处理
缺点:
无法利用多核资源:协程的本质是个单线程,它不能同时将单个CPU 的多个核用上,协程需要和进程配合才能运行在多CPU上,协程如果要使用多核CPU的话,那么就需要先启多个进程,在每个进程下启一个线程,然后在线程下在启协程。
日常所编写的绝大部分应用都没有这个必要,除非是cpu密集型应用
线程是进程的一部分,一个线程只能属于一个进程,而一个进程可以有多个线程,且至少有一个线程。而协程则包含在线程中
可以看个图
区别:理解它们的差别,从资源使用的角度出发。(所谓的资源就是计算机里的中央处理器,内存,文件,网络等等)
根本区别:进程是操作系统资源分配的基本单位,而线程是任务调度和执行的基本单位
在开销方面:每个进程都有独立的代码和数据空间(程序上下文),程序之间的切换会有较大的开销;线程共享代码和数据空间,每个线程都有自己独立的运行栈和程序计数器(PC),线程之间切换的开销小
所处环境:在操作系统中能同时运行多个进程(程序);而在同一个进程(程序)中有多个线程同时执行(通过CPU调度,在每个时间片中只有一个线程执行)
内存分配方面:系统在运行的时候会为每个进程分配不同的内存空间;而对线程而言,除了CPU外,系统不会为线程分配内存(线程所使用的资源来自其所属进程的资源),线程组之间只能共享资源
包含关系:
一切还是得从生成器说起,因为asyncio或者大多数协程库内部也是通过生成器实现的
生成器是一次生成一个值的特殊类型函数,可以将其视为可恢复函数,这里就不探究其内部实现原理了
简单例子如下
def gen_func():
yield 1
yield 2
yield 3
if __name__ == '__main__':
gen = gen_func()
for i in gen:
print(i)
output:
1
2
3
上面的例子没有什么稀奇的不是吗?yield像一个特殊的关键字,将函数变成了一个类似于迭代器的对象,可以使用for循环取值。
协程自然不会这么简单,python协程的目标是星辰大海,从上面的例之所以get不到它的野心,是因为你没有试过send, next两个函数。
首先说next
def gen_func():
yield 1
yield 2
yield 3
if __name__ == '__main__':
gen = gen_func()
print(next(gen))
print(next(gen))
print(next(gen))
output:
1
2
3
next的操作有点像for循环,每调用一次next,就会从中取出一个yield出来的值,其实还是没啥特别的,感觉还没有for循环好用。
不过,不知道你有没有想过,如果你只需要一个值,你next一次就可以了,然后你可以去做其他事情,等到需要的时候才回来再次next取值。
就这一部分而言,你也许知道为啥说生成器是可以暂停的了,不过,这似乎也没什么用,那是因为你不知到时,生成器除了可以抛出值,还能将值传递进去。
接下来我们看send的例子。
def gen_func():
a = yield 1
print("a: ", a)
b = yield 2
print("b: ", b)
c = yield 3
print("c: ", c)
return "finish"
if __name__ == '__main__':
gen = gen_func()
for i in range(4):
if i == 0:
print(gen.send(None))
else:
# 因为gen生成器里面只有三个yield,那么只能循环三次。
# 第四次循环的时候,生成器会抛出StopIteration异常,并且return语句里面内容放在StopIteration异常里面
try:
print(gen.send(i))
except StopIteration as e:
print("e: ", e)
output:
1
a: 1
2
b: 2
3
c: 3
e: finish
send有着next差不多的功能,不过send在传递一个值给生成器的同时,还能获取到生成器yield抛出的值,在上面的代码中,send分别将None,1,2,3四个值传递给了生成器,之所以第一需要传递None给生成器,是因为规定,之所以规定,因为第一次传递过去的值没有特定的变量或者说对象能接收,所以规定只能传递None, 如果你传递一个非None的值进去,会抛出一下错误
TypeError: can't send non-None value to a just-started generator
从上面的例子我们也发现,生成器里面的变量a,b,c获得了send函数发送来的1, 2, 3.
如果你有事件循环或者说多路复用的经验,你也许能够隐隐察觉到微妙的感觉。
这个微妙的感觉是,是否可以将IO操作yield出来?由事件循环调度, 如果你能get到这个微妙的感觉,那么你已经知道协程高并发的秘密了.
下面是yield from的例子
def gen_func():
a = yield 1
print("a: ", a)
b = yield 2
print("b: ", b)
c = yield 3
print("c: ", c)
return 4
def middle():
gen = gen_func()
ret = yield from gen
print("ret: ", ret)
return "middle Exception"
def main():
mid = middle()
for i in range(4):
if i == 0:
print(mid.send(None))
else:
try:
print(mid.send(i))
except StopIteration as e:
print("e: ", e)
if __name__ == '__main__':
main()
output:
1
a: 1
2
b: 2
3
c: 3
ret: 4
e: middle Exception
从上面的代码我们发现,main函数调用的middle函数的send,但是gen_func函数却能接收到main函数传递的值.有一种透传的感觉,这就是yield from的作用, 这很关键。
而yield from最终传递出来的值是StopIteration异常,异常里面的内容是最终接收生成器(本示例是gen_func)return出来的值,所以ret获得了gen_func函数return的4.但是ret将异常里面的值取出之后会继续将接收到的异常往上抛,所以main函数里面需要使用try语句捕获异常。而gen_func抛出的异常里面的值已经被middle函数接收,所以middle函数会将抛出的异常里面的值设为自身return的值
linux有5种IO模型
同步模型自然是效率最低的模型了,每次只能处理完一个连接才能处理下一个,如果只有一个线程的话, 如果有一个连接一直占用,那么后来者只能傻傻的等了。所以不适合高并发,不过最简单,符合惯性思维
不会阻塞后面的代码,但是需要不停的显式询问内核数据是否准备好,一般通过while循环,而while循环会耗费大量的CPU。所以也不适合高并发。
当前最流行,使用最广泛的高并发方案。
而多路复用又有三种实现方式, 分别是select, poll, epoll:
select,poll由于设计的问题,当处理连接过多会造成性能线性下降,而epoll是在前人的经验上做过改进的解决方案。不会有此问题。
不过select, poll并不是一无是处,假设场景是连接数不多,并且每个连接非常活跃,select,poll是要性能高于epoll的。
可参考:select、poll、epoll之间的区别总结[整理]
没见过
理论上比多路复用更快,因为少了一次调用,但是实际使用并没有比多路复用快非常多
IO模型能够解决IO的效率问题,但是实际使用起来需要一个事件循环驱动协程去处理IO。
简单实现
下面引用官方的一个简单例子:
import selectors
import socket
# 创建一个selctor对象
# 在不同的平台会使用不同的IO模型,比如Linux使用epoll, windows使用select(不确定)
# 使用select调度IO
sel = selectors.DefaultSelector()
# 回调函数,用于接收新连接
def accept(sock, mask):
conn, addr = sock.accept() # Should be ready
print('accepted', conn, 'from', addr)
conn.setblocking(False)
sel.register(conn, selectors.EVENT_READ, read)
# 回调函数,用户读取client用户数据
def read(conn, mask):
data = conn.recv(1000) # Should be ready
if data:
print('echoing', repr(data), 'to', conn)
conn.send(data) # Hope it won't block
else:
print('closing', conn)
sel.unregister(conn)
conn.close()
# 创建一个非堵塞的socket
sock = socket.socket()
sock.bind(('localhost', 1234))
sock.listen(100)
sock.setblocking(False)
sel.register(sock, selectors.EVENT_READ, accept)
# 一个事件循环,用于IO调度
# 当IO可读或者可写的时候, 执行事件所对应的回调函数
def loop():
while True:
events = sel.select()
for key, mask in events:
callback = key.data
callback(key.fileobj, mask)
if __name__ == '__main__':
loop()
上面代码中loop函数对应事件循环,它要做的就是一遍一遍的等待IO,然后调用事件的回调函数.
asyncio在python3.4被引入标准库,Python 3.5添加了async
和await
这两个关键字,分别用来替换asyncio.coroutine
和yield from
,协程成为新的语法,而不再是一种生成器类型了
需要明确一点,asyncio使用单线程、单个进程的方式切换;现存的一些库其实并不能原生的支持asyncio(因为会发生阻塞或者功能不可用),比如requests,如果要写爬虫,配合asyncio的应该用aiohttp,其他的如数据库驱动等各种Python对应的库也都得使用对应的aioXXX版本了
几个概念:
运行的序列图
import asyncio
async def compute(x, y):
print('Compute {} + {} ...'.format(x, y))
await asyncio.sleep(1.0)
return x + y
async def print_sum(x, y):
result = await compute(x, y)
print('{} + {} = {}'.format(x, y, result))
loop = asyncio.get_event_loop()
loop.run_until_complete(print_sum(1, 2))
loop.close()
通过async定义一个协程,协程是一个对象,不能直接运行,需要把协程加入到事件循环(loop)中,由loop在适当的时候调用协程
asyncio.get_event_loop()
方法可以创建一个事件循环,然后由run_until_complete(协程对象)
将协程注册到事件循环中,并启动事件循环
run_until_complete
根据传递的参数的不同,返回的结果也有所不同:
run_until_complete()
传递的是一个协程对象或task对象,则返回他们finished的返回结果(前提是他们得有return的结果,否则返回None)run_until_complete(asyncio.wait(多个协程对象或任务))
,函数会返回一个元组包括(done, pending),通过访问done里的task对象,获取返回值run_until_complete(asyncio.gather(多个协程对象或任务))
,函数会返回一个列表,列表里面包括各个任务的返回结果,按顺序排列python 3.7 以前的版本调用异步函数的步骤:
asyncio.get_event_loop()
函数获取事件循环loop对象loop.run_forever()
方法或者loop.run_until_complete()
方法执行异步函数python3.7 以后的版本使用asyncio.run
即可。此函数总是会创建一个新的事件循环并在结束时关闭之。它应当被用作 asyncio 程序的主入口点,理想情况下应当只被调用一次。
import asyncio
async def work(x): # 通过async关键字定义一个协程
for _ in range(3):
print('Work {} is running ..'.format(x))
coroutine_1 = work(1) # 协程是一个对象,不能直接运行
# 3.5<version<3.7:
loop = asyncio.get_event_loop() # 创建一个事件循环
result = loop.run_until_complete(coroutine_1) # 将协程对象加入到事件循环中,并执行
print(result) # 协程对象并没有返回结果,打印None
# version≥3.7:
# asyncio.run(coroutine_1) #创建一个新的事件循环,并以coroutine_1为程序的主入口,执行完毕后关闭事件循环
协程对象不能直接运行,在注册到事件循环的时候,其实是run_until_complete
方法将协程包装成一个task对象,所谓的task对象就是Future类的子类,它保存了协程运行后的状态,用于未来获取协程的结果。
创建task后,task在加入事件循环之前是pending状态,因为下例中没有耗时操作,task很快会完成,后面打印finished状态。
import asyncio
async def work(x): # 通过async关键字定义一个协程
for _ in range(3):
print('Work {} is running ..'.format(x))
coroutine_1 = work(1) # 协程是一个对象,不能直接运行
loop = asyncio.get_event_loop()
task = loop.create_task(coroutine_1)
# task = asyncio.ensure_future(coroutine_1) # 这样也能创建一个task
print(task)
loop.run_until_complete(task) # run_until_complete接受的参数是一个future对象,当传人一个协程时,其内部自动封装成task
print(task)
在task执行完毕的时候可以获取执行的结果,回调的最后一个参数是future对象,通过这个对象可以获取协程的返回值,如果回调函数需要多个参数,可以通过偏函数导入。
从下例可以看出,coroutine执行结束时候会调用回调函数,并通过future获取协程返回(return)的结果。我们创建的task和回调里面的future对象,实际上是同一个对象。
import asyncio
async def work(x):
for _ in range(3):
print('Work {} is running ..'.format(x))
return "Work {} is finished".format(x)
def call_back(future):
print("Callback: {}".format(future.result()))
coroutine = work(1)
loop = asyncio.get_event_loop()
task = asyncio.ensure_future(coroutine)
task.add_done_callback(call_back)
loop.run_until_complete(task) # 返回任务的结果
当回调函数需要传递多个参数时,可以使用functools里的partial方法(偏函数导入这些参数)
functools.partial(func, * args, * * keywords)
,函数装饰器,返回一个新的partial对象。调用partial对象和调用被修饰的函数func相同,只不过调用partial对象时传入的参数个数通常要少于调用func时传入的参数个数。当一个函数func可以接收很多参数,而某一次使用只需要更改其中的一部分参数,其他的参数都保持不变时,partial对象就可以将这些不变的对象冻结起来,这样调用partial对象时传入未冻结的参数,partial对象调用func时连同已经被冻结的参数一同传给func函数,从而可以简化调用过程
import asyncio
import functools
async def work(x):
for _ in range(3):
print('Work {} is running ..'.format(x))
return "Work {} is finished".format(x)
def call_back_2(num, future):
print("Callback_2: {}, the num is {}".format(future.result(), num))
coroutine = work(1)
loop = asyncio.get_event_loop()
task = asyncio.ensure_future(coroutine)
task.add_done_callback(functools.partial(call_back_2, 100))
loop.run_until_complete(task)
使用async可以定义协程对象,使用await可以正对耗时操作进行挂起,就像生成器里的yield一样,函数让出控制权。
协程遇到await,事件循环就会挂起这个协程,执行别的协程,直到其他协程也挂起或执行完毕,在进行下一个协程的执行。
如果一个对象可以在 await 语句中使用,那么它就是 可等待 对象。
import asyncio
import time
async def work(delay, msg):
print(f"Task receives the message :'{msg}' ")
await asyncio.sleep(delay)
print(msg)
async def main():
print(f"Started at {time.strftime('%X')}")
await work(1, "hello") # 启动一个协程,但是这是同步执行的
await work(2, "world")
print(f"Finished at time {time.strftime('%X')}")
asyncio.run(main())
# 运行结果:
# 先打印print(f"Task receives the message :'{msg}' ")然后等待1秒后打印“hello”,
# 然后再次打印print(f"Task receives the message :'{msg}' ")等待2秒后打印“world”
简单的说,await就是挂起当前任务,去执行其他任务,此时是堵塞的,必须要等其他任务执行完毕才能返回到当前任务继续往下执行,这样的说的前提是,在一个时间循环中有多个task或future,当await右面等待的对象是协程对象时,就没有了并发的作用,就是堵塞等待这个协程对象完成
使用async可以定义协程,协程用于耗时的IO操作。我们也可以封装更多的IO操作过程,在一个协程中await另外一个协程,实现协程的嵌套。
import asyncio, time
async def work(x):
for _ in range(3):
print("Work {} is running ..".format(x))
await asyncio.sleep(1) # 当执行某个协程时,在任务阻塞的时候用await挂起
return "Work {} is finished!".format(x)
async def main_work():
coroutine_1 = work(1)
coroutine_2 = work(2)
coroutine_3 = work(3)
tasks = [
asyncio.ensure_future(coroutine_1),
asyncio.ensure_future(coroutine_2),
asyncio.ensure_future(coroutine_3),
]
dones, pendings = await asyncio.wait(tasks)
for task in dones:
print("The task's result is : {}".format(task.result()))
if __name__ == "__main__":
loop = asyncio.get_event_loop()
loop.run_until_complete(main_work())
运行结果
Work 1 is running ..
Work 2 is running ..
Work 3 is running ..
Work 1 is running ..
Work 2 is running ..
Work 3 is running ..
Work 1 is running ..
Work 2 is running ..
Work 3 is running ..
The task's result is : Work 2 is finished!
The task's result is : Work 3 is finished!
The task's result is : Work 1 is finished!
import asyncio
async def nested1():
print("nested1")
await asyncio.sleep(0.5)
print("nested1 is finished!")
return 1
async def nested2():
print("nested2")
await asyncio.sleep(0.5)
print("nested2 is finished!")
return 2
async def nested3():
print("nested3")
await asyncio.sleep(0.5)
print("nested3 is finished!")
return 3
async def nested4():
print("nested4")
await asyncio.sleep(0.5)
print("nested4 is finished!")
return 4
async def main():
# Schedule nested() to run soon concurrently
# with "main()".
print("main")
task1 = asyncio.create_task(nested1()) # 使用asyncio.create_task将函数打包成一个任务,该协程将自动排入日程等待运行
task2 = asyncio.create_task(nested2())
task3 = asyncio.create_task(nested3())
task4 = asyncio.create_task(nested4())
await asyncio.sleep(1) # 在main这个协程中,碰到耗时操作,则挂起任务,执行其他任务,即:task1 or task2 or task3 or task4
# "task" can now be used to cancel "nested()", or
# can simply be awaited to wait until it is complete:
print(await task1) # 等待 task1 如果task1中存在耗时操作,则挂起
print(await task2)
print(await task3)
print(await task4)
asyncio.run(main()) # 并发运行这个5个协程,运行最高层级的入口点main函数
output:
main
nested1
nested2
nested3
nested4
nested1 is finished!
nested2 is finished!
nested4 is finished!
nested3 is finished!
1
2
3
4
比如现在要抓取http://httpbin.org/get?a=X
这样的页面,X为1-10000的数字,一次性的产生1w次请求显然很快就会被封掉。那么我们可以用Semaphore控制同时的并发量(例子中为了演示,X为0-11):
import aiohttp
import asyncio
NUMBERS = range(12)
URL = 'http://httpbin.org/get?a={}'
sema = asyncio.Semaphore(3)
async def fetch_async(a):
async with aiohttp.request('GET', URL.format(a)) as r:
data = await r.json()
return data['args']['a']
async def print_result(a):
with (await sema):
r = await fetch_async(a)
print('fetch({}) = {}'.format(a, r))
loop = asyncio.get_event_loop()
f = asyncio.wait([print_result(num) for num in NUMBERS])
loop.run_until_complete(f)
在运行的时候可以感受到并发受到了信号量的限制,基本保持在同时处理三个请求的标准。
看下面的例子:
import asyncio
import functools
def unlock(lock):
print('callback releasing lock')
lock.release()
async def test(locker, lock):
print('{} waiting for the lock'.format(locker))
with await lock:
print('{} acquired lock'.format(locker))
print('{} released lock'.format(locker))
async def main(loop):
lock = asyncio.Lock()
await lock.acquire()
loop.call_later(0.1, functools.partial(unlock, lock))
await asyncio.wait([test('l1', lock), test('l2', lock)])
loop = asyncio.get_event_loop()
loop.run_until_complete(main(loop))
loop.close()
这个例子中我们首先使用 acquire 加锁,通过 call_later 方法添加一个 0.1 秒后释放锁的函数。看一下调用:
l1 waiting for the lock
l2 waiting for the lock
callback releasing lock
l1 acquired lock
l1 released lock
l2 acquired lock
l2 released lock
import asyncio
import functools
async def consumer(cond, name, second):
await asyncio.sleep(second)
with await cond:
await cond.wait()
print('{}: Resource is available to consumer'.format(name))
async def producer(cond):
await asyncio.sleep(2)
for n in range(1, 3):
with await cond:
print('notifying consumer {}'.format(n))
cond.notify(n=n)
await asyncio.sleep(0.1)
async def producer2(cond):
await asyncio.sleep(2)
with await cond:
print('Making resource available')
cond.notify_all()
async def main(loop):
condition = asyncio.Condition()
task = loop.create_task(producer(condition))
consumers = [consumer(condition, name, index)
for index, name in enumerate(('c1', 'c2'))]
await asyncio.wait(consumers)
task.cancel()
task = loop.create_task(producer2(condition))
consumers = [consumer(condition, name, index)
for index, name in enumerate(('c1', 'c2'))]
await asyncio.wait(consumers)
task.cancel()
loop = asyncio.get_event_loop()
loop.run_until_complete(main(loop))
loop.close()
这次演示了 2 种通知的方式:
由于 producer 和 producer2 是异步的函数,所以不能使用之前 call_later 方法,需要用 create_task 把它创建成一个任务(Task)。但是最后记得要把任务取消掉。
执行以下看看效果:
notifying consumer 1
c1: Resource is available to consumer
notifying consumer 2
c2: Resource is available to consumer
Making resource available
c1: Resource is available to consumer
c2: Resource is available to consumer
模仿锁的例子实现:
import asyncio
import functools
def set_event(event):
print('setting event in callback')
event.set()
async def test(name, event):
print('{} waiting for event'.format(name))
await event.wait()
print('{} triggered'.format(name))
async def main(loop):
event = asyncio.Event()
print('event start state: {}'.format(event.is_set()))
loop.call_later(
0.1, functools.partial(set_event, event)
)
await asyncio.wait([test('e1', event), test('e2', event)])
print('event end state: {}'.format(event.is_set()))
loop = asyncio.get_event_loop()
loop.run_until_complete(main(loop))
loop.close()
看起来也确实和锁的意思很像,不同的是,事件被触发时,2 个消费者不用获取锁就要尽快的执行下去了。
在 asyncio 官网上已经举例了 2 个很好的 队列例子 了,这文就不重复了。asyncio 同样支持 LifoQueue 和 PriorityQueue,我们体验下 aiohttp + 优先级队列的用法吧:
import asyncio
import random
import aiohttp
NUMBERS = random.sample(range(100), 7)
URL = 'http://httpbin.org/get?a={}'
sema = asyncio.Semaphore(3)
async def fetch_async(a):
async with aiohttp.request('GET', URL.format(a)) as r:
data = await r.json()
return data['args']['a']
async def collect_result(a):
with (await sema):
return await fetch_async(a)
async def produce(queue):
for num in NUMBERS:
print('producing {}'.format(num))
item = (num, num)
await queue.put(item)
async def consume(queue):
while 1:
item = await queue.get()
num = item[0]
rs = await collect_result(num)
print('consuming {}...'.format(rs))
queue.task_done()
async def run():
queue = asyncio.PriorityQueue()
consumer = asyncio.ensure_future(consume(queue))
await produce(queue)
await queue.join()
consumer.cancel()
loop = asyncio.get_event_loop()
loop.run_until_complete(run())
loop.close()
看到使用了新的 ensure_future 方法,其实它和之前说的 create_task 意思差不多,都是为了把一个异步的函数变成一个协程的 Task。它们的区别是:
这个例子中,首先我们从 0-99 中随机取出 7 个数字,放入优先级队列,看看消费者是不是按照从小到大的顺序执行的呢?
producing 6
producing 4
producing 22
producing 48
producing 9
producing 90
producing 40
consuming 4...
consuming 6...
consuming 9...
consuming 22...
consuming 40...
consuming 48...
consuming 90...
asyncio.gather(* aws,loop = None,return_exceptions = False )
同时在aws 序列中运行等待对象:
return_exceptions
是False(默认),则第一个引发的异常会立即传播到等待的任务gather()
。aws序列 中的其他等待项将不会被取消并继续运行。return_exceptions
是True,异常的处理方式一样成功的结果,并在结果列表汇总。gather()
被取消,所有提交的awaitables(尚未完成)也被取消。gather()
呼叫。这是为了防止取消一个提交的任务/未来以导致其他任务/期货被取消。# 并发运行任务的案例
import asyncio
async def factorial(name, number):
f = 1
for i in range(2, number + 1):
print(f"Task {name}: Compute factorial({i})...") # python3.7新语法,了解一波
await asyncio.sleep(1) # await后面是 可等待对象
f *= i
print(f"Task {name}: factorial({number}) = {f}")
return f"Task {name}: Finished!"
async def main():
# Schedule three calls *concurrently*:
results = await asyncio.gather( # results包含所有任务的返回结果,是一个列表,按执行顺序返回结果
factorial("A", 2), # 协程,会自动调度为任务
factorial("B", 3),
factorial("C", 4),
)
print(results)
asyncio.run(main()) # 协程的嵌套,后面有详解
asyncio.shield(aw, * , loop=None)
保护一个 可等待对象 防止其被 取消。如果 aw 是一个协程,它将自动作为任务加入日程。
res = await shield(something())
相当于: res = await something()
不同之处 在于如果包含它的协程被取消,在 something()
中运行的任务不会被取消。从 something()
的角度看来,取消操作并没有发生。然而其调用者已被取消,因此 “await” 表达式仍然会引发 CancelledError。
如果通过其他方式取消 something()
(例如在其内部操作) 则 shield()
也会取消。
如果希望完全忽略取消操作 (不推荐) 则 shield()
函数需要配合一个 try/except 代码段,如下所示:
try:
res = await shield(something())
except CancelledError:
res = None
asyncio.wait_for(aw, timeout, * , loop=None)
等待 aw 可等待对象 完成,指定 timeout 秒数后超时。
# 超时的案例
import asyncio
async def eternity():
# Sleep for one hour
# await asyncio.sleep(0.5)
await asyncio.sleep(3600)
print('yay!')
async def main():
# Wait for at most 1 second
try:
await asyncio.wait_for(eternity(), timeout=1.0) # 等待 可等待对象 完成,超过timeout秒后,抛出asyncio.TimeoutError异常
except asyncio.TimeoutError:
print('timeout!')
asyncio.run(main())
# 使用事件循环和asyncio.wait、asyncio.gather实现并发运行任务
import asyncio, time
async def work_1(x):
print(f"Starting {x}")
time.sleep(1)
print(f"Starting {x}")
for _ in range(3):
print(f"Work {x} is running ..")
await asyncio.sleep(2) # 耗时操作,此时挂起该协程,执行其他协程
return f"Work {x} is finished"
async def work_2(x):
print(f"Starting {x}")
for _ in range(3):
await asyncio.sleep(1) # 耗时操作,此时挂起该协程,执行其他协程
print(f"Work {x} is running ..")
return f"Work {x} is finished"
coroutine_1 = work_1(1)
coroutine_2 = work_2(2)
loop = asyncio.get_event_loop() # 创建一个事件循环
# 方式一,asyncio.wait(tasks)接受一个task列表 执行的顺序与列表里的任务顺序有关
tasks = [
asyncio.ensure_future(coroutine_1),
asyncio.ensure_future(coroutine_2),
]
# 注册到事件循环中,并执行
dones, pendings = loop.run_until_complete(asyncio.wait(tasks)) # loop.run_until_complete(asyncio.wait(tasks))的作用相当于:await asyncio.wait(tasks)
for task in dones:
print(task.result())
# 方式二,使用asyncio.gather(*tasks),接受一堆tasks,tasks也可以是一个列表,使用*解包
# task_1 = asyncio.ensure_future(coroutine_1)
# task_2 = asyncio.ensure_future(coroutine_2)
# task_result_list = loop.run_until_complete(asyncio.gather(task_1, task_2)) # 返回一个列表,里面包含所有task的result()的结果
# print(task_result_list)
asyncio.wait(aws,* , loop = None,timeout = None,return_when = ALL_COMPLETED )
同时运行aws中的等待对象 并阻塞 ,直到return_when指定的条件。
done, pending = await asyncio.wait(aws)
# 简单等待的案例
import asyncio
async def foo():
return 42
task = asyncio.create_task(foo())
# 注意:1、这里传递的要是一个任务组,而不能是单个task,如果只有一个任务,可以这样传递:[task](task,){task}
# 2、直接传递协程对象的方式已弃用 即:done, pending = await asyncio.wait([foo()])
done, pending = await asyncio.wait((task, ))
if task in done:
print(f"The task's result is {task.result()}")
asyncio.as_completed(aws, * , loop=None, timeout=None)
并发地运行 aws 集合中的 可等待对象。返回一个 Future 对象的迭代器。返回的每个 Future 对象代表来自剩余可等待对象集合的最早结果。
如果在所有 Future 对象完成前发生超时则将引发 asyncio.TimeoutError
# 使用as_completed方法
import asyncio
async def work(x):
for _ in range(3):
print("Work {} is running ..".format(x))
await asyncio.sleep(1) # 当执行某个协程时,在任务阻塞的时候用await挂起
return "Work {} is finished!".format(x)
async def main_work():
coroutine_1 = work(1)
coroutine_2 = work(2)
coroutine_3 = work(3)
tasks = [
asyncio.ensure_future(coroutine_2),
asyncio.ensure_future(coroutine_1),
asyncio.ensure_future(coroutine_3),
]
for task in asyncio.as_completed(tasks): # 返回一个可迭代对象,每次返回最先完成的任务的结果
result = await task
print(f"The task's result is : {result}")
if __name__ == "__main__":
loop = asyncio.get_event_loop()
loop.run_until_complete(main_work())
直接参见:python多任务—协程(二)
较为详细的归纳了下python协程的编程方法,在后续的使用中逐渐熟悉吧
参考:
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。