当前位置:   article > 正文

Springboot 整合 Netty 实战(附源码)

springboot+netty

点击上方“后端技术精选”,选择“置顶公众号”

技术文章第一时间送达!

作者:pjmike_pj

juejin.im/post/5bd584bc518825292865395d

前言

这一篇文章主要介绍如何用Springboot 整合 Netty,由于本人尚处于学习Netty的过程中,并没有将Netty 运用到实际生产项目的经验,这里也是在网上搜寻了一些Netty例子学习后总结来的,借鉴了他人的写法和经验。如有重复部分,还请见谅。

关于SpringBoot 如何整合使用 Netty ,我将分为以下几步进行分析与讨论:

  • 构建Netty 服务端

  • 构建Netty 客户端

  • 利用protobuf定义消息格式

  • 服务端空闲检测

  • 客户端发送心跳包与断线重连

PS: 我这里为了简单起见(主要是懒),将 Netty 服务端与客户端放在了同一个SpringBoot工程里,当然也可以将客户端和服务端分开。

构建 Netty 服务端

Netty 服务端的代码其实比较简单,代码如下:

  1. @Component
  2. @Slf4j
  3. public class NettyServer {
  4.     /**
  5.      * boss 线程组用于处理连接工作
  6.      */
  7.     private EventLoopGroup boss = new NioEventLoopGroup();
  8.     /**
  9.      * work 线程组用于数据处理
  10.      */
  11.     private EventLoopGroup work = new NioEventLoopGroup();
  12.     @Value("${netty.port}")
  13.     private Integer port;
  14.     /**
  15.      * 启动Netty Server
  16.      *
  17.      * @throws InterruptedException
  18.      */
  19.     @PostConstruct
  20.     public void start() throws InterruptedException {
  21.         ServerBootstrap bootstrap = new ServerBootstrap();
  22.         bootstrap.group(boss, work)
  23.                 // 指定Channel
  24.                 .channel(NioServerSocketChannel.class)
  25.                 //使用指定的端口设置套接字地址
  26.                 .localAddress(new InetSocketAddress(port))
  27.                 //服务端可连接队列数,对应TCP/IP协议listen函数中backlog参数
  28.                 .option(ChannelOption.SO_BACKLOG1024)
  29.                 //设置TCP长连接,一般如果两个小时内没有数据的通信时,TCP会自动发送一个活动探测数据报文
  30.                 .childOption(ChannelOption.SO_KEEPALIVEtrue)
  31.                 //将小的数据包包装成更大的帧进行传送,提高网络的负载,即TCP延迟传输
  32.                 .childOption(ChannelOption.TCP_NODELAYtrue)
  33.                 .childHandler(new NettyServerHandlerInitializer());
  34.         ChannelFuture future = bootstrap.bind().sync();
  35.         if (future.isSuccess()) {
  36.             log.info("启动 Netty Server");
  37.         }
  38.     }
  39.     @PreDestroy
  40.     public void destory() throws InterruptedException {
  41.         boss.shutdownGracefully().sync();
  42.         work.shutdownGracefully().sync();
  43.         log.info("关闭Netty");
  44.     }
  45. }

因为我们在springboot 项目中使用 Netty ,所以我们将Netty 服务器的启动封装在一个 start()方法,并使用 @PostConstruct注解,在指定的方法上加上 @PostConstruct注解来表示该方法在 Spring 初始化 NettyServer类后调用。

考虑到使用心跳机制等操作,关于ChannelHandler逻辑处理链的部分将在后面进行阐述。

构建 Netty 客户端

Netty 客户端代码与服务端类似,代码如下:

  1. @Component
  2. @Slf4j
  3. public class NettyClient  {
  4.     private EventLoopGroup group = new NioEventLoopGroup();
  5.     @Value("${netty.port}")
  6.     private int port;
  7.     @Value("${netty.host}")
  8.     private String host;
  9.     private SocketChannel socketChannel;
  10.     public void sendMsg(MessageBase.Message message) {
  11.         socketChannel.writeAndFlush(message);
  12.     }
  13.     @PostConstruct
  14.     public void start()  {
  15.         Bootstrap bootstrap = new Bootstrap();
  16.         bootstrap.group(group)
  17.                 .channel(NioSocketChannel.class)
  18.                 .remoteAddress(host, port)
  19.                 .option(ChannelOption.SO_KEEPALIVEtrue)
  20.                 .option(ChannelOption.TCP_NODELAYtrue)
  21.                 .handler(new ClientHandlerInitilizer());
  22.         ChannelFuture future = bootstrap.connect();
  23.         //客户端断线重连逻辑
  24.         future.addListener((ChannelFutureListener) future1 -> {
  25.             if (future1.isSuccess()) {
  26.                 log.info("连接Netty服务端成功");
  27.             } else {
  28.                 log.info("连接失败,进行断线重连");
  29.                 future1.channel().eventLoop().schedule(() -> start(), 20TimeUnit.SECONDS);
  30.             }
  31.         });
  32.         socketChannel = (SocketChannel) future.channel();
  33.     }
  34. }

上面还包含了客户端断线重连的逻辑,更多细节问题,将在下面进行阐述。

使用 protobuf 构建通信协议

在整合使用 Netty 的过程中,我们使用 Google 的protobuf定义消息格式,下面来简单介绍下 protobuf

protobuf简介

Google 官方给 protobuf的定义如下:

Protocol Buffers 是一种轻便高效的结构化数据存储格式,可以用于结构化数据序列化,很适合做数据存储或 RPC 数据交换格式。它可用于通讯协议、数据存储等领域的语言无关、平台无关、可扩展的序列化结构数据格式。

在 Netty 中常用 protobuf 来做序列化方案,当然也可以用 protobuf来构建 客户端与服务端之间的通信协议

为什么要用protobuf

我们这里是用 protobuf 做为我们的序列化手段,那我们为什么要使用 protobuf,而不使用其他序列化方案呢,比如 jdk 自带的序列化,Thrift,fastjson等。

首先 jdk 自带序列化手段有很多缺点,比如:

  • 序列化后的码流太大

  • 性能太低

  • 无法跨语言

而 Google Protobuf 跨语言,支持C++、java和python。然后利用protobuf 编码后的消息更小,有利于存储和传输,并且其性能也非常高,相比其他序列化框架,它也是非常有优势的,具体的关于Java 各种序列化框架比较此处就不多说了。总之,目前Google Protobuf 广泛的被使用到各种项目,它的诸多优点让我们选择使用它。

怎么使用protobuf

对于 Java 而言,使用 protobuf 主要有以下几步:

  • 在 .proto 文件中定义消息格式

  • 使用 protobuf 编译器编译 .proto文件 成 Java 类

  • 使用 Java 对应的 protobuf API来写或读消息

定义 protobuf 协议格式

这里为我Demo里的 message.proto文件为例,如下:

  1. //protobuf语法有 proto2和proto3两种,这里指定 proto3
  2. syntax = "proto3"
  3. // 文件选项
  4. option java_package = "com.pjmike.server.protocol.protobuf";
  5. option java_outer_classname = "MessageBase";
  6. // 消息模型定义
  7. message Message {
  8.     string requestId = 1;
  9.     CommandType cmd = 2;
  10.     string content = 3;
  11.     enum CommandType {
  12.         NORMAL = 0//常规业务消息
  13.         HEARTBEAT_REQUEST = 1//客户端心跳消息
  14.         HEARTBEAT_RESPONSE = 2//服务端心跳消息
  15.     }
  16. }

文件解读:

  • 文中的第一行指定正在使用 proto3语法,如果没有指定,编译器默认使用 proto2的语法。现在新项目中可能一般多用 proto3的语法,proto3比 proto2支持更多的语言但更简洁。如果首次使用 protobuf,可以选择使用 proto3

  • 定义 .proto文件时,可以标注一系列的选项,一些选项是文件级别的,比如上面的第二行和第三行,java_package文件选项表明protocol编译器编译 .proto文件生成的 Java 类所在的包,java_outer_classname选项表明想要生成的 Java 类的名称

  • Message中定义了具体的消息格式,我这里定义了三个字段,每个字段都有唯一的一个数字标识符,这些标识符用来在消息的二进制格式中识别各个字段的

  • Message中还添加了一个枚举类型,该枚举中含有类型 CommandType中所有的值,每个枚举类型必须将其第一个类型映射为 0,该0值为默认值。

消息模型定义

关于消息格式,此处我只是非常非常简单的定义了几个字段,requestId代表消息Id,CommandType表示消息的类型,这里简单分为心跳消息类型和业务消息类型,然后content就是具体的消息内容。这里的消息格式定义是十分简陋,真正的项目实战中,关于自定义消息格式的要求是非常多的,是比较复杂的。

上面简单的介绍了 protobuf的一些语法规则,关于 protobuf语法的更多介绍参考官方文档:

https://developers.google.com/protocol-buffers/docs/proto3

使用 .proto编译器编译

第一步已经定义好了 protobuf的消息格式,然后我们用 .proto文件的编译器将我们定义的 消息格式编译生成对应的 Java类,以便于我们在项目中使用该消息类。

关于protobuf编译器的安装这里我就不细说,详情见官方文档:

https://developers.google.com/protocol-buffers/

安装好编译器以后,使用以下命令编译.proto文件:

protoc -I = ./ --java_out=./ ./Message.proto

  • -I 选项用于指定待编译的 .proto消息定义文件所在的目录,该选项也可以写作为 --proto_path

  • --java_out选项表示生成 Java代码后存放位置,对于不同语言,我们的选项可能不同,比如生成C++代码为 --cpp_out

  • 在前两个选项后再加上 待编译的消息定义文件

使用 Java 对应 的 protobuf API来读写消息

前面已经根据 .proto消息定义文件生成的Java类,我们这里代码根据 Message.proto生成了MessageBase类,但是要正常的使用生成的 Java 类,我们还需要引入 protobuf-java的依赖:

  1. <dependency>
  2.     <groupId>com.google.protobuf</groupId>
  3.     <artifactId>protobuf-java</artifactId>
  4.     <version>3.5.1</version>
  5. </dependency>

使用 protobuf 生成的每一个 Java类中,都会包含两种内部类:Msg 和 Msg 包含的 Builder(这里的Msg就是实际消息传输类)。具体是.proto中定义的每一个message 都会生成一个 Msg,每一个Msg对应一个 Builder:

  • Buidler提供了构建类,查询类的API

  • Msg提供了查询,序列化,反序列化的API

比如我们使用 Builder来构建 Msg,例子如下:

  1. public class MessageBaseTest {
  2.     public static void main(String[] args) {
  3.         MessageBase.Message message = MessageBase.Message.newBuilder()
  4.                 .setRequestId(UUID.randomUUID().toString())
  5.                 .setContent("hello world").build();
  6.         System.out.println("message: "+message.toString());
  7.     }
  8. }

这里就不多介绍protobuf-java API的相关用法了,更多详情还是参考官方文档:

https://developers.google.com/protocol-buffers/docs/reference/java/

protobuf的编解码器

上面说了这么多,消息传输格式已经定义好了,但是在客户端和服务端传输过程中我们还需要对这种 protobuf格式进行编解码,当然我们可以自定义消息的编解码,protobuf-java 的API中提供了相关的序列化和反序列化方法。好消息是,Netty 为了支持 protobuf提供了针对 protobuf的编解码器,如下表所示(摘自《Netty实战》) :

640?wx_fmt=png

有了这些编解码器,将其加入客户端和服务端的 ChannelPipeline中以用于对消息进行编解码,如下:

  1. public class NettyServerHandlerInitializer extends ChannelInitializer<Channel> {
  2.     @Override
  3.     protected void initChannel(Channel ch) throws Exception {
  4.         ch.pipeline()
  5.                 //空闲检测
  6.                 .addLast(new ServerIdleStateHandler())
  7.                 .addLast(new ProtobufVarint32FrameDecoder())
  8.                 .addLast(new ProtobufDecoder(MessageBase.Message.getDefaultInstance()))
  9.                 .addLast(new ProtobufVarint32LengthFieldPrepender())
  10.                 .addLast(new ProtobufEncoder())
  11.                 .addLast(new NettyServerHandler());
  12.     }
  13. }

客户端心跳机制

心跳机制简介

心跳是在TCP长连接中,客户端与服务端之间定期发送的一种特殊的数据包,通知对方在线以确保TCP连接的有效性。

如何实现心跳机制

有两种方式实现心跳机制:

  • 使用TCP协议层面的 keepalive 机制

  • 在应用层上自定义的心跳机制

TCP层面的 keepalive 机制我们在之前构建 Netty服务端和客户端启动过程中也有定义,我们需要手动开启,示例如下:

  1. // 设置TCP的长连接,默认的 keepalive的心跳时间是两个小时
  2. childOption(ChannelOption.SO_KEEPALIVEtrue)

除了开启 TCP协议的 keepalive 之外,在我研究了github的一些开源Demo发现,人们往往也会自定义自己的心跳机制,定义心跳数据包。而Netty也提供了 IdleStateHandler 来实现心跳机制。(更多Springboot文章,参考:

Netty 实现心跳机制

下面来看看客户端如何实现心跳机制:

  1. @Slf4j
  2. public class HeartbeatHandler extends ChannelInboundHandlerAdapter {
  3.     @Override
  4.     public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
  5.         if (evt instanceof IdleStateEvent) {
  6.             IdleStateEvent idleStateEvent = (IdleStateEvent) evt;
  7.             if (idleStateEvent.state() == IdleState.WRITER_IDLE) {
  8.                 log.info("已经10s没有发送消息给服务端");
  9.                 //向服务端送心跳包
  10.                 //这里使用 protobuf定义的消息格式
  11.                 MessageBase.Message heartbeat = new MessageBase.Message().toBuilder().setCmd(MessageBase.Message.CommandType.HEARTBEAT_REQUEST)
  12.                         .setRequestId(UUID.randomUUID().toString())
  13.                         .setContent("heartbeat").build();
  14.                 //发送心跳消息,并在发送失败时关闭该连接
  15.                 ctx.writeAndFlush(heartbeat).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
  16.             }
  17.         } else {
  18.             super.userEventTriggered(ctx, evt);
  19.         }
  20.     }
  21. }

我们这里创建了一个ChannelHandler类并重写了userEventTriggered方法,在该方法里实现发送心跳数据包的逻辑,同时将 IdleStateEvent类加入逻辑处理链上。

实际上是当连接空闲时间太长时,将会触发一个 IdleStateEvent事件,然后我们调用 userEventTriggered来处理该 IdleStateEvent事件。

当启动客户端和服务端之后,控制台打印心跳消息如下:

  1. 2018-10-28 16:30:46.825  INFO 42648 --- [ntLoopGroup-2-1] c.pjmike.server.client.HeartbeatHandler  : 已经10s没有发送消息给服务端
  2. 2018-10-28 16:30:47.176  INFO 42648 --- [ntLoopGroup-4-1] c.p.server.server.NettyServerHandler     : 收到客户端发来的心跳消息:requestId"80723780-2ce0-4b43-ad3a-53060a6e81ab"
  3. cmdHEARTBEAT_REQUEST
  4. content"heartbeat"

上面我们只讨论了客户端发送心跳消息给服务端,那么服务端还需要发心跳消息给客户端吗?

一般情况是,对于长连接而言,一种方案是两边都发送心跳消息,另一种是服务端作为被动接收一方,如果一段时间内服务端没有收到心跳包那么就直接断开连接。

我们这里采用第二种方案,只需要客户端发送心跳消息,然后服务端被动接收,然后设置一段时间,在这段时间内如果服务端没有收到任何消息,那么就主动断开连接,这也就是后面要说的 空闲检测

Netty 客户端断线重连

一般有以下两种情况,Netty 客户端需要重连服务端:

  • Netty 客户端启动时,服务端挂掉,连不上服务端

  • 在程序运行过程中,服务端突然挂掉

第一种情况实现 ChannelFutureListener用来监测连接是否成功,不成功就进行断连重试机制,代码如下:

  1. @Component
  2. @Slf4j
  3. public class NettyClient  {
  4.     private EventLoopGroup group = new NioEventLoopGroup();
  5.     @Value("${netty.port}")
  6.     private int port;
  7.     @Value("${netty.host}")
  8.     private String host;
  9.     private SocketChannel socketChannel;
  10.     public void sendMsg(MessageBase.Message message) {
  11.         socketChannel.writeAndFlush(message);
  12.     }
  13.     @PostConstruct
  14.     public void start()  {
  15.         Bootstrap bootstrap = new Bootstrap();
  16.         bootstrap.group(group)
  17.                 .channel(NioSocketChannel.class)
  18.                 .remoteAddress(host, port)
  19.                 .handler(new ClientHandlerInitilizer());
  20.         ChannelFuture future = bootstrap.connect();
  21.         //客户端断线重连逻辑
  22.         future.addListener((ChannelFutureListener) future1 -> {
  23.             if (future1.isSuccess()) {
  24.                 log.info("连接Netty服务端成功");
  25.             } else {
  26.                 log.info("连接失败,进行断线重连");
  27.                 future1.channel().eventLoop().schedule(() -> start(), 20TimeUnit.SECONDS);
  28.             }
  29.         });
  30.         socketChannel = (SocketChannel) future.channel();
  31.     }
  32. }

ChannelFuture添加一个监听器,如果客户端连接服务端失败,调用 channel().eventLoop().schedule()方法执行重试逻辑。

第二种情况是运行过程中 服务端突然挂掉了,这种情况我们在处理数据读写的Handler中实现,代码如下:

  1. @Slf4j
  2. public class HeartbeatHandler extends ChannelInboundHandlerAdapter {
  3.     @Autowired
  4.     private NettyClient nettyClient;
  5.     @Override
  6.     public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
  7.         if (evt instanceof IdleStateEvent) {
  8.             IdleStateEvent idleStateEvent = (IdleStateEvent) evt;
  9.             if (idleStateEvent.state() == IdleState.WRITER_IDLE) {
  10.                 log.info("已经10s没有发送消息给服务端");
  11.                 //向服务端送心跳包
  12.                 MessageBase.Message heartbeat = new MessageBase.Message().toBuilder().setCmd(MessageBase.Message.CommandType.HEARTBEAT_REQUEST)
  13.                         .setRequestId(UUID.randomUUID().toString())
  14.                         .setContent("heartbeat").build();
  15.                 //发送心跳消息,并在发送失败时关闭该连接
  16.                 ctx.writeAndFlush(heartbeat).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
  17.             }
  18.         } else {
  19.             super.userEventTriggered(ctx, evt);
  20.         }
  21.     }
  22.     @Override
  23.     public void channelInactive(ChannelHandlerContext ctx) throws Exception {
  24.         //如果运行过程中服务端挂了,执行重连机制
  25.         EventLoop eventLoop = ctx.channel().eventLoop();
  26.         eventLoop.schedule(() -> nettyClient.start(), 10L, TimeUnit.SECONDS);
  27.         super.channelInactive(ctx);
  28.     }
  29. }

我们这里直接在实现心跳机制的 Handler中重写channelInactive方法,然后在该方法中执行重试逻辑,这里注入了 NettyClient类,目的是方便调用 NettyClient的start()方法重新连接服务端

channelInactive()方法是指如果当前Channel没有连接到远程节点,那么该方法将会被调用。

服务端空闲检测

空闲检测是什么?实际上空闲检测是每隔一段时间,检测这段时间内是否有数据读写。比如,服务端检测一段时间内,是否收到客户端发送来的数据,如果没有,就及时释放资源,关闭连接。

对于空闲检测,Netty 特地提供了 IdleStateHandler 来实现这个功能。下面的代码参考自掘金小册《Netty 入门与实战:仿写微信 IM 即时通讯系统》中空闲检测部分的实现:

  1. @Slf4j
  2. public class ServerIdleStateHandler extends IdleStateHandler {
  3.     /**
  4.      * 设置空闲检测时间为 30s
  5.      */
  6.     private static final int READER_IDLE_TIME = 30;
  7.     public ServerIdleStateHandler() {
  8.         super(READER_IDLE_TIME00TimeUnit.SECONDS);
  9.     }
  10.     @Override
  11.     protected void channelIdle(ChannelHandlerContext ctx, IdleStateEvent evt) throws Exception {
  12.         log.info("{} 秒内没有读取到数据,关闭连接"READER_IDLE_TIME);
  13.         ctx.channel().close();

Controller方法测试

因为这是 SpringBoot 整合 Netty 的一个Demo,我们创建一个Controller方法对Netty 服务端与客户端之间的通信进行测试,controller代码如下,非常简单:

  1. @RestController
  2. public class ConsumerController {
  3.     @Autowired
  4.     private NettyClient nettyClient;
  5.     @GetMapping("/send")
  6.     public String send() {
  7.         MessageBase.Message message = new MessageBase.Message()
  8.                 .toBuilder().setCmd(MessageBase.Message.CommandType.NORMAL)
  9.                 .setContent("hello server")
  10.                 .setRequestId(UUID.randomUUID().toString()).build();
  11.         nettyClient.sendMsg(message);
  12.         return "send ok";
  13.     }
  14. }

注入 NettyClient,调用其 sendMsg方法发送消息,结果如下:

 
 

小结

上面详细阐述了 如何用 SpringBoot 整合 Netty ,其中借鉴很多前辈大佬的例子与文章,算是初步了解了如何使用 Netty。上文中如有错误之处,欢迎指出。

github地址:

https://github.com/pjmike/springboot-netty

参考

https://juejin.im/book/5b4bc28bf265da0f60130116

 
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/爱喝兽奶帝天荒/article/detail/1006580
推荐阅读
相关标签
  

闽ICP备14008679号