当前位置:   article > 正文

Go 控制协程(goroutine)的并发数量

Go 控制协程(goroutine)的并发数量

在使用协程并发处理某些任务时, 其并发数量往往因为各种因素的限制不能无限的增大. 例如网络请求、数据库查询等等。

从运行效率角度考虑,在相关服务可以负载的前提下(限制最大并发数),尽可能高的并发。

在Go语言中,可以使用一些方法来控制协程(goroutine)的并发数量,以防止并发过多导致资源耗尽或性能下降

1、使用信号量(Semaphore)

可以使用 Go 语言中的 channel 来实现简单的信号量,限制并发数量

  1. package main
  2. import (
  3. "fmt"
  4. "sync"
  5. )
  6. func worker(id int, sem chan struct{}) {
  7. sem <- struct{}{} // 占用一个信号量
  8. defer func() {
  9. <-sem // 方法运行结束释放信号量
  10. }()
  11. // 执行工作任务
  12. fmt.Printf("Worker %d: Working...\n", id)
  13. }
  14. func main() {
  15. concurrency := 3
  16. sem := make(chan struct{}, concurrency)
  17. var wg sync.WaitGroup
  18. for i := 0; i < 10; i++ {
  19. wg.Add(1)
  20. go func(id int) {
  21. defer wg.Done()
  22. worker(id, sem)
  23. }(i)
  24. }
  25. wg.Wait()
  26. close(sem)
  27. }

sem 是一个有缓冲的 channel,通过控制 channel 中元素的数量,实现了一个简单的信号量机制 

 2、使用协程池

可以创建一个固定数量的协程池,将任务分发给这些协程执行。 

  1. package main
  2. import (
  3. "fmt"
  4. "sync"
  5. )
  6. func worker(id int, jobs <-chan int, results chan<- int) {
  7. //jobs等待主要协程往jobs放数据
  8. for j := range jobs {
  9. fmt.Printf("协程池 %d: 协程池正在工作 %d\n", id, j)
  10. results <- j
  11. }
  12. }
  13. func main() {
  14. const numJobs = 5 //协程要做的工作数量
  15. const numWorkers = 3 //协程池数量
  16. jobs := make(chan int, numJobs)
  17. results := make(chan int, numJobs)
  18. var wg sync.WaitGroup
  19. // 启动协程池
  20. for i := 1; i <= numWorkers; i++ {
  21. wg.Add(1)
  22. go func(id int) {
  23. defer wg.Done()
  24. worker(id, jobs, results)
  25. }(i)
  26. }
  27. // 提交任务
  28. for j := 1; j <= numJobs; j++ {
  29. jobs <- j
  30. }
  31. close(jobs)
  32. // 等待所有工作完成
  33. go func() {
  34. wg.Wait()
  35. close(results)
  36. }()
  37. // 处理结果
  38. for result := range results {
  39. fmt.Println("Result:", result)
  40. }
  41. }

jobs 通道用于存储任务,results 通道用于存储处理结果。通过创建固定数量的工作协程,可以有效地控制并发数量。

3、使用其他包

Go 1.16 引入了 golang.org/x/sync/semaphore 包,它提供了一个更为灵活的信号量实现。

案例一

限制对外部API的并发请求 

假设我们有一个外部API,它对并发请求有限制,我们希望确保不超过这个限制。我们可以使用semaphore.Weighted来控制对API的并发访问

  1. package main
  2. import (
  3. "context"
  4. "fmt"
  5. "golang.org/x/sync/semaphore"
  6. "sync"
  7. "time"
  8. )
  9. func main() {
  10. /*
  11. 1、在并发量一定的情况下,通过改变允许并发请求数可以更快处理请求任务(在CPU够用的前提下)
  12. 2、sem := semaphore.NewWeighted(n),参数n就是权重量
  13. 3、当一个协程需要获取的单位的权重越多,运行就会慢(比如权重总量n=5个,一个协程分配了2个,跟一个协程分配1个效率是不一样的)
  14. 4、信号量没有足够的可用权重的情况发生在所有已分配的权重单位都已经被占用,即信号量的当前权重计数达到了它的总容量。在这种情况下,任何尝试通过Acquire方法获取更多权重的调用都将无法立即完成,从而导致调用者(通常是goroutine)阻塞,直到其他调用者释放一些权重单位。
  15. */
  16. /*
  17. 1、权权重较大的任务在资源竞争时有更高的优先级,更容易获得执行的机会
  18. 2、如果当前资源足够满足高权重任务的需求,这些任务将立即执行;若资源不足,则按照权重高低顺序排队等待
  19. 3、一旦任务开始执行,其完成的速度主要取决于任务自身的逻辑复杂度、所需资源以及系统的当前负载等因素,与任务在信号量中的权重无关
  20. 3、高权重的任务并不会中断已经在执行的低权重任务,而是等待这些任务自行释放资源。一旦资源释放,等待队列中的高权重任务将优先被唤醒
  21. 4、Acquire 方法会检查当前信号量的可用资源量是否满足请求的权重,如果满足,则立即减少信号量的资源计数并返回,允许任务继续执行。如果不满足,任务将阻塞等待,直到有足够的资源被释放
  22. */
  23. // 记录开始时间
  24. startTime := time.Now()
  25. // 假设外部API允许的最大并发请求为5(信号量的总容量是5个权重单位)
  26. const (
  27. maxConcurrentRequests = 5
  28. )
  29. sem := semaphore.NewWeighted(maxConcurrentRequests)
  30. var wg sync.WaitGroup
  31. // 模拟对外部API的10个并发请求
  32. for i := 0; i < 10; i++ {
  33. wg.Add(1)
  34. go func(requestId int) {
  35. defer wg.Done()
  36. // 假设我们想要获取2个单位的权重
  37. if err := sem.Acquire(context.Background(), 2); err != nil {
  38. fmt.Printf("请求 %d 无法获取信号量: %v\n", requestId, err)
  39. return
  40. }
  41. defer sem.Release(2) // 请求完成后释放信号量
  42. // 模拟对API的请求处理
  43. fmt.Printf("请求 %d 开始...\n", requestId)
  44. time.Sleep(2 * time.Second) // 模拟网络延迟
  45. fmt.Printf("请求 %d 完成。\n", requestId)
  46. }(i)
  47. }
  48. wg.Wait()
  49. // 记录结束时间
  50. endTime := time.Now()
  51. // 计算并打印总耗时
  52. fmt.Printf("程序总耗时: %v\n", endTime.Sub(startTime))
  53. }

信号量没有足够的可用权重的情况发生在所有已分配的权重单位都已经被占用,即信号量的当前权重计数达到了它的总容量。在这种情况下,任何尝试通过Acquire方法获取更多权重的调用都将无法立即完成,从而导致调用者(通常是goroutine)阻塞,直到其他调用者释放一些权重单位。

以下是一些导致信号量没有足够可用权重的具体情况:

  1. 信号量初始化容量较小:如果信号量的总容量设置得较小,而并发请求的数量较大,那么很快就会出现权重不足的情况。

  2. 长时间占用权重:如果某些goroutine长时间占用权重单位而不释放,这会导致其他goroutine无法获取到权重,即使这些goroutine只是少数。

  3. 权重分配不均:在某些情况下,可能存在一些goroutine占用了不成比例的权重单位,导致其他goroutine无法获取足够的权重。

  4. 权重释放不及时:如果goroutine因为错误或异常情况提前退出,而没有正确释放它们所占用的权重,那么这些权重单位将不会被回收到信号量中。

  5. 高频率的请求:在短时间内有大量goroutine请求权重,即使它们请求的权重不大,累积起来也可能超过信号量的总容量。

  6. 信号量权重未正确管理:如果信号量的权重管理逻辑存在缺陷,例如错误地释放了过多的权重,或者在错误的时间点释放权重,也可能导致可用权重不足。

为了避免信号量没有足够的可用权重,可以采取以下措施:

  • 合理设置信号量容量:根据资源限制和并发需求合理设置信号量的总容量。
  • 及时释放权重:确保在goroutine完成工作后及时释放权重。
  • 使用超时:在Acquire调用中使用超时,避免无限期地等待权重。
  • 监控和日志记录:监控信号量的使用情况,并记录关键信息,以便及时发现和解决问题。
  • 权重分配策略:设计合理的权重分配策略,确保权重的公平和高效分配。

通过这些措施,可以更好地管理信号量的使用,避免因权重不足导致的并发问题。

案例二

假设有一个在线视频平台,它需要处理不同分辨率的视频转码任务。由于高清视频转码比标清视频更消耗计算资源,因此平台希望设计一个系统,能够优先处理更多标清视频转码请求,同时又不完全阻塞高清视频的转码,以保持整体服务质量和资源的有效利用。

  1. package main
  2. import (
  3. "fmt"
  4. "golang.org/x/net/context"
  5. "golang.org/x/sync/semaphore"
  6. "runtime"
  7. "sync"
  8. "time"
  9. )
  10. // VideoTranscodeJob 视频转码任务
  11. type VideoTranscodeJob struct {
  12. resolution string
  13. weight int64
  14. }
  15. func main() {
  16. cpuCount := runtime.NumCPU()
  17. fmt.Printf("当前CPU个数%v\n", cpuCount)
  18. /*
  19. 1、权权重较大的任务在资源竞争时有更高的优先级,更容易获得执行的机会
  20. 2、如果当前资源足够满足高权重任务的需求,这些任务将立即执行;若资源不足,则按照权重高低顺序排队等待
  21. 3、一旦任务开始执行,其完成的速度主要取决于任务自身的逻辑复杂度、所需资源以及系统的当前负载等因素,与任务在信号量中的权重无关
  22. 3、高权重的任务并不会中断已经在执行的低权重任务,而是等待这些任务自行释放资源。一旦资源释放,等待队列中的高权重任务将优先被唤醒
  23. 4、Acquire 方法会检查当前信号量的可用资源量是否满足请求的权重,如果满足,则立即减少信号量的资源计数并返回,允许任务继续执行。如果不满足,任务将阻塞等待,直到有足够的资源被释放
  24. */
  25. // 初始化两个信号量,一个用于标清,一个用于高清,假设总共有8个CPU核心可用
  26. normalSem := semaphore.NewWeighted(6) // 标清任务,分配6个单位权重,因为它们消耗资源较少
  27. highDefSem := semaphore.NewWeighted(2) // 高清任务,分配2个单位权重,因为它们更消耗资源
  28. var wg sync.WaitGroup
  29. //假设有20个需要转码的视频
  30. videoJobs := []VideoTranscodeJob{
  31. {"HD", 2}, {"HD", 2}, {"SD", 1}, {"SD", 1}, {"HD", 2},
  32. {"SD", 1}, {"SD", 1}, {"HD", 2}, {"SD", 1}, {"HD", 2},
  33. {"SD", 4}, {"SD", 4}, {"HD", 2}, {"SD", 1}, {"HD", 2},
  34. {"SD", 1}, {"SD", 4}, {"HD", 2}, {"SD", 6}, {"HD", 2},
  35. }
  36. for _, job := range videoJobs {
  37. wg.Add(1)
  38. go func(job VideoTranscodeJob) {
  39. defer wg.Done()
  40. var sem *semaphore.Weighted
  41. switch job.resolution {
  42. case "SD":
  43. sem = normalSem //分配权重大,当前为6,任务在获取执行机会上有优势,但并不直接意味着执行速度快
  44. case "HD":
  45. sem = highDefSem
  46. default:
  47. panic("无效的分辨率")
  48. }
  49. if err := sem.Acquire(context.Background(), job.weight); err != nil {
  50. fmt.Printf("名为 %s 视频无法获取信号量: %v\n", job.resolution, err)
  51. return
  52. }
  53. defer sem.Release(job.weight) //释放权重对应的信号量
  54. // 模拟转码任务执行
  55. fmt.Printf("转码 %s 视频 (权重: %d)...\n", job.resolution, job.weight)
  56. //通过利用VideoTranscodeJob的weight值来模拟转码时间的长短,HD用时长则设置2比SD的1大,*时间就自然长,运行就时间长
  57. time.Sleep(time.Duration(job.weight*100) * time.Millisecond) // 模拟不同分辨率视频转码所需时间
  58. fmt.Printf("------------------------%s 视频转码完成。。。\n", job.resolution)
  59. }(job)
  60. }
  61. wg.Wait()
  62. }

标清(SD)和高清(HD),分别分配了不同的权重(1和2)。通过创建两个不同权重的信号量,我们可以控制不同类型任务的同时执行数量,从而优先保证标清视频的快速处理,同时也确保高清视频能够在不影响系统稳定性的情况下进行转码。这展示了带权重的并发控制如何帮助在资源有限的情况下优化任务调度和执行效率。

 注意:对协程分配的权重单位数不能大于对应上下文semaphore.NewWeighted(n)中参数n的单位数

案例三

  1. package main
  2. import (
  3. "context"
  4. "fmt"
  5. "sync"
  6. "time"
  7. "golang.org/x/sync/semaphore"
  8. )
  9. type weightedTask struct {
  10. id int
  11. weight int64
  12. }
  13. func main() {
  14. const (
  15. maxTotalWeight = 20 // 最大总权重
  16. )
  17. sem := semaphore.NewWeighted(maxTotalWeight)
  18. var wg sync.WaitGroup
  19. tasksCh := make(chan weightedTask, 10)
  20. // 发送任务
  21. for i := 1; i <= 10; i++ {
  22. tasksCh <- weightedTask{id: i, weight: int64(i)} // 假设任务ID即为其权重
  23. }
  24. close(tasksCh)
  25. // 启动任务处理器
  26. for task := range tasksCh {
  27. wg.Add(1)
  28. go func(task weightedTask) {
  29. defer wg.Done()
  30. if err := sem.Acquire(context.Background(), int64(task.id)); err != nil {
  31. fmt.Printf("任务 %d 无法获取信号量: %v\n", task.id, err)
  32. return
  33. }
  34. defer sem.Release(int64(task.id)) //释放
  35. // 模拟任务执行
  36. fmt.Printf("任务 %d (权重: %d) 正在运行...\n", task.id, task.weight)
  37. time.Sleep(time.Duration(task.weight*100) * time.Millisecond) // 示例中简单用时间模拟权重影响
  38. fmt.Printf("任务 %d 完成.\n", task.id)
  39. }(task)
  40. }
  41. wg.Wait()
  42. }

 

总结

选择哪种方法取决于具体的应用场景和需求。使用信号量是一种简单而灵活的方法,而协程池则更适用于需要批量处理任务的情况。golang.org/x/sync/semaphore 包提供了一个标准库外的更灵活的信号量实现         

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/IT小白/article/detail/629958
推荐阅读
相关标签
  

闽ICP备14008679号