赞
踩
在 OpenAI 的 API 中,SSE 通常用于实现实时数据传输。例如,在聊天模型(如 ChatGPT)中,使用 SSE 可以让客户端实时接收到生成的对话内容,而不需要等待整个响应完成。
EventSource会发送一个get类型的http请求,也可以通过fetch实现post类型的请求。我们这里使用fetch,SSE本质是字节流的传输,fetch中处理对应的字节流信息,同样可以实现EventSource的功能
SSE即Server-Sent
是HTML5提出一个标准。由客户端发起与服务器之间创建TCP连接,然后并维持这个连接,直到客户端或服务器中的任何一方断开。HTTP响应内容有一种特殊的content-type:text/event-stream
,该响应头标识了响应内容为事件流,客户端不会关闭连接,而是等待服务端不断得发送响应结果。
服务器通过 SSE 向客户端发送的每条消息通常包含以下几部分:
每条消息以两个换行符(\n\n)结束。
// sse 返回数据模型如下
data:{"id":"1805489381010771970","object":"chat.completion.chunk","created":1719297188,"model":"gpt-4o-2024-05-13","choices":[{"index":0,"delta":{"role":"assistant","content":""},"message":null,"finishReason":null}],"usage":null}
data:{"id":"1805489381010771970","object":"chat.completion.chunk","created":1719297188,"model":"gpt-4o-2024-05-13","choices":[{"index":0,"delta":{"content":"你好"},"message":null,"finishReason":null}],"usage":null}
data:{"id":"1805489381010771970","object":"chat.completion.chunk","created":1719297188,"model":"gpt-4o-2024-05-13","choices":[{"index":0,"delta":{"content":"!"},"message":null,"finishReason":null}],"usage":null}
data:{"id":"1805489381010771970","object":"chat.completion.chunk","created":1719297188,"model":"gpt-4o-2024-05-13","choices":[{"index":0,"delta":{},"message":null,"finishReason":"stop"}],"usage":null}
data:[DONE]
提示:关于流式获取,我们在业务开发中如果后端服务正常的话响应头Content-Type会是text/event-stream,否则为application/json。
封装一个工具方法来处理流式内容,并且在该方法中,使用者可以通过回调函数的形式来获取内容、状态等信息。
注意:需要通过fetch来创建请求,请求完成之后body为ReadableStream,可以获取它的读取器,一块一块的来读取数据,由于读取的数据为字节流数据,所以需要把字节流转换成字符串。为了更好的解析sse响应数据,这里使用了eventsource-parser第三方库,在读取过程中来拼接文本内容
import { ParseEvent, createParser } from 'eventsource-parser'; import emitter, { EmitterEvents } from './emitter'; interface Options<T> { params?: T; onSucess?: (text: string, done: boolean, streamData: { id: string } | null) => void; onError?: (error: { code: number; msg: string }) => void; } const sse = async function <T>(url: string, options?: Options<T>) { const userInfo = JSON.parse(localStorage.getItem('userInfo') ?? '{}'); let responseText = ''; let streamData: any = null; if (!userInfo?.jwt) { // 处理无token emitter.emit(EmitterEvents.ON_LOGIN, true); return; } function onParse(event: ParseEvent) { if (event.type === 'event') { const data = event.data; if (data === '[DONE]') { options?.onSucess?.(responseText, true, streamData); return; } streamData = JSON.parse(data); const text = streamData.choices[0].delta?.content || ''; responseText += text; // 尝试解析敏感词检测结果 try { const match = responseText.match(/```json\n(.*?)"code":10013(.*?)```/g); if (match) { const jsonStr = match .pop()! .replace(/```json\n/, '') .replace(/```$/, ''); const sensitiveObj = JSON.parse(jsonStr); if (sensitiveObj.DONE === 'DONE') { responseText = sensitiveObj.filteredText || '**'; return; } } } catch (e) { console.log(e); } options?.onSucess?.(responseText, false, streamData); } } const res = await fetch(url, { method: 'POST', headers: { 'Content-Type': 'application/json', token: userInfo.jwt, }, body: JSON.stringify(options?.params), }); if (res.ok) { const contentType = res.headers.get('Content-Type') ?? ''; if (!contentType.includes('stream')) { const data = await res.json(); options?.onError?.({ code: data.code, msg: data.msg }); return; } const parser = createParser(onParse); const reader = res.body?.getReader(); const decoder = new TextDecoder(); while (true) { const content = await reader?.read(); const decodeContent = decoder.decode(content?.value); if (content?.done) { break; } parser.feed(decodeContent); } } else { options?.onError?.({ code: res.status, msg: res.statusText }); } }; export default sse;
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。