当前位置:   article > 正文

SpringBoot搭建Netty+Socket+Tcp服务端和客户端_springboot的soke和netty

springboot的soke和netty

一: 服务端 

1: 启动类

  1. package com.idc.config.netty;
  2. import io.netty.bootstrap.ServerBootstrap;
  3. import io.netty.channel.ChannelFuture;
  4. import io.netty.channel.ChannelOption;
  5. import io.netty.channel.EventLoopGroup;
  6. import io.netty.channel.nio.NioEventLoopGroup;
  7. import io.netty.channel.socket.nio.NioServerSocketChannel;
  8. import lombok.extern.slf4j.Slf4j;
  9. import org.springframework.stereotype.Component;
  10. import java.net.InetSocketAddress;
  11. /**
  12. * @description: netty服务启动类
  13. **/
  14. @Slf4j
  15. @Component
  16. public class NettyServer {
  17. public void start(InetSocketAddress address) {
  18. //配置服务端的NIO线程组
  19. EventLoopGroup bossGroup = new NioEventLoopGroup(1);
  20. EventLoopGroup workerGroup = new NioEventLoopGroup();
  21. try {
  22. ServerBootstrap bootstrap = new ServerBootstrap()
  23. .group(bossGroup, workerGroup) // 绑定线程池
  24. .channel(NioServerSocketChannel.class)
  25. .localAddress(address)
  26. .childHandler(new NettyServerChannelInitializer())//编码解码
  27. .option(ChannelOption.SO_BACKLOG, 128); //服务端接受连接的队列长度,如果队列已满,客户端连接将被拒绝
  28. // .childOption(ChannelOption.SO_KEEPALIVE, true); //保持长连接,2小时无数据激活心跳机制
  29. // 绑定端口,开始接收进来的连接
  30. ChannelFuture future = bootstrap.bind(address).sync();
  31. log.info("ODF-Socket------netty服务器开始监听端口:" + address.getPort());
  32. //关闭channel和块,直到它被关闭
  33. future.channel().closeFuture().sync();
  34. } catch (Exception e) {
  35. e.printStackTrace();
  36. bossGroup.shutdownGracefully();
  37. workerGroup.shutdownGracefully();
  38. }
  39. }
  40. }

2: 处理程序

  1. package com.idc.config.netty;
  2. import com.idc.config.udpsocket.UdpServerHandler;
  3. import io.netty.channel.ChannelInitializer;
  4. import io.netty.channel.socket.SocketChannel;
  5. import io.netty.handler.codec.string.StringDecoder;
  6. import io.netty.handler.codec.string.StringEncoder;
  7. import io.netty.util.CharsetUtil;
  8. /**
  9. * @description: 服务端初始化,客户端与服务器端连接一旦创建,这个类中方法就会被回调,设置出站编码器和入站解码器
  10. **/
  11. public class NettyServerChannelInitializer extends ChannelInitializer<SocketChannel> {
  12. @Override
  13. protected void initChannel(SocketChannel channel) throws Exception {
  14. channel.pipeline().addLast("decoder",new StringDecoder(CharsetUtil.UTF_8));
  15. channel.pipeline().addLast("encoder",new StringEncoder(CharsetUtil.UTF_8));
  16. channel.pipeline().addLast(new NettyServerHandler());
  17. }
  18. }
  1. package com.idc.config.netty;
  2. import com.idc.common.exception.CommonRuntimeException;
  3. import com.idc.entity.odf.dto.LightingStatus;
  4. import com.idc.mapper.OdfAlarmMapper;
  5. import com.idc.mapper.OdfMapper;
  6. import io.netty.channel.ChannelHandlerContext;
  7. import io.netty.channel.ChannelId;
  8. import io.netty.channel.ChannelInboundHandlerAdapter;
  9. import io.netty.handler.timeout.IdleState;
  10. import io.netty.handler.timeout.IdleStateEvent;
  11. import lombok.extern.slf4j.Slf4j;
  12. import org.springframework.stereotype.Component;
  13. import javax.annotation.Resource;
  14. import java.net.InetSocketAddress;
  15. import java.util.concurrent.ConcurrentHashMap;
  16. /**
  17. * @author wcybaonier
  18. * @description: netty服务端处理类
  19. **/
  20. @Slf4j
  21. @Component
  22. public class NettyServerHandler extends ChannelInboundHandlerAdapter {
  23. @Resource
  24. private OdfMapper odfMapper;
  25. /**
  26. * 管理一个全局map,保存连接进服务端的通道数量
  27. */
  28. private static final ConcurrentHashMap<ChannelId, ChannelHandlerContext> CHANNEL_MAP = new ConcurrentHashMap<>();
  29. /**
  30. * @param ctx
  31. * @DESCRIPTION: 有客户端连接服务器会触发此函数
  32. * @return: void
  33. */
  34. @Override
  35. public void channelActive(ChannelHandlerContext ctx) {
  36. InetSocketAddress insocket = (InetSocketAddress) ctx.channel().remoteAddress();
  37. String clientIp = insocket.getAddress().getHostAddress();
  38. int clientPort = insocket.getPort();
  39. //获取连接通道唯一标识
  40. ChannelId channelId = ctx.channel().id();
  41. System.out.println();
  42. //如果map中不包含此连接,就保存连接
  43. if (CHANNEL_MAP.containsKey(channelId)) {
  44. log.info("ODF-Socket------客户端【" + channelId + "】是连接状态,连接通道数量: " + CHANNEL_MAP.size());
  45. } else {
  46. //保存连接
  47. CHANNEL_MAP.put(channelId, ctx);
  48. log.info("ODF-Socket------客户端【" + channelId + "】连接netty服务器[IP:" + clientIp + "--->PORT:" + clientPort + "]");
  49. log.info("ODF-Socket------连接通道数量: " + CHANNEL_MAP.size());
  50. }
  51. }
  52. /**
  53. * @param ctx
  54. * @DESCRIPTION: 有客户端终止连接服务器会触发此函数
  55. * @return: void
  56. */
  57. @Override
  58. public void channelInactive(ChannelHandlerContext ctx) {
  59. InetSocketAddress insocket = (InetSocketAddress) ctx.channel().remoteAddress();
  60. String clientIp = insocket.getAddress().getHostAddress();
  61. ChannelId channelId = ctx.channel().id();
  62. //包含此客户端才去删除
  63. if (CHANNEL_MAP.containsKey(channelId)) {
  64. //删除连接
  65. CHANNEL_MAP.remove(channelId);
  66. System.out.println();
  67. log.info("ODF-Socket------客户端【" + channelId + "】退出netty服务器[IP:" + clientIp + "--->PORT:" + insocket.getPort() + "]");
  68. log.info("ODF-Socket------连接通道数量: " + CHANNEL_MAP.size());
  69. }
  70. }
  71. /**
  72. * @param ctx
  73. * @DESCRIPTION: 有客户端发消息会触发此函数
  74. * @return: void
  75. */
  76. @Override
  77. public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
  78. if (msg == null){
  79. throw new CommonRuntimeException("ODF-Socket------加载客户端报文为空,请联系厂商!");
  80. }
  81. log.info("ODF-Socket------加载客户端报文......【" + ctx.channel().id() + "】" + " :" + msg);
  82. /**
  83. * 下面可以解析数据,保存数据,生成返回报文,将需要返回报文写入write函数
  84. * 在这里可以设置异步执行 提交任务到该channel的taskQueue 中
  85. */
  86. ctx.channel().eventLoop().execute(() -> {
  87. String msgStr = String.valueOf(msg);
  88. // 如果不包含逗号, 那么格式不对 约定格式为 : 序列号,deviceid,shelfNo,moduleNo,termNo,state
  89. if (!msgStr.contains(",")){
  90. throw new CommonRuntimeException("ODF-Socket------加载客户端报文格式不正确,请联系厂商!");
  91. }
  92. try {
  93. String[] split = msgStr.split(",");
  94. if (split.length != 6){
  95. throw new CommonRuntimeException("ODF-Socket------加载客户端报文长度不正确,请联系厂商!");
  96. }
  97. //开始修改 admin
  98. LightingStatus lightingStatus = new LightingStatus();
  99. lightingStatus.setSerialNumber(split[0]);
  100. lightingStatus.setDeviceId(split[1]);
  101. lightingStatus.setShelfNo(split[2]);
  102. lightingStatus.setModuleNo(split[3]);
  103. lightingStatus.setTermNo(split[4]);
  104. lightingStatus.setState(split[5]);
  105. int i = odfMapper.updateTermStatus(lightingStatus);
  106. log.info("ODF-Socket------亮灯状态更新条数......【" + i + "】" );
  107. } catch (Exception e) {
  108. e.printStackTrace();
  109. }
  110. });
  111. /**
  112. * 可以设置多个异步任务
  113. * 但是这个会在上面异步任务执行完之后才执行
  114. */
  115. /*ctx.channel().eventLoop().execute(new Runnable() {
  116. @Override
  117. public void run() {
  118. try {
  119. Thread.sleep(10*1000);
  120. log.info(">>>>>>>>>休眠二十秒");
  121. } catch (InterruptedException e) {
  122. e.printStackTrace();
  123. }
  124. }
  125. });*/
  126. //响应客户端
  127. log.info("ODF-Socket------服务端端返回报文......【" + ctx.channel().id() + "】" + " :" + msg);
  128. this.channelWrite(ctx.channel().id(), msg);
  129. }
  130. /**
  131. * @param msg 需要发送的消息内容
  132. * @param channelId 连接通道唯一id
  133. * @DESCRIPTION: 服务端给客户端发送消息
  134. * @return: void
  135. */
  136. public void channelWrite(ChannelId channelId, Object msg) throws Exception {
  137. ChannelHandlerContext ctx = CHANNEL_MAP.get(channelId);
  138. if (ctx == null) {
  139. log.info("ODF-Socket------通道【" + channelId + "】不存在");
  140. return;
  141. }
  142. if (msg == null || msg == "") {
  143. log.info("ODF-Socket------服务端响应空的消息");
  144. return;
  145. }
  146. //将客户端的信息直接返回写入ctx
  147. ctx.write(msg);
  148. //刷新缓存区
  149. ctx.flush();
  150. }
  151. public static void main(String[] args) {
  152. System.out.println("序列号,deviceid,shelfNo,moduleNo,termNo,state".split(",").length);
  153. }
  154. @Override
  155. public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
  156. String socketString = ctx.channel().remoteAddress().toString();
  157. if (evt instanceof IdleStateEvent) {
  158. IdleStateEvent event = (IdleStateEvent) evt;
  159. if (event.state() == IdleState.READER_IDLE) {
  160. log.info("ODF-Socket------Client: " + socketString + " READER_IDLE 读超时");
  161. ctx.disconnect();
  162. } else if (event.state() == IdleState.WRITER_IDLE) {
  163. log.info("ODF-Socket------Client: " + socketString + " WRITER_IDLE 写超时");
  164. ctx.disconnect();
  165. } else if (event.state() == IdleState.ALL_IDLE) {
  166. log.info("ODF-Socket------Client: " + socketString + " ALL_IDLE 总超时");
  167. ctx.disconnect();
  168. }
  169. }
  170. }
  171. /**
  172. * @param ctx
  173. * @DESCRIPTION: 发生异常会触发此函数
  174. * @return: void
  175. */
  176. @Override
  177. public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
  178. System.out.println();
  179. ctx.close();
  180. log.info("ODF-Socket------"+ctx.channel().id() + " 发生了错误,此连接被关闭" + "此时连通数量: " + CHANNEL_MAP.size());
  181. //cause.printStackTrace();
  182. }
  183. }

3: 项目启动类

  1. package com.idc;
  2. import com.idc.config.udpsocket.UdpServer;
  3. import lombok.extern.slf4j.Slf4j;
  4. import org.mybatis.spring.annotation.MapperScan;
  5. import org.springframework.beans.factory.annotation.Value;
  6. import org.springframework.boot.CommandLineRunner;
  7. import org.springframework.boot.SpringApplication;
  8. import org.springframework.boot.autoconfigure.SpringBootApplication;
  9. import org.springframework.scheduling.annotation.EnableScheduling;
  10. import javax.annotation.Resource;
  11. /**
  12. * @author wcybaonier
  13. */
  14. @MapperScan("com.idc.mapper")
  15. @SpringBootApplication
  16. @Slf4j
  17. @EnableScheduling
  18. public class IdcPduApplication implements CommandLineRunner {
  19. @Value("${netty.host}")
  20. private String host;
  21. @Value("${netty.port}")
  22. private Integer port;
  23. @Resource
  24. private NettyServer nettyServer;
  25. public static void main(String[] args) {
  26. SpringApplication.run(IdcPduApplication.class, args);
  27. log.info("IdcPduApplication 启动成功!");
  28. }
  29. /**
  30. * netty服务启动
  31. * @param args
  32. * @throws Exception
  33. */
  34. @Override
  35. public void run(String... args) throws Exception {
  36. //tcp实现
  37. InetSocketAddress address = new InetSocketAddress(host,port);
  38. log.info("neety服务器启动地址: "+host+":"+ port);
  39. nettyServer.start(address);
  40. }
  41. }

yml配置: 

  1. # 配置Netty通信IP和端口
  2. netty:
  3. port: 7101
  4. host: 127.0.0.1

 

完成,启动项目即可自动监听对应端口

二: 客户端

1: 启动类

这里为了测试,写了Main方法,可以参考服务端,配置启动类 ,实现跟随项目启动

  1. package com.ws.aa;
  2. import io.netty.bootstrap.Bootstrap;
  3. import io.netty.channel.*;
  4. import io.netty.channel.nio.NioEventLoopGroup;
  5. import io.netty.channel.socket.SocketChannel;
  6. import io.netty.channel.socket.nio.NioSocketChannel;
  7. import io.netty.handler.codec.string.StringDecoder;
  8. import io.netty.handler.codec.string.StringEncoder;
  9. import io.netty.util.CharsetUtil;
  10. import org.slf4j.Logger;
  11. import org.slf4j.LoggerFactory;
  12. /**
  13. * Springboot整合Netty,实现Socket信息交互(本项目用于与C语言程序进行交互)
  14. *
  15. * 核心文件(与服务端进行数据交互)
  16. *
  17. * 客户端
  18. *
  19. * @author 小辰哥哥
  20. */
  21. public class SocketClient {
  22. // 服务端IP
  23. static final String HOST = System.getProperty("host", "134.95.3.134");
  24. // 服务端开放端口
  25. static final int PORT = Integer.parseInt(System.getProperty("port", "7101"));
  26. // 数据包大小
  27. static final int SIZE = Integer.parseInt(System.getProperty("size", "256"));
  28. // 日志打印
  29. private static final Logger LOGGER = LoggerFactory.getLogger(SocketClient.class);
  30. // 主函数启动
  31. public static void main(String[] args) throws InterruptedException {
  32. sendMessage("1,deviceid123,shelfNo123,moduleNo123,termNo123,state123");
  33. }
  34. /**
  35. * 核心方法(处理:服务端向客户端发送的数据、客户端向服务端发送的数据)
  36. *
  37. * @param content
  38. * @throws InterruptedException
  39. * @author 小辰哥哥
  40. */
  41. public static void sendMessage(String content) throws InterruptedException {
  42. // Configure the client.
  43. EventLoopGroup group = new NioEventLoopGroup();
  44. try {
  45. Bootstrap b = new Bootstrap();
  46. b.group(group)
  47. .channel(NioSocketChannel.class)
  48. .option(ChannelOption.TCP_NODELAY, true)
  49. .handler(new ChannelInitializer<SocketChannel>() {
  50. @Override
  51. public void initChannel(SocketChannel ch) throws Exception {
  52. ChannelPipeline p = ch.pipeline();
  53. p.addLast("decoder", new StringDecoder(CharsetUtil.UTF_8));
  54. p.addLast("encoder", new StringEncoder(CharsetUtil.UTF_8));
  55. p.addLast(new SocketHandler() {
  56. @Override
  57. public void channelRead(ChannelHandlerContext ctx, Object msg) {
  58. LOGGER.debug("####接收服务端发送过来的消息####");
  59. LOGGER.debug("服务端发送过来的数据:" + msg);
  60. // 主动与服务端断开连接(客户端触发)
  61. //ctx.channel().close();
  62. }
  63. });
  64. }
  65. });
  66. ChannelFuture future = b.connect(HOST, PORT).sync();
  67. future.channel().writeAndFlush(content);
  68. // 程序阻塞
  69. future.channel().closeFuture().sync();
  70. } finally {
  71. group.shutdownGracefully();
  72. }
  73. }
  74. }

2: 处理程序

  1. package com.ws.aa;
  2. import io.netty.channel.ChannelHandler;
  3. import io.netty.channel.ChannelInitializer;
  4. import io.netty.channel.ChannelPipeline;
  5. import io.netty.channel.socket.SocketChannel;
  6. import io.netty.handler.codec.string.StringDecoder;
  7. import io.netty.handler.codec.string.StringEncoder;
  8. import io.netty.util.CharsetUtil;
  9. import java.util.logging.SocketHandler;
  10. /**
  11. * Springboot整合Netty,实现Socket信息交互(本项目用于与C语言程序进行交互)
  12. *
  13. * 设置出站和入站的编码器和解码器(该方法在SocketClientConfig.java中被重写)
  14. *
  15. * 客户端
  16. *
  17. * @author wcybaonier
  18. */
  19. public class SocketChannelInitializer extends ChannelInitializer<SocketChannel> {
  20. protected void initChannel(SocketChannel channel) throws Exception {
  21. ChannelPipeline p = channel.pipeline();
  22. p.addLast("decoder", new StringDecoder(CharsetUtil.UTF_8));
  23. p.addLast("encoder", new StringEncoder(CharsetUtil.UTF_8));
  24. p.addLast((ChannelHandler) new SocketHandler());
  25. }
  26. }
  1. package com.ws.aa;
  2. import io.netty.channel.ChannelHandlerContext;
  3. import io.netty.channel.ChannelInboundHandlerAdapter;
  4. import org.slf4j.Logger;
  5. import org.slf4j.LoggerFactory;
  6. /**
  7. * Springboot整合Netty,实现Socket信息交互(本项目用于与C语言程序进行交互)
  8. *
  9. * 初始化操作、接受服务端发送过来的消息(该方法在SocketClient.java中被重写)
  10. *
  11. * 客户端
  12. *
  13. * @author wcybaonier
  14. */
  15. public class SocketHandler extends ChannelInboundHandlerAdapter {
  16. // 日志打印
  17. private static final Logger LOGGER = LoggerFactory.getLogger(SocketHandler.class);
  18. @Override
  19. public void channelActive(ChannelHandlerContext ctx) {
  20. LOGGER.debug("SocketHandler Active(客户端)");
  21. }
  22. @Override
  23. public void channelRead(ChannelHandlerContext ctx, Object msg) {
  24. LOGGER.debug("####接收服务端发送过来的消息####");
  25. LOGGER.debug("SocketHandler read Message:" + msg);
  26. }
  27. @Override
  28. public void channelInactive(ChannelHandlerContext ctx) throws Exception {
  29. LOGGER.debug("####客户端断开连接####");
  30. }
  31. @Override
  32. public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
  33. cause.printStackTrace();
  34. ctx.close();
  35. }
  36. }

 

3: 项目启动类

......想写就参考服务端......

三: 测试,完成

有测试的,,,但是忘记截图了................

本文内容由网友自发贡献,转载请注明出处:https://www.wpsshop.cn/w/笔触狂放9/article/detail/185424
推荐阅读
相关标签
  

闽ICP备14008679号