赞
踩
SSE技术是基于单工通信模式,只是单纯的客户端向服务端发送请求,服务端不会主动发送给客户端。服务端采取的策略是抓住这个请求不放,等数据更新的时候才返回给客户端,当客户端接收到消息后,再向服务端发送请求,周而复始。
- <!DOCTYPE html>
- <html lang="en">
- <head>
- <meta charset="UTF-8">
- <title>SseEmitter</title>
- </head>
- <body>
- <button onclick="closeSse()">关闭连接</button>
- <div id="message"></div>
- </body>
- <script>
- let source = null;
-
- // 用时间戳模拟登录用户
- const userId = new Date().getTime();
-
- if (!!window.EventSource) {
-
- // 建立连接
- source = new EventSource('http://ip:端口/CreateSseConnect?clientId=39bd662b7942418595c21a1ef0af7fad');
-
- /**
- * 连接一旦建立,就会触发open事件
- * 另一种写法:source.onopen = function (event) {}
- */
- source.addEventListener('open', function (e) {
- setMessageInnerHTML("建立连接。。。");
- }, false);
-
- /**
- * 客户端收到服务器发来的数据
- * 另一种写法:source.onmessage = function (event) {}
- */
- source.addEventListener('message', function (e) {
- setMessageInnerHTML(e.data);
- });
-
-
- /**
- * 如果发生通信错误(比如连接中断),就会触发error事件
- * 或者:
- * 另一种写法:source.onerror = function (event) {}
- */
- source.addEventListener('error', function (e) {
- if (e.readyState === EventSource.CLOSED) {
- setMessageInnerHTML("连接关闭");
- } else {
- console.log(e);
- }
- }, false);
-
- } else {
- setMessageInnerHTML("你的浏览器不支持SSE");
- }
-
- // 监听窗口关闭事件,主动去关闭sse连接,如果服务端设置永不过期,浏览器关闭后手动清理服务端数据
- window.onbeforeunload = function () {
- closeSse();
- };
-
- // 关闭Sse连接
- function closeSse() {
- source.close();
- const httpRequest = new XMLHttpRequest();
- httpRequest.open('GET', 'http://localhost:8080/sse/CloseConnect/?clientId=e410d4c1d71c469b8d719de5d39783b7', true);
- httpRequest.send();
- console.log("close");
- }
-
- // 将消息显示在网页上
- function setMessageInnerHTML(innerHTML) {
- document.getElementById('message').innerHTML += innerHTML + '<br/>';
- }
- </script>
- </html>

Controller:
- /**
- * SSE长链接
- */
- @RestController
- @RequestMapping("/sse")
- public class SseEmitterController {
-
- @Autowired
- private SseEmitterService sseEmitterService;
-
- /**
- * 创建SSE长链接
- *
- * @param clientId 客户端唯一ID(如果为空,则由后端生成并返回给前端)
- * @return org.springframework.web.servlet.mvc.method.annotation.SseEmitter
- **/
- @CrossOrigin //如果nginx做了跨域处理,此处可去掉
- @GetMapping("/CreateSseConnect")
- public SseEmitter createSseConnect(@RequestParam(name = "clientId", required = false) String clientId) {
- return sseEmitterService.createSseConnect(clientId);
- }
-
- /**
- * 关闭SSE连接
- *
- * @param clientId 客户端ID
- **/
- @GetMapping("/CloseSseConnect")
- public Result closeSseConnect(String clientId) {
- sseEmitterService.closeSseConnect(clientId);
- return ResultGenerator.genSuccessResult(true);
- }
-
- }

- @Service
- public class SseEmitterServiceImpl implements SseEmitterService {
-
- /**
- * 容器,保存连接,用于输出返回
- */
- private static Map<String, SseEmitter> sseCache = new ConcurrentHashMap<>();
-
- @Override
- public SseEmitter createSseConnect(String clientId) {
- // 设置超时时间,0表示不过期。默认30秒,超过时间未完成会抛出异常:AsyncRequestTimeoutException
- SseEmitter sseEmitter = new SseEmitter(0L);
- // 是否需要给客户端推送ID
- if (StringUtils.isBlank(clientId)) {
- clientId = IdUtil.simpleUUID();
- }
- // 注册回调
- sseEmitter.onCompletion(completionCallBack(clientId));
- sseCache.put(clientId, sseEmitter);
- logger.info("创建新的sse连接,当前用户:{}", clientId);
-
- try {
- sseEmitter.send(SseEmitter.event().id(SseEmitterConstant.CLIENT_ID).data(clientId));
- } catch (IOException e) {
- logger.error("SseEmitterServiceImpl[createSseConnect]: 创建长链接异常,客户端ID:{}", clientId, e);
- throw new BusinessException("创建连接异常!", e);
- }
- return sseEmitter;
- }
-
- @Override
- public void closeSseConnect(String clientId) {
- SseEmitter sseEmitter = sseCache.get(clientId);
- if (sseEmitter != null) {
- sseEmitter.complete();
- removeUser(clientId);
- }
- }
-
- // 根据客户端id获取SseEmitter对象
- @Override
- public SseEmitter getSseEmitterByClientId(String clientId) {
- return sseCache.get(clientId);
- }
-
- // 推送消息到客户端,此处结合业务代码,业务中需要推送消息处调用即可向客户端主动推送消息
- @Override
- public void sendMsgToClient(List<SseEmitterResultVO> sseEmitterResultVOList) {
- if (CollectionUtil.isEmpty(sseCache)) {
- return;
- }
- for (Map.Entry<String, SseEmitter> entry : sseCache.entrySet()) {
- sendMsgToClientByClientId(entry.getKey(), sseEmitterResultVOList, entry.getValue());
- }
- }
-
- /**
- * 推送消息到客户端
- * 此处做了推送失败后,重试推送机制,可根据自己业务进行修改
- *
- * @param clientId 客户端ID
- * @param sseEmitterResultVOList 推送信息,此处结合具体业务,定义自己的返回值即可
- **/
- private void sendMsgToClientByClientId(String clientId, List<SseEmitterResultVO> sseEmitterResultVOList, SseEmitter sseEmitter) {
- if (sseEmitter == null) {
- logger.error("SseEmitterServiceImpl[sendMsgToClient]: 推送消息失败:客户端{}未创建长链接,失败消息:{}",
- clientId, sseEmitterResultVOList.toString());
- return;
- }
-
- SseEmitter.SseEventBuilder sendData = SseEmitter.event().id(SseEmitterConstant.TASK_RESULT).data(sseEmitterResultVOList, MediaType.APPLICATION_JSON);
- try {
- sseEmitter.send(sendData);
- } catch (IOException e) {
- // 推送消息失败,记录错误日志,进行重推
- logger.error("SseEmitterServiceImpl[sendMsgToClient]: 推送消息失败:{},尝试进行重推", sseEmitterResultVOList.toString(), e);
- boolean isSuccess = true;
- // 推送消息失败后,每隔10s推送一次,推送5次
- for (int i = 0; i < 5; i++) {
- try {
- Thread.sleep(10000);
- sseEmitter = sseCache.get(clientId);
- if (sseEmitter == null) {
- logger.error("SseEmitterServiceImpl[sendMsgToClient]:{}的第{}次消息重推失败,未创建长链接", clientId, i + 1);
- continue;
- }
- sseEmitter.send(sendData);
- } catch (Exception ex) {
- logger.error("SseEmitterServiceImpl[sendMsgToClient]:{}的第{}次消息重推失败", clientId, i + 1, ex);
- continue;
- }
- logger.info("SseEmitterServiceImpl[sendMsgToClient]:{}的第{}次消息重推成功,{}", clientId, i + 1, sseEmitterResultVOList.toString());
- return;
- }
- }
- }
-
- /**
- * 长链接完成后回调接口(即关闭连接时调用)
- *
- * @param clientId 客户端ID
- * @return java.lang.Runnable
- **/
- private Runnable completionCallBack(String clientId) {
- return () -> {
- logger.info("结束连接:{}", clientId);
- removeUser(clientId);
- };
- }
-
- /**
- * 连接超时时调用
- *
- * @param clientId 客户端ID
- * @return java.lang.Runnable
- **/
- private Runnable timeoutCallBack(String clientId) {
- return () -> {
- logger.info("连接超时:{}", clientId);
- removeUser(clientId);
- };
- }
-
- /**
- * 推送消息异常时,回调方法
- *
- * @param clientId 客户端ID
- * @return java.util.function.Consumer<java.lang.Throwable>
- **/
- private Consumer<Throwable> errorCallBack(String clientId) {
- return throwable -> {
- logger.error("SseEmitterServiceImpl[errorCallBack]:连接异常,客户端ID:{}", clientId);
-
- // 推送消息失败后,每隔10s推送一次,推送5次
- for (int i = 0; i < 5; i++) {
- try {
- Thread.sleep(10000);
- SseEmitter sseEmitter = sseCache.get(clientId);
- if (sseEmitter == null) {
- logger.error("SseEmitterServiceImpl[errorCallBack]:第{}次消息重推失败,未获取到 {} 对应的长链接", i + 1, clientId);
- continue;
- }
- sseEmitter.send("失败后重新推送");
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- };
- }
-
- /**
- * 移除用户连接
- *
- * @param clientId 客户端ID
- **/
- private void removeUser(String clientId) {
- sseCache.remove(clientId);
- logger.info("SseEmitterServiceImpl[removeUser]:移除用户:{}", clientId);
- }
- }

如果项目中使用nginx对后端服务做了代理,nginx代理转发后,默认会在1min的时候断掉长链接,SSE需要设置自己的长链接时间,则需要在nginx中进行配置;
在反向代理的location块中加入如下配置
- proxy_set_header Host $http_host; ##proxy_set_header用来重定义发往后端服务器的请求头
- proxy_set_header X-Real-IP $remote_addr;
- proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
- proxy_set_header X-Forwarded-Proto $scheme;
- proxy_buffering off;
- proxy_http_version 1.1;
- proxy_read_timeout 600s; ##设置SSE长链接保持时间为 600s
1、前端报错:EventSource’s response has a MIME type (“application/json”) that is not “text/event-stream”. Aborting the connection
前端在创建SSE长链接时,完整的请求(包括参数和参数值)都必须放在new EventSource(完整请求)中;
2、创建长链接时,接口状态一直处于pending,检查后端nginx是否做相应配置;
3、推送消息失败:检查客户端创建链接时的id,和推送消息时的id是否一致;
整体业务流程为:客户端创建链接——>服务端保持生成SseEmitter对象,并通过SseEmitter对象实现向客户端主动推送消息——>客户端收到推送消息后,刷新页面(根据推送消息,请求相关业务接口)
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。