赞
踩
1、I/O模型
可用下图表示:
: ... for c in 'AB': ... yield c ... for i in range(1, 3): ... yield i ... >>> list(gen()) ['A', 'B', 1, 2] >>> #使用yield from实现相同的功能 >>> def gen(): ... yield from 'AB' ... yield from range(1, 3) ... >>> list(gen()) ['A', 'B', 1, 2]
2.2、打开双通道
from collections import namedtuple Result = namedtuple('Result', 'count average') # the subgenerator def averager(): total = 0.0 count = 0 average = None while True: term = yield if term is None: break total += term count += 1 average = total / count return Result(count, average) # the delegating generator def grouper(results, key): while True: #只有当生成器averager()结束,才会返回结果给results赋值 results[key] = yield from averager() def main(data): results = {} for key, values in data.items(): group = grouper(results, key) next(group) for value in values: group.send(value) group.send(None) report(results) #如果不使用yield from,仅仅通过yield实现相同的效果,如下: def main2(data): for key, values in data.items(): aver = averager() next(aver) for value in values: aver.send(value) try: #通过异常接受返回的数据 aver.send(None) except Exception as e: result = e.value print(result) def report(results): for key, result in sorted(results.items()): group, unit = key.split(';') print('{:2} {:5} averaging {:.2f}{}'.format(result.count, group, result.average, unit)) data = { 'girls;kg':[40.9, 38.5, 44.3, 42.2, 45.2, 41.7, 44.5, 38.0, 40.6, 44.5], 'girls;m':[1.6, 1.51, 1.4, 1.3, 1.41, 1.39, 1.33, 1.46, 1.45, 1.43], 'boys;kg':[39.0, 40.8, 43.2, 40.8, 43.1, 38.6, 41.4, 40.6, 36.3], 'boys;m':[1.38, 1.5, 1.32, 1.25, 1.37, 1.48, 1.25, 1.49, 1.46], } if __name__ == '__main__': main(data)
总结如下:
3、asyncio框架
mport asyncio
# Borrowed from http://curio.readthedocs.org/en/latest/tutorial.html.
@asyncio.coroutine
def countdown(number, n):
while n > 0:
print('T-minus', n, '({})'.format(number))
yield from asyncio.sleep(1)
n -= 1
loop = asyncio.get_event_loop()
tasks = [
asyncio.ensure_future(countdown("A", 2)),
asyncio.ensure_future(countdown("B", 3))]
loop.run_until_complete(asyncio.wait(tasks))
loop.close()
虽然发展到 Python 3.4 时有了yield from的加持让协程更容易了,但是由于协程在Python中发展的历史包袱所致,迭代器的过度重载,使用生成器实现协程功能有很多缺点:
于是根据Python 3.5 Beta期间的反馈,进行了重新设计:明确的把协程从生成器里独立出来—原生协程现在拥有了自己完整的独立类型,而不再是一种新的生成器类型。
4、async/await 原生协程
async def read_data(db):
pass
协程的主要属性包括:
async def函数始终为协程,即使它不包含await表达式。
如果在async函数中使用yield或者yield from表达式会产生SyntaxError错误。
在内部,引入了两个新的代码对象标记:
常规生成器在调用时会返回一个genertor对象,同理,协程在调用时会返回一个coroutine对象。
协程不再抛出StopIteration异常,而是替代为RuntimeError。常规生成器实现类似的行为需要进行引入future(PEP-3156)
当协程进行垃圾回收时,一个从未被await的协程会抛出RuntimeWarning异常
types.coroutine():在types模块中新添加了一个函数coroutine(fn)用于asyncio中基于生成器的协程与本PEP中引入的原生携协程互通。使用它,“生成器实现的协程”和“原生协程”之间可以进行互操作。
这个函数将生成器函数对象设CO_ITERABLE_COROUTINE标记,将返回对象变为coroutine对象。如果fn不是一个生成器函数,那么它会对其进行封装。如果它返回一个生成器,那么它会封装一个awaitable代理对象。
注意:CO_COROUTINE标记不能通过types.coroutine()进行设置,这就可以将新语法定义的原生协程与基于生成器的协程进行区分。
await与yield from相似,await关键字的行为类似标记了一个断点,挂起协程的执行直到其他awaitable对象完成并返回结果数据。它复用了yield from的实现,并且添加了额外的验证参数。await只接受以下之一的awaitable对象:
协程链:协程的一个关键特性是它们可以组成协程链,就像函数调用链一样,一个协程对象是awaitable的,因此其他协程可以await另一个协程对象。
任意一个yield from链都会以一个yield结束,这是Future实现的基本机制。因此,协程在内部中是一种特殊的生成器。每个await最终会被await调用链条上的某个yield语句挂起。
关于基于生成器的协程和async定义的原生协程之间的差异,关键点是只有基于生成器的协程可以真正的暂停执行并强制性返回给事件循环。所以每个await最终会被await调用链条上的某个由types.coroutine()装饰的包含yield语句的协程函数挂起。
为了启用协程的这一特点,一个新的魔术方法__await__被添加进来。在asyncio中,对于对象在await语句启用Future对象只需要添加await = iter这行到asyncio.Future类中。带有await方法的对象也叫做Future-like对象。
另外还新增了异步上下文管理 async with 和异步迭代器 async for。异步生成器和异步推导式都让迭代变得并发,他们所做的只是提供同步对应的外观,但是有问题的循环能够放弃对事件循环的控制,以便运行其他协程。
关于何时以及如何能够和不能使用async / await,有一套严格的规则:
5、将 async/await 看做异步编程的 API
import datetime import heapq import types import time class Task: """相当于asyncio.Task,存储协程和要执行的时间""" def __init__(self, wait_until, coro): self.coro = coro self.waiting_until = wait_until def __eq__(self, other): return self.waiting_until == other.waiting_until def __lt__(self, other): return self.waiting_until < other.waiting_until class SleepingLoop: """一个事件循环,每次执行最先需要执行的协程,时间没到就阻塞等待,相当于asyncio中的事件循环""" def __init__(self, *coros): self._new = coros self._waiting = [] def run_until_complete(self): # 启动所有的协程 for coro in self._new: print(coro) wait_for = coro.send(None) heapq.heappush(self._waiting, Task(wait_for, coro)) # 保持运行,直到没有其他事情要做 while self._waiting: now = datetime.datetime.now() # 每次取出最先执行的协程 task = heapq.heappop(self._waiting) if now < task.waiting_until: # 阻塞等待指定的休眠时间 delta = task.waiting_until - now time.sleep(delta.total_seconds()) print(task.coro, delta.total_seconds()) now = datetime.datetime.now() try: # 恢复不需要等待的协程 wait_until = task.coro.send(now) heapq.heappush(self._waiting, Task(wait_until, task.coro)) except StopIteration: # 捕捉协程结束的抛出异常 pass @types.coroutine def sleep(seconds): """暂停一个协程指定时间,可把他当做asyncio.sleep()""" now = datetime.datetime.now() wait_until = now + datetime.timedelta(seconds=seconds) actual = yield wait_until return actual - now async def countdown(label, length, *, delay=0): """协程函数,实现具体的任务""" print(label, 'waiting', delay, 'seconds before starting countdown') delta = await sleep(delay) print(label, 'starting after waiting', delta) while length: print(label, 'T-minus', length) waited = await sleep(1) length -= 1 print(label, 'lift-off!') def main(): """启动事件循环,运行三个协程""" loop = SleepingLoop(countdown('A', 5), countdown('B', 3, delay=2), countdown('C', 4, delay=1)) start = datetime.datetime.now() loop.run_until_complete() print('Total elapsed time is', datetime.datetime.now() - start) if __name__ == '__main__': main()
6、总结Python异步编程版本细节
协程对象
协程对象:指一个使用async关键字定义的异步函数,是需要执行的任务,它的调用不会立即执行函数,而是会返回一个协程对象。协程不能直接运行,协程对象需要注册到事件循环,由事件循环调用。
有两种方法可以从协程读取异步函数的输出:
在Python中编写异步函数时要记住的一件事是,在def之前使用了async关键字并不意味着你的异步函数将同时运行。如果采用普通函数并在其前面添加async,则事件循环将运行函数而不会中断,因为你没有指定允许循环中断你的函数以运行另一个协同程序的位置。指定允许事件循环中断运行的位置非常简单,每次使用关键字await等待事件循环都可以停止运行你的函数并切换到运行另一个注册到循环的协同程序。
事件循环
Future & Task对象
Future对象
# 该函数在 Python 3.7 中被加入,更加高层次的函数,返回Task对象
future1 = asyncio.create_task(my_coroutine)
# 在Python 3.7 之前,是更加低级的函数,返回Future对象或者Task对象
future2 = asyncio.ensure_future(my_coroutine)
第一种方法在循环中添加一个协程并返回一个task对象,task对象是future的子类型。第二种方法非常相似,当传入协程对象时返回一个Task对象,唯一的区别是它也可以接受Future对象或Task对象,在这种情况下它不会做任何事情并且返回Future对象或者Task对象不变。
Future对象有几个状态:
创建Future对象的时候,状态为pending,事件循环调用执行的时候就是running,调用完毕就是done,如果需要取消Future对象的调度执行,可调用Future对象的cancel()函数。
除此之外,Future对象还有下面一些常用的方法:
需要注意的是,当在协程内部引发未处理的异常时,它不会像正常的同步编程那样破坏我们的程序,相反,它存储在future内部,如果在程序退出之前没有处理异常,则会出现以下错误:
Task exception was never retrieved
try:
# 调用结果时捕获异常
my_promise.result()
catch Exception:
pass
# 获取在协程执行过程中抛出的异常
my_promise.exception()
Task对象
实例分析
import asyncio import time async def compute(x, y): print("Compute {} + {}...".format(x, y)) await asyncio.sleep(2.0) return x+y async def print_sum(x, y): result = await compute(x, y) print("{} + {} = {}".format(x, y, result)) start = time.time() loop = asyncio.get_event_loop() tasks = [ asyncio.ensure_future(print_sum(0, 0)), asyncio.ensure_future(print_sum(1, 1)), asyncio.ensure_future(print_sum(2, 2)), ] loop.run_until_complete(asyncio.wait(tasks)) loop.close() print("Total elapsed time {}".format(time.time() - start))
上面的代码的执行流程是:
详细的流程应该是这样的:
四、asyncio使用详解
协程完整的工作流程是这样:
协程:协程通过 async/await 语法进行声明,是编写异步应用的推荐方式。注意:简单地调用一个协程并不会将其加入执行队列。
要真正运行一个协程,asyncio 提供了三种主要机制:
三种运行方式代码如下:
import asyncio import time async def say_after(delay, what): await asyncio.sleep(delay) print(what) return delay async def main(): print(f"started at {time.strftime('%X')}") # 通过await等待运行,此时两个任务按顺序运行 result1 = await say_after(2, 'hello') result2 = await say_after(1, 'world') print(result1, result2) task1 = asyncio.create_task(say_after(2, 'hello2')) task2 = asyncio.create_task(say_after(1, 'world2')) # 通过asyncio.task()包装为task然后await等待运行,此时两个任务并发运行 result3 = await task1 result4 = await task2 print(result3, result4) print(f"finished at {time.strftime('%X')}") # 通过asyncio.run()函数运行 asyncio.run(main()) # 下面相当于上面的asyncio.run()函数 # loop = asyncio.get_event_loop() # try: # loop.run_until_complete(main()) # finally: # loop.close()
并发运行任务
import asyncio async def factorial(name, number): f = 1 for i in range(2, number + 1): print(f"Task {name}: Compute factorial({i})...") await asyncio.sleep(1) f *= i print(f"Task {name}: factorial({number}) = {f}") return f async def main(): # 并发运行三个任务 result = await asyncio.gather( factorial("A", 5), factorial("B", 3), factorial("C", 4), ) print(result) asyncio.run(main())
等待任务
import asyncio async def factorial(name, number): f = 1 for i in range(2, number + 1): print(f"Task {name}: Compute factorial({i})...") await asyncio.sleep(1) f *= i print(f"Task {name}: factorial({number}) = {f}") return f async def main(): tasks = list() for i in range(2, 5): tasks.append(asyncio.create_task(factorial("Task" + str(i), i))) done, pending = await asyncio.wait(tasks) for d in done: result = await d print(result) asyncio.run(main())
回调
import asyncio
def callback(task):
print(task, "done")
async def hello():
print("hello")
await asyncio.sleep(0)
async def main():
task = asyncio.create_task(hello())
task.add_done_callback(callback)
await task
asyncio.run(main())
2、队列
import asyncio import itertools as it import os import random import time async def makeitem(size: int = 5) -> str: return os.urandom(size).hex() async def randint(a: int, b: int) -> int: return random.randint(a, b) async def randsleep(a: int = 1, b: int = 5, caller=None) -> None: i = await randint(a, b) if caller: print(f"{caller} sleeping for {i} seconds.") await asyncio.sleep(i) async def produce(name: int, q: asyncio.Queue) -> None: """生产者""" n = await randint(1, 5) for _ in it.repeat(None, n): # 同步添加任务 await randsleep(caller=f"Producer {name}") i = await makeitem() t = time.perf_counter() await q.put((i, t)) print(f"Producer {name} added <{i}> to queue.") async def consume(name: int, q: asyncio.Queue) -> None: """消费者""" while True: await randsleep(caller=f"Consumer {name}") i, t = await q.get() now = time.perf_counter() print(f"Consumer {name} got element <{i}>" f" in {now - t:0.5f} seconds.") q.task_done() async def main(nprod: int, ncon: int): q = asyncio.Queue() # asyncio.run()会自动运行消费者和生产者 producers = [asyncio.create_task(produce(n, q)) for n in range(nprod)] consumers = [asyncio.create_task(consume(n, q)) for n in range(ncon)] await asyncio.gather(*producers) # 等待生产者结束 await q.join() # 阻塞直到队列中的所有项目都被接收和处理 # 取消消费者 for c in consumers: c.cancel() if __name__ == "__main__": random.seed(444) start = time.perf_counter() asyncio.run(main(2, 3)) elapsed = time.perf_counter() - start print(f"Program completed in {elapsed:0.5f} seconds.")
上面的代码逻辑流程如下:
3、结合线程和进程
多线程
import asyncio from threading import Thread async def hello(i): print("hello", i) await asyncio.sleep(i) return i async def main(): tasks = [asyncio.create_task(hello(i)) for i in range(5)] await asyncio.gather(*tasks) def async_main(): asyncio.run(main()) # 在子线程中运行异步任务 t = Thread(target=async_main) t.start() # 不会干扰主线程 for i in range(3): print(i)
方法二:loop.call_soon_threadsafe()函数
loop.call_soon()用于注册回调,当异步任务执行完成时会在当前线程按顺序执行注册的普通函数。
loop.call_soon_threadsafe()用于在一个线程中注册回调函数,在另一个线程中执行注册的普通函数。
from threading import Thread import asyncio import time async def hello(i): print("hello", i) await asyncio.sleep(i) return i async def main(): tasks = [asyncio.create_task(hello(i)) for i in range(5)] await asyncio.gather(*tasks) def start_loop(loop): asyncio.set_event_loop(loop) loop.run_until_complete(main()) def more_work(x): print('More work {}'.format(x)) time.sleep(x) print('Finished more work {}'.format(x)) # 在主线程创建事件循环,并在另一个线程中启动 new_loop = asyncio.new_event_loop() t = Thread(target=start_loop, args=(new_loop,)) t.start() # 在主线程中注册回调函数,在子线程中按顺序执行回调函数 new_loop.call_soon_threadsafe(more_work, 1) new_loop.call_soon_threadsafe(more_work, 3) # 不会阻塞主线程 for i in range(10): print(i)
方法三:asyncio.run_coroutine_threadsafe()函数
loop.call_soon_threadsafe()函数是同步执行回调函数,asyncio.run_coroutine_threadsafe()函数则是异步执行回调函数,传入写成函数。
from threading import Thread import asyncio import time async def hello(i): print("hello", i) await asyncio.sleep(i) return i async def main(): tasks = [asyncio.create_task(hello(i)) for i in range(5)] await asyncio.gather(*tasks) def start_loop(loop): asyncio.set_event_loop(loop) loop.run_until_complete(main()) # 在主线程创建事件循环,并在另一个线程中启动 new_loop = asyncio.new_event_loop() t = Thread(target=start_loop, args=(new_loop,)) t.start() # 在主线程中注册回调协程函数,在子线程中按异步执行回调函数 asyncio.run_coroutine_threadsafe(hello(3.5), new_loop) asyncio.run_coroutine_threadsafe(hello(1.5), new_loop) # 不会阻塞主线程 for i in range(10): print(i)
import asyncio import concurrent.futures import time def blocks(n): """阻塞任务""" time.sleep(0.1) return n ** 2 async def run_blocking_tasks(executor): loop = asyncio.get_event_loop() # 在线程池中执行阻塞任务 blocking_tasks = [ loop.run_in_executor(executor, blocks, i) for i in range(6) ] completed, pending = await asyncio.wait(blocking_tasks) results = [t.result() for t in completed] print(results) # 创建线程池 executor = concurrent.futures.ThreadPoolExecutor(max_workers=3) event_loop = asyncio.get_event_loop() try: event_loop.run_until_complete(run_blocking_tasks(executor)) finally: event_loop.close()
多进程
import asyncio
import multiprocessing
async def hello(i):
print("hello", i)
await asyncio.sleep(1)
def strap(tx, rx):
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
loop.run_until_complete(hello(3))
# 启动一个子线程,在子线程中运行异步代码
p = multiprocessing.Process(target=strap, args=(1, 3))
p.start()
# 子进程和主进程不会相互干扰
for i in range(10):
print(i)
import asyncio import concurrent.futures import time def blocks(n): """阻塞任务""" time.sleep(0.1) return n ** 2 async def run_blocking_tasks(executor): loop = asyncio.get_event_loop() # 在进程池中执行阻塞任务 blocking_tasks = [ loop.run_in_executor(executor, blocks, i) for i in range(6) ] completed, pending = await asyncio.wait(blocking_tasks) results = [t.result() for t in completed] print(results) # 创建进程池 executor = concurrent.futures.ProcessPoolExecutor(max_workers=3) event_loop = asyncio.get_event_loop() try: event_loop.run_until_complete(run_blocking_tasks(executor)) finally: event_loop.close()
import asyncio
from aiohttp import request
from aiomultiprocess import Worker
async def get(url):
async with request("GET", url) as response:
return await response.text("utf-8")
async def main():
p = Worker(target=get, args=("https://jreese.sh", ))
response = await p
asyncio.run(main())
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。