赞
踩
SSE技术是基于单工通信模式,只是单纯的客户端向服务端发送请求,服务端不会主动发送给客户端。服务端采取的策略是抓住这个请求不放,等数据更新的时候才返回给客户端,当客户端接收到消息后,再向服务端发送请求,周而复始。
<!DOCTYPE html> <html lang="en"> <head> <meta charset="UTF-8"> <title>SseEmitter</title> </head> <body> <button onclick="closeSse()">关闭连接</button> <div id="message"></div> </body> <script> let source = null; // 用时间戳模拟登录用户 const userId = new Date().getTime(); if (!!window.EventSource) { // 建立连接 source = new EventSource('http://ip:端口/CreateSseConnect?clientId=39bd662b7942418595c21a1ef0af7fad'); /** * 连接一旦建立,就会触发open事件 * 另一种写法:source.onopen = function (event) {} */ source.addEventListener('open', function (e) { setMessageInnerHTML("建立连接。。。"); }, false); /** * 客户端收到服务器发来的数据 * 另一种写法:source.onmessage = function (event) {} */ source.addEventListener('message', function (e) { setMessageInnerHTML(e.data); }); /** * 如果发生通信错误(比如连接中断),就会触发error事件 * 或者: * 另一种写法:source.onerror = function (event) {} */ source.addEventListener('error', function (e) { if (e.readyState === EventSource.CLOSED) { setMessageInnerHTML("连接关闭"); } else { console.log(e); } }, false); } else { setMessageInnerHTML("你的浏览器不支持SSE"); } // 监听窗口关闭事件,主动去关闭sse连接,如果服务端设置永不过期,浏览器关闭后手动清理服务端数据 window.onbeforeunload = function () { closeSse(); }; // 关闭Sse连接 function closeSse() { source.close(); const httpRequest = new XMLHttpRequest(); httpRequest.open('GET', 'http://localhost:8080/sse/CloseConnect/?clientId=e410d4c1d71c469b8d719de5d39783b7', true); httpRequest.send(); console.log("close"); } // 将消息显示在网页上 function setMessageInnerHTML(innerHTML) { document.getElementById('message').innerHTML += innerHTML + '<br/>'; } </script> </html>
Controller:
/** * SSE长链接 */ @RestController @RequestMapping("/sse") public class SseEmitterController { @Autowired private SseEmitterService sseEmitterService; /** * 创建SSE长链接 * * @param clientId 客户端唯一ID(如果为空,则由后端生成并返回给前端) * @return org.springframework.web.servlet.mvc.method.annotation.SseEmitter **/ @CrossOrigin //如果nginx做了跨域处理,此处可去掉 @GetMapping("/CreateSseConnect") public SseEmitter createSseConnect(@RequestParam(name = "clientId", required = false) String clientId) { return sseEmitterService.createSseConnect(clientId); } /** * 关闭SSE连接 * * @param clientId 客户端ID **/ @GetMapping("/CloseSseConnect") public Result closeSseConnect(String clientId) { sseEmitterService.closeSseConnect(clientId); return ResultGenerator.genSuccessResult(true); } }
@Service public class SseEmitterServiceImpl implements SseEmitterService { /** * 容器,保存连接,用于输出返回 */ private static Map<String, SseEmitter> sseCache = new ConcurrentHashMap<>(); @Override public SseEmitter createSseConnect(String clientId) { // 设置超时时间,0表示不过期。默认30秒,超过时间未完成会抛出异常:AsyncRequestTimeoutException SseEmitter sseEmitter = new SseEmitter(0L); // 是否需要给客户端推送ID if (StringUtils.isBlank(clientId)) { clientId = IdUtil.simpleUUID(); } // 注册回调 sseEmitter.onCompletion(completionCallBack(clientId)); sseCache.put(clientId, sseEmitter); logger.info("创建新的sse连接,当前用户:{}", clientId); try { sseEmitter.send(SseEmitter.event().id(SseEmitterConstant.CLIENT_ID).data(clientId)); } catch (IOException e) { logger.error("SseEmitterServiceImpl[createSseConnect]: 创建长链接异常,客户端ID:{}", clientId, e); throw new BusinessException("创建连接异常!", e); } return sseEmitter; } @Override public void closeSseConnect(String clientId) { SseEmitter sseEmitter = sseCache.get(clientId); if (sseEmitter != null) { sseEmitter.complete(); removeUser(clientId); } } // 根据客户端id获取SseEmitter对象 @Override public SseEmitter getSseEmitterByClientId(String clientId) { return sseCache.get(clientId); } // 推送消息到客户端,此处结合业务代码,业务中需要推送消息处调用即可向客户端主动推送消息 @Override public void sendMsgToClient(List<SseEmitterResultVO> sseEmitterResultVOList) { if (CollectionUtil.isEmpty(sseCache)) { return; } for (Map.Entry<String, SseEmitter> entry : sseCache.entrySet()) { sendMsgToClientByClientId(entry.getKey(), sseEmitterResultVOList, entry.getValue()); } } /** * 推送消息到客户端 * 此处做了推送失败后,重试推送机制,可根据自己业务进行修改 * * @param clientId 客户端ID * @param sseEmitterResultVOList 推送信息,此处结合具体业务,定义自己的返回值即可 **/ private void sendMsgToClientByClientId(String clientId, List<SseEmitterResultVO> sseEmitterResultVOList, SseEmitter sseEmitter) { if (sseEmitter == null) { logger.error("SseEmitterServiceImpl[sendMsgToClient]: 推送消息失败:客户端{}未创建长链接,失败消息:{}", clientId, sseEmitterResultVOList.toString()); return; } SseEmitter.SseEventBuilder sendData = SseEmitter.event().id(SseEmitterConstant.TASK_RESULT).data(sseEmitterResultVOList, MediaType.APPLICATION_JSON); try { sseEmitter.send(sendData); } catch (IOException e) { // 推送消息失败,记录错误日志,进行重推 logger.error("SseEmitterServiceImpl[sendMsgToClient]: 推送消息失败:{},尝试进行重推", sseEmitterResultVOList.toString(), e); boolean isSuccess = true; // 推送消息失败后,每隔10s推送一次,推送5次 for (int i = 0; i < 5; i++) { try { Thread.sleep(10000); sseEmitter = sseCache.get(clientId); if (sseEmitter == null) { logger.error("SseEmitterServiceImpl[sendMsgToClient]:{}的第{}次消息重推失败,未创建长链接", clientId, i + 1); continue; } sseEmitter.send(sendData); } catch (Exception ex) { logger.error("SseEmitterServiceImpl[sendMsgToClient]:{}的第{}次消息重推失败", clientId, i + 1, ex); continue; } logger.info("SseEmitterServiceImpl[sendMsgToClient]:{}的第{}次消息重推成功,{}", clientId, i + 1, sseEmitterResultVOList.toString()); return; } } } /** * 长链接完成后回调接口(即关闭连接时调用) * * @param clientId 客户端ID * @return java.lang.Runnable **/ private Runnable completionCallBack(String clientId) { return () -> { logger.info("结束连接:{}", clientId); removeUser(clientId); }; } /** * 连接超时时调用 * * @param clientId 客户端ID * @return java.lang.Runnable **/ private Runnable timeoutCallBack(String clientId) { return () -> { logger.info("连接超时:{}", clientId); removeUser(clientId); }; } /** * 推送消息异常时,回调方法 * * @param clientId 客户端ID * @return java.util.function.Consumer<java.lang.Throwable> **/ private Consumer<Throwable> errorCallBack(String clientId) { return throwable -> { logger.error("SseEmitterServiceImpl[errorCallBack]:连接异常,客户端ID:{}", clientId); // 推送消息失败后,每隔10s推送一次,推送5次 for (int i = 0; i < 5; i++) { try { Thread.sleep(10000); SseEmitter sseEmitter = sseCache.get(clientId); if (sseEmitter == null) { logger.error("SseEmitterServiceImpl[errorCallBack]:第{}次消息重推失败,未获取到 {} 对应的长链接", i + 1, clientId); continue; } sseEmitter.send("失败后重新推送"); } catch (Exception e) { e.printStackTrace(); } } }; } /** * 移除用户连接 * * @param clientId 客户端ID **/ private void removeUser(String clientId) { sseCache.remove(clientId); logger.info("SseEmitterServiceImpl[removeUser]:移除用户:{}", clientId); } }
如果项目中使用nginx对后端服务做了代理,nginx代理转发后,默认会在1min的时候断掉长链接,SSE需要设置自己的长链接时间,则需要在nginx中进行配置;
在反向代理的location块中加入如下配置
proxy_set_header Host $http_host; ##proxy_set_header用来重定义发往后端服务器的请求头
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_set_header X-Forwarded-Proto $scheme;
proxy_buffering off;
proxy_http_version 1.1;
proxy_read_timeout 600s; ##设置SSE长链接保持时间为 600s
1、前端报错:EventSource’s response has a MIME type (“application/json”) that is not “text/event-stream”. Aborting the connection
前端在创建SSE长链接时,完整的请求(包括参数和参数值)都必须放在new EventSource(完整请求)中;
2、创建长链接时,接口状态一直处于pending,检查后端nginx是否做相应配置;
3、推送消息失败:检查客户端创建链接时的id,和推送消息时的id是否一致;
整体业务流程为:客户端创建链接——>服务端保持生成SseEmitter对象,并通过SseEmitter对象实现向客户端主动推送消息——>客户端收到推送消息后,刷新页面(根据推送消息,请求相关业务接口)
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。