赞
踩
Netty客户端和服务端之间的Tcp长连接通过心跳包探测连通性。
1.对于服务端而言,定时剔除那些没用的连接,减轻服务端的压力,增加稳定性;
2.对于客户端而言,可以提前进行重新连接,减少请求后续连接耗时。
在Netty中,提供了IdleStateHandler类提供心跳检测,下面分析IdleStateHandler原理何在dubbo中的使用。
IdleStateHandler类中初始化方法
- private void initialize(ChannelHandlerContext ctx) {
- // Avoid the case where destroy() is called before scheduling timeouts.
- // See: https://github.com/netty/netty/issues/143
- // 初始化成功后直接返回
- switch (state) {
- case 1:
- case 2:
- return;
- default:
- break;
- }
-
- state = 1;
- //默认为false,检测ChannelOutboundBuffer中是否有字节变化
- initOutputChanged(ctx);
-
- //初始化设置的三个任务调度器
- lastReadTime = lastWriteTime = ticksInNanos();
- if (readerIdleTimeNanos > 0) {
- readerIdleTimeout = schedule(ctx, new ReaderIdleTimeoutTask(ctx),
- readerIdleTimeNanos, TimeUnit.NANOSECONDS);
- }
- if (writerIdleTimeNanos > 0) {
- writerIdleTimeout = schedule(ctx, new WriterIdleTimeoutTask(ctx),
- writerIdleTimeNanos, TimeUnit.NANOSECONDS);
- }
- if (allIdleTimeNanos > 0) {
- allIdleTimeout = schedule(ctx, new AllIdleTimeoutTask(ctx),
- allIdleTimeNanos, TimeUnit.NANOSECONDS);
- }
- }
在AbstractScheduledEventExecutor类中,将任务添加到延迟队列ScheduledTaskQueue中
- public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
- ObjectUtil.checkNotNull(command, "command");
- ObjectUtil.checkNotNull(unit, "unit");
- if (delay < 0) {
- delay = 0;
- }
- validateScheduled0(delay, unit);
-
- //调度延迟任务
- return schedule(new ScheduledFutureTask<Void>(
- this,
- command,
- deadlineNanos(unit.toNanos(delay))));
- }
在NioEventLoop中获取延迟到期的任务
System.nanoTime()是当前系统时间
设定的时间:long deadlineNanos = nanoTime() + delay;
nanoTime() = System.nanoTime() - START_TIME;
比较的时间:System.nanoTime() - START_TIME
-
- /**
- * Poll all tasks from the task queue and run them via {@link Runnable#run()} method.
- *
- * @return {@code true} if and only if at least one task was run
- */
- protected boolean runAllTasks() {
- assert inEventLoop();
- boolean fetchedAll;
- boolean ranAtLeastOne = false;
-
- do {
- //从中获取延迟到期的任务
- fetchedAll = fetchFromScheduledTaskQueue();
- if (runAllTasksFrom(taskQueue)) {
- ranAtLeastOne = true;
- }
- } while (!fetchedAll); // keep on processing until we fetched all scheduled tasks.
-
- if (ranAtLeastOne) {
- lastExecutionTime = ScheduledFutureTask.nanoTime();
- }
- afterRunningAllTasks();
- return ranAtLeastOne;
- }
- private boolean fetchFromScheduledTaskQueue() {
- if (scheduledTaskQueue == null || scheduledTaskQueue.isEmpty()) {
- return true;
- }
- long nanoTime = AbstractScheduledEventExecutor.nanoTime();
- for (;;) {
- Runnable scheduledTask = pollScheduledTask(nanoTime);
- if (scheduledTask == null) {
- return true;
- }
- //任务队列不满则加入,满了在放回延迟队列
- if (!taskQueue.offer(scheduledTask)) {
- // No space left in the task queue add it back to the scheduledTaskQueue so we pick it up again.
- scheduledTaskQueue.add((ScheduledFutureTask<?>) scheduledTask);
- return false;
- }
- }
- }
接着从taskQueue顺序拿出任务,开始执行任务;
- private final class ReaderIdleTimeoutTask extends AbstractIdleTask {
-
- ReaderIdleTimeoutTask(ChannelHandlerContext ctx) {
- super(ctx);
- }
-
- @Override
- protected void run(ChannelHandlerContext ctx) {
- long nextDelay = readerIdleTimeNanos;
- //是否正在处理读任务;否则的话,
- //计算剩余的(当前时间到最后一次读借宿时间间隔)和 读空闲设置时间比较(后面会重新设置)
- if (!reading) {
- nextDelay -= ticksInNanos() - lastReadTime;
- }
- //超过了读空闲时间,触发IdleStateEvent事件并且重新调度
- if (nextDelay <= 0) {
- // Reader is idle - set a new timeout and notify the callback.
- readerIdleTimeout = schedule(ctx, this, readerIdleTimeNanos, TimeUnit.NANOSECONDS);
-
- boolean first = firstReaderIdleEvent;
- firstReaderIdleEvent = false;
-
- try {
- IdleStateEvent event = newIdleStateEvent(IdleState.READER_IDLE, first);
- channelIdle(ctx, event);
- } catch (Throwable t) {
- ctx.fireExceptionCaught(t);
- }
- } else {
- //在读空闲时间内有读事件,重新设置空闲时间为
- //初始值 - (当前时间 - 最后一次读结束的时间) = 剩余容忍空闲读的时间
- //(当前时间 - 最后一次读结束的时间)=》这个是在一个设置周期内已经空闲读的时间
- // Read occurred before the timeout - set a new timeout with shorter delay.
- readerIdleTimeout = schedule(ctx, this, nextDelay, TimeUnit.NANOSECONDS);
- }
- }
- }
上面的short delay注释解释了,确保在设置的readerIdleTimeNanos周期内有无读时间发生。
最终,触发了读空闲后会发出一个ctx.fireUserEventTriggered(evt)事件,可以在自定的handler的userEventTriggered方法中处理。
更新标识的时机:
读的过程中会置位reading为true,channelReadComplete后会置为false;
channelReadComplete会更新lastReadTime为最后一次读的时间。
- @Override
- public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
- if (readerIdleTimeNanos > 0 || allIdleTimeNanos > 0) {
- reading = true;
- firstReaderIdleEvent = firstAllIdleEvent = true;
- }
- ctx.fireChannelRead(msg);
- }
-
- @Override
- public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
- if ((readerIdleTimeNanos > 0 || allIdleTimeNanos > 0) && reading) {
- lastReadTime = ticksInNanos();
- reading = false;
- }
- ctx.fireChannelReadComplete();
- }
- private final class WriterIdleTimeoutTask extends AbstractIdleTask {
-
- WriterIdleTimeoutTask(ChannelHandlerContext ctx) {
- super(ctx);
- }
-
- @Override
- protected void run(ChannelHandlerContext ctx) {
-
- long lastWriteTime = IdleStateHandler.this.lastWriteTime;
- long nextDelay = writerIdleTimeNanos - (ticksInNanos() - lastWriteTime);
- if (nextDelay <= 0) {
- // Writer is idle - set a new timeout and notify the callback.
- writerIdleTimeout = schedule(ctx, this, writerIdleTimeNanos, TimeUnit.NANOSECONDS);
-
- boolean first = firstWriterIdleEvent;
- firstWriterIdleEvent = false;
-
- try {
- //这个默认是false,不会触发
- //如果为true,则ChannelOutboundBuffer中有字节的写出则不会触发写空闲,
- //如果写出过于缓慢,则可能导致ChannelOutboundBuffer链表过长导致OOMb
- if (hasOutputChanged(ctx, first)) {
- return;
- }
-
- IdleStateEvent event = newIdleStateEvent(IdleState.WRITER_IDLE, first);
- channelIdle(ctx, event);
- } catch (Throwable t) {
- ctx.fireExceptionCaught(t);
- }
- } else {
- // Write occurred before the timeout - set a new timeout with shorter delay.
- writerIdleTimeout = schedule(ctx, this, nextDelay, TimeUnit.NANOSECONDS);
- }
- }
- }
写的逻辑和读的大致一样。
写标识更新时机:
在最终writeAndFlush将ChannelOutboundBuffer中的entry通过nio channel写出时。
- private final ChannelFutureListener writeListener = new ChannelFutureListener() {
- @Override
- public void operationComplete(ChannelFuture future) throws Exception {
- lastWriteTime = ticksInNanos();
- firstWriterIdleEvent = firstAllIdleEvent = true;
- }
- };
ChannelOutboundBuffer类中的remove方法
- /**
- * Will remove the current message, mark its {@link ChannelPromise} as success and return {@code true}. If no
- * flushed message exists at the time this method is called it will return {@code false} to signal that no more
- * messages are ready to be handled.
- */
- public boolean remove() {
- Entry e = flushedEntry;
- if (e == null) {
- clearNioBuffers();
- return false;
- }
- Object msg = e.msg;
-
- ChannelPromise promise = e.promise;
- int size = e.pendingSize;
-
- removeEntry(e);
-
- if (!e.cancelled) {
- // only release message, notify and decrement if it was not canceled before.
- ReferenceCountUtil.safeRelease(msg);
- safeSuccess(promise);
- decrementPendingOutboundBytes(size, false, true);
- }
-
- // recycle the entry
- e.recycle();
-
- return true;
- }
和单独的读、写大致一样。
server端:
初始化IdleStateHandler,服务端心跳时长是客户端三倍,注册的是读写空闲
处理的nettyServerHandler的userEventTriggered方法,服务端读写空闲事件,直接关闭channel连接
client端:
初始化IdleStateHandler,客户端心跳时长是服务端三分之一,注册的是读空闲
客户端重连
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。