当前位置:   article > 正文

Python使用fastAPI实现一个流式传输接口_fastapi接口返回openapi的stream

fastapi接口返回openapi的stream

1. 使用fastapi实现流式传输

1.1 服务端 fastapi_server.py

编写服务端代码fastapi_server.py。服务端代码主要使用了fastapi和uvicorn两个库。

  1. #!/usr/bin/env python
  2. # coding=utf-8
  3. # @Time : 2024/1/31 19:13
  4. # @Software: PyCharm
  5. from fastapi import FastAPI
  6. from fastapi.responses import StreamingResponse
  7. import time
  8. import uvicorn
  9. app = FastAPI()
  10. async def generate_data():
  11. for i in range(1, 11):
  12. time.sleep(1) # 模拟每秒生成一个块的耗时操作
  13. yield f"FASTAPI Chunk {i}\n"
  14. @app.get("/stream")
  15. async def stream_data():
  16. return StreamingResponse(generate_data(), media_type="text/plain")
  17. if __name__ == "__main__":
  18. uvicorn.run(app, host="127.0.0.1", port=8001)

1.2客户端 (requests)

使用python编写一个客户端stream_client.py,进行流式接收。

  1. #!/usr/bin/env python
  2. # coding=utf-8
  3. # @Time : 2024/1/31 19:14
  4. #
  5. # stream_client.py
  6. import requests
  7. url = "http://127.0.0.1:8001/stream/" # 替换为你的实际接口地址
  8. def test1():
  9. try:
  10. response = requests.get(url, stream=True) # stream参数为True
  11. if response.status_code == 200:
  12. for chunk in response.iter_content(chunk_size=7): # 这行很重要哦
  13. if chunk:
  14. print(chunk.decode("utf-8"), end="")
  15. except requests.RequestException as e:
  16. print(f"Request failed: {e}")
  17. def test2():
  18. try:
  19. response = requests.get(url, stream=True)
  20. if response.status_code == 200:
  21. for line in response.iter_lines(decode_unicode=True, chunk_size=8):
  22. if line:
  23. print("Received SSE event:", line)
  24. except requests.RequestException as e:
  25. print(f"Request failed: {e}")
  26. # test1()
  27. test2()

1.3 在html中流式显示

新建一个client.html文件,放在fastapi_server.py目录下。同时修改fastapi_server.py, 增加一个BlockIterator迭代器对长文本进行切块。

  1. <!DOCTYPE html>
  2. <html lang="en">
  3. <head>
  4. <meta charset="UTF-8">
  5. <meta http-equiv="X-UA-Compatible" content="IE=edge">
  6. <meta name="viewport" content="width=device-width, initial-scale=1.0">
  7. <title>Streaming Client (client.html)</title>
  8. </head>
  9. <body>
  10. aaa
  11. <div id="output"></div>
  12. <script>
  13. const outputDiv = document.getElementById('output');
  14. // 替换为你的实际接口地址
  15. const url = '$FASTAPI$';
  16. // 使用 Fetch API 发送请求
  17. fetch(url)
  18. .then(response => {
  19. const reader = response.body.getReader();
  20. return new ReadableStream({
  21. async start(controller) {
  22. while (true) {
  23. const { done, value } = await reader.read();
  24. // 如果读取完毕,中止流
  25. if (done) {
  26. controller.close();
  27. break;
  28. }
  29. // 将每个块的内容添加到页面上
  30. outputDiv.innerHTML += new TextDecoder().decode(value) +"<br>";
  31. }
  32. }
  33. });
  34. })
  35. .then(stream => {
  36. // 使用 TextStream,将流连接到页面上的输出
  37. const textStream = new TextStream(stream);
  38. return textStream.pipeTo(new WritableStream({
  39. write: chunk => {
  40. // 在这里你可以处理每个块的数据
  41. console.log('Received chunk:', chunk);
  42. }
  43. }));
  44. })
  45. .catch(error => console.error('Error:', error));
  46. </script>
  47. </body>
  48. </html>

服务端增加一个/web_client接口,读取上述网页内容

 

  1. #!/usr/bin/env python
  2. # coding=utf-8
  3. # @Time : 2024/1/31 19:13
  4. # @Software: PyCharm
  5. from fastapi import FastAPI
  6. from starlette.responses import HTMLResponse
  7. from fastapi.responses import StreamingResponse
  8. import time
  9. import uvicorn
  10. app = FastAPI()
  11. CONTENT = """《易经》被誉为诸经之首,大道之源,是中华优秀传统文化的总纲领,是中华民族五千年智慧的结晶。他含盖万有、纲纪群伦,是中华文化的杰出代表;他博大精微、包罗万象,亦是中华文明的源头。其内容涉及哲学、生命、自然、科学、政治、天文、地理、文学、艺术等诸多领域,是各家共同的经典。
  12. 《易经》包括《连山》《归藏》《周易》三部易书,现存于世的只有《周易》。《周易》相传是周文王被囚羑里时,研究《易经》所作的结论。"""
  13. class BlockIterator:
  14. def __init__(self, text, block_size=10):
  15. self.text = text
  16. self.block_size = block_size
  17. self.index = 0
  18. def __iter__(self):
  19. return self
  20. def __next__(self):
  21. if self.index >= len(self.text):
  22. raise StopIteration
  23. block = self.text[self.index:self.index + self.block_size]
  24. self.index += self.block_size
  25. return block
  26. async def generate_data():
  27. for i in BlockIterator(CONTENT, block_size=1):
  28. time.sleep(0.05) # 模拟每秒生成一个块的耗时操作
  29. # yield f"FASTAPI Chunk {i}\n"
  30. yield i
  31. @app.get("/web_client", response_class=HTMLResponse)
  32. async def read_root():
  33. with open("static/client.html", "r") as file:
  34. html_content = file.read()
  35. html_content = html_content.replace("$FASTAPI$", "http://127.0.0.1:8001/stream/")
  36. return HTMLResponse(content=html_content)
  37. @app.get("/stream")
  38. async def stream_data():
  39. return StreamingResponse(generate_data(), media_type="text/plain")
  40. if __name__ == "__main__":
  41. uvicorn.run(app, host="127.0.0.1", port=8001)

声明:本文内容由网友自发贡献,转载请注明出处:【wpsshop】
推荐阅读
相关标签
  

闽ICP备14008679号