当前位置:   article > 正文

基于Netty实现WebSocket客户端_netty websocket 客户端

netty websocket 客户端

本文是基于Netty快速上手WebSocket客户端,不涉及WebSocket的TLS/SSL加密传输。

WebSocket原理参考【WebSocket简介-CSDN博客】,测试用的WebSocket服务端也是用Netty实现的,参考【基于Netty实现WebSocket服务端-CSDN博客

一、基于Netty快速实现WebSocket客户端

  1. import io.netty.bootstrap.Bootstrap;
  2. import io.netty.channel.Channel;
  3. import io.netty.channel.ChannelFuture;
  4. import io.netty.channel.ChannelFutureListener;
  5. import io.netty.channel.ChannelHandlerContext;
  6. import io.netty.channel.ChannelInitializer;
  7. import io.netty.channel.ChannelOption;
  8. import io.netty.channel.ChannelPipeline;
  9. import io.netty.channel.EventLoopGroup;
  10. import io.netty.channel.SimpleChannelInboundHandler;
  11. import io.netty.channel.nio.NioEventLoopGroup;
  12. import io.netty.channel.socket.SocketChannel;
  13. import io.netty.channel.socket.nio.NioSocketChannel;
  14. import io.netty.handler.codec.http.DefaultHttpHeaders;
  15. import io.netty.handler.codec.http.HttpClientCodec;
  16. import io.netty.handler.codec.http.HttpObjectAggregator;
  17. import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
  18. import io.netty.handler.codec.http.websocketx.WebSocketClientHandshakerFactory;
  19. import io.netty.handler.codec.http.websocketx.WebSocketClientProtocolHandler;
  20. import io.netty.handler.codec.http.websocketx.WebSocketVersion;
  21. import io.netty.handler.logging.LogLevel;
  22. import io.netty.handler.logging.LoggingHandler;
  23. import io.netty.handler.stream.ChunkedWriteHandler;
  24. import lombok.extern.slf4j.Slf4j;
  25. import java.net.URI;
  26. import java.util.concurrent.CountDownLatch;
  27. /**
  28. * https://blog.csdn.net/a1053765496/article/details/130701218
  29. * 基于Netty快速实现WebSocket客户端,不手动处理握手
  30. */
  31. @Slf4j
  32. public class SimpleWsClient {
  33. final CountDownLatch latch = new CountDownLatch(1);
  34. public static void main(String[] args) throws Exception {
  35. SimpleWsClient client = new SimpleWsClient();
  36. client.test();
  37. }
  38. public void test() throws Exception {
  39. Channel dest = dest();
  40. latch.await();
  41. dest.writeAndFlush(new TextWebSocketFrame("CountDownLatch完成后发送的消息"));
  42. }
  43. public Channel dest() throws Exception {
  44. final URI webSocketURL = new URI("ws://127.0.0.1:7070/helloWs");
  45. EventLoopGroup group = new NioEventLoopGroup();
  46. Bootstrap boot = new Bootstrap();
  47. boot.option(ChannelOption.SO_KEEPALIVE, true)
  48. .option(ChannelOption.TCP_NODELAY, true)
  49. .group(group)
  50. .handler(new LoggingHandler(LogLevel.INFO))
  51. .channel(NioSocketChannel.class)
  52. .handler(new ChannelInitializer<SocketChannel>() {
  53. @Override
  54. protected void initChannel(SocketChannel sc) throws Exception {
  55. ChannelPipeline pipeline = sc.pipeline();
  56. pipeline.addLast(new HttpClientCodec());
  57. pipeline.addLast(new ChunkedWriteHandler());
  58. pipeline.addLast(new HttpObjectAggregator(64 * 1024));
  59. pipeline.addLast(new WebSocketClientProtocolHandler(WebSocketClientHandshakerFactory
  60. .newHandshaker(webSocketURL, WebSocketVersion.V13, null, false, new DefaultHttpHeaders())));
  61. pipeline.addLast(new SimpleChannelInboundHandler<TextWebSocketFrame>() {
  62. @Override
  63. protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg)
  64. throws Exception {
  65. System.err.println(" 客户端收到消息======== " + msg.text());
  66. }
  67. @Override
  68. public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
  69. if (WebSocketClientProtocolHandler.ClientHandshakeStateEvent.HANDSHAKE_COMPLETE
  70. .equals(evt)) {
  71. log.info(ctx.channel().id().asShortText() + " 握手完成!");
  72. latch.countDown();
  73. send(ctx.channel());
  74. }
  75. super.userEventTriggered(ctx, evt);
  76. }
  77. });
  78. }
  79. });
  80. ChannelFuture cf = boot.connect(webSocketURL.getHost(), webSocketURL.getPort()).sync();
  81. return cf.channel();
  82. }
  83. public static void send(Channel channel) {
  84. final String textMsg = "握手完成后直接发送的消息";
  85. if (channel != null && channel.isActive()) {
  86. TextWebSocketFrame frame = new TextWebSocketFrame(textMsg);
  87. channel.writeAndFlush(frame).addListener((ChannelFutureListener) channelFuture -> {
  88. if (channelFuture.isDone() && channelFuture.isSuccess()) {
  89. log.info(" ================= 发送成功.");
  90. } else {
  91. channelFuture.channel().close();
  92. log.info(" ================= 发送失败. cause = " + channelFuture.cause());
  93. channelFuture.cause().printStackTrace();
  94. }
  95. });
  96. } else {
  97. log.error("消息发送失败! textMsg = " + textMsg);
  98. }
  99. }
  100. }

这里我们不手动进行握手,由Netty通过WebSocketClientProtocolHandler进行握手,但是我们要知道何时握手完成了。握手完成了我们才能进行正常的消息读写。

握手事件是在自定义的Handler中实现的,这里为了方便使用CountDownLatch,使用了匿名内部类SimpleChannelInboundHandler的方式。userEventTriggered这个方法会接收到所有的事件,其中就包括握手完成事件。

二、基于Netty,手动处理WebSocket握手信息:

客户端启动代码:

  1. import io.netty.bootstrap.Bootstrap;
  2. import io.netty.buffer.Unpooled;
  3. import io.netty.channel.Channel;
  4. import io.netty.channel.ChannelInitializer;
  5. import io.netty.channel.ChannelPipeline;
  6. import io.netty.channel.EventLoopGroup;
  7. import io.netty.channel.nio.NioEventLoopGroup;
  8. import io.netty.channel.socket.SocketChannel;
  9. import io.netty.channel.socket.nio.NioSocketChannel;
  10. import io.netty.handler.codec.http.DefaultHttpHeaders;
  11. import io.netty.handler.codec.http.HttpClientCodec;
  12. import io.netty.handler.codec.http.HttpObjectAggregator;
  13. import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame;
  14. import io.netty.handler.codec.http.websocketx.PingWebSocketFrame;
  15. import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
  16. import io.netty.handler.codec.http.websocketx.WebSocketClientHandshakerFactory;
  17. import io.netty.handler.codec.http.websocketx.WebSocketFrame;
  18. import io.netty.handler.codec.http.websocketx.WebSocketVersion;
  19. import java.io.BufferedReader;
  20. import java.io.InputStreamReader;
  21. import java.net.URI;
  22. /**
  23. * https://gitcode.com/ddean2009/learn-netty4/tree/master/src/main/java/com/flydean25/socketclient
  24. * https://www.flydean.com/25-netty-websocket-client
  25. * https://blog.csdn.net/superfjj/article/details/120648434
  26. * https://blog.csdn.net/twypx/article/details/84543518
  27. */
  28. public final class NettyWsClient {
  29. static final String URL = System.getProperty("url", "ws://127.0.0.1:7070/helloWs");
  30. public static void main(String[] args) throws Exception {
  31. URI uri = new URI(URL);
  32. final int port = uri.getPort();
  33. EventLoopGroup group = new NioEventLoopGroup();
  34. try {
  35. WebSocketClientHandler handler = new WebSocketClientHandler(WebSocketClientHandshakerFactory
  36. .newHandshaker(uri, WebSocketVersion.V13, null, true, new DefaultHttpHeaders()));
  37. Bootstrap b = new Bootstrap();
  38. b.group(group).channel(NioSocketChannel.class).handler(new ChannelInitializer<SocketChannel>() {
  39. @Override
  40. protected void initChannel(SocketChannel ch) {
  41. ChannelPipeline p = ch.pipeline();
  42. p.addLast(new HttpClientCodec(), new HttpObjectAggregator(8192), handler);
  43. }
  44. });
  45. Channel ch = b.connect(uri.getHost(), port).sync().channel();
  46. handler.handshakeFuture().sync();
  47. BufferedReader console = new BufferedReader(new InputStreamReader(System.in));
  48. while (true) {
  49. String msg = console.readLine();
  50. if (msg == null) {
  51. break;
  52. } else if ("再见".equalsIgnoreCase(msg)) {
  53. ch.writeAndFlush(new CloseWebSocketFrame());
  54. ch.closeFuture().sync();
  55. break;
  56. } else if ("ping".equalsIgnoreCase(msg)) {
  57. WebSocketFrame frame = new PingWebSocketFrame(Unpooled.wrappedBuffer(new byte[] {8, 1, 8, 1}));
  58. ch.writeAndFlush(frame);
  59. } else {
  60. WebSocketFrame frame = new TextWebSocketFrame(msg);
  61. ch.writeAndFlush(frame);
  62. }
  63. }
  64. } finally {
  65. group.shutdownGracefully();
  66. }
  67. }
  68. }

客户端Handler

  1. import io.netty.channel.Channel;
  2. import io.netty.channel.ChannelFuture;
  3. import io.netty.channel.ChannelHandlerContext;
  4. import io.netty.channel.ChannelPromise;
  5. import io.netty.channel.SimpleChannelInboundHandler;
  6. import io.netty.handler.codec.http.FullHttpResponse;
  7. import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame;
  8. import io.netty.handler.codec.http.websocketx.PongWebSocketFrame;
  9. import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
  10. import io.netty.handler.codec.http.websocketx.WebSocketClientHandshaker;
  11. import io.netty.handler.codec.http.websocketx.WebSocketFrame;
  12. import io.netty.handler.codec.http.websocketx.WebSocketHandshakeException;
  13. import io.netty.util.CharsetUtil;
  14. import lombok.extern.slf4j.Slf4j;
  15. @Slf4j
  16. public class WebSocketClientHandler extends SimpleChannelInboundHandler<Object> {
  17. private final WebSocketClientHandshaker handshaker;
  18. private ChannelPromise handshakeFuture;
  19. public WebSocketClientHandler(WebSocketClientHandshaker handshaker) {
  20. this.handshaker = handshaker;
  21. }
  22. public ChannelFuture handshakeFuture() {
  23. return handshakeFuture;
  24. }
  25. @Override
  26. public void handlerAdded(ChannelHandlerContext ctx) {
  27. handshakeFuture = ctx.newPromise();
  28. }
  29. @Override
  30. public void channelActive(ChannelHandlerContext ctx) {
  31. log.info("channelActive, 进行handshake");
  32. handshaker.handshake(ctx.channel());
  33. }
  34. @Override
  35. public void channelInactive(ChannelHandlerContext ctx) {
  36. log.info("channelInactive!");
  37. }
  38. @Override
  39. public void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
  40. Channel ch = ctx.channel();
  41. if (!handshaker.isHandshakeComplete()) {
  42. try {
  43. handshaker.finishHandshake(ch, (FullHttpResponse) msg);
  44. log.info("websocket Handshake 完成!");
  45. handshakeFuture.setSuccess();
  46. } catch (WebSocketHandshakeException e) {
  47. log.info("websocket连接失败!");
  48. handshakeFuture.setFailure(e);
  49. }
  50. return;
  51. }
  52. if (msg instanceof FullHttpResponse) {
  53. FullHttpResponse response = (FullHttpResponse) msg;
  54. throw new IllegalStateException("Unexpected FullHttpResponse (getStatus=" + response.status() + ", content="
  55. + response.content().toString(CharsetUtil.UTF_8) + ')');
  56. }
  57. WebSocketFrame frame = (WebSocketFrame) msg;
  58. if (frame instanceof TextWebSocketFrame) {
  59. TextWebSocketFrame textFrame = (TextWebSocketFrame) frame;
  60. log.info("接收到TXT消息: " + textFrame.text());
  61. } else if (frame instanceof PongWebSocketFrame) {
  62. log.info("接收到pong消息");
  63. } else if (frame instanceof CloseWebSocketFrame) {
  64. log.info("接收到closing消息");
  65. ch.close();
  66. }
  67. }
  68. @Override
  69. public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
  70. // 异常处理
  71. log.error("出现异常", cause);
  72. if (!handshakeFuture.isDone()) {
  73. handshakeFuture.setFailure(cause);
  74. }
  75. ctx.close();
  76. }
  77. }

可以看到,我们这里也要手动标记握手完成,是在自定义的Handler的channelRead0方法中标记的。

测试的时候,我们可以看到,当客户端发送ping的时候,服务端会自动回pong,这个是有Netty实现的服务端自带的心跳机制。

  1. 2024-05-25 16:35:10 INFO [WebSocketClientHandler] channelActive, 进行handshake
  2. 2024-05-25 16:35:10 INFO [WebSocketClientHandler] websocket Handshake 完成!
  3. 123
  4. 2024-05-25 16:35:26 INFO [WebSocketClientHandler] 接收到TXT消息: 2024-35-25 04:05:26: 123
  5. ping
  6. 2024-05-25 16:35:52 INFO [WebSocketClientHandler] 接收到pong消息

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/喵喵爱编程/article/detail/884023
推荐阅读
相关标签
  

闽ICP备14008679号