当前位置:   article > 正文

Websocket客户端实现平台数据-后端-前端数据传输 wss协议_websocketclient.setsocketfactory

websocketclient.setsocketfactory
WebSocketClientAbs
  1. import org.apache.http.conn.ssl.TrustSelfSignedStrategy;
  2. import org.apache.http.ssl.SSLContexts;
  3. import org.java_websocket.client.WebSocketClient;
  4. import org.springframework.beans.factory.annotation.Value;
  5. import org.springframework.core.io.DefaultResourceLoader;
  6. import org.springframework.core.io.ResourceLoader;
  7. import javax.net.ssl.SSLContext;
  8. import javax.net.ssl.SSLSocketFactory;
  9. import java.io.IOException;
  10. import java.io.InputStream;
  11. import java.security.KeyStore;
  12. import java.util.Map;
  13. public abstract class WebSocketClientAbs {
  14. @Value("${server.ssl.key-store}")
  15. private String cerPath;
  16. @Value("${server.ssl.key-store-password}")
  17. private String cerPwd;
  18. /**
  19. * 创建WebSocket客户端
  20. *
  21. * @param wsUri
  22. * @param httpHeaders
  23. * @return
  24. */
  25. public abstract WebSocketClient createWebSocketClient(String wsUri, Map<String, String> httpHeaders);
  26. /**
  27. * 客户端连接
  28. *
  29. * @param uri
  30. * @param httpHeaders
  31. * @return
  32. */
  33. public abstract WebSocketClient connect(String uri, Map<String, String> httpHeaders);
  34. /**
  35. * wss协议证书认证
  36. *
  37. * @param webSocketClient
  38. */
  39. public void createWebSocketClient(WebSocketClient webSocketClient) {
  40. try {
  41. KeyStore keyStore = KeyStore.getInstance(KeyStore.getDefaultType());
  42. keyStore.load(resourceLoader(cerPath), cerPwd.toCharArray());
  43. SSLContext sslContext = SSLContexts.custom()
  44. .loadTrustMaterial(keyStore, new TrustSelfSignedStrategy()).build();
  45. SSLSocketFactory sslFactory = sslContext.getSocketFactory();
  46. webSocketClient.setSocketFactory(sslFactory);
  47. } catch (Exception e) {
  48. e.printStackTrace();
  49. }
  50. }
  51. /**
  52. * 读取文件信息
  53. *
  54. * @param fileFullPath
  55. * @return
  56. * @throws IOException
  57. */
  58. public InputStream resourceLoader(String fileFullPath) throws IOException {
  59. ResourceLoader resourceLoader = new DefaultResourceLoader();
  60. return resourceLoader.getResource(fileFullPath).getInputStream();
  61. }
  62. }
PlatformWebsocketClient:平台客户端
  1. import lombok.Data;
  2. import lombok.extern.slf4j.Slf4j;
  3. import org.java_websocket.client.WebSocketClient;
  4. import org.java_websocket.handshake.ServerHandshake;
  5. import org.springframework.stereotype.Component;
  6. import javax.annotation.Resource;
  7. import java.net.URI;
  8. import java.net.URISyntaxException;
  9. import java.util.Map;
  10. @Component
  11. @Slf4j
  12. @Data
  13. public class PlatformWebsocketClient extends WebSocketClientAbs {
  14. @Resource
  15. private MessageService messageService;
  16. private WebSocketClient wsClient;
  17. // 消息类型
  18. private String type;
  19. // 0:链接断开或者异常;1:代表链接中;2:代表正在连接;
  20. public static int isConnect = 0;
  21. /**
  22. * 获取客户端连接实例
  23. *
  24. * @param wsUri
  25. * @param httpHeaders
  26. * @return
  27. */
  28. @Override
  29. public WebSocketClient createWebSocketClient(String wsUri, Map<String, String> httpHeaders) {
  30. try {
  31. //创建客户端连接对象
  32. return new WebSocketClient(new URI(wsUri), httpHeaders) {
  33. /**
  34. * 建立连接调用
  35. * @param serverHandshake
  36. */
  37. @Override
  38. public void onOpen(ServerHandshake serverHandshake) {
  39. isConnect = 1;
  40. }
  41. /**
  42. * 收到服务端消息调用
  43. * @param s
  44. */
  45. @Override
  46. public void onMessage(String s) {
  47. // log.info("WebsocketClient-> 收到服务端消息:{}", s);
  48. messageService.handleMessage(s);
  49. }
  50. /**
  51. * 断开连接调用
  52. * @param i
  53. * @param s
  54. * @param b
  55. */
  56. @Override
  57. public void onClose(int i, String s, boolean b) {
  58. isConnect = 0;
  59. }
  60. /**
  61. * 连接报错调用
  62. * @param e
  63. */
  64. @Override
  65. public void onError(Exception e) {
  66. if (null != wsClient) {
  67. wsClient.close();
  68. }
  69. isConnect = 0;
  70. }
  71. };
  72. } catch (URISyntaxException e) {
  73. e.printStackTrace();
  74. }
  75. return null;
  76. }
  77. /**
  78. * 连接websocket服务端
  79. * 注意 synchronized 关键字,保证多个请求同时连接时,
  80. * 只有一个连接在创建
  81. *
  82. * @param uri
  83. * @param httpHeaders
  84. * @return
  85. */
  86. @Override
  87. public synchronized WebSocketClient connect(String uri, Map<String, String> httpHeaders) {
  88. WebSocketClient oldWsClient = this.getWsClient();
  89. if (null != oldWsClient) {
  90. log.info("WebsocketClient -> 已存在连接,oldWsClient:{}-{}",
  91. oldWsClient.getReadyState(), oldWsClient.getReadyState().ordinal());
  92. if (1 == oldWsClient.getReadyState().ordinal()) {
  93. log.info("WebsocketClient -> 使用存在且已打开的连接");
  94. return oldWsClient;
  95. } else {
  96. log.info("WebsocketClient -> 注销存在且未打开的连接,并重新获取新的连接");
  97. oldWsClient.close();
  98. }
  99. }
  100. WebSocketClient newWsClient = createWebSocketClient(uri, httpHeaders);
  101. // 如果是 "wss" 协议,则进行证书认证,认证方法在父类中
  102. if (uri.startsWith("wss")) {
  103. createWebSocketClient(newWsClient);
  104. }
  105. if (null == newWsClient) {
  106. log.error("创建失败");
  107. }
  108. assert newWsClient != null;
  109. newWsClient.connect();
  110. // 设置连接状态为正在连接
  111. isConnect = 2;
  112. // 连接状态不再是0请求中,判断建立结果是不是1已建立
  113. long startTime = System.currentTimeMillis();
  114. while (1 != newWsClient.getReadyState().ordinal()) {
  115. // 避免网络波动,设置持续等待连接时间
  116. long endTime = System.currentTimeMillis();
  117. long waitTime = (endTime - startTime) / 1000;
  118. if (5L < waitTime) {
  119. // log.info("WebsocketClient -> 建立连接异常,请稍后再试");
  120. break;
  121. }
  122. }
  123. if (1 == newWsClient.getReadyState().ordinal()) {
  124. this.setWsClient(newWsClient);
  125. // newWsClient.send("WebsocketClient -> 服务端连接成功!");
  126. // log.info("WebsocketClient -> 服务端连接成功!");
  127. return newWsClient;
  128. }
  129. return null;
  130. }
  131. }
LocalWebsocketClient:本地客户端
  1. import lombok.Data;
  2. import lombok.extern.slf4j.Slf4j;
  3. import org.java_websocket.client.WebSocketClient;
  4. import org.java_websocket.handshake.ServerHandshake;
  5. import org.springframework.stereotype.Component;
  6. import java.net.URI;
  7. import java.net.URISyntaxException;
  8. import java.util.Map;
  9. @Component
  10. @Slf4j
  11. @Data
  12. public class LocalWebsocketClient extends WebSocketClientAbs {
  13. private WebSocketClient wsClient;
  14. public static int isConnect = 0;
  15. /**
  16. * 获取客户端连接实例
  17. *
  18. * @param wsUri
  19. * @param httpHeaders
  20. * @return
  21. */
  22. @Override
  23. public WebSocketClient createWebSocketClient(String wsUri, Map<String, String> httpHeaders) {
  24. try {
  25. //创建客户端连接对象
  26. WebSocketClient client = new WebSocketClient(new URI(wsUri), httpHeaders) {
  27. /**
  28. * 建立连接调用
  29. * @param serverHandshake
  30. */
  31. @Override
  32. public void onOpen(ServerHandshake serverHandshake) {
  33. isConnect = 1;
  34. }
  35. /**
  36. * 收到服务端消息调用
  37. * @param s
  38. */
  39. @Override
  40. public void onMessage(String s) {
  41. log.info("WebsocketClient-> 收到服务端消息:{}", s);
  42. }
  43. /**
  44. * 断开连接调用
  45. * @param i
  46. * @param s
  47. * @param b
  48. */
  49. @Override
  50. public void onClose(int i, String s, boolean b) {
  51. isConnect = 0;
  52. }
  53. /**
  54. * 连接报错调用
  55. * @param e
  56. */
  57. @Override
  58. public void onError(Exception e) {
  59. if (null != wsClient) {
  60. wsClient.close();
  61. }
  62. isConnect = 0;
  63. }
  64. };
  65. return client;
  66. } catch (URISyntaxException e) {
  67. e.printStackTrace();
  68. }
  69. return null;
  70. }
  71. /**
  72. * 连接websocket服务端
  73. * 注意 synchronized 关键字,保证多个请求同时连接时,
  74. * 只有一个连接在创建
  75. *
  76. * @param uri
  77. * @param httpHeaders
  78. * @return
  79. */
  80. @Override
  81. public synchronized WebSocketClient connect(String uri, Map<String, String> httpHeaders) {
  82. WebSocketClient oldWsClient = this.getWsClient();
  83. if (null != oldWsClient) {
  84. log.info("WebsocketClient -> 已存在连接,oldWsClient:{}-{}",
  85. oldWsClient.getReadyState(), oldWsClient.getReadyState().ordinal());
  86. if (1 == oldWsClient.getReadyState().ordinal()) {
  87. log.info("WebsocketClient -> 使用存在且已打开的连接");
  88. return oldWsClient;
  89. } else {
  90. log.info("WebsocketClient -> 注销存在且未打开的连接,并重新获取新的连接");
  91. oldWsClient.close();
  92. }
  93. }
  94. WebSocketClient newWsClient = createWebSocketClient(uri, httpHeaders);
  95. // 如果是 "wss" 协议,则进行证书认证,认证方法在父类中
  96. if (uri.startsWith("wss")) {
  97. createWebSocketClient(newWsClient);
  98. }
  99. if (null == newWsClient) {
  100. log.error("创建失败");
  101. }
  102. assert newWsClient != null;
  103. newWsClient.connect();
  104. // 设置连接状态为正在连接
  105. isConnect = 2;
  106. // 连接状态不再是0请求中,判断建立结果是不是1已建立
  107. long startTime = System.currentTimeMillis();
  108. while (1 != newWsClient.getReadyState().ordinal()) {
  109. // 避免网络波动,设置持续等待连接时间
  110. long endTime = System.currentTimeMillis();
  111. long waitTime = (endTime - startTime) / 1000;
  112. if (5L < waitTime) {
  113. break;
  114. }
  115. }
  116. if (1 == newWsClient.getReadyState().ordinal()) {
  117. this.setWsClient(newWsClient);
  118. // newWsClient.send("WebsocketClient -> 服务端连接成功!");
  119. return newWsClient;
  120. }
  121. return null;
  122. }
  123. }
WebSocketHeartbeatTimer:心跳检测
  1. import javax.annotation.Resource;
  2. import java.time.LocalDateTime;
  3. import java.time.format.DateTimeFormatter;
  4. /**
  5. * 心跳重连机制
  6. */
  7. @Component
  8. @Slf4j
  9. public class WebSocketHeartbeatTimer {
  10. @Resource
  11. private PlatformWebsocketClient platformWebsocketClient;
  12. @Resource
  13. private LocalWebsocketClient localWebsocketClient;
  14. @Value("${websocket.url}")
  15. private String platformWebSocketUrl;
  16. @Value("${localWebsocket.url}")
  17. private String localWebSocketUrl;
  18. private static final DateTimeFormatter DATE_TIME_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
  19. /**
  20. * 平台WebSocket连接心跳检测,重连机制,每30秒触发一次
  21. * 注意 @Async 注解,要使用异步线程的方式来执行心跳检测,
  22. * 避免任务线程被其他任务占用
  23. */
  24. @Async
  25. @Scheduled(cron = "0/30 * * * * ?")
  26. public void platformWebSocket() {
  27. try {
  28. int isConnect = PlatformWebsocketClient.isConnect;
  29. // log.info("心跳检测 -> platformWebSocket: {}-{}", isConnect, ((isConnect == 1) ? "连接中" : "未连接"));
  30. if (1 != PlatformWebsocketClient.isConnect) {
  31. String now = DATE_TIME_FORMATTER.format(LocalDateTime.now());
  32. // log.info("心跳检测 -> platformWebSocket服务连接异常,时间:{},尝试重新连接---", now);
  33. platformWebsocketClient.connect(platformWebSocketUrl, null);
  34. }
  35. } catch (Exception e) {
  36. e.printStackTrace();
  37. }
  38. }
  39. /**
  40. * 本地WebSocket连接心跳检测,重连机制,每30秒触发一次
  41. * 注意 @Async 注解,要使用异步线程的方式来执行心跳检测,
  42. * 避免任务线程被其他任务占用
  43. */
  44. @Async
  45. @Scheduled(cron = "0/30 * * * * ?")
  46. public void LocalWebSocket() {
  47. try {
  48. int isConnect = LocalWebsocketClient.isConnect;
  49. // log.info("心跳检测 -> LocalWebSocket: {}-{}", isConnect, ((isConnect == 1) ? "连接中" : "未连接"));
  50. if (1 != LocalWebsocketClient.isConnect) {
  51. String now = DATE_TIME_FORMATTER.format(LocalDateTime.now());
  52. // log.info("心跳检测 -> LocalWebSocket服务连接异常,时间:{},尝试重新连接---", now);
  53. localWebsocketClient.connect(localWebSocketUrl, null);
  54. }
  55. } catch (Exception e) {
  56. e.printStackTrace();
  57. }
  58. }
  59. }
MessageService:消息转发,从平台客户端转发至本地客户端,本地客户端需要连接websocket服务
  1. import com.alibaba.fastjson.JSONObject;
  2. import lombok.extern.slf4j.Slf4j;
  3. import org.java_websocket.client.WebSocketClient;
  4. import org.springframework.stereotype.Service;
  5. import javax.annotation.Resource;
  6. import java.util.ArrayList;
  7. import java.util.List;
  8. @Service
  9. @Slf4j
  10. public class MessageService {
  11. /**
  12. * 处理消息
  13. * 这里可根据具体业务来实现,比如解析入库、再次分发发送MQ等
  14. * @param message
  15. */
  16. @Resource
  17. private LocalWebsocketClient localWebsocketClient;
  18. public void handleMessage(String message) {
  19. JSONObject jsonObject = JSONObject.parseObject(message);
  20. send(jsonObject.toJSONString());
  21. }
  22. public void send(String message){
  23. WebSocketClient client = localWebsocketClient.getWsClient();
  24. client.send(message);
  25. }
  26. }
SubWebSocket:前端推送服务
  1. import com.alibaba.fastjson.JSONArray;
  2. import com.alibaba.fastjson.JSONObject;
  3. import lombok.extern.slf4j.Slf4j;
  4. import org.springframework.stereotype.Component;
  5. import javax.websocket.OnClose;
  6. import javax.websocket.OnMessage;
  7. import javax.websocket.OnOpen;
  8. import javax.websocket.Session;
  9. import javax.websocket.server.PathParam;
  10. import javax.websocket.server.ServerEndpoint;
  11. import java.util.*;
  12. import java.util.concurrent.CopyOnWriteArraySet;
  13. @Component
  14. @Slf4j
  15. @ServerEndpoint(value = "/ws-subscribe/{userId}")
  16. public class SubWebSocket {
  17. /**
  18. * 线程安全的无序的集合
  19. */
  20. private static final CopyOnWriteArraySet<Session> SESSIONS = new CopyOnWriteArraySet<>();
  21. /**
  22. * 存储在线连接数
  23. */
  24. private static final Map<String, Session> SESSION_POOL = new HashMap<>();
  25. @OnOpen
  26. public void onOpen(Session session, @PathParam(value = "userId") String userId) {
  27. try {
  28. SESSIONS.add(session);
  29. SESSION_POOL.put(userId, session);
  30. } catch (Exception e) {
  31. e.printStackTrace();
  32. }
  33. }
  34. @OnClose
  35. public void onClose(Session session) {
  36. try {
  37. SESSIONS.remove(session);
  38. } catch (Exception e) {
  39. e.printStackTrace();
  40. }
  41. }
  42. @OnMessage
  43. public void onMessage(String message) {
  44. JSONObject json = JSONObject.parseObject(message);
  45. JSONArray subUsers = json.getJSONArray("subUser");
  46. json.remove("subUser");
  47. sendMoreMessage(subUsers,json.toJSONString());
  48. }
  49. /**
  50. * 此为广播消息
  51. *
  52. * @param message 消息
  53. */
  54. public void sendAllMessage(String message) {
  55. for (Session session : SESSIONS) {
  56. try {
  57. if(session.isOpen() && session != SESSION_POOL.get("jeecgboot")){
  58. synchronized (session) {
  59. session.getBasicRemote().sendText(message);
  60. }
  61. }
  62. } catch (Exception e) {
  63. e.printStackTrace();
  64. }
  65. }
  66. }
  67. /**
  68. * 此为单点消息
  69. *
  70. * @param userId 用户编号
  71. * @param message 消息
  72. */
  73. public void sendOneMessage(String userId, String message) {
  74. Session session = SESSION_POOL.get(userId);
  75. if (session != null && session.isOpen()) {
  76. try {
  77. synchronized (session) {
  78. session.getAsyncRemote().sendText(message);
  79. }
  80. } catch (Exception e) {
  81. e.printStackTrace();
  82. }
  83. }
  84. }
  85. /**
  86. * 此为单点消息(多人)
  87. *
  88. * @param userIds 用户编号列表
  89. * @param message 消息
  90. */
  91. public void sendMoreMessage(JSONArray userIds, String message) {
  92. if(userIds.size() != 0) {
  93. for (Object userId : userIds) {
  94. Session session = SESSION_POOL.get(userId.toString());
  95. if (session != null && session.isOpen()) {
  96. try {
  97. synchronized(session) {
  98. session.getAsyncRemote().sendText(message);
  99. }
  100. } catch (Exception e) {
  101. e.printStackTrace();
  102. }
  103. }
  104. }
  105. }
  106. }
  107. }

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/我家小花儿/article/detail/164630
推荐阅读
相关标签
  

闽ICP备14008679号