赞
踩
WebSocketClientAbs
-
- import org.apache.http.conn.ssl.TrustSelfSignedStrategy;
- import org.apache.http.ssl.SSLContexts;
- import org.java_websocket.client.WebSocketClient;
- import org.springframework.beans.factory.annotation.Value;
- import org.springframework.core.io.DefaultResourceLoader;
- import org.springframework.core.io.ResourceLoader;
-
- import javax.net.ssl.SSLContext;
- import javax.net.ssl.SSLSocketFactory;
- import java.io.IOException;
- import java.io.InputStream;
- import java.security.KeyStore;
- import java.util.Map;
-
- public abstract class WebSocketClientAbs {
-
-
- @Value("${server.ssl.key-store}")
- private String cerPath;
-
- @Value("${server.ssl.key-store-password}")
- private String cerPwd;
-
- /**
- * 创建WebSocket客户端
- *
- * @param wsUri
- * @param httpHeaders
- * @return
- */
- public abstract WebSocketClient createWebSocketClient(String wsUri, Map<String, String> httpHeaders);
-
- /**
- * 客户端连接
- *
- * @param uri
- * @param httpHeaders
- * @return
- */
- public abstract WebSocketClient connect(String uri, Map<String, String> httpHeaders);
-
- /**
- * wss协议证书认证
- *
- * @param webSocketClient
- */
- public void createWebSocketClient(WebSocketClient webSocketClient) {
- try {
- KeyStore keyStore = KeyStore.getInstance(KeyStore.getDefaultType());
- keyStore.load(resourceLoader(cerPath), cerPwd.toCharArray());
- SSLContext sslContext = SSLContexts.custom()
- .loadTrustMaterial(keyStore, new TrustSelfSignedStrategy()).build();
- SSLSocketFactory sslFactory = sslContext.getSocketFactory();
- webSocketClient.setSocketFactory(sslFactory);
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
-
-
- /**
- * 读取文件信息
- *
- * @param fileFullPath
- * @return
- * @throws IOException
- */
- public InputStream resourceLoader(String fileFullPath) throws IOException {
- ResourceLoader resourceLoader = new DefaultResourceLoader();
- return resourceLoader.getResource(fileFullPath).getInputStream();
- }
-
- }

PlatformWebsocketClient:平台客户端
-
- import lombok.Data;
- import lombok.extern.slf4j.Slf4j;
- import org.java_websocket.client.WebSocketClient;
- import org.java_websocket.handshake.ServerHandshake;
- import org.springframework.stereotype.Component;
-
- import javax.annotation.Resource;
- import java.net.URI;
- import java.net.URISyntaxException;
- import java.util.Map;
-
-
- @Component
- @Slf4j
- @Data
- public class PlatformWebsocketClient extends WebSocketClientAbs {
-
-
- @Resource
- private MessageService messageService;
- private WebSocketClient wsClient;
-
- // 消息类型
- private String type;
-
- // 0:链接断开或者异常;1:代表链接中;2:代表正在连接;
- public static int isConnect = 0;
-
- /**
- * 获取客户端连接实例
- *
- * @param wsUri
- * @param httpHeaders
- * @return
- */
- @Override
- public WebSocketClient createWebSocketClient(String wsUri, Map<String, String> httpHeaders) {
-
- try {
- //创建客户端连接对象
- return new WebSocketClient(new URI(wsUri), httpHeaders) {
-
- /**
- * 建立连接调用
- * @param serverHandshake
- */
- @Override
- public void onOpen(ServerHandshake serverHandshake) {
- isConnect = 1;
- }
-
- /**
- * 收到服务端消息调用
- * @param s
- */
- @Override
- public void onMessage(String s) {
- // log.info("WebsocketClient-> 收到服务端消息:{}", s);
- messageService.handleMessage(s);
- }
-
- /**
- * 断开连接调用
- * @param i
- * @param s
- * @param b
- */
- @Override
- public void onClose(int i, String s, boolean b) {
- isConnect = 0;
- }
-
- /**
- * 连接报错调用
- * @param e
- */
- @Override
- public void onError(Exception e) {
- if (null != wsClient) {
- wsClient.close();
- }
- isConnect = 0;
- }
- };
- } catch (URISyntaxException e) {
- e.printStackTrace();
- }
- return null;
- }
-
- /**
- * 连接websocket服务端
- * 注意 synchronized 关键字,保证多个请求同时连接时,
- * 只有一个连接在创建
- *
- * @param uri
- * @param httpHeaders
- * @return
- */
- @Override
- public synchronized WebSocketClient connect(String uri, Map<String, String> httpHeaders) {
- WebSocketClient oldWsClient = this.getWsClient();
- if (null != oldWsClient) {
- log.info("WebsocketClient -> 已存在连接,oldWsClient:{}-{}",
- oldWsClient.getReadyState(), oldWsClient.getReadyState().ordinal());
- if (1 == oldWsClient.getReadyState().ordinal()) {
- log.info("WebsocketClient -> 使用存在且已打开的连接");
- return oldWsClient;
- } else {
- log.info("WebsocketClient -> 注销存在且未打开的连接,并重新获取新的连接");
- oldWsClient.close();
- }
- }
-
- WebSocketClient newWsClient = createWebSocketClient(uri, httpHeaders);
- // 如果是 "wss" 协议,则进行证书认证,认证方法在父类中
- if (uri.startsWith("wss")) {
- createWebSocketClient(newWsClient);
- }
- if (null == newWsClient) {
- log.error("创建失败");
- }
- assert newWsClient != null;
- newWsClient.connect();
- // 设置连接状态为正在连接
- isConnect = 2;
- // 连接状态不再是0请求中,判断建立结果是不是1已建立
- long startTime = System.currentTimeMillis();
- while (1 != newWsClient.getReadyState().ordinal()) {
- // 避免网络波动,设置持续等待连接时间
- long endTime = System.currentTimeMillis();
- long waitTime = (endTime - startTime) / 1000;
- if (5L < waitTime) {
- // log.info("WebsocketClient -> 建立连接异常,请稍后再试");
- break;
- }
- }
- if (1 == newWsClient.getReadyState().ordinal()) {
- this.setWsClient(newWsClient);
- // newWsClient.send("WebsocketClient -> 服务端连接成功!");
- // log.info("WebsocketClient -> 服务端连接成功!");
- return newWsClient;
- }
- return null;
- }
-
- }

LocalWebsocketClient:本地客户端
-
- import lombok.Data;
- import lombok.extern.slf4j.Slf4j;
- import org.java_websocket.client.WebSocketClient;
- import org.java_websocket.handshake.ServerHandshake;
- import org.springframework.stereotype.Component;
-
- import java.net.URI;
- import java.net.URISyntaxException;
- import java.util.Map;
-
-
- @Component
- @Slf4j
- @Data
- public class LocalWebsocketClient extends WebSocketClientAbs {
-
-
- private WebSocketClient wsClient;
-
-
- public static int isConnect = 0;
-
- /**
- * 获取客户端连接实例
- *
- * @param wsUri
- * @param httpHeaders
- * @return
- */
- @Override
- public WebSocketClient createWebSocketClient(String wsUri, Map<String, String> httpHeaders) {
-
- try {
- //创建客户端连接对象
- WebSocketClient client = new WebSocketClient(new URI(wsUri), httpHeaders) {
-
- /**
- * 建立连接调用
- * @param serverHandshake
- */
- @Override
- public void onOpen(ServerHandshake serverHandshake) {
- isConnect = 1;
- }
-
- /**
- * 收到服务端消息调用
- * @param s
- */
- @Override
- public void onMessage(String s) {
- log.info("WebsocketClient-> 收到服务端消息:{}", s);
- }
-
- /**
- * 断开连接调用
- * @param i
- * @param s
- * @param b
- */
- @Override
- public void onClose(int i, String s, boolean b) {
- isConnect = 0;
- }
-
- /**
- * 连接报错调用
- * @param e
- */
- @Override
- public void onError(Exception e) {
- if (null != wsClient) {
- wsClient.close();
- }
- isConnect = 0;
- }
- };
- return client;
- } catch (URISyntaxException e) {
- e.printStackTrace();
- }
- return null;
- }
-
- /**
- * 连接websocket服务端
- * 注意 synchronized 关键字,保证多个请求同时连接时,
- * 只有一个连接在创建
- *
- * @param uri
- * @param httpHeaders
- * @return
- */
- @Override
- public synchronized WebSocketClient connect(String uri, Map<String, String> httpHeaders) {
- WebSocketClient oldWsClient = this.getWsClient();
- if (null != oldWsClient) {
- log.info("WebsocketClient -> 已存在连接,oldWsClient:{}-{}",
- oldWsClient.getReadyState(), oldWsClient.getReadyState().ordinal());
- if (1 == oldWsClient.getReadyState().ordinal()) {
- log.info("WebsocketClient -> 使用存在且已打开的连接");
- return oldWsClient;
- } else {
- log.info("WebsocketClient -> 注销存在且未打开的连接,并重新获取新的连接");
- oldWsClient.close();
- }
- }
-
- WebSocketClient newWsClient = createWebSocketClient(uri, httpHeaders);
- // 如果是 "wss" 协议,则进行证书认证,认证方法在父类中
- if (uri.startsWith("wss")) {
- createWebSocketClient(newWsClient);
- }
- if (null == newWsClient) {
- log.error("创建失败");
- }
- assert newWsClient != null;
- newWsClient.connect();
- // 设置连接状态为正在连接
- isConnect = 2;
- // 连接状态不再是0请求中,判断建立结果是不是1已建立
- long startTime = System.currentTimeMillis();
- while (1 != newWsClient.getReadyState().ordinal()) {
- // 避免网络波动,设置持续等待连接时间
- long endTime = System.currentTimeMillis();
- long waitTime = (endTime - startTime) / 1000;
- if (5L < waitTime) {
- break;
- }
- }
- if (1 == newWsClient.getReadyState().ordinal()) {
- this.setWsClient(newWsClient);
- // newWsClient.send("WebsocketClient -> 服务端连接成功!");
- return newWsClient;
- }
- return null;
- }
-
- }

WebSocketHeartbeatTimer:心跳检测
-
- import javax.annotation.Resource;
- import java.time.LocalDateTime;
- import java.time.format.DateTimeFormatter;
-
-
- /**
- * 心跳重连机制
- */
- @Component
- @Slf4j
- public class WebSocketHeartbeatTimer {
-
- @Resource
- private PlatformWebsocketClient platformWebsocketClient;
-
- @Resource
- private LocalWebsocketClient localWebsocketClient;
-
-
- @Value("${websocket.url}")
- private String platformWebSocketUrl;
-
-
- @Value("${localWebsocket.url}")
- private String localWebSocketUrl;
-
- private static final DateTimeFormatter DATE_TIME_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
-
- /**
- * 平台WebSocket连接心跳检测,重连机制,每30秒触发一次
- * 注意 @Async 注解,要使用异步线程的方式来执行心跳检测,
- * 避免任务线程被其他任务占用
- */
- @Async
- @Scheduled(cron = "0/30 * * * * ?")
- public void platformWebSocket() {
- try {
- int isConnect = PlatformWebsocketClient.isConnect;
- // log.info("心跳检测 -> platformWebSocket: {}-{}", isConnect, ((isConnect == 1) ? "连接中" : "未连接"));
- if (1 != PlatformWebsocketClient.isConnect) {
- String now = DATE_TIME_FORMATTER.format(LocalDateTime.now());
- // log.info("心跳检测 -> platformWebSocket服务连接异常,时间:{},尝试重新连接---", now);
- platformWebsocketClient.connect(platformWebSocketUrl, null);
- }
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
-
-
-
- /**
- * 本地WebSocket连接心跳检测,重连机制,每30秒触发一次
- * 注意 @Async 注解,要使用异步线程的方式来执行心跳检测,
- * 避免任务线程被其他任务占用
- */
- @Async
- @Scheduled(cron = "0/30 * * * * ?")
- public void LocalWebSocket() {
- try {
- int isConnect = LocalWebsocketClient.isConnect;
- // log.info("心跳检测 -> LocalWebSocket: {}-{}", isConnect, ((isConnect == 1) ? "连接中" : "未连接"));
- if (1 != LocalWebsocketClient.isConnect) {
- String now = DATE_TIME_FORMATTER.format(LocalDateTime.now());
- // log.info("心跳检测 -> LocalWebSocket服务连接异常,时间:{},尝试重新连接---", now);
- localWebsocketClient.connect(localWebSocketUrl, null);
- }
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
-
- }

MessageService:消息转发,从平台客户端转发至本地客户端,本地客户端需要连接websocket服务
-
- import com.alibaba.fastjson.JSONObject;
- import lombok.extern.slf4j.Slf4j;
- import org.java_websocket.client.WebSocketClient;
- import org.springframework.stereotype.Service;
-
- import javax.annotation.Resource;
- import java.util.ArrayList;
- import java.util.List;
-
- @Service
- @Slf4j
- public class MessageService {
- /**
- * 处理消息
- * 这里可根据具体业务来实现,比如解析入库、再次分发发送MQ等
- * @param message
- */
-
- @Resource
- private LocalWebsocketClient localWebsocketClient;
-
- public void handleMessage(String message) {
- JSONObject jsonObject = JSONObject.parseObject(message);
- send(jsonObject.toJSONString());
-
- }
-
-
- public void send(String message){
- WebSocketClient client = localWebsocketClient.getWsClient();
- client.send(message);
- }
- }

SubWebSocket:前端推送服务
-
- import com.alibaba.fastjson.JSONArray;
- import com.alibaba.fastjson.JSONObject;
- import lombok.extern.slf4j.Slf4j;
- import org.springframework.stereotype.Component;
-
- import javax.websocket.OnClose;
- import javax.websocket.OnMessage;
- import javax.websocket.OnOpen;
- import javax.websocket.Session;
- import javax.websocket.server.PathParam;
- import javax.websocket.server.ServerEndpoint;
- import java.util.*;
- import java.util.concurrent.CopyOnWriteArraySet;
-
- @Component
- @Slf4j
- @ServerEndpoint(value = "/ws-subscribe/{userId}")
- public class SubWebSocket {
- /**
- * 线程安全的无序的集合
- */
- private static final CopyOnWriteArraySet<Session> SESSIONS = new CopyOnWriteArraySet<>();
-
- /**
- * 存储在线连接数
- */
- private static final Map<String, Session> SESSION_POOL = new HashMap<>();
-
- @OnOpen
- public void onOpen(Session session, @PathParam(value = "userId") String userId) {
- try {
- SESSIONS.add(session);
- SESSION_POOL.put(userId, session);
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
-
- @OnClose
- public void onClose(Session session) {
- try {
- SESSIONS.remove(session);
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
-
- @OnMessage
- public void onMessage(String message) {
- JSONObject json = JSONObject.parseObject(message);
- JSONArray subUsers = json.getJSONArray("subUser");
- json.remove("subUser");
- sendMoreMessage(subUsers,json.toJSONString());
- }
-
- /**
- * 此为广播消息
- *
- * @param message 消息
- */
- public void sendAllMessage(String message) {
- for (Session session : SESSIONS) {
- try {
- if(session.isOpen() && session != SESSION_POOL.get("jeecgboot")){
- synchronized (session) {
- session.getBasicRemote().sendText(message);
- }
- }
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- }
-
- /**
- * 此为单点消息
- *
- * @param userId 用户编号
- * @param message 消息
- */
- public void sendOneMessage(String userId, String message) {
- Session session = SESSION_POOL.get(userId);
- if (session != null && session.isOpen()) {
- try {
- synchronized (session) {
- session.getAsyncRemote().sendText(message);
- }
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- }
-
- /**
- * 此为单点消息(多人)
- *
- * @param userIds 用户编号列表
- * @param message 消息
- */
- public void sendMoreMessage(JSONArray userIds, String message) {
- if(userIds.size() != 0) {
- for (Object userId : userIds) {
- Session session = SESSION_POOL.get(userId.toString());
- if (session != null && session.isOpen()) {
- try {
- synchronized(session) {
- session.getAsyncRemote().sendText(message);
- }
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- }
- }
- }
-
-
- }

Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。