当前位置:   article > 正文

netty-发起tcp长连接(包含客户端和服务端)_netty 连接工具

netty 连接工具

Netty是一个高性能、异步事件驱动的NIO框架,它提供了对TCP、UDP和文件传输的支持。

Netty是对JDK自带的NIO的API进行封装,具有高并发,高性能等优点。

项目中经常用到netty实现服务器与设备的通信,先写服务端代码:

  1. @Slf4j
  2. @Component
  3. public class NettyServerBootstrap {
  4. private Channel serverChannel;
  5. private static final int DEFAULT_PORT = 60782;
  6. //bossGroup只是处理连接请求
  7. private static EventLoopGroup bossGroup = null;
  8. //workGroup处理非连接请求,如果牵扯到数据量处理业务非常耗时的可以再单独新建一个eventLoopGroup,并在childHandler初始化的时候添加到pipeline绑定
  9. private static EventLoopGroup workGroup = null;
  10. /**
  11. * 启动Netty服务
  12. *
  13. * @return 启动结果
  14. */
  15. @PostConstruct
  16. public boolean start() {
  17. bossGroup = new NioEventLoopGroup();
  18. workGroup = new NioEventLoopGroup();
  19. //创建服务端启动对象
  20. ServerBootstrap bootstrap = new ServerBootstrap();
  21. try {
  22. //使用链式编程来设置
  23. bootstrap.group(bossGroup, workGroup)//设置两个线程组
  24. //使用NioSocketChannel作为服务器的通道实现
  25. .channel(NioServerSocketChannel.class)
  26. //设置线程队列得到的连接数
  27. .option(ChannelOption.SO_BACKLOG, 1024)
  28. //设置保持活动连接状态
  29. .childOption(ChannelOption.SO_KEEPALIVE, true)
  30. //设置处理器 WorkerGroup 的 EvenLoop 对应的管道设置处理器
  31. .childHandler(new ChannelInitializer<Channel>() {
  32. @Override
  33. protected void initChannel(Channel ch){
  34. log.info("--------------有客户端连接");
  35. ch.pipeline().addLast(new StringDecoder());
  36. ch.pipeline().addLast(new StringEncoder());
  37. ch.pipeline().addLast(new NettyServerHandler());
  38. }
  39. });
  40. //绑定端口, 同步等待成功;
  41. ChannelFuture future = bootstrap.bind(DEFAULT_PORT).sync();
  42. log.info("netty服务启动成功,ip:{},端口:{}", InetAddress.getLocalHost().getHostAddress(), DEFAULT_PORT);
  43. serverChannel = future.channel();
  44. ThreadUtil.execute(() -> {
  45. //等待服务端监听端口关闭
  46. try {
  47. future.channel().closeFuture().sync();
  48. log.info("netty服务正常关闭成功,ip:{},端口:{}", InetAddress.getLocalHost().getHostAddress(), DEFAULT_PORT);
  49. } catch (InterruptedException | UnknownHostException e) {
  50. e.printStackTrace();
  51. } finally {
  52. shutdown();
  53. }
  54. });
  55. } catch (Exception e) {
  56. e.printStackTrace();
  57. log.error("netty服务异常,异常原因:{}", e.getMessage());
  58. return false;
  59. }
  60. return true;
  61. }
  62. /**
  63. * 关闭当前server
  64. */
  65. public boolean close() {
  66. if (serverChannel != null) {
  67. serverChannel.close();//关闭服务
  68. try {
  69. //保险起见
  70. serverChannel.closeFuture().sync();
  71. } catch (InterruptedException e) {
  72. e.printStackTrace();
  73. return false;
  74. } finally {
  75. shutdown();
  76. serverChannel = null;
  77. }
  78. }
  79. return true;
  80. }
  81. /**
  82. * 优雅关闭
  83. */
  84. private void shutdown() {
  85. workGroup.shutdownGracefully();
  86. bossGroup.shutdownGracefully();
  87. }
  88. }

服务端处理类代码:

  1. @Slf4j
  2. public class NettyServerHandler extends SimpleChannelInboundHandler<String> {
  3. /**
  4. * 处理读取到的msg
  5. *
  6. * @param ctx 上下文
  7. * @param msg 数据
  8. */
  9. @Override
  10. protected void channelRead0(ChannelHandlerContext ctx,String msg) throws Exception {
  11. System.out.println("服务端收到的消息--------"+msg);
  12. ctx.channel().writeAndFlush("ok");
  13. }
  14. /**
  15. * 断开连接
  16. *
  17. * @param ctx 傻瓜下文
  18. */
  19. @Override
  20. public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
  21. ChannelId channelId = ctx.channel().id();
  22. log.info("客户端id:{},断开连接,ip:{}", channelId, ctx.channel().remoteAddress());
  23. super.handlerRemoved(ctx);
  24. }
  25. }

接下来模拟客户端:

  1. @Configuration
  2. @Component
  3. public class TianmiaoClient {
  4. private static String ip;
  5. private static int port ;
  6. @Value("${tianmiao.nettyServer.ip}")
  7. public void setIp(String ip) {
  8. this.ip = ip;
  9. }
  10. @Value("${tianmiao.nettyServer.port}")
  11. public void setPort(int port) {
  12. this.port = port;
  13. }
  14. /**
  15. * 服务类
  16. */
  17. private static Bootstrap bootstrap=null;
  18. /**
  19. * 初始化 项目启动后自动初始化
  20. */
  21. @PostConstruct
  22. public void init() {
  23. //worker
  24. EventLoopGroup worker = new NioEventLoopGroup();
  25. bootstrap = new Bootstrap();
  26. //设置线程池
  27. bootstrap.group(worker);
  28. //设置socket工厂
  29. bootstrap.channel(NioSocketChannel.class);
  30. //设置管道
  31. bootstrap.handler(new ChannelInitializer<Channel>() {
  32. @Override
  33. protected void initChannel(Channel ch) throws Exception {
  34. ch.pipeline().addLast(new StringDecoder());
  35. ch.pipeline().addLast(new StringEncoder());
  36. ch.pipeline().addLast(new TianmiaoClientHandler());
  37. }
  38. });
  39. }
  40. /**
  41. * 获取会话 (获取或者创建一个会话)
  42. */
  43. public Channel createChannel() {
  44. try {
  45. Channel channel = bootstrap.connect( ip, port).sync().channel();
  46. return channel;
  47. } catch (Exception e) {
  48. e.printStackTrace();
  49. }
  50. return null;
  51. }
  52. }

客户端处理类代码

  1. public class TianmiaoClientHandler extends SimpleChannelInboundHandler<String> {
  2. @Override
  3. protected void channelRead0(ChannelHandlerContext channelHandlerContext, String s) throws Exception {
  4. System.out.println("服务端发过来的消息:"+s);
  5. }
  6. @Override
  7. public void channelInactive(ChannelHandlerContext ctx) throws Exception {
  8. System.out.println(".......................tcp断开连接.........................");
  9. //移除
  10. Channel channel = ctx.channel();
  11. channel.close().sync();
  12. super.channelInactive(ctx);
  13. }
  14. }

管理客户端channel的一个工具类:

  1. public class TianmiaoChannelManager {
  2. /**
  3. * 在线会话(存储注册成功的会话)
  4. */
  5. private static final ConcurrentHashMap<String, Channel> onlineChannels = new ConcurrentHashMap<>();
  6. /**
  7. * 加入
  8. *
  9. * @param mn
  10. * @param channel
  11. * @return
  12. */
  13. public static boolean putChannel(String mn, Channel channel) {
  14. if (!onlineChannels.containsKey(mn)) {
  15. boolean success = onlineChannels.putIfAbsent(mn, channel) == null ? true : false;
  16. return success;
  17. }
  18. return false;
  19. }
  20. /**
  21. * 移除
  22. *
  23. * @param mn
  24. */
  25. public static Channel removeChannel(String mn) {
  26. return onlineChannels.remove(mn);
  27. }
  28. /**
  29. * 获取Channel
  30. *
  31. * @param mn
  32. * @return
  33. */
  34. public static Channel getChannel(String mn) {
  35. // 获取一个可用的会话
  36. Channel channel = onlineChannels.get(mn);
  37. if (channel != null) {
  38. // 连接有可能是断开,加入已经断开连接了,我们需要进行尝试重连
  39. if (!channel.isActive()) {
  40. //先移除之前的连接
  41. removeChannel(mn);
  42. return null;
  43. }
  44. }
  45. return channel;
  46. }
  47. /**
  48. * 发送消息[自定义协议]
  49. *
  50. * @param <T>
  51. * @param mn
  52. * @param msg
  53. */
  54. public static <T> void sendMessage(String mn, String msg) {
  55. Channel channel = onlineChannels.get(mn);
  56. if (channel != null && channel.isActive()) {
  57. channel.writeAndFlush(msg);
  58. }
  59. }
  60. /**
  61. * 发送消息[自定义协议]
  62. *
  63. * @param <T>
  64. * @param msg
  65. */
  66. public static <T> void sendChannelMessage(Channel channel, String msg) {
  67. if (channel != null && channel.isActive()) {
  68. channel.writeAndFlush(msg);
  69. }
  70. }
  71. /**
  72. * 关闭连接
  73. *
  74. * @return
  75. */
  76. public static void closeChannel(String mn) {
  77. onlineChannels.get(mn).close();
  78. }
  79. }

最后是客户端使用方法:

  1. /**
  2. * 发送数据包
  3. * @param key
  4. */
  5. public static void tianmiaoData(String key, String data) {
  6. Channel channel = TianmiaoChannelManager.getChannel(key);
  7. //将通道存入
  8. if(channel==null){
  9. TianmiaoClient client = new TianmiaoClient();
  10. channel = client.createChannel();
  11. TianmiaoChannelManager.putChannel(key, channel);
  12. }
  13. if (channel != null && channel.isActive()) {
  14. //发送数据
  15. channel.writeAndFlush(data);
  16. System.out.println("-------------天苗转发数据成功-------------");
  17. }
  18. }

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/繁依Fanyi0/article/detail/64992
推荐阅读
相关标签
  

闽ICP备14008679号