当前位置:   article > 正文

Netty 基本介绍、结构及相关组件_netty数据库组件

netty数据库组件

Netty介绍

netty是一款基于NIO的高性能、异步事件驱动的网络程序框架,它对JDK中的NIO做了封装和优化,提供了更好的性能的同时降低了使用的难度。
它提供了对TCP、UDP和文件传输的支持,作为一个异步NIO框架,Netty的所有IO操作都是异步非阻塞的,通过Future-Listener机制,用户可以方便的主动获取或者通过通知机制获得IO操作结果。

Netty把网络编程中使用到的各部分都进行了优化和高性能的封装。

作为当前最流行的NIO框架,Netty在互联网领域、大数据分布式计算领域、通信行业等获得了广泛的应用,一些业界著名的开源组件也基于Netty的NIO框架构建,Dubbo。
在这里插入图片描述

Reactor

Netty中的Reactor模型主要由多路复用器(Acceptor)、事件分发器(Dispatcher)、事件处理器(Handler)组成,可以分为三种。
Reactor是异步事件驱动框架,Netty是Reactor模型的一种实现,采用Reactor第三种模型:主从线程模型(单、多、主从)。主从线程模型:一组线程池(boss)接收请求,一组线程池(worker)处理IO。

Netty结构说明

在这里插入图片描述

服务端

1、NioEventLoopGroup是一个线程池循环处理器,它是多线程事件驱动IO操作类。netty服务端有两个NioEventLoopGroup。一个称为boss,负责接收连接请求,线程数量为1个;另外一个称为worker,负责处理具体的 IO操作等,worker中的线程数量为2*CPU。

2、ServerBootstrap是服务端启动的引导类。group方法用于设置EventLoopGroup。

3、通过channel方法,可以指定新连接的Channel类型为NioServerSocketChannel类。

4、构造一系列channelHandler处理链来组成ChannelPipeline。

5、option用来配置一些channel的参数,配置的参数会被ChannelConfig使用。

6、示例中的TestServerHandler可以继承ChannelInboundHandlerAdapter,复写了channelRead()和exceptionCaught()方法。

7、childOption主要设置SocketChannel的子Channel的选项。
8、bind用于绑定端口启动服务。

服务端


public class TestServer {
    // 1.服务端两个线程池:boss 接收请求连接线程池;worker 处理请求线程池
    private final EventLoopGroup boss = new NioEventLoopGroup();
    private final EventLoopGroup worker = new NioEventLoopGroup();
    
    public void startup() {
        try {
            //启动类
            ServerBootstrap server = new ServerBootstrap();
            //配置线程池对象
            server.group(boss, worker)
                    //表示当前服务器运行形式,基于Nio的ServerSocket实现
                    .channel(NioServerSocketChannel.class)
                    //BACKLOG用于构造服务端套接字ServerSocket对象,标识当服务器请求处理线程全满时,用于临时存放已完成三次握手的请求的队列的最大长度
                    .option(ChannelOption.SO_BACKLOG, 64)
                    .option(ChannelOption.TCP_NODELAY, true)
                    //进行Netty数据处理的过滤器配置(责任链模式)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel sc) throws Exception {
                            sc.pipeline().addLast(new TestServerHandler());
                        }
                    });
            ChannelFuture cf = server.bind(8080).sync();//绑定端口,启动服务
            logger.info("server start");
            cf.channel().closeFuture().sync();
        }
        catch(Exception e) {
            e.printStackTrace();
        }finally {
            boss.shutdownGracefully();
            worker.shutdownGracefully();
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36

TestServerHandler.java

public class TestServerHandler extends SimpleChannelInboundHandler<Object>
{
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        //连接服务端成功,通道连接成功后,激活该方法,此方法可向客户端发消息
        byte[] data = "[Server]连接通道已建立,可以进行通信".getBytes();
        ByteBuf buf = Unpooled.buffer(data.length);//创建一个缓冲区
        buf.writeBytes(data);//将收到的字节数据,写入到缓冲区
        ctx.writeAndFlush(buf);//进行信息的发送,发送完成后刷新缓冲区
    }
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf buf = (ByteBuf)msg;
        //String body = buf.toString(CharsetUtil.UTF_8);
        System.out.println("底层占用了多少字节的内存:"+ buf.capacity());
        System.out.println("最大容量上限:"+ buf.maxCapacity());
        System.out.println("当前可读的字节数:"+ buf.readableBytes());
        System.out.println("是否可读:"+ buf.isReadable());
        System.out.println("读指针:"+ buf.readerIndex());
        System.out.println("把当前的读指针=保存起来:buf.markReaderIndex()");
    }
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
		//接收不同类型的数据
		if(msg instanceof ByteBuf){}
		if(msg instanceof FullHttpRequest){}
		if(msg instanceof TextWebSocketFrame){}
    }
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        //logger.info("Server ReadComplete size:{}, info \n{}", sb.length(), sb.toString());
        ctx.fireChannelReadComplete();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39

客户端

• 客户端引导类用Bootstrap。
• 客户端只需要一个NioEventLoopGroup即可,因为无需单独处理连接和IO操作。
• 指定建立连接的方式为NioSocketChannel。
• option用来配置一些channel的参数,配置的参数会被ChannelConfig使用。
• connect连接到指定的IP和端口。

public class TestClient {
    private static Logger logger = LoggerFactory.getLogger(Test2Client.class);

    public static void main(String[] args)
    {
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap client = new Bootstrap();
            client.group(group)
                    .channel(NioSocketChannel.class)
                    .option(ChannelOption.TCP_NODELAY, true)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            socketChannel.pipeline().addLast(new StringDecoder());
                            socketChannel.pipeline().addLast(new Test2ClientHandler());
                        }
                    });
            logger.info("client start");
            ChannelFuture f = client.connect("127.0.0.1",8080).sync();
            f.channel().closeFuture().sync();
        }catch (Exception e){
            e.printStackTrace();
        }finally {
            // 优雅退出,释放NIO线程组+
            logger.info("client shutdown");
            group.shutdownGracefully();
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30

Netty核心组件

类引用结构
在这里插入图片描述

EventLoopGroup、NioEventLoopGroup、NioEventLoop

EventLoop 定义了 Netty 的核心抽象,用于处理连接的生命周期中所发生的事件。
NioEventLoop是具体处理channel连接及IO操作的类,一个channel在其生命周期只和一个NioEventLoop绑定,一个NioEventLoop可以同时处理多个channel。NioEventLoop使用select()不断轮询其管理的channel,直至接收到客户端的数据。

NioEventLoopGroup是NIO的EventLoopGroup具体实现,而EventLoopGroup也继承了ScheduledExecutorService,即NioEventLoopGroup本身是一个线程池,那么NioEventLoopGroup也提供了一些构造方法去构造NioEventLoopGroup。

Netty之EventLoop
https://www.cnblogs.com/wuzhenzhao/p/11221189.html

在这里插入图片描述

ServerBootStrap、BootStrap

Netty应用程序通过设置Bootstrap引导类来完成,该类提供了一个用于应用程序网络层配置的容器。

ServerBootStrap是服务端应用启动引导类,使用这个类只需绑定本地端口即可,简单的使用方法即是使用main方法调用即可,无需依赖Tomcat容器等。
BootStrap是客户端应用启动引导类,此类需要绑定连接到服务端的host和port,也可以使用main方法直接启动即可,不依赖于Tomcat容器。

Channel

Netty 中的接口 Channel 定义了与 socket 丰富交互的操作集:bind, close, config, connect, isActive, isOpen, isWritable, read, write 等等。

NioServerSocketChannel、NioSocketChannel

NioServerSocketChannel是netty为非阻塞同步IO提供的服务端socket连接类;
NioSocketChannel是netty为非阻塞同步IO提供的客户端socket连接类。

Bootstrap类在引导服务时指定channel类型,服务端接收新连接时便使用NioServerSocketChannel创建新的channel,客户端使用NioSocketChannel建立连接。

ChannelInitializer的作用

ChannelInitializer继承ChannelHandlerAdapter,是一个比较特殊的handler,它的使命仅仅是为了初始化Server端新接入的SocketChannel对象,往往我们也只是初始化新接入SocketChannel的管道pipeline,在做完初始化工作之后,该Handler就会将自己从pipeline中移除,也就是说SocketChannel的初始化工作只会做一次。

ChannelOption

Netty中的ChannelOption常用参数详解
http://www.cnblogs.com/googlemeoften/p/6082785.html
https://blog.csdn.net/hbtj_1216/article/details/74199162

ChannelHandler、ChannelInboundHandler、ChannelOutboundHandler

ChannelHandler接口是处理入站和出站数据的应用程序逻辑的容器。

ChannelInboundHandler、ChannelOutboundHandler是ChannelHandler的子接口,分别是进站时处理类的接口和出站时处理类的接口。即ChannelInboundHandler提供了服务端接收数据时处理的接口,提供了一些方法,如channelRegisteredchannelUnregistered,channelActive,channelInactive,channelRead方法等,ChannelOutboundHandler提供了服务端返回数据时做一些处理的接口。

ChannelInboundHandlerAdapter、ChannelOutboundHandlerAdapter

ChannelInboundHandlerAdapter和ChannelOutboundHandlerAdapter分别是ChannelInboundHandler和ChannelOutboundHandler的基础实现类,我们自定义的handler最好继承这两个,尽可能只重写自己需要的方法。

ChannelPipeline

一个ChannelPipeline中可以有多个ChannelHandler实例,而每一个ChannelHandler实例与ChannelPipeline之间的桥梁就是ChannelHandlerContext实例。
ChannelPipeline控制着ChannelHandler的流转,所有的ChannelHandler都在ChannelPipeline上流转。

ChannelHandlerContext

ChannelHandlerContext是ChannelPipeline与ChannelHandler之间的上下文。
ChannelHandlerContext里就包含着ChannelHandler中的上下文信息,每一个ChannelHandler被添加都ChannelPipeline中都会创建一个与其对应的ChannelHandlerContext。ChannelHandlerContext的功能就是用来管理它所关联的ChannelHandler和在同一个ChannelPipeline中ChannelHandler的交互。

Selector

Selector、SelectionKey、ServerSocketChannel和SocketChannel的关系

​NIO ( 非阻塞 IO ) 使用单个线程管理多个通道 ( Channel ) 就是通过选择器 ( Selector ) 实现的 ;

当客户端连接时,会通过ServerSocketChannel得到SocketChannel
将SocketChannel注册到Selector上(通过SelectableChannel方法注册)。
注册后返回一个SelectionKey被Selector以Set集合管理起来。

返回有事件发生的通道的SelectionKey进而获取到Channel

链接:https://blog.csdn.net/GxDong_/article/details/111319435

Selector 是如何轮询的呢?

Selector 会不断地轮询注册在其上的 Channel,使用select方法进行监听。如果某个 Channel 上面发生了读或者写事件,这个 Channel 就处于就绪状态,会被 Selector 轮询出来,通过 SelectionKey 可以获取就绪 Channel 的集合,进行后续的 I/O 操作。

ByteBuf

ByteBuf是netty提供的操作字节码的类,比JDK中的ByteBuffer提供了更为方便的操作字节码的方法。ByteBuf中同时保存了readIndex和writeIndex,读写互不干扰。

ByteBuf使用模式

堆缓冲区ByteBuf将数据存储在 JVM 的堆空间,这是通过将数据存储在数组的实现。堆缓冲区可以快速分配,当不使用时也可以快速释放。它还提供了直接访问数组的方法,通过 ByteBuf.array() 来获取 byte[]数据。

Netty三种不同类型的ByteBuf:堆缓冲区、直接缓冲区、复合缓冲区

Netty里的内存管理是通过ByteBuf这个类作为桥梁连接着业务代码与jdk底层的内存。
0<=readerIndex<=writerIndex<=capacity

PooledUnpooledPooled类型的bytebuf是在已经申请好的内存块取一块内存,而Unpooled是直接通过JDK底层代码申请。

Unpooled 内存分配
https://www.jianshu.com/p/566d162e89c8
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

Netty 缓存buffer介绍及使用
https://blog.csdn.net/cuiyaoqiang/article/details/51655712

ChannelFuture

ChannelFuture是netty中异步IO操作的结果,在使用ChannelFuture时需先判断操作已结束且成功。即isDone&&isSuccess。

其他

FullHttpRequest

使用FullHttpRequest,该类继承HttpRequest, FullHttpMessage,而FullHttpMessage又继承LastHttpContent, 而LastHttpContent又继承HttpContent。

Netty中客户端、服务端的编解码器

作为服务端而言:主要工作就是接收客户端请求,将客户端的请求内容解码;发送响应给客户端,并将发送内容编码;所以,服务端需要两个编解码器

  • HttpRequestDecoder(将请求内容解码)
  • HttpResponseEncoder(将响应内容编码)

作为客户端而言:主要工作就是发送请求给服务端,并将发送内容编码;接收服务端响应,并将接收内容解码;所以,客户端需要两个编解码器

  • HttpResponseDecoder(将响应内容解码)
  • HttpRequestEncoder(将请求内容编码)

参考资料

Netty中的Channel之数据冲刷与线程安全(writeAndFlush)
https://www.cnblogs.com/UncleCatMySelf/p/10769206.html

Netty的基本使用 https://www.jianshu.com/p/e750f628669c
Netty的启动流程解析 https://www.cnblogs.com/luoxn28/p/11839293.html
Netty之EventLoop https://www.cnblogs.com/wuzhenzhao/p/11221189.html
Netty源码解析—服务端启动 https://www.cnblogs.com/luozhiyun/p/9809634.html
Netty原理架构解析 https://www.cnblogs.com/mbhu/p/13306927.html
Netty对底层Selector的优化 https://zhuanlan.zhihu.com/p/91097192

Netty HTTP服务的实现 https://blog.csdn.net/qazwsxpcm/article/details/78364023
Netty4使用http体验 https://www.jianshu.com/p/11814875d793
Netty4实现HTTP请求、响应 https://blog.csdn.net/qq_26323323/article/details/84236481

Netty 缓存buffer介绍 https://blog.csdn.net/cuiyaoqiang/article/details/51655712
Unpooled 内存分配 https://www.jianshu.com/p/566d162e89c8
Netty ByteBuf源码解读
https://www.jianshu.com/p/9ad81534b444

使用 Netty 整合 protobuf
https://mp.weixin.qq.com/s/IT2uqhVrXSrRkWPdCNtbUg

Netty中ctx.writeAndFlush与ctx.channel().writeAndFlush的区别
https://blog.csdn.net/FishSeeker/article/details/78447684

Netty编码流程及WriteAndFlush()的实现
https://www.cnblogs.com/ZhuChangwu/p/11228433.html

Netty零拷贝
https://blog.csdn.net/Ashiamd/article/details/105889328

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/2023面试高手/article/detail/408184
推荐阅读
相关标签
  

闽ICP备14008679号