当前位置:   article > 正文

python使用websocket服务并在fastAPI中启动websocket服务_python fastapi websocket

python fastapi websocket
  1. 依赖
    pip install websockets-routes
  2. 代码
    1. import asyncio
    2. import json
    3. from typing import Union
    4. import websockets
    5. import websockets_routes
    6. from websockets.legacy.server import WebSocketServerProtocol
    7. from websockets_routes import RoutedPath
    8. # 初始化一个router对象
    9. router = websockets_routes.Router()
    10. # 连接句柄
    11. connections: dict[str, Union[None, WebSocketServerProtocol]] = {
    12. 'setting': None,
    13. 'fastapi': None,
    14. 'administrator': None
    15. }
    16. flags: dict[str, any] = {
    17. 'is_allow_query': True,
    18. 'opt_success': False
    19. }
    20. # 消息监听
    21. @router.route("/command/{identification}")
    22. async def light_status(websocket: WebSocketServerProtocol, path: RoutedPath):
    23. """
    24. 更新连接句柄
    25. """
    26. if path.params['identification'] == 'setting':
    27. connections['setting'] = websocket
    28. elif path.params["identification"] == 'fastapi':
    29. connections['fastapi'] = websocket
    30. elif path.params["identification"] == 'administrator':
    31. connections['administrator'] = websocket
    32. else:
    33. return
    34. """
    35. 处理消息
    36. """
    37. async for message in websocket:
    38. # 处理setting用户的业务
    39. if path.params['identification'] == 'setting':
    40. if connections['fastapi'] is None:
    41. await websocket.send("fastapi的websocket未连接")
    42. else:
    43. msg: dict = json.loads(message)
    44. if msg['cmd'] == 'is_allow_query':
    45. flags['is_allow_query'] = True if msg['data'] == 'true' else False
    46. # 处理fastapi用户的业务
    47. elif path.params["identification"] == 'fastapi':
    48. if connections['setting'] is None:
    49. await websocket.send("setting的websocket未连接")
    50. else:
    51. await connections['setting'].send(message)
    52. await websocket.send("发送成功")
    53. # 处理administrator用户的业务
    54. elif path.params["identification"] == 'administrator':
    55. await websocket.send("【administrator】你发给我的消息是:" + json.dumps(message))
    56. # 其他
    57. else:
    58. await websocket.send("身份错误")
    59. # 启动WebSocket服务器
    60. async def main():
    61. # 启动WebSocket服务
    62. async with websockets.serve(lambda x, y: router(x, y), "localhost", 8089):
    63. await asyncio.Future() # run forever
    64. def start():
    65. # 启动WebSocket服务
    66. asyncio.run(main())
  3. fastAPI中调用

     

  4. 连接服务

  5. 在fastAPI中启动websocket服务

    1. import asyncio
    2. import websockets
    3. import websockets_routes
    4. from websockets.legacy.server import WebSocketServerProtocol
    5. from websockets_routes import RoutedPath
    6. # 初始化一个router对象
    7. router = websockets_routes.Router()
    8. # 消息监听
    9. @router.route("/command/{identification}")
    10. async def light_status(websocket: WebSocketServerProtocol, path: RoutedPath):
    11. # 收到消息
    12. async for message in websocket:
    13. # 处理setting用户的业务
    14. if path.params['identification'] == 'setting':
    15. await websocket.send("你发给我的消息是:" + message)
    16. # 处理administrator用户的业务
    17. elif path.params["identification"] == 'administrator':
    18. await websocket.send("你发给我的消息是:" + message)
    19. else:
    20. await websocket.send("指令码错误")
    21. # 启动WebSocket服务器
    22. async def main():
    23. # 启动WebSocket服务
    24. async with websockets.serve(lambda x, y: router(x, y), "localhost", 8089):
    25. await asyncio.Future() # run forever
    26. def start():
    27. # 启动WebSocket服务
    28. asyncio.run(main())
    1. if __name__ == "__main__":
    2. # 开启一个线程去启动WebSocket服务器
    3. thread = Thread(target=start)
    4. thread.start()
    5. # 启动Web服务
    6. uvicorn.run("main:app", host="0.0.0.0", port=8000, reload=False)

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/小舞很执着/article/detail/887687
推荐阅读
相关标签
  

闽ICP备14008679号