赞
踩
pip install websockets-routes
- import asyncio
- import json
- from typing import Union
-
- import websockets
- import websockets_routes
- from websockets.legacy.server import WebSocketServerProtocol
- from websockets_routes import RoutedPath
-
# Create the router object that handlers register their URL patterns on.
router = websockets_routes.Router()

# Connection handle for each known client identity; an entry stays None
# until a client with that identity has connected.
connections: dict[str, Union[None, WebSocketServerProtocol]] = {
    'setting': None,
    'fastapi': None,
    'administrator': None,
}

# Shared boolean flags. The annotation previously used the builtin function
# ``any`` as if it were a type; both values are booleans, so ``bool`` is the
# accurate value type.
flags: dict[str, bool] = {
    'is_allow_query': True,
    'opt_success': False,
}
-
-
# Message handler
@router.route("/command/{identification}")
async def light_status(websocket: WebSocketServerProtocol, path: RoutedPath):
    """Register the connecting client and process its messages.

    The ``identification`` path parameter selects the client's role:

    * ``setting``: messages are parsed as JSON; a ``cmd`` of
      ``is_allow_query`` sets ``flags['is_allow_query']`` to whether
      ``data`` equals the string ``'true'``. Nothing is processed while
      no ``fastapi`` client is connected.
    * ``fastapi``: each message is forwarded unchanged to the ``setting``
      connection, and the sender receives a confirmation.
    * ``administrator``: each message is echoed back, JSON-encoded.

    Any other identification returns immediately without registering the
    connection.

    Args:
        websocket: The connection being served.
        path: The routed path carrying the ``identification`` parameter.
    """
    identification = path.params['identification']
    if identification not in connections:
        return

    # Record the connection handle for this identity.
    connections[identification] = websocket

    try:
        async for message in websocket:
            # Business logic for the "setting" client.
            if identification == 'setting':
                if connections['fastapi'] is None:
                    await websocket.send("fastapi的websocket未连接")
                    continue
                try:
                    msg = json.loads(message)
                except ValueError:
                    # Malformed JSON must not tear down the whole connection.
                    await websocket.send("消息格式错误")
                    continue
                # Payloads that are not objects, or lack "cmd", are ignored.
                if isinstance(msg, dict) and msg.get('cmd') == 'is_allow_query':
                    flags['is_allow_query'] = msg.get('data') == 'true'

            # Business logic for the "fastapi" client.
            elif identification == 'fastapi':
                setting_connection = connections['setting']
                if setting_connection is None:
                    await websocket.send("setting的websocket未连接")
                else:
                    await setting_connection.send(message)
                    await websocket.send("发送成功")

            # Business logic for the "administrator" client.
            else:
                await websocket.send("【administrator】你发给我的消息是:" + json.dumps(message))
    finally:
        # Clear the slot on disconnect so other clients do not try to send
        # to a dead connection. The identity check avoids wiping a newer
        # connection that has since registered under the same identity.
        if connections.get(identification) is websocket:
            connections[identification] = None
-
-
# Start the WebSocket server
async def main():
    """Serve the router on localhost:8089 until the task is cancelled."""

    def dispatch(websocket, path):
        # Hand every incoming connection to the router.
        return router(websocket, path)

    server = websockets.serve(dispatch, "localhost", 8089)
    async with server:
        # Wait on a future that is never resolved, i.e. run forever.
        await asyncio.Future()
-
-
def start():
    """Run ``main()`` with ``asyncio.run``, blocking the calling thread."""
    server_coroutine = main()
    asyncio.run(server_coroutine)
在 FastAPI 中调用
连接服务
在 FastAPI 中启动 WebSocket 服务
- import asyncio
- import websockets
- import websockets_routes
- from websockets.legacy.server import WebSocketServerProtocol
- from websockets_routes import RoutedPath
-
# Create the router object that handlers register their URL patterns on.
router = websockets_routes.Router()


# Message handler
@router.route("/command/{identification}")
async def light_status(websocket: WebSocketServerProtocol, path: RoutedPath):
    """Echo messages back to ``setting`` and ``administrator`` clients.

    Clients connecting with any other ``identification`` receive an
    error reply for every message they send.
    """
    echo_identities = ('setting', 'administrator')
    # Handle each received message in turn.
    async for message in websocket:
        if path.params['identification'] in echo_identities:
            reply = "你发给我的消息是:" + message
        else:
            reply = "指令码错误"
        await websocket.send(reply)
-
-
# Start the WebSocket server
async def main():
    """Serve the router on localhost:8089 until the task is cancelled."""

    def route_connection(websocket, path):
        # Hand every incoming connection to the router.
        return router(websocket, path)

    never_done = asyncio.Future()
    async with websockets.serve(route_connection, "localhost", 8089):
        # The future is never resolved, so the server runs forever.
        await never_done
-
-
def start():
    """Thread entry point: run ``main()`` with ``asyncio.run``."""
    websocket_server = main()
    asyncio.run(websocket_server)
if __name__ == "__main__":
    # Run the WebSocket server on its own thread. The thread is a daemon
    # because the server loop never finishes by itself; a non-daemon thread
    # would keep the process alive after uvicorn has stopped.
    thread = Thread(target=start, daemon=True)
    thread.start()
    # Start the web service.
    uvicorn.run("main:app", host="0.0.0.0", port=8000, reload=False)
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。