当前位置:   article > 正文

Springboot集成Netty

springboot集成netty

本章具体讲解SpringBoot中如何集成Netty

1.搭建一个Springboot项目

一,服务端

1.项目结构目录

2.导入jar包

  1. <dependency>
  2. <groupId>io.netty</groupId>
  3. <artifactId>netty-all</artifactId>
  4. <version>5.0.0.Alpha2</version>
  5. </dependency>

3.yml 配置

  1. tcp:
  2. port: 8555
  3. boss:
  4. thread:
  5. count: 2
  6. worker:
  7. thread:
  8. count: 2
  9. so:
  10. keepalive: true
  11. backlog: 100
  12. server:
  13. port: 8888

4.创建TCP服务

  1. import io.netty.bootstrap.ServerBootstrap;
  2. import io.netty.channel.ChannelFuture;
  3. import org.springframework.beans.factory.annotation.Autowired;
  4. import org.springframework.beans.factory.annotation.Qualifier;
  5. import org.springframework.stereotype.Component;
  6. import javax.annotation.PostConstruct;
  7. import javax.annotation.PreDestroy;
  8. import java.net.InetSocketAddress;
  9. /**
  10. * @Author:hmz
  11. * @date:2019/07/10 15:34
  12. * @Explanation:
  13. */
  14. @Component
  15. public class TCPServer {
  16. @Autowired
  17. @Qualifier("serverBootstrap")
  18. private ServerBootstrap b;
  19. @Autowired
  20. @Qualifier("tcpSocketAddress")
  21. private InetSocketAddress tcpPort;
  22. private ChannelFuture serverChannelFuture;
  23. @PostConstruct
  24. public void start() throws Exception {
  25. System.out.println("Starting server at " + tcpPort);
  26. serverChannelFuture = b.bind(tcpPort).sync();
  27. }
  28. @PreDestroy
  29. public void stop() throws Exception {
  30. serverChannelFuture.channel().closeFuture().sync();
  31. }
  32. public ServerBootstrap getB() {
  33. return b;
  34. }
  35. public void setB(ServerBootstrap b) {
  36. this.b = b;
  37. }
  38. public InetSocketAddress getTcpPort() {
  39. return tcpPort;
  40. }
  41. public void setTcpPort(InetSocketAddress tcpPort) {
  42. this.tcpPort = tcpPort;
  43. }
  44. }

5.初始化通道

  1. import io.netty.channel.ChannelInitializer;
  2. import io.netty.channel.ChannelPipeline;
  3. import io.netty.channel.socket.SocketChannel;
  4. import io.netty.handler.codec.string.StringDecoder;
  5. import io.netty.handler.codec.string.StringEncoder;
  6. import org.springframework.beans.factory.annotation.Autowired;
  7. import org.springframework.beans.factory.annotation.Qualifier;
  8. import org.springframework.stereotype.Component;
  9. import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
  10. /**
  11. * @Author:hmz
  12. * @date:2019/07/10 15:30
  13. * @Explanation:
  14. */
  15. @Component
  16. @Qualifier("springProtocolInitializer")
  17. public class StringProtocolInitalizer extends ChannelInitializer<SocketChannel> {
  18. @Autowired
  19. StringDecoder stringDecoder;
  20. @Autowired
  21. StringEncoder stringEncoder;
  22. @Autowired
  23. NettyHandle nettyHandle;
  24. @Override
  25. protected void initChannel(SocketChannel ch) throws Exception {
  26. ChannelPipeline pipeline = ch.pipeline();
  27. pipeline.addLast("decoder", stringDecoder); // 解码器
  28. pipeline.addLast("handler", nettyHandle); // 处理器
  29. pipeline.addLast("encoder", stringEncoder); // 编码器
  30. // 如果后期处理拆包粘包可以在这里处理
  31. /** LineBasedFrameDecoder:以行为单位进行数据包的解码,使用换行符\n或者\r\n作为依据遇
  32. 到\n或者\r\n都认为是一条完整的消息。
  33. DelimiterBasedFrameDecoder:以特殊的符号作为分隔来进行数据包的解码。
  34. FixedLengthFrameDecoder:以固定长度进行数据包的解码。
  35. LenghtFieldBasedFrameDecode:适用于消息头包含消息长度的协议(最常用)。
  36. */
  37. pipeline.addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4)) ;
  38. }
  39. public StringDecoder getStringDecoder() {
  40. return stringDecoder;
  41. }
  42. public void setStringDecoder(StringDecoder stringDecoder) {
  43. this.stringDecoder = stringDecoder;
  44. }
  45. public StringEncoder getStringEncoder() {
  46. return stringEncoder;
  47. }
  48. public void setStringEncoder(StringEncoder stringEncoder) {
  49. this.stringEncoder = stringEncoder;
  50. }
  51. public NettyHandle getNettyHandle() {
  52. return nettyHandle;
  53. }
  54. public void setNettyHandle(NettyHandle nettyHandle) {
  55. this.nettyHandle = nettyHandle;
  56. }
  57. }

6.Netty配置

  1. import io.netty.bootstrap.ServerBootstrap;
  2. import io.netty.channel.ChannelOption;
  3. import io.netty.channel.nio.NioEventLoopGroup;
  4. import io.netty.channel.socket.nio.NioServerSocketChannel;
  5. import io.netty.handler.codec.string.StringDecoder;
  6. import io.netty.handler.codec.string.StringEncoder;
  7. import org.springframework.beans.factory.annotation.Autowired;
  8. import org.springframework.beans.factory.annotation.Value;
  9. import org.springframework.context.annotation.Bean;
  10. import org.springframework.context.annotation.Configuration;
  11. import org.springframework.context.support.PropertySourcesPlaceholderConfigurer;
  12. import java.net.InetSocketAddress;
  13. import java.util.HashMap;
  14. import java.util.Map;
  15. import java.util.Set;
  16. /**
  17. * @Author:hmz
  18. * @date:2019/07/10 15:26
  19. * @Explanation:
  20. */
  21. @Configuration
  22. public class NettyConfigTest {
  23. //链接线程数
  24. // 一般设置为1,因为它只负责接收客户端的连接,不需要太多的线程来处理;
  25. @Value("${boss.thread.count}")
  26. private int bossCount;
  27. //工作线程数
  28. // worker线程数:根据服务器的性能来设置,一般设置为CPU核数的2-4倍;
  29. @Value("${worker.thread.count}")
  30. private int workerCount;
  31. @Value("${tcp.port}")
  32. private int tcpPort;
  33. @Value("${so.keepalive}")
  34. private boolean keepAlive;
  35. @Value("${so.backlog}")
  36. private int backlog;
  37. @Autowired
  38. private NettyHandle nettyHandle;
  39. //bootstrap配置
  40. @SuppressWarnings("unchecked")
  41. @Bean(name = "serverBootstrap")
  42. public ServerBootstrap bootstrap() {
  43. ServerBootstrap b = new ServerBootstrap();
  44. b.group(bossGroup(), workerGroup())
  45. .channel(NioServerSocketChannel.class)
  46. .childHandler(nettyHandle);
  47. Map<ChannelOption<?>, Object> tcpChannelOptions = tcpChannelOptions();
  48. Set<ChannelOption<?>> keySet = tcpChannelOptions.keySet();
  49. for (@SuppressWarnings("rawtypes")
  50. ChannelOption option : keySet) {
  51. b.option(option, tcpChannelOptions.get(option));
  52. }
  53. return b;
  54. }
  55. @Bean(name = "bossGroup", destroyMethod = "shutdownGracefully")
  56. public NioEventLoopGroup bossGroup() {
  57. return new NioEventLoopGroup(bossCount);
  58. }
  59. @Bean(name = "workerGroup", destroyMethod = "shutdownGracefully")
  60. public NioEventLoopGroup workerGroup() {
  61. return new NioEventLoopGroup(workerCount);
  62. }
  63. @Bean(name = "tcpSocketAddress")
  64. public InetSocketAddress tcpPort() {
  65. return new InetSocketAddress(tcpPort);
  66. }
  67. @Bean(name = "tcpChannelOptions")
  68. public Map<ChannelOption<?>, Object> tcpChannelOptions() {
  69. Map<ChannelOption<?>, Object> options = new HashMap<ChannelOption<?>, Object>();
  70. options.put(ChannelOption.SO_KEEPALIVE, keepAlive); // 是否使用TCP的心跳机制
  71. // 1. 服务器TCP内核 内维护了两个队列,称为A(未连接队列)和B(已连接队列)
  72. // 2. 如果A+B的长度大于Backlog时,新的连接就会被TCP内核拒绝掉。
  73. // 3. 默认是50
  74. options.put(ChannelOption.SO_BACKLOG, backlog);
  75. // 还有其他的配置
  76. // SO_SNDBUF 、SO_RCVBUF 、SO_BROADCAST 、 SO_REUSEADDR、SO_LINGER 等
  77. // 见后面
  78. return options;
  79. }
  80. @Bean(name = "stringEncoder")
  81. public StringEncoder stringEncoder() {
  82. return new StringEncoder();
  83. }
  84. @Bean(name = "stringDecoder")
  85. public StringDecoder stringDecoder() {
  86. return new StringDecoder();
  87. }
  88. /**
  89. * Necessary to make the Value annotations work.
  90. *
  91. * @return
  92. */
  93. @Bean
  94. public static PropertySourcesPlaceholderConfigurer propertyPlaceholderConfigurer() {
  95. return new PropertySourcesPlaceholderConfigurer();
  96. }
  97. }

7.NettyHandle事件处理

  1. import io.netty.buffer.ByteBuf;
  2. import io.netty.buffer.Unpooled;
  3. import io.netty.channel.ChannelHandler;
  4. import io.netty.channel.ChannelHandlerContext;
  5. import io.netty.channel.SimpleChannelInboundHandler;
  6. import org.slf4j.Logger;
  7. import org.slf4j.LoggerFactory;
  8. import org.springframework.beans.factory.annotation.Qualifier;
  9. import org.springframework.stereotype.Component;
  10. import java.io.UnsupportedEncodingException;
  11. /**
  12. * @Author:hmz
  13. * @date:2019/07/10 15:26
  14. * @Explanation:
  15. */
  16. @Component
  17. @Qualifier("serverHandler")
  18. @ChannelHandler.Sharable
  19. public class NettyHandle extends SimpleChannelInboundHandler<String> {
  20. private static final Logger log = LoggerFactory.getLogger(NettyHandle.class);
  21. @Override
  22. protected void messageReceived(ChannelHandlerContext ctx, String msg) throws Exception {
  23. log.info("client msg:"+msg);
  24. String clientIdToLong= ctx.channel().id().asLongText();
  25. log.info("client long id:"+clientIdToLong);
  26. String clientIdToShort= ctx.channel().id().asShortText();
  27. log.info("client short id:"+clientIdToShort);
  28. if(msg.indexOf("bye")!=-1){
  29. //close
  30. ctx.channel().close();
  31. }else{
  32. //send to client
  33. ctx.channel().writeAndFlush("Yoru msg is:"+msg);
  34. }
  35. }
  36. @Override
  37. public void channelRead(ChannelHandlerContext ctx, Object msg) throws UnsupportedEncodingException {
  38. log.info("server 读取数据……");
  39. //读取数据
  40. byte[] req = readClientData((ByteBuf) msg);
  41. String body = new String(req, "GBK"); //获取到的值
  42. log.info("客户端的数据------>"+body);
  43. sendInfo(ctx , "收到");
  44. }
  45. private void sendInfo(ChannelHandlerContext ctx , String info) {
  46. ByteBuf bufff = Unpooled.buffer();
  47. bufff.writeBytes(info.getBytes());
  48. ctx.writeAndFlush(bufff);
  49. ctx.flush();
  50. }
  51. private byte[] readClientData(ByteBuf msg) {
  52. // logger.info("读客户端的数据.");
  53. ByteBuf buf = msg;
  54. byte[] req = new byte[buf.readableBytes()];
  55. buf.readBytes(req);
  56. buf.release();
  57. return req;
  58. }
  59. @Override
  60. public void channelActive(ChannelHandlerContext ctx) throws Exception {
  61. log.info("开始连接");
  62. sendInfo(ctx , "连接成功");
  63. super.channelActive(ctx);
  64. }
  65. @Override
  66. public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
  67. log.error("异常关闭");
  68. sendInfo(ctx , "异常");
  69. ctx.close();
  70. }
  71. @Override
  72. public void channelInactive(ChannelHandlerContext ctx) throws Exception {
  73. log.info("离线");
  74. sendInfo(ctx , "离线");
  75. super.channelInactive(ctx);
  76. }
  77. }

二、客户端

1.NClient

  1. import io.netty.bootstrap.Bootstrap;
  2. import io.netty.channel.ChannelFuture;
  3. import io.netty.channel.ChannelInitializer;
  4. import io.netty.channel.EventLoopGroup;
  5. import io.netty.channel.nio.NioEventLoopGroup;
  6. import io.netty.channel.socket.SocketChannel;
  7. import io.netty.channel.socket.nio.NioSocketChannel;
  8. import java.net.InetSocketAddress;
  9. /**
  10. * @Author:hmz
  11. * @date:2019/07/08 14:01
  12. * @Explanation:
  13. */
  14. public class NClient {
  15. private String host;
  16. private int port;
  17. public NClient(String host, int port) {
  18. this.host = host;
  19. this.port = port;
  20. }
  21. public void start() throws Exception {
  22. EventLoopGroup nioEventLoopGroup = null;
  23. try {
  24. //创建Bootstrap对象用来引导启动客户端
  25. Bootstrap bootstrap = new Bootstrap();
  26. //创建EventLoopGroup对象并设置到Bootstrap中,EventLoopGroup可以理解为是一个线程池,这个线程池用来处理连接、接受数据、发送数据
  27. nioEventLoopGroup = new NioEventLoopGroup();
  28. //创建InetSocketAddress并设置到Bootstrap中,InetSocketAddress是指定连接的服务器地址
  29. bootstrap.group(nioEventLoopGroup).channel(NioSocketChannel.class).remoteAddress(new InetSocketAddress(host, port))
  30. .handler(new ChannelInitializer<SocketChannel>() {
  31. //添加一个ChannelHandler,客户端成功连接服务器后就会被执行
  32. @Override
  33. protected void initChannel(SocketChannel ch)
  34. throws Exception {
  35. ch.pipeline().addLast(new NClientHandler());
  36. }
  37. });
  38. // • 调用Bootstrap.connect()来连接服务器
  39. ChannelFuture f = bootstrap.connect().sync();
  40. // • 最后关闭EventLoopGroup来释放资源
  41. f.channel().closeFuture().sync();
  42. } finally {
  43. nioEventLoopGroup.shutdownGracefully().sync();
  44. }
  45. }
  46. }

2.NClientHandler

  1. import io.netty.buffer.ByteBuf;
  2. import io.netty.buffer.Unpooled;
  3. import io.netty.channel.ChannelHandlerContext;
  4. import io.netty.channel.ChannelInboundHandlerAdapter;
  5. import io.netty.util.CharsetUtil;
  6. import java.util.ArrayList;
  7. import java.util.List;
  8. /**
  9. * @Author:hmz
  10. * @date:2019/07/08 14:07
  11. * @Explanation:
  12. */
  13. public class NClientHandler extends ChannelInboundHandlerAdapter {
  14. public static List<ChannelHandlerContext> cts = new ArrayList<ChannelHandlerContext>();
  15. /**
  16. * 向服务端发送数据
  17. */
  18. @Override
  19. public void channelActive(ChannelHandlerContext ctx) throws Exception {
  20. System.out.println("客户端与服务端通道-开启:" + ctx.channel().localAddress() + "channelActive");
  21. cts.add(ctx);
  22. String sendInfo = "你好服务端";
  23. System.out.println("客户端准备发送的数据包:" + sendInfo);
  24. write(ctx,sendInfo);
  25. }
  26. public void write(ChannelHandlerContext ctx , String mess) throws Exception {
  27. String sendInfo = mess;
  28. ctx.writeAndFlush(Unpooled.copiedBuffer(sendInfo, CharsetUtil.UTF_8)); // 必须有flush
  29. ctx.flush();
  30. }
  31. @Override
  32. public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
  33. //读取数据
  34. //读取数据
  35. ByteBuf buf1 = (ByteBuf) msg;
  36. byte[] req = readClientData((ByteBuf) msg);
  37. String body = new String(req, "UTF-8"); //获取到的值
  38. System.out.println("客户端的数据------>"+body);
  39. //写数据
  40. write(ctx,"wits写的数据");
  41. }
  42. //将netty的数据装换为字节数组
  43. private byte[] readClientData(ByteBuf msg) {
  44. ByteBuf buf = msg;
  45. byte[] req = new byte[buf.readableBytes()];
  46. buf.readBytes(req);
  47. buf.release();
  48. return req;
  49. }
  50. /**
  51. * channelInactive
  52. *
  53. * channel 通道 Inactive 不活跃的
  54. *
  55. * 当客户端主动断开服务端的链接后,这个通道就是不活跃的。也就是说客户端与服务端的关闭了通信通道并且不可以传输数据
  56. *
  57. */
  58. public void channelInactive(ChannelHandlerContext ctx) throws Exception {
  59. System.out.println("客户端与服务端通道-关闭:" + ctx.channel().localAddress() + "channelInactive");
  60. }
  61. @Override
  62. public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
  63. cts.remove(ctx);
  64. ctx.close();
  65. System.out.println("异常退出:" + cause.getMessage());
  66. }
  67. }

3.mainTest

  1. import com.herbert.client.NClient;
  2. import com.herbert.finalPool.ConstantPool;
  3. /**
  4. * @Author:hmz
  5. * @date:2019/07/08 14:01
  6. * @Explanation:
  7. */
  8. public class TestMain {
  9. public static void main(String[] args) throws Exception {
  10. new NClient(ConstantPool.HOST, ConstantPool.PORT).start(); // 连接127.0.0.1/65535,并启动
  11. }
  12. }

附件:Netty参数配置详情

ChannelOption套接字配置
参数解释
SO_BROADCAST对应套接字层的套接字:SO_BROADCAST,将消息发送到广播地址。
如果目标中指定的接口支持广播数据包,则启用此选项可让应用程序发送广播消息。
SO_KEEPALIVE对应套接字层的套接字:SO_KEEPALIVE,保持连接。
在空闲套接字上发送探测,以验证套接字是否仍处于活动状态。
SO_SNDBUF对应套接字层的套接字:SO_SNDBUF,设置发送缓冲区的大小。
SO_RCVBUF对应套接字层的套接字:SO_RCVBUF,获取接收缓冲区的大小。
SO_REUSEADDR对应套接字层的套接字:SO_REUSEADDR,本地地址复用。
启用此选项允许绑定已使用的本地地址。
SO_LINGER对应套接字层的套接字:SO_LINGER,延迟关闭连接。
启用此选项,在调用close时如果存在未发送的数据时,在close期间将阻止调用应用程序,直到数据被传输或连接超时。
SO_BACKLOG对应TCP/IP协议中<font color=red>backlog</font>参数,<font color=red>backlog</font>即连接队列,设置TCP中的连接队列大小。如果队列满了,会发送一个ECONNREFUSED错误信息给C端,即“ Connection refused”。
SO_TIMEOUT等待客户连接的超时时间。
IP_TOS对应套接字层的套接字:IP_TOS,在IP标头中设置服务类型(TOS)和优先级。
IP_MULTICAST_ADDR对应IP层的套接字选项:IP_MULTICAST_IF,设置应发送多播数据报的传出接口。
IP_MULTICAST_IF对应IP层的套接字选项:IP_MULTICAST_IF2,设置应发送多播数据报的IPV6传出接口。
IP_MULTICAST_TTL对应IP层的套接字选项:IP_MULTICAST_TTL,在传出的 多播数据报的IP头中设置生存时间(TTL)。
IP_MULTICAST_LOOP_DISABLED取消 指定应将 传出的多播数据报的副本 回传到发送主机,只要它是多播组的成员即可。
TCP_NODELAY对应TCP层的套接字选项:TCP_NODELAY,指定TCP是否遵循<font color=#35b998>Nagle算法</font> 决定何时发送数据。Nagle算法代表通过减少必须发送包的个数来增加网络软件系统的效率。即尽可能发送大块数据避免网络中充斥着大量的小数据块。如果要追求高实时性,需要设置关闭Nagle算法;如果需要追求减少网络交互次数,则设置开启Nagle算法。
ChannelOption通用配置
参数解释
ALLOCATORByteBuf的分配器,默认值为ByteBufAllocator.DEFAULT。
RCVBUF_ALLOCATOR用于Channel分配接受Buffer的分配器,默认值为AdaptiveRecvByteBufAllocator.DEFAULT,是一个自适应的接受缓冲区分配器,能根据接受到的数据自动调节大小。可选值为FixedRecvByteBufAllocator,固定大小的接受缓冲区分配器。
MESSAGE_SIZE_ESTIMATOR消息大小估算器,默认为DefaultMessageSizeEstimator.DEFAULT。估算ByteBuf、ByteBufHolder和FileRegion的大小,其中ByteBuf和ByteBufHolder为实际大小,FileRegion估算值为0。该值估算的字节数在计算水位时使用,FileRegion为0可知FileRegion不影响高低水位。
CONNECT_TIMEOUT_MILLIS连接超时毫秒数,默认值30000毫秒即30秒。
WRITE_SPIN_COUNT一个Loop写操作执行的最大次数,默认值为16。也就是说,对于大数据量的写操作至多进行16次,如果16次仍没有全部写完数据,此时会提交一个新的写任务给EventLoop,任务将在下次调度继续执行。这样,其他的写请求才能被响应不会因为单个大数据量写请求而耽误。
WRITE_BUFFER_WATER_MARK
ALLOW_HALF_CLOSURE一个连接的远端关闭时本地端是否关闭,默认值为False。值为False时,连接自动关闭;为True时,触发ChannelInboundHandler的userEventTriggered()方法,事件为ChannelInputShutdownEvent。
AUTO_READ自动读取,默认值为True。Netty只在必要的时候才设置关心相应的I/O事件。对于读操作,需要调用channel.read()设置关心的I/O事件为OP_READ,这样若有数据到达才能读取以供用户处理。该值为True时,每次读操作完毕后会自动调用channel.read(),从而有数据到达便能读取;否则,需要用户手动调用channel.read()。需要注意的是:当调用config.setAutoRead(boolean)方法时,如果状态由false变为true,将会调用channel.read()方法读取数据;由true变为false,将调用config.autoReadCleared()方法终止数据读取。
AUTO_CLOSE

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/秋刀鱼在做梦/article/detail/848638
推荐阅读
相关标签
  

闽ICP备14008679号