赞
踩
- function MyMount(){
- //判空
- if (window.EventSource){
- //得到eventSource对象实例,这个就是js专门为sse封装的
- let eventSource = new EventSource('http://192.168.1.166:9011/api/sse/event?clientId=' + 123);
- eventSource.onmessage = (event) =>{
- console.log("收到消息id和内容是:",event.lastEventId,event.data)
- };
-
- //连接成功回调(第一次连接成功的回调)
- eventSource.onopen = (event) =>{
- console.log("第一次连接成功的回调:"+event)
- };
-
- //连接错误回调
- eventSource.onerror = (event) =>{
- console.log("连接错误回调 :关闭连接")
- console.log("error-event:",event)
- //关闭连接(关闭后不会再自动重连)
- eventSource.close();
- };
-
- eventSource.addEventListener('close', (event) => {
- console.log("Connection closed: " + event);
- // 在这里可以执行一些关闭后的处理逻辑
- });
- }else {
- console.log("你的浏览器不支持SSE")
- ElMessage.error("你的浏览器不支持SSE")
- }
- }
![](https://csdnimg.cn/release/blogv2/dist/pc/img/newCodeMoreWhite.png)
- package cn.com.onsafe.api.sse;
-
- import cn.com.onsafe.common.utils.SseUtils;
- import lombok.RequiredArgsConstructor;
- import org.apache.coyote.Response;
- import org.springframework.http.MediaType;
- import org.springframework.web.bind.annotation.*;
- import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
- import reactor.core.publisher.Flux;
-
- import java.net.http.HttpResponse;
-
- /**
- * @Author: 双歌
- * @Data: 2024/7/11
- */
-
-
- @RestController
- @RequestMapping("/sse")
- @RequiredArgsConstructor
- public class SseController {
-
- private final SseUtils sseUtils;
-
-
- //设置Content-Type:text/event-stream
- @GetMapping(value = "/event", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
- public SseEmitter createSseConnect(@RequestParam(name = "clientId", required = false) Long clientId) {
- return sseUtils.connect(clientId);
- }
-
-
- }
![](https://csdnimg.cn/release/blogv2/dist/pc/img/newCodeMoreWhite.png)
- package cn.com.onsafe.common.utils;
-
- import cn.hutool.core.util.IdUtil;
- import lombok.extern.slf4j.Slf4j;
- import org.springframework.stereotype.Component;
- import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
-
- import java.util.Map;
- import java.util.concurrent.ConcurrentHashMap; /**
- * @Author: 双歌
- * @Data: 2024/7/11
- */
-
- @Slf4j
- @Component
- public class SseUtils {
-
- private static final Map<Long, SseEmitter> sseEmitterMap = new ConcurrentHashMap<>();
-
- /**
- * 创建连接
- */
- public SseEmitter connect(Long userId) {
- if (sseEmitterMap.containsKey(userId)) {
- SseEmitter sseEmitter =sseEmitterMap.get(userId);
- sseEmitterMap.remove(userId);
- sseEmitter.complete();
- }
- try {
-
- // 设置超时时间,0表示不过期。默认30秒
- SseEmitter sseEmitter = new SseEmitter(60 * 1000L * 2);
-
- //发送一个空的SSE事件,这样可以确保客户端连接可以立即建立
- sseEmitter.send(SseEmitter.event().id("").data(""));
-
- //给sseEmitter对象注册两个回调函数
- //当SseEmitter完成时执行的回调。这通常用于清理工作或记录日志。
- sseEmitter.onCompletion(completionCallBack(userId));
- //当SseEmitter因超时而关闭时执行的回调。这允许在连接超时时执行一些额外的操作
- sseEmitter.onTimeout(timeoutCallBack(userId));
-
- sseEmitterMap.put(userId, sseEmitter);
- log.info("创建sse连接完成,当前用户:{}", userId);
- log.info("总用户数:{}", sseEmitterMap.size());
- return sseEmitter;
- } catch (Exception e) {
- log.info("创建sse连接异常,异常原因:{}", e.getMessage());
- }
- return null;
- }
-
- /**
- * 给指定用户发送消息
- * @param userId userId
- * @param messageId 消息标识
- * @param message 消息体,JSON格式
- * @return 布尔值
- */
- public boolean sendMessage(Long userId,String messageId, String message) {
- if (sseEmitterMap.containsKey(userId)) {
- SseEmitter sseEmitter = sseEmitterMap.get(userId);
- try {
- sseEmitter.send(SseEmitter.event().id(messageId).data(message));
- log.info("用户{},消息id:{},推送成功:{}", userId,messageId, message);
- return true;
- }catch (Exception e) {
- sseEmitterMap.remove(userId);
- log.info("用户{},消息id:{},推送异常:{}", userId,messageId, e.getMessage());
- sseEmitter.complete();
- return false;
- }
- }else {
- log.info("用户{}未上线", userId);
- }
- return false;
- }
-
- /**
- * 删除连接
- * @param userId userId
- */
- public void deleteUser(Long userId){
- removeUser(userId);
- }
-
- private static Runnable completionCallBack(Long userId) {
- return () -> {
- log.info("结束sse用户连接:{}", userId);
- removeUser(userId);
- };
- }
-
- private static Throwable errorCallBack(Long userId) {
- log.info("sse用户连接异常:{}", userId);
- removeUser(userId);
- return new Throwable();
- }
-
- /**
- * 连接超时回调
- */
- private static Runnable timeoutCallBack(Long userId) {
- return () -> {
- log.info("连接sse用户超时:{}", userId);
- removeUser(userId);
- };
- }
-
- /**
- * 断开连接
- * @param userId userID
- */
- public static void removeUser(Long userId){
- if (sseEmitterMap.containsKey(userId)) {
- SseEmitter sseEmitter = sseEmitterMap.get(userId);
- sseEmitterMap.remove(userId);
- //关闭与客户端的连接
- sseEmitter.complete();
- }else {
- log.info("用户{} 连接已关闭,当前连接总数量:{}",userId,sseEmitterMap.size());
- }
- }
-
- public Map<Long, SseEmitter> listSseConnect(){
- return sseEmitterMap;
- }
- }
-
![](https://csdnimg.cn/release/blogv2/dist/pc/img/newCodeMoreWhite.png)
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。