赞
踩
Netty是一款基于NIO(Nonblocking I/O,非阻塞IO)开发的网络通信框架,具有并发高、传输快、封装好的特性。相比于BIO(Blocking I/O,阻塞IO),并发性能会有很大的提高。
由此可见,NIO单线程处理的连接数量会比BIO高很多,原因是由于Selector。
在NIO中,当一个Socket建立好之后,Thread并不会阻塞去接受这个Socket,而是将这个请求交给Selector,Selector会不断遍历所有的Socket,一旦有Socket建立完成,则通知Thread,然后Thread处理完数据后在返回给客户端。这个过程是不阻塞的。
在BIO中,等待客户端发送数据的过程是阻塞的,这就意味着一个线程只能处理一个请求,而机器能支持的最大线程数是有限的,这就限制了BIO的并发性能。
相关名词:
Channel,表示一个连接,可以理解为每一个请求就是一个Channel;
ChannelHandler,核心业务处理,用于处理业务请求;
ChannelHandlerContext,用于传输业务数据;
ChannelPipeline,用于保存处理过程中需要用到的ChannelHandler和ChannelHandlerContext。
创建一个SpringBoot项目,POM文件引入依赖,application.properties配置文件配置端口。
<!-- https://mvnrepository.com/artifact/io.netty/netty-all -->
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.65.Final</version>
</dependency>
server.port=8888
主启动类
package com.example.nettyserver; import com.example.nettyserver.netty.NettyServer; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; @SpringBootApplication public class NettyServerApplication { public static void main(String[] args) { SpringApplication.run(NettyServerApplication.class, args); // 启动服务端 NettyServer nettyServer = new NettyServer(); nettyServer.start(); } }
Netty服务启动
package com.example.nettyserver.netty; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.ChannelPipeline; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.codec.string.StringDecoder; import io.netty.handler.codec.string.StringEncoder; import io.netty.util.CharsetUtil; public class NettyServer { public void start() { // 主线程组,用于接收请求 NioEventLoopGroup bossGroup = new NioEventLoopGroup(1); // 工作线程组,用于处理数据 NioEventLoopGroup workerGroup = new NioEventLoopGroup(8); try { ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) // 设置队列大小 .option(ChannelOption.SO_BACKLOG, 128) // 两小时内没有数据通信时,TCP会自动发送一个活动探测报文 .childOption(ChannelOption.SO_KEEPALIVE, true) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { ChannelPipeline pipeline = socketChannel.pipeline(); // 添加编解码 pipeline.addLast("decoder", new StringDecoder(CharsetUtil.UTF_8)); pipeline.addLast("encoder", new StringEncoder(CharsetUtil.UTF_8)); // 添加业务处理器 pipeline.addLast(new NettyServerHandler()); } }); // 绑定端口,接收连接 System.out.println(System.currentTimeMillis() + " Start data channel"); ChannelFuture future = serverBootstrap.bind("127.0.0.1", 8888).sync(); future.channel().closeFuture().sync(); } catch (Exception e) { e.printStackTrace(); } finally { workerGroup.shutdownGracefully(); bossGroup.shutdownGracefully(); } } }
Netty服务端处理器
package com.example.nettyserver.netty; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; public class NettyServerHandler extends ChannelInboundHandlerAdapter { /** * 客户端连接时触发 * @param ctx * @throws Exception */ @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { super.channelActive(ctx); System.out.println(System.currentTimeMillis() + " Server channel active..."); } /** * 客户端发消息时触发 * @param ctx * @param msg * @throws Exception */ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { System.out.println(System.currentTimeMillis() + " Server收到消息--->" + msg.toString()); ctx.writeAndFlush("server say Hello"); } /** * 发生异常时触发 * @param ctx * @param cause * @throws Exception */ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } }
创建一个SpringBoot项目,POM文件引入依赖,application.properties配置文件配置端口。
<!-- https://mvnrepository.com/artifact/io.netty/netty-all -->
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.65.Final</version>
</dependency>
server.port=8889
主启动类
package com.example.nettyclient; import com.example.nettyclient.netty.NettyClient; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; @SpringBootApplication public class NettyClientApplication { public static void main(String[] args) { SpringApplication.run(NettyClientApplication.class, args); NettyClient nettyClient = new NettyClient(); nettyClient.start(); } }
Netty客户端
package com.example.nettyclient.netty; import io.netty.bootstrap.Bootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.ChannelPipeline; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.codec.string.StringDecoder; import io.netty.handler.codec.string.StringEncoder; public class NettyClient { public void start() { NioEventLoopGroup group = new NioEventLoopGroup(); try { Bootstrap bootstrap = new Bootstrap(); bootstrap.group(group) .channel(NioSocketChannel.class) .option(ChannelOption.TCP_NODELAY, true) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { ChannelPipeline pipeline = socketChannel.pipeline(); pipeline.addLast("decoder", new StringDecoder()); pipeline.addLast("encoder", new StringEncoder()); pipeline.addLast(new NettyClientHandler()); } }); ChannelFuture future = bootstrap.connect("127.0.0.1", 8888).sync(); System.out.println(System.currentTimeMillis() + " Client connect success..."); // 发送消息 future.channel().writeAndFlush("Client say Hello"); // 等待连接被关闭 future.channel().closeFuture().sync(); } catch (Exception e) { e.printStackTrace(); } finally { group.shutdownGracefully(); } } }
客户端处理器
package com.example.nettyclient.netty; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; public class NettyClientHandler extends ChannelInboundHandlerAdapter { @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { super.channelActive(ctx); System.out.println(System.currentTimeMillis() + " Client channel active..."); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { System.out.println(System.currentTimeMillis() + " Client收到消息--->" + msg.toString()); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } }
服务端:
客户端:
bossGroup负责接收请求,workerGroup负责处理数据。
服务端:
ServerBootStrap.option用于监听客户端连接服务器的套接字(服务器通道)
ServerBootStrap.childOption用于监听服务器接收连接后的套接字(每个客户端通道)
客户端:
BootStrap只会有option,没有childOption
ChannelOption.SO_BACKLOG对应的是tcp/ip协议listen函数中的backlog参数,函数listen(int socketfd,int backlog)用来初始化服务端可连接队列,服务端处理客户端连接请求是顺序处理的,所以同一时间只能处理一个客户端连接,多个客户端来的时候,服务端将不能处理的客户端连接请求放在队列中等待处理,backlog参数指定了队列的大小。
ChanneOption.SO_REUSEADDR对应于套接字选项中的SO_REUSEADDR,这个参数表示允许重复使用本地地址和端口,比如,某个服务器进程占用了TCP的80端口进行监听,此时再次监听该端口就会返回错误,使用该参数就可以解决问题,该参数允许共用该端口,这个在服务器程序中比较常使用,比如某个进程非正常退出,该程序占用的端口可能要被占用一段时间才能允许其他进程使用,而且程序死掉以后,内核一需要一定的时间才能够释放此端口,不设置SO_REUSEADDR就无法正常使用该端口。
Channeloption.SO_KEEPALIVE参数对应于套接字选项中的SO_KEEPALIVE,该参数用于设置TCP连接,当设置该选项以后,连接会测试链接的状态,这个选项用于可能长时间没有数据交流的连接。当设置该选项以后,如果在两小时内没有数据的通信时,TCP会自动发送一个活动探测数据报文。
ChannelOption.TCP_NODELAY参数对应于套接字选项中的TCP_NODELAY,该参数的使用与Nagle算法有关,Nagle算法是将小的数据包组装为更大的帧然后进行发送,而不是输入一次发送一次,因此在数据包不足的时候会等待其他数据的到了,组装成大的数据包进行发送,虽然该方式有效提高网络的有效负载,但是却造成了延时,而该参数的作用就是禁止使用Nagle算法,使用于小数据即时传输,于TCP_NODELAY相对应的是TCP_CORK,该选项是需要等到发送的数据量最大的时候,一次性发送数据,适用于文件传输。
主线程执行到这里就 wait 子线程结束,子线程才是真正监听和接受请求的,closeFuture()是开启了一个channel的监听器,负责监听channel是否关闭的状态,如果监听到channel关闭了,子线程才会释放,syncUninterruptibly()让主线程同步等待子线程结果。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。