赞
踩
通过前面几篇博客的各种代码示例,就算别的没记住,也应该对实验后 Client 不会自动断开连接,等手动关闭时会报错的情况应该印象很深把。因为 Netty 建立的是长连接,也就是说只要不在 Client 的代码中手动 channel.close();
那该连接就会一直保持着,直到客户端或者服务器一方关闭。
也不是说长连接它就不好,但大家想想,每一个客户端都一直占着一个连接,即使它后面已经用不到服务器了,而服务器能承受的连接数是有限的,后面再来了真正有需求的用户,它也进不来了,而且长时间的高并发也可能导致服务器宕机。
所以,有没有一种办法,如果我一段时间用不到服务器,就把这个连接给关掉?答:心跳机制。所谓心跳,即在 TCP 长连接中,客户端和服务器之间定期发送的一种特殊的数据包(比如消息内容是某种要求格式、内容),通知对方自己还在线,以确保 TCP 连接的有效性。
在 Netty 中,实现心跳机制的关键是 IdleStateHandler(空闲状态处理器),它的作用跟名字一样,是用来监测连接的空闲情况。然后我们就可以根据心跳情况,来实现具体的处理逻辑,比如说断开连接、重新连接等等。
那么,这篇文章的思路有了!我们先来分析 IdleStateHandler 为什么能实现心跳检测,然后再看看如何编写 Server 处理逻辑…
我们先来看一下 IdleStateHandler 的继承关系:
可以看到它也是一个 ChannelHandler,并且还是个 ChannelInboundHandler,是用来处理入站事件的。看下它的构造器:
public IdleStateHandler(
int readerIdleTimeSeconds,
int writerIdleTimeSeconds,
int allIdleTimeSeconds) {
this(readerIdleTimeSeconds, writerIdleTimeSeconds, allIdleTimeSeconds,
TimeUnit.SECONDS);
}
这里解释下三个参数的含义:
所以,跟编解码码器这些 ChannelHandler 一样,要实现 Netty 服务端心跳检测机制,也需要将 IdleStateHandler 注册到服务器端的 ChannelInitializer 中:
// 由于我们的需求是判断 Client 时候还要向 Server 发送请求,从而决定是否关闭该连接
// 所以,我们只需要判断 Server 是否在时间间隔内从 Channel 读取到数据
// 所以,readerIdleTimeSeconds 我们取 3s,而 writerIdleTimeSeconds 为 0
pipeline.addLast(new IdleStateHandler(3, 0, 0));
PS:这三个参数默认的时间单位是秒。若需要指定其他时间单位,可以使用另一个构造方法: public IdleStateHandler(boolean observeOutput,long readerIdleTime, long writerIdleTime, long allIdleTime,TimeUnit unit)
IdleStateHandler 源码分析
初步地看下 IdleStateHandler 源码,先看下 IdleStateHandler 中的 channelRead 方法:
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (readerIdleTimeNanos > 0 || allIdleTimeNanos > 0) {
reading = true;
firstReaderIdleEvent = firstAllIdleEvent = true;
}
// 该方法只是进行了透传,不做任何业务逻辑处理,
// 让 channelPipe 中的下一个 handler 处理 channelRead 方法
ctx.fireChannelRead(msg);
}
我们再看看 channelActive 方法:
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
// This method will be invoked only if this handler was added
// before channelActive() event is fired. If a user adds this handler
// after the channelActive() event, initialize() will be called by beforeAdd().
initialize(ctx);
super.channelActive(ctx);
}
这里有个 initialize 方法,这是IdleStateHandler的精髓,接着探究:
private void initialize(ChannelHandlerContext ctx) { // Avoid the case where destroy() is called before scheduling timeouts. // See: https://github.com/netty/netty/issues/143 switch (state) { case 1: case 2: return; } state = 1; initOutputChanged(ctx); lastReadTime = lastWriteTime = ticksInNanos(); // 根据读超时、写超时、读写超时创建定时任务 if (readerIdleTimeNanos > 0) { // schedule 方法其实调用的线程池 readerIdleTimeout = schedule(ctx, new ReaderIdleTimeoutTask(ctx), readerIdleTimeNanos, TimeUnit.NANOSECONDS); } if (writerIdleTimeNanos > 0) { writerIdleTimeout = schedule(ctx, new WriterIdleTimeoutTask(ctx), writerIdleTimeNanos, TimeUnit.NANOSECONDS); } if (allIdleTimeNanos > 0) { allIdleTimeout = schedule(ctx, new AllIdleTimeoutTask(ctx), allIdleTimeNanos, TimeUnit.NANOSECONDS); } }
由于我们在上面创建 IdleStateHandler 时只是指定了 readerIdleTimeNanos=3,所以只会这里只会创建 ReaderIdleTimeoutTask。
PS:当线程池要执行某个 Task 时,实际就是让工作线程去执行 Task 的 run 方法。
那么,我们下面就来看看 ReaderIdleTimeoutTask 这个 Task 里的 run 方法:
@Override protected void run(ChannelHandlerContext ctx) { long nextDelay = readerIdleTimeNanos; if (!reading) { // nextDelay 等于用当前时间减去最后一次 channelRead 方法调用的时间 // 假如这个结果是 4s,说明最后一次调用 channelRead 已经是4s之前的事情了 nextDelay -= ticksInNanos() - lastReadTime; } // 假如这个结果是 4s,说明最后一次调用 channelRead 已经是4s之前的事情了 // 而上面我们设置的是读超时为3s,那么nextDelay则为-1,说明超时了 if (nextDelay <= 0) { // Reader is idle - set a new timeout and notify the callback. // 重置定时任务,将delay设为 3s readerIdleTimeout = schedule(ctx, this, readerIdleTimeNanos, TimeUnit.NANOSECONDS); boolean first = firstReaderIdleEvent; firstReaderIdleEvent = false; try { IdleStateEvent event = newIdleStateEvent(IdleState.READER_IDLE, first); // 核心!! // channelIdle 实际调用的是 ctx.fireUserEventTriggered(evt) // 触发下一个 handler 的 UserEventTriggered 方法 channelIdle(ctx, event); } catch (Throwable t) { ctx.fireExceptionCaught(t); } // 假如这个结果是 2s,说明最后一次调用 channelRead 已经是2s之前的事情了 // 而上面我们设置的是读超时为3s,那么nextDelay则为1,说明没超时 } else { // Read occurred before the timeout - set a new timeout with shorter delay. // 重置定时任务,将delay设为 1 readerIdleTimeout = schedule(ctx, this, nextDelay, TimeUnit.NANOSECONDS); } }
上面的代码中两次重置 schedule 相当于循环,不断的更新定时时间(delay)
至此我们将 IdleStateHandler 底层核心逻辑分析完了,但 IdleStateHandler 说到底也只是能做一个空闲状态监测,但是根据连接空闲情况关闭连接等逻辑还要我们自己实现。下面我们就来看看怎么做…
HeartBeatServer
public class HeartBeatServer { public static void main(String[] args) throws Exception { EventLoopGroup boss = new NioEventLoopGroup(); EventLoopGroup worker = new NioEventLoopGroup(); try { ServerBootstrap bootstrap = new ServerBootstrap(); bootstrap.group(boss, worker) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast("decoder", new StringDecoder()); pipeline.addLast("encoder", new StringEncoder()); // IdleStateHandler 的 readerIdleTime 参数指定超过3秒还没收到客户端的连接, // 会触发 IdleStateEvent 事件并且交给下一个 handler 处理, // 下一个 handler 必须实现 userEventTriggered 方法处理对应事件 pipeline.addLast(new IdleStateHandler(3, 0, 0)); // 所以,HeartBeatServerHandler 必须要有实现 userEventTriggered 方法 pipeline.addLast(new HeartBeatServerHandler()); } }); ChannelFuture future = bootstrap.bind(9000).sync(); System.out.println("Netty Server started..."); future.channel().closeFuture().sync(); } catch (Exception e) { e.printStackTrace(); } finally { worker.shutdownGracefully(); boss.shutdownGracefully(); } } }
HeartBeatServerHandler(核心)
public class HeartBeatServerHandler extends SimpleChannelInboundHandler<String> { // 记录读超时几次了,用来判断是否断开该连接 int readIdleTimes = 0; /* * Channel 收到消息后触发 * * 注:心跳包说白了就是一个某些地方特殊的数据包 * 所以这里我们规定,如果消息内容是 "Heartbeat Packet",那么它就是一个心跳包 */ @Override protected void channelRead0(ChannelHandlerContext ctx, String s) throws Exception { // 将收到的消息打印出来 System.out.println(" ====== > [server] message received : " + s); // 如果消息内容是 Heartbeat Packet 则说明这是一个心跳包,我们返回 ok if ("Heartbeat Packet".equals(s)) { ctx.channel().writeAndFlush("ok"); // 其余情况说明收到的是一个正常消息,不做特殊处理 } else { System.out.println(" 其他信息处理 ... "); } } /** * 用户事件触发 * * 当 IdleStateHandler 发现读超时后,会调用 fireUserEventTriggered() 去执行后一个 Handler 的 userEventTriggered 方法。 * 所以,根据心跳检测状态去关闭连接的就写在这里! */ @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { // 入站的消息就是 IdleStateEvent 具体的事件 IdleStateEvent event = (IdleStateEvent) evt; String eventType = null; // 我们在 IdleStateHandler 中也看到了,它有读超时,写超时,读写超时等 // 所以,这里我们需要判断事件类型 switch (event.state()) { case READER_IDLE: eventType = "读空闲"; readIdleTimes++; // 读空闲的计数加 1 break; case WRITER_IDLE: eventType = "写空闲"; break; // 不处理 case ALL_IDLE: eventType = "读写空闲"; break; // 不处理 } // 打印触发了一次超时警告 System.out.println(ctx.channel().remoteAddress() + "超时事件:" + eventType); // 当读超时超过 3 次,我们就端口该客户端的连接 // 注:读超时超过 3 次,代表起码有 4 次 3s 内客户端没有发送心跳包或普通数据包 if (readIdleTimes > 3) { System.out.println(" [server]读空闲超过3次,关闭连接,释放更多资源"); ctx.channel().writeAndFlush("idle close"); ctx.channel().close(); // 手动断开连接 } } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { System.err.println("=== " + ctx.channel().remoteAddress() + " is active ==="); } }
HeartBeatClient
public class HeartBeatClient { public static void main(String[] args) throws Exception { EventLoopGroup eventLoopGroup = new NioEventLoopGroup(); try { Bootstrap bootstrap = new Bootstrap(); bootstrap.group(eventLoopGroup).channel(NioSocketChannel.class) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast("decoder", new StringDecoder()); pipeline.addLast("encoder", new StringEncoder()); pipeline.addLast(new HeartBeatClientHandler()); } }); Channel channel = bootstrap.connect("127.0.0.1", 9000).sync().channel(); System.out.println("Netty Client started..."); // 心跳包内容 String text = "Heartbeat Packet"; Random random = new Random(); // 随机休眠 10s 内的时间,然后发送心跳包 // 也就是说,如果 num>3,那么就会触发一次Server的超时警告 // 也就是说,如果 num>3 出现 4 次时,该连接就被 Server 强制关闭了 // 也就是说,如果所有心跳包都能在 3s 内发送,那么连接就可以一直保持 // 注:正因为 random,所以每次运行 Client 的结果是不确定的 while (channel.isActive()) { int num = random.nextInt(10); Thread.sleep(num * 1000); channel.writeAndFlush(text); } } catch (Exception e) { e.printStackTrace(); } finally { eventLoopGroup.shutdownGracefully(); } } static class HeartBeatClientHandler extends SimpleChannelInboundHandler<String> { @Override protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception { System.out.println(" client received :" + msg); if (msg != null && msg.equals("idle close")) { System.out.println(" 服务端关闭连接,客户端也关闭"); ctx.channel().closeFuture(); } } } }
好了,我们把 Server、Client 运行一下看看
注:由于客户端设置的发送心跳包时间是随机的([0,10]),所以多个 Client 的运行结果可能不同
总结
正是因为 Netty 封装了真么多功能丰富,稳定可靠的 Handler,所以 Dubbo 等 Rpc 框架,Zookeeper、Eureka 等中间件才会在底层 IO 采用它。就拿本篇的 IdleStateHandler 来说,有了它我们就很容易实现心跳监控连接情况,并做出相应处理,如果感兴趣的同学可以看看 zk 底层心跳机制的实现,它就是这样做的。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。