当前位置:   article > 正文

text/event-stream

text/event-stream

简介

GPT火了之后,一种新的Http MediaType慢慢火了起来,它就是text/event-stream,如果你对这个陌生,一定对他的兄弟比较熟悉:application/octet-stream。这几种类型本质上都是客户端与服务端打开了一个长连接,服务端可以多次写入一部分数据给客户端,客户端可以多次读取,直到全部读取完成。
由于ChatGPT的特性,如果需要生成的token较多,等它全部生成完成将消耗较多的时间,但是如果你将它生成过程中的数据源源不断地展示给用户,那么用户端的体验也不会差(类似于在线播放视频,不需要把整个视频下载完成才能播放)
text/event-stream支持服务端分多次往客户端写内容。

提供text/event-stream接口

利用spring mvc的SseEmitter提供服务接口

@PostMapping(value = "/test-stream", produces = "text/event-stream")
	public SseEmitter conversation(@RequestBody ChatRequest request) {
		final SseEmitter emitter = new SseEmitter();
		new Thread(() -> {
			try {
				for (int i = 0; i < 10; i++) {
					// 模拟某些耗时操作
					Thread.sleep(200L);
					emitter.send("这是第" + i +"次往服务端发送内容");
				}
			} finally {
				emitter.complete();
			}
		}).start();
		return emitter;
	}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16

客户端如何读取text/event-stream的接口呢?对于客户端来说,无论什么协议,都是用InputStream去读取数据,需要注意的是,这种协议每次都会在send的数据前面加上data: ,每次send后都会发送一次空行数据。

URL url = new URL(urlStr);
HttpURLConnection conn = (HttpURLConnection)url.openConnection();
conn.setRequestMethod("POST");
conn.setDoOutput(true);
conn.setDoInput(true);

// 写入请求参数
OutputStream os = conn.getOutputStream();
os.write("request body".getBytes(StandardCharsets.UTF_8));
os.flush();
os.close();

//读取响应参数
try (InputStream is = conn.getInputStream();) {
String line;
BufferedReader reader = new BufferedReader(new InputStreamReader(is, StandardCharsets.UTF_8));
while ((line = reader.readLine()) != null) {
    if (StringUtils.isBlank(line)) {
        continue;
    }
    if (line.startsWith("data:")) {
        line = line.substring("data:".length());
        // 处理数据
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25

超时时间设置

默认情况下,tomcat对于AsyncRequest会设置默认30秒的超时时间,如果你的异步请求耗时较大,会抛出AsyncRequestTimeoutException,可以通过以下方式解决:

@Component
public class MyWebMvcConfig implements WebMvcConfigurer {
	@Override
    public void configureAsyncSupport(AsyncSupportConfigurer configurer) {
        configurer.setDefaultTimeout(-1L);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

创建SseEmitter的时候,通过构造函数指定超时时间(可选)。

final SseEmitter emitter = new SseEmitter(120 * 1000L);
  • 1

调用text/event-stream接口

  1. 新建StreamMessageListener接口
    提供给调用方定义自己的业务逻辑。
public interface StreamMessageListener<T> {

    /**
     * 收到消息
     * @param message
     */
    void messageReceived(T message);

    /**
     * 连接关闭
     */
    void done();

    void onException(Throwable t);

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  1. 定义HttpUtil类
    实现post请求的公共逻辑,服务端可能会发送event: ping消息来检测客户端是否还存活,客户端收到此消息时应当直接忽略。
public static <R,P> void streamPost(String urlStr, Map<String, String> headerMap , Map<String, String> queryParams
		, P body, ParameterizedTypeReference<R> responseType, StreamMessageListener<R> listener) {
	HttpURLConnection conn = null;
	InputStream is = null;
	try {
		UriComponentsBuilder uriBuilder = UriComponentsBuilder.fromUriString(urlStr);
		if (queryParams != null) {
			for (Map.Entry<String, String> paramEntry : queryParams.entrySet()) {
				uriBuilder.queryParam(paramEntry.getKey(), paramEntry.getValue());
			}
		}
		urlStr = uriBuilder.build().toString();
		URL url = new URL(urlStr);
		conn = getHttpURLConnection("POST", url, headerMap);
		// 写入请求参数
		OutputStream os = conn.getOutputStream();
		os.write(Objects.requireNonNull(JsonUtil.toJson(body)).getBytes(StandardCharsets.UTF_8));
		os.flush();
		os.close();
		// 读取响应参数
		throwIfError(conn);
		is = conn.getInputStream();
		String line;
		BufferedReader reader = new BufferedReader(new InputStreamReader(is, StandardCharsets.UTF_8));
		ChatResponse.Data data = new ChatResponse.Data();
		// 读取响应参数
		while ((line = reader.readLine()) != null) {
			if (StringUtils.isBlank(line) || "event: ping".equals(line)) {
				continue;
			}
			if (line.startsWith("data:")) {
				line = line.substring("data:".length());
			}
			R message = JsonUtil.toObject(line, convertToTypeReference(responseType));
			listener.messageReceived(message);
		}
	} catch (Throwable e) {
		listener.onException(e);
	} finally {
		if (is != null) {
			try {
				is.close();
			} catch (IOException ignored) { }
		}
		if (conn != null) {
			conn.disconnect();
		}
		listener.done();
	}
}

// 失败需要读取errorStream
private static void throwIfError(HttpURLConnection conn) throws IOException {
	if (conn.getResponseCode() >= 400) {
		ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
		byte[] buffer = new byte[1024];
		InputStream inputStream = conn.getErrorStream();
		int bytesRead;
		while ((bytesRead = inputStream.read(buffer)) != -1) {
			outputStream.write(buffer, 0, bytesRead);
		}
		String errorMsg = outputStream.toString(StandardCharsets.UTF_8.name());
		throw new RuntimeException(errorMsg);
	}
}

private static HttpURLConnection getHttpURLConnection(String method, URL url, Map<String, String> headerMap) throws IOException {
	HttpURLConnection conn = (HttpURLConnection) url.openConnection();
	conn.setRequestMethod(method);
	if (headerMap != null) {
		for (Map.Entry<String, String> entry : headerMap.entrySet()) {
			conn.setRequestProperty(entry.getKey(), entry.getValue());
		}
	}
	if (headerMap == null || !headerMap.containsKey("Content-Type")) {
		conn.setRequestProperty("Content-Type", MediaType.APPLICATION_JSON_VALUE);
	}
	conn.setRequestProperty("Accept", "*/*");
	conn.setDoOutput(true);
	conn.setDoInput(true);
	conn.setReadTimeout(READ_TIME_OUT);
	return conn;
}

private static <T> TypeReference<T> convertToTypeReference(ParameterizedTypeReference<T> parameterizedTypeReference) {
	return new TypeReference<T>() {
		@Override
		public Type getType() {
			return parameterizedTypeReference.getType();
		}
	};
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  1. 调用接口
    给的示例为代理一个text/event-stream接口,内部逻辑简单的将下游的内容返回给上游。
HttpUtil.streamPost(endPoint, headers, null, input, new ParameterizedTypeReference<DemoClass>() {}, new StreamMessageListener<DemoClass>() {
		
		@Override
		@SneakyThrows
		public void messageReceived(DemoClass message) {
			emitter.send(message);
		}

		@Override
		public void done() {
			emitter.complete();
		}

		@Override
		public void onException(Throwable e) {
			log.warn("call api error", e);
			try {
				emitter.send("<default error message>");
			} catch (IOException ignored) {}
		}
	});
);
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
声明:本文内容由网友自发贡献,转载请注明出处:【wpsshop】
推荐阅读
相关标签
  

闽ICP备14008679号