赞
踩
go实现协程池,协程轻量但并不是越多越好。虽然golang底层实现了对协程的复用,协程(Goroutine)的创建和调度由底层的运行时系统(runtime)负责,它会自动管理和复用协程,但是一瞬间并发过高仍然会导致内存资源消耗过大。使用协程池可用对资源进行有效控制。在内存资源够用的情况,或者其他不用限制同时任务数的情况,请用原生go 协程,不必使用协程池
协程池的数量和CPU核数的关系
小于或者等于CPU核数:
适用于计算密集型的任务中,如果协程的执行时间较长且没有IO操作,可以将协程池的数量设置为小于CPU核数的值。这样做可以避免过多的协程竞争CPU资源,减少上下文切换的开销,如图像处理、数据分析等。
大于CPU核数:
如果任务需要进行大量的IO操作,可以考虑将协程池的数量设置为大于CPU核数的值。
这样做可以充分利用CPU等待IO操作的时间,如网络请求、数据库查询等。
package main import ( "fmt" "runtime" "sync" "time" ) type Job struct { ID int } type Worker struct { ID int JobQueue <-chan Job QuitChan <-chan bool WaitGroup *sync.WaitGroup f func(job Job) } type Pool struct { WorkerSize int JobQueue chan Job QuitChan chan bool WaitGroup *sync.WaitGroup workerFunc func(job Job) } func NewWorker(id int, wg *sync.WaitGroup, jobQueue chan Job, quitChan <-chan bool) *Worker { return &Worker{ ID: id, JobQueue: jobQueue, QuitChan: quitChan, WaitGroup: wg, } } func (worker *Worker) StartWork() { go func() { for { select { case job := <-worker.JobQueue: worker.f(job) worker.WaitGroup.Done() case <-worker.QuitChan: fmt.Printf("Worker %d: quitting\n", worker.ID) return } } }() } func NewPool(workerSize int, f func(job Job)) *Pool { return &Pool{ WorkerSize: workerSize, JobQueue: make(chan Job), WaitGroup: &sync.WaitGroup{}, workerFunc: f, } } func (p *Pool) AddJob(job Job) { p.WaitGroup.Add(1) p.JobQueue <- job } func (p *Pool) Start() { for i := 0; i < p.WorkerSize; i++ { worker := NewWorker(i, p.WaitGroup, p.JobQueue, p.QuitChan) worker.f = p.workerFunc worker.StartWork() } } func (p *Pool) Stop() { for i := 0; i < p.WorkerSize; i++ { p.QuitChan <- true } } func (p *Pool) Close() { close(p.JobQueue) close(p.QuitChan) } func WorKerFunc(job Job) { fmt.Println("Job,id:", job.ID) time.Sleep(1 * time.Second) for i := 0; i < 1000000; i++ { c := 100 * 1000 _ = c } } func main() { fmt.Printf("CPU 内核数量:%d,协程数量:%d", runtime.NumCPU(), runtime.NumCPU()*2) start := time.Now().UnixMilli() pool := NewPool(runtime.NumCPU()*2, WorKerFunc) pool.Start() for i := 0; i < 100; i++ { job := Job{ ID: i, } pool.AddJob(job) } pool.WaitGroup.Wait() end := time.Now().UnixMilli() fmt.Println(end - start) }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。