当前位置:   article > 正文

netty入门(十六)netty心跳监测机制_netty客户端发送心跳

netty客户端发送心跳

1.引入

在 TCP 保持长连接的过程中,可能会出现断网等网络异常出现,异常发生的时候, client 与 server 之间如果没有交互的话,它们是无法发现对方已经掉线。

2.工作原理

在 client 与 server 之间,一定时间内没有数据交互时, 即处于 idle 状态时, 客户端或服务器就会发送一个特殊的数据包给对方, 当接收方收到这个数据报文后, 也立即发送一个特殊的数据报文, 回应发送方, 此即一个 PING-PONG 交互。所以, 当某一端收到心跳消息后, 就知道了对方仍然在线, 这就确保 TCP 连接的有效性。

TCP 实际上自带的就有长连接选项,本身也有心跳包机制,也就是 TCP 的选项:SO_KEEPALIVE。但是,TCP 协议层面的长连接灵活性不够。所以,一般情况下我们都是在应用层协议上实现自定义心跳机制的,也就是在 Netty 层面通过编码实现。通过 Netty 实现心跳机制的话,核心类是 IdleStateHandler 。

3.实现

在 Netty中, 实现心跳机制的关键是 IdleStateHandler

  1. public IdleStateHandler(
  2. int readerIdleTimeSeconds,
  3. int writerIdleTimeSeconds,
  4. int allIdleTimeSeconds) {
  5. this(readerIdleTimeSeconds, writerIdleTimeSeconds, allIdleTimeSeconds,
  6. TimeUnit.SECONDS);
  7. }

参数的含义:

  • readerIdleTimeSeconds: 读超时. 即当在指定的时间间隔内没有从 Channel 读取到数据时, 会触发一个 READER_IDLE 的 IdleStateEvent 事件.

  • writerIdleTimeSeconds: 写超时. 即当在指定的时间间隔内没有数据写入到 Channel 时, 会触发一个 WRITER_IDLE 的 IdleStateEvent 事件.

  • allIdleTimeSeconds: 读/写超时. 即当在指定的时间间隔内没有读或写操作时, 会触发一个 ALL_IDLE 的 IdleStateEvent 事件.

这三个参数默认的时间单位是秒。

4.案例

需求:

(1)编写一个 Netty 心跳检测机制案例,当服务器超过3秒没有读时,就提示读空闲;

(2)当服务器超过5秒没有写操作时,就提示写空闲;

(3)实现当服务器超过7秒没有读或写操作时,就提示读写空闲。

服务器端代码:

  1. /**
  2. * 服务器端
  3. */
  4. public class MyServer {
  5. public static void main(String[] args) throws InterruptedException {
  6. // 创建两个线程组
  7. NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
  8. NioEventLoopGroup workerGroup = new NioEventLoopGroup(8);
  9. try {
  10. ServerBootstrap serverBootstrap = new ServerBootstrap();
  11. serverBootstrap.group(bossGroup, workerGroup)
  12. .channel(NioServerSocketChannel.class)
  13. .handler(new LoggingHandler(LogLevel.INFO)) // 在bossgroup增加一个日志处理器
  14. .childHandler(new ChannelInitializer<SocketChannel>() {
  15. @Override
  16. protected void initChannel(SocketChannel ch) throws Exception {
  17. ChannelPipeline pipeline = ch.pipeline();
  18. // 加入 netty 提供的 IdleStateHandler
  19. /**
  20. * 说明
  21. * 1. IdleStateHandler 是 netty 提供的处理空闲状态的处理器
  22. * 2. readerIdleTime:表示多长时间没有读,就会发送一个心跳检测包,检测是否还是连接状态
  23. * 3. writerIdleTime:表示多长时间没有写,就会发送一个心跳检测包,检测是否还是连接状态
  24. * 4. allIdleTime:表示多长时间没有读写,就会发送一个心跳检测包,检测是否还是连接状态
  25. * 5. 当 IdleStateEvent 触发后,就会传递给管道的下一个handler进行处理,通过调用(触发)下一个 handler 的userEventTrigged 方法,
  26. * 在该方法中去处理 IdleStateEvent 事件
  27. */
  28. pipeline.addLast(new IdleStateHandler(3,5,7, TimeUnit.SECONDS));
  29. // 加入一个对空闲检测进一步处理的handler(自定义)
  30. pipeline.addLast(new MyServerHandler());
  31. }
  32. });
  33. ChannelFuture channelFuture = serverBootstrap.bind(7000).sync();
  34. channelFuture.channel().closeFuture().sync();
  35. }finally {
  36. bossGroup.shutdownGracefully();
  37. workerGroup.shutdownGracefully();
  38. }
  39. }
  40. }
  41. /**
  42. * 心跳处理器
  43. */
  44. public class MyServerHandler extends ChannelInboundHandlerAdapter {
  45. /**
  46. *
  47. * @param ctx 上下文
  48. * @param evt 事件
  49. * @throws Exception 异常
  50. */
  51. @Override
  52. public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
  53. if(evt instanceof IdleStateEvent){
  54. // 将 evt 向下转型
  55. IdleStateEvent event = (IdleStateEvent) evt;
  56. String eventType = null;
  57. switch (event.state()){
  58. case READER_IDLE:
  59. eventType = "读空闲";
  60. break;
  61. case WRITER_IDLE:
  62. eventType = "写空闲";
  63. break;
  64. case ALL_IDLE:
  65. eventType = "读写空闲";
  66. break;
  67. }
  68. System.out.println(ctx.channel().remoteAddress() + "--超时事件发生-->" + eventType);
  69. System.out.println("服务器做相应处理......");
  70. }
  71. }
  72. }

客户端代码:

  1. /**
  2. * 客户端
  3. */
  4. public class MyClient {
  5. private final String host;
  6. private final int port;
  7. public MyClient(String host, int port) {
  8. this.host = host;
  9. this.port = port;
  10. }
  11. public void run() throws InterruptedException {
  12. EventLoopGroup group = new NioEventLoopGroup();
  13. try {
  14. Bootstrap bootstrap = new Bootstrap();
  15. bootstrap.group(group)
  16. .channel(NioSocketChannel.class)
  17. .handler(new ChannelInitializer<SocketChannel>() {
  18. @Override
  19. protected void initChannel(SocketChannel ch) throws Exception {
  20. // 得到 pipeline
  21. ChannelPipeline pipeline = ch.pipeline();
  22. // 向 pipeline 加入一个解码器
  23. pipeline.addLast("decoder", new StringDecoder());
  24. // 向 pipeline 加入一个编码器
  25. pipeline.addLast("encoder", new StringEncoder());
  26. // 加入自己的业务处理 handler
  27. pipeline.addLast(new GroupChatClientHandler());
  28. }
  29. });
  30. ChannelFuture channelFuture = bootstrap.connect(host, port).sync();
  31. // 得到channel
  32. Channel channel = channelFuture.channel();
  33. System.out.println("--------" + channel.localAddress() + "----------");
  34. // 客户端需要输入信息,创建扫描器 scanner
  35. Scanner scanner = new Scanner(System.in);
  36. while (scanner.hasNextLine()){
  37. String msg = scanner.nextLine();
  38. // 通过 channel 发送到服务器端
  39. channel.writeAndFlush(msg + "\r\n");
  40. }
  41. }finally {
  42. group.shutdownGracefully();
  43. }
  44. }
  45. public static void main(String[] args) {
  46. MyClient groupChatClient = new MyClient("127.0.0.1", 7000);
  47. try {
  48. groupChatClient.run();
  49. } catch (InterruptedException e) {
  50. e.printStackTrace();
  51. }
  52. }
  53. }
  54. public class MyClientHandler extends SimpleChannelInboundHandler<String> {
  55. @Override
  56. protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
  57. System.out.println(msg.trim());
  58. }
  59. }

测试:先启动服务器端,在启动客户端,测试结果如下。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/AllinToyou/article/detail/242276
推荐阅读
相关标签
  

闽ICP备14008679号