当前位置:   article > 正文

Netty源码剖析之IdleStateHandler空闲检测机制_idlestateevent

idlestateevent

版本信息:

JDK1.8

Netty-all:4.1.38.Final

 空闲检测机制的介绍:

在各种论坛上,笔者看到大部分的博客都是把netty的IdleStateHandler说成心跳机制,而笔者认为这并不是心跳机制,仅仅是跟心跳机制比较相似,并且IdleStateHandler能做的并不仅仅只有"心跳机制",笔者认为叫做空闲检测机制比较适合。

为什么说叫做空闲检测机制比较合适呢?因为netty是封装了NIO,而提及IO无非就是输入输出,换种方式说无非就是读和写。那么在读和写的过程中能不能注册一个监听器,监听是否一直有读写请求,如果达到一定的时间没有读写请求那么就判定为空闲状态,达到空闲状态后,是不是可以对其做一些操作,比如释放连接资源、缓冲区的刷出等等操作。

IdleStateHandler应用:

读空闲:释放连接资源(但是这里并不是心跳机制,因为心跳机制的定义是给客户端发送心跳包,需要客户端回应ACK才是心跳机制的定义,这里仅仅是判断有没有读到客户端的数据)

写空间:写缓冲区的刷出(可以设置一个阈值,当多少秒没有写数据了,就一次性把写缓冲区数据刷出,提升性能,可以理解为批处理~)

使用IdleStateHandler一定要注意,定时器的频率不能太高,不然太部分时间在做判断会一定影响到业务代码的执行。而且这里的定时器的精准度不会特别精准,使用时需要考虑精准度问题~

IdleStateHandler源码分析:

netty中提供了空闲检测机制,既然是对读写操作的监听,那么肯定是在Pipeline的回调过程中实现,所以肯定需要实现ChannelHandler,并且同时能监听读和写,肯定是实现ChannelInboundHandler和ChannelOutboundHandler。

  1. public class IdleStateHandler extends ChannelDuplexHandler {}
  2. public class ChannelDuplexHandler extends ChannelInboundHandlerAdapter implements ChannelOutboundHandler {}

先从构造方法入手~

  1. /**
  2. * @param observeOutput 是否观察输出数据
  3. * @param readerIdleTime 对读空闲的阈值时间
  4. * @param writerIdleTime 对写空闲的阈值时间
  5. * @param allIdleTime 对读写空闲的阈值时间
  6. * @param unit 时间单位
  7. *
  8. * */
  9. public IdleStateHandler(boolean observeOutput,
  10. long readerIdleTime, long writerIdleTime, long allIdleTime,
  11. TimeUnit unit) {
  12. this.observeOutput = observeOutput;
  13. if (readerIdleTime <= 0) {
  14. readerIdleTimeNanos = 0;
  15. } else {
  16. // 默认一毫秒。当然用户可通过构造方法传入阈值时间
  17. readerIdleTimeNanos = Math.max(unit.toNanos(readerIdleTime), MIN_TIMEOUT_NANOS);
  18. }
  19. if (writerIdleTime <= 0) {
  20. writerIdleTimeNanos = 0;
  21. } else {
  22. // 默认一毫秒。当然用户可通过构造方法传入阈值时间
  23. writerIdleTimeNanos = Math.max(unit.toNanos(writerIdleTime), MIN_TIMEOUT_NANOS);
  24. }
  25. if (allIdleTime <= 0) {
  26. allIdleTimeNanos = 0;
  27. } else {
  28. // 默认一毫秒。当然用户可通过构造方法传入阈值时间
  29. allIdleTimeNanos = Math.max(unit.toNanos(allIdleTime), MIN_TIMEOUT_NANOS);
  30. }
  31. }

构造方法就非常的简单,用户可以传入读写空闲的阈值时间,这里对空闲的阈值时间做初始化工作,如果只对读做空闲监听,那么除了读其他的传入0即可,反之写是一样的。

既然是监听, 肯定需要使用到定时任务,那么需要有一个点触发开启定时任务。而这里是对读写做监听,所以在通道注册完毕的回调方法中开启定时任务就是非常的恰当。

  1. @Override
  2. public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
  3. // 通道是否已经激活
  4. if (ctx.channel().isActive()) {
  5. // 对读写空闲的定时任务做初始化
  6. initialize(ctx);
  7. }
  8. // 继续调用下一个ChannelHandlerContext的channelRegistered
  9. super.channelRegistered(ctx);
  10. }

这里继续看到 initialize初始化方法。

  1. private void initialize(ChannelHandlerContext ctx) {
  2. // 用状态控制是否已经初始化完毕。
  3. switch (state) {
  4. case 1:
  5. case 2:
  6. return;
  7. }
  8. state = 1;
  9. // 在构造方法中boolean observeOutput参数控制,默认为false
  10. // 初始化对输出数据改变的监听
  11. initOutputChanged(ctx);
  12. // 获取到当前系统时间。
  13. lastReadTime = lastWriteTime = ticksInNanos();
  14. // readerIdleTimeNanos在构造方法中设置,由用户传入,默认最小值为1毫秒。
  15. if (readerIdleTimeNanos > 0) {
  16. // 往EventLoop中注册定时任务
  17. readerIdleTimeout = schedule(ctx, new ReaderIdleTimeoutTask(ctx),
  18. readerIdleTimeNanos, TimeUnit.NANOSECONDS);
  19. }
  20. // writerIdleTimeNanos在构造方法中设置,由用户传入,默认最小值为1毫秒。
  21. if (writerIdleTimeNanos > 0) {
  22. // 往EventLoop中注册定时任务
  23. writerIdleTimeout = schedule(ctx, new WriterIdleTimeoutTask(ctx),
  24. writerIdleTimeNanos, TimeUnit.NANOSECONDS);
  25. }
  26. // allIdleTimeNanos在构造方法中设置,由用户传入,默认最小值为1毫秒。
  27. if (allIdleTimeNanos > 0) {
  28. // 往EventLoop中注册定时任务
  29. allIdleTimeout = schedule(ctx, new AllIdleTimeoutTask(ctx),
  30. allIdleTimeNanos, TimeUnit.NANOSECONDS);
  31. }
  32. }

这里也非常的简单,往EventLoop中注册定时任务,定时任务的参数都是从构造方法中用户传入的

那么接下来看到定时任务触发时回调的方法。而我们这里只关心读空闲检测。也即读的定时任务。

  1. // 任务的回调点。
  2. // 执行线程为EventLoop,也即netty工作线程。
  3. @Override
  4. protected void run(ChannelHandlerContext ctx) {
  5. // 用户设置的读空闲阈值
  6. long nextDelay = readerIdleTimeNanos;
  7. // 如果正在读,那就直接跳过计算,直接注册下一次的定时任务即可。
  8. if (!reading) {
  9. nextDelay -= ticksInNanos() - lastReadTime;
  10. }
  11. // 达到设置的空闲读阈值
  12. if (nextDelay <= 0) {
  13. // 虽然已经达到读空闲阈值,但是用户可能会设置几次的阈值,所以还需要注册定时任务。
  14. readerIdleTimeout = schedule(ctx, this, readerIdleTimeNanos, TimeUnit.NANOSECONDS);
  15. // 默认为true,以后为false。
  16. // 用于判断是否是第一次。
  17. boolean first = firstReaderIdleEvent;
  18. firstReaderIdleEvent = false;
  19. try {
  20. // 触发IdleStateEvent事件
  21. // 后续ChannelHandler实现userEventTriggered方法,可以判断IdleStateEvent事件,做出特殊处理
  22. IdleStateEvent event = newIdleStateEvent(IdleState.READER_IDLE, first);
  23. channelIdle(ctx, event);
  24. } catch (Throwable t) {
  25. ctx.fireExceptionCaught(t);
  26. }
  27. } else {
  28. // 都在读了,所以不存在空闲。
  29. // 但是还是需要设置下一次定时任务。
  30. readerIdleTimeout = schedule(ctx, this, nextDelay, TimeUnit.NANOSECONDS);
  31. }
  32. }

这里就是定时任务的回调方法,对这里做一个总结:

  1. 获取到用户设置的读空闲阈值
  2. 如果正在读,那么直接设置下一次定时任务即可。
  3. 如果达到了用户设置的读空闲阈值,那么还是需要设置下一次定时任务,因为用户可能会设置一个次数阈值,所以还需要定时任务在做后续处理
  4. 发送IdleStateEvent事件
  5. 触发后续的ChannelHandler实现userEventTriggered方法,userEventTriggered方法中可以判断IdleStateEvent事件,做出特殊处理

用户只需要在Pipeline中IdleStateHandler的后面写一个ChannelHandler实现userEventTriggered方法,对事件做判断即可,如果是IdleStateEvent事件,就做出对应的特殊处理即可(比如释放连接资源等等操作)

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/笔触狂放9/article/detail/242248
推荐阅读
相关标签
  

闽ICP备14008679号