当前位置:   article > 正文

Golang Channel_golang判断channel为空

golang判断channel为空

Channel解析

1. Channel源码分析
1.1 Channel数据结构
type hchan struct {
	qcount   uint           // channel的元素数量
	dataqsiz uint           // channel循环队列长度
	buf      unsafe.Pointer // 指向循环队列的指针
	elemsize uint16         // 元素大小
	closed   uint32         // channel是否关闭 0-未关闭
	elemtype *_type // 元素类型
	sendx    uint   // 当前已发送的元素在队列当中的索引位置
	recvx    uint   // 当前已接受的元素在队列当中的索引位置
	recvq    waitq  // 阻塞的接受goroutine
	sendq    waitq  // 阻塞的发送goroutine

	// lock protects all fields in hchan, as well as several
	// fields in sudogs blocked on this channel.
	//
	// Do not change another G's status while holding this lock
	// (in particular, do not ready a G), as this can deadlock
	// with stack shrinking.
	lock mutex
}

// waitq 是一个双向链表,里面保存了 goroutine
type waitq struct {
	first *sudog
	last  *sudog
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26

channel底层示例图:

image-20211012230659188
1.2 创建channel

Go中通过调用makechan(t *chantype, size int)来创建channel,源码分为如下两部分:

校验

func makechan(t *chantype, size int) *hchan {
	elem := t.elem

	// compiler checks this but be safe.
  // 元素大小不允许超过16kb
	if elem.size >= 1<<16 {
		throw("makechan: invalid channel element type")
	}
  // 判断当前的 hchanSize 是否是 maxAlign 整数倍,并且元素的对齐大小不能大于最大对齐的大小
	if hchanSize%maxAlign != 0 || elem.align > maxAlign {
		throw("makechan: bad alignment")
	}

	...
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15

在校验时,有两个参数hchansizemaxAlign,值如下:

const (
   // 获取maxAlign 是内存对齐的最大值,这个等于 64 位 CPU 下的 cacheline 的大小
   maxAlign  = 8
   // hchanSize 计算 unsafe.Sizeof(hchan{}) 最近的 8 的倍数,一脸懵逼
   hchanSize = unsafe.Sizeof(hchan{}) + uintptr(-int(unsafe.Sizeof(hchan{}))&(maxAlign-1))
)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

对于hchanSize举一个简单的例子,假设
n = u n s a f e . S i z e o f ( h c h a n ) = 14 n = unsafe.Sizeof(hchan{}) = 14 n=unsafe.Sizeof(hchan)=14
n的补码为00001110,-n的补码为11110010,则

-n & (maxAlign - 1) = 2

-n           11110010
maxAlign - 1 00000111
res          00000010
  • 1
  • 2
  • 3
  • 4
  • 5

hchanSize = 16。

获取最近的8的倍数计算公式如下:
c = n + ( a − n % a ) c = n + (a - n\%a) c=n+(an%a)
这个公式就等同于
c = n + ( ( − n ) & ( a − 1 ) ) c = n + ((-n) \& (a - 1)) c=n+((n)&(a1))

创建Channel

func makechan(t *chantype, size int) *hchan {
	...
  
  // 当前元素和个数相乘获得元素占用的内存空间
	mem, overflow := math.MulUintptr(elem.size, uintptr(size))
	if overflow || mem > maxAlloc-hchanSize || size < 0 {
		panic(plainError("makechan: size out of range"))
	}
	var c *hchan
	switch {
	case mem == 0:  // 无缓冲
		c = (*hchan)(mallocgc(hchanSize, nil, true))
		c.buf = c.raceaddr()
	case elem.ptrdata == 0: // 无指针
		c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
		c.buf = add(unsafe.Pointer(c), hchanSize)
	default:  // 缓冲 || 指针
		// Elements contain pointers.
		c = new(hchan)
		c.buf = mallocgc(mem, elem, true)
	}

	c.elemsize = uint16(elem.size)
	c.elemtype = elem
	c.dataqsiz = uint(size)
	lockInit(&c.lock, lockRankHchan)

	return c
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29

在创建channel是一共有三种情况:

  1. 无缓冲通道:直接给hchan分配内存
  2. 元素不包含指针:为 hchan 和底层数组分配一段连续的内存地址
  3. 默认: 如果元素包含指针,分别为 hchan 和 底层数组分配内存地址

至此Channel的创建已完成。

2. 发送Channel

Go中通过调用chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr)方法向channel中发送数据,源码部分如下:

channel的前置处理

func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {// 
  // 当前channel是否为空
	if c == nil {
    // 非阻塞直接返回
		if !block {
			return false
		}
    // 阻塞:挂起当前协程
		gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)
		throw("unreachable")
	}

  // debug使用
	if debugChan {
		print("chansend: chan=", c, "\n")
	}
	
	if raceenabled {
		racereadpc(c.raceaddr(), callerpc, funcPC(chansend))
	}
  // 快速失败
	if !block && c.closed == 0 && full(c) {
		return false
	}
}

func full(c *hchan) bool {
	if c.dataqsiz == 0 {
		// Assumes that a pointer read is relaxed-atomic.
		return c.recvq.first == nil
	}
	// Assumes that a uint read is relaxed-atomic.
	return c.qcount == c.dataqsiz
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34

从源码中可以看出,channel的前置处理首先会判断channel是否为空,如果为空且为非阻塞,直接返回,否则调用gopark将当前goroutine挂起。

然后会进行快速失败的检测:非阻塞 && 未关闭 && channel满,其中,channel满共有两种情况:

  1. channel底层的环形数组为空,且当前没有接受的goroutine
  2. 环形数组已满

上锁

lock(&c.lock)
  • 1

发送数据

// 向已关闭的channel的信道发送数据直接panic
if c.closed != 0 {
		unlock(&c.lock)
		panic(plainError("send on closed channel"))
	}
	
	// 直接发送
	// 如果存在阻塞的接受goroutine,直接调用send发送
	if sg := c.recvq.dequeue(); sg != nil {
		// Found a waiting receiver. We pass the value we want to send
		// directly to the receiver, bypassing the channel buffer (if any).
		send(c, sg, ep, func() { unlock(&c.lock) }, 3)
		return true
	}

	// 缓冲发送
  // 如果当前缓冲未满,将数据发送到缓冲数组buf
	if c.qcount < c.dataqsiz {
    // chanbuf(c, i) is pointer to the i'th slot in the buffer.
    // 指向缓存的第i个位置
		qp := chanbuf(c, c.sendx)
		if raceenabled {
			racenotify(c, c.sendx, nil)
		}
    // typedmemmove copies a value of type t to dst from src.
    // 将ep复制到qp
		typedmemmove(c.elemtype, qp, ep)
    // 发送索引+1
		c.sendx++
    // 环形,如果sendx=dataqsize,置零
		if c.sendx == c.dataqsiz {
			c.sendx = 0
		}
    // 元素数量+1
		c.qcount++
		unlock(&c.lock)
		return true
	}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38

发送数据的流程首先会判断当前的channel是否关闭,继而进行直接发送或者缓冲发送。

如果缓冲区已满,我们继续分析ing

// 如果缓冲区已满且非阻塞直接返回
if !block {
		unlock(&c.lock)
		return false
	}
	
  // 以下为阻塞发送的操作
	// 获取当前goroutine
	gp := getg()
  // 获取当前的sudog,同时设置相关信息,包括当前的channel,是否为select等。
	mysg := acquireSudog()
	mysg.releasetime = 0
	if t0 != 0 {
		mysg.releasetime = -1
	}
	// No stack splits between assigning elem and enqueuing mysg
	// on gp.waiting where copystack can find it.
	mysg.elem = ep
	mysg.waitlink = nil
	mysg.g = gp
	mysg.isSelect = false
	mysg.c = c
	gp.waiting = mysg
	gp.param = nil
  // 将sudog加入到发送队列 
	c.sendq.enqueue(mysg)
	atomic.Store8(&gp.parkingOnChan, 1)
  // 挂起当前 goroutine 等待接收 channel数据
	gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)
  // 确保当前数据为活跃状态,避免被回收
	KeepAlive(ep)

	if mysg != gp.waiting {
		throw("G waiting list is corrupted")
	}
 // 唤醒阻塞的channel
	gp.waiting = nil
	gp.activeStackChans = false
	closed := !mysg.success
	gp.param = nil
	if mysg.releasetime > 0 {
		blockevent(mysg.releasetime-t0, 2)
	}
	mysg.c = nil
	releaseSudog(mysg)
	if closed {
		if c.closed == 0 {
			throw("chansend: spurious wakeup")
		}
		panic(plainError("send on closed channel"))
	}
	return true
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52

从上诉过程中,可以看出channel发送数据主要分为三种情况:

  1. 如果存在等待的接受channel,那么直接将数据发送出去;
  2. 如果channel是有缓冲的,并且缓冲区未满,将数据放在缓冲区中;
  3. 如果缓冲区已满,进入阻塞发送的阶段,获取到sudog后,设置相关信息,将当前的发送channel挂起,等待接受channel的唤醒。
3. 接受channel

接受channel和发送channel的流程基本类似,通过调用chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool)接受数据,源码如下:

前置处理
	if debugChan {
		print("chanrecv: chan=", c, "\n")
	}
	// 当前channel为空
	if c == nil {
		if !block {
			return
		}
		gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
		throw("unreachable")
	}

	// 非阻塞且接受channel为空
	if !block && empty(c) {
		// 判断通道是否关闭,如果是未关闭的通道说明当前还没准备好数据,直接返回
    if atomic.Load(&c.closed) == 0 {
			return
		}
    // Fast path: 检查非阻塞的操作
    // empty 主要是有两种情况返回 true:
    // 1. 无缓冲channel,并且没有阻塞住发送者
    // 2. 有缓冲 channel,但是缓冲区没有数据
		if empty(c) {
			// The channel is irreversibly closed and empty.
			if raceenabled {
				raceacquire(c.raceaddr())
			}
			if ep != nil {
				typedmemclr(c.elemtype, ep)
			}
			return true, false
		}
	}

// empty reports whether a read from c would block (that is, the channel is
// empty).  It uses a single atomic read of mutable state.
func empty(c *hchan) bool {
	// c.dataqsiz is immutable.
	if c.dataqsiz == 0 {
		return atomic.Loadp(unsafe.Pointer(&c.sendq.first)) == nil
	}
	return atomic.Loaduint(&c.qcount) == 0
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
接收数据
// channel 关闭且缓存数为0
if c.closed != 0 && c.qcount == 0 {
		if raceenabled {
			raceacquire(c.raceaddr())
		}
		unlock(&c.lock)
		if ep != nil {
			typedmemclr(c.elemtype, ep)
		}
		return true, false
	}
	// 和发送类似,接收数据时也是先看一下有没有正在阻塞的等待发送数据的 Goroutine
  // 如果有的话 直接调用 recv 方法从发送者或者是缓冲区中接收数据
	if sg := c.sendq.dequeue(); sg != nil {
		recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
		return true, true
	}

	// 缓冲区存在数据
	if c.qcount > 0 {
		// Receive directly from queue
		qp := chanbuf(c, c.recvx)
		if raceenabled {
			racenotify(c, c.recvx, nil)
		}
		if ep != nil {
			typedmemmove(c.elemtype, ep, qp)
		}
		typedmemclr(c.elemtype, qp)
		c.recvx++
		if c.recvx == c.dataqsiz {
			c.recvx = 0
		}
		c.qcount--
		unlock(&c.lock)
		return true, true
	}

	if !block {
		unlock(&c.lock)
		return false, false
	}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42

阻塞接受数据

// no sender available: block on this channel.
	gp := getg()
	mysg := acquireSudog()
	mysg.releasetime = 0
	if t0 != 0 {
		mysg.releasetime = -1
	}
	// No stack splits between assigning elem and enqueuing mysg
	// on gp.waiting where copystack can find it.
	mysg.elem = ep
	mysg.waitlink = nil
	gp.waiting = mysg
	mysg.g = gp
	mysg.isSelect = false
	mysg.c = c
	gp.param = nil
	c.recvq.enqueue(mysg)
	// Signal to anyone trying to shrink our stack that we're about
	// to park on a channel. The window between when this G's status
	// changes and when we set gp.activeStackChans is not safe for
	// stack shrinking.
	atomic.Store8(&gp.parkingOnChan, 1)
	gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2)

	// someone woke us up
	if mysg != gp.waiting {
		throw("G waiting list is corrupted")
	}
	gp.waiting = nil
	gp.activeStackChans = false
	if mysg.releasetime > 0 {
		blockevent(mysg.releasetime-t0, 2)
	}
	success := mysg.success
	gp.param = nil
	mysg.c = nil
	releaseSudog(mysg)
	return true, success
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38

数据接收和发送其实大同小异,也是分为检查和数据接收,数据接收又分三种情况

  • 直接获取数据,如果当前有阻塞的发送者 Goroutine 走这条路
    • 如果是无缓冲 channel,直接从发送者那里把数据拷贝给接收变量
    • 如果是有缓冲 channel,并且 channel 已经满了,就先从 channel 的底层数组拷贝数据,再把阻塞的发送者 Goroutine 的数据拷贝到 channel 的循环队列中
  • 从 channel 的缓冲中获取数据,有缓冲 channel 并且缓存队列有数据时走这条路
    • 直接从缓存队列中复制数据给接收变量
  • 阻塞接收,剩余情况走这里
    • 和发送类似,先获取当前 Goroutine 信息,构造 sudog 加入到 channel 的 recvq 上
    • 然后休眠当前 Goroutine 等待唤醒
    • 唤醒后做一些清理工作,释放 sudog 返回
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/菜鸟追梦旅行/article/detail/400402
推荐阅读
相关标签
  

闽ICP备14008679号