赞
踩
目录
Mutex 是一种常用的锁机制,它可以用来保护临界区,确保同一时间只有一个 goroutine 访问共享资源。
- package main
-
- import (
- "fmt"
- "sync"
- "time"
- )
-
- // 使用场景:
- // 当多个 goroutines 需要访问和修改相同的变量或数据结构时,Mutex 可以用来确保每次只有一个 goroutine 在执行修改操作。
- func main() {
- var mu sync.Mutex
- count := 0
-
- var wg sync.WaitGroup
- wg.Add(100)
-
- for i := 0; i < 100; i++ {
- go func() {
- defer wg.Done()
- mu.Lock()
- count++
- fmt.Printf("Count increased to: %d\n", count)
- time.Sleep(time.Millisecond * 1) // 模拟耗时操作
- mu.Unlock()
- }()
- }
-
- wg.Wait()
- fmt.Println("Final count:", count)
- }

RWMutex 允许多个读操作同时进行,但是一次只能有一个写操作。这可以提高程序的性能,特别是当读操作远远多于写操作时。
- package main
-
- import (
- "fmt"
- "sync"
- )
-
- // 使用场景:
- // 当多个 goroutines 需要频繁读取共享数据,而写入操作较少时,RWMutex 可以提高并发性能。
- func main() {
- var mu sync.RWMutex
- count := 0
-
- var wg sync.WaitGroup
- wg.Add(10)
-
- for i := 0; i < 10; i++ {
- go func() {
- defer wg.Done()
- if i == 5 {
- mu.Lock()
- count++
- fmt.Printf("Write operation: Count increased to: %d\n", count)
- mu.Unlock()
- } else {
- mu.RLock()
- fmt.Printf("Read operation: Current count is: %d\n", count)
- mu.RUnlock()
- }
- }()
- }
-
- wg.Wait()
- fmt.Println("Final count:", count)
- }

Atomic 提供了一组原子操作,用于在不使用锁的情况下更新某些类型的变量,这对于避免锁的竞争和提高并发性能非常有用。它是实现锁的基石。
sync/atomic
包非常适合于那些需要高并发且操作简单的情况,例如计数器、标志位等。通过使用原子操作,可以显著减少锁的使用,从而提高程序的整体性能。对于整型变量,sync/atomic
提供了以下方法:
对于其他整型(int64, uint32, uint64, uintptr),也有类似的 Load, Store, Swap, Add, Sub, 和 CompareAndSwap 方法。
对于指针,sync/atomic
提供了以下方法:
- package main
-
- import (
- "fmt"
- "sync"
- "sync/atomic"
- "time"
- )
-
- // 原子更新整型变量
- func main() {
- count := int64(0)
-
- var wg sync.WaitGroup
- wg.Add(10)
-
- for i := 0; i < 10; i++ {
- go func() {
- defer wg.Done()
- atomic.AddInt64(&count, 1)
- fmt.Printf("Count increased to: %d\n", atomic.LoadInt64(&count))
- time.Sleep(time.Millisecond * 50) // 模拟耗时操作
- }()
- }
-
- wg.Wait()
- fmt.Println("Final count:", atomic.LoadInt64(&count))
- }

- package main
-
- import (
- "fmt"
- "sync"
- "sync/atomic"
- "time"
- "unsafe"
- )
-
- type MyStruct struct {
- Name string
- Age int
- }
-
- func main() {
- count := int64(0)
-
- var wg sync.WaitGroup
- wg.Add(10)
-
- for i := 0; i < 10; i++ {
- go func() {
- defer wg.Done()
- atomic.AddInt64(&count, 1)
- fmt.Printf("Count increased to: %d\n", atomic.LoadInt64(&count))
- time.Sleep(time.Millisecond * 50) // 模拟耗时操作
- }()
- }
-
- wg.Wait()
- fmt.Println("Final count:", atomic.LoadInt64(&count))
-
- // 使用指针
- var ptr unsafe.Pointer
- atomic.StorePointer(&ptr, unsafe.Pointer(new(MyStruct)))
-
- var wgPtr sync.WaitGroup
- wgPtr.Add(10)
-
- for i := 0; i < 10; i++ {
- go func() {
- defer wgPtr.Done()
- myStruct := (*MyStruct)(atomic.LoadPointer(&ptr))
- myStruct.Age++
- fmt.Printf("Age increased to: %d\n", myStruct.Age)
- time.Sleep(time.Millisecond * 50) // 模拟耗时操作
- }()
- }
-
- wgPtr.Wait()
- myStruct := (*MyStruct)(atomic.LoadPointer(&ptr))
- fmt.Println("Final age:", myStruct.Age)
- }

Channel 是 Go 中实现通信和同步的重要手段之一。它允许 goroutines 相互通信和同步。
消息队列,数据传递,信号通知,任务编排,锁
- package main
-
- import (
- "fmt"
- "time"
- )
-
- func main() {
- ch := make(chan int)
-
- go func() {
- val := <-ch // 从通道接收数据
- fmt.Println("Received value:", val)
- }()
-
- ch <- 1 // 发送数据到通道
- time.Sleep(time.Second)
- }

WaitGroup 用于等待一组 goroutines 完成它们的工作。
当你需要确保所有并发运行的 goroutines 都完成任务后再继续执行主 goroutine 时。
- package main
-
- import (
- "fmt"
- "sync"
- "time"
- )
-
- // 使用场景:
- // 当你需要确保所有并发运行的 goroutines 都完成任务后再继续执行主 goroutine 时。
- func main() {
- var wg sync.WaitGroup
-
- wg.Add(2)
- go func() {
- defer wg.Done()
- fmt.Println("goroutine 1 done")
- }()
-
- go func() {
- defer wg.Done()
- time.Sleep(time.Second)
- fmt.Println("goroutine 2 done")
- }()
-
- wg.Wait()
- fmt.Println("All goroutines finished")
- }

sync.WaitGroup
的内部结构主要包含以下几个关键部分:
state0
- 一个 uint32
类型的变量,用于存储等待组的状态。这个状态包含了两个重要的信息:
noCopy
- 一个 sync/noCopy
类型的字段,用于标记 WaitGroup
不应被复制。
sync.WaitGroup
提供了几个关键的方法:
Add(delta int)
- 增加或减少待完成的任务数量。如果 delta
为正,则增加;如果为负,则减少。当 delta
为负且减少了任务数量使得任务数量变为零时,会唤醒所有的等待者。
Done()
- 减少任务数量,通常用于表示一个任务已经完成。这相当于调用 Add(-1)
。
Wait()
- 阻塞当前 goroutine,直到所有任务完成。如果当前没有任务,那么 Wait()
方法会立即返回。
- // 结构体
- type WaitGroup struct {
- // 一个 sync/noCopy 类型的字段,用于标记 WaitGroup 不应被复制
- noCopy noCopy
-
- // state0 保存两个 32 位值的组合:
- // 低 32 位保存未完成的任务数量,
- // 高 32 位保存等待者的数量。
- state0 uint32
- }
-
- // Add方法
- // Add 方法负责更新任务数量,并在适当的时候唤醒等待者:
- func (wg *WaitGroup) Add(delta int) {
- // 从 state0 中获取当前的任务数量和等待者数量。
- old := atomic.LoadUint32(&wg.state0)
- for {
- // 解析出任务数量。
- n := int(old)
- n += delta
- // 如果任务数量小于 0,则返回错误。
- if n < 0 {
- panic(negCount)
- }
- // 新的状态,包括更新后的任务数量和等待者数量。
- new := uint32(n) // 仅更新任务数量,等待者数量不变。
- // 使用 CAS (compare-and-swap) 更新 state0。
- if atomic.CompareAndSwapUint32(&wg.state0, old, new) {
- break
- }
- old = atomic.LoadUint32(&wg.state0) // 重试
- }
-
- // 如果任务数量为 0,则唤醒所有等待者。
- if n == 0 {
- notifyAll(&wg.state0)
- }
- }
-
- // Done 方法
- // Done 方法实际上是对 Add(-1) 的封装:
- func (wg *WaitGroup) Done() {
- wg.Add(-1)
- }
-
- // Wait 方法
- // Wait 方法阻塞当前 goroutine 直到所有任务完成:
- func (wg *WaitGroup) Wait() {
- // 增加等待者数量。
- old := atomic.AddUint32(&wg.state0, waiters)
- // 如果任务数量为 0,则立即返回。
- if atomic.LoadUint32(&wg.state0)&pending == 0 {
- return
- }
-
- // 等待直到任务完成。
- wait(&wg.state0, old)
- }
-
- // 这里的 wait 函数是内部实现,它使用条件变量来等待,具体实现如下:
- // wait blocks until the state is zero.
- func wait(statep *uint32, old uint32) {
- for {
- // 如果任务数量为 0,则返回。
- if atomic.LoadUint32(statep)&pending == 0 {
- return
- }
-
- // 进入等待状态。
- runtime_notifyWait(&statep, old)
- old = atomic.LoadUint32(statep)
- }
- }
-
- // runtime_notifyWait 和 notifyAll 是 Go 运行时提供的函数,用于实现条件变量的等待和通知功能。

状态检查:
Add
方法中,通过原子操作检查当前任务数量是否为零。如果是零,则不需要做任何事情,直接返回。等待者处理:
Wait
方法中,当前 goroutine 成为等待者,并增加等待者数量。原子操作:
sync/atomic
包中的原子操作来更新状态,确保线程安全性。atomic.AddInt64
更新状态,通过 atomic.LoadInt64
获取状态。条件变量:
sync.runtime_notify
和 sync.runtime_wait
来实现条件变量的功能,以等待或通知等待者。Once 保证某个函数只被调用一次,即使有多个 goroutines 同时尝试调用该函数。
- package main
-
- import (
- "fmt"
- "sync"
- "time"
- )
-
- // 使用场景:
- // 当你想要确保某个初始化操作只执行一次时。
- func main() {
- var once sync.Once
-
- for i := 0; i < 10; i++ {
- go func() {
- once.Do(func() {
- fmt.Println("This will be printed only once")
- })
- }()
- }
-
- time.Sleep(time.Second)
- fmt.Println("Done")
- }

sync.Once
类型定义在一个 once
结构体中,该结构体包含以下字段:
uint32
类型的原子变量,用来表示是否已经执行过操作。sync.Once
的主要方法有两个:Do
和 Done
。
Do
方法执行完毕后会关闭这个通道。- type once struct {
- // done 是一个原子变量,如果操作未执行则为 0,已执行则为 1。
- done uint32
-
- // m 是一个互斥锁,在执行动作时持有。
- m Mutex
- }
-
- // Do 方法调用函数 f,如果这是第一次调用 Do 方法对于这个 Once 对象。
- // 如果其他协程同时进入 Do,其中一个会执行 f,其他则会等待其完成。
- func (o *once) Do(f func()) {
- // 如果 done 已经为 1,则直接返回,不执行任何操作。
- if atomic.LoadUint32(&o.done) == 1 {
- return
- }
-
- // 否则尝试获取互斥锁。
- o.m.Lock()
-
- // 再次检查 done 是否为 1,防止其他 goroutine 已经完成了操作。
- if atomic.LoadUint32(&o.done) != 1 {
- // 如果不是,则执行函数 f 并将 done 设置为 1。
- defer func() {
- atomic.StoreUint32(&o.done, 1)
- o.m.Unlock()
- }()
- f()
- } else {
- // 如果是,则释放锁并返回。
- o.m.Unlock()
- }
- }

sync.Cond可以让一组的Coroutine都在满足特定条件时被唤醒
利用等待/通知机制实现阻塞或者唤醒
- package main
-
- import (
- "fmt"
- "sync"
- "time"
- )
-
- func main() {
- mu := &sync.Mutex{}
- dataReady := false
- data := "Hello, World!"
-
- // 创建条件变量,传入互斥锁 mu
- cond := sync.NewCond(mu)
-
- // 生产者 goroutine
- go func() {
- time.Sleep(1 * time.Second)
- mu.Lock()
- fmt.Println("生产者:数据已准备好")
- dataReady = true
- //cond.Signal()
- cond.Broadcast() // 数据准备好了,唤醒所有等待的消费者
- mu.Unlock()
- }()
-
- // 消费者 goroutines
- consumerCount := 3
- for i := 0; i < consumerCount; i++ {
- go func(id int) {
- mu.Lock()
- for !dataReady { // 如果数据没有准备好,则等待
- fmt.Printf("消费者 %d:数据未准备好,正在等待...\n", id)
- cond.Wait()
- }
- fmt.Printf("消费者 %d:数据已获取: %s\n", id, data)
- mu.Unlock()
- }(i)
- }
-
- time.Sleep(3 * time.Second) // 等待 goroutines 完成
- fmt.Println("主goroutine结束")
- }

互斥锁 (Mutex
或 RWMutex
): sync.Cond
依赖于一个互斥锁(通常是一个 Mutex
或 RWMutex
),以确保在等待条件变量时,只有持有锁的 goroutine 才能调用 Wait()
方法。
等待队列 (waiterList
): 当一个 goroutine 调用 Wait()
方法时,它会释放锁并被添加到等待队列中。当条件变量被 Broadcast()
或 Signal()
时,等待队列中的 goroutines 会被唤醒。
唤醒机制 (Broadcast
和 Signal
): Broadcast()
方法会唤醒等待队列中的所有 goroutines,而 Signal()
方法只会唤醒等待队列中的一个 goroutine。
在标准库 sync/cond.go
中
- type Cond struct {
- L Locker // 互斥锁接口
- c chan struct{} // 用于信号的通道
- }
L
是一个 Locker
接口类型的指针,它可以是任何实现了 Lock()
和 Unlock()
方法的对象,如 Mutex
或 RWMutex
。c
是一个无缓冲的结构体通道,用于信号的传递。- type Locker interface {
- Lock()
- Unlock()
- }
这是一个简单的接口,它定义了锁的基本行为。
- func NewCond(c Locker) *Cond {
- return &Cond{c, make(chan struct{})}
- }
New
函数接受一个 Locker
类型的参数并返回一个 Cond
实例。
- func (c *Cond) Wait() {
- c.L.Lock()
- c.L.Unlock()
- c.c <- struct{}{}
- }
实际上,Wait
方法的实现要比上述代码复杂得多。这里简化了实现以便更容易理解。在实际的 sync/cond.go
文件中,Wait
方法会释放锁、将当前 goroutine 加入等待队列,并阻塞当前 goroutine 直到接收到信号。
- func (c *Cond) Signal() {
- select {
- case c.c <- struct{}{}:
- default:
- }
- }
Signal
方法尝试向 c
通道发送一个信号。如果通道未满,则发送成功;否则,由于通道无缓冲,Signal
方法将立即返回。
- func (c *Cond) Broadcast() {
- for i := 0; i < len(c.c); i++ {
- select {
- case c.c <- struct{}{}:
- default:
- break
- }
- }
- }
Broadcast
方法遍历 c.c
通道的长度,并尝试向通道发送信号。这会唤醒所有等待的 goroutines。
对象池化,TCP连接池、数据库连接池、Worker Pool
- package main
-
- import (
- "fmt"
- "sync"
- )
-
- // 定义一个函数来演示使用 sync.Pool
- func usePool() {
- // 创建一个 sync.Pool
- var pool sync.Pool
- pool.New = func() interface{} {
- return make([]int, 0, 100) // 初始容量为 100
- }
-
- // 从池中获取一个对象
- slice := pool.Get().([]int)
-
- // 使用 slice
- for i := 0; i < 100; i++ {
- slice = append(slice, i)
- }
- fmt.Println("Slice contents:", slice)
-
- // 使用完毕后,将 slice 放回池中
- pool.Put(slice)
- }
-
- func main() {
- // 调用 usePool 函数
- usePool()
-
- // 再次使用相同的 pool
- usePool()
- }

是 Go 语言标准库中的一个线程安全的哈希表,它提供了并发安全的键值对存储功能。与传统的 map
不同,sync.Map
不需要显式的加锁来保证线程安全性,这使得它非常适合用于高并发环境下的键值对存储。
并发读写:
sync.Map
。它可以在不需要手动加锁的情况下安全地读写数据。缓存:
sync.Map
可以用来实现简单的缓存逻辑,特别是当缓存项的生命周期较短时。配置管理:
sync.Map
可以用来存储和更新配置信息- package main
-
- import (
- "fmt"
- "sync"
- "time"
- )
-
- func main() {
- // 创建一个 sync.Map 实例
- syncMap := sync.Map{}
-
- // 添加键值对
- syncMap.Store("key1", "value1")
- syncMap.Store("key2", "value2")
-
- // 读取值
- if value, ok := syncMap.Load("key1"); ok {
- fmt.Println("Value of key1:", value)
- } else {
- fmt.Println("Key1 not found")
- }
-
- // 删除键值对
- syncMap.Delete("key2")
-
- // 遍历 sync.Map
- syncMap.Range(func(key, value interface{}) bool {
- fmt.Printf("Key: %v, Value: %v\n", key, value)
- return true // 继续遍历
- })
-
- // 更新值
- syncMap.Store("key1", "updated_value")
-
- // 再次遍历 sync.Map
- syncMap.Range(func(key, value interface{}) bool {
- fmt.Printf("Key: %v, Value: %v\n", key, value)
- return true // 继续遍历
- })
-
- // 使用 LoadOrStore
- value, loaded := syncMap.LoadOrStore("key3", "default_value")
- if loaded {
- fmt.Println("Value already present:", value)
- } else {
- fmt.Println("Value added:", value)
- }
-
- // 使用 CompareAndSwap
- oldValue := "updated_value"
- newValue := "new_updated_value"
- if swapped := syncMap.CompareAndSwap("key1", oldValue, newValue); swapped {
- fmt.Println("Value updated:", newValue)
- } else {
- fmt.Println("Value not updated")
- }
-
- // 等待一段时间,让其他 goroutines 完成
- time.Sleep(1 * time.Second)
- }

- // entry 键值对中的值结构体
- type entry struct {
- p unsafe.Pointer // 指针,指向实际存储value值的地方
- }
- // Map 并发安全的map结构体
- type Map struct {
- mu sync.Mutex // 锁,保护read和dirty字段
-
- read atomic.Value // 存仅读数据,原子操作,并发读安全,实际存储readOnly类型的数据
-
- dirty map[interface{}]*entry // 存最新写入的数据
-
- misses int // 计数器,每次在read字段中没找所需数据时,+1
- // 当此值到达一定阈值时,将dirty字段赋值给read
- }
-
- // readOnly 存储map中仅读数据的结构体
- type readOnly struct {
- m map[interface{}]*entry // 其底层依然是个最简单的map
- amended bool // 标志位,标识m.dirty中存储的数据是否和m.read中的不一样,flase 相同,true不相同
- }

Go语言中用于传递取消信号、截止时间、超时时间以及请求范围内的值的重要工具。
取消长时间运行的任务:
context
中,从而让任务知道应该尽早停止。设置超时时间:
context
设置请求的最大持续时间,防止请求无限期地等待。传递请求范围的值:
context
中携带与请求相关的数据,例如认证信息、跟踪ID等。资源管理:
- package main
-
- import (
- "context"
- "fmt"
- "time"
- )
-
- // LongRunningTask 模拟一个长时间运行的任务。
- func LongRunningTask(ctx context.Context) {
- for {
- select {
- case <-ctx.Done():
- fmt.Println("Task canceled.")
- return
- default:
- fmt.Println("Working...")
- time.Sleep(1 * time.Second)
- }
- }
- }
-
- func main() {
- ctx, cancel := context.WithCancel(context.Background())
-
- go LongRunningTask(ctx)
-
- // 等待一段时间后取消任务
- time.Sleep(5 * time.Second)
- cancel()
-
- // 主goroutine等待一段时间以确保子goroutine有时间退出
- time.Sleep(1 * time.Second)
- fmt.Println("Main goroutine finished.")
- }

比如http请求和数据库连接超时
- package main
-
- import (
- "context"
- "fmt"
- "log"
- "net/http"
- "time"
- )
-
- // ServerFunc 是一个简单的服务函数,它模拟一些耗时的操作。
- func ServerFunc(w http.ResponseWriter, r *http.Request) {
- // 从请求中获取上下文
- ctx := r.Context()
-
- // 设置超时时间为5秒
- ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
- defer cancel()
-
- // 模拟一些耗时的工作
- for i := 0; ; i++ {
- select {
- case <-ctx.Done():
- http.Error(w, "Request timed out", http.StatusRequestTimeout)
- return
- default:
- fmt.Fprintf(w, "Working... (%d)\n", i)
- time.Sleep(1 * time.Second)
- }
- }
- }
-
- func main() {
- http.HandleFunc("/", ServerFunc)
-
- log.Fatal(http.ListenAndServe(":8080", nil))
- }

- package main
-
- import (
- "context"
- "fmt"
- "time"
- )
-
- // ProcessRequest 模拟处理一个带有请求范围值的请求。
- func ProcessRequest(ctx context.Context) {
- requestID, _ := ctx.Value("request_id").(string)
- fmt.Printf("Processing request with ID: %s\n", requestID)
- time.Sleep(1 * time.Second)
- fmt.Println("Request processed.")
- }
-
- func main() {
- ctx := context.WithValue(context.Background(), "request_id", "12345")
-
- go ProcessRequest(ctx)
-
- // 主goroutine等待一段时间以确保子goroutine完成
- time.Sleep(2 * time.Second)
- fmt.Println("Main goroutine finished.")
- }

Semaphore用于控制goroutine的数量
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。