赞
踩
目录
实现功能:
- 注册后台任务(如:邮件发送、文件处理等异步场景,不影响接口返回)
- 监控后台任务执行进度(进度条功能)
- 支持根据任务ID查询对应任务进度
- #!/usr/bin/env python
- # -*- coding: UTF-8 -*-
- import random
- import asyncio
-
- from typing import List, Dict
- from fastapi import FastAPI, BackgroundTasks, WebSocket
-
- app = FastAPI()
-
- # 用于存储连接的 WebSocket 实例
- connected_websockets: Dict[int, List[WebSocket]] = {}
-
-
- @app.websocket("/ws/{task_id}/")
- async def websocket_endpoint(websocket: WebSocket, task_id: int):
- """WebSocket路由,用于接收任务进度"""
- await websocket.accept()
- connected_websockets.setdefault(task_id, []).append(websocket)
- try:
- while True:
- await websocket.receive_text()
- except:
- connected_websockets[task_id].remove(websocket)
-
-
- @app.post("/task/{task_id}/")
- async def start_task(background_tasks: BackgroundTasks, task_id: int):
- """注册后台任务"""
- background_tasks.add_task(process_task, task_id=task_id)
- return {"task_id": task_id}
-
-
- async def process_task(task_id):
- """处理任务的后台任务"""
- progress = 0
- while progress < 100:
- await asyncio.sleep(1)
- progress += random.randint(1, 10)
- progress = min(progress, 100)
- for ws in connected_websockets[task_id]:
- await ws.send_json({"task_id": task_id, "progress": progress})
- await asyncio.sleep(1)
-
-
- # 启动应用
- if __name__ == "__main__":
- import uvicorn
-
- uvicorn.run(app, host="0.0.0.0", port=8000)

POST
请求:http://127.0.0.1:8000/task/1/,指定任务ID为1
websocket
请求工具:npm install -g wscat
wscat -c ws://127.0.0.1:8000/ws/1/
,监控任务ID为1
的执行进度Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。