赞
踩
stream
是OpenAI API
中的一个参数,用于控制请求的模式。当将stream
参数设置为True
时,API
会以流式方式返回响应,即在请求过程中会立即返回部分结果,而不需要等待完整的响应。
这使得实时交互、大规模数据处理和逐步呈现结果成为可能。
我们的AI Assistant,本质上也是一个聊天机器人,因此非常需要流式方式返回来以提高交互性和响应速度。
通过使用流式方式返回响应,我们可以在用户输入后立即开始处理,并逐步返回部分结果,使用户能够更快地获得反馈,以及更加流畅的交互体验。
同时,它更能够让用户在等待完整响应的同时,逐步阅读和理解部分结果,以便更连贯地进行对话。
下面是两种请求方式的交互示意以及效果对比
交互示意
实际效果
前端采用**EventSource**
来进行请求服务器端的实时数据更新。EventSource提供了一种简单的、基于事件的方式来接收服务器端的实时事件通知。通过使用EventSource,前端可以建立持久的、基于HTTP的连接,以接收服务器端的实时数据更新。
后端采用**Reactor Flux**
技术来处理异步数据流,它是Reactive Streams
规范中的一种数据流处理规范。Flux可以处理非阻塞的异步数据流,并具备背压处理能力,从而实现高效的响应式编程。通过使用Flux,后端可以有效地处理大量的异步数据流,并根据消费者的处理能力进行背压控制,确保数据流的稳定和高效处理。
结合前端的EventSource
和后端的Flux
,我们可以简单地建立起一个实时数据更新的机制。
前端通过EventSource与后端建立持久连接,一旦后端有新的数据更新,就会通过EventSource将数据推送给前端。前端接收到数据后,可以根据需要进行相应的处理和展示。
前端前后换了4种方案,最终使用了event-source-polyfill
来进行处理。
几种方案的对比如下:
方案 | 实现参考 | 优缺点 | 使用注意 |
---|---|---|---|
sse.js | sse.js | 无需第三方依赖,不适用于React | 需要显式调用stream()方法发起请求 |
sse.jsx | SSE.jsx | 无需第三方依赖,使用于React,但是打包后无效 | 需要显式调用stream()方法发起请求 |
react-native-sse | https://www.npmjs.com/package/react-native-sse/v/1.0.1 | 仅适用于RN,不适用于fishx(打包后可用,但是会导致基于React构建的fishx项目启动报错)支持POST\GET | 无需显式调用open()方法,否则可能出现连续请求的问题 |
event-source-polyfill | https://www.npmjs.com/package/event-source-polyfill | 适用于fishx. 但只支持GET请求(如果想传参则需要采用url带参的形式) | 无需显式调用open()方法,否则可能出现连续请求的问题 |
import { EventSourcePolyfill } from 'event-source-polyfill';
const streamChatWithBSS = (value, now) => {
let es = null;
try {
// 1.生成带参URL
const baseUrl = '/chat';
const params = {
}
const queryString = Object.keys(params)
.map(key => `${encodeURIComponent(key)}=${encodeURIComponent(params[key])}`)
.join('&');
const url = `${baseUrl}?${queryString}`;
// 2.构造EventSourcePolyfill对象
es = new EventSourcePolyfill(url, {});
// 3.监听open (请求第一次响应时会触发)
es.addEventListener('open', event => {
// console.log(loading);
});
const currentIndex = chatMessageList.length + 1;
// 4.监听message(请求返回信息流过程中连续触发)
es.addEventListener('message', res => {
if (!res || !res.data) {
return;
}
let resJson = JSON.parse(res.data);
// 4.1 返回异常
if (resJson.code !== 0) {
es.close();
return;
}
resJson = resJson.data;
// 4.2 第一条消息,标记记录开始,在聊天框新增一条消息
if (resJson.begin === true) {
selectedSceneSessionId.current = resJson?.sceneSessionId;
setChatMessageList(res => {
res.push(resJson);
});
setLoading(false);
}
// 4.3 更新 (后续消息都是在更新新增的这条消息内容,以达到打字效果)
else {
setTimeout(() => {
setChatMessageList(res => {
const updatedList = res.map((item, index) => {
if (index === currentIndex) {
return {
...item,
content: item.content + resJson.content,
};
}
return item;
});
return [...updatedList];
});
// 4.4 信息流结束
if (resJson.end === true) {
es.close();
setStreamLoading(false);
}
toBottomFn();
}, 1);
}
});
// 5.监听error
es.addEventListener('error', event => {
if (event.type === 'error') {
console.error('Connection error:', event.message);
} else if (event.type === 'exception') {
console.error('Error:', event.message, event.error);
}
if (es) {
es.close();
}
setLoading(false);
setStreamLoading(false);
});
// 监听close
es.addEventListener('close', event => {
setStreamLoading(false);
console.log('Close SSE connection.');
});
} catch (error) {
console.log(error);
setLoading(false);
setStreamLoading(false);
} finally {
if (textAreaRef.current != null) {
textAreaRef.current.focus();
}
}
};
问题描述:上面代码示例 4.3中,我们通过不断接受信息流并更新消息内容(setState)来实现流式输出(打字效果),但是在最开始测试时发现,虽然前端不断接受到信息流并且连续进行了setState, 但界面并没有按预期的进行连续渲染,还是在信息流完全返回结束后(即最后一次setState)后才进行了渲染,消息才得以展现。
问题原因:React对连续的setState会合并以减少渲染次数进而提升性能
解决方法:利用setTimeout对 setState进行隔离,防止合并刷新
问题描述:使用react-native-sse开发时,发现会连续调用请求两次。最开始以为是因为是复杂请求,所以会有一次预检请求。但是看控制台记录,并不是预检请求。这导致最终记录了两次Message。
问题原因:代码原因,对于使用sse.js,我们需要现实地调用steam()方法来发起请求,而在用react-native-sse改造时,想当然地也用了类似的open()方法进行发起请求。然而它其实在构造sse对象时就会默认发起请求,无需额外的再次调用open()。
解决方法:去除调用open()的逻辑。
问题描述:由于react-native-sse不适用于基于React构建的fishx项目,所以采用evetn-source-polyfill来替代。然而改造后发现请求报错。虽然设置了method为POST,但实际上查看控制台记录,请求仍然是GET。
问题原因:查询文档才知道evetn-source-polyfill仅支持GET请求。然而我们需要在GET请求发送一些参数,例如聊天信息等。
解决方法:改为GET请求,参数采用带参URL的形式。构造带参URL示例代码如下:
const baseUrl = 'baseURL';
const params = {
}
const queryString = Object.keys(params)
.map(key => `${encodeURIComponent(key)}=${encodeURIComponent(params[key])}`)
.join('&');
const url = `${baseUrl}?${queryString}`;
reactor-core-feature文档,开源项目chatgpt-stream
// 需要将produces设置为MediaType.TEXT_EVENT_STREAM_VALUE,表示响应类型是文本事件流
@RequestMapping(value = "/chat", method = RequestMethod.GET, produces = MediaType.TEXT_EVENT_STREAM_VALUE)
@ApiOperation(value = "进行对话")
public @ResponseBody Flux<String> chat(@RequestParam("params") String params) {
return Service.send(params);
}
public Flux<String> send(params) {
ChatMessage chatMessage = new ChatMessage();
chatMessage.setContent(params);
return Flux.create(emitter -> {
StreamSubscriber subscriber = new StreamSubscriber(emitter, this, params);
Flux<String> openAiResponse =
webClient.getChatResponse(params);
openAiResponse.subscribe(subscriber);
emitter(FluxSink).onDispose(subscriber);
});
}
return webClient.post()
.uri(url)
.header(HttpHeaders.AUTHORIZATION, token)
.body(BodyInserters.fromObject(body.toJSONString()))
.retrieve()
.bodyToFlux(String.class)
.onErrorResume(WebClientResponseException.class, ex -> {
HttpStatus status = ex.getStatusCode();
String res = ex.getResponseBodyAsString();
return Mono.error(new RuntimeException(res));
});
public void onNext(String data) {
ChatMessageRes res = new ChatMessageRes();
// 最后一条消息
if ("[DONE]".equals(data)) {
emitter.next(JSON.toJSONString(R.success(res)));
// 记录聊天记录到数据库
completedCallBack.completed(questions, sessionId, sceneId, sceneSessionId, sb.toString(), saveMessage);
// 结束推送
emitter.complete();
} else {
// 标记下第一条消息
if (isFirst) {
res.setBegin(true);
isFirst = false;
}
// 解析消息内容并构造返回消息
String content = null;
OpenAiResponse openAiResponse = parseData2Json(data);
content = openAiResponse.getContent();
res.setMessage(content);
res.setContent(content);
R r = R.success(res);
String str = JSON.toJSONString(r);
// 推送
emitter.next(str);
// 同时拼接完整消息,方便结束后记录
sb.append(content);
}
}
问题描述:stream方式调用私有模型时,会出现消息展示一半后停止的问题
问题原因:对于返回的信息流,预期是每次都只有一个消息(即上面subscriber.onNext()方法入参,可以解析为一个JSON对象),但是调用公司私有模型时,会偶发返回两个JSON对象一起返回的情况(两个JSON字符串拼接为一个字符串),导致解析该数据为JSON对象报错,不能继续处理。具体原因未确定,本地开发环境无法复现,目前猜测可能是网络问题。
解决方法:对于这类情况进行兼容处理,即将消息内容进行合并,先对data进行预处理,尝试解析,如果解析异常,则应该是出现了消息积压,那么就对起进行特殊处理——将content进行合并。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。