赞
踩
项目上想通过websocket做好友的上线下线通知功能,用户上线时客户端websocket连接服务端,调用服务端onOpen()方法,服务端通知所有好友当前用户上线;用户退出时客户端websocket断开连接,调用服务端onClose()方法,服务端通知所有好友当前用户离线。
这样做会有一个很大的问题,如果客户端是关闭流量、关闭WIFI断网而不是正常退出,服务端就不会收到客户端的断连请求,因此服务端并不会触发onClose()方法,导致其好友无法收到当前用户的离线信息。
经过网上大量资料的查找,发现绝大多数网友都采用心跳机制解决。
客户端定时向服务端发送空消息(ping),服务端启动心跳检测,超过一定时间范围没有新的消息进来就默认为客户端已断线,服务端主动执行close()方法断开连接
Netty是一个非常强大的NIO通讯框架,支持多种通讯协议,并且在网络通讯领域有许多成熟的解决方案。
因此决定采用Netty来实现websocket服务器和心跳监听机制,不必再去重复造轮子
netty启动类
public class WSServer { public void start(int port){ EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap bootstrap = new ServerBootstrap(); bootstrap .group(bossGroup, workerGroup) //设置主从线程组 .channel(NioServerSocketChannel.class) //设置nio双向通道 .childHandler(new WSServerInitializer()); //子处理器,用于处理workerGroup //用于启动server,同时启动方式为同步 ChannelFuture channelFuture = bootstrap.bind(port).sync(); //监听关闭的channel,设置同步方式 channelFuture.channel().closeFuture().sync(); } catch (InterruptedException e) { e.printStackTrace(); } finally { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } public static void main(String[] args) { new WSServer().start(8888); } }
ChannelInitializer类,负责添加各种handler,注意添加顺序
public class WSServerInitializer extends ChannelInitializer<SocketChannel> { @Override protected void initChannel(SocketChannel ch) throws Exception { //获取流水线 ChannelPipeline pipeline = ch.pipeline(); //websocket基于http协议,所以要有http编解码器 pipeline.addLast(new HttpServerCodec()); //对写大数据的支持 pipeline.addLast(new ChunkedWriteHandler()); //对httpMessage进行整合,聚合成FullHttpRequest或FullHttpResponse pipeline.addLast(new HttpObjectAggregator(1024 * 64)); //心跳检测,读超时时间设置为30s,0表示不监控 ch.pipeline().addLast(new IdleStateHandler(30, 0, 0, TimeUnit.SECONDS)); //心跳超时处理事件 ch.pipeline().addLast(new ServerHeartBeat()); //自定义handler pipeline.addLast(new WSHandler()); //websocket指定给客户端连接访问的路由:/ws pipeline.addLast(new WebSocketServerProtocolHandler("/ws")); } }
心跳超时处理器
public class ServerHeartBeat extends ChannelInboundHandlerAdapter { @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { if (evt instanceof IdleStateEvent) {//超时事件 System.out.println("心跳检测超时"); IdleStateEvent idleEvent = (IdleStateEvent) evt; if (idleEvent.state() == IdleState.READER_IDLE) {//读 ctx.channel().close(); //关闭通道连接 } else if (idleEvent.state() == IdleState.WRITER_IDLE) {//写 } else if (idleEvent.state() == IdleState.ALL_IDLE) {//全部 } } super.userEventTriggered(ctx, evt); } }
好友上下线通知处理器
public class WSHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> { //key为channelId value为uid 存储在Map中 private static ConcurrentHashMap<String,String> uidMap = new ConcurrentHashMap<>(); @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { if (msg instanceof FullHttpRequest){ //拦截请求地址,获取地址上的uid值,并存入Map集合中 FullHttpRequest fullHttpRequest = (FullHttpRequest) msg; String uri = fullHttpRequest.uri(); String uid = uri.substring(uri.lastIndexOf("/")+1, uri.length()); uidMap.put(ctx.channel().id().toString(),uid);//把uid放入集合中存储 System.err.println(uid+"上线****************************"); //TODO 通知所有好友,已上线 // uri改为 /ws fullHttpRequest.setUri("/ws"); } super.channelRead(ctx,msg); } @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { //获取Uid String uid = uidMap.get(ctx.channel().id().toString()); System.err.println(uid+"断线****************************"); uidMap.remove(ctx.channel().id().toString()); //TODO 通知所有好友,已下线 } @Override protected void channelRead0(ChannelHandlerContext channelHandlerContext, TextWebSocketFrame textWebSocketFrame) throws Exception { } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { } @Override public void handlerAdded(ChannelHandlerContext ctx) throws Exception { } @Override public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { } }
JS代码
window.CHAT = { socket: null, init: function () { if (window.WebSocket) { CHAT.socket = new WebSocket("ws://127.0.0.1:8888/ws/00001"); CHAT.socket.onopen = function () { console.log("连接建立成功..."); }, CHAT.socket.onclose = function () { console.log("连接关闭..."); }, CHAT.socket.onerror = function () { console.log("发生错误..."); }, CHAT.socket.onmessage = function (e) { console.log("接收到消息" + e.data); } } else { alert("浏览器不支持websocket协议..."); } }, sendMessage: function (value) { CHAT.socket.send(value); }, heartBeat: function () { setInterval(this.sendMessage(""),5000); } }; CHAT.init(); CHAT.heartBeat();
websocket连接地址example:
ws://127.0.0.1:8888/ws/{uid}
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。