赞
踩
SSE介绍
SSE(Server-Sent Events)的全称是服务器推送事件,它是一种基于 HTTP 协议的实时通信技术,用于在客户端和服务器之间建立持久、单向的链接,允许服务器向客户端发送异步消息。
了解 websocket 的小伙伴,可能也知道它也是长连接,可以推送信息,但是它们有一个明显的区别:SSE 是单通道,只能服务端向客户端发消息;而 webscoket 是双通道,客户端和服务端可以相互发消息
SSE
1. 单向通信:SSE是一种服务器推送技术,服务器可以向客户端发送消息,但客户端无法主动发送消息到服务器。
2. 持久连接:SSE在单个HTTP连接上建立持久连接,服务器可以多次发送事件到客户端,客户端只需保持连接不关闭。
3 . 实时性:适用于需要从服务器获取实时更新的场景,如即时通知、实时数据更新
Websocket
1. 双向通信:WebSocket提供了全双工通信,客户端和服务器可以双向发送消息,不需要等待请求-响应。
2. 持久连接:WebSocket在单个TCP连接上实现持久连接,适用于双向通信的场景。
3. 实时性:非常适合实时性要求高的应用,如在线游戏、实时聊天等。
springboot中封装了sse代码,不需要额外的依赖
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-web</artifactId>
- </dependency>
- <dependency>
- <groupId>cn.hutool</groupId>
- <artifactId>hutool-all</artifactId>
- <version>5.8.21</version>
- </dependency>
SseEmitterUtil工具类
- import lombok.extern.slf4j.Slf4j;
- import org.springframework.http.MediaType;
- import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
-
- import java.io.IOException;
- import java.util.ArrayList;
- import java.util.List;
- import java.util.Map;
- import java.util.concurrent.ConcurrentHashMap;
- import java.util.function.Consumer;
-
- /**
- * SSE长链接工具类
- */
- @Slf4j
- public class SseEmitterUtil {
-
- /**
- * 使用map对象,便于根据userId来获取对应的SseEmitter,或者放redis里面
- */
- private final static Map<Long, SseEmitter> sseEmitterMap = new ConcurrentHashMap<>();
-
- public static SseEmitter connect(Long userId) {
- // 设置超时时间,0表示不过期。默认30S,超时时间未完成会抛出异常:AsyncRequestTimeoutException
- SseEmitter sseEmitter = new SseEmitter(0L);
-
- // 注册回调
- sseEmitter.onCompletion(completionCallBack(userId));
- sseEmitter.onError(errorCallBack(userId));
- sseEmitter.onTimeout(timeoutCallBack(userId));
- sseEmitterMap.put(userId, sseEmitter);
-
- log.info("创建新的 SSE 连接,当前用户 {}, 连接总数 {}", userId, sseEmitterMap.size());
- return sseEmitter;
- }
-
- /**
- * 给制定用户发送消息
- *
- * @param userId 指定用户名
- * @param sseMessage 消息体
- */
- public static void sendMessage(Long userId, String sseMessage) {
- if (sseEmitterMap.containsKey(userId)) {
- try {
- sseEmitterMap.get(userId).send(sseMessage);
- log.info("用户 {} 推送消息 {}", userId, sseMessage);
- } catch (IOException e) {
- log.error("用户 {} 推送消息异常", userId, e);
- removeUser(userId);
- }
- } else {
- log.error("消息推送 用户 {} 不存在,链接总数 {}", userId, sseEmitterMap.size());
- }
- }
-
- /**
- * 群发消息
- */
- public static void batchSendMessage(String message, List<Long> ids) {
- ids.forEach(userId -> sendMessage(userId, message));
- }
-
- /**
- * 群发所有人
- */
- public static void batchSendMessage(String message) {
- sseEmitterMap.forEach((k, v) -> {
- try {
- v.send(message, MediaType.APPLICATION_JSON);
- } catch (IOException e) {
- log.error("用户 {} 推送异常", k, e);
- removeUser(k);
- }
- });
- }
-
- /**
- * 移除用户连接
- *
- * @param userId 用户 ID
- */
- public static void removeUser(Long userId) {
- if (sseEmitterMap.containsKey(userId)) {
- sseEmitterMap.get(userId).complete();
- sseEmitterMap.remove(userId);
- log.info("移除用户 {}, 剩余连接 {}", userId, sseEmitterMap.size());
- } else {
- log.error("消息推送 用户 {} 已被移除,剩余连接 {}", userId, sseEmitterMap.size());
- }
- }
-
- /**
- * 获取当前连接信息
- *
- * @return 所有的连接用户
- */
- public static List<Long> getIds() {
- return new ArrayList<>(sseEmitterMap.keySet());
- }
-
- /**
- * 获取当前的连接数量
- *
- * @return 当前的连接数量
- */
- public static int getUserCount() {
- return sseEmitterMap.size();
- }
-
- private static Runnable completionCallBack(Long userId) {
- return () -> {
- log.info("用户 {} 结束连接", userId);
- };
- }
-
- private static Runnable timeoutCallBack(Long userId) {
- return () -> {
- log.error("用户 {} 连接超时", userId);
- removeUser(userId);
- };
- }
-
- private static Consumer<Throwable> errorCallBack(Long userId) {
- return throwable -> {
- log.error("用户 {} 连接异常", userId);
- removeUser(userId);
- };
- }
- }
![](https://csdnimg.cn/release/blogv2/dist/pc/img/newCodeMoreWhite.png)
- import cn.hutool.json.JSONUtil;
- import com.geb.common.utils.SseEmitterUtil;
- import com.geb.domain.SseMessage;
- import com.geb.domain.WdAgent;
- import io.grpc.internal.JsonUtil;
- import lombok.extern.slf4j.Slf4j;
- import org.springframework.web.bind.annotation.GetMapping;
- import org.springframework.web.bind.annotation.PathVariable;
- import org.springframework.web.bind.annotation.RequestMapping;
- import org.springframework.web.bind.annotation.RestController;
- import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
-
- @Slf4j
- @RestController
- @RequestMapping("/sse")
- public class SseController {
-
- /**
- * 用于创建连接
- */
- @GetMapping("/connect/{userId}")
- public SseEmitter connect(@PathVariable Long userId) {
- return SseEmitterUtil.connect(userId);
- }
-
- /**
- * 关闭连接
- */
- @GetMapping("/close/{userid}")
- public void close(@PathVariable("userid") Long userid) {
- SseEmitterUtil.removeUser(userid);
- }
-
- @GetMapping("/sse")
- public void sse(){
- // 构建推送消息体
- SseMessage sseMessage = new SseMessage();
- sseMessage.setId(1L);
- sseMessage.setMsg("SSE测试发送消息");
- sseMessage.setAgentName("测试智能体推送消息");
- SseEmitterUtil.sendMessage(1L, JSONUtil.toJsonStr(sseMessage));
- }
- }
![](https://csdnimg.cn/release/blogv2/dist/pc/img/newCodeMoreWhite.png)
这里使用postman测试,输入链接点击运行后,会自动处于链接状态
我们发送一条消息测试,我这里对消息实体进行了封装,在postman新建窗口输入测试请求
消息发送完毕,在postman就可以看到刚刚的消息
可以多发送几条消息看看效果,关闭链接调用关闭的接口即可。
这是个本地的测试,如果使用了Nginx反向代理和Gateway网关,具体的使用情况需要根据测试或者生产环境进行相应的配置。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。