赞
踩
每个ChannelHandler被添加到ChannelPipeline后,都会创建一个ChannelHandlerContext,并与ChannelHandler关联绑定。ChannelHandlerContext允许ChannelHandler与其他的ChannelHandler进行交互。ChannelHandlerContext不会改变添加到其中的ChannelHandler,因此它是安全的。ChannelHandlerContext、ChannelHandler、ChannelPipeline的关系如下图:
Netty有一个简单但强大的状态模型,能完美映射到ChannelInboundHandler的各个方法。如下表所示是Channel生命周期四个不同的状态。
一个Channel正常的生命周期如下图所示。随着状态发生变化产生相应的事件。这些事件被转发到ChannelPipeline中的ChannelHandler来触发相应的操作。
先看一个Netty中整个Handler体系的类关系图。
Netty定义了良好的类型层次结构来表示不同的处理程序类型,所有类型的父类是ChannelHandler, ChannelHandler提供了在其生命周期内添加或从ChannelPipeline中删除的方法,如下表
Netty还提供了一个实现了ChannelHandler的抽象类ChannelHandlerAdapter。 ChannelHandlerAdapter实现了父类的所有方法,主要功能就是将请求从一个ChannelHandler往下传递到下一个ChannelHandler,直到全部ChannelHandler传递完毕。也可以直接继承于ChannelHandlerAdapter,然后重写里面的方法。
ChannelInboundHandler还提供了一些在接收数据或Channel状态改变时被调用的方法。下面是ChannelInboundHandler的一些方法。
java.util.concurrent.Future是Java原生API中提供的接口,用来记录异步执行的状态,Future的get方法会判断任务是否执行完成,如果完成立即返回执行结果,否则阻塞线程,知道任务完成再返回。
Netty扩展了Java的Future,在Future的基础上扩展了监听器(Listener)接口,通过监听器可以让异步执行更加有效率,不需要通过调用get方法来等待异步执行结束,而是通过监听器回调来精确控制异步执行结束时间。
- public interface Future<V> extends java.util.concurrent.Future<V> {
-
- boolean isSuccess();
-
- boolean isCancellable();
-
-
- Throwable cause();
-
- Future<V> addListener(GenericFutureListener<? extends Future<? super V>> listener);
-
- Future<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners);
-
- Future<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener);
-
-
- Future<V> removeListeners(GenericFutureListener<? extends Future<? super V>>... listeners);
-
- Future<V> sync() throws InterruptedException;
-
-
- Future<V> syncUninterruptibly();
-
-
- Future<V> await() throws InterruptedException;
-
-
- Future<V> awaitUninterruptibly();
-
- boolean await(long timeout, TimeUnit unit) throws InterruptedException;
-
-
- boolean await(long timeoutMillis) throws InterruptedException;
-
-
- boolean awaitUninterruptibly(long timeout, TimeUnit unit);
-
-
- boolean awaitUninterruptibly(long timeoutMillis);
-
-
- V getNow();
-
- @Override
- boolean cancel(boolean mayInterruptIfRunning);
- }
ChannelFuture接口有扩展了Netty的Future接口,表示一种没有返回值的异步调用,同时和一个Channel进行绑定。
- public interface ChannelFuture extends Future<Void> {
-
- /**
- * Returns a channel where the I/O operation associated with this
- * future takes place.
- */
- Channel channel();
-
- @Override
- ChannelFuture addListener(GenericFutureListener<? extends Future<? super Void>> listener);
-
- @Override
- ChannelFuture addListeners(GenericFutureListener<? extends Future<? super Void>>... listeners);
-
- @Override
- ChannelFuture removeListener(GenericFutureListener<? extends Future<? super Void>> listener);
-
- @Override
- ChannelFuture removeListeners(GenericFutureListener<? extends Future<? super Void>>... listeners);
-
- @Override
- ChannelFuture sync() throws InterruptedException;
-
- @Override
- ChannelFuture syncUninterruptibly();
-
- @Override
- ChannelFuture await() throws InterruptedException;
-
- @Override
- ChannelFuture awaitUninterruptibly();
-
-
- boolean isVoid();
- }
Promise接口也是Future的扩展接口,它表示一种可写的Future,可以自定义设置异步执行的结果。
-
- /**
- * Special {@link Future} which is writable.
- */
- public interface Promise<V> extends Future<V> {
-
-
- Promise<V> setSuccess(V result);
-
-
- boolean trySuccess(V result);
-
- /**
- * Marks this future as a failure and notifies all
- * listeners.
- *
- * If it is success or failed already it will throw an {@link IllegalStateException}.
- */
- Promise<V> setFailure(Throwable cause);
-
- /**
- * Marks this future as a failure and notifies all
- * listeners.
- *
- * @return {@code true} if and only if successfully marked this future as
- * a failure. Otherwise {@code false} because this future is
- * already marked as either a success or a failure.
- */
- boolean tryFailure(Throwable cause);
-
- /**
- * Make this future impossible to cancel.
- *
- * @return {@code true} if and only if successfully marked this future as uncancellable or it is already done
- * without being cancelled. {@code false} if this future has been cancelled already.
- */
- boolean setUncancellable();
-
- @Override
- Promise<V> addListener(GenericFutureListener<? extends Future<? super V>> listener);
-
- @Override
- Promise<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners);
-
- @Override
- Promise<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener);
-
- @Override
- Promise<V> removeListeners(GenericFutureListener<? extends Future<? super V>>... listeners);
-
- @Override
- Promise<V> await() throws InterruptedException;
-
- @Override
- Promise<V> awaitUninterruptibly();
-
- @Override
- Promise<V> sync() throws InterruptedException;
-
- @Override
- Promise<V> syncUninterruptibly();
- }
ChannelPromise接口扩展了Promise和ChannelFuture,绑定了Channel,既可以写异步执行结果,又具备了监听者的功能,是Netty实际编程中使用的表示异步执行的接口。
- public interface ChannelPromise extends ChannelFuture, Promise<Void> {
-
- @Override
- Channel channel();
-
- @Override
- ChannelPromise setSuccess(Void result);
-
- ChannelPromise setSuccess();
-
- boolean trySuccess();
-
- @Override
- ChannelPromise setFailure(Throwable cause);
-
- @Override
- ChannelPromise addListener(GenericFutureListener<? extends Future<? super Void>> listener);
-
- @Override
- ChannelPromise addListeners(GenericFutureListener<? extends Future<? super Void>>... listeners);
-
- @Override
- ChannelPromise removeListener(GenericFutureListener<? extends Future<? super Void>> listener);
-
- @Override
- ChannelPromise removeListeners(GenericFutureListener<? extends Future<? super Void>>... listeners);
-
- @Override
- ChannelPromise sync() throws InterruptedException;
-
- @Override
- ChannelPromise syncUninterruptibly();
-
- @Override
- ChannelPromise await() throws InterruptedException;
-
- @Override
- ChannelPromise awaitUninterruptibly();
-
- /**
- * Returns a new {@link ChannelPromise} if {@link #isVoid()} returns {@code true} otherwise itself.
- */
- ChannelPromise unvoid();
- }
DefaultChannelPromise是ChannelPromise的实现类,它是实际运行时的Promise实例。Netty使用addListener方法来回调异步执行的结果。DefaultPromise的addListener()方法的代码如下
- public Promise<V> addListener(GenericFutureListener<? extends Future<? super V>> listener) {
- checkNotNull(listener, "listener");
-
- synchronized (this) {
- addListener0(listener);
- }
-
- if (isDone()) {
- notifyListeners();
- }
-
- return this;
- }
- private void addListener0(GenericFutureListener<? extends Future<? super V>> listener) {
- if (listeners == null) {
- listeners = listener;
- } else if (listeners instanceof DefaultFutureListeners) {
- ((DefaultFutureListeners) listeners).add(listener);
- } else {
- listeners = new DefaultFutureListeners((GenericFutureListener<? extends Future<V>>) listeners, listener);
- }
- }
- private void notifyListeners() {
- EventExecutor executor = executor();
- if (executor.inEventLoop()) {
- final InternalThreadLocalMap threadLocals = InternalThreadLocalMap.get();
- final int stackDepth = threadLocals.futureListenerStackDepth();
- if (stackDepth < MAX_LISTENER_STACK_DEPTH) {
- threadLocals.setFutureListenerStackDepth(stackDepth + 1);
- try {
- notifyListenersNow();
- } finally {
- threadLocals.setFutureListenerStackDepth(stackDepth);
- }
- return;
- }
- }
-
- safeExecute(executor, new Runnable() {
- @Override
- public void run() {
- notifyListenersNow();
- }
- });
- }
从上述代码中可以看到,DefaultChannelPromise会判断异步任务执行的状态,如果执行完毕就立即通知监听者,否则加入监听者队列。通知监听者就是找一个线程来执行调用监听者的回调函数。
再来看监听者的接口,其实就是一个方法,即等待异步任务执行完毕后,获得Future结果,执行回调的逻辑,代码如下。
- public interface GenericFutureListener<F extends Future<?>> extends EventListener {
-
- /**
- * Invoked when the operation associated with the {@link Future} has been completed.
- *
- * @param future the source {@link Future} which called this callback
- */
- void operationComplete(F future) throws Exception;
- }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。