当前位置:   article > 正文

netty 心跳包和断线重连机制_收不到心跳包立马断连吗

收不到心跳包立马断连吗

为什么需要心跳包???

心跳包主要是用来做TCP长连接保活的。有时 socket 虽然是连接的但中间网络可能有问题,这时你还在不停的往外发送数据,但对方是收不到的,你不知道对方是不是还活着,不知道 socket 通道是不是还是联通的。 心跳包就是你发送一些试探包给对方,对方回应,如果一定时间内比如30秒内没有收到任何数据,说明对方或网络可能有问题了。这时你主动断开 socket 连接,避免浪费资源。

TCP 本来就有 keepAlive 机制为什么还需要应用层自己实现心跳???

TCP keepAlive 也是在一定时间内(默认2小时)socket 上没有接收到数据时主动断开连接,避免浪费资源,这时远端很可能已经down机了或中间网络有问题。也是通过发送一系列试探包看有没有回应来实现的。
但 TCP keepAlive 检测的主要是传输层的连通性,应用层心跳主要检测应用层服务是否可用,如如果出现死锁虽然 socket 是联通的但服务已经不可用。
TCP keepAlive 依赖操作系统,默认是关闭的,需要修改操作系统配置打开。

netty 中通过 IdleStateHandler 在空闲的时候发送心跳包

为什么在空闲的时候发送心跳包,而不是每隔固定时间发送???

这个是显而易见的,正常通信时说明两端连接是没有问题的,所以只在空闲的时候发送心跳包。如果每隔固定时间发送就会浪费资源占用正常通信的资源。

假设现在要做一个手机端推送的项目,所有手机通过 TCP 长连接连接到后台服务器。心跳机制是这样的:

  1. 手机端在写空闲的时候发送心跳包给服务端,用 IdleStateHandler 来做 socket 的空闲检测。 如果 5 秒内没有写任何数据,则发送心跳包到服务端
  1. ch.pipeline().addLast(new IdleStateHandler(0, 5, 0, TimeUnit.SECONDS));
  2. ch.pipeline().addLast(new HeartbeatKeeper());
  3. @ChannelHandler.Sharable
  4. public class HeartbeatKeeper extends ChannelInboundHandlerAdapter {
  5. @Override
  6. public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
  7. if (evt instanceof IdleStateEvent) {
  8. IdleState state = ((IdleStateEvent) evt).state();
  9. if (state == IdleState.WRITER_IDLE) {
  10. System.out.println("client send heart beat");
  11. ctx.channel().writeAndFlush("heart beat\n");
  12. }
  13. } else {
  14. super.userEventTriggered(ctx, evt);
  15. }
  16. }
  17. }
  1. 服务端设置读超时,如果 30 秒内没有收到一个客户端的任何数据则关闭连接。
    1. ch.pipeline().addLast(new IdleStateHandler(30, 0, 0, TimeUnit.SECONDS));
    2. ch.pipeline().addLast(new IdleStateTrigger());
    3. @ChannelHandler.Sharable
    4. public class IdleStateTrigger extends ChannelInboundHandlerAdapter {
    5. @Override
    6. public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
    7. if (evt instanceof IdleStateEvent) {
    8. IdleState state = ((IdleStateEvent) evt).state();
    9. if (state == IdleState.READER_IDLE) {
    10. ctx.channel().close();
    11. }
    12. } else {
    13. super.userEventTriggered(ctx, evt);
    14. }
    15. }
    16. }

    服务端接收到心跳包后要不要回复???

    看其他博客说不要回复,如果有 10万空闲连接,光回复心跳包就要占用大量资源。服务端读超时后直接关闭连接,客户端再进行重连。

    断线重连

    断线重连也很简单就是在 channelInactive 的时候重新 connect 就行了。参考其他博客专门用一个 ChannelInboundHandler 来处理断线重连。

    1. @ChannelHandler.Sharable
    2. public class ConnectionWatchDog extends ChannelInboundHandlerAdapter implements TimerTask {
    3. private final Bootstrap bootstrap;
    4. private final String host;
    5. private final int port;
    6. private volatile boolean reconnect;
    7. private int attempts;
    8. private Channel channel;
    9. private HashedWheelTimer timer = new HashedWheelTimer();
    10. private int reconnectDelay = 5;
    11. public ConnectionWatchDog(Bootstrap bootstrap, String host, int port, boolean reconnect) {
    12. this.bootstrap = bootstrap;
    13. this.host = host;
    14. this.port = port;
    15. this.reconnect = reconnect;
    16. }
    17. public Channel getChannel() {
    18. return this.channel;
    19. }
    20. public void channelActive(ChannelHandlerContext ctx) throws Exception {
    21. System.out.println("channelActive");
    22. channel = ctx.channel();
    23. ctx.fireChannelActive();
    24. }
    25. public void channelInactive(ChannelHandlerContext ctx) throws Exception {
    26. System.out.println("channelInactive");
    27. ctx.fireChannelInactive();
    28. channel = null;
    29. if (reconnect) {
    30. attempts = 0;
    31. scheduleReconnect();
    32. }
    33. }
    34. private void connect() {
    35. bootstrap.connect(host, port).addListener((future) -> {
    36. if (future.isSuccess()) {
    37. System.out.println("connected to " + host + ":" + port);
    38. attempts = 0;
    39. } else {
    40. System.out.println("connect failed " + attempts + " , to reconnect after " + reconnectDelay + " 秒");
    41. // 这里现在每5秒重连一次直到连接上,可自己实现重连逻辑
    42. scheduleReconnect();
    43. }
    44. });
    45. }
    46. public void run(Timeout timeout) {
    47. synchronized (this.bootstrap) {
    48. ++attempts;
    49. connect();
    50. }
    51. }
    52. private void scheduleReconnect() {
    53. timer.newTimeout(this, reconnectDelay, TimeUnit.SECONDS);
    54. }
    55. public void setReconnect(boolean reconnect) {
    56. this.reconnect = reconnect;
    57. }
    58. }

    这个 watchDog Handler 应当放在 ChannelPipeline 的最前面

    1. public void connect(String host, int port) {
    2. Bootstrap bootstrap = new Bootstrap().group(new NioEventLoopGroup())
    3. .channel(NioSocketChannel.class)
    4. .option(ChannelOption.TCP_NODELAY, true)
    5. .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000);
    6. watchDog = new ConnectionWatchDog(bootstrap, host, port, true);
    7. bootstrap.handler(new ChannelInitializer<SocketChannel>() {
    8. @Override
    9. protected void initChannel(SocketChannel ch) throws Exception {
    10. ch.pipeline().addLast(watchDog);
    11. ch.pipeline().addLast(new IdleStateHandler(0, 5, 0, TimeUnit.SECONDS));
    12. ch.pipeline().addLast(new HeartbeatKeeper());
    13. ch.pipeline().addLast(new LineBasedFrameDecoder(1024));
    14. ch.pipeline().addLast(new StringDecoder(CharsetUtil.UTF_8));
    15. ch.pipeline().addLast(new StringEncoder(CharsetUtil.UTF_8));
    16. ch.pipeline().addLast(new ClientDemoHandler());
    17. }
    18. });
    19. // 这里如果第一次连接不成功也可以尝试多次连接
    20. bootstrap.connect(host, port);
    21. }

    DEMO:
    https://github.com/lesliebeijing/Netty-Demo

    基于 Netty 写的一个简单的推送 DEMO,可用在手机端推送
    https://github.com/lesliebeijing/EncPush

    Netty 客户端用在 Android 中也很稳定,我们的物联网项目Android和后台都是用的 Netty。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/笔触狂放9/article/detail/242059
推荐阅读
相关标签
  

闽ICP备14008679号