当前位置:   article > 正文

浅析Netty实现心跳机制与断线重连_dubbo 使用netty为什么要有心跳检查和断线重连

dubbo 使用netty为什么要有心跳检查和断线重连
转载自:http://blog.csdn.net/z69183787/article/details/52671543


       何为心跳:顾名思义, 所谓心跳, 即在 TCP 长连接中, 客户端和服务器之间定期发送的一种特殊的数据包, 通知对方自己还在线, 以确保 TCP 连接的有效性.

       为什么需要心跳?
       因为网络的不可靠性, 有可能在 TCP 保持长连接的过程中, 由于某些突发情况, 例如网线被拔出, 突然掉电等, 会造成服务器和客户端的连接中断. 在这些突发情况下, 如果恰好服务器和客户端之间没有交互的话, 那么它们是不能在短时间内发现对方已经掉线的. 为了解决这个问题,就需要引入心跳机制. 心跳机制的工作原理是: 在服务器和客户端之间一定时间内没有数据交互时, 即处于 idle 状态时, 客户端或服务器会发送一个特殊的数据包给对方, 当接收方收到这个数据报文后, 也立即发送一个特殊的数据报文, 回应发送方, 此即一个 PING-PONG 交互. 自然地, 当某一端收到心跳消息后, 就知道了对方仍然在线, 这就确保 TCP 连接的有效性.

       如何实现心跳?
       可以通过两种方式实现心跳机制:1.使用 TCP 协议层面的 keepalive 机制;2.在应用层上实现自定义的心跳机制.
       虽然在 TCP 协议层面上, 提供了 keepalive 保活机制, 但是使用它有几个缺点:它不是 TCP 的标准协议, 并且是默认关闭的;TCP keepalive机制依赖于操作系统的实现, 默认的 keepalive心跳时间是两个小时, 并且对keepalive的修改需要系统调用(或者修改系统配置), 灵活性不够;TCP keepalive与TCP协议绑定, 因此如果需要更换为UDP协议时, keepalive机制就失效了。
       虽然使用TCP层面的keepalive机制比自定义的应用层心跳机制节省流量, 但是基于上面的几点缺点, 一般的实践中, 大多数都是选择在应用层上实现自定义的心跳。
       在Netty中, 实现心跳机制的关键是IdleStateHandler,它可以对一个Channel的读/写设置定时器, 当Channel在一定事件间隔内没有数据交互时(即处于idle状态),就会触发指定的事件。

       在 Netty 中, 实现心跳机制的关键是 IdleStateHandler, 那么这个 Handler 如何使用呢?实例化一个 IdleStateHandler 需要提供三个参数:
          • readerIdleTimeSeconds, 读超时. 即当在指定的事件间隔内没有从 Channel 读取到数据时, 会触发一个 READER_IDLE 的 IdleStateEvent 事件.
          • writerIdleTimeSeconds, 写超时. 即当在指定的事件间隔内没有数据写入到 Channel 时, 会触发一个 WRITER_IDLE 的 IdleStateEvent 事件.
          • allIdleTimeSeconds, 读/写超时. 即当在指定的事件间隔内没有读或写操作时, 会触发一个 ALL_IDLE 的 IdleStateEvent 事件.


       为了展示具体的IdleStateHandler实现的心跳机制, 下面构造一个具体的EchoServer 的例子, 这个例子的行为如下:

          1. 在这个例子中, 客户端和服务器通过 TCP 长连接进行通信.
          2. TCP 通信的报文格式是:
              +--------+-----+---------------+ 
              | Length |Type |   Content     |
              |   17   |  1  |"HELLO, WORLD" |
              +--------+-----+---------------+
       1. 客户端每隔一个随机的时间后, 向服务器发送消息, 服务器收到消息后, 立即将收到的消息原封不动地回复给客户端.
       2. 若客户端在指定的时间间隔内没有读/写操作, 则客户端会自动向服务器发送一个 PING 心跳, 服务器收到 PING 心跳消息时, 需要回复一个 PONG 消息.


       下面所使用的代码例子可以在https://github.com/yongshun/some_java_code上找到。实现心跳的通用部分 CustomHeartbeatHandler:

  1. public abstract class CustomHeartbeatHandler extends SimpleChannelInboundHandler<ByteBuf> {
  2. public static final byte PING_MSG = 1;
  3. public static final byte PONG_MSG = 2;
  4. public static final byte CUSTOM_MSG = 3;
  5. protected String name;
  6. private int heartbeatCount = 0;
  7. public CustomHeartbeatHandler(String name) {
  8. this.name = name;
  9. }
  10. @Override
  11. protected void channelRead0(ChannelHandlerContext context, ByteBuf byteBuf) throws Exception {
  12. if (byteBuf.getByte(4) == PING_MSG) {
  13. sendPongMsg(context);
  14. } else if (byteBuf.getByte(4) == PONG_MSG){
  15. System.out.println(name + " get pong msg from " + context.channel().remoteAddress());
  16. } else {
  17. handleData(context, byteBuf);
  18. }
  19. }
  20. protected void sendPingMsg(ChannelHandlerContext context) {
  21. ByteBuf buf = context.alloc().buffer(5);
  22. buf.writeInt(5);
  23. buf.writeByte(PING_MSG);
  24. buf.retain();
  25. context.writeAndFlush(buf);
  26. heartbeatCount++;
  27. System.out.println(name + " sent ping msg to " + context.channel().remoteAddress() + ", count: " + heartbeatCount);
  28. }
  29. private void sendPongMsg(ChannelHandlerContext context) {
  30. ByteBuf buf = context.alloc().buffer(5);
  31. buf.writeInt(5);
  32. buf.writeByte(PONG_MSG);
  33. context.channel().writeAndFlush(buf);
  34. heartbeatCount++;
  35. System.out.println(name + " sent pong msg to " + context.channel().remoteAddress() + ", count: " + heartbeatCount);
  36. }
  37. protected abstract void handleData(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf);
  38. @Override
  39. public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
  40. // IdleStateHandler 所产生的 IdleStateEvent 的处理逻辑.
  41. if (evt instanceof IdleStateEvent) {
  42. IdleStateEvent e = (IdleStateEvent) evt;
  43. switch (e.state()) {
  44. case READER_IDLE:
  45. handleReaderIdle(ctx);
  46. break;
  47. case WRITER_IDLE:
  48. handleWriterIdle(ctx);
  49. break;
  50. case ALL_IDLE:
  51. handleAllIdle(ctx);
  52. break;
  53. default:
  54. break;
  55. }
  56. }
  57. }
  58. @Override
  59. public void channelActive(ChannelHandlerContext ctx) throws Exception {
  60. System.err.println("---" + ctx.channel().remoteAddress() + " is active---");
  61. }
  62. @Override
  63. public void channelInactive(ChannelHandlerContext ctx) throws Exception {
  64. System.err.println("---" + ctx.channel().remoteAddress() + " is inactive---");
  65. }
  66. protected void handleReaderIdle(ChannelHandlerContext ctx) {
  67. System.err.println("---READER_IDLE---");
  68. }
  69. protected void handleWriterIdle(ChannelHandlerContext ctx) {
  70. System.err.println("---WRITER_IDLE---");
  71. }
  72. protected void handleAllIdle(ChannelHandlerContext ctx) {
  73. System.err.println("---ALL_IDLE---");
  74. }
  75. }
       CustomHeartbeatHandler 负责心跳的发送和接收, IdleStateHandler 是实现心跳的关键, 它会根据不同的 IO idle 类型来产生不同的 IdleStateEvent 事件, 而这个事件的捕获, 其实就是在 userEventTriggered 方法中实现的。
       在userEventTriggered中, 根据 IdleStateEvent 的 state() 的不同, 而进行不同的处理。例如如果是读取数据idle,则 e.state() == READER_IDLE, 因此就调用 handleReaderIdle 来处理它。CustomHeartbeatHandler 提供了三个 idle 处理方法: handleReaderIdle, handleWriterIdle, handleAllIdle, 这三个方法目前只有默认的实现, 它需要在子类中进行重写。
       接下来看看数据处理部分,CustomHeartbeatHandler.channelRead0 中,首先根据报文协议来判断当前的报文类型, 如果是 PING_MSG 则表示是服务器收到客户端的 PING 消息, 此时服务器需要回复一个 PONG 消息, 其消息类型是 PONG_MSG.扔报文类型是 PONG_MSG, 则表示是客户端收到服务器发送的 PONG 消息。

客户端初始化
  1. public class Client {
  2. public static void main(String[] args) {
  3. NioEventLoopGroup workGroup = new NioEventLoopGroup(4);
  4. Random random = new Random(System.currentTimeMillis());
  5. try {
  6. Bootstrap bootstrap = new Bootstrap();
  7. bootstrap
  8. .group(workGroup)
  9. .channel(NioSocketChannel.class)
  10. .handler(new ChannelInitializer<SocketChannel>() {
  11. protected void initChannel(SocketChannel socketChannel) throws Exception {
  12. ChannelPipeline p = socketChannel.pipeline();
  13. p.addLast(new IdleStateHandler(0, 0, 5));
  14. p.addLast(new LengthFieldBasedFrameDecoder(1024, 0, 4, -4, 0));
  15. p.addLast(new ClientHandler());
  16. }
  17. });
  18. Channel ch = bootstrap.remoteAddress("127.0.0.1", 12345).connect().sync().channel();
  19. for (int i = 0; i < 10; i++) {
  20. String content = "client msg " + i;
  21. ByteBuf buf = ch.alloc().buffer();
  22. buf.writeInt(5 + content.getBytes().length);
  23. buf.writeByte(CustomHeartbeatHandler.CUSTOM_MSG);
  24. buf.writeBytes(content.getBytes());
  25. ch.writeAndFlush(buf);
  26. Thread.sleep(random.nextInt(20000));
  27. }
  28. } catch (Exception e) {
  29. throw new RuntimeException(e);
  30. } finally {
  31. workGroup.shutdownGracefully();
  32. }
  33. }
  34. }
       在ChannelInitializer.initChannel部分,给 pipeline 添加了三个Handler, IdleStateHandler 这个 handler 是心跳机制的核心,为客户端端设置了读写 idle 超时, 时间间隔是5s, 即如果客户端在间隔5s后都没有收到服务器的消息或向服务器发送消息, 则产生 ALL_IDLE 事件。LengthFieldBasedFrameDecoder, 它是负责解析TCP报文。最后一个Handler是ClientHandler,它继承于 CustomHeartbeatHandler,是处理业务逻辑部分。
  1. public class ClientHandler extends CustomHeartbeatHandler {
  2. public ClientHandler() {
  3. super("client");
  4. }
  5. @Override
  6. protected void handleData(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf) {
  7. byte[] data = new byte[byteBuf.readableBytes() - 5];
  8. byteBuf.skipBytes(5);
  9. byteBuf.readBytes(data);
  10. String content = new String(data);
  11. System.out.println(name + " get content: " + content);
  12. }
  13. @Override
  14. protected void handleAllIdle(ChannelHandlerContext ctx) {
  15. super.handleAllIdle(ctx);
  16. sendPingMsg(ctx);
  17. }
  18. }
       ClientHandler继承于CustomHeartbeatHandler, 它重写了两个方法, 一个是 handleData, 在这里面实现 仅仅打印收到的消息。第二个重写的方法是handleAllIdle。客户端负责发送心跳的PING消息, 当客户端产生一个ALL_IDLE事件后,会导致父类的 CustomHeartbeatHandler.userEventTriggered 调用, 而userEventTriggered中会根据 e.state() 来调用不同的方法, 因此最后调用的是 ClientHandler.handleAllIdle, 在这个方法中, 客户端调用sendPingMsg向服务器发送一个PING消息。

服务器初始化
  1. public class Server {
  2. public static void main(String[] args) {
  3. NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
  4. NioEventLoopGroup workGroup = new NioEventLoopGroup(4);
  5. try {
  6. ServerBootstrap bootstrap = new ServerBootstrap();
  7. bootstrap
  8. .group(bossGroup, workGroup)
  9. .channel(NioServerSocketChannel.class)
  10. .childHandler(new ChannelInitializer<SocketChannel>() {
  11. protected void initChannel(SocketChannel socketChannel) throws Exception {
  12. ChannelPipeline p = socketChannel.pipeline();
  13. p.addLast(new IdleStateHandler(10, 0, 0));
  14. p.addLast(new LengthFieldBasedFrameDecoder(1024, 0, 4, -4, 0));
  15. p.addLast(new ServerHandler());
  16. }
  17. });
  18. Channel ch = bootstrap.bind(12345).sync().channel();
  19. ch.closeFuture().sync();
  20. } catch (Exception e) {
  21. throw new RuntimeException(e);
  22. } finally {
  23. bossGroup.shutdownGracefully();
  24. workGroup.shutdownGracefully();
  25. }
  26. }
  27. }
  1. public class ServerHandler extends CustomHeartbeatHandler {
  2. public ServerHandler() {
  3. super("server");
  4. }
  5. @Override
  6. protected void handleData(ChannelHandlerContext channelHandlerContext, ByteBuf buf) {
  7. byte[] data = new byte[buf.readableBytes() - 5];
  8. ByteBuf responseBuf = Unpooled.copiedBuffer(buf);
  9. buf.skipBytes(5);
  10. buf.readBytes(data);
  11. String content = new String(data);
  12. System.out.println(name + " get content: " + content);
  13. channelHandlerContext.write(responseBuf);
  14. }
  15. @Override
  16. protected void handleReaderIdle(ChannelHandlerContext ctx) {
  17. super.handleReaderIdle(ctx);
  18. System.err.println("---client " + ctx.channel().remoteAddress().toString() + " reader timeout, close it---");
  19. ctx.close();
  20. }
  21. }
       ServerHandler继承于CustomHeartbeatHandler, 它重写了两个方法, 一个是handleData, 在这里面实现EchoServer的功能: 即收到客户端的消息后, 立即原封不动地将消息回复给客户端.
       第二个重写的方法是 handleReaderIdle, 因为服务器仅仅对客户端的读 idle 感兴趣, 因此只重新了这个方法. 若服务器在指定时间后没有收到客户端的消息, 则会触发 READER_IDLE 消息, 进而会调用 handleReaderIdle 这个方法.客户端负责发送心跳的 PING 消息, 并且服务器的 READER_IDLE 的超时时间是客户端发送 PING 消息的间隔的两倍, 因此当服务器 READER_IDLE 触发时, 就可以确定是客户端已经掉线了, 因此服务器直接关闭客户端连接即可。

总结
       1. 使用 Netty 实现心跳机制的关键就是利用 IdleStateHandler 来产生对应的 idle 事件.
       2. 一般是客户端负责发送心跳的 PING 消息, 因此客户端注意关注 ALL_IDLE 事件, 在这个事件触发后, 客户端需要向服务器发送 PING 消息, 告诉服务器"我还存活着".
       3. 服务器是接收客户端的 PING 消息的, 因此服务器关注的是 READER_IDLE 事件, 并且服务器的 READER_IDLE 间隔需要比客户端的 ALL_IDLE 事件间隔大(例如客户端ALL_IDLE 是5s 没有读写时触发, 因此服务器的 READER_IDLE 可以设置为10s)
       4. 当服务器收到客户端的 PING 消息时, 会发送一个 PONG 消息作为回复. 一个 PING-PONG 消息对就是一个心跳交互.


实现客户端的断线重连
  1. public class Client {
  2. private NioEventLoopGroup workGroup = new NioEventLoopGroup(4);
  3. private Channel channel;
  4. private Bootstrap bootstrap;
  5. public static void main(String[] args) throws Exception {
  6. Client client = new Client();
  7. client.start();
  8. client.sendData();
  9. }
  10. public void sendData() throws Exception {
  11. Random random = new Random(System.currentTimeMillis());
  12. for (int i = 0; i < 10000; i++) {
  13. if (channel != null && channel.isActive()) {
  14. String content = "client msg " + i;
  15. ByteBuf buf = channel.alloc().buffer(5 + content.getBytes().length);
  16. buf.writeInt(5 + content.getBytes().length);
  17. buf.writeByte(CustomHeartbeatHandler.CUSTOM_MSG);
  18. buf.writeBytes(content.getBytes());
  19. channel.writeAndFlush(buf);
  20. }
  21. Thread.sleep(random.nextInt(20000));
  22. }
  23. }
  24. public void start() {
  25. try {
  26. bootstrap = new Bootstrap();
  27. bootstrap
  28. .group(workGroup)
  29. .channel(NioSocketChannel.class)
  30. .handler(new ChannelInitializer<SocketChannel>() {
  31. protected void initChannel(SocketChannel socketChannel) throws Exception {
  32. ChannelPipeline p = socketChannel.pipeline();
  33. p.addLast(new IdleStateHandler(0, 0, 5));
  34. p.addLast(new LengthFieldBasedFrameDecoder(1024, 0, 4, -4, 0));
  35. p.addLast(new ClientHandler(Client.this));
  36. }
  37. });
  38. doConnect();
  39. } catch (Exception e) {
  40. throw new RuntimeException(e);
  41. }
  42. }
  43. protected void doConnect() {
  44. if (channel != null && channel.isActive()) {
  45. return;
  46. }
  47. ChannelFuture future = bootstrap.connect("127.0.0.1", 12345);
  48. future.addListener(new ChannelFutureListener() {
  49. public void operationComplete(ChannelFuture futureListener) throws Exception {
  50. if (futureListener.isSuccess()) {
  51. channel = futureListener.channel();
  52. System.out.println("Connect to server successfully!");
  53. } else {
  54. System.out.println("Failed to connect to server, try connect after 10s");
  55. futureListener.channel().eventLoop().schedule(new Runnable() {
  56. @Override
  57. public void run() {
  58. doConnect();
  59. }
  60. }, 10, TimeUnit.SECONDS);
  61. }
  62. }
  63. });
  64. }
  65. }
       上面的代码中,抽象出 doConnect 方法, 它负责客户端和服务器的 TCP 连接的建立, 并且当 TCP 连接失败时, doConnect 会 通过 "channel().eventLoop().schedule" 来延时10s 后尝试重新连接。
  1. public class ClientHandler extends CustomHeartbeatHandler {
  2. private Client client;
  3. public ClientHandler(Client client) {
  4. super("client");
  5. this.client = client;
  6. }
  7. @Override
  8. protected void handleData(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf) {
  9. byte[] data = new byte[byteBuf.readableBytes() - 5];
  10. byteBuf.skipBytes(5);
  11. byteBuf.readBytes(data);
  12. String content = new String(data);
  13. System.out.println(name + " get content: " + content);
  14. }
  15. @Override
  16. protected void handleAllIdle(ChannelHandlerContext ctx) {
  17. super.handleAllIdle(ctx);
  18. sendPingMsg(ctx);
  19. }
  20. @Override
  21. public void channelInactive(ChannelHandlerContext ctx) throws Exception {
  22. super.channelInactive(ctx);
  23. client.doConnect();
  24. }
  25. }
       断线重连的关键一点是检测连接是否已经断开. 因此改写了ClientHandler,重写了channelInactive方法. 当TCP连接断开时,会回调channelInactive方法,因此在这个方法中调用 client.doConnect() 来进行重连。
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/weixin_40725706/article/detail/242058
推荐阅读
  

闽ICP备14008679号