当前位置:   article > 正文

FastAPI实现event-stream响应式流式输出_fastapi event-stream

fastapi event-stream
  1. import io
  2. from fastapi import FastAPI, Query, Path
  3. from fastapi.responses import StreamingResponse
  4. import uvicorn
  5. from pydantic import BaseModel
  6. from typing import Optional
  7. from starlette.templating import Jinja2Templates
  8. from starlette.requests import Request
  9. from starlette.responses import HTMLResponse
  10. import asyncio
  11. app = FastAPI()
  12. templates = Jinja2Templates(directory="../templates")
  13. @app.get("/stream")
  14. async def root(request: Request):
  15. async def event_generator(request: Request):
  16. res_str = "双天至尊真是一部好的电视剧!!!"
  17. for i in res_str:
  18. if await request.is_disconnected():
  19. print("连接已中断")
  20. break
  21. data = f'"event": "message"\n"data":{i}\n'
  22. yield data
  23. await asyncio.sleep(1)
  24. g = event_generator(request)
  25. return StreamingResponse(g, media_type="text/event-stream")
  26. if __name__ == '__main__':
  27. uvicorn.run("Run:app", host="0.0.0.0", port=8080)

运行结果

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/代码探险家/article/detail/886203
推荐阅读
相关标签
  

闽ICP备14008679号