当前位置:   article > 正文

python:并发编程(十)_python asyncio并发编程

python asyncio并发编程

前言

本文将和大家一起探讨python的多协程并发编程(上篇),使用内置基本库asyncio来实现并发,先通过官方来简单使用这个模块。先打好基础,能够有个基本的用法与认知,后续文章,我们再进行详细使用。

本文为python并发编程的第十篇,上一篇文章地址如下:

python:并发编程(九)_Lion King的博客-CSDN博客

下一篇文章地址如下:

python:并发编程(十一)_Lion King的博客-CSDN博客

一、快速开始

官方文档:asyncio --- 异步 I/O — Python 3.11.4 文档

1、事件循环

asyncio 提供了事件循环(Event Loop)作为协程的调度器和执行者。事件循环负责处理协程的调度和执行,并处理事件、回调函数等。以下是关于 asyncio 事件循环的一些重要概念和用法:

(1)获取事件循环对象:可以使用 asyncio.get_event_loop() 获取默认的事件循环对象,或者使用 asyncio.new_event_loop() 创建一个新的事件循环对象。

(2)设置默认事件循环:可以使用 asyncio.set_event_loop() 设置默认的事件循环对象。

(3)运行事件循环:可以使用 loop.run_forever() 方法以无限循环的方式运行事件循环,直到调用 loop.stop()

(4)执行协程:可以使用 loop.run_until_complete() 方法执行一个协程或任务,并等待其完成。

(5)停止事件循环:可以使用 loop.stop() 停止事件循环的运行。

(6)调度协程:可以使用 loop.create_task() 方法创建一个任务,并将其添加到事件循环中运行。

(7)定时调度:可以使用 loop.call_later()loop.call_at() 方法在指定的时间点调度回调函数的执行。

(8)异常处理:可以使用 try-except 块来捕获和处理在协程执行过程中抛出的异常。

以下是一个简单的示例代码,演示了如何使用 asyncio 的事件循环:

  1. import asyncio
  2. # 定义一个协程
  3. async def my_coroutine():
  4. print("Coroutine is running")
  5. await asyncio.sleep(1) # 模拟耗时操作
  6. print("Coroutine is done")
  7. # 创建事件循环
  8. loop = asyncio.get_event_loop()
  9. # 将协程添加到事件循环中
  10. task = loop.create_task(my_coroutine())
  11. # 运行事件循环直到任务完成
  12. loop.run_until_complete(task)
  13. # 关闭事件循环
  14. loop.close()

以上代码创建了一个简单的协程 my_coroutine(),并将其添加到事件循环中运行。loop.run_until_complete(task) 运行事件循环直到任务完成,然后关闭事件循环。

通过使用 asyncio 的事件循环,可以方便地管理和调度协程的执行,实现异步操作和并发编程。

2、Futures

asyncio 中,Futures 是一种用于表示异步操作结果的对象。它充当了协程和异步函数之间的桥梁,可以用于等待和获取异步操作的结果。

Futures 提供了以下主要功能:

(1)表示异步操作的结果:Future 对象表示一个尚未完成的异步操作,可以在协程中使用它来等待操作的完成,并获取最终的结果。

(2)设置结果值:通过调用 Future 对象的 set_result() 方法,可以设置异步操作的结果值。

(3)获取结果值:可以使用 await 关键字或 yield from 表达式等待 Future 对象的完成,并获取操作的结果值。

(4)异常处理:Future 对象可以捕获和传播异步操作中抛出的异常。可以使用 try-except 块来捕获异常,并使用 set_exception() 方法将异常传播给等待的协程。

以下是一个简单的示例代码,演示了如何使用 Futures

  1. import asyncio
  2. # 定义一个异步函数
  3. async def my_async_function():
  4. await asyncio.sleep(1)
  5. return "Async operation completed"
  6. # 创建事件循环
  7. loop = asyncio.get_event_loop()
  8. # 创建一个 Future 对象
  9. future = asyncio.ensure_future(my_async_function())
  10. # 执行事件循环,等待 Future 完成
  11. loop.run_until_complete(future)
  12. # 获取 Future 的结果值
  13. result = future.result()
  14. print(result)
  15. # 关闭事件循环
  16. loop.close()

在上述示例中,通过调用 asyncio.ensure_future() 方法,将异步函数 my_async_function() 转换为 Future 对象。然后,使用 loop.run_until_complete() 方法等待 Future 对象的完成,并获取结果值。

通过使用 Futures,可以方便地进行异步操作的等待和结果处理,实现并发编程和异步任务的管理。

3、传输和协议

服务端

运行以下代码将启动一个简单的服务端,监听本地的8888端口。当客户端连接并发送数据时,服务端将接收数据并发送回一条响应。请确保在运行服务器之前,没有其他应用程序在使用相同的IP地址和端口号,并且防火墙设置不会阻止客户端与服务器建立连接。

  1. import asyncio
  2. # 自定义协议类
  3. class MyProtocol(asyncio.Protocol):
  4. def connection_made(self, transport):
  5. print('Client connected')
  6. self.transport = transport
  7. def data_received(self, data):
  8. print('Received:', data.decode())
  9. # 响应客户端请求
  10. response = b'Hello, client!'
  11. self.transport.write(response)
  12. def connection_lost(self, exc):
  13. print('Client disconnected')
  14. # 创建事件循环
  15. loop = asyncio.get_event_loop()
  16. # 启动服务端
  17. async def start_server():
  18. # 创建服务器
  19. server = await loop.create_server(lambda: MyProtocol(), 'localhost', 8888)
  20. # 获取绑定的地址和端口
  21. address = server.sockets[0].getsockname()
  22. print('Server started at', address)
  23. # 等待服务器关闭
  24. await server.wait_closed()
  25. # 运行事件循环,启动服务端
  26. loop.run_until_complete(start_server())

客户端

asyncio 中,传输(Transport)和协议(Protocol)是用于实现网络通信的重要组件。

**传输(Transport)**是网络连接的抽象表示,负责发送和接收数据。它提供了发送和关闭连接的方法,以及处理连接状态的相关属性。

**协议(Protocol)**是定义网络通信规则和行为的抽象接口。它定义了在网络连接上进行数据交换的方式,包括数据的编码、解码和处理等。协议通常是从 asyncio.Protocol 类派生而来的子类。

asyncio 中,可以通过以下步骤来使用传输和协议进行网络通信:

(1)创建传输和协议对象:使用 loop.create_connection() 方法创建传输和协议对象。该方法接受一个协议类和主机地址、端口号等参数,并返回一个 (transport, protocol) 对象。

(2)连接到远程主机:调用传输对象的 transport.connect() 方法,将传输对象与远程主机建立连接。连接成功后,将调用协议对象的 protocol.connection_made() 方法。

(3)数据收发:使用传输对象的 transport.write() 方法发送数据到远程主机,以及通过协议对象的回调方法接收和处理远程主机发送的数据。

(4)关闭连接:调用传输对象的 transport.close() 方法关闭连接。关闭连接后,将调用协议对象的 protocol.connection_lost() 方法。

以下是一个简单的示例代码,演示了如何使用传输和协议进行网络通信:

  1. import asyncio
  2. # 自定义协议类
  3. class MyProtocol(asyncio.Protocol):
  4. def connection_made(self, transport):
  5. print('Connected')
  6. self.transport = transport
  7. def data_received(self, data):
  8. print('Received:', data.decode())
  9. def connection_lost(self, exc):
  10. print('Connection closed')
  11. # 创建事件循环
  12. loop = asyncio.get_event_loop()
  13. # 创建传输和协议对象
  14. coro = loop.create_connection(MyProtocol, 'localhost', 8888)
  15. # 运行事件循环,建立连接
  16. transport, protocol = loop.run_until_complete(coro)
  17. # 发送数据
  18. transport.write(b'Hello, server!')
  19. # 关闭连接
  20. transport.close()
  21. # 停止事件循环
  22. loop.stop()

在上述示例中,我们自定义了一个协议类 MyProtocol,继承自 asyncio.Protocol。然后,使用 loop.create_connection() 方法创建传输和协议对象,并通过 loop.run_until_complete() 方法建立连接。最后,通过传输对象的 transport.write() 方法发送数据,并调用传输对象的 transport.close() 方法关闭连接。

通过使用传输和协议,我们可以方便地实现网络通信,并处理收发数据的逻辑。

4、策略

asyncio中,策略对象是用于控制事件循环行为的一种机制。asyncio提供了默认的策略对象,但也允许用户自定义和替换策略对象来满足特定的需求。

默认的策略对象是asyncio.DefaultEventLoopPolicy,它基于具体的事件循环实现,如asyncio.SelectorEventLoop(基于selectors模块)或asyncio.SelectorEventLoop(基于select模块)。默认策略对象通过asyncio.get_event_loop_policy()方法获取。

用户可以通过自定义策略对象来改变事件循环的行为,例如使用特定的事件循环实现或者自定义调度器。自定义策略对象必须实现asyncio.AbstractEventLoopPolicy接口的方法,例如get_event_loop()set_event_loop()new_event_loop()等。

下面是一个使用自定义策略对象的示例:

  1. import asyncio
  2. # 自定义策略对象
  3. class MyEventLoopPolicy(asyncio.DefaultEventLoopPolicy):
  4. def get_event_loop(self):
  5. loop = super().get_event_loop()
  6. print('Using custom event loop:', type(loop).__name__)
  7. return loop
  8. # 其他自定义方法...
  9. # 设置自定义策略对象
  10. asyncio.set_event_loop_policy(MyEventLoopPolicy())
  11. # 创建事件循环并运行
  12. loop = asyncio.get_event_loop()
  13. loop.run_forever()

在这个示例中,我们创建了一个自定义的策略对象MyEventLoopPolicy,并重写了get_event_loop()方法来打印使用的事件循环类型。然后通过asyncio.set_event_loop_policy()方法将自定义策略对象设置为当前的策略。最后,使用asyncio.get_event_loop()获取事件循环并运行。

请注意,自定义策略对象的设置应该在创建事件循环之前进行,以确保新创建的事件循环使用自定义策略。

5、Extending

asyncio提供了扩展其功能的机制,使开发者能够根据自己的需求进行定制和扩展。以下是一些扩展asyncio功能的常见方法:

(1)自定义协议(Protocol):可以通过继承asyncio.Protocol类来创建自定义的协议。通过实现connection_madedata_receivedconnection_lost等方法,可以处理连接的建立、数据的接收和连接的关闭等事件。

(2)自定义传输(Transport):可以通过继承asyncio.BaseTransport类来创建自定义的传输。传输是协议和网络层之间的接口,负责处理数据的发送和接收。

(3)自定义事件循环(Event Loop):可以通过继承asyncio.AbstractEventLoop类来创建自定义的事件循环。事件循环是asyncio的核心组件,负责调度和执行异步任务。

(4)自定义异步任务(Coroutines):可以使用asyncio.coroutine装饰器和async def关键字来定义自己的异步任务。通过使用协程,可以编写异步的、非阻塞的代码逻辑。

(5)自定义异步函数(Async Functions):可以使用@asyncio.coroutine装饰器和async def关键字来定义自己的异步函数。异步函数可以与协程配合使用,用于编写更高级的异步逻辑。

(6)自定义事件处理器(Event Handlers):可以使用asyncio提供的事件处理器机制来定制和处理特定的事件。例如,可以注册和处理定时器事件、I/O事件、信号事件等。

通过以上扩展方法,可以根据具体需求对asyncio进行定制和扩展,以满足特定的异步编程场景和功能要求。这使得asyncio具有很高的灵活性和可扩展性,适用于各种异步编程任务和应用领域。

下面是一个简单的示例代码,展示了如何扩展asyncio的功能,通过自定义协议和传输来实现一个简单的Echo服务器:

  1. import asyncio
  2. # 自定义协议类
  3. class EchoProtocol(asyncio.Protocol):
  4. def connection_made(self, transport):
  5. self.transport = transport
  6. def data_received(self, data):
  7. message = data.decode()
  8. print("Received:", message)
  9. self.transport.write(data)
  10. def connection_lost(self, exc):
  11. print("Connection closed")
  12. # 自定义传输类
  13. class EchoTransport(asyncio.Transport):
  14. def __init__(self, loop):
  15. super().__init__(extra=None)
  16. self.loop = loop
  17. self.buffer = b''
  18. def write(self, data):
  19. print("Sending:", data.decode())
  20. self.buffer += data
  21. def close(self):
  22. print("Closing connection")
  23. def can_write_eof(self):
  24. return False
  25. def get_write_buffer_size(self):
  26. return len(self.buffer)
  27. def write_eof(self):
  28. pass
  29. def abort(self):
  30. pass
  31. def pause_reading(self):
  32. pass
  33. def resume_reading(self):
  34. pass
  35. # 创建事件循环
  36. loop = asyncio.get_event_loop()
  37. # 创建协议和传输对象
  38. protocol = EchoProtocol()
  39. transport = EchoTransport(loop)
  40. # 将协议和传输对象关联起来
  41. protocol.connection_made(transport)
  42. # 模拟收到数据并处理
  43. data = b"Hello, server!"
  44. protocol.data_received(data)
  45. # 关闭连接
  46. protocol.connection_lost(None)
  47. # 停止事件循环
  48. loop.stop()

这个示例中,EchoProtocol类继承了asyncio.Protocol,并实现了connection_madedata_receivedconnection_lost等方法,用于处理连接的建立、数据的接收和连接的关闭。EchoTransport类继承了asyncio.Transport,并实现了一系列传输相关的方法,用于处理数据的发送和关闭连接。

通过自定义协议和传输,我们可以根据需要来扩展和定制asyncio的功能,以适应不同的应用场景和需求。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/IT小白/article/detail/91423?site
推荐阅读
相关标签
  

闽ICP备14008679号