当前位置:   article > 正文

Netty源码解析-IdleStateHandler

idlestatehandler

前言:

心跳机制广泛运用在我们的应用平台中。对于连接到应用服务的客户端,服务端有必要对长时间没有请求的客户端连接进行清理,以避免连接过多。这就需要服务端有空闲连接检测机制。

而针对客户端而言,如果长时间未请求数据,为避免被服务端清理连接,就需要间歇性的发送心跳请求。

在Netty中,针对以上需求,已经有现成的Handler可供使用,这就是本文要介绍的IdleStateHandler。

1.IdleStateHandler的使用

1.1 服务端检测长时间未请求的客户端

我们在服务端可以使用IdleStateHandler来检测长时间未发送请求的客户端,对其进行清理操作,简单示例如下:

  1. // 还是使用HelloServer的示例,我们在ChannelInitializer中添加IdleStateHandler
  2. .childHandler(new ChannelInitializer<SocketChannel>() {
  3. protected void initChannel(SocketChannel ch) throws Exception {
  4. ChannelPipeline pipeline = ch.pipeline();
  5. // 在这里添加IdleStateHandler,设置空闲检测时间为10秒
  6. pipeline.addLast("idle", new IdleStateHandler(10, 10, 10));
  7. // 针对空闲事件的处理(自定义),具体内容如下
  8. pipeline.addLast("idledeal", new IdleEventHandler());
  9. pipeline.addLast("decoder", new StringDecoder());
  10. pipeline.addLast("encoder", new StringEncoder());
  11. pipeline.addLast("handler", new HelloServerHandler());
  12. }
  13. });
  14. // IdleEventHandler
  15. public class IdleEventHandler extends ChannelDuplexHandler {
  16. @Override
  17. public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
  18. if (evt instanceof IdleStateEvent) {
  19. evt = (IdleStateEvent) evt;
  20. // 若检测到长时间未读到请求事件,则清理客户端连接
  21. if (evt.equals(IdleStateEvent.READER_IDLE_STATE_EVENT)) {
  22. System.out.println("idle...");
  23. ctx.channel().close();
  24. }else {
  25. // 其他事件
  26. // TODO
  27. }
  28. }
  29. }
  30. }

在本例中,我们设置了IdleStateHandler的读空闲检测时间为10s,则客户端连接10s没有发送任何请求过来时,则发送一个IdleStateEvent.READER_IDLE_STATE_EVENT事件到下游,IdleEventHandler处理该事件,直接关闭客户端连接。

1.2 客户端发送心跳请求

若客户端本身检测到长时间未发送请求,为避免被服务端清理,则可以主动发送一个心跳请求。简单示例如下

  1. // 同样的使用HelloClient的代码,我们改造下ChannelInitializer
  2. .handler(new ChannelInitializer<SocketChannel>() {
  3. protected void initChannel(SocketChannel ch) throws Exception {
  4. ChannelPipeline pipeline = ch.pipeline();
  5. ...
  6. // 在这里添加IdleStateHandler检测
  7. pipeline.addLast("idle", new IdleStateHandler(5, 5, 5));
  8. // 空闲事件处理Handler(自定义),具体内容如下
  9. pipeline.addLast("idledeal", new ClientIdleEventHandler());
  10. pipeline.addLast("handler", new HelloClientHandler());
  11. }
  12. });
  13. // ClientIdleEventHandler
  14. public class ClientIdleEventHandler extends ChannelDuplexHandler {
  15. @Override
  16. public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
  17. if (evt instanceof IdleStateEvent) {
  18. evt = (IdleStateEvent) evt;
  19. // 若检测到长时间未发送请求事件,则主动发送心跳信息
  20. if (evt.equals(IdleStateEvent.WRITER_IDLE_STATE_EVENT)) {
  21. ctx.writeAndFlush("ping");
  22. }else {
  23. // 其他事件
  24. // TODO
  25. }
  26. }
  27. }
  28. }

这样,当客户端发现已经5s没有发送过请求时,则主动发送一个ping心跳信息到服务端,避免被清理

2.IdleStateHandler的构造

我们首先来看下IdleStateHandler的相关构造方法和基本属性

  1. public class IdleStateHandler extends ChannelDuplexHandler {
  2. // Not create a new ChannelFutureListener per write operation to reduce GC pressure.
  3. // write监听器
  4. private final ChannelFutureListener writeListener = new ChannelFutureListener() {
  5. @Override
  6. public void operationComplete(ChannelFuture future) throws Exception {
  7. lastWriteTime = ticksInNanos();
  8. firstWriterIdleEvent = firstAllIdleEvent = true;
  9. }
  10. };
  11. private final boolean observeOutput;
  12. // 三种类型的空闲时间设置
  13. private final long readerIdleTimeNanos;
  14. private final long writerIdleTimeNanos;
  15. private final long allIdleTimeNanos;
  16. // 读空闲检测定时任务
  17. private ScheduledFuture<?> readerIdleTimeout;
  18. // 最近一次读事件
  19. private long lastReadTime;
  20. // 是否第一次读idleEvent触发
  21. private boolean firstReaderIdleEvent = true;
  22. // 以下与读设置类似
  23. private ScheduledFuture<?> writerIdleTimeout;
  24. private long lastWriteTime;
  25. private boolean firstWriterIdleEvent = true;
  26. private ScheduledFuture<?> allIdleTimeout;
  27. private boolean firstAllIdleEvent = true;
  28. // IdleStateHandler的状态,避免多次初始化
  29. private byte state; // 0 - none, 1 - initialized, 2 - destroyed
  30. private boolean reading;
  31. private long lastChangeCheckTimeStamp;
  32. private int lastMessageHashCode;
  33. private long lastPendingWriteBytes;
  34. private long lastFlushProgress;
  35. // 默认使用的构造器
  36. public IdleStateHandler(
  37. int readerIdleTimeSeconds,
  38. int writerIdleTimeSeconds,
  39. int allIdleTimeSeconds) {
  40. // 默认单位为秒
  41. this(readerIdleTimeSeconds, writerIdleTimeSeconds, allIdleTimeSeconds,
  42. TimeUnit.SECONDS);
  43. }
  44. public IdleStateHandler(
  45. long readerIdleTime, long writerIdleTime, long allIdleTime,
  46. TimeUnit unit) {
  47. this(false, readerIdleTime, writerIdleTime, allIdleTime, unit);
  48. }
  49. public IdleStateHandler(boolean observeOutput,
  50. long readerIdleTime, long writerIdleTime, long allIdleTime,
  51. TimeUnit unit) {
  52. ObjectUtil.checkNotNull(unit, "unit");
  53. this.observeOutput = observeOutput;
  54. // 以纳秒为单位重新设置超时时间
  55. if (readerIdleTime <= 0) {
  56. readerIdleTimeNanos = 0;
  57. } else {
  58. readerIdleTimeNanos = Math.max(unit.toNanos(readerIdleTime), MIN_TIMEOUT_NANOS);
  59. }
  60. if (writerIdleTime <= 0) {
  61. writerIdleTimeNanos = 0;
  62. } else {
  63. writerIdleTimeNanos = Math.max(unit.toNanos(writerIdleTime), MIN_TIMEOUT_NANOS);
  64. }
  65. if (allIdleTime <= 0) {
  66. allIdleTimeNanos = 0;
  67. } else {
  68. allIdleTimeNanos = Math.max(unit.toNanos(allIdleTime), MIN_TIMEOUT_NANOS);
  69. }
  70. }
  71. }

IdleStateHandler继承了ChannelDuplexHandler,说明其可以处理inbound、outbound事件;

构造方法比较简单,我们比较常用的就是第一个构造方法,以秒为单位来设置读、写、all的空闲检测;

属性的话,我们通过具体方法来学习。

3.IdleStateHandler空闲检测

在handlerAdded、channelActive等方法中,都有一个initialize()方法,这个方法用来初始化检测器,我们先来看下

3.1 initialize() 初始化IdleStateHandler

  1. public class IdleStateHandler extends ChannelDuplexHandler {
  2. private void initialize(ChannelHandlerContext ctx) {
  3. // 已经初始化过则不再重复初始化
  4. switch (state) {
  5. case 1:
  6. case 2:
  7. return;
  8. }
  9. // 设置状态为initialized
  10. state = 1;
  11. // TODO
  12. initOutputChanged(ctx);
  13. // 设置最新的readTime和writeTime为当前时间
  14. lastReadTime = lastWriteTime = ticksInNanos();
  15. if (readerIdleTimeNanos > 0) {
  16. // 创建一个读空闲检测定时任务,延后readerIdleTimeNanos执行,具体schedule方法见下面
  17. readerIdleTimeout = schedule(ctx, new ReaderIdleTimeoutTask(ctx),
  18. readerIdleTimeNanos, TimeUnit.NANOSECONDS);
  19. }
  20. if (writerIdleTimeNanos > 0) {
  21. // 创建一个写空闲检测定时任务,延后readerIdleTimeNanos执行
  22. writerIdleTimeout = schedule(ctx, new WriterIdleTimeoutTask(ctx),
  23. writerIdleTimeNanos, TimeUnit.NANOSECONDS);
  24. }
  25. if (allIdleTimeNanos > 0) {
  26. // 创建一个读、写空闲检测定时任务,延后readerIdleTimeNanos执行
  27. allIdleTimeout = schedule(ctx, new AllIdleTimeoutTask(ctx),
  28. allIdleTimeNanos, TimeUnit.NANOSECONDS);
  29. }
  30. }
  31. // 创建一个定时任务
  32. ScheduledFuture<?> schedule(ChannelHandlerContext ctx, Runnable task, long delay, TimeUnit unit) {
  33. return ctx.executor().schedule(task, delay, unit);
  34. }
  35. }

初始化方法,主要用于初始化三个定时任务,那么这三个定时任务ReaderIdleTimeoutTask、WriterIdleTimeoutTask、AllIdleTimeoutTask具体是怎么用的呢?我们先来看下读空闲检测是如何做的。

3.2 读空闲检测

  1. public class IdleStateHandler extends ChannelDuplexHandler {
  2. @Override
  3. public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
  4. // 当设置的readerIdleTimeNanos或allIdleTimeNanos大于0时,说明需要进行读空闲检测
  5. if (readerIdleTimeNanos > 0 || allIdleTimeNanos > 0) {
  6. // 设置reading 正在读数据的状态为true
  7. reading = true;
  8. // 设置两个状态为为true
  9. firstReaderIdleEvent = firstAllIdleEvent = true;
  10. }
  11. ctx.fireChannelRead(msg);
  12. }
  13. // 重点在这来,如果本次读已经结束,则需要重置时间和状态位
  14. @Override
  15. public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
  16. // 如果数据正在读状态
  17. if ((readerIdleTimeNanos > 0 || allIdleTimeNanos > 0) && reading) {
  18. // 则重新设置lastReadTime为当前时间
  19. lastReadTime = ticksInNanos();
  20. // 将正在读标志设置为false
  21. reading = false;
  22. }
  23. ctx.fireChannelReadComplete();
  24. }
  25. }

通过上述两个方法可以看出,当发生读事件时,设置reading=true,当本次读结束时,则设置reading=false,lastReadTime(最近一次读时间)为当前时间。

那么这个是如何被检测到读超时的呢?我们可以回到initialize()方法,其中有一个ReaderIdleTimeoutTask的定时任务,延迟readerIdleTimeNanos执行,一起来看下这个task的具体内容

3.2.1 ReaderIdleTimeoutTask 读空闲检测定时任务

  1. private final class ReaderIdleTimeoutTask extends AbstractIdleTask {
  2. // 将ChannelHandlerContext传入当前task
  3. ReaderIdleTimeoutTask(ChannelHandlerContext ctx) {
  4. super(ctx);
  5. }
  6. @Override
  7. protected void run(ChannelHandlerContext ctx) {
  8. long nextDelay = readerIdleTimeNanos;
  9. // 如果当前没有发生读事件,则reading为false
  10. if (!reading) {
  11. nextDelay -= ticksInNanos() - lastReadTime;
  12. }
  13. // 读空闲超时,需要发送READER_IDLE事件
  14. if (nextDelay <= 0) {
  15. // Reader is idle - set a new timeout and notify the callback.
  16. // 重启一个定时检测任务,延迟readerIdleTimeNanos执行
  17. readerIdleTimeout = schedule(ctx, this, readerIdleTimeNanos, TimeUnit.NANOSECONDS);
  18. boolean first = firstReaderIdleEvent;
  19. firstReaderIdleEvent = false;
  20. try {
  21. // 往下游发送一个READER_IDLE Event
  22. IdleStateEvent event = newIdleStateEvent(IdleState.READER_IDLE, first);
  23. channelIdle(ctx, event);
  24. } catch (Throwable t) {
  25. ctx.fireExceptionCaught(t);
  26. }
  27. } else {
  28. // 注意这里虽然也是重启一个定时任务,但是延迟时间与上面有所不同,这里的具体延迟时间为readerIdleTimeNanos - (ticksInNanos() - lastReadTime)
  29. readerIdleTimeout = schedule(ctx, this, nextDelay, TimeUnit.NANOSECONDS);
  30. }
  31. }
  32. protected void channelIdle(ChannelHandlerContext ctx, IdleStateEvent evt) throws Exception {
  33. ctx.fireUserEventTriggered(evt);
  34. }
  35. }

这里比较有意思的是nextDelay值的设置,当读事件正在进行时(reading=true),则直接进行下一次循环;

当读事件未执行,若ticksInNanos(当前时间) - lastReadTime(最后一次读完成时间) > readerIdleTimeNanos(读空闲检测时间),说明读空闲超时,往下游发送一个READER_IDLE Event

reading=false的情况有两种:没有发生过读、读已经结束;当读数据正在进行时,则reading=true

总结:通过这种对lastReadTime的定时任务检测,就可以发现是否已经长时间未读,若是,则发送下游READER_IDLE事件,下游检测到该事件进行相应处理即可。

3.3 写空闲检测

分析过程与3.2 读空闲检测类似,我们先来看下重写后的write方法

  1. public class IdleStateHandler extends ChannelDuplexHandler {
  2. @Override
  3. public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
  4. // 如果writerIdleTimeNanos或allIdleTimeNanos大于0,说明需要进行写空闲检测
  5. if (writerIdleTimeNanos > 0 || allIdleTimeNanos > 0) {
  6. // 对write方法执行后的ChannelFuture添加监听器,writeListener内容如下
  7. ctx.write(msg, promise.unvoid()).addListener(writeListener);
  8. } else {
  9. ctx.write(msg, promise);
  10. }
  11. }
  12. private final ChannelFutureListener writeListener = new ChannelFutureListener() {
  13. @Override
  14. public void operationComplete(ChannelFuture future) throws Exception {
  15. // 当写方法完成时,设置最新一次写时间为当前时间
  16. lastWriteTime = ticksInNanos();
  17. firstWriterIdleEvent = firstAllIdleEvent = true;
  18. }
  19. };
  20. }

方法并不复杂,主要就是对write方法添加一个监听器,用于监听wirte方法完成,完成后重置下lastWriteTime。下面来看下WriteTask所做的事情

3.3.1 WriterIdleTimeoutTask 写空闲检测

  1. private final class WriterIdleTimeoutTask extends AbstractIdleTask {
  2. // 将ChannelHandlerContext传入当前task
  3. WriterIdleTimeoutTask(ChannelHandlerContext ctx) {
  4. super(ctx);
  5. }
  6. @Override
  7. protected void run(ChannelHandlerContext ctx) {
  8. long lastWriteTime = IdleStateHandler.this.lastWriteTime;
  9. // 同样的方式来计算nextDelay
  10. long nextDelay = writerIdleTimeNanos - (ticksInNanos() - lastWriteTime);
  11. // 已超时
  12. if (nextDelay <= 0) {
  13. // 先生成一个定时任务,用于下次写超时检测
  14. writerIdleTimeout = schedule(ctx, this, writerIdleTimeNanos, TimeUnit.NANOSECONDS);
  15. boolean first = firstWriterIdleEvent;
  16. firstWriterIdleEvent = false;
  17. try {
  18. // 这里比较有意思,我们具体在3.3.2 来看下
  19. if (hasOutputChanged(ctx, first)) {
  20. return;
  21. }
  22. // 直接向下游传递一个WRITER_IDLE事件
  23. IdleStateEvent event = newIdleStateEvent(IdleState.WRITER_IDLE, first);
  24. channelIdle(ctx, event);
  25. } catch (Throwable t) {
  26. ctx.fireExceptionCaught(t);
  27. }
  28. } else {
  29. // 说明写未超时,重新生成一个定时任务,延迟nextDelay执行
  30. writerIdleTimeout = schedule(ctx, this, nextDelay, TimeUnit.NANOSECONDS);
  31. }
  32. }
  33. }

3.3.2 hasOutputChanged() 判断Channeloutboundbuffer是否发生过变化

  1. public class IdleStateHandler extends ChannelDuplexHandler {
  2. private boolean hasOutputChanged(ChannelHandlerContext ctx, boolean first) {
  3. // 默认observeOutput=false,不会进行下面的检测,需要主动开启
  4. if (observeOutput) {
  5. // 如果 lastChangeCheckTimeStamp 和 lastWriteTime 不一样,说明写操作进行过了,需要更新此值
  6. if (lastChangeCheckTimeStamp != lastWriteTime) {
  7. lastChangeCheckTimeStamp = lastWriteTime;
  8. // 非首次,则直接返回true
  9. // 这里的first,即firstWriterIdleEvent参数,默认为true,当写操作完成时也被置为true
  10. if (!first) {
  11. return true;
  12. }
  13. }
  14. Channel channel = ctx.channel();
  15. Unsafe unsafe = channel.unsafe();
  16. ChannelOutboundBuffer buf = unsafe.outboundBuffer();
  17. if (buf != null) {
  18. int messageHashCode = System.identityHashCode(buf.current());
  19. long pendingWriteBytes = buf.totalPendingWriteBytes();
  20. // 这来主要判断ChannelOutboundBuffer中的值是否发生了变化
  21. // 如果有数据添加进来,则前后肯定不一致
  22. if (messageHashCode != lastMessageHashCode || pendingWriteBytes != lastPendingWriteBytes) {
  23. lastMessageHashCode = messageHashCode;
  24. lastPendingWriteBytes = pendingWriteBytes;
  25. if (!first) {
  26. return true;
  27. }
  28. }
  29. long flushProgress = buf.currentProgress();
  30. if (flushProgress != lastFlushProgress) {
  31. lastFlushProgress = flushProgress;
  32. if (!first) {
  33. return true;
  34. }
  35. }
  36. }
  37. }
  38. return false;
  39. }
  40. }

关于检测Channeloutboundbuffer变化的逻辑,在正常使用IdleStateHandler中是不会触发的,具体的细节分析可以参考下下文

Netty 心跳服务之 IdleStateHandler 源码分析 - 简书

总结:写空闲检测与读空闲检测基本类似,笔者不再赘述。

3.4 AllIdle检测

这个的检测与上述读写检测基本是一样的,大家可以自行阅读AllIdleTimeoutTask.java,笔者不再赘述

总结:

IdleStateHandler的代码不算复杂,在我们的应用探活中可以很好的发挥作用。

主要还是我们监听到Idle event后的自定义处理方案,这个才是关键。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/很楠不爱3/article/detail/242191
推荐阅读
相关标签
  

闽ICP备14008679号