赞
踩
type hchan struct { qcount uint // channel的元素数量 dataqsiz uint // channel循环队列长度 buf unsafe.Pointer // 指向循环队列的指针 elemsize uint16 // 元素大小 closed uint32 // channel是否关闭 0-未关闭 elemtype *_type // 元素类型 sendx uint // 当前已发送的元素在队列当中的索引位置 recvx uint // 当前已接受的元素在队列当中的索引位置 recvq waitq // 阻塞的接受goroutine sendq waitq // 阻塞的发送goroutine // lock protects all fields in hchan, as well as several // fields in sudogs blocked on this channel. // // Do not change another G's status while holding this lock // (in particular, do not ready a G), as this can deadlock // with stack shrinking. lock mutex } // waitq 是一个双向链表,里面保存了 goroutine type waitq struct { first *sudog last *sudog }
channel底层示例图:
Go中通过调用makechan(t *chantype, size int)来创建channel,源码分为如下两部分:
校验
func makechan(t *chantype, size int) *hchan {
elem := t.elem
// compiler checks this but be safe.
// 元素大小不允许超过16kb
if elem.size >= 1<<16 {
throw("makechan: invalid channel element type")
}
// 判断当前的 hchanSize 是否是 maxAlign 整数倍,并且元素的对齐大小不能大于最大对齐的大小
if hchanSize%maxAlign != 0 || elem.align > maxAlign {
throw("makechan: bad alignment")
}
...
}
在校验时,有两个参数hchansize 和maxAlign,值如下:
const (
// 获取maxAlign 是内存对齐的最大值,这个等于 64 位 CPU 下的 cacheline 的大小
maxAlign = 8
// hchanSize 计算 unsafe.Sizeof(hchan{}) 最近的 8 的倍数,一脸懵逼
hchanSize = unsafe.Sizeof(hchan{}) + uintptr(-int(unsafe.Sizeof(hchan{}))&(maxAlign-1))
)
对于hchanSize举一个简单的例子,假设
n
=
u
n
s
a
f
e
.
S
i
z
e
o
f
(
h
c
h
a
n
)
=
14
n = unsafe.Sizeof(hchan{}) = 14
n=unsafe.Sizeof(hchan)=14
n的补码为00001110,-n的补码为11110010,则
-n & (maxAlign - 1) = 2
-n 11110010
maxAlign - 1 00000111
res 00000010
hchanSize = 16。
获取最近的8的倍数计算公式如下:
c
=
n
+
(
a
−
n
%
a
)
c = n + (a - n\%a)
c=n+(a−n%a)
这个公式就等同于
c
=
n
+
(
(
−
n
)
&
(
a
−
1
)
)
c = n + ((-n) \& (a - 1))
c=n+((−n)&(a−1))
创建Channel
func makechan(t *chantype, size int) *hchan { ... // 当前元素和个数相乘获得元素占用的内存空间 mem, overflow := math.MulUintptr(elem.size, uintptr(size)) if overflow || mem > maxAlloc-hchanSize || size < 0 { panic(plainError("makechan: size out of range")) } var c *hchan switch { case mem == 0: // 无缓冲 c = (*hchan)(mallocgc(hchanSize, nil, true)) c.buf = c.raceaddr() case elem.ptrdata == 0: // 无指针 c = (*hchan)(mallocgc(hchanSize+mem, nil, true)) c.buf = add(unsafe.Pointer(c), hchanSize) default: // 缓冲 || 指针 // Elements contain pointers. c = new(hchan) c.buf = mallocgc(mem, elem, true) } c.elemsize = uint16(elem.size) c.elemtype = elem c.dataqsiz = uint(size) lockInit(&c.lock, lockRankHchan) return c }
在创建channel是一共有三种情况:
至此Channel的创建已完成。
Go中通过调用chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr)方法向channel中发送数据,源码部分如下:
channel的前置处理
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {// // 当前channel是否为空 if c == nil { // 非阻塞直接返回 if !block { return false } // 阻塞:挂起当前协程 gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2) throw("unreachable") } // debug使用 if debugChan { print("chansend: chan=", c, "\n") } if raceenabled { racereadpc(c.raceaddr(), callerpc, funcPC(chansend)) } // 快速失败 if !block && c.closed == 0 && full(c) { return false } } func full(c *hchan) bool { if c.dataqsiz == 0 { // Assumes that a pointer read is relaxed-atomic. return c.recvq.first == nil } // Assumes that a uint read is relaxed-atomic. return c.qcount == c.dataqsiz }
从源码中可以看出,channel的前置处理首先会判断channel是否为空,如果为空且为非阻塞,直接返回,否则调用gopark将当前goroutine挂起。
然后会进行快速失败的检测:非阻塞 && 未关闭 && channel满,其中,channel满共有两种情况:
上锁
lock(&c.lock)
发送数据
// 向已关闭的channel的信道发送数据直接panic if c.closed != 0 { unlock(&c.lock) panic(plainError("send on closed channel")) } // 直接发送 // 如果存在阻塞的接受goroutine,直接调用send发送 if sg := c.recvq.dequeue(); sg != nil { // Found a waiting receiver. We pass the value we want to send // directly to the receiver, bypassing the channel buffer (if any). send(c, sg, ep, func() { unlock(&c.lock) }, 3) return true } // 缓冲发送 // 如果当前缓冲未满,将数据发送到缓冲数组buf if c.qcount < c.dataqsiz { // chanbuf(c, i) is pointer to the i'th slot in the buffer. // 指向缓存的第i个位置 qp := chanbuf(c, c.sendx) if raceenabled { racenotify(c, c.sendx, nil) } // typedmemmove copies a value of type t to dst from src. // 将ep复制到qp typedmemmove(c.elemtype, qp, ep) // 发送索引+1 c.sendx++ // 环形,如果sendx=dataqsize,置零 if c.sendx == c.dataqsiz { c.sendx = 0 } // 元素数量+1 c.qcount++ unlock(&c.lock) return true }
发送数据的流程首先会判断当前的channel是否关闭,继而进行直接发送或者缓冲发送。
如果缓冲区已满,我们继续分析ing
// 如果缓冲区已满且非阻塞直接返回 if !block { unlock(&c.lock) return false } // 以下为阻塞发送的操作 // 获取当前goroutine gp := getg() // 获取当前的sudog,同时设置相关信息,包括当前的channel,是否为select等。 mysg := acquireSudog() mysg.releasetime = 0 if t0 != 0 { mysg.releasetime = -1 } // No stack splits between assigning elem and enqueuing mysg // on gp.waiting where copystack can find it. mysg.elem = ep mysg.waitlink = nil mysg.g = gp mysg.isSelect = false mysg.c = c gp.waiting = mysg gp.param = nil // 将sudog加入到发送队列 c.sendq.enqueue(mysg) atomic.Store8(&gp.parkingOnChan, 1) // 挂起当前 goroutine 等待接收 channel数据 gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2) // 确保当前数据为活跃状态,避免被回收 KeepAlive(ep) if mysg != gp.waiting { throw("G waiting list is corrupted") } // 唤醒阻塞的channel gp.waiting = nil gp.activeStackChans = false closed := !mysg.success gp.param = nil if mysg.releasetime > 0 { blockevent(mysg.releasetime-t0, 2) } mysg.c = nil releaseSudog(mysg) if closed { if c.closed == 0 { throw("chansend: spurious wakeup") } panic(plainError("send on closed channel")) } return true
从上诉过程中,可以看出channel发送数据主要分为三种情况:
接受channel和发送channel的流程基本类似,通过调用chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool)接受数据,源码如下:
if debugChan { print("chanrecv: chan=", c, "\n") } // 当前channel为空 if c == nil { if !block { return } gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2) throw("unreachable") } // 非阻塞且接受channel为空 if !block && empty(c) { // 判断通道是否关闭,如果是未关闭的通道说明当前还没准备好数据,直接返回 if atomic.Load(&c.closed) == 0 { return } // Fast path: 检查非阻塞的操作 // empty 主要是有两种情况返回 true: // 1. 无缓冲channel,并且没有阻塞住发送者 // 2. 有缓冲 channel,但是缓冲区没有数据 if empty(c) { // The channel is irreversibly closed and empty. if raceenabled { raceacquire(c.raceaddr()) } if ep != nil { typedmemclr(c.elemtype, ep) } return true, false } } // empty reports whether a read from c would block (that is, the channel is // empty). It uses a single atomic read of mutable state. func empty(c *hchan) bool { // c.dataqsiz is immutable. if c.dataqsiz == 0 { return atomic.Loadp(unsafe.Pointer(&c.sendq.first)) == nil } return atomic.Loaduint(&c.qcount) == 0 }
// channel 关闭且缓存数为0 if c.closed != 0 && c.qcount == 0 { if raceenabled { raceacquire(c.raceaddr()) } unlock(&c.lock) if ep != nil { typedmemclr(c.elemtype, ep) } return true, false } // 和发送类似,接收数据时也是先看一下有没有正在阻塞的等待发送数据的 Goroutine // 如果有的话 直接调用 recv 方法从发送者或者是缓冲区中接收数据 if sg := c.sendq.dequeue(); sg != nil { recv(c, sg, ep, func() { unlock(&c.lock) }, 3) return true, true } // 缓冲区存在数据 if c.qcount > 0 { // Receive directly from queue qp := chanbuf(c, c.recvx) if raceenabled { racenotify(c, c.recvx, nil) } if ep != nil { typedmemmove(c.elemtype, ep, qp) } typedmemclr(c.elemtype, qp) c.recvx++ if c.recvx == c.dataqsiz { c.recvx = 0 } c.qcount-- unlock(&c.lock) return true, true } if !block { unlock(&c.lock) return false, false }
阻塞接受数据
// no sender available: block on this channel. gp := getg() mysg := acquireSudog() mysg.releasetime = 0 if t0 != 0 { mysg.releasetime = -1 } // No stack splits between assigning elem and enqueuing mysg // on gp.waiting where copystack can find it. mysg.elem = ep mysg.waitlink = nil gp.waiting = mysg mysg.g = gp mysg.isSelect = false mysg.c = c gp.param = nil c.recvq.enqueue(mysg) // Signal to anyone trying to shrink our stack that we're about // to park on a channel. The window between when this G's status // changes and when we set gp.activeStackChans is not safe for // stack shrinking. atomic.Store8(&gp.parkingOnChan, 1) gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2) // someone woke us up if mysg != gp.waiting { throw("G waiting list is corrupted") } gp.waiting = nil gp.activeStackChans = false if mysg.releasetime > 0 { blockevent(mysg.releasetime-t0, 2) } success := mysg.success gp.param = nil mysg.c = nil releaseSudog(mysg) return true, success
数据接收和发送其实大同小异,也是分为检查和数据接收,数据接收又分三种情况
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。