当前位置:   article > 正文

Springboot+Netty搭建TCP服务端_springboot tcp

springboot tcp

Netty是业界最流行的nio框架之一,它具有功能强大、性能优异、可定制性和可扩展性的优点

Netty的优点:

1.API使用简单,开发入门门槛低。

2.功能十分强大,预置多种编码解码功能,支持多种主流协议。

3.可定制、可扩展能力强,可以通过其提供的ChannelHandler进行灵活的扩展。

4.性能优异,特别在综合性能上的优异性。

5.成熟,稳定,适用范围广。

6.可用于智能GSM/GPRS模块的通讯服务端开发,使用它进行MQTT协议的开发。

Netty结合Springboot快速开发框架搭建服务端程序:

SpringBoot+Netty实现TCP服务端客户端的源码Demo

新建Springboot的maven项目,pom.xml文件导入依赖包

  1. <?xml version="1.0"?>
  2. <project
  3. xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"
  4. xmlns="http://maven.apache.org/POM/4.0.0"
  5. xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
  6. <modelVersion>4.0.0</modelVersion>
  7. <parent>
  8. <groupId>org.springframework.boot</groupId>
  9. <artifactId>spring-boot-starter-parent</artifactId>
  10. <version>2.0.5.RELEASE</version>
  11. <relativePath />
  12. </parent>
  13. <groupId>boot.base.tcp.server</groupId>
  14. <artifactId>boot-example-base-tcp-server-2.0.5</artifactId>
  15. <version>0.0.1-SNAPSHOT</version>
  16. <name>boot-example-base-tcp-server-2.0.5</name>
  17. <url>http://maven.apache.org</url>
  18. <properties>
  19. <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  20. <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
  21. <java.version>1.8</java.version>
  22. </properties>
  23. <dependencies>
  24. <dependency>
  25. <groupId>org.springframework.boot</groupId>
  26. <artifactId>spring-boot-starter-web</artifactId>
  27. </dependency>
  28. <dependency>
  29. <groupId>io.netty</groupId>
  30. <artifactId>netty-all</artifactId>
  31. </dependency>
  32. <dependency>
  33. <groupId>io.springfox</groupId>
  34. <artifactId>springfox-swagger2</artifactId>
  35. <version>2.9.2</version>
  36. </dependency>
  37. <dependency>
  38. <groupId>com.github.xiaoymin</groupId>
  39. <artifactId>swagger-bootstrap-ui</artifactId>
  40. <version>1.9.2</version>
  41. </dependency>
  42. </dependencies>
  43. <build>
  44. <plugins>
  45. <!-- 打包成一个可执行jar -->
  46. <plugin>
  47. <groupId>org.springframework.boot</groupId>
  48. <artifactId>spring-boot-maven-plugin</artifactId>
  49. <executions>
  50. <execution>
  51. <goals>
  52. <goal>repackage</goal>
  53. </goals>
  54. </execution>
  55. </executions>
  56. </plugin>
  57. </plugins>
  58. </build>
  59. </project>

Springboot启动类,Netty启动

  1. package boot.example.tcp.server;
  2. import boot.example.tcp.server.netty.BootNettyServer;
  3. import org.springframework.boot.CommandLineRunner;
  4. import org.springframework.boot.SpringApplication;
  5. import org.springframework.boot.autoconfigure.SpringBootApplication;
  6. import org.springframework.scheduling.annotation.Async;
  7. import org.springframework.scheduling.annotation.EnableAsync;
  8. /**
  9. * 蚂蚁舞
  10. */
  11. @SpringBootApplication
  12. @EnableAsync
  13. public class BootNettyServerApplication implements CommandLineRunner{
  14. public static void main( String[] args ) {
  15. SpringApplication app = new SpringApplication(BootNettyServerApplication.class);
  16. app.run(args);
  17. System.out.println( "Hello World!" );
  18. }
  19. @Async
  20. @Override
  21. public void run(String... args) throws Exception {
  22. /**
  23. * 使用异步注解方式启动netty服务端服务
  24. */
  25. new BootNettyServer().bind(6655);
  26. }
  27. }

Netty的server类

  1. package boot.example.tcp.server.netty;
  2. import io.netty.bootstrap.ServerBootstrap;
  3. import io.netty.channel.ChannelFuture;
  4. import io.netty.channel.ChannelOption;
  5. import io.netty.channel.EventLoopGroup;
  6. import io.netty.channel.nio.NioEventLoopGroup;
  7. import io.netty.channel.socket.SocketChannel;
  8. import io.netty.channel.socket.nio.NioServerSocketChannel;
  9. /**
  10. * 蚂蚁舞
  11. */
  12. public class BootNettyServer {
  13. public void bind(int port) throws Exception {
  14. /**
  15. * 配置服务端的NIO线程组
  16. * NioEventLoopGroup 是用来处理I/O操作的Reactor线程组
  17. * bossGroup:用来接收进来的连接,workerGroup:用来处理已经被接收的连接,进行socketChannel的网络读写,
  18. * bossGroup接收到连接后就会把连接信息注册到workerGroup
  19. * workerGroup的EventLoopGroup默认的线程数是CPU核数的二倍
  20. */
  21. EventLoopGroup bossGroup = new NioEventLoopGroup(1);
  22. EventLoopGroup workerGroup = new NioEventLoopGroup();
  23. try {
  24. /**
  25. * ServerBootstrap 是一个启动NIO服务的辅助启动类
  26. */
  27. ServerBootstrap serverBootstrap = new ServerBootstrap();
  28. /**
  29. * 设置group,将bossGroup, workerGroup线程组传递到ServerBootstrap
  30. */
  31. serverBootstrap = serverBootstrap.group(bossGroup, workerGroup);
  32. /**
  33. * ServerSocketChannel是以NIO的selector为基础进行实现的,用来接收新的连接,这里告诉Channel通过NioServerSocketChannel获取新的连接
  34. */
  35. serverBootstrap = serverBootstrap.channel(NioServerSocketChannel.class);
  36. // option是设置 bossGroup,childOption是设置workerGroup
  37. /**
  38. * 服务端接受连接的队列长度,如果队列已满,客户端连接将被拒绝(队列被接收后,拒绝的客户端下次连接上来只要队列有空余就能连上)
  39. */
  40. serverBootstrap = serverBootstrap.option(ChannelOption.SO_BACKLOG, 128);
  41. /**
  42. * 立即发送数据,默认值为Ture(Netty默认为True而操作系统默认为False)。
  43. * 该值设置Nagle算法的启用,改算法将小的碎片数据连接成更大的报文来最小化所发送的报文的数量,如果需要发送一些较小的报文,则需要禁用该算法。
  44. * Netty默认禁用该算法,从而最小化报文传输延时。
  45. */
  46. serverBootstrap = serverBootstrap.childOption(ChannelOption.TCP_NODELAY, true);
  47. /**
  48. * 连接保活,默认值为False。启用该功能时,TCP会主动探测空闲连接的有效性。
  49. * 可以将此功能视为TCP的心跳机制,默认的心跳间隔是7200s即2小时, Netty默认关闭该功能。
  50. */
  51. serverBootstrap = serverBootstrap.childOption(ChannelOption.SO_KEEPALIVE, true);
  52. /**
  53. * 设置 I/O处理类,主要用于网络I/O事件,记录日志,编码、解码消息
  54. */
  55. serverBootstrap = serverBootstrap.childHandler(new BootNettyChannelInitializer<SocketChannel>());
  56. /**
  57. * 绑定端口,同步等待成功
  58. */
  59. ChannelFuture f = serverBootstrap.bind(port).sync();
  60. if(f.isSuccess()){
  61. System.out.println("netty server start success!");
  62. /**
  63. * 等待服务器监听端口关闭
  64. */
  65. f.channel().closeFuture().sync();
  66. }
  67. } catch (InterruptedException e) {
  68. System.out.println(e.toString());
  69. } finally {
  70. /**
  71. * 退出,释放线程池资源
  72. */
  73. bossGroup.shutdownGracefully().sync();
  74. workerGroup.shutdownGracefully().sync();
  75. }
  76. }
  77. }

通道初始化

  1. package boot.example.tcp.server.netty;
  2. import io.netty.channel.Channel;
  3. import io.netty.channel.ChannelHandler;
  4. import io.netty.channel.ChannelInitializer;
  5. import io.netty.handler.codec.string.StringDecoder;
  6. import io.netty.handler.codec.string.StringEncoder;
  7. import io.netty.handler.timeout.IdleStateHandler;
  8. import io.netty.util.CharsetUtil;
  9. import java.util.concurrent.TimeUnit;
  10. /**
  11. * 通道初始化
  12. * 蚂蚁舞
  13. */
  14. @ChannelHandler.Sharable
  15. public class BootNettyChannelInitializer<SocketChannel> extends ChannelInitializer<Channel> {
  16. public static long READ_TIME_OUT = 60;
  17. public static long WRITE_TIME_OUT = 60;
  18. public static long ALL_TIME_OUT = 60;
  19. @Override
  20. protected void initChannel(Channel ch) throws Exception {
  21. ch.pipeline().addLast(new IdleStateHandler(READ_TIME_OUT, WRITE_TIME_OUT, ALL_TIME_OUT, TimeUnit.SECONDS));
  22. // 带编码
  23. ch.pipeline().addLast("encoder", new StringEncoder(CharsetUtil.UTF_8));
  24. ch.pipeline().addLast("decoder", new StringDecoder(CharsetUtil.UTF_8));
  25. // // ChannelOutboundHandler,依照逆序执行
  26. // ch.pipeline().addLast("encoder", new StringEncoder());
  27. //
  28. // // 属于ChannelInboundHandler,依照顺序执行
  29. // ch.pipeline().addLast("decoder", new StringDecoder());
  30. //自定义ChannelInboundHandlerAdapter
  31. ch.pipeline().addLast(new BootNettyChannelInboundHandlerAdapter());
  32. }
  33. }

I/O数据读写处理类

  1. package boot.example.tcp.server.netty;
  2. import java.io.IOException;
  3. import java.net.InetSocketAddress;
  4. import java.nio.charset.StandardCharsets;
  5. import io.netty.buffer.Unpooled;
  6. import io.netty.channel.ChannelHandler;
  7. import io.netty.channel.ChannelHandlerContext;
  8. import io.netty.channel.ChannelInboundHandlerAdapter;
  9. import io.netty.util.CharsetUtil;
  10. /**
  11. * I/O数据读写处理类
  12. * 蚂蚁舞
  13. */
  14. @ChannelHandler.Sharable
  15. public class BootNettyChannelInboundHandlerAdapter extends ChannelInboundHandlerAdapter{
  16. /**
  17. * 注册时执行
  18. */
  19. @Override
  20. public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
  21. super.channelRegistered(ctx);
  22. System.out.println("--channelRegistered--"+ctx.channel().id().toString());
  23. }
  24. /**
  25. * 离线时执行
  26. */
  27. @Override
  28. public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
  29. super.channelUnregistered(ctx);
  30. System.out.println("--channelUnregistered--"+ctx.channel().id().toString());
  31. }
  32. /**
  33. * 从客户端收到新的数据时,这个方法会在收到消息时被调用
  34. */
  35. @Override
  36. public void channelRead(ChannelHandlerContext ctx, Object msg) {
  37. try {
  38. if(msg == null){return;}
  39. String data = (String) msg;
  40. data = data.replaceAll("\r|\n", "");
  41. String channelId = ctx.channel().id().toString();
  42. System.out.println("channelId="+channelId + "data="+data);
  43. // 这里我将通道id作为code来使用,实际是需要msg里来摘取的客户端数据里的唯一值的
  44. // 如果没有则创建 如果有,更新data值
  45. BootNettyChannel b = BootNettyChannelCache.get("server:"+channelId);
  46. if(b == null){
  47. BootNettyChannel bootNettyChannel = new BootNettyChannel();
  48. bootNettyChannel.setChannel(ctx.channel());
  49. bootNettyChannel.setCode("server:"+channelId);
  50. bootNettyChannel.setReport_last_data(data);
  51. BootNettyChannelCache.save("server:"+channelId, bootNettyChannel);
  52. } else {
  53. b.setReport_last_data(data);
  54. }
  55. ctx.writeAndFlush(Unpooled.buffer().writeBytes(("server:"+channelId).getBytes()));
  56. // netty的编码已经指定,因此可以不需要再次确认编码
  57. // ctx.writeAndFlush(Unpooled.buffer().writeBytes(channelId.getBytes(CharsetUtil.UTF_8)));
  58. } catch (Exception e) {
  59. System.out.println("channelRead--"+e.toString());
  60. }
  61. }
  62. /**
  63. * 从客户端收到新的数据、读取完成时调用
  64. */
  65. @Override
  66. public void channelReadComplete(ChannelHandlerContext ctx) throws IOException {
  67. System.out.println("channelReadComplete");
  68. ctx.flush();
  69. }
  70. /**
  71. * 当出现 Throwable 对象才会被调用,即当 Netty 由于 IO 错误或者处理器在处理事件时抛出的异常时
  72. */
  73. @Override
  74. public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws IOException {
  75. System.out.println("exceptionCaught");
  76. cause.printStackTrace();
  77. BootNettyChannel bootNettyChannel = BootNettyChannelCache.get("server:"+ctx.channel().id().toString());
  78. if(bootNettyChannel != null){
  79. BootNettyChannelCache.remove("server:"+ctx.channel().id().toString());
  80. }
  81. ctx.close();//抛出异常,断开与客户端的连接
  82. }
  83. /**
  84. * 客户端与服务端第一次建立连接时 执行
  85. */
  86. @Override
  87. public void channelActive(ChannelHandlerContext ctx) throws Exception, IOException {
  88. super.channelActive(ctx);
  89. ctx.channel().read();
  90. InetSocketAddress inSocket = (InetSocketAddress) ctx.channel().remoteAddress();
  91. String clientIp = inSocket.getAddress().getHostAddress();
  92. //此处不能使用ctx.close(),否则客户端始终无法与服务端建立连接
  93. System.out.println("channelActive:"+clientIp+ctx.name());
  94. }
  95. /**
  96. * 客户端与服务端 断连时 执行
  97. */
  98. @Override
  99. public void channelInactive(ChannelHandlerContext ctx) throws Exception, IOException {
  100. super.channelInactive(ctx);
  101. InetSocketAddress inSocket = (InetSocketAddress) ctx.channel().remoteAddress();
  102. String clientIp = inSocket.getAddress().getHostAddress();
  103. System.out.println("channelInactive:"+clientIp);
  104. BootNettyChannel bootNettyChannel = BootNettyChannelCache.get("server:"+ctx.channel().id().toString());
  105. if(bootNettyChannel != null){
  106. BootNettyChannelCache.remove("server:"+ctx.channel().id().toString());
  107. }
  108. ctx.close(); //断开连接时,必须关闭,否则造成资源浪费,并发量很大情况下可能造成宕机
  109. }
  110. /**
  111. * 服务端当read超时, 会调用这个方法
  112. */
  113. @Override
  114. public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception, IOException {
  115. super.userEventTriggered(ctx, evt);
  116. InetSocketAddress inSocket = (InetSocketAddress) ctx.channel().remoteAddress();
  117. String clientIp = inSocket.getAddress().getHostAddress();
  118. ctx.close();//超时时断开连接
  119. System.out.println("userEventTriggered:"+clientIp);
  120. }
  121. }
BootNettyChannel
  1. package boot.example.tcp.server.netty;
  2. import io.netty.channel.Channel;
  3. /**
  4. * 蚂蚁舞
  5. */
  6. public class BootNettyChannel {
  7. // 连接客户端唯一的code
  8. private String code;
  9. // 客户端最新发送的消息内容
  10. private String report_last_data;
  11. private transient volatile Channel channel;
  12. public String getCode() {
  13. return code;
  14. }
  15. public void setCode(String code) {
  16. this.code = code;
  17. }
  18. public String getReport_last_data() {
  19. return report_last_data;
  20. }
  21. public void setReport_last_data(String report_last_data) {
  22. this.report_last_data = report_last_data;
  23. }
  24. public Channel getChannel() {
  25. return channel;
  26. }
  27. public void setChannel(Channel channel) {
  28. this.channel = channel;
  29. }
  30. }
BootNettyChannelCache
  1. package boot.example.tcp.server.netty;
  2. import java.util.Map;
  3. import java.util.concurrent.ConcurrentHashMap;
  4. /**
  5. * 蚂蚁舞
  6. */
  7. public class BootNettyChannelCache {
  8. public static volatile Map<String, BootNettyChannel> channelMapCache = new ConcurrentHashMap<String, BootNettyChannel>();
  9. public static void add(String code, BootNettyChannel channel){
  10. channelMapCache.put(code,channel);
  11. }
  12. public static BootNettyChannel get(String code){
  13. return channelMapCache.get(code);
  14. }
  15. public static void remove(String code){
  16. channelMapCache.remove(code);
  17. }
  18. public static void save(String code, BootNettyChannel channel) {
  19. if(channelMapCache.get(code) == null) {
  20. add(code,channel);
  21. }
  22. }
  23. }
BootNettyController
  1. package boot.example.tcp.server.controller;
  2. import boot.example.tcp.server.netty.BootNettyChannel;
  3. import boot.example.tcp.server.netty.BootNettyChannelCache;
  4. import io.netty.buffer.Unpooled;
  5. import io.netty.util.CharsetUtil;
  6. import org.springframework.web.bind.annotation.GetMapping;
  7. import org.springframework.web.bind.annotation.PostMapping;
  8. import org.springframework.web.bind.annotation.RequestParam;
  9. import org.springframework.web.bind.annotation.RestController;
  10. import java.util.ArrayList;
  11. import java.util.HashMap;
  12. import java.util.List;
  13. import java.util.Map;
  14. /**
  15. * 蚂蚁舞
  16. */
  17. @RestController
  18. public class BootNettyController {
  19. @GetMapping(value = {"", "/"})
  20. public String index() {
  21. return "netty springBoot tcp demo";
  22. }
  23. @GetMapping("/clientList")
  24. public List<Map<String,String>> clientList() {
  25. List<Map<String,String>> list = new ArrayList<>();
  26. for (Map.Entry<String, BootNettyChannel> entry : BootNettyChannelCache.channelMapCache.entrySet()) {
  27. Map<String, String> map = new HashMap<String, String>();
  28. map.put("code", entry.getKey());
  29. //map.put("code", entry.getValue().getCode());
  30. map.put("report_last_data", entry.getValue().getReport_last_data());
  31. list.add(map);
  32. }
  33. return list;
  34. }
  35. @PostMapping("/downDataToAllClient")
  36. public String downDataToAllClient(@RequestParam(name="content", required = true) String content) {
  37. for (Map.Entry<String, BootNettyChannel> entry : BootNettyChannelCache.channelMapCache.entrySet()) {
  38. BootNettyChannel bootNettyChannel = entry.getValue();
  39. if(bootNettyChannel != null && bootNettyChannel.getChannel().isOpen()){
  40. bootNettyChannel.getChannel().writeAndFlush(Unpooled.buffer().writeBytes(content.getBytes()));
  41. // netty的编码已经指定,因此可以不需要再次确认编码
  42. // bootNettyChannel.getChannel().writeAndFlush(Unpooled.buffer().writeBytes(content.getBytes(CharsetUtil.UTF_8)));
  43. }
  44. }
  45. return "ok";
  46. }
  47. @PostMapping("/downDataToClient")
  48. public String downDataToClient(@RequestParam(name="code", required = true) String code, @RequestParam(name="content", required = true) String content) {
  49. BootNettyChannel bootNettyChannel = BootNettyChannelCache.get(code);
  50. if(bootNettyChannel != null && bootNettyChannel.getChannel().isOpen()){
  51. bootNettyChannel.getChannel().writeAndFlush(Unpooled.buffer().writeBytes(content.getBytes()));
  52. // netty的编码已经指定,因此可以不需要再次确认编码
  53. // bootNettyChannel.getChannel().writeAndFlush(Unpooled.buffer().writeBytes(content.getBytes(CharsetUtil.UTF_8)));
  54. return "success";
  55. }
  56. return "fail";
  57. }
  58. }
SwaggerConfig
  1. package boot.example.tcp.server;
  2. import com.google.common.base.Predicates;
  3. import org.springframework.context.annotation.Bean;
  4. import org.springframework.context.annotation.Configuration;
  5. import springfox.documentation.builders.ApiInfoBuilder;
  6. import springfox.documentation.builders.PathSelectors;
  7. import springfox.documentation.builders.RequestHandlerSelectors;
  8. import springfox.documentation.service.ApiInfo;
  9. import springfox.documentation.spi.DocumentationType;
  10. import springfox.documentation.spring.web.plugins.Docket;
  11. import springfox.documentation.swagger2.annotations.EnableSwagger2;
  12. /**
  13. * 蚂蚁舞
  14. */
  15. @Configuration
  16. @EnableSwagger2
  17. public class SwaggerConfig {
  18. @Bean
  19. public Docket createRestApi(){
  20. return new Docket(DocumentationType.SWAGGER_2).apiInfo(apiInfo()).select()
  21. .apis(RequestHandlerSelectors.any()).paths(PathSelectors.any())
  22. .paths(Predicates.not(PathSelectors.regex("/error.*")))
  23. .paths(PathSelectors.regex("/.*"))
  24. .build().apiInfo(apiInfo());
  25. }
  26. private ApiInfo apiInfo(){
  27. return new ApiInfoBuilder()
  28. .title("netty tcp 服务端demo")
  29. .description("netty tcp 服务端接口测试demo")
  30. .version("0.01")
  31. .build();
  32. }
  33. /**
  34. * http://localhost:6654/doc.html 地址和端口根据实际项目查看
  35. */
  36. }

目录结构

  1. ├─boot-example-base-tcp-server-2.0.5
  2. │ │ pom.xml
  3. │ │
  4. │ ├─src
  5. │ │ ├─main
  6. │ │ │ ├─java
  7. │ │ │ │ └─boot
  8. │ │ │ │ └─example
  9. │ │ │ │ └─tcp
  10. │ │ │ │ └─server
  11. │ │ │ │ │ BootNettyServerApplication.java
  12. │ │ │ │ │ SwaggerConfig.java
  13. │ │ │ │ │
  14. │ │ │ │ ├─controller
  15. │ │ │ │ │ BootNettyController.java
  16. │ │ │ │ │
  17. │ │ │ │ └─netty
  18. │ │ │ │ BootNettyChannel.java
  19. │ │ │ │ BootNettyChannelCache.java
  20. │ │ │ │ BootNettyChannelInboundHandlerAdapter.java
  21. │ │ │ │ BootNettyChannelInitializer.java
  22. │ │ │ │ BootNettyServer.java
  23. │ │ │ │
  24. │ │ │ └─resources
  25. │ │ │ application.properties
  26. │ │ │
  27. │ │ └─test
  28. │ │ └─java
  29. │ │ └─boot
  30. │ │ └─example
  31. │ │ └─tcp
  32. │ │ └─server
  33. │ │ BootNettyServerApplicationTest.java
  34. │ │

很简单的几个类加swagger,启动Springboot应用的同时也就启动了Netty

Netty Server端口:6655

SpringBoot Web端口: 6654

访问 

http://localhost:6654/doc.html

使用常见的tcp客户端工具发送字母或数字(客户端工具发送中文可能出现乱码的,虽然程序已经处理了中文乱码,但依旧容易出现,处理办法是用netty写一个客户端来测试)

可以看到客户端发送消息,服务端能收到消息,并且在服务端做了保活,服务端也可以根据客户端的信息向客户端发送消息

Springboot整合Netty的服务端demo开发测试完成。

注意:如果乱码的话需要统一编码,最简单的方式

  1. // 带编码
  2. ch.pipeline().addLast("encoder", new StringEncoder(CharsetUtil.UTF_8));
  3. ch.pipeline().addLast("decoder", new StringDecoder(CharsetUtil.UTF_8));

声明:本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:【wpsshop博客】
推荐阅读
相关标签
  

闽ICP备14008679号