当前位置:   article > 正文

WebSocket学习——结合OkHttp源码分析_realwebsocket

realwebsocket

转自:https://www.jianshu.com/p/ba0f45aa7457

前言

最近公司有项目需要用WebSocket完成及时通信的需求,这里来学习一下。

WebScoket简介

在以前的web应用中,双向通信机制往往借助轮询或是长轮询来实现,但是这两种方式都会或多或少的造成资源的浪费,且是非实时的。还有http长连接,但是本质上还是Request与Response,只是减少握手连接次数,虽然减少了部分开销,但仍然会造成资源的浪费、实时性不强等问题。

WebSocket作为一种解决web应用双向通信的协议由HTML5规范引出(RFC6455传送门),是一种建立在TCP协议基础上的全双工通信的协议。

一、与传统轮询不同

这里取网上流传度很高的例子介绍

轮询:
客户端(发请求,建立链接):啦啦啦,有没有新信息(Request)
服务端:没有(Response)
客户端(发请求,建立链接):啦啦啦,有没有新信息(Request)
服务端:没有。。(Response)
客户端(发请求,建立链接):啦啦啦,有没有新信息(Request)
服务端:你好烦啊,没有啊。。(Response)
客户端(发请求,建立链接):啦啦啦,有没有新消息(Request)
服务端:好啦好啦,有啦给你。(Response)
客户端(发请求,建立链接):啦啦啦,有没有新消息(Request)
服务端:。。。。。没。。。。没。。。没有(Response)

长轮询:
客户端(发请求,建立链接):啦啦啦,有没有新信息,没有的话就等有了才返回给我吧(Request)
等等等。。。。。
服务端:额。。 等待到有消息的时候。。来 给你(Response)
客户端(发请求,建立链接):啦啦啦,有没有新信息,没有的话就等有了才返回给我吧(Request)

WebSocket:
客户端:啦啦啦,我要建立Websocket协议,需要的服务:chat,Websocket协议版本:17(HTTP Request)
服务端:ok,确认,已升级为Websocket协议(HTTP Protocols Switched)
客户端:麻烦你有信息的时候推送给我噢。。
服务端:ok,有的时候会告诉你的。
服务端:balabalabalabala
客户端:balabalabalabala
服务端:哈哈哈哈哈啊哈哈哈哈
服务端:笑死我了哈哈哈哈哈哈哈

来自知乎高赞同回答WebSocket 是什么原理?为什么可以实现持久连接?

从上面的例子可以看出,不管是轮询还是长轮询,本质都是不断地发送HTTP请求,然后由服务端处理返回结果,并不是真正意义上的双向通信。而且带来的后果是大量的资源被浪费(HTTP请求),服务端需要快速的处理请求,还要考虑并发等问题。而WebSocket解决了这些问题,通过握手操作后就建立了持久连接,之后客户端和服务端在连接断开之前都可以发送消息,实现真正的全双工通信。

二、与Socket、HTTP的关系

很多人刚接触WebSocket肯定会与Socket混淆,这里放出OSI模型

OSI Model



我们知道,Socket是对TCP/IP协议的封装,Socket本身并不是协议,而是一个调用接口(API)。而WebSocket在图中处于应用层,属于应用层协议。所以二者仅仅是名字像而已,就像Java与JavaScript一样。

 

TCP是传输层的协议,WebScoket和HTTP都是基于TCP协议的高层(应用层)协议,所以从本质上讲,WebSocket和HTTP是处于同一层的两种不同的协议。但是WebSocket使用了HTTP完成了握手连接,根据RFC6455文档中1.5节设计哲♂学中描述,是为了简单和兼容性考虑。具体握手操作我们会在后面提到。

所以总的来说,WebSocket与Socket由于层级不同,关系也仅仅是在某些环境中WebSocket可能通过Socket来使用TCP协议和名字比较像。和HTTP是同一层面的不同协议(最大的区别WebSocket是持久化协议而HTTP不是)。

WebScoket协议

这里主要提一下协议中比较重要的握手和发送数据

一、握手

之前有说到,WebSocket的握手是用HTTP请求来完成的,这里我们来看一下RFC6455文档中一个客户端握手的栗子

  1. GET /chat HTTP/1.1
  2. Host: server.example.com
  3. Upgrade: websocket
  4. Connection: Upgrade
  5. Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ==
  6. Origin: http://example.com
  7. Sec-WebSocket-Protocol: chat, superchat
  8. Sec-WebSocket-Version: 13

可以发现,这和一个一般的HTTP请求头没啥区别,需要注意的是(这里讲重点,具体还请看协议文档):

  • 根据协议规范,握手必须是一个HTTP请求,请求的方法必须是GET,HTTP版本不可以低于1.1。
  • 请求头必须包含Upgrade属性名,其值必须包含"websocket"。
  • 请求头必须包含Connection属性名,其值必须包含"Upgrade"。
  • 请求头必须包含Sec-WebSocket-Key属性名,其值是16字节的随机数的被base64编码后的值
  • 如果请求来自浏览器必须包含Origin属性名
  • 请求头必须包含Sec-WebSocket-Version属性名,其值必须是13

如果请求不符合规范,服务端会返回400 bad request。如果服务端选择接受连接,则会返回比如:

  1. HTTP/1.1 101 Switching Protocols
  2. Upgrade: websocket
  3. Connection: Upgrade
  4. Sec-WebSocket-Accept: s3pPLMBiTxaQ9kYGzzhZRbK+xOo=
  5. Sec-WebSocket-Protocol: chat

首先不同于普通的HTTP请求这里返回101,然后Upgrade和Connection同上都是规定好的,Sec-WebSocket-Accept是由请求头的Sec-WebSocket-Key加上字符串258EAFA5-E914-47DA-95CA-C5AB0DC85B11之后再进行SHA1加密和BASE64编码得到的值。返回的状态码为101,表示同意客户端协议转换请求,并将它转换为websocket协议。

在握手成功之后,WebSocket连接建立,双向通信便可以开始了。

二、发送数据

在WebSocket协议,数据使用帧来传输。一个基本的协议帧如下

基本协议帧



细节不描述了,好多地方没看懂。。。这里说下重点

 

三、数据帧类型

  • 0x0 表示一个继续帧
  • 0x1 表示一个文本帧
  • 0x2 表示一个二进制帧
  • 0x3-7 为以后的非控制帧
  • 0x8 表示一个连接关闭帧
  • 0x9 表示一个ping
  • 0xA 表示一个pong
  • 0xB-F 为以后的控制帧

大部分都十分明了,这里来说说Ping,Pong帧:WebSocket用Ping,Pong帧来维持心跳,当接收到Ping帧,终端必须发送一个Pong帧响应,除非它已经接收到一个关闭帧,它应该尽快返回Pong帧作为响应。Pong帧必须包含与被响应Ping帧的应用程序数据完全相同的数据。一个Pong帧可能被主动发送,但一般不必须返回响应,也可以做特殊处理。

结合源码分析

首先WebSocket虽然是H5提出的,但不仅仅应用于Web应用上。在Android客户端,一般用下面两种库完成WebSocket:

  • OkHttp 16年OkHttp就加入了WebSocket支持包,最新版本已经将ws融合进来,直接可以使用
  • Java-WebSocket Java实现的WebSocket协议

由于OkHttp用的多,这里毫不犹豫的使用了OkHttp,下面我们看看基本用法API

官方测试地址

  1. String url = "ws://echo.websocket.org";
  2. OkHttpClient client = new OkHttpClient.Builder().build();
  3. Request request = new Request.Builder()
  4. .url(url)
  5. .build();
  6. client.newWebSocket(request, new WebSocketListener() {
  7. @Override
  8. public void onOpen(WebSocket webSocket, Response response) {
  9. mWebSocket = webSocket;
  10. super.onOpen(webSocket, response);
  11. }
  12. @Override
  13. public void onMessage(WebSocket webSocket, String text) {
  14. super.onMessage(webSocket, text);
  15. }
  16. @Override
  17. public void onMessage(WebSocket webSocket, ByteString bytes) {
  18. super.onMessage(webSocket, bytes);
  19. }
  20. @Override
  21. public void onClosing(WebSocket webSocket, int code, String reason) {
  22. super.onClosing(webSocket, code, reason);
  23. }
  24. @Override
  25. public void onClosed(WebSocket webSocket, int code, String reason) {
  26. super.onClosed(webSocket, code, reason);
  27. }
  28. @Override
  29. public void onFailure(WebSocket webSocket, Throwable t, Response response) {
  30. t.printStackTrace();
  31. super.onFailure(webSocket, t, response);
  32. }
  33. });
  34. button.setOnClickListener((view) -> {
  35. msg = "Hello!";
  36. mWebSocket.send(msg);
  37. });

用法非常简单,从API可以看出双向通信与HTTP的不同,接下来我们更深入一些,主要看一下WebSocket的握手和数据收发

  1. @Override public WebSocket newWebSocket(Request request, WebSocketListener listener) {
  2. RealWebSocket webSocket = new RealWebSocket(request, listener, new Random());
  3. webSocket.connect(this);
  4. return webSocket;
  5. }
  1. public RealWebSocket(Request request, WebSocketListener listener, Random random) {
  2. if (!"GET".equals(request.method())) {
  3. throw new IllegalArgumentException("Request must be GET: " + request.method());
  4. }
  5. this.originalRequest = request;
  6. this.listener = listener;
  7. this.random = random;
  8. byte[] nonce = new byte[16];
  9. random.nextBytes(nonce);
  10. this.key = ByteString.of(nonce).base64();
  11. this.writerRunnable = new Runnable() {
  12. @Override public void run() {
  13. try {
  14. while (writeOneFrame()) {
  15. }
  16. } catch (IOException e) {
  17. failWebSocket(e, null);
  18. }
  19. }
  20. };
  21. }

在WebSocket实现类RealWebSocket的构造方法中进行了初始化的操作,包括之前提到的握手请求头部一个经Base64的随机数,writerRunnable的作用是数据发送。

然后调用connect方法开始建立连接

  1. public void connect(OkHttpClient client) {
  2. client = client.newBuilder()
  3. .protocols(ONLY_HTTP1)
  4. .build();
  5. final int pingIntervalMillis = client.pingIntervalMillis();
  6. final Request request = originalRequest.newBuilder()
  7. .header("Upgrade", "websocket")
  8. .header("Connection", "Upgrade")
  9. .header("Sec-WebSocket-Key", key)
  10. .header("Sec-WebSocket-Version", "13")
  11. .build();
  12. call = Internal.instance.newWebSocketCall(client, request);
  13. call.enqueue(new Callback() {
  14. @Override public void onResponse(Call call, Response response) {
  15. try {
  16. checkResponse(response);
  17. } catch (ProtocolException e) {
  18. failWebSocket(e, response);
  19. closeQuietly(response);
  20. return;
  21. }
  22. // Promote the HTTP streams into web socket streams.
  23. StreamAllocation streamAllocation = Internal.instance.streamAllocation(call);
  24. streamAllocation.noNewStreams(); // Prevent connection pooling!
  25. Streams streams = streamAllocation.connection().newWebSocketStreams(streamAllocation);
  26. // Process all web socket messages.
  27. try {
  28. listener.onOpen(RealWebSocket.this, response);
  29. String name = "OkHttp WebSocket " + request.url().redact();
  30. initReaderAndWriter(name, pingIntervalMillis, streams);
  31. streamAllocation.connection().socket().setSoTimeout(0);
  32. loopReader();
  33. } catch (Exception e) {
  34. failWebSocket(e, null);
  35. }
  36. }
  37. @Override public void onFailure(Call call, IOException e) {
  38. failWebSocket(e, null);
  39. }
  40. });
  41. }

这段代码涉及很多,我们来逐条看。

第一步发了一个符合WebSocket协议握手规范的HTTP请求,我们可以看到1.1协议版本和headers都和之前提到的一样,然后看看checkResponse方法

  1. void checkResponse(Response response) throws ProtocolException {
  2. if (response.code() != 101) {
  3. throw new ProtocolException("Expected HTTP 101 response but was '"
  4. + response.code() + " " + response.message() + "'");
  5. }
  6. String headerConnection = response.header("Connection");
  7. if (!"Upgrade".equalsIgnoreCase(headerConnection)) {
  8. throw new ProtocolException("Expected 'Connection' header value 'Upgrade' but was '"
  9. + headerConnection + "'");
  10. }
  11. String headerUpgrade = response.header("Upgrade");
  12. if (!"websocket".equalsIgnoreCase(headerUpgrade)) {
  13. throw new ProtocolException(
  14. "Expected 'Upgrade' header value 'websocket' but was '" + headerUpgrade + "'");
  15. }
  16. String headerAccept = response.header("Sec-WebSocket-Accept");
  17. String acceptExpected = ByteString.encodeUtf8(key + WebSocketProtocol.ACCEPT_MAGIC)
  18. .sha1().base64();
  19. if (!acceptExpected.equals(headerAccept)) {
  20. throw new ProtocolException("Expected 'Sec-WebSocket-Accept' header value '"
  21. + acceptExpected + "' but was '" + headerAccept + "'");
  22. }
  23. }

也是一些协议的内容,如果有不符合规范的地方就会抛出ProtocolException。

第二步,在检查完成后,连接就算正式建立了,接下来要为数据的通信做一些准备。我们来看看Streams是什么

  1. public abstract static class Streams implements Closeable {
  2. public final boolean client;
  3. public final BufferedSource source;
  4. public final BufferedSink sink;
  5. public Streams(boolean client, BufferedSource source, BufferedSink sink) {
  6. this.client = client;
  7. this.source = source;
  8. this.sink = sink;
  9. }
  10. }

Streams封装了BufferedSource和BufferedSink,这两个类是抽象的,实现类是RealBufferedSource和RealBufferedSink,具体的初始化过程在StreamAllocation中,而StreamAllocation的初始化与OkHttp拦截器有关,这里不多赘述,总之此时RealBufferedSource和RealBufferedSink都已初始化完成,封装到Streams中。

第三步通过注册的listener回调了onOpen函数。

第四步初始化Writer和Reader

  1. public void initReaderAndWriter(
  2. String name, long pingIntervalMillis, Streams streams) throws IOException {
  3. synchronized (this) {
  4. this.streams = streams;
  5. this.writer = new WebSocketWriter(streams.client, streams.sink, random);
  6. this.executor = new ScheduledThreadPoolExecutor(1, Util.threadFactory(name, false));
  7. if (pingIntervalMillis != 0) {
  8. executor.scheduleAtFixedRate(
  9. new PingRunnable(), pingIntervalMillis, pingIntervalMillis, MILLISECONDS);
  10. }
  11. if (!messageAndCloseQueue.isEmpty()) {
  12. runWriter(); // Send messages that were enqueued before we were connected.
  13. }
  14. }
  15. reader = new WebSocketReader(streams.client, streams.source, this);
  16. }

主要就是将放进Streams的BufferedSource和BufferedSink加进去,因为实际的读写操作还是这俩来进行。

第五步就是loopReader()开启消息读取循环

  1. public void loopReader() throws IOException {
  2. while (receivedCloseCode == -1) {
  3. // This method call results in one or more onRead* methods being called on this thread.
  4. reader.processNextFrame();
  5. }
  6. }
  7. void processNextFrame() throws IOException {
  8. readHeader();
  9. if (isControlFrame) {
  10. readControlFrame();
  11. } else {
  12. readMessageFrame();
  13. }
  14. }

我们先看看readHeader()方法

  1. private void readHeader() throws IOException {
  2. if (closed) throw new IOException("closed");
  3. // Disable the timeout to read the first byte of a new frame.
  4. int b0;
  5. long timeoutBefore = source.timeout().timeoutNanos();
  6. source.timeout().clearTimeout();
  7. try {
  8. b0 = source.readByte() & 0xff;
  9. } finally {
  10. source.timeout().timeout(timeoutBefore, TimeUnit.NANOSECONDS);
  11. }
  12. opcode = b0 & B0_MASK_OPCODE;
  13. isFinalFrame = (b0 & B0_FLAG_FIN) != 0;
  14. isControlFrame = (b0 & OPCODE_FLAG_CONTROL) != 0;
  15. // Control frames must be final frames (cannot contain continuations).
  16. if (isControlFrame && !isFinalFrame) {
  17. throw new ProtocolException("Control frames must be final.");
  18. }
  19. boolean reservedFlag1 = (b0 & B0_FLAG_RSV1) != 0;
  20. boolean reservedFlag2 = (b0 & B0_FLAG_RSV2) != 0;
  21. boolean reservedFlag3 = (b0 & B0_FLAG_RSV3) != 0;
  22. if (reservedFlag1 || reservedFlag2 || reservedFlag3) {
  23. // Reserved flags are for extensions which we currently do not support.
  24. throw new ProtocolException("Reserved flags are unsupported.");
  25. }
  26. int b1 = source.readByte() & 0xff;
  27. isMasked = (b1 & B1_FLAG_MASK) != 0;
  28. if (isMasked == isClient) {
  29. // Masked payloads must be read on the server. Unmasked payloads must be read on the client.
  30. throw new ProtocolException(isClient
  31. ? "Server-sent frames must not be masked."
  32. : "Client-sent frames must be masked.");
  33. }
  34. // Get frame length, optionally reading from follow-up bytes if indicated by special values.
  35. frameLength = b1 & B1_MASK_LENGTH;
  36. if (frameLength == PAYLOAD_SHORT) {
  37. frameLength = source.readShort() & 0xffffL; // Value is unsigned.
  38. } else if (frameLength == PAYLOAD_LONG) {
  39. frameLength = source.readLong();
  40. if (frameLength < 0) {
  41. throw new ProtocolException(
  42. "Frame length 0x" + Long.toHexString(frameLength) + " > 0x7FFFFFFFFFFFFFFF");
  43. }
  44. }
  45. frameBytesRead = 0;
  46. if (isControlFrame && frameLength > PAYLOAD_BYTE_MAX) {
  47. throw new ProtocolException("Control frame must be less than " + PAYLOAD_BYTE_MAX + "B.");
  48. }
  49. if (isMasked) {
  50. // Read the masking key as bytes so that they can be used directly for unmasking.
  51. source.readFully(maskKey);
  52. }
  53. }

有点多,着重看下source.readByte(),根据之前说的找到BufferSource的实现类,经过半天的调用链寻找,找到了最后在Okio类里面创建的Soure、Sink匿名内部类的读写方法,这里以读为例

  1. private static Source source(final InputStream in, final Timeout timeout) {
  2. if (in == null) throw new IllegalArgumentException("in == null");
  3. if (timeout == null) throw new IllegalArgumentException("timeout == null");
  4. return new Source() {
  5. @Override public long read(Buffer sink, long byteCount) throws IOException {
  6. if (byteCount < 0) throw new IllegalArgumentException("byteCount < 0: " + byteCount);
  7. if (byteCount == 0) return 0;
  8. try {
  9. timeout.throwIfReached();
  10. Segment tail = sink.writableSegment(1);
  11. int maxToCopy = (int) Math.min(byteCount, Segment.SIZE - tail.limit);
  12. int bytesRead = in.read(tail.data, tail.limit, maxToCopy);
  13. if (bytesRead == -1) return -1;
  14. tail.limit += bytesRead;
  15. sink.size += bytesRead;
  16. return bytesRead;
  17. } catch (AssertionError e) {
  18. if (isAndroidGetsocknameError(e)) throw new IOException(e);
  19. throw e;
  20. }
  21. }
  22. @Override public void close() throws IOException {
  23. in.close();
  24. }
  25. @Override public Timeout timeout() {
  26. return timeout;
  27. }
  28. @Override public String toString() {
  29. return "source(" + in + ")";
  30. }
  31. };
  32. }

可以看出,最终调用了在最底层的socket的输入流的read方法,这里也是IO阻塞模型,等待接收消息。到这里连接的建立到消息如何接收,已经差不多搞明白了,我们再来看下接收消息后帧类型的判断。

  1. private void readControlFrame() throws IOException {
  2. ......
  3. switch (opcode) {
  4. case OPCODE_CONTROL_PING:
  5. frameCallback.onReadPing(buffer.readByteString());
  6. break;
  7. case OPCODE_CONTROL_PONG:
  8. frameCallback.onReadPong(buffer.readByteString());
  9. break;
  10. case OPCODE_CONTROL_CLOSE:
  11. int code = CLOSE_NO_STATUS_CODE;
  12. String reason = "";
  13. long bufferSize = buffer.size();
  14. if (bufferSize == 1) {
  15. throw new ProtocolException("Malformed close payload length of 1.");
  16. } else if (bufferSize != 0) {
  17. code = buffer.readShort();
  18. reason = buffer.readUtf8();
  19. String codeExceptionMessage = WebSocketProtocol.closeCodeExceptionMessage(code);
  20. if (codeExceptionMessage != null) throw new ProtocolException(codeExceptionMessage);
  21. }
  22. frameCallback.onReadClose(code, reason);
  23. closed = true;
  24. break;
  25. default:
  26. throw new ProtocolException("Unknown control opcode: " + toHexString(opcode));
  27. }
  28. }
  29. private void readMessageFrame() throws IOException {
  30. ......
  31. if (opcode == OPCODE_TEXT) {
  32. frameCallback.onReadMessage(message.readUtf8());
  33. } else {
  34. frameCallback.onReadMessage(message.readByteString());
  35. }
  36. }

这里和我们在协议里看到的一样,对应Ping Pong Close Text Byte帧都会有相应的回调(没看到Continue帧),之后操作也遵循协议内容,篇幅有点长就不放代码了,比如Ping帧的回调里会发送一个Pong帧,发送的逻辑在通过前面提到的writerRunnable里,和接收类似,最终由Sink来执行。

简单分析就到这里了,有兴趣的同学可以再进一步研究OkHttp源码。



作者:Misery_Dx
链接:https://www.jianshu.com/p/ba0f45aa7457
來源:简书
简书著作权归作者所有,任何形式的转载都请联系作者获得授权并注明出处。

声明:本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:【wpsshop博客】
推荐阅读