赞
踩
服务器与浏览器之间的通信,一般都是由浏览器发起HTTP请求,服务端对请求进行响应。要让服务端主动向浏览器推送数据,常见的方案有两种:通过WebSocket由服务端主动推送,或者由前端以轮询的方式拉取数据。轮询方式比较浪费资源,消息推送也不够及时,因此目前很多系统都采用WebSocket协议向前端主动推送数据。Spring Boot本身是支持WebSocket协议的,但是这里想讲的是通过Netty实现WebSocket通信。
首先需要引入netty的依赖包
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.90.Final</version>
</dependency>
netty-all依赖包中已经包含了WebSocket协议相关的编解码器。下面介绍两种使用WebSocket协议的方案:一种是使用Netty内置的处理器来处理ws消息,另外一种是自己实现相关消息的解析和处理。
首先介绍第一种使用,这种方案只需要用户自己定义一个handler实现消息的接收和业务处理,把处理结果返回给浏览器就可以了,大致代码逻辑如下:
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;

/**
 * User-defined inbound handler for WebSocket text frames.
 *
 * <p>Each received {@link TextWebSocketFrame} is printed together with the name of the
 * handling thread and then echoed back to the peer with a fixed prefix.
 *
 * @Author xingo
 * @Date 2023/11/21
 */
public class UserWebsocketInHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {

    /**
     * Logs the text payload and writes an echo frame back on the same channel.
     *
     * @param ctx   context of this handler in the pipeline
     * @param frame the received text frame
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame frame) throws Exception {
        // Extract the payload up front; the reply below is built from this copy.
        final String payload = frame.text();

        final String threadName = Thread.currentThread().getName();
        System.out.println(threadName + "|" + payload);

        final TextWebSocketFrame reply = new TextWebSocketFrame("server send message : " + payload);
        ctx.writeAndFlush(reply);
    }
}
import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.codec.http.HttpObjectAggregator; import io.netty.handler.codec.http.HttpServerCodec; import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler; import io.netty.handler.logging.LoggingHandler; import io.netty.handler.stream.ChunkedWriteHandler; import io.netty.handler.timeout.IdleStateEvent; import io.netty.handler.timeout.IdleStateHandler; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; /** * websocket服务端 * * @Author xingo * @Date 2023/11/21 */ public class NettyWebsocketServer implements Runnable { /** * 服务端IP地址 */ private String ip; /** * 服务端端口号 */ private int port; public NettyWebsocketServer(String ip, int port) { this.ip = ip; this.port = port; } @Override public void run() { // 指定boss线程数:主要负责接收连接请求,一般设置为1就可以 final EventLoopGroup boss = new NioEventLoopGroup(1, new ThreadFactory() { private AtomicInteger index = new AtomicInteger(0); @Override public Thread newThread(Runnable r) { return new Thread(r, String.format("NioBoss_%d", this.index.incrementAndGet())); } }); // 指定worker线程数:主要负责处理连接就绪的连接,一般设置为CPU的核心数 final int totalThread = 12; final EventLoopGroup worker = new NioEventLoopGroup(totalThread, new ThreadFactory() { private AtomicInteger index = new AtomicInteger(0); @Override public Thread newThread(Runnable r) { return new Thread(r, String.format("NioSelector_%d_%d", totalThread, this.index.incrementAndGet())); } }); // 指定任务处理线程数:主要负责读取数据和处理响应,一般该值设置的比较大,与业务相对应 final int jobThreads = 1024; final EventLoopGroup job = new DefaultEventLoopGroup(jobThreads, new ThreadFactory() { private AtomicInteger index = new AtomicInteger(0); @Override public Thread newThread(Runnable r) { return new Thread(r, 
String.format("NioJob_%d_%d", jobThreads, this.index.incrementAndGet())); } }); // 日志处理handler:类定义上面有Sharable表示线程安全,可以将对象定义在外面使用 final LoggingHandler LOGGING_HANDLER = new LoggingHandler(); // 指定服务端bootstrap ServerBootstrap server = new ServerBootstrap(); server.group(boss, worker) // 指定通道类型 .channel(NioServerSocketChannel.class) // 指定全连接队列大小:windows下默认是200,linux/mac下默认是128 .option(ChannelOption.SO_BACKLOG, 2048) // 维持链接的活跃,清除死链接 .childOption(ChannelOption.SO_KEEPALIVE, true) // 关闭延迟发送 .childOption(ChannelOption.TCP_NODELAY, true) // 添加handler处理链 .childHandler(new ChannelInitializer<NioSocketChannel>() { @Override protected void initChannel(NioSocketChannel channel) throws Exception { ChannelPipeline pipeline = channel.pipeline(); // 日志处理 pipeline.addLast(LOGGING_HANDLER); // 心跳检测:读超时时间、写超时时间、全部超时时间(单位是秒,0表示不处理) pipeline.addLast(new IdleStateHandler(30,0,0, TimeUnit.SECONDS)); pipeline.addLast(new ChannelDuplexHandler() { @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { IdleStateEvent event = (IdleStateEvent) evt; System.out.println("心跳事件 : " + event.state()); super.userEventTriggered(ctx, evt); } }); // 处理http请求的编解码器 pipeline.addLast(job, "httpServerCodec", new HttpServerCodec()); pipeline.addLast(job, "chunkedWriteHandler", new ChunkedWriteHandler()); pipeline.addLast(job, "httpObjectAggregator", new HttpObjectAggregator(65536)); // 处理websocket的编解码器 pipeline.addLast(job, "webSocketServerProtocolHandler", new WebSocketServerProtocolHandler("/", "WebSocket", true, 655360)); // 自定义处理器 pipeline.addLast(job, "userInHandler", new UserWebsocketInHandler()); } }); try { // 服务端绑定对外服务地址 ChannelFuture future = server.bind(ip, port).sync(); System.out.println("netty server start ok."); // 等待服务关闭,关闭后释放相关资源 future.channel().closeFuture().sync(); } catch (InterruptedException e) { e.printStackTrace(); } finally { boss.shutdownGracefully(); worker.shutdownGracefully(); job.shutdownGracefully(); } } public static void 
main(String[] args) { new Thread(new NettyWebsocketServer("127.0.0.1", 8899)).start(); } }
以上就实现了websocket服务端,客户端连接到服务端实现双向通信。
另外一种实现方式是自己定义一个handler用于ws协议数据的解析和处理,这样协议的整个处理过程对于用户来说很清楚明白,下面是实现的逻辑代码:
import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.handler.codec.http.*; import io.netty.handler.codec.http.websocketx.*; import io.netty.util.AttributeKey; import io.netty.util.CharsetUtil; import lombok.extern.slf4j.Slf4j; /** * * * @Author xingo * @Date 2023/11/21 */ @Slf4j public class WebsocketServerHandler extends SimpleChannelInboundHandler<Object> { private WebSocketServerHandshaker handshaker; public WebsocketServerHandler() { } private void handleWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame msg) { if (msg instanceof CloseWebSocketFrame) { handshaker.close(ctx.channel(), (CloseWebSocketFrame) msg.retain()); return; } if (msg instanceof PingWebSocketFrame) { log.info("websocket ping message"); ctx.channel().write(new PingWebSocketFrame(msg.content().retain())); } else if (msg instanceof TextWebSocketFrame) { // websocket消息解压成字符串让下一个handler处理 String text = ((TextWebSocketFrame) msg).text(); log.info("请求数据|{}", text); // 如果不调用这个方法后面的handler就获取不到数据 ctx.fireChannelRead(text); } else { log.error("不支持的消息格式"); throw new UnsupportedOperationException("不支持的消息格式"); } } private void handleHttpRequest(ChannelHandlerContext ctx, FullHttpRequest msg) { if (!msg.decoderResult().isSuccess() || (!"websocket".equalsIgnoreCase(msg.headers().get(HttpHeaderNames.UPGRADE)))) { sendHttpResponse(ctx, msg, new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.BAD_REQUEST)); return; } WebSocketServerHandshakerFactory wsShakerFactory = new WebSocketServerHandshakerFactory( "ws://" + msg.headers().get(HttpHeaderNames.HOST), null, false); handshaker = wsShakerFactory.newHandshaker(msg); if (handshaker == null) { WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(ctx.channel()); } else { // 建立websocket连接握手 
handshaker.handshake(ctx.channel(), msg); ctx.channel().attr(AttributeKey.valueOf("add")).set(Boolean.TRUE); } } private void sendHttpResponse(ChannelHandlerContext ctx, FullHttpRequest msg, DefaultFullHttpResponse response) { if (response.status().code() != HttpResponseStatus.OK.code()) { ByteBuf buf = Unpooled.copiedBuffer(response.status().toString(), CharsetUtil.UTF_8); response.content().writeBytes(buf); buf.release(); } ChannelFuture cf = ctx.channel().writeAndFlush(response); if (!HttpUtil.isKeepAlive(msg) || response.status().code() != HttpResponseStatus.OK.code()) { cf.addListener(ChannelFutureListener.CLOSE); } } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { ctx.flush(); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { ctx.channel().attr(AttributeKey.valueOf("add")).set(Boolean.FALSE); ctx.close(); } @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { ctx.channel().attr(AttributeKey.valueOf("add")).set(Boolean.FALSE); ctx.close(); } @Override protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception { if (msg instanceof FullHttpRequest) { handleHttpRequest(ctx, (FullHttpRequest) msg); } else if (msg instanceof WebSocketFrame) { handleWebSocketFrame(ctx, (WebSocketFrame) msg); } } }
上面对ws协议进行了处理,处理后的数据直接解析成字符串给后续的handler。
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.handler.codec.http.websocketx.TextWebSocketFrame; /** * 入站处理器:获取请求数据,完成业务处理,推送消息给浏览器 * * @Author xingo * @Date 2023/11/21 */ public class UserWebsocketInHandler extends SimpleChannelInboundHandler<String> { @Override protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception { System.out.println(Thread.currentThread().getName() + "|" + msg); // ctx.writeAndFlush(new TextWebSocketFrame("server send message : " + msg)); ctx.writeAndFlush("server send message : " + msg); } }
import io.netty.channel.*;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;

/**
 * Outbound handler that decides whether an outgoing message needs to be wrapped.
 *
 * <p>{@link String} messages are converted to {@link TextWebSocketFrame}; every other
 * message type is passed on unchanged.
 *
 * @Author xingo
 * @Date 2023/11/21
 */
public class UserWebsocketOutHandler extends ChannelOutboundHandlerAdapter {

    /**
     * @param ctx     context of this handler in the pipeline
     * @param msg     outgoing message
     * @param promise promise completed when the write finishes
     */
    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        // Anything that is not a String goes through the default outbound path untouched.
        if (!(msg instanceof String)) {
            super.write(ctx, msg, promise);
            return;
        }

        final TextWebSocketFrame frame = new TextWebSocketFrame((String) msg);
        ctx.write(frame, promise);
    }
}
import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.codec.http.HttpObjectAggregator; import io.netty.handler.codec.http.HttpServerCodec; import io.netty.handler.logging.LoggingHandler; import io.netty.handler.stream.ChunkedWriteHandler; import io.netty.handler.timeout.IdleStateEvent; import io.netty.handler.timeout.IdleStateHandler; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; /** * websocket服务端 * * @Author xingo * @Date 2023/11/21 */ public class NettyWebsocketServer implements Runnable { /** * 服务端IP地址 */ private String ip; /** * 服务端端口号 */ private int port; public NettyWebsocketServer(String ip, int port) { this.ip = ip; this.port = port; } @Override public void run() { // 指定boss线程数:主要负责接收连接请求,一般设置为1就可以 final EventLoopGroup boss = new NioEventLoopGroup(1, new ThreadFactory() { private AtomicInteger index = new AtomicInteger(0); @Override public Thread newThread(Runnable r) { return new Thread(r, String.format("NioBoss_%d", this.index.incrementAndGet())); } }); // 指定worker线程数:主要负责处理连接就绪的连接,一般设置为CPU的核心数 final int totalThread = 12; final EventLoopGroup worker = new NioEventLoopGroup(totalThread, new ThreadFactory() { private AtomicInteger index = new AtomicInteger(0); @Override public Thread newThread(Runnable r) { return new Thread(r, String.format("NioSelector_%d_%d", totalThread, this.index.incrementAndGet())); } }); // 指定任务处理线程数:主要负责读取数据和处理响应,一般该值设置的比较大,与业务相对应 final int jobThreads = 1024; final EventLoopGroup job = new DefaultEventLoopGroup(jobThreads, new ThreadFactory() { private AtomicInteger index = new AtomicInteger(0); @Override public Thread newThread(Runnable r) { return new Thread(r, String.format("NioJob_%d_%d", jobThreads, this.index.incrementAndGet())); } }); // 
日志处理handler:类定义上面有Sharable表示线程安全,可以将对象定义在外面使用 final LoggingHandler LOGGING_HANDLER = new LoggingHandler(); // 指定服务端bootstrap ServerBootstrap server = new ServerBootstrap(); server.group(boss, worker) // 指定通道类型 .channel(NioServerSocketChannel.class) // 指定全连接队列大小:windows下默认是200,linux/mac下默认是128 .option(ChannelOption.SO_BACKLOG, 2048) // 维持链接的活跃,清除死链接 .childOption(ChannelOption.SO_KEEPALIVE, true) // 关闭延迟发送 .childOption(ChannelOption.TCP_NODELAY, true) // 添加handler处理链 .childHandler(new ChannelInitializer<NioSocketChannel>() { @Override protected void initChannel(NioSocketChannel channel) throws Exception { ChannelPipeline pipeline = channel.pipeline(); // 日志处理 pipeline.addLast(LOGGING_HANDLER); // 心跳检测:读超时时间、写超时时间、全部超时时间(单位是秒,0表示不处理) pipeline.addLast(new IdleStateHandler(30,0,0, TimeUnit.SECONDS)); pipeline.addLast(new ChannelDuplexHandler() { @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { IdleStateEvent event = (IdleStateEvent) evt; System.out.println("心跳事件 : " + event.state()); super.userEventTriggered(ctx, evt); } }); // 处理http请求的编解码器 pipeline.addLast(job, "httpServerCodec", new HttpServerCodec()); pipeline.addLast(job, "chunkedWriteHandler", new ChunkedWriteHandler()); pipeline.addLast(job, "httpObjectAggregator", new HttpObjectAggregator(65536)); // 处理websocket的编解码器 pipeline.addLast(job, "websocketHandler", new WebsocketServerHandler()); // 自定义处理器 pipeline.addLast(job, "userOutHandler", new UserWebsocketOutHandler()); pipeline.addLast(job, "userInHandler", new UserWebsocketInHandler()); } }); try { // 服务端绑定对外服务地址 ChannelFuture future = server.bind(ip, port).sync(); System.out.println("netty server start ok."); // 等待服务关闭,关闭后释放相关资源 future.channel().closeFuture().sync(); } catch (InterruptedException e) { e.printStackTrace(); } finally { boss.shutdownGracefully(); worker.shutdownGracefully(); job.shutdownGracefully(); } } public static void main(String[] args) { new Thread(new NettyWebsocketServer("127.0.0.1", 
8899)).start(); } }
上面这种方式同样实现了websocket通信,并且可以清楚的知道连接创建和数据交互的整个过程。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。