当前位置:   article > 正文

Netty学习——源码篇9 Netty的Handler其他处理 备份_netty handler 里面放个处理数据

netty handler 里面放个处理数据

 1 ChannelHandlerContext

        每个ChannelHandler被添加到ChannelPipeline后,都会创建一个ChannelHandlerContext,并与ChannelHandler关联绑定。ChannelHandlerContext允许ChannelHandler与其他的ChannelHandler进行交互。ChannelHandlerContext不会改变添加到其中的ChannelHandler,因此它是安全的。ChannelHandlerContext、ChannelHandler、ChannelPipeline的关系如下图:

2 Channel的声明周期

        Netty有一个简单但强大的状态模型,能完美映射到ChannelInboundHandler的各个方法。如下表所示是Channel生命周期四个不同的状态。

         一个Channel正常的生命周期如下图所示。随着状态发生变化产生相应的事件。这些事件被转发到ChannelPipeline中的ChannelHandler来触发相应的操作。

3 ChannelHandler常用的API

        先看一个Netty中整个Handler体系的类关系图。

        Netty定义了良好的类型层次结构来表示不同的处理程序类型,所有类型的父类是ChannelHandler, ChannelHandler提供了在其生命周期内添加或从ChannelPipeline中删除的方法,如下表

        Netty还提供了一个实现了ChannelHandler的抽象类ChannelHandlerAdapter。 ChannelHandlerAdapter实现了父类的所有方法,主要功能就是将请求从一个ChannelHandler往下传递到下一个ChannelHandler,直到全部ChannelHandler传递完毕。也可以直接继承于ChannelHandlerAdapter,然后重写里面的方法。

4 ChannelInboundHandler

        ChannelInboundHandler还提供了一些在接收数据或Channel状态改变时被调用的方法。下面是ChannelInboundHandler的一些方法。

5 异步处理Future

        java.util.concurrent.Future是Java原生API中提供的接口,用来记录异步执行的状态,Future的get方法会判断任务是否执行完成,如果完成立即返回执行结果,否则阻塞线程,知道任务完成再返回。

        Netty扩展了Java的Future,在Future的基础上扩展了监听器(Listener)接口,通过监听器可以让异步执行更加有效率,不需要通过调用get方法来等待异步执行结束,而是通过监听器回调来精确控制异步执行结束时间。

  1. public interface Future<V> extends java.util.concurrent.Future<V> {
  2. boolean isSuccess();
  3. boolean isCancellable();
  4. Throwable cause();
  5. Future<V> addListener(GenericFutureListener<? extends Future<? super V>> listener);
  6. Future<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners);
  7. Future<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener);
  8. Future<V> removeListeners(GenericFutureListener<? extends Future<? super V>>... listeners);
  9. Future<V> sync() throws InterruptedException;
  10. Future<V> syncUninterruptibly();
  11. Future<V> await() throws InterruptedException;
  12. Future<V> awaitUninterruptibly();
  13. boolean await(long timeout, TimeUnit unit) throws InterruptedException;
  14. boolean await(long timeoutMillis) throws InterruptedException;
  15. boolean awaitUninterruptibly(long timeout, TimeUnit unit);
  16. boolean awaitUninterruptibly(long timeoutMillis);
  17. V getNow();
  18. @Override
  19. boolean cancel(boolean mayInterruptIfRunning);
  20. }

        ChannelFuture接口有扩展了Netty的Future接口,表示一种没有返回值的异步调用,同时和一个Channel进行绑定。

  1. public interface ChannelFuture extends Future<Void> {
  2. /**
  3. * Returns a channel where the I/O operation associated with this
  4. * future takes place.
  5. */
  6. Channel channel();
  7. @Override
  8. ChannelFuture addListener(GenericFutureListener<? extends Future<? super Void>> listener);
  9. @Override
  10. ChannelFuture addListeners(GenericFutureListener<? extends Future<? super Void>>... listeners);
  11. @Override
  12. ChannelFuture removeListener(GenericFutureListener<? extends Future<? super Void>> listener);
  13. @Override
  14. ChannelFuture removeListeners(GenericFutureListener<? extends Future<? super Void>>... listeners);
  15. @Override
  16. ChannelFuture sync() throws InterruptedException;
  17. @Override
  18. ChannelFuture syncUninterruptibly();
  19. @Override
  20. ChannelFuture await() throws InterruptedException;
  21. @Override
  22. ChannelFuture awaitUninterruptibly();
  23. boolean isVoid();
  24. }

6 异步执行Promise

        Promise接口也是Future的扩展接口,它表示一种可写的Future,可以自定义设置异步执行的结果。

  1. /**
  2. * Special {@link Future} which is writable.
  3. */
  4. public interface Promise<V> extends Future<V> {
  5. Promise<V> setSuccess(V result);
  6. boolean trySuccess(V result);
  7. /**
  8. * Marks this future as a failure and notifies all
  9. * listeners.
  10. *
  11. * If it is success or failed already it will throw an {@link IllegalStateException}.
  12. */
  13. Promise<V> setFailure(Throwable cause);
  14. /**
  15. * Marks this future as a failure and notifies all
  16. * listeners.
  17. *
  18. * @return {@code true} if and only if successfully marked this future as
  19. * a failure. Otherwise {@code false} because this future is
  20. * already marked as either a success or a failure.
  21. */
  22. boolean tryFailure(Throwable cause);
  23. /**
  24. * Make this future impossible to cancel.
  25. *
  26. * @return {@code true} if and only if successfully marked this future as uncancellable or it is already done
  27. * without being cancelled. {@code false} if this future has been cancelled already.
  28. */
  29. boolean setUncancellable();
  30. @Override
  31. Promise<V> addListener(GenericFutureListener<? extends Future<? super V>> listener);
  32. @Override
  33. Promise<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners);
  34. @Override
  35. Promise<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener);
  36. @Override
  37. Promise<V> removeListeners(GenericFutureListener<? extends Future<? super V>>... listeners);
  38. @Override
  39. Promise<V> await() throws InterruptedException;
  40. @Override
  41. Promise<V> awaitUninterruptibly();
  42. @Override
  43. Promise<V> sync() throws InterruptedException;
  44. @Override
  45. Promise<V> syncUninterruptibly();
  46. }

        ChannelPromise接口扩展了Promise和ChannelFuture,绑定了Channel,既可以写异步执行结果,又具备了监听者的功能,是Netty实际编程中使用的表示异步执行的接口。

  1. public interface ChannelPromise extends ChannelFuture, Promise<Void> {
  2. @Override
  3. Channel channel();
  4. @Override
  5. ChannelPromise setSuccess(Void result);
  6. ChannelPromise setSuccess();
  7. boolean trySuccess();
  8. @Override
  9. ChannelPromise setFailure(Throwable cause);
  10. @Override
  11. ChannelPromise addListener(GenericFutureListener<? extends Future<? super Void>> listener);
  12. @Override
  13. ChannelPromise addListeners(GenericFutureListener<? extends Future<? super Void>>... listeners);
  14. @Override
  15. ChannelPromise removeListener(GenericFutureListener<? extends Future<? super Void>> listener);
  16. @Override
  17. ChannelPromise removeListeners(GenericFutureListener<? extends Future<? super Void>>... listeners);
  18. @Override
  19. ChannelPromise sync() throws InterruptedException;
  20. @Override
  21. ChannelPromise syncUninterruptibly();
  22. @Override
  23. ChannelPromise await() throws InterruptedException;
  24. @Override
  25. ChannelPromise awaitUninterruptibly();
  26. /**
  27. * Returns a new {@link ChannelPromise} if {@link #isVoid()} returns {@code true} otherwise itself.
  28. */
  29. ChannelPromise unvoid();
  30. }

        DefaultChannelPromise是ChannelPromise的实现类,它是实际运行时的Promise实例。Netty使用addListener方法来回调异步执行的结果。DefaultPromise的addListener()方法的代码如下

  1. public Promise<V> addListener(GenericFutureListener<? extends Future<? super V>> listener) {
  2. checkNotNull(listener, "listener");
  3. synchronized (this) {
  4. addListener0(listener);
  5. }
  6. if (isDone()) {
  7. notifyListeners();
  8. }
  9. return this;
  10. }
  11. private void addListener0(GenericFutureListener<? extends Future<? super V>> listener) {
  12. if (listeners == null) {
  13. listeners = listener;
  14. } else if (listeners instanceof DefaultFutureListeners) {
  15. ((DefaultFutureListeners) listeners).add(listener);
  16. } else {
  17. listeners = new DefaultFutureListeners((GenericFutureListener<? extends Future<V>>) listeners, listener);
  18. }
  19. }
  20. private void notifyListeners() {
  21. EventExecutor executor = executor();
  22. if (executor.inEventLoop()) {
  23. final InternalThreadLocalMap threadLocals = InternalThreadLocalMap.get();
  24. final int stackDepth = threadLocals.futureListenerStackDepth();
  25. if (stackDepth < MAX_LISTENER_STACK_DEPTH) {
  26. threadLocals.setFutureListenerStackDepth(stackDepth + 1);
  27. try {
  28. notifyListenersNow();
  29. } finally {
  30. threadLocals.setFutureListenerStackDepth(stackDepth);
  31. }
  32. return;
  33. }
  34. }
  35. safeExecute(executor, new Runnable() {
  36. @Override
  37. public void run() {
  38. notifyListenersNow();
  39. }
  40. });
  41. }

        从上述代码中可以看到,DefaultChannelPromise会判断异步任务执行的状态,如果执行完毕就立即通知监听者,否则加入监听者队列。通知监听者就是找一个线程来执行调用监听者的回调函数。

        再来看监听者的接口,其实就是一个方法,即等待异步任务执行完毕后,获得Future结果,执行回调的逻辑,代码如下。

  1. public interface GenericFutureListener<F extends Future<?>> extends EventListener {
  2. /**
  3. * Invoked when the operation associated with the {@link Future} has been completed.
  4. *
  5. * @param future the source {@link Future} which called this callback
  6. */
  7. void operationComplete(F future) throws Exception;
  8. }

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/黑客灵魂/article/detail/925462
推荐阅读
相关标签
  

闽ICP备14008679号