赞
踩
asyncio提供一组高层级API用于:
Eventloop实例提供了注册、取消和执行任务和回调的方法;是asyncio应用的核心,是中央总控。
把一些异步函数(任务,Task)注册到Eventloop上,Eventloop会循环执行这些函数(但同时只能执行一个),当执行到某个函数时,如果它正在等待I/O返回,事件循环会暂停它的执行去执行其他的函数;当某个函数完成I/O后会恢复,下次循环到它的时候继续执行。因此,这些异步函数可以协同(Cooperative)运行。
协程(Coroutine)本质上是一个函数,特点是在代码块中可以将执行权交给其他协程:
import asyncio
async def a():
print('Suspending a')
await asyncio.sleep(0.1)
print('Resuming a')
async def b():
print('In b')
async def main():
await asyncio.gather(a(), b())
if __name__ == '__main__':
asyncio.run(main())
其上有以下关键点:
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
loop.close()
Future是对协程的封装,异步操作结束后会把最终结果设置到这个Future对象上。可以对这个Future实例:
await myFuture
等待future执行完成。
要保证多个任务并发,需要使用gather或create_task(创建任务)方式来执行;否则可能达不到想要的结果:
import asyncio import time # 在spyder中,因spyder本身就在EventLoop中执行,需要添加以下引用才能正常执行(pyCharm等中不需要) #import nest_asyncio #nest_asyncio.apply() async def a(): print('Suspending a') await asyncio.sleep(3) print('Resuming a') async def b(): print('Suspending b') await asyncio.sleep(2) print('Resuming b') async def runEach(): await a() await b() async def runGather(): await asyncio.gather(a(), b()) async def runTask(): t1 = asyncio.create_task(a()) t2 = asyncio.create_task(b()) await t1 await t2 async def runDirect(): await asyncio.create_task(a()) await asyncio.create_task(b()) async def runCombine(): tb = asyncio.create_task(b()) await a() await tb def show_perf(func): print('*' * 20) start = time.perf_counter() asyncio.run(func()) print(f'{func.__name__} Cost: {time.perf_counter() - start}') if __name__ == '__main__': show_perf(runEach) # 5 show_perf(runGather) # 3 show_perf(runTask) # 3 show_perf(runDirect) # 5 show_perf(runCombine) # 3
如上,不能直接await协程,也不能直接await create_task(要wait其结果,返回的task);否则无法并行执行。
WebSocket是基于TCP的应用层协议,实现了浏览器与服务器全双工(full-duplex)通信。其本质是保持TCP连接,在浏览器和服务端通过Socket进行通信。
websockets是基于asyncio的简单、高效的websocket库,使用时需要先安装库:
pip install websockets
# 以上方式默认安装到python目录中,在anaconda中无法使用;要使用需要指定环境(webStudy)的目录
pip install websockets -t C:\Users\gdxu\Anaconda3\envs\webStudy\Lib\site-packages
若要同步收发消息(发送然后等待应答):
import asyncio
from websockets import connect
async def hello(uri):
async with connect(uri) as websocket:
await websocket.send("Hello world!")
reply = await websocket.recv()
return reply
基于websockets中发送与接收接口:
import json import logging from websockets import connect class Handler: def __init__(self, loop=None): self.ws = None self.loop = loop async def async_connect(self, url): logging.info("attempting connection to {}".format(url)) # perform async connect, and store the connected WebSocketClientProtocol # object, for later reuse for send & recv self.ws = await connect(url) logging.info("connected") def sendJsonObj(self, cmd): return self.loop.run_until_complete(self.async_sendJsonObj(cmd)) async def async_sendJsonObj(self, cmd): return await self.ws.send(json.dumps(cmd)) def sendByte(self, cmd): return self.loop.run_until_complete(self.async_sendByte(cmd)) async def async_sendByte(self, cmd): return await self.ws.send(cmd) def close(self): self.loop.run_until_complete(self.async_close()) logging.info('closed') async def async_close(self): await self.ws.close(reason="user quit") self.ws = None def toRecv(self): self.loop.run_until_complete(self.async_recv()) async def async_recv(self, callback=None): # i = 1 logging.info('async_recv begin') while True: # logging.info('to recv:') reply = await self.ws.recv() if callback: callback(reply) # logging.info("{}- recv: {}".format(i, reply)) # i += 1 logging.info('async_recv end')
使用以上类异步收发数据:
import struct import json import asyncio import logging from wsHandler import Handler from logging_config import init_logging # import nest_asyncio # nest_asyncio.apply() remoteUri = 'wss://ws.test.com:2443/' login = {'role': 'admin', 'user_id': 'test1234'} toFree = struct.pack('=ib', 0, 0) setService = struct.pack('=i', 2) + json.dumps(["first", "test"]).encode('utf-8') async def sendCmd(handle): await handle.async_sendJsonObj(login) await asyncio.sleep(0.5) await handle.async_sendByte(setService) await asyncio.sleep(0.5) await handle.async_sendByte(toFree) def handleReply(reply): if len(reply) >= 4: resp = struct.unpack('=i', reply[:4]) logging.info("reply: {}".format(resp)) async def recvCmd(handle): await handle.async_recv(handleReply) if __name__ == '__main__': init_logging() handler = Handler() loop = asyncio.get_event_loop() loop.run_until_complete(handler.async_connect(remoteUri)) tasks = asyncio.gather(sendCmd(handler), recvCmd(handler)) loop.run_until_complete(tasks) loop.run_until_complete(handler.async_close()) logging.info('quit')
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。