赞
踩
WebSocket 是一种支持双向通讯的网络通信协议。
实现过程:
@Configuration
public class WebSocketConfig {
// 自动注册使用了@ServerEndpoint**注解声明的Websocket endpoint
@Bean
public ServerEndpointExporter serverEndpointExporter() {
return new ServerEndpointExporter();
}
}
需求是通过WebSocket,建立长连接,并获取当前在线的人数。通过Websocket 不断发送消息,建立长连接,给Session续命。我是通过MAC地址,区分不同的设备,因为我的需求中需要一个账号能够登录多台机器。所以我通过MAC地址用于标识不同的设备信息。(若是一个账号只能登陆一次,采用用户ID)
1 . 添加配置
@ServerEndpoint(value = "/websocket/onlineAme/{Mac}")
2. 主要方法
@OnOpen
private static ConcurrentHashMap<String, Session> sessionMap = new ConcurrentHashMap<>(); @OnOpen public void onOpen(Session session, @PathParam(value = "Mac") String mac) throws IOException { log.info("【Ame websocket 链接成功】,Ame mac:"+ mac); session.setMaxIdleTimeout(sessionTimeout); // 获取客户端的Ip if(StringUtils.isBlank(mac)||ObjectUtil.isNull(mac)){ log.error("并未上传设备信息"); } setMap(session,mac); } private void setMap(Session session,String mac){ sessionMap.put(mac,session); log.warn("Ame MAC address:{},当前在线人数为:{}",mac,sessionMap.size()); }
@OnMessage
该方法是客户端与服务端进行通讯。
每次客户端与服务端建立通讯时,会给Session续命,延长Session的时常
@OnMessage public void onMessage(Session session, String msg) { session.setMaxIdleTimeout(sessionTimeout); if(StringUtils.isBlank(msg)){ return; } // 判断 MAC 地址 是否 是正在上线 String mac = getMACBySession(session); if(StringUtils.isBlank(mac)){ return; } // 将上传的msg转化为 AmeServicePack handleAmeMsg(mac,ame); } private String getMACBySession(Session session){ String mac = getUserIdBySession(session); if(ObjectUtil.isNull(mac)){ return null; } return mac; } private String getUserIdBySession(Session session){ for (String mac : sessionMap.keySet()) { /*session 本身是有一个id的,通过userid 找到Session 然后再通过 其对应 id,与 传入的Session 中的 session对比 */ if(sessionMap.get(mac).getId().equals(session.getId())){ return mac; } } return null; }
@OnClose
@OnClose
public void onClose(Session session,@PathParam(value = "Mac") String mac) {
removeMap(session);
log.info("【websocket退出成功】该设备退出:"+mac);
}
private void removeMap(Session session){
String mac = getUserIdBySession(session);
if(ObjectUtil.isNull(mac)){
return;
}
sessionMap.remove(mac);
//userMap.remove(userId);
removeAme(mac);
}
private void removeAme(String mac){
ameHashMap.remove(mac);
sendInfo("Ame:"+ameInfo.getAmeip()+"下线成功",mac);
}
@OnError
@OnError
public void onError(Session session,Throwable throwable) {
log.error("websocket: 发生了错误");
removeMap(session);
throwable.printStackTrace();
}
向某一个用户发送消息
/* 发送自定义消息 向某一个用户发送,消息*/ public static void sendInfo(String message,String toMac){ log.info("发送消息:{},内容是:{}",message,toMac); if(ObjectUtil.isNull(toMac) || StringUtils.isBlank(message)){ log.error("消息不完整"); return; } // 包含就发送 //System.out.println(sessionMap.containsKey(toUserId)); if(sessionMap.containsKey(toMac)){ try { sendMessage(sessionMap.get(toMac),message); }catch (Exception e){ log.error("发送给用户{}的消息出错",toMac); } } // 用户不在线 else { log.error("设备{}不在线",toMac); } } public static void sendMessage(Session session,String message) throws IOException { session.getBasicRemote().sendText(message); }
Websocket 中的request中并没有header 中并没有客户端的Ip地址,但是在SpringCloud中,是通过网关,路由转发。在网关中的请求的request中存在Ip地址,可以通过拦截器,获取网关的ip然后将request放到websocket的request中。
package com.mam.gateway.filter; import jdk.nashorn.internal.runtime.regexp.joni.Config; import org.springframework.cloud.gateway.filter.GatewayFilter; import org.springframework.cloud.gateway.filter.GatewayFilterChain; import org.springframework.cloud.gateway.filter.factory.AbstractGatewayFilterFactory; import org.springframework.http.server.reactive.ServerHttpRequest; import org.springframework.stereotype.Component; import org.springframework.web.server.ServerWebExchange; import reactor.core.publisher.Mono; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpSession; import java.net.InetSocketAddress; import java.util.Objects; /** * 获取 WebSocket 上传Session的 Ip信息 */ @Component public class SessionFilter extends AbstractGatewayFilterFactory<SessionFilter.Config> { public SessionFilter() { super(SessionFilter.Config.class); } @Override public String name() { return "SessionFilter"; } @Override public GatewayFilter apply(SessionFilter.Config config) { return new GatewayFilter() { @Override public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) { ServerWebExchange mutatedServerWebExchange = exchange.mutate().request(exchange.getRequest()).build(); return chain.filter(mutatedServerWebExchange); } }; } static class Config { private Integer order; public Integer getOrder() { return order; } public void setOrder(Integer order) { this.order = order; } } }
public class WebSocketConfigurator extends ServerEndpointConfig.Configurator{ private static final Logger log = LoggerFactory.getLogger(WebSocketConfigurator.class); @Override public void modifyHandshake(ServerEndpointConfig sec, HandshakeRequest request, HandshakeResponse response) { Map<String, Object> attributes = sec.getUserProperties(); try{ String clientIp = IpUtils.getIpAddrByHandshakeRequest(request.getHeaders()); attributes.put("clientIp",clientIp); log.info("websocker拦截器X-Real_IP{}header{}",request.getHeaders().get("X-Real_IP"),request.getHeaders().toString()); }catch (Exception e){ e.printStackTrace(); } super.modifyHandshake(sec,request,response); } }
public static String getIpAddrByHandshakeRequest(Map<String, List<String>> map) { if (map == null) { return null; } String ip = null; // X-Forwarded-For:Squid 服务代理 String ipAddresses = Convert.toStr(map.get("X-Forwarded-For")); if (ipAddresses == null || ipAddresses.length() == 0 || "unknown".equalsIgnoreCase(ipAddresses)) { // Proxy-Client-IP:apache 服务代理 ipAddresses = Convert.toStr(map.get("Proxy-Client-IP")); }else { ipAddresses = ipAddresses.substring(1,ipAddresses.length()-1); } if (ipAddresses == null || ipAddresses.length() == 0 || "unknown".equalsIgnoreCase(ipAddresses)) { // WL-Proxy-Client-IP:weblogic 服务代理 ipAddresses = Convert.toStr(map.get("WL-Proxy-Client-IP")); } if (ipAddresses == null || ipAddresses.length() == 0 || "unknown".equalsIgnoreCase(ipAddresses)) { // HTTP_CLIENT_IP:有些代理服务器 ipAddresses = Convert.toStr(map.get("HTTP_CLIENT_IP")); } if (ipAddresses == null || ipAddresses.length() == 0 || "unknown".equalsIgnoreCase(ipAddresses)) { // X-Real-IP:nginx服务代理 ipAddresses = Convert.toStr(map.get("X-Real-IP")); } // 有些网络通过多层代理,那么获取到的ip就会有多个,一般都是通过逗号(,)分割开来,并且第一个ip为客户端的真实IP if (ipAddresses != null && ipAddresses.length() != 0) { ip = ipAddresses.split(",")[0]; } return ip.equals("0:0:0:0:0:0:0:1") ? "127.0.0.1" : ip; }
AmeServicePack ame = JSONObject.toJavaObject(JSONObject.parseObject(msg), AmeServicePack.class);
Map<String, Object> userProperties = session.getUserProperties();
String clientip = (String) userProperties.get("clientIp");
@Component @ServerEndpoint(value = "/websocket/onlineAme/{Mac}",configurator = WebSocketConfigurator.class) public class AmeLoginWebSocket { static Logger log = LoggerFactory.getLogger(AmeLoginWebSocket.class); /* 存储session */ private static ConcurrentHashMap<String, Session> sessionMap = new ConcurrentHashMap<>(); /* 存储 在线 ame服务 信息*/ private ConcurrentHashMap<String,AmeServicePack> ameHashMap = new ConcurrentHashMap<>(); /* 存储 Ip */ private static final long sessionTimeout = 600000; /** 链接成功后发送消息**/ @OnMessage public void onMessage(Session session, String msg) { session.setMaxIdleTimeout(sessionTimeout); log.info("【websocket 接收成功】内容为"+msg); if(StringUtils.isBlank(msg)){ return; } // 判断 MAC 地址 是否 是正在上线 String mac = getMACBySession(session); if(StringUtils.isBlank(mac)){ return; } AmeServicePack ame = JSONObject.toJavaObject(JSONObject.parseObject(msg), AmeServicePack.class); Map<String, Object> userProperties = session.getUserProperties(); String clientip = (String) userProperties.get("clientIp"); // 将上传的msg转化为 AmeServicePack handleAmeMsg(mac,ame); } private void handleAmeMsg(String mac, AmeServicePack ameInfo) { log.info("Ame:MAC{}Ip{}:",mac,ameInfo.getAmeip()); if(ameHashMap.containsKey(mac)){ log.info("该设备{}Ip{}已上线",mac,ameInfo.getAmeip()); sendInfo("该用户Ip"+ameInfo.getAmeip()+"已存在",mac); }else { ameHashMap.put(mac,ameInfo); } } private boolean updateOnline(AmeServicePack ameInfo){ AjaxResult isonline = SpringUtils.getBean(IAmePackService.class).update(ameInfo,SecurityConstants.INNER); if((Integer)isonline.get("code")== 200){ return true; }else { return false; } } /** * ameIp 为上线 但 任务表中仍有 正在运行的任务,并将其修改为 -2 * @param ameIp */ private void updateErrorTaskStatus(String ameIp){ SpringUtils.getBean(IAmePackService.class).updateErrorAMEStatus(ameIp,SecurityConstants.INNER); } private void handlePCMsg(LoginUser loginUser, String msg) { log.info("系统用户:{},消息{}:",loginUser.getUsername(),msg); } private String getMACBySession(Session session){ String mac = getUserIdBySession(session); if(ObjectUtil.isNull(mac)){ return null; } return mac; } /* *成功建立连接后调用 * @param [session, username] * @return void */ @OnOpen public void onOpen(Session session, @PathParam(value = "Mac") String mac) throws IOException { log.info("【Ame websocket 链接成功】,Ame mac:"+ mac); session.setMaxIdleTimeout(sessionTimeout); // 获取客户端的Ip if(StringUtils.isBlank(mac)||ObjectUtil.isNull(mac)){ log.error("并未上传设备信息"); } setMap(session,mac); } private void setMap(Session session,String mac){ sessionMap.put(mac,session); log.warn("Ame MAC address:{},当前在线人数为:{}",mac,sessionMap.size()); } /* *关闭连接时调用 * @param [userId] * @return void */ @OnClose public void onClose(Session session,@PathParam(value = "Mac") String mac) { removeMap(session); log.info("【websocket退出成功】该设备退出:"+mac); } private void removeMap(Session session){ String mac = getUserIdBySession(session); if(ObjectUtil.isNull(mac)){ return; } sessionMap.remove(mac); //userMap.remove(userId); removeAme(mac); } private String getUserIdBySession(Session session){ for (String mac : sessionMap.keySet()) { /*session 本身是有一个id的,通过userid 找到Session 然后再通过 其对应 id,与 传入的Session 中的 session对比 */ if(sessionMap.get(mac).getId().equals(session.getId())){ return mac; } } return null; } private void removeAme(String mac){ ameHashMap.remove(mac); log.info("{}:下线成功",ameInfo.getAmeip()); sendInfo("Ame:"+ameInfo.getAmeip()+"下线成功",mac); } /* *发生错误时调用 * @param [session, throwable] * @return void */ @OnError public void onError(Session session,Throwable throwable) { log.error("websocket: 发生了错误"); removeMap(session); throwable.printStackTrace(); } /* 发送自定义消息 向某一个用户发送,消息 */ public static void sendInfo(String message,String toMac){ log.info("发送消息:{},内容是:{}",message,toMac); if(ObjectUtil.isNull(toMac) || StringUtils.isBlank(message)){ log.error("消息不完整"); return; } // 包含就发送 //System.out.println(sessionMap.containsKey(toUserId)); if(sessionMap.containsKey(toMac)){ try { sendMessage(sessionMap.get(toMac),message); }catch (Exception e){ log.error("发送给用户{}的消息出错",toMac); } } // 用户不在线 else { log.error("设备{}不在线",toMac); } } public static void sendMessage(Session session,String message) throws IOException { session.getBasicRemote().sendText(message); } }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。