赞
踩
springboot
实现websocket
有4
种方式
servlet,spring,netty,stomp
使用下来spring
方式是最简单的.
springboot
版本:3.1.2
jdk:17
当前依赖版本
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
<version>3.1.2</version>
</dependency>
demo
借鉴了ruoyi-plus
spring
实现websocket
处理核心就是AbstractWebSocketHandler+HandshakeInterceptor
HandshakeInterceptor
实现握手前后处理,比如握手前鉴权
AbstractWebSocketHandler
实现消息的接收
,监听连接的建立+关闭
,顺序在HandshakeInterceptor
之后.
拦截器
package org.xxx.xxx.websocket.interceptor; import cn.dev33.satoken.session.SaSession; import cn.dev33.satoken.stp.StpUtil; import cn.hutool.core.lang.Assert; import org.xxx.common.core.domain.model.LoginUser; import org.xxx.common.core.exception.ServiceException; import lombok.extern.slf4j.Slf4j; import org.springframework.http.server.ServerHttpRequest; import org.springframework.http.server.ServerHttpResponse; import org.springframework.util.CollectionUtils; import org.springframework.web.socket.WebSocketHandler; import org.springframework.web.socket.server.HandshakeInterceptor; import java.util.List; import java.util.Map; import static org.qps.common.websocket.constant.WebSocketConstants.LOGIN_USER_KEY; /** * WebSocket握手请求的拦截器 */ @Slf4j public class PlusWebSocketInterceptor implements HandshakeInterceptor { /** * 握手前 * * @param request request * @param response response * @param wsHandler wsHandler * @param attributes attributes * @return 是否握手成功 */ @Override public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Map<String, Object> attributes) { attributes.put(LOGIN_USER_KEY, getLoginUser(request)); List<String> tokenList = request.getHeaders().get("Sec-Websocket-Protocol"); //再塞回协议头里 response.getHeaders().put("Sec-Websocket-Protocol", tokenList); return true; } private LoginUser getLoginUser(ServerHttpRequest request){ List<String> tokenList = request.getHeaders().get("Sec-Websocket-Protocol"); if(CollectionUtils.isEmpty(tokenList)){ throw new ServiceException("用户未登录"); } //请求头的子协议中获取token String token = tokenList.get(0); return loginUser; } /** * 握手后 * * @param request request * @param response response * @param wsHandler wsHandler * @param exception 异常 */ @Override public void afterHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Exception exception) { } }
消息接收+连接建立+关闭处理
package org.xxx.common.websocket.handler; import org.xxx.common.core.domain.model.LoginUser; import org.xxx.common.websocket.dto.WebSocketMessageDto; import org.xxx.common.websocket.holder.WebSocketSessionHolder; import org.xxx.common.websocket.utils.WebSocketUtils; import lombok.extern.slf4j.Slf4j; import org.xxx.common.websocket.constant.WebSocketConstants; import org.springframework.web.socket.*; import org.springframework.web.socket.handler.AbstractWebSocketHandler; import java.util.List; import java.util.Objects; /** * WebSocketHandler 实现类 */ @Slf4j public class PlusWebSocketHandler extends AbstractWebSocketHandler { /** * 连接成功后 */ @Override public void afterConnectionEstablished(WebSocketSession session) { LoginUser loginUser = (LoginUser) session.getAttributes().get(WebSocketConstants.LOGIN_USER_KEY); WebSocketSessionHolder.addSession(loginUser.getUserId(), session); log.info("[connect] sessionId: {},userId:{},userType:{}", session.getId(), loginUser.getUserId(), loginUser.getUserType()); } /** * 处理发送来的文本消息 * * @param session * @param message * @throws Exception */ @Override protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception { String msgPayload = message.getPayload(); //todo 业务操作 } @Override protected void handleBinaryMessage(WebSocketSession session, BinaryMessage message) throws Exception { super.handleBinaryMessage(session, message); } /** * 心跳监测的回复 * 针对ping帧的恢复,非text形式的消息接收 * @param session * @param message * @throws Exception */ @Override protected void handlePongMessage(WebSocketSession session, PongMessage message) throws Exception { WebSocketUtils.sendPongMessage(session); } /** * 连接出错时 * * @param session * @param exception * @throws Exception */ @Override public void handleTransportError(WebSocketSession session, Throwable exception) throws Exception { log.error("[transport error] sessionId: {} , exception:{}", session.getId(), exception.getMessage()); } /** * 连接关闭后 * * @param session * @param status */ @Override public void afterConnectionClosed(WebSocketSession session, CloseStatus status) { //连接关闭 } /** * 是否支持分片消息 * * @return */ @Override public boolean supportsPartialMessages() { return false; } }
配置类
package org.xxx.common.websocket.config; import cn.hutool.core.util.StrUtil; import org.xxx.common.websocket.config.properties.WebSocketProperties; import org.xxx.common.websocket.handler.PlusWebSocketHandler; import org.xxx.common.websocket.interceptor.PlusWebSocketInterceptor; import org.xxx.common.websocket.listener.WebSocketTopicListener; import org.springframework.boot.autoconfigure.AutoConfiguration; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.annotation.Bean; import org.springframework.web.socket.WebSocketHandler; import org.springframework.web.socket.config.annotation.EnableWebSocket; import org.springframework.web.socket.config.annotation.WebSocketConfigurer; import org.springframework.web.socket.server.HandshakeInterceptor; import org.springframework.web.socket.server.standard.ServletServerContainerFactoryBean; /** * WebSocket 配置 */ @AutoConfiguration //我是走的自定义配置文件是否允许开启websocket @ConditionalOnProperty(value = "websocket.enabled", havingValue = "true") @EnableConfigurationProperties(WebSocketProperties.class) @EnableWebSocket public class WebSocketConfig{ @Bean public WebSocketConfigurer webSocketConfigurer(HandshakeInterceptor handshakeInterceptor, WebSocketHandler webSocketHandler, WebSocketProperties webSocketProperties) { if (StrUtil.isBlank(webSocketProperties.getPath())) { webSocketProperties.setPath("/websocket"); } if (StrUtil.isBlank(webSocketProperties.getAllowedOrigins())) { webSocketProperties.setAllowedOrigins("*"); } return registry -> registry .addHandler(webSocketHandler, webSocketProperties.getPath()) .addInterceptors(handshakeInterceptor) .setAllowedOrigins(webSocketProperties.getAllowedOrigins()); } @Bean public HandshakeInterceptor handshakeInterceptor() { return new PlusWebSocketInterceptor(); } @Bean public WebSocketHandler webSocketHandler() { return new PlusWebSocketHandler(); } /** * 自定义服务器属性容器配置 * * @return 自定义服务器容器属性配置 */ @Bean public ServletServerContainerFactoryBean createWebSocketContainer() { ServletServerContainerFactoryBean container = new ServletServerContainerFactoryBean(); //session超时时间25s,客户端25s没有交互就断开连接 container.setMaxSessionIdleTimeout(25 * 1000L); container.setMaxTextMessageBufferSize(10 * 1024); container.setMaxBinaryMessageBufferSize(10 * 1024); return container; } }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。