赞
踩
一.业务分析:现在的业务是我们需要服务端检测客户端的网络连接状况是否连接网络,然后去同步客户端和服务端的数据库.我们需要用一种通讯协议来长连接检测网络的上线下线.所以选择用websocket
二.http和websocket的区别:http一直发请求建立长连接会造成数据中的丢包和服务器压力大的问题
然而websocket则不会出现这些问题,我们可以用websocket来做一些实时性比较强的例如股票分析,高可用聊天室等等
三.下面就来实现一下怎么保证长连接的
1.导入依赖
<dependency>
<groupId>org.springframeworkboot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
创建节点配置类
package cn.abtu; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.web.socket.server.standard.ServerEndpointExporter; @Configuration public class WebSocketConfig { @Bean public ServerEndpointExporter serverEndpointExporter() { return new ServerEndpointExporter(); } }
2.然后是WebSocket配置类,WebSocket.java:(这里面包含这单独发送消息,群发,监听上下线等等方法 其中我们会用到里面的sendMessage方法去给前端发送消息证明网络联通
package cn.abtu.websocket; import java.io.IOException; import java.util.HashMap; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import javax.websocket.OnClose; import javax.websocket.OnError; import javax.websocket.OnMessage; import javax.websocket.OnOpen; import javax.websocket.Session; import javax.websocket.server.PathParam; import javax.websocket.server.ServerEndpoint; import cn.abtu.websocket.impl.PingImpl; import cn.abtu.websocket.impl.ToUserImpl; import com.alibaba.fastjson2.JSON; import com.alibaba.fastjson2.JSONObject; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; import org.springframework.util.StringUtils; @ServerEndpoint("/websocket/{userId}") @Component @Slf4j public class WebSocketServer { /**静态变量,用来记录当前在线连接数。应该把它设计成线程安全的。*/ private static int onlineCount = 0; /**concurrent包的线程安全Set,用来存放每个客户端对应的MyWebSocket对象。*/ private static ConcurrentHashMap<String,WebSocketServer> webSocketMap = new ConcurrentHashMap<>(); /**与某个客户端的连接会话,需要通过它来给客户端发送数据*/ private Session session; /**接收userId*/ private String userId=""; public static Map<String, DoMessage> caseMap1 = new HashMap<>(); // 更多消息处理实现类,请初始化到这里 static { caseMap1.put("ping", new PingImpl()); // 心跳检测 caseMap1.put("toUser", new ToUserImpl()); // 一对一发送消息(测试) } /** * 连接建立成功调用的方法*/ @OnOpen public void onOpen(Session session,@PathParam("userId") String userId) { this.session = session; this.userId=userId; if(webSocketMap.containsKey(userId)){ webSocketMap.remove(userId); webSocketMap.put(userId,this); //加入set中 }else{ webSocketMap.put(userId,this); //加入set中 addOnlineCount(); //在线数加1 } log.info("用户连接:"+userId+",当前在线人数为:" + getOnlineCount()); try { sendMessage("连接成功"); } catch (IOException e) { log.error("用户:"+userId+",网络异常!!!!!!"); } } /** * 连接关闭调用的方法 */ @OnClose public void onClose() { if(webSocketMap.containsKey(userId)){ webSocketMap.remove(userId); //从set中删除 subOnlineCount(); } log.info("用户退出:"+userId+",当前在线人数为:" + getOnlineCount()); } /** * 收到客户端消息后调用的方法 * * @param message 客户端发送过来的消息*/ @OnMessage public void onMessage(String message, Session session) { log.info("用户消息:"+userId+",报文:"+message); //可以群发消息 //消息保存到数据库、redis if(!StringUtils.isEmpty(message)){ try { //解析发送的报文 JSONObject jsonObject = JSON.parseObject(message); //追加发送人(防止串改) WebSocketServer userServer = webSocketMap.get(this.userId); jsonObject.put("fromUserId",this.userId); String messageType = jsonObject.getString("msgType"); if(caseMap1.get(messageType) != null){ caseMap1.get(messageType).DoMsg(userServer,this.userId, webSocketMap, jsonObject.getString("msgData")); } }catch (Exception e){ e.printStackTrace(); } } } /** * * @param session * @param error */ @OnError public void onError(Session session, Throwable error) { log.error("用户错误:"+this.userId+",原因:"+error.getMessage()); error.printStackTrace(); } /** * 实现服务器主动推送 */ public void sendMessage(String message) throws IOException { this.session.getBasicRemote().sendText(message); } /** * 发送自定义消息 * */ public static void sendInfo(String message,@PathParam("userId") String userId) throws IOException { log.info("发送消息到:"+userId+",报文:"+message); if(!StringUtils.isEmpty(userId)&&webSocketMap.containsKey(userId)){ webSocketMap.get(userId).sendMessage(message); }else{ log.error("用户"+userId+",不在线!"); } } public static synchronized int getOnlineCount() { return onlineCount; } public static synchronized void addOnlineCount() { WebSocketServer.onlineCount++; } public static synchronized void subOnlineCount() { WebSocketServer.onlineCount--; } }
发送消息的接口domagges
package cn.abtu.websocket; import com.alibaba.fastjson2.JSONObject; import java.io.IOException; import java.util.concurrent.ConcurrentHashMap; public interface DoMessage { // 处理收到的消息 public void DoMsg(WebSocketServer webSocketServer, String userId, ConcurrentHashMap<String,WebSocketServer> webSocketMap, String message) throws IOException; }
3.客服端连接网络一直发送类似于心跳检测的请求(一般是5秒一次),我们发送一个“ping”给前端
前端接受之后回传给我们消息
package cn.abtu.websocket.impl; import cn.abtu.websocket.DoMessage; import cn.abtu.websocket.WebSocketServer; import cn.abtu.websocket.dto.Message; import com.alibaba.fastjson2.JSONObject; import java.io.IOException; import java.util.concurrent.ConcurrentHashMap; /** * 心跳检测 */ public class PingImpl implements DoMessage { @Override public void DoMsg(WebSocketServer webSocketServer, String userId, ConcurrentHashMap<String,WebSocketServer> webSocketMap, String message) throws IOException { // 收到消息后解析成对象 // Test t = JSONObject.parseObject(message, Test.class); // System.err.println(t.getKeywords()); Message msg = new Message(); msg.setMsgData("pong"); msg.setMsgType("ping"); webSocketServer.sendMessage(JSONObject.toJSONString(msg)); } }
4.前端检测回传消息,如果检测不到证明网络连接还是有问题,重新发消息给前端接收方未在线!
package cn.abtu.websocket.impl; import cn.abtu.websocket.DoMessage; import cn.abtu.websocket.WebSocketServer; import cn.abtu.websocket.dto.Message; import cn.abtu.websocket.dto.ToUser; import com.alibaba.fastjson2.JSONObject; import org.springframework.util.StringUtils; import java.io.IOException; import java.util.concurrent.ConcurrentHashMap; /** * 1v1 发送消息 */ public class ToUserImpl implements DoMessage { @Override public void DoMsg(WebSocketServer webSocketServer, String userId, ConcurrentHashMap<String,WebSocketServer> webSocketMap, String message) throws IOException { System.err.println(message); ToUser t = JSONObject.parseObject(message, ToUser.class); if(!StringUtils.isEmpty(t.getToUserId())&&webSocketMap.containsKey(t.getToUserId())){ webSocketServer = webSocketMap.get(t.getToUserId()); }else{ Message msg = new Message(); msg.setMsgData("接收方未在线!"); msg.setMsgType("error"); webSocketServer.sendMessage(JSONObject.toJSONString(msg)); return; } // System.err.println(t.getKeywords()); t.setFromUserId(userId); Message msg = new Message(); msg.setMsgData(t); msg.setMsgType("toUser"); webSocketServer.sendMessage(JSONObject.toJSONString(msg)); } }
当我们重试机制达到两次,都检测到未在线是,证明客户端已经断开连接了
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。