赞
踩
SseEmitter类在Spring框架中用于实现服务器主动向客户端发送消息推送(Server-Sent Events,简称SSE)。SSE是一种允许服务器向客户端推送实时更新的技术,通常用于实现实时数据通信,如股票行情、聊天室等场景。
SSE和websocket的主要区别在于SSE是单向的【只能从服务端到客户端】,而websocket是双向的
SseEmitter(ResponseBodyEmitter的子类)支持服务器发送的事件,
服务器发送的事件将按照W3C SSE规范进行格式化。
为了从控制器生成SSE流,需要返回SseEmitter。
spring5.3.34对应的Spring Boot 版本是2.7.18
https://docs.spring.io/spring-framework/docs/5.3.34/reference/html/web.html#mvc-ann-async-sse
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.7.18</version> <relativePath/> <!-- lookup parent from repository --> </parent> <groupId>com.ljh</groupId> <artifactId>demo3</artifactId> <version>0.0.1-SNAPSHOT</version> <name>demo3</name> <description>Demo project for Spring Boot</description> <properties> <java.version>17</java.version> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> <configuration> <image> <builder>paketobuildpacks/builder-jammy-base:latest</builder> </image> </configuration> </plugin> </plugins> </build> </project>
java 代码
package com.ljh.demo3; import org.springframework.http.MediaType; import org.springframework.web.bind.annotation.*; import org.springframework.web.servlet.mvc.method.annotation.SseEmitter; import java.io.IOException; import java.util.Map; import java.util.Objects; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executors; /** * 服务端主动推送技术 */ @RestController @RequestMapping(value = "/sse") @CrossOrigin public class SeverSendEvent { private static Map<String, SseEmitter> sseCache = new ConcurrentHashMap<>(); /** * 客户端发起订阅消息的方法 * @param id 标识ID 可理解为通道ID * @return */ @GetMapping(value = "/subscribe",produces = {MediaType.TEXT_EVENT_STREAM_VALUE}) public SseEmitter subscribe(@RequestParam(value = "id") String id) { //创建一个具有自定义超时值的 SseEmitter。 //默认情况下不设置超时值,在这种情况下,将使用 MVC Java 配置或 MVC 命名空间中配置的默认值; // 如果未设置超时值,则超时值取决于底层服务器的默认值:30秒。 //SseEmitter在构造器中设置超时2秒,设置前端的重试时间为2秒,则一共需要等待4秒 SseEmitter sseEmitter = new SseEmitter(2_000L); // 设置前端的重试时间:2秒 reconnectTime(sseEmitter, 2_000L); //放入map缓存中 sseCache.put(id, sseEmitter); //注册相关回调 //异步请求超时时调用:当前端重连接时,会触发请求超时回调 sseEmitter.onTimeout(() -> { System.out.println("触发请求超时!!!"); sseCache.remove(id); }); //注册完成时回调,以便在异步请求完成时调用。当异步请求因任何原因(包括超时和网络错误)完成时,容器线程会调用此方法。该方法可用于检测 ResponseBodyEmitter 实例是否不再可用 sseEmitter.onCompletion(() -> { System.out.println("完成!!!"); //sseCache.remove(id); }); //在异步请求处理过程中出现错误时调用 sseEmitter.onError(error->{ System.out.println("出现错误啦"); //sseCache.remove(id); error.printStackTrace(); }); return sseEmitter; } /** * 设置前端的重试时间 * @param sseEmitter * @param reconnectTimeMillis 单位毫秒 */ private static void reconnectTime(SseEmitter sseEmitter, long reconnectTimeMillis) { Objects.requireNonNull(sseEmitter,"sseEmitter对象为空啦"); //开启另一个线程 Executors.newFixedThreadPool(1).execute(()-> { try { String dateTime = LocalDateTime.now().withNano(0).toString().replace("T", " "); sseEmitter.send(SseEmitter.event().reconnectTime(reconnectTimeMillis).data(String.format("连接成功:%s", dateTime))); } catch (IOException e) { //sseEmitter.completeWithError(e); throw new RuntimeException(e); } }); } /** * 推送消息 * @param id * @param content * @return * @throws IOException */ @GetMapping(value = "/push") public String push(@RequestParam(value = "id") String id, @RequestParam(value = "content") String content) throws IOException { SseEmitter sseEmitter = sseCache.get(id); if (sseEmitter != null) { sseEmitter.send(content); } return "推送成功!"; } /** * 服务器主动停止推送 * @param id * @return */ @GetMapping(value = "/stop") public String stop(@RequestParam(value = "id")String id) { SseEmitter sseEmitter = sseCache.get(id); if (sseEmitter != null) { //通过向 servlet 容器执行分派来完成请求处理,Spring MVC 会在其中再次调用,并完成请求处理生命周期。 //注意:此方法应由应用程序调用,以完成请求处理。它不应在容器相关事件(如发送时出错)发生后使用。 sseEmitter.complete(); sseCache.remove(id); } return "断开连接!"; } }
在电脑桌面新建一个index.html, 内容如下:
<!doctype html> <html lang="en"> <head> <title>Sse测试文档</title> </head> <body> <div>sse测试</div> <div id="result"></div> </body> </html> <script> //实际开发中id参数可以做成变量传参 var source = new EventSource('http://127.0.0.1:8080/sse/subscribe?id=123'); var result=document.getElementById('result'); // 监听到服务端发来的消息回调 source.onmessage = function (event) { text = result.innerText; text += '\n' + event.data; result.innerText = text; }; //连接上服务端回调 source.onopen = function (event) { text = result.innerText; text += '\n 开启: '; console.log(event); result.innerText = text; }; </script>
开启后端服务,打开index.html 连接上后即显示连接成功
打开访问push接口,手动模拟推送数据,id为通道id,即在index.html中传递的id参数
停止发送后测试两次重连间隔时间
服务端控制台打印
如果看完有收获,欢迎点赞关注一波~!
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。