赞
踩
GPT火了之后,一种新的Http MediaType慢慢火了起来,它就是text/event-stream
,如果你对这个陌生,一定对他的兄弟比较熟悉:application/octet-stream
。这几种类型本质上都是客户端与服务端打开了一个长连接,服务端可以多次写入一部分数据给客户端,客户端可以多次读取,直到全部读取完成。
由于ChatGPT的特性,如果需要生成的token较多,等它全部生成完成将消耗较多的时间,但是如果你将它生成过程中的数据源源不断地展示给用户,那么用户端的体验也不会差(类似于在线播放视频,不需要把整个视频下载完成才能播放)
text/event-stream
支持服务端分多次往客户端写内容。
利用spring mvc的SseEmitter提供服务接口
@PostMapping(value = "/test-stream", produces = "text/event-stream") public SseEmitter conversation(@RequestBody ChatRequest request) { final SseEmitter emitter = new SseEmitter(); new Thread(() -> { try { for (int i = 0; i < 10; i++) { // 模拟某些耗时操作 Thread.sleep(200L); emitter.send("这是第" + i +"次往服务端发送内容"); } } finally { emitter.complete(); } }).start(); return emitter; }
客户端如何读取text/event-stream
的接口呢?对于客户端来说,无论什么协议,都是用InputStream去读取数据,需要注意的是,这种协议每次都会在send的数据前面加上data:
,每次send后都会发送一次空行数据。
URL url = new URL(urlStr); HttpURLConnection conn = (HttpURLConnection)url.openConnection(); conn.setRequestMethod("POST"); conn.setDoOutput(true); conn.setDoInput(true); // 写入请求参数 OutputStream os = conn.getOutputStream(); os.write("request body".getBytes(StandardCharsets.UTF_8)); os.flush(); os.close(); //读取响应参数 try (InputStream is = conn.getInputStream();) { String line; BufferedReader reader = new BufferedReader(new InputStreamReader(is, StandardCharsets.UTF_8)); while ((line = reader.readLine()) != null) { if (StringUtils.isBlank(line)) { continue; } if (line.startsWith("data:")) { line = line.substring("data:".length()); // 处理数据 } }
默认情况下,tomcat对于AsyncRequest会设置默认30秒的超时时间,如果你的异步请求耗时较大,会抛出AsyncRequestTimeoutException,可以通过以下方式解决:
@Component
public class MyWebMvcConfig implements WebMvcConfigurer {
@Override
public void configureAsyncSupport(AsyncSupportConfigurer configurer) {
configurer.setDefaultTimeout(-1L);
}
}
创建SseEmitter的时候,通过构造函数指定超时时间(可选)。
final SseEmitter emitter = new SseEmitter(120 * 1000L);
public interface StreamMessageListener<T> { /** * 收到消息 * @param message */ void messageReceived(T message); /** * 连接关闭 */ void done(); void onException(Throwable t); }
event: ping
消息来检测客户端是否还存活,客户端收到此消息时应当直接忽略。public static <R,P> void streamPost(String urlStr, Map<String, String> headerMap , Map<String, String> queryParams , P body, ParameterizedTypeReference<R> responseType, StreamMessageListener<R> listener) { HttpURLConnection conn = null; InputStream is = null; try { UriComponentsBuilder uriBuilder = UriComponentsBuilder.fromUriString(urlStr); if (queryParams != null) { for (Map.Entry<String, String> paramEntry : queryParams.entrySet()) { uriBuilder.queryParam(paramEntry.getKey(), paramEntry.getValue()); } } urlStr = uriBuilder.build().toString(); URL url = new URL(urlStr); conn = getHttpURLConnection("POST", url, headerMap); // 写入请求参数 OutputStream os = conn.getOutputStream(); os.write(Objects.requireNonNull(JsonUtil.toJson(body)).getBytes(StandardCharsets.UTF_8)); os.flush(); os.close(); // 读取响应参数 throwIfError(conn); is = conn.getInputStream(); String line; BufferedReader reader = new BufferedReader(new InputStreamReader(is, StandardCharsets.UTF_8)); ChatResponse.Data data = new ChatResponse.Data(); // 读取响应参数 while ((line = reader.readLine()) != null) { if (StringUtils.isBlank(line) || "event: ping".equals(line)) { continue; } if (line.startsWith("data:")) { line = line.substring("data:".length()); } R message = JsonUtil.toObject(line, convertToTypeReference(responseType)); listener.messageReceived(message); } } catch (Throwable e) { listener.onException(e); } finally { if (is != null) { try { is.close(); } catch (IOException ignored) { } } if (conn != null) { conn.disconnect(); } listener.done(); } } // 失败需要读取errorStream private static void throwIfError(HttpURLConnection conn) throws IOException { if (conn.getResponseCode() >= 400) { ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); byte[] buffer = new byte[1024]; InputStream inputStream = conn.getErrorStream(); int bytesRead; while ((bytesRead = inputStream.read(buffer)) != -1) { outputStream.write(buffer, 0, bytesRead); } String errorMsg = outputStream.toString(StandardCharsets.UTF_8.name()); throw new RuntimeException(errorMsg); } } private static HttpURLConnection getHttpURLConnection(String method, URL url, Map<String, String> headerMap) throws IOException { HttpURLConnection conn = (HttpURLConnection) url.openConnection(); conn.setRequestMethod(method); if (headerMap != null) { for (Map.Entry<String, String> entry : headerMap.entrySet()) { conn.setRequestProperty(entry.getKey(), entry.getValue()); } } if (headerMap == null || !headerMap.containsKey("Content-Type")) { conn.setRequestProperty("Content-Type", MediaType.APPLICATION_JSON_VALUE); } conn.setRequestProperty("Accept", "*/*"); conn.setDoOutput(true); conn.setDoInput(true); conn.setReadTimeout(READ_TIME_OUT); return conn; } private static <T> TypeReference<T> convertToTypeReference(ParameterizedTypeReference<T> parameterizedTypeReference) { return new TypeReference<T>() { @Override public Type getType() { return parameterizedTypeReference.getType(); } }; }
HttpUtil.streamPost(endPoint, headers, null, input, new ParameterizedTypeReference<DemoClass>() {}, new StreamMessageListener<DemoClass>() { @Override @SneakyThrows public void messageReceived(DemoClass message) { emitter.send(message); } @Override public void done() { emitter.complete(); } @Override public void onException(Throwable e) { log.warn("call api error", e); try { emitter.send("<default error message>"); } catch (IOException ignored) {} } }); );
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。