当前位置:   article > 正文

netty的读写空闲检测-IdleStateHandler_idlestateevent

idlestateevent

前言

Netty客户端和服务端之间的Tcp长连接通过心跳包探测连通性。

1.对于服务端而言,定时剔除那些没用的连接,减轻服务端的压力,增加稳定性;

2.对于客户端而言,可以提前进行重新连接,减少请求后续连接耗时。

在Netty中,提供了IdleStateHandler类提供心跳检测,下面分析IdleStateHandler原理何在dubbo中的使用。

一.源码解析

IdleStateHandler类中初始化方法

  1. private void initialize(ChannelHandlerContext ctx) {
  2. // Avoid the case where destroy() is called before scheduling timeouts.
  3. // See: https://github.com/netty/netty/issues/143
  4. // 初始化成功后直接返回
  5. switch (state) {
  6. case 1:
  7. case 2:
  8. return;
  9. default:
  10. break;
  11. }
  12. state = 1;
  13. //默认为false,检测ChannelOutboundBuffer中是否有字节变化
  14. initOutputChanged(ctx);
  15. //初始化设置的三个任务调度器
  16. lastReadTime = lastWriteTime = ticksInNanos();
  17. if (readerIdleTimeNanos > 0) {
  18. readerIdleTimeout = schedule(ctx, new ReaderIdleTimeoutTask(ctx),
  19. readerIdleTimeNanos, TimeUnit.NANOSECONDS);
  20. }
  21. if (writerIdleTimeNanos > 0) {
  22. writerIdleTimeout = schedule(ctx, new WriterIdleTimeoutTask(ctx),
  23. writerIdleTimeNanos, TimeUnit.NANOSECONDS);
  24. }
  25. if (allIdleTimeNanos > 0) {
  26. allIdleTimeout = schedule(ctx, new AllIdleTimeoutTask(ctx),
  27. allIdleTimeNanos, TimeUnit.NANOSECONDS);
  28. }
  29. }

在AbstractScheduledEventExecutor类中,将任务添加到延迟队列ScheduledTaskQueue中

  1. public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
  2. ObjectUtil.checkNotNull(command, "command");
  3. ObjectUtil.checkNotNull(unit, "unit");
  4. if (delay < 0) {
  5. delay = 0;
  6. }
  7. validateScheduled0(delay, unit);
  8. //调度延迟任务
  9. return schedule(new ScheduledFutureTask<Void>(
  10. this,
  11. command,
  12. deadlineNanos(unit.toNanos(delay))));
  13. }

在NioEventLoop中获取延迟到期的任务

System.nanoTime()是当前系统时间

设定的时间:long deadlineNanos = nanoTime() + delay; 

                     nanoTime() = System.nanoTime() - START_TIME;

比较的时间:System.nanoTime() - START_TIME 

  1. /**
  2. * Poll all tasks from the task queue and run them via {@link Runnable#run()} method.
  3. *
  4. * @return {@code true} if and only if at least one task was run
  5. */
  6. protected boolean runAllTasks() {
  7. assert inEventLoop();
  8. boolean fetchedAll;
  9. boolean ranAtLeastOne = false;
  10. do {
  11. //从中获取延迟到期的任务
  12. fetchedAll = fetchFromScheduledTaskQueue();
  13. if (runAllTasksFrom(taskQueue)) {
  14. ranAtLeastOne = true;
  15. }
  16. } while (!fetchedAll); // keep on processing until we fetched all scheduled tasks.
  17. if (ranAtLeastOne) {
  18. lastExecutionTime = ScheduledFutureTask.nanoTime();
  19. }
  20. afterRunningAllTasks();
  21. return ranAtLeastOne;
  22. }
  1. private boolean fetchFromScheduledTaskQueue() {
  2. if (scheduledTaskQueue == null || scheduledTaskQueue.isEmpty()) {
  3. return true;
  4. }
  5. long nanoTime = AbstractScheduledEventExecutor.nanoTime();
  6. for (;;) {
  7. Runnable scheduledTask = pollScheduledTask(nanoTime);
  8. if (scheduledTask == null) {
  9. return true;
  10. }
  11. //任务队列不满则加入,满了在放回延迟队列
  12. if (!taskQueue.offer(scheduledTask)) {
  13. // No space left in the task queue add it back to the scheduledTaskQueue so we pick it up again.
  14. scheduledTaskQueue.add((ScheduledFutureTask<?>) scheduledTask);
  15. return false;
  16. }
  17. }
  18. }

接着从taskQueue顺序拿出任务,开始执行任务;

1.ReaderIdleTimeoutTask读空闲任务

  1. private final class ReaderIdleTimeoutTask extends AbstractIdleTask {
  2. ReaderIdleTimeoutTask(ChannelHandlerContext ctx) {
  3. super(ctx);
  4. }
  5. @Override
  6. protected void run(ChannelHandlerContext ctx) {
  7. long nextDelay = readerIdleTimeNanos;
  8. //是否正在处理读任务;否则的话,
  9. //计算剩余的(当前时间到最后一次读借宿时间间隔)和 读空闲设置时间比较(后面会重新设置)
  10. if (!reading) {
  11. nextDelay -= ticksInNanos() - lastReadTime;
  12. }
  13. //超过了读空闲时间,触发IdleStateEvent事件并且重新调度
  14. if (nextDelay <= 0) {
  15. // Reader is idle - set a new timeout and notify the callback.
  16. readerIdleTimeout = schedule(ctx, this, readerIdleTimeNanos, TimeUnit.NANOSECONDS);
  17. boolean first = firstReaderIdleEvent;
  18. firstReaderIdleEvent = false;
  19. try {
  20. IdleStateEvent event = newIdleStateEvent(IdleState.READER_IDLE, first);
  21. channelIdle(ctx, event);
  22. } catch (Throwable t) {
  23. ctx.fireExceptionCaught(t);
  24. }
  25. } else {
  26. //在读空闲时间内有读事件,重新设置空闲时间为
  27. //初始值 - (当前时间 - 最后一次读结束的时间) = 剩余容忍空闲读的时间
  28. //(当前时间 - 最后一次读结束的时间)=》这个是在一个设置周期内已经空闲读的时间
  29. // Read occurred before the timeout - set a new timeout with shorter delay.
  30. readerIdleTimeout = schedule(ctx, this, nextDelay, TimeUnit.NANOSECONDS);
  31. }
  32. }
  33. }

上面的short delay注释解释了,确保在设置的readerIdleTimeNanos周期内有无读时间发生。

最终,触发了读空闲后会发出一个ctx.fireUserEventTriggered(evt)事件,可以在自定的handler的userEventTriggered方法中处理。

更新标识的时机:

读的过程中会置位reading为true,channelReadComplete后会置为false;

channelReadComplete会更新lastReadTime为最后一次读的时间。

  1. @Override
  2. public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
  3. if (readerIdleTimeNanos > 0 || allIdleTimeNanos > 0) {
  4. reading = true;
  5. firstReaderIdleEvent = firstAllIdleEvent = true;
  6. }
  7. ctx.fireChannelRead(msg);
  8. }
  9. @Override
  10. public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
  11. if ((readerIdleTimeNanos > 0 || allIdleTimeNanos > 0) && reading) {
  12. lastReadTime = ticksInNanos();
  13. reading = false;
  14. }
  15. ctx.fireChannelReadComplete();
  16. }

2.WriterIdleTimeoutTask写空闲任务

  1. private final class WriterIdleTimeoutTask extends AbstractIdleTask {
  2. WriterIdleTimeoutTask(ChannelHandlerContext ctx) {
  3. super(ctx);
  4. }
  5. @Override
  6. protected void run(ChannelHandlerContext ctx) {
  7. long lastWriteTime = IdleStateHandler.this.lastWriteTime;
  8. long nextDelay = writerIdleTimeNanos - (ticksInNanos() - lastWriteTime);
  9. if (nextDelay <= 0) {
  10. // Writer is idle - set a new timeout and notify the callback.
  11. writerIdleTimeout = schedule(ctx, this, writerIdleTimeNanos, TimeUnit.NANOSECONDS);
  12. boolean first = firstWriterIdleEvent;
  13. firstWriterIdleEvent = false;
  14. try {
  15. //这个默认是false,不会触发
  16. //如果为true,则ChannelOutboundBuffer中有字节的写出则不会触发写空闲,
  17. //如果写出过于缓慢,则可能导致ChannelOutboundBuffer链表过长导致OOMb
  18. if (hasOutputChanged(ctx, first)) {
  19. return;
  20. }
  21. IdleStateEvent event = newIdleStateEvent(IdleState.WRITER_IDLE, first);
  22. channelIdle(ctx, event);
  23. } catch (Throwable t) {
  24. ctx.fireExceptionCaught(t);
  25. }
  26. } else {
  27. // Write occurred before the timeout - set a new timeout with shorter delay.
  28. writerIdleTimeout = schedule(ctx, this, nextDelay, TimeUnit.NANOSECONDS);
  29. }
  30. }
  31. }

写的逻辑和读的大致一样。

写标识更新时机:

在最终writeAndFlush将ChannelOutboundBuffer中的entry通过nio channel写出时。

  1. private final ChannelFutureListener writeListener = new ChannelFutureListener() {
  2. @Override
  3. public void operationComplete(ChannelFuture future) throws Exception {
  4. lastWriteTime = ticksInNanos();
  5. firstWriterIdleEvent = firstAllIdleEvent = true;
  6. }
  7. };

ChannelOutboundBuffer类中的remove方法

  1. /**
  2. * Will remove the current message, mark its {@link ChannelPromise} as success and return {@code true}. If no
  3. * flushed message exists at the time this method is called it will return {@code false} to signal that no more
  4. * messages are ready to be handled.
  5. */
  6. public boolean remove() {
  7. Entry e = flushedEntry;
  8. if (e == null) {
  9. clearNioBuffers();
  10. return false;
  11. }
  12. Object msg = e.msg;
  13. ChannelPromise promise = e.promise;
  14. int size = e.pendingSize;
  15. removeEntry(e);
  16. if (!e.cancelled) {
  17. // only release message, notify and decrement if it was not canceled before.
  18. ReferenceCountUtil.safeRelease(msg);
  19. safeSuccess(promise);
  20. decrementPendingOutboundBytes(size, false, true);
  21. }
  22. // recycle the entry
  23. e.recycle();
  24. return true;
  25. }

3.AllIdleTimeoutTask读写空闲任务

和单独的读、写大致一样。

二.IdleStateHandler在dubbo中使用

server端:

初始化IdleStateHandler,服务端心跳时长是客户端三倍,注册的是读写空闲

 处理的nettyServerHandler的userEventTriggered方法,服务端读写空闲事件,直接关闭channel连接

client端:

初始化IdleStateHandler,客户端心跳时长是服务端三分之一,注册的是读空闲

客户端重连

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/从前慢现在也慢/article/detail/242229
推荐阅读
相关标签
  

闽ICP备14008679号