当前位置:   article > 正文

Springboot之整合SSE实现消息推送_springboot sse推送

springboot sse推送

前言

项目中涉及到部分请求,后端处理时间较长,使用常规Http请求,页面等待时间太长,对用户不友好,故考虑使用长链接进行消息推送,可选方案有WebSocket、SSE,WebSocket可实现双工通信,SSE仅支持服务端向客户端推送消息,根据实际使用场景,SSE即可满足,故选用SSE。

一、SSE是什么?

SSE技术是基于单工通信模式,只是单纯的客户端向服务端发送请求,服务端不会主动发送给客户端。服务端采取的策略是抓住这个请求不放,等数据更新的时候才返回给客户端,当客户端接收到消息后,再向服务端发送请求,周而复始。

  • 注意:因为EventSource对象是SSE的客户端,可能会有浏览器对其不支持,但谷歌、火狐、360是可以的,IE不可以。
  • 优点:SSE和WebSocket相比,最大的优势是便利,服务端不需要其他的类库,开发难度较低,SSE和轮询相比它不用处理很多请求,不用每次建立新连接,延迟较低。
  • 缺点:如果客户端有很多,那就要保持很多长连接,这会占用服务器大量内存和连接数
  • sse 规范:在 html5 的定义中,服务端 sse,一般需要遵循以下要求:
    Content-Type: text/event-stream;
    charset=UTF-8Cache-Control: no-cache
    Connection: keep-alive

二、使用步骤

1.客户端代码示例

  1. <!DOCTYPE html>
  2. <html lang="en">
  3. <head>
  4. <meta charset="UTF-8">
  5. <title>SseEmitter</title>
  6. </head>
  7. <body>
  8. <button onclick="closeSse()">关闭连接</button>
  9. <div id="message"></div>
  10. </body>
  11. <script>
  12. let source = null;
  13. // 用时间戳模拟登录用户
  14. const userId = new Date().getTime();
  15. if (!!window.EventSource) {
  16. // 建立连接
  17. source = new EventSource('http://ip:端口/CreateSseConnect?clientId=39bd662b7942418595c21a1ef0af7fad');
  18. /**
  19. * 连接一旦建立,就会触发open事件
  20. * 另一种写法:source.onopen = function (event) {}
  21. */
  22. source.addEventListener('open', function (e) {
  23. setMessageInnerHTML("建立连接。。。");
  24. }, false);
  25. /**
  26. * 客户端收到服务器发来的数据
  27. * 另一种写法:source.onmessage = function (event) {}
  28. */
  29. source.addEventListener('message', function (e) {
  30. setMessageInnerHTML(e.data);
  31. });
  32. /**
  33. * 如果发生通信错误(比如连接中断),就会触发error事件
  34. * 或者:
  35. * 另一种写法:source.onerror = function (event) {}
  36. */
  37. source.addEventListener('error', function (e) {
  38. if (e.readyState === EventSource.CLOSED) {
  39. setMessageInnerHTML("连接关闭");
  40. } else {
  41. console.log(e);
  42. }
  43. }, false);
  44. } else {
  45. setMessageInnerHTML("你的浏览器不支持SSE");
  46. }
  47. // 监听窗口关闭事件,主动去关闭sse连接,如果服务端设置永不过期,浏览器关闭后手动清理服务端数据
  48. window.onbeforeunload = function () {
  49. closeSse();
  50. };
  51. // 关闭Sse连接
  52. function closeSse() {
  53. source.close();
  54. const httpRequest = new XMLHttpRequest();
  55. httpRequest.open('GET', 'http://localhost:8080/sse/CloseConnect/?clientId=e410d4c1d71c469b8d719de5d39783b7', true);
  56. httpRequest.send();
  57. console.log("close");
  58. }
  59. // 将消息显示在网页上
  60. function setMessageInnerHTML(innerHTML) {
  61. document.getElementById('message').innerHTML += innerHTML + '<br/>';
  62. }
  63. </script>
  64. </html>

2.服务端整合

Controller:

  1. /**
  2. * SSE长链接
  3. */
  4. @RestController
  5. @RequestMapping("/sse")
  6. public class SseEmitterController {
  7. @Autowired
  8. private SseEmitterService sseEmitterService;
  9. /**
  10. * 创建SSE长链接
  11. *
  12. * @param clientId 客户端唯一ID(如果为空,则由后端生成并返回给前端)
  13. * @return org.springframework.web.servlet.mvc.method.annotation.SseEmitter
  14. **/
  15. @CrossOrigin //如果nginx做了跨域处理,此处可去掉
  16. @GetMapping("/CreateSseConnect")
  17. public SseEmitter createSseConnect(@RequestParam(name = "clientId", required = false) String clientId) {
  18. return sseEmitterService.createSseConnect(clientId);
  19. }
  20. /**
  21. * 关闭SSE连接
  22. *
  23. * @param clientId 客户端ID
  24. **/
  25. @GetMapping("/CloseSseConnect")
  26. public Result closeSseConnect(String clientId) {
  27. sseEmitterService.closeSseConnect(clientId);
  28. return ResultGenerator.genSuccessResult(true);
  29. }
  30. }

ServiceImpl

  1. @Service
  2. public class SseEmitterServiceImpl implements SseEmitterService {
  3. /**
  4. * 容器,保存连接,用于输出返回
  5. */
  6. private static Map<String, SseEmitter> sseCache = new ConcurrentHashMap<>();
  7. @Override
  8. public SseEmitter createSseConnect(String clientId) {
  9. // 设置超时时间,0表示不过期。默认30秒,超过时间未完成会抛出异常:AsyncRequestTimeoutException
  10. SseEmitter sseEmitter = new SseEmitter(0L);
  11. // 是否需要给客户端推送ID
  12. if (StringUtils.isBlank(clientId)) {
  13. clientId = IdUtil.simpleUUID();
  14. }
  15. // 注册回调
  16. sseEmitter.onCompletion(completionCallBack(clientId));
  17. sseCache.put(clientId, sseEmitter);
  18. logger.info("创建新的sse连接,当前用户:{}", clientId);
  19. try {
  20. sseEmitter.send(SseEmitter.event().id(SseEmitterConstant.CLIENT_ID).data(clientId));
  21. } catch (IOException e) {
  22. logger.error("SseEmitterServiceImpl[createSseConnect]: 创建长链接异常,客户端ID:{}", clientId, e);
  23. throw new BusinessException("创建连接异常!", e);
  24. }
  25. return sseEmitter;
  26. }
  27. @Override
  28. public void closeSseConnect(String clientId) {
  29. SseEmitter sseEmitter = sseCache.get(clientId);
  30. if (sseEmitter != null) {
  31. sseEmitter.complete();
  32. removeUser(clientId);
  33. }
  34. }
  35. // 根据客户端id获取SseEmitter对象
  36. @Override
  37. public SseEmitter getSseEmitterByClientId(String clientId) {
  38. return sseCache.get(clientId);
  39. }
  40. // 推送消息到客户端,此处结合业务代码,业务中需要推送消息处调用即可向客户端主动推送消息
  41. @Override
  42. public void sendMsgToClient(List<SseEmitterResultVO> sseEmitterResultVOList) {
  43. if (CollectionUtil.isEmpty(sseCache)) {
  44. return;
  45. }
  46. for (Map.Entry<String, SseEmitter> entry : sseCache.entrySet()) {
  47. sendMsgToClientByClientId(entry.getKey(), sseEmitterResultVOList, entry.getValue());
  48. }
  49. }
  50. /**
  51. * 推送消息到客户端
  52. * 此处做了推送失败后,重试推送机制,可根据自己业务进行修改
  53. *
  54. * @param clientId 客户端ID
  55. * @param sseEmitterResultVOList 推送信息,此处结合具体业务,定义自己的返回值即可
  56. **/
  57. private void sendMsgToClientByClientId(String clientId, List<SseEmitterResultVO> sseEmitterResultVOList, SseEmitter sseEmitter) {
  58. if (sseEmitter == null) {
  59. logger.error("SseEmitterServiceImpl[sendMsgToClient]: 推送消息失败:客户端{}未创建长链接,失败消息:{}",
  60. clientId, sseEmitterResultVOList.toString());
  61. return;
  62. }
  63. SseEmitter.SseEventBuilder sendData = SseEmitter.event().id(SseEmitterConstant.TASK_RESULT).data(sseEmitterResultVOList, MediaType.APPLICATION_JSON);
  64. try {
  65. sseEmitter.send(sendData);
  66. } catch (IOException e) {
  67. // 推送消息失败,记录错误日志,进行重推
  68. logger.error("SseEmitterServiceImpl[sendMsgToClient]: 推送消息失败:{},尝试进行重推", sseEmitterResultVOList.toString(), e);
  69. boolean isSuccess = true;
  70. // 推送消息失败后,每隔10s推送一次,推送5
  71. for (int i = 0; i < 5; i++) {
  72. try {
  73. Thread.sleep(10000);
  74. sseEmitter = sseCache.get(clientId);
  75. if (sseEmitter == null) {
  76. logger.error("SseEmitterServiceImpl[sendMsgToClient]:{}的第{}次消息重推失败,未创建长链接", clientId, i + 1);
  77. continue;
  78. }
  79. sseEmitter.send(sendData);
  80. } catch (Exception ex) {
  81. logger.error("SseEmitterServiceImpl[sendMsgToClient]:{}的第{}次消息重推失败", clientId, i + 1, ex);
  82. continue;
  83. }
  84. logger.info("SseEmitterServiceImpl[sendMsgToClient]:{}的第{}次消息重推成功,{}", clientId, i + 1, sseEmitterResultVOList.toString());
  85. return;
  86. }
  87. }
  88. }
  89. /**
  90. * 长链接完成后回调接口(即关闭连接时调用)
  91. *
  92. * @param clientId 客户端ID
  93. * @return java.lang.Runnable
  94. **/
  95. private Runnable completionCallBack(String clientId) {
  96. return () -> {
  97. logger.info("结束连接:{}", clientId);
  98. removeUser(clientId);
  99. };
  100. }
  101. /**
  102. * 连接超时时调用
  103. *
  104. * @param clientId 客户端ID
  105. * @return java.lang.Runnable
  106. **/
  107. private Runnable timeoutCallBack(String clientId) {
  108. return () -> {
  109. logger.info("连接超时:{}", clientId);
  110. removeUser(clientId);
  111. };
  112. }
  113. /**
  114. * 推送消息异常时,回调方法
  115. *
  116. * @param clientId 客户端ID
  117. * @return java.util.function.Consumer<java.lang.Throwable>
  118. **/
  119. private Consumer<Throwable> errorCallBack(String clientId) {
  120. return throwable -> {
  121. logger.error("SseEmitterServiceImpl[errorCallBack]:连接异常,客户端ID:{}", clientId);
  122. // 推送消息失败后,每隔10s推送一次,推送5
  123. for (int i = 0; i < 5; i++) {
  124. try {
  125. Thread.sleep(10000);
  126. SseEmitter sseEmitter = sseCache.get(clientId);
  127. if (sseEmitter == null) {
  128. logger.error("SseEmitterServiceImpl[errorCallBack]:第{}次消息重推失败,未获取到 {} 对应的长链接", i + 1, clientId);
  129. continue;
  130. }
  131. sseEmitter.send("失败后重新推送");
  132. } catch (Exception e) {
  133. e.printStackTrace();
  134. }
  135. }
  136. };
  137. }
  138. /**
  139. * 移除用户连接
  140. *
  141. * @param clientId 客户端ID
  142. **/
  143. private void removeUser(String clientId) {
  144. sseCache.remove(clientId);
  145. logger.info("SseEmitterServiceImpl[removeUser]:移除用户:{}", clientId);
  146. }
  147. }

3. Nginx配置

如果项目中使用nginx对后端服务做了代理,nginx代理转发后,默认会在1min的时候断掉长链接,SSE需要设置自己的长链接时间,则需要在nginx中进行配置;
在反向代理的location块中加入如下配置

  1. proxy_set_header Host $http_host; ##proxy_set_header用来重定义发往后端服务器的请求头
  2. proxy_set_header X-Real-IP $remote_addr;
  3. proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
  4. proxy_set_header X-Forwarded-Proto $scheme;
  5. proxy_buffering off;
  6. proxy_http_version 1.1;
  7. proxy_read_timeout 600s; ##设置SSE长链接保持时间为 600s

4. 请求示例

在这里插入图片描述
在这里插入图片描述

常见问题

1、前端报错:EventSource’s response has a MIME type (“application/json”) that is not “text/event-stream”. Aborting the connection
前端在创建SSE长链接时,完整的请求(包括参数和参数值)都必须放在new EventSource(完整请求)中;
2、创建长链接时,接口状态一直处于pending,检查后端nginx是否做相应配置;
3、推送消息失败:检查客户端创建链接时的id,和推送消息时的id是否一致;

总结

整体业务流程为:客户端创建链接——>服务端保持生成SseEmitter对象,并通过SseEmitter对象实现向客户端主动推送消息——>客户端收到推送消息后,刷新页面(根据推送消息,请求相关业务接口)

声明:本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:【wpsshop博客】
推荐阅读
相关标签
  

闽ICP备14008679号