当前位置:   article > 正文

Netty 线程模型、核心组件_netty so_keepalive

netty so_keepalive

BIO、NIO、AIO与Linnux下的IO模型 概览

Netty 基础概念

  Netty 是一个 异步、基于事件驱动 的网络应用框架,用作快速开发高性能、高可用性的网络 I/O 应用。它的出现,是为了降低直接使用 JDK NIO 编码的复杂度,如:

  1. NIO 使用麻烦:需要熟练掌握 Selector、ServerSocketChannel、SocketChannel、ByteBuffer等。
  2. 需要熟悉 Java 多线程编程,因为 NIO 编程涉及到 Reactor 模式,必须对多线程和网络编程非常熟悉,才能编写出高质量的 NIO 程序。
  3. Epoll Bug,它会导致 Selector 空轮询,最终导致 CPU 100%

  Netty 的主从 Reactor 多线程、内存零拷贝、内存池设计等特点,使得 Netty 的综合性能较高,使得它成为众多开源框架的底层通讯架构,如 ElasticSearch、Dubbo、Hadoop 中的多个组件。

netty 4.1 API
netty GitHub


线程模型的演变

BIO 服务模型

  一个客户端连接请求,使用服务器一个线程
带来的问题:

  1. 当并发数很大,就会创建大量的线程,占用很大系统资源
  2. 连接创建后,如果当前线程暂时没有数据可读,该线程会阻塞在 read 操作,造成线程资源浪费

NIO 服务模型

Reactor 模式中的两个核心成员:

  1. Reactor:Reactor 在一个单独的线程中运行,负责监听、等待数据,然后将数据分发给其他线程来对 IO 事件做出反应。类似于公司的客服,收集用户提出的问题,并记录下来,然后给对应的人去处理。
  2. Handlers:对得到的 IO 做实际的处理。类似客户问题的实际处理人。

Reactor模式分类

1. 单 Reactor 单线程

在这里插入图片描述

  服务器端用一个线程,通过多路复用搞定所有的 IO 操作(包括连接,读、写等),编码简单、没有线程切换的性能消耗,但是如果客户端连接数量较多,或 Handler 执行耗时任务时,将出现卡顿情况。

  使用场景:客户端的数量有限,业务处理非常快速,比如 Redis 在业务处理的时间复杂度为 O(1) 的情况

2. 单 Reactor 多线程

在这里插入图片描述
优点:可以充分的利用多核 cpu 的处理能力

缺点:多线程数据共享和访问比较复杂, reactor 处理所有的事件的监听和响应,是单线程运行状态, 在高并发场景容易出现性能瓶颈。

3. Netty 改进后的主从 Reactor 多线程

在这里插入图片描述
图解:

  1. Netty 抽象出两组线程池 BossGroup 专门负责接收客户端的连接, WorkerGroup 专门负责网络的读写 ,程序中的线程数默认为实际 cpu 核数 * 2,也可在初始化 NioEventLoopGroup 时传入指定的线程数

  2. NioEventLoopGroup 组中含有多个 NioEventLoop 事件循环 ,执行如图所示的3步工作,默认轮询的方式去处理多个事件

  3. 每个 NioEventLoop 包含有一个 Selector,一个 taskQueue ;每个 Selector 上可以注册监听多个 NioChannel

  4. 在 WorkerGroup 的每个 NIOEventLoop 处理业务时,会使用 pipeline(管道), pipeline 中包含了 channel(频道、通道) , 即通过 pipeline 可以获取到对应通道, 管道中维护了很多的 handler 处理器。


核心组件 API 介绍

Bootstrap、ServerBootstrap

  两者分别是 netty 客户端、服务端的启动引导类;其中常见的方法如下:

  1. public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup):用于服务器端设置主从 EventLoop

  2. public B group(EventLoopGroup group) :用于客户端设置一个 EventLoop

  3. public B channel(Class<? extends C> channelClass):用来设置通道实现,如服务器端的 NioServerSocketChannel ,客户端的 NioSocketChannel

  4. public < T > B option(ChannelOption< T > option, T value):用来给 parentGroup 添加配置

  5. public < T > ServerBootstrap childOption(ChannelOption< T > childOption, T value),用来给 childGroup 通道添加配置

  6. public ServerBootstrap childHandler(ChannelHandler childHandler):用来设置业务处理类(自定义的 handler )

  7. public ChannelFuture bind(int inetPort) :用于服务器端设置占用的端口号,注:当绑定的端口提供的是 Http 相关的服务,需要注意有 默认非安全端口 的限制,即服务正常,但部分浏览器会限制访问,更多请自行百度

  8. public ChannelFuture connect(String inetHost, int inetPort) :用于客户端连接服务器端

Channel
  1. Channel 提供异步的网络 I/O 操作(如建立连接,读写,绑定端口),异步调用立即返回 ChannelFuture 实例,通过注册监听器到 ChannelFuture 上,将执行结果通知到调用方
    消时回调通知调用方
  2. 不同协议、不同的阻塞类型的连接都有不同的 Channel 类型与之对应,常用的 Channel 类型:
    NioSocketChannel,异步的客户端 TCP Socket 连接。
    NioServerSocketChannel,异步的服务器端 TCP Socket 连接。
    NioDatagramChannel,异步的 UDP 连接。
    NioSctpChannel,异步的客户端 Sctp 连接。
    NioSctpServerChannel,异步的 Sctp 服务器端连接,这些通道涵盖了 UDP 和 TCP 网络 IO 以及文件 IO。
Selector
  1. Netty 基于 Selector 对象实现 I/O 多路复用,通过 Selector 一个线程可以监听多个连接的 Channel 事件

  2. 当向一个 Selector 中注册 Channel 后,Selector 不断地查询(Select) 这些注册的 Channel 是否有已就绪的 I/O 事件(例如可读,可写,网络连接完成等),然后驱动事件执行

ChannelOption
  1. Netty 在创建 Channel 实例后,一般都需要设置 ChannelOption 参数:
    ChannelOption.SO_BACKLOG:对应 TCP/MP 协议 listen 函数中的 backlog 参数,用来初始化服务器可连接队列大小。服务端处理客户端连接请求是顺序处理的,所以同一时间只能处理一个客户端连接。多个客户端来的时候,服务将不能处理的客户端连接请求放在以列中等待处理,backlog 参数指定了队列的大小。
    ChannelOption.SO_KEEPALIVE:是否保持连接活动状态
ChannelHandler 及其实现类
  1. ChannelHandler 是一个接口,处理 I/O 事件或拦截 I/O 操作,并将其转发到其 ChannelPipeline(业务处理链)中的下一个处理程序。

  2. ChannelPipeline 提供了 ChannelHandler 链的容器,何时是入站,何时是出站???

  3. ChannelHandler 相关类图:
    在这里插入图片描述

Pipeline 和 ChannelPipeline
  1. ChannelPipeline 是一个 Handler 的集合,它负责处理和拦截 inbound 或者 outbound 的事件和操作,相当于一个贯穿 Netty 的链。(ChannelPipeline 是 保存 ChannelHandler 的 List,用于处理或拦截Channel 的入站事件和出站操作)

  2. ChannelPipeline 实现了拦截过滤器模式,使用户可以完全控制事件的处理方式,以及 Channel 中各个的 ChannelHandler 如何相互交互

  3. 每个 Channel 有且仅有一个 ChannelPipeline 与之对应,两者可以通过一个对象获取到另外一个对象;链表中 header 和 tail 节点为 DefaultChannelPipeline(是接口 ChannelHandlerContext 的子类) :
    在这里插入图片描述

  4. ChannelPipeline addFirst(ChannelHandler… handlers),把一个业务处理类(handler)添加到链中的第一个位置

  5. ChannelPipeline addLast(ChannelHandler… handlers),把一个业务处理类(handler)添加到链中的最后一个位置

Future、ChannelFuture

  Netty 中所有的 IO 操作都是异步的

  1. 通过 isDone 方法来判断当前操作是否完成;

  2. 通过 isSuccess 方法来判断已完成的当前操作是否成功;

  3. 通过 getCause 方法来获取已完成的当前操作失败的原因;

  4. 通过 isCancelled 方法来判断已完成的当前操作是否被取消;

  5. 通过 addListener 方法来注册监听器,当操作已完成(isDone 方法返回完成),将会通知指定的监听器;如果
    Future 对象已完成,则通知指定的监听器

  6. Channel channel(),返回当前正在进行 IO 操作的通道

  7. ChannelFuture sync(),等待异步操作执行完毕

EventLoopGroup 和其实现类 NioEventLoopGroup
  1. Netty 为了更好的利用多核 CPU 资源,会使用多个 EventLoop同时工作,每个 EventLoop 维护着一个 Selector 实例。
  2. EventLoopGroup 包含多个 EventLoop ,EventLoopGroup 提供 next 接口,通过固定的规则,来选取一个 EventLoop 完成 IO 事件的处理。
Unpooled :用来操作缓冲区(即 Netty 的数据容器 ByteBuf )的工具类
/**
         * byte[] buff  //buff即内部用于缓存的数组。
         * position //当前读取的位置。
         * mark //为某一读过的位置做标记,便于某些时候回退到该位置。
         * capacity //初始化时候的容量。
         * limit //当写数据到buffer中时,limit一般和capacity相等,当读数据时,limit代表buffer中有效数据的长度。
         * 0 - mark - position - limit - capacity
         */
        ByteBuffer byteBuffer = ByteBuffer.wrap(new byte[10]);
        /**
         * 该对象包含一个数组 arr , 是一个 byte[10]。在 netty 的 ByteBuf 中,不需要使用 flip 进行反转
         * 底层维护了 readerindex 和 writerIndex,通过 readerindex 和 writerIndex 和 capacity, 将 buffer 分成三个区域
         * 0---readerindex 已经读取的区域 ;readerindex---writerIndex , 可读的区域 ; writerIndex -- capacity, 可写的区域
         */
        ByteBuf byteBuf = Unpooled.buffer(10);
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15

心跳检测 IdleStateHandler 相关:

什么心跳检测?

  心跳机制是客户端和服务端在 TCP 三次握手进入 ESTABLISH 状态后,通过发送一个最简单的包来保持连接的存活,或监控另一边服务的可用性等。

Netty中的心跳机制的使用

基于运输层 TCP 的 keepalive 机制实现

  基于运输层 TCP 的 keepalive 机制,由具体的 TCP 协议栈来实现长连接的维持。如在netty 中可以在创建 channel 的时候,指定 SO_KEEPALIVE 参数来实现:
在这里插入图片描述
存在的问题:
  Netty 只能控制 SO_KEEPALIVE 是否开启,其他参数,则需要从系统中读取,其中比较关键的是 tcp_keepalive_time,发送心跳包检测的时间间隔,默认为每2小时检测一次。如果客户端在这2小时内断开了,那么服务端也要维护这个连接2小时,浪费服务端资源;另外就是对于需要实时传输数据的场景,客户端断开了,服务端也要2小时后才能发现。服务端发送心跳检测,具体可能出现的情况如下:

  1. 连接正常:客户端仍然存在,网络连接状况良好。此时客户端会返回一个 ACK 。 服务端接收到ACK后重置计时器,在2小时后再发送探测。如果2小时内连接上有数据传输,那么在该时间基础上向后推延2个小时;

  2. 连接断开:客户端异常关闭,或是网络断开。在这两种情况下,客户端都不会响应服务器,服务器在一定时间(默认为 1000 ms )后重复发送 keep-alive packet ,并且重复发送一定次数。

  3. 客户端曾经崩溃,但已经重启:这种情况下,服务器将会收到对其存活探测的响应,但该响应是一个复位,从而引起服务器对连接的终止。

基于应用层的 IdleStateHandler 机制

IdleStateHandler 是 netty 提供的处理空闲状态的处理器

public IdleStateHandler(long readerIdleTime, long writerIdleTime, long allIdleTime,TimeUnit unit) 
  • 1
  1. long readerIdleTime : 表示在 tcp 连接建立后 ,多长时间没有读操作, 就会发送一个心跳检测包到客户端检测连接是否存活。0表示不检测

  2. long writerIdleTime : 表示多长时间没有写, 就会发送一个心跳检测包检测是否连接

  3. long allIdleTime : 表示多长时间没有读写, 就会发送一个心跳检测包检测是否连接

  4. 当 IdleStateEvent 触发后 , 就会传递给 管道 的下一个 handler 去处理,通过调用(触发)下一个 handler 的 userEventTiggered 方法, 去完成对应的 IdleStateEvent(读空闲,写空闲,读写空闲)操作

遗留问题

  1. userEventTiggered 关闭通道后,从日志看,关闭的channel和后续执行的channel不是同一个,后续的 handler 还是可以执行的??????
  2. 在 channelRead0 中主动关闭 channel 后,后续添加的 handler 为什么还可以执行到其中的部分代码????????????

编解码器与 Handler 链的调用机制

编码解码器

   当 Netty 发送或者接受一个消息的时候,就将会发生一次数据转换。入站消息会被解码,从字节转换为另一种
格式(比如 java 对象);出站消息被编码成字节。

   Netty 提供的编解码器,他们都实现了 ChannelInboundHadnler 或 ChannelOutboundHandler 接口,并将已经编解码的对象转发给 ChannelPipeline 中的下一个 Handler。

public class ByteBufDecoder extends ByteToMessageDecoder {
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List out) throws Exception {
        if (in.readableBytes() >= 8) {
            out.add(in.readInt());
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

   tcp 协议下,数据被分段传输过来,有可能出现粘包拆包的问题。ByteToMessageDecoder 类会对入站数据进行缓冲,直到有足够的数据被处理。

需要注意的是:

  1. 在一次数据传输里,重写的 decode 方法在缓冲区有数据时,可能会被调用多次,直到 ByteBuf 中没有可读的字节为止。当 decode 方法的 List out 非空时,其内容会被传递到下一个 ChannelInboundHadnler ,即后续的 ChannelInboundHadnler 也可能会被调用多次。

MessageToByteEncoder 源码解读:

  在出站时,需要对象转字节,会调用 MessageToByteEncoder 的 write 方法,方法中会判断出站类型是否与实现 MessageToByteEncoder 时的类型一致,如果是,就调用重写的 encode 方法,如果不是就跳过直接输出出站字节码,因此,我们在编写对应的 Encoder 时,需要传入的数据类型和处理的数据类型一致。

public abstract class MessageToByteEncoder<I> extends ChannelOutboundHandlerAdapter 

	@Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        ByteBuf buf = null;
        try {
            if (acceptOutboundMessage(msg)) {	//判断当前 msg 类型是否与实现 MessageToByteEncoder 时的类型一致,如果是,就调用重写的 encode 方法,如果不是就跳过直接输出出站字节码
                @SuppressWarnings("unchecked")
                I cast = (I) msg;
                buf = allocateBuffer(ctx, cast, preferDirect);
                try {
                    encode(ctx, cast, buf);
                } finally {
                    ReferenceCountUtil.release(cast);
                }

                if (buf.isReadable()) {
                    ctx.write(buf, promise);
                } else {
                    buf.release();
                    ctx.write(Unpooled.EMPTY_BUFFER, promise);
                }
                buf = null;
            } else {
                ctx.write(msg, promise);
            }
        } catch (EncoderException e) {
            throw e;
        } catch (Throwable e) {
            throw new EncoderException(e);
        } finally {
            if (buf != null) {
                buf.release();
            }
        }
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36

其他解码器与解码器

  解码器与解码器相关的类应是成对出现的。

ReplayingDecoder:

public abstract class ReplayingDecoder<S> extends ByteToMessageDecoder
ReplayingDecoder 扩展了 ByteToMessageDecoder 类,使用这个类,我们不必调用 readableBytes() 方法,不需要判断数据是否足够读取,内部会进行处理判断。
  • 1
  • 2

LineBasedFrameDecoder

这个类在 Netty 内部也有使用,它使用行尾控制字符(\n 或者\r\n)作为分隔符来解析数据。
  • 1

DelimiterBasedFrameDecoder

使用自定义的特殊字符作为消息的分隔符。
  • 1

HttpObjectDecoder

一个 HTTP 数据的解码器
  • 1

LengthFieldBasedFrameDecoder

通过指定长度来标识整包消息,这样就可以自动的处理黏包和半包消息。
  • 1

ZlibDecoder

压缩传送数据的解码器
  • 1

启动过程分析

NioEventLoopGroup 源码追踪

   NioEventLoopGroup 是整个 Netty 的核心对象,用于处理 Netty 服务端或客户端的 IO 事件。

  追踪 NioEventLoopGroup, 构造器方法 MultithreadEventExecutorGroup 才是 NioEventLoopGroup 真正的构造方法, 这里可以看成是一个模板方法,使用了模板模式。

 private final EventExecutor[] children;
 
@param nThreads 使用的线程数,默认为 core *2 [可以追踪源码]
@param executor 执行器:如果传入 null,则采用 Netty 默认的线程工厂和默认的执行器 ThreadPerTaskExecutor
@param chooserFactory 单例 new DefaultEventExecutorChooserFactory()
@param args args 在创建执行器的时候传入固定参数
    protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
                                            EventExecutorChooserFactory chooserFactory, Object... args) {
        if (nThreads <= 0) {
            throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
        }
//如果传入的执行器是空的则采用默认的线程工厂和默认的执行器
        if (executor == null) {
            executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
        }
//创建指定线程数的执行器数组(单例线程池数组)
        children = new EventExecutor[nThreads];
//初始化线程数组
        for (int i = 0; i < nThreads; i ++) {	
            boolean success = false;
            try {
//创建 new NioEventLoop
                children[i] = newChild(executor, args);
                success = true;
            } catch (Exception e) {
                // TODO: Think about if this is a good exception type
                throw new IllegalStateException("failed to create a child event loop", e);
            } finally {
// 如果创建过程出现失败,优雅关闭
                if (!success) {
                    for (int j = 0; j < i; j ++) {
                        children[j].shutdownGracefully();
                    }

                    for (int j = 0; j < i; j ++) {
                        EventExecutor e = children[j];
                        try {
                            while (!e.isTerminated()) {
                                e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
                            }
                        } catch (InterruptedException interrupted) {
                            // Let the caller handle the interruption.
                            Thread.currentThread().interrupt();
                            break;
                        }
                    }
                }
            }
        }

        chooser = chooserFactory.newChooser(children);

        final FutureListener<Object> terminationListener = new FutureListener<Object>() {
            @Override
            public void operationComplete(Future<Object> future) throws Exception {
                if (terminatedChildren.incrementAndGet() == children.length) {
                    terminationFuture.setSuccess(null);
                }
            }
        };
//为每一个单例线程池添加一个关闭监听器
        for (EventExecutor e: children) {
            e.terminationFuture().addListener(terminationListener);
        }
//将所有的单例线程池添加到一个 HashSet 中。
        Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
        Collections.addAll(childrenSet, children);
        readonlyChildren = Collections.unmodifiableSet(childrenSet);
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69

NioEventLoopGroup 执行流程总揽:

1) 如果 executor 是 null,创建一个默认的 ThreadPerTaskExecutor,使用 Netty 默认的线程工厂。
2) 根据传入的线程数(CPU*2)创建一个线程池(单例线程池)数组。
3) 循环填充数组中的元素。如果异常,则关闭所有的单例线程池。
4) 根据线程选择工厂创建一个 线程选择器。
5) 为每一个单例线程池添加一个关闭监听器。
6) 将所有的单例线程池添加到一个 HashSet 中。
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

ServerBootstrap

  创建服务器端的启动对象,配置相关参数

			ServerBootstrap bootstrap = new ServerBootstrap();
            //使用链式编程来进行设置
            bootstrap.group(bossGroup, workerGroup) //设置两个线程组
                    .channel(NioServerSocketChannel.class) //使用 NioServerSocketChannel 作为服务器的通道实现
                    .option(ChannelOption.SO_BACKLOG, 128) // 设置线程队列得到连接个数
                    .childOption(ChannelOption.SO_KEEPALIVE, true) //设置保持活动连接状态
                    .handler(new LoggingHandler(LogLevel.INFO))
                    .childHandler(new ChannelInitializer<SocketChannel>() {//创建一个通道测试对象(匿名对象)
                        //给 pipeline 设置处理器
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            //向 pipelline 的最后加入一个处理器
                            ch.pipeline().addLast(new NettyServerTaskQueueHandler());
                        }
                    }); // 给我们的 workerGroup 的 EventLoop 对应的管道设置处理器

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16

ServerBootstrap 执行流程总揽:

1) 链式调用:group 方法,将 boss 和 worker 传入,boss 赋值给 parentGroup 属性,worker 赋值给 childGroup 属性
2) channel 方法传入 NioServerSocketChannel class 对象。会根据这个 class 创建 channel 对象。
3) option 方法传入 TCP 参数,放在一个 LinkedHashMap 中。
4) handler 方法传入一个 handler 中,这个 hanlder 只专属于 ServerSocketChannel 而不是 SocketChannel
5) childHandler 传入一个 hanlder ,这个 handler 将会在每个客户端连接的时候调用。供 SocketChannel 使用
  • 1
  • 2
  • 3
  • 4
  • 5

serverBootstrap.bind(port) 端口绑定

  核心的两个方法 initAndRegister 和 doBind0。initAndRegister 主要完成创建 NioServerSocketChannel 并与 EventLoopGroup 相互关联,完成 pipeline 与 handler 的关联;doBind0 将 JDK 提供的 channel 和端口进行绑定,最后开始监听连接事件。至此,启动流程结束。

serverBootstrap.bind(port) ==》doBind() ==》initAndRegister()


 	private ChannelFuture doBind(final SocketAddress localAddress) {
        final ChannelFuture regFuture = initAndRegister();
        final Channel channel = regFuture.channel();
        if (regFuture.cause() != null) {
            return regFuture;
        }

        if (regFuture.isDone()) {
            // At this point we know that the registration was complete and successful.
            ChannelPromise promise = channel.newPromise();
            doBind0(regFuture, channel, localAddress, promise);
            return promise;
        } else {
       	...
       	...
       	...
       	
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18

initAndRegister() ==》 channelFactory.newChannel() ==》init(channel)

    final ChannelFuture initAndRegister() {
        Channel channel = null;
        try {
            channel = channelFactory.newChannel();
            init(channel);
        } catch (Throwable t) {
            if (channel != null) {
                // channel can be null if newChannel crashed (eg SocketException("too many open files"))
                channel.unsafe().closeForcibly();
                // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
                return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
            }
            // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
            return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);
        }

        ChannelFuture regFuture = config().group().register(channel);
       	...
       	...
       	...
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20

执行流程:

1)  initAndRegister() 初始化 NioServerSocketChannel 通道并注册各个 handler,返回一个 future
2) 通过 ServerBootstrap 的通道工厂反射创建一个 NioServerSocketChannel。
3) init 初始化这个 NioServerSocketChannel。
4) config().group().register(channel) 通过 ServerBootstrap 的 bossGroup 注册 NioServerSocketChannel。
5) 最后,返回这个异步执行的占位符,即 regFuture。
  • 1
  • 2
  • 3
  • 4
  • 5

接受请求的过程

   服务器启动完成后,注册了一个 Accept 事件等待客户端的连接,NioServerSocketChannel 将自己注册到了 boss 单例线程池(reactor 线程)上,也就是 EventLoop 。
在这里插入图片描述
   EventLoop 循环中,主要处理以下三件事:

  1. 有条件的等待 Nio 事件。-- select
  2. 处理 Nio 事件。 – processSelectedKeys
  3. 处理消息队列中的任务 – runAllTasks

(p87-91粘包拆包跳过)

p95 接受请求。。。
  

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/IT小白/article/detail/146815
推荐阅读
相关标签
  

闽ICP备14008679号