当前位置:   article > 正文

Python的协程异步IO(asyncio)详解_python asyncio 深度解析

python asyncio 深度解析

一、协程简介

1.1 定义

协程不是系统级线程,很多时候协程被称为“轻量级线程”、“微线程”、“纤程(fiber)”等。简单来说可以认为协程是线程里不同的函数,这些函数之间可以相互快速切换。

协程和用户态线程非常接近,用户态线程之间的切换不需要陷入内核,但部分操作系统中用户态线程的切换需要内核态线程的辅助。

协程是编程语言(或者 lib)提供的特性(协程之间的切换方式与过程可以由编程人员确定),是用户态操作。协程适用于 IO 密集型的任务。常见提供原生协程支持的语言有:c++20、golang、python 等,其他语言以库的形式提供协程功能,比如 C++20 之前腾讯的 fiber 和 libco 等等。

1.2 分类 

协程有两种,一种无栈协程,python中以 asyncio 为代表, 一种有栈协程,python 中 以 gevent 为代表,本文主要讲解 asyncio 线程。

有栈线程

无栈线程

备注

例子:

lua thread

python gevent

C# yield return

C# async\await

python asyncio

是否拥有单独的上下文:

上下文包括寄存器、栈帧

局部变量保存位置:

无栈协程的局部变量保存在堆上,比如generator的数据成员。

优点:

1. 每个协程有单独的上下文,可以在任意的嵌套函数中任何地方挂起此协程。

2. 不需要编译器做语法支持,通过汇编指令即可实现

1. 不需要为每个协程保存单独的上下文,内存占用低。

2. 切换成本低,性能更高。

缺点:

1. 需要提前分配一定大小的堆内存保存每个协程上下文,所以会出现内存浪费或者栈溢出。

2. 上下文拷贝和切换成本高,性能低于无栈协程。

1. 需要编译器提供语义支持,比如C# yield return语法糖。

2. 只能在这个生成器内挂起此协程,无法在嵌套函数中挂起此协程。

3. 关键字有一定传染性,异步代码必须都有对应的关键字。作为对比,有栈协程只需要做对应的函数调用。

无栈协程无法在嵌套函数中挂起此协程,有栈协程由于是通过保存和切换上下文包括寄存器和执行栈实现,可以在协程函数的嵌套函数内部yield这个协程并唤醒。

二、python的asyncio协程详解

2.1 介绍

asyncio 是用来编写并发代码的库,使用 async/await 语法。

asyncio 被用作多个提供高性能 Python 异步框架的基础,包括网络和网站服务,数据库连接库,分布式任务队列等等。

asyncio 往往是构建 IO 密集型和高层级 结构化 网络代码的最佳选择。

asyncio 提供一组高层级 API 用于:

  • 并发地运行 Python 协程 并对其执行过程实现完全控制;
  • 执行网络 IO 和 IPC;
  • 控制子进程;
  • 通过队列实现分布式任务;
  • 同步并发代码;
  • 创建和管理事件循环,以提供异步 API 用于 网络化, 运行 子进程,处理 OS 信号 等等;
  • 使用 transports 实现高效率协议;
  • 通过 async/await 语法桥接基于回调的库和代码。

2.2 asyncio 协程的使用(用的python3.8的语法)

asyncio 函数的源代码地址:https://github.com/python/cpython/tree/3.8/Lib/asyncio

1)协程通过 async/await 语法进行声明,是编写 asyncio 应用的推荐方式。

asyncio.run() 函数用来运行最高层级的入口点 "main()" 函数。

asyncio.sleep(delay, result=None, *, loop=None) 函数用来阻塞指定的秒数。

  1. # coding=utf8
  2. import sys
  3. import asyncio
  4. async def main():
  5. print('hello')
  6. await asyncio.sleep(1)
  7. print('world')
  8. asyncio.run(main())
2)事件循环函数(包括循环的创建、运行和停止)

asyncio.get_running_loop() 函数返回当前 OS 线程中正在运行的事件循环。
asyncio.get_event_loop() 函数获取当前事件循环。
asyncio.set_event_loop(loop) 函数将 loop 设置为当前 OS 线程的当前事件循环。
asyncio.new_event_loop() 函数创建一个新的事件循环。
loop.run_until_complete(future) 函数运行直到 future (Future 的实例) 被完成。
loop.run_forever() 函数运行事件循环直到 stop() 被调用。
loop.stop() 函数停止事件循环。
loop.is_running() 函数返回 True 如果事件循环当前正在运行。
loop.is_closed() 函数如果事件循环已经被关闭,返回 True 。
loop.close() 函数关闭事件循环。
loop.create_future() 函数创建一个附加到事件循环中的 asyncio.Future 对象。
loop.create_task(coro, *, name=None) 函数安排一个 协程 的执行。返回一个 Task 对象。
loop.set_task_factory(factory) 函数设置一个 task 工厂 , 被用于 loop.create_task() 。
loop.get_task_factory() 函数返回一个任务工厂,或者如果是使用默认值则返回 None。

 示例1:

  1. # coding=utf8
  2. import sys
  3. import asyncio
  4. async def fun1():
  5. await asyncio.sleep(1)
  6. print('协程1')
  7. async def fun2():
  8. await asyncio.sleep(1)
  9. print('协程2')
  10. loop = asyncio.get_event_loop()
  11. loop.run_until_complete(asyncio.wait([fun1(), fun2()]))
  12. loop.close()

示例2:

  1. # coding=utf8
  2. import sys
  3. import asyncio
  4. import time
  5. # 一个对future进行赋值的函数
  6. async def slow_operation(future, num):
  7. await asyncio.sleep(1)
  8. # 给future赋值
  9. future.set_result('Future'+ str(num) +' is done!')
  10. def main():
  11. loop = asyncio.get_event_loop()
  12. # 创建一个future
  13. future1 = loop.create_future()
  14. # 使用ensure_future 创建Task
  15. asyncio.ensure_future(slow_operation(future1, 1))
  16. future2 = loop.create_future()
  17. asyncio.ensure_future(slow_operation(future2, 2))
  18. # gather Tasks,并通过run_uniti_complete来启动、终止loop
  19. loop.run_until_complete(asyncio.gather(future1, future2))
  20. print(future1.result())
  21. print(future2.result())
  22. loop.close()
  23. if __name__ == "__main__":
  24. main()
3)调度回调和延迟回调 

loop.call_soon(callback, *args, context=None) 函数安排 callback 在事件循环的下一次迭代时附带 args 参数被调用。回调按其注册顺序被调用。每个回调仅被调用一次。方法不是线程安全的。
loop.call_soon_threadsafe(callback, *args, context=None) 函数是 call_soon() 的线程安全变体。必须被用于安排 来自其他线程 的回调。
loop.call_later(delay, callback, *args, context=None) 函数安排 callback 在给定的 delay 秒(可以是 int 或者 float)后被调用。
loop.call_at(when, callback, *args, context=None) 函数安排 callback 在给定的绝对时间戳的时间(一个 int 或者 float)被调用,使用与 loop.time() 同样的时间参考。
loop.time() 函数根据时间循环内部的单调时钟,返回当前时间, float 值。

  1. # coding=utf8
  2. import sys
  3. import asyncio
  4. from threading import Thread
  5. import time
  6. def callback(arg, loop):
  7. print('回调函数arg={} 回调的时间time={}'.format(arg, loop.time()))
  8. async def task(loop):
  9. now = loop.time()
  10. print('时钟时间:{}'.format(time.time()))
  11. print('时事件循环时间:{}'.format(loop.time()))
  12. print('注册回调函数')
  13. loop.call_at(now + 1, callback, 'call_at1', loop) # 等待1秒执行 call_at 函数
  14. loop.call_at(now + 2, callback, 'call_at2', loop)
  15. loop.call_later(3, callback, 'call_later1', loop) # 等待3秒执行 call_later 函数
  16. loop.call_later(4, callback, 'call_later2', loop)
  17. loop.call_soon(callback, 'call_soon', loop) # 立即执行执行 call_soon 函数
  18. await asyncio.sleep(4)
  19. def main():
  20. event_loop = asyncio.get_event_loop()
  21. try:
  22. print('进入事件循环监听')
  23. event_loop.run_until_complete(task(event_loop)) # 将事件循环对象传入task函数中
  24. finally:
  25. print('关闭事件循环监听')
  26. event_loop.close()
  27. if __name__ == "__main__":
  28. main()
4)socket连接和Streams函数
  • loop.create_connection(protocol_factory, host=None, port=None, *, ssl=None, family=0, proto=0, flags=0, sock=None, local_addr=None, server_hostname=None, ssl_handshake_timeout=None, happy_eyeballs_delay=None, interleave=None) 函数打开一个流式传输连接,连接到由 host 和 port 指定的地址。
  • loop.create_server(protocol_factory, host=None, port=None, *, family=socket.AF_UNSPEC, flags=socket.AI_PASSIVE, sock=None, backlog=100, ssl=None, reuse_address=None, reuse_port=None, ssl_handshake_timeout=None, start_serving=True) 函数创建TCP服务 (socket 类型 SOCK_STREAM ) 监听 host 地址的 port 端口。
  • loop.create_unix_server(protocol_factory, path=None, *, sock=None, backlog=100, ssl=None, ssl_handshake_timeout=None, start_serving=True) 函数与 loop.create_server() 类似但是专用于 AF_UNIX 套接字族。path 是必要的 Unix 域套接字名称,除非提供了 sock 参数。 抽象的 Unix 套接字, str, bytes 和 Path 路径都是受支持的。
  • loop.connect_accepted_socket(protocol_factory, sock, *, ssl=None, ssl_handshake_timeout=None) 函数将已被接受的连接包装成一个传输/协议对。
  • loop.sock_recv(sock, nbytes) 函数从 sock 接收至多 nbytes。 socket.recv() 的异步版本。
  • loop.sock_recv_into(sock, buf) 函数从 sock 接收数据放入 buf 缓冲区。 模仿了阻塞型的 socket.recv_into() 方法。
  • loop.sock_sendall(sock, data) 函数将 data 发送到 sock 套接字。 socket.sendall() 的异步版本。
  • loop.sock_accept(sock) 函数接受一个连接。 模仿了阻塞型的 socket.accept() 方法。
  • loop.sock_sendfile(sock, file, offset=0, count=None, *, fallback=True) 函数在可能的情况下使用高性能的 os.sendfile 发送文件。 返回所发送的字节总数。
  • asyncio.open_connection(host=None, port=None, *, loop=None, limit=None, ssl=None, family=0, proto=0, flags=0, sock=None, local_addr=None, server_hostname=None, ssl_handshake_timeout=None) 函数建立网络连接并返回一对 (reader, writer) 对象。

  • asyncio.start_server(client_connected_cb, host=None, port=None, *, loop=None, limit=None, family=socket.AF_UNSPEC, flags=socket.AI_PASSIVE, sock=None, backlog=100, ssl=None, reuse_address=None, reuse_port=None, ssl_handshake_timeout=None, start_serving=True) 函数启动套接字服务。

  • asyncio.open_unix_connection(path=None, *, loop=None, limit=None, ssl=None, sock=None, server_hostname=None, ssl_handshake_timeout=None) 函数建立一个 Unix 套接字连接并返回 (reader, writer) 这对返回值。与 open_connection() 相似,但是操作在 Unix 套接字上。

  • asyncio.start_unix_server(client_connected_cb, path=None, *, loop=None, limit=None, sock=None, backlog=100, ssl=None, ssl_handshake_timeout=None, start_serving=True) 函数启动一个Unix socket服务。与 start_server() 相似,但是是在 Unix 套接字上的操作。

  • asyncio.StreamReader 这个类表示一个提供api来从IO流中读取数据的读取器对象。

    reader.read(n=-1) 函数读取 n 个byte. 如果没有设置 n , 则自动置为 -1 ,读至 EOF 并返回所有读取的byte。
    reader.readline() 函数读取一行,其中“行”指的是以 \n 结尾的字节序列。如果读到EOF而没有找到 \n ,该方法返回部分读取的数据。如果读到EOF,且内部缓冲区为空,则返回一个空的 bytes 对象。
    reader.readexactly(n) 函数精准读取 n 个 bytes,不能超过也不能少于。
    reader.readuntil(separator=b'\n') 函数从流中读取数据直至遇到 分隔符成功后,数据和指定的separator将从内部缓冲区中删除(或者说被消费掉)。返回的数据将包括在末尾的指定separator。如果读取的数据量超过了配置的流限制,将引发 LimitOverrunError 异常,数据将留在内部缓冲区中并可以再次读取。如果在找到完整的separator之前到达EOF,则会引发 IncompleteReadError 异常,并重置内部缓冲区。 IncompleteReadError.partial 属性可能包含指定separator的一部分。
    reader.at_eof() 函数如果缓冲区为空并且 feed_eof() 被调用,则返回 True 。

  • asyncio.StreamWriter 这个类表示一个写入器对象,该对象提供api以便于写数据至IO流中。

    writer.write(data) 函数会尝试立即将 data 写入到下层的套接字。 如果写入失败,数据会被排入内部写缓冲队列直到可以被发送。
    writer.writelines(data) 函数会立即尝试将一个字节串列表(或任何可迭代对象)写入到下层的套接字。 如果写入失败,数据会被排入内部写缓冲队列直到可以被发送。
    writer.close() 函数会关闭流以及下层的套接字。
    writer.can_write_eof() 函数如果下层的传输支持 write_eof() 方法则返回``True``,否则返回 False。
    writer.write_eof() 函数在已缓冲的写入数据被刷新后关闭流的写入端。
    writer.transport() 函数返回下层的 asyncio 传输。
    writer.drain() 函数等待直到可以适当地恢复写入到流。
    writer.is_closing() 函数如果流已被关闭或正在被关闭则返回 True。
    writer.wait_closed() 函数等待直到流被关闭。

server 代码: 

  1. # coding=utf8
  2. import asyncio
  3. from asyncio import StreamReader, StreamWriter
  4. async def echo(reader: StreamReader, writer: StreamWriter):
  5. data = await reader.read(1024)
  6. message = data.decode()
  7. addr = writer.get_extra_info('peername')
  8. print(f"Received {message} from {addr}")
  9. print(f"Send: {message}")
  10. writer.write(data)
  11. await writer.drain()
  12. writer.close()
  13. async def main(host, port):
  14. server = await asyncio.start_server(echo, host, port)
  15. addr = server.sockets[0].getsockname()
  16. print(f'Serving on {addr}')
  17. async with server:
  18. await server.serve_forever()
  19. asyncio.run(main("127.0.0.1", 9999))

client 代码:

  1. # coding=utf8
  2. import asyncio
  3. async def tcp_echo_client(message):
  4. reader, writer = await asyncio.open_connection('127.0.0.1', 9999)
  5. print(f'Send to server: {message}')
  6. writer.write(message.encode())
  7. await writer.drain()
  8. data = await reader.read(1024)
  9. print(f'Received from server: {data.decode()}')
  10. writer.close()
  11. await writer.wait_closed()
  12. if __name__ == '__main__':
  13. while True:
  14. send_msg = input("send: ")
  15. asyncio.run(tcp_echo_client(send_msg))
5)在线程或者进程池中执行代码

loop.run_in_executor(executor, func, *args) 函数安排在指定的执行器中调用 func 。

  1. # coding=utf8
  2. import asyncio
  3. import concurrent.futures
  4. def blocking_io():
  5. # File operations (such as logging) can block the
  6. # event loop: run them in a thread pool.
  7. with open('/dev/urandom', 'rb') as f:
  8. return f.read(100)
  9. def cpu_bound():
  10. # CPU-bound operations will block the event loop:
  11. # in general it is preferable to run them in a
  12. # process pool.
  13. return sum(i * i for i in range(5))
  14. async def main():
  15. loop = asyncio.get_running_loop()
  16. ## Options:
  17. # 1. Run in the default loop's executor:
  18. result = await loop.run_in_executor(
  19. None, blocking_io)
  20. print('default thread pool', result)
  21. print("\n")
  22. # 2. Run in a custom thread pool:
  23. with concurrent.futures.ThreadPoolExecutor() as pool:
  24. result = await loop.run_in_executor(
  25. pool, blocking_io)
  26. print('custom thread pool', result)
  27. print("\n")
  28. # 3. Run in a custom process pool:
  29. with concurrent.futures.ProcessPoolExecutor() as pool:
  30. result = await loop.run_in_executor(
  31. pool, cpu_bound)
  32. print('custom process pool', result)
  33. asyncio.run(main())
6) asyncio.create_task(coro, *, name=None) 函数用来将一个协程打包为一个 Task 排入日程准备执行,并返回 Task 对象。
  1. # coding=utf8
  2. import sys
  3. import asyncio
  4. import time
  5. async def say_after(delay, what):
  6. await asyncio.sleep(delay)
  7. print(what)
  8. async def main():
  9. task1 = asyncio.create_task(
  10. say_after(1, 'hello'))
  11. task2 = asyncio.create_task(
  12. say_after(2, 'world'))
  13. print(f"started at {time.strftime('%X')}")
  14. # Wait until both tasks are completed (should take around 2 seconds.)
  15. await task1
  16. await task2
  17. print(f"finished at {time.strftime('%X')}")
  18. asyncio.run(main())
7)错误处理API

loop.set_exception_handler(handler) 函数将 handler 设置为新的事件循环异常处理器。
loop.get_exception_handler() 函数返回当前的异常处理器,如果没有设置异常处理器,则返回 None 。
loop.default_exception_handler(context) 函数默认的异常处理器。
loop.call_exception_handler(context) 函数调用当前事件循环的异常处理器。
loop.get_debug() 函数获取事件循环调试模式设置(bool)。
loop.set_debug(enabled: bool) 函数设置事件循环的调试模式。

  1. # coding=utf8
  2. import sys
  3. import asyncio
  4. def handle_exception(loop, context):
  5. print('Error:', context['message'])
  6. async def my_task():
  7. await asyncio.sleep(1)
  8. print('task1')
  9. loop = asyncio.get_event_loop()
  10. loop.set_exception_handler(handle_exception)
  11. loop.run_until_complete(my_task())
  12. loop.close()
8)Future 

asyncio.Future(*, loop=None) 函数是一个 Future 代表一个异步运算的最终结果。线程不安全。
asyncio.isfuture(obj) 函数用来判断如果 obj 为一个 asyncio.Future类的示例、 asyncio.Task 类的实例或者一个具有 _asyncio_future_blocking 属性的对象,返回 True。
asyncio.ensure_future(obj, *, loop=None) 函数创建新任务。
asyncio.wrap_future(future, *, loop=None) 函数将一个 concurrent.futures.Future 对象封装到 asyncio.Future 对象中。

Future 对象相关函数:

fut.result() 函数返回 Future 的结果。
fut.set_result(result) 函数将 Future 标记为 完成 并设置结果。
fut.set_exception(exception)  函数将 Future 标记为 完成 并设置一个异常。
fut.done() 函数如果 Future 为已 完成 则返回 True 。
fut.cancelled() 函数是如果 Future 已取消则返回 True
fut.add_done_callback(callback, *, context=None) 函数添加一个在 Future 完成 时运行的回调函数。
fut.remove_done_callback(callback) 函数从回调列表中移除 callback 。
fut.cancel() 函数取消 Future 并调度回调函数。
fut.exception() 函数返回 Future 已设置的异常。
fut.get_loop() 函数返回 Future 对象已绑定的事件循环。

  1. # coding=utf8
  2. import sys
  3. import asyncio
  4. import time
  5. # 定义一个协程
  6. async def slow_operation(fut):
  7. await asyncio.sleep(1)
  8. fut.set_result(22)
  9. def def_callback(fut):
  10. number = fut.result()
  11. print(number + 1)
  12. def main():
  13. # 获得全局循环事件
  14. loop = asyncio.get_event_loop()
  15. # 实例化期物对象
  16. fut = asyncio.Future()
  17. asyncio.ensure_future(slow_operation(fut))
  18. # 执行回调函数
  19. fut.add_done_callback(def_callback)
  20. # loop 的 run_until_complete 会将 _run_until_complete_cb 添加到 future 的完成回调列表中。而 _run_until_complete_cb 中会执行 loop.stop() 方法
  21. loop.run_until_complete(fut)
  22. # 关闭事件循环对象
  23. loop.close()
  24. if __name__ == "__main__":
  25. main()
9)asyncio.gather(*aws, loop=None, return_exceptions=False) 函数用来并发运行 aws 序列中的可等待对象。如果 aws 中的某个可等待对象为协程,它将自动作为一个任务加入日程。
  1. # coding=utf8
  2. import sys
  3. import asyncio
  4. async def factorial(name, number):
  5. f = 1
  6. for i in range(2, number + 1):
  7. print(f"Task {name}: Compute factorial({i})...")
  8. await asyncio.sleep(1)
  9. f *= i
  10. print(f"Task {name}: factorial({number}) = {f}")
  11. async def main():
  12. # Schedule three calls *concurrently*:
  13. await asyncio.gather(
  14. factorial("A", 2),
  15. factorial("B", 3),
  16. factorial("C", 4),
  17. )
  18. asyncio.run(main())
10)asyncio.shield(aw, *, loop=None) 函数用来保护一个可等待对象防止其被取消。
  1. # coding=utf8
  2. import sys
  3. import asyncio
  4. async def task_func(number):
  5. await asyncio.sleep(1)
  6. print('函数执行成功:'+str(number))
  7. async def cancel_task(task):
  8. await asyncio.sleep(0.2)
  9. was_cancelled = task.cancel()
  10. print(f'cancelled: {was_cancelled}')
  11. async def main():
  12. coro = task_func(1)
  13. task = asyncio.create_task(coro)
  14. shielded = asyncio.shield(task)
  15. asyncio.create_task(cancel_task(shielded))
  16. try:
  17. result = await shielded
  18. print(f'>got: {result}')
  19. except asyncio.CancelledError:
  20. print('shielded was cancelled')
  21. await asyncio.sleep(1)
  22. print(f'shielded: {shielded}')
  23. print(f'task: {task}')
  24. asyncio.run(main())
11)asyncio.wait(aws, *, loop=None, timeout=None, return_when=ALL_COMPLETED) 函数并发地运行 aws 可迭代对象中的可等待对象并进入阻塞状态直到满足 return_when 所指定的条件。

return_when 指定此函数应在何时返回。它必须为以下常数之一:

常数

描述

FIRST_COMPLETED

函数将在任意可等待对象结束或取消时返回。

FIRST_EXCEPTION

函数将在任意可等待对象因引发异常而结束时返回。当没有引发任何异常时它就相当于 ALL_COMPLETED

ALL_COMPLETED

函数将在所有可等待对象结束或取消时返回。

  1. # coding=utf8
  2. import sys
  3. import asyncio
  4. async def coroutine_example(name):
  5. print('正在执行name:', name)
  6. await asyncio.sleep(1)
  7. print('执行完毕name:', name)
  8. loop = asyncio.get_event_loop()
  9. tasks = [coroutine_example('Zarten_' + str(i)) for i in range(3)]
  10. wait_coro = asyncio.wait(tasks)
  11. loop.run_until_complete(wait_coro)
  12. loop.close()
12)asyncio.wait_for(aw, timeout, *, loop=None) 函数等待 aw 可等待对象完成,指定 timeout 秒数后超时。
  1. # coding=utf8
  2. import sys
  3. import asyncio
  4. async def eternity():
  5. # Sleep for one hour
  6. await asyncio.sleep(3600)
  7. print('yay!')
  8. async def main():
  9. # Wait for at most 1 second
  10. try:
  11. await asyncio.wait_for(eternity(), timeout=1)
  12. except asyncio.TimeoutError:
  13. print('timeout!')
  14. asyncio.run(main())
13) asyncio.run_coroutine_threadsafe(coro, loop) 函数向指定事件循环提交一个协程。线程安全。
  1. # coding=utf8
  2. import sys
  3. import asyncio
  4. import threading
  5. async def main(i):
  6. while True:
  7. await asyncio.sleep(1)
  8. print(i)
  9. async def production_task():
  10. for i in range(1, 4):
  11. # 将不同参数main这个协程循环注册到运行在线程中的循环,
  12. # thread_loop会获得一循环任务
  13. asyncio.run_coroutine_threadsafe(main(i),thread_loop)
  14. # 注意:run_coroutine_threadsafe 这个方法只能用在运行在线程中的循环事件使用
  15. def start_loop(thread_loop):
  16. # 运行事件循环, loop以参数的形式传递进来运行
  17. asyncio.set_event_loop(thread_loop)
  18. thread_loop.run_forever()
  19. if __name__ == '__main__':
  20. # 获取一个事件循环
  21. thread_loop = asyncio.new_event_loop()
  22. # 将次事件循环运行在一个线程中,防止阻塞当前主线程,运行线程,同时协程事件循环也会运行
  23. threading.Thread(target=start_loop, args=(thread_loop,)).start()
  24. # 将生产任务的协程注册到这个循环中
  25. loop = asyncio.get_event_loop()
  26. # 运行次循环
  27. loop.run_until_complete(production_task())
14) asyncio.current_task(loop=None) 函数返回当前运行的 Task 实例,如果没有正在运行的任务则返回 None。如果 loop 为 None 则会使用 get_running_loop() 获取当前事件循环。

asyncio.all_tasks(loop=None) 函数返回事件循环所运行的未完成的 Task 对象的集合。如果 loop 为 None,则会使用 get_running_loop() 获取当前事件循环。

  1. # coding=utf8
  2. import sys
  3. import asyncio
  4. import time
  5. async def my_coroutine():
  6. task = asyncio.current_task()
  7. print(task)
  8. async def main():
  9. task1 = asyncio.create_task(my_coroutine())
  10. task2 = asyncio.create_task(my_coroutine())
  11. tasks = [task1, task2]
  12. await asyncio.gather(*tasks)
  13. asyncio.run(main())
15)asyncio.Task(coro, *, loop=None, name=None) 函数一个与 Future 类似 的对象,可运行 Python 协程。非线程安全。 

Task 对象相关函数:

task.cancelled() 函数如果 Task 对象 被取消 则返回 True。
task.done() 函数如果 Task 对象 已完成 则返回 True。
task.result() 函数返回 Task 的结果。
task.exception() 函数返回 Task 对象的异常。
task.remove_done_callback(callback) 函数从回调列表中移除 callback 。
task.get_stack(*, limit=None) 函数返回此 Task 对象的栈框架列表。
task.print_stack(*, limit=None, file=None) 函数打印此 Task 对象的栈或回溯。
task.get_coro() 函数返回由 Task 包装的协程对象。
task.get_name() 函数返回 Task 的名称。
task.set_name(value) 函数设置 Task 的名称。

  1. # coding=utf8
  2. import sys
  3. import asyncio
  4. import time
  5. async def cancel_me():
  6. print('cancel_me(): before sleep')
  7. try:
  8. # Wait for 1 hour
  9. await asyncio.sleep(3600)
  10. except asyncio.CancelledError:
  11. print('cancel_me(): cancel sleep')
  12. raise
  13. finally:
  14. print('cancel_me(): after sleep')
  15. async def main():
  16. # Create a "cancel_me" Task
  17. task = asyncio.create_task(cancel_me())
  18. # Wait for 1 second
  19. await asyncio.sleep(1)
  20. task.cancel()
  21. try:
  22. await task
  23. except asyncio.CancelledError:
  24. print("main(): cancel_me is cancelled now")
  25. asyncio.run(main())
16)基于生成器的协程(Python 3.10之后已经弃用)

基于生成器的协程是 async/await 语法的前身。它们是使用 yield from 语句创建的 Python 生成器,可以等待 Future 和其他协程。

@asyncio.coroutine 用来标记基于生成器的协程的装饰器。

asyncio.iscoroutine(obj) 函数如果 obj 是一个协程对象则返回 True。

asyncio.iscoroutinefunction(func) 函数如果 func 是一个协程函数则返回 True。

  1. # coding=utf8
  2. import sys
  3. import asyncio
  4. import time
  5. @asyncio.coroutine # 标志协程的装饰器
  6. def taskIO_1():
  7. print('开始运行IO任务1...')
  8. yield from asyncio.sleep(2) # 假设该任务耗时2s
  9. print('IO任务1已完成,耗时2s')
  10. return taskIO_1.__name__
  11. @asyncio.coroutine # 标志协程的装饰器
  12. def taskIO_2():
  13. print('开始运行IO任务2...')
  14. yield from asyncio.sleep(3) # 假设该任务耗时3s
  15. print('IO任务2已完成,耗时3s')
  16. return taskIO_2.__name__
  17. asyncio.run(taskIO_1())
  18. asyncio.run(taskIO_2())

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/小蓝xlanll/article/detail/74981
推荐阅读
相关标签
  

闽ICP备14008679号