赞
踩
在FastAPI项目中,如果你想将一段JSON字符串按照事件流(Server-Sent Events, SSE)的形式返回给调用方,你可以利用FastAPI的StreamingResponse
来实现。以下是具体的步骤和示例代码。
首先,确保你已经安装了FastAPI和Uvicorn。如果还未安装,可以通过以下命令来安装:
pip install fastapi uvicorn
然后,创建一个FastAPI应用,并定义一个路由来发送事件流。在这个示例中,我们将一个JSON字符串分成多个部分,每隔一段时间发送一部分,模拟事件流的发送:
- from fastapi import FastAPI, Response
- from fastapi.responses import StreamingResponse
- import json
- import time
-
- app = FastAPI()
-
- # 假设这是你想按事件流形式发送的JSON数据
- data = {
- "messages": [
- {"text": "Hello, World!", "timestamp": "2021-01-01T12:00:00"},
- {"text": "Hello, FastAPI!", "timestamp": "2021-01-02T12:00:01"},
- {"text": "Hello, FastAPI!", "timestamp": "2021-01-02T12:00:02"},
- ]
- }
-
- def generate_json_stream(data):
- # 分割JSON数据,并逐个发送
- for message in data["messages"]:
- json_str = "data: " + json.dumps(message) + "\n\n"
- yield json_str.encode("utf-8")
- time.sleep(1) # 模拟延时
-
- @app.get("/stream-json")
- async def stream_json():
- # return Response(content=generate_json_stream(data), media_type="text/event-stream")
- return StreamingResponse(generate_json_stream(data), media_type="text/event-stream")
-
- if __name__ == "__main__":
- import uvicorn
- uvicorn.run(app, host="127.0.0.1", port=8000)
在这个例子中,generate_json_stream
是一个生成器函数,它逐个处理data
字典中的messages
列表,将每个消息转换为JSON字符串,并通过yield
逐个返回。我们在每次yield
之后暂停一秒(time.sleep(1)
),模拟事件流中的时间间隔。
使用Response
对象创建一个响应,并将generate_json_stream(data)
作为content
参数传入,media_type
设置为text/event-stream
。这告诉客户端,响应是一个事件流。
最后,使用uvicorn
作为ASGI服务器来运行你的FastAPI应用。你可以通过运行以下命令来启动服务器:
uvicorn your_file_name:app --reload
请确保将your_file_name
替换为你的Python文件名。然后,你可以通过访问http://127.0.0.1:8000/stream-json
来接收JSON数据的事件流。
接收事件流(Server-Sent Events, SSE)通常在客户端进行,最常见的是在Web浏览器中使用JavaScript。这里将展示如何在浏览器中使用JavaScript接收从http://127.0.0.1:8000/stream-json
返回的事件流,以及如何在Python中使用requests
库接收事件流。
在Web浏览器中,你可以使用EventSource
接口来监听和接收事件流。以下是一个简单的HTML和JavaScript示例,展示了如何做到这一点:
- <!DOCTYPE html>
- <html>
- <head>
- <title>Event Stream</title>
- </head>
- <body>
- <h2>Event Stream</h2>
- <div id="messages"></div>
- <script>
- // 创建一个EventSource实例连接到服务器发送的事件流
- const eventSource = new EventSource('http://127.0.0.1:8000/stream-json');
-
- // 监听消息事件
- eventSource.onmessage = function(event) {
- // 将接收到的数据显示在网页上
- const messagesDiv = document.getElementById('messages');
- const message = JSON.parse(event.data); // 假设数据是JSON格式的
- messagesDiv.innerHTML += `<p>${message.text} at ${message.timestamp}</p>`;
- };
-
- // 监听错误事件
- eventSource.onerror = function(error) {
- console.error("EventSource failed:", error);
- eventSource.close(); // 关闭连接
- };
- </script>
- </body>
- </html>
这个HTML页面会创建一个到http://127.0.0.1:8000/stream-json
的EventSource
连接,监听服务器发送的事件流。每当接收到一个消息事件时,它将事件数据解析为JSON,并将消息显示在页面上。
在Python中,你可以使用requests
库的Response.iter_lines()
方法来接收并处理事件流。请确保你安装了requests
库。以下是如何使用requests
接收事件流的示例代码:
- import requests
-
- def receive_event_stream(url):
- # 发起请求并设置stream=True来获取响应流
- response = requests.get(url, stream=True)
-
- # 确保请求成功
- response.raise_for_status()
-
- # 逐行读取响应
- for line in response.iter_lines():
- if line:
- decoded_line = line.decode('utf-8')
- print(decoded_line)
-
- # 接收事件流
- receive_event_stream('http://127.0.0.1:8000/stream-json')
这段代码向http://127.0.0.1:8000/stream-json
发起一个GET请求,并逐行读取响应。由于我们设置了stream=True
,requests
会以流式方式接收响应,允许我们即时处理每行数据。
使用Response.iter_content()
方法代替Response.iter_lines()
来按chunk_size
读取和返回数据,而不是按行读取。这样,每次可以直接返回指定字节大小的数据块(chunk)。
iter_content()
方法提供了一种灵活的方式来处理数据流,使你能够根据需要控制每次读取数据的量。以下是如何使用iter_content()
按chunk_size
读取数据的示例:
- import requests
-
- def receive_event_stream(url):
- with requests.get(url, stream=True) as response:
- # 确保请求成功
- response.raise_for_status()
-
- # 使用iter_content按chunk_size逐块读取数据
- for chunk in response.iter_content(chunk_size=16):
- # 注意:在二进制模式下处理数据,如果需要可以转换为字符串
- if chunk:
- print(chunk)
-
- # 接收事件流
- receive_event_stream('http://127.0.0.1:8000/stream-json')
在这个例子中,iter_content(chunk_size=16)
指定了每次从数据流中读取16字节的数据。这种方式不会根据换行符来分割数据,而是直接根据chunk_size
返回数据块。如果你需要以文本形式处理数据(比如,数据是JSON字符串),可能需要手动解码每个数据块,如使用chunk.decode('utf-8')
。
使用iter_content()
时,重要的是要注意它返回的是二进制数据。这对于下载文件或处理非文本内容特别有用。
选择iter_content()
还是iter_lines()
取决于你的具体需求:如果你需要更细粒度的控制或处理非文本数据,iter_content()
是一个很好的选择;如果你处理的是文本数据,并希望按行读取,iter_lines()
可能更合适,因为它会自动处理行结束符。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。