当前位置:   article > 正文

python项目中fastapi接口事件流的返回和接收处理_fastapi 实现 event stream

fastapi 实现 event stream

在FastAPI项目中,如果你想将一段JSON字符串按照事件流(Server-Sent Events, SSE)的形式返回给调用方,你可以利用FastAPI的StreamingResponse来实现。以下是具体的步骤和示例代码。

返回事件流

首先,确保你已经安装了FastAPI和Uvicorn。如果还未安装,可以通过以下命令来安装:

pip install fastapi uvicorn

然后,创建一个FastAPI应用,并定义一个路由来发送事件流。在这个示例中,我们将一个JSON字符串分成多个部分,每隔一段时间发送一部分,模拟事件流的发送:

  1. from fastapi import FastAPI, Response
  2. from fastapi.responses import StreamingResponse
  3. import json
  4. import time
  5. app = FastAPI()
  6. # 假设这是你想按事件流形式发送的JSON数据
  7. data = {
  8. "messages": [
  9. {"text": "Hello, World!", "timestamp": "2021-01-01T12:00:00"},
  10. {"text": "Hello, FastAPI!", "timestamp": "2021-01-02T12:00:01"},
  11. {"text": "Hello, FastAPI!", "timestamp": "2021-01-02T12:00:02"},
  12. ]
  13. }
  14. def generate_json_stream(data):
  15. # 分割JSON数据,并逐个发送
  16. for message in data["messages"]:
  17. json_str = "data: " + json.dumps(message) + "\n\n"
  18. yield json_str.encode("utf-8")
  19. time.sleep(1) # 模拟延时
  20. @app.get("/stream-json")
  21. async def stream_json():
  22. # return Response(content=generate_json_stream(data), media_type="text/event-stream")
  23. return StreamingResponse(generate_json_stream(data), media_type="text/event-stream")
  24. if __name__ == "__main__":
  25. import uvicorn
  26. uvicorn.run(app, host="127.0.0.1", port=8000)

在这个例子中,generate_json_stream是一个生成器函数,它逐个处理data字典中的messages列表,将每个消息转换为JSON字符串,并通过yield逐个返回。我们在每次yield之后暂停一秒(time.sleep(1)),模拟事件流中的时间间隔。

使用Response对象创建一个响应,并将generate_json_stream(data)作为content参数传入,media_type设置为text/event-stream。这告诉客户端,响应是一个事件流。

最后,使用uvicorn作为ASGI服务器来运行你的FastAPI应用。你可以通过运行以下命令来启动服务器:

uvicorn your_file_name:app --reload

请确保将your_file_name替换为你的Python文件名。然后,你可以通过访问http://127.0.0.1:8000/stream-json来接收JSON数据的事件流。

接收事件流

接收事件流(Server-Sent Events, SSE)通常在客户端进行,最常见的是在Web浏览器中使用JavaScript。这里将展示如何在浏览器中使用JavaScript接收从http://127.0.0.1:8000/stream-json返回的事件流,以及如何在Python中使用requests库接收事件流。

在浏览器中接收事件流

在Web浏览器中,你可以使用EventSource接口来监听和接收事件流。以下是一个简单的HTML和JavaScript示例,展示了如何做到这一点:

  1. <!DOCTYPE html>
  2. <html>
  3. <head>
  4. <title>Event Stream</title>
  5. </head>
  6. <body>
  7. <h2>Event Stream</h2>
  8. <div id="messages"></div>
  9. <script>
  10. // 创建一个EventSource实例连接到服务器发送的事件流
  11. const eventSource = new EventSource('http://127.0.0.1:8000/stream-json');
  12. // 监听消息事件
  13. eventSource.onmessage = function(event) {
  14. // 将接收到的数据显示在网页上
  15. const messagesDiv = document.getElementById('messages');
  16. const message = JSON.parse(event.data); // 假设数据是JSON格式的
  17. messagesDiv.innerHTML += `<p>${message.text} at ${message.timestamp}</p>`;
  18. };
  19. // 监听错误事件
  20. eventSource.onerror = function(error) {
  21. console.error("EventSource failed:", error);
  22. eventSource.close(); // 关闭连接
  23. };
  24. </script>
  25. </body>
  26. </html>

这个HTML页面会创建一个到http://127.0.0.1:8000/stream-jsonEventSource连接,监听服务器发送的事件流。每当接收到一个消息事件时,它将事件数据解析为JSON,并将消息显示在页面上。

Python项目中接收事件流

按行读取

在Python中,你可以使用requests库的Response.iter_lines()方法来接收并处理事件流。请确保你安装了requests库。以下是如何使用requests接收事件流的示例代码:

  1. import requests
  2. def receive_event_stream(url):
  3. # 发起请求并设置stream=True来获取响应流
  4. response = requests.get(url, stream=True)
  5. # 确保请求成功
  6. response.raise_for_status()
  7. # 逐行读取响应
  8. for line in response.iter_lines():
  9. if line:
  10. decoded_line = line.decode('utf-8')
  11. print(decoded_line)
  12. # 接收事件流
  13. receive_event_stream('http://127.0.0.1:8000/stream-json')

这段代码向http://127.0.0.1:8000/stream-json发起一个GET请求,并逐行读取响应。由于我们设置了stream=Truerequests会以流式方式接收响应,允许我们即时处理每行数据。

按块读取

使用Response.iter_content()方法代替Response.iter_lines()来按chunk_size读取和返回数据,而不是按行读取。这样,每次可以直接返回指定字节大小的数据块(chunk)。

iter_content()方法提供了一种灵活的方式来处理数据流,使你能够根据需要控制每次读取数据的量。以下是如何使用iter_content()chunk_size读取数据的示例:

  1. import requests
  2. def receive_event_stream(url):
  3. with requests.get(url, stream=True) as response:
  4. # 确保请求成功
  5. response.raise_for_status()
  6. # 使用iter_content按chunk_size逐块读取数据
  7. for chunk in response.iter_content(chunk_size=16):
  8. # 注意:在二进制模式下处理数据,如果需要可以转换为字符串
  9. if chunk:
  10. print(chunk)
  11. # 接收事件流
  12. receive_event_stream('http://127.0.0.1:8000/stream-json')

在这个例子中,iter_content(chunk_size=16)指定了每次从数据流中读取16字节的数据。这种方式不会根据换行符来分割数据,而是直接根据chunk_size返回数据块。如果你需要以文本形式处理数据(比如,数据是JSON字符串),可能需要手动解码每个数据块,如使用chunk.decode('utf-8')

使用iter_content()时,重要的是要注意它返回的是二进制数据。这对于下载文件或处理非文本内容特别有用。

两种接收方式对比

选择iter_content()还是iter_lines()取决于你的具体需求:如果你需要更细粒度的控制或处理非文本数据,iter_content()是一个很好的选择;如果你处理的是文本数据,并希望按行读取,iter_lines()可能更合适,因为它会自动处理行结束符。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/繁依Fanyi0/article/detail/689826
推荐阅读
相关标签
  

闽ICP备14008679号