当前位置:   article > 正文

【Python笔记-FastAPI】后台任务+WebSocket监控进度_fastapi list[websocket]

fastapi list[websocket]

目录

一、代码示例

二、执行说明

(一) 调用任务执行接口

(二) 监控任务进度


实现功能:

  1. 注册后台任务(如:邮件发送、文件处理等异步场景,不影响接口返回)
  2. 监控后台任务执行进度(进度条功能)
  3. 支持根据任务ID查询对应任务进度

一、代码示例

  1. #!/usr/bin/env python
  2. # -*- coding: UTF-8 -*-
  3. import random
  4. import asyncio
  5. from typing import List, Dict
  6. from fastapi import FastAPI, BackgroundTasks, WebSocket
  7. app = FastAPI()
  8. # 用于存储连接的 WebSocket 实例
  9. connected_websockets: Dict[int, List[WebSocket]] = {}
  10. @app.websocket("/ws/{task_id}/")
  11. async def websocket_endpoint(websocket: WebSocket, task_id: int):
  12. """WebSocket路由,用于接收任务进度"""
  13. await websocket.accept()
  14. connected_websockets.setdefault(task_id, []).append(websocket)
  15. try:
  16. while True:
  17. await websocket.receive_text()
  18. except:
  19. connected_websockets[task_id].remove(websocket)
  20. @app.post("/task/{task_id}/")
  21. async def start_task(background_tasks: BackgroundTasks, task_id: int):
  22. """注册后台任务"""
  23. background_tasks.add_task(process_task, task_id=task_id)
  24. return {"task_id": task_id}
  25. async def process_task(task_id):
  26. """处理任务的后台任务"""
  27. progress = 0
  28. while progress < 100:
  29. await asyncio.sleep(1)
  30. progress += random.randint(1, 10)
  31. progress = min(progress, 100)
  32. for ws in connected_websockets[task_id]:
  33. await ws.send_json({"task_id": task_id, "progress": progress})
  34. await asyncio.sleep(1)
  35. # 启动应用
  36. if __name__ == "__main__":
  37. import uvicorn
  38. uvicorn.run(app, host="0.0.0.0", port=8000)

二、执行说明

(一) 调用任务执行接口

  1. 启动服务后,访问:http://127.0.0.1:8000/docs
  2. POST请求:http://127.0.0.1:8000/task/1/,指定任务ID为1

(二) 监控任务进度

  1. 安装websocket请求工具:npm install -g wscat
  2. 终端输入wscat -c ws://127.0.0.1:8000/ws/1/,监控任务ID为1的执行进度

本文内容由网友自发贡献,转载请注明出处:https://www.wpsshop.cn/w/我家自动化/article/detail/819500
推荐阅读
相关标签
  

闽ICP备14008679号