当前位置:   article > 正文

Netty 实现高并发通讯原理理解_netty处理请求是并发还是串联

netty处理请求是并发还是串联

1.前言

 

        最近写了很多关于Netty应用级别的文章,针对为什么选择Netty来实现高并发通讯,Netty实现高并发通讯的原理是什么?今天有时间把我对Netty的一些理解做个简单的说明,如有不对欢迎指正与探讨。

2.Netty工作原理

         上图是我对Netty实现高并发通讯工作机制的一个简单理解,当我们的智能终端接入到网关程序后,首先是由一个线程池进行进行绑定操作,我们叫它(parentGroup),绑定之后将后续的事情交给另外一个线程池进行后续的处理,我们叫它(childGroup)

  1. parentGroup 负责处理客户端的TCP连接请求,如果服务端只有一个端口需要监听,建议将其线程数设置为1,相当于Reactor模型中的Acceptor线程池。
  2. childGroup 负责I/O的读写操作,通过 ServerBootstrap 的 group 方法进行设置,用于后续产生的 Channel 绑定,相当于Reactor模型中的NIO线程池。可以不设置线程数,默认为当前服务器的处理器核心数*2

        这里会有人疑问,并没有提及Netty如何实现高并发通讯机制,接触过Netty的都清楚,Netty是NIO框架,使用到Selector组件进行实现多路复用,其实通过上图workGroup一个EventLoop管理多个Channel,其实就是采用的Selector来实现的。

        另外Netty采用的是零拷贝ByteBuf,可以减少内存复制,从而减少GC,提升效率,并且其相应的read与write操作也是非常好用的。

3.实现代码解析

  1. package com.gateway.xxxx.netty;
  2. import com.gateway.xxxx.constants.Constant;
  3. import com.gateway.xxxx.netty.codec.FrameDecoder;
  4. import com.gateway.xxxx.netty.codec.ProtocolDecoder;
  5. import com.gateway.xxxx.netty.codec.ProtocolEncoder;
  6. import com.gateway.xxxx.netty.handler.BusinessHandler;
  7. import com.gateway.xxxx.netty.handler.LoginHandler;
  8. import io.netty.bootstrap.ServerBootstrap;
  9. import io.netty.channel.ChannelInitializer;
  10. import io.netty.channel.ChannelOption;
  11. import io.netty.channel.EventLoopGroup;
  12. import io.netty.channel.nio.NioEventLoopGroup;
  13. import io.netty.channel.socket.SocketChannel;
  14. import io.netty.channel.socket.nio.NioServerSocketChannel;
  15. import io.netty.handler.timeout.IdleStateHandler;
  16. import io.netty.util.concurrent.EventExecutorGroup;
  17. import lombok.extern.slf4j.Slf4j;
  18. import java.util.concurrent.CountDownLatch;
  19. import java.util.concurrent.TimeUnit;
  20. /**
  21. * TCP服务类
  22. * @author Mr.xxx
  23. * @date 20210101
  24. */
  25. @Slf4j
  26. public class TcpServer extends Thread{
  27. private int port;
  28. private EventLoopGroup bossGroup;
  29. private EventLoopGroup workerGroup;
  30. private ServerBootstrap serverBootstrap = new ServerBootstrap();
  31. //这里引入计数器的意思是如果开启多个TCP服务,可以通过计数器来判断是否所有的tcp服务都已经正常启动
  32. private CountDownLatch countDownLatch;
  33. public TcpServer(int port, CountDownLatch countDownLatch) {
  34. this.port = port;
  35. this.countDownLatch = countDownLatch;
  36. bossGroup = new NioEventLoopGroup(1);
  37. workerGroup = new NioEventLoopGroup();
  38. //启动一个EventExecutorGroup来处理比较耗时的业务
  39. final EventExecutorGroup executorGroup = new DefaultEventExecutorGroup(Math.max(threadPoolSize, Runtime.getRuntime().availableProcessors() * 2));
  40. serverBootstrap.group(bossGroup, workerGroup)
  41. .channel(NioServerSocketChannel.class)
  42. .option(ChannelOption.SO_BACKLOG, 1024)
  43. .childOption(ChannelOption.SO_KEEPALIVE, true)
  44. .childOption(ChannelOption.TCP_NODELAY, true)
  45. .childHandler(new ChannelInitializer<SocketChannel>() {
  46. @Override
  47. protected void initChannel(SocketChannel ch) throws Exception {
  48. ch.pipeline().addLast(new IdleStateHandler(HcIotConstant.READER_IDLE_TIME, 0, 0, TimeUnit.SECONDS));
  49. ch.pipeline().addLast(new FrameDecoder());
  50. //添加一个接收数据包预处理业务,和统一的指令下发封装业务
  51. ch.pipeline().addLast(ProtocolDecoder.INSTANCE, new ProtocolEncoder());
  52. //添加一个登录鉴权
  53. ch.pipeline().addLast(LoginHandler.INSTANCE);
  54. //添加一个业务处理(业务处理比较耗时,这里可以使用EventExecutorGroup来处理比较耗时的业务)
  55. ch.pipeline().addLast(executorGroup,BusinessHandler.INSTANCE);
  56. }
  57. });
  58. }
  59. @Override
  60. public void run() {
  61. bind();
  62. }
  63. /**
  64. * 绑定端口启动服务
  65. */
  66. private void bind() {
  67. serverBootstrap.bind(port).addListener(future -> {
  68. if (future.isSuccess()) {
  69. log.info("{} TCP服务器启动,端口:{}", protocolType, port);
  70. countDownLatch.countDown();
  71. } else {
  72. log.error("{} TCP服务器启动失败,端口:{}", protocolType, port, future.cause());
  73. System.exit(-1);
  74. }
  75. });
  76. }
  77. /**
  78. * 关闭服务端
  79. */
  80. public void shutdown() {
  81. workerGroup.shutdownGracefully();
  82. bossGroup.shutdownGracefully();
  83. log.info("{} TCP服务器关闭,端口:{}", protocolType, port);
  84. }
  85. }

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/你好赵伟/article/detail/201239
推荐阅读
相关标签
  

闽ICP备14008679号