当前位置:   article > 正文

在SpringBoot中整合使用Netty框架_netty 转 springboot中使用

netty 转 springboot中使用

Netty是一个非常优秀的Socket框架。如果需要在SpringBoot开发的app中,提供Socket服务,那么Netty是不错的选择。

Netty与SpringBoot的整合,我想无非就是要整合几个地方

  • 让netty跟springboot生命周期保持一致,同生共死
  • 让netty能用上ioc中的Bean
  • 让netty能读取到全局的配置

整合Netty,提供WebSocket服务

这里演示一个案例,在SpringBoot中使用Netty提供一个Websocket服务。

servlet容器本身提供了websocket的实现,但这里用netty的实现 :sparkling_heart:

添加依赖

  1. <dependency>
  2. <groupId>io.netty</groupId>
  3. <artifactId>netty-all</artifactId>
  4. </dependency>

是的,不用声明版本号。因为 spring-boot-dependencies 中已经声明了最新的netty依赖。

通过yaml配置基本的属性

  1. server:
  2. port: 80
  3. logging:
  4. level:
  5. root: DEBUG
  6. management:
  7. endpoints:
  8. web:
  9. exposure:
  10. include: "*"
  11. endpoint:
  12. shutdown:
  13. enabled: true
  14. netty:
  15. websocket:
  16. # Websocket服务端口
  17. port: 1024
  18. # 绑定的网卡
  19. ip: 0.0.0.0
  20. # 消息帧最大体积
  21. max-frame-size: 10240
  22. # URI路径
  23. path: /channel

App使用了,actuator,并且开启暴露了 shutdown 端点,可以让SpringBoot App优雅的停机。 在这里通过 netty.websocket.* 配置 websocket服务相关的配置。

通过 ApplicationRunner 启动Websocket服务

  1. import java.net.InetSocketAddress;
  2. import org.slf4j.Logger;
  3. import org.slf4j.LoggerFactory;
  4. import org.springframework.beans.BeansException;
  5. import org.springframework.beans.factory.annotation.Value;
  6. import org.springframework.boot.ApplicationArguments;
  7. import org.springframework.boot.ApplicationRunner;
  8. import org.springframework.context.ApplicationContext;
  9. import org.springframework.context.ApplicationContextAware;
  10. import org.springframework.context.ApplicationListener;
  11. import org.springframework.context.event.ContextClosedEvent;
  12. import org.springframework.stereotype.Component;
  13. import io.netty.bootstrap.ServerBootstrap;
  14. import io.netty.channel.Channel;
  15. import io.netty.channel.ChannelFutureListener;
  16. import io.netty.channel.ChannelHandlerContext;
  17. import io.netty.channel.ChannelInboundHandlerAdapter;
  18. import io.netty.channel.ChannelInitializer;
  19. import io.netty.channel.ChannelPipeline;
  20. import io.netty.channel.EventLoopGroup;
  21. import io.netty.channel.nio.NioEventLoopGroup;
  22. import io.netty.channel.socket.SocketChannel;
  23. import io.netty.channel.socket.nio.NioServerSocketChannel;
  24. import io.netty.handler.codec.http.DefaultFullHttpResponse;
  25. import io.netty.handler.codec.http.FullHttpRequest;
  26. import io.netty.handler.codec.http.HttpObjectAggregator;
  27. import io.netty.handler.codec.http.HttpResponseStatus;
  28. import io.netty.handler.codec.http.HttpServerCodec;
  29. import io.netty.handler.codec.http.HttpVersion;
  30. import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
  31. import io.netty.handler.codec.http.websocketx.extensions.compression.WebSocketServerCompressionHandler;
  32. import io.netty.handler.stream.ChunkedWriteHandler;
  33. import io.springboot.netty.websocket.handler.WebsocketMessageHandler;
  34. /**
  35. * 初始化Netty服务
  36. * @author Administrator
  37. */
  38. @Component
  39. public class NettyBootsrapRunner implements ApplicationRunner, ApplicationListener<ContextClosedEvent>, ApplicationContextAware {
  40. private static final Logger LOGGER = LoggerFactory.getLogger(NettyBootsrapRunner.class);
  41. @Value("${netty.websocket.port}")
  42. private int port;
  43. @Value("${netty.websocket.ip}")
  44. private String ip;
  45. @Value("${netty.websocket.path}")
  46. private String path;
  47. @Value("${netty.websocket.max-frame-size}")
  48. private long maxFrameSize;
  49. private ApplicationContext applicationContext;
  50. private Channel serverChannel;
  51. public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
  52. this.applicationContext = applicationContext;
  53. }
  54. public void run(ApplicationArguments args) throws Exception {
  55. EventLoopGroup bossGroup = new NioEventLoopGroup();
  56. EventLoopGroup workerGroup = new NioEventLoopGroup();
  57. try {
  58. ServerBootstrap serverBootstrap = new ServerBootstrap();
  59. serverBootstrap.group(bossGroup, workerGroup);
  60. serverBootstrap.channel(NioServerSocketChannel.class);
  61. serverBootstrap.localAddress(new InetSocketAddress(this.ip, this.port));
  62. serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
  63. @Override
  64. protected void initChannel(SocketChannel socketChannel) throws Exception {
  65. ChannelPipeline pipeline = socketChannel.pipeline();
  66. pipeline.addLast(new HttpServerCodec());
  67. pipeline.addLast(new ChunkedWriteHandler());
  68. pipeline.addLast(new HttpObjectAggregator(65536));
  69. pipeline.addLast(new ChannelInboundHandlerAdapter() {
  70. @Override
  71. public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
  72. if(msg instanceof FullHttpRequest) {
  73. FullHttpRequest fullHttpRequest = (FullHttpRequest) msg;
  74. String uri = fullHttpRequest.uri();
  75. if (!uri.equals(path)) {
  76. // 访问的路径不是 websocket的端点地址,响应404
  77. ctx.channel().writeAndFlush(new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.NOT_FOUND))
  78. .addListener(ChannelFutureListener.CLOSE);
  79. return ;
  80. }
  81. }
  82. super.channelRead(ctx, msg);
  83. }
  84. });
  85. pipeline.addLast(new WebSocketServerCompressionHandler());
  86. pipeline.addLast(new WebSocketServerProtocolHandler(path, null, true, maxFrameSize));
  87. /**
  88. * 从IOC中获取到Handler
  89. */
  90. pipeline.addLast(applicationContext.getBean(WebsocketMessageHandler.class));
  91. }
  92. });
  93. Channel channel = serverBootstrap.bind().sync().channel();
  94. this.serverChannel = channel;
  95. LOGGER.info("websocket 服务启动,ip={},port={}", this.ip, this.port);
  96. channel.closeFuture().sync();
  97. } finally {
  98. bossGroup.shutdownGracefully();
  99. workerGroup.shutdownGracefully();
  100. }
  101. }
  102. public void onApplicationEvent(ContextClosedEvent event) {
  103. if (this.serverChannel != null) {
  104. this.serverChannel.close();
  105. }
  106. LOGGER.info("websocket 服务停止");
  107. }
  108. }

NettyBootsrapRunner 实现了 ApplicationRunner, ApplicationListener<ContextClosedEvent>, ApplicationContextAware 接口。

这样一来,NettyBootsrapRunner 可以在App的启动和关闭时执行Websocket服务的启动和关闭。而且通过 ApplicationContextAware 还能获取到 ApplicationContext

通过IOC管理 Netty 的Handler

  1. import org.slf4j.Logger;
  2. import org.slf4j.LoggerFactory;
  3. import org.springframework.beans.factory.annotation.Autowired;
  4. import org.springframework.stereotype.Component;
  5. import io.netty.channel.ChannelHandlerContext;
  6. import io.netty.channel.SimpleChannelInboundHandler;
  7. import io.netty.channel.ChannelFutureListener;
  8. import io.netty.channel.ChannelHandler.Sharable;
  9. import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
  10. import io.netty.handler.codec.http.websocketx.WebSocketCloseStatus;
  11. import io.netty.handler.codec.http.websocketx.WebSocketFrame;
  12. import io.springboot.netty.service.DiscardService;
  13. /**
  14. *
  15. * @author Administrator
  16. *
  17. */
  18. @Sharable
  19. @Component
  20. public class WebsocketMessageHandler extends SimpleChannelInboundHandler<WebSocketFrame> {
  21. private static final Logger LOGGER = LoggerFactory.getLogger(WebsocketMessageHandler.class);
  22. @Autowired
  23. DiscardService discardService;
  24. @Override
  25. protected void channelRead0(ChannelHandlerContext ctx, WebSocketFrame msg) throws Exception {
  26. if (msg instanceof TextWebSocketFrame) {
  27. TextWebSocketFrame textWebSocketFrame = (TextWebSocketFrame) msg;
  28. // 业务层处理数据
  29. this.discardService.discard(textWebSocketFrame.text());
  30. // 响应客户端
  31. ctx.channel().writeAndFlush(new TextWebSocketFrame("我收到了你的消息:" + System.currentTimeMillis()));
  32. } else {
  33. // 不接受文本以外的数据帧类型
  34. ctx.channel().writeAndFlush(WebSocketCloseStatus.INVALID_MESSAGE_TYPE).addListener(ChannelFutureListener.CLOSE);
  35. }
  36. }
  37. @Override
  38. public void channelInactive(ChannelHandlerContext ctx) throws Exception {
  39. super.channelInactive(ctx);
  40. LOGGER.info("链接断开:{}", ctx.channel().remoteAddress());
  41. }
  42. @Override
  43. public void channelActive(ChannelHandlerContext ctx) throws Exception {
  44. super.channelActive(ctx);
  45. LOGGER.info("链接创建:{}", ctx.channel().remoteAddress());
  46. }
  47. }

handler已经是一个IOC管理的Bean,可以自由的使用依赖注入等Spring带来的快捷功能。由于是单例存在,所有的链接都使用同一个hander,所以尽量不要保存任何实例变量。

这个Handler处理完毕客户端的消息后,给客户端会响应一条:"我收到了你的消息:" + System.currentTimeMillis() 的消息

为了演示在Handler中使用业务层,这里假装注入了一个 DiscardService服务。它的逻辑很简单,就是丢弃消息

  1. public void discard (String message) {
  2. LOGGER.info("丢弃消息:{}", message);
  3. }

演示

启动客户端

  1. <!DOCTYPE html>
  2. <html>
  3. <head>
  4. <meta charset="UTF-8">
  5. <title>Websocket</title>
  6. </head>
  7. <body>
  8. </body>
  9. <script type="text/javascript">
  10. ;(function(){
  11. const websocket = new WebSocket('ws://localhost:1024/channel');
  12. websocket.onmessage = e => {
  13. console.log('收到消息:', e.data);
  14. }
  15. websocket.onclose = e => {
  16. let {code, reason} = e;
  17. console.log(`链接断开:code=${code}, reason=${reason}`);
  18. }
  19. websocket.onopen = () => {
  20. console.log(`链接建立...`);
  21. websocket.send('Hello');
  22. }
  23. websocket.onerror = e => {
  24. console.log('链接异常:', e);
  25. }
  26. })();
  27. </script>
  28. </html>

链接创建后就给服务端发送一条消息:Hello

关闭服务端

使用 PostMan 请求服务器的停机端点

日志

客户端日志

服务端日志

  1. 2020-06-22 17:08:22.728 INFO 9392 --- [ main] io.undertow : starting server: Undertow - 2.1.3.Final
  2. 2020-06-22 17:08:22.740 INFO 9392 --- [ main] org.xnio : XNIO version 3.8.0.Final
  3. 2020-06-22 17:08:22.752 INFO 9392 --- [ main] org.xnio.nio : XNIO NIO Implementation Version 3.8.0.Final
  4. 2020-06-22 17:08:22.839 INFO 9392 --- [ main] org.jboss.threads : JBoss Threads version 3.1.0.Final
  5. 2020-06-22 17:08:22.913 INFO 9392 --- [ main] o.s.b.w.e.undertow.UndertowWebServer : Undertow started on port(s) 80 (http)
  6. 2020-06-22 17:08:22.931 INFO 9392 --- [ main] io.springboot.netty.NettyApplication : Started NettyApplication in 4.536 seconds (JVM running for 5.175)
  7. 2020-06-22 17:08:23.653 INFO 9392 --- [ main] i.s.n.w.runner.NettyBootsrapRunner : websocket 服务启动,ip=0.0.0.0,port=1024
  8. 2020-06-22 17:08:28.484 INFO 9392 --- [ XNIO-1 task-1] io.undertow.servlet : Initializing Spring DispatcherServlet 'dispatcherServlet'
  9. 2020-06-22 17:08:28.484 INFO 9392 --- [ XNIO-1 task-1] o.s.web.servlet.DispatcherServlet : Initializing Servlet 'dispatcherServlet'
  10. 2020-06-22 17:08:28.492 INFO 9392 --- [ XNIO-1 task-1] o.s.web.servlet.DispatcherServlet : Completed initialization in 8 ms
  11. 2020-06-22 17:08:28.724 INFO 9392 --- [ntLoopGroup-3-1] i.s.n.w.handler.WebsocketMessageHandler : 链接创建:/0:0:0:0:0:0:0:1:12093
  12. 2020-06-22 17:08:28.790 INFO 9392 --- [ntLoopGroup-3-1] i.s.netty.service.DiscardService : 丢弃消息:Hello
  13. 2020-06-22 17:08:33.688 INFO 9392 --- [ Thread-232] i.s.n.w.runner.NettyBootsrapRunner : websocket 服务停止
  14. 2020-06-22 17:08:33.691 INFO 9392 --- [ntLoopGroup-3-1] i.s.n.w.handler.WebsocketMessageHandler : 链接断开:/0:0:0:0:0:0:0:1:12093
  15. 2020-06-22 17:08:33.699 INFO 9392 --- [ Thread-232] io.undertow : stopping server: Undertow - 2.1.3.Final
  16. 2020-06-22 17:08:33.704 INFO 9392 --- [ Thread-232] io.undertow.servlet : Destroying Spring FrameworkServlet 'dispatcherServlet'
  17. 2020-06-22 17:08:33.708 INFO 9392 --- [ Thread-232] o.s.s.concurrent.ThreadPoolTaskExecutor : Shutting down ExecutorService 'applicationTaskExecutor'
  18. 复制代码

Netty会在SpringBoot App启动后启动,App停止后关闭,可以正常的对外提供服务 并且Handler交给IOC管理可以注入Service,完成业务处理。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/繁依Fanyi0/article/detail/1006589
推荐阅读
相关标签
  

闽ICP备14008679号