赞
踩
目录
SSE(Server-Sent Events)是指服务器发送事件,这是一种浏览器 API,允许服务器端向客户端持续推送数据,而无需客户端发起请求。
比如在社交媒体更新、新闻直播、股票市场、物联网智能设备等等方面,如下图。如果数据发生了更新,更新的数据能实时高效的推送给客户端就显的尤为重要。传统的方式是客户端需要去主动获取数据的变化,但是这种事不实时的,效率低,而且浪费资源。
相比于 WebSocket,SSE 比较简单,基于 HTTP 协议,易于理解,但是 SSE 也有一个天然的不足,那就是无法定向推送数据,推送数据时广播的模式,所有的客户端都能收到。
EventBus 是一个广泛应用于开发中的轻量级事件发布/订阅框架,它的核心设计理念是简化应用程序内部各组件间的通信。通过采用发布/订阅(Publish/Subscribe)设计模式,EventBus能够有效地降低组件之间的耦合度,提高代码可读性和维护性。
EventBus 的工作流程如下:
POSTING
:事件处理在发布事件所在线程中执行。MAIN
:如果不在主线程,则切换到主线程执行事件处理。MAIN_ORDERED
:类似MAIN
,但在主线程中按照事件发布的顺序逐个执行。BACKGROUND
:如果不在后台线程,则新建一个后台线程执行。ASYNC
:无论在哪种线程环境下,都会在独立的线程池中异步执行。由于 SSE 只能实现广播模式的消息推送,如果要实现推送数据到指定的客户端,就需要做一些改动,加入有这么一个场景,某一个客户端关注某一个事件,当该事件发生变动时,只把这个新时间推送给指定的客户端,这如何实现呢?
在客户端的入参中需要加入 clientId 来进行区分,新数据到来后也要有相应的标识能获取到绑定的客户端,这样就能将变动的数据推送给指定的客户端了,具体实现如下。
添加依赖
- // Spring boot相关依赖请自行添加
- <dependency>
- <groupId>com.google.code.findbugs</groupId>
- <artifactId>jsr305</artifactId>
- <version>3.0.2</version>
- </dependency>
- <dependency>
- <groupId>com.google.guava</groupId>
- <artifactId>guava</artifactId>
- <version>32.0.1-jre</version>
- </dependency>
添加 EventBus 配置类
- package cn.scf.sse.config;
-
- import com.google.common.eventbus.EventBus;
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
-
- @Configuration
- public class EventBusConfig {
-
- @Bean
- public EventBus eventBus() {
- return new EventBus();
- }
- }
定义事件类
- package cn.scf.sse.event;
-
- public class ClientEvent {
-
-
- // 客户端ID
- private final String clientId;
-
-
- private final String data;
-
- public ClientEvent(String clientId, String data) {
- this.clientId = clientId;
- this.data = data;
- }
-
- public String getClientId() {
- return clientId;
- }
-
- public String getData() {
- return data;
- }
- }
定义 SSE 的管理类
- package cn.scf.sse.event;
-
- import com.google.common.eventbus.EventBus;
- import lombok.extern.slf4j.Slf4j;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.stereotype.Component;
- import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
-
- import java.util.Map;
- import java.util.concurrent.ConcurrentHashMap;
-
-
- @Slf4j
- @Component
- public class SSEManager {
-
-
- // 存储与客户端关联的SseEmitter实例
- private final Map<String, SseEmitter> clientEmitters = new ConcurrentHashMap<>();
-
- @Autowired
- private EventBus eventBus;
-
- public void register(Object listener) {
- // 注册监听器,当EventBus上有事件时,根据事件中的客户端标识发送数据
- eventBus.register(listener);
- }
-
- public void postEvent(String clientId, String data) {
- ClientEvent event = new ClientEvent(clientId, data);
- eventBus.post(event);
- }
-
- // 关闭连接时调用
- public void closeEmitter(String clientId, String exception) {
- SseEmitter sseEmitter = clientEmitters.remove(clientId);
- if (sseEmitter != null) {
- sseEmitter.complete();
- }
- log.info("关闭连接清理资源成功, ex: {}", exception);
- }
-
-
- public Map<String, SseEmitter> getClientEmitters() {
- return clientEmitters;
- }
-
- }
事件处理类,从事件总线订阅了自己感兴趣的事件
- package cn.scf.sse.event;
-
- import com.alibaba.fastjson.JSONObject;
- import com.google.common.eventbus.Subscribe;
- import lombok.extern.slf4j.Slf4j;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.stereotype.Service;
- import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
-
- import javax.annotation.PostConstruct;
- import java.util.Map;
-
-
- @Service
- @Slf4j
- public class EventHandler {
-
- @Autowired
- private SSEManager sseManager;
-
- @PostConstruct
- public void init() {
- // 初始化时将自己注册到 EventBus 中
- sseManager.register(this);
- }
-
- @Subscribe
- public void handleEvent(ClientEvent event) {
- Map<String, SseEmitter> clientEmitters = sseManager.getClientEmitters();
- SseEmitter emitter = clientEmitters.get(event.getClientId());
- if (emitter != null) {
- new Thread(() -> {
- try {
- String message = JSONObject.toJSONString(event);
- // 发送给客户端
- emitter.send(SseEmitter.event().data(message));
- } catch (Exception e) {
- emitter.completeWithError(e);
- }
- }).start();
- }
- }
- }
SSE 接口实现
- @RestController
- public class SseController {
-
- @Autowired
- private SSEManager sseManager;
-
- @CrossOrigin
- @GetMapping(value = "/sse/{clientId}", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
- public SseEmitter handleSseRequest(@PathVariable String clientId) {
- // 设置超时时间, 如果在指定的时间内没有向客户端发送任何数据,则连接将自动关闭。
- SseEmitter emitter = new SseEmitter(TimeUnit.MINUTES.toMillis(10));
- Map<String, SseEmitter> clientEmitters = sseManager.getClientEmitters();
- clientEmitters.put(clientId, emitter);
- // 当SSE连接关闭时,从管理器中移除
- emitter.onCompletion(() -> sseManager.closeEmitter(clientId, null));
- // 监听链接错误
- emitter.onError((ex) -> sseManager.closeEmitter(clientId, ex.getMessage()));
- return emitter;
- }
-
- // 此接口用来模拟有新数据到来时发布事件,将新数据推送给指定客户端,实际中这里可能是三方数据推送,也可能是数据库数据变化
- @GetMapping(value = "/pub/event")
- public String pubEvent(@RequestParam String clientId) {
- sseManager.postEvent(clientId, "发送事件客户端ID:" + clientId + "-" +UUID.randomUUID().toString());
- return "ok";
- }
- }
最后是 H5 调用 SSE 接口
- <!DOCTYPE html>
- <html>
- <head>
- <title>SSE Example</title>
- <script type="text/javascript">
- document.addEventListener('DOMContentLoaded', function() {
- let clientId = 'some-client-id'; // 假设已经获取到客户端标识
- let source = new EventSource(`http://localhost:8080/sse/${clientId}`);
-
- source.onmessage = function(event) {
- let data = event.data;
- console.log(data);
- // 更新页面上的某些内容,例如:
- document.getElementById('sse').innerHTML += data + '<br>';
-
- };
-
- source.onerror = function(event) {
- console.error("EventSource failed.");
- };
-
- // source.close();
- });
- </script>
- </head>
- <body>
- <h1>Server Sent Events</h1>
- <div id="sse"></div>
- <!-- 页面内容根据接收到的数据更新 -->
- </body>
- </html>
当服务启动时,EventHandler 就将自己注册到了 EventBus 中,并通过 @Subscribe 标记想要监听的事件,在方法内部取出客户端与 SseEmitter 的关系,判断是否有对应的 SseEmitter,如果存在,就向客户端推送新的数据。
使用Server-Sent Events(SSE)时,以下是一些值得注意的关键事项:
浏览器兼容:SSE 是 HTML5 的一项功能,所以并非所有浏览器都支持。在使用之前,应检查目标浏览器是否支持 EventSource
API。
连接管理:SSE 通过单个 HTTP 连接进行数据推送,这意味着浏览器会维持一个长连接至服务器。确保服务器端正确处理连接的生命周期,包括维持连接、处理空闲连接、以及在连接断开时自动重新连接。
并发限制:浏览器可能对同一域名下的并发 SSE 连接有所限制,通常每个浏览器标签页共享一个最大连接数,超过这个数量的 SSE 连接可能无法建立。
资源管理:由于连接长期存在,需要考虑服务器资源消耗和客户端内存占用。在服务器端确保及时释放不再使用的资源,客户端也需要适当管理 EventSource 对象,比如在页面卸载时取消注册事件源。
往期经典推荐
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。