赞
踩
目录
2.2 继承ChannelInboundHandlerAdapter ,重写超时事件
有没有一种办法,如果我一段时间用不到服务器,就把这个连接给关掉?答:心跳机制。所谓心跳,即在 TCP 长连接中,客户端和服务器之间定期发送的一种特殊的数据包(比如消息内容是某种要求格式、内容),通知对方自己还在线,以确保 TCP 连接的有效性。
在 Netty 中,实现心跳机制的关键是 IdleStateHandler(空闲状态处理器),它的作用跟名字一样,是用来监测连接的空闲情况。然后我们就可以根据心跳情况,来实现具体的处理逻辑,比如说断开连接、重新连接等等。
那么,这篇文章的思路有了!我们先来分析 IdleStateHandler 为什么能实现心跳检测,然后再看看如何编写 Server 处理逻辑…
我们先来看一下 IdleStateHandler 的继承关系:
可以看到它也是一个 ChannelHandler,并且还是个 ChannelInboundHandler,是用来处理入站事件的。看下它的构造器:
- public IdleStateHandler(
- int readerIdleTimeSeconds,
- int writerIdleTimeSeconds,
- int allIdleTimeSeconds) {
-
- this(readerIdleTimeSeconds, writerIdleTimeSeconds, allIdleTimeSeconds,
- TimeUnit.SECONDS);
- }
这里解释下三个参数的含义:
readerIdleTimeSeconds:读超时。即当在指定的时间间隔内没有从 Channel 读取到数据时,会触发一个 READER_IDLE 的 IdleStateEvent 事件
writerIdleTimeSeconds: 写超时。即当在指定的时间间隔内没有数据写入到 Channel 时,会触发一个 WRITER_IDLE 的 IdleStateEvent 事件
allIdleTimeSeconds: 读/写超时。即当在指定的时间间隔内没有读或写操作时,会触发一个 ALL_IDLE 的 IdleStateEvent 事件
所以,跟编解码码器这些 ChannelHandler 一样,要实现 Netty 服务端心跳检测机制,也需要将 IdleStateHandler 注册到服务器端的 ChannelInitializer 中:
- // 由于我们的需求是判断 Client 时候还要向 Server 发送请求,从而决定是否关闭该连接
- // 所以,我们只需要判断 Server 是否在时间间隔内从 Channel 读取到数据
- // 所以,readerIdleTimeSeconds 我们取 3s,而 writerIdleTimeSeconds 为 0
- pipeline.addLast(new IdleStateHandler(3, 0, 0));
PS:这三个参数默认的时间单位是秒。若需要指定其他时间单位,可以使用另一个构造方法: public IdleStateHandler(boolean observeOutput,long readerIdleTime, long writerIdleTime, long allIdleTime,TimeUnit unit)
IdleStateHandler 源码分析
初步地看下 IdleStateHandler 源码,先看下 IdleStateHandler 中的 channelRead 方法:
- @Override
- public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
- if (readerIdleTimeNanos > 0 || allIdleTimeNanos > 0) {
- reading = true;
- firstReaderIdleEvent = firstAllIdleEvent = true;
- }
- // 该方法只是进行了透传,不做任何业务逻辑处理,
- // 让 channelPipe 中的下一个 handler 处理 channelRead 方法
- ctx.fireChannelRead(msg);
- }
我们再看看 channelActive 方法:
- @Override
- public void channelActive(ChannelHandlerContext ctx) throws Exception {
- // This method will be invoked only if this handler was added
- // before channelActive() event is fired. If a user adds this handler
- // after the channelActive() event, initialize() will be called by beforeAdd().
- initialize(ctx);
- super.channelActive(ctx);
- }
这里有个 initialize 方法,这是IdleStateHandler的精髓,接着探究:
- private void initialize(ChannelHandlerContext ctx) {
- // Avoid the case where destroy() is called before scheduling timeouts.
- // See: https://github.com/netty/netty/issues/143
- switch (state) {
- case 1:
- case 2:
- return;
- }
-
- state = 1;
- initOutputChanged(ctx);
-
- lastReadTime = lastWriteTime = ticksInNanos();
- // 根据读超时、写超时、读写超时创建定时任务
- if (readerIdleTimeNanos > 0) {
- // schedule 方法其实调用的线程池
- readerIdleTimeout = schedule(ctx, new ReaderIdleTimeoutTask(ctx),
- readerIdleTimeNanos, TimeUnit.NANOSECONDS);
- }
- if (writerIdleTimeNanos > 0) {
- writerIdleTimeout = schedule(ctx, new WriterIdleTimeoutTask(ctx),
- writerIdleTimeNanos, TimeUnit.NANOSECONDS);
- }
- if (allIdleTimeNanos > 0) {
- allIdleTimeout = schedule(ctx, new AllIdleTimeoutTask(ctx),
- allIdleTimeNanos, TimeUnit.NANOSECONDS);
- }
- }
由于我们在上面创建 IdleStateHandler 时只是指定了 readerIdleTimeNanos=3,所以只会这里只会创建 ReaderIdleTimeoutTask。
PS:当线程池要执行某个 Task 时,实际就是让工作线程去执行 Task 的 run 方法。
那么,我们下面就来看看 ReaderIdleTimeoutTask 这个 Task 里的 run 方法:
- @Override
- protected void run(ChannelHandlerContext ctx) {
- long nextDelay = readerIdleTimeNanos;
- if (!reading) {
- // nextDelay 等于用当前时间减去最后一次 channelRead 方法调用的时间
- // 假如这个结果是 4s,说明最后一次调用 channelRead 已经是4s之前的事情了
- nextDelay -= ticksInNanos() - lastReadTime;
- }
-
- // 假如这个结果是 4s,说明最后一次调用 channelRead 已经是4s之前的事情了
- // 而上面我们设置的是读超时为3s,那么nextDelay则为-1,说明超时了
- if (nextDelay <= 0) {
- // Reader is idle - set a new timeout and notify the callback.
- // 重置定时任务,将delay设为 3s
- readerIdleTimeout = schedule(ctx, this, readerIdleTimeNanos, TimeUnit.NANOSECONDS);
-
- boolean first = firstReaderIdleEvent;
- firstReaderIdleEvent = false;
-
- try {
- IdleStateEvent event = newIdleStateEvent(IdleState.READER_IDLE, first);
- // 核心!!
- // channelIdle 实际调用的是 ctx.fireUserEventTriggered(evt)
- // 触发下一个 handler 的 UserEventTriggered 方法
- channelIdle(ctx, event);
- } catch (Throwable t) {
- ctx.fireExceptionCaught(t);
- }
- // 假如这个结果是 2s,说明最后一次调用 channelRead 已经是2s之前的事情了
- // 而上面我们设置的是读超时为3s,那么nextDelay则为1,说明没超时
- } else {
- // Read occurred before the timeout - set a new timeout with shorter delay.
- // 重置定时任务,将delay设为 1
- readerIdleTimeout = schedule(ctx, this, nextDelay, TimeUnit.NANOSECONDS);
- }
- }
-
上面的代码中两次重置 schedule 相当于循环,不断的更新定时时间(delay)
1.如果读超时了,就重置 delay 为初始值,并进入 UserEventTriggered() 用户自定义的处理逻辑中
2.如果没有读超时,就更新 delay 为一个更小的值
至此我们将 IdleStateHandler 底层核心逻辑分析完了,但 IdleStateHandler 说到底也只是能做一个空闲状态监测,但是根据连接空闲情况关闭连接等逻辑还要我们自己实现。下面我们就来看看怎么做…
-
-
- public class HcwWebSocketChannelHandler extends ChannelInitializer<SocketChannel> {
- @Override
- protected void initChannel(SocketChannel ch) throws Exception {
- ch.pipeline().addLast("http-codec",new HttpServerCodec());
- //以块的方式来写的处理器
- ch.pipeline().addLast("aggregator",new HttpObjectAggregator(65536));
- ch.pipeline().addLast("http-chunked",new ChunkedWriteHandler());
- //心跳检测,读超时时间设置为30s,0表示不监控
- ch.pipeline().addLast(new IdleStateHandler(60, 0, 0, TimeUnit.SECONDS));
- //心跳超时处理事件
- ch.pipeline().addLast(new HcwHeartBeat());
- ch.pipeline().addLast("handler",new NettyHandler());
-
- }
- }
- @Slf4j
- public class HcwHeartBeat extends ChannelInboundHandlerAdapter {
- @Override
- public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
- if (evt instanceof IdleStateEvent) {//超时事件
- log.info("心跳检测超时");
- IdleStateEvent idleEvent = (IdleStateEvent) evt;
- if (idleEvent.state() == IdleState.READER_IDLE) {//读
- ctx.channel().close(); //关闭通道连接
- }
- }
- super.userEventTriggered(ctx, evt);
- }
- }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。