赞
踩
所谓心跳, 即在 TCP 长连接中, 客户端和服务器之间定期发送的一种特殊的数据包, 通知对方自己还在线, 以确保 TCP 连接的有效性。
在 Netty 中, 实现心跳机制的关键是 IdleStateHandler, 看下它的构造器:
public IdleStateHandler(int readerIdleTimeSeconds, int writerIdleTimeSeconds, int allIdleTimeSeconds) {
this((long)readerIdleTimeSeconds, (long)writerIdleTimeSeconds, (long)allIdleTimeSeconds, TimeUnit.SECONDS);
}
这里解释下三个参数的含义:
readerIdleTimeSeconds
: 读超时. 即当在指定的时间间隔内没有从 Channel 读取到数据时, 会触发一个 READER_IDLE 的IdleStateEvent 事件.writerIdleTimeSeconds
: 写超时. 即当在指定的时间间隔内没有数据写入到 Channel 时, 会触发一个 WRITER_IDLE 的IdleStateEvent 事件.allIdleTimeSeconds
: 读/写超时. 即当在指定的时间间隔内没有读或写操作时, 会触发一个 ALL_IDLE 的 IdleStateEvent 事件.注:这三个参数默认的时间单位是秒。若需要指定其他时间单位,可以使用另一个构造方法:
IdleStateHandler(boolean observeOutput, long readerIdleTime, long writerIdleTime, long allIdleTime, TimeUnit unit)
要实现Netty服务端心跳检测机制需要在服务器端的ChannelInitializer中加入如下的代码:
pipeline.addLast(new IdleStateHandler(3, 0, 0, TimeUnit.SECONDS));
继承关系图:
IdleStateHandler间接继承了ChannelInboundHandlerAdapter,并且重写了channelRead方法:
红框代码其实表示该方法只是进行了透传,不做任何业务逻辑处理,让channelPipe中的下一个handler处理channelRead方法。
我们再看看channelActive方法:
这里有个initialize的方法,这是IdleStateHandler的精髓,接着探究:
private void initialize(ChannelHandlerContext ctx) { // Avoid the case where destroy() is called before scheduling timeouts. // See: https://github.com/netty/netty/issues/143 switch (state) { case 1: case 2: return; } state = 1; initOutputChanged(ctx); lastReadTime = lastWriteTime = ticksInNanos(); if (readerIdleTimeNanos > 0) { readerIdleTimeout = schedule(ctx, new ReaderIdleTimeoutTask(ctx), readerIdleTimeNanos, TimeUnit.NANOSECONDS); } if (writerIdleTimeNanos > 0) { writerIdleTimeout = schedule(ctx, new WriterIdleTimeoutTask(ctx), writerIdleTimeNanos, TimeUnit.NANOSECONDS); } if (allIdleTimeNanos > 0) { allIdleTimeout = schedule(ctx, new AllIdleTimeoutTask(ctx), allIdleTimeNanos, TimeUnit.NANOSECONDS); } }
这边会调用schedule()方法:
ScheduledFuture<?> schedule(ChannelHandlerContext ctx, Runnable task, long delay, TimeUnit unit) {
//往线程池添加一个task
return ctx.executor().schedule(task, delay, unit);
}
进而触发一个Task,ReaderIdleTimeoutTask,这个task里的run方法源码是这样的:
private final class ReaderIdleTimeoutTask extends AbstractIdleTask { @Override protected void run(ChannelHandlerContext ctx) { long nextDelay = readerIdleTimeNanos; if (!reading) { //[1]计算差值,判断是否超时 nextDelay -= ticksInNanos() - lastReadTime; } if (nextDelay <= 0) { // Reader is idle - set a new timeout and notify the callback. readerIdleTimeout = schedule(ctx, this, readerIdleTimeNanos, TimeUnit.NANOSECONDS); boolean first = firstReaderIdleEvent; firstReaderIdleEvent = false; try { IdleStateEvent event = newIdleStateEvent(IdleState.READER_IDLE, first); //[2]调用 ctx.fireUserEventTriggered(evt);传播事件至下一个handler channelIdle(ctx, event); } catch (Throwable t) { ctx.fireExceptionCaught(t); } } else { // Read occurred before the timeout - set a new timeout with shorter delay. readerIdleTimeout = schedule(ctx, this, nextDelay, TimeUnit.NANOSECONDS); } } }
[1]处代码是用当前时间(ticksInNanos()方法返回当前时间)减去最后一次channelRead方法调用的时间,假如这个结果是6s,说明最后一次调用channelRead已经是6s之前的事情了,你设置的是5s,那么nextDelay则为-1,说明超时了,那么[2]处代码则会触发下一个handler的
userEventTriggered方法:
protected void channelIdle(ChannelHandlerContext ctx, IdleStateEvent evt) throws Exception {
ctx.fireUserEventTriggered(evt);
}
ctx.fireUserEventTriggered(evt);
是不是不熟悉?很简单,和ctx.fireChannelRead(msg);
类似,是传递消息至下一个handler,前者触发下一个handler中的fireUserEventTriggered()方法,后者触发下一个handler中的fireChannelRead()方法。至此,用户可以在复写的fireUserEventTriggered()内,实现自己的业务逻辑,比如关闭连接操作等待。
如果没有超时则不触发userEventTriggered方法。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。