当前位置:   article > 正文

AIGC 在前端流式获取内容SSE

AIGC 在前端流式获取内容SSE

AIGC 在前端流式获取内容SSE

简介

在 OpenAI 的 API 中,SSE 通常用于实现实时数据传输。例如,在聊天模型(如 ChatGPT)中,使用 SSE 可以让客户端实时接收到生成的对话内容,而不需要等待整个响应完成。

EventSource会发送一个get类型的http请求,也可以通过fetch实现post类型的请求。我们这里使用fetch,SSE本质是字节流的传输,fetch中处理对应的字节流信息,同样可以实现EventSource的功能

SSE即Server-Sent 是HTML5提出一个标准。由客户端发起与服务器之间创建TCP连接,然后并维持这个连接,直到客户端或服务器中的任何一方断开。HTTP响应内容有一种特殊的content-type:text/event-stream,该响应头标识了响应内容为事件流,客户端不会关闭连接,而是等待服务端不断得发送响应结果。

服务器通过 SSE 向客户端发送的每条消息通常包含以下几部分:

  1. 事件类型(可选):指定事件的类型。
  2. 数据:事件的主要数据内容。
  3. ID(可选):事件的唯一标识符。
  4. 重试时间(可选):客户端在连接断开后重试连接的时间间隔。

每条消息以两个换行符(\n\n)结束。

// sse 返回数据模型如下

data:{"id":"1805489381010771970","object":"chat.completion.chunk","created":1719297188,"model":"gpt-4o-2024-05-13","choices":[{"index":0,"delta":{"role":"assistant","content":""},"message":null,"finishReason":null}],"usage":null}

data:{"id":"1805489381010771970","object":"chat.completion.chunk","created":1719297188,"model":"gpt-4o-2024-05-13","choices":[{"index":0,"delta":{"content":"你好"},"message":null,"finishReason":null}],"usage":null}

data:{"id":"1805489381010771970","object":"chat.completion.chunk","created":1719297188,"model":"gpt-4o-2024-05-13","choices":[{"index":0,"delta":{"content":"!"},"message":null,"finishReason":null}],"usage":null}

data:{"id":"1805489381010771970","object":"chat.completion.chunk","created":1719297188,"model":"gpt-4o-2024-05-13","choices":[{"index":0,"delta":{},"message":null,"finishReason":"stop"}],"usage":null}

data:[DONE]
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11

具体实现

提示:关于流式获取,我们在业务开发中如果后端服务正常的话响应头Content-Type会是text/event-stream,否则为application/json。

封装一个工具方法来处理流式内容,并且在该方法中,使用者可以通过回调函数的形式来获取内容、状态等信息。

注意:需要通过fetch来创建请求,请求完成之后body为ReadableStream,可以获取它的读取器,一块一块的来读取数据,由于读取的数据为字节流数据,所以需要把字节流转换成字符串。为了更好的解析sse响应数据,这里使用了eventsource-parser第三方库,在读取过程中来拼接文本内容

import { ParseEvent, createParser } from 'eventsource-parser';
import emitter, { EmitterEvents } from './emitter';

interface Options<T> {
  params?: T;
  onSucess?: (text: string, done: boolean, streamData: { id: string } | null) => void;
  onError?: (error: { code: number; msg: string }) => void;
}

const sse = async function <T>(url: string, options?: Options<T>) {
  const userInfo = JSON.parse(localStorage.getItem('userInfo') ?? '{}');
  let responseText = '';
  let streamData: any = null;

  if (!userInfo?.jwt) {
    // 处理无token
    emitter.emit(EmitterEvents.ON_LOGIN, true);
    return;
  }

  function onParse(event: ParseEvent) {
    if (event.type === 'event') {
      const data = event.data;

      if (data === '[DONE]') {
        options?.onSucess?.(responseText, true, streamData);
        return;
      }

      streamData = JSON.parse(data);

      const text = streamData.choices[0].delta?.content || '';

      responseText += text;

      // 尝试解析敏感词检测结果
      try {
        const match = responseText.match(/```json\n(.*?)"code":10013(.*?)```/g);
        if (match) {
          const jsonStr = match
            .pop()!
            .replace(/```json\n/, '')
            .replace(/```$/, '');
          const sensitiveObj = JSON.parse(jsonStr);
          if (sensitiveObj.DONE === 'DONE') {
            responseText = sensitiveObj.filteredText || '**';
            return;
          }
        }
      } catch (e) {
        console.log(e);
      }

      options?.onSucess?.(responseText, false, streamData);
    }
  }

  const res = await fetch(url, {
    method: 'POST',
    headers: {
      'Content-Type': 'application/json',
      token: userInfo.jwt,
    },
    body: JSON.stringify(options?.params),
  });

  if (res.ok) {
    const contentType = res.headers.get('Content-Type') ?? '';

    if (!contentType.includes('stream')) {
      const data = await res.json();
      options?.onError?.({ code: data.code, msg: data.msg });
      return;
    }

    const parser = createParser(onParse);
    const reader = res.body?.getReader();
    const decoder = new TextDecoder();

    while (true) {
      const content = await reader?.read();
      const decodeContent = decoder.decode(content?.value);
      
      if (content?.done) {
        break;
      }

      parser.feed(decodeContent);
    }
  } else {
    options?.onError?.({ code: res.status, msg: res.statusText });
  }
};

export default sse;

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
本文内容由网友自发贡献,转载请注明出处:【wpsshop博客】
推荐阅读
相关标签
  

闽ICP备14008679号