赞
踩
Golang作为一个原生支持用户态的语言,当提到并发进程,多线程的时候,是离不开锁的,锁是一种并发编程中的同步原语(Synchronization Primitives),它能保证多个 Goroutine 在访问同一片内存时不会出现竞争条件(Race condition)等问题。
go语言在sync包中提供了用于同步的一些基本原语,包括常见的sync.Mutex,sync.RWMutex,sync.WaitGroup,
sync.Once,sync.Cond.
这些基本原语提高了较为基础的同步功能,但是它们是一种相对原始的同步机制,在多数情况下,我们都应该使用抽象层级的更高的 Channel 实现同步。
Mutex由两个字段:state,sema组成,其中state表示当前互斥锁的状态,而sema是用于控制锁的状态的信号量。上述两个加起来,只占用8个字节
状态
正常模式和饥饿模
加锁和解锁
读写互斥锁sync.RWMutex,是细粒度的互斥锁,她并不限制资源的并发读,但是读写,写写操作无法并行执行。一个常见的服务对资源的读写比例会非常高,因为大多数的读请求之间不会相互影响,所以我们可以读写资源操作的分离,在类似场景下提高服务的性能。
sync.RWMutex 中总共包含以下 5 个字段:
type RWMUtex struct {
w Mutex
writerSem uint32
readerSem uint32
readerCount int32
readerWait int32
}
我们会依次分析获取写锁和读锁的实现能力,其中:
与加锁的过程正好相反,写锁的释放分以下几个执行
读锁的加锁方法 sync.RWMutex.RLock
func (rw *RWMutex) RLock(){
if atomic.AddInt32(&rw.readerCount,1) < 0 {
runtime_SemacquireMutex(&rw.readerSem,false,0)
}
}
当 Goroutine 想要释放读锁时,会调用如下所示的 sync.RWMutex.RUnlock 方法
func (rw *RWMutex) RUnlock() {
if r := atomic.AddInt32(&rw.readerCount,-1);r<0{
rw.rUnlockSlow(r)
}
}
sync.WaitGroup 可以等待一组 Goroutine 的返回,一个比较常见的使用场景是批量发出 RPC 或者 HTTP 请求:
reuqests := []*Requests{...}
wg := &sync.WaitGroup()
wg.Add(len(requests))
for _,request := range requests {
go func(r *Request){
defer wg.Done()
}(request)
}
wg.Wait()
我们可以通过 sync.WaitGroup 将原本顺序执行的代码在多个 Goroutine 中并发执行,加快程序处理的速度。
sync.WaitGroup 结构体中的成员变量非常简单,其中只包含两个成员变量
type WaitGroup struct {
noCopy noCopy
state1 [3]uint32
}
其中的 sync.WaitGroup.Done 只是向 sync.WaitGroup.Add 方法传入了 -1,所以我们重点分析另外两个方法 sync.WaitGroup.Add 和 sync.WaitGroup.Wait
func (wg *WaitGroup) Add(delta int){
statep,semap := wg.state()
state := atomic.AddUint64(statep,uint64(delta)<<32)
v := int32(state >>32)
w := uint32(state)
if v < 0 {
panic("sync: negative WaitGroup counter")
}
if v > 0 || w == 0{
return
}
*statep = 0
for ; w != 0; w-- {
runtime_Semrelease(semap, false, 0)
}
}
另一个方法 sync.WaitGroup.Wait
func (wg *WaitGroup) Wait(){
statep,semp := wg.state()
for {
state := atomic.LoadUint64(statep)
v :=int32(state >> 32)
if v == 0{
return
}
if atomic.CompareAndSwapUint64(statep, state, state+1) {
runtime_Semacquire(semap)
if +statep != 0 {
panic("sync: WaitGroup is reused before previous Wait has returned")
}
return
}
}
}
当 sync.WaitGroup 的计数器归零时,当陷入睡眠状态的 Goroutine 就被唤醒
Go 语言标准库中 sync.Once 可以保证在 Go 程序运行期间的某段代码只会执行一次。在运行如下所示的代码时,我们会看到如下所示的运行结果
func main() {
o := &sync.Once{}
for i:=0;i<10;i++{
o.Do(func(){
fmt.Println("ddd)
})
}
}
每一个 sync.Once 结构体中都只包含一个用于标识代码块是否执行过的 done 以及一个互斥锁 sync.Mutex
type Once struct {
done uint32
m Mutex
}
sync.Once.Do 是 sync.Once 结构体对外唯一暴露的方法
func (o *Once) Do(f func()){
if atomic.LoadUint32(&o.done) == 0 {
o.doSlow(f)
}
}
func (o *Once) doSlow(f func()){
o.m.Lock()
defer o.m.Unlock()
if o.done == 0 {
defer atomic.StoreUinit32(&o.done,1)
f()
}
}
Go标准库的中的sync.Cond是一个条件变量,它可以让一系列的goroutine都在满足特定条件下时候被唤醒,每一个 sync.Cond 结构体在初始化时都需要传入一个互斥锁,我们可以通过下面的例子了解它的使用方法
func main() {
c := sync.NewCond(&sync.Mutex{})
for i :=0;i<10;i++{
go listen(c)
}
time.Sleep(1*time.Second)
go broadcast(c)
ch := make(chan os.Signal,1)
signal.Notify(ch, os.Interrupt)
<-ch
}
func broadcast(c *sync.Cond){
c.l.Lock()
c.Broadcast()
c.l.Unlock()
}
func listen(c *sync.Cond) {
c.l.Lock()
c.wait()
fmt.Println("ddd")
c.l,Unlock()
}
上述代码同时运行了 11 个 Goroutine,这 11 个 Goroutine 分别做了不同事情:
sync.Cond.Signal 和 sync.Cond.Broadcast 方法就是用来唤醒调用 sync.Cond.Wait 陷入休眠的 Goroutine,它们两个的实现有一些细微差别:
在一般情况下,我们都会先调用 sync.Cond.Wait 陷入休眠等待满足期望条件,当满足唤醒条件时,就可以选择使用 sync.Cond.Signal 或者 sync.Cond.Broadcast 唤醒一个或者全部的 Goroutine。
x/sync/errgroup.Group 就为我们在一组 Goroutine 中提供了同步、错误传播以及上下文取消的功能,我们可以使用如下所示的方式并行获取网页的数据
var g errgroup.Group
var urls = []string{
"http://www.golang.org"
"http://www.baidu.com"
}
for i := range urls {
url := urls[i]
g.Go(func() error {
resp,err := http.Get(url)
if err == nil{
resp.Body.Close()
}
return err
})
}
if err := g.Wait();err == nil{
fmt.Println("Successfully fetched all URLs.")
}
x/sync/errgroup.Group.Go 方法能够创建一个 Goroutine 并在其中执行传入的函数,而 x/sync/errgroup.Group.Wait 会等待所有 Goroutine 全部返回,该方法的不同返回结果也有不同的含义:
信号量是在并发编程中常见的一种同步机制,在需要控制访问资源的进程数量时就会用到信号量,它会保证持有的计数器在 0 到初始化的权重之间波动
这个结构体对外也只暴露了四个方法:
在使用过程中需要注意以下几个问题
这个是Go语言的扩展包中提供的另外一个信号量,它能够在一个服务中抑制对下游的多次重复请求,比如在redis的缓存雪崩中,能够限制对同一个 Key 的多次重复请求,减少对下游的瞬时流量。
在资源获取非常昂贵的时候,就很适合使用x/sync/singleflight.Group
type service struct {
requestGroup singleflight.Group
}
func (s *service) handleRequest(ctx context.Context, request Request) (Response error){
v,err,_ := requestGroup.Do(request.Hash(),func() (interface{},error) {
rows, err := // select * from tables
if err != nil {
return nil, err
}
})
if err != nil{
return nil,err
}
return Response {
rows:rows,
},nil
}
因为请求的哈希在业务上一般表示相同的请求,所以上述代码使用它作为请求的键。当然,我们也可以选择其他的唯一字段作为 x/sync/singleflight.Group.Do 方法的第一个参数减少重复的请求。
喜欢的朋友记得点赞、收藏、关注哦!!!
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。