赞
踩
这里只讨论常见的限流算法
大致上可以分为两类算法:计数器算法、生产者消费者算法
令牌桶算法:如下面这个带着包浆的图所示,每个请求到来时需要获取令牌,而令牌是均匀生产放入桶中的,请求来到时,桶中有令牌则通过,无令牌则拒绝。
漏桶算法:如包浆图所示,与令牌算法不同的是,漏桶算法是将请求放入桶中,以恒定速率流出
go time/rate的限流算法是令牌桶算法,与上面介绍令牌桶算法所用图不同的是,其使用懒加载或者说懒计算的方式,且没有存放请求的队列,每一个请求通过调用不同的方法自行决定没有足够令牌是等待还是直接放弃。
type Limiter struct { limit Limit // float64别名, 即QPS, 为Inf即无限含义时代表通过任何请求 burst int // 令牌桶大小, 为0时不允许任何请求通过, 其等于0的优先级小于limit = Inf mu sync.Mutex // 同步锁 tokens float64 // 桶中的令牌数目 可为负数且 <= burst last time.Time // 最后一次更新tokens字段时的时间, < 当前时间 lastEvent time.Time // 最迟的请求的通过时间,可能 > 当前时间, 因为tokens可为负, 即请求先更新tokens与lastEvent再等待 // lastEvent即等于当前最迟在等待的请求的Reservation.timeToAct } // 请求尝试获取令牌时的返回结构体,核心函数reserveN的返回 type Reservation struct { ok bool // 是否通过 lim *Limiter tokens int // 获取的令牌 timeToAct time.Time // 可以通过的时间 limit Limit }
// 计算产生对应数目的令牌需要多长时间 // 浮点数的+-*/都有可能产生误差,根本原因就是由于不管用多少位表示某些浮点数本来就不精确 // 再加上2个浮点数的计算过程(对齐、和尾数的舍去)等也会对精度产生影响 // 这个没有改进精度的原因应该是我们传入的tokens都是整数https://golang.org/cl/200917 func (limit Limit) durationFromTokens(tokens float64) time.Duration { seconds := tokens / float64(limit) return time.Nanosecond * time.Duration(1e9*seconds) } // 计算传入的时间能生产多少令牌:这里浮点数这样处理的原因可以看注释中的issues链接 // 简单来说就是如果直接用2个float相乘,会有较大的精度损失,而分开整数与小数再与limit相乘 // 精度损失较少(很好理解, 原来64位里面要包含整数信息,那么小数的信息就有可能丢失的就多一点) // https://golang.org/cl/200900 func (limit Limit) tokensFromDuration(d time.Duration) float64 { // See golang.org/issues/34861. sec := float64(d/time.Second) * float64(limit) nsec := float64(d%time.Second) * float64(limit) return sec + nsec/1e9 }
// 获取自从上一次更新到现在(now入参),产生的令牌数 func (lim *Limiter) advance(now time.Time) (newNow time.Time, newLast time.Time, newTokens float64) { // 矫正, 这里是有可能的, 当同事竞争的请求很多时, 先执行time.Now()的请求可能后进入advance, 就可能发生前面的 // 请求已经更新last了, 这里回溯now理论上会带来一定的超量, 测试见下面 last := lim.last if now.Before(last) { last = now } // 计算距离上一次更新的时间差,调用tokensFromDuration函数计算这一段时间产生了多少token // 注意这里虽然可能溢出,但是溢出后为+Inf, 还是会被大小判断限制为桶容量大小 elapsed := now.Sub(last) delta := lim.limit.tokensFromDuration(elapsed) // 加上剩余token tokens := lim.tokens + delta // token满了后不能继续产生 if burst := float64(lim.burst); tokens > burst { tokens = burst } return now, last, tokens }
测试:
var LockPushLimit = NewLimiter(Limit(1000000), 10) var g sync.WaitGroup fmt.Println(time.Now()) for a := 0; a < 10; a++ { g.Add(1) go func() { for b := 0; b < 100*100*100; b++ { _ = LockPushLimit.Wait(context.Background()) } g.Done() }() } g.Wait() fmt.Println(time.Now()) // var i = 0 // if now.Before(last) { // i++ // last = now // } fmt.Println(i)
结果:可见理论上最快10秒通过的请求量,只花费了不到10秒就通过了
2022-09-20 14:32:08.605816 +0800 CST m=+0.000109762
2022-09-20 14:32:17.801572 +0800 CST m=+9.195948529
482747
// 设置新的QPS,但是由于有提前获取等待的机制(Wait等),新设置的QPS不会作用与已经获取令牌还在等待的请求
func (lim *Limiter) SetLimitAt(now time.Time, newLimit Limit) {
lim.mu.Lock()
defer lim.mu.Unlock()
// 可以看到调用了advance方法,更新最后一次更新token的时间,以及token数目
now, _, tokens := lim.advance(now)
lim.last = now
lim.tokens = tokens
lim.limit = newLimit
}
从WaitN查看整个流程:Allow等函数比Wait简洁,所以拿Wait理解后即可
最新代码中,WaitN也是一个简单的函数,核心功能在内部函数wait中
func (lim *Limiter) wait(ctx context.Context, n int, now time.Time, newTimer func(d time.Duration) (<-chan time.Time, func() bool, func())) error { // 获取QPS与容量,需要获取锁, // 在获取之后,有另外的请求调用SetLimit等函数将不会对当前请求下面的快速判断返回产生影响 lim.mu.Lock() burst := lim.burst limit := lim.limit lim.mu.Unlock() // 1. 获取数量大于桶容量且不是无限QPS返回错误 if n > burst && limit != Inf { return fmt.Errorf("rate: Wait(n=%d) exceeds limiter's burst %d", n, burst) } // 2. 提前查看是否已经超时 select { case <-ctx.Done(): return ctx.Err() default: } // 3. 计算等待时间 尝试在waitLimit时间内获取指定令牌 waitLimit := InfDuration if deadline, ok := ctx.Deadline(); ok { waitLimit = deadline.Sub(now) } r := lim.reserveN(now, n, waitLimit) // 获取失败 if !r.ok { return fmt.Errorf("rate: Wait(n=%d) would exceed context deadline", n) } // 需要等待的时间,对比超时时间获取最后结果 delay := r.DelayFrom(now) if delay == 0 { return nil } ch, stop, advance := newTimer(delay) defer stop() advance() // only has an effect when testing select { case <-ch: return nil case <-ctx.Done(): // 尽最大努力偿还令牌 r.Cancel() return ctx.Err() } }
再来看看reserveN函数:
func (lim *Limiter) reserveN(now time.Time, n int, maxFutureReserve time.Duration) Reservation { lim.mu.Lock() defer lim.mu.Unlock() // 1. 处理limit = Inf/0 特殊情况 if lim.limit == Inf { // 无限QPS直接返回通过 return Reservation{ok: true, lim: lim, tokens: n, timeToAct: now} } else if lim.limit == 0 { // 当QPS = 0时, 一共只允许桶容量请求通过 var ok bool if lim.burst >= n { ok = true lim.burst -= n } return Reservation{ok: ok, lim: lim, tokens: lim.burst, timeToAct: now} } // 3. 计算到请求尝试获取令牌时桶中令牌数 now, last, tokens := lim.advance(now) // 4. 计算剩余令牌数,可为负数 // 如果为负数,计算需要等待的时间 tokens -= float64(n) var waitDuration time.Duration if tokens < 0 { waitDuration = lim.limit.durationFromTokens(-tokens) } // 计算等待时间是否满足<=最长等待时间 ok := n <= lim.burst && waitDuration <= maxFutureReserve // 5. 更新各值,并返回结果 r := Reservation{ok: ok, lim: lim, limit: lim.limit} // 取到令牌更新各值,可以看到只有持有锁的函数才能更新字段值 if ok { r.tokens = n r.timeToAct = now.Add(waitDuration) } if ok { lim.last = now lim.tokens = tokens lim.lastEvent = r.timeToAct } else { // 没有取到令牌则保持不变(正如上面advace函数中所解释, 可能回溯了一点) lim.last = last } return r }
// wait函数
waitLimit := InfDuration
if deadline, ok := ctx.Deadline(); ok {
waitLimit = deadline.Sub(now) // 假设这里10ms
}
// reserveN函数
ok := n <= lim.burst && waitDuration <= maxFutureReserve // 9.5 <= 10
// wait函数
<-ctx.Done() // 这里可能只剩余8ms了
CancelAt函数
func (r *Reservation) CancelAt(now time.Time) { // 1. 当请求没有获取到token, 或者limit = Inf等情况快速返回 if !r.ok { return } // 一切修改字段值的函数都要加锁 r.lim.mu.Lock() defer r.lim.mu.Unlock() if r.lim.limit == Inf || r.tokens == 0 || r.timeToAct.Before(now) { return } // 2. 要减去在当前请求请求后, 又有请求已经成功请求的情况 // 比如now = 10001s,r.timeToAct = 10001.5s, r.lim.lastEvent = 10001.7s // 此时我们不能返还所有token值,要减去0.2s的token,因为假设不减去这个值 // 那么r.lim.lastEvent这个请求相当于获得了没有被限流的流量(即时间还没到10001.7时可能限流器的token就转负为正了) restoreTokens := float64(r.tokens) - r.limit.tokensFromDuration(r.lim.lastEvent.Sub(r.timeToAct)) if restoreTokens <= 0 { return } // 3. 顺带更新token值到当前时间 now, _, tokens := r.lim.advance(now) tokens += restoreTokens if burst := float64(r.lim.burst); tokens > burst { tokens = burst } r.lim.last = now r.lim.tokens = tokens // 如果最近的一次请求就是自己, 要尝试还原lastEvent的值 if r.timeToAct == r.lim.lastEvent { prevEvent := r.timeToAct.Add(r.limit.durationFromTokens(float64(-r.tokens))) if !prevEvent.Before(now) { r.lim.lastEvent = prevEvent } } }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。