当前位置:   article > 正文

netty的心跳检测与断线重连_new idlestatehandler(10,0,0, timeunit.seconds)

new idlestatehandler(10,0,0, timeunit.seconds)

服务端代码

NettyServer
  1. package com.example.netty.netty2;
  2. import io.netty.bootstrap.ServerBootstrap;
  3. import io.netty.channel.ChannelFuture;
  4. import io.netty.channel.ChannelOption;
  5. import io.netty.channel.EventLoopGroup;
  6. import io.netty.channel.nio.NioEventLoopGroup;
  7. import io.netty.channel.socket.nio.NioServerSocketChannel;
  8. import lombok.extern.slf4j.Slf4j;
  9. @Slf4j
  10. public class NettyServer {
  11. private NettyServerChannelInitializer serverChannelInitializer = null;
  12. private int port = 8888;
  13. public void bind() throws Exception {
  14. //配置服务端的NIO线程组
  15. EventLoopGroup bossGroup = new NioEventLoopGroup();
  16. EventLoopGroup workerGroup = new NioEventLoopGroup();
  17. try {
  18. serverChannelInitializer = new NettyServerChannelInitializer();
  19. ServerBootstrap b = new ServerBootstrap();
  20. b.group(bossGroup, workerGroup)
  21. .channel(NioServerSocketChannel.class)
  22. //保持长连接
  23. .childOption(ChannelOption.SO_KEEPALIVE,true)
  24. .option(ChannelOption.SO_BACKLOG, 1024)
  25. .childHandler(serverChannelInitializer);
  26. //绑定端口,同步等待成功
  27. ChannelFuture f = b.bind(port).sync();
  28. //等待服务器监听端口关闭
  29. f.channel().closeFuture().sync();
  30. } finally {
  31. //释放线程池资源
  32. bossGroup.shutdownGracefully();
  33. workerGroup.shutdownGracefully();
  34. }
  35. }
  36. public static void main(String[] args) throws Exception {
  37. new NettyServer().bind();
  38. }
  39. }

自定义的 ChannelInitializer 类(NettyServerChannelInitializer):负责组装 pipeline,其中包含拆包/粘包处理、字符串编解码,以及其他自定义操作(心跳检测)

  1. package com.example.netty.netty2;
  2. import io.netty.buffer.Unpooled;
  3. import io.netty.channel.ChannelInitializer;
  4. import io.netty.channel.ChannelPipeline;
  5. import io.netty.channel.socket.SocketChannel;
  6. import io.netty.handler.codec.DelimiterBasedFrameDecoder;
  7. import io.netty.handler.codec.string.StringDecoder;
  8. import io.netty.handler.codec.string.StringEncoder;
  9. import io.netty.handler.timeout.IdleStateHandler;
  10. import java.util.concurrent.TimeUnit;
  11. public class NettyServerChannelInitializer extends ChannelInitializer<SocketChannel> {
  12. private NettyServerHandler handler ;
  13. @Override
  14. protected void initChannel(SocketChannel socketChannel) throws Exception {
  15. ChannelPipeline pipeline = socketChannel.pipeline();
  16. //解决TCP粘包拆包的问题,以特定的字符结尾($_)
  17. pipeline.addLast(new DelimiterBasedFrameDecoder(Integer.MAX_VALUE, Unpooled.copiedBuffer("$_".getBytes())));
  18. //字符串解码和编码
  19. pipeline.addLast("decoder", new StringDecoder());
  20. pipeline.addLast("encoder", new StringEncoder());
  21. pipeline.addLast(new IdleStateHandler(3,0,0,TimeUnit.SECONDS));
  22. //服务器的逻辑
  23. handler = new NettyServerHandler();
  24. pipeline.addLast("handler", handler);
  25. }
  26. }
NettyServerHandler 心跳检测
  1. package com.example.netty.netty2;
  2. import io.netty.channel.ChannelHandlerContext;
  3. import io.netty.channel.SimpleChannelInboundHandler;
  4. import io.netty.handler.timeout.IdleState;
  5. import io.netty.handler.timeout.IdleStateEvent;
  6. import lombok.extern.slf4j.Slf4j;
  7. @Slf4j
  8. public class NettyServerHandler extends SimpleChannelInboundHandler {
  9. /**
  10. * 心跳丢失次数
  11. */
  12. private int counter = 0;
  13. @Override
  14. protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
  15. System.out.println("Client say : " + msg.toString());
  16. //重置心跳丢失次数
  17. counter = 0;
  18. }
  19. @Override
  20. public void channelActive(ChannelHandlerContext ctx) throws Exception {
  21. log.info("服务端被 : " + ctx.channel().remoteAddress().toString()+ " 连接上了 !");
  22. super.channelActive(ctx);
  23. }
  24. @Override
  25. public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
  26. if (evt instanceof IdleStateEvent) {
  27. IdleStateEvent event = (IdleStateEvent) evt;
  28. if (event.state().equals(IdleState.READER_IDLE)){
  29. // 空闲40s之后触发 (心跳包丢失)
  30. if (counter >= 3) {
  31. // 连续丢失3个心跳包 (断开连接)
  32. ctx.channel().close().sync();
  33. log.error("已与"+ctx.channel().remoteAddress()+"断开连接");
  34. System.out.println("已与"+ctx.channel().remoteAddress()+"断开连接");
  35. } else {
  36. counter++;
  37. log.debug(ctx.channel().remoteAddress() + "丢失了第 "+ctx.channel().remoteAddress() + counter + " 个心跳包");
  38. System.out.println("丢失了"+ctx.channel().remoteAddress()+"第 " + counter + " 个心跳包");
  39. }
  40. }
  41. }
  42. }
  43. }

客户端

NettyClient
  1. package com.example.netty.netty2;
  2. import io.netty.bootstrap.Bootstrap;
  3. import io.netty.channel.Channel;
  4. import io.netty.channel.ChannelFuture;
  5. import io.netty.channel.ChannelFutureListener;
  6. import io.netty.channel.ChannelOption;
  7. import io.netty.channel.EventLoop;
  8. import io.netty.channel.EventLoopGroup;
  9. import io.netty.channel.nio.NioEventLoopGroup;
  10. import io.netty.channel.socket.nio.NioSocketChannel;
  11. import java.util.concurrent.TimeUnit;
  12. import lombok.extern.slf4j.Slf4j;
  13. import org.springframework.beans.factory.annotation.Value;
  14. @Slf4j
  15. public class NettyClient {
  16. private String host;
  17. private int port;
  18. private static Channel channel;
  19. public NettyClient(){
  20. }
  21. public NettyClient(String host, int port) {
  22. this.host = host;
  23. this.port = port;
  24. }
  25. public void start() {
  26. EventLoopGroup group = new NioEventLoopGroup();
  27. try {
  28. Bootstrap b = new Bootstrap();
  29. b.group(group)
  30. .option(ChannelOption.SO_KEEPALIVE,true)
  31. .channel(NioSocketChannel.class)
  32. .handler(new ClientChannelInitializer(host,port));
  33. ChannelFuture f = b.connect(host,port);
  34. // //断线重连
  35. f.addListener(new ChannelFutureListener() {
  36. @Override
  37. public void operationComplete(ChannelFuture channelFuture) throws Exception {
  38. if (!channelFuture.isSuccess()) {
  39. final EventLoop loop = channelFuture.channel().eventLoop();
  40. loop.schedule(new Runnable() {
  41. @Override
  42. public void run() {
  43. log.error("服务端链接不上,开始重连操作...");
  44. System.err.println("服务端链接不上,开始重连操作...");
  45. start();
  46. }
  47. }, 1L, TimeUnit.SECONDS);
  48. } else {
  49. channel = channelFuture.channel();
  50. log.info("服务端链接成功...");
  51. System.err.println("服务端链接成功...");
  52. }
  53. }
  54. });
  55. }catch (Exception e){
  56. e.printStackTrace();
  57. }
  58. }
  59. public static void main(String[] args) {
  60. new NettyClient ("127.0.0.1",8888).start();
  61. }
  62. }
ClientChannelInitializer
  1. package com.example.netty.netty2;
  2. import io.netty.buffer.Unpooled;
  3. import io.netty.channel.ChannelInitializer;
  4. import io.netty.channel.ChannelPipeline;
  5. import io.netty.channel.socket.SocketChannel;
  6. import io.netty.handler.codec.DelimiterBasedFrameDecoder;
  7. import io.netty.handler.codec.string.StringDecoder;
  8. import io.netty.handler.codec.string.StringEncoder;
  9. import io.netty.handler.timeout.IdleStateHandler;
  10. import java.util.concurrent.TimeUnit;
  11. public class ClientChannelInitializer extends ChannelInitializer<SocketChannel> {
  12. private String host;
  13. private int port;
  14. public ClientChannelInitializer( String host, int port) {
  15. this.host = host;
  16. this.port = port;
  17. }
  18. @Override
  19. protected void initChannel(SocketChannel socketChannel) throws Exception {
  20. ChannelPipeline pipeline = socketChannel.pipeline();
  21. //解决TCP粘包拆包的问题,以特定的字符结尾($_)
  22. pipeline.addLast(new DelimiterBasedFrameDecoder(Integer.MAX_VALUE, Unpooled.copiedBuffer("$_".getBytes())));
  23. //字符串解码和编码
  24. pipeline.addLast("decoder", new StringDecoder());
  25. pipeline.addLast("encoder", new StringEncoder());
  26. //心跳检测
  27. pipeline.addLast(new IdleStateHandler(0,10,0,TimeUnit.SECONDS));
  28. //客户端的逻辑
  29. pipeline.addLast("handler", new NettyClientHandler(host,port));
  30. }
  31. }
NettyClientHandler
  1. package com.example.netty.netty2;
  2. import io.netty.channel.ChannelHandlerContext;
  3. import io.netty.channel.EventLoop;
  4. import io.netty.channel.SimpleChannelInboundHandler;
  5. import io.netty.handler.timeout.IdleState;
  6. import io.netty.handler.timeout.IdleStateEvent;
  7. import java.util.concurrent.TimeUnit;
  8. import lombok.extern.slf4j.Slf4j;
  9. @Slf4j
  10. public class NettyClientHandler extends SimpleChannelInboundHandler {
  11. private String host;
  12. private int port;
  13. private NettyClient nettyClinet;
  14. private String tenantId;
  15. public NettyClientHandler(String host, int port) {
  16. this.host = host;
  17. this.port = port;
  18. nettyClinet = new NettyClient(host,port);
  19. }
  20. @Override
  21. protected void channelRead0(ChannelHandlerContext channelHandlerContext, Object o)
  22. throws Exception {
  23. System.out.println("Server say : " + o.toString());
  24. }
  25. @Override
  26. public void channelActive(ChannelHandlerContext ctx) throws Exception {
  27. System.out.println("通道已连接!!");
  28. }
  29. @Override
  30. public void channelInactive(ChannelHandlerContext ctx) throws Exception {
  31. System.out.println("断线了。。。。。。");
  32. //使用过程中断线重连
  33. final EventLoop eventLoop = ctx.channel().eventLoop();
  34. eventLoop.schedule(new Runnable() {
  35. @Override
  36. public void run() {
  37. nettyClinet.start();
  38. }
  39. }, 1, TimeUnit.SECONDS);
  40. ctx.fireChannelInactive();
  41. }
  42. @Override
  43. public void userEventTriggered(ChannelHandlerContext ctx, Object evt)
  44. throws Exception {
  45. if (evt instanceof IdleStateEvent) {
  46. IdleStateEvent event = (IdleStateEvent) evt;
  47. if (event.state().equals(IdleState.READER_IDLE)) {
  48. System.out.println("READER_IDLE");
  49. } else if (event.state().equals(IdleState.WRITER_IDLE)) {
  50. /**发送心跳,保持长连接*/
  51. String s = "ping$_";
  52. ctx.channel().writeAndFlush(s);
  53. log.debug("心跳发送成功!");
  54. System.out.println("心跳发送成功!");
  55. } else if (event.state().equals(IdleState.ALL_IDLE)) {
  56. System.out.println("ALL_IDLE");
  57. }
  58. }
  59. super.userEventTriggered(ctx, evt);
  60. }
  61. }

心跳检测类里面关键代码IdleStateHandler

readerIdleTime:读超时时间(即本端在指定时间内未接收到对端发来的消息时,触发 READER_IDLE 事件)
writerIdleTime:写超时时间(即本端在指定时间内未向对端发送消息时,触发 WRITER_IDLE 事件)
allIdleTime:读写超时时间(即指定时间内既没有读也没有写时,触发 ALL_IDLE 事件);某个参数设为 0 表示不检测该类型的空闲
比如在这里,服务端的读空闲时间为3秒:每发生一次读空闲(3秒内未收到任何消息)就记一次心跳丢失,已累计丢失3次后若再次读空闲(即连续约12秒未收到消息),就主动断开连接;
客户端则在写空闲满10秒时向服务端发送一个心跳包。
所以,如非异常情况,连接会一直保持:在服务端即将断开连接之前(约第12秒),客户端的心跳包(约第10秒)已经到达,并把服务端的丢失计数清零。
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/weixin_40725706/article/detail/242162
推荐阅读
相关标签
  

闽ICP备14008679号