赞
踩
核心方案:基于http text/event-stream 事件流
/** * 通义千问流式请求 * @param messages * @param modelType */ async TYQWSteamRequest(messages: chatBaseMessages[], modelType: string) { const payload = { model: modelType || 'qwen-max', // 模型名称 必填 input: { messages: messages.map(({ role, content }) => ({ role, content })), }, stream: true, }; // 请求埋点 this.logger.info(`TYQWSteamRequest: reqBody =>${JSON.stringify(payload)};`); const response = await this.ctx.curl( 'https://dashscope.aliyuncs.com/api/v1/services/aigc/text-generation/generation', { method: 'POST', headers: { 'Content-Type': 'application/json', Accept: 'text/event-stream', 'X-DashScope-SSE': 'enable', }, streaming: true, // 启用流式响应 // 请求数据类型 data: payload, timeout: 60 * 1000, } ); // 处理流式响应 if (response.status === 200) { // 获取node原生的响应对象 const stream = response.res; return stream; } else { console.error(`请求失败,状态码: ${response.status}`); } }
/** * 提取stream流中的data部分 * @param streamData * @returns */ export const getStreamDataWithJson = (streamData: string) => { const dataRegex = /data:({.*})/; const matches = streamData.match(dataRegex); if (matches && matches.length > 1) { return matches[1]; } return null; }; /** * 统一处理stream流数据格式 * @param streamData * @param modelType */ export const streamFormater = (streamData: string, modelType: string) => { if (qytwModels.some(({ value }) => value === modelType)) { const fromatData = getStreamDataWithJson(streamData); const objectData = JSON.parse(fromatData); return objectData?.output?.text; } throw new Error('模型暂不支持'); }; /** * 数据流对话接口 * @param sendChatMessageParams */ @Post('/send_chat_message_by_stream') async sendChatMessageByStream(@Body() body: sendChatMessageParams) { const { sessionKey, input, modelType } = body; if (!sessionKey) throw new Error('缺少参数[sessionKey]'); if (!input) throw new Error('缺少参数[input]'); if (!modelType) throw new Error('缺少参数[modelType]'); // 1.历史消息 const { messages, creator } = await this.AiChatService.getAiChatDetailData( sessionKey ); if (creator && creator !== this.ctx.user.workid) throw new Error('禁止操作他人的会话'); // 2. 开始对话 let chatResult = ``; const reqTime = moment().format('YYYY-MM-DD HH:mm:ss'); const stream = await this.AiChatService.sendChatMessageByStream( messages, input, modelType ); // 3.处理响应 // 3.1 设置客户端响应头,这里是返回给浏览器 this.ctx.status = 200; this.ctx.set({ 'Content-Type': 'text/event-stream', 'Cache-Control': 'no-cache', Connection: 'keep-alive', }); // 3.2 监听数据变化 stream.on('data', chunk => { const textData = chunk.toString(); const formatData = streamFormater(textData, modelType); chatResult = formatData; this.ctx.res.write(JSON.stringify(this.HttpReturn.success(formatData))); }); // 3.3 监听结束事件 stream.on('end', async () => { const resTime = moment().format('YYYY-MM-DD HH:mm:ss'); // 3.3.1 记录对话信息 await this.AiChatService.setMessagesBySessionKey(sessionKey, { modelType, messages: [ ...messages, { role: roleType.user, content: input, time: reqTime, }, { role: roleType.assistant, content: chatResult, time: resTime, }, ], }); // 3.3.2 标记对话完成 this.ctx.res.write( JSON.stringify(this.HttpReturn.success(`event-stream:end`)) ); }); // 3.4 监听错误事件 stream.on('error', err => { this.ctx.res.write(`event-stream:error[${err.toString()}]`); throw new Error(`Stream error: ${err}`); }); // 保持长链 return new Promise(resolve => { this.ctx.req.on('end', resolve); }); }
// 发起 POST 请求并处理 text/event-stream import { ElMessage, ElNotification } from 'element-plus'; export function sendChatMessageByStreamAPI(payLoad, callback) { fetch(`/api/chat/send_chat_message_by_stream`, { method: 'POST', headers: { Accept: 'text/event-stream', 'Content-Type': 'application/json', }, body: JSON.stringify(payLoad), }) .then((response) => { const reader = response.body.getReader(); const decoder = new TextDecoder(); let isFirst = true; function read() { reader.read().then(({ value, done }) => { if (done) return; const text = decoder.decode(value, { stream: true }); const { data, success, message } = JSON.parse(text); // 1. 报错 if (!success) { ElNotification({ title: 'Error', message, type: 'error', dangerouslyUseHTMLString: true, zIndex: 9999999, customClass: 'network-error-notification', }); callback(`event-stream:error`); return; } // 2. 正常开始 if (isFirst) { callback(`event-stream:start`); } // 3. 正常流转 callback(data); read(); isFirst = false; }); } read(); }) .catch((error) => { ElMessage.error(`Fetch error:',${error}`); console.error('Fetch error:', error); }); }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。