赞
踩
是HTML5的一种新的协议,WebSocket是真正实现了全双工通信的服务器向客户端的互联网技术,是单个TCP连接上进行全双工通信的协议。
WebSocket拥有完整的应用层协议,包含一套标准的API
Socket是一组接口,是应用层与TCP/IP协议通信的中间软件抽象层
http是短链接,请求之后会关闭连接。
WebSocket是长连接,只需要通过一次请求初始化连接,然后所有的请求和响应都是通过这个TCP连接进行通信。
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
<version>2.7.5</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
负责在握手之前,处理客户端的URL请求
import lombok.extern.slf4j.Slf4j; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.http.server.ServerHttpRequest; import org.springframework.http.server.ServerHttpResponse; import org.springframework.http.server.ServletServerHttpRequest; import org.springframework.web.socket.WebSocketHandler; import org.springframework.web.socket.server.support.HttpSessionHandshakeInterceptor; import java.util.Map; @Slf4j public class MyWebSocketInterceptor extends HttpSessionHandshakeInterceptor { private final Logger logger = LoggerFactory.getLogger(getClass()); /** * 握手之前,做参数处理 * @param request the current request * @param response the current response * @param wsHandler the target WebSocket handler * @param attributes the attributes from the HTTP handshake to associate with the WebSocket * session; the provided attributes are copied, the original map is not used. * @return * @throws Exception */ @Override public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Map<String, Object> attributes) throws Exception { logger.info("[MyWebSocketInterceptor#BeforeHandshake] Request from " + request.getRemoteAddress().getHostString()); if (request instanceof ServletServerHttpRequest) { ServletServerHttpRequest serverHttpRequest = (ServletServerHttpRequest) request; String id = serverHttpRequest.getServletRequest().getParameter("id"); attributes.put("id", id); } return super.beforeHandshake(request, response, wsHandler, attributes); } @Override public void afterHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Exception ex) { logger.info("[MyWebSocketInterceptor#afterHandshake] Request from " + request.getRemoteAddress().getHostString()); } }
负责websocket的消息处理、关闭等操作
import lombok.extern.slf4j.Slf4j; import org.springframework.web.socket.CloseStatus; import org.springframework.web.socket.TextMessage; import org.springframework.web.socket.WebSocketSession; import org.springframework.web.socket.handler.AbstractWebSocketHandler; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicInteger; @Slf4j public class MyWebsocketHandler extends AbstractWebSocketHandler { public static final Map<String, WebSocketBean> webSocketBeanMap; /** * 仅用用于标识客户端编号 */ private static final AtomicInteger clientIdMaker; static { webSocketBeanMap = new ConcurrentHashMap<>(); clientIdMaker = new AtomicInteger(0); } @Override public void afterConnectionEstablished(WebSocketSession session) throws Exception { // 当WebSocket连接正式建立后,将该Session加入到Map中进行管理 Map<String, Object> attributes = session.getAttributes(); WebSocketBean webSocketBean = new WebSocketBean(); webSocketBean.setWebSocketSession(session); webSocketBean.setClientId(clientIdMaker.getAndIncrement()); webSocketBean.setId(attributes.get("id").toString()); webSocketBeanMap.put(session.getId(), webSocketBean); } @Override public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception { //当连接关闭后,从Map中移除session实例 webSocketBeanMap.remove(session.getId()); } @Override public void handleTransportError(WebSocketSession session, Throwable exception) throws Exception { //传输过程中出现了错误 if (session.isOpen()) { session.close(); } webSocketBeanMap.remove(session.getId()); } @Override protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception { //处理接收到的消息 log.info("Received message from client[ID:" + webSocketBeanMap.get(session.getId()).getClientId() + "]; Content is [" + message.getPayload() + "]."); TextMessage textMessage = new TextMessage("pong"); session.sendMessage(textMessage); } }
负责接受参数,可要可不要
import lombok.Data;
import org.springframework.web.socket.WebSocketSession;
@Data
public class WebSocketBean {
private WebSocketSession webSocketSession;
private int clientId;
private String Id;
}
负责开启websocket、加载处理器、拦截器
import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.web.socket.config.annotation.EnableWebSocket; import org.springframework.web.socket.config.annotation.WebSocketConfigurer; import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry; import org.springframework.web.socket.server.standard.ServerEndpointExporter; @Configuration @EnableWebSocket public class WebSocketConfiguration implements WebSocketConfigurer { @Bean public MyWebSocketInterceptor myWebSocketInterceptor(){ return new MyWebSocketInterceptor(); } @Bean public ServerEndpointExporter serverEndpointExporter() { return new ServerEndpointExporter(); } /** * 注册handler 不同的请求,前往不同的handler * 添加拦截器,做握手前的数据处理 * @param registry */ @Override public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) { registry.addHandler(new MyWebsocketHandler(),"/websocket").addInterceptors(myWebSocketInterceptor()); } }
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
<version>5.5.1</version>
</dependency>
<dependency>
<groupId>org.java-websocket</groupId>
<artifactId>Java-WebSocket</artifactId>
<version>1.5.1</version>
</dependency>
save.path=C:\\Users\\LZKJ\\Desktop\\1
monitor.websocket.address= ws://127.0.0.1:9090/websocket?id=1;ws://127.0.0.1:9091/websocket?id=2;ws://127.0.0.1:9092/websocket?id=3
monitor.name=WebScoket1;WebScoket2;WebScoket3
monitor.interval=5
import cn.hutool.core.date.DateUtil; import lombok.extern.slf4j.Slf4j; import org.java_websocket.client.WebSocketClient; import org.java_websocket.enums.ReadyState; import org.java_websocket.handshake.ServerHandshake; import java.net.URI; import java.util.Date; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.CopyOnWriteArrayList; @Slf4j public class HardWareMonitorWebSocketClient extends WebSocketClient { private List<Map<String,String>> logList; private final String monitorName; public HardWareMonitorWebSocketClient(URI serverUri, String monitorName) { super(serverUri); this.monitorName = monitorName; logList = new CopyOnWriteArrayList<>(); // 初始化建立连接 this.connect(); } @Override public void onOpen(ServerHandshake serverHandshake) { //log.info("{} websocket开启中", this.uri.toString()); } @Override public void onMessage(String s) { logList.add(buildLogMap("服务正常")); log.info("{} 发送心跳信息成功", this.uri.toString()); } @Override public void onClose(int i, String s, boolean b) { // log.info("{} 服务端连接关闭,重连中。。。。。", this.uri.toString()); // logList.add(buildLogMap("硬件宕机,重连中......")); try { if (this.getReadyState() != ReadyState.OPEN) { if (this.getReadyState() == ReadyState.NOT_YET_CONNECTED) { if (this.isClosed()) { this.reconnect(); } else { this.connect(); } } else if (this.getReadyState() == ReadyState.CLOSED) { this.reconnect(); } } } catch (Exception e) { // log.error("重连异常",e); } } @Override public void onError(Exception e) { log.info("{} 服务宕机:", this.uri.toString()); logList.add(buildLogMap("服务宕机")); } private Map<String,String> buildLogMap(String msg){ Map<String,String> map = new HashMap<>(); map.put("url",this.uri.toString()); map.put("monitorName",monitorName); map.put("monitorTime", DateUtil.format(new Date(),"yyyy-MM-dd HH:mm:ss")); map.put("msg",msg); return map; } public List<Map<String, String>> getLogList(){ return logList; } public void clearLogList(List<Map<String, String>> removeList){ this.logList.removeAll(removeList); } }
启动、关闭心跳检测
import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.CommandLineRunner; import org.springframework.stereotype.Component; @Component public class Monitor implements CommandLineRunner { @Value("${monitor.interval}") private int monitorInterval; @Value("${monitor.websocket.address}") private String monitorWebSocketAddress; @Value("${monitor.name}") private String monitorName; @Value("${save.path}") private String savePath; @Override public void run(String... args) throws Exception { new MonitorUtil().start(monitorInterval,monitorWebSocketAddress,monitorName,savePath); } }
定时心跳检测、导出CSV文件
import cn.hutool.core.date.DateUtil; import cn.hutool.core.text.csv.CsvWriter; import lombok.extern.slf4j.Slf4j; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; import org.springframework.util.StringUtils; import java.io.File; import java.net.URI; import java.nio.charset.StandardCharsets; import java.util.*; import java.util.concurrent.*; import java.util.stream.Collectors; @Slf4j @Component public class MonitorUtil{ private static boolean canEveryDayExportCSV = false; private static Map<String, monitorWebSocketClient> monitorWebSocketMap; private ScheduledFuture<?> monitorAliveTask = null; private static int monitorInterval; private static String monitorWebSocketAddress; private static String monitorName; private static String savePath; // 创建任务队列 10 为线程数量 ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(10); public void start(int monitorInterval, String monitorWebSocketAddress, String monitorName, String savePath) { monitorInterval = monitorInterval; monitorWebSocketAddress = monitorWebSocketAddress; monitorName = monitorName; savePath = savePath; // 监控硬件 websocket if (StringUtils.hasText(monitorWebSocketAddress) && StringUtils.hasText(monitorName)) { // 初始化监控map monitorWebSocketMap = new ConcurrentHashMap<>(); String[] monitorWebSocketAddressArray = monitorWebSocketAddress.split(";"); String[] monitorNameArray = monitorName.split(";"); for (int i = 0; i < monitorWebSocketAddressArray.length; i++) { // 初始化监控map,key为硬件名称,value为硬件socket连接对象 if(StringUtils.hasText(monitorNameArray[i])){ monitorWebSocketMap.put(monitorNameArray[i], new monitorWebSocketClient(URI.create(monitorWebSocketAddressArray[i]), monitorNameArray[i])); } } log.info("监控数据:{}", monitorWebSocketMap); // 执行任务 立即执行,间隔时间由配置monitor.interval决定 monitorAliveTask = scheduledExecutorService.scheduleWithFixedDelay(monitorAliveRunnable(), 0, monitorInterval, TimeUnit.SECONDS); // 每天24点导出数据开关 canEveryDayExportCSV = true; } } public String stop(boolean needSave) { if (!needSave){ return "not need save"; } // 暂停心跳检测任务 monitorAliveTask.cancel(true); // 关闭每天00:10:00 导出CSV 定时任务 canEveryDayExportCSV = false; // 导出csv文件 exportCSV(true,null,true); return ""; } /** * 导出监测记录到CSV文件 * @param exportAll true 导出全部,false 导出exportDate指定的日期 * @param exportDate 指定导出数据的日期 * @param closeWebSocket true 关闭webSocket连接,false不关闭 */ private void exportCSV(boolean exportAll,Date exportDate,boolean closeWebSocket){ // 导出全部,日期就写今天 if(exportAll){ exportDate = new Date(); } for (Map.Entry<String, monitorWebSocketClient> entry : monitorWebSocketMap.entrySet()) { // 文件名称 年月日+硬件名称+hardwareMonitor.csv String csvFileName = DateUtil.format(exportDate, "yyyyMMdd") + "-" + entry.getKey() + "-" + "-monitor.csv"; String filePath = savePath + File.separator + csvFileName; log.info("导出的文件地址:{}",filePath); File file = new File(filePath); // 文件存在,就追加数据,不存在,使用覆盖数据 boolean fileExists = file.exists(); try (CsvWriter csvWriter = new CsvWriter(file, StandardCharsets.UTF_8, fileExists)) { if (!fileExists) { csvWriter.write(new String[]{"监测地址", "检测名称", "监测时间", "监测信息"}); } Date finalExportDate = exportDate; // 获取需要导出的数据 exportAll为true导出全部,false,导出日期和exportDate相同的数据 List<Map<String, String>> logWriteCSVList = entry.getValue().getLogList().stream().filter(logMap -> exportAll || DateUtil.isSameDay(finalExportDate, DateUtil.parse(logMap.get("monitorTime")))).collect(Collectors.toList()); for (Map<String, String> logMap : logWriteCSVList) { csvWriter.write(new String[]{logMap.get("url"), logMap.get("monitorName"), logMap.get("monitorTime"), logMap.get("msg")}); } // 清空数据-将写入csv文件的数据删除 entry.getValue().clearLogList(logWriteCSVList); if(closeWebSocket){ // 关闭socket连接 entry.getValue().close(); } } catch (Exception e) { log.error("导出CSV文件异常:", e); } } } /** * 每天00:10:00导出前一天数据 */ @Scheduled(cron = "0 10 0 * * ?") private void exportCSVTask(){ if(canEveryDayExportCSV){ try { Date date = DateUtil.offsetDay(new Date(),-1); log.info("定时任务导出监控数据:{}",DateUtil.format(date,"yyyy-MM-dd HH:mm:ss")); // 导出前一天的数据 exportCSV(false,date,false); } catch (Exception e) { log.error("每天定时导出数据失败:",e); } } } /** * 定时任务-发送websocket心跳 */ private Runnable monitorAliveRunnable(){ return ()->{ for (Map.Entry<String, monitorWebSocketClient> entry : monitorWebSocketMap.entrySet()) { try { entry.getValue().send("ping"); } catch (Exception e) { // 发送不了消息,连接异常,调用重连方法 entry.getValue().onClose(0, null, true); } } }; } }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。