赞
踩
最近写了很多关于Netty应用级别的文章,针对为什么选择Netty来实现高并发通讯,Netty实现高并发通讯的原理是什么?今天有时间把我对Netty的一些理解做个简单的说明,如有不对欢迎指正与探讨。
上图是我对Netty实现高并发通讯工作机制的一个简单理解,当我们的智能终端接入到网关程序后,首先是由一个线程池进行进行绑定操作,我们叫它(parentGroup),绑定之后将后续的事情交给另外一个线程池进行后续的处理,我们叫它(childGroup)
- parentGroup 负责处理客户端的TCP连接请求,如果服务端只有一个端口需要监听,建议将其线程数设置为1,相当于Reactor模型中的Acceptor线程池。
-
- childGroup 负责I/O的读写操作,通过 ServerBootstrap 的 group 方法进行设置,用于后续产生的 Channel 绑定,相当于Reactor模型中的NIO线程池。可以不设置线程数,默认为当前服务器的处理器核心数*2。
这里会有人疑问,并没有提及Netty如何实现高并发通讯机制,接触过Netty的都清楚,Netty是NIO框架,使用到Selector组件进行实现多路复用,其实通过上图workGroup一个EventLoop管理多个Channel,其实就是采用的Selector来实现的。
另外Netty采用的是零拷贝ByteBuf,可以减少内存复制,从而减少GC,提升效率,并且其相应的read与write操作也是非常好用的。
- package com.gateway.xxxx.netty;
-
- import com.gateway.xxxx.constants.Constant;
- import com.gateway.xxxx.netty.codec.FrameDecoder;
- import com.gateway.xxxx.netty.codec.ProtocolDecoder;
- import com.gateway.xxxx.netty.codec.ProtocolEncoder;
- import com.gateway.xxxx.netty.handler.BusinessHandler;
- import com.gateway.xxxx.netty.handler.LoginHandler;
- import io.netty.bootstrap.ServerBootstrap;
- import io.netty.channel.ChannelInitializer;
- import io.netty.channel.ChannelOption;
- import io.netty.channel.EventLoopGroup;
- import io.netty.channel.nio.NioEventLoopGroup;
- import io.netty.channel.socket.SocketChannel;
- import io.netty.channel.socket.nio.NioServerSocketChannel;
- import io.netty.handler.timeout.IdleStateHandler;
- import io.netty.util.concurrent.EventExecutorGroup;
- import lombok.extern.slf4j.Slf4j;
-
- import java.util.concurrent.CountDownLatch;
- import java.util.concurrent.TimeUnit;
-
- /**
- * TCP服务类
- * @author Mr.xxx
- * @date 20210101
- */
- @Slf4j
- public class TcpServer extends Thread{
- private int port;
-
- private EventLoopGroup bossGroup;
-
- private EventLoopGroup workerGroup;
-
- private ServerBootstrap serverBootstrap = new ServerBootstrap();
- //这里引入计数器的意思是如果开启多个TCP服务,可以通过计数器来判断是否所有的tcp服务都已经正常启动
- private CountDownLatch countDownLatch;
-
- public TcpServer(int port, CountDownLatch countDownLatch) {
- this.port = port;
- this.countDownLatch = countDownLatch;
-
- bossGroup = new NioEventLoopGroup(1);
- workerGroup = new NioEventLoopGroup();
- //启动一个EventExecutorGroup来处理比较耗时的业务
- final EventExecutorGroup executorGroup = new DefaultEventExecutorGroup(Math.max(threadPoolSize, Runtime.getRuntime().availableProcessors() * 2));
- serverBootstrap.group(bossGroup, workerGroup)
- .channel(NioServerSocketChannel.class)
- .option(ChannelOption.SO_BACKLOG, 1024)
- .childOption(ChannelOption.SO_KEEPALIVE, true)
- .childOption(ChannelOption.TCP_NODELAY, true)
- .childHandler(new ChannelInitializer<SocketChannel>() {
-
- @Override
- protected void initChannel(SocketChannel ch) throws Exception {
- ch.pipeline().addLast(new IdleStateHandler(HcIotConstant.READER_IDLE_TIME, 0, 0, TimeUnit.SECONDS));
- ch.pipeline().addLast(new FrameDecoder());
- //添加一个接收数据包预处理业务,和统一的指令下发封装业务
- ch.pipeline().addLast(ProtocolDecoder.INSTANCE, new ProtocolEncoder());
- //添加一个登录鉴权
- ch.pipeline().addLast(LoginHandler.INSTANCE);
- //添加一个业务处理(业务处理比较耗时,这里可以使用EventExecutorGroup来处理比较耗时的业务)
- ch.pipeline().addLast(executorGroup,BusinessHandler.INSTANCE);
- }
- });
- }
-
- @Override
- public void run() {
- bind();
- }
-
- /**
- * 绑定端口启动服务
- */
- private void bind() {
- serverBootstrap.bind(port).addListener(future -> {
- if (future.isSuccess()) {
- log.info("{} TCP服务器启动,端口:{}", protocolType, port);
- countDownLatch.countDown();
- } else {
- log.error("{} TCP服务器启动失败,端口:{}", protocolType, port, future.cause());
- System.exit(-1);
- }
- });
- }
-
- /**
- * 关闭服务端
- */
- public void shutdown() {
- workerGroup.shutdownGracefully();
- bossGroup.shutdownGracefully();
- log.info("{} TCP服务器关闭,端口:{}", protocolType, port);
- }
- }
-
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。