当前位置:   article > 正文

SpringBoot整合使用Netty

SpringBoot整合使用Netty

SpringBoot整合使用Netty

一. Netty简介

Netty是一款基于NIO(Nonblocking I/O,非阻塞IO)开发的网络通信框架,具有并发高、传输快、封装好的特性。相比于BIO(Blocking I/O,阻塞IO),并发性能会有很大的提高。
在这里插入图片描述
由此可见,NIO单线程处理的连接数量会比BIO高很多,原因是由于Selector

在NIO中,当一个Socket建立好之后,Thread并不会阻塞去接受这个Socket,而是将这个请求交给Selector,Selector会不断遍历所有的Socket,一旦有Socket建立完成,则通知Thread,然后Thread处理完数据后在返回给客户端。这个过程是不阻塞的。

在BIO中,等待客户端发送数据的过程是阻塞的,这就意味着一个线程只能处理一个请求,而机器能支持的最大线程数是有限的,这就限制了BIO的并发性能。

相关名词:

Channel,表示一个连接,可以理解为每一个请求就是一个ChannelChannelHandler,核心业务处理,用于处理业务请求;
ChannelHandlerContext,用于传输业务数据;
ChannelPipeline,用于保存处理过程中需要用到的ChannelHandlerChannelHandlerContext
  • 1
  • 2
  • 3
  • 4

二. SpringBoot整合Netty

1. 创建服务端

  1. 创建一个SpringBoot项目,POM文件引入依赖,application.properties配置文件配置端口。

    <!-- https://mvnrepository.com/artifact/io.netty/netty-all -->
    <dependency>
        <groupId>io.netty</groupId>
        <artifactId>netty-all</artifactId>
        <version>4.1.65.Final</version>
    </dependency>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    server.port=8888
    
    • 1
  2. 主启动类

    package com.example.nettyserver;
    
    import com.example.nettyserver.netty.NettyServer;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    
    @SpringBootApplication
    public class NettyServerApplication {
    
        public static void main(String[] args) {
            SpringApplication.run(NettyServerApplication.class, args);
    
            // 启动服务端
            NettyServer nettyServer = new NettyServer();
            nettyServer.start();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
  3. Netty服务启动

    package com.example.nettyserver.netty;
    
    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelOption;
    import io.netty.channel.ChannelPipeline;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    import io.netty.handler.codec.string.StringDecoder;
    import io.netty.handler.codec.string.StringEncoder;
    import io.netty.util.CharsetUtil;
    
    public class NettyServer {
    
        public void start() {
            // 主线程组,用于接收请求
            NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
            // 工作线程组,用于处理数据
            NioEventLoopGroup workerGroup = new NioEventLoopGroup(8);
    
            try {
                ServerBootstrap serverBootstrap = new ServerBootstrap();
                serverBootstrap.group(bossGroup, workerGroup)
                        .channel(NioServerSocketChannel.class)
                        // 设置队列大小
                        .option(ChannelOption.SO_BACKLOG, 128)
                        // 两小时内没有数据通信时,TCP会自动发送一个活动探测报文
                        .childOption(ChannelOption.SO_KEEPALIVE, true)
                        .childHandler(new ChannelInitializer<SocketChannel>() {
    
                            @Override
                            protected void initChannel(SocketChannel socketChannel) throws Exception {
                                ChannelPipeline pipeline = socketChannel.pipeline();
                                // 添加编解码
                                pipeline.addLast("decoder", new StringDecoder(CharsetUtil.UTF_8));
                                pipeline.addLast("encoder", new StringEncoder(CharsetUtil.UTF_8));
                                // 添加业务处理器
                                pipeline.addLast(new NettyServerHandler());
                            }
                        });
                // 绑定端口,接收连接
                System.out.println(System.currentTimeMillis() + " Start data channel");
                ChannelFuture future = serverBootstrap.bind("127.0.0.1", 8888).sync();
                future.channel().closeFuture().sync();
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                workerGroup.shutdownGracefully();
                bossGroup.shutdownGracefully();
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
  4. Netty服务端处理器

    package com.example.nettyserver.netty;
    
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.ChannelInboundHandlerAdapter;
    
    public class NettyServerHandler extends ChannelInboundHandlerAdapter {
    
        /**
         * 客户端连接时触发
         * @param ctx
         * @throws Exception
         */
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            super.channelActive(ctx);
            System.out.println(System.currentTimeMillis() + " Server channel active...");
        }
    
        /**
         * 客户端发消息时触发
         * @param ctx
         * @param msg
         * @throws Exception
         */
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            System.out.println(System.currentTimeMillis() + " Server收到消息--->" + msg.toString());
            ctx.writeAndFlush("server say Hello");
        }
    
        /**
         * 发生异常时触发
         * @param ctx
         * @param cause
         * @throws Exception
         */
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            cause.printStackTrace();
            ctx.close();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42

2. 创建客户端

  1. 创建一个SpringBoot项目,POM文件引入依赖,application.properties配置文件配置端口。

    <!-- https://mvnrepository.com/artifact/io.netty/netty-all -->
    <dependency>
        <groupId>io.netty</groupId>
        <artifactId>netty-all</artifactId>
        <version>4.1.65.Final</version>
    </dependency>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    server.port=8889
    
    • 1
  2. 主启动类

    package com.example.nettyclient;
    
    import com.example.nettyclient.netty.NettyClient;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    
    @SpringBootApplication
    public class NettyClientApplication {
    
        public static void main(String[] args) {
            SpringApplication.run(NettyClientApplication.class, args);
    
            NettyClient nettyClient = new NettyClient();
            nettyClient.start();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
  3. Netty客户端

    package com.example.nettyclient.netty;
    
    import io.netty.bootstrap.Bootstrap;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelOption;
    import io.netty.channel.ChannelPipeline;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioSocketChannel;
    import io.netty.handler.codec.string.StringDecoder;
    import io.netty.handler.codec.string.StringEncoder;
    
    public class NettyClient {
    
        public void start() {
            NioEventLoopGroup group = new NioEventLoopGroup();
            try {
                Bootstrap bootstrap = new Bootstrap();
                bootstrap.group(group)
                        .channel(NioSocketChannel.class)
                        .option(ChannelOption.TCP_NODELAY, true)
                        .handler(new ChannelInitializer<SocketChannel>() {
    
                            @Override
                            protected void initChannel(SocketChannel socketChannel) throws Exception {
                                ChannelPipeline pipeline = socketChannel.pipeline();
    
                                pipeline.addLast("decoder", new StringDecoder());
                                pipeline.addLast("encoder", new StringEncoder());
                                pipeline.addLast(new NettyClientHandler());
                            }
                        });
                ChannelFuture future = bootstrap.connect("127.0.0.1", 8888).sync();
                System.out.println(System.currentTimeMillis() + " Client connect success...");
                // 发送消息
                future.channel().writeAndFlush("Client say Hello");
                // 等待连接被关闭
                future.channel().closeFuture().sync();
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                group.shutdownGracefully();
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
  4. 客户端处理器

    package com.example.nettyclient.netty;
    
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.ChannelInboundHandlerAdapter;
    
    public class NettyClientHandler extends ChannelInboundHandlerAdapter {
    
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            super.channelActive(ctx);
            System.out.println(System.currentTimeMillis() + " Client channel active...");
        }
    
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            System.out.println(System.currentTimeMillis() + " Client收到消息--->" + msg.toString());
        }
    
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            cause.printStackTrace();
            ctx.close();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24

3. 启动服务端和客户端测试

服务端:
在这里插入图片描述

客户端:
在这里插入图片描述

三. Netty参数详解

1. bossGroup与workerGroup

bossGroup负责接收请求,workerGroup负责处理数据。

2. option与childOption

服务端:

ServerBootStrap.option用于监听客户端连接服务器的套接字(服务器通道)

ServerBootStrap.childOption用于监听服务器接收连接后的套接字(每个客户端通道)

客户端:

BootStrap只会有option,没有childOption

3. ChannelOption.SO_BACKLOG

ChannelOption.SO_BACKLOG对应的是tcp/ip协议listen函数中的backlog参数,函数listen(int socketfd,int backlog)用来初始化服务端可连接队列,服务端处理客户端连接请求是顺序处理的,所以同一时间只能处理一个客户端连接,多个客户端来的时候,服务端将不能处理的客户端连接请求放在队列中等待处理,backlog参数指定了队列的大小。

4. ChannelOption.SO_REUSEADDR

ChanneOption.SO_REUSEADDR对应于套接字选项中的SO_REUSEADDR,这个参数表示允许重复使用本地地址和端口,比如,某个服务器进程占用了TCP的80端口进行监听,此时再次监听该端口就会返回错误,使用该参数就可以解决问题,该参数允许共用该端口,这个在服务器程序中比较常使用,比如某个进程非正常退出,该程序占用的端口可能要被占用一段时间才能允许其他进程使用,而且程序死掉以后,内核一需要一定的时间才能够释放此端口,不设置SO_REUSEADDR就无法正常使用该端口。

5. ChannelOption.SO_KEEPALIVE

Channeloption.SO_KEEPALIVE参数对应于套接字选项中的SO_KEEPALIVE,该参数用于设置TCP连接,当设置该选项以后,连接会测试链接的状态,这个选项用于可能长时间没有数据交流的连接。当设置该选项以后,如果在两小时内没有数据的通信时,TCP会自动发送一个活动探测数据报文。

6. ChannelOption.TCP_NODELAY

ChannelOption.TCP_NODELAY参数对应于套接字选项中的TCP_NODELAY,该参数的使用与Nagle算法有关,Nagle算法是将小的数据包组装为更大的帧然后进行发送,而不是输入一次发送一次,因此在数据包不足的时候会等待其他数据的到了,组装成大的数据包进行发送,虽然该方式有效提高网络的有效负载,但是却造成了延时,而该参数的作用就是禁止使用Nagle算法,使用于小数据即时传输,于TCP_NODELAY相对应的是TCP_CORK,该选项是需要等到发送的数据量最大的时候,一次性发送数据,适用于文件传输。

7. future.channel().closeFuture().sync()

主线程执行到这里就 wait 子线程结束,子线程才是真正监听和接受请求的,closeFuture()是开启了一个channel的监听器,负责监听channel是否关闭的状态,如果监听到channel关闭了,子线程才会释放,syncUninterruptibly()让主线程同步等待子线程结果。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/菜鸟追梦旅行/article/detail/206534
推荐阅读
相关标签
  

闽ICP备14008679号