赞
踩
这篇文章当时写得比较匆忙,这里进行一下更深入的补充
SSE 技术不是什么新鲜东西,就是一个 HTTP 请求和响应,关键就是响应这个环节,原始的响应都是一次性的,普通的响应是这样的:
Nginx 与后端服务器之间的通信
评论中有人问是否直接用 StreamResponse 就行了,如果你自己打算封装一个StreamResponse,并且可以反复发送多次,差不多就实现了与EventSourceResponse同样的效果吧!
nginx如果要支持SSE,要调整一些参数,比较重要的就是要避免缓存,之前 nginx 处理的都是一次性的响应,例如网页,所以会尽量把内容缓存起来一次性发送到客户端,所以这里要关掉缓存
conf 配置文件,这里做了修正
- http {
- ...
-
- server {
- ...
-
- location /sse {
- proxy_http_version 1.1;
- proxy_set_header Connection "";
- proxy_set_header X-Real-IP $remote_addr;
- proxy_set_header Host $host;
- proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
-
- # SSE 连接时的超时时间
- proxy_read_timeout 86400s;
-
- # 取消缓冲
- proxy_buffering off;
-
- # 关闭代理缓存
- proxy_cache off;
-
- # 禁用分块传输编码
- #chunked_transfer_encoding off
-
- # 反向代理到 SSE 应用的地址和端口
- proxy_pass http://backend-server;
- }
-
- ...
- }
-
- ...
- }
proxy_http_version 1.1 才支持 keep-alive,保持连接不中断
在location /sse
块内,设置代理头部以确保不会对SSE响应进行缓存。Cache-Control
和Content-Type
头部用于告诉浏览器这是一个SSE连接。
使用proxy_buffering off;
来禁用Nginx的缓冲,以确保SSE响应立即被传递给客户端。
使用proxy_pass
指令将SSE请求代理到后端服务器的SSE端点。请将http://backend_server;
替换为你的后端服务器地址和SSE端点路径。
设置连接超时时间,以避免不活动连接被Nginx关闭。在这里,我设置了一个较长的超时时间(3600秒),以便连接可以保持较长时间。你可以根据你的需求进行调整。
Server-Sent Events(SSE)和WebSocket是两种用于实现实时通信的不同技术,它们在某些方面有相似之处,但也存在一些关键的区别:
Server-Sent Events (SSE):
单向通信:SSE是一种单向通信机制,其中服务器向客户端发送数据。客户端接收服务器推送的数据,但不能向服务器发送数据。这使得它适合用于服务器向客户端的实时通知或事件推送。
基于HTTP:SSE建立在标准的HTTP/HTTPS协议之上,使用普通的HTTP请求和响应来实现。它不需要特殊的协议升级。
文本数据:SSE主要用于发送文本数据。服务器可以将文本事件推送到客户端,客户端通过监听事件来接收数据。每个事件通常包含一个标识符、数据字段和可选的注释字段。
浏览器支持:SSE在现代浏览器中得到广泛支持,不需要额外的JavaScript库或框架。客户端使用EventSource
API来与SSE服务端通信。
WebSocket:
双向通信:WebSocket是一种双向通信协议,允许客户端和服务器之间进行双向数据交换。客户端可以向服务器发送数据,服务器也可以主动向客户端推送数据。
独立协议:WebSocket是一种独立的协议,与HTTP不同。建立WebSocket连接需要进行协议升级,然后在一个持久连接上进行数据交换。
二进制和文本数据:WebSocket支持二进制和文本数据的传输,因此可以用于多种类型的数据交换,包括游戏、实时聊天和多媒体流等。
浏览器支持:WebSocket在现代浏览器中得到广泛支持,同时也有许多服务器端和客户端库可用于各种编程语言和环境中。
服务端判断前端要建立 SSE 连接的识别标识是 Accept: text/event-stream,该标识代表前端会接收后端发过来的事件流
- GET /sse-endpoint HTTP/1.1
- Host: example.com
- Accept: text/event-stream
连接后因为消息至少区分两种,message 和 close,所以就把推送消息做成了一种结构:
- event: message
- data: Hello, this is a message!
如果是 close 事件
- event: close
- data:
前端接收关闭事件
- const eventSource = new EventSource('/sse-endpoint');
-
- eventSource.onmessage = function(event) {
- // 处理事件消息
- };
-
- eventSource.onerror = function(event) {
- if (event.readyState === EventSource.CLOSED) {
- // 服务器关闭了连接
- console.log('SSE连接已关闭');
- // 可以尝试重新建立连接
- }
- };
OpenAI 官方给我了一个超简单的文档,还直接用curl的方式搞得,真是能多省就多省,大家可以使用apifox 或者 postman 将curl 转成 fetch 或者 request 等自己能看懂的代码,当然也可以自己自学一下curl的命令,如果你能访问OpenAI,可以点下面的链接,自己看看
大家如果对上面的双语翻译感兴趣,我推荐一个技术大佬的免费插件,沉浸式翻译
其中有个 stream 使用讲解,stream这个东西,我之前也没用过,经过学习后,发现这东西一直都存在就是一个content-type格式,只是我们原来没有注意过,我们都是用urlencode或者json格式来处理数据的,其实可以以二进制的方式,发过来,然后你再自行处理。
我发现了一个大佬,开源了一个插件,从中窥见了SSE的使用案例,大家有兴趣,可以看另外一篇SSE的学习案例,这里不对前端再做深入的讨论了
GitHub - openai/openai-python: The OpenAI Python library provides convenient access to the OpenAI API from applications written in the Python language.The OpenAI Python library provides convenient access to the OpenAI API from applications written in the Python language. - GitHub - openai/openai-python: The OpenAI Python library provides convenient access to the OpenAI API from applications written in the Python language.https://github.com/openai/openai-python/我没怎么看,但看起来没有给stream案例,只是给了request的案例,如果只是request的那其实就挺简单了,就没啥讲的了
不用官网的openai库,根据开发文档,直接发送request请求也可以,这里给的是一位大佬的请求方式,用的是httpx,大家可以自行学习下,知乎有一篇比较文
这是一个生成器函数,通过yield函数,yield 很多地方都讲得很晦涩难懂,《你不知道的javascript》中非常简洁地说,这就是一个return,只是对于生成器来说,return次数要进行多次,所以搞了一个yield用来区分同步函数的return,而且return的意义还有停止下面的代码,而返回数据的意思,两者还是有点差异,但是yield就是return,多次返回的return
from sse_starlette import EventSourceResponse
AI给出的EventSourceResponse解读,自己也可以把EventSourceResponse源码丢给Claude,让其看过,给你解读,都是好方法
下面给下EventSourceResponse的FastAPI简单案例代码,让大家玩起来
- import uvicorn
- import asyncio
- from fastapi import FastAPI, Request
- from fastapi.middleware.cors import CORSMiddleware
- from sse_starlette.sse import EventSourceResponse
-
- times = 0
- app = FastAPI()
-
- origins = [
- "*"
- ]
-
- app.add_middleware(
- CORSMiddleware,
- allow_origins=origins,
- allow_credentials=True,
- allow_methods=["*"],
- allow_headers=["*"],
- )
-
-
- @app.post("/sse/data")
- async def root(request: Request):
- event_generator = status_event_generator(request)
- return EventSourceResponse(event_generator)
-
-
- status_stream_delay = 1 # second
- status_stream_retry_timeout = 30000 # milisecond
-
-
- # 其实就是绑定函数事件 一直在跑循环
- async def status_event_generator(request):
- global times
- while True:
- if not await request.is_disconnected() == True:
- yield {
- "event": "message",
- "retry": status_stream_retry_timeout,
- "data": "data:" + "times" + str(times) + "\n\n"
- }
- print("alive")
- times += 1
- await asyncio.sleep(status_stream_delay)
-
-
- if __name__ == "__main__":
- uvicorn.run(app, host="0.0.0.0", port=8000, log_level='info')
大家对照着上面的讲解,就能把代码搞出来
- fetch('http://localhost:8000/sse/data', {
- method: 'POST',
- headers: {
- 'Content-Type': 'application/json',
- },
- body: JSON.stringify({
- text: "hello"
- })
- }).then(async response=>{
- const reader = response.body.pipeThrough(new TextDecoderStream()).getReader()
- while (true) {
- let {value, done} = await reader.read();
- if (done)
- break;
- if (value.indexOf("data:") < 0 || value.indexOf('event: ping') >= 0)
- continue;
- // console.log('Received~~:', value);
- let values = value.split("\r\n")
- for (let i = 0; i < values.length; i++) {
- let _v = values[i].replace("data:", "")
- // console.log(_v)
- if (_v.trim() === '')
- continue
- console.log(_v)
- }
- }
- }
- ).catch(error=>{
- console.error(error);
- }
- );
因为自己的业务代码有很多鉴权和数据库操作,就不便放出来了,大家根据自己的所需,可以在这个简单的代码基础上,只要自己写生成器函数即可
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。