当前位置:   article > 正文

Spring Boot 整合 SSE(Server Sent Event)_springboot sse

springboot sse

服务器发送事件(Server-Sent Events),简称 SSE。这是一种服务器端到客户端的单向消息推送。SSE 基于 HTTP 协议的,SSE 在服务器和客户端之间打开一个单向通道,服务端响应的不再是一次性的数据包而是text/event-stream类型的数据流信息

后端代码:

  1. import cn.hutool.core.util.IdUtil;
  2. import lombok.extern.slf4j.Slf4j;
  3. import org.springframework.stereotype.Component;
  4. import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
  5. import java.util.Map;
  6. import java.util.concurrent.ConcurrentHashMap;
  7. import java.util.function.Consumer;
  8. @Slf4j
  9. @Component
  10. public class SseUtil {
  11. private static final Map<Long, SseEmitter> sseEmitterMap = new ConcurrentHashMap<>();
  12. /**
  13. * 创建连接
  14. */
  15. public SseEmitter connect(Long userId, Consumer<Throwable> errorCallback,Runnable timeOutCallback) {
  16. if (sseEmitterMap.containsKey(userId)) {
  17. SseEmitter sseEmitter =sseEmitterMap.get(userId);
  18. sseEmitterMap.remove(userId);
  19. sseEmitter.complete();
  20. }
  21. try {
  22. // 设置超时时间,0表示不过期。默认30秒
  23. SseEmitter sseEmitter = new SseEmitter(5*60*1000L);
  24. sseEmitter.send(SseEmitter.event().id(IdUtil.simpleUUID()).reconnectTime(1*60*1000L).data(""));
  25. // 注册回调
  26. sseEmitter.onCompletion(() -> {
  27. });
  28. sseEmitter.onError(errorCallback);
  29. sseEmitter.onTimeout(timeOutCallback);
  30. sseEmitterMap.put(userId, sseEmitter);
  31. log.info("创建sse连接完成,当前用户:{}", userId);
  32. return sseEmitter;
  33. } catch (Exception e) {
  34. log.info("创建sse连接异常,当前用户:{}", userId);
  35. }
  36. return null;
  37. }
  38. /**
  39. * 给指定用户发送消息
  40. *
  41. */
  42. public boolean sendMessage(Long userId,String messageId, String message) {
  43. if (sseEmitterMap.containsKey(userId)) {
  44. SseEmitter sseEmitter = sseEmitterMap.get(userId);
  45. try {
  46. sseEmitter.send(SseEmitter.event().id(messageId).reconnectTime(1*60*1000L).data(message));
  47. log.info("用户{},消息id:{},推送成功:{}", userId,messageId, message);
  48. return true;
  49. }catch (Exception e) {
  50. sseEmitterMap.remove(userId);
  51. log.info("用户{},消息id:{},推送异常:{}", userId,messageId, e.getMessage());
  52. sseEmitter.complete();
  53. return false;
  54. }
  55. }else {
  56. log.info("用户{}未上线", userId);
  57. }
  58. return false;
  59. }
  60. /**
  61. * 断开
  62. * @param userId
  63. */
  64. public void removeUser(Long userId){
  65. if (sseEmitterMap.containsKey(userId)) {
  66. SseEmitter sseEmitter = sseEmitterMap.get(userId);
  67. sseEmitterMap.remove(userId);
  68. sseEmitter.complete();
  69. }else {
  70. log.info("用户{} 连接已关闭",userId);
  71. }
  72. }
  73. }

细节:

  1. 创建SseEmitter 对象时需要返回给客户端,且不能二次包装
  2. 建立连接后,浏览器会处于加载中的状态,直到SseEmitter发送消息,或者连接超时和关闭,所以连接后就像进行了一次空消息的发送,避免浏览器一直处于加载中
  3. SseEmitter 中, timeout 属性表示 SseEmitter 在发送 SSE 事件到客户端时的超时时间。也就是说,当您使用 SseEmittersend() 方法来发送 SSE 事件时,如果超过了 timeout 属性指定的时间,则将抛出 AsyncRequestTimeoutException 异常。客户端会进行自动重连,这个异常最好直接交给spring处理,因为这个请求是text/event-stream,全局异常处理可能会报错
  4. SseEventBuilder 中, timeout 属性表示当前正在构建的 SSE 事件的超时时间。也就是说,当您调用 SseEventBuilderbuild() 方法来构建 SSE 事件时,如果超过了 timeout 属性指定的时间,则将抛出 SseEventTimeoutException 异常

前端代码:

  1. <script>
  2. if (window.EventSource) {
  3. // 建立连接
  4. source = new EventSource(http://localhost:8080/test/sse?id=1);
  5. source.onopen = function (event) {
  6. console.log('SSE链接成功');
  7. }
  8. source.onmessage = function (event) {
  9. if(event.data){
  10. console.log('后端返回的数据:', data.value);
  11. }
  12. }
  13. source.onerror = (error) => {
  14. console.log('SSE链接失败');
  15. };
  16. } else {
  17. alert("你的浏览器不支持SSE");
  18. }
  19. </script>


 

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/很楠不爱3/article/detail/335484
推荐阅读
相关标签
  

闽ICP备14008679号