当前位置:   article > 正文

【项目方案】OpenAI流式请求实现方案_event-source-polyfill

event-source-polyfill


实现目的

streamOpenAI API中的一个参数,用于控制请求的模式。当将stream参数设置为True时,API会以流式方式返回响应,即在请求过程中会立即返回部分结果,而不需要等待完整的响应
这使得实时交互、大规模数据处理和逐步呈现结果成为可能。
ChatGpt回答stream模式与普通模式区别
我们的AI Assistant,本质上也是一个聊天机器人,因此非常需要流式方式返回来以提高交互性和响应速度
通过使用流式方式返回响应,我们可以在用户输入后立即开始处理,并逐步返回部分结果,使用户能够更快地获得反馈,以及更加流畅的交互体验。
同时,它更能够让用户在等待完整响应的同时,逐步阅读和理解部分结果,以便更连贯地进行对话。

效果比对

下面是两种请求方式的交互示意以及效果对比

非stream模式
  • 交互示意
    在这里插入图片描述
  • 实际效果
    非stream模式请求OpenAI——等待时间久,直接显示
stream模式
  • 交互示意

  • 实际效果
    stream模式请求OpenAI——等待时间短,逐步显示

实现方案

方案思路总体描述

前端采用**EventSource**来进行请求服务器端的实时数据更新。EventSource提供了一种简单的、基于事件的方式来接收服务器端的实时事件通知。通过使用EventSource,前端可以建立持久的、基于HTTP的连接,以接收服务器端的实时数据更新。
后端采用**Reactor Flux**技术来处理异步数据流,它是Reactive Streams规范中的一种数据流处理规范。Flux可以处理非阻塞的异步数据流,并具备背压处理能力,从而实现高效的响应式编程。通过使用Flux,后端可以有效地处理大量的异步数据流,并根据消费者的处理能力进行背压控制,确保数据流的稳定和高效处理。
结合前端的EventSource和后端的Flux,我们可以简单地建立起一个实时数据更新的机制。
前端通过EventSource与后端建立持久连接,一旦后端有新的数据更新,就会通过EventSource将数据推送给前端。前端接收到数据后,可以根据需要进行相应的处理和展示。

前端

前端前后换了4种方案,最终使用了event-source-polyfill来进行处理。

方案对比

几种方案的对比如下:

方案实现参考优缺点使用注意
sse.jssse.js无需第三方依赖,不适用于React需要显式调用stream()方法发起请求
sse.jsxSSE.jsx无需第三方依赖,使用于React,但是打包后无效需要显式调用stream()方法发起请求
react-native-ssehttps://www.npmjs.com/package/react-native-sse/v/1.0.1仅适用于RN,不适用于fishx(打包后可用,但是会导致基于React构建的fishx项目启动报错)支持POST\GET无需显式调用open()方法,否则可能出现连续请求的问题
event-source-polyfillhttps://www.npmjs.com/package/event-source-polyfill适用于fishx. 但只支持GET请求(如果想传参则需要采用url带参的形式)无需显式调用open()方法,否则可能出现连续请求的问题

event-source-polyfill代码示例
import { EventSourcePolyfill } from 'event-source-polyfill';

const streamChatWithBSS = (value, now) => {
    let es = null;
    try {
      // 1.生成带参URL
      const baseUrl = '/chat';
      const params = {
      }
      const queryString = Object.keys(params)
        .map(key => `${encodeURIComponent(key)}=${encodeURIComponent(params[key])}`)
        .join('&');
      const url = `${baseUrl}?${queryString}`;

      // 2.构造EventSourcePolyfill对象
      es = new EventSourcePolyfill(url, {});

      // 3.监听open (请求第一次响应时会触发)
      es.addEventListener('open', event => {
        // console.log(loading);
      });
      
      const currentIndex = chatMessageList.length + 1;

      // 4.监听message(请求返回信息流过程中连续触发)
      es.addEventListener('message', res => {
        if (!res || !res.data) {
          return;
        }
        let resJson = JSON.parse(res.data);

        // 4.1 返回异常
        if (resJson.code !== 0) {
          es.close();
          return;
        }
        resJson = resJson.data;

        // 4.2  第一条消息,标记记录开始,在聊天框新增一条消息
        if (resJson.begin === true) {
          selectedSceneSessionId.current = resJson?.sceneSessionId;
          setChatMessageList(res => {
            res.push(resJson);
          });
          setLoading(false);
        }
        // 4.3 更新 (后续消息都是在更新新增的这条消息内容,以达到打字效果)
        else {
          setTimeout(() => {
            setChatMessageList(res => {
              const updatedList = res.map((item, index) => {
                if (index === currentIndex) {
                  return {
                    ...item,
                    content: item.content + resJson.content,
                  };
                }
                return item;
              });
              return [...updatedList];
            });
            // 4.4 信息流结束
            if (resJson.end === true) {
              es.close();
              setStreamLoading(false);
            }
            toBottomFn();
          }, 1);
        }
      });

      // 5.监听error
      es.addEventListener('error', event => {
        if (event.type === 'error') {
          console.error('Connection error:', event.message);
        } else if (event.type === 'exception') {
          console.error('Error:', event.message, event.error);
        }
        if (es) {
          es.close();
        }
        setLoading(false);
        setStreamLoading(false);
      });

      // 监听close
      es.addEventListener('close', event => {
        setStreamLoading(false);
        console.log('Close SSE connection.');
      });
    } catch (error) {
      console.log(error);
      setLoading(false);
      setStreamLoading(false);
    } finally {
      if (textAreaRef.current != null) {
        textAreaRef.current.focus();
      }
    }
  };
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100

前端实现遇到的问题与解决方法
  • React会合并刷新 导致无法正常显示打字效果

问题描述:上面代码示例 4.3中,我们通过不断接受信息流并更新消息内容(setState)来实现流式输出(打字效果),但是在最开始测试时发现,虽然前端不断接受到信息流并且连续进行了setState, 但界面并没有按预期的进行连续渲染,还是在信息流完全返回结束后(即最后一次setState)后才进行了渲染,消息才得以展现。
问题原因:React对连续的setState会合并以减少渲染次数进而提升性能
解决方法:利用setTimeout对 setState进行隔离,防止合并刷新

  • 使用react-native-sse时出现,出现连续请求两次的问题

问题描述:使用react-native-sse开发时,发现会连续调用请求两次。最开始以为是因为是复杂请求,所以会有一次预检请求。但是看控制台记录,并不是预检请求。这导致最终记录了两次Message。
问题原因:代码原因,对于使用sse.js,我们需要现实地调用steam()方法来发起请求,而在用react-native-sse改造时,想当然地也用了类似的open()方法进行发起请求。然而它其实在构造sse对象时就会默认发起请求,无需额外的再次调用open()。
解决方法:去除调用open()的逻辑。

  • event-source-polyfill 不支持POST请求,需要改为GET

问题描述:由于react-native-sse不适用于基于React构建的fishx项目,所以采用evetn-source-polyfill来替代。然而改造后发现请求报错。虽然设置了method为POST,但实际上查看控制台记录,请求仍然是GET。
问题原因:查询文档才知道evetn-source-polyfill仅支持GET请求。然而我们需要在GET请求发送一些参数,例如聊天信息等。
解决方法:改为GET请求,参数采用带参URL的形式。构造带参URL示例代码如下:

const baseUrl = 'baseURL';
const params = {
}
const queryString = Object.keys(params)
  .map(key => `${encodeURIComponent(key)}=${encodeURIComponent(params[key])}`)
  .join('&');
const url = `${baseUrl}?${queryString}`;
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

后端

参考资料

reactor-core-feature文档开源项目chatgpt-stream

时序图

在这里插入图片描述

关键代码示例
// 需要将produces设置为MediaType.TEXT_EVENT_STREAM_VALUE,表示响应类型是文本事件流
@RequestMapping(value = "/chat", method = RequestMethod.GET, produces = MediaType.TEXT_EVENT_STREAM_VALUE)
@ApiOperation(value = "进行对话")
public @ResponseBody Flux<String> chat(@RequestParam("params") String params) {
    return Service.send(params);
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
public Flux<String> send(params) {
        ChatMessage chatMessage = new ChatMessage();
        chatMessage.setContent(params);
        return Flux.create(emitter -> {
            StreamSubscriber subscriber = new StreamSubscriber(emitter, this, params);
            Flux<String> openAiResponse =
                    webClient.getChatResponse(params);
            openAiResponse.subscribe(subscriber);
            emitter(FluxSink).onDispose(subscriber);
        });
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
return webClient.post()
            .uri(url)
            .header(HttpHeaders.AUTHORIZATION, token)
            .body(BodyInserters.fromObject(body.toJSONString()))
            .retrieve()
            .bodyToFlux(String.class)
            .onErrorResume(WebClientResponseException.class, ex -> {
                HttpStatus status = ex.getStatusCode();
                String res = ex.getResponseBodyAsString();
                return Mono.error(new RuntimeException(res));
            });
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
public void onNext(String data) {
    ChatMessageRes res = new ChatMessageRes();
	// 最后一条消息
    if ("[DONE]".equals(data)) {
        emitter.next(JSON.toJSONString(R.success(res)));
        // 记录聊天记录到数据库
        completedCallBack.completed(questions, sessionId, sceneId, sceneSessionId, sb.toString(), saveMessage);
        // 结束推送
        emitter.complete();
    } else {
        // 标记下第一条消息
        if (isFirst) {
            res.setBegin(true);
            isFirst = false;
        }
        // 解析消息内容并构造返回消息
        String content = null;
        OpenAiResponse openAiResponse = parseData2Json(data);
        content = openAiResponse.getContent();
        res.setMessage(content);
        res.setContent(content);
        R r = R.success(res);
        String str = JSON.toJSONString(r);
        // 推送
        emitter.next(str);
        // 同时拼接完整消息,方便结束后记录
        sb.append(content);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29

后端实现时遇到的问题与解决方法
  • 消息积压导致解析报错,前端消息显示一半后卡住

问题描述:stream方式调用私有模型时,会出现消息展示一半后停止的问题
问题原因:对于返回的信息流,预期是每次都只有一个消息(即上面subscriber.onNext()方法入参,可以解析为一个JSON对象),但是调用公司私有模型时,会偶发返回两个JSON对象一起返回的情况(两个JSON字符串拼接为一个字符串),导致解析该数据为JSON对象报错,不能继续处理。具体原因未确定,本地开发环境无法复现,目前猜测可能是网络问题。
解决方法:对于这类情况进行兼容处理,即将消息内容进行合并,先对data进行预处理,尝试解析,如果解析异常,则应该是出现了消息积压,那么就对起进行特殊处理——将content进行合并。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/知新_RL/article/detail/803414
推荐阅读
相关标签
  

闽ICP备14008679号