赞
踩
SSE(Server Sent Event),直译为服务器发送事件,顾名思义,也就是客户端可以获取到服务器发送的事件。我们常见的 http 交互方式是客户端发起请求,服务端响应,然后一次请求完毕;但是在 sse 的场景下,客户端发起请求,连接一直保持,服务端有数据就可以返回数据给客户端,这个返回可以是多次间隔的方式
在web端消息推送功能中,由于传统的HTTP协议是由客户端主动发起请求,服务端才会响应。基本的ajax轮询技术便是如此。而在SSE中,浏览发送一个请求给服务端,通过响应头中的Content-Type:text/event-stream等向客户端声名这是一个长连接,发送的是流数据,这样客户端就不会关闭连接,一直等待服务端发送数据。
如果服务器返回的数据中包含了事件标识符,浏览器会记录最后一次接收的事件的标识符。如果与服务器的连接中断,当浏览器再次进行连接时,会通过头来声明最后一次接收的事件的标识符。服务器端可以通过浏览器发送的事件标识符来确定从哪个事件来继续连接
使用eventsource完成get请求
缺点:客户端无法通过一个get请求完成数据传递
参考文档:
EventSource - Web API 接口参考 | MDN
实现流程:
代码实现:
- const eventSourceRef = useRef<any>(null)
- const contact = async (messageData: any) => {
- eventSourceRef.current = new EventSource(
- `${API_BASE}/v1/model/stream?id=${id}`,
- )
- if (!eventSourceRef.current) return
- // 监听 SSE 事件,因为后端定义了具名事件,所以这儿要用addEventListener监听,而不是onmessage
- eventSourceRef.current.addEventListener('add', function (e: any) {
- // 处理数据展示
- })
- eventSourceRef.current.addEventListener('finish', function (e: any) {
- // 结束标识finish
- eventSourceRef.current.close() // 关闭连接
- })
- eventSourceRef.current.addEventListener('error', function (e: any) {
- if (e.status === 401) {
- // 用户登录状态失效处理
- }
- // error报错处理
- console.log('Error occurred:', e)
- // 关闭连接
- eventSourceRef.current.close()
- })
- }
使用fetch-event-source完成连接,仅需一个接口,支持添加请求头
缺点:在浏览器返回的text/eventstream里看不到具体返回,无法进行预览
参考文档:
实现流程:
具体实现:
- const eventSourceRef = useRef<any>(null)
- const [abortController, setAbortController] = useState(new AbortController())
- // 通信事件
- const contact = async (messageData: any) => {
- messageData = { ...messageData, do_stream: modelArg.do_stream } // 请求参数
- receivedDataRef.current = ''
- const token: string = getLocal('AUTHCODE') || ''
- fetchEventSource(`${MAAS_API_BASE}/v1/model_api/invoke`, {
- method: 'POST',
- // 添加请求头
- headers: {
- Authorization: token,
- 'Content-Type': 'application/json',
- },
- // 传参必须保证是json
- body: JSON.stringify(messageData),
- // abortController.signal 提供了一个信号对象给 fetchEventSource 函数。
- // 如果在任何时候你想取消正在进行的 fetch 操作,你可以调用
- // abortController.abort()。这会发出关联任务的信号,你可以使用
- // AbortController 的信号来检查异步操作是否已被取消。
- signal: abortController.signal,
- openWhenHidden: true, // 切换标签页时连接不关闭
- async onopen(resp) {
- // 处理登录失效
- if (resp.status === 401) {
- message.warning('登录过期')
- return
- }
- },
- onmessage(msg: any) {
- const eventType = msg.event // 监听event的具名事件
- switch (eventType) {
- case 'add':
- // 流式输出事件,add每次会返回具体字符,前端负责拼接展示
- break
- case 'finish':
- setStatu('finish') // 结束标识
- break
- case 'error':
- if (msg.status === 401) {
- message.warning('登录过期')
- }
- console.log('Error occurred:', e)
- break
- }
- },
- onerror(err) {
- throw err // 连接遇到http错误时,如跨域等,必须要throw才能停止,不然会一直重连
- },
- onclose() {},
- })
- }
-
- // 终止连接方法,比如在切换模型时,你可能有必要终止上一次连接来避免问答串联
- const closeSSE = () => {
- abortController.abort()
- setAbortController(new AbortController())
- }
同上post方法
- fetchEventSource(sseUrl, {
- method: 'POST',
- headers,
- signal: abortController.signal,
- body: JSON.stringify(customInferData),
- openWhenHidden: true,
- /**
- *在onopen阶段处理
- 第一步:判断resp.headers.get('content-type'),如果不包含text/event-stream,
- 则代表非流式
- 第二步:需要在onopen阶段处理非流式返回,即json返回,读取json返回并渲染,注意异常也要处理
- 第三步:
- */
- async onopen(resp) {
- const contentype = resp.headers.get('content-type') || ''
- console.log('contentype =>', contentype)
- console.log('resp.ok =>', resp.ok)
- if (resp.ok && !contentype.includes('text/event-stream')) {
- // 读取json数据
- const responseData = await resp.json()
- if (responseData.code !== 0) {
- // 报错处理+关闭连接
- } else {
- //处理数据渲染+关闭连接
-
- stopSession()
- }
- } else if (resp.status === 401) {
- message.warning('登录过期')
- // 报错处理+关闭连接
- stopSession()
-
- }
- },
- onmessage(msg: any) {
- const eventType = msg.event
- const messages: any = cloneDeep(chatState.sessionMessages)
- let lastMessage: any = messages[messages.length - 1] || {}
-
- switch (eventType) {
- case 'add':
- lastMessage = {
- ...lastMessage,
- text: `${lastMessage.text}${msg.data || ' '}`,
- loading: false,
- }
- messages.splice(messages.length - 1, 1, lastMessage)
- chatAction.updateSessionMessages(messages)
- break
- case 'finish':
- console.log('finish lastMessage =>', lastMessage)
- chatAction.updateSessionStatu(SessionStatuTypes.ready)
- chatAction.updateContext(msg.data)
- break
- case 'info':
- {
- const messages: any = cloneDeep(chatState.sessionMessages)
- let lastMessage: any = messages[messages.length - 1] || {}
- lastMessage = {
- referenceDocs: JSON.parse(msg.data).reference_by_docs,
- ...lastMessage,
- }
- messages.splice(messages.length - 1, 1, lastMessage)
- chatAction.updateSessionMessages(messages)
- }
- break
- case 'error':
- if (msg.status === 401) {
- chatAction.updateSessionStatu(SessionStatuTypes.ready)
- } else {
- errorItemFn(msg?.msg || msg?.data || '抱歉,暂无法回答问题')
- }
- break
- }
- },
- onerror(err: any) {
- errorItemFn(err?.msg || '抱歉,暂无法回答该问题')
- console.log('eventSource error: ', `${err}`)
- throw err // 连接遇到http错误时,如跨域等,必须要throw才能停止,不然会一直重连
- },
- onclose() {
- console.log('eventSource close')
- },
- })
-
- // 终止会话
- const stopSession = () => {
- abortController.abort()
- setAbortController(new AbortController())
- }
应用fetch-event-source解决
应用fetch-event-source在onopen阶段处理非流式输出,如报错、接口json返回等
应用fetch-event-source在监听具名事件时,如error,将错误throw err,否则无法中断连接
服务器响应状态码应该为 200,header 为 Content-Type: text/event-stream
,然后保持此连接并以一种特殊的格式写入消息,就像这样:
- data: Message 1
-
- data: Message 2
-
- data: Message 3
- data: of two lines
data:
后为消息文本,冒号后面的空格是可选的。
这个可选便是主要原因。查询社区后,找到了两种解决办法:
如:data: {"content":" 1234"}
具体表现,点击放大查看:
- const stopSession = () => {
- abortController.abort()
- setAbortController(new AbortController())
- }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。