赞
踩
Netty 是一个 异步、基于事件驱动 的网络应用框架,用作快速开发高性能、高可用性的网络 I/O 应用。它的出现,是为了降低直接使用 JDK NIO 编码的复杂度,如:
Netty 的主从 Reactor 多线程、内存零拷贝、内存池设计等特点,使得 Netty 的综合性能较高,使得它成为众多开源框架的底层通讯架构,如 ElasticSearch、Dubbo、Hadoop 中的多个组件。
一个客户端连接请求,使用服务器一个线程
带来的问题:
Reactor 模式中的两个核心成员:
服务器端用一个线程,通过多路复用搞定所有的 IO 操作(包括连接,读、写等),编码简单、没有线程切换的性能消耗,但是如果客户端连接数量较多,或 Handler 执行耗时任务时,将出现卡顿情况。
使用场景:客户端的数量有限,业务处理非常快速,比如 Redis 在业务处理的时间复杂度为 O(1) 的情况
优点:可以充分的利用多核 cpu 的处理能力
缺点:多线程数据共享和访问比较复杂, reactor 处理所有的事件的监听和响应,是单线程运行状态, 在高并发场景容易出现性能瓶颈。
图解:
Netty 抽象出两组线程池 BossGroup 专门负责接收客户端的连接, WorkerGroup 专门负责网络的读写 ,程序中的线程数默认为实际 cpu 核数 * 2,也可在初始化 NioEventLoopGroup 时传入指定的线程数
NioEventLoopGroup 组中含有多个 NioEventLoop 事件循环 ,执行如图所示的3步工作,默认轮询的方式去处理多个事件
每个 NioEventLoop 包含有一个 Selector,一个 taskQueue ;每个 Selector 上可以注册监听多个 NioChannel
在 WorkerGroup 的每个 NIOEventLoop 处理业务时,会使用 pipeline(管道), pipeline 中包含了 channel(频道、通道) , 即通过 pipeline 可以获取到对应通道, 管道中维护了很多的 handler 处理器。
两者分别是 netty 客户端、服务端的启动引导类;其中常见的方法如下:
public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup):用于服务器端设置主从 EventLoop
public B group(EventLoopGroup group) :用于客户端设置一个 EventLoop
public B channel(Class<? extends C> channelClass):用来设置通道实现,如服务器端的 NioServerSocketChannel ,客户端的 NioSocketChannel
public < T > B option(ChannelOption< T > option, T value):用来给 parentGroup 添加配置
public < T > ServerBootstrap childOption(ChannelOption< T > childOption, T value),用来给 childGroup 通道添加配置
public ServerBootstrap childHandler(ChannelHandler childHandler):用来设置业务处理类(自定义的 handler )
public ChannelFuture bind(int inetPort) :用于服务器端设置占用的端口号,注:当绑定的端口提供的是 Http 相关的服务,需要注意有 默认非安全端口 的限制,即服务正常,但部分浏览器会限制访问,更多请自行百度
public ChannelFuture connect(String inetHost, int inetPort) :用于客户端连接服务器端
Netty 基于 Selector 对象实现 I/O 多路复用,通过 Selector 一个线程可以监听多个连接的 Channel 事件
当向一个 Selector 中注册 Channel 后,Selector 不断地查询(Select) 这些注册的 Channel 是否有已就绪的 I/O 事件(例如可读,可写,网络连接完成等),然后驱动事件执行
ChannelHandler 是一个接口,处理 I/O 事件或拦截 I/O 操作,并将其转发到其 ChannelPipeline(业务处理链)中的下一个处理程序。
ChannelPipeline 提供了 ChannelHandler 链的容器,何时是入站,何时是出站???
ChannelHandler 相关类图:
ChannelPipeline 是一个 Handler 的集合,它负责处理和拦截 inbound 或者 outbound 的事件和操作,相当于一个贯穿 Netty 的链。(ChannelPipeline 是 保存 ChannelHandler 的 List,用于处理或拦截Channel 的入站事件和出站操作)
ChannelPipeline 实现了拦截过滤器模式,使用户可以完全控制事件的处理方式,以及 Channel 中各个的 ChannelHandler 如何相互交互
每个 Channel 有且仅有一个 ChannelPipeline 与之对应,两者可以通过一个对象获取到另外一个对象;链表中 header 和 tail 节点为 DefaultChannelPipeline(是接口 ChannelHandlerContext 的子类) :
ChannelPipeline addFirst(ChannelHandler… handlers),把一个业务处理类(handler)添加到链中的第一个位置
ChannelPipeline addLast(ChannelHandler… handlers),把一个业务处理类(handler)添加到链中的最后一个位置
Netty 中所有的 IO 操作都是异步的
通过 isDone 方法来判断当前操作是否完成;
通过 isSuccess 方法来判断已完成的当前操作是否成功;
通过 getCause 方法来获取已完成的当前操作失败的原因;
通过 isCancelled 方法来判断已完成的当前操作是否被取消;
通过 addListener 方法来注册监听器,当操作已完成(isDone 方法返回完成),将会通知指定的监听器;如果
Future 对象已完成,则通知指定的监听器
Channel channel(),返回当前正在进行 IO 操作的通道
ChannelFuture sync(),等待异步操作执行完毕
/**
* byte[] buff //buff即内部用于缓存的数组。
* position //当前读取的位置。
* mark //为某一读过的位置做标记,便于某些时候回退到该位置。
* capacity //初始化时候的容量。
* limit //当写数据到buffer中时,limit一般和capacity相等,当读数据时,limit代表buffer中有效数据的长度。
* 0 - mark - position - limit - capacity
*/
ByteBuffer byteBuffer = ByteBuffer.wrap(new byte[10]);
/**
* 该对象包含一个数组 arr , 是一个 byte[10]。在 netty 的 ByteBuf 中,不需要使用 flip 进行反转
* 底层维护了 readerindex 和 writerIndex,通过 readerindex 和 writerIndex 和 capacity, 将 buffer 分成三个区域
* 0---readerindex 已经读取的区域 ;readerindex---writerIndex , 可读的区域 ; writerIndex -- capacity, 可写的区域
*/
ByteBuf byteBuf = Unpooled.buffer(10);
心跳机制是客户端和服务端在 TCP 三次握手进入 ESTABLISH 状态后,通过发送一个最简单的包来保持连接的存活,或监控另一边服务的可用性等。
基于运输层 TCP 的 keepalive 机制,由具体的 TCP 协议栈来实现长连接的维持。如在netty 中可以在创建 channel 的时候,指定 SO_KEEPALIVE 参数来实现:
存在的问题:
Netty 只能控制 SO_KEEPALIVE 是否开启,其他参数,则需要从系统中读取,其中比较关键的是 tcp_keepalive_time,发送心跳包检测的时间间隔,默认为每2小时检测一次。如果客户端在这2小时内断开了,那么服务端也要维护这个连接2小时,浪费服务端资源;另外就是对于需要实时传输数据的场景,客户端断开了,服务端也要2小时后才能发现。服务端发送心跳检测,具体可能出现的情况如下:
连接正常:客户端仍然存在,网络连接状况良好。此时客户端会返回一个 ACK 。 服务端接收到ACK后重置计时器,在2小时后再发送探测。如果2小时内连接上有数据传输,那么在该时间基础上向后推延2个小时;
连接断开:客户端异常关闭,或是网络断开。在这两种情况下,客户端都不会响应服务器,服务器在一定时间(默认为 1000 ms )后重复发送 keep-alive packet ,并且重复发送一定次数。
客户端曾经崩溃,但已经重启:这种情况下,服务器将会收到对其存活探测的响应,但该响应是一个复位,从而引起服务器对连接的终止。
IdleStateHandler 是 netty 提供的处理空闲状态的处理器
public IdleStateHandler(long readerIdleTime, long writerIdleTime, long allIdleTime,TimeUnit unit)
long readerIdleTime : 表示在 tcp 连接建立后 ,多长时间没有读操作, 就会发送一个心跳检测包到客户端检测连接是否存活。0表示不检测
long writerIdleTime : 表示多长时间没有写, 就会发送一个心跳检测包检测是否连接
long allIdleTime : 表示多长时间没有读写, 就会发送一个心跳检测包检测是否连接
当 IdleStateEvent 触发后 , 就会传递给 管道 的下一个 handler 去处理,通过调用(触发)下一个 handler 的 userEventTiggered 方法, 去完成对应的 IdleStateEvent(读空闲,写空闲,读写空闲)操作
遗留问题:
当 Netty 发送或者接受一个消息的时候,就将会发生一次数据转换。入站消息会被解码,从字节转换为另一种
格式(比如 java 对象);出站消息被编码成字节。
Netty 提供的编解码器,他们都实现了 ChannelInboundHadnler 或 ChannelOutboundHandler 接口,并将已经编解码的对象转发给 ChannelPipeline 中的下一个 Handler。
public class ByteBufDecoder extends ByteToMessageDecoder {
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List out) throws Exception {
if (in.readableBytes() >= 8) {
out.add(in.readInt());
}
}
}
tcp 协议下,数据被分段传输过来,有可能出现粘包拆包的问题。ByteToMessageDecoder 类会对入站数据进行缓冲,直到有足够的数据被处理。
需要注意的是:
在出站时,需要对象转字节,会调用 MessageToByteEncoder 的 write 方法,方法中会判断出站类型是否与实现 MessageToByteEncoder 时的类型一致,如果是,就调用重写的 encode 方法,如果不是就跳过直接输出出站字节码,因此,我们在编写对应的 Encoder 时,需要传入的数据类型和处理的数据类型一致。
public abstract class MessageToByteEncoder<I> extends ChannelOutboundHandlerAdapter @Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { ByteBuf buf = null; try { if (acceptOutboundMessage(msg)) { //判断当前 msg 类型是否与实现 MessageToByteEncoder 时的类型一致,如果是,就调用重写的 encode 方法,如果不是就跳过直接输出出站字节码 @SuppressWarnings("unchecked") I cast = (I) msg; buf = allocateBuffer(ctx, cast, preferDirect); try { encode(ctx, cast, buf); } finally { ReferenceCountUtil.release(cast); } if (buf.isReadable()) { ctx.write(buf, promise); } else { buf.release(); ctx.write(Unpooled.EMPTY_BUFFER, promise); } buf = null; } else { ctx.write(msg, promise); } } catch (EncoderException e) { throw e; } catch (Throwable e) { throw new EncoderException(e); } finally { if (buf != null) { buf.release(); } } }
解码器与解码器相关的类应是成对出现的。
ReplayingDecoder:
public abstract class ReplayingDecoder<S> extends ByteToMessageDecoder
ReplayingDecoder 扩展了 ByteToMessageDecoder 类,使用这个类,我们不必调用 readableBytes() 方法,不需要判断数据是否足够读取,内部会进行处理判断。
LineBasedFrameDecoder:
这个类在 Netty 内部也有使用,它使用行尾控制字符(\n 或者\r\n)作为分隔符来解析数据。
DelimiterBasedFrameDecoder:
使用自定义的特殊字符作为消息的分隔符。
HttpObjectDecoder
一个 HTTP 数据的解码器
LengthFieldBasedFrameDecoder:
通过指定长度来标识整包消息,这样就可以自动的处理黏包和半包消息。
ZlibDecoder:
压缩传送数据的解码器
NioEventLoopGroup 是整个 Netty 的核心对象,用于处理 Netty 服务端或客户端的 IO 事件。
追踪 NioEventLoopGroup, 构造器方法 MultithreadEventExecutorGroup 才是 NioEventLoopGroup 真正的构造方法, 这里可以看成是一个模板方法,使用了模板模式。
private final EventExecutor[] children; @param nThreads 使用的线程数,默认为 core *2 [可以追踪源码] @param executor 执行器:如果传入 null,则采用 Netty 默认的线程工厂和默认的执行器 ThreadPerTaskExecutor @param chooserFactory 单例 new DefaultEventExecutorChooserFactory() @param args args 在创建执行器的时候传入固定参数 protected MultithreadEventExecutorGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory, Object... args) { if (nThreads <= 0) { throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads)); } //如果传入的执行器是空的则采用默认的线程工厂和默认的执行器 if (executor == null) { executor = new ThreadPerTaskExecutor(newDefaultThreadFactory()); } //创建指定线程数的执行器数组(单例线程池数组) children = new EventExecutor[nThreads]; //初始化线程数组 for (int i = 0; i < nThreads; i ++) { boolean success = false; try { //创建 new NioEventLoop children[i] = newChild(executor, args); success = true; } catch (Exception e) { // TODO: Think about if this is a good exception type throw new IllegalStateException("failed to create a child event loop", e); } finally { // 如果创建过程出现失败,优雅关闭 if (!success) { for (int j = 0; j < i; j ++) { children[j].shutdownGracefully(); } for (int j = 0; j < i; j ++) { EventExecutor e = children[j]; try { while (!e.isTerminated()) { e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS); } } catch (InterruptedException interrupted) { // Let the caller handle the interruption. Thread.currentThread().interrupt(); break; } } } } } chooser = chooserFactory.newChooser(children); final FutureListener<Object> terminationListener = new FutureListener<Object>() { @Override public void operationComplete(Future<Object> future) throws Exception { if (terminatedChildren.incrementAndGet() == children.length) { terminationFuture.setSuccess(null); } } }; //为每一个单例线程池添加一个关闭监听器 for (EventExecutor e: children) { e.terminationFuture().addListener(terminationListener); } //将所有的单例线程池添加到一个 HashSet 中。 Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length); Collections.addAll(childrenSet, children); readonlyChildren = Collections.unmodifiableSet(childrenSet); }
NioEventLoopGroup 执行流程总揽:
1) 如果 executor 是 null,创建一个默认的 ThreadPerTaskExecutor,使用 Netty 默认的线程工厂。
2) 根据传入的线程数(CPU*2)创建一个线程池(单例线程池)数组。
3) 循环填充数组中的元素。如果异常,则关闭所有的单例线程池。
4) 根据线程选择工厂创建一个 线程选择器。
5) 为每一个单例线程池添加一个关闭监听器。
6) 将所有的单例线程池添加到一个 HashSet 中。
创建服务器端的启动对象,配置相关参数
ServerBootstrap bootstrap = new ServerBootstrap(); //使用链式编程来进行设置 bootstrap.group(bossGroup, workerGroup) //设置两个线程组 .channel(NioServerSocketChannel.class) //使用 NioServerSocketChannel 作为服务器的通道实现 .option(ChannelOption.SO_BACKLOG, 128) // 设置线程队列得到连接个数 .childOption(ChannelOption.SO_KEEPALIVE, true) //设置保持活动连接状态 .handler(new LoggingHandler(LogLevel.INFO)) .childHandler(new ChannelInitializer<SocketChannel>() {//创建一个通道测试对象(匿名对象) //给 pipeline 设置处理器 @Override protected void initChannel(SocketChannel ch) throws Exception { //向 pipelline 的最后加入一个处理器 ch.pipeline().addLast(new NettyServerTaskQueueHandler()); } }); // 给我们的 workerGroup 的 EventLoop 对应的管道设置处理器
ServerBootstrap 执行流程总揽:
1) 链式调用:group 方法,将 boss 和 worker 传入,boss 赋值给 parentGroup 属性,worker 赋值给 childGroup 属性
2) channel 方法传入 NioServerSocketChannel class 对象。会根据这个 class 创建 channel 对象。
3) option 方法传入 TCP 参数,放在一个 LinkedHashMap 中。
4) handler 方法传入一个 handler 中,这个 hanlder 只专属于 ServerSocketChannel 而不是 SocketChannel
5) childHandler 传入一个 hanlder ,这个 handler 将会在每个客户端连接的时候调用。供 SocketChannel 使用
核心的两个方法 initAndRegister 和 doBind0。initAndRegister 主要完成创建 NioServerSocketChannel 并与 EventLoopGroup 相互关联,完成 pipeline 与 handler 的关联;doBind0 将 JDK 提供的 channel 和端口进行绑定,最后开始监听连接事件。至此,启动流程结束。
serverBootstrap.bind(port) ==》doBind() ==》initAndRegister()
private ChannelFuture doBind(final SocketAddress localAddress) { final ChannelFuture regFuture = initAndRegister(); final Channel channel = regFuture.channel(); if (regFuture.cause() != null) { return regFuture; } if (regFuture.isDone()) { // At this point we know that the registration was complete and successful. ChannelPromise promise = channel.newPromise(); doBind0(regFuture, channel, localAddress, promise); return promise; } else { ... ... ...
initAndRegister() ==》 channelFactory.newChannel() ==》init(channel)
final ChannelFuture initAndRegister() { Channel channel = null; try { channel = channelFactory.newChannel(); init(channel); } catch (Throwable t) { if (channel != null) { // channel can be null if newChannel crashed (eg SocketException("too many open files")) channel.unsafe().closeForcibly(); // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t); } // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t); } ChannelFuture regFuture = config().group().register(channel); ... ... ...
执行流程:
1) initAndRegister() 初始化 NioServerSocketChannel 通道并注册各个 handler,返回一个 future
2) 通过 ServerBootstrap 的通道工厂反射创建一个 NioServerSocketChannel。
3) init 初始化这个 NioServerSocketChannel。
4) config().group().register(channel) 通过 ServerBootstrap 的 bossGroup 注册 NioServerSocketChannel。
5) 最后,返回这个异步执行的占位符,即 regFuture。
服务器启动完成后,注册了一个 Accept 事件等待客户端的连接,NioServerSocketChannel 将自己注册到了 boss 单例线程池(reactor 线程)上,也就是 EventLoop 。
EventLoop 循环中,主要处理以下三件事:
(p87-91粘包拆包跳过)
p95 接受请求。。。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。