<!-- Packages actually used --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-websocket</artifactId> </dependency> <!-- Java-WebSocket: used when the application acts as a WebSocket client --> <dependency> <groupId>org.java-websocket</groupId> <artifactId>Java-WebSocket</artifactId> <version>1.3.5</version> </dependency>
package com.xie.websocket; import com.alibaba.fastjson.JSONObject; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; import javax.websocket.OnClose; import javax.websocket.OnMessage; import javax.websocket.OnOpen; import javax.websocket.Session; import javax.websocket.server.PathParam; import javax.websocket.server.ServerEndpoint; import java.io.IOException; import java.util.List; import java.util.Map; import java.util.concurrent.CopyOnWriteArraySet; /** * @Description * @Date 2022-04-01 11:28 * @Author xie */ @Slf4j @Component @ServerEndpoint(value = "/ws/data/{param}") public class WebSocketServer { //concurrent包的线程安全Set,用来存放每个客户端对应的MyWebSocket对象。 private static CopyOnWriteArraySet<WebSocketServer> webSocketSet = new CopyOnWriteArraySet<>(); //与某个客户端的连接会话,需要通过它来给客户端发送数据 private Session session; // 业务service private TestService testService = (TestService) ApplicationContextHandle.getBean(TestService.class); /** * 连接建立成功调用的方法 */ @OnOpen public void onOpen(Session session, @PathParam("param") String param) { this.session = session; //加入set中 webSocketSet.add(this); try { while (true) { // 处理数据并发送 List<Map<String, Object>> result = testService.findDataByParam(param); sendMessage(JSONObject.toJSONString(result)); Thread.sleep(3000); } } catch (Exception e) { e.printStackTrace(); } } /** * 连接关闭调用的方法 */ @OnClose public void onClose() { webSocketSet.remove(this); //从set中删除 } /** * 收到客户端消息后调用的方法 */ @OnMessage public void onMessage(String message, Session session) { log.info("收到客户端的消息:{}", message); } /** * 发生错误时调用 */ public void onError(Session session, Throwable error) { error.printStackTrace(); } /** * 发送消息 */ public void sendMessage(String message) throws IOException { this.session.getAsyncRemote().sendText(message); } /** * 群发自定义消息 */ public static void sendInfo(String message) throws IOException { for (WebSocketServer item : webSocketSet) { try { item.sendMessage(message); } catch (IOException e) { e.printStackTrace(); continue; } } } }
package com.cdzs.websocket; import cn.hutool.core.util.RandomUtil; import com.cdzs.common.core.constant.SecurityConstants; import com.cdzs.common.core.exception.ServiceException; import com.cdzs.common.redis.service.CacheService; import com.cdzs.system.api.RemoteDeviceGrpcService; import com.cdzs.system.api.domain.device.CdzsDevicesEntity; import com.cdzs.system.api.domain.grpc.dto.RegisterFaceByAppointDeviceDTO; import com.cdzs.websocket.util.ByteArraySplitter; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import javax.websocket.*; import javax.websocket.server.PathParam; import javax.websocket.server.ServerEndpoint; import java.io.IOException; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; /** * type: register/verify(注册/人证核验) */ @Component @ServerEndpoint(value = "/person/{type}/{token}") @Slf4j public class CdzsPersonWebSocket { //与某个客户端的连接会话,需要通过它来给客户端发送数据 private Session session; private static RemoteDeviceGrpcService remoteDeviceGrpcService; //只能通过set方法/getBean()来注入 @Autowired public void setRemoteDeviceGrpcService(RemoteDeviceGrpcService remoteDeviceGrpcService) { CdzsPersonWebSocket.remoteDeviceGrpcService = remoteDeviceGrpcService; } private static CacheService cacheService; @Autowired public void setCacheService(CacheService cacheService) { CdzsPersonWebSocket.cacheService = cacheService; } //静态变量,用来记录当前在线连接数。应该把它设计成线程安全的。 private static int onlineCount = 0; private static ConcurrentHashMap<String, CdzsPersonWebSocket> socketServersMap = new ConcurrentHashMap<>(); private String key = ""; private final static String REGISTER = "register"; private final static String VERIFY = "verify"; /** * 建立连接 * * @param session * @param type type为register/verify */ @OnOpen public void onOpen(Session session, @PathParam("type") String type, @PathParam("token") String token) { this.session = session; //key = type + ":" + token + 
":" + sessionId key = type + ":" + token + ":" + session.getId(); log.info("onOpen->>key:{}", key); //判断当前是否已经连接过 boolean flag = socketServersMap.containsKey(key); if (flag) { socketServersMap.remove(key); socketServersMap.put(key, this); } else { socketServersMap.put(key, this); addOnlineCount(); //在线数加1 } //获取到token,查询该token下的设备 CdzsDevicesEntity device = cacheService.getCacheObject("choose_device:" + token); if (device == null){ throw new ServiceException("您登录时未选择注册平板"); } String deviceIp = device.getDeviceIp(); String deviceSn = device.getDeviceSn(); log.info("token->>deviceIp:{}", deviceIp); if (REGISTER.equals(type)) { //注册 RegisterFaceByAppointDeviceDTO registerFaceByAppointDeviceDTO = new RegisterFaceByAppointDeviceDTO(); registerFaceByAppointDeviceDTO.setDeviceIp(deviceIp); registerFaceByAppointDeviceDTO.setSessionId(token); registerFaceByAppointDeviceDTO.setIsVerify(false); registerFaceByAppointDeviceDTO.setParams(0); registerFaceByAppointDeviceDTO.setPersonId("1000" + RandomUtil.randomNumbers(4)); registerFaceByAppointDeviceDTO.setDeviceSn(deviceSn); remoteDeviceGrpcService.regFeaDataByAppointDevice(registerFaceByAppointDeviceDTO, SecurityConstants.INNER); log.info("register call success"); } else if (VERIFY.equals(type)) { //人证核验 remoteDeviceGrpcService.verifyPersonCard(deviceIp, token, SecurityConstants.INNER); log.info("verify call success"); } log.info("online number:{}", getOnlineCount()); } /** * 主要功能:收到客户端消息后调用的方法 * 方法名:onMessage * * @param message 客户端发送过来的消息 * @param session 返回类型:void * 时间:2019年8月14日 上午9:44:42 */ @OnMessage public void onMessage(String message, Session session) { log.info("来自客户端的消息->>message:{};sessionId:{}", message, session.getId()); } /** * 主要功能:关闭连接 * 方法名:onClose * 返回类型:void * 时间:2019年8月14日 上午9:44:58 */ @OnClose public void onClose() { log.info("onClose : {}" + key); if (socketServersMap.containsKey(key)) { socketServersMap.remove(key); subOnlineCount(); //在线数减1 } log.info("有一连接关闭!当前在线连接数为: {}", getOnlineCount()); } /** * 
发生错误时调用 * 主要功能: * 方法名:onError * * @param error 返回类型:void * 时间:2019年8月14日 上午9:44:07 */ @OnError public void onError(Throwable error) { socketServersMap.remove(key); subOnlineCount(); log.error("webSocket连接发生错误->>errorMessage:{}", error.getMessage()); } /** * 人脸注册webSocket * * @param message 要发送给前端的数据 * @param sessionId token */ public synchronized void reproductiveSendToRegister(String message, String sessionId) { synchronized (this.getClass()) { for (Map.Entry<String, CdzsPersonWebSocket> stringMyWebSocketEntry : socketServersMap.entrySet()) { try { String key = stringMyWebSocketEntry.getKey(); CdzsPersonWebSocket value = stringMyWebSocketEntry.getValue(); if (key.contains(sessionId) && key.contains(REGISTER)) { log.info("reproductiveSendToRegister:推送的消息为:" + key); List<byte[]> result = ByteArraySplitter.split(message, 64 * 1024); for (byte[] bytes : result) { String s = new String(bytes); value.session.getBasicRemote().sendText(s); } // 推送结束符 value.session.getBasicRemote().sendText("########"); } } catch (IOException e) { e.printStackTrace(); } } } } /** * 人脸注册webSocket * * @param message 要发送给前端的数据 * @param sessionId token */ public synchronized void reproductiveSendToVerify(String message, String sessionId) { synchronized (this.getClass()) { for (Map.Entry<String, CdzsPersonWebSocket> stringMyWebSocketEntry : socketServersMap.entrySet()) { try { String key = stringMyWebSocketEntry.getKey(); CdzsPersonWebSocket value = stringMyWebSocketEntry.getValue(); if (key.contains(sessionId) && key.contains(VERIFY)) { log.info("reproductiveSendToVerify:推送的消息为:" + key); List<byte[]> result = ByteArraySplitter.split(message, 64 * 1024); for (byte[] bytes : result) { String s = new String(bytes); value.session.getBasicRemote().sendText(s); } // 推送结束符 value.session.getBasicRemote().sendText("########"); } } catch (IOException e) { e.printStackTrace(); } } } } /** * 获取在线数 * * @return */ public static synchronized int getOnlineCount() { return onlineCount; } /** * 在线数+1 */ public 
static synchronized void addOnlineCount() { CdzsPersonWebSocket.onlineCount++; } /** * 在线数-1 */ public static synchronized void subOnlineCount() { CdzsPersonWebSocket.onlineCount--; } }
package com.cdzs.websocket.util; import cn.hutool.core.util.ArrayUtil; import com.cdzs.common.core.utils.StringUtils; import com.google.common.collect.Lists; import java.util.List; public class ByteArraySplitter { /** * 对String分片转换为List<byte[]> * @param source 字符串 * @param size 分片的长度 单位字节 * @return */ public static List<byte[]> split(String source, int size) { // 存放最终结果 List<byte[]> result = Lists.newArrayList(); if (StringUtils.isEmpty(source)) { return null; } byte[] sourceBytes = source.getBytes(); if (size > sourceBytes.length) { result.add(sourceBytes); return result; } // 开始进行split int startIndex = 0; int endIndex = sourceBytes.length - 1; boolean isRunning = true; while (isRunning) { if ((endIndex + 1) - startIndex > size) { result.add(ArrayUtil.sub(sourceBytes, startIndex, startIndex + size)); startIndex += size; } else { result.add(ArrayUtil.sub(sourceBytes, startIndex, endIndex + 1)); isRunning = false; } } return result; } }
package cn.weblab.plugin.socket; import com.intellij.execution.ui.ConsoleView; import com.intellij.execution.ui.ConsoleViewContentType; import com.intellij.openapi.project.Project; import cn.weblab.plugin.util.ConsoleUtil; import lombok.extern.slf4j.Slf4j; import org.java_websocket.WebSocket; import org.java_websocket.client.WebSocketClient; import org.java_websocket.framing.Framedata; import org.java_websocket.handshake.ServerHandshake; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.net.URI; import java.net.URISyntaxException; import java.util.ArrayList; import java.util.List; public class MyLogWebSocketClient extends WebSocketClient { Logger log=LoggerFactory.getLogger(MyLogWebSocketClient.class); private Project project; private static List<MyLogWebSocketClient> list = new ArrayList<>(); /** * construct a instance * * @param serverUri need to be connected */ public MyLogWebSocketClient(String serverUri, Project project) throws URISyntaxException { super(new URI(serverUri)); this.project = project; this.setConnectionLostTimeout(0); if (list.isEmpty()) { return; } for (MyLogWebSocketClient client : list) { client.close(); } list.clear(); list.add(this); } @Override public void onOpen(ServerHandshake serverHandshake) { log.info("在线日志socket连接成功"); ConsoleView logConsole = ConsoleUtil.getLogConsole(project); logConsole.print("连接建立成功\n", ConsoleViewContentType.ERROR_OUTPUT); } @Override public void onMessage(String s) { s = s.substring(8); // if (s.equals("---pong---")) { // return; // } ConsoleView logConsole = ConsoleUtil.getLogConsole(project); logConsole.print(s, ConsoleViewContentType.NORMAL_OUTPUT); } @Override public void onClose(int i, String s, boolean b) { log.info("在线日志socket断开"); ConsoleView logConsole = ConsoleUtil.getLogConsole(project); logConsole.print("在线日志socket断开,请重新连接\n", ConsoleViewContentType.ERROR_OUTPUT); } @Override public void onError(Exception e) { e.printStackTrace(); } // @Override // public void 
onWebsocketPong(WebSocket conn, Framedata f) { // try { // Thread.sleep(1000 * 10); // } catch (InterruptedException e) { // e.printStackTrace(); // } // System.out.println("pong"); // this.onWebsocketPing(conn, f); // super.onWebsocketPong(conn, f); // } @Override public void onWebsocketPing(WebSocket conn, Framedata f) { try { Thread.sleep(1000 * 5); } catch (InterruptedException e) { e.printStackTrace(); } send("---pong---"); } @Deprecated public static void destroy() { if (list.isEmpty()) return; for (MyLogWebSocketClient client : list) { client.close(); } list.clear(); } public static void main(String[] args) throws URISyntaxException, InterruptedException { // MyWebSocketClient client = new MyWebSocketClient("ws://localhost:8080/websocket/A"); // boolean b = client.connectBlocking(); // System.out.println(b); // client.send("hello"); // client.send("hello"); // client.send("hello"); // client.send("hello"); // client.close(); } }
