赞
踩
本文是对OkHttp
开源库的一个详细解析,如果你觉得自己不够了解OkHttp
,想进一步学习一下,相信本文对你会有所帮助。
本文包含了详细的请求流程分析、各大拦截器解读以及自己的一点反思总结,文章很长,欢迎大家一起交流讨论。
使用方法十分简单,分别创建一个OkHttpClient
对象,一个Request
对象,然后利用他们创建一个Call
对象,最后调用同步请求execute()
方法或者异步请求enqueue()
方法来拿到Response
。
private final OkHttpClient client = new OkHttpClient(); Request request = new Request.Builder() .url("https://github.com/") .build(); //同步请求 Response response = client.newCall(request).execute(); //todo handle response //异步请求 client.newCall(request).enqueue(new Callback() { @Override public void onFailure(@NotNull Call call, @NotNull IOException e) { //todo handle request failed } @Override public void onResponse(@NotNull Call call, @NotNull Response response) throws IOException { //todo handle Response } });
正如使用方法中所述,我们先后构建了 OkHttpClient
对象、Request
对象、Call
对象,那这些对象都是什么意思,有什么作用呢?这个就需要我们进一步学习了解了。
一个请求的配置类,采用了建造者模式,方便用户配置一些请求参数,如配置callTimeout
,cookie
,interceptor
等等。
open class OkHttpClient internal constructor( builder: Builder ) : Cloneable, Call.Factory, WebSocket.Factory { constructor() : this(Builder()) class Builder constructor() { //调度器 internal var dispatcher: Dispatcher = Dispatcher() //连接池 internal var connectionPool: ConnectionPool = ConnectionPool() //整体流程拦截器 internal val interceptors: MutableList<Interceptor> = mutableListOf() //网络流程拦截器 internal val networkInterceptors: MutableList<Interceptor> = mutableListOf() //流程监听器 internal var eventListenerFactory: EventListener.Factory = EventListener.NONE.asFactory() //连接失败时是否重连 internal var retryOnConnectionFailure = true //服务器认证设置 internal var authenticator: Authenticator = Authenticator.NONE //是否重定向 internal var followRedirects = true //是否从HTTP重定向到HTTPS internal var followSslRedirects = true //cookie设置 internal var cookieJar: CookieJar = CookieJar.NO_COOKIES //缓存设置 internal var cache: Cache? = null //DNS设置 internal var dns: Dns = Dns.SYSTEM //代理设置 internal var proxy: Proxy? = null //代理选择器设置 internal var proxySelector: ProxySelector? = null //代理服务器认证设置 internal var proxyAuthenticator: Authenticator = Authenticator.NONE //socket配置 internal var socketFactory: SocketFactory = SocketFactory.getDefault() //https socket配置 internal var sslSocketFactoryOrNull: SSLSocketFactory? = null internal var x509TrustManagerOrNull: X509TrustManager? = null internal var connectionSpecs: List<ConnectionSpec> = DEFAULT_CONNECTION_SPECS //协议 internal var protocols: List<Protocol> = DEFAULT_PROTOCOLS //域名校验 internal var hostnameVerifier: HostnameVerifier = OkHostnameVerifier internal var certificatePinner: CertificatePinner = CertificatePinner.DEFAULT internal var certificateChainCleaner: CertificateChainCleaner? = null //请求超时 internal var callTimeout = 0 //连接超时 internal var connectTimeout = 10_000 //读取超时 internal var readTimeout = 10_000 //写入超时 internal var writeTimeout = 10_000 internal var pingInterval = 0 internal var minWebSocketMessageToCompress = RealWebSocket.DEFAULT_MINIMUM_DEFLATE_SIZE internal var routeDatabase: RouteDatabase? = null ···省略代码···
同样是请求参数的配置类,也同样采用了建造者模式,但相比于OkHttpClient
,Request
就十分简单了,只有四个参数,分别是请求URL
、请求方法
、请求头
、请求体
。
class Request internal constructor( @get:JvmName("url") val url: HttpUrl, @get:JvmName("method") val method: String, @get:JvmName("headers") val headers: Headers, @get:JvmName("body") val body: RequestBody?, internal val tags: Map<Class<*>, Any> ) { open class Builder { //请求的URL internal var url: HttpUrl? = null //请求方法,如:GET、POST.. internal var method: String //请求头 internal var headers: Headers.Builder //请求体 internal var body: RequestBody? = null ···省略代码···
请求调用接口,表示这个请求已经准备好可以执行,也可以取消,只能执行一次。
interface Call : Cloneable { /** 返回发起此调用的原始请求 */ fun request(): Request /** * 同步请求,立即执行。 * * 抛出两种异常: * 1. 请求失败抛出IOException; * 2. 如果在执行过一回的前提下再次执行抛出IllegalStateException;*/ @Throws(IOException::class) fun execute(): Response /** * 异步请求,将请求安排在将来的某个时间点执行。 * 如果在执行过一回的前提下再次执行抛出IllegalStateException */ fun enqueue(responseCallback: Callback) /** 取消请求。已经完成的请求不能被取消 */ fun cancel() /** 是否已被执行 */ fun isExecuted(): Boolean /** 是否被取消 */ fun isCanceled(): Boolean /** 一个完整Call请求流程的超时时间配置,默认选自[OkHttpClient.Builder.callTimeout] */ fun timeout(): Timeout /** 克隆这个call,创建一个新的相同的Call */ public override fun clone(): Call /** 利用工厂模式来让 OkHttpClient 来创建 Call对象 */ fun interface Factory { fun newCall(request: Request): Call } }
在 OkHttpClient
中,我们利用 newCall
方法来创建一个 Call
对象,但从源码中可以看出,newCall
方法返回的是一个 RealCall
对象。
OkHttpClient.kt
override fun newCall(request: Request): Call = RealCall(this, request, forWebSocket = false)
RealCall
是Call接口
的具体实现类,是应用端与网络层的连接桥,展示应用端原始的请求与连接数据,以及网络层返回的response
及其它数据流。
通过使用方法也可知,创建RealCall
对象后,就要调用同步或异步请求方法,所以它里面还包含同步请求 execute()
与异步请求 enqueue()
方法。(后面具体展开分析)
异步请求调用,是RealCall
的一个内部类,就是一个Runnable
,被调度器中的线程池所执行。
inner class AsyncCall( //用户传入的响应回调方法 private val responseCallback: Callback ) : Runnable { //同一个域名的请求次数,volatile + AtomicInteger 保证在多线程下及时可见性与原子性 @Volatile var callsPerHost = AtomicInteger(0) private set fun reuseCallsPerHostFrom(other: AsyncCall) { this.callsPerHost = other.callsPerHost } ···省略代码··· fun executeOn(executorService: ExecutorService) { client.dispatcher.assertThreadDoesntHoldLock() var success = false try { //调用线程池执行 executorService.execute(this) success = true } catch (e: RejectedExecutionException) { val ioException = InterruptedIOException("executor rejected") ioException.initCause(e) noMoreExchanges(ioException) //请求失败,调用 Callback.onFailure() 方法 responseCallback.onFailure(this@RealCall, ioException) } finally { if (!success) { //请求失败,调用调度器finish方法 client.dispatcher.finished(this) // This call is no longer running! } } } override fun run() { threadName("OkHttp ${redactedUrl()}") { var signalledCallback = false timeout.enter() try { //请求成功,获取到服务器返回的response val response = getResponseWithInterceptorChain() signalledCallback = true //调用 Callback.onResponse() 方法,将 response 传递出去 responseCallback.onResponse(this@RealCall, response) } catch (e: IOException) { if (signalledCallback) { // Do not signal the callback twice! Platform.get().log("Callback failure for ${toLoggableString()}", Platform.INFO, e) } else { //请求失败,调用 Callback.onFailure() 方法 responseCallback.onFailure(this@RealCall, e) } } catch (t: Throwable) { //请求出现异常,调用cancel方法来取消请求 cancel() if (!signalledCallback) { val canceledException = IOException("canceled due to $t") canceledException.addSuppressed(t) //请求失败,调用 Callback.onFailure() 方法 responseCallback.onFailure(this@RealCall, canceledException) } throw t } finally { //请求结束,调用调度器finish方法 client.dispatcher.finished(this) } } } }
调度器,用来调度Call
对象,同时包含线程池与异步请求队列,用来存放与执行AsyncCall
对象。
class Dispatcher constructor() { @get:Synchronized @get:JvmName("executorService") val executorService: ExecutorService get() { if (executorServiceOrNull == null) { //创建一个缓存线程池,来处理请求调用 executorServiceOrNull = ThreadPoolExecutor(0, Int.MAX_VALUE, 60, TimeUnit.SECONDS, SynchronousQueue(), threadFactory("$okHttpName Dispatcher", false)) } return executorServiceOrNull!! } /** 已准备好的异步请求队列 */ @get:Synchronized private val readyAsyncCalls = ArrayDeque<AsyncCall>() /** 正在运行的异步请求队列, 包含取消但是还未finish的AsyncCall */ private val runningAsyncCalls = ArrayDeque<AsyncCall>() /** 正在运行的同步请求队列, 包含取消但是还未finish的RealCall */ private val runningSyncCalls = ArrayDeque<RealCall>() ···省略代码··· }
对象 | 作用 |
---|---|
Call | 请求调用接口,表示这个请求已经准备好可以执行,也可以被取消,只能执行一次。 |
RealCall | Call 接口的具体实现类,是应用与网络层之间的连接桥,包含OkHttpClient 与Request 信息。 |
AsyncCall | 异步请求调用,其实就是个Runnable ,会被放到线程池中进行处理。 |
Dispatcher | 调度器,用来调度Call 对象,同时包含线程池与异步请求队列,用来存放与执行AsyncCall 对象。 |
Request | 请求类,包含url 、method 、headers 、body 。 |
Response | 网络层返回的响应数据。 |
Callback | 响应回调函数接口,包含onFailure 、onResponse 两个方法。 |
介绍完了对象,接下来就根据使用方法,具体看一下源码吧。
同步请求的使用方法。
client.newCall(request).execute();
newCall
方法就是创建一个RealCall
对象,然后执行其execute()
方法。
RealCall.kt override fun execute(): Response { //CAS判断是否已经被执行了, 确保只能执行一次,如果已经执行过,则抛出异常 check(executed.compareAndSet(false, true)) { "Already Executed" } //请求超时开始计时 timeout.enter() //开启请求监听 callStart() try { //调用调度器中的 executed() 方法,调度器只是将 call 加入到了runningSyncCalls队列中 client.dispatcher.executed(this) //调用getResponseWithInterceptorChain 方法拿到 response return getResponseWithInterceptorChain() } finally { //执行完毕,调度器将该 call 从 runningSyncCalls队列中移除 client.dispatcher.finished(this) } }
调用调度器executed
方法,就是将当前的RealCall
对象加入到runningSyncCalls
队列中,然后调用getResponseWithInterceptorChain
方法拿到response
。
在来看看异步请求。
RealCall.kt
override fun enqueue(responseCallback: Callback) {
//CAS判断是否已经被执行了, 确保只能执行一次,如果已经执行过,则抛出异常
check(executed.compareAndSet(false, true)) { "Already Executed" }
//开启请求监听
callStart()
//新建一个AsyncCall对象,通过调度器enqueue方法加入到readyAsyncCalls队列中
client.dispatcher.enqueue(AsyncCall(responseCallback))
}
然后调用调度器的enqueue
方法,
Dispatcher.kt internal fun enqueue(call: AsyncCall) { //加锁,保证线程安全 synchronized(this) { //将该请求调用加入到 readyAsyncCalls 队列中 readyAsyncCalls.add(call) // Mutate the AsyncCall so that it shares the AtomicInteger of an existing running call to // the same host. if (!call.call.forWebSocket) { //通过域名来查找有没有相同域名的请求,有则复用。 val existingCall = findExistingCallWithHost(call.host) if (existingCall != null) call.reuseCallsPerHostFrom(existingCall) } } //执行请求 promoteAndExecute() } private fun promoteAndExecute(): Boolean { this.assertThreadDoesntHoldLock() val executableCalls = mutableListOf<AsyncCall>() //判断是否有请求正在执行 val isRunning: Boolean //加锁,保证线程安全 synchronized(this) { //遍历 readyAsyncCalls 队列 val i = readyAsyncCalls.iterator() while (i.hasNext()) { val asyncCall = i.next() //runningAsyncCalls 的数量不能大于最大并发请求数 64 if (runningAsyncCalls.size >= this.maxRequests) break // Max capacity. //同域名最大请求数5,同一个域名最多允许5条线程同时执行请求 if (asyncCall.callsPerHost.get() >= this.maxRequestsPerHost) continue // Host max capacity. //从 readyAsyncCalls 队列中移除,并加入到 executableCalls 及 runningAsyncCalls 队列中 i.remove() asyncCall.callsPerHost.incrementAndGet() executableCalls.add(asyncCall) runningAsyncCalls.add(asyncCall) } //通过运行队列中的请求数量来判断是否有请求正在执行 isRunning = runningCallsCount() > 0 } //遍历可执行队列,调用线程池来执行AsyncCall for (i in 0 until executableCalls.size) { val asyncCall = executableCalls[i] asyncCall.executeOn(executorService) } return isRunning }
调度器的enqueue
方法就是将AsyncCall
加入到readyAsyncCalls
队列中,然后调用promoteAndExecute
方法来执行请求,promoteAndExecute
方法做的其实就是遍历readyAsyncCalls
队列,然后将符合条件的请求用线程池执行,也就是会执行AsyncCall.run()
方法。
AsyncCall 方法的具体代码看基本对象介绍 AsyncCall,这边就不在此展示了,简单来说就是调用getResponseWithInterceptorChain
方法拿到response
,然后通过Callback.onResponse
方法传递出去。反之,如果请求失败,捕获了异常,就通过Callback.onFailure
将异常信息传递出去。
最终,请求结束,调用调度器finish
方法。
Dispatcher.kt /** 异步请求调用结束方法 */ internal fun finished(call: AsyncCall) { call.callsPerHost.decrementAndGet() finished(runningAsyncCalls, call) } /** 同步请求调用结束方法 */ internal fun finished(call: RealCall) { finished(runningSyncCalls, call) } private fun <T> finished(calls: Deque<T>, call: T) { val idleCallback: Runnable? synchronized(this) { //将当前请求调用从 正在运行队列 中移除 if (!calls.remove(call)) throw AssertionError("Call wasn't in-flight!") idleCallback = this.idleCallback } //继续执行剩余请求,将call从readyAsyncCalls中取出加入到runningAsyncCalls,然后执行 val isRunning = promoteAndExecute() if (!isRunning && idleCallback != null) { //如果执行完了所有请求,处于闲置状态,调用闲置回调方法 idleCallback.run() } }
接着就是看看getResponseWithInterceptorChain
方法是如何拿到response
的。
internal fun getResponseWithInterceptorChain(): Response { //拦截器列表 val interceptors = mutableListOf<Interceptor>() interceptors += client.interceptors interceptors += RetryAndFollowUpInterceptor(client) interceptors += BridgeInterceptor(client.cookieJar) interceptors += CacheInterceptor(client.cache) interceptors += ConnectInterceptor if (!forWebSocket) { interceptors += client.networkInterceptors } interceptors += CallServerInterceptor(forWebSocket) //构建拦截器责任链 val chain = RealInterceptorChain( call = this, interceptors = interceptors, index = 0, exchange = null, request = originalRequest, connectTimeoutMillis = client.connectTimeoutMillis, readTimeoutMillis = client.readTimeoutMillis, writeTimeoutMillis = client.writeTimeoutMillis ) //如果call请求完成,那就意味着交互完成了,没有更多的东西来交换了 var calledNoMoreExchanges = false try { //执行拦截器责任链来获取 response val response = chain.proceed(originalRequest) //如果被取消,关闭响应,抛出异常 if (isCanceled()) { response.closeQuietly() throw IOException("Canceled") } return response } catch (e: IOException) { calledNoMoreExchanges = true throw noMoreExchanges(e) as Throwable } finally { if (!calledNoMoreExchanges) { noMoreExchanges(null) } } }
简单概括一下:这里采用了责任链设计模式
,通过拦截器构建了以RealInterceptorChain
责任链,然后执行proceed
方法来得到response
。
那么,这又涉及拦截器是什么?拦截器责任链又是什么?
只声明了一个拦截器方法,在子类中具体实现,还包含一个Chain
接口,核心方法是proceed(request)
处理请求来获取response
。
fun interface Interceptor { /** 拦截方法 */ @Throws(IOException::class) fun intercept(chain: Chain): Response interface Chain { /** 原始请求数据 */ fun request(): Request /** 核心方法,处理请求,获取response */ @Throws(IOException::class) fun proceed(request: Request): Response fun connection(): Connection? fun call(): Call fun connectTimeoutMillis(): Int fun withConnectTimeout(timeout: Int, unit: TimeUnit): Chain fun readTimeoutMillis(): Int fun withReadTimeout(timeout: Int, unit: TimeUnit): Chain fun writeTimeoutMillis(): Int fun withWriteTimeout(timeout: Int, unit: TimeUnit): Chain } }
拦截器链就是实现Interceptor.Chain
接口,重点就是复写的proceed
方法。
class RealInterceptorChain( internal val call: RealCall, private val interceptors: List<Interceptor>, private val index: Int, internal val exchange: Exchange?, internal val request: Request, internal val connectTimeoutMillis: Int, internal val readTimeoutMillis: Int, internal val writeTimeoutMillis: Int ) : Interceptor.Chain { ···省略代码··· private var calls: Int = 0 override fun call(): Call = call override fun request(): Request = request @Throws(IOException::class) override fun proceed(request: Request): Response { check(index < interceptors.size) calls++ if (exchange != null) { check(exchange.finder.sameHostAndPort(request.url)) { "network interceptor ${interceptors[index - 1]} must retain the same host and port" } check(calls == 1) { "network interceptor ${interceptors[index - 1]} must call proceed() exactly once" } } //index+1, 复制创建新的责任链,也就意味着调用责任链中的下一个处理者,也就是下一个拦截器 val next = copy(index = index + 1, request = request) //取出当前拦截器 val interceptor = interceptors[index] //执行当前拦截器的拦截方法 @Suppress("USELESS_ELVIS") val response = interceptor.intercept(next) ?: throw NullPointerException( "interceptor $interceptor returned null") if (exchange != null) { check(index + 1 >= interceptors.size || next.calls == 1) { "network interceptor $interceptor must call proceed() exactly once" } } check(response.body != null) { "interceptor $interceptor returned a response with no body" } return response } }
链式调用,最终会执行拦截器列表中的每个拦截器,返回Response
。
OK,接下来就该看看拦截器列表中的具体拦截器了。
先上各类拦截器的总结,按顺序:
client.interceptors
:这是由开发者设置的,会在所有的拦截器处理之前进行最早的拦截处理,可用于添加一些公共参数,如自定义header
、自定义log
等等。RetryAndFollowUpInterceptor
:这里会对连接做一些初始化工作,以及请求失败的重试工作,重定向的后续请求工作。跟他的名字一样,就是做重试工作还有一些连接跟踪工作。BridgeInterceptor
:是客户端与服务器之间的沟通桥梁,负责将用户构建的请求转换为服务器需要的请求,以及将网络请求返回回来的响应转换为用户可用的响应。CacheInterceptor
:这里主要是缓存的相关处理,会根据用户在OkHttpClient
里定义的缓存配置,然后结合请求新建一个缓存策略,由它来判断是使用网络还是缓存来构建response
。ConnectInterceptor
:这里主要就是负责建立连接,会建立TCP连接
或者TLS连接
。client.networkInterceptors
:这里也是开发者自己设置的,所以本质上和第一个拦截器差不多,但是由于位置不同,所以用处也不同。CallServerInterceptor
:这里就是进行网络数据的请求和响应了,也就是实际的网络I/O操作,将请求头与请求体发送给服务器,以及解析服务器返回的response
。接下来我们按顺序,从上往下,对这些拦截器进行一一解读。
这是用户自己定义的拦截器,称为应用拦截器,会保存在OkHttpClient
的interceptors: List<Interceptor>
列表中。
他是拦截器责任链中的第一个拦截器,也就是说会第一个执行拦截方法,我们可以通过它来添加自定义Header信息
,如:
class HeaderInterceptor implements Interceptor { @Override public Response intercept(Chain chain) throws IOException { Request request = chain.request().newBuilder() .addHeader("device-android", "xxxxxxxxxxx") .addHeader("country-code", "ZH") .build(); return chain.proceed(request); } } //然后在 OkHttpClient 中加入 OkHttpClient client = new OkHttpClient.Builder() .connectTimeout(60, TimeUnit.SECONDS) .readTimeout(15, TimeUnit.SECONDS) .writeTimeout(15, TimeUnit.SECONDS) .cookieJar(new MyCookieJar()) .addInterceptor(new HeaderInterceptor())//添加自定义Header拦截器 .build();
第二个拦截器,从它的名字也可知道,它负责请求失败的重试工作与重定向的后续请求工作,同时它会对连接做一些初始化工作。
class RetryAndFollowUpInterceptor(private val client: OkHttpClient) : Interceptor { @Throws(IOException::class) override fun intercept(chain: Interceptor.Chain): Response { val realChain = chain as RealInterceptorChain var request = chain.request val call = realChain.call var followUpCount = 0 var priorResponse: Response? = null var newExchangeFinder = true var recoveredFailures = listOf<IOException>() while (true) { //这里会新建一个ExchangeFinder,ConnectInterceptor会使用到 call.enterNetworkInterceptorExchange(request, newExchangeFinder) var response: Response var closeActiveExchange = true try { if (call.isCanceled()) { throw IOException("Canceled") } try { response = realChain.proceed(request) newExchangeFinder = true } catch (e: RouteException) { //尝试通过路由连接失败。该请求将不会被发送。 if (!recover(e.lastConnectException, call, request, requestSendStarted = false)) { throw e.firstConnectException.withSuppressed(recoveredFailures) } else { recoveredFailures += e.firstConnectException } newExchangeFinder = false continue } catch (e: IOException) { //尝试与服务器通信失败。该请求可能已发送。 if (!recover(e, call, request, requestSendStarted = e !is ConnectionShutdownException)) { throw e.withSuppressed(recoveredFailures) } else { recoveredFailures += e } newExchangeFinder = false continue } // Attach the prior response if it exists. Such responses never have a body. //尝试关联上一个response,注意:body是为null if (priorResponse != null) { response = response.newBuilder() .priorResponse(priorResponse.newBuilder() .body(null) .build()) .build() } val exchange = call.interceptorScopedExchange //会根据 responseCode 来判断,构建一个新的request并返回来重试或者重定向 val followUp = followUpRequest(response, exchange) if (followUp == null) { if (exchange != null && exchange.isDuplex) { call.timeoutEarlyExit() } closeActiveExchange = false return response } //如果请求体是一次性的,不需要再次重试 val followUpBody = followUp.body if (followUpBody != null && followUpBody.isOneShot()) { closeActiveExchange = false return response } response.body?.closeQuietly() //最大重试次数,不同的浏览器是不同的,比如:Chrome为21,Safari则是16 if (++followUpCount > MAX_FOLLOW_UPS) { throw ProtocolException("Too many follow-up requests: $followUpCount") } request = followUp priorResponse = response } finally { call.exitNetworkInterceptorExchange(closeActiveExchange) } } } /** 判断是否要进行重连,false->不尝试重连;true->尝试重连。*/ private fun recover( e: IOException, call: RealCall, userRequest: Request, requestSendStarted: Boolean ): Boolean { //客户端禁止重试 if (!client.retryOnConnectionFailure) return false //不能再次发送该请求体 if (requestSendStarted && requestIsOneShot(e, userRequest)) return false //发生的异常是致命的,无法恢复,如:ProtocolException if (!isRecoverable(e, requestSendStarted)) return false //没有更多的路由来尝试重连 if (!call.retryAfterFailure()) return false // 对于失败恢复,使用带有新连接的相同路由选择器 return true } ···省略代码···
从它的名字可以看出,他的定位是客户端与服务器之间的沟通桥梁,负责将用户构建的请求转换为服务器需要的请求,比如:添加 Content-Type
,添加 Cookie
,添加User-Agent
等等。再将服务器返回的response
做一些处理转换为客户端需要的response
。比如:移除响应头中的Content-Encoding
、Content-Length
等等。
class BridgeInterceptor(private val cookieJar: CookieJar) : Interceptor { @Throws(IOException::class) override fun intercept(chain: Interceptor.Chain): Response { //获取原始请求数据 val userRequest = chain.request() val requestBuilder = userRequest.newBuilder() //重新构建请求头,请求体信息 val body = userRequest.body val contentType = body.contentType() requestBuilder.header("Content-Type", contentType.toString()) requestBuilder.header("Content-Length", contentLength.toString()) requestBuilder.header("Transfer-Encoding", "chunked") requestBuilder.header("Host", userRequest.url.toHostHeader()) requestBuilder.header("Connection", "Keep-Alive") ···省略代码··· //添加cookie val cookies = cookieJar.loadForRequest(userRequest.url) if (cookies.isNotEmpty()) { requestBuilder.header("Cookie", cookieHeader(cookies)) } //添加user-agent if (userRequest.header("User-Agent") == null) { requestBuilder.header("User-Agent", userAgent) } //重新构建一个Request,然后执行下一个拦截器来处理该请求 val networkResponse = chain.proceed(requestBuilder.build()) cookieJar.receiveHeaders(userRequest.url, networkResponse.headers) //创建一个新的responseBuilder,目的是将原始请求数据构建到response中 val responseBuilder = networkResponse.newBuilder() .request(userRequest) if (transparentGzip && "gzip".equals(networkResponse.header("Content-Encoding"), ignoreCase = true) && networkResponse.promisesBody()) { val responseBody = networkResponse.body if (responseBody != null) { val gzipSource = GzipSource(responseBody.source()) val strippedHeaders = networkResponse.headers.newBuilder() .removeAll("Content-Encoding") .removeAll("Content-Length") .build() //修改response header信息,移除Content-Encoding,Content-Length信息 responseBuilder.headers(strippedHeaders) val contentType = networkResponse.header("Content-Type") //修改response body信息 responseBuilder.body(RealResponseBody(contentType, -1L, gzipSource.buffer())) } } return responseBuilder.build() ···省略代码···
用户可以通过OkHttpClient.cache
来配置缓存,缓存拦截器通过CacheStrategy
来判断是使用网络还是缓存来构建response
。
class CacheInterceptor(internal val cache: Cache?) : Interceptor { @Throws(IOException::class) override fun intercept(chain: Interceptor.Chain): Response { val call = chain.call() //通过request从OkHttpClient.cache中获取缓存 val cacheCandidate = cache?.get(chain.request()) val now = System.currentTimeMillis() //创建一个缓存策略,用来确定怎么使用缓存 val strategy = CacheStrategy.Factory(now, chain.request(), cacheCandidate).compute() //为空表示不使用网络,反之,则表示使用网络 val networkRequest = strategy.networkRequest //为空表示不使用缓存,反之,则表示使用缓存 val cacheResponse = strategy.cacheResponse //追踪网络与缓存的使用情况 cache?.trackResponse(strategy) val listener = (call as? RealCall)?.eventListener ?: EventListener.NONE //有缓存但不适用,关闭它 if (cacheCandidate != null && cacheResponse == null) { cacheCandidate.body?.closeQuietly() } //如果网络被禁止,但是缓存又是空的,构建一个code为504的response,并返回 if (networkRequest == null && cacheResponse == null) { return Response.Builder() .request(chain.request()) .protocol(Protocol.HTTP_1_1) .code(HTTP_GATEWAY_TIMEOUT) .message("Unsatisfiable Request (only-if-cached)") .body(EMPTY_RESPONSE) .sentRequestAtMillis(-1L) .receivedResponseAtMillis(System.currentTimeMillis()) .build().also { listener.satisfactionFailure(call, it) } } //如果我们禁用了网络不使用网络,且有缓存,直接根据缓存内容构建并返回response if (networkRequest == null) { return cacheResponse!!.newBuilder() .cacheResponse(stripBody(cacheResponse)) .build().also { listener.cacheHit(call, it) } } //为缓存添加监听 if (cacheResponse != null) { listener.cacheConditionalHit(call, cacheResponse) } else if (cache != null) { listener.cacheMiss(call) } var networkResponse: Response? = null try { //责任链往下处理,从服务器返回response 赋值给 networkResponse networkResponse = chain.proceed(networkRequest) } finally { //捕获I/O或其他异常,请求失败,networkResponse为空,且有缓存的时候,不暴露缓存内容。 if (networkResponse == null && cacheCandidate != null) { cacheCandidate.body?.closeQuietly() } } //如果有缓存 if (cacheResponse != null) { //且网络返回response code为304的时候,使用缓存内容新构建一个Response返回。 if (networkResponse?.code == HTTP_NOT_MODIFIED) { val response = cacheResponse.newBuilder() .headers(combine(cacheResponse.headers, networkResponse.headers)) .sentRequestAtMillis(networkResponse.sentRequestAtMillis) .receivedResponseAtMillis(networkResponse.receivedResponseAtMillis) .cacheResponse(stripBody(cacheResponse)) .networkResponse(stripBody(networkResponse)) .build() networkResponse.body!!.close() // Update the cache after combining headers but before stripping the // Content-Encoding header (as performed by initContentStream()). cache!!.trackConditionalCacheHit() cache.update(cacheResponse, response) return response.also { listener.cacheHit(call, it) } } else { //否则关闭缓存响应体 cacheResponse.body?.closeQuietly() } } //构建网络请求的response val response = networkResponse!!.newBuilder() .cacheResponse(stripBody(cacheResponse)) .networkResponse(stripBody(networkResponse)) .build() //如果cache不为null,即用户在OkHttpClient中配置了缓存,则将上一步新构建的网络请求response存到cache中 if (cache != null) { //根据response的code,header以及CacheControl.noStore来判断是否可以缓存 if (response.promisesBody() && CacheStrategy.isCacheable(response, networkRequest)) { // 将该response存入缓存 val cacheRequest = cache.put(response) return cacheWritingResponse(cacheRequest, response).also { if (cacheResponse != null) { listener.cacheMiss(call) } } } //根据请求方法来判断缓存是否有效,只对Get请求进行缓存,其它方法的请求则移除 if (HttpMethod.invalidatesCache(networkRequest.method)) { try { //缓存无效,将该请求缓存从client缓存配置中移除 cache.remove(networkRequest) } catch (_: IOException) { // The cache cannot be written. } } } return response } ···省略代码···
负责实现与服务器真正建立起连接,
object ConnectInterceptor : Interceptor {
@Throws(IOException::class)
override fun intercept(chain: Interceptor.Chain): Response {
val realChain = chain as RealInterceptorChain
//初始化一个exchange对象
val exchange = realChain.call.initExchange(chain)
//根据这个exchange对象来复制创建一个新的连接责任链
val connectedChain = realChain.copy(exchange = exchange)
//执行该连接责任链
return connectedChain.proceed(realChain.request)
}
}
一扫下来,代码十分简单,拦截方法里就只有三步。
exchange
对象。exchange
对象来复制创建一个新的连接责任链。那这个exchange
对象又是什么呢?
RealCall.kt
internal fun initExchange(chain: RealInterceptorChain): Exchange {
...省略代码...
//这里的exchangeFinder就是在RetryAndFollowUpInterceptor中创建的
val exchangeFinder = this.exchangeFinder!!
//返回一个ExchangeCodec(是个编码器,为request编码以及为response解码)
val codec = exchangeFinder.find(client, chain)
//根据exchangeFinder与codec新构建一个Exchange对象,并返回
val result = Exchange(this, eventListener, exchangeFinder, codec)
...省略代码...
return result
}
具体看看ExchangeFinder.find()
这一步,
ExchangeFinder.kt fun find( client: OkHttpClient, chain: RealInterceptorChain ): ExchangeCodec { try { //查找合格可用的连接,返回一个 RealConnection 对象 val resultConnection = findHealthyConnection( connectTimeout = chain.connectTimeoutMillis, readTimeout = chain.readTimeoutMillis, writeTimeout = chain.writeTimeoutMillis, pingIntervalMillis = client.pingIntervalMillis, connectionRetryEnabled = client.retryOnConnectionFailure, doExtensiveHealthChecks = chain.request.method != "GET" ) //根据连接,创建并返回一个请求响应编码器:Http1ExchangeCodec 或者 Http2ExchangeCodec,分别对应Http1协议与Http2协议 return resultConnection.newCodec(client, chain) } catch (e: RouteException) { trackFailure(e.lastConnectException) throw e } catch (e: IOException) { trackFailure(e) throw RouteException(e) } }
继续往下看findHealthyConnection
方法
ExchangeFinder.kt private fun findHealthyConnection( connectTimeout: Int, readTimeout: Int, writeTimeout: Int, pingIntervalMillis: Int, connectionRetryEnabled: Boolean, doExtensiveHealthChecks: Boolean ): RealConnection { while (true) { //重点:查找连接 val candidate = findConnection( connectTimeout = connectTimeout, readTimeout = readTimeout, writeTimeout = writeTimeout, pingIntervalMillis = pingIntervalMillis, connectionRetryEnabled = connectionRetryEnabled ) //检查该连接是否合格可用,合格则直接返回该连接 if (candidate.isHealthy(doExtensiveHealthChecks)) { return candidate } //如果该连接不合格,标记为不可用,从连接池中移除 candidate.noNewExchanges() ...省略代码... } }
简单概括一下就是:通过findConnection
方法来查找连接,找到连接后判断是否是合格可用的,合格就直接返回该连接。
所以核心方法就是findConnection
,我们继续深入看看该方法:
private fun findConnection( connectTimeout: Int, readTimeout: Int, writeTimeout: Int, pingIntervalMillis: Int, connectionRetryEnabled: Boolean ): RealConnection { if (call.isCanceled()) throw IOException("Canceled") //第一次,尝试重连 call 中的 connection,不需要去重新获取连接 val callConnection = call.connection // This may be mutated by releaseConnectionNoEvents()! if (callConnection != null) { var toClose: Socket? = null synchronized(callConnection) { if (callConnection.noNewExchanges || !sameHostAndPort(callConnection.route().address.url)) { toClose = call.releaseConnectionNoEvents() } } //如果 call 中的 connection 还没有释放,就重用它。 if (call.connection != null) { check(toClose == null) return callConnection } //如果 call 中的 connection 已经被释放,关闭Socket. toClose?.closeQuietly() eventListener.connectionReleased(call, callConnection) } //需要一个新的连接,所以重置一些状态 refusedStreamCount = 0 connectionShutdownCount = 0 otherFailureCount = 0 //第二次,尝试从连接池中获取一个连接,不带路由,不带多路复用 if (connectionPool.callAcquirePooledConnection(address, call, null, false)) { val result = call.connection!! eventListener.connectionAcquired(call, result) return result } //连接池中是空的,准备下次尝试连接的路由 val routes: List<Route>? val route: Route ...省略代码... //第三次,再次尝试从连接池中获取一个连接,带路由,不带多路复用 if (connectionPool.callAcquirePooledConnection(address, call, routes, false)) { val result = call.connection!! eventListener.connectionAcquired(call, result) return result } route = localRouteSelection.next() } //第四次,手动创建一个新连接 val newConnection = RealConnection(connectionPool, route) call.connectionToCancel = newConnection try { newConnection.connect( connectTimeout, readTimeout, writeTimeout, pingIntervalMillis, connectionRetryEnabled, call, eventListener ) } finally { call.connectionToCancel = null } call.client.routeDatabase.connected(newConnection.route()) //第五次,再次尝试从连接池中获取一个连接,带路由,带多路复用。 //这一步主要是为了校验一下,比如已经有了一条连接了,就可以直接复用,而不用使用手动创建的新连接。 if (connectionPool.callAcquirePooledConnection(address, call, routes, true)) { val result = call.connection!! nextRouteToTry = route newConnection.socket().closeQuietly() eventListener.connectionAcquired(call, result) return result } synchronized(newConnection) { //将手动创建的新连接放入连接池 connectionPool.put(newConnection) call.acquireConnectionNoEvents(newConnection) } eventListener.connectionAcquired(call, newConnection) return newConnection }
在代码中可以看出,一共做了5次尝试去得到连接:
OK,到了这一步,就算建立起了连接。
该拦截器称为网络拦截器,与client.interceptors
一样也是由用户自己定义的,同样是以列表的形式存在OkHttpClient
中。
那这两个拦截器有什么不同呢?
其实他两的不同都是由于他们所处的位置不同所导致的,应用拦截器处于第一个位置,所以无论如何它都会被执行,而且只会执行一次。而网络拦截器处于倒数第二的位置,它不一定会被执行,而且可能会被执行多次,比如:在RetryAndFollowUpInterceptor
失败或者CacheInterceptor
直接返回缓存的情况下,我们的网络拦截器是不会被执行的。
到了这里,客户端与服务器已经建立好了连接,接着就是将请求头与请求体发送给服务器,以及解析服务器返回的response
了。
class CallServerInterceptor(private val forWebSocket: Boolean) : Interceptor { @Throws(IOException::class) override fun intercept(chain: Interceptor.Chain): Response { val realChain = chain as RealInterceptorChain val exchange = realChain.exchange!! val request = realChain.request val requestBody = request.body var invokeStartEvent = true var responseBuilder: Response.Builder? = null try { //写入请求头 exchange.writeRequestHeaders(request) //如果不是GET请求,并且请求体不为空 if (HttpMethod.permitsRequestBody(request.method) && requestBody != null) { //当请求头为"Expect: 100-continue"时,在发送请求体之前需要等待服务器返回"HTTP/1.1 100 Continue" 的response,如果没有等到该response,就不发送请求体。 //POST请求,先发送请求头,在获取到100继续状态后继续发送请求体 if ("100-continue".equals(request.header("Expect"), ignoreCase = true)) { //刷新请求,即发送请求头 exchange.flushRequest() //解析响应头 responseBuilder = exchange.readResponseHeaders(expectContinue = true) exchange.responseHeadersStart() invokeStartEvent = false } //写入请求体 if (responseBuilder == null) { if (requestBody.isDuplex()) { //如果请求体是双公体,就先发送请求头,稍后在发送请求体 exchange.flushRequest() val bufferedRequestBody = exchange.createRequestBody(request, true).buffer() //写入请求体 requestBody.writeTo(bufferedRequestBody) } else { //如果获取到了"Expect: 100-continue"响应,写入请求体 val bufferedRequestBody = exchange.createRequestBody(request, false).buffer() requestBody.writeTo(bufferedRequestBody) bufferedRequestBody.close() } ···省略代码··· //请求结束,发送请求体 exchange.finishRequest() ···省略代码··· try { if (responseBuilder == null) { //读取响应头 responseBuilder = exchange.readResponseHeaders(expectContinue = false)!! ···省略代码··· //构建一个response var response = responseBuilder .request(request) .handshake(exchange.connection.handshake()) .sentRequestAtMillis(sentRequestMillis) .receivedResponseAtMillis(System.currentTimeMillis()) .build() var code = response.code ···省略代码··· return response ···省略代码···
简单概括一下:写入发送请求头,然后根据条件是否写入发送请求体,请求结束。解析服务器返回的请求头,然后构建一个新的response
,并返回。
这里CallServerInterceptor
是拦截器责任链中最后一个拦截器了,所以他不会再调用chain.proceed()
方法往下执行,而是将这个构建的response
往上传递给责任链中的每个拦截器。
我们分析了请求的流程,包括同步请求与异步请求,还仔细分析了拦截器责任链中的每个拦截器,现在画一个流程图,简单总结一下,你可以对照着流程图,在走一遍流程。
OkHttpClient
、Request
还是Response
中都用到了建造者模式,因为这几个类中都有很多参数,需要供用户选择需要的参数来构建其想要的实例,所以在开源库中,Build模式
是很常见的。OkHttpClient.newCall(request Request) 来创建 Call 对象
。Response
后,从下往上传回去。在 AsyncCall
类中的 callsPerHost
变量,使用了 Volatile
+ AtomicInteger
来修饰,从而保证在多线程下的线程安全。至于为什么?可以参考我的另一篇文章Android程序员重头学Volatile,这里就不展开了。
inner class AsyncCall(
private val responseCallback: Callback
) : Runnable {
//同一个域名的请求次数,volatile + AtomicInteger 保证在多线程下及时可见性与原子性
@Volatile var callsPerHost = AtomicInteger(0)
private set
...省略代码...
为什么
readyAsyncCalls
、runningAsyncCalls
、runningSyncCalls
采用ArrayDeque
呢?
两个点回答:一、他们都是用来存放网络请求的,这些请求需要做到先到先得,所以采用队列。二、根据代码所示,当执行enqueue
时,我们需要遍历readyAsyncCalls
,将符合执行条件的Call
加入到runningAsyncCalls
,这相对比于链表来说,数组的查找效率要更高,所以采用ArrayDeque
。
到此,关于OkHttp
的源码解析就介绍啦。
其实学习源码的最好方式,就是自己将代码克隆下来,然后对着使用方法,按流程,一步一步往下走。这里你可以参考OkHttp 详细代码注释。
其实分享文章的最大目的正是等待着有人指出我的错误,如果你发现哪里有错误,请毫无保留的指出即可,虚心请教。 另外,如果你觉得文章不错,对你有所帮助,请给我点个赞,就当鼓励,谢谢~Peace~!
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。