赞
踩
前言:最近使用大模型的API时候都会用到SSE响应,目的是模型生成一点东西,返回一点东西。下面记录一下Python的使用方法。
from fastapi import FastAPI, Request
import uvicorn
app = FastAPI()
@app.get("/")
async def root():
return {"message": "Hello World"}
if __name__ == "__main__":
uvicorn.run(app, host='0.0.0.0', port=6006)
from fastapi import FastAPI, Request from sse_starlette.sse import EventSourceResponse import asyncio import uvicorn app = FastAPI() async def event_generator(request: Request): res_str = '七夕情人节即将来临,我们为您准备了精美的鲜花和美味的蛋糕' for i in res_str: if await request.is_disconnected(): print("连接已中断") break yield{ 'event': 'message', 'retry': 15000, 'data': i } await asyncio.sleep(0.2) @app.get("/") async def root(request: Request): g = event_generator(request) return EventSourceResponse(g) if __name__ == "__main__": uvicorn.run(app, host='0.0.0.0', port=6006)
import requests
# 使用requests 接收流式信息
def test():
url = r"http://localhost:6006/"
response = requests.get(url, stream=True, verify=False)
for chunk in response.iter_content(chunk_size=1024, decode_unicode=True):
print(chunk)
if __name__ == '__main__':
test()
import aiohttp import asyncio # 使用aiohttp 接收流式信息 async def test(): sseresp = aiohttp.request("GET", r'http://localhost:6006/') async with sseresp as r: async for chunk in r.content.iter_any(): print(chunk.decode()) if __name__ == '__main__': loop = asyncio.get_event_loop() loop.run_until_complete(test())
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。