赞
踩
一个小活动: https://developer.aliyun.com//topic/lingma/activities/202403?taskCode=14508&recordId=40dcecb786f9a65c2e83e95306822ce4#/?utm_content=m_fission_1 「通义灵码 · 体验 AI 编码,开 AI 盲盒」
githubio地址:https://a18792721831.github.io/
select 是Go在语言层面提供的多路I/O复用机制,用于检测多个chan是否就绪。
建议先查看chan的文章:https://jiayq.blog.csdn.net/article/details/135885482
select 只能作用于chan,包括数据读取和写入:
func TestSelect(t *testing.T) {
var c chan string
c = make(chan string)
var msg string
h := "hello"
select {
case msg = <-c:
fmt.Printf("msg = %s\n", msg)
case c <- h:
fmt.Printf("say hello\n")
}
}
在上面的代码中,select有两个case语句,分别对应chan的读取和写入操作。
因为创建的chan没有缓存,在写入chan的同时需要有goroutine去读取,相应的,在读取chan的同时必须有goroutine写入。
上面只有一个goroutine,所以是select阻塞的。
如果对上面的代码做个修改,设置缓存大小为1:
func TestSelect(t *testing.T) {
var c chan string
c = make(chan string, 1)
var msg string
h := "hello"
select {
case msg = <-c:
fmt.Printf("msg = %s\n", msg)
case c <- h:
fmt.Printf("say hello\n")
}
}
因为缓存区为1,此时就能写入了,所以select的写入case是不阻塞的,执行chan写入case。
如果先给设置一个初值呢:
func TestSelect(t *testing.T) {
var c chan string
c = make(chan string, 1)
c <- "hi"
var msg string
h := "hello"
select {
case msg = <-c:
fmt.Printf("msg = %s\n", msg)
case c <- h:
fmt.Printf("say hello\n")
}
}
因为缓存区只有1,已经满了,所以chan无法再写入了,只有chan读取是不阻塞的,所以每次都会执行chan读取。
如果chan既能写,又能读呢:
func TestSelect(t *testing.T) {
var c chan string
c = make(chan string, 2)
c <- "hi"
var msg string
h := "hello"
select {
case msg = <-c:
fmt.Printf("msg = %s\n", msg)
case c <- h:
fmt.Printf("say hello\n")
}
}
当两个case都满足的时候,多次执行,发现select选择的case是不确定的:
可能要多执行几次才能发现不一样。
如果没有case满足,而且又不希望阻塞呢:
func TestSelect(t *testing.T) {
var c chan string
c = make(chan string)
var msg string
h := "hello"
select {
case msg = <-c:
fmt.Printf("msg = %s\n", msg)
case c <- h:
fmt.Printf("say hello\n")
default:
fmt.Printf("default case \n")
}
}
此时就不会阻塞了。
如果既有default,又有多个不阻塞的case呢:
func TestSelect(t *testing.T) {
var c chan string
c = make(chan string, 2)
c <- "hi"
var msg string
h := "hello"
select {
case msg = <-c:
fmt.Printf("msg = %s\n", msg)
case c <- h:
fmt.Printf("say hello\n")
default:
fmt.Printf("default case \n")
}
}
此时只会随机执行case语句,不会执行default语句。
总结下,select的每个case语句只能操作一个chan,要么写入数据,要么读取数据。
如果没有chan可以进行读写操作,在没有default的情况下,select会阻塞,直到任意一个chan解除阻塞。
如果存在多个case不阻塞,那么select会随机挑选一个执行。
如果case阻塞,有default,那么select会执行default。
如果case不阻塞,有default,那么select永远不会执行default。
select 为Go语言的预留关键字,不是函数,可以在case语句中声明变量并为变量赋值。
case语句读取chan时,最多可以给两个变量赋值:
func TestSelect(t *testing.T) {
var c chan string
c = make(chan string, 2)
c <- "hi"
var msg string
select {
case <-c: // 0个变量
fmt.Printf("skip res\n")
case msg = <-c: // 一个变量
fmt.Printf("msg = %s\n", msg)
case msg, ok := <-c: // 两个变量,注意这里的 msg 重新定义了,与select外面的没有关系了,只是同名
fmt.Printf("msg=%s, ok=%v", msg, ok)
}
}
case 语句中chan的读取有两个返回条件,第一个是成功读取到数据,第二个是没有数据,且管道已经被关闭。
chan 读取更多资料:https://jiayq.blog.csdn.net/article/details/135885482
msg 在case中重新声明了, 简短变量声明更多资料:https://jiayq.blog.csdn.net/article/details/136428399
使用两个变量接收chan读取的返回值,可以判断读取到的值是否可信。
如果在上面的代码中增加 close(c)
,这样case语句中的chan读取到了零值:
func TestSelect(t *testing.T) {
var c chan string
c = make(chan string)
var msg string
close(c)
select {
case <-c:
fmt.Printf("skip res\n")
case msg = <-c:
fmt.Printf("msg = %s\n", msg)
case msg, ok := <-c:
fmt.Printf("msg=%s, ok=%v", msg, ok)
}
}
此时就拿到了零值,如果不判断是否可信,而是直接使用,就使用了预期之外的数据。
select 中的default语句不能处理chan的读写操作,当select的所有case语句都阻塞时,default语句将被执行。
在1.1中已经尝试了default的例子了。
需要注意的是,在一个select中,default只能出现一次,不限制位置,可以出现在任意顺序。
如果我们启动goroutine处理任务,并且不希望main函数退出,需要永久性阻塞main函数。
在Kubernetes项目的多个组件中均有使用select阻塞main函数的例子:
func main(){
server := webhooktesting.NewTestServer(nil)
server.StartTLS()
fmt.Println("servering on", server.URL)
select {}
}
如果一个select不包含case语句和default语句,那么goroutine就会陷入永久性阻塞中。
如果使用chan在不同的goroutine中传递异常错误,使用select语句可以快速检查chan中是否有错误,避免等待:
errCh := make(chan error, activate)
jm.deleteJobPods(&job, activatePods, errCh) // 传入chan用于记录错误
select{
case manageJobErr = <- errCh: // 检查是否有错误发生
if manageJobErr != nil {
break
}
default: // 没有错误,快速结束检查
}
如果没有异常错误,那么select会执行default语句,不会因为从chan中读取数据导致阻塞。
如果想实现一个有效期的chan,当超过有效期,那么关闭chan,不能在写入了。
func waitForStopOrTimeout(stopCh <-chan struct{}, timeout time.Duration) <-chan struct{} {
stopChWithTimeout := make(chan struct{})
go func() {
select {
case <- stopCh: // 自然结束
case <- time.After(timeout): // chan有效期
}
close(stopChWithTimeout)
} ()
return stopChWithTimeout
}
函数返回一个chan,可以在函数之间传递,但是chan会在指定时间后自动关闭。
请先思考:
select中的case语句对应于runtime包中的scase(select-case)数据结构reflect/value.go:2480
:
// A runtimeSelect is a single case passed to rselect.
// This must match ../runtime/select.go:/runtimeSelect
type runtimeSelect struct {
dir SelectDir // SelectSend, SelectRecv or SelectDefault
typ *rtype // channel type
ch unsafe.Pointer // channel
val unsafe.Pointer // ptr to data (SendDir) or ptr to receive buffer (RecvDir)
}
可以看出每个select-case的结构都如上,也就是说,每个case中必须包含chan,而且只能有一个chan。
另外,编译器在处理case语句时,如果case语句中没有chan,则会给出编译错误:
select case must be receive, send or assign recv
在runtime/select.go
中也有定义:
// A runtimeSelect is a single case passed to rselect.
// This must match ../reflect/value.go:/runtimeSelect
type runtimeSelect struct {
dir selectDir
typ unsafe.Pointer // channel type (not used here)
ch *hchan // channel
val unsafe.Pointer // ptr to data (SendDir) or ptr to receive buffer (RecvDir)
}
其中的dir是标明了case的类型,总共有:
// These values must match ../reflect/value.go:/SelectDir.
type selectDir int
const (
_ selectDir = iota
selectSend // case Chan <- Send
selectRecv // case <-Chan:
selectDefault // default
)
总共有4中:chan 的值为nil, 写chan, 读chan, default。
当chan的值为nil,那么不管是读还是写,都会永久阻塞,也就是说,如果case中的chan的值为nil,这类case会永远阻塞,select永远不会选中执行。
这也是为什么在case语句中向值为nil的chan中写数据不会panic。
这也是为什么会忽略第一个值。
default为特殊类型的case语句,其不会操作chan。而且,每个select语句中只能存在一个default语句,并且default语句可以出现在任意位置。
在reflect/value.go
中,存在对外可见的SelectCase声明:
type SelectCase struct {
Dir SelectDir // direction of case
Chan Value // channel to use (for send or receive)
Send Value // value to send (for send)
}
这个相比较少了很多运行时的定义。
在reflect/value.go
中,有对select的执行逻辑的定义:
// Select executes a select operation described by the list of cases. // Like the Go select statement, it blocks until at least one of the cases // can proceed, makes a uniform pseudo-random choice, // and then executes that case. It returns the index of the chosen case // and, if that case was a receive operation, the value received and a // boolean indicating whether the value corresponds to a send on the channel // (as opposed to a zero value received because the channel is closed). // Select supports a maximum of 65536 cases. func Select(cases []SelectCase) (chosen int, recv Value, recvOK bool) { // select 的case 最多有65535 个 // int = int32 = 2^16 - 1 ,最高位是符号位 // 如果 case 的数量超过 65535,panic if len(cases) > 65536 { panic("reflect.Select: too many cases (max 65536)") } // 使用内部运行时的类型拷贝一份case var runcases []runtimeSelect // 如果 select中的case大于4个 if len(cases) > 4 { // 根据长度申请 slice 的长度,避免扩容 runcases = make([]runtimeSelect, len(cases)) } else { // 如果少于4个,那么申请 slice 的时候,长度按实际长度申请,容量为4 runcases = make([]runtimeSelect, len(cases), 4) } // 初始默认没有 default haveDefault := false // 遍历 select的case数组 for i, c := range cases { // 访问遍历的 case rc := &runcases[i] // 重新读取,可能是防止幻读? rc.dir = c.Dir // 看看当前遍历的是哪类case switch c.Dir { // 不在定义中的case类型,panic default: panic("reflect.Select: invalid Dir") // default 类型的 case case SelectDefault: // default // 如果已经有了default,那么会panic,表示一个select中有两个default if haveDefault { panic("reflect.Select: multiple default cases") } // 设置有default haveDefault = true // 如果default中有chan,那么panic,default中不能有chan if c.Chan.IsValid() { panic("reflect.Select: default case has Chan value") } // 如果chan有数据,表示写chan,default中不能写chan if c.Send.IsValid() { panic("reflect.Select: default case has Send value") } // 写chan类型的case case SelectSend: ch := c.Chan // chan如果为nil,忽略 if !ch.IsValid() { break } // 必须是chan类型 ch.mustBe(Chan) // 必须可见 ch.mustBeExported() // 获取dir值 tt := (*chanType)(unsafe.Pointer(ch.typ)) // 再次判断chan必须是写chan if ChanDir(tt.dir)&SendDir == 0 { panic("reflect.Select: SendDir case using recv-only channel") } // 设置内部类型属性 rc.ch = ch.pointer() rc.typ = &tt.rtype // 获取写chan的数据 v := c.Send // 数据是否有效,如果写chan,但是没有数据写,也会panic if !v.IsValid() { panic("reflect.Select: SendDir case missing Send value") } v.mustBeExported() // 设置内部类型属性 v = v.assignTo("reflect.Select", tt.elem, nil) if v.flag&flagIndir != 0 { rc.val = v.ptr } else { rc.val = unsafe.Pointer(&v.ptr) } // 读chan类型的case case SelectRecv: // chan是否有效 if c.Send.IsValid() { panic("reflect.Select: RecvDir case has Send value") } ch := c.Chan // chan如果为nil,忽略 if !ch.IsValid() { break } ch.mustBe(Chan) ch.mustBeExported() tt := (*chanType)(unsafe.Pointer(ch.typ)) if ChanDir(tt.dir)&RecvDir == 0 { panic("reflect.Select: RecvDir case using send-only channel") } rc.ch = ch.pointer() rc.typ = &tt.rtype rc.val = unsafe_New(tt.elem) } } // 将select-case数组转换为runtimeSelectCase类型,调用rselect,得到select命中的case和是否可信 chosen, recvOK = rselect(runcases) // 如果是读chan,那么需要返回值 if runcases[chosen].dir == SelectRecv { tt := (*chanType)(unsafe.Pointer(runcases[chosen].typ)) t := tt.elem p := runcases[chosen].val fl := flag(t.Kind()) if ifaceIndir(t) { recv = Value{t, p, fl | flagIndir} } else { recv = Value{t, *(*unsafe.Pointer)(p), fl} } } // 返回case的赋值 return chosen, recv, recvOK }
在reflect/value.go
中,select主要是将select-case数组进行规则校验,如果规则校验通过,那么转换为runtimeSelectCase数组,并调用select命中逻辑。
该函数返回select命中的是哪个case,值是什么,是否ok。
Go在运行时,runtime/select.go
中的reflect_rselect()
函数用于处理select语句
这里有个知识点://go:linkname
。
可以看到在reflect/Value.go
中的Select
函数调用了rselect
函数,但是rselect
函数没有方法体:
而rselect
函数的方法体在runtime/select.go
中:
可以了解下
go:linkname
//go:linkname reflect_rselect reflect.rselect func reflect_rselect(cases []runtimeSelect) (int, bool) { // 如果select没有case,那么将陷入永久性阻塞 if len(cases) == 0 { // 永久性阻塞 block() } // 创建一个 slice ,长度是select-case数组的长度(包含default) // 用于分离读chan和写chan,将写chan放在前面,读chan放在后面 sel := make([]scase, len(cases)) // 创建一个 int 类型的 slice,存储 select-case数组的下标,长度和select-case数组的长度相同 // 用于记录原始顺序 orig := make([]int, len(cases)) // 创建两个变量,统计select-case数组中读chan和写chan的数量 nsends, nrecvs := 0, 0 // 存储 default-case的位置 dflt := -1 // 对select-case数组进行遍历 for i, rc := range cases { var j int switch rc.dir { // 如果是default-case,那么记录default下标 case selectDefault: dflt = i continue // 如果是写chan,那么写chan的数量加1,同时记录当前下标 case selectSend: j = nsends nsends++ // 如果是读chan,那么读chan的数量加1,同时记录翻转的下标 // 这里是为了将读chan和写chan进行分类,写chan移动到slice的前面,读chan移动到slice的后面 case selectRecv: nrecvs++ j = len(cases) - nrecvs } // 将传入的select-case做类型转换后,写入新slice sel[j] = scase{c: rc.ch, elem: rc.val} // 记录原始位置 orig[j] = i } // 如果 select-case数组中只有default,那么直接选中并返回 if nsends+nrecvs == 0 { return dflt, false } // 如果slice中间存在空缺,空缺是default,上面的for-range中,没有将default-case放入slice if nsends+nrecvs < len(cases) { // 做合并,将空缺移动到最后,也就是将default-case放到最后 copy(sel[nsends:], sel[len(cases)-nrecvs:]) copy(orig[nsends:], orig[len(cases)-nrecvs:]) } // 创建一个2倍读写chan的case长度的slice,不计算default-case order := make([]uint16, 2*(nsends+nrecvs)) var pc0 *uintptr // 是否编译时有 -race if raceenabled { pcs := make([]uintptr, nsends+nrecvs) for i := range pcs { selectsetpc(&pcs[i]) } pc0 = &pcs[0] } // 调用 selectgo 函数选择 case chosen, recvOK := selectgo(&sel[0], &order[0], pc0, nsends, nrecvs, dflt == -1) // 如果没有选中读写chan,那么select选中default if chosen < 0 { chosen = dflt } else { // select命中的下标(原顺序的下标) chosen = orig[chosen] } // 返回 select命中的下标和可信标记 return chosen, recvOK }
可以发现runtime/select.go
中的reflect_rselect
函数提取出来读写chan的case,
并按照先写后读的顺序进行转换为内部类型的slice,最后调用selectgo
函数进行select选择读写chan,
如果读写chan都没有选中,那么select选中default,如果没有default,那么会阻塞等待。
接下来看下runtime/select.go
中的selectgo
函数:
// cas0是select-chan-case的slice,order0是原顺序的slice,pc0是 -race 的slice // nsends是写chan的case数量, nrecvs是读chan的case数量 // block是是否阻塞的标志,如果有default,那么没有选中返回-1,不阻塞 // 如果没有default,那么没有选中阻塞,直到有case就绪 func selectgo(cas0 *scase, order0 *uint16, pc0 *uintptr, nsends, nrecvs int, block bool) (int, bool) { // 是否是debug模式,debug模式会打印一些关键信息 if debugSelect { print("select: cas0=", cas0, "\n") } // 将cas0指向一个最大长度为65535的数组 cas1 := (*[1 << 16]scase)(unsafe.Pointer(cas0)) // 将cas0指向一个最大长度为65535*2的数组 order1 := (*[1 << 17]uint16)(unsafe.Pointer(order0)) // 计算select-chan-case的数量 ncases := nsends + nrecvs // 从 cas1中切取select-chan-case切片 // 这里填个坑:https://jiayq.blog.csdn.net/article/details/135716705 // 切片扩展表达式3.3 scases := cas1[:ncases:ncases] // 从 order1 中获取chan-case长度的切片,假设chan-case长度为n,这里从[0,675535*2)拿到了 [0,n) 长度的切片 // 使用切片扩展表达式,设置容量为n pollorder := order1[:ncases:ncases] // 从 order1 中获取 [n,2n) 长度的切片 // 使用切片扩展表达式,设置容量为n lockorder := order1[ncases:][:ncases:ncases] // 获取cpu的纳秒时间戳 var t0 int64 if blockprofilerate > 0 { t0 = cputicks() } // 统计有效select-chan-case的数量 norder := 0 // 对传入的select-chan-case切片做遍历 for i := range scases { cas := &scases[i] // 如果 chan 为空,重新赋值,或者压根没有赋值,值为nil的chan // 那么清空chan的数据变量,忽略这个chan-case if cas.c == nil { cas.elem = nil // allow GC continue } // 获取一个[0, norder]的随机数 j := fastrandn(uint32(norder + 1)) // 将随机位置的数字放到递增的norder位置 // 当前位置是零值,未做初始化,将当前位置设置为之前任意位置的值 pollorder[norder] = pollorder[j] // 将被选定的随机位置放下标位置 // 将被交换的位置设置为当前下标 // 实际上可以这么立即,首先将当前下标设置为当前位置,然后选择之前的随机一个位置进行交换 pollorder[j] = uint16(i) // select-chan-case加1 norder++ } // 到了这里就对 pollorder 进行了随机shuffle,并且统计出来有效的chan-case数量 // 只要有效的chan-case数量的切片 pollorder = pollorder[:norder] // 顺便对 lockorder也切一下 lockorder = lockorder[:norder] // 下面两个 for 构成一个【堆排序】 // 建立大顶堆,大顶堆是为了升序 // 遍历 lockorder // 大顶堆,根节点最大 // 这个遍历主要是实现 lockorder 的随机初始化,以及 lockorder 切片对应的chan-case的chan的地址是相等或递减的 for i := range lockorder { // 临时获取 lockorder 的下标 j := i // 获取 shuffle 切片 pollorder中第一个值 // 然后获取该值所指示的 chan-case 的chan c := scases[pollorder[i]].c // 根节点小于左节点 for j > 0 && scases[lockorder[(j-1)/2]].c.sortkey() < c.sortkey() { // 将 k 设置为 lockorder对于当前位置 一半的位置,堆排序,用数组存储一棵树,怎么存储。 k := (j - 1) / 2 // 交换 当前位置和 当前位置一半的位置 lockorder[j] = lockorder[k] // 继续循环,实现 在 lockorder 中对应的 chan-case的chan地址是相等或者递减顺序 j = k } // 将 shuffle 切片 pollorder中第一个值设置到 lockorder中的第一个值 lockorder[j] = pollorder[i] } // 堆排序 for i := len(lockorder) - 1; i >= 0; i-- { // 当前索引的 lockorder 的值 o := lockorder[i] // 获取 lockorder 对应的chan-case 的 chan c := scases[o].c // 将大顶堆的根节点换到数组末尾 lockorder[i] = lockorder[0] // 重新构建大顶堆 j := 0 for { // 左节点 k := j*2 + 1 // 如果到了已经排序过的位置,不在构建堆(数组最后是最大的,如果到了i,表示到了已经排过序的数据,不需要在参与构建大顶堆了) if k >= i { break } // 左节点小于右节点 if k+1 < i && scases[lockorder[k]].c.sortkey() < scases[lockorder[k+1]].c.sortkey() { k++ } // 根节点小于左节点,交换 if c.sortkey() < scases[lockorder[k]].c.sortkey() { lockorder[j] = lockorder[k] j = k continue } break } lockorder[j] = o } // 是否是debug模式,如果是,输出堆排序后升序chan-case中chan地址的lockorder切片 if debugSelect { for i := 0; i+1 < len(lockorder); i++ { if scases[lockorder[i]].c.sortkey() > scases[lockorder[i+1]].c.sortkey() { print("i=", i, " x=", lockorder[i], " y=", lockorder[i+1], "\n") throw("select: broken sort") } } } // 按照 lockorder 的顺序,给chan-case加锁(地址升序排列了,内存访问更快) sellock(scases, lockorder) var ( gp *g sg *sudog c *hchan k *scase sglist *sudog sgnext *sudog qp unsafe.Pointer nextp **sudog ) // pass 1 - look for something already waiting var casi int var cas *scase var caseSuccess bool var caseReleaseTime int64 = -1 var recvOK bool // 对shuffle的切片进行遍历 for _, casei := range pollorder { // chan-case 的下标 casi = int(casei) // chan-case cas = &scases[casi] // chan-case 的 chan c = cas.c // 如果 下标大于 写chan-case的数量,那么表示当前的chan-case是读chan-case // 需要注意,如果写chan关闭,就不能在写了,读chan关闭,如果有缓存区,那么还是能够读的 if casi >= nsends { // 获取读chan的goroutine等待队列 sg = c.sendq.dequeue() // 如果有读chan的等待队列,那么跳转到读取 // 如果有读chan的等待队列,那么对于读chan表示缓存区为空 if sg != nil { goto recv } // 如果读chan是带有缓存的,而且缓存区有待读取的数据,那么跳转到数据读取 if c.qcount > 0 { goto bufrecv } // 如果读chan已经关闭,那么挑战到读chan关闭 if c.closed != 0 { goto rclose } } else { // 不是读chan-case,那么就是写chan-case // 是否 -race if raceenabled { racereadpc(c.raceaddr(), casePC(casi), chansendpc) } // 如果写chan-case的chan已经关闭了,那么跳转到写chan关闭 if c.closed != 0 { goto sclose } // 获取写chan的等待队列 sg = c.recvq.dequeue() // 如果写chan的等待队列不为空,那么跳转到写入 // 如果存在等待队列,那么对于写chan表示缓存区已经满了 if sg != nil { goto send } // 如果写chan缓存区未满,那么跳转到写入数据 if c.qcount < c.dataqsiz { goto bufsend } } } // 如果有default-case ,那么block=false,如果没有default-case,那么block=true if !block { // chan-case解锁 selunlock(scases, lockorder) // 返回-1,表示select没有选中 chan-case,需要返回default-case casi = -1 // 跳转到返回 goto retc } // pass 2 - enqueue on all chans // 能走到这里,表示chan-case没有就绪,而且没有default-case,需要阻塞等待chan-case就绪 // 获取当前 goroutine 信息 gp = getg() // 如果已经在等待了,异常 if gp.waiting != nil { throw("gp.waiting != nil") } // 获取等待信息 nextp = &gp.waiting // 对lockorder进行遍历 for _, casei := range lockorder { // 获取chan-case的chan的升序地址的位置 casi = int(casei) // 获取chan-case cas = &scases[casi] // 获取 c c = cas.c // 获取等待的goroutine sg := acquireSudog() sg.g = gp // 等待的 goroutine 是否在select 阻塞中 sg.isSelect = true // 将当前chan-case的数据给等待的select的goroutine sg.elem = cas.elem sg.releasetime = 0 if t0 != 0 { sg.releasetime = -1 } // 将chan-case的chan赋值给goroutine sg.c = c // 继续下一个等待的goroutine *nextp = sg nextp = &sg.waitlink // 如果当前索引小于写chan-case的数量,那么说明当前是写chan if casi < nsends { // 将等待的goroutine加入chan的等待队列 c.sendq.enqueue(sg) } else { // 否则就是读chan了 c.recvq.enqueue(sg) } } // 等待goutine 被从chan的等待队列唤醒(其他的goroutine操作chan,会唤醒等待队列) // 释放 goroutine占用的P资源 gp.param = nil // 使用原子操作设置goroutine处于等待chan唤醒状态 atomic.Store8(&gp.parkingOnChan, 1) // 设置goroutine为等待状态,selparkcommit是goroutine唤醒的逻辑 gopark(selparkcommit, nil, waitReasonSelect, traceEvGoBlockSelect, 1) // 设置goroutine不是激活状态 gp.activeStackChans = false // 再次加锁 sellock(scases, lockorder) // 设置goroutine未完成 gp.selectDone = 0 // 获取goroutine sg = (*sudog)(gp.param) // 释放goroutine的P资源 gp.param = nil // pass 3 - dequeue from unsuccessful chans // otherwise they stack up on quiet channels // record the successful case, if any. // We singly-linked up the SudoGs in lock order. // 如果没有选中chan-case,那么返回 -1,必须有default-case casi = -1 cas = nil // case 执行是否成功 caseSuccess = false // 获取goroutine等待列表 sglist = gp.waiting // 链表遍历等待的goroutine for sg1 := gp.waiting; sg1 != nil; sg1 = sg1.waitlink { // 退出 select 等待状态 sg1.isSelect = false // 数据清空 sg1.elem = nil // chan释放 sg1.c = nil } // 等待状态退出 gp.waiting = nil // 遍历chan-case切片 for _, casei := range lockorder { // 获取 chan-case k = &scases[casei] // 如果是goroutine的等待链表头 if sg == sglist { // 获取chan-case的切片的下标 casi = int(casei) // 设置chan-case cas = k // 获取是否成功的状态 caseSuccess = sglist.success // 如果释放时间大于0,那么获取释放时间 if sglist.releasetime > 0 { caseReleaseTime = sglist.releasetime } } else { // 获取chan-case的chan c = k.c // 如果当前索引小于写chan的数量,那么当前chan就是写chan if int(casei) < nsends { // 从写chan的等待队列中移除goroutine c.sendq.dequeueSudoG(sglist) } else { // 从读chan的等待队列中移除goroutine c.recvq.dequeueSudoG(sglist) } } // 移动下一个等待的goroutine sgnext = sglist.waitlink // 从链表移除 sglist.waitlink = nil // 释放goroutine资源 releaseSudog(sglist) // 移动 sglist = sgnext } // 如果没有就绪,但是却被唤醒了 if cas == nil { throw("selectgo: bad wakeup") } // 拿到chan c = cas.c // 打印调试日志 if debugSelect { print("wait-return: cas0=", cas0, " c=", c, " cas=", cas, " send=", casi < nsends, "\n") } // 判断是读chan-case还是写chan-case if casi < nsends { // 写chan // 如果写chan-case未成功,那么跳转到写chan关闭 if !caseSuccess { goto sclose } } else { // 如果写chan成功了,那么设置返回可信标志 recvOK = caseSuccess } // -race if raceenabled { if casi < nsends { raceReadObjectPC(c.elemtype, cas.elem, casePC(casi), chansendpc) } else if cas.elem != nil { raceWriteObjectPC(c.elemtype, cas.elem, casePC(casi), chanrecvpc) } } // -msan if msanenabled { if casi < nsends { msanread(cas.elem, c.elemtype.size) } else if cas.elem != nil { msanwrite(cas.elem, c.elemtype.size) } } // chan-case解锁 selunlock(scases, lockorder) // 跳转到返回 goto retc // 读chan读取数据 bufrecv: // -race if raceenabled { if cas.elem != nil { raceWriteObjectPC(c.elemtype, cas.elem, casePC(casi), chanrecvpc) } racenotify(c, c.recvx, nil) } // -msan if msanenabled && cas.elem != nil { msanwrite(cas.elem, c.elemtype.size) } // 设置可信标志 recvOK = true // 从chan的缓存区读取数据 qp = chanbuf(c, c.recvx) // 如果读chan-case的数据区不为空,那么清空并交换 if cas.elem != nil { typedmemmove(c.elemtype, cas.elem, qp) } // 内存整理 typedmemclr(c.elemtype, qp) // 读取索引(缓存区是环形列表) c.recvx++ // 如果读取指针追上写入指针,表示缓冲区空(数组构成的环形队列,到了尾端了) if c.recvx == c.dataqsiz { // 重置指针 c.recvx = 0 } // 缓存区可读数据减1 c.qcount-- // 解锁chan-case selunlock(scases, lockorder) // 跳转返回 goto retc // 写chan发送数据 bufsend: if raceenabled { racenotify(c, c.sendx, nil) raceReadObjectPC(c.elemtype, cas.elem, casePC(casi), chansendpc) } if msanenabled { msanread(cas.elem, c.elemtype.size) } // 从 写chan-case中将数据写入chan的缓冲区 typedmemmove(c.elemtype, chanbuf(c, c.sendx), cas.elem) // chan写入指针以东南 c.sendx++ // 如果写入指针追上读取指针,表示缓冲区满(数组构成的环形队列,到了尾端了) if c.sendx == c.dataqsiz { // 重置指针 c.sendx = 0 } // 缓冲区数据加1 c.qcount++ // 解锁chan-case selunlock(scases, lockorder) // 跳转返回 goto retc // 读chan读取 recv: // 阻塞读取 recv(c, sg, cas.elem, func() { selunlock(scases, lockorder) }, 2) // 打印调试信息 if debugSelect { print("syncrecv: cas0=", cas0, " c=", c, "\n") } // 设置可信标志 recvOK = true // 跳转返回 goto retc // 读关闭的chan,有缓冲区的chan,关闭后可读 rclose: // 解锁chan-case selunlock(scases, lockorder) // 设置可信标志(为什么是false,当数据未读取完的时候,应该是true) recvOK = false if cas.elem != nil { typedmemclr(c.elemtype, cas.elem) } if raceenabled { raceacquire(c.raceaddr()) } // 跳转返回 goto retc // 写chan发送 send: if raceenabled { raceReadObjectPC(c.elemtype, cas.elem, casePC(casi), chansendpc) } if msanenabled { msanread(cas.elem, c.elemtype.size) } // 阻塞发送 send(c, sg, cas.elem, func() { selunlock(scases, lockorder) }, 2) // 打印调试信息 if debugSelect { print("syncsend: cas0=", cas0, " c=", c, "\n") } // 跳转返回 goto retc // 返回 retc: // 如果case释放时间大于0,那么发送阻塞事件 if caseReleaseTime > 0 { blockevent(caseReleaseTime-t0, 1) } // 返回选中的chan-case和可信标志 return casi, recvOK // 写关闭的chan,panic sclose: // 解锁chan-case selunlock(scases, lockorder) // panic,关闭的chan不允许写 panic(plainError("send on closed channel")) }
首先在reflect/Value.go
中,存在对select的定义,在reflect/Value.go
中,调用Select
函数将select-case转换为内部类型,
并且会进行select-case的规则校验,调用rselect
函数进行select选中。
rselect
函数通过//go:linkname
在runtime/select.go
中实现,实现函数为reflect_rselect
。
在runtime/select.go
中的reflect_rselect
函数中对传入的select-case切片做读chan-case和写chan-case的分类,并且剔除永远阻塞和default-case。
接着调用runtime/select.go
中的selectgo
函数进行chan-case的选中。
在selectgo
函数中,对传入的select-case首先进行shuffle,得到pollorder。
然后使用大顶堆,对select-case构建lockorder,lockoder中的chan地址升序。
如果读chan-case的chan的等待队列不为空,那么唤醒等待goroutine,进行数据读取。
如果读chan-case的chan的等待队列为空,但是缓存区存在数据,那么进行缓存区数据读取。
如果读chan-case的chan已经关闭,那么直接读缓存区数据,否则返回不可信标志。
如果写chan-case的chan已经关闭,那么panic。
如果写chan-case的chan的等待队列不为空,那么唤醒并写入。
如果写chan-case的chan的缓冲区不满,那么写入数据。
如果有default,那么没有选中直接返回。上层调用直接返回default-case。
如果没有default,那么阻塞读写。
这里填个坑:https://jiayq.blog.csdn.net/article/details/135716705 ,切片扩展表达式3.3。
Go在运行时包中提供了selectgo
函数用于处理select语句:
func selectgo(cas0 *scase, order0 *uint16, pc0 *uintptr, nsends, nrecvs int, block bool) (int, bool)
selectgo
函数会从一组case语句中挑选一个case,并返回命中case的下标,对于读chan-case,还会返回是否成功地读取了数据(第二个返回值对于其他类型case无意义)。
编译器会将select中的case语句存储在一个数组中,selectgo
的第一个参数cas0就是这个数组的地址,参数nsends,nrecvs表示读chan-case和写chan-case的个数。
selectgo
第二个参数order0是一个整型数组地址,长度是chan-case个数的2倍,order0数组是case执行随机性的关键。
order0数组被一分为二,前半部分存放chan-case的随机顺序(源代码中称为pollorder),selectgo
函数会将原始的chan-case顺序打乱,这样在检查每个chan-case是否就绪时,就会表现出随机性。
后半部分存放chan加锁的顺序(源代码中称之为lockorder),selectgo
会按照chan地址升序的顺序对chan进行加锁,从而避免因重复加锁引发的死锁问题。
当所有的chan-case都不可能就绪时,selectgo
会陷入永久的阻塞,此时函数不会返回。一旦selectgo
返回,就说明某个chan-case语句就绪了。
第一个返回值代表case的编号,这个编号与代码中出现的顺序一致,而非打乱后的顺序。
第二个返回值代表是否从chan中读取了数据,该值只针对读chan-case有意义。特别注意的是,第二个返回值为true时,仅代表从chan中读取了数据,对于已经关闭了的chan也是如此。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。