当前位置:   article > 正文

springboot集成netty开发服务端和客户端_spring netty client

spring netty client

maven里面引入netty依赖

  <dependency>
      <groupId>io.netty</groupId>
      <artifactId>netty-all</artifactId>
      <version>4.1.51.Final</version>
  </dependency>

创建NettyServer类

  1. package com.NettyServer.service;
  2. import com.NettyServer.common.ServerChannelInitializer;
  3. import io.netty.bootstrap.ServerBootstrap;
  4. import io.netty.channel.ChannelFuture;
  5. import io.netty.channel.ChannelOption;
  6. import io.netty.channel.EventLoopGroup;
  7. import io.netty.channel.nio.NioEventLoopGroup;
  8. import io.netty.channel.socket.nio.NioServerSocketChannel;
  9. import org.springframework.beans.factory.annotation.Value;
  10. import lombok.extern.slf4j.Slf4j;
  11. import org.springframework.stereotype.Component;
  12. import javax.annotation.PostConstruct;
  13. import javax.annotation.PreDestroy;
  14. import java.net.InetSocketAddress;
  15. @Component
  16. @Slf4j
  17. public class NettyServer {
  18. /**
  19. * boss 线程组用于TCP处理连接工作
  20. */
  21. private final EventLoopGroup boss = new NioEventLoopGroup();
  22. /**
  23. * work 线程组用于IO数据处理
  24. */
  25. private final EventLoopGroup work = new NioEventLoopGroup();
  26. @Value("${netty.port}")
  27. private Integer port;
  28. /**
  29. * 启动Netty Server
  30. */
  31. @PostConstruct
  32. public void start() throws InterruptedException {
  33. ServerBootstrap bootstrap = new ServerBootstrap();
  34. bootstrap.group(boss, work)
  35. // 指定Channel
  36. .channel(NioServerSocketChannel.class)
  37. //使用指定的端口设置套接字地址
  38. .localAddress(new InetSocketAddress(port))
  39. //服务端可连接队列数,对应TCP/IP协议listen函数中backlog参数
  40. .option(ChannelOption.SO_BACKLOG, 1024)
  41. //设置TCP长连接,一般如果两个小时内没有数据的通信时,TCP会自动发送一个活动探测数据报文
  42. .childOption(ChannelOption.SO_KEEPALIVE, true)
  43. //将小的数据包包装成更大的帧进行传送,提高网络的负载
  44. .childOption(ChannelOption.TCP_NODELAY, true)
  45. .childHandler(new ServerChannelInitializer());
  46. ChannelFuture future = bootstrap.bind().sync();
  47. if (future.isSuccess()) {
  48. log.info("已启动 Netty Server");
  49. }
  50. }
  51. @PreDestroy
  52. public void destory() throws InterruptedException {
  53. boss.shutdownGracefully().sync();
  54. work.shutdownGracefully().sync();
  55. log.info("关闭Netty");
  56. }

创建ServerChannelInitializer类

  1. package com.NettyServer.common;
  2. import com.NettyServer.service.NettyServerHandler;
  3. import io.netty.channel.ChannelInitializer;
  4. import io.netty.channel.socket.SocketChannel;
  5. import io.netty.handler.codec.serialization.ClassResolvers;
  6. import io.netty.handler.codec.serialization.ObjectDecoder;
  7. import io.netty.handler.codec.serialization.ObjectEncoder;
  8. public class ServerChannelInitializer extends ChannelInitializer<SocketChannel> {
  9. @Override
  10. protected void initChannel(SocketChannel socketChannel) {
  11. //添加编解码
  12. socketChannel.pipeline().addLast(new ObjectDecoder(10 * 1024 * 1024, ClassResolvers.weakCachingConcurrentResolver(this.getClass().getClassLoader())));
  13. socketChannel.pipeline().addLast(new ObjectEncoder());
  14. socketChannel.pipeline().addLast(new NettyServerHandler());
  15. }
  16. }

创建NettyServerHandler类

  1. package com.NettyServer.service;
  2. import io.netty.channel.ChannelHandlerContext;
  3. import io.netty.channel.ChannelId;
  4. import io.netty.channel.ChannelInboundHandlerAdapter;
  5. import io.netty.handler.timeout.IdleState;
  6. import io.netty.handler.timeout.IdleStateEvent;
  7. import lombok.extern.slf4j.Slf4j;
  8. import org.springframework.beans.factory.annotation.Autowired;
  9. import org.springframework.stereotype.Component;
  10. import javax.annotation.PostConstruct;
  11. import java.net.InetSocketAddress;
  12. import java.util.concurrent.ConcurrentHashMap;
  13. @Slf4j
  14. @Component
  15. public class NettyServerHandler extends ChannelInboundHandlerAdapter {
  16. private static NettyServerHandler handler;
  17. /**
  18. * 管理一个全局map,保存连接进服务端的通道数量
  19. */
  20. private static final ConcurrentHashMap<ChannelId, ChannelHandlerContext> CHANNEL_MAP = new ConcurrentHashMap<>();
  21. //@Autowired
  22. //private IBaseAttachmentService attachmentService;
  23. @PostConstruct
  24. public void init() {
  25. handler = this;
  26. }
  27. /**
  28. * @param ctx
  29. * @author xxx on 2021/4/28 16:10
  30. * @DESCRIPTION: 有客户端连接服务器会触发此函数
  31. * @return: void
  32. */
  33. @Override
  34. public void channelActive(ChannelHandlerContext ctx) {
  35. InetSocketAddress insocket = (InetSocketAddress) ctx.channel().remoteAddress();
  36. String clientIp = insocket.getAddress().getHostAddress();
  37. int clientPort = insocket.getPort();
  38. //获取连接通道唯一标识
  39. ChannelId channelId = ctx.channel().id();
  40. //如果map中不包含此连接,就保存连接
  41. if (CHANNEL_MAP.containsKey(channelId)) {
  42. log.info("客户端【" + channelId + "】是连接状态,连接通道数量: " + CHANNEL_MAP.size());
  43. } else {
  44. //保存连接
  45. CHANNEL_MAP.put(channelId, ctx);
  46. log.info("客户端【" + channelId + "】连接netty服务器[IP:" + clientIp + "--->PORT:" + clientPort + "]");
  47. log.info("连接通道数量: " + CHANNEL_MAP.size());
  48. }
  49. }
  50. /**
  51. * @param ctx
  52. * @author xxx on 2021/4/28 16:10
  53. * @DESCRIPTION: 有客户端终止连接服务器会触发此函数
  54. * @return: void
  55. */
  56. @Override
  57. public void channelInactive(ChannelHandlerContext ctx) {
  58. InetSocketAddress insocket = (InetSocketAddress) ctx.channel().remoteAddress();
  59. String clientIp = insocket.getAddress().getHostAddress();
  60. ChannelId channelId = ctx.channel().id();
  61. //包含此客户端才去删除
  62. if (CHANNEL_MAP.containsKey(channelId)) {
  63. //删除连接
  64. CHANNEL_MAP.remove(channelId);
  65. log.info("客户端【" + channelId + "】退出netty服务器[IP:" + clientIp + "--->PORT:" + insocket.getPort() + "]");
  66. log.info("连接通道数量: " + CHANNEL_MAP.size());
  67. }
  68. }
  69. /**
  70. * @param ctx
  71. * @author xxx on 2021/4/28 16:10
  72. * @DESCRIPTION: 有客户端发消息会触发此函数
  73. * @return: void
  74. */
  75. @Override
  76. public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
  77. log.info("【" + ctx.channel().id() + "】" + " :" + msg.toString());
  78. //可以在这个地方写业务处理逻辑入库或者什么的...................................
  79. // 结果写到客户端
  80. /**
  81. * 下面可以解析数据,保存数据,生成返回报文,将需要返回报文写入write函数
  82. *
  83. */
  84. //响应客户端
  85. this.channelWrite(ctx.channel().id(), msg);
  86. }
  87. /**
  88. * @param msg 需要发送的消息内容
  89. * @param channelId 连接通道唯一id
  90. * @author xxx 2021/05/14 16:10
  91. * @DESCRIPTION: 服务端给客户端发送消息
  92. * @return: void
  93. */
  94. public void channelWrite(ChannelId channelId, Object msg) throws Exception {
  95. ChannelHandlerContext ctx = CHANNEL_MAP.get(channelId);
  96. if (ctx == null) {
  97. log.info("通道【" + channelId + "】不存在");
  98. return;
  99. }
  100. if (msg == null || msg == "") {
  101. log.info("服务端响应空的消息");
  102. return;
  103. }
  104. //将客户端的信息直接返回写入ctx
  105. ctx.write(msg);
  106. //刷新缓存区
  107. ctx.flush();
  108. }
  109. @Override
  110. public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
  111. String socketString = ctx.channel().remoteAddress().toString();
  112. if (evt instanceof IdleStateEvent) {
  113. IdleStateEvent event = (IdleStateEvent) evt;
  114. if (event.state() == IdleState.READER_IDLE) {
  115. log.info("Client: " + socketString + " READER_IDLE 读超时");
  116. ctx.disconnect();
  117. } else if (event.state() == IdleState.WRITER_IDLE) {
  118. log.info("Client: " + socketString + " WRITER_IDLE 写超时");
  119. ctx.disconnect();
  120. } else if (event.state() == IdleState.ALL_IDLE) {
  121. log.info("Client: " + socketString + " ALL_IDLE 总超时");
  122. ctx.disconnect();
  123. }
  124. }
  125. }
  126. /**
  127. * @param ctx
  128. * @author xxx on 2021/4/28 16:10
  129. * @DESCRIPTION: 发生异常会触发此函数
  130. * @return: void
  131. */
  132. @Override
  133. public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
  134. System.out.println();
  135. ctx.close();
  136. log.info(ctx.channel().id() + " 发生了错误,此连接被关闭" + "此时连通数量: " + CHANNEL_MAP.size());
  137. //cause.printStackTrace();
  138. }
  139. }

下面贴出来客户端代码

创建NettyClient类

  1. package com.NettyClient.service;
  2. import com.NettyClient.common.NettyClientInitializer;
  3. import io.netty.bootstrap.Bootstrap;
  4. import io.netty.channel.ChannelFuture;
  5. import io.netty.channel.ChannelFutureListener;
  6. import io.netty.channel.ChannelOption;
  7. import io.netty.channel.EventLoopGroup;
  8. import io.netty.channel.nio.NioEventLoopGroup;
  9. import io.netty.channel.socket.nio.NioSocketChannel;
  10. import lombok.extern.slf4j.Slf4j;
  11. import org.springframework.stereotype.Component;
  12. import javax.annotation.PostConstruct;
  13. @Slf4j
  14. @Component
  15. public class NettyClient {
  16. static final String HOST = "127.0.0.1";
  17. static final int PORT = 8899;
  18. private EventLoopGroup group;
  19. private Bootstrap b;
  20. private ChannelFuture cf;
  21. private NettyClientInitializer nettyClientInitializer;
  22. public NettyClient() {
  23. nettyClientInitializer = new NettyClientInitializer();
  24. group = new NioEventLoopGroup();
  25. b = new Bootstrap();
  26. b.group(group)
  27. .channel(NioSocketChannel.class)
  28. .handler(nettyClientInitializer);
  29. }
  30. public void connect() {
  31. try {
  32. this.cf = b.connect(HOST, PORT).sync();
  33. } catch (InterruptedException e) {
  34. log.error("客户端连接服务端异常:" + e);
  35. }
  36. }
  37. public ChannelFuture getChannelFuture() {
  38. if (this.cf == null) {
  39. this.connect();
  40. }
  41. if (!this.cf.channel().isActive()) {
  42. this.connect();
  43. }
  44. return this.cf;
  45. }
  46. public void close() {
  47. try {
  48. this.cf.channel().closeFuture().sync();
  49. this.group.shutdownGracefully();
  50. } catch (InterruptedException e) {
  51. e.printStackTrace();
  52. }
  53. }
  54. public void setMessage(String msg) throws InterruptedException {
  55. ChannelFuture cf = this.getChannelFuture();
  56. cf.channel().writeAndFlush(msg);
  57. }
  58. public static void main(String[] args) {
  59. try {
  60. } catch (Exception e) {
  61. log.error("异常:" + e);
  62. }
  63. }
  64. }

创建NettyClientHandler类

  1. package com.NettyClient.service;
  2. import io.netty.channel.ChannelHandlerContext;
  3. import io.netty.channel.ChannelInboundHandlerAdapter;
  4. import lombok.extern.slf4j.Slf4j;
  5. import org.springframework.stereotype.Component;
  6. @Slf4j
  7. public class NettyClientHandler extends ChannelInboundHandlerAdapter {
  8. private String result;
  9. @Override
  10. public void channelActive(ChannelHandlerContext ctx) {
  11. }
  12. @Override
  13. public void channelRead(ChannelHandlerContext ctx, Object msg) {
  14. //if (!ObjectUtils.isEmpty(msg)) {
  15. result = (String) msg;
  16. //}
  17. }
  18. @Override
  19. public void channelReadComplete(ChannelHandlerContext ctx) {
  20. ctx.flush();
  21. }
  22. @Override
  23. public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
  24. cause.printStackTrace();
  25. ctx.close();
  26. }
  27. }

创建NettyClientInitializer类

  1. package com.NettyClient.common;
  2. import com.NettyClient.service.NettyClientHandler;
  3. import io.netty.channel.ChannelInitializer;
  4. import io.netty.channel.socket.SocketChannel;
  5. import io.netty.handler.codec.serialization.ClassResolvers;
  6. import io.netty.handler.codec.serialization.ObjectDecoder;
  7. import io.netty.handler.codec.serialization.ObjectEncoder;
  8. import io.netty.handler.timeout.ReadTimeoutHandler;
  9. import org.springframework.beans.factory.annotation.Autowired;
  10. public class NettyClientInitializer extends ChannelInitializer<SocketChannel> {
  11. @Override
  12. protected void initChannel(SocketChannel socketChannel) throws Exception {
  13. socketChannel.pipeline().addLast(new ObjectDecoder(1024, ClassResolvers.cacheDisabled(this.getClass().getClassLoader())));
  14. socketChannel.pipeline().addLast(new ObjectEncoder());
  15. socketChannel.pipeline().addLast(new NettyClientHandler());
  16. }
  17. }

客户端可以直接在业务代码里调用 NettyClient 向服务端发送数据。下面是一个用定时任务测试发送的简单示例:

  1. package com.NettyClient;
  2. import com.NettyClient.service.NettyClient;
  3. import lombok.RequiredArgsConstructor;
  4. import lombok.extern.slf4j.Slf4j;
  5. import org.springframework.beans.factory.annotation.Autowired;
  6. import org.springframework.scheduling.annotation.Async;
  7. import org.springframework.scheduling.annotation.EnableAsync;
  8. import org.springframework.scheduling.annotation.Scheduled;
  9. import org.springframework.stereotype.Component;
  10. import java.text.SimpleDateFormat;
  11. import java.util.Date;
  12. @Component
  13. @Slf4j
  14. @EnableAsync
  15. public class TaskManager {
  16. @Autowired
  17. private NettyClient nettyClient;
  18. @Async
  19. @Scheduled(cron = "0 */1 * * * ?")
  20. public void test() throws InterruptedException {
  21. try {
  22. SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
  23. nettyClient.setMessage("客户端发送此数据" + df.format(new Date()));
  24. } catch (Exception e) {
  25. log.error("异常:" + e);
  26. }
  27. }
  28. @Async
  29. @Scheduled(cron = "0 */1 * * * ?")
  30. public void test2() throws InterruptedException {
  31. try {
  32. SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
  33. nettyClient.setMessage("客户端2发送此数据" + df.format(new Date()));
  34. } catch (Exception e) {
  35. log.error("异常:" + e);
  36. }
  37. }
  38. }

 

 

 

 

 

 

 

 

 

 

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/从前慢现在也慢/article/detail/185425
推荐阅读
相关标签
  

闽ICP备14008679号