当前位置:   article > 正文

springboot整合webSocket:鉴权,心跳检测,wss请求,nginx配置、集群部署_springboot websocket 鉴权

springboot websocket 鉴权

1.首先pom引入websocket

  1. <dependency>
  2. <groupId>org.springframework.boot</groupId>
  3. <artifactId>spring-boot-starter-websocket</artifactId>
  4. </dependency>

SpringBoot和WebSocket是没有token鉴权的,需要自己定义实现WebSocket建立连接时的鉴权,websocket的实现方式就和一般博客看到的不一样了,本文自定义websocket配置:WebSocketConfig implements WebSocketConfigurer,

websocket的拦截器:CustomWebsocketInterceptor extends HttpSessionHandshakeInterceptor,

websocket的Handler:CustomWebSocketHandler extends TextWebSocketHandler

2.配置WebSocketConfig

  1. @Configuration
  2. @EnableWebSocket
  3. public class WebSocketConfig implements WebSocketConfigurer {
  4. @Resource
  5. private CustomWebsocketInterceptor customWebsocketInterceptor;
  6. @Resource
  7. private CustomWebSocketHandler customWebSocketHandler;
  8. @Bean
  9. public ServerEndpointExporter serverEndpointExporter() {
  10. return new ServerEndpointExporter();
  11. }
  12. @Override
  13. public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
  14. registry
  15. // 设置处理器处理/custom/**
  16. .addHandler(customWebSocketHandler,"/wsTest/websocket")
  17. // 允许跨越
  18. .setAllowedOrigins("*")
  19. // 设置监听器
  20. .addInterceptors(customWebsocketInterceptor);
  21. }
  22. }

建立连接的url
wss://websocket.test.com/wsTest/websocket
注意,使用wss调用需要在nginx配置ssl证书,否则部署到服务器是无法建立websocket连接的。

3.Nginx配置如下,

  1. upstream wsTestServer {
  2. ip_hash;
  3. server 10.*.*.92:8008;
  4. server 10.*.*.92:8008;
  5. }
  6. server {
  7. # cloudflare
  8. listen 80;
  9. listen 443 ssl;
  10. server_name websocket.test.com;
  11. # 证书路径:
  12. ssl_certificate /etc/nginx/cert/ssl-c.crt;
  13. ssl_certificate_key /etc/nginx/cert/ssl-k.key;
  14. ssl_session_cache shared:SSL:1m;
  15. ssl_session_timeout 5m;
  16. ssl_ciphers HIGH:!aNULL:!MD5;
  17. ssl_prefer_server_ciphers on;
  18. location /wsTest/websocket {
  19. proxy_pass http://wsTestServer;
  20. proxy_set_header Host $host;
  21. proxy_http_version 1.1;
  22. proxy_set_header X-Client-IP $remote_addr;
  23. proxy_set_header Upgrade $http_upgrade;
  24. proxy_set_header Connection "upgrade";
  25. proxy_set_header X-Real-IP $remote_addr;
  26. proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
  27. proxy_set_header Host $http_host;
  28. proxy_set_header Upgrade $http_upgrade;
  29. proxy_connect_timeout 60;
  30. proxy_read_timeout 600;
  31. #允许跨域请求的域,* 代表所有
  32. add_header 'Access-Control-Allow-Origin' *;
  33. #允许带上cookie请求
  34. add_header 'Access-Control-Allow-Credentials' 'true';
  35. #允许请求的方法,比如 GET/POST/PUT/DELETE
  36. add_header 'Access-Control-Allow-Methods' *;
  37. #允许请求的header
  38. add_header 'Access-Control-Allow-Headers' *;
  39. }
  40. }

其中要监听listen 80端口,尤其注意的是要注意这下面2个配置必须加上,否则建立连接成功立刻就会断掉。
proxy_set_header Host $http_host;
proxy_set_header Upgrade $http_upgrade;

从nginx配置看,进行了负载均衡,服务进行了集群部署,项目部署在多台机器上,如此,当一个客户端用户和一台机器建立了链接,客户端自然是不会再去和其他机器建立连接,同时一般建立连接成功时,还会断开旧的连接,这样就不能和新的机器建立连接。
解决方案
1、使用消息中间件解决websocket session共享问题。
2、使用redis的发布订阅模式解决
我采用了方案2,

收到客户端发送到服务器发给接收者的消息时,

调用:

  1. JSONObject obj = new JSONObject(); 
  2. obj.put("title", "我是标题");
  3. obj.put("smsContent","我是内容");
  4. customWebSocketHandler.dealNodeSession(userId, obj);

  1. public void dealNodeSession(String uid, JSONObject message){
  2. boolean isSendMessage = sendMessage(uid, message.toJSONString());
  3. // 如果当前服务发送成功就不再同志其他服务订阅发送
  4. if (isSendMessage){
  5. log.info("连接对象【{}】在本服务发送消息",uid);
  6. return;
  7. }
  8. log.info("连接对象【{}】在本服务未建立连接,通知其他服务发送消息",uid);
  9. try {
  10. JSONObject jsonObject = new JSONObject();
  11. jsonObject.put("userId",uid);
  12. jsonObject.put("message",message);
  13. redisService.convertAndSend(RedisConstants.WETECH_REDIS_CHANNEL, jsonObject);
  14. } catch (Exception e) {
  15. log.error("dealNodeSession Exception: {}",e);
  16. }
  17. }

4.Redis发布订阅的配置

  1. @Configuration
  2. public class MessageConfig {
  3. /**
  4. * 配置 RedisMessageListenerContainer 监听器容器
  5. *
  6. * @param connectionFactory 连接工厂
  7. * @param listenerAdapter 消息侦听器适配器
  8. * @return
  9. */
  10. @Bean
  11. public RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory,
  12. MessageListenerAdapter listenerAdapter) {
  13. RedisMessageListenerContainer container = new RedisMessageListenerContainer();
  14. container.setConnectionFactory(connectionFactory);
  15. // 消息主题列表
  16. List<Topic> topicList = new ArrayList<>();
  17. // 订阅 “wsTopic” 频道
  18. topicList.add(new ChannelTopic(RedisConstants.WETECH_REDIS_CHANNEL));
  19. // 订阅 “wsTopic2” 模式
  20. // topicList.add(new PatternTopic((RedisConstants.REDIS_CHANNEL2));
  21. // 将消息侦听器添加到容器中
  22. container.addMessageListener(listenerAdapter, topicList);
  23. return container;
  24. }
  25. @Bean
  26. MessageListenerAdapter listenerAdapter(RedisReceiver receiver) {
  27. // 消息监听适配器
  28. return new MessageListenerAdapter(receiver, "onMessage");
  29. }
  30. }

5.Redis订阅监听

  1. /**
  2. * redis消息监听对象,接收订阅消息
  3. */
  4. @Component
  5. @Slf4j
  6. public class RedisReceiver implements MessageListener {
  7. @Autowired
  8. private CustomWebSocketHandler customWebSocketHandler;
  9. /**
  10. * @param message
  11. * @param pattern
  12. */
  13. @Override
  14. public void onMessage(Message message, byte[] pattern) {
  15. // log.info("redisReceiver onMessage message: {}",message);
  16. String channel = new String(message.getChannel());// 订阅的频道名称
  17. String msg = "";
  18. try {
  19. msg = new String(message.getBody(), Constants.UTF8);//注意与发布消息编码一致,否则会乱码
  20. if (!StringUtils.isEmpty(msg)) {
  21. if (RedisConstants.WETECH_REDIS_CHANNEL.equals(channel)) { // 最新消息
  22. String replaceAll = msg.replaceAll("\\\\", "");
  23. JSONObject jsonObject = JSONObject.parseObject(replaceAll);
  24. if (jsonObject == null) {
  25. log.info("redisReceiver onMessage redisMessage is null");
  26. return;
  27. }
  28. String userId = (String) jsonObject.get("userId");
  29. JSONObject obj=jsonObject.getJSONObject("message");
  30. log.info("redisReceiver onMessage userId: {}",userId);
  31. customWebSocketHandler.sendMessage(userId, obj.toJSONString());
  32. } else {
  33. // 其他订阅的消息处理
  34. }
  35. } else {
  36. log.info("redisReceiver onMessage msg is null");
  37. }
  38. } catch (Exception e) {
  39. log.error("redisReceiver exception:" + e.toString());
  40. }
  41. }
  42. }

6.websocket的拦截器:CustomWebsocketInterceptor

  1. /**
  2. * @author wangchengfeng
  3. * 用来处理webscocket拦截器
  4. */
  5. @Component
  6. @Slf4j
  7. public class CustomWebsocketInterceptor extends HttpSessionHandshakeInterceptor {
  8. @Autowired
  9. private UserService userService;
  10. /**
  11. * 建立连接时
  12. *
  13. * @param request the current request
  14. * @param response the current response
  15. * @param wsHandler the target WebSocket handler
  16. * @param attributes the attributes from the HTTP handshake to associate with the WebSocket
  17. * session; the provided attributes are copied, the original map is not used.
  18. * @return
  19. * @throws Exception
  20. */
  21. @Override
  22. public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler
  23. , Map<String, Object> attributes) {
  24. try {
  25. ServletServerHttpRequest req = (ServletServerHttpRequest) request;
  26. ServletServerHttpResponse res = (ServletServerHttpResponse) response;
  27. String token = req.getServletRequest().getHeader("Sec-WebSocket-Protocol");
  28. if (StringUtils.isBlank(token)) {
  29. log.error("CustomWebsocketInterceptor beforeHandshake token is empty");
  30. return false;
  31. }
  32. UserInfo model = userService.queryUserInfoByToken(token);
  33. if (model == null) {
  34. log.error("CustomWebsocketInterceptor beforeHandshake userInfoModel is empty");
  35. return false;
  36. }
  37. attributes.put("userId", model.getUid());
  38. log.info("attributes:{}", attributes);
  39. //在后端握手时设置一下请求头(Sec-WebSocket-Protocol),前端发来什么授权值,这里就设置什么值,不设置会报错导致建立连接成功后立即被关闭
  40. res.getServletResponse().setHeader("Sec-WebSocket-Protocol", token);
  41. /**
  42. * 鉴权: return false 不通过
  43. * response.setStatusCode(HttpStatus.UNAUTHORIZED);
  44. * return false;
  45. */
  46. super.setCreateSession(true);
  47. return super.beforeHandshake(request, response, wsHandler, attributes);
  48. } catch (Exception e) {
  49. log.info("beforeHandshake Exception:{}", e);
  50. }
  51. return false;
  52. }
  53. /**
  54. * 成功建立连接后
  55. *
  56. * @param request the current request
  57. * @param response the current response
  58. * @param wsHandler the target WebSocket handler
  59. * @param exception an exception raised during the handshake, or {@code null} if none
  60. */
  61. @Override
  62. public void afterHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler,
  63. Exception exception) {
  64. log.info("连接成功....");
  65. //其他业务代码
  66. super.afterHandshake(request, response, wsHandler, exception);
  67. }
  68. }

7.websocket的处理器CustomWebSocketHandler

  1. @Slf4j
  2. @Component
  3. public class CustomWebSocketHandler extends TextWebSocketHandler {
  4. @Autowired
  5. private RedisService redisService;
  6. /**
  7. * 当前websocket连接集合
  8. */
  9. public static final ConcurrentHashMap<String, WebSocketSession> WEB_SOCKET_SESSION_MAP = new ConcurrentHashMap<>();
  10. /**
  11. * 收到客户端消息时触发的回调
  12. *
  13. * @param session 连接对象
  14. * @param message 消息体
  15. */
  16. @Override
  17. protected void handleTextMessage(WebSocketSession session, TextMessage message) {
  18. log.info("接受到消息【{}】的消息:{}", session.getId(), message.getPayload());
  19. String messagePayload = message.getPayload();
  20. JSONObject jsonObject = JSONObject.parseObject(messagePayload);
  21. if (jsonObject != null && jsonObject.get("wsMsgType").equals("heartBeat")) {
  22. String uid = (String) jsonObject.get("uid");
  23. JSONObject obj = new JSONObject();
  24. obj.put("heartBeatResult", "done");
  25. dealNodeSession(uid, obj);
  26. }
  27. }
  28. /**
  29. * 建立连接后触发的回调
  30. *
  31. * @param session 连接对象
  32. * @throws Exception
  33. */
  34. @Override
  35. public void afterConnectionEstablished(WebSocketSession session) {
  36. try {
  37. String sessionId = getSessionId(session);
  38. if (StringUtils.isBlank(sessionId)){
  39. log.error("建立了连接时 sessionId is null");
  40. return;
  41. }
  42. // 如果存在则断开连接
  43. if (WEB_SOCKET_SESSION_MAP.containsKey(sessionId)) {
  44. WEB_SOCKET_SESSION_MAP.get(sessionId).close();
  45. WEB_SOCKET_SESSION_MAP.remove(sessionId);
  46. }
  47. // 将新连接添加
  48. WEB_SOCKET_SESSION_MAP.put(sessionId, session);
  49. log.debug("与【{}】建立了连接", sessionId);
  50. //sendMessage(sessionId, sessionId);
  51. log.debug("attributes:{}", session.getAttributes());
  52. } catch (IOException e) {
  53. log.error("建立连接后触发的回调【{}】,status:{}", getSessionId(session));
  54. }
  55. }
  56. /**
  57. * 断开连接后触发的回调
  58. *
  59. * @param session 连接对象
  60. * @param status 状态
  61. * @throws Exception 异常
  62. */
  63. @Override
  64. public void afterConnectionClosed(WebSocketSession session, CloseStatus status) {
  65. log.info("连接对象【{}】断开连接,status:{}", getSessionId(session), status.getCode());
  66. try {
  67. String sessionId = getSessionId(session);
  68. if (StringUtils.isBlank(sessionId)){
  69. return;
  70. }
  71. // 关闭连接
  72. session.close(CloseStatus.SERVER_ERROR);
  73. // 删除对象
  74. WEB_SOCKET_SESSION_MAP.remove(sessionId);
  75. } catch (IOException e) {
  76. log.error("连接对象【{}】断开连接,IOException :{}", e);
  77. }
  78. }
  79. /**
  80. * 传输消息出错时触发的回调
  81. *
  82. * @param session 连接对象
  83. * @param exception 异常
  84. * @throws Exception 异常
  85. */
  86. @Override
  87. public void handleTransportError(WebSocketSession session, Throwable exception) {
  88. log.info("连接对象【{}】发生错误,exception:{}", session.getId(), exception.getMessage());
  89. // 如果发送异常,则断开连接
  90. String sessionId = getSessionId(session);
  91. if (StringUtils.isBlank(sessionId)){
  92. log.error("建立了连接时 sessionId is null");
  93. return;
  94. }
  95. try {
  96. if (session.isOpen()) {
  97. session.close();
  98. WEB_SOCKET_SESSION_MAP.remove(sessionId);
  99. }
  100. } catch (IOException e) {
  101. log.error("连接对象【{}】发生错误,IOException :{}", e);
  102. }
  103. }
  104. /**
  105. * 自定义判断 sessionId
  106. *
  107. * @param session 连接对象
  108. * @return sessionId
  109. */
  110. private String getSessionId(WebSocketSession session) {
  111. return (String) session.getAttributes().get("userId");
  112. }
  113. public void dealNodeSession(String uid, JSONObject message){
  114. boolean isSendMessage = sendMessage(uid, message.toJSONString());
  115. // 如果当前服务发送成功就不再同志其他服务订阅发送
  116. if (isSendMessage){
  117. log.info("连接对象【{}】在本服务发送消息",uid);
  118. return;
  119. }
  120. log.info("连接对象【{}】在本服务未建立连接,通知其他服务发送消息",uid);
  121. try {
  122. JSONObject jsonObject = new JSONObject();
  123. jsonObject.put("userId",uid);
  124. jsonObject.put("message",message);
  125. redisService.convertAndSend(RedisConstants.WETECH_REDIS_CHANNEL, jsonObject);
  126. } catch (Exception e) {
  127. log.error("dealNodeSession Exception: {}",e);
  128. }
  129. }
  130. /**
  131. * 发送消息
  132. *
  133. * @param sessionId 对象id
  134. * @param message 消息
  135. * @throws IOException IO
  136. */
  137. public boolean sendMessage(String sessionId, String message) {
  138. WebSocketSession webSocketSession = WEB_SOCKET_SESSION_MAP.get(sessionId);
  139. if (webSocketSession == null) {
  140. log.error("连接对象【{}】未在该服务建立连接,不发送消息{}", sessionId,message);
  141. return false;
  142. }
  143. if (!webSocketSession.isOpen()) {
  144. log.error("连接对象【{}】已关闭 无法送消息:{}", sessionId, message);
  145. return false;
  146. } else {
  147. try {
  148. webSocketSession.sendMessage(new TextMessage(message));
  149. log.info("sendMessage:向{}发送消息", sessionId);
  150. return true;
  151. } catch (IOException e) {
  152. log.error("sendMessage IOException:向{}发送消息:{}", sessionId, message);
  153. return false;
  154. }
  155. }
  156. }
  157. /**
  158. * 获取所有的连接对象ID
  159. *
  160. * @return ids
  161. */
  162. public List<String> getSessionIds() {
  163. Enumeration<String> keys = WEB_SOCKET_SESSION_MAP.keys();
  164. List<String> ks = new ArrayList<>();
  165. while (keys.hasMoreElements()) {
  166. ks.add(keys.nextElement());
  167. }
  168. return ks;
  169. }
  170. }

8.向接收者发送websocket通知

  1. @RestController
  2. @RequestMapping("/chat")
  3. @Slf4j
  4. public class ChatController {
  5. @Resource
  6. private CustomWebSocketHandler customWebSocketHandler;
  7. /**
  8. * 添加聊天记录,web
  9. */
  10. @PostMapping("/add/record")
  11. public void addChatRecord(@RequestBody AddChatRecordDto dto) {
  12. JSONObject obj = new JSONObject();
  13. obj.put("chatDialogResult", dto);
  14. customWebSocketHandler.dealNodeSession(dto.getUid(), obj);
  15. }
  16. }

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/Cpp五条/article/detail/717158
推荐阅读
相关标签
  

闽ICP备14008679号