赞
踩
通常语义中的线程,指的是内核级线程,核心点如下:
协程又称为用户级线程核心点如下:
Goroutine,经 Golang 优化后的特殊“协程”,核心点如下:
模型 | 依赖内核 | 可并行 | 可应对阻塞 | 栈可动态扩缩 |
---|---|---|---|---|
线程 | √ | √ | √ | X |
协程 | X | X | X | X |
goroutine | X | √ | √ | √ |
goroutine更像是一个博采众长的存在。实际上,“灵活调度” 一词概括得实在过于简要,Golang 在调度 goroutine 时,针对“如何减少加锁行为”,“如何避免资源不均”等问题都给出了精彩的解决方案,这一切都得益于经典的 “gmp” 模型
gmp = goroutine + machine + processor (+ 一套有机组合的机制),下面先单独拆出每个组件进行介绍,最后再总览全局,对 gmp 进行总述
gmp 数据结构定义为 runtime/runtime2.go 文件中
type g struct {
// ...
// m:在 p 的代理,负责执行当前 g 的 m;
m *m
// ...
sched gobuf
// ...
}
type gobuf struct {
sp uintptr
pc uintptr
ret uintptr
bp uintptr // for framepointer-enabled architectures
}
const(
_Gidle = itoa // 0
_Grunnable // 1
_Grunning // 2
_Gsyscall // 3
_Gwaiting // 4
_Gdead // 6
_Gcopystack // 8
_Gpreempted // 9
)
type m struct {
g0 *g // goroutine with scheduling stack
// ...
tls [tlsSlots]uintptr // thread-local storage (for x86 extern register)
// ...
}
type p struct {
// ...
runqhead uint32
runqtail uint32
runq [256]guintptr
runnext guintptr
// ...
}
sched 是全局队列的封装
type schedt struct {
// ...
lock mutex
// ...
runq gQueue
runqsize int32
// ...
}
即 普通任务 g 和调度查找任务 g0 之间的转换
goroutine 的类型可以分为两类:
func gogo(buf *gobuf)
// ...
func mcall(fn func(*g))
通常,调度指的是由 g0 按照特定策略找到下一个可执行 g 的过程. 而本小节谈及的调度类型是广义上的“调度”,指的是调度器 p 实现从执行一个 g 切换到另一个 g 的过程.
这种广义“调度”可分为几种类型:
func Gosched() {
checkTimeouts()
mcall(gosched_m)
}
func gopark(unlockf func(*g, unsafe.Pointer) bool, lock unsafe.Pointer, reason waitReason, traceEv byte, traceskip int) {
// ...
mcall(park_m)
}
通常 goready 与 gopark 成对出现,能够将 g 从阻塞状态恢复过来的,重新进入等待执行的状态
源码位于 runtime/proc.go
func goready(gp *g, traceskip int) {
systemstack(func() {
ready(gp, traceskip, true)
})
}
正常调度
g 中的任务执行完后,g0 会将当前 g 置于死亡状态,发起新一轮的调度
抢占调度:
如果 g 执行系统调度时间过长,超过了指定的市场,且全局的 p 资源比较紧缺,此时将 p 和 g 解绑,抢占出来用于其他 g 调度。等 g 完成系统调用后,会重新进入可执行队列中等待被调度
但是跟前三种调度方式不同的是,其余三个调度方式都是在 m 下的 g0 完成的,抢占调度则不同
因为发起系统调度时需要打破用户态的边界进入内核,此时 m 也会因系统调用而陷入僵直,无法主动完成抢占调度的行为
所以Golang进程会有一个全局监控协程 monitor g 的存在,这个 g 会越过 p 直接跟 m 进行绑定,不断轮询对所有的 p 的执行状况进行监控,倘若发现满足抢占调度的条件,则从第三方角度出手干预。主动发起抢占调度动作
调度流程的主干方法是位于 runtime/proc.go 中的 schedule 函数,此时的执行权位于 g0 手中:
func schedule() {
// ...
gp, inheritTime, tryWakeP := findRunnable() // blocks until work is available
// ...
execute(gp, inheritTime)
}
调度流程中,一个非常核心的步骤,就是为 m 寻找到下一个执行的 g,这部分内容位于 runtime/proc.go 的 findRunnable 方法中:
func findRunnable() (gp *g, inheritTime, tryWakeP bool) { _g_ := getg() top: _p_ := _g_.m.p.ptr() // ... // 判断执行查找到 61 次没有 if _p_.schedtick%61 == 0 && sched.runqsize > 0 { // 加锁向全局队列进行查找 lock(&sched.lock) gp = globrunqget(_p_, 1) // 释放锁 unlock(&sched.lock) if gp != nil { // 返回可执行的 g return gp, false, false } } // ... // 尝试从 p 本地队列中进行查找 if gp, inheritTime := runqget(_p_); gp != nil { return gp, inheritTime, false } // ... // 判断全局队列长度,尝试从全局队列中进行查找 if sched.runqsize != 0 { lock(&sched.lock) gp := globrunqget(_p_, 0) unlock(&sched.lock) if gp != nil { return gp, false, false } } // 尝试获取就绪的网络协议 --> 向 epoll 就绪队列中进行查找 if netpollinited() && atomic.Load(&netpollWaiters) > 0 && atomic.Load64(&sched.lastpoll) != 0 { if list := netpoll(0); !list.empty() { // non-blocking gp := list.pop() injectglist(&list) casgstatus(gp, _Gwaiting, _Grunnable) return gp, false, false } } // ... // 尝试从其余的 p 中偷取一半的 g procs := uint32(gomaxprocs) if _g_.m.spinning || 2*atomic.Load(&sched.nmspinning) < procs-atomic.Load(&sched.npidle) { if !_g_.m.spinning { _g_.m.spinning = true atomic.Xadd(&sched.nmspinning, 1) } gp, inheritTime, tnow, w, newWork := stealWork(now) now = tnow if gp != nil { // Successfully stole. return gp, inheritTime, false } if newWork { // There may be new timer or GC work; restart to // discover. goto top } if w != 0 && (pollUntil == 0 || w < pollUntil) { // Earlier timer to wait for. pollUntil = w } }
调度流程如图:
if _p_.schedtick%61 == 0 && sched.runqsize > 0 { lock(&sched.lock) gp = globrunqget(_p_, 1) unlock(&sched.lock) if gp != nil { return gp, false, false } } // 除了查找流程外还会将全局队列中的 g 转移到本地 p func globrunqget(_p_ *p, max int32) *g { if sched.runqsize == 0 { return nil } // 判断 全局队列长度/p 的数量 + 1 == 每个p可以分到的g的个数 n := sched.runqsize/gomaxprocs + 1 if n > sched.runqsize { // 全局队列只有 1 个,则直接提取一个 n = sched.runqsize } // 传参 max 最大获取个数,如果 n > max 则只获取 max 个 if max > 0 && n > max { n = max } // 如果获取个数超过了本地队列的一半,需要考虑能不能存的下 if n > int32(len(_p_.runq))/2 { n = int32(len(_p_.runq)) / 2 } // 将全局队列的长度减去获取到的 g 个数 sched.runqsize -= n // 全局队列循环弹出 g gp := sched.runq.pop() n-- for ; n > 0; n-- { gp1 := sched.runq.pop() // 并将多余的 g 存储到 p 本地队列中 runqput(_p_, gp1, false) } return gp // 本地队列存储全局队列 g 的方法 func runqput(_p_ *p, gp *g, next bool) { // ... retry: // 获取本地队列头节点,同时对本地队列加锁 h := atomic.LoadAcq(&_p_.runqhead) // load-acquire, synchronize with consumers // 获取尾节点 t := _p_.runqtail // 如果尾节点减去头节点 小于本地队列长度 == 本地队列未满 if t-h < uint32(len(_p_.runq)) { // 直接将 g 插入 队列中 _p_.runq[t%uint32(len(_p_.runq))].set(gp) // 将尾节点索引 + 1,并释放队列 atomic.StoreRel(&_p_.runqtail, t+1) // store-release, makes the item available for consumption return } // 本地队列满了 if runqputslow(_p_, gp, h, t) { return } // the queue is not full, now the put above must succeed goto retry // 本地队列满了,将获取本地队列的一半放入到全局队列中,帮助本地队列减少压力 func runqputslow(_p_ *p, gp *g, h, t uint32) bool { // 创建本地队列一半 + 1 的数组 var batch [len(_p_.runq)/2 + 1]*g // First, grab a batch from local queue. n := t - h // 本地队列现有长度的一半 n = n / 2 // ... // for 循环放置 for i := uint32(0); i < n; i++ { batch[i] = _p_.runq[(h+i)%uint32(len(_p_.runq))].ptr() } // 释放 p 的存储 if !atomic.CasRel(&_p_.runqhead, h, h+n) { // cas-release, commits consume return false } // 将新获取到的gp也存储全局队列中 batch[n] = gp // Link the goroutines. for i := uint32(0); i < n; i++ { // for循环将本地队列提取到的 g 转成链表 batch[i].schedlink.set(batch[i+1]) } var q gQueue // 设置头尾节点 q.head.set(batch[0]) q.tail.set(batch[n]) // Now put the batch on global queue. lock(&sched.lock) globrunqputbatch(&q, int32(n+1)) unlock(&sched.lock) return true
需要注意,虽然本地队列是属于 p 独有的,但是由于 work-stealing 机制的存在,其他 p 可能会前来执行窃取动作,因此操作仍需加锁.
但是,由于窃取动作发生的频率不会太高,因此当前 p 取得锁的成功率是很高的,因此可以说p 的本地队列是接近于无锁化,但没有达到真正意义的无锁.
if gp, inheritTime := runqget(_p_); gp != nil { return gp, inheritTime, false } func runqget(_p_ *p) (gp *g, inheritTime bool) { // 如果当前 runnext 为非空 则直接返回下一个 runnext 即可 if next != 0 && _p_.runnext.cas(next, 0) { return next.ptr(), true } for { // 加锁并获取头尾节点 ==> 虽然本地队列是 p 独有的,但是存在偷 g 的机制,所以还是需要加锁 h := atomic.LoadAcq(&_p_.runqhead) // load-acquire, synchronize with other consumers t := _p_.runqtail // 如果头节点等于尾节点,则表示 p 为空 if t == h { return nil, false } // g 存在则取头节点并返回,将头节点设置为下一个 并释放锁 gp := _p_.runq[h%uint32(len(_p_.runq))].ptr() if atomic.CasRel(&_p_.runqhead, h, h+1) { // cas-release, commits consume return gp, false } }
if sched.runqsize != 0 {
// 加锁
lock(&sched.lock)
// 获取的首节点,不向 p 中存储节点
gp := globrunqget(_p_, 0)
// 释放锁
unlock(&sched.lock)
if gp != nil {
return gp, false, false
}
}
需要注意的是,刚获取网络协程时,g 的状态是处于 waiting 的,因此需要先更新为 runnable 状态.
if netpollinited() && atomic.Load(&netpollWaiters) > 0 && atomic.Load64(&sched.lastpoll) != 0 {
if list := netpoll(0); !list.empty() { // non-blocking
gp := list.pop()
injectglist(&list)
// 状态更新
casgstatus(gp, _Gwaiting, _Grunnable)
return gp, false, false
}
}
func stealWork(now int64) (gp *g, inheritTime bool, rnow, pollUntil int64, newWork bool) { pp := getg().m.p.ptr() ranTimer := false // 偷取操作最多只遍历 4 次 p 队列 const stealTries = 4 for i := 0; i < stealTries; i++ { stealTimersOrRunNextG := i == stealTries-1 // 为保证窃取行为的公平性,遍历的起点是随机的 for enum := stealOrder.start(fastrand()); !enum.done(); enum.next() { // ... } } return nil, false, now, pollUntil, ranTime // 偷取操作 func runqgrab(_p_ *p, batch *[256]guintptr, batchHead uint32, stealRunNextG bool) uint32 { for { // 因为存在 p 也获取头节点的可能,需要加锁 h := atomic.LoadAcq(&_p_.runqhead) // load-acquire, synchronize with other consumers t := atomic.LoadAcq(&_p_.runqtail) // load-acquire, synchronize with the producer // 获取长度的一半 n := t - h n = n - n/2 // 如果长度为 0 if n == 0 { // 是否是最后一次遍历 if stealRunNextG { // Try to steal from _p_.runnext. // 查看是否有下一个要执行的 g if next := _p_.runnext; next != 0 { // 查询 p 是否允许偷取 if _p_.status == _Prunning { // 等待一段执行时间 if GOOS != "windows" && GOOS != "openbsd" && GOOS != "netbsd" { usleep(3) } else { osyield() } } // 等待期间已经完成执行则退出 if !_p_.runnext.cas(next, 0) { continue } // 不然就偷取 batch[batchHead%uint32(len(batch))] = next return 1 } } return 0 } // 偷取一半长度大于自身的一半,退出 if n > uint32(len(_p_.runq)/2) { // read inconsistent h and t continue } // for循环的获取 for i := uint32(0); i < n; i++ { g := _p_.runq[(h+i)%uint32(len(_p_.runq))] batch[(batchHead+i)%uint32(len(batch))] = g } // 释放锁 并 改变头节点 if atomic.CasRel(&_p_.runqhead, h, h+n) { // cas-release, commits consume return n } } }
当 g0 为 m 寻找到可执行的 g 之后,接下来就开始执行 g. 这部分内容位于 runtime/proc.go 的 execute 方法中:
func execute(gp *g, inheritTime bool) { // 获取 g _g_ := getg() // 建立 g 和 m 之间的绑定关系 _g_.m.curg = gp gp.m = _g_.m // 修改状态信息 casgstatus(gp, _Grunnable, _Grunning) gp.waitsince = 0 gp.preempt = false gp.stackguard0 = gp.stack.lo + _StackGuard // 更新 p 的调度次数,为后续61次调度做好准备 if !inheritTime { _g_.m.p.ptr().schedtick++ } // gogo 将 g0 切换为 g,执行任务 gogo(&gp.sched)
g 执行主动让渡时,会调用 mcall 方法将执行权归还给 g0,并由 g0 调用 gosched_m 方法,位于 runtime/proc.go 文件中:
func Gosched() { // ... // 执行mcall让渡 mcall(gosched_m) } // 压栈执行 goschedImpl func gosched_m(gp *g) { goschedImpl(gp) } // 实际让渡流程 func goschedImpl(gp *g) { // status := readgstatus(gp) if status&^_Gscan != _Grunning { dumpgstatus(gp) throw("bad g status") } // 改变状态,从running 更改为 runable casgstatus(gp, _Grunning, _Grunnable) // 解绑 g 和 m dropg() // 加锁 --> 添加到全局队列中 --> 释放锁 lock(&sched.lock) globrunqput(gp) unlock(&sched.lock) // 开启新的一轮调度 schedule() // 解绑函数 func dropg() { // 获取 g _g_ := getg() // 解绑操作,g 和 m 分别置空 setMNoWB(&_g_.m.curg.m, nil) setGNoWB(&_g_.m.curg, nil) }
g 需要被动调度时,会调用 mcall 方法切换至 g0,并调用 park_m 方法将 g 置为阻塞态,执行流程位于 runtime/proc.go 的 gopark 方法当中:
func gopark(unlockf func(*g, unsafe.Pointer) bool, lock unsafe.Pointer, reason waitReason, traceEv byte, traceskip int) { // ... mcall(park_m) } func park_m(gp *g) { _g_ := getg() // 修改状态 running 为 waiting casgstatus(gp, _Grunning, _Gwaiting) // 解绑 dropg() // ... // 新的一轮查找 schedule() // 当因被动调度陷入阻塞态的 g 需要被唤醒时,会由其他协程执行 goready 方法将 g 重新置为可执行的状态, // 方法位于 runtime/proc.go . func goready(gp *g, traceskip int) { systemstack(func() { ready(gp, traceskip, true) }) } // 被动调度如果需要唤醒,则会其他 g 负责将 g 的状态由 waiting 改为 runnable, // 然后会将其添加到唤醒者的 p 的本地队列中: func ready(gp *g, traceskip int, next bool) { // ... _g_ := getg() // ... // 修改状态 casgstatus(gp, _Gwaiting, _Grunnable) // 重新加入 p 队列中 // 如果队列满了,会连带 g 一起将一半的元素转移到全局队列 runqput(_g_.m.p.ptr(), gp, next) // ... }
当 g 执行完成时,会先执行 mcall 方法切换至 g0,然后调用 goexit0 方法,内容为 runtime/proc.go:
// Finishes execution of the current goroutine. func goexit1() { // ... mcall(goexit0) } // 实际结束方法 func goexit0(gp *g) { // 获取 g _g_ := getg() _p_ := _g_.m.p.ptr() // 更改状态为 dead casgstatus(gp, _Grunning, _Gdead) // ... // 解绑 gp.m = nil // ... // 解绑 dropg() // ... // 开启新一轮调度 schedule()
抢占调度的执行者不是 g0,而是一个全局的 monitor g,代码位于 runtime/proc.go 的 retake 方法中:
func retake(now int64) uint32 { n := 0 // 加锁 lock(&allpLock) // 遍历全局的 p 搜索能抢占的目标 for i := 0; i < len(allp); i++ { _p_ := allp[i] // p 还没创建 if _p_ == nil { // This can happen if procresize has grown // allp but not yet created new Ps. continue } pd := &_p_.sysmontick // ... // 执行系统调用超过 10 ms // p 本地队列有等待执行的 g // 当前没有空闲的 p 和 m. if s == _Psyscall { // ... if runqempty(_p_) && atomic.Load(&sched.nmspinning)+atomic.Load(&sched.npidle) > 0 && pd.syscallwhen+10*1000*1000 > now { continue } unlock(&allpLock) // 抢占调度的步骤 // 将当前 p 的状态更新为 idle if atomic.Cas(&_p_.status, s, _Pidle) { n++ _p_.syscalltick++ // 然后步入 handoffp 方法中,判断是否需要为 p 寻找接管的 m(因为其原本绑定的 m 正在执行系统调用) handoffp(_p_) } incidlelocked(1) // 抢占调度 lock(&allpLock) } } unlock(&allpLock) return uint32(n) } // 判断是否需要 p 接管 m func handoffp(_p_ *p) { if !runqempty(_p_) || sched.runqsize != 0 { startm(_p_, false) return } if atomic.Load(&sched.nmspinning)+atomic.Load(&sched.npidle) == 0 && atomic.Cas(&sched.nmspinning, 0, 1) { startm(_p_, true) return } lock(&sched.lock) // ... if sched.runqsize != 0 { unlock(&sched.lock) startm(_p_, false) return } // If this is the last running P and nobody is polling network, // need to wakeup another M to poll network. if sched.npidle == uint32(gomaxprocs-1) && atomic.Load64(&sched.lastpoll) != 0 { unlock(&sched.lock) startm(_p_, false) return } // ...
当以下四个条件满足其一时,则需要为 p 获取新的 m:
获取 m 时,会先尝试获取已有的空闲的 m,若不存在,则会创建一个新的 m.
func startm(_p_ *p, spinning bool) { mp := acquirem() lock(&sched.lock) // ... // 获取 m nmp := mget() if nmp == nil { // 创建 m id := mReserveID() unlock(&sched.lock) var fn func() // ... // 绑定 p newm(fn, _p_, id) // ... return } unlock(&sched.lock) // ... }
在 m 需要执行系统调用前,会先执行位于 runtime/proc.go 的 reentersyscall 的方法:
func reentersyscall(pc, sp uintptr) { // 此时执行权同样位于 m 的 g0 手中; _g_ := getg() // ... // 保存当前 g 的执行环境; save(pc, sp) _g_.syscallsp = sp _g_.syscallpc = pc // 将 g 和 p 的状态更新为 syscall; casgstatus(_g_, _Grunning, _Gsyscall) // ... // 解除 p 和 当前 m 之间的绑定,因为 m 即将进入系统调用而导致短暂不可用; pp := _g_.m.p.ptr() pp.m = 0 // 将 p 添加到 当前 m 的 oldP 容器当中,后续 m 恢复后,会优先寻找旧的 p 重新建立绑定关系. _g_.m.oldp.set(pp) _g_.m.p = 0 // 将 g 和 p 的状态更新为 syscall; atomic.Store(&pp.status, _Psyscall) // ...
当 m 完成了内核态的系统调用之后,此时会步入位于 runtime/proc.go 的 exitsyscall 函数中,尝试寻找 p 重新开始运作:
func exitsyscall() { // 方法执行之初,此时的执行权是普通 g. _g_ := getg() // ... // 倘若此前设置的 oldp 仍然可用,则重新和 oldP 绑定 if exitsyscallfast(oldp) { // ... // 将当前 g 重新置为 running 状态,然后开始执行后续的用户函数; casgstatus(_g_, _Gsyscall, _Grunning) // ... return } // ... // old 绑定失败,则调用 mcall 方法切换到 m 的 g0,并执行 exitsyscall0 方法: mcall(exitsyscall0) // ... } // func exitsyscall0(gp *g) { // 将 g 由系统调用状态切换为可运行态,并解绑 g 和 m 的关系 casgstatus(gp, _Gsyscall, _Grunnable) dropg() // 加锁 --> 从全局 p 队列获取可用的 p lock(&sched.lock) var _p_ *p if schedEnabled(gp) { _p_, _ = pidleget(0) } var locked bool // 如果获取到了,则执行 g: if _p_ == nil { globrunqput(gp) } // 释放锁 unlock(&sched.lock) // 如若无 p 可用,则将 g 添加到全局队列, if _p_ != nil { acquirep(_p_) execute(gp, false) // Never returns. } // ... // 当前 m 陷入沉睡. 直到被唤醒后才会继续发起调度. stopm() schedule() // Never returns. }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。