赞
踩
服务器发送事件(Server-Sent Events),简称 SSE。这是一种服务器端到客户端的单向消息推送。SSE 基于 HTTP 协议的,SSE 在服务器和客户端之间打开一个单向通道,服务端响应的不再是一次性的数据包而是text/event-stream类型的数据流信息
后端代码:
- import cn.hutool.core.util.IdUtil;
- import lombok.extern.slf4j.Slf4j;
- import org.springframework.stereotype.Component;
- import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
-
- import java.util.Map;
- import java.util.concurrent.ConcurrentHashMap;
- import java.util.function.Consumer;
-
- @Slf4j
- @Component
- public class SseUtil {
-
- private static final Map<Long, SseEmitter> sseEmitterMap = new ConcurrentHashMap<>();
-
- /**
- * 创建连接
- */
- public SseEmitter connect(Long userId, Consumer<Throwable> errorCallback,Runnable timeOutCallback) {
- if (sseEmitterMap.containsKey(userId)) {
- SseEmitter sseEmitter =sseEmitterMap.get(userId);
- sseEmitterMap.remove(userId);
- sseEmitter.complete();
- }
- try {
- // 设置超时时间,0表示不过期。默认30秒
- SseEmitter sseEmitter = new SseEmitter(5*60*1000L);
- sseEmitter.send(SseEmitter.event().id(IdUtil.simpleUUID()).reconnectTime(1*60*1000L).data(""));
- // 注册回调
- sseEmitter.onCompletion(() -> {
- });
- sseEmitter.onError(errorCallback);
- sseEmitter.onTimeout(timeOutCallback);
- sseEmitterMap.put(userId, sseEmitter);
- log.info("创建sse连接完成,当前用户:{}", userId);
- return sseEmitter;
- } catch (Exception e) {
- log.info("创建sse连接异常,当前用户:{}", userId);
- }
- return null;
- }
-
- /**
- * 给指定用户发送消息
- *
- */
- public boolean sendMessage(Long userId,String messageId, String message) {
- if (sseEmitterMap.containsKey(userId)) {
- SseEmitter sseEmitter = sseEmitterMap.get(userId);
- try {
- sseEmitter.send(SseEmitter.event().id(messageId).reconnectTime(1*60*1000L).data(message));
- log.info("用户{},消息id:{},推送成功:{}", userId,messageId, message);
- return true;
- }catch (Exception e) {
- sseEmitterMap.remove(userId);
- log.info("用户{},消息id:{},推送异常:{}", userId,messageId, e.getMessage());
- sseEmitter.complete();
- return false;
- }
- }else {
- log.info("用户{}未上线", userId);
- }
- return false;
- }
-
- /**
- * 断开
- * @param userId
- */
- public void removeUser(Long userId){
- if (sseEmitterMap.containsKey(userId)) {
- SseEmitter sseEmitter = sseEmitterMap.get(userId);
- sseEmitterMap.remove(userId);
- sseEmitter.complete();
- }else {
- log.info("用户{} 连接已关闭",userId);
- }
-
- }
-
- }
细节:
SseEmitter
中, timeout
属性表示 SseEmitter
在发送 SSE 事件到客户端时的超时时间。也就是说,当您使用 SseEmitter
的 send()
方法来发送 SSE 事件时,如果超过了 timeout
属性指定的时间,则将抛出 AsyncRequestTimeoutException
异常。客户端会进行自动重连,这个异常最好直接交给spring处理,因为这个请求是text/event-stream,全局异常处理可能会报错SseEventBuilder
中, timeout
属性表示当前正在构建的 SSE 事件的超时时间。也就是说,当您调用 SseEventBuilder
的 build()
方法来构建 SSE 事件时,如果超过了 timeout
属性指定的时间,则将抛出 SseEventTimeoutException
异常前端代码:
- <script>
- if (window.EventSource) {
- // 建立连接
- source = new EventSource(http://localhost:8080/test/sse?id=1);
-
- source.onopen = function (event) {
- console.log('SSE链接成功');
- }
-
- source.onmessage = function (event) {
-
- if(event.data){
-
- console.log('后端返回的数据:', data.value);
- }
-
- }
- source.onerror = (error) => {
- console.log('SSE链接失败');
- };
-
- } else {
- alert("你的浏览器不支持SSE");
- }
- </script>
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。