赞
踩
我们经常会有一些流式响应的需求,即客户端发一次请求,服务端需要多次响应才能返回完整数据的案例。譬如用户请求一篇长文,在数据库里有很多个段落,我们也不希望一次性全部查询完毕再返回给客户端(耗时太久),而是希望查一段就返回一段,逐次批量返回给客户端。
那么在springboot中,就可以很简单地实现这种场景需求,即SSE(Server-Sent Events)模式。
举例:
-
@RequestMapping("/emitter")
-
public SseEmitter
sse
(@RequestBody String inputParameter, HttpServletResponse response) {
-
response.setContentType(
"text/event-stream");
-
response.setCharacterEncoding(
"UTF-8");
-
SseEmitter
emitter
=
new
SseEmitter();
-
-
// Simulate asynchronous data retrieval from the database
-
new
Thread(() -> {
-
try {
-
// Query the database based on the input parameter and send data in batches
-
for (
int
i
=
0; i <
10; i++) {
-
String
data
=
"Data batch " + i +
" for parameter: " + inputParameter;
-
emitter.send(data);
-
Thread.sleep(
1000);
// Simulate delay between batches
-
}
-
-
emitter.complete();
// Complete the SSE connection
-
}
catch (Exception e) {
-
emitter.completeWithError(e);
// Handle errors
-
}
-
}).start();
-
-
return emitter;
-
}
如以上代码,返回的对象是SseEmitter,每次调用emitter.send()方法,客户端就会收到一条消息,即一次响应,响应结束的标志是调用emitter.complete方法。
所以即便我读取数据是在new Thread里完成的,在请求该接口时,该方法也不会立即结束,而是阻塞住,等待emitter.complete方法完成。
下面我们可以发起个http客户端请求去查看效果,这里我们使用okhttp客户端。
pom依赖:
-
<dependency>
-
<groupId>com.squareup.okhttp3</groupId>
-
<artifactId>okhttp</artifactId>
-
<version>
4.10
.0</version>
-
</dependency>
-
<dependency>
-
<groupId>com.squareup.okhttp3</groupId>
-
<artifactId>okhttp-sse</artifactId>
-
<version>
4.10
.0</version>
-
</dependency>
-
public
static
void
main
(String[] args) {
-
String
json
=
"{\"inputParameter\": \"1234\"}";
-
-
HttpUtils.stream(
"http://localhost:8081/emitter",
new
HashMap<>(), json,
new
EventSourceListener() {
-
@Override
-
public
void
onClosed
(@NotNull EventSource eventSource) {
-
System.out.println(
"closed");
-
}
-
-
@Override
-
public
void
onOpen
(@NotNull EventSource eventSource, @NotNull Response response) {
-
System.out.println(
"open");
-
}
-
-
@Override
-
public
void
onEvent
(@NotNull EventSource eventSource, @Nullable String id, @Nullable String type, @NotNull String data) {
-
System.out.println(data);
-
}
-
});
-
}
其中HttpUtils是用okhttp封装的工具类:
HttpUtils.java
-
public
static
final
MediaType
MEDIA_TYPE_JSON
= MediaType.parse(
"application/json; charset=utf-8");
-
public
final
static
int
MAX_IDLE_CONNECTIONS
=
20;
-
public
final
static
long
KEEP_ALIVE_DURATION
=
30L;
-
-
public
final
static
int
CONNECT_TIME_OUT
=
6;
-
-
public
final
static
int
WRITE_TIME_OUT
=
10;
-
-
public
final
static
int
READ_TIME_OUT
=
40;
-
-
-
/**
-
* client
-
* 配置重试
-
*/
-
private
final
static
OkHttpClient
HTTP_CLIENT
=
new
OkHttpClient.Builder()
-
.connectTimeout(CONNECT_TIME_OUT, TimeUnit.SECONDS)
-
.writeTimeout(WRITE_TIME_OUT, TimeUnit.SECONDS)
-
.readTimeout(READ_TIME_OUT, TimeUnit.SECONDS)
-
.connectionPool(
new
ConnectionPool(MAX_IDLE_CONNECTIONS, KEEP_ALIVE_DURATION, TimeUnit.MINUTES))
-
.build();
-
-
-
public
static
boolean
stream
(String url, Map<String, String> headers, String json, EventSourceListener eventSourceListener) {
-
try {
-
RequestBody
body
= RequestBody.create(MEDIA_TYPE_JSON, json);
-
Request.
Builder
builder
=
new
Request.Builder();
-
buildHeader(builder, headers);
-
Request
request
= builder.url(url).post(body).build();
-
EventSource.
Factory
factory
= EventSources.createFactory(HTTP_CLIENT);
-
//创建事件
-
log.info(
"http stream请求,url: {},参数: {}", url, json);
-
factory.newEventSource(request, eventSourceListener);
-
return
true;
-
}
catch (Exception e) {
-
log.error(
"http stream请求,url: {} 失败 ,参数: {}", url, json, e);
-
}
-
return
false;
-
}
当服务端每次调用emitter.send方法时,客户端的onEvent就会触发一次,同理,onOpen,onClose,onFailure都对应服务端的对应方法调用。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。