一、简介
asyncio
是Python 3.4版本引入的标准库,直接内置了对异步IO的支持。
asyncio
的编程模型就是一个消息循环。我们从asyncio
模块中直接获取一个EventLoop
的引用,然后把需要执行的协程扔到EventLoop
中执行,就实现了异步IO
此模块为编写单线程并发代码提高基础架构,通过使用协程、套接字和其他资源的 I/O 多路复用,运行网络客户端和服务器,以及其他相关的基元。
包内容的详细的列表如下:
- 各种系统具体实现的可插拔 event loop
- transport 和 protocol 抽象(类似于 Twisted 里的那些)
- 具体支持 TCP、 UDP、 SSL、 子进程管道,延迟调用,和其他 (有些可能依赖于系统)
- 一个模仿
concurrent.futures
模块但适合在事件循环中使用的Future
类 - 基于
yield from
( PEP 380)的协程和任务帮助以顺序的方式编写并发代码 - 取消
Future
和协程的操作支持 - 单线程的协程间使用的 synchronization primitives 类似于
threading
模块里那些 - 一个接口,用于将工作传递到线程池,在绝对的时候,积极地使用一个阻塞I / O调用的库
二、参考文档
中文文档:http://python.usyiyi.cn/translate/python_352/library/asyncio.html
官方文档:https://docs.python.org/3/library/asyncio.html
三、关键示例
关于asyncio模块详细使用说明这里不再赘述,下面将为大家展示一些例子,作为快速学习之用:
1、使用 AbstractEventLoop.call_soon()
方法来安排回调的示例。回调显示 "Hello World"
,然后停止事件循环:
- import asyncio
-
- def hello_world(loop):
- print('Hello World')
- loop.stop()
-
- loop = asyncio.get_event_loop()
-
- # Schedule a call to hello_world()
- loop.call_soon(hello_world, loop)
-
- # Blocking call interrupted by loop.stop()
- loop.run_forever()
- loop.close()
2、回调示例每秒显示当前日期。回调使用AbstractEventLoop.call_later()
方法在5秒内重新计划自身,然后停止事件循环:
- import asyncio
- import datetime
-
- def display_date(end_time, loop):
- print(datetime.datetime.now())
- if (loop.time() + 1.0) < end_time:
- loop.call_later(1, display_date, end_time, loop)
- else:
- loop.stop()
-
- loop = asyncio.get_event_loop()
-
- # Schedule the first call to display_date()
- end_time = loop.time() + 5.0
- loop.call_soon(display_date, end_time, loop)
-
- # Blocking call interrupted by loop.stop()
- loop.run_forever()
- loop.close()
3、等待文件描述器使用AbstractEventLoop.add_reader()
方法接收到一些数据,然后关闭事件循环:
- import asyncio
- try:
- from socket import socketpair
- except ImportError:
- from asyncio.windows_utils import socketpair
-
- # Create a pair of connected file descriptors
- rsock, wsock = socketpair()
- loop = asyncio.get_event_loop()
-
- def reader():
- data = rsock.recv(100)
- print("Received:", data.decode())
- # We are done: unregister the file descriptor
- loop.remove_reader(rsock)
- # Stop the event loop
- loop.stop()
-
- # Register the file descriptor for read event
- loop.add_reader(rsock, reader)
-
- # Simulate the reception of data from the network
- loop.call_soon(wsock.send, 'abc'.encode())
-
- # Run the event loop
- loop.run_forever()
-
- # We are done, close sockets and the event loop
- rsock.close()
- wsock.close()
- loop.close()
4、使用AbstractEventLoop.add_signal_handler()
方法的信号SIGINT
和SIGTERM
的寄存器处理程序:
- import asyncio
- import functools
- import os
- import signal
-
- def ask_exit(signame):
- print("got signal %s: exit" % signame)
- loop.stop()
-
- loop = asyncio.get_event_loop()
- for signame in ('SIGINT', 'SIGTERM'):
- loop.add_signal_handler(getattr(signal, signame),
- functools.partial(ask_exit, signame))
-
- print("Event loop running forever, press Ctrl+C to interrupt.")
- print("pid %s: send SIGINT or SIGTERM to exit." % os.getpid())
- try:
- loop.run_forever()
- finally:
- loop.close()
-
- #此示例仅适用于UNIX
5、组合Future
和coroutine function的示例:
协程函数负责计算(需要1秒),并将结果存储到Future
。run_until_complete()
方法等待Future
的完成。
- import asyncio
-
- @asyncio.coroutine
- def slow_operation(future):
- yield from asyncio.sleep(1)
- future.set_result('Future is done!')
-
- loop = asyncio.get_event_loop()
- future = asyncio.Future()
- asyncio.ensure_future(slow_operation(future))
- loop.run_until_complete(future)
- print(future.result())
- loop.close()
6、使用Future.add_done_callback()
方法来不同地编写前面的示例来明确描述控制流:
在此示例中,Future
用于将slow_operation()
链接到got_result()
:当slow_operation()
完成时,got_result()
与结果一起调用。
- import asyncio
-
- @asyncio.coroutine
- def slow_operation(future):
- yield from asyncio.sleep(1)
- future.set_result('Future is done!')
-
- def got_result(future):
- print(future.result())
- loop.stop()
-
- loop = asyncio.get_event_loop()
- future = asyncio.Future()
- asyncio.ensure_future(slow_operation(future))
- future.add_done_callback(got_result)
- try:
- loop.run_forever()
- finally:
- loop.close()
7、并行执行3个任务(A,B,C)的示例:
任务在创建时自动计划执行。所有任务完成后,事件循环停止。
- import asyncio
-
- @asyncio.coroutine
- def factorial(name, number):
- f = 1
- for i in range(2, number+1):
- print("Task %s: Compute factorial(%s)..." % (name, i))
- yield from asyncio.sleep(1)
- f *= i
- print("Task %s: factorial(%s) = %s" % (name, number, f))
-
- loop = asyncio.get_event_loop()
- tasks = [
- asyncio.ensure_future(factorial("A", 2)),
- asyncio.ensure_future(factorial("B", 3)),
- asyncio.ensure_future(factorial("C", 4))]
- loop.run_until_complete(asyncio.gather(*tasks))
- loop.close()
output:
- Task A: Compute factorial(2)...
- Task B: Compute factorial(2)...
- Task C: Compute factorial(2)...
- Task A: factorial(2) = 2
- Task B: Compute factorial(3)...
- Task C: Compute factorial(3)...
- Task B: factorial(3) = 6
- Task C: Compute factorial(4)...
- Task C: factorial(4) = 24
8、TCP echo客户端使用AbstractEventLoop.create_connection()
方法,TCP回显服务器使用AbstractEventLoop.create_server()
方法
客户端:
事件循环运行两次。在这个简短的例子中,优先使用run_until_complete()
方法来引发异常,如果服务器没有监听,而不必写一个短的协程来处理异常并停止运行循环。在run_until_complete()
退出时,循环不再运行,因此在发生错误时不需要停止循环。
- import asyncio
-
- class EchoClientProtocol(asyncio.Protocol):
- def __init__(self, message, loop):
- self.message = message
- self.loop = loop
-
- def connection_made(self, transport):
- transport.write(self.message.encode())
- print('Data sent: {!r}'.format(self.message))
-
- def data_received(self, data):
- print('Data received: {!r}'.format(data.decode()))
-
- def connection_lost(self, exc):
- print('The server closed the connection')
- print('Stop the event loop')
- self.loop.stop()
-
- loop = asyncio.get_event_loop()
- message = 'Hello World!'
- coro = loop.create_connection(lambda: EchoClientProtocol(message, loop),
- '127.0.0.1', 8888)
- loop.run_until_complete(coro)
- loop.run_forever()
- loop.close()
服务器:
Transport.close()
可以在WriteTransport.write()
之后立即调用,即使数据尚未在套接字上发送:两种方法都是异步的。不需要yield from
,因为这些传输方法不是协程。
- import asyncio
-
- class EchoServerClientProtocol(asyncio.Protocol):
- def connection_made(self, transport):
- peername = transport.get_extra_info('peername')
- print('Connection from {}'.format(peername))
- self.transport = transport
-
- def data_received(self, data):
- message = data.decode()
- print('Data received: {!r}'.format(message))
-
- print('Send: {!r}'.format(message))
- self.transport.write(data)
-
- print('Close the client socket')
- self.transport.close()
-
- loop = asyncio.get_event_loop()
- # Each client connection will create a new protocol instance
- coro = loop.create_server(EchoServerClientProtocol, '127.0.0.1', 8888)
- server = loop.run_until_complete(coro)
-
- # Serve requests until Ctrl+C is pressed
- print('Serving on {}'.format(server.sockets[0].getsockname()))
- try:
- loop.run_forever()
- except KeyboardInterrupt:
- pass
-
- # Close the server
- server.close()
- loop.run_until_complete(server.wait_closed())
- loop.close()
9、UDP echo客户端使用AbstractEventLoop.create_datagram_endpoint()
方法,UDP echo服务器使用AbstractEventLoop.create_datagram_endpoint()
方法
客户端:
- import asyncio
-
- class EchoClientProtocol:
- def __init__(self, message, loop):
- self.message = message
- self.loop = loop
- self.transport = None
-
- def connection_made(self, transport):
- self.transport = transport
- print('Send:', self.message)
- self.transport.sendto(self.message.encode())
-
- def datagram_received(self, data, addr):
- print("Received:", data.decode())
-
- print("Close the socket")
- self.transport.close()
-
- def error_received(self, exc):
- print('Error received:', exc)
-
- def connection_lost(self, exc):
- print("Socket closed, stop the event loop")
- loop = asyncio.get_event_loop()
- loop.stop()
-
- loop = asyncio.get_event_loop()
- message = "Hello World!"
- connect = loop.create_datagram_endpoint(
- lambda: EchoClientProtocol(message, loop),
- remote_addr=('127.0.0.1', 9999))
- transport, protocol = loop.run_until_complete(connect)
- loop.run_forever()
- transport.close()
- loop.close()
服务器:
- import asyncio
-
- class EchoServerProtocol:
- def connection_made(self, transport):
- self.transport = transport
-
- def datagram_received(self, data, addr):
- message = data.decode()
- print('Received %r from %s' % (message, addr))
- print('Send %r to %s' % (message, addr))
- self.transport.sendto(data, addr)
-
- loop = asyncio.get_event_loop()
- print("Starting UDP server")
- # One protocol instance will be created to serve all client requests
- listen = loop.create_datagram_endpoint(
- EchoServerProtocol, local_addr=('127.0.0.1', 9999))
- transport, protocol = loop.run_until_complete(listen)
-
- try:
- loop.run_forever()
- except KeyboardInterrupt:
- pass
-
- transport.close()
- loop.close()
10、等待套接字使用协议使用AbstractEventLoop.create_connection()
方法接收数据,然后关闭事件循环
- import asyncio
- try:
- from socket import socketpair
- except ImportError:
- from asyncio.windows_utils import socketpair
-
- # Create a pair of connected sockets
- rsock, wsock = socketpair()
- loop = asyncio.get_event_loop()
-
- class MyProtocol(asyncio.Protocol):
- transport = None
-
- def connection_made(self, transport):
- self.transport = transport
-
- def data_received(self, data):
- print("Received:", data.decode())
-
- # We are done: close the transport (it will call connection_lost())
- self.transport.close()
-
- def connection_lost(self, exc):
- # The socket has been closed, stop the event loop
- loop.stop()
-
- # Register the socket to wait for data
- connect_coro = loop.create_connection(MyProtocol, sock=rsock)
- transport, protocol = loop.run_until_complete(connect_coro)
-
- # Simulate the reception of data from the network
- loop.call_soon(wsock.send, 'abc'.encode())
-
- # Run the event loop
- loop.run_forever()
-
- # We are done, close sockets and the event loop
- rsock.close()
- wsock.close()
- loop.close()
11、TCP回显客户端使用asyncio.open_connection()
函数,TCP回显服务器使用asyncio.start_server()
函数
客户端:
- import asyncio
-
- @asyncio.coroutine
- def tcp_echo_client(message, loop):
- reader, writer = yield from asyncio.open_connection('127.0.0.1', 8888,
- loop=loop)
-
- print('Send: %r' % message)
- writer.write(message.encode())
-
- data = yield from reader.read(100)
- print('Received: %r' % data.decode())
-
- print('Close the socket')
- writer.close()
-
- message = 'Hello World!'
- loop = asyncio.get_event_loop()
- loop.run_until_complete(tcp_echo_client(message, loop))
- loop.close()
服务器:
- import asyncio
-
- @asyncio.coroutine
- def handle_echo(reader, writer):
- data = yield from reader.read(100)
- message = data.decode()
- addr = writer.get_extra_info('peername')
- print("Received %r from %r" % (message, addr))
-
- print("Send: %r" % message)
- writer.write(data)
- yield from writer.drain()
-
- print("Close the client socket")
- writer.close()
-
- loop = asyncio.get_event_loop()
- coro = asyncio.start_server(handle_echo, '127.0.0.1', 8888, loop=loop)
- server = loop.run_until_complete(coro)
-
- # Serve requests until Ctrl+C is pressed
- print('Serving on {}'.format(server.sockets[0].getsockname()))
- try:
- loop.run_forever()
- except KeyboardInterrupt:
- pass
-
- # Close the server
- server.close()
- loop.run_until_complete(server.wait_closed())
- loop.close()
12、在命令行中获取URL的HTTP头的简单示例:
- import asyncio
- import urllib.parse
- import sys
-
- @asyncio.coroutine
- def print_http_headers(url):
- url = urllib.parse.urlsplit(url)
- if url.scheme == 'https':
- connect = asyncio.open_connection(url.hostname, 443, ssl=True)
- else:
- connect = asyncio.open_connection(url.hostname, 80)
- reader, writer = yield from connect
- query = ('HEAD {path} HTTP/1.0\r\n'
- 'Host: {hostname}\r\n'
- '\r\n').format(path=url.path or '/', hostname=url.hostname)
- writer.write(query.encode('latin-1'))
- while True:
- line = yield from reader.readline()
- if not line:
- break
- line = line.decode('latin1').rstrip()
- if line:
- print('HTTP header> %s' % line)
-
- # Ignore the body, close the socket
- writer.close()
-
- url = sys.argv[1]
- loop = asyncio.get_event_loop()
- task = asyncio.ensure_future(print_http_headers(url))
- loop.run_until_complete(task)
- loop.close()
- #用法:
-
- python example.py http://example.com/path/page.html
-
- #使用HTTPS:
-
- python example.py https://example.com/path/page.html
13、协程等待,直到套接字使用open_connection()
函数接收数据:
- import asyncio
- try:
- from socket import socketpair
- except ImportError:
- from asyncio.windows_utils import socketpair
-
- @asyncio.coroutine
- def wait_for_data(loop):
- # Create a pair of connected sockets
- rsock, wsock = socketpair()
-
- # Register the open socket to wait for data
- reader, writer = yield from asyncio.open_connection(sock=rsock, loop=loop)
-
- # Simulate the reception of data from the network
- loop.call_soon(wsock.send, 'abc'.encode())
-
- # Wait for data
- data = yield from reader.read(100)
-
- # Got data, we are done: close the socket
- print("Received:", data.decode())
- writer.close()
-
- # Close the second socket
- wsock.close()
-
- loop = asyncio.get_event_loop()
- loop.run_until_complete(wait_for_data(loop))
- loop.close()
14、子进程协议的示例,用于获取子进程的输出并等待子进程退出。子过程由AbstractEventLoop.subprocess_exec()
方法创建:
- import asyncio
- import sys
-
- class DateProtocol(asyncio.SubprocessProtocol):
- def __init__(self, exit_future):
- self.exit_future = exit_future
- self.output = bytearray()
-
- def pipe_data_received(self, fd, data):
- self.output.extend(data)
-
- def process_exited(self):
- self.exit_future.set_result(True)
-
- @asyncio.coroutine
- def get_date(loop):
- code = 'import datetime; print(datetime.datetime.now())'
- exit_future = asyncio.Future(loop=loop)
-
- # Create the subprocess controlled by the protocol DateProtocol,
- # redirect the standard output into a pipe
- create = loop.subprocess_exec(lambda: DateProtocol(exit_future),
- sys.executable, '-c', code,
- stdin=None, stderr=None)
- transport, protocol = yield from create
-
- # Wait for the subprocess exit using the process_exited() method
- # of the protocol
- yield from exit_future
-
- # Close the stdout pipe
- transport.close()
-
- # Read the output which was collected by the pipe_data_received()
- # method of the protocol
- data = bytes(protocol.output)
- return data.decode('ascii').rstrip()
-
- if sys.platform == "win32":
- loop = asyncio.ProactorEventLoop()
- asyncio.set_event_loop(loop)
- else:
- loop = asyncio.get_event_loop()
-
- date = loop.run_until_complete(get_date(loop))
- print("Current date: %s" % date)
- loop.close()
15、使用Process
类控制子进程和StreamReader
类从标准输出读取的示例。子过程由create_subprocess_exec()
函数创建:
- import asyncio.subprocess
- import sys
-
- @asyncio.coroutine
- def get_date():
- code = 'import datetime; print(datetime.datetime.now())'
-
- # Create the subprocess, redirect the standard output into a pipe
- create = asyncio.create_subprocess_exec(sys.executable, '-c', code,
- stdout=asyncio.subprocess.PIPE)
- proc = yield from create
-
- # Read one line of output
- data = yield from proc.stdout.readline()
- line = data.decode('ascii').rstrip()
-
- # Wait for the subprocess exit
- yield from proc.wait()
- return line
-
- if sys.platform == "win32":
- loop = asyncio.ProactorEventLoop()
- asyncio.set_event_loop(loop)
- else:
- loop = asyncio.get_event_loop()
-
- date = loop.run_until_complete(get_date())
- print("Current date: %s" % date)
- loop.close()