当前位置:   article > 正文

SSE的使用(Vue3+springBoot3)_springboot sse vue

springboot sse vue

1、前端js代码

  1. function MyMount(){
  2. //判空
  3. if (window.EventSource){
  4. //得到eventSource对象实例,这个就是js专门为sse封装的
  5. let eventSource = new EventSource('http://192.168.1.166:9011/api/sse/event?clientId=' + 123);
  6. eventSource.onmessage = (event) =>{
  7. console.log("收到消息id和内容是:",event.lastEventId,event.data)
  8. };
  9. //连接成功回调(第一次连接成功的回调)
  10. eventSource.onopen = (event) =>{
  11. console.log("第一次连接成功的回调:"+event)
  12. };
  13. //连接错误回调
  14. eventSource.onerror = (event) =>{
  15. console.log("连接错误回调 :关闭连接")
  16. console.log("error-event:",event)
  17. //关闭连接(关闭后不会再自动重连)
  18. eventSource.close();
  19. };
  20. eventSource.addEventListener('close', (event) => {
  21. console.log("Connection closed: " + event);
  22. // 在这里可以执行一些关闭后的处理逻辑
  23. });
  24. }else {
  25. console.log("你的浏览器不支持SSE")
  26. ElMessage.error("你的浏览器不支持SSE")
  27. }
  28. }

2、后端代码(springboot)

2.1、接口写法

  1. package cn.com.onsafe.api.sse;
  2. import cn.com.onsafe.common.utils.SseUtils;
  3. import lombok.RequiredArgsConstructor;
  4. import org.apache.coyote.Response;
  5. import org.springframework.http.MediaType;
  6. import org.springframework.web.bind.annotation.*;
  7. import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
  8. import reactor.core.publisher.Flux;
  9. import java.net.http.HttpResponse;
  10. /**
  11. * @Author: 双歌
  12. * @Data: 2024/7/11
  13. */
  14. @RestController
  15. @RequestMapping("/sse")
  16. @RequiredArgsConstructor
  17. public class SseController {
  18. private final SseUtils sseUtils;
  19. //设置Content-Type:text/event-stream
  20. @GetMapping(value = "/event", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
  21. public SseEmitter createSseConnect(@RequestParam(name = "clientId", required = false) Long clientId) {
  22. return sseUtils.connect(clientId);
  23. }
  24. }

2.2、工具类

  1. package cn.com.onsafe.common.utils;
  2. import cn.hutool.core.util.IdUtil;
  3. import lombok.extern.slf4j.Slf4j;
  4. import org.springframework.stereotype.Component;
  5. import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
  6. import java.util.Map;
  7. import java.util.concurrent.ConcurrentHashMap; /**
  8. * @Author: 双歌
  9. * @Data: 2024/7/11
  10. */
  11. @Slf4j
  12. @Component
  13. public class SseUtils {
  14. private static final Map<Long, SseEmitter> sseEmitterMap = new ConcurrentHashMap<>();
  15. /**
  16. * 创建连接
  17. */
  18. public SseEmitter connect(Long userId) {
  19. if (sseEmitterMap.containsKey(userId)) {
  20. SseEmitter sseEmitter =sseEmitterMap.get(userId);
  21. sseEmitterMap.remove(userId);
  22. sseEmitter.complete();
  23. }
  24. try {
  25. // 设置超时时间,0表示不过期。默认30秒
  26. SseEmitter sseEmitter = new SseEmitter(60 * 1000L * 2);
  27. //发送一个空的SSE事件,这样可以确保客户端连接可以立即建立
  28. sseEmitter.send(SseEmitter.event().id("").data(""));
  29. //给sseEmitter对象注册两个回调函数
  30. //当SseEmitter完成时执行的回调。这通常用于清理工作或记录日志。
  31. sseEmitter.onCompletion(completionCallBack(userId));
  32. //当SseEmitter因超时而关闭时执行的回调。这允许在连接超时时执行一些额外的操作
  33. sseEmitter.onTimeout(timeoutCallBack(userId));
  34. sseEmitterMap.put(userId, sseEmitter);
  35. log.info("创建sse连接完成,当前用户:{}", userId);
  36. log.info("总用户数:{}", sseEmitterMap.size());
  37. return sseEmitter;
  38. } catch (Exception e) {
  39. log.info("创建sse连接异常,异常原因:{}", e.getMessage());
  40. }
  41. return null;
  42. }
  43. /**
  44. * 给指定用户发送消息
  45. * @param userId userId
  46. * @param messageId 消息标识
  47. * @param message 消息体,JSON格式
  48. * @return 布尔值
  49. */
  50. public boolean sendMessage(Long userId,String messageId, String message) {
  51. if (sseEmitterMap.containsKey(userId)) {
  52. SseEmitter sseEmitter = sseEmitterMap.get(userId);
  53. try {
  54. sseEmitter.send(SseEmitter.event().id(messageId).data(message));
  55. log.info("用户{},消息id:{},推送成功:{}", userId,messageId, message);
  56. return true;
  57. }catch (Exception e) {
  58. sseEmitterMap.remove(userId);
  59. log.info("用户{},消息id:{},推送异常:{}", userId,messageId, e.getMessage());
  60. sseEmitter.complete();
  61. return false;
  62. }
  63. }else {
  64. log.info("用户{}未上线", userId);
  65. }
  66. return false;
  67. }
  68. /**
  69. * 删除连接
  70. * @param userId userId
  71. */
  72. public void deleteUser(Long userId){
  73. removeUser(userId);
  74. }
  75. private static Runnable completionCallBack(Long userId) {
  76. return () -> {
  77. log.info("结束sse用户连接:{}", userId);
  78. removeUser(userId);
  79. };
  80. }
  81. private static Throwable errorCallBack(Long userId) {
  82. log.info("sse用户连接异常:{}", userId);
  83. removeUser(userId);
  84. return new Throwable();
  85. }
  86. /**
  87. * 连接超时回调
  88. */
  89. private static Runnable timeoutCallBack(Long userId) {
  90. return () -> {
  91. log.info("连接sse用户超时:{}", userId);
  92. removeUser(userId);
  93. };
  94. }
  95. /**
  96. * 断开连接
  97. * @param userId userID
  98. */
  99. public static void removeUser(Long userId){
  100. if (sseEmitterMap.containsKey(userId)) {
  101. SseEmitter sseEmitter = sseEmitterMap.get(userId);
  102. sseEmitterMap.remove(userId);
  103. //关闭与客户端的连接
  104. sseEmitter.complete();
  105. }else {
  106. log.info("用户{} 连接已关闭,当前连接总数量:{}",userId,sseEmitterMap.size());
  107. }
  108. }
  109. public Map<Long, SseEmitter> listSseConnect(){
  110. return sseEmitterMap;
  111. }
  112. }

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/weixin_40725706/article/detail/849984
推荐阅读
相关标签
  

闽ICP备14008679号