赞
踩
Netty 是一个基于NIO的客户、服务器端编程框架,使用Netty 可以确保你快速和简单的开发出一个网络应用,例如实现了某种协议的客户、服务端应用。Netty相当于简化和流线化了网络应用的编程开发过程,例如:基于TCP和UDP的socket服务开发。
那么如何和springboot这个比较流行的框架进行整合呢?Netty与SpringBoot的整合,我想无非就是要整合几个地方
让netty跟springboot生命周期保持一致,同生共死
让netty能用上ioc中的Bean
让netty能读取到全局的配置
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
</dependency>
是的,不用声明版本号。因为 spring-boot-dependencies 中已经声明了最新的netty依赖。
配置Netty的配置类
server: port: 80 logging: level: root: DEBUG management: endpoints: web: exposure: include: "*" endpoint: shutdown: enabled: true netty: websocket: # Websocket服务端口 port: 1024 # 绑定的网卡 ip: 0.0.0.0 # 消息帧最大体积 max-frame-size: 10240 # URI路径 path: /channel
通过 ApplicationRunner 启动Websocket服务
import java.net.InetSocketAddress; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.BeansException; import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.ApplicationArguments; import org.springframework.boot.ApplicationRunner; import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationContextAware; import org.springframework.context.ApplicationListener; import org.springframework.context.event.ContextClosedEvent; import org.springframework.stereotype.Component; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.Channel; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.codec.http.DefaultFullHttpResponse; import io.netty.handler.codec.http.FullHttpRequest; import io.netty.handler.codec.http.HttpObjectAggregator; import io.netty.handler.codec.http.HttpResponseStatus; import io.netty.handler.codec.http.HttpServerCodec; import io.netty.handler.codec.http.HttpVersion; import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler; import io.netty.handler.codec.http.websocketx.extensions.compression.WebSocketServerCompressionHandler; import io.netty.handler.stream.ChunkedWriteHandler; import io.springboot.netty.websocket.handler.WebsocketMessageHandler; /** * 初始化Netty服务 * @author Administrator */ @Component public class NettyBootsrapRunner implements ApplicationRunner, ApplicationListener<ContextClosedEvent>, ApplicationContextAware { private static final Logger LOGGER = LoggerFactory.getLogger(NettyBootsrapRunner.class); @Value("${netty.websocket.port}") private int port; @Value("${netty.websocket.ip}") private String ip; @Value("${netty.websocket.path}") private String path; @Value("${netty.websocket.max-frame-size}") private long maxFrameSize; private ApplicationContext applicationContext; private Channel serverChannel; public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { this.applicationContext = applicationContext; } public void run(ApplicationArguments args) throws Exception { EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap.group(bossGroup, workerGroup); serverBootstrap.channel(NioServerSocketChannel.class); serverBootstrap.localAddress(new InetSocketAddress(this.ip, this.port)); serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { ChannelPipeline pipeline = socketChannel.pipeline(); pipeline.addLast(new HttpServerCodec()); pipeline.addLast(new ChunkedWriteHandler()); pipeline.addLast(new HttpObjectAggregator(65536)); pipeline.addLast(new ChannelInboundHandlerAdapter() { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { if(msg instanceof FullHttpRequest) { FullHttpRequest fullHttpRequest = (FullHttpRequest) msg; String uri = fullHttpRequest.uri(); if (!uri.equals(path)) { // 访问的路径不是 websocket的端点地址,响应404 ctx.channel().writeAndFlush(new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.NOT_FOUND)) .addListener(ChannelFutureListener.CLOSE); return ; } } super.channelRead(ctx, msg); } }); pipeline.addLast(new WebSocketServerCompressionHandler()); pipeline.addLast(new WebSocketServerProtocolHandler(path, null, true, maxFrameSize)); /** * 从IOC中获取到Handler */ pipeline.addLast(applicationContext.getBean(WebsocketMessageHandler.class)); } }); Channel channel = serverBootstrap.bind().sync().channel(); this.serverChannel = channel; LOGGER.info("websocket 服务启动,ip={},port={}", this.ip, this.port); channel.closeFuture().sync(); } finally { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } public void onApplicationEvent(ContextClosedEvent event) { if (this.serverChannel != null) { this.serverChannel.close(); } LOGGER.info("websocket 服务停止"); } }
NettyBootsrapRunner 实现了 ApplicationRunner, ApplicationListener, ApplicationContextAware 接口。
这样一来,NettyBootsrapRunner 可以在App的启动和关闭时执行Websocket服务的启动和关闭。而且通过 ApplicationContextAware 还能获取到 ApplicationContext
WebsocketMessageHandler
import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandler.Sharable; import io.netty.handler.codec.http.websocketx.TextWebSocketFrame; import io.netty.handler.codec.http.websocketx.WebSocketCloseStatus; import io.netty.handler.codec.http.websocketx.WebSocketFrame; import io.springboot.netty.service.DiscardService; /** * * @author Administrator * */ @Sharable @Component public class WebsocketMessageHandler extends SimpleChannelInboundHandler<WebSocketFrame> { private static final Logger LOGGER = LoggerFactory.getLogger(WebsocketMessageHandler.class); @Autowired DiscardService discardService; @Override protected void channelRead0(ChannelHandlerContext ctx, WebSocketFrame msg) throws Exception { if (msg instanceof TextWebSocketFrame) { TextWebSocketFrame textWebSocketFrame = (TextWebSocketFrame) msg; // 业务层处理数据 this.discardService.discard(textWebSocketFrame.text()); // 响应客户端 ctx.channel().writeAndFlush(new TextWebSocketFrame("我收到了你的消息:" + System.currentTimeMillis())); } else { // 不接受文本以外的数据帧类型 ctx.channel().writeAndFlush(WebSocketCloseStatus.INVALID_MESSAGE_TYPE).addListener(ChannelFutureListener.CLOSE); } } @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { super.channelInactive(ctx); LOGGER.info("链接断开:{}", ctx.channel().remoteAddress()); } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { super.channelActive(ctx); LOGGER.info("链接创建:{}", ctx.channel().remoteAddress()); } }
handler已经是一个IOC管理的Bean,可以自由的使用依赖注入等Spring带来的快捷功能。由于是单例存在,所有的链接都使用同一个hander,所以尽量不要保存任何实例变量。
这个Handler处理完毕客户端的消息后,给客户端会响应一条:“我收到了你的消息:” + System.currentTimeMillis() 的消息
为了演示在Handler中使用业务层,这里假装注入了一个 DiscardService 服务。它的逻辑很简单,就是丢弃消息
代码如下(示例):
public void discard (String message) {
LOGGER.info("丢弃消息:{}", message);
}
启动客户端
<!DOCTYPE html> <html> <head> <meta charset="UTF-8"> <title>Websocket</title> </head> <body> </body> <script type="text/javascript"> ;(function(){ const websocket = new WebSocket('ws://localhost:1024/channel'); websocket.onmessage = e => { console.log('收到消息:', e.data); } websocket.onclose = e => { let {code, reason} = e; console.log(`链接断开:code=${code}, reason=${reason}`); } websocket.onopen = () => { console.log(`链接建立...`); websocket.send('Hello'); } websocket.onerror = e => { console.log('链接异常:', e); } })(); </script> </html>
链接创建后就给服务端发送一条消息:Hello。Netty会在SpringBoot App启动后启动,App停止后关闭,可以正常的对外提供服务 并且Handler交给IOC管理可以注入Service,完成业务处理。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。