赞
踩
在开发过程中,我们经常遇到需要对前台的列表数据,实现实时展示最新的几条数据,或者是调度的任务进度条实现实时的刷新......,而对于这种需求,无状态的http协议显然无法满足我们的需求,于是websocket协议应运而生。websocket协议本质上是一个基于tcp的协议,是双向通讯协议,实现了浏览器和客户端的实时通讯,接收端和发送端可以互相发送或接收消息。
本文整合websocket方式采用后台自定义Endpoint,前端使用内置的WebSocket。
- SpringBoot:2.5.13
- JDK:1.8
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-websocket</artifactId>
- </dependency>
在配置类中手动注入ServerEndpointExporter,这个bean会自动注册使用了@ServerEndpoint注解声明的Websocket端点。@ServerEndpoint注解与@Controller注解类似,都是用来配置请求的uri。
- @Configuration
- public class WebSocketConfig {
-
- /**
- * ServerEndpointExporter类的作用是,会扫描所有的服务器端点,
- * 把带有@ServerEndpoint 注解的所有类都添加进来
- *
- */
- @Bean
- public ServerEndpointExporter serverEndpointExporter(){
- return new ServerEndpointExporter();
- }
-
- }
server类用来对与前端建立的websocket连接做出相应的响应,同时通过该类我们可以主动向前台推送消息。在该类中我们无法使用通过@Resource和@Autowired注入spring容器中的bean,由于spring容器管理的bean都是以单例的形式存在的,而websocket服务类则可以对应多个客户端。 项目初始化启动时,会初始化websocket服务类,此时还没有用户连接,spring会为其注入service, 所以该对象的service不是null,而当新用户建立websocket连接时,系统会新建一个websocket服务类对象, 但不会注入service,导致后续用户连接websocket服务类中的service都是null。
- @Slf4j
- @Component
- @ServerEndpoint("/notice/{userId}")
- public class WebSocketServer {
-
- /**
- * 解决无法注入bean:定义静态service对象,通过@Autowired在系统启动时为静态变量赋值
- * @Autowired 注解作用在方法上,如果方法没有参数,spring容器会在类加载完后执行一次这个方法,
- * 如果方法中有参数的话,还会从容器中自动注入这个方法的参数,然后执行一次这个方法。
- */
- public static XxService xxService;
-
- @Autowired
- public void setXxService(XxService xxService){
- WebSocketServer.xxService = xxService;
- }
-
- //存储客户端session信息
- public static Map<String, Session> clients = new ConcurrentHashMap<>();
-
- //存储把不同用户的客户端session信息集合
- public static Map<String, Set<String>> connection = new ConcurrentHashMap<>();
-
- //会话id
- private String sid = null;
-
- //建立连接的用户id
- private String userId;
-
- /**
- * @description: 当与前端的websocket连接成功时,执行该方法
- * @PathParam 获取ServerEndpoint路径中的占位符信息类似 控制层的 @PathVariable注解
- **/
- @OnOpen
- public void onOpen(Session session, @PathParam("userId") String userId){
- this.sid = UUID.randomUUID().toString();
- this.userId = userId;
- clients.put(this.sid,session);
- //判断该用户是否存在会话信息,不存在则添加
- Set<String> clientSet = connection.get(userId);
- if (clientSet == null){
- clientSet = new HashSet<>();
- connection.put(userId,clientSet);
- }
- clientSet.add(this.sid);
- log.info(this.userId + "用户建立连接," + this.sid+"连接开启!");
- }
-
- /**
- * @description: 当连接失败时,执行该方法
- **/
- @OnClose
- public void onClose(){
- clients.remove(this.sid);
- log.info(this.sid+"连接断开");
- }
-
- /**
- * @description: 当收到前台发送的消息时,执行该方法
- **/
- @OnMessage
- public void onMessage(String message,Session session) {
- log.info("收到来自用户:" + this.userId + "的信息 " + message);
- //自定义消息实体
- ViewQueryInfoDTO viewQueryInfoDTO = JSON.parseObject(message, ViewQueryInfoDTO.class);
- viewQueryInfoDTO.setUserId(this.userId);
- //判断该次请求的消息类型是心跳检测还是获取信息
- if (viewQueryInfoDTO.getType().equals("heartbeat")){
- //立刻向前台发送消息,代表后台正常运行
- sendMessageByUserId(this.userId,new MessageInfo("heartbeat","ok"));
- }
- if (viewQueryInfoDTO.getType().equals("message")){
- //执行业务逻辑
- MessageInfo messageInfo = xxService.list(viewQueryInfoDTO);
- sendMessageByUserId(this.userId,messageInfo);
- }
- }
-
- /**
- * @description: 当连接发生错误时,执行该方法
- **/
- @OnError
- public void onError(Throwable error){
- log.info("系统错误");
- error.printStackTrace();
- }
-
- /**
- * @description: 通过userId向用户发送信息
- * 该类定义成静态可以配合定时任务实现定时推送
- **/
- public static void sendMessageByUserId(String userId, MessageInfo message){
- if (!StringUtils.isEmpty(userId)) {
- Set<String> clientSet = connection.get(userId);
- //用户是否存在客户端连接
- if (Objects.nonNull(clientSet)) {
- Iterator<String> iterator = clientSet.iterator();
- while (iterator.hasNext()) {
- String sid = iterator.next();
- Session session = clients.get(sid);
- //向每个会话发送消息
- if (Objects.nonNull(session)){
- try {
- String jsonString = JSON.toJSONString(message);
- //同步发送数据,需要等上一个sendText发送完成才执行下一个发送
- session.getBasicRemote().sendText(jsonString);
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
- }
- }
- }
- }
- }
系统中使用权限框架时,需要对端点进行放行。
- @Override
- protected void configure(HttpSecurity httpSecurity) throws Exception
- {
- httpSecurity.authorizeRequests().antMatchers("/notice/**").anonymous();
- }
如果请求还是被拦截,则再加入下面的配置。
- //忽略websocket拦截
- @Override
- public void configure(WebSecurity webSecurity){
- webSecurity.ignoring().antMatchers("/notice/**");
- }
websocket在线调试网址:websocket/ws/wss在线调试测试工具 (jackxiang.com)
模拟前台: 后台信息:
- //暴露自定义websocket对象
- export const socket = {
- //后台请求路径
- url: "",
- //websocket对象
- websocket: null,
- //websocket状态
- websocketState: false,
- //重新连接次数
- reconnectNum: 0,
- //重连锁状态,保证重连按顺序执行
- lockReconnect: false,
- //定时器信息
- timeout: null,
- clientTimeout: null,
- serverTimeout: null,
- //初始化方法,根据url创建websocket对象封装基本连接方法,并重置心跳检测
- initWebSocket(newUrl) {
- socket.url = newUrl;
- socket.websocket = new WebSocket(socket.url);
- socket.websocket.onopen = socket.websocketOnOpen;
- socket.websocket.onerror = socket.websocketOnError;
- socket.websocket.onclose = socket.websocketOnClose;
- this.resetHeartbeat()
- },
- reconnect() {
- //判断连接状态
- if (socket.lockReconnect) return;
- socket.reconnectNum += 1;
- //重新连接三次还未成功调用连接关闭方法
- if (socket.reconnectNum === 3) {
- socket.reconnectNum = 0;
- socket.websocket.onclose()
- return;
- }
- //等待本次重连完成后再进行下一次
- socket.lockReconnect = true;
- //5s后进行重新连接
- socket.timeout = setTimeout(() => {
- socket.initWebSocket(socket.url);
- socket.lockReconnect = false;
- }, 5000);
- },
- //重置心跳检测
- resetHeartbeat() {
- socket.heartbeat();
- },
- //心跳检测
- heartbeat() {
- socket.clientTimeout = setTimeout(() => {
- if (socket.websocket) {
- //向后台发送消息进行心跳检测
- socket.websocket.send(JSON.stringify({ type: "heartbeat" }));
- socket.websocketState = false;
- //一分钟内服务器不响应则关闭连接
- socket.serverTimeout = setTimeout(() => {
- if (!socket.websocketState) {
- socket.websocket.onclose()
- } else {
- this.resetHeartbeat()
- }
- }, 60 * 1000);
- }
- }, 3 * 1000);
- },
- //发送消息
- sendMsg(message) {
- socket.websocket.send(message);
- },
- websocketOnOpen(event) {
- //连接开启后向后台发送消息进行一次心跳检测
- socket.sendMsg(JSON.stringify({ type: "heartbeat" }));
- },
- websocketOnError(error) {
- console.log(error);
- socket.reconnect();
- },
- websocketOnClose() {
- socket.websocket.close();
- },
- };
- <script>
- //引入socket对象
- import { socket } from "@/utils/websocket";
- export default {
- data() {
- return {
- loading: true,
- websocketCount: -1,
- //查询条件
- queryCondition: {
- type: "message",
- },
- }
- },
- created() {
- //初始化websocket对象
- //window.location.host获取ip和端口,
- //process.env.VUE_APP_WEBSOCKET_BASE_API获取请求前缀
- socket.initWebSocket(
- `ws:${window.location.host}${process.env.VUE_APP_WEBSOCKET_BASE_API}/notice/` +
- userId
- );
- //绑定接收消息方法
- socket.websocket.onmessage = this.websocketOnMessage;
- },
- methods: {
- init() {
- this.queryCondition.type = "message";
- socket.sendMsg(JSON.stringify(this.queryCondition));
- },
- websocketOnMessage(event) {
- //初始化界面时,主动向后台发送一次消息,获取数据
- this.websocketCount += 1;
- if (this.websocketCount === 0) {
- this.init();
- }
- let info = JSON.parse(event.data);
- switch (info.type) {
- case "heartbeat":
- socket.websocketState = true;
- break;
- case "message":
- this.loading = true;
- this.$nextTick(() => {
- this.consumeMessage(info);
- })
- break;
- case "error":
- this.loading = false;
- break;
- }
- },
- consumeMessage(info) {
- //拿到最新数据重新渲染界面
- },
- }
- }
- </script>
前后端可以正常通信响应数据,至此整合websocket完毕。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。