当前位置:   article > 正文

IdleStateHandler 心跳检测,实现超时断开连接_idlestatecheck

idlestatecheck

目录

1.IdleStateHandler 原理

2.心跳检测

2.1.channel初始化时加入心跳超时处理事件

2.2 继承ChannelInboundHandlerAdapter ,重写超时事件


    有没有一种办法,如果我一段时间用不到服务器,就把这个连接给关掉?答:心跳机制。所谓心跳,即在 TCP 长连接中,客户端和服务器之间定期发送的一种特殊的数据包(比如消息内容是某种要求格式、内容),通知对方自己还在线,以确保 TCP 连接的有效性。

    在 Netty 中,实现心跳机制的关键是 IdleStateHandler(空闲状态处理器),它的作用跟名字一样,是用来监测连接的空闲情况。然后我们就可以根据心跳情况,来实现具体的处理逻辑,比如说断开连接、重新连接等等。

    那么,这篇文章的思路有了!我们先来分析 IdleStateHandler 为什么能实现心跳检测,然后再看看如何编写 Server 处理逻辑…

1.IdleStateHandler 原理


我们先来看一下 IdleStateHandler 的继承关系:

可以看到它也是一个 ChannelHandler,并且还是个 ChannelInboundHandler,是用来处理入站事件的。看下它的构造器:

  1. public IdleStateHandler(
  2. int readerIdleTimeSeconds,
  3. int writerIdleTimeSeconds,
  4. int allIdleTimeSeconds) {
  5. this(readerIdleTimeSeconds, writerIdleTimeSeconds, allIdleTimeSeconds,
  6. TimeUnit.SECONDS);
  7. }


这里解释下三个参数的含义:

readerIdleTimeSeconds:读超时。即当在指定的时间间隔内没有从 Channel 读取到数据时,会触发一个 READER_IDLE 的 IdleStateEvent 事件
writerIdleTimeSeconds: 写超时。即当在指定的时间间隔内没有数据写入到 Channel 时,会触发一个 WRITER_IDLE 的 IdleStateEvent 事件
allIdleTimeSeconds: 读/写超时。即当在指定的时间间隔内没有读或写操作时,会触发一个 ALL_IDLE 的 IdleStateEvent 事件
所以,跟编解码码器这些 ChannelHandler 一样,要实现 Netty 服务端心跳检测机制,也需要将 IdleStateHandler 注册到服务器端的 ChannelInitializer 中:

  1. // 由于我们的需求是判断 Client 时候还要向 Server 发送请求,从而决定是否关闭该连接
  2. // 所以,我们只需要判断 Server 是否在时间间隔内从 Channel 读取到数据
  3. // 所以,readerIdleTimeSeconds 我们取 3s,而 writerIdleTimeSeconds 为 0
  4. pipeline.addLast(new IdleStateHandler(3, 0, 0));


PS:这三个参数默认的时间单位是秒。若需要指定其他时间单位,可以使用另一个构造方法: public IdleStateHandler(boolean observeOutput,long readerIdleTime, long writerIdleTime, long allIdleTime,TimeUnit unit)

IdleStateHandler 源码分析

初步地看下 IdleStateHandler 源码,先看下 IdleStateHandler 中的 channelRead 方法:

  1. @Override
  2. public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
  3. if (readerIdleTimeNanos > 0 || allIdleTimeNanos > 0) {
  4. reading = true;
  5. firstReaderIdleEvent = firstAllIdleEvent = true;
  6. }
  7. // 该方法只是进行了透传,不做任何业务逻辑处理,
  8. // 让 channelPipe 中的下一个 handler 处理 channelRead 方法
  9. ctx.fireChannelRead(msg);
  10. }

我们再看看 channelActive 方法:

  1. @Override
  2. public void channelActive(ChannelHandlerContext ctx) throws Exception {
  3. // This method will be invoked only if this handler was added
  4. // before channelActive() event is fired. If a user adds this handler
  5. // after the channelActive() event, initialize() will be called by beforeAdd().
  6. initialize(ctx);
  7. super.channelActive(ctx);
  8. }

这里有个 initialize 方法,这是IdleStateHandler的精髓,接着探究:

  1. private void initialize(ChannelHandlerContext ctx) {
  2. // Avoid the case where destroy() is called before scheduling timeouts.
  3. // See: https://github.com/netty/netty/issues/143
  4. switch (state) {
  5. case 1:
  6. case 2:
  7. return;
  8. }
  9. state = 1;
  10. initOutputChanged(ctx);
  11. lastReadTime = lastWriteTime = ticksInNanos();
  12. // 根据读超时、写超时、读写超时创建定时任务
  13. if (readerIdleTimeNanos > 0) {
  14. // schedule 方法其实调用的线程池
  15. readerIdleTimeout = schedule(ctx, new ReaderIdleTimeoutTask(ctx),
  16. readerIdleTimeNanos, TimeUnit.NANOSECONDS);
  17. }
  18. if (writerIdleTimeNanos > 0) {
  19. writerIdleTimeout = schedule(ctx, new WriterIdleTimeoutTask(ctx),
  20. writerIdleTimeNanos, TimeUnit.NANOSECONDS);
  21. }
  22. if (allIdleTimeNanos > 0) {
  23. allIdleTimeout = schedule(ctx, new AllIdleTimeoutTask(ctx),
  24. allIdleTimeNanos, TimeUnit.NANOSECONDS);
  25. }
  26. }

由于我们在上面创建 IdleStateHandler 时只是指定了 readerIdleTimeNanos=3,所以只会这里只会创建 ReaderIdleTimeoutTask。

PS:当线程池要执行某个 Task 时,实际就是让工作线程去执行 Task 的 run 方法。

那么,我们下面就来看看 ReaderIdleTimeoutTask 这个 Task 里的 run 方法:

  1. @Override
  2. protected void run(ChannelHandlerContext ctx) {
  3. long nextDelay = readerIdleTimeNanos;
  4. if (!reading) {
  5. // nextDelay 等于用当前时间减去最后一次 channelRead 方法调用的时间
  6. // 假如这个结果是 4s,说明最后一次调用 channelRead 已经是4s之前的事情了
  7. nextDelay -= ticksInNanos() - lastReadTime;
  8. }
  9. // 假如这个结果是 4s,说明最后一次调用 channelRead 已经是4s之前的事情了
  10. // 而上面我们设置的是读超时为3s,那么nextDelay则为-1,说明超时了
  11. if (nextDelay <= 0) {
  12. // Reader is idle - set a new timeout and notify the callback.
  13. // 重置定时任务,将delay设为 3s
  14. readerIdleTimeout = schedule(ctx, this, readerIdleTimeNanos, TimeUnit.NANOSECONDS);
  15. boolean first = firstReaderIdleEvent;
  16. firstReaderIdleEvent = false;
  17. try {
  18. IdleStateEvent event = newIdleStateEvent(IdleState.READER_IDLE, first);
  19. // 核心!!
  20. // channelIdle 实际调用的是 ctx.fireUserEventTriggered(evt)
  21. // 触发下一个 handler 的 UserEventTriggered 方法
  22. channelIdle(ctx, event);
  23. } catch (Throwable t) {
  24. ctx.fireExceptionCaught(t);
  25. }
  26. // 假如这个结果是 2s,说明最后一次调用 channelRead 已经是2s之前的事情了
  27. // 而上面我们设置的是读超时为3s,那么nextDelay则为1,说明没超时
  28. } else {
  29. // Read occurred before the timeout - set a new timeout with shorter delay.
  30. // 重置定时任务,将delay设为 1
  31. readerIdleTimeout = schedule(ctx, this, nextDelay, TimeUnit.NANOSECONDS);
  32. }
  33. }

上面的代码中两次重置 schedule 相当于循环,不断的更新定时时间(delay)

1.如果读超时了,就重置 delay 为初始值,并进入 UserEventTriggered() 用户自定义的处理逻辑中
2.如果没有读超时,就更新 delay 为一个更小的值
至此我们将 IdleStateHandler 底层核心逻辑分析完了,但 IdleStateHandler 说到底也只是能做一个空闲状态监测,但是根据连接空闲情况关闭连接等逻辑还要我们自己实现。下面我们就来看看怎么做…


2.心跳检测

2.1.channel初始化时加入心跳超时处理事件

  1. public class HcwWebSocketChannelHandler extends ChannelInitializer<SocketChannel> {
  2. @Override
  3. protected void initChannel(SocketChannel ch) throws Exception {
  4. ch.pipeline().addLast("http-codec",new HttpServerCodec());
  5. //以块的方式来写的处理器
  6. ch.pipeline().addLast("aggregator",new HttpObjectAggregator(65536));
  7. ch.pipeline().addLast("http-chunked",new ChunkedWriteHandler());
  8. //心跳检测,读超时时间设置为30s,0表示不监控
  9. ch.pipeline().addLast(new IdleStateHandler(60, 0, 0, TimeUnit.SECONDS));
  10. //心跳超时处理事件
  11. ch.pipeline().addLast(new HcwHeartBeat());
  12. ch.pipeline().addLast("handler",new NettyHandler());
  13. }
  14. }

2.2 继承ChannelInboundHandlerAdapter ,重写超时事件

  1. @Slf4j
  2. public class HcwHeartBeat extends ChannelInboundHandlerAdapter {
  3. @Override
  4. public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
  5. if (evt instanceof IdleStateEvent) {//超时事件
  6. log.info("心跳检测超时");
  7. IdleStateEvent idleEvent = (IdleStateEvent) evt;
  8. if (idleEvent.state() == IdleState.READER_IDLE) {//读
  9. ctx.channel().close(); //关闭通道连接
  10. }
  11. }
  12. super.userEventTriggered(ctx, evt);
  13. }
  14. }

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/凡人多烦事01/article/detail/242226
推荐阅读
相关标签
  

闽ICP备14008679号