当前位置:   article > 正文

Golang实现协程池_golang 协程池

golang 协程池

go实现协程池,协程轻量但并不是越多越好。虽然golang底层实现了对协程的复用,协程(Goroutine)的创建和调度由底层的运行时系统(runtime)负责,它会自动管理和复用协程,但是一瞬间并发过高仍然会导致内存资源消耗过大。使用协程池可用对资源进行有效控制。在内存资源够用的情况,或者其他不用限制同时任务数的情况,请用原生go 协程,不必使用协程池

协程池的数量和CPU核数的关系
小于或者等于CPU核数:
适用于计算密集型的任务中,如果协程的执行时间较长且没有IO操作,可以将协程池的数量设置为小于CPU核数的值。这样做可以避免过多的协程竞争CPU资源,减少上下文切换的开销,如图像处理、数据分析等。
大于CPU核数:
如果任务需要进行大量的IO操作,可以考虑将协程池的数量设置为大于CPU核数的值。
这样做可以充分利用CPU等待IO操作的时间,如网络请求、数据库查询等。

package main

import (
	"fmt"
	"runtime"
	"sync"
	"time"
)

type Job struct {
	ID int
}

type Worker struct {
	ID        int
	JobQueue  <-chan Job
	QuitChan  <-chan bool
	WaitGroup *sync.WaitGroup
	f         func(job Job)
}

type Pool struct {
	WorkerSize int
	JobQueue   chan Job
	QuitChan   chan bool
	WaitGroup  *sync.WaitGroup
	workerFunc func(job Job)
}

func NewWorker(id int, wg *sync.WaitGroup, jobQueue chan Job, quitChan <-chan bool) *Worker {
	return &Worker{
		ID:        id,
		JobQueue:  jobQueue,
		QuitChan:  quitChan,
		WaitGroup: wg,
	}
}

func (worker *Worker) StartWork() {
	go func() {
		for {
			select {
			case job := <-worker.JobQueue:
				worker.f(job)
				worker.WaitGroup.Done()
			case <-worker.QuitChan:
				fmt.Printf("Worker %d: quitting\n", worker.ID)
				return
			}
		}
	}()
}

func NewPool(workerSize int, f func(job Job)) *Pool {
	return &Pool{
		WorkerSize: workerSize,
		JobQueue:   make(chan Job),
		WaitGroup:  &sync.WaitGroup{},
		workerFunc: f,
	}
}

func (p *Pool) AddJob(job Job) {
	p.WaitGroup.Add(1)
	p.JobQueue <- job
}

func (p *Pool) Start() {
	for i := 0; i < p.WorkerSize; i++ {
		worker := NewWorker(i, p.WaitGroup, p.JobQueue, p.QuitChan)
		worker.f = p.workerFunc
		worker.StartWork()
	}
}

func (p *Pool) Stop() {
	for i := 0; i < p.WorkerSize; i++ {
		p.QuitChan <- true
	}
}
func (p *Pool) Close() {
	close(p.JobQueue)
	close(p.QuitChan)
}
func WorKerFunc(job Job) {
	fmt.Println("Job,id:", job.ID)
	time.Sleep(1 * time.Second)
	for i := 0; i < 1000000; i++ {
		c := 100 * 1000
		_ = c
	}
}
func main() {
	fmt.Printf("CPU 内核数量:%d,协程数量:%d", runtime.NumCPU(), runtime.NumCPU()*2)
	start := time.Now().UnixMilli()
	pool := NewPool(runtime.NumCPU()*2, WorKerFunc)
	pool.Start()
	for i := 0; i < 100; i++ {
		job := Job{
			ID: i,
		}
		pool.AddJob(job)
	}
	pool.WaitGroup.Wait()
	end := time.Now().UnixMilli()
	fmt.Println(end - start)
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/IT小白/article/detail/317869
推荐阅读
相关标签
  

闽ICP备14008679号