当前位置:   article > 正文

吃透Netty源码系列四十七之WebSocketServerProtocolHandler详解

websocketserverprotocolhandler

handlerAdded处理器添加事件

上一篇讲了一些基础的知识,本篇就讲下具体是怎么样的,首先就是添加WebSocketServerProtocolHandler之后的handlerAdded事件。因为是WebSocket协议,肯定需要一些处理器,所以这里就会添加一些处理器,比如第一次的握手处理器UFT8帧验证器来验证文本帧,还要关闭帧处理器,用来响应关闭帧

 @Override
    public void handlerAdded(ChannelHandlerContext ctx) {
        ChannelPipeline cp = ctx.pipeline();
        if (cp.get(WebSocketServerProtocolHandshakeHandler.class) == null) {
            // Add the WebSocketHandshakeHandler before this one.在前面添加一个握手处理器
            cp.addBefore(ctx.name(), WebSocketServerProtocolHandshakeHandler.class.getName(),
                    new WebSocketServerProtocolHandshakeHandler(serverConfig));
        }
        if (serverConfig.decoderConfig().withUTF8Validator() && cp.get(Utf8FrameValidator.class) == null) {
            // Add the UFT8 checking before this one.在前面添加帧验证器
            cp.addBefore(ctx.name(), Utf8FrameValidator.class.getName(),
                    new Utf8FrameValidator());
        }
        if (serverConfig.sendCloseFrame() != null) {//添加关闭帧处理器
            cp.addBefore(ctx.name(), WebSocketCloseFrameHandler.class.getName(),
                new WebSocketCloseFrameHandler(serverConfig.sendCloseFrame(), serverConfig.forceCloseTimeoutMillis()));
        }
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18

添加完之后就是这个样子(先不管自定义的处理器):
在这里插入图片描述

WebSocketServerProtocolHandshakeHandler的channelRead

之后就是客户端发来HTTP请求websocket握手。HTTP解码出完整消息后就传递到WebSocketServerProtocolHandshakeHandler了,我们来看看他做了什么。

  • 验证协议url
  • 验证GET的请求升级。
  • 替换当前处理器为forbiddenHttpRequestResponder
  • 创建握手WebSocketServerHandshaker 对象,进行握手。
  • 启动一个定义任务进行超时回调。
public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception {
        final FullHttpRequest req = (FullHttpRequest) msg;
        if (isNotWebSocketPath(req)) {//不是websocket路径就不管
            ctx.fireChannelRead(msg);
            return;
        }

        try {
            if (!GET.equals(req.method())) {//只有GET支持的升级的
                sendHttpResponse(ctx, req, new DefaultFullHttpResponse(HTTP_1_1, FORBIDDEN, ctx.alloc().buffer(0)));
                return;
            }
            //创建握手工厂
            final WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory(
                    getWebSocketLocation(ctx.pipeline(), req, serverConfig.websocketPath()),
                    serverConfig.subprotocols(), serverConfig.decoderConfig());
            final WebSocketServerHandshaker handshaker = wsFactory.newHandshaker(req);//创建一个握手处理器
            final ChannelPromise localHandshakePromise = handshakePromise;//握手回调
            if (handshaker == null) {//不支持的版本
                WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(ctx.channel());
            } else {

                WebSocketServerProtocolHandler.setHandshaker(ctx.channel(), handshaker);//设置处理器
                ctx.pipeline().replace(this, "WS403Responder",
                        WebSocketServerProtocolHandler.forbiddenHttpRequestResponder());//把当前处理器替换掉,变成403

                final ChannelFuture handshakeFuture = handshaker.handshake(ctx.channel(), req);
                handshakeFuture.addListener(new ChannelFutureListener() {
                    @Override
                    public void operationComplete(ChannelFuture future) throws Exception {
                        if (!future.isSuccess()) {//发送不成功
                            localHandshakePromise.tryFailure(future.cause());
                            ctx.fireExceptionCaught(future.cause());
                        } else {//发送成功
                            localHandshakePromise.trySuccess();
                            //  保持兼容性 触发事件
                            ctx.fireUserEventTriggered(//这个HANDSHAKE_COMPLETE是过时的
                                    WebSocketServerProtocolHandler.ServerHandshakeStateEvent.HANDSHAKE_COMPLETE);
                            ctx.fireUserEventTriggered(//这个是新的
                                    new WebSocketServerProtocolHandler.HandshakeComplete(
                                            req.uri(), req.headers(), handshaker.selectedSubprotocol()));
                        }
                    }
                });
                applyHandshakeTimeout();
            }
        } finally {
            req.release();
        }
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50

isNotWebSocketPath验证URL

这个主要就是验证URL是否是WebSockeURL,主要就是判断创建时候传进去的这个"/wc"
在这里插入图片描述
默认是比较整个字符串,不是比较开头。

private boolean isNotWebSocketPath(FullHttpRequest req) {
        String websocketPath = serverConfig.websocketPath();
        return serverConfig.checkStartsWith() ? !req.uri().startsWith(websocketPath) : !req.uri().equals(websocketPath);
    }
  • 1
  • 2
  • 3
  • 4

sendHttpResponse发送消息

如果响应的状态码不是200或者请求不是设置长连接,就关闭通道了。

    private static void sendHttpResponse(ChannelHandlerContext ctx, HttpRequest req, HttpResponse res) {
        ChannelFuture f = ctx.channel().writeAndFlush(res);
        if (!isKeepAlive(req) || res.status().code() != 200) {//req不支持KeepAlive,或者res状态码不是200就等写完成了关闭通道
            f.addListener(ChannelFutureListener.CLOSE);
        }
    }

 	ChannelFutureListener CLOSE = new ChannelFutureListener() {
        @Override
        public void operationComplete(ChannelFuture future) {
            future.channel().close();
        }
    };
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13

WebSocketServerHandshakerFactory的newHandshaker创建握手对象

根据请求头信息的sec-websocket-version来决定要哪个版本的握手对象,一般都是13,如果都不支持就会返回null
在这里插入图片描述

public WebSocketServerHandshaker newHandshaker(HttpRequest req) {
        //从请求头获取WEBSOCKET版本,根据不同版本,返回不同握手对象
        CharSequence version = req.headers().get(HttpHeaderNames.SEC_WEBSOCKET_VERSION);
        if (version != null) {
            if (version.equals(WebSocketVersion.V13.toHttpHeaderValue())) {
                // Version 13 of the wire protocol - RFC 6455 (version 17 of the draft hybi specification).
                return new WebSocketServerHandshaker13(
                        webSocketURL, subprotocols, decoderConfig);
            } else if (version.equals(WebSocketVersion.V08.toHttpHeaderValue())) {
                // Version 8 of the wire protocol - version 10 of the draft hybi specification.
                return new WebSocketServerHandshaker08(
                        webSocketURL, subprotocols, decoderConfig);
            } else if (version.equals(WebSocketVersion.V07.toHttpHeaderValue())) {
                // Version 8 of the wire protocol - version 07 of the draft hybi specification.
                return new WebSocketServerHandshaker07(
                        webSocketURL, subprotocols, decoderConfig);
            } else {
                return null;
            }
        } else {//没指定版本的情况
            // Assume version 00 where version header was not specified
            return new WebSocketServerHandshaker00(webSocketURL, subprotocols, decoderConfig);
        }
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24

forbiddenHttpRequestResponder

这个就是用来创建禁止HTTP请求的响应器,只要握手对象创建好了,就不需要响应HTTP了,直接就把当前处理器WebSocketServerProtocolHandler给替换了。

static ChannelHandler forbiddenHttpRequestResponder() {
        return new ChannelInboundHandlerAdapter() {
            @Override
            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                if (msg instanceof FullHttpRequest) {
                    ((FullHttpRequest) msg).release();
                    FullHttpResponse response =
                            new DefaultFullHttpResponse(HTTP_1_1, HttpResponseStatus.FORBIDDEN, ctx.alloc().buffer(0));
                    ctx.channel().writeAndFlush(response);//从通道尾部开始
                } else {
                    ctx.fireChannelRead(msg);
                }
            }
        };
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15

替换之后就是这样:
在这里插入图片描述

WebSocketServerHandshaker的handshake

握手对象进行握手,其实就是发送响应数据。先会创建一个FullHttpResponse 响应,然后把跟HTTP相关的聚合,压缩处理器删除,如果有HttpServerCodec,那就在前面添加websocket的编解码器,等发送响应成功了把HttpServerCodec删了。如果是HTTP编解码器,就把解码器先替换成websocket的解码器,等发送响应成功了,再把编码器替换成websocket的编码器。

public final ChannelFuture handshake(Channel channel, FullHttpRequest req,
                                            HttpHeaders responseHeaders, final ChannelPromise promise) {

        if (logger.isDebugEnabled()) {
            logger.debug("{} WebSocket version {} server handshake", channel, version());
        }
        FullHttpResponse response = newHandshakeResponse(req, responseHeaders);//创建响应
        ChannelPipeline p = channel.pipeline();
        if (p.get(HttpObjectAggregator.class) != null) {
            p.remove(HttpObjectAggregator.class);//删除聚合
        }
        if (p.get(HttpContentCompressor.class) != null) {//删除压缩
            p.remove(HttpContentCompressor.class);
        }
        ChannelHandlerContext ctx = p.context(HttpRequestDecoder.class);//请求解码器
        final String encoderName;
        if (ctx == null) {//不存在
            // this means the user use an HttpServerCodec
            ctx = p.context(HttpServerCodec.class);//HttpServerCodec是否存在
            if (ctx == null) {//也不存在,就没办法解码http了,失败了
                promise.setFailure(
                        new IllegalStateException("No HttpDecoder and no HttpServerCodec in the pipeline"));
                return promise;
            }//在之前添加WebSocket编解码
            p.addBefore(ctx.name(), "wsencoder", newWebSocketEncoder());
            p.addBefore(ctx.name(), "wsdecoder", newWebsocketDecoder());
            encoderName = ctx.name();
        } else {
            p.replace(ctx.name(), "wsdecoder", newWebsocketDecoder());//替换HttpRequestDecoder

            encoderName = p.context(HttpResponseEncoder.class).name();
            p.addBefore(encoderName, "wsencoder", newWebSocketEncoder());//在HttpResponseEncoder之前添加编码器
        }//监听发出事件
        channel.writeAndFlush(response).addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                if (future.isSuccess()) {
                    ChannelPipeline p = future.channel().pipeline();
                    p.remove(encoderName);//成功了就把http的编码器删除了,HttpServerCodec或者HttpResponseEncoder
                    promise.setSuccess();
                } else {
                    promise.setFailure(future.cause());
                }
            }
        });
        return promise;
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47

发送回调前是这样:
在这里插入图片描述
发送回调成功后是这样:
在这里插入图片描述

applyHandshakeTimeout

发送可能会等好久,所以就给了个超时的定时任务,默认设置是10秒,超时了就触发超时事件,然后关闭通道,如果发送回调了,就把定时任务取消。

private void applyHandshakeTimeout() {
        final ChannelPromise localHandshakePromise = handshakePromise;
        final long handshakeTimeoutMillis = serverConfig.handshakeTimeoutMillis();
        if (handshakeTimeoutMillis <= 0 || localHandshakePromise.isDone()) {
            return;//完成了就不管了
        }
        //起一个定时任务
        final Future<?> timeoutFuture = ctx.executor().schedule(new Runnable() {
            @Override
            public void run() {
                if (!localHandshakePromise.isDone() &&
                        localHandshakePromise.tryFailure(new WebSocketHandshakeException("handshake timed out"))) {
                    ctx.flush()//没完成就刷出去,触发超时事件,然后关闭
                       .fireUserEventTriggered(ServerHandshakeStateEvent.HANDSHAKE_TIMEOUT)
                       .close();
                }
            }
        }, handshakeTimeoutMillis, TimeUnit.MILLISECONDS);
        //如果成功了,就把超时任务取消
        // Cancel the handshake timeout when handshake is finished.
        localHandshakePromise.addListener(new FutureListener<Void>() {
            @Override
            public void operationComplete(Future<Void> f) throws Exception {
                timeoutFuture.cancel(false);
            }
        });
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27

完成握手后:
在这里插入图片描述
至此WebSocketServerProtocolHandshakeHandler做的事就完成了,后面讲升级后的通信。

好了,今天就到这里了,希望对学习理解有帮助,大神看见勿喷,仅为自己的学习理解,能力有限,请多包涵。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/盐析白兔/article/detail/241994
推荐阅读
相关标签
  

闽ICP备14008679号