赞
踩
上一篇讲了一些基础的知识,本篇就讲下具体是怎么样的,首先就是添加WebSocketServerProtocolHandler
之后的handlerAdded
事件。因为是WebSocket
协议,肯定需要一些处理器,所以这里就会添加一些处理器,比如第一次的握手处理器,UFT8
帧验证器来验证文本帧,还要关闭帧处理器,用来响应关闭帧
@Override public void handlerAdded(ChannelHandlerContext ctx) { ChannelPipeline cp = ctx.pipeline(); if (cp.get(WebSocketServerProtocolHandshakeHandler.class) == null) { // Add the WebSocketHandshakeHandler before this one.在前面添加一个握手处理器 cp.addBefore(ctx.name(), WebSocketServerProtocolHandshakeHandler.class.getName(), new WebSocketServerProtocolHandshakeHandler(serverConfig)); } if (serverConfig.decoderConfig().withUTF8Validator() && cp.get(Utf8FrameValidator.class) == null) { // Add the UFT8 checking before this one.在前面添加帧验证器 cp.addBefore(ctx.name(), Utf8FrameValidator.class.getName(), new Utf8FrameValidator()); } if (serverConfig.sendCloseFrame() != null) {//添加关闭帧处理器 cp.addBefore(ctx.name(), WebSocketCloseFrameHandler.class.getName(), new WebSocketCloseFrameHandler(serverConfig.sendCloseFrame(), serverConfig.forceCloseTimeoutMillis())); } }
添加完之后就是这个样子(先不管自定义的处理器):
之后就是客户端发来HTTP
请求websocket
握手。HTTP
解码出完整消息后就传递到WebSocketServerProtocolHandshakeHandler
了,我们来看看他做了什么。
url
。GET
的请求升级。forbiddenHttpRequestResponder
。WebSocketServerHandshaker
对象,进行握手。public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception { final FullHttpRequest req = (FullHttpRequest) msg; if (isNotWebSocketPath(req)) {//不是websocket路径就不管 ctx.fireChannelRead(msg); return; } try { if (!GET.equals(req.method())) {//只有GET支持的升级的 sendHttpResponse(ctx, req, new DefaultFullHttpResponse(HTTP_1_1, FORBIDDEN, ctx.alloc().buffer(0))); return; } //创建握手工厂 final WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory( getWebSocketLocation(ctx.pipeline(), req, serverConfig.websocketPath()), serverConfig.subprotocols(), serverConfig.decoderConfig()); final WebSocketServerHandshaker handshaker = wsFactory.newHandshaker(req);//创建一个握手处理器 final ChannelPromise localHandshakePromise = handshakePromise;//握手回调 if (handshaker == null) {//不支持的版本 WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(ctx.channel()); } else { WebSocketServerProtocolHandler.setHandshaker(ctx.channel(), handshaker);//设置处理器 ctx.pipeline().replace(this, "WS403Responder", WebSocketServerProtocolHandler.forbiddenHttpRequestResponder());//把当前处理器替换掉,变成403 final ChannelFuture handshakeFuture = handshaker.handshake(ctx.channel(), req); handshakeFuture.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { if (!future.isSuccess()) {//发送不成功 localHandshakePromise.tryFailure(future.cause()); ctx.fireExceptionCaught(future.cause()); } else {//发送成功 localHandshakePromise.trySuccess(); // 保持兼容性 触发事件 ctx.fireUserEventTriggered(//这个HANDSHAKE_COMPLETE是过时的 WebSocketServerProtocolHandler.ServerHandshakeStateEvent.HANDSHAKE_COMPLETE); ctx.fireUserEventTriggered(//这个是新的 new WebSocketServerProtocolHandler.HandshakeComplete( req.uri(), req.headers(), handshaker.selectedSubprotocol())); } } }); applyHandshakeTimeout(); } } finally { req.release(); } }
这个主要就是验证URL
是否是WebSocke
的URL
,主要就是判断创建时候传进去的这个"/wc"
:
默认是比较整个字符串,不是比较开头。
private boolean isNotWebSocketPath(FullHttpRequest req) {
String websocketPath = serverConfig.websocketPath();
return serverConfig.checkStartsWith() ? !req.uri().startsWith(websocketPath) : !req.uri().equals(websocketPath);
}
如果响应的状态码不是200
或者请求不是设置长连接,就关闭通道了。
private static void sendHttpResponse(ChannelHandlerContext ctx, HttpRequest req, HttpResponse res) {
ChannelFuture f = ctx.channel().writeAndFlush(res);
if (!isKeepAlive(req) || res.status().code() != 200) {//req不支持KeepAlive,或者res状态码不是200就等写完成了关闭通道
f.addListener(ChannelFutureListener.CLOSE);
}
}
ChannelFutureListener CLOSE = new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) {
future.channel().close();
}
};
根据请求头信息的sec-websocket-version
来决定要哪个版本的握手对象,一般都是13
,如果都不支持就会返回null
。
public WebSocketServerHandshaker newHandshaker(HttpRequest req) { //从请求头获取WEBSOCKET版本,根据不同版本,返回不同握手对象 CharSequence version = req.headers().get(HttpHeaderNames.SEC_WEBSOCKET_VERSION); if (version != null) { if (version.equals(WebSocketVersion.V13.toHttpHeaderValue())) { // Version 13 of the wire protocol - RFC 6455 (version 17 of the draft hybi specification). return new WebSocketServerHandshaker13( webSocketURL, subprotocols, decoderConfig); } else if (version.equals(WebSocketVersion.V08.toHttpHeaderValue())) { // Version 8 of the wire protocol - version 10 of the draft hybi specification. return new WebSocketServerHandshaker08( webSocketURL, subprotocols, decoderConfig); } else if (version.equals(WebSocketVersion.V07.toHttpHeaderValue())) { // Version 8 of the wire protocol - version 07 of the draft hybi specification. return new WebSocketServerHandshaker07( webSocketURL, subprotocols, decoderConfig); } else { return null; } } else {//没指定版本的情况 // Assume version 00 where version header was not specified return new WebSocketServerHandshaker00(webSocketURL, subprotocols, decoderConfig); } }
这个就是用来创建禁止HTTP
请求的响应器,只要握手对象创建好了,就不需要响应HTTP
了,直接就把当前处理器WebSocketServerProtocolHandler
给替换了。
static ChannelHandler forbiddenHttpRequestResponder() {
return new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (msg instanceof FullHttpRequest) {
((FullHttpRequest) msg).release();
FullHttpResponse response =
new DefaultFullHttpResponse(HTTP_1_1, HttpResponseStatus.FORBIDDEN, ctx.alloc().buffer(0));
ctx.channel().writeAndFlush(response);//从通道尾部开始
} else {
ctx.fireChannelRead(msg);
}
}
};
}
替换之后就是这样:
握手对象进行握手,其实就是发送响应数据。先会创建一个FullHttpResponse
响应,然后把跟HTTP
相关的聚合,压缩处理器删除,如果有HttpServerCodec
,那就在前面添加websocket
的编解码器,等发送响应成功了把HttpServerCodec
删了。如果是HTTP
编解码器,就把解码器先替换成websocket
的解码器,等发送响应成功了,再把编码器替换成websocket
的编码器。
public final ChannelFuture handshake(Channel channel, FullHttpRequest req, HttpHeaders responseHeaders, final ChannelPromise promise) { if (logger.isDebugEnabled()) { logger.debug("{} WebSocket version {} server handshake", channel, version()); } FullHttpResponse response = newHandshakeResponse(req, responseHeaders);//创建响应 ChannelPipeline p = channel.pipeline(); if (p.get(HttpObjectAggregator.class) != null) { p.remove(HttpObjectAggregator.class);//删除聚合 } if (p.get(HttpContentCompressor.class) != null) {//删除压缩 p.remove(HttpContentCompressor.class); } ChannelHandlerContext ctx = p.context(HttpRequestDecoder.class);//请求解码器 final String encoderName; if (ctx == null) {//不存在 // this means the user use an HttpServerCodec ctx = p.context(HttpServerCodec.class);//HttpServerCodec是否存在 if (ctx == null) {//也不存在,就没办法解码http了,失败了 promise.setFailure( new IllegalStateException("No HttpDecoder and no HttpServerCodec in the pipeline")); return promise; }//在之前添加WebSocket编解码 p.addBefore(ctx.name(), "wsencoder", newWebSocketEncoder()); p.addBefore(ctx.name(), "wsdecoder", newWebsocketDecoder()); encoderName = ctx.name(); } else { p.replace(ctx.name(), "wsdecoder", newWebsocketDecoder());//替换HttpRequestDecoder encoderName = p.context(HttpResponseEncoder.class).name(); p.addBefore(encoderName, "wsencoder", newWebSocketEncoder());//在HttpResponseEncoder之前添加编码器 }//监听发出事件 channel.writeAndFlush(response).addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { if (future.isSuccess()) { ChannelPipeline p = future.channel().pipeline(); p.remove(encoderName);//成功了就把http的编码器删除了,HttpServerCodec或者HttpResponseEncoder promise.setSuccess(); } else { promise.setFailure(future.cause()); } } }); return promise; }
发送回调前是这样:
发送回调成功后是这样:
发送可能会等好久,所以就给了个超时的定时任务,默认设置是10
秒,超时了就触发超时事件,然后关闭通道,如果发送回调了,就把定时任务取消。
private void applyHandshakeTimeout() { final ChannelPromise localHandshakePromise = handshakePromise; final long handshakeTimeoutMillis = serverConfig.handshakeTimeoutMillis(); if (handshakeTimeoutMillis <= 0 || localHandshakePromise.isDone()) { return;//完成了就不管了 } //起一个定时任务 final Future<?> timeoutFuture = ctx.executor().schedule(new Runnable() { @Override public void run() { if (!localHandshakePromise.isDone() && localHandshakePromise.tryFailure(new WebSocketHandshakeException("handshake timed out"))) { ctx.flush()//没完成就刷出去,触发超时事件,然后关闭 .fireUserEventTriggered(ServerHandshakeStateEvent.HANDSHAKE_TIMEOUT) .close(); } } }, handshakeTimeoutMillis, TimeUnit.MILLISECONDS); //如果成功了,就把超时任务取消 // Cancel the handshake timeout when handshake is finished. localHandshakePromise.addListener(new FutureListener<Void>() { @Override public void operationComplete(Future<Void> f) throws Exception { timeoutFuture.cancel(false); } }); }
完成握手后:
至此WebSocketServerProtocolHandshakeHandler
做的事就完成了,后面讲升级后的通信。
好了,今天就到这里了,希望对学习理解有帮助,大神看见勿喷,仅为自己的学习理解,能力有限,请多包涵。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。