当前位置:   article > 正文

OkHttp原理第一篇—使用与分发机制_callsperhost

callsperhost

作者简介:00后,22年刚刚毕业,一枚在鹅厂搬砖的程序员。

学习目标:学习OkHttp的简单使用,并对其分发机制进行深入解析。

创作初衷:学习OkHttp的原理,阅读Kotlin框架源码,提高自己对Kotlin代码的阅读能力。为了读代码而读代码,笔者知道这是不对的,但作为应届生,提高阅读源码的能力笔者认为还是很重要的。


OkHttp第一篇—使用与分发机制

OkHttp是实现了HTTP协议的框架,此篇文章不对HTTP的协议规范进行深入解析,读者可自行阅读RFC文档

超文本传输协议——HTTP/1.0

读者需要正确理解HTTP是应用层的协议,其内部会借助TCP协议的实现类-Socket,在操作系统层面Socket实现了TCP协议,无论任何语言其内部的网络通信都是调用的操作系统的SocketHTTP只是对TCP报文的数据段进行了格式规范,OkHttp是应用层的网络通信框架则底层也一定是Socket实现,因此在后面阅读源码时,只要找到Socket则代表找到了根。

在分析OkHttp的调用流程之前先学习如何使用,本篇文章okhttp版本为4.9.3,全文使用Kotlin,笔者也是初入Kotlin,笔者认为转Kotlin是非常有必要的,2019年谷歌就宣布Kotlin成为安卓第一开发语言了,新的安卓开发环境下难道你还不选择学习Kotlin吗?

笔者之前记录学习Kotlin的一些笔记,读者如果感兴趣可以选择性学习,链接在这:Kotlin入门篇

使用

本节只学习简单的get,post,异步和同步请求,更详细的API使用请阅读OkHttp官网

get

同步get请求

class MainActivity : AppCompatActivity() {
    
    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)
        setContentView(R.layout.activity_main)
        //同步请求,必须启动新线程请求
        Thread {
            get()
        }.start()
    }
	
    fun get() {
        //创建OkHttpClient
        val okHttpClient = OkHttpClient()
        val url = "https://www.baidu.com"
        //设置请求体
        val request = Request.Builder()
            .url(url)
            .build()
        //创建Call,请求的发起者
        val call = okHttpClient.newCall(request)
        //发起请求,返回结果
        val response = call.execute()
        //返回的结果
        val responseBody = response.body;
        if (responseBody != null) {
            //打印结果
            Log.d("get", responseBody.string())
        }
    }
}

结果如下:

百度返回的是一个网页,因此是html的代码。

post

同步post请求,和get类似,只是参数使用FormBody传递,

class MainActivity : AppCompatActivity() {

    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)
        setContentView(R.layout.activity_main)
        //同步请求,必须启动新线程请求
        Thread {
            get()
        }.start()
    }
    
    fun post() {
        val okHttpClient = OkHttpClient()
        val url = "http://api.k780.com/?app=weather.history"
        //设置请求体,post不同于get,参数的拼接需要借助请求体
        val body = FormBody.Builder()
        .add("weaid", "1")
        .add("date", "2018-08-13")
        .add("appkey", "10003")
        .add("sign", "b59bc3ef6191eb9f747dd4e83c99f2a4")
        .add("format", "json")
        .build();

        val request = Request.Builder()
        .url(url)
        .post(body)
        .build();
        val call = okHttpClient.newCall(request);
        val response = call.execute()
        val responseBody = response.body;
        if (responseBody != null) {
            Log.d("get", responseBody.string())
        }
    }

结果如下:

异步

异步请求使用get请求举例:

class MainActivity : AppCompatActivity() {

    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)
        setContentView(R.layout.activity_main)

        //不再需要新的线程,直接调用即可
        get()
    }

    fun get() {
        //创建OkHttpClient
        val okHttpClient = OkHttpClient()
        val url = "https://www.baidu.com"
        val request = Request.Builder()
        .url(url)
        .build()
        val call = okHttpClient.newCall(request)
        call.enqueue(object : Callback {
            override fun onFailure(call: Call, e: IOException) {
                Log.d("get", "onFailure")
            }

            override fun onResponse(call: Call, response: Response) {
                val responseBody = response.body;
                if (responseBody != null)  {
                    Log.d("get", responseBody.string())
                }
            }

        })
    }
}

结果与上述get的结果一样。

为什么OkHttp需要提供同步请求?只提供异步请求不行吗?

OkHttp首先是一个Java框架,只是安卓不允许在主线程启动网络请求,另一个原因就是业务需求,可能有些业务需要顺序多次请求网络,因此同步请求的存在是非常有必要的

源码分析

重点分析Dispatcher和拦截器

Dispatcher

先分析为什么需要Dispatcher,试想客户端如果发起了许多的请求,OkHttp难道一股脑全部发送吗,显然是不可能的,OkHttp必须考虑服务器和客户端双方的压力,所以如何对这些请求进行有效的管理和发送则成为问题,Dispatcher就是为解决此问题而存在的。

有效的管理则必不可少的就是线程池和相应的数据结构,看一下Dispatcher的属性声明

Dispatcher

class Dispatcher constructor() {
   	//并发请求的最大数,最多默认支持同时存在64个请求,保护客户端
    @get:Synchronized var maxRequests = 64
    set(maxRequests) {
        //设置的值必须大于等于一否则报错
        require(maxRequests >= 1) { "max < 1: $maxRequests" }
        synchronized(this) {
            field = maxRequests
        }
        //立即执行请求
        promoteAndExecute()
    }

    //同一Server的最大请求数,限制客户端对同一个服务器发送大量请求,保护服务端
    @get:Synchronized var maxRequestsPerHost = 5
    set(maxRequestsPerHost) {
        //设置的值必须大于等于一否则报错
        require(maxRequestsPerHost >= 1) { "max < 1: $maxRequestsPerHost" }
        synchronized(this) {
            field = maxRequestsPerHost
        }
        //立即执行请求
        promoteAndExecute()
    }
    //64,5两个阈值据说是参考了市面上多种浏览器

	//线程池
    private var executorServiceOrNull: ExecutorService? = null

    @get:Synchronized
    @get:JvmName("executorService") val executorService: ExecutorService
    get() {
        //创建线程池
        if (executorServiceOrNull == null) {
            //核心线程0,最大线程为Int的最大值,超时时间60s,阻塞队列为无界队列,抛出问题为什么如此设计
            executorServiceOrNull = ThreadPoolExecutor(0, Int.MAX_VALUE, 60, TimeUnit.SECONDS,
                                                       SynchronousQueue(), threadFactory("$okHttpName Dispatcher", false))
        }
        return executorServiceOrNull!!
    }

    //保存准备发送但发送的异步请求的队列,ArrayDeque为Deque(双端队列),但OkHttp只是用了普通队列方法,可能为了后续优化使用了双端队列。
    private val readyAsyncCalls = ArrayDeque<AsyncCall>()

    //保存正在请求的异步请求的队列
    private val runningAsyncCalls = ArrayDeque<AsyncCall>()

    //保存正在请求的同步请求的队列
    private val runningSyncCalls = ArrayDeque<RealCall>()
    
    ...
}

上述中了解了Dispatcher其中的几个队列和线程池的存在,后续则分析其是如何工作的。

笔者从Call的创建开始分析,略过OkHttpClient的创建过程和Request的构建过程,这两步只要是初始化网络配置配置请求参数

Call的创建

Call由下面代码创建,所以分析okHttpClient.newCall(request)

val call = okHttpClient.newCall(request)

okHttpClient.newCall

override fun newCall(request: Request): Call = RealCall(this, request, forWebSocket = false)

Call是一个接口,newCall()方法返回的是RealCallRealCall实现了Call

后续真正的请求可能有异步和同步两种,异步则调用Callenqueue(),同步则调用execute()

先对异步请求进行分析。

异步

异步请求调用Callenqueue(),进入此方法

RealCall#enqueue

override fun enqueue(responseCallback: Callback) {
  //CAS替换标志位,标志要开始网络请求了,若此Call的标志位已经是true,则报错,因此一个Call只能执行一次
  check(executed.compareAndSet(false, true)) { "Already Executed" }
  //重点堆栈跟踪器,并调用生命周期监听器的开始callStart()方法
  callStart()
  //重点分析方法,封装AsyncCall并调用dispatcher的入队方法
  client.dispatcher.enqueue(AsyncCall(responseCallback)) 
}

Dispatcher#enqueue

internal fun enqueue(call: AsyncCall) {
    synchronized(this) {
        //数据结构出现,用来保存Call,本质保存的就是请求
        readyAsyncCalls.add(call)
        ///若不是WebSocket协议请求,则寻找之前的请求中host(请求地址)相同的Call
        if (!call.call.forWebSocket) {
			//找到之前有相同请求Server地址的Call,后续保证相同地址的请求不超过阈值,提出问题为什么此操作可以保证相同地址的请求不超过阈值?
            val existingCall = findExistingCallWithHost(call.host) //看下1
            //将旧的Call中此Server地址的请求数交给新Call
            if (existingCall != null) call.reuseCallsPerHostFrom(existingCall) //看下2
        }
    }
    //执行Call
    promoteAndExecute() //看下3
}

1.Dispatcher#findExistingCallWithHost

private fun findExistingCallWithHost(host: String): AsyncCall? {
	//runningAsyncCalls保存的是正在请求的Call,先遍历此队列找到请求地址相等的Call,并返回
    for (existingCall in runningAsyncCalls) {
        if (existingCall.host == host) return existingCall
    }
    //若runningAsyncCalls中不存在host相等的Call,则寻找readyAsyncCalls,readyAsyncCalls中保存未发送的Call,找到host相等的Call则返回
    for (existingCall in readyAsyncCalls) {
        if (existingCall.host == host) return existingCall
    }
    return null
}

2.Dispatcher#reuseCallsPerHostFrom

fun reuseCallsPerHostFrom(other: AsyncCall) {
    //callsPerHost为AtomicInteger,原子操作使用CAS保证线程安全,提出问题为什么需要原子操作?
    this.callsPerHost = other.callsPerHost
}

3.Dispatcher#promoteAndExecute

此方法遍历readyAsyncCalls队列,将可以运行的Call转移到runningAsyncCalls并执行

private fun promoteAndExecute(): Boolean {
    this.assertThreadDoesntHoldLock()
	//保存readyAsyncCalls中所有可以运行的Call
    val executableCalls = mutableListOf<AsyncCall>()
    val isRunning: Boolean
    synchronized(this) {
        //拿到准备队列的迭代器
        val i = readyAsyncCalls.iterator()
        //循环的意义是找到准备队列上可以运行的Call
        while (i.hasNext()) {
            val asyncCall = i.next()
			//若正在请求的Call超过了最大限制则直接结束循环
            if (runningAsyncCalls.size >= this.maxRequests) break // Max capacity.
            //若同一个Server的请求超过了限制则跳过此Call,寻找下一个
            if (asyncCall.callsPerHost.get() >= this.maxRequestsPerHost) continue // Host max capacity.
			//找到了,在准备队列上移除此Call
            i.remove()
            //当前请求Server的数量原子加一
            asyncCall.callsPerHost.incrementAndGet()
            //将可运行的Call转移到executableCalls和runningAsyncCalls中
            executableCalls.add(asyncCall)
            runningAsyncCalls.add(asyncCall)
        }
        isRunning = runningCallsCount() > 0
    }
	//将executableCalls中的Call全部执行
    for (i in 0 until executableCalls.size) {
        val asyncCall = executableCalls[i]
        //执行 看下面分析,asyncCall的真实类型为AsyncCall
        asyncCall.executeOn(executorService) 
    }

    return isRunning
}

AsyncCall#executeOn

fun executeOn(executorService: ExecutorService) {
    client.dispatcher.assertThreadDoesntHoldLock()
   
    var success = false
    try {
        //线程池执行,既然把自己交给了线程池,则自身必然是一个Runnable
        executorService.execute(this)
        success = true
    } catch (e: RejectedExecutionException) {
        val ioException = InterruptedIOException("executor rejected")
        ioException.initCause(e)
        noMoreExchanges(ioException)
        //回调Callback的onFailure方法
        responseCallback.onFailure(this@RealCall, ioException)
    } finally {
        //失败则通知dispatcher回调finished()方法,在队列中移除此Call
        if (!success) {
            client.dispatcher.finished(this) // This call is no longer running!
        }
    }
}

AsyncCall的继承关系如下:

internal inner class AsyncCall(
  private val responseCallback: Callback
) : Runnable {
    ...
}

run函数如下:

AsyncCall#run

override fun run() {
    //设置线程的名字
    threadName("OkHttp ${redactedUrl()}") {
        var signalledCallback = false
        timeout.enter()
        try {
            //真正的请求发起者,开启责任链链条,下篇文章中重点分析此方法,此方法是重中之重
            val response = getResponseWithInterceptorChain()
            signalledCallback = true
            //请求成功回调Callback的onResponse()方法
            responseCallback.onResponse(this@RealCall, response)
        } catch (e: IOException) {
            if (signalledCallback) {
                // 不能两次发出回调信号,为true则Callback的onResponse()方法一定调用,但是出现错误就不能回调onFailure()方法,因此这里调用日志打印错误
                Platform.get().log("Callback failure for ${toLoggableString()}", Platform.INFO, e)
            } else {
                 //请求失败回调Callback的onFailure()方法
                responseCallback.onFailure(this@RealCall, e)
            }
        } catch (t: Throwable) {
            cancel()
            if (!signalledCallback) {
                val canceledException = IOException("canceled due to $t")
                canceledException.addSuppressed(t)
                //请求失败回调Callback的onFailure()方法
                responseCallback.onFailure(this@RealCall, canceledException)
            }
            throw t
        } finally {
            //无论成功与否都将此Call结束,finished()有重载,这里this的真实类型为AsyncCall,因此命中相应的finished()方法,看下面分析
            client.dispatcher.finished(this)
        }
    }
}

Dispatcher#finished

internal fun finished(call: AsyncCall) {
    //此请求地址的数目减一
    call.callsPerHost.decrementAndGet()
   	//看下面分析
    finished(runningAsyncCalls, call)
}

private fun <T> finished(calls: Deque<T>, call: T) {
    val idleCallback: Runnable?
    
    synchronized(this) {
        //运行队列中移除需要结束的Call
        if (!calls.remove(call)) throw AssertionError("Call wasn't in-flight!")
        idleCallback = this.idleCallback
    }
	//再次进行新的请求,达成闭环,旧的Call结束则需要把新的可以运行的Call从准备队列上转移到运行队列并运行,上述已经分析过此方法
    val isRunning = promoteAndExecute()

    if (!isRunning && idleCallback != null) {
        idleCallback.run()
    }
}

上述我们提出了三个问题:

线程池为何要设置那样的参数?

OkHttp默认的线程池创建如下:

executorServiceOrNull = ThreadPoolExecutor(0, Int.MAX_VALUE, 60, TimeUnit.SECONDS,
                                                       SynchronousQueue(), threadFactory("$okHttpName Dispatcher", false))

重要参数解析

  • 核心线程数为0
  • 最大线程数为Int的最大值
  • 存活时间60s
  • 阻塞队列为SynchronousQueue无界队列

3个任务举例,第一个任务添加进线程池并没有执行完则添加第二个任务,第一个任务执行完毕,第二个任务未结束时添加第三个任务,真个过程在60s完成。

这里需要读者了解线程池的工作流程,本篇文章不对线程池进行深入理解,直接分析上述例子的执行过程。

  1. 第一个任务添加后,核心线程为0则不启动核心线程,此时判断是否可以加入阻塞队列,SynchronousQueue为无界队列,不存储任何元素,因此启动最大线程运行此任务。
  2. 第二个任务来临,最大线程中执行任务一的线程并没有结束,因此再次启动一个最大线程执行任务二。
  3. 第三个任务来临,因为第一个任务执行完毕,则复用第一个任务的线程执行任务三。

如此设计保证了任务只要添加进线程池则百分之100会被立即执行,保证了最大的吞吐量。OkHttp其实只用到了存活时间这个参数,只是为了让线程不那么早死,让他们再活一会,等待后续的任务,因为创建线程是非常消耗资源的。

如何保证相同地址的请求不超过阈值?

假设目前运行队列上存在一个正在请求192.168.10.148Call,正在请求则意味着此CallcallsPerHost属性为1,若此时再次请求此地址,根据上述的执行流程,首先会拿到队列上之前地址相同的CallcallsPerHost,并将callsPerHost赋值给新的Call,因为callsPerHostAtomicInteger类型属于对象类型,因此=传递的是引用,所以说只要Host相同Call就会公用一个计数器,也就意味着一个地址维护一个计数器,后期再判断时就可以判断出此地址下的有几个请求了

为什么callsPerHost需要使用AtomicInteger

因为不知道哪个线程会发起请求,会有多个线程操作此变量,因此就必须保证此变量的线程安全。若A线程刚需要减一操作,但是B线程要执行加一操作,A线程还没减一成功的时候,B线程读到了原来的callsPerHost值,当A线程执行完毕后,callsPerHost确实减一了,但是B线程读到的时减一之前的值,B线程按照原值进行加一,将A线程的减一操作覆盖掉了,造成错误。保证线程安全有很多种方式,AtomicInteger是使用CAS保证线程安全,内部使用自旋实现,竞争时线程并不会真正的睡眠,避免了线程的上下文切换,对于简单的加减操作,CAS已经完全够用,synchronized则没有必要使用。

总结:异步请求使用并发量最大的线程池,请求时先将Call放入准备队列,随即调用Dispatcher#promoteAndExecute()方法将准备队列可以执行的Call转移到运行运行队列上,在请求结束时再次调用此方法,使整个过程形成闭环(兜底)。

同步

异步请求调用Callexecute(),进入此方法

RealCall#execute

override fun execute(): Response {
    //Call也只能执行一次
    check(executed.compareAndSet(false, true)) { "Already Executed" }
    //用于超时心跳检测
    timeout.enter()
    callStart()
    try {
        //添加进同步运行队列
        client.dispatcher.executed(this)
        //与异步请求一样启动责任链获取Response
        return getResponseWithInterceptorChain()
    } finally {
        //与异步结束类似,只是现在this的真实类型为RealCall
        client.dispatcher.finished(this)
    }
}

Dispatcr#executed

@Synchronized internal fun executed(call: RealCall) {
    runningSyncCalls.add(call)
}

Dispatcher#finished

internal fun finished(call: RealCall) {
    //在异步分析中已经分析过此方法,重点就是移除传入参数队列中需要结束的Call
    finished(runningSyncCalls, call)
}

总结:同步请求还是比较简单,一共三步,加入同步运行队列,启动责任链发起请求,最终结束请求。

原 创 不 易 , 还 希 望 各 位 大 佬 支 持 一 下 \textcolor{blue}{原创不易,还希望各位大佬支持一下}

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/小惠珠哦/article/detail/1021597
推荐阅读
相关标签