赞
踩
目录
gateway网关内置了支持socket长链接的路由转发功能。
本篇主要讲解通过socket客户端、web页面两周方式建立socket,通过gateway网关路由到socket服务端的实现。
因为gateway默认使用netty,我们引入socket时,也适用netty。
网关端口设置为9990。
对于网关来讲,只需要在配置文件中添加以下配置:
- #服务名称
- spring:
- application:
- name: zhufeng-gateway-config
- cloud:
- gateway:
- routes:
- - id: zhufeng-route-socket
- uri: ws://127.0.0.1:8880
- predicates:
- - Path=/nio/info
请求地址以 ws 开头,上述配置意思是当请求 /nio/info 时,将请求转发到 ws:127.0.0.1:8880
pom依赖:
- <dependency>
- <groupId>io.netty</groupId>
- <artifactId>netty-all</artifactId>
- <!-- <version>4.1.82.Final</version>-->
- <version>5.0.0.Alpha2</version>
- </dependency>
新建一个socket服务,端口为 8880
- public class WebSocketNettyServer {
-
- private static SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
-
- public static void main(String[] args) {
-
- // 创建两个线程池
- // 主线程池
- NioEventLoopGroup mainGrp = new NioEventLoopGroup();
- // 从线程池
- NioEventLoopGroup subGrp = new NioEventLoopGroup();
-
- try {
- // 创建Netty服务器启动对象
- ServerBootstrap serverBootstrap = new ServerBootstrap();
-
- // 初始化服务器启动对象
- serverBootstrap
- // 指定使用上面创建的两个线程池
- .group(mainGrp, subGrp)
- // 指定Netty通道类型
- .channel(NioServerSocketChannel.class)
- // 指定通道初始化器用来加载当Channel收到事件消息后,
- // 如何进行业务处理
- .childHandler(new WebSocketChannelInitializer());
-
- // 绑定服务器端口,以同步的方式启动服务器
- ChannelFuture future = serverBootstrap.bind(8880).sync();
- // 等待服务器关闭
- future.channel().closeFuture().sync();
- } catch (InterruptedException e) {
- e.printStackTrace();
- } finally {
- // 优雅关闭服务器
- mainGrp.shutdownGracefully();
- subGrp.shutdownGracefully();
- }
- }
- }
自定义socket通道:
- public class WebSocketChannelInitializer extends ChannelInitializer<SocketChannel>{
- // 初始化通道
- @Override
- protected void initChannel(SocketChannel ch) throws Exception {
- // 获取管道,将一个一个的ChannelHandler添加到管道中
- ChannelPipeline pipeline = ch.pipeline();
-
- // 添加一个http的编解码器
- pipeline.addLast(new HttpServerCodec());
- // 添加一个用于支持大数据流的支持
- pipeline.addLast(new ChunkedWriteHandler());
- // 添加一个聚合器,这个聚合器主要是将HttpMessage聚合成FullHttpRequest/Response
- pipeline.addLast(new HttpObjectAggregator(1024 * 1024));
-
- // 需要指定接收请求的路由
- // 必须使用以 /nio/info 后缀结尾的url才能访问
- pipeline.addLast(new WebSocketServerProtocolHandler("/nio/info"));
- // 添加自定义的Handler
- pipeline.addLast(new MyScocketHandler());
- }
- }
自定义socket消息处理:
- public class MyScocketHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
-
- /**用来保存所有的客户端连接*/
- public static ChannelGroup clients = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
- /**时间格式化*/
- private SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
-
- /**
- * 当有新的客户端连接服务器之后,会自动调用这个方法
- * @param ctx
- * @throws Exception
- */
- @Override
- public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
- // 将新的通道加入到clients
- clients.add(ctx.channel());
- }
-
- @Override
- protected void messageReceived(ChannelHandlerContext channelHandlerContext, TextWebSocketFrame msg) throws Exception {
- StringBuilder resMsg = new StringBuilder();
- // 获取客户端发送过来的文本消息
- String text = msg.text();
- System.out.println("获取客户端发送内容:" + text);
- //发送所有客户端
- /**
- for (Channel client : clients) {
- System.out.println("获取客户端:" + client.id());
- // 将消息发送到所有的客户端
- resMsg.append(text).append(",").append(channelHandlerContext.channel().id()).append(",").append(sdf.format(new Date()));
- client.writeAndFlush(new TextWebSocketFrame(resMsg.toString()));
- }
- */
- //发送指定客户端
- resMsg.append(text).append(",").append(channelHandlerContext.channel().id()).append(",").append(sdf.format(new Date()));
- channelHandlerContext.channel().writeAndFlush(new TextWebSocketFrame(resMsg.toString()));
-
- }
- }
在低版本的netty依赖中 实现 channelRead0 方法来接收消息
高版本中使用 messageReceived 方法来接收消息
- public class WebSocketNettyClient {
-
- public static void main(String[] args) {
-
- EventLoopGroup group = new NioEventLoopGroup();
- final ClientHandler clientHandler =new ClientHandler();
- try {
- Bootstrap bootstrap = new Bootstrap();
- bootstrap.group(group).channel(NioSocketChannel.class)
- .option(ChannelOption.TCP_NODELAY, true)
- .option(ChannelOption.SO_KEEPALIVE,true)
- .handler(new ChannelInitializer<SocketChannel>() {
- @Override
- protected void initChannel(SocketChannel ch) throws Exception {
- ChannelPipeline pipeline = ch.pipeline();
- // 添加一个http的编解码器
- pipeline.addLast(new HttpClientCodec());
- // 添加一个用于支持大数据流的支持
- pipeline.addLast(new ChunkedWriteHandler());
- // 添加一个聚合器,这个聚合器主要是将HttpMessage聚合成FullHttpRequest/Response
- pipeline.addLast(new HttpObjectAggregator(1024 * 1024));
-
- pipeline.addLast(clientHandler);
-
- }
- });
-
- URI websocketURI = new URI("ws://localhost:9990/nio/info");
- HttpHeaders httpHeaders = new DefaultHttpHeaders();
- //进行握手
- WebSocketClientHandshaker handshaker = WebSocketClientHandshakerFactory.newHandshaker(websocketURI, WebSocketVersion.V13, (String)null, true,httpHeaders);
-
- final Channel channel=bootstrap.connect(websocketURI.getHost(),websocketURI.getPort()).sync().channel();
- clientHandler.setHandshaker(handshaker);
- handshaker.handshake(channel);
- //阻塞等待是否握手成功
- clientHandler.handshakeFuture().sync();
-
- //发送消息
- JSONObject userInfo = new JSONObject();
- userInfo.put("userId","u1001");
- userInfo.put("userName","月夜烛峰");
- channel.writeAndFlush(new TextWebSocketFrame(userInfo.toString()));
-
- // 等待连接被关闭
- channel.closeFuture().sync();
- } catch (Exception e) {
- e.printStackTrace();
- } finally {
-
- group.shutdownGracefully();
- }
-
- }
- }
上述代码中:
URI websocketURI = new URI("ws://localhost:9990/nio/info");
请求的是gateway的地址。
客户端逻辑处理:
- public class ClientHandler extends SimpleChannelInboundHandler<Object> {
-
- private WebSocketClientHandshaker handshaker;
- ChannelPromise handshakeFuture;
-
- /**
- * 当客户端主动链接服务端的链接后,调用此方法
- *
- * @param channelHandlerContext ChannelHandlerContext
- */
- @Override
- public void channelActive(ChannelHandlerContext channelHandlerContext) {
- System.out.println("客户端Active .....");
- handlerAdded(channelHandlerContext);
- }
-
- @Override
- public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
- System.err.println("exceptionCaught:异常信息:" + cause.getMessage());
- ctx.close();
- }
-
- public void setHandshaker(WebSocketClientHandshaker handshaker) {
- this.handshaker = handshaker;
- }
-
- @Override
- public void handlerAdded(ChannelHandlerContext ctx) {
- this.handshakeFuture = ctx.newPromise();
- }
-
- public ChannelFuture handshakeFuture() {
- return this.handshakeFuture;
- }
-
- @Override
- protected void messageReceived(ChannelHandlerContext ctx, Object o) throws Exception {
- // 握手协议返回,设置结束握手
- if (!this.handshaker.isHandshakeComplete()) {
- FullHttpResponse response = (FullHttpResponse) o;
- this.handshaker.finishHandshake(ctx.channel(), response);
- this.handshakeFuture.setSuccess();
- System.out.println("握手成功::messageReceived HandshakeComplete...");
- return;
- } else if (o instanceof TextWebSocketFrame) {
- TextWebSocketFrame textFrame = (TextWebSocketFrame) o;
- System.out.println("接收消息::messageReceived textFrame: " + textFrame.text());
- } else if (o instanceof CloseWebSocketFrame) {
- System.out.println("关闭链接::messageReceived CloseWebSocketFrame");
- }
- }
- }
分别启动gateway网关、socket服务端、socket客户端
运行客户端后,控制台打印:
- 客户端Active .....
- 握手成功::messageReceived HandshakeComplete...
- 接收消息::messageReceived textFrame: {"userName":"月夜烛峰","userId":"u1001"},20a1a1e5,2022-09-26 15:52:09
服务端控制台:
客户端发送内容,服务端已经收到,说明socket链接已经建立并可以使用。
为了更直观测试,通过web socket来访问
- <!DOCTYPE html>
- <html>
- <head>
- <meta charset="utf-8">
- <title>websocket</title>
- <style>
- table {
- border-collapse: collapse;
- border-spacing: 0;
- table-layout: fixed;
- }
- </style>
- </head>
- <body>
-
- <div id="main" style="display: flex;margin-top:20px;">
- <div style="width: 20px"></div>
- <div style="border: 1px solid grey;width:700px;">
- <div class="card_header" style="padding: 0.5rem 1rem;border-bottom: 1px solid grey "><span>消息列表 </span>
- </div>
- <div class="card_header"
- style="padding: 0.5rem 1rem;font-size:15px;border-bottom: 1px dashed grey;height: 21px">
- <table style="width: 100%;">
- <thead>
- <tr>
- <th>消息内容</th>
- <th>发送者</th>
- <th>发送时间</th>
- </tr>
- </thead>
- </table>
- </div>
- <div id="socket_keep_alive" style="height: 300px;overflow-y:auto;padding: 0rem 1rem 1rem 1rem;font-size:16px">
- <table id="socket_keep_alive_table" style="width: 100%;">
- <tbody style="width: 100%;text-align: center;">
-
- </tbody>
- </table>
- </div>
- </div>
- <div style="width: 20px"></div>
- <form onsubmit="return false">
- <div id="show_jvm_info" style="border: 1px solid grey;width: 450px;">
- <div class="card_header" style="padding: 0.5rem 1rem;border-bottom: 1px solid grey;"><span>消息发送 </span>
- <button style="float: right;margin-right: 20px;"
- onclick="send(this.form.message.value);this.form.reset()">发送
- </button>
-
- </div>
- <textarea name="message" style="height: 345px; width: 445px;border: none;"></textarea>
- </div>
- </form>
-
- </div>
-
- </body>
- <script src="/js/jquery-1.8.3.min.js"></script>
- <script>
-
- function cleanInfo(id) {
- $("#" + id).empty();
- }
-
- function appendInfo(data, id) {
- var sinfo = data.split(",");
- var tableData = "<tr>";
- $.each(sinfo, function (k, v) {
- //console.log(k+':'+v);
- tableData = tableData + "<td>" + v + "</td>";
- })
- tableData = tableData + "</tr>";
- console.log(tableData);
- $("#" + id).prepend(tableData);
-
- }
-
- var socket;
- //判断当前浏览器是否支持websocket
- if (window.WebSocket) {
- //go on
- socket = new WebSocket("ws://localhost:9990/nio/info");
- //相当于channelReado, ev 收到服务器端回送的消息
- socket.onmessage = function (msg) {
-
- appendInfo(msg.data, "socket_keep_alive_table");
- }
-
- //相当于连接开启(感知到连接开启)
- socket.onopen = function (ev) {
- console.log("websocket 已打开");
- }
-
- //相当于连接关闭(感知到连接关闭)
- socket.onclose = function (ev) {
- console.log("websocket 已关闭");
- }
- } else {
- alert("当前浏览器不支持websocket")
- }
-
- //发送消息到服务器
- function send(message) {
- if (!window.socket) { //先判断socket是否创建
- return;
- }
- if (socket.readyState == WebSocket.OPEN) {
- //通过socket 发送消息
- socket.send(message)
- } else {
- alert("连接没有开启");
- }
- }
- </script>
-
- </html>
html页面中也访问gateway网关地址,访问页面:
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。