赞
踩
✨作者简介:00后,22年刚刚毕业,一枚在鹅厂搬砖的程序员。
✨学习目标:学习
OkHttp
的简单使用,并对其分发机制进行深入解析。
✨创作初衷:学习
OkHttp
的原理,阅读Kotlin
框架源码,提高自己对Kotlin
代码的阅读能力。为了读代码而读代码,笔者知道这是不对的,但作为应届生,提高阅读源码的能力笔者认为还是很重要的。
OkHttp
是实现了HTTP
协议的框架,此篇文章不对HTTP
的协议规范进行深入解析,读者可自行阅读RFC文档
读者需要正确理解HTTP
是应用层的协议,其内部会借助TCP
协议的实现类-Socket
,在操作系统层面Socket
实现了TCP
协议,无论任何语言其内部的网络通信都是调用的操作系统的Socket
,HTTP
只是对TCP
报文的数据段进行了格式规范,OkHttp
是应用层的网络通信框架则底层也一定是Socket
实现,因此在后面阅读源码时,只要找到Socket
则代表找到了根。
在分析OkHttp
的调用流程之前先学习如何使用,本篇文章okhttp
版本为4.9.3
,全文使用Kotlin
,笔者也是初入Kotlin
,笔者认为转Kotlin
是非常有必要的,2019年谷歌就宣布Kotlin
成为安卓第一开发语言了,新的安卓开发环境下难道你还不选择学习Kotlin
吗?
笔者之前记录学习Kotlin
的一些笔记,读者如果感兴趣可以选择性学习,链接在这:Kotlin入门篇
本节只学习简单的get,post
,异步和同步请求,更详细的API
使用请阅读OkHttp官网
同步get
请求
class MainActivity : AppCompatActivity() {
override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)
setContentView(R.layout.activity_main)
//同步请求,必须启动新线程请求
Thread {
get()
}.start()
}
fun get() {
//创建OkHttpClient
val okHttpClient = OkHttpClient()
val url = "https://www.baidu.com"
//设置请求体
val request = Request.Builder()
.url(url)
.build()
//创建Call,请求的发起者
val call = okHttpClient.newCall(request)
//发起请求,返回结果
val response = call.execute()
//返回的结果
val responseBody = response.body;
if (responseBody != null) {
//打印结果
Log.d("get", responseBody.string())
}
}
}
结果如下:
百度返回的是一个网页,因此是html
的代码。
同步post
请求,和get
类似,只是参数使用FormBody
传递,
class MainActivity : AppCompatActivity() {
override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)
setContentView(R.layout.activity_main)
//同步请求,必须启动新线程请求
Thread {
get()
}.start()
}
fun post() {
val okHttpClient = OkHttpClient()
val url = "http://api.k780.com/?app=weather.history"
//设置请求体,post不同于get,参数的拼接需要借助请求体
val body = FormBody.Builder()
.add("weaid", "1")
.add("date", "2018-08-13")
.add("appkey", "10003")
.add("sign", "b59bc3ef6191eb9f747dd4e83c99f2a4")
.add("format", "json")
.build();
val request = Request.Builder()
.url(url)
.post(body)
.build();
val call = okHttpClient.newCall(request);
val response = call.execute()
val responseBody = response.body;
if (responseBody != null) {
Log.d("get", responseBody.string())
}
}
结果如下:
异步请求使用get
请求举例:
class MainActivity : AppCompatActivity() {
override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)
setContentView(R.layout.activity_main)
//不再需要新的线程,直接调用即可
get()
}
fun get() {
//创建OkHttpClient
val okHttpClient = OkHttpClient()
val url = "https://www.baidu.com"
val request = Request.Builder()
.url(url)
.build()
val call = okHttpClient.newCall(request)
call.enqueue(object : Callback {
override fun onFailure(call: Call, e: IOException) {
Log.d("get", "onFailure")
}
override fun onResponse(call: Call, response: Response) {
val responseBody = response.body;
if (responseBody != null) {
Log.d("get", responseBody.string())
}
}
})
}
}
结果与上述get
的结果一样。
为什么OkHttp
需要提供同步请求?只提供异步请求不行吗?
OkHttp
首先是一个Java
框架,只是安卓不允许在主线程启动网络请求,另一个原因就是业务需求,可能有些业务需要顺序多次请求网络,因此同步请求的存在是非常有必要的
重点分析Dispatcher
和拦截器
先分析为什么需要Dispatcher
,试想客户端如果发起了许多的请求,OkHttp
难道一股脑全部发送吗,显然是不可能的,OkHttp
必须考虑服务器和客户端双方的压力,所以如何对这些请求进行有效的管理和发送则成为问题,Dispatcher
就是为解决此问题而存在的。
有效的管理则必不可少的就是线程池和相应的数据结构,看一下Dispatcher
的属性声明
Dispatcher
class Dispatcher constructor() {
//并发请求的最大数,最多默认支持同时存在64个请求,保护客户端
@get:Synchronized var maxRequests = 64
set(maxRequests) {
//设置的值必须大于等于一否则报错
require(maxRequests >= 1) { "max < 1: $maxRequests" }
synchronized(this) {
field = maxRequests
}
//立即执行请求
promoteAndExecute()
}
//同一Server的最大请求数,限制客户端对同一个服务器发送大量请求,保护服务端
@get:Synchronized var maxRequestsPerHost = 5
set(maxRequestsPerHost) {
//设置的值必须大于等于一否则报错
require(maxRequestsPerHost >= 1) { "max < 1: $maxRequestsPerHost" }
synchronized(this) {
field = maxRequestsPerHost
}
//立即执行请求
promoteAndExecute()
}
//64,5两个阈值据说是参考了市面上多种浏览器
//线程池
private var executorServiceOrNull: ExecutorService? = null
@get:Synchronized
@get:JvmName("executorService") val executorService: ExecutorService
get() {
//创建线程池
if (executorServiceOrNull == null) {
//核心线程0,最大线程为Int的最大值,超时时间60s,阻塞队列为无界队列,抛出问题为什么如此设计
executorServiceOrNull = ThreadPoolExecutor(0, Int.MAX_VALUE, 60, TimeUnit.SECONDS,
SynchronousQueue(), threadFactory("$okHttpName Dispatcher", false))
}
return executorServiceOrNull!!
}
//保存准备发送但发送的异步请求的队列,ArrayDeque为Deque(双端队列),但OkHttp只是用了普通队列方法,可能为了后续优化使用了双端队列。
private val readyAsyncCalls = ArrayDeque<AsyncCall>()
//保存正在请求的异步请求的队列
private val runningAsyncCalls = ArrayDeque<AsyncCall>()
//保存正在请求的同步请求的队列
private val runningSyncCalls = ArrayDeque<RealCall>()
...
}
上述中了解了Dispatcher
其中的几个队列和线程池的存在,后续则分析其是如何工作的。
笔者从Call
的创建开始分析,略过OkHttpClient
的创建过程和Request
的构建过程,这两步只要是初始化网络配置和配置请求参数。
Call
由下面代码创建,所以分析okHttpClient.newCall(request)
val call = okHttpClient.newCall(request)
okHttpClient.newCall
override fun newCall(request: Request): Call = RealCall(this, request, forWebSocket = false)
Call
是一个接口,newCall()
方法返回的是RealCall
,RealCall
实现了Call
。
后续真正的请求可能有异步和同步两种,异步则调用Call
的enqueue()
,同步则调用execute()
。
先对异步请求进行分析。
异步请求调用Call
的enqueue()
,进入此方法
RealCall#enqueue
override fun enqueue(responseCallback: Callback) {
//CAS替换标志位,标志要开始网络请求了,若此Call的标志位已经是true,则报错,因此一个Call只能执行一次
check(executed.compareAndSet(false, true)) { "Already Executed" }
//重点堆栈跟踪器,并调用生命周期监听器的开始callStart()方法
callStart()
//重点分析方法,封装AsyncCall并调用dispatcher的入队方法
client.dispatcher.enqueue(AsyncCall(responseCallback))
}
Dispatcher#enqueue
internal fun enqueue(call: AsyncCall) {
synchronized(this) {
//数据结构出现,用来保存Call,本质保存的就是请求
readyAsyncCalls.add(call)
///若不是WebSocket协议请求,则寻找之前的请求中host(请求地址)相同的Call
if (!call.call.forWebSocket) {
//找到之前有相同请求Server地址的Call,后续保证相同地址的请求不超过阈值,提出问题为什么此操作可以保证相同地址的请求不超过阈值?
val existingCall = findExistingCallWithHost(call.host) //看下1
//将旧的Call中此Server地址的请求数交给新Call
if (existingCall != null) call.reuseCallsPerHostFrom(existingCall) //看下2
}
}
//执行Call
promoteAndExecute() //看下3
}
1.Dispatcher#findExistingCallWithHost
private fun findExistingCallWithHost(host: String): AsyncCall? {
//runningAsyncCalls保存的是正在请求的Call,先遍历此队列找到请求地址相等的Call,并返回
for (existingCall in runningAsyncCalls) {
if (existingCall.host == host) return existingCall
}
//若runningAsyncCalls中不存在host相等的Call,则寻找readyAsyncCalls,readyAsyncCalls中保存未发送的Call,找到host相等的Call则返回
for (existingCall in readyAsyncCalls) {
if (existingCall.host == host) return existingCall
}
return null
}
2.Dispatcher#reuseCallsPerHostFrom
fun reuseCallsPerHostFrom(other: AsyncCall) {
//callsPerHost为AtomicInteger,原子操作使用CAS保证线程安全,提出问题为什么需要原子操作?
this.callsPerHost = other.callsPerHost
}
3.Dispatcher#promoteAndExecute
此方法遍历readyAsyncCalls
队列,将可以运行的Call
转移到runningAsyncCalls
并执行
private fun promoteAndExecute(): Boolean {
this.assertThreadDoesntHoldLock()
//保存readyAsyncCalls中所有可以运行的Call
val executableCalls = mutableListOf<AsyncCall>()
val isRunning: Boolean
synchronized(this) {
//拿到准备队列的迭代器
val i = readyAsyncCalls.iterator()
//循环的意义是找到准备队列上可以运行的Call
while (i.hasNext()) {
val asyncCall = i.next()
//若正在请求的Call超过了最大限制则直接结束循环
if (runningAsyncCalls.size >= this.maxRequests) break // Max capacity.
//若同一个Server的请求超过了限制则跳过此Call,寻找下一个
if (asyncCall.callsPerHost.get() >= this.maxRequestsPerHost) continue // Host max capacity.
//找到了,在准备队列上移除此Call
i.remove()
//当前请求Server的数量原子加一
asyncCall.callsPerHost.incrementAndGet()
//将可运行的Call转移到executableCalls和runningAsyncCalls中
executableCalls.add(asyncCall)
runningAsyncCalls.add(asyncCall)
}
isRunning = runningCallsCount() > 0
}
//将executableCalls中的Call全部执行
for (i in 0 until executableCalls.size) {
val asyncCall = executableCalls[i]
//执行 看下面分析,asyncCall的真实类型为AsyncCall
asyncCall.executeOn(executorService)
}
return isRunning
}
AsyncCall#executeOn
fun executeOn(executorService: ExecutorService) {
client.dispatcher.assertThreadDoesntHoldLock()
var success = false
try {
//线程池执行,既然把自己交给了线程池,则自身必然是一个Runnable
executorService.execute(this)
success = true
} catch (e: RejectedExecutionException) {
val ioException = InterruptedIOException("executor rejected")
ioException.initCause(e)
noMoreExchanges(ioException)
//回调Callback的onFailure方法
responseCallback.onFailure(this@RealCall, ioException)
} finally {
//失败则通知dispatcher回调finished()方法,在队列中移除此Call
if (!success) {
client.dispatcher.finished(this) // This call is no longer running!
}
}
}
AsyncCall
的继承关系如下:
internal inner class AsyncCall(
private val responseCallback: Callback
) : Runnable {
...
}
其run
函数如下:
AsyncCall#run
override fun run() {
//设置线程的名字
threadName("OkHttp ${redactedUrl()}") {
var signalledCallback = false
timeout.enter()
try {
//真正的请求发起者,开启责任链链条,下篇文章中重点分析此方法,此方法是重中之重
val response = getResponseWithInterceptorChain()
signalledCallback = true
//请求成功回调Callback的onResponse()方法
responseCallback.onResponse(this@RealCall, response)
} catch (e: IOException) {
if (signalledCallback) {
// 不能两次发出回调信号,为true则Callback的onResponse()方法一定调用,但是出现错误就不能回调onFailure()方法,因此这里调用日志打印错误
Platform.get().log("Callback failure for ${toLoggableString()}", Platform.INFO, e)
} else {
//请求失败回调Callback的onFailure()方法
responseCallback.onFailure(this@RealCall, e)
}
} catch (t: Throwable) {
cancel()
if (!signalledCallback) {
val canceledException = IOException("canceled due to $t")
canceledException.addSuppressed(t)
//请求失败回调Callback的onFailure()方法
responseCallback.onFailure(this@RealCall, canceledException)
}
throw t
} finally {
//无论成功与否都将此Call结束,finished()有重载,这里this的真实类型为AsyncCall,因此命中相应的finished()方法,看下面分析
client.dispatcher.finished(this)
}
}
}
Dispatcher#finished
internal fun finished(call: AsyncCall) {
//此请求地址的数目减一
call.callsPerHost.decrementAndGet()
//看下面分析
finished(runningAsyncCalls, call)
}
private fun <T> finished(calls: Deque<T>, call: T) {
val idleCallback: Runnable?
synchronized(this) {
//运行队列中移除需要结束的Call
if (!calls.remove(call)) throw AssertionError("Call wasn't in-flight!")
idleCallback = this.idleCallback
}
//再次进行新的请求,达成闭环,旧的Call结束则需要把新的可以运行的Call从准备队列上转移到运行队列并运行,上述已经分析过此方法
val isRunning = promoteAndExecute()
if (!isRunning && idleCallback != null) {
idleCallback.run()
}
}
上述我们提出了三个问题:
线程池为何要设置那样的参数?
OkHttp
默认的线程池创建如下:
executorServiceOrNull = ThreadPoolExecutor(0, Int.MAX_VALUE, 60, TimeUnit.SECONDS,
SynchronousQueue(), threadFactory("$okHttpName Dispatcher", false))
重要参数解析:
0
Int
的最大值60s
SynchronousQueue
无界队列拿3
个任务举例,第一个任务添加进线程池并没有执行完则添加第二个任务,第一个任务执行完毕,第二个任务未结束时添加第三个任务,真个过程在60s
完成。
这里需要读者了解线程池的工作流程,本篇文章不对线程池进行深入理解,直接分析上述例子的执行过程。
0
则不启动核心线程,此时判断是否可以加入阻塞队列,SynchronousQueue
为无界队列,不存储任何元素,因此启动最大线程运行此任务。如此设计保证了任务只要添加进线程池则百分之100会被立即执行,保证了最大的吞吐量。OkHttp
其实只用到了存活时间这个参数,只是为了让线程不那么早死,让他们再活一会,等待后续的任务,因为创建线程是非常消耗资源的。
如何保证相同地址的请求不超过阈值?
假设目前运行队列上存在一个正在请求192.168.10.148的Call
,正在请求则意味着此Call
的callsPerHost
属性为1
,若此时再次请求此地址,根据上述的执行流程,首先会拿到队列上之前地址相同的Call
的callsPerHost
,并将callsPerHost
赋值给新的Call
,因为callsPerHost
是AtomicInteger
类型属于对象类型,因此=
传递的是引用,所以说只要Host
相同Call
就会公用一个计数器,也就意味着一个地址维护一个计数器,后期再判断时就可以判断出此地址下的有几个请求了。
为什么callsPerHost
需要使用AtomicInteger
因为不知道哪个线程会发起请求,会有多个线程操作此变量,因此就必须保证此变量的线程安全。若A
线程刚需要减一操作,但是B
线程要执行加一操作,A
线程还没减一成功的时候,B
线程读到了原来的callsPerHost
值,当A
线程执行完毕后,callsPerHost
确实减一了,但是B
线程读到的时减一之前的值,B线
程按照原值进行加一,将A
线程的减一操作覆盖掉了,造成错误。保证线程安全有很多种方式,AtomicInteger
是使用CAS
保证线程安全,内部使用自旋实现,竞争时线程并不会真正的睡眠,避免了线程的上下文切换,对于简单的加减操作,CAS
已经完全够用,synchronized
则没有必要使用。
总结:异步请求使用并发量最大的线程池,请求时先将Call
放入准备队列,随即调用Dispatcher#promoteAndExecute()
方法将准备队列可以执行的Call
转移到运行运行队列上,在请求结束时再次调用此方法,使整个过程形成闭环(兜底)。
异步请求调用Call
的execute()
,进入此方法
RealCall#execute
override fun execute(): Response {
//Call也只能执行一次
check(executed.compareAndSet(false, true)) { "Already Executed" }
//用于超时心跳检测
timeout.enter()
callStart()
try {
//添加进同步运行队列
client.dispatcher.executed(this)
//与异步请求一样启动责任链获取Response
return getResponseWithInterceptorChain()
} finally {
//与异步结束类似,只是现在this的真实类型为RealCall
client.dispatcher.finished(this)
}
}
Dispatcr#executed
@Synchronized internal fun executed(call: RealCall) {
runningSyncCalls.add(call)
}
Dispatcher#finished
internal fun finished(call: RealCall) {
//在异步分析中已经分析过此方法,重点就是移除传入参数队列中需要结束的Call
finished(runningSyncCalls, call)
}
总结:同步请求还是比较简单,一共三步,加入同步运行队列,启动责任链发起请求,最终结束请求。
✨ 原 创 不 易 , 还 希 望 各 位 大 佬 支 持 一 下 \textcolor{blue}{原创不易,还希望各位大佬支持一下} 原创不易,还希望各位大佬支持一下
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/小惠珠哦/article/detail/1021597
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。