赞
踩
版本信息:
JDK1.8
Netty-all:4.1.38.Final
在各种论坛上,笔者看到大部分的博客都是把netty的IdleStateHandler说成心跳机制,而笔者认为这并不是心跳机制,仅仅是跟心跳机制比较相似,并且IdleStateHandler能做的并不仅仅只有"心跳机制",笔者认为叫做空闲检测机制比较适合。
为什么说叫做空闲检测机制比较合适呢?因为netty是封装了NIO,而提及IO无非就是输入输出,换种方式说无非就是读和写。那么在读和写的过程中能不能注册一个监听器,监听是否一直有读写请求,如果达到一定的时间没有读写请求那么就判定为空闲状态,达到空闲状态后,是不是可以对其做一些操作,比如释放连接资源、缓冲区的刷出等等操作。
读空闲:释放连接资源(但是这里并不是心跳机制,因为心跳机制的定义是给客户端发送心跳包,需要客户端回应ACK才是心跳机制的定义,这里仅仅是判断有没有读到客户端的数据)
写空间:写缓冲区的刷出(可以设置一个阈值,当多少秒没有写数据了,就一次性把写缓冲区数据刷出,提升性能,可以理解为批处理~)
使用IdleStateHandler一定要注意,定时器的频率不能太高,不然太部分时间在做判断会一定影响到业务代码的执行。而且这里的定时器的精准度不会特别精准,使用时需要考虑精准度问题~
netty中提供了空闲检测机制,既然是对读写操作的监听,那么肯定是在Pipeline的回调过程中实现,所以肯定需要实现ChannelHandler,并且同时能监听读和写,肯定是实现ChannelInboundHandler和ChannelOutboundHandler。
- public class IdleStateHandler extends ChannelDuplexHandler {}
-
- public class ChannelDuplexHandler extends ChannelInboundHandlerAdapter implements ChannelOutboundHandler {}
-
先从构造方法入手~
- /**
- * @param observeOutput 是否观察输出数据
- * @param readerIdleTime 对读空闲的阈值时间
- * @param writerIdleTime 对写空闲的阈值时间
- * @param allIdleTime 对读写空闲的阈值时间
- * @param unit 时间单位
- *
- * */
- public IdleStateHandler(boolean observeOutput,
- long readerIdleTime, long writerIdleTime, long allIdleTime,
- TimeUnit unit) {
-
- this.observeOutput = observeOutput;
-
- if (readerIdleTime <= 0) {
- readerIdleTimeNanos = 0;
- } else {
- // 默认一毫秒。当然用户可通过构造方法传入阈值时间
- readerIdleTimeNanos = Math.max(unit.toNanos(readerIdleTime), MIN_TIMEOUT_NANOS);
- }
- if (writerIdleTime <= 0) {
- writerIdleTimeNanos = 0;
- } else {
- // 默认一毫秒。当然用户可通过构造方法传入阈值时间
- writerIdleTimeNanos = Math.max(unit.toNanos(writerIdleTime), MIN_TIMEOUT_NANOS);
- }
- if (allIdleTime <= 0) {
- allIdleTimeNanos = 0;
- } else {
- // 默认一毫秒。当然用户可通过构造方法传入阈值时间
- allIdleTimeNanos = Math.max(unit.toNanos(allIdleTime), MIN_TIMEOUT_NANOS);
- }
- }
构造方法就非常的简单,用户可以传入读写空闲的阈值时间,这里对空闲的阈值时间做初始化工作,如果只对读做空闲监听,那么除了读其他的传入0即可,反之写是一样的。
既然是监听, 肯定需要使用到定时任务,那么需要有一个点触发开启定时任务。而这里是对读写做监听,所以在通道注册完毕的回调方法中开启定时任务就是非常的恰当。
- @Override
- public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
- // 通道是否已经激活
- if (ctx.channel().isActive()) {
- // 对读写空闲的定时任务做初始化
- initialize(ctx);
- }
- // 继续调用下一个ChannelHandlerContext的channelRegistered
- super.channelRegistered(ctx);
- }
这里继续看到 initialize初始化方法。
- private void initialize(ChannelHandlerContext ctx) {
-
- // 用状态控制是否已经初始化完毕。
- switch (state) {
- case 1:
- case 2:
- return;
- }
-
- state = 1;
- // 在构造方法中boolean observeOutput参数控制,默认为false
- // 初始化对输出数据改变的监听
- initOutputChanged(ctx);
-
- // 获取到当前系统时间。
- lastReadTime = lastWriteTime = ticksInNanos();
-
- // readerIdleTimeNanos在构造方法中设置,由用户传入,默认最小值为1毫秒。
- if (readerIdleTimeNanos > 0) {
- // 往EventLoop中注册定时任务
- readerIdleTimeout = schedule(ctx, new ReaderIdleTimeoutTask(ctx),
- readerIdleTimeNanos, TimeUnit.NANOSECONDS);
- }
- // writerIdleTimeNanos在构造方法中设置,由用户传入,默认最小值为1毫秒。
- if (writerIdleTimeNanos > 0) {
- // 往EventLoop中注册定时任务
- writerIdleTimeout = schedule(ctx, new WriterIdleTimeoutTask(ctx),
- writerIdleTimeNanos, TimeUnit.NANOSECONDS);
- }
- // allIdleTimeNanos在构造方法中设置,由用户传入,默认最小值为1毫秒。
- if (allIdleTimeNanos > 0) {
- // 往EventLoop中注册定时任务
- allIdleTimeout = schedule(ctx, new AllIdleTimeoutTask(ctx),
- allIdleTimeNanos, TimeUnit.NANOSECONDS);
- }
- }
这里也非常的简单,往EventLoop中注册定时任务,定时任务的参数都是从构造方法中用户传入的
那么接下来看到定时任务触发时回调的方法。而我们这里只关心读空闲检测。也即读的定时任务。
- // 任务的回调点。
- // 执行线程为EventLoop,也即netty工作线程。
- @Override
- protected void run(ChannelHandlerContext ctx) {
- // 用户设置的读空闲阈值
- long nextDelay = readerIdleTimeNanos;
-
- // 如果正在读,那就直接跳过计算,直接注册下一次的定时任务即可。
- if (!reading) {
- nextDelay -= ticksInNanos() - lastReadTime;
- }
-
- // 达到设置的空闲读阈值
- if (nextDelay <= 0) {
- // 虽然已经达到读空闲阈值,但是用户可能会设置几次的阈值,所以还需要注册定时任务。
- readerIdleTimeout = schedule(ctx, this, readerIdleTimeNanos, TimeUnit.NANOSECONDS);
-
- // 默认为true,以后为false。
- // 用于判断是否是第一次。
- boolean first = firstReaderIdleEvent;
- firstReaderIdleEvent = false;
-
- try {
- // 触发IdleStateEvent事件
- // 后续ChannelHandler实现userEventTriggered方法,可以判断IdleStateEvent事件,做出特殊处理
- IdleStateEvent event = newIdleStateEvent(IdleState.READER_IDLE, first);
- channelIdle(ctx, event);
- } catch (Throwable t) {
- ctx.fireExceptionCaught(t);
- }
- } else {
- // 都在读了,所以不存在空闲。
- // 但是还是需要设置下一次定时任务。
- readerIdleTimeout = schedule(ctx, this, nextDelay, TimeUnit.NANOSECONDS);
- }
- }
这里就是定时任务的回调方法,对这里做一个总结:
用户只需要在Pipeline中IdleStateHandler的后面写一个ChannelHandler实现userEventTriggered方法,对事件做判断即可,如果是IdleStateEvent事件,就做出对应的特殊处理即可(比如释放连接资源等等操作)
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。