赞
踩
本文是基于Netty快速上手WebSocket客户端,不涉及WebSocket的TLS/SSL加密传输。
WebSocket原理参考【WebSocket简介-CSDN博客】,测试用的WebSocket服务端也是用Netty实现的,参考【基于Netty实现WebSocket服务端-CSDN博客】
一、基于Netty快速实现WebSocket客户端
- import io.netty.bootstrap.Bootstrap;
- import io.netty.channel.Channel;
- import io.netty.channel.ChannelFuture;
- import io.netty.channel.ChannelFutureListener;
- import io.netty.channel.ChannelHandlerContext;
- import io.netty.channel.ChannelInitializer;
- import io.netty.channel.ChannelOption;
- import io.netty.channel.ChannelPipeline;
- import io.netty.channel.EventLoopGroup;
- import io.netty.channel.SimpleChannelInboundHandler;
- import io.netty.channel.nio.NioEventLoopGroup;
- import io.netty.channel.socket.SocketChannel;
- import io.netty.channel.socket.nio.NioSocketChannel;
- import io.netty.handler.codec.http.DefaultHttpHeaders;
- import io.netty.handler.codec.http.HttpClientCodec;
- import io.netty.handler.codec.http.HttpObjectAggregator;
- import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
- import io.netty.handler.codec.http.websocketx.WebSocketClientHandshakerFactory;
- import io.netty.handler.codec.http.websocketx.WebSocketClientProtocolHandler;
- import io.netty.handler.codec.http.websocketx.WebSocketVersion;
- import io.netty.handler.logging.LogLevel;
- import io.netty.handler.logging.LoggingHandler;
- import io.netty.handler.stream.ChunkedWriteHandler;
- import lombok.extern.slf4j.Slf4j;
-
- import java.net.URI;
- import java.util.concurrent.CountDownLatch;
-
- /**
- * https://blog.csdn.net/a1053765496/article/details/130701218
- * 基于Netty快速实现WebSocket客户端,不手动处理握手
- */
- @Slf4j
- public class SimpleWsClient {
-
- final CountDownLatch latch = new CountDownLatch(1);
-
- public static void main(String[] args) throws Exception {
- SimpleWsClient client = new SimpleWsClient();
- client.test();
- }
-
- public void test() throws Exception {
- Channel dest = dest();
- latch.await();
- dest.writeAndFlush(new TextWebSocketFrame("CountDownLatch完成后发送的消息"));
- }
-
- public Channel dest() throws Exception {
- final URI webSocketURL = new URI("ws://127.0.0.1:7070/helloWs");
-
- EventLoopGroup group = new NioEventLoopGroup();
- Bootstrap boot = new Bootstrap();
- boot.option(ChannelOption.SO_KEEPALIVE, true)
- .option(ChannelOption.TCP_NODELAY, true)
- .group(group)
- .handler(new LoggingHandler(LogLevel.INFO))
- .channel(NioSocketChannel.class)
- .handler(new ChannelInitializer<SocketChannel>() {
- @Override
- protected void initChannel(SocketChannel sc) throws Exception {
- ChannelPipeline pipeline = sc.pipeline();
- pipeline.addLast(new HttpClientCodec());
- pipeline.addLast(new ChunkedWriteHandler());
- pipeline.addLast(new HttpObjectAggregator(64 * 1024));
- pipeline.addLast(new WebSocketClientProtocolHandler(WebSocketClientHandshakerFactory
- .newHandshaker(webSocketURL, WebSocketVersion.V13, null, false, new DefaultHttpHeaders())));
- pipeline.addLast(new SimpleChannelInboundHandler<TextWebSocketFrame>() {
-
- @Override
- protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg)
- throws Exception {
- System.err.println(" 客户端收到消息======== " + msg.text());
- }
-
- @Override
- public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
- if (WebSocketClientProtocolHandler.ClientHandshakeStateEvent.HANDSHAKE_COMPLETE
- .equals(evt)) {
- log.info(ctx.channel().id().asShortText() + " 握手完成!");
- latch.countDown();
- send(ctx.channel());
- }
- super.userEventTriggered(ctx, evt);
- }
- });
-
- }
- });
-
- ChannelFuture cf = boot.connect(webSocketURL.getHost(), webSocketURL.getPort()).sync();
-
- return cf.channel();
- }
-
- public static void send(Channel channel) {
- final String textMsg = "握手完成后直接发送的消息";
-
- if (channel != null && channel.isActive()) {
- TextWebSocketFrame frame = new TextWebSocketFrame(textMsg);
- channel.writeAndFlush(frame).addListener((ChannelFutureListener) channelFuture -> {
- if (channelFuture.isDone() && channelFuture.isSuccess()) {
- log.info(" ================= 发送成功.");
- } else {
- channelFuture.channel().close();
- log.info(" ================= 发送失败. cause = " + channelFuture.cause());
- channelFuture.cause().printStackTrace();
- }
- });
- } else {
- log.error("消息发送失败! textMsg = " + textMsg);
- }
- }
-
- }
这里我们不手动进行握手,由Netty通过WebSocketClientProtocolHandler进行握手,但是我们要知道何时握手完成了。握手完成了我们才能进行正常的消息读写。
握手事件是在自定义的Handler中实现的,这里为了方便使用CountDownLatch,使用了匿名内部类SimpleChannelInboundHandler的方式。userEventTriggered这个方法会接收到所有的事件,其中就包括握手完成事件。
二、基于Netty,手动处理WebSocket握手信息:
客户端启动代码:
- import io.netty.bootstrap.Bootstrap;
- import io.netty.buffer.Unpooled;
- import io.netty.channel.Channel;
- import io.netty.channel.ChannelInitializer;
- import io.netty.channel.ChannelPipeline;
- import io.netty.channel.EventLoopGroup;
- import io.netty.channel.nio.NioEventLoopGroup;
- import io.netty.channel.socket.SocketChannel;
- import io.netty.channel.socket.nio.NioSocketChannel;
- import io.netty.handler.codec.http.DefaultHttpHeaders;
- import io.netty.handler.codec.http.HttpClientCodec;
- import io.netty.handler.codec.http.HttpObjectAggregator;
- import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame;
- import io.netty.handler.codec.http.websocketx.PingWebSocketFrame;
- import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
- import io.netty.handler.codec.http.websocketx.WebSocketClientHandshakerFactory;
- import io.netty.handler.codec.http.websocketx.WebSocketFrame;
- import io.netty.handler.codec.http.websocketx.WebSocketVersion;
-
- import java.io.BufferedReader;
- import java.io.InputStreamReader;
- import java.net.URI;
-
- /**
-  * Interactive console WebSocket client whose opening handshake is completed manually by
-  * {@link WebSocketClientHandler}. Each line read from standard input is sent as a text frame;
-  * {@code ping} sends a ping frame and {@code 再见} sends a close frame and exits.
-  *
-  * <p>References:
-  * https://gitcode.com/ddean2009/learn-netty4/tree/master/src/main/java/com/flydean25/socketclient
-  * https://www.flydean.com/25-netty-websocket-client
-  * https://blog.csdn.net/superfjj/article/details/120648434
-  * https://blog.csdn.net/twypx/article/details/84543518
-  */
- public final class NettyWsClient {
-
-     static final String URL = System.getProperty("url", "ws://127.0.0.1:7070/helloWs");
-
-     public static void main(String[] args) throws Exception {
-         URI uri = new URI(URL);
-         final int port = resolvePort(uri);
-
-         EventLoopGroup group = new NioEventLoopGroup();
-         try {
-             WebSocketClientHandler handler = new WebSocketClientHandler(WebSocketClientHandshakerFactory
-                     .newHandshaker(uri, WebSocketVersion.V13, null, true, new DefaultHttpHeaders()));
-
-             Bootstrap b = new Bootstrap();
-             b.group(group).channel(NioSocketChannel.class).handler(new ChannelInitializer<SocketChannel>() {
-                 @Override
-                 protected void initChannel(SocketChannel ch) {
-                     ChannelPipeline p = ch.pipeline();
-                     p.addLast(new HttpClientCodec(), new HttpObjectAggregator(8192), handler);
-                 }
-             });
-
-             Channel ch = b.connect(uri.getHost(), port).sync().channel();
-             // Block until the handler reports the handshake result; a failure is rethrown here.
-             handler.handshakeFuture().sync();
-
-             BufferedReader console = new BufferedReader(new InputStreamReader(System.in));
-             while (true) {
-                 String msg = console.readLine();
-                 if (msg == null) {
-                     break;
-                 }
-                 if (!ch.isActive()) {
-                     // The peer has closed the connection; further writes could only fail.
-                     System.err.println("连接已关闭, 无法发送: " + msg);
-                     break;
-                 }
-                 if ("再见".equalsIgnoreCase(msg)) {
-                     ch.writeAndFlush(new CloseWebSocketFrame());
-                     ch.closeFuture().sync();
-                     break;
-                 } else if ("ping".equalsIgnoreCase(msg)) {
-                     WebSocketFrame frame = new PingWebSocketFrame(Unpooled.wrappedBuffer(new byte[] {8, 1, 8, 1}));
-                     ch.writeAndFlush(frame);
-                 } else {
-                     WebSocketFrame frame = new TextWebSocketFrame(msg);
-                     ch.writeAndFlush(frame);
-                 }
-             }
-         } finally {
-             group.shutdownGracefully();
-         }
-     }
-
-     /**
-      * Returns the port to connect to. {@link URI#getPort()} yields -1 when the URL carries no
-      * explicit port, so fall back to the scheme default (80 for ws, 443 for wss).
-      */
-     private static int resolvePort(URI uri) {
-         if (uri.getPort() != -1) {
-             return uri.getPort();
-         }
-         return "wss".equalsIgnoreCase(uri.getScheme()) ? 443 : 80;
-     }
- }
客户端Handler
- import io.netty.channel.Channel;
- import io.netty.channel.ChannelFuture;
- import io.netty.channel.ChannelHandlerContext;
- import io.netty.channel.ChannelPromise;
- import io.netty.channel.SimpleChannelInboundHandler;
- import io.netty.handler.codec.http.FullHttpResponse;
- import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame;
- import io.netty.handler.codec.http.websocketx.PongWebSocketFrame;
- import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
- import io.netty.handler.codec.http.websocketx.WebSocketClientHandshaker;
- import io.netty.handler.codec.http.websocketx.WebSocketFrame;
- import io.netty.handler.codec.http.websocketx.WebSocketHandshakeException;
- import io.netty.util.CharsetUtil;
- import lombok.extern.slf4j.Slf4j;
-
- @Slf4j
- public class WebSocketClientHandler extends SimpleChannelInboundHandler<Object> {
-
- private final WebSocketClientHandshaker handshaker;
-
- private ChannelPromise handshakeFuture;
-
- public WebSocketClientHandler(WebSocketClientHandshaker handshaker) {
- this.handshaker = handshaker;
- }
-
- public ChannelFuture handshakeFuture() {
- return handshakeFuture;
- }
-
- @Override
- public void handlerAdded(ChannelHandlerContext ctx) {
- handshakeFuture = ctx.newPromise();
- }
-
- @Override
- public void channelActive(ChannelHandlerContext ctx) {
- log.info("channelActive, 进行handshake");
- handshaker.handshake(ctx.channel());
- }
-
- @Override
- public void channelInactive(ChannelHandlerContext ctx) {
- log.info("channelInactive!");
- }
-
- @Override
- public void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
- Channel ch = ctx.channel();
- if (!handshaker.isHandshakeComplete()) {
- try {
- handshaker.finishHandshake(ch, (FullHttpResponse) msg);
- log.info("websocket Handshake 完成!");
- handshakeFuture.setSuccess();
- } catch (WebSocketHandshakeException e) {
- log.info("websocket连接失败!");
- handshakeFuture.setFailure(e);
- }
- return;
- }
-
- if (msg instanceof FullHttpResponse) {
- FullHttpResponse response = (FullHttpResponse) msg;
- throw new IllegalStateException("Unexpected FullHttpResponse (getStatus=" + response.status() + ", content="
- + response.content().toString(CharsetUtil.UTF_8) + ')');
- }
-
- WebSocketFrame frame = (WebSocketFrame) msg;
- if (frame instanceof TextWebSocketFrame) {
- TextWebSocketFrame textFrame = (TextWebSocketFrame) frame;
- log.info("接收到TXT消息: " + textFrame.text());
- } else if (frame instanceof PongWebSocketFrame) {
- log.info("接收到pong消息");
- } else if (frame instanceof CloseWebSocketFrame) {
- log.info("接收到closing消息");
- ch.close();
- }
- }
-
- @Override
- public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
- // 异常处理
- log.error("出现异常", cause);
- if (!handshakeFuture.isDone()) {
- handshakeFuture.setFailure(cause);
- }
- ctx.close();
- }
- }
可以看到,我们这里也要手动标记握手完成,是在自定义的Handler的channelRead0方法中标记的。
测试的时候,我们可以看到,当客户端发送ping的时候,服务端会自动回pong,这个是由Netty实现的服务端自带的心跳机制。
- 2024-05-25 16:35:10 INFO [WebSocketClientHandler] channelActive, 进行handshake
- 2024-05-25 16:35:10 INFO [WebSocketClientHandler] websocket Handshake 完成!
- 123
- 2024-05-25 16:35:26 INFO [WebSocketClientHandler] 接收到TXT消息: 2024-35-25 04:05:26: 123
- ping
- 2024-05-25 16:35:52 INFO [WebSocketClientHandler] 接收到pong消息
赞
踩
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。