当前位置:   article > 正文

从0到1学SpringCloud——16 gateway websocket长链接_spring cloud gateway websocket

spring cloud gateway websocket

目录

一、前言

二、代码实现

1、gateway网关配置

2、socket server 服务端

3、socket client客户端

4、模拟测试

5、集成 web socket


一、前言

gateway网关内置了支持socket长链接的路由转发功能。

本篇主要讲解通过socket客户端、web页面两周方式建立socket,通过gateway网关路由到socket服务端的实现。

因为gateway默认使用netty,我们引入socket时,也适用netty。

二、代码实现

1、gateway网关配置

网关端口设置为9990。

对于网关来讲,只需要在配置文件中添加以下配置:

  1. #服务名称
  2. spring:
  3. application:
  4. name: zhufeng-gateway-config
  5. cloud:
  6. gateway:
  7. routes:
  8. - id: zhufeng-route-socket
  9. uri: ws://127.0.0.1:8880
  10. predicates:
  11. - Path=/nio/info

请求地址以 ws 开头,上述配置意思是当请求 /nio/info 时,将请求转发到 ws:127.0.0.1:8880 

2、socket server 服务端

pom依赖:

  1. <dependency>
  2. <groupId>io.netty</groupId>
  3. <artifactId>netty-all</artifactId>
  4. <!-- <version>4.1.82.Final</version>-->
  5. <version>5.0.0.Alpha2</version>
  6. </dependency>

新建一个socket服务,端口为 8880 

  1. public class WebSocketNettyServer {
  2. private static SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
  3. public static void main(String[] args) {
  4. // 创建两个线程池
  5. // 主线程池
  6. NioEventLoopGroup mainGrp = new NioEventLoopGroup();
  7. // 从线程池
  8. NioEventLoopGroup subGrp = new NioEventLoopGroup();
  9. try {
  10. // 创建Netty服务器启动对象
  11. ServerBootstrap serverBootstrap = new ServerBootstrap();
  12. // 初始化服务器启动对象
  13. serverBootstrap
  14. // 指定使用上面创建的两个线程池
  15. .group(mainGrp, subGrp)
  16. // 指定Netty通道类型
  17. .channel(NioServerSocketChannel.class)
  18. // 指定通道初始化器用来加载当Channel收到事件消息后,
  19. // 如何进行业务处理
  20. .childHandler(new WebSocketChannelInitializer());
  21. // 绑定服务器端口,以同步的方式启动服务器
  22. ChannelFuture future = serverBootstrap.bind(8880).sync();
  23. // 等待服务器关闭
  24. future.channel().closeFuture().sync();
  25. } catch (InterruptedException e) {
  26. e.printStackTrace();
  27. } finally {
  28. // 优雅关闭服务器
  29. mainGrp.shutdownGracefully();
  30. subGrp.shutdownGracefully();
  31. }
  32. }
  33. }

自定义socket通道:

  1. public class WebSocketChannelInitializer extends ChannelInitializer<SocketChannel>{
  2. // 初始化通道
  3. @Override
  4. protected void initChannel(SocketChannel ch) throws Exception {
  5. // 获取管道,将一个一个的ChannelHandler添加到管道中
  6. ChannelPipeline pipeline = ch.pipeline();
  7. // 添加一个http的编解码器
  8. pipeline.addLast(new HttpServerCodec());
  9. // 添加一个用于支持大数据流的支持
  10. pipeline.addLast(new ChunkedWriteHandler());
  11. // 添加一个聚合器,这个聚合器主要是将HttpMessage聚合成FullHttpRequest/Response
  12. pipeline.addLast(new HttpObjectAggregator(1024 * 1024));
  13. // 需要指定接收请求的路由
  14. // 必须使用以 /nio/info 后缀结尾的url才能访问
  15. pipeline.addLast(new WebSocketServerProtocolHandler("/nio/info"));
  16. // 添加自定义的Handler
  17. pipeline.addLast(new MyScocketHandler());
  18. }
  19. }

自定义socket消息处理:

  1. public class MyScocketHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
  2. /**用来保存所有的客户端连接*/
  3. public static ChannelGroup clients = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
  4. /**时间格式化*/
  5. private SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
  6. /**
  7. * 当有新的客户端连接服务器之后,会自动调用这个方法
  8. * @param ctx
  9. * @throws Exception
  10. */
  11. @Override
  12. public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
  13. // 将新的通道加入到clients
  14. clients.add(ctx.channel());
  15. }
  16. @Override
  17. protected void messageReceived(ChannelHandlerContext channelHandlerContext, TextWebSocketFrame msg) throws Exception {
  18. StringBuilder resMsg = new StringBuilder();
  19. // 获取客户端发送过来的文本消息
  20. String text = msg.text();
  21. System.out.println("获取客户端发送内容:" + text);
  22. //发送所有客户端
  23. /**
  24. for (Channel client : clients) {
  25. System.out.println("获取客户端:" + client.id());
  26. // 将消息发送到所有的客户端
  27. resMsg.append(text).append(",").append(channelHandlerContext.channel().id()).append(",").append(sdf.format(new Date()));
  28. client.writeAndFlush(new TextWebSocketFrame(resMsg.toString()));
  29. }
  30. */
  31. //发送指定客户端
  32. resMsg.append(text).append(",").append(channelHandlerContext.channel().id()).append(",").append(sdf.format(new Date()));
  33. channelHandlerContext.channel().writeAndFlush(new TextWebSocketFrame(resMsg.toString()));
  34. }
  35. }

在低版本的netty依赖中 实现 channelRead0 方法来接收消息

高版本中使用 messageReceived 方法来接收消息

3、socket client客户端

  1. public class WebSocketNettyClient {
  2. public static void main(String[] args) {
  3. EventLoopGroup group = new NioEventLoopGroup();
  4. final ClientHandler clientHandler =new ClientHandler();
  5. try {
  6. Bootstrap bootstrap = new Bootstrap();
  7. bootstrap.group(group).channel(NioSocketChannel.class)
  8. .option(ChannelOption.TCP_NODELAY, true)
  9. .option(ChannelOption.SO_KEEPALIVE,true)
  10. .handler(new ChannelInitializer<SocketChannel>() {
  11. @Override
  12. protected void initChannel(SocketChannel ch) throws Exception {
  13. ChannelPipeline pipeline = ch.pipeline();
  14. // 添加一个http的编解码器
  15. pipeline.addLast(new HttpClientCodec());
  16. // 添加一个用于支持大数据流的支持
  17. pipeline.addLast(new ChunkedWriteHandler());
  18. // 添加一个聚合器,这个聚合器主要是将HttpMessage聚合成FullHttpRequest/Response
  19. pipeline.addLast(new HttpObjectAggregator(1024 * 1024));
  20. pipeline.addLast(clientHandler);
  21. }
  22. });
  23. URI websocketURI = new URI("ws://localhost:9990/nio/info");
  24. HttpHeaders httpHeaders = new DefaultHttpHeaders();
  25. //进行握手
  26. WebSocketClientHandshaker handshaker = WebSocketClientHandshakerFactory.newHandshaker(websocketURI, WebSocketVersion.V13, (String)null, true,httpHeaders);
  27. final Channel channel=bootstrap.connect(websocketURI.getHost(),websocketURI.getPort()).sync().channel();
  28. clientHandler.setHandshaker(handshaker);
  29. handshaker.handshake(channel);
  30. //阻塞等待是否握手成功
  31. clientHandler.handshakeFuture().sync();
  32. //发送消息
  33. JSONObject userInfo = new JSONObject();
  34. userInfo.put("userId","u1001");
  35. userInfo.put("userName","月夜烛峰");
  36. channel.writeAndFlush(new TextWebSocketFrame(userInfo.toString()));
  37. // 等待连接被关闭
  38. channel.closeFuture().sync();
  39. } catch (Exception e) {
  40. e.printStackTrace();
  41. } finally {
  42. group.shutdownGracefully();
  43. }
  44. }
  45. }

上述代码中:

URI websocketURI = new URI("ws://localhost:9990/nio/info");

请求的是gateway的地址。

客户端逻辑处理:

  1. public class ClientHandler extends SimpleChannelInboundHandler<Object> {
  2. private WebSocketClientHandshaker handshaker;
  3. ChannelPromise handshakeFuture;
  4. /**
  5. * 当客户端主动链接服务端的链接后,调用此方法
  6. *
  7. * @param channelHandlerContext ChannelHandlerContext
  8. */
  9. @Override
  10. public void channelActive(ChannelHandlerContext channelHandlerContext) {
  11. System.out.println("客户端Active .....");
  12. handlerAdded(channelHandlerContext);
  13. }
  14. @Override
  15. public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
  16. System.err.println("exceptionCaught:异常信息:" + cause.getMessage());
  17. ctx.close();
  18. }
  19. public void setHandshaker(WebSocketClientHandshaker handshaker) {
  20. this.handshaker = handshaker;
  21. }
  22. @Override
  23. public void handlerAdded(ChannelHandlerContext ctx) {
  24. this.handshakeFuture = ctx.newPromise();
  25. }
  26. public ChannelFuture handshakeFuture() {
  27. return this.handshakeFuture;
  28. }
  29. @Override
  30. protected void messageReceived(ChannelHandlerContext ctx, Object o) throws Exception {
  31. // 握手协议返回,设置结束握手
  32. if (!this.handshaker.isHandshakeComplete()) {
  33. FullHttpResponse response = (FullHttpResponse) o;
  34. this.handshaker.finishHandshake(ctx.channel(), response);
  35. this.handshakeFuture.setSuccess();
  36. System.out.println("握手成功::messageReceived HandshakeComplete...");
  37. return;
  38. } else if (o instanceof TextWebSocketFrame) {
  39. TextWebSocketFrame textFrame = (TextWebSocketFrame) o;
  40. System.out.println("接收消息::messageReceived textFrame: " + textFrame.text());
  41. } else if (o instanceof CloseWebSocketFrame) {
  42. System.out.println("关闭链接::messageReceived CloseWebSocketFrame");
  43. }
  44. }
  45. }

4、模拟测试

分别启动gateway网关、socket服务端、socket客户端

运行客户端后,控制台打印:

  1. 客户端Active .....
  2. 握手成功::messageReceived HandshakeComplete...
  3. 接收消息::messageReceived textFrame: {"userName":"月夜烛峰","userId":"u1001"},20a1a1e5,2022-09-26 15:52:09

服务端控制台:

客户端发送内容,服务端已经收到,说明socket链接已经建立并可以使用。

5、集成 web socket

为了更直观测试,通过web socket来访问

  1. <!DOCTYPE html>
  2. <html>
  3. <head>
  4. <meta charset="utf-8">
  5. <title>websocket</title>
  6. <style>
  7. table {
  8. border-collapse: collapse;
  9. border-spacing: 0;
  10. table-layout: fixed;
  11. }
  12. </style>
  13. </head>
  14. <body>
  15. <div id="main" style="display: flex;margin-top:20px;">
  16. <div style="width: 20px"></div>
  17. <div style="border: 1px solid grey;width:700px;">
  18. <div class="card_header" style="padding: 0.5rem 1rem;border-bottom: 1px solid grey "><span>消息列表 </span>
  19. </div>
  20. <div class="card_header"
  21. style="padding: 0.5rem 1rem;font-size:15px;border-bottom: 1px dashed grey;height: 21px">
  22. <table style="width: 100%;">
  23. <thead>
  24. <tr>
  25. <th>消息内容</th>
  26. <th>发送者</th>
  27. <th>发送时间</th>
  28. </tr>
  29. </thead>
  30. </table>
  31. </div>
  32. <div id="socket_keep_alive" style="height: 300px;overflow-y:auto;padding: 0rem 1rem 1rem 1rem;font-size:16px">
  33. <table id="socket_keep_alive_table" style="width: 100%;">
  34. <tbody style="width: 100%;text-align: center;">
  35. </tbody>
  36. </table>
  37. </div>
  38. </div>
  39. <div style="width: 20px"></div>
  40. <form onsubmit="return false">
  41. <div id="show_jvm_info" style="border: 1px solid grey;width: 450px;">
  42. <div class="card_header" style="padding: 0.5rem 1rem;border-bottom: 1px solid grey;"><span>消息发送 </span>
  43. <button style="float: right;margin-right: 20px;"
  44. onclick="send(this.form.message.value);this.form.reset()">发送
  45. </button>
  46. </div>
  47. <textarea name="message" style="height: 345px; width: 445px;border: none;"></textarea>
  48. </div>
  49. </form>
  50. </div>
  51. </body>
  52. <script src="/js/jquery-1.8.3.min.js"></script>
  53. <script>
  54. function cleanInfo(id) {
  55. $("#" + id).empty();
  56. }
  57. function appendInfo(data, id) {
  58. var sinfo = data.split(",");
  59. var tableData = "<tr>";
  60. $.each(sinfo, function (k, v) {
  61. //console.log(k+':'+v);
  62. tableData = tableData + "<td>" + v + "</td>";
  63. })
  64. tableData = tableData + "</tr>";
  65. console.log(tableData);
  66. $("#" + id).prepend(tableData);
  67. }
  68. var socket;
  69. //判断当前浏览器是否支持websocket
  70. if (window.WebSocket) {
  71. //go on
  72. socket = new WebSocket("ws://localhost:9990/nio/info");
  73. //相当于channelReado, ev 收到服务器端回送的消息
  74. socket.onmessage = function (msg) {
  75. appendInfo(msg.data, "socket_keep_alive_table");
  76. }
  77. //相当于连接开启(感知到连接开启)
  78. socket.onopen = function (ev) {
  79. console.log("websocket 已打开");
  80. }
  81. //相当于连接关闭(感知到连接关闭)
  82. socket.onclose = function (ev) {
  83. console.log("websocket 已关闭");
  84. }
  85. } else {
  86. alert("当前浏览器不支持websocket")
  87. }
  88. //发送消息到服务器
  89. function send(message) {
  90. if (!window.socket) { //先判断socket是否创建
  91. return;
  92. }
  93. if (socket.readyState == WebSocket.OPEN) {
  94. //通过socket 发送消息
  95. socket.send(message)
  96. } else {
  97. alert("连接没有开启");
  98. }
  99. }
  100. </script>
  101. </html>

html页面中也访问gateway网关地址,访问页面:

 

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/空白诗007/article/detail/858245
推荐阅读
相关标签
  

闽ICP备14008679号