赞
踩
本文将和大家一起探讨python的多协程并发编程(上篇),使用内置基本库asyncio来实现并发,先通过官方来简单使用这个模块。先打好基础,能够有个基本的用法与认知,后续文章,我们再进行详细使用。
本文为python并发编程的第十篇,上一篇文章地址如下:
python:并发编程(九)_Lion King的博客-CSDN博客
下一篇文章地址如下:
python:并发编程(十一)_Lion King的博客-CSDN博客
官方文档:asyncio --- 异步 I/O — Python 3.11.4 文档
asyncio
提供了事件循环(Event Loop)作为协程的调度器和执行者。事件循环负责处理协程的调度和执行,并处理事件、回调函数等。以下是关于 asyncio
事件循环的一些重要概念和用法:
(1)获取事件循环对象:可以使用 asyncio.get_event_loop()
获取默认的事件循环对象,或者使用 asyncio.new_event_loop()
创建一个新的事件循环对象。
(2)设置默认事件循环:可以使用 asyncio.set_event_loop()
设置默认的事件循环对象。
(3)运行事件循环:可以使用 loop.run_forever()
方法以无限循环的方式运行事件循环,直到调用 loop.stop()
。
(4)执行协程:可以使用 loop.run_until_complete()
方法执行一个协程或任务,并等待其完成。
(5)停止事件循环:可以使用 loop.stop()
停止事件循环的运行。
(6)调度协程:可以使用 loop.create_task()
方法创建一个任务,并将其添加到事件循环中运行。
(7)定时调度:可以使用 loop.call_later()
或 loop.call_at()
方法在指定的时间点调度回调函数的执行。
(8)异常处理:可以使用 try-except
块来捕获和处理在协程执行过程中抛出的异常。
以下是一个简单的示例代码,演示了如何使用 asyncio
的事件循环:
- import asyncio
-
- # 定义一个协程
- async def my_coroutine():
- print("Coroutine is running")
- await asyncio.sleep(1) # 模拟耗时操作
- print("Coroutine is done")
-
- # 创建事件循环
- loop = asyncio.get_event_loop()
-
- # 将协程添加到事件循环中
- task = loop.create_task(my_coroutine())
-
- # 运行事件循环直到任务完成
- loop.run_until_complete(task)
-
- # 关闭事件循环
- loop.close()
以上代码创建了一个简单的协程 my_coroutine()
,并将其添加到事件循环中运行。loop.run_until_complete(task)
运行事件循环直到任务完成,然后关闭事件循环。
通过使用 asyncio
的事件循环,可以方便地管理和调度协程的执行,实现异步操作和并发编程。
在 asyncio
中,Futures
是一种用于表示异步操作结果的对象。它充当了协程和异步函数之间的桥梁,可以用于等待和获取异步操作的结果。
Futures
提供了以下主要功能:
(1)表示异步操作的结果:Future
对象表示一个尚未完成的异步操作,可以在协程中使用它来等待操作的完成,并获取最终的结果。
(2)设置结果值:通过调用 Future
对象的 set_result()
方法,可以设置异步操作的结果值。
(3)获取结果值:可以使用 await
关键字或 yield from
表达式等待 Future
对象的完成,并获取操作的结果值。
(4)异常处理:Future
对象可以捕获和传播异步操作中抛出的异常。可以使用 try-except
块来捕获异常,并使用 set_exception()
方法将异常传播给等待的协程。
以下是一个简单的示例代码,演示了如何使用 Futures
:
- import asyncio
-
- # 定义一个异步函数
- async def my_async_function():
- await asyncio.sleep(1)
- return "Async operation completed"
-
- # 创建事件循环
- loop = asyncio.get_event_loop()
-
- # 创建一个 Future 对象
- future = asyncio.ensure_future(my_async_function())
-
- # 执行事件循环,等待 Future 完成
- loop.run_until_complete(future)
-
- # 获取 Future 的结果值
- result = future.result()
- print(result)
-
- # 关闭事件循环
- loop.close()
在上述示例中,通过调用 asyncio.ensure_future()
方法,将异步函数 my_async_function()
转换为 Future
对象。然后,使用 loop.run_until_complete()
方法等待 Future
对象的完成,并获取结果值。
通过使用 Futures
,可以方便地进行异步操作的等待和结果处理,实现并发编程和异步任务的管理。
服务端
运行以下代码将启动一个简单的服务端,监听本地的8888端口。当客户端连接并发送数据时,服务端将接收数据并发送回一条响应。请确保在运行服务器之前,没有其他应用程序在使用相同的IP地址和端口号,并且防火墙设置不会阻止客户端与服务器建立连接。
- import asyncio
-
- # 自定义协议类
- class MyProtocol(asyncio.Protocol):
- def connection_made(self, transport):
- print('Client connected')
- self.transport = transport
-
- def data_received(self, data):
- print('Received:', data.decode())
-
- # 响应客户端请求
- response = b'Hello, client!'
- self.transport.write(response)
-
- def connection_lost(self, exc):
- print('Client disconnected')
-
- # 创建事件循环
- loop = asyncio.get_event_loop()
-
- # 启动服务端
- async def start_server():
- # 创建服务器
- server = await loop.create_server(lambda: MyProtocol(), 'localhost', 8888)
-
- # 获取绑定的地址和端口
- address = server.sockets[0].getsockname()
- print('Server started at', address)
-
- # 等待服务器关闭
- await server.wait_closed()
-
- # 运行事件循环,启动服务端
- loop.run_until_complete(start_server())
客户端
在 asyncio
中,传输(Transport)和协议(Protocol)是用于实现网络通信的重要组件。
**传输(Transport)**是网络连接的抽象表示,负责发送和接收数据。它提供了发送和关闭连接的方法,以及处理连接状态的相关属性。
**协议(Protocol)**是定义网络通信规则和行为的抽象接口。它定义了在网络连接上进行数据交换的方式,包括数据的编码、解码和处理等。协议通常是从 asyncio.Protocol
类派生而来的子类。
在 asyncio
中,可以通过以下步骤来使用传输和协议进行网络通信:
(1)创建传输和协议对象:使用 loop.create_connection()
方法创建传输和协议对象。该方法接受一个协议类和主机地址、端口号等参数,并返回一个 (transport, protocol)
对象。
(2)连接到远程主机:调用传输对象的 transport.connect()
方法,将传输对象与远程主机建立连接。连接成功后,将调用协议对象的 protocol.connection_made()
方法。
(3)数据收发:使用传输对象的 transport.write()
方法发送数据到远程主机,以及通过协议对象的回调方法接收和处理远程主机发送的数据。
(4)关闭连接:调用传输对象的 transport.close()
方法关闭连接。关闭连接后,将调用协议对象的 protocol.connection_lost()
方法。
以下是一个简单的示例代码,演示了如何使用传输和协议进行网络通信:
- import asyncio
-
- # 自定义协议类
- class MyProtocol(asyncio.Protocol):
- def connection_made(self, transport):
- print('Connected')
- self.transport = transport
-
- def data_received(self, data):
- print('Received:', data.decode())
-
- def connection_lost(self, exc):
- print('Connection closed')
-
- # 创建事件循环
- loop = asyncio.get_event_loop()
-
- # 创建传输和协议对象
- coro = loop.create_connection(MyProtocol, 'localhost', 8888)
-
- # 运行事件循环,建立连接
- transport, protocol = loop.run_until_complete(coro)
-
- # 发送数据
- transport.write(b'Hello, server!')
-
- # 关闭连接
- transport.close()
-
- # 停止事件循环
- loop.stop()
在上述示例中,我们自定义了一个协议类 MyProtocol
,继承自 asyncio.Protocol
。然后,使用 loop.create_connection()
方法创建传输和协议对象,并通过 loop.run_until_complete()
方法建立连接。最后,通过传输对象的 transport.write()
方法发送数据,并调用传输对象的 transport.close()
方法关闭连接。
通过使用传输和协议,我们可以方便地实现网络通信,并处理收发数据的逻辑。
在asyncio
中,策略对象是用于控制事件循环行为的一种机制。asyncio
提供了默认的策略对象,但也允许用户自定义和替换策略对象来满足特定的需求。
默认的策略对象是asyncio.DefaultEventLoopPolicy
,它基于具体的事件循环实现,如asyncio.SelectorEventLoop
(基于selectors
模块)或asyncio.SelectorEventLoop
(基于select
模块)。默认策略对象通过asyncio.get_event_loop_policy()
方法获取。
用户可以通过自定义策略对象来改变事件循环的行为,例如使用特定的事件循环实现或者自定义调度器。自定义策略对象必须实现asyncio.AbstractEventLoopPolicy
接口的方法,例如get_event_loop()
、set_event_loop()
、new_event_loop()
等。
下面是一个使用自定义策略对象的示例:
- import asyncio
-
- # 自定义策略对象
- class MyEventLoopPolicy(asyncio.DefaultEventLoopPolicy):
- def get_event_loop(self):
- loop = super().get_event_loop()
- print('Using custom event loop:', type(loop).__name__)
- return loop
-
- # 其他自定义方法...
-
- # 设置自定义策略对象
- asyncio.set_event_loop_policy(MyEventLoopPolicy())
-
- # 创建事件循环并运行
- loop = asyncio.get_event_loop()
- loop.run_forever()
在这个示例中,我们创建了一个自定义的策略对象MyEventLoopPolicy
,并重写了get_event_loop()
方法来打印使用的事件循环类型。然后通过asyncio.set_event_loop_policy()
方法将自定义策略对象设置为当前的策略。最后,使用asyncio.get_event_loop()
获取事件循环并运行。
请注意,自定义策略对象的设置应该在创建事件循环之前进行,以确保新创建的事件循环使用自定义策略。
asyncio
提供了扩展其功能的机制,使开发者能够根据自己的需求进行定制和扩展。以下是一些扩展asyncio
功能的常见方法:
(1)自定义协议(Protocol):可以通过继承asyncio.Protocol
类来创建自定义的协议。通过实现connection_made
、data_received
和connection_lost
等方法,可以处理连接的建立、数据的接收和连接的关闭等事件。
(2)自定义传输(Transport):可以通过继承asyncio.BaseTransport
类来创建自定义的传输。传输是协议和网络层之间的接口,负责处理数据的发送和接收。
(3)自定义事件循环(Event Loop):可以通过继承asyncio.AbstractEventLoop
类来创建自定义的事件循环。事件循环是asyncio
的核心组件,负责调度和执行异步任务。
(4)自定义异步任务(Coroutines):可以使用asyncio.coroutine
装饰器和async def
关键字来定义自己的异步任务。通过使用协程,可以编写异步的、非阻塞的代码逻辑。
(5)自定义异步函数(Async Functions):可以使用@asyncio.coroutine
装饰器和async def
关键字来定义自己的异步函数。异步函数可以与协程配合使用,用于编写更高级的异步逻辑。
(6)自定义事件处理器(Event Handlers):可以使用asyncio
提供的事件处理器机制来定制和处理特定的事件。例如,可以注册和处理定时器事件、I/O事件、信号事件等。
通过以上扩展方法,可以根据具体需求对asyncio
进行定制和扩展,以满足特定的异步编程场景和功能要求。这使得asyncio
具有很高的灵活性和可扩展性,适用于各种异步编程任务和应用领域。
下面是一个简单的示例代码,展示了如何扩展asyncio
的功能,通过自定义协议和传输来实现一个简单的Echo服务器:
- import asyncio
-
- # 自定义协议类
- class EchoProtocol(asyncio.Protocol):
- def connection_made(self, transport):
- self.transport = transport
-
- def data_received(self, data):
- message = data.decode()
- print("Received:", message)
- self.transport.write(data)
-
- def connection_lost(self, exc):
- print("Connection closed")
-
- # 自定义传输类
- class EchoTransport(asyncio.Transport):
- def __init__(self, loop):
- super().__init__(extra=None)
- self.loop = loop
- self.buffer = b''
-
- def write(self, data):
- print("Sending:", data.decode())
- self.buffer += data
-
- def close(self):
- print("Closing connection")
-
- def can_write_eof(self):
- return False
-
- def get_write_buffer_size(self):
- return len(self.buffer)
-
- def write_eof(self):
- pass
-
- def abort(self):
- pass
-
- def pause_reading(self):
- pass
-
- def resume_reading(self):
- pass
-
- # 创建事件循环
- loop = asyncio.get_event_loop()
-
- # 创建协议和传输对象
- protocol = EchoProtocol()
- transport = EchoTransport(loop)
-
- # 将协议和传输对象关联起来
- protocol.connection_made(transport)
-
- # 模拟收到数据并处理
- data = b"Hello, server!"
- protocol.data_received(data)
-
- # 关闭连接
- protocol.connection_lost(None)
-
- # 停止事件循环
- loop.stop()
这个示例中,EchoProtocol
类继承了asyncio.Protocol
,并实现了connection_made
、data_received
和connection_lost
等方法,用于处理连接的建立、数据的接收和连接的关闭。EchoTransport
类继承了asyncio.Transport
,并实现了一系列传输相关的方法,用于处理数据的发送和关闭连接。
通过自定义协议和传输,我们可以根据需要来扩展和定制asyncio
的功能,以适应不同的应用场景和需求。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。