
asyncio


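I. Introduction

asyncio is a standard-library module introduced in Python 3.4 that provides built-in support for asynchronous I/O.

The asyncio programming model is a message loop (an event loop). You obtain a reference to an EventLoop from the asyncio module and hand it the coroutines you want to run; the loop drives them, which is how asynchronous I/O is achieved.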
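The model described above can be sketched in a few lines. This is a minimal sketch, assuming Python 3.7 or later, where asyncio.run() creates an event loop, runs one coroutine on it, and closes it; the names greet and main are illustrative only:

```python
import asyncio


async def greet(name, delay):
    # Suspend this coroutine; the event loop is free to run others meanwhile.
    await asyncio.sleep(delay)
    return "Hello, %s" % name


async def main():
    # Both coroutines run concurrently on the same single-threaded loop.
    # gather() returns the results in argument order.
    return await asyncio.gather(greet("A", 0.02), greet("B", 0.01))


if __name__ == "__main__":
    print(asyncio.run(main()))
```

Although B finishes first, the result list keeps the order in which the coroutines were passed in.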

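The module provides infrastructure for writing single-threaded concurrent code using coroutines, multiplexing I/O access over sockets and other resources, running network clients and servers, and other related primitives.

The package contents in detail:

  • a pluggable event loop with various system-specific implementations
  • transport and protocol abstractions (similar to those in Twisted)
  • concrete support for TCP, UDP, SSL, subprocess pipes, delayed calls, and others (some may be system-dependent)
  • a Future class that mimics the one in the concurrent.futures module but is adapted for use with the event loop
  • coroutines and tasks based on yield from (PEP 380), to help write concurrent code in a sequential fashion (since Python 3.5 these are normally written with async def and await)
  • cancellation support for Futures and coroutines
  • synchronization primitives for use between coroutines in a single thread, mimicking those in the threading module
  • an interface for passing work off to a thread pool, for the times when you absolutely have to use a library that makes blocking I/O calls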
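The last item in the list, handing blocking work to a thread pool, can be sketched as follows. This is a minimal sketch, assuming Python 3.7+ (asyncio.run() and asyncio.get_running_loop()); blocking_io is a hypothetical stand-in for a library call that blocks:

```python
import asyncio
import time


def blocking_io(seconds):
    # Stand-in for a library call that blocks the calling thread.
    time.sleep(seconds)
    return "slept %.2f s" % seconds


async def main():
    loop = asyncio.get_running_loop()
    # Passing None selects the loop's default thread pool executor.
    # The event loop stays responsive while the worker thread sleeps.
    return await loop.run_in_executor(None, blocking_io, 0.05)


if __name__ == "__main__":
    print(asyncio.run(main()))
```

run_in_executor() returns an awaitable, so the blocking call can be awaited like any coroutine.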

II. References

Chinese documentation: http://python.usyiyi.cn/translate/python_352/library/asyncio.html

Official documentation: https://docs.python.org/3/library/asyncio.html

III. Key examples

A detailed walkthrough of the asyncio module is not repeated here; the examples below are meant as a quick way to learn it:

1. Scheduling a callback with the AbstractEventLoop.call_soon() method. The callback prints "Hello World" and then stops the event loop:

    import asyncio

    def hello_world(loop):
        print('Hello World')
        loop.stop()

    loop = asyncio.new_event_loop()
    # Schedule a call to hello_world()
    loop.call_soon(hello_world, loop)
    # Blocking call interrupted by loop.stop()
    loop.run_forever()
    loop.close()

2. A callback that displays the current date every second. The callback uses the AbstractEventLoop.call_later() method to reschedule itself for 5 seconds, then stops the event loop:

    import asyncio
    import datetime

    def display_date(end_time, loop):
        print(datetime.datetime.now())
        if (loop.time() + 1.0) < end_time:
            loop.call_later(1, display_date, end_time, loop)
        else:
            loop.stop()

    loop = asyncio.new_event_loop()
    # Schedule the first call to display_date()
    end_time = loop.time() + 5.0
    loop.call_soon(display_date, end_time, loop)
    # Blocking call interrupted by loop.stop()
    loop.run_forever()
    loop.close()

3. Waiting until a file descriptor has received some data, using the AbstractEventLoop.add_reader() method, then closing the event loop:

    import asyncio
    try:
        from socket import socketpair
    except ImportError:
        from asyncio.windows_utils import socketpair

    # Create a pair of connected file descriptors
    rsock, wsock = socketpair()
    loop = asyncio.new_event_loop()

    def reader():
        data = rsock.recv(100)
        print("Received:", data.decode())
        # We are done: unregister the file descriptor
        loop.remove_reader(rsock)
        # Stop the event loop
        loop.stop()

    # Register the file descriptor for read event
    loop.add_reader(rsock, reader)
    # Simulate the reception of data from the network
    loop.call_soon(wsock.send, 'abc'.encode())
    # Run the event loop
    loop.run_forever()
    # We are done, close sockets and the event loop
    rsock.close()
    wsock.close()
    loop.close()

4. Registering handlers for the SIGINT and SIGTERM signals with the AbstractEventLoop.add_signal_handler() method:

    import asyncio
    import functools
    import os
    import signal

    def ask_exit(signame):
        print("got signal %s: exit" % signame)
        loop.stop()

    loop = asyncio.new_event_loop()
    for signame in ('SIGINT', 'SIGTERM'):
        loop.add_signal_handler(getattr(signal, signame),
                                functools.partial(ask_exit, signame))

    print("Event loop running forever, press Ctrl+C to interrupt.")
    print("pid %s: send SIGINT or SIGTERM to exit." % os.getpid())
    try:
        loop.run_forever()
    finally:
        loop.close()

    # This example only works on UNIX.

5. Combining a Future with a coroutine function:

The coroutine function is responsible for the computation (which takes 1 second) and stores the result in the Future. The run_until_complete() method waits for the Future to complete.

    import asyncio

    async def slow_operation(future):
        await asyncio.sleep(1)
        future.set_result('Future is done!')

    loop = asyncio.new_event_loop()
    future = loop.create_future()
    # Schedule the coroutine as a task on this loop
    loop.create_task(slow_operation(future))
    loop.run_until_complete(future)
    print(future.result())
    loop.close()

6. The previous example rewritten with the Future.add_done_callback() method, to describe the control flow explicitly:

In this example, the Future is used to link slow_operation() to got_result(): when slow_operation() is done, got_result() is called with the result.

    import asyncio

    async def slow_operation(future):
        await asyncio.sleep(1)
        future.set_result('Future is done!')

    def got_result(future):
        print(future.result())
        loop.stop()

    loop = asyncio.new_event_loop()
    future = loop.create_future()
    loop.create_task(slow_operation(future))
    future.add_done_callback(got_result)
    try:
        loop.run_forever()
    finally:
        loop.close()
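For comparison, the same link between producer and consumer can be expressed by awaiting the Future directly instead of attaching a callback. This is a minimal sketch, assuming Python 3.7+ (asyncio.run() and asyncio.get_running_loop()); the delay is shortened and main is an illustrative name:

```python
import asyncio


async def slow_operation(future):
    await asyncio.sleep(0.01)
    future.set_result("Future is done!")


async def main():
    loop = asyncio.get_running_loop()
    future = loop.create_future()
    # Keep a reference to the task so it is not garbage-collected early.
    task = loop.create_task(slow_operation(future))
    # Suspends main() until slow_operation() calls set_result().
    result = await future
    await task
    return result


if __name__ == "__main__":
    print(asyncio.run(main()))
```

Here the code after `await future` plays the role of got_result(), and no explicit loop.stop() is needed.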

7. Running three tasks (A, B, C) concurrently:

A task is automatically scheduled for execution when it is created. The event loop stops when all tasks are done.

    import asyncio

    async def factorial(name, number):
        f = 1
        for i in range(2, number+1):
            print("Task %s: Compute factorial(%s)..." % (name, i))
            await asyncio.sleep(1)
            f *= i
        print("Task %s: factorial(%s) = %s" % (name, number, f))

    loop = asyncio.new_event_loop()
    tasks = [
        loop.create_task(factorial("A", 2)),
        loop.create_task(factorial("B", 3)),
        loop.create_task(factorial("C", 4))]
    loop.run_until_complete(asyncio.gather(*tasks))
    loop.close()

Output:

    Task A: Compute factorial(2)...
    Task B: Compute factorial(2)...
    Task C: Compute factorial(2)...
    Task A: factorial(2) = 2
    Task B: Compute factorial(3)...
    Task C: Compute factorial(3)...
    Task B: factorial(3) = 6
    Task C: Compute factorial(4)...
    Task C: factorial(4) = 24
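The tasks above only print their results. When the values themselves are needed, asyncio.gather() can collect them. This is a minimal sketch, assuming Python 3.7+ for asyncio.run(); the zero-length sleep is an assumption made to keep the sketch fast:

```python
import asyncio


async def factorial(number):
    f = 1
    for i in range(2, number + 1):
        # Yield control so the other tasks can make progress.
        await asyncio.sleep(0)
        f *= i
    return f


async def main():
    # gather() returns results in argument order, not completion order.
    return await asyncio.gather(factorial(2), factorial(3), factorial(4))


if __name__ == "__main__":
    print(asyncio.run(main()))
```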

8. A TCP echo client using the AbstractEventLoop.create_connection() method, and a TCP echo server using the AbstractEventLoop.create_server() method

Client:

The event loop is run twice. In this short example, run_until_complete() is preferred because it raises an exception if the server is not listening, which avoids writing a short coroutine just to handle the exception and stop the running loop. When run_until_complete() exits, the loop is no longer running, so there is no need to stop it if an error occurs.

    import asyncio

    class EchoClientProtocol(asyncio.Protocol):
        def __init__(self, message, loop):
            self.message = message
            self.loop = loop

        def connection_made(self, transport):
            transport.write(self.message.encode())
            print('Data sent: {!r}'.format(self.message))

        def data_received(self, data):
            print('Data received: {!r}'.format(data.decode()))

        def connection_lost(self, exc):
            print('The server closed the connection')
            print('Stop the event loop')
            self.loop.stop()

    loop = asyncio.new_event_loop()
    message = 'Hello World!'
    coro = loop.create_connection(lambda: EchoClientProtocol(message, loop),
                                  '127.0.0.1', 8888)
    loop.run_until_complete(coro)
    loop.run_forever()
    loop.close()

Server:

Transport.close() can be called immediately after WriteTransport.write(), even if the data has not yet been sent on the socket: both methods are asynchronous. No yield from (or await) is needed, because these transport methods are not coroutines.

    import asyncio

    class EchoServerClientProtocol(asyncio.Protocol):
        def connection_made(self, transport):
            peername = transport.get_extra_info('peername')
            print('Connection from {}'.format(peername))
            self.transport = transport

        def data_received(self, data):
            message = data.decode()
            print('Data received: {!r}'.format(message))
            print('Send: {!r}'.format(message))
            self.transport.write(data)
            print('Close the client socket')
            self.transport.close()

    loop = asyncio.new_event_loop()
    # Each client connection will create a new protocol instance
    coro = loop.create_server(EchoServerClientProtocol, '127.0.0.1', 8888)
    server = loop.run_until_complete(coro)

    # Serve requests until Ctrl+C is pressed
    print('Serving on {}'.format(server.sockets[0].getsockname()))
    try:
        loop.run_forever()
    except KeyboardInterrupt:
        pass

    # Close the server
    server.close()
    loop.run_until_complete(server.wait_closed())
    loop.close()

9. A UDP echo client and a UDP echo server, both using the AbstractEventLoop.create_datagram_endpoint() method

Client:

    import asyncio

    class EchoClientProtocol:
        def __init__(self, message, loop):
            self.message = message
            self.loop = loop
            self.transport = None

        def connection_made(self, transport):
            self.transport = transport
            print('Send:', self.message)
            self.transport.sendto(self.message.encode())

        def datagram_received(self, data, addr):
            print("Received:", data.decode())
            print("Close the socket")
            self.transport.close()

        def error_received(self, exc):
            print('Error received:', exc)

        def connection_lost(self, exc):
            print("Socket closed, stop the event loop")
            # Stop the loop that was passed to the constructor
            self.loop.stop()

    loop = asyncio.new_event_loop()
    message = "Hello World!"
    connect = loop.create_datagram_endpoint(
        lambda: EchoClientProtocol(message, loop),
        remote_addr=('127.0.0.1', 9999))
    transport, protocol = loop.run_until_complete(connect)
    loop.run_forever()
    transport.close()
    loop.close()

Server:

    import asyncio

    class EchoServerProtocol:
        def connection_made(self, transport):
            self.transport = transport

        def datagram_received(self, data, addr):
            message = data.decode()
            print('Received %r from %s' % (message, addr))
            print('Send %r to %s' % (message, addr))
            self.transport.sendto(data, addr)

    loop = asyncio.new_event_loop()
    print("Starting UDP server")
    # One protocol instance will be created to serve all client requests
    listen = loop.create_datagram_endpoint(
        EchoServerProtocol, local_addr=('127.0.0.1', 9999))
    transport, protocol = loop.run_until_complete(listen)

    try:
        loop.run_forever()
    except KeyboardInterrupt:
        pass

    transport.close()
    loop.close()

10. Waiting until a socket receives data, using a protocol with the AbstractEventLoop.create_connection() method, then closing the event loop

    import asyncio
    try:
        from socket import socketpair
    except ImportError:
        from asyncio.windows_utils import socketpair

    # Create a pair of connected sockets
    rsock, wsock = socketpair()
    loop = asyncio.new_event_loop()

    class MyProtocol(asyncio.Protocol):
        transport = None

        def connection_made(self, transport):
            self.transport = transport

        def data_received(self, data):
            print("Received:", data.decode())
            # We are done: close the transport (it will call connection_lost())
            self.transport.close()

        def connection_lost(self, exc):
            # The socket has been closed, stop the event loop
            loop.stop()

    # Register the socket to wait for data
    connect_coro = loop.create_connection(MyProtocol, sock=rsock)
    transport, protocol = loop.run_until_complete(connect_coro)
    # Simulate the reception of data from the network
    loop.call_soon(wsock.send, 'abc'.encode())
    # Run the event loop
    loop.run_forever()
    # We are done, close sockets and the event loop
    rsock.close()
    wsock.close()
    loop.close()

11. A TCP echo client using the asyncio.open_connection() function, and a TCP echo server using the asyncio.start_server() function

Client:

    import asyncio

    async def tcp_echo_client(message):
        reader, writer = await asyncio.open_connection('127.0.0.1', 8888)
        print('Send: %r' % message)
        writer.write(message.encode())
        data = await reader.read(100)
        print('Received: %r' % data.decode())
        print('Close the socket')
        writer.close()

    message = 'Hello World!'
    loop = asyncio.new_event_loop()
    loop.run_until_complete(tcp_echo_client(message))
    loop.close()

Server:

    import asyncio

    async def handle_echo(reader, writer):
        data = await reader.read(100)
        message = data.decode()
        addr = writer.get_extra_info('peername')
        print("Received %r from %r" % (message, addr))
        print("Send: %r" % message)
        writer.write(data)
        await writer.drain()
        print("Close the client socket")
        writer.close()

    loop = asyncio.new_event_loop()
    coro = asyncio.start_server(handle_echo, '127.0.0.1', 8888)
    server = loop.run_until_complete(coro)

    # Serve requests until Ctrl+C is pressed
    print('Serving on {}'.format(server.sockets[0].getsockname()))
    try:
        loop.run_forever()
    except KeyboardInterrupt:
        pass

    # Close the server
    server.close()
    loop.run_until_complete(server.wait_closed())
    loop.close()
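The stream-based client and server can also be exercised together in a single process, which is handy for a quick check. This is a minimal sketch, assuming Python 3.7+ (asyncio.run() and StreamWriter.wait_closed()); the name roundtrip and the use of port 0, which lets the operating system pick a free port, are illustrative choices and not part of the examples above:

```python
import asyncio


async def handle_echo(reader, writer):
    # Echo back whatever arrives first, then close the connection.
    data = await reader.read(100)
    writer.write(data)
    await writer.drain()
    writer.close()


async def roundtrip(message):
    # Port 0 asks the OS for any free port (assumption: avoids clashing
    # with a server that may already be listening on 8888).
    server = await asyncio.start_server(handle_echo, "127.0.0.1", 0)
    port = server.sockets[0].getsockname()[1]

    reader, writer = await asyncio.open_connection("127.0.0.1", port)
    writer.write(message.encode())
    await writer.drain()
    # Read until EOF, i.e. until the server closes its side.
    data = await reader.read()
    writer.close()
    await writer.wait_closed()

    server.close()
    await server.wait_closed()
    return data.decode()


if __name__ == "__main__":
    print(asyncio.run(roundtrip("Hello World!")))
```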

12. A simple example that fetches the HTTP headers of a URL passed on the command line:

    import asyncio
    import urllib.parse
    import sys

    async def print_http_headers(url):
        url = urllib.parse.urlsplit(url)
        if url.scheme == 'https':
            connect = asyncio.open_connection(url.hostname, 443, ssl=True)
        else:
            connect = asyncio.open_connection(url.hostname, 80)
        reader, writer = await connect
        query = ('HEAD {path} HTTP/1.0\r\n'
                 'Host: {hostname}\r\n'
                 '\r\n').format(path=url.path or '/', hostname=url.hostname)
        writer.write(query.encode('latin-1'))
        while True:
            line = await reader.readline()
            if not line:
                break
            line = line.decode('latin1').rstrip()
            if line:
                print('HTTP header> %s' % line)

        # Ignore the body, close the socket
        writer.close()

    url = sys.argv[1]
    loop = asyncio.new_event_loop()
    loop.run_until_complete(print_http_headers(url))
    loop.close()

Usage:

    python example.py http://example.com/path/page.html

Using HTTPS:

    python example.py https://example.com/path/page.html

13. A coroutine that waits until a socket receives data, using the open_connection() function:

    import asyncio
    try:
        from socket import socketpair
    except ImportError:
        from asyncio.windows_utils import socketpair

    async def wait_for_data(loop):
        # Create a pair of connected sockets
        rsock, wsock = socketpair()

        # Register the open socket to wait for data
        reader, writer = await asyncio.open_connection(sock=rsock)

        # Simulate the reception of data from the network
        loop.call_soon(wsock.send, 'abc'.encode())

        # Wait for data
        data = await reader.read(100)

        # Got data, we are done: close the socket
        print("Received:", data.decode())
        writer.close()

        # Close the second socket
        wsock.close()

    loop = asyncio.new_event_loop()
    loop.run_until_complete(wait_for_data(loop))
    loop.close()

14. An example of a subprocess protocol, used to get the output of a subprocess and to wait for the subprocess to exit. The subprocess is created by the AbstractEventLoop.subprocess_exec() method:

    import asyncio
    import sys

    class DateProtocol(asyncio.SubprocessProtocol):
        def __init__(self, exit_future):
            self.exit_future = exit_future
            self.output = bytearray()

        def pipe_data_received(self, fd, data):
            self.output.extend(data)

        def process_exited(self):
            self.exit_future.set_result(True)

    async def get_date(loop):
        code = 'import datetime; print(datetime.datetime.now())'
        exit_future = loop.create_future()

        # Create the subprocess controlled by the protocol DateProtocol,
        # redirect the standard output into a pipe
        create = loop.subprocess_exec(lambda: DateProtocol(exit_future),
                                      sys.executable, '-c', code,
                                      stdin=None, stderr=None)
        transport, protocol = await create

        # Wait for the subprocess exit using the process_exited() method
        # of the protocol
        await exit_future

        # Close the stdout pipe
        transport.close()

        # Read the output which was collected by the pipe_data_received()
        # method of the protocol
        data = bytes(protocol.output)
        return data.decode('ascii').rstrip()

    if sys.platform == "win32":
        loop = asyncio.ProactorEventLoop()
    else:
        loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)

    date = loop.run_until_complete(get_date(loop))
    print("Current date: %s" % date)
    loop.close()

15. An example that uses the Process class to control a subprocess and the StreamReader class to read from its standard output. The subprocess is created by the create_subprocess_exec() function:

    import asyncio.subprocess
    import sys

    async def get_date():
        code = 'import datetime; print(datetime.datetime.now())'

        # Create the subprocess, redirect the standard output into a pipe
        create = asyncio.create_subprocess_exec(sys.executable, '-c', code,
                                                stdout=asyncio.subprocess.PIPE)
        proc = await create

        # Read one line of output
        data = await proc.stdout.readline()
        line = data.decode('ascii').rstrip()

        # Wait for the subprocess exit
        await proc.wait()
        return line

    if sys.platform == "win32":
        loop = asyncio.ProactorEventLoop()
    else:
        loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)

    date = loop.run_until_complete(get_date())
    print("Current date: %s" % date)
    loop.close()
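When all of the output is wanted rather than a single line, Process.communicate() reads the pipe to EOF and waits for the process to exit in one step. This is a minimal sketch, assuming Python 3.7+ for asyncio.run(); run_python is a hypothetical helper name:

```python
import asyncio
import sys


async def run_python(code):
    # Start a child Python interpreter with stdout redirected into a pipe.
    proc = await asyncio.create_subprocess_exec(
        sys.executable, "-c", code,
        stdout=asyncio.subprocess.PIPE)
    # communicate() reads stdout to EOF and waits for the process to exit.
    stdout, _ = await proc.communicate()
    return proc.returncode, stdout.decode().rstrip()


if __name__ == "__main__":
    print(asyncio.run(run_python("print(6 * 7)")))
```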

Reposted from: https://www.cnblogs.com/styier/p/6415850.html
