赞
踩
import asyncio
import io
from typing import Optional

import uvicorn
from fastapi import FastAPI, Query, Path
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
from starlette.requests import Request
from starlette.responses import HTMLResponse
from starlette.templating import Jinja2Templates

app = FastAPI()
templates = Jinja2Templates(directory="../templates")


@app.get("/stream")
async def root(request: Request):
    """Stream a fixed sentence to the client one character per second.

    The response is declared as ``text/event-stream``; every character is
    sent as its own event named ``message``.

    Args:
        request: The incoming request, polled to detect client disconnects.

    Returns:
        A ``StreamingResponse`` wrapping the asynchronous event generator.
    """

    async def event_generator(request: Request):
        """Yield one framed event per character until done or disconnected."""
        res_str = "双天至尊真是一部好的电视剧!!!"
        for i in res_str:
            # Stop producing as soon as the client has gone away.
            if await request.is_disconnected():
                print("连接已中断")
                break
            # Event-stream framing: unquoted "field: value" lines, and a
            # blank line to terminate (dispatch) the event. Quoted,
            # JSON-looking field names are ignored by event-stream parsers.
            yield f"event: message\ndata: {i}\n\n"
            await asyncio.sleep(1)

    g = event_generator(request)
    return StreamingResponse(g, media_type="text/event-stream")


if __name__ == '__main__':
    # NOTE(review): the "Run:app" import string presumably requires this
    # file to be saved as Run.py -- confirm against the actual filename.
    uvicorn.run("Run:app", host="0.0.0.0", port=8080)
运行结果:
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。