赞
踩
Spring Cloud Gateway 管理 Netty : https://editor.csdn.net/md/?articleId=105424618
上篇文章中简单概述了一下网关路由长连接的思路
这篇文章来贴一下代码
- Java 通过 SocketClient 连接 Netty
- VUE 通过 WebSocket 连接 Netty
本文章中有关 Netty 和 Client 以及 Socket 的代码仅作为网关路由长连接案例,非网关路由请查看直连文档,以免出现无法预想的问题。
import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.codec.http.HttpObjectAggregator; import io.netty.handler.codec.http.HttpServerCodec; import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler; import io.netty.handler.logging.LogLevel; import io.netty.handler.logging.LoggingHandler; import io.netty.handler.stream.ChunkedWriteHandler; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Configuration; import java.net.InetSocketAddress; /** * Netty 服务端 * @author cuixiaojian */ @Slf4j @Configuration public class NettyServer { /** * Netty 端口号 */ @Value("${netty.port}") private Integer nettyPort; /** * 启动方法 * @throws Exception */ public void startServer() throws Exception { // Netty 线程组 EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup wokerGroup = new NioEventLoopGroup(); try{ // Netty 启动类 ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap.group(bossGroup,wokerGroup).channel(NioServerSocketChannel.class) .handler(new LoggingHandler(LogLevel.INFO)) .option(ChannelOption.SO_KEEPALIVE,true) .option(ChannelOption.SO_BACKLOG,1024 * 1024 * 10) // 设置处理器 .childHandler(new ChannelInitializer<SocketChannel>(){ @Override protected void initChannel(SocketChannel socketChannel) throws Exception { ChannelPipeline pipeline = socketChannel.pipeline(); pipeline.addLast("http-codec",new HttpServerCodec()); pipeline.addLast("http-chunked",new ChunkedWriteHandler()); pipeline.addLast("aggregator",new HttpObjectAggregator(1024*1024*1024)); pipeline.addLast(new WebSocketServerProtocolHandler("/notice",null,true,65535)); // 自定义处理器 pipeline.addLast(new WebSocketHandle()); } }); ChannelFuture channelFuture = serverBootstrap.bind(new InetSocketAddress(nettyPort)).sync(); if (channelFuture.isSuccess()) { log.info("Netty服务端启动成功, 端口号:{}",nettyPort); } channelFuture.channel().closeFuture().sync(); } catch (Exception e) { log.error("Netty服务端启动失败,异常信息为: {}",e); } finally { bossGroup.shutdownGracefully(); wokerGroup.shutdownGracefully(); } } }
import com.xnyzc.lhy.common.util.CheckUtil; import com.xnyzc.lhy.notice.entity.message.SysSocketUser; import com.xnyzc.lhy.notice.netty.param.NettyMessage; import com.xnyzc.lhy.notice.netty.util.NettyMsg; import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.channel.group.ChannelGroup; import io.netty.channel.group.DefaultChannelGroup; import io.netty.handler.codec.http.websocketx.*; import io.netty.util.concurrent.GlobalEventExecutor; import lombok.extern.slf4j.Slf4j; import org.springframework.context.annotation.Configuration; import java.net.InetSocketAddress; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; /** * 自定义的handler类 * @author cuixiaojian */ @Slf4j @Configuration public class WebSocketHandle extends SimpleChannelInboundHandler<Object> { //客户端组 public static ChannelGroup channelGroup; static { channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); } /** * 存储 Channel 的容器 */ private static ConcurrentMap<String, Channel> channelMap = new ConcurrentHashMap<>(); /** * Handler活跃状态,连接成功 */ @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { // 将通道放入组 channelGroup.add(ctx.channel()); } /** * 通道读取 */ @Override protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception { // 文本消息 (约定 与客户端 Socket 消息类型为文本消息) if (msg instanceof TextWebSocketFrame) { // 获取当前channel绑定的IP地址 InetSocketAddress ipSocket = (InetSocketAddress) ctx.channel().remoteAddress(); String address = ipSocket.getAddress().getHostAddress(); log.info("获取到远程连接:IP:{}", address); TextWebSocketFrame textFrame = (TextWebSocketFrame) msg; String message = textFrame.text(); log.info("收到来自客户端消息: {}", message); // 此处 NettyMsg 类为自定义消息解析器,通过 message 串来解析消息。 NettyMessage nettyMessage = NettyMsg.getNettyMessage(message); // 解析成为自定义消息对象 SysSocketUser sysSocketUser = NettyMsg.getCon(nettyMessage.getCon(), SysSocketUser.class); // 与客户端约定建立连接成功时,发送一条包含用户ID的消息,此处从消息中获取用户ID与Channel绑定,后续为用户发送消息时使用。 if (CheckUtil.objIsNotEmpty(sysSocketUser)) { if (CheckUtil.objIsNotEmpty(sysSocketUser.getUserId())) { //将 用户 和 Channel 的关系保存 if (!channelMap.containsKey(sysSocketUser.getUserId())) { channelMap.put(sysSocketUser.getUserId(), ctx.channel()); } } } } // PING 类型消息 if (msg instanceof PongWebSocketFrame) { log.info("PING SUCCESS"); } // 请求关闭连接类型消息 if (msg instanceof CloseWebSocketFrame) { log.info("客户端关闭连接,服务端关闭通道"); Channel channel = ctx.channel(); channel.close(); } } /** * 未注册状态 */ @Override public void channelUnregistered(ChannelHandlerContext ctx) throws Exception { log.info("等待连接"); } /** * 非活跃状态,没有连接远程主机的时候。 */ @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { log.info("客户端关闭"); channelGroup.remove(ctx.channel()); } /** * 异常处理 */ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { log.error("连接异常:" + cause.getMessage()); ctx.close(); } /** * 给指定用户发内容 * 可以使用此方法推送消息给客户端 */ public void sendMessage(String userId, String message) { Channel channel = channelMap.get(userId); channel.writeAndFlush(new TextWebSocketFrame(message)); } /** * 群发消息 */ public void sendMessageAll(String message) { channelGroup.writeAndFlush(new TextWebSocketFrame(message)); } }
public class NoticeApplication {
public static void main(String[] args) {
ApplicationContext context = SpringApplication.run(NoticeApplication.class, args);
NettyServer nettyServer = context.getBean(NettyServer.class);
try {
nettyServer.startServer();
} catch (Exception e) {
System.out.println("netty 启动失败");
}
}
}
import io.netty.bootstrap.Bootstrap; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.codec.http.DefaultHttpHeaders; import io.netty.handler.codec.http.HttpClientCodec; import io.netty.handler.codec.http.HttpHeaders; import io.netty.handler.codec.http.HttpObjectAggregator; import io.netty.handler.codec.http.websocketx.TextWebSocketFrame; import io.netty.handler.codec.http.websocketx.WebSocketClientHandshaker; import io.netty.handler.codec.http.websocketx.WebSocketClientHandshakerFactory; import io.netty.handler.codec.http.websocketx.WebSocketVersion; import io.netty.handler.logging.LogLevel; import io.netty.handler.logging.LoggingHandler; import lombok.extern.slf4j.Slf4j; import java.net.URI; /** * Netty - WebSocket 客户端 * @author cuixiaojian */ @Slf4j public class NettyClient { public static void main(String[] args) throws Exception { // Netty 线程组 EventLoopGroup group = new NioEventLoopGroup(); // Netty 启动类 Bootstrap boot = new Bootstrap(); boot.option(ChannelOption.SO_KEEPALIVE, true) .option(ChannelOption.TCP_NODELAY, true) .group(group) .handler(new LoggingHandler(LogLevel.INFO)) .channel(NioSocketChannel.class) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { ChannelPipeline pipeline = socketChannel.pipeline(); pipeline.addLast("http-codec",new HttpClientCodec()); pipeline.addLast("aggregator",new HttpObjectAggregator(1024*1024*10)); pipeline.addLast("hookedHandler", new WebSocketClientHandler()); } }); // 这里的路径即为网关中配置的路由路径 URI webSocketURI = new URI("ws://localhost:30000/nio/v1"); HttpHeaders httpHeaders = new DefaultHttpHeaders(); // 握手 WebSocketClientHandshaker handshaker = WebSocketClientHandshakerFactory.newHandshaker(webSocketURI, WebSocketVersion.V13, (String) null, true, httpHeaders); // 连接通道 final Channel channel = boot.connect(webSocketURI.getHost(), webSocketURI.getPort()).sync().channel(); WebSocketClientHandler handler = (WebSocketClientHandler) channel.pipeline().get("hookedHandler"); handler.setHandshaker(handshaker); handshaker.handshake(channel); // 阻塞等待是否握手成功 handler.handshakeFuture().sync(); // 给服务端发送的内容,如果客户端与服务端连接成功后,可以多次掉用这个方法发送消息 sendMessage(channel); } private static void sendMessage(Channel channel){ //发送的内容,是一个文本格式的内容 // 此处NettyMsg为自定义消息解析器,setJsonMsg为设置一条Json串消息 String putMessage = NettyMsg.setJsonMsg("NOTICE", "{\"userId\": \"123456\"}"); TextWebSocketFrame frame = new TextWebSocketFrame(putMessage); channel.writeAndFlush(frame).addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture channelFuture) throws Exception { if (channelFuture.isSuccess()) { System.out.println("消息发送成功,发送的消息是:" + putMessage); } else { System.out.println("消息发送失败 " + channelFuture.cause().getMessage()); } } }); } }
import io.netty.channel.*; import io.netty.handler.codec.http.FullHttpResponse; import io.netty.handler.codec.http.websocketx.*; import io.netty.util.CharsetUtil; /** * @author cuixiaojian */ public class WebSocketClientHandler extends SimpleChannelInboundHandler<Object> { /** * 握手的状态信息 */ private WebSocketClientHandshaker handshaker; /** * netty自带的异步处理 */ private ChannelPromise handshakeFuture; @Override protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception { System.out.println("当前握手的状态"+this.handshaker.isHandshakeComplete()); Channel ch = ctx.channel(); FullHttpResponse response; // 进行握手操作 if (!this.handshaker.isHandshakeComplete()) { try { response = (FullHttpResponse)msg; // 握手协议返回,设置结束握手 this.handshaker.finishHandshake(ch, response); // 设置成功 this.handshakeFuture.setSuccess(); System.out.println("服务端的消息"+response.headers()); } catch (WebSocketHandshakeException var7) { FullHttpResponse res = (FullHttpResponse)msg; String errorMsg = String.format("握手失败,status:%s,reason:%s", res.status(), res.content().toString(CharsetUtil.UTF_8)); this.handshakeFuture.setFailure(new Exception(errorMsg)); } } else if (msg instanceof FullHttpResponse) { response = (FullHttpResponse)msg; throw new IllegalStateException("Unexpected FullHttpResponse (getStatus=" + response.status() + ", content=" + response.content().toString(CharsetUtil.UTF_8) + ')'); } else { // 接收服务端的消息 WebSocketFrame frame = (WebSocketFrame)msg; // 文本信息 if (frame instanceof TextWebSocketFrame) { TextWebSocketFrame textFrame = (TextWebSocketFrame)frame; System.out.println("客户端接收的消息是:"+textFrame.text()); } // 二进制信息 if (frame instanceof BinaryWebSocketFrame) { BinaryWebSocketFrame binFrame = (BinaryWebSocketFrame)frame; System.out.println("BinaryWebSocketFrame"); } // PING信息 if (frame instanceof PongWebSocketFrame) { System.out.println("WebSocket Client received pong"); } // 关闭消息 if (frame instanceof CloseWebSocketFrame) { System.out.println("receive close frame"); ch.close(); } } } /** * Handler活跃状态,表示连接成功 */ @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { System.out.println("与服务端连接成功"); } /** * 非活跃状态,没有连接远程主机的时候。 */ @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { System.out.println("主机关闭"); } /** * 异常处理 */ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { System.out.println("连接异常:"+cause.getMessage()); ctx.close(); } @Override public void handlerAdded(ChannelHandlerContext ctx) { this.handshakeFuture = ctx.newPromise(); } public WebSocketClientHandshaker getHandshaker() { return handshaker; } void setHandshaker(WebSocketClientHandshaker handshaker) { this.handshaker = handshaker; } public ChannelPromise getHandshakeFuture() { return handshakeFuture; } public void setHandshakeFuture(ChannelPromise handshakeFuture) { this.handshakeFuture = handshakeFuture; } ChannelFuture handshakeFuture() { return this.handshakeFuture; }
// 初始化weosocket initWebSocket() { // 设置网关路由地址 const wsuri = 'ws://127.0.0.1:30000/nio/v1' this.websock = new WebSocket(wsuri) this.websock.onmessage = this.websocketonmessage this.websock.onopen = this.websocketonopen this.websock.onerror = this.websocketonerror this.websock.onclose = this.websocketclose }, // 连接建立之后执行send方法发送数据 websocketonopen() { const actions = { 'code': '0', 'method': 'NOTICE', 'msg': '', 'con': '{\'userId\':\'1\'}' } this.websocketsend(JSON.stringify(actions)) }, // 连接建立失败重连 websocketonerror() { // this.initWebSocket() }, // 数据接收 websocketonmessage(e) { // 解析为Json消息 // const redata = JSON.parse(e.data) // element-ui 通知组件 this.$notify({ title: '通知', message: '您有一条新消息' }) // 自定义接收到消息后的处理 this.getMessage() }, // 数据发送 websocketsend(Data) { this.websock.send(Data) }, // 关闭 websocketclose(e) { console.log('断开连接', e) }
mounted() {
this.initWebSocket()
},
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。