赞
踩
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-websocket</artifactId>
- </dependency>
SpringBoot和WebSocket是没有token鉴权的,需要自己定义实现WebSocket建立连接时的鉴权,websocket的实现方式就和一般博客看到的不一样了,本文自定义websocket配置:WebSocketConfig implements WebSocketConfigurer,
websocket的拦截器:CustomWebsocketInterceptor extends HttpSessionHandshakeInterceptor,
websocket的Handler:CustomWebSocketHandler extends TextWebSocketHandler
- @Configuration
- @EnableWebSocket
- public class WebSocketConfig implements WebSocketConfigurer {
-
- @Resource
- private CustomWebsocketInterceptor customWebsocketInterceptor;
-
- @Resource
- private CustomWebSocketHandler customWebSocketHandler;
-
-
- @Bean
- public ServerEndpointExporter serverEndpointExporter() {
- return new ServerEndpointExporter();
- }
-
- @Override
- public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
- registry
- // 设置处理器处理/custom/**
- .addHandler(customWebSocketHandler,"/wsTest/websocket")
- // 允许跨越
- .setAllowedOrigins("*")
- // 设置监听器
- .addInterceptors(customWebsocketInterceptor);
- }
-
- }
建立连接的url
wss://websocket.test.com/wsTest/websocket
注意,使用wss调用需要在nginx配置ssl证书,否则部署到服务器是无法建立websocket连接的。
- upstream wsTestServer {
- ip_hash;
- server 10.*.*.92:8008;
- server 10.*.*.92:8008;
- }
-
- server {
- # cloudflare
- listen 80;
- listen 443 ssl;
- server_name websocket.test.com;
- # 证书路径:
- ssl_certificate /etc/nginx/cert/ssl-c.crt;
- ssl_certificate_key /etc/nginx/cert/ssl-k.key;
-
- ssl_session_cache shared:SSL:1m;
- ssl_session_timeout 5m;
-
- ssl_ciphers HIGH:!aNULL:!MD5;
- ssl_prefer_server_ciphers on;
-
- location /wsTest/websocket {
- proxy_pass http://wsTestServer;
-
- proxy_set_header Host $host;
- proxy_http_version 1.1;
- proxy_set_header X-Client-IP $remote_addr;
- proxy_set_header Upgrade $http_upgrade;
- proxy_set_header Connection "upgrade";
- proxy_set_header X-Real-IP $remote_addr;
- proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
- proxy_set_header Host $http_host;
- proxy_set_header Upgrade $http_upgrade;
- proxy_connect_timeout 60;
- proxy_read_timeout 600;
-
- #允许跨域请求的域,* 代表所有
- add_header 'Access-Control-Allow-Origin' *;
- #允许带上cookie请求
- add_header 'Access-Control-Allow-Credentials' 'true';
- #允许请求的方法,比如 GET/POST/PUT/DELETE
- add_header 'Access-Control-Allow-Methods' *;
- #允许请求的header
- add_header 'Access-Control-Allow-Headers' *;
- }
- }
-
其中要监听listen 80端口,尤其注意的是要注意这下面2个配置必须加上,否则建立连接成功立刻就会断掉。
proxy_set_header Host $http_host;
proxy_set_header Upgrade $http_upgrade;
从nginx配置看,进行了负载均衡,服务进行了集群部署,项目部署在多台机器上,如此,当一个客户端用户和一台机器建立了链接,客户端自然是不会再去和其他机器建立连接,同时一般建立连接成功时,还会断开旧的连接,这样就不能和新的机器建立连接。
解决方案
1、使用消息中间件解决websocket session共享问题。
2、使用redis的发布订阅模式解决
我采用了方案2,
收到客户端发送到服务器发给接收者的消息时,
调用:
- JSONObject obj = new JSONObject();
- obj.put("title", "我是标题");
- obj.put("smsContent","我是内容");
- customWebSocketHandler.dealNodeSession(userId, obj);
- public void dealNodeSession(String uid, JSONObject message){
- boolean isSendMessage = sendMessage(uid, message.toJSONString());
- // 如果当前服务发送成功就不再同志其他服务订阅发送
- if (isSendMessage){
- log.info("连接对象【{}】在本服务发送消息",uid);
- return;
- }
- log.info("连接对象【{}】在本服务未建立连接,通知其他服务发送消息",uid);
- try {
- JSONObject jsonObject = new JSONObject();
- jsonObject.put("userId",uid);
- jsonObject.put("message",message);
- redisService.convertAndSend(RedisConstants.WETECH_REDIS_CHANNEL, jsonObject);
- } catch (Exception e) {
- log.error("dealNodeSession Exception: {}",e);
- }
- }
- @Configuration
- public class MessageConfig {
-
- /**
- * 配置 RedisMessageListenerContainer 监听器容器
- *
- * @param connectionFactory 连接工厂
- * @param listenerAdapter 消息侦听器适配器
- * @return
- */
-
- @Bean
- public RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory,
- MessageListenerAdapter listenerAdapter) {
- RedisMessageListenerContainer container = new RedisMessageListenerContainer();
- container.setConnectionFactory(connectionFactory);
- // 消息主题列表
- List<Topic> topicList = new ArrayList<>();
- // 订阅 “wsTopic” 频道
- topicList.add(new ChannelTopic(RedisConstants.WETECH_REDIS_CHANNEL));
- // 订阅 “wsTopic2” 模式
- // topicList.add(new PatternTopic((RedisConstants.REDIS_CHANNEL2));
- // 将消息侦听器添加到容器中
- container.addMessageListener(listenerAdapter, topicList);
- return container;
- }
-
- @Bean
- MessageListenerAdapter listenerAdapter(RedisReceiver receiver) {
- // 消息监听适配器
- return new MessageListenerAdapter(receiver, "onMessage");
- }
- }
- /**
- * redis消息监听对象,接收订阅消息
- */
- @Component
- @Slf4j
- public class RedisReceiver implements MessageListener {
-
- @Autowired
- private CustomWebSocketHandler customWebSocketHandler;
-
- /**
- * @param message
- * @param pattern
- */
- @Override
- public void onMessage(Message message, byte[] pattern) {
- // log.info("redisReceiver onMessage message: {}",message);
- String channel = new String(message.getChannel());// 订阅的频道名称
- String msg = "";
- try {
- msg = new String(message.getBody(), Constants.UTF8);//注意与发布消息编码一致,否则会乱码
- if (!StringUtils.isEmpty(msg)) {
- if (RedisConstants.WETECH_REDIS_CHANNEL.equals(channel)) { // 最新消息
- String replaceAll = msg.replaceAll("\\\\", "");
-
- JSONObject jsonObject = JSONObject.parseObject(replaceAll);
- if (jsonObject == null) {
- log.info("redisReceiver onMessage redisMessage is null");
- return;
- }
- String userId = (String) jsonObject.get("userId");
- JSONObject obj=jsonObject.getJSONObject("message");
- log.info("redisReceiver onMessage userId: {}",userId);
- customWebSocketHandler.sendMessage(userId, obj.toJSONString());
- } else {
- // 其他订阅的消息处理
- }
- } else {
- log.info("redisReceiver onMessage msg is null");
- }
- } catch (Exception e) {
- log.error("redisReceiver exception:" + e.toString());
- }
- }
- }
-
- /**
- * @author wangchengfeng
- * 用来处理webscocket拦截器
- */
- @Component
- @Slf4j
- public class CustomWebsocketInterceptor extends HttpSessionHandshakeInterceptor {
-
- @Autowired
- private UserService userService;
-
- /**
- * 建立连接时
- *
- * @param request the current request
- * @param response the current response
- * @param wsHandler the target WebSocket handler
- * @param attributes the attributes from the HTTP handshake to associate with the WebSocket
- * session; the provided attributes are copied, the original map is not used.
- * @return
- * @throws Exception
- */
- @Override
- public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler
- , Map<String, Object> attributes) {
- try {
- ServletServerHttpRequest req = (ServletServerHttpRequest) request;
- ServletServerHttpResponse res = (ServletServerHttpResponse) response;
- String token = req.getServletRequest().getHeader("Sec-WebSocket-Protocol");
- if (StringUtils.isBlank(token)) {
- log.error("CustomWebsocketInterceptor beforeHandshake token is empty");
- return false;
- }
-
- UserInfo model = userService.queryUserInfoByToken(token);
- if (model == null) {
- log.error("CustomWebsocketInterceptor beforeHandshake userInfoModel is empty");
- return false;
- }
-
- attributes.put("userId", model.getUid());
- log.info("attributes:{}", attributes);
- //在后端握手时设置一下请求头(Sec-WebSocket-Protocol),前端发来什么授权值,这里就设置什么值,不设置会报错导致建立连接成功后立即被关闭
- res.getServletResponse().setHeader("Sec-WebSocket-Protocol", token);
-
- /**
- * 鉴权: return false 不通过
- * response.setStatusCode(HttpStatus.UNAUTHORIZED);
- * return false;
- */
- super.setCreateSession(true);
- return super.beforeHandshake(request, response, wsHandler, attributes);
- } catch (Exception e) {
- log.info("beforeHandshake Exception:{}", e);
- }
- return false;
- }
-
- /**
- * 成功建立连接后
- *
- * @param request the current request
- * @param response the current response
- * @param wsHandler the target WebSocket handler
- * @param exception an exception raised during the handshake, or {@code null} if none
- */
- @Override
- public void afterHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler,
- Exception exception) {
- log.info("连接成功....");
- //其他业务代码
- super.afterHandshake(request, response, wsHandler, exception);
- }
- }
- @Slf4j
- @Component
- public class CustomWebSocketHandler extends TextWebSocketHandler {
-
- @Autowired
- private RedisService redisService;
-
- /**
- * 当前websocket连接集合
- */
- public static final ConcurrentHashMap<String, WebSocketSession> WEB_SOCKET_SESSION_MAP = new ConcurrentHashMap<>();
-
- /**
- * 收到客户端消息时触发的回调
- *
- * @param session 连接对象
- * @param message 消息体
- */
- @Override
- protected void handleTextMessage(WebSocketSession session, TextMessage message) {
- log.info("接受到消息【{}】的消息:{}", session.getId(), message.getPayload());
- String messagePayload = message.getPayload();
- JSONObject jsonObject = JSONObject.parseObject(messagePayload);
- if (jsonObject != null && jsonObject.get("wsMsgType").equals("heartBeat")) {
- String uid = (String) jsonObject.get("uid");
- JSONObject obj = new JSONObject();
- obj.put("heartBeatResult", "done");
- dealNodeSession(uid, obj);
- }
- }
-
- /**
- * 建立连接后触发的回调
- *
- * @param session 连接对象
- * @throws Exception
- */
- @Override
- public void afterConnectionEstablished(WebSocketSession session) {
- try {
- String sessionId = getSessionId(session);
- if (StringUtils.isBlank(sessionId)){
- log.error("建立了连接时 sessionId is null");
- return;
- }
- // 如果存在则断开连接
- if (WEB_SOCKET_SESSION_MAP.containsKey(sessionId)) {
- WEB_SOCKET_SESSION_MAP.get(sessionId).close();
- WEB_SOCKET_SESSION_MAP.remove(sessionId);
- }
- // 将新连接添加
- WEB_SOCKET_SESSION_MAP.put(sessionId, session);
- log.debug("与【{}】建立了连接", sessionId);
- //sendMessage(sessionId, sessionId);
- log.debug("attributes:{}", session.getAttributes());
- } catch (IOException e) {
- log.error("建立连接后触发的回调【{}】,status:{}", getSessionId(session));
- }
- }
-
- /**
- * 断开连接后触发的回调
- *
- * @param session 连接对象
- * @param status 状态
- * @throws Exception 异常
- */
- @Override
- public void afterConnectionClosed(WebSocketSession session, CloseStatus status) {
- log.info("连接对象【{}】断开连接,status:{}", getSessionId(session), status.getCode());
- try {
- String sessionId = getSessionId(session);
- if (StringUtils.isBlank(sessionId)){
- return;
- }
- // 关闭连接
- session.close(CloseStatus.SERVER_ERROR);
- // 删除对象
- WEB_SOCKET_SESSION_MAP.remove(sessionId);
- } catch (IOException e) {
- log.error("连接对象【{}】断开连接,IOException :{}", e);
- }
- }
-
- /**
- * 传输消息出错时触发的回调
- *
- * @param session 连接对象
- * @param exception 异常
- * @throws Exception 异常
- */
- @Override
- public void handleTransportError(WebSocketSession session, Throwable exception) {
- log.info("连接对象【{}】发生错误,exception:{}", session.getId(), exception.getMessage());
- // 如果发送异常,则断开连接
- String sessionId = getSessionId(session);
- if (StringUtils.isBlank(sessionId)){
- log.error("建立了连接时 sessionId is null");
- return;
- }
- try {
- if (session.isOpen()) {
- session.close();
- WEB_SOCKET_SESSION_MAP.remove(sessionId);
- }
- } catch (IOException e) {
- log.error("连接对象【{}】发生错误,IOException :{}", e);
- }
- }
-
- /**
- * 自定义判断 sessionId
- *
- * @param session 连接对象
- * @return sessionId
- */
- private String getSessionId(WebSocketSession session) {
- return (String) session.getAttributes().get("userId");
- }
-
- public void dealNodeSession(String uid, JSONObject message){
- boolean isSendMessage = sendMessage(uid, message.toJSONString());
- // 如果当前服务发送成功就不再同志其他服务订阅发送
- if (isSendMessage){
- log.info("连接对象【{}】在本服务发送消息",uid);
- return;
- }
- log.info("连接对象【{}】在本服务未建立连接,通知其他服务发送消息",uid);
- try {
- JSONObject jsonObject = new JSONObject();
- jsonObject.put("userId",uid);
- jsonObject.put("message",message);
- redisService.convertAndSend(RedisConstants.WETECH_REDIS_CHANNEL, jsonObject);
- } catch (Exception e) {
- log.error("dealNodeSession Exception: {}",e);
- }
- }
-
- /**
- * 发送消息
- *
- * @param sessionId 对象id
- * @param message 消息
- * @throws IOException IO
- */
- public boolean sendMessage(String sessionId, String message) {
- WebSocketSession webSocketSession = WEB_SOCKET_SESSION_MAP.get(sessionId);
- if (webSocketSession == null) {
- log.error("连接对象【{}】未在该服务建立连接,不发送消息{}", sessionId,message);
- return false;
- }
- if (!webSocketSession.isOpen()) {
- log.error("连接对象【{}】已关闭 无法送消息:{}", sessionId, message);
- return false;
- } else {
- try {
- webSocketSession.sendMessage(new TextMessage(message));
- log.info("sendMessage:向{}发送消息", sessionId);
- return true;
- } catch (IOException e) {
- log.error("sendMessage IOException:向{}发送消息:{}", sessionId, message);
- return false;
- }
- }
- }
-
- /**
- * 获取所有的连接对象ID
- *
- * @return ids
- */
- public List<String> getSessionIds() {
- Enumeration<String> keys = WEB_SOCKET_SESSION_MAP.keys();
- List<String> ks = new ArrayList<>();
- while (keys.hasMoreElements()) {
- ks.add(keys.nextElement());
- }
- return ks;
- }
- }
- @RestController
- @RequestMapping("/chat")
- @Slf4j
- public class ChatController {
-
- @Resource
- private CustomWebSocketHandler customWebSocketHandler;
-
- /**
- * 添加聊天记录,web
- */
- @PostMapping("/add/record")
- public void addChatRecord(@RequestBody AddChatRecordDto dto) {
- JSONObject obj = new JSONObject();
- obj.put("chatDialogResult", dto);
- customWebSocketHandler.dealNodeSession(dto.getUid(), obj);
- }
- }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。