当前位置:   article > 正文

Springboot集成SSE消息推送_springboot sse推送

springboot sse推送

SSE介绍
        SSE(Server-Sent Events)的全称是服务器推送事件,它是一种基于 HTTP 协议的实时通信技术,用于在客户端和服务器之间建立持久、单向的链接,允许服务器向客户端发送异步消息。

        了解 websocket 的小伙伴,可能也知道它也是长连接,可以推送信息,但是它们有一个明显的区别:SSE 是单通道,只能服务端向客户端发消息;而 webscoket 是双通道,客户端和服务端可以相互发消息

SSE
        1. 单向通信:SSE是一种服务器推送技术,服务器可以向客户端发送消息,但客户端无法主动发送消息到服务器。
        2. 持久连接:SSE在单个HTTP连接上建立持久连接,服务器可以多次发送事件到客户端,客户端只需保持连接不关闭。

        3 . 实时性:适用于需要从服务器获取实时更新的场景,如即时通知、实时数据更新

Websocket
        1. 双向通信:WebSocket提供了全双工通信,客户端和服务器可以双向发送消息,不需要等待请求-响应。
        2. 持久连接:WebSocket在单个TCP连接上实现持久连接,适用于双向通信的场景。
        3. 实时性:非常适合实时性要求高的应用,如在线游戏、实时聊天等。

使用方法:

Maven依赖

springboot中封装了sse代码,不需要额外的依赖

  1. <dependency>
  2. <groupId>org.springframework.boot</groupId>
  3. <artifactId>spring-boot-starter-web</artifactId>
  4. </dependency>
  5. <dependency>
  6. <groupId>cn.hutool</groupId>
  7. <artifactId>hutool-all</artifactId>
  8. <version>5.8.21</version>
  9. </dependency>

SseEmitterUtil工具类

  1. import lombok.extern.slf4j.Slf4j;
  2. import org.springframework.http.MediaType;
  3. import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
  4. import java.io.IOException;
  5. import java.util.ArrayList;
  6. import java.util.List;
  7. import java.util.Map;
  8. import java.util.concurrent.ConcurrentHashMap;
  9. import java.util.function.Consumer;
  10. /**
  11. * SSE长链接工具类
  12. */
  13. @Slf4j
  14. public class SseEmitterUtil {
  15. /**
  16. * 使用map对象,便于根据userId来获取对应的SseEmitter,或者放redis里面
  17. */
  18. private final static Map<Long, SseEmitter> sseEmitterMap = new ConcurrentHashMap<>();
  19. public static SseEmitter connect(Long userId) {
  20. // 设置超时时间,0表示不过期。默认30S,超时时间未完成会抛出异常:AsyncRequestTimeoutException
  21. SseEmitter sseEmitter = new SseEmitter(0L);
  22. // 注册回调
  23. sseEmitter.onCompletion(completionCallBack(userId));
  24. sseEmitter.onError(errorCallBack(userId));
  25. sseEmitter.onTimeout(timeoutCallBack(userId));
  26. sseEmitterMap.put(userId, sseEmitter);
  27. log.info("创建新的 SSE 连接,当前用户 {}, 连接总数 {}", userId, sseEmitterMap.size());
  28. return sseEmitter;
  29. }
  30. /**
  31. * 给制定用户发送消息
  32. *
  33. * @param userId 指定用户名
  34. * @param sseMessage 消息体
  35. */
  36. public static void sendMessage(Long userId, String sseMessage) {
  37. if (sseEmitterMap.containsKey(userId)) {
  38. try {
  39. sseEmitterMap.get(userId).send(sseMessage);
  40. log.info("用户 {} 推送消息 {}", userId, sseMessage);
  41. } catch (IOException e) {
  42. log.error("用户 {} 推送消息异常", userId, e);
  43. removeUser(userId);
  44. }
  45. } else {
  46. log.error("消息推送 用户 {} 不存在,链接总数 {}", userId, sseEmitterMap.size());
  47. }
  48. }
  49. /**
  50. * 群发消息
  51. */
  52. public static void batchSendMessage(String message, List<Long> ids) {
  53. ids.forEach(userId -> sendMessage(userId, message));
  54. }
  55. /**
  56. * 群发所有人
  57. */
  58. public static void batchSendMessage(String message) {
  59. sseEmitterMap.forEach((k, v) -> {
  60. try {
  61. v.send(message, MediaType.APPLICATION_JSON);
  62. } catch (IOException e) {
  63. log.error("用户 {} 推送异常", k, e);
  64. removeUser(k);
  65. }
  66. });
  67. }
  68. /**
  69. * 移除用户连接
  70. *
  71. * @param userId 用户 ID
  72. */
  73. public static void removeUser(Long userId) {
  74. if (sseEmitterMap.containsKey(userId)) {
  75. sseEmitterMap.get(userId).complete();
  76. sseEmitterMap.remove(userId);
  77. log.info("移除用户 {}, 剩余连接 {}", userId, sseEmitterMap.size());
  78. } else {
  79. log.error("消息推送 用户 {} 已被移除,剩余连接 {}", userId, sseEmitterMap.size());
  80. }
  81. }
  82. /**
  83. * 获取当前连接信息
  84. *
  85. * @return 所有的连接用户
  86. */
  87. public static List<Long> getIds() {
  88. return new ArrayList<>(sseEmitterMap.keySet());
  89. }
  90. /**
  91. * 获取当前的连接数量
  92. *
  93. * @return 当前的连接数量
  94. */
  95. public static int getUserCount() {
  96. return sseEmitterMap.size();
  97. }
  98. private static Runnable completionCallBack(Long userId) {
  99. return () -> {
  100. log.info("用户 {} 结束连接", userId);
  101. };
  102. }
  103. private static Runnable timeoutCallBack(Long userId) {
  104. return () -> {
  105. log.error("用户 {} 连接超时", userId);
  106. removeUser(userId);
  107. };
  108. }
  109. private static Consumer<Throwable> errorCallBack(Long userId) {
  110. return throwable -> {
  111. log.error("用户 {} 连接异常", userId);
  112. removeUser(userId);
  113. };
  114. }
  115. }
Controller层
  1. import cn.hutool.json.JSONUtil;
  2. import com.geb.common.utils.SseEmitterUtil;
  3. import com.geb.domain.SseMessage;
  4. import com.geb.domain.WdAgent;
  5. import io.grpc.internal.JsonUtil;
  6. import lombok.extern.slf4j.Slf4j;
  7. import org.springframework.web.bind.annotation.GetMapping;
  8. import org.springframework.web.bind.annotation.PathVariable;
  9. import org.springframework.web.bind.annotation.RequestMapping;
  10. import org.springframework.web.bind.annotation.RestController;
  11. import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
  12. @Slf4j
  13. @RestController
  14. @RequestMapping("/sse")
  15. public class SseController {
  16. /**
  17. * 用于创建连接
  18. */
  19. @GetMapping("/connect/{userId}")
  20. public SseEmitter connect(@PathVariable Long userId) {
  21. return SseEmitterUtil.connect(userId);
  22. }
  23. /**
  24. * 关闭连接
  25. */
  26. @GetMapping("/close/{userid}")
  27. public void close(@PathVariable("userid") Long userid) {
  28. SseEmitterUtil.removeUser(userid);
  29. }
  30. @GetMapping("/sse")
  31. public void sse(){
  32. // 构建推送消息体
  33. SseMessage sseMessage = new SseMessage();
  34. sseMessage.setId(1L);
  35. sseMessage.setMsg("SSE测试发送消息");
  36. sseMessage.setAgentName("测试智能体推送消息");
  37. SseEmitterUtil.sendMessage(1L, JSONUtil.toJsonStr(sseMessage));
  38. }
  39. }
测试

这里使用postman测试,输入链接点击运行后,会自动处于链接状态

我们发送一条消息测试,我这里对消息实体进行了封装,在postman新建窗口输入测试请求

消息发送完毕,在postman就可以看到刚刚的消息

可以多发送几条消息看看效果,关闭链接调用关闭的接口即可。

这是个本地的测试,如果使用了Nginx反向代理和Gateway网关,具体的使用情况需要根据测试或者生产环境进行相应的配置。

文章参考:Springboot集成SSE消息推送_springboot 集成sse推送-CSDN博客

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/空白诗007/article/detail/849954
推荐阅读
相关标签
  

闽ICP备14008679号