赞
踩
okhttp的拦截器是项目中的精髓代码,今天我们来具体分析一下,base4.9.1版本,首先列一下okhttp的类图用来加深印象
我们已经知道拦截器是在RealCall中添加的:
val interceptors = mutableListOf<Interceptor>()
interceptors += client.interceptors
interceptors += RetryAndFollowUpInterceptor(client)
interceptors += BridgeInterceptor(client.cookieJar)
interceptors += CacheInterceptor(client.cache)
interceptors += ConnectInterceptor
if (!forWebSocket) {
interceptors += client.networkInterceptors
}
interceptors += CallServerInterceptor(forWebSocket)
val chain = RealInterceptorChain(
call = this,
interceptors = interceptors,
index = 0,
exchange = null,
request = originalRequest,
connectTimeoutMillis = client.connectTimeoutMillis,
readTimeoutMillis = client.readTimeoutMillis,
writeTimeoutMillis = client.writeTimeoutMillis
)
val response = chain.proceed(originalRequest)
构造了一系列的Interceptor,然后传递给RealInterceptorChain
注:分析代码的时候只保留主要逻辑,对于一些异常处理和我们不关心的部分省略掉
调用了RealInterceptorChain的proceed方法中(okhttp喜欢将具体执行任务的类命名为Realxxx,这样也方便我们查看,后面还有这样的例子)
override fun proceed(request: Request): Response {
//检查越界情况
check(index < interceptors.size)
calls++
//构造下一个RealInterceptorChain
val next = copy(index = index + 1, request = request)
val interceptor = interceptors[index]
//从下一个拦截器获取结果
val response = interceptor.intercept(next)
return response
}
可以看到,这里复制了一个RealInterceptorChain,并且调用了下一个interceptor的intercept方法,这里用了浅拷贝,看起来仅仅是index+1
从这里开始进入拦截器,按照次序调用:
从字面意义上来看,似乎和重定向有关,现在我们只关心intercept方法:
override fun intercept(chain: Interceptor.Chain): Response {
val realChain = chain as RealInterceptorChain
var request = chain.request
val call = realChain.call
var followUpCount = 0
var priorResponse: Response? = null
var newExchangeFinder = true
var recoveredFailures = listOf<IOException>()
while (true) {
//创建ExchangeFinder
call.enterNetworkInterceptorExchange(request, newExchangeFinder)
var response: Response
var closeActiveExchange = true
try {
try {
response = realChain.proceed(request)
newExchangeFinder = true
}
}
}
由于这是一个栈调用的过程,我们现在只关心目前会调用到的代码,其余的后面再分析。进入下一个拦截器
从字面意义上看,起到一个“桥”的作用
override fun intercept(chain: Interceptor.Chain): Response {
val userRequest = chain.request()
val requestBuilder = userRequest.newBuilder()
val body = userRequest.body
if (body != null) {
// If we add an "Accept-Encoding: gzip" header field we're responsible for also decompressing
// the transfer stream.
// 默认添加gzip
var transparentGzip = false
if (userRequest.header("Accept-Encoding") == null && userRequest.header("Range") == null) {
transparentGzip = true
requestBuilder.header("Accept-Encoding", "gzip")
}
val networkResponse = chain.proceed(requestBuilder.build())
cookieJar.receiveHeaders(userRequest.url, networkResponse.headers)
return responseBuilder.build()
}
对一些请求头做了处理,其中包括假如没有Accept-Encoding字段的话,默认使用gzip并对输出流做了解码操作,进入下一个
cache是一个“重头戏”,我们进去看看
override fun intercept(chain: Interceptor.Chain): Response {
val call = chain.call()
val cacheCandidate = cache?.get(chain.request())
val now = System.currentTimeMillis()
//1 缓存策略的获取
val strategy = CacheStrategy.Factory(now, chain.request(), cacheCandidate).compute()
val networkRequest = strategy.networkRequest
val cacheResponse = strategy.cacheResponse
val listener = (call as? RealCall)?.eventListener ?: EventListener.NONE
// If we're forbidden from using the network and the cache is insufficient, fail.
//2 如果不需要请求网络,并且缓存为空,就返回失败
if (networkRequest == null && cacheResponse == null) {
}
// If we don't need the network, we're done.
//3 不需要网络,直接缓存命中
if (networkRequest == null) {
return cacheResponse!!.newBuilder()
.cacheResponse(stripBody(cacheResponse))
.build().also {
listener.cacheHit(call, it)
}
}
var networkResponse: Response? = null
try {
networkResponse = chain.proceed(networkRequest)
} finally {
// If we're crashing on I/O or otherwise, don't leak the cache body.
if (networkResponse == null && cacheCandidate != null) {
cacheCandidate.body?.closeQuietly()
}
}
// If we have a cache response too, then we're doing a conditional get.
//4 条件GET,判断缓存是否过期
if (cacheResponse != null) {
if (networkResponse?.code == HTTP_NOT_MODIFIED) {
val response = cacheResponse.newBuilder()
.headers(combine(cacheResponse.headers, networkResponse.headers))
.sentRequestAtMillis(networkResponse.sentRequestAtMillis)
.receivedResponseAtMillis(networkResponse.receivedResponseAtMillis)
.cacheResponse(stripBody(cacheResponse))
.networkResponse(stripBody(networkResponse))
.build()
networkResponse.body!!.close()
cache.update(cacheResponse, response)
return response.also {
listener.cacheHit(call, it)
}
} else {
cacheResponse.body?.closeQuietly()
}
}
//5 更新缓存
val response = networkResponse!!.newBuilder()
.cacheResponse(stripBody(cacheResponse))
.networkResponse(stripBody(networkResponse))
.build()
if (cache != null) {
if (response.promisesBody() && CacheStrategy.isCacheable(response, networkRequest)) {
// Offer this request to the cache.
val cacheRequest = cache.put(response)
return cacheWritingResponse(cacheRequest, response).also {
if (cacheResponse != null) {
// This will log a conditional cache miss only.
listener.cacheMiss(call)
}
}
}
//6 有些条件下不能缓存
if (HttpMethod.invalidatesCache(networkRequest.method)) {
try {
cache.remove(networkRequest)
} catch (_: IOException) {
// The cache cannot be written.
}
}
}
return response
}
我们从以上6个主要的点展开分析,首先是缓存
1)构建CacheStrategy
fun compute(): CacheStrategy {
val candidate = computeCandidate()
// 设置了only-if-cached,代表不需要条件get
if (candidate.networkRequest != null && request.cacheControl.onlyIfCached) {
return CacheStrategy(null, null)
}
return candidate
}
private fun computeCandidate(): CacheStrategy {
val responseCaching = cacheResponse.cacheControl
//响应过期
if (!responseCaching.noCache && ageMillis + minFreshMillis < freshMillis + maxStaleMillis) {
val builder = cacheResponse.newBuilder()
if (ageMillis + minFreshMillis >= freshMillis) {
builder.addHeader("Warning", "110 HttpURLConnection \"Response is stale\"")
}
val oneDayMillis = 24 * 60 * 60 * 1000L
if (ageMillis > oneDayMillis && isFreshnessLifetimeHeuristic()) {
builder.addHeader("Warning", "113 HttpURLConnection \"Heuristic expiration\"")
}
return CacheStrategy(null, builder.build())
}
//构建条件GET的request和response
when {
etag != null -> {
conditionName = "If-None-Match"
conditionValue = etag
}
lastModified != null -> {
conditionName = "If-Modified-Since"
conditionValue = lastModifiedString
}
servedDate != null -> {
conditionName = "If-Modified-Since"
conditionValue = servedDateString
}
else -> return CacheStrategy(request, null) // No condition! Make a regular request.
}
val conditionalRequestHeaders = request.headers.newBuilder()
conditionalRequestHeaders.addLenient(conditionName, conditionValue!!)
val conditionalRequest = request.newBuilder()
.headers(conditionalRequestHeaders.build())
.build()
return CacheStrategy(conditionalRequest, cacheResponse)
}
根据是否过期以及一些条件构造了缓存策略,以及是否需要条件GET
2)3)4)缓存命中的异常情况,以及条件GET的结果
5)6)缓存更新
这里就是okhttp的缓存实现的地方。进入下一个拦截器
override fun intercept(chain: Interceptor.Chain): Response {
val realChain = chain as RealInterceptorChain
val exchange = realChain.call.initExchange(chain)
val connectedChain = realChain.copy(exchange = exchange)
return connectedChain.proceed(realChain.request)
}
代码比较简单,主要是调用了realChain.call.initExchange,生成了一个Exchange对象
internal fun initExchange(chain: RealInterceptorChain): Exchange {
val exchangeFinder = this.exchangeFinder!!
//寻找ExchangeCodec
val codec = exchangeFinder.find(client, chain)
val result = Exchange(this, eventListener, exchangeFinder, codec)
this.interceptorScopedExchange = result
this.exchange = result
synchronized(this) {
this.requestBodyOpen = true
this.responseBodyOpen = true
}
return result
}
这个exchangeFinder就是我们在RetryAndFollowUpInterceptor中创建的,然后寻找ExchangeCodec,这个ExchangeCodec可以理解为对http原始数据的编解码,通过okio的一些工具对字节数据转换为我们需要的结果,包括读出和写入。我们看下find方法:
fun find(
client: OkHttpClient,
chain: RealInterceptorChain
): ExchangeCodec {
try {
//寻找健康的连接
val resultConnection = findHealthyConnection(
connectTimeout = chain.connectTimeoutMillis,
readTimeout = chain.readTimeoutMillis,
writeTimeout = chain.writeTimeoutMillis,
pingIntervalMillis = client.pingIntervalMillis,
connectionRetryEnabled = client.retryOnConnectionFailure,
doExtensiveHealthChecks = chain.request.method != "GET"
)
return resultConnection.newCodec(client, chain)
}
}
看来我们的连接就是从这里建立的,进去看看:
private fun findHealthyConnection(
connectTimeout: Int,
readTimeout: Int,
writeTimeout: Int,
pingIntervalMillis: Int,
connectionRetryEnabled: Boolean,
doExtensiveHealthChecks: Boolean
): RealConnection {
while (true) {
val candidate = findConnection(
connectTimeout = connectTimeout,
readTimeout = readTimeout,
writeTimeout = writeTimeout,
pingIntervalMillis = pingIntervalMillis,
connectionRetryEnabled = connectionRetryEnabled
)
// Confirm that the connection is good.
if (candidate.isHealthy(doExtensiveHealthChecks)) {
return candidate
}
//如果连接不健康,避免再进行数据的交换
candidate.noNewExchanges()
//判断路由是否耗尽
}
}
这里只是循环寻找一个健康的连接,进入findConnection:
private fun findConnection(
connectTimeout: Int,
readTimeout: Int,
writeTimeout: Int,
pingIntervalMillis: Int,
connectionRetryEnabled: Boolean
): RealConnection {
// 1 假如call中的连接不为空,直接使用
val callConnection = call.connection // This may be mutated by releaseConnectionNoEvents()!
if (callConnection != null) {
//省略代码
}
// We need a new connection. Give it fresh stats.
refusedStreamCount = 0
connectionShutdownCount = 0
otherFailureCount = 0
// Attempt to get a connection from the pool.
// 2 试图从连接池获取一个连接
if (connectionPool.callAcquirePooledConnection(address, call, null, false)) {
val result = call.connection!!
eventListener.connectionAcquired(call, result)
return result
}
// Nothing in the pool. Figure out what route we'll try next.
val routes: List<Route>?
val route: Route
if (nextRouteToTry != null) {
// Use a route from a preceding coalesced connection.
routes = null
route = nextRouteToTry!!
nextRouteToTry = null
} else if (routeSelection != null && routeSelection!!.hasNext()) {
// Use a route from an existing route selection.
routes = null
route = routeSelection!!.next()
} else {
// Compute a new route selection. This is a blocking operation!
var localRouteSelector = routeSelector
if (localRouteSelector == null) {
localRouteSelector = RouteSelector(address, call.client.routeDatabase, call, eventListener)
this.routeSelector = localRouteSelector
}
val localRouteSelection = localRouteSelector.next()
routeSelection = localRouteSelection
routes = localRouteSelection.routes
// Now that we have a set of IP addresses, make another attempt at getting a connection from
// the pool. We have a better chance of matching thanks to connection coalescing.
// 有了一组ip,再从pool获取试试
if (connectionPool.callAcquirePooledConnection(address, call, routes, false)) {
val result = call.connection!!
eventListener.connectionAcquired(call, result)
return result
}
route = localRouteSelection.next()
}
// Connect. Tell the call about the connecting call so async cancels work.
// 3 好了,什么都没有获取到,我们自己新建一个
val newConnection = RealConnection(connectionPool, route)
call.connectionToCancel = newConnection
try {
// 4 建立连接
newConnection.connect(
connectTimeout,
readTimeout,
writeTimeout,
pingIntervalMillis,
connectionRetryEnabled,
call,
eventListener
)
} finally {
call.connectionToCancel = null
}
call.client.routeDatabase.connected(newConnection.route())
// 5 放到连接池去
synchronized(newConnection) {
connectionPool.put(newConnection)
call.acquireConnectionNoEvents(newConnection)
}
return newConnection
}
真是一个漫长的过程,1)如果call中的连接不为空,直接用旧的连接,2)如果连接池可以获取到,从连接池获取,3)新建,4)建立连接,5)放入连接池。建立连接和连接池我们这里不做展开,知道是进行了三次握手就可以了,包括https的安全隧道的建立。进入最后一个拦截器
终于到了和服务器通信的地方了,目前我们已经建立了连接,准备好了数据,开始通信
override fun intercept(chain: Interceptor.Chain): Response {
try {
//写请求头
exchange.writeRequestHeaders(request)
//请求体处理
if (HttpMethod.permitsRequestBody(request.method) && requestBody != null) {
//带100-continue的,等返回之后才发送数据
if ("100-continue".equals(request.header("Expect"), ignoreCase = true)) {
exchange.flushRequest()
responseBuilder = exchange.readResponseHeaders(expectContinue = true)
exchange.responseHeadersStart()
invokeStartEvent = false
}
if (responseBuilder == null) {
if (requestBody.isDuplex()) {
//是否双向
exchange.flushRequest()
val bufferedRequestBody = exchange.createRequestBody(request, true).buffer()
requestBody.writeTo(bufferedRequestBody)
} else {
val bufferedRequestBody = exchange.createRequestBody(request, false).buffer()
requestBody.writeTo(bufferedRequestBody)
bufferedRequestBody.close()
}
} else {
exchange.noRequestBody()
if (!exchange.connection.isMultiplexed) {
exchange.noNewExchangesOnConnection()
}
}
} else {
exchange.noRequestBody()
}
}
//开始读响应头
try {
if (responseBuilder == null) {
responseBuilder = exchange.readResponseHeaders(expectContinue = false)!!
}
var response = responseBuilder
.request(request)
.handshake(exchange.connection.handshake())
.sentRequestAtMillis(sentRequestMillis)
.receivedResponseAtMillis(System.currentTimeMillis())
.build()
var code = response.code
if (code == 100) {
//省略部分代码
}
exchange.responseHeadersEnd(response)
response = if (forWebSocket && code == 101) {
response.newBuilder()
.body(EMPTY_RESPONSE)
.build()
} else {
//读请求体
response.newBuilder()
.body(exchange.openResponseBody(response))
.build()
}
//省略一些异常处理
return response
}
}
用Exchange来读写数据,并根据一些特殊情况处理具体的逻辑,Exchange里面是用ExchangeCodec来进行具体的通信实现。
拦截器贯穿了okhttp的始终,说是“精髓”也不为过。我们可以看到,包括请求的处理、缓存、连接、连接池、响应处理,基本上每个拦截器都可以再展开具体的逻辑,鉴于篇幅有限,其他细节我们其他章节再做展开。
如果想要理解透彻okhttp,就要对HTTP有一个深入的理解和认识,很多领域驱动的情况,假如不理解HTTP的一些概念和原理的话,对okhttp的理解也只是空中楼阁、不得其解。
这里的一些分析只能说是皮毛,发出来抛砖引玉
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。