赞
踩
下面是使用Netty一个服务,基本包含Netty的核心使用,直接上代码
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.60.Final</version>
</dependency>
package com.service.modular.im.component.config; import com.mountain.api.modular.coder.WebMessageDecoder; import com.mountain.api.modular.coder.WebMessageEncoder; import com.mountain.common.core.constant.SystemConstants; import com.mountain.service.modular.im.component.config.properties.ImProperties; import com.mountain.service.modular.im.component.decorator.HttpRequestHandler; import com.mountain.service.modular.im.component.decorator.IMNioSocketHandler; import com.mountain.service.modular.im.component.decorator.LoggingHandler; import com.mountain.service.modular.im.component.decorator.ImCommandHandler; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.epoll.EpollEventLoopGroup; import io.netty.channel.epoll.EpollServerSocketChannel; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.codec.http.HttpObjectAggregator; import io.netty.handler.codec.http.HttpServerCodec; import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler; import io.netty.handler.stream.ChunkedWriteHandler; import io.netty.handler.timeout.IdleStateHandler; import lombok.extern.slf4j.Slf4j; import java.time.Duration; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; @Slf4j public class NettyServer { private final ThreadFactory bossThreadFactory; private final ThreadFactory workerThreadFactory; private EventLoopGroup webBossGroup; private EventLoopGroup webWorkerGroup; private final ImCommandHandler outerRequestHandler; private final ChannelHandler loggingHandler = new LoggingHandler(); private final Integer webPort; public final Duration writeIdle; public final Duration readIdle; public NettyServer(ImProperties imProperties, ImCommandHandler imCommandHandler) { this.webPort = imProperties.getWeb().getPort(); this.writeIdle = Duration.ofSeconds(imProperties.getWeb().getWriteIdleTime()); this.readIdle = Duration.ofSeconds(imProperties.getWeb().getReaderIdleTime()); this.outerRequestHandler = imCommandHandler; this.bossThreadFactory = r -> { Thread thread = new Thread(r); thread.setName("nio-boss-"); return thread; }; this.workerThreadFactory = r -> { Thread thread = new Thread(r); thread.setName("nio-worker-"); return thread; }; } /** * 启动 */ public void bind() { try { if (this.webPort != null) { this.bindWebPort(); } } catch (InterruptedException e) { log.error("启动netty失败,InterruptedException!", e); Thread.currentThread().interrupt(); } catch (Exception e) { log.error("启动netty失败!", e); } } /** * web端 */ public void bindWebPort() throws InterruptedException { this.createWebEventGroup(); ServerBootstrap bootstrap = this.createServerBootstrap(this.webBossGroup, this.webWorkerGroup); bootstrap.childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) { try { log.info("收到新连接:{}", ch.remoteAddress()); ch.pipeline().addLast("http-codec", new HttpServerCodec()); ch.pipeline().addLast("http-chunked", new ChunkedWriteHandler()); ch.pipeline().addLast("aggregator", new HttpObjectAggregator(65536)); ch.pipeline().addLast(NettyServer.this.loggingHandler); // 设置定时时间 ch.pipeline().addLast(new IdleStateHandler(NettyServer.this.readIdle.getSeconds(), NettyServer.this.writeIdle.getSeconds(), 0L, TimeUnit.SECONDS)); // 编码解码器,数据处理 ch.pipeline().addLast("decoder", new WebMessageDecoder()); ch.pipeline().addLast("encoder", new WebMessageEncoder()); // 消息处理类 ch.pipeline().addLast("handler", new HttpRequestHandler()); ch.pipeline().addLast("handler1", new IMNioSocketHandler(outerRequestHandler)); ch.pipeline().addLast(new WebSocketServerProtocolHandler("/ws", null, true, 65536 * 10)); } catch (Exception e) { log.error("通信异常!", e); } } }); // 服务器异步创建绑定,打印日志,关闭通道 ChannelFuture channelFuture = bootstrap.bind(this.webPort).sync(); channelFuture.channel().newSucceededFuture().addListener(future -> log.info("netty服务端启动成功,端口:{}", channelFuture.channel().localAddress())); channelFuture.channel().closeFuture().addListener(future -> this.destroy(this.webBossGroup, this.webWorkerGroup)); } /** * 创建web管道 */ private void createWebEventGroup() { if (SystemConstants.IS_LINUX) { this.webBossGroup = new EpollEventLoopGroup(this.bossThreadFactory); this.webWorkerGroup = new EpollEventLoopGroup(this.workerThreadFactory); } else { this.webBossGroup = new NioEventLoopGroup(this.bossThreadFactory); this.webWorkerGroup = new NioEventLoopGroup(this.workerThreadFactory); } } /** * 初始化 * * @param bossGroup 监听线程组 * @param workerGroup 处理客户端相关操作线程组,负责处理与客户端的数据通讯 * @return ServerBootstrap */ private ServerBootstrap createServerBootstrap(EventLoopGroup bossGroup, EventLoopGroup workerGroup) { ServerBootstrap bootstrap = new ServerBootstrap(); // 绑定线程池 bootstrap.group(bossGroup, workerGroup); bootstrap.childOption(ChannelOption.TCP_NODELAY, true); bootstrap.childOption(ChannelOption.SO_KEEPALIVE, true); // 指定使用的channel,设置非阻塞,用它来建立新accept的连接,用于构造serverSocketChannel的工厂类 bootstrap.channel(SystemConstants.IS_LINUX ? EpollServerSocketChannel.class : NioServerSocketChannel.class); return bootstrap; } /** * bean销毁方法,也可使用@PreDestroy */ public void destroy() { this.destroy(this.webBossGroup, this.webWorkerGroup); } /** * 释放 NIO 线程组 * * @param bossGroup tcp连接线程池 * @param workerGroup io处理线程池 */ public void destroy(EventLoopGroup bossGroup, EventLoopGroup workerGroup) { if (bossGroup != null && !bossGroup.isShuttingDown() && !bossGroup.isShutdown()) { try { bossGroup.shutdownGracefully(); } catch (Exception var5) { var5.printStackTrace(); } } if (workerGroup != null && !workerGroup.isShuttingDown() && !workerGroup.isShutdown()) { try { workerGroup.shutdownGracefully(); } catch (Exception var4) { var4.printStackTrace(); } } } }
自定义处理类,处理http请求
import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.handler.codec.http.DefaultFullHttpResponse; import io.netty.handler.codec.http.FullHttpRequest; import io.netty.handler.codec.http.HttpHeaders; import io.netty.handler.codec.http.HttpResponseStatus; import io.netty.handler.codec.http.HttpVersion; import io.netty.handler.codec.http.websocketx.WebSocketServerHandshaker; import io.netty.handler.codec.http.websocketx.WebSocketServerHandshakerFactory; import io.netty.util.CharsetUtil; import lombok.extern.slf4j.Slf4j; @Slf4j public class HttpRequestHandler extends SimpleChannelInboundHandler<FullHttpRequest> { private WebSocketServerHandshaker handshaker; @Override protected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest request) throws Exception { log.info("HttpRequestHandler连接:{},{}", ctx, request); // 如果HTTP解码失败,返回HHTP异常 if (!request.decoderResult().isSuccess() || (!"websocket".equals(request.headers().get("Upgrade")))) { //若不是websocket方式,则创建BAD_REQUEST的req,返回给客户端 sendHttpResponse(ctx, request, new DefaultFullHttpResponse( HttpVersion.HTTP_1_1, HttpResponseStatus.BAD_REQUEST)); return; } // 构造握手响应返回 WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory( "ws://" + ctx.channel(), null, false); handshaker = wsFactory.newHandshaker(request); //SocketConstant.webSocketHandshakerMap.put(ctx.channel().id().asLongText(),handshaker); if (handshaker == null) { WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(ctx.channel()); } else { handshaker.handshake(ctx.channel(), request); } } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); log.error("NettyWebSocketParamHandler.exceptionCaught --> cause: ", cause); ctx.close(); } private static void sendHttpResponse(ChannelHandlerContext ctx, FullHttpRequest req, DefaultFullHttpResponse res) { // 返回应答给客户端 if (res.status().code() != 200) { ByteBuf buf = Unpooled.copiedBuffer(res.status().toString(), CharsetUtil.UTF_8); res.content().writeBytes(buf); buf.release(); } // 如果是非Keep-Alive,关闭连接 ChannelFuture f = ctx.channel().writeAndFlush(res); if (!HttpHeaders.isKeepAlive(req) || res.status().code() != 200) { f.addListener(ChannelFutureListener.CLOSE); } } }
处理WebSocketFrame格式的请求数据,有IETF发布的WebSocket RFC,定义了6种帧,Netty为它们都提供了一个POJO实现。
BinaryWebSocketFrame——包含了二进制数据
TextWebSocketFrame——包含了文本数据
ContinuationWebSocketFrame——包含属于上一个BinaryWebSocketFrame或TextWebSocketFrame的文本数据或者二进制数据
CloseWebSocketFrame——表示一个CLOSE请求,包含一个关闭的状态码和关闭的原因
PingWebSocketFrame——请求传输一个PongWebSocketFrame
PongWebSocketFrame——作为一个对于PingWebSocketFrame的响应被发送
TextWebSocketFrame是我们唯一真正需要处理的帧类型。为了符合WebSocket RFC,Netty提供了WebSocketServerProtocolHandler来处理其他类型的帧。
import com.mountain.api.modular.constant.ChannelAttr; import com.mountain.api.modular.vo.SentBody; import com.mountain.service.modular.im.component.group.MyChannelGroup; import io.netty.channel.ChannelHandler.Sharable; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame; import io.netty.handler.codec.http.websocketx.PingWebSocketFrame; import io.netty.handler.codec.http.websocketx.PongWebSocketFrame; import io.netty.handler.codec.http.websocketx.TextWebSocketFrame; import io.netty.handler.codec.http.websocketx.WebSocketFrame; import io.netty.handler.codec.http.websocketx.WebSocketServerHandshaker; import io.netty.handler.timeout.IdleState; import io.netty.handler.timeout.IdleStateEvent; import lombok.extern.slf4j.Slf4j; import java.util.Date; /** * @author 123 */ @Slf4j @Sharable public class IMNioSocketHandler extends SimpleChannelInboundHandler<WebSocketFrame> { private WebSocketServerHandshaker handshaker; /** * 自定义处理Handler */ private final ImCommandHandler outerRequestHandler; public IMNioSocketHandler(ImCommandHandler outerRequestHandler) { this.outerRequestHandler = outerRequestHandler; } /** * 客户端与服务端通信通道开启 */ @Override public void channelActive(ChannelHandlerContext ctx) { log.info("客户端与服务端连接开启:" + ctx.channel().remoteAddress().toString()); ctx.channel().attr(ChannelAttr.ID).set(ctx.channel().id().asShortText()); MyChannelGroup.group.add(ctx.channel()); } /** * 客户端断开通道 */ @Override public void channelInactive(ChannelHandlerContext ctx) { log.info("客户端与服务端连接关闭:" + ctx.channel().remoteAddress().toString()); if (ctx.channel().attr(ChannelAttr.UID) != null) { SentBody body = new SentBody(); body.setKey("client_closed"); this.outerRequestHandler.process(ctx.channel(), body); } MyChannelGroup.group.remove(ctx.channel()); } /** * 通道读取完成,收到通知 */ @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { ctx.flush(); } @Override protected void channelRead0(ChannelHandlerContext ctx, WebSocketFrame body) { log.info("接收消息:{}", body); if (body != null) { log.info("类型WebSocketFrame:{}", body); WebSocketFrame frame = (WebSocketFrame) body; // 判断是否关闭链路的指令 if (frame instanceof CloseWebSocketFrame) { //handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame.retain()); ctx.channel().close(); return; } // 判断是否ping消息 if (frame instanceof PingWebSocketFrame) { ctx.channel().write(new PongWebSocketFrame(frame.content().retain())); return; } // 仅支持文本消息,不支持二进制消息 if (!(frame instanceof TextWebSocketFrame)) { throw new UnsupportedOperationException( String.format("%s frame types not supported", frame.getClass().getName())); } // 返回应答消息 String request = ((TextWebSocketFrame) frame).text(); log.info("服务端2收到:" + ctx.channel() + "," + request); TextWebSocketFrame tws = new TextWebSocketFrame(new Date().toString() + ctx.channel().id() + ":" + request); // 返回【谁发的发给谁】 ctx.channel().writeAndFlush(tws); } //其他连接 //this.outerRequestHandler.process(ctx.channel(), body); } /** * 超时处理 */ @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { log.info("读写事件超时,ctx:{},event:{}", ctx, evt); if (!(evt instanceof IdleStateEvent)) { super.userEventTriggered(ctx, evt); return; } IdleStateEvent idleEvent = (IdleStateEvent) evt; String uid = (String) ctx.channel().attr(ChannelAttr.UID).get(); if (idleEvent.state() == IdleState.WRITER_IDLE && uid == null) { //ctx.channel().close(); return; } //读事件 if (IdleState.READER_IDLE == idleEvent.state()) { Integer pingCount = ctx.channel().attr(ChannelAttr.PING_COUNT).get(); ctx.channel().attr(ChannelAttr.PING_COUNT).set(pingCount == null ? 1 : (pingCount + 1)); if (pingCount != null && pingCount >= 2) { log.info("{} pong timeout.", ctx.channel()); ctx.channel().close(); } } //写事件 if (IdleState.WRITER_IDLE == idleEvent.state()) { // Integer pingCount = ctx.channel().attr(ChannelAttr.PING_COUNT).get(); // ctx.channel().attr(ChannelAttr.PING_COUNT).set(pingCount == null ? 1 : pingCount + 1); // ctx.channel().writeAndFlush(Ping.getInstance()); } } /** * 异常处理 */ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { log.warn("EXCEPTION", cause); ctx.channel().close(); } }
到此集成netty基本流程结束
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。