赞
踩
从很久以前就接触到netty,也在几个项目中使用netty进行网络通讯对接,包括对接车联网设备以及对接安防硬件设备,因此也一直有想法系统地学习netty相关的框架以及实现的原理等,包括前期对零拷贝技术的学习,也是在为学习netty做准备。因此本文,在学习线上相关技术资料、整合以前使用的案例的基础上整理出来,在此进行记录,为以后的深入学习做准备,也为后来者提供参考借鉴,文中不免疏漏之处,望读者予以指正,不胜感激!
Netty是由JBOSS提供的一个java开源框架,现为 Github上的独立项目。Netty提供异步的、事件驱动的网络应用程序框架和工具,用以快速开发高性能、高可靠性的网络服务器和客户端程序。
也就是说,Netty 是一个基于NIO的客户、服务器端的编程框架,使用Netty 可以确保你快速和简单的开发出一个网络应用,例如实现了某种协议的客户、服务端应用。Netty相当于简化和流线化了网络应用的编程开发过程,例如:基于TCP和UDP的socket服务开发。
“快速”和“简单”并不用产生维护性或性能上的问题。Netty 是一个吸收了多种协议(包括FTP、SMTP、HTTP等各种二进制文本协议)的实现经验,并经过相当精心设计的项目。最终,Netty 成功的找到了一种方式,在保证易于开发的同时还保证了其应用的性能,稳定性和伸缩性。
特点
(1)一个高性能、异步事件驱动的NIO框架,它提供了对TCP、UDP和文件传输的支持
(2)使用更高效的socket底层,对epoll空轮询引起的cpu占用飙升在内部进行了处理,避免了直接使用NIO的陷阱,简化了NIO的处理方式。
(3)采用多种decoder/encoder 支持,对TCP粘包/分包进行自动化处理
(4)可使用接受/处理线程池,提高连接效率,对重连、心跳检测的简单支持
(5)可配置IO线程数、TCP参数, TCP接收和发送缓冲区使用直接内存代替堆内存,通过内存池的方式循环利用ByteBuf
(6)通过引用计数器及时申请释放不再引用的对象,降低了GC频率
(7)使用单线程串行化的方式,高效的Reactor线程模型
(8)大量使用了volitale、使用了CAS和原子类、线程安全类的使用、读写锁的使用
性能
(1)更高的吞吐量,更低的延迟
(2)减少资源消耗
(3)最小化不必要的内存复制
安全
完整的SSL / TLS和StartTLS支持
Netty 作为异步事件驱动的网络,高性能之处主要来自于其 I/O 模型和线程处理模型,前者决定如何收发数据,后者决定如何处理数据。
NIO概念
NIO:同步非阻塞IO,服务器实现模式是一个线程处理多个请求,客户端发送的链接请求都会注册到多路复用器上,多路复用器轮询到链接有IO请求就进行处理。
NIO核心组成部分
NIO主要有三大核心部分 :Channel(通道),Buffer(缓冲区),Selector(选择器)。
NIO基于Channel和Buffer进行操作,数据总是从通道读取到缓冲区中,或者从缓冲区写入到通道中。Selector用于监听多个通道的事件(比如 :连接打开,数据到达)。因此使用单个线程就可以监听多个数据管道。
NIO中Selector、Channel、Buffer的关系:
1)每个channel都会对应一个buffer。
2)一个selector对应一个线程,一个线程对应多个channel。
3)程序切换到哪个channel是由Event(事件)决定的。
4)selector会根据不同的事件,在各个通道上进行切换。
5)buffer是一个内存块,底层有一个数组。
6)数据的读取和写入是通过buffer,buffer可以切换读写,通过flip方法,但是BIO是单向输出,要么是输入流,要么是输出流。
7)channel是双向的,可以返回底层操作系统的情况,比如linux,底层的操作系统通道就是双向的。
Selector底层实现的三种方式
Linux中支持IO多路复用的有select poll epoll ,最终还是选择了epoll
epoll好处:
1)支持一个进程打开的fd不受限制(当然小于OS的最大支持句柄)
select最大缺陷:单进程打开的进程FD有限制,默认1024,对于要支持万个TCP的服务器来说太少。
cat /proc/sys/fs/file -max 看最大句柄,1G内存机器约10W句柄
2)IO不随FD数目增加线性下降
传统select/poll:socket集合很大,由于网络延时、链路空闲,少部分socket活跃,select/poll 线性扫描,效率线性下降。
Epoll:只对活跃的socket操作,epoll根据fd上的callback函数实现,只有活跃的socket才主动调用callback,idle的socket不会。
当活跃socket多时,select/poll效率比epoll更高,当活跃socket少时,epoll效率更高
3)使用mmap加速内核与用户空间的消息。
epoll通过内核和用户空间mmap同一块内存来实现
4)epoll Api比较简单
Netty线程模型是典型的 Reactor 模型结构。
Reactor线程模型
常用的 Reactor 线程模型有三种,分别为:Reactor 单线程模型、Reactor 多线程模型和主从 Reactor 多线程模型。
1)Reactor 单线程模型
Reactor 单线程模型指的是所有的 IO 操作都在同一个 NIO 线程上面完成。作为 NIO 服务端接收客户端的 TCP 连接,作为 NIO 客户端向服务端发起 TCP 连接,读取通信对端的请求或向通信对端发送消息请求或者应答消息。
由于 Reactor 模式使用的是异步非阻塞 IO,所有的 IO 操作都不会导致阻塞,理论上一个线程可以独立处理所有 IO 相关的操作。
2)Reactor 多线程模型
对于一些小容量应用场景,可以使用单线程模型,但是对于高负载、大并发的应用却不合适,需要对该模型进行改进,演进为 Reactor 多线程模型。
Rector 多线程模型与单线程模型最大的区别就是有一组 NIO 线程处理 IO 操作。
在该模型中有专门一个 NIO 线程 -Acceptor 线程用于监听服务端,接收客户端的 TCP 连接请求;而 1 个 NIO 线程可以同时处理N条链路,但是 1 个链路只对应 1 个 NIO 线程,防止发生并发操作问题。
网络 IO 操作-读、写等由一个 NIO 线程池负责,线程池可以采用标准的 JDK 线程池实现,它包含一个任务队列和 N 个可用的线程,由这些 NIO 线程负责消息的读取、解码、编码和发送。
3)主从 Reactor 多线程模型
在并发极高的情况单独一个 Acceptor 线程可能会存在性能不足问题,为了解决性能问题,产生主从 Reactor 多线程模型。
主从 Reactor 线程模型的特点是:服务端用于接收客户端连接的不再是 1 个单独的 NIO 线程,而是一个独立的 NIO 线程池。
Acceptor 接收到客户端 TCP 连接请求处理完成后,将新创建的 SocketChannel 注册到 IO 线程池(sub reactor 线程池)的某个 IO 线程上,由它负责 SocketChannel 的读写和编解码工作。
Acceptor 线程池仅仅只用于客户端的登陆、握手和安全认证,一旦链路建立成功,就将链路注册到后端 subReactor 线程池的 IO 线程上,由 IO 线程负责后续的 IO 操作。
Netty 线程模型
Netty线程模型是基于主从reactor多线程模式的,并在此基础上做了一定程度上的优化:
1)BossGroup线程池维护主Selector , 只关注accecpt
2)当接收到accept事件后,获得对应的SocketChannel,封装成NioSocketChannel 注册到Worker线程循环
3)worker线程监听到自己感兴趣的时间后,交由handler处理
Netty Reactor线程执行过程
1、netty抽象出两个线程池:BossGroup负责监听和建立连接 ;WorkerGroup 负责网络IO的读写
2、BossGroup 和 WorkerGroup 类型都是NioEventLoopGroup , 相当于一个事件循环组,这个组中含有多个事件循环,每一个事件循环都是NioEventLoop
3、NioEventLoop表示一个selector , 用户监听绑定在其上的socket网络通讯
4、每一个Boos NioEventLoop循环执行3步:
a、轮询accept事件
b、建立连接,生成NioSocketChannel,并注册到workerGroup上
c、处理任务队列中的任务,即RunAllTasks
5、每个Worker NioEventLoop循环执行3步:
a、轮询读写时间
b、处理IO时间,在对应的NioSocketChannel上处理
c、处理任务队列的任务,即RunAllTasks
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.28.Final</version>
</dependency>
// netty server类 @Component public class NettyServer { @Value("${netty-port}") private int port; public void start() throws InterruptedException { /** * 创建两个线程组 bossGroup 和 workerGroup * bossGroup 只是处理连接请求,真正的和客户端业务处理,会交给 workerGroup 完成 * 两个都是无线循环 */ EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { //创建服务器端的启动对象,配置参数 ServerBootstrap bootstrap = new ServerBootstrap(); //设置两个线程组 bootstrap.group(bossGroup, workerGroup) //使用NioServerSocketChannel 作为服务器的通道实现 .channel(NioServerSocketChannel.class) //设置线程队列得到连接个数 .option(ChannelOption.SO_BACKLOG, 128) //设置保持活动连接状态 .childOption(ChannelOption.SO_KEEPALIVE, true) //通过NoDelay禁用Nagle,使消息立即发出去,不用等待到一定的数据量才发出去 .childOption(ChannelOption.TCP_NODELAY, true) //可以给 bossGroup 加个日志处理器 .handler(new LoggingHandler(LogLevel.INFO)) //给workerGroup 的 EventLoop 对应的管道设置处理器 .childHandler(new ChannelInitializer<SocketChannel>() { //给pipeline 设置处理器 @Override protected void initChannel(SocketChannel socketChannel) throws Exception { ChannelPipeline pipeline = socketChannel.pipeline(); pipeline.addLast(new StringEncoder());//对 String 对象自动编码,属于出站站处理器 pipeline.addLast(new StringDecoder());//把网络字节流自动解码为 String 对象,属于入站处理器 pipeline.addLast(new LengthFieldBasedFrameDecoder(24*1024,0,2)); pipeline.addLast(new NettyServerHandler()); } }); //启动服务器并绑定一个端口并且同步生成一个 ChannelFuture 对象 ChannelFuture cf = bootstrap.bind(port).sync(); if (cf.isSuccess()) { System.out.println("socket server start---------------"); } //对关闭通道进行监听 cf.channel().closeFuture().sync(); } finally { //发送异常关闭 bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } } // handler类 public class NettyServerHandler extends SimpleChannelInboundHandler<Object> { private static final Logger log = LoggerFactory.getLogger(NettyServerHandler.class); protected void channelRead0(ChannelHandlerContext context, Object obj) throws Exception { log.info(">>>>>>>>>>>服务端接收到客户端的消息:{}",obj); SocketChannel socketChannel = (SocketChannel) context.channel(); /** * 服务器返回客户端消息 */ Map map = new HashMap(); map.put("msg","我是服务端,收到你的消息了"); socketChannel.writeAndFlush(JSON.toJSONString(map)); ReferenceCountUtil.release(obj); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { ctx.close(); } } // springboot 集成启动 netty server,同时不影响tomcat接口 @Component public class NettyBoot implements CommandLineRunner { @Autowired private NettyServer nettyServer; public void run(String... args) throws Exception { try { nettyServer.start(); } catch (Exception e) { e.printStackTrace(); } } }
// netty client客户端 @Component public class NettyClient { private int port = 9999; private String host = "localhost"; public Channel channel; public void start() { EventLoopGroup eventLoopGroup = new NioEventLoopGroup(); Bootstrap bootstrap = new Bootstrap(); try { bootstrap.group(eventLoopGroup) .channel(NioSocketChannel.class) .option(ChannelOption.SO_KEEPALIVE, true) .remoteAddress(host, port) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { ChannelPipeline pipeline = socketChannel.pipeline(); pipeline.addLast(new StringEncoder());//对 String 对象自动编码,属于出站站处理器 pipeline.addLast(new StringDecoder());//把网络字节流自动解码为 String 对象,属于入站处理器 pipeline.addLast(new LengthFieldBasedFrameDecoder(24*1024,0,2)); pipeline.addLast(new NettyClientHandler()); } }); ChannelFuture future = bootstrap.connect(host, port).sync(); if (future.isSuccess()) { channel = future.channel(); System.out.println("connect server 成功---------"); } // 给关闭通道进行监听 future.channel().closeFuture().sync(); }catch (Exception e){ e.printStackTrace(); } finally { eventLoopGroup.shutdownGracefully(); } } public void sendMsg(String msg) { this.channel.writeAndFlush(msg); } } // handler处理类 public class NettyClientHandler extends SimpleChannelInboundHandler<Object> { private static final Logger log = LoggerFactory.getLogger(NettyClientHandler.class); @Override public void handlerAdded(ChannelHandlerContext ctx) throws Exception { log.info(">>>>>>>>连接"); } @Override public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { log.info(">>>>>>>>退出"); } @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) { log.info(">>>>>>>>>>>>>userEventTriggered:{}", evt); } /** * 客户端接收到服务端发的数据 * @param channelHandlerContext * @param obj * @throws Exception */ @Override protected void channelRead0(ChannelHandlerContext channelHandlerContext, Object obj) { log.info(">>>>>>>>>>>>>客户端接收到消息:{}", obj); ReferenceCountUtil.release(obj); } /** * socket通道处于活动状态 * @param ctx * @throws Exception */ @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { log.info(">>>>>>>>>>socket建立了"); super.channelActive(ctx); } /** * socket通道不活动了 * @param ctx * @throws Exception */ @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { log.info(">>>>>>>>>>socket关闭了"); super.channelInactive(ctx); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { ctx.close(); } } // springboot集成,同时不影响tomact接口 @Component public class NettyBoot implements CommandLineRunner { @Autowired private NettyClient nettyClient; public void run(String... args) throws Exception { nettyClient.start(); } }
https://netty.io/
https://www.cnblogs.com/telwanggs/p/12119697.html
https://blog.csdn.net/lmdsoft/article/details/105618052
https://www.infoq.cn/article/netty-threading-model
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。