当前位置:   article > 正文

ChatGpt接口流式输出解决方案_chatgpt中text/event-stream形式返回的数据,获取不到里面的content

chatgpt中text/event-stream形式返回的数据,获取不到里面的content

阿里云通义千问ChatAPI事件流传输案例

核心方案:基于http text/event-stream 事件流

请求配置

  /**
   * 通义千问流式请求
   * @param messages
   * @param modelType
   */
  async TYQWSteamRequest(messages: chatBaseMessages[], modelType: string) {
    const payload = {
      model: modelType || 'qwen-max', // 模型名称 必填
      input: {
        messages: messages.map(({ role, content }) => ({ role, content })),
      },
      stream: true,
    };
    // 请求埋点
    this.logger.info(`TYQWSteamRequest: reqBody =>${JSON.stringify(payload)};`);
    const response = await this.ctx.curl(
      'https://dashscope.aliyuncs.com/api/v1/services/aigc/text-generation/generation',
      {
        method: 'POST',
        headers: {
          'Content-Type': 'application/json',
          Accept: 'text/event-stream',
          'X-DashScope-SSE': 'enable',
        },
        streaming: true, // 启用流式响应
        // 请求数据类型
        data: payload,
        timeout: 60 * 1000,
      }
    );
    // 处理流式响应
    if (response.status === 200) {
      // 获取node原生的响应对象
      const stream = response.res;
      return stream;
    } else {
      console.error(`请求失败,状态码: ${response.status}`);
    }
  }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39

服务中间流程处理

/**
 * 提取stream流中的data部分
 * @param streamData
 * @returns
 */
export const getStreamDataWithJson = (streamData: string) => {
  const dataRegex = /data:({.*})/;
  const matches = streamData.match(dataRegex);
  if (matches && matches.length > 1) {
    return matches[1];
  }
  return null;
};

/**
 * 统一处理stream流数据格式
 * @param streamData
 * @param modelType
 */
export const streamFormater = (streamData: string, modelType: string) => {
  if (qytwModels.some(({ value }) => value === modelType)) {
    const fromatData = getStreamDataWithJson(streamData);
    const objectData = JSON.parse(fromatData);
    return objectData?.output?.text;
  }
  throw new Error('模型暂不支持');
};


/**
   * 数据流对话接口
   * @param sendChatMessageParams
   */
  @Post('/send_chat_message_by_stream')
  async sendChatMessageByStream(@Body() body: sendChatMessageParams) {
    const { sessionKey, input, modelType } = body;
    if (!sessionKey) throw new Error('缺少参数[sessionKey]');
    if (!input) throw new Error('缺少参数[input]');
    if (!modelType) throw new Error('缺少参数[modelType]');
    // 1.历史消息
    const { messages, creator } = await this.AiChatService.getAiChatDetailData(
      sessionKey
    );
    if (creator && creator !== this.ctx.user.workid)
      throw new Error('禁止操作他人的会话');
    // 2. 开始对话
    let chatResult = ``;
    const reqTime = moment().format('YYYY-MM-DD HH:mm:ss');
    const stream = await this.AiChatService.sendChatMessageByStream(
      messages,
      input,
      modelType
    );
    // 3.处理响应
    // 3.1 设置客户端响应头,这里是返回给浏览器
    this.ctx.status = 200;
    this.ctx.set({
      'Content-Type': 'text/event-stream',
      'Cache-Control': 'no-cache',
      Connection: 'keep-alive',
    });
    // 3.2 监听数据变化
    stream.on('data', chunk => {
      const textData = chunk.toString();
      const formatData = streamFormater(textData, modelType);
      chatResult = formatData;
      this.ctx.res.write(JSON.stringify(this.HttpReturn.success(formatData)));
    });

    // 3.3 监听结束事件
    stream.on('end', async () => {
      const resTime = moment().format('YYYY-MM-DD HH:mm:ss');
      // 3.3.1 记录对话信息
      await this.AiChatService.setMessagesBySessionKey(sessionKey, {
        modelType,
        messages: [
          ...messages,
          {
            role: roleType.user,
            content: input,
            time: reqTime,
          },
          {
            role: roleType.assistant,
            content: chatResult,
            time: resTime,
          },
        ],
      });
      // 3.3.2 标记对话完成
      this.ctx.res.write(
        JSON.stringify(this.HttpReturn.success(`event-stream:end`))
      );
    });

    // 3.4 监听错误事件
    stream.on('error', err => {
      this.ctx.res.write(`event-stream:error[${err.toString()}]`);
      throw new Error(`Stream error: ${err}`);
    });

    // 保持长链
    return new Promise(resolve => {
      this.ctx.req.on('end', resolve);
    });
  }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106

客户端接收

// 发起 POST 请求并处理 text/event-stream
import { ElMessage, ElNotification } from 'element-plus';

export function sendChatMessageByStreamAPI(payLoad, callback) {
  fetch(`/api/chat/send_chat_message_by_stream`, {
    method: 'POST',
    headers: {
      Accept: 'text/event-stream',
      'Content-Type': 'application/json',
    },
    body: JSON.stringify(payLoad),
  })
    .then((response) => {
      const reader = response.body.getReader();
      const decoder = new TextDecoder();
      let isFirst = true;
      function read() {
        reader.read().then(({ value, done }) => {
          if (done) return;
          const text = decoder.decode(value, { stream: true });
          const { data, success, message } = JSON.parse(text);
          // 1. 报错
          if (!success) {
            ElNotification({
              title: 'Error',
              message,
              type: 'error',
              dangerouslyUseHTMLString: true,
              zIndex: 9999999,
              customClass: 'network-error-notification',
            });
            callback(`event-stream:error`);
            return;
          }
          // 2. 正常开始
          if (isFirst) {
            callback(`event-stream:start`);
          }
          // 3. 正常流转
          callback(data);
          read();
          isFirst = false;
        });
      }
      read();
    })
    .catch((error) => {
      ElMessage.error(`Fetch error:',${error}`);
      console.error('Fetch error:', error);
    });
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/你好赵伟/article/detail/518000
推荐阅读
相关标签
  

闽ICP备14008679号