赞
踩
asyncio
是Python 3.4版本引入的标准库,直接内置了对异步IO的支持
定义/创建协程对象
将协程转为task任务 - 定义事件循环对象容器
将task任务扔进事件循环对象中触发
1.假设任务只有两个状态:
2.在这种情况下,event loop 会维护两个任务列表,分别对应这两种状态;并且选取预备状态的一个任务使其运行,一直到这个任务把控制权交还给event loop为止
3.当任务把控制权交还给event loop 时,event loop会根据其是否完成,把任务放到预备或等待状态的列表,然后遍历等待状态列表的任务,查看他们是否完成。
而原先在预备状态列表的任务位置仍旧不变,因为它们还未运行。
5.当所有任务被重新放置在合适的列表后,新一轮的循环又开始了:event loop 继续从预备状态的列表中选取一个任务使其执行…如此周而复始,直到所有任务完成。
from collections.abc import Coroutine
async def hello(name):
print('Hello,', name)
if __name__ == '__main__':
# 生成协程对象,并不会运行函数内的代码
coroutine = hello("World")
# 检查是否是协程 Coroutine 类型
print(isinstance(coroutine, Coroutine)) # True
名称 | 含义 |
---|---|
event_loop 事件循环 | 程序开启一个无限的循环,程序员会把一些函数(协程)注册到事件循环上。当满足事件发生的时候,调用相应的协程函数 |
coroutine 协程 | 协程对象,指一个使用async关键字定义的函数,它的调用不会立即执行函数,而是会返回一个协程对象。协程对象需要注册到事件循环,由事件循环调用 |
future 对象 | 代表将来执行或没有执行的任务的结果。它和task上没有本质的区别 |
task 任务 | 个协程对象就是一个原生可以挂起的函数,任务则是对协程进一步封装,其中包含任务的各种状态。Task 对象是 Future 的子类,它将 coroutine 和 Future 联系在一起,将 coroutine 封装成一个 Future 对象 |
async/await 关键字 | python3.5 用于定义协程的关键字,async定义一个协程,await用于挂起阻塞的异步调用接口,其作用在一定程度上类似于yield |
import asyncio async def hello(name): print('Hello,', name) # 定义协程对象 coroutine = hello("World") # 定义事件循环对象容器 loop = asyncio.get_event_loop() # task = asyncio.ensure_future(coroutine) # 将协程转为task任务 task = loop.create_task(coroutine) # 将task任务扔进事件循环对象中并触发 loop.run_until_complete(task)
输出
Hello, World
import time import asyncio async def _sleep(x): time.sleep(2) return '暂停了{}秒!'.format(x) def callback(future): print('这里是回调函数,获取返回结果是:', future.result()) coroutine = _sleep(2) loop = asyncio.get_event_loop() task = asyncio.ensure_future(coroutine) # 添加回调函数 task.add_done_callback(callback) loop.run_until_complete(task)
输出
这里是回调函数,获取返回结果是: 暂停了2秒!
import asyncio async def do_work(x): print("waiting", x) await asyncio.sleep(x) return f"done after {x}s " loop = asyncio.get_event_loop() tasks = [asyncio.ensure_future(do_work(1)), asyncio.ensure_future(do_work(2)), ] loop.run_until_complete(asyncio.gather(*tasks)) # loop.run_until_complete(asyncio.wait(tasks)) for task in tasks: print("Task rest", task.result())
说了这么多,现在,我们不妨来深入代码底层看看。有了前面的知识做基础,你应该很容易理解这两段代码。
迭代一:同步
import asyncio async def worker_1(): print('worker_1 start') await asyncio.sleep(1) print('worker_1 done') async def worker_2(): print('worker_2 start') await asyncio.sleep(2) print('worker_2 done') async def main(): print('before await') await worker_1() print('awaited worker_1') await worker_2() print('awaited worker_2') %time asyncio.run(main()) ########## 输出 ########## before await worker_1 start worker_1 done awaited worker_1 worker_2 start worker_2 done awaited worker_2 Wall time: 3 s
迭代二:异步
import asyncio async def worker_1(): print('worker_1 start') await asyncio.sleep(1) print('worker_1 done') async def worker_2(): print('worker_2 start') await asyncio.sleep(2) print('worker_2 done') async def main(): task1 = asyncio.create_task(worker_1()) task2 = asyncio.create_task(worker_2()) print('before await') await task1 print('awaited worker_1') await task2 print('awaited worker_2') %time asyncio.run(main()) ########## 输出 ########## before await worker_1 start worker_2 start worker_1 done awaited worker_1 worker_2 done awaited worker_2 Wall time: 2.01 s
不过,第二个代码,到底发生了什么呢?为了让你更详细了解到协程和线程的具体区别,这里我详细地分析了整个过程。步骤有点多,别着急,我们慢慢来看。
asyncio.run(main())
,程序进入 main() 函数,事件循环开启;'before await'
;'worker_1 start'
,然后运行到 await asyncio.sleep(1)
, 从当前任务切出,事件调度器开始调度 worker_2;'worker_2 start'
,然后运行 await asyncio.sleep(2)
从当前任务切出;'worker_1 done'
,task_1 完成任务,从事件循环中退出;'awaited worker_1'
,·然后在 await task2 处继续等待;'worker_2 done'
,task_2 完成任务,从事件循环中退出;'awaited worker_2'
,协程全任务结束,事件循环结束。接下来,我们进阶一下。如果我们想给某些协程任务限定运行时间,一旦超时就取消,又该怎么做呢?再进一步,如果某些协程运行时出现错误,又该怎么处理呢?同样的,来看代码。
迭代三:
import asyncio async def worker_1(): await asyncio.sleep(1) return 1 async def worker_2(): await asyncio.sleep(2) return 2 / 0 async def worker_3(): await asyncio.sleep(3) return 3 async def main(): task_1 = asyncio.create_task(worker_1()) task_2 = asyncio.create_task(worker_2()) task_3 = asyncio.create_task(worker_3()) await asyncio.sleep(2) task_3.cancel() res = await asyncio.gather(task_1, task_2, task_3, return_exceptions=True) print(res) %time asyncio.run(main()) ########## 输出 ########## [1, ZeroDivisionError('division by zero'), CancelledError()] Wall time: 2 s
你可以看到,worker_1 正常运行,worker_2 运行中出现错误,worker_3 执行时间过长被我们 cancel 掉了,这些信息会全部体现在最终的返回结果 res 中。
不过要注意return_exceptions=True
这行代码。如果不设置这个参数,错误就会完整地 throw 到我们这个执行层,从而需要 try except 来捕捉,这也就意味着其他还没被执行的任务会被全部取消掉。为了避免这个局面,我们将 return_exceptions 设置为 True 即可。
import asyncio async def do_work(x): print("waiting", x) await asyncio.sleep(x) return f"done after {x}s " async def main(): tasks = [asyncio.ensure_future(do_work(1)), asyncio.ensure_future(do_work(2)), ] done, pending = await asyncio.wait(tasks) for task in done: print("Task rest", task.result()) loop = asyncio.get_event_loop() loop.run_until_complete(main())
接受的参数
wait
tasks=[
asyncio.ensure_future(factorial("A", 2)),
asyncio.ensure_future(factorial("B", 3)),
asyncio.ensure_future(factorial("C", 4))
]
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))
gather
tasks=[
asyncio.ensure_future(factorial("A", 2)),
asyncio.ensure_future(factorial("B", 3)),
asyncio.ensure_future(factorial("C", 4))
]
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.gather(*tasks))
返回的结果
await
dones, pendings = await asyncio.wait(tasks)
for task in dones:
print('Task ret: ', task.result())
gather
results = await asyncio.gather(*tasks)
for result in results:
print('Task ret: ', result)
控制功能
import asyncio import random async def coro(tag): await asyncio.sleep(random.uniform(0.5, 5)) loop = asyncio.get_event_loop() tasks = [coro(i) for i in range(1, 11)] # 【控制运行任务数】:运行第一个任务就返回 # FIRST_COMPLETED :第一个任务完全返回 # FIRST_EXCEPTION:产生第一个异常返回 # ALL_COMPLETED:所有任务完成返回 (默认选项) dones, pendings = loop.run_until_complete( asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)) print("第一次完成的任务数:", len(dones)) # 【控制时间】:运行一秒后,就返回 dones2, pendings2 = loop.run_until_complete(asyncio.wait(pendings, timeout=1)) print("第二次完成的任务数:", len(dones2)) # 【默认】:所有任务完成后返回 dones3, pendings3 = loop.run_until_complete(asyncio.wait(pendings2)) print("第三次完成的任务数:", len(dones3)) loop.close()
import time import asyncio from queue import Queue from threading import Thread async def do_work(x, queue: Queue, msg=""): await asyncio.sleep(x) queue.put(msg) def start_loop(loop: asyncio.ProactorEventLoop): asyncio.set_event_loop(loop) loop.run_forever() if __name__ == '__main__': queue = Queue() loop = asyncio.new_event_loop() my_thread = Thread(target=start_loop, args=(loop,)) my_thread.start() #不能加join否者不能执行 print(time.ctime()) asyncio.run_coroutine_threadsafe(do_work(1, queue, "第一个"), loop) asyncio.run_coroutine_threadsafe(do_work(2, queue, "第二个"), loop) while True: msg = queue.get() print(f"{msg} 协程运行完成...") print(time.ctime())
import asyncio import time from queue import Queue from threading import Thread QUEUE_PRODUCER = Queue() QUEUE_CONSUMER = Queue() def start_loop(loop): asyncio.set_event_loop(loop) loop.run_forever() async def do_sleep(x, queue: Queue): await asyncio.sleep(x) queue.put('ok') def start_producer(): QUEUE_PRODUCER.put(5) QUEUE_PRODUCER.put(3) QUEUE_PRODUCER.put(1) def consumer(): while True: task = QUEUE_PRODUCER.get() if not task: time.sleep(1) continue asyncio.run_coroutine_threadsafe(do_sleep(int(task), QUEUE_CONSUMER), new_loop) if __name__ == '__main__': print(time.ctime()) new_loop = asyncio.new_event_loop() loop_thread = Thread(target=start_loop, args=(new_loop,)) loop_thread.daemon = True loop_thread.start() start_producer() consumer_thread = Thread(target=consumer) consumer_thread.daemon = True consumer_thread.start() while True: msg = QUEUE_CONSUMER.get() print("协程运行完成..") print("当前时间:", time.ctime())
到这里,发现了没,线程能实现的,协程都能做到。那就让我们温习一下这些知识点,用协程来实现一个经典的生产者消费者模型吧。
import asyncio import random async def consumer(queue, id): while True: val = await queue.get() print('{} get a val: {}'.format(id, val)) await asyncio.sleep(1) async def producer(queue, id): for i in range(5): val = random.randint(1, 10) await queue.put(val) print('{} put a val: {}'.format(id, val)) await asyncio.sleep(1) async def main(): queue = asyncio.Queue() consumer_1 = asyncio.create_task(consumer(queue, 'consumer_1')) consumer_2 = asyncio.create_task(consumer(queue, 'consumer_2')) producer_1 = asyncio.create_task(producer(queue, 'producer_1')) producer_2 = asyncio.create_task(producer(queue, 'producer_2')) await asyncio.sleep(10) consumer_1.cancel() consumer_2.cancel() await asyncio.gather(consumer_1, consumer_2, producer_1, producer_2, return_exceptions=True) %time asyncio.run(main()) ########## 输出 ########## producer_1 put a val: 5 producer_2 put a val: 3 consumer_1 get a val: 5 consumer_2 get a val: 3 producer_1 put a val: 1 producer_2 put a val: 3 consumer_2 get a val: 1 consumer_1 get a val: 3 producer_1 put a val: 6 producer_2 put a val: 10 consumer_1 get a val: 6 consumer_2 get a val: 10 producer_1 put a val: 4 producer_2 put a val: 5 consumer_2 get a val: 4 consumer_1 get a val: 5 producer_1 put a val: 2 producer_2 put a val: 8 consumer_1 get a val: 2 consumer_2 get a val: 8 Wall time: 10 s
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。