当前位置:   article > 正文

Netty入门学习笔记2-核心组件EventLoop_netty defaulteventloopgroup

netty defaulteventloopgroup

EventLoop 本质是一个单线程执行器(同时维护了一个 Selector),里面有 run 方法处理 Channel 上源源不断的 io 事件。(EventLoop 可翻译:事件循环对象 )

它的继承关系比较复杂

  • 一条线是继承自 j.u.c.ScheduledExecutorService 因此包含了线程池中所有的方法

  • 另一条线是继承自 netty 自己的 OrderedEventExecutor,

    • 提供了 boolean inEventLoop(Thread thread) 方法判断一个线程是否属于此 EventLoop

    • 提供了 parent 方法来看看自己属于哪个 EventLoopGroup

EventLoopGroup 是一组 EventLoop,Channel 一般会调用 EventLoopGroup 的 register 方法来绑定其中一个 EventLoop,后续这个 Channel 上的 io 事件都由此 EventLoop 来处理(保证了 io 事件处理时的线程安全) (EventLoopGroup 可翻译为 事件循环组 )

  • 继承自 netty 自己的 EventExecutorGroup

    • 实现了 Iterable 接口提供遍历 EventLoop 的能力

    • 另有 next 方法获取集合中下一个 EventLoop

一个简单的实现为例:

  1. // 内部创建了两个 EventLoop, 每个 EventLoop 维护一个线程
  2. DefaultEventLoopGroup group = new DefaultEventLoopGroup(2);
  3. System.out.println(group.next());
  4. System.out.println(group.next());
  5. System.out.println(group.next());

输出

  1. io.netty.channel.DefaultEventLoop@60f82f98
  2. io.netty.channel.DefaultEventLoop@35f983a6
  3. io.netty.channel.DefaultEventLoop@60f82f98

也可以使用 for 循环

  1. DefaultEventLoopGroup group = new DefaultEventLoopGroup(2);
  2. for (EventExecutor eventLoop : group) {
  3. System.out.println(eventLoop);
  4. }

输出
 

  1. io.netty.channel.DefaultEventLoop@60f82f98
  2. io.netty.channel.DefaultEventLoop@35f983a6

优雅关闭

优雅关闭 shutdownGracefully 方法。该方法会首先切换 EventLoopGroup 到关闭状态从而拒绝新的任务的加入,然后在任务队列的任务都处理完成后,停止线程的运行。从而确保整体应用是在正常有序的状态下退出的.

演示 NioEventLoop 处理 io 事件

服务器端 1个nio boss+ 两个 nio worker 工人

  1. public static void main(String[] args) {
  2. DefaultEventLoopGroup defaultdDevent = new DefaultEventLoopGroup(1);
  3. new ServerBootstrap()
  4. .group(new NioEventLoopGroup(),new NioEventLoopGroup(2))
  5. .channel(NioServerSocketChannel.class)
  6. .childHandler(new ChannelInitializer<NioSocketChannel>() {
  7. @Override
  8. protected void initChannel(NioSocketChannel socketChannel) throws Exception {
  9. socketChannel.pipeline().addLast("handler1", new ChannelInboundHandlerAdapter(){
  10. @Override
  11. public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
  12. ByteBuf byteBuf=(ByteBuf)msg;
  13. log.debug(byteBuf.toString(Charset.defaultCharset()));
  14. //super.channelRead(ctx, msg);
  15. ctx.fireChannelRead(msg);//
  16. }
  17. });
  18. socketChannel.pipeline().addLast(defaultdDevent,"handler2",new ChannelInboundHandlerAdapter() {
  19. @Override
  20. public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
  21. ByteBuf byteBuf=(ByteBuf)msg;
  22. log.debug(byteBuf.toString(Charset.defaultCharset()));
  23. }
  24. });
  25. }
  26. })
  27. .bind(8080);
  28. }

handler 执行中如何换人?

关键代码 io.netty.channel.AbstractChannelHandlerContext#invokeChannelRead()

  1. static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
  2. final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
  3. // 下一个 handler 的事件循环是否与当前的事件循环是同一个线程
  4. EventExecutor executor = next.executor();
  5. // 是,直接调用
  6. if (executor.inEventLoop()) {
  7. next.invokeChannelRead(m);
  8. }
  9. // 不是,将要执行的代码作为任务提交给下一个事件循环处理(换人)
  10. else {
  11. executor.execute(new Runnable() {
  12. @Override
  13. public void run() {
  14. next.invokeChannelRead(m);
  15. }
  16. });
  17. }
  18. }
  • 如果两个 handler 绑定的是同一个线程,那么就直接调用

  • 否则,把要调用的代码封装为一个任务对象,由下一个 handler 的线程来调用

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/AllinToyou/article/detail/152637?site
推荐阅读
相关标签
  

闽ICP备14008679号