当前位置:   article > 正文

Golang限流器rate包源码详细分析_timex rate 包

timex rate 包

源码地址:golang.org/x/time/rate

rate包是Google基于令牌桶的算法实现的限流器,可以在服务的限流中使用。

一、数据结构

1.常量变量

  1. //定义某个时间的最大频率
  2. //表示每秒的事件数
  3. type Limit float64
  4. //Inf表示无速率限制
  5. const Inf = Limit(math.MaxFloat64)

2. 结构体

Limiter结构体

  1. type Limiter struct {
  2. limit Limit //每秒允许处理的事件数量,即每秒处理事件的频率
  3. burst int //令牌桶的最大数量, 如果burst为0,则除非limit == Inf,否则不允许处理任何事件。
  4. mu sync.Mutex
  5. tokens float64 //令牌桶中可用的令牌数量
  6. // last is the last time the limiter's tokens field was updated
  7. //记录上次limiter的tokens被更新的时间
  8. last time.Time
  9. // lastEvent is the latest time of a rate-limited event (past or future)
  10. //lastEvent记录速率受限制(桶中没有令牌)的时间点,该时间点可能是过去的,也可能是将来的(Reservation预定的结束时间点)
  11. lastEvent time.Time
  12. }

Limiter是限流器中最核心的结构体,用于限流(控制事件发生的频率),在初始化后默认是满的,并以每秒r个令牌的速率重新填充直到达到桶的容量(burst),如果r == Inf表示无限制速率。

Limiter有三个主要的方法 Allow、Reserve和Wait,最常用的是Wait和Allow方法

 

这三个方法每调用一次都会消耗一个令牌,这三个方法的区别在于没有令牌时,他们的处理方式不同

Allow: 如果没有令牌,则直接返回false

Reserve:如果没有令牌,则返回一个reservation,

Wait:如果没有令牌,则等待直到获取一个令牌或者其上下文被取消。

 

tokens更新的策略:

1). 成功获取到令牌或成功预约(Reserve)到令牌

2). 预约取消时(Cancel)并且需要还原令牌到令牌桶中时

3). 重新设置限流器的速率时(SetLimit)

4. 重新设置限流器的容量时(SetBurst)

 

lastEvent表示速率受限制的时间点,它可能时过去的时间,也可能时将来的时间。

如果没有预约令牌的话,该时间等于last,是过去的

如果有预约令牌的话,该时间等于最新的预约的截至时间。

 

注意:由于令牌桶的令牌可以预约,所有令牌桶中的tokens可能为负数。

 

Reservation结构体

  1. type Reservation struct {
  2. ok bool //到截至时间是否可以获取足够的令牌
  3. lim *Limiter
  4. tokens int //需要获取的令牌数量
  5. timeToAct time.Time //需要等待的时间点
  6. // This is the Limit at reservation time, it can change later.
  7. limit Limit
  8. }

Reservation可以理解成预定令牌的操作,timeToAct是本次预约需要等待到的指定时间点才有足够预约的令牌。

 

二、常用方法

 

1.令牌桶Limiter相关的方法

 

1) Limiter初始化

  1. func NewLimiter(r Limit, b int) *Limiter {
  2. return &Limiter{
  3. limit: r,
  4. burst: b,
  5. }
  6. }

初始化Limiter,指定每秒允许处理事件的上限为r,允许令牌桶的最大容量为b

 

  1. func Every(interval time.Duration) Limit {
  2. if interval <= 0 {
  3. return Inf
  4. }
  5. return 1 / Limit(interval.Seconds())
  6. }

Every将事件的最小时间间隔转换为限制

 

2)Limiter使用

  1. func (lim *Limiter) Allow() bool {
  2. return lim.AllowN(time.Now(), 1)
  3. }

从令牌桶中获取一个令牌,成功获取到则返回true

 

  1. func (lim *Limiter) AllowN(now time.Time, n int) bool {
  2. return lim.reserveN(now, n, 0).ok
  3. }

从令牌桶中获取n个令牌,成功获取到则返回true

 

  1. func (lim *Limiter) Wait(ctx context.Context) (err error) {
  2. return lim.WaitN(ctx, 1)
  3. }

获取一个令牌,如果没有则等待直到获取令牌或者上下文ctx取消

 

  1. func (lim *Limiter) WaitN(ctx context.Context, n int) (err error) {
  2. //同步获取令牌桶的最大容量burst和限流器的速率limit
  3. lim.mu.Lock()
  4. burst := lim.burst
  5. limit := lim.limit
  6. lim.mu.Unlock()
  7. //如果n大于令牌桶的最大容量,则返回error
  8. if n > burst && limit != Inf {
  9. return fmt.Errorf("rate: Wait(n=%d) exceeds limiter's burst %d", n, lim.burst)
  10. }
  11. // Check if ctx is already cancelled
  12. //判断上下文ctx是否已经被取消,如果已经取消则返回error
  13. select {
  14. case <-ctx.Done():
  15. return ctx.Err()
  16. default:
  17. }
  18. // Determine wait limit
  19. now := time.Now()
  20. waitLimit := InfDuration
  21. if deadline, ok := ctx.Deadline(); ok { //如果可以获取上下文的截至时间,则更新可以等待的时间waitLimit
  22. waitLimit = deadline.Sub(now)
  23. }
  24. // Reserve
  25. //调用reserveN获取Reversation
  26. r := lim.reserveN(now, n, waitLimit)
  27. if !r.ok { //没有足够的时间获取令牌,则返回error
  28. return fmt.Errorf("rate: Wait(n=%d) would exceed context deadline", n)
  29. }
  30. // Wait if necessary
  31. //需要等待的时间
  32. delay := r.DelayFrom(now)
  33. if delay == 0 {
  34. return nil
  35. }
  36. t := time.NewTimer(delay)
  37. defer t.Stop()
  38. select {
  39. case <-t.C:
  40. // We can proceed.
  41. return nil
  42. case <-ctx.Done():
  43. // Context was canceled before we could proceed. Cancel the
  44. // reservation, which may permit other events to proceed sooner.
  45. r.Cancel()
  46. return ctx.Err()
  47. }
  48. }

WaitN方法获取n个令牌,直到成功获取或者ctx取消

如果n大于令牌桶的最大容量则返回error

如果上下文被取消或者等待的时间大于上下文的截至时间,则返回error

如果速率限制为Inf则不会限流

 

 

无论是Wait、Allow或者Reserve其实都会调用advance和reserveN方法,所以这两个方法是整个限流器rate实现的核心。

advance方法

  1. func (lim *Limiter) advance(now time.Time) (newNow time.Time, newLast time.Time, newTokens float64) {
  2. //last不能在当前时间now之后,否则计算出来的elapsed为负数,会导致令牌桶数量减少
  3. last := lim.last
  4. if now.Before(last) {
  5. last = now
  6. }
  7. // Avoid making delta overflow below when last is very old.
  8. //根据令牌桶的缺数计算出令牌桶未进行更新的最大时间
  9. maxElapsed := lim.limit.durationFromTokens(float64(lim.burst) - lim.tokens)
  10. elapsed := now.Sub(last) //令牌桶未进行更新的时间段
  11. if elapsed > maxElapsed {
  12. elapsed = maxElapsed
  13. }
  14. // Calculate the new number of tokens, due to time that passed.
  15. //根据未更新的时间(未向桶中加入令牌的时间段)计算出产生的令牌数。
  16. delta := lim.limit.tokensFromDuration(elapsed)
  17. tokens := lim.tokens + delta //计算出可用的令牌数
  18. if burst := float64(lim.burst); tokens > burst {
  19. tokens = burst
  20. }
  21. return now, last, tokens
  22. }

advance方法的作用是更新令牌桶的状态,计算出令牌桶未更新的时间(elapsed),根据elapsed算出需要向桶中加入的令牌数delta,然后算出桶中可用的令牌数newTokens

now:当前时间

lastNow: 上次更新令牌的时间

newTokens:计算出桶中可用的令牌数

 

reserveN方法

  1. func (lim *Limiter) reserveN(now time.Time, n int, maxFutureReserve time.Duration) Reservation {
  2. lim.mu.Lock()
  3. //如果没有限流则直接返回
  4. if lim.limit == Inf {
  5. lim.mu.Unlock()
  6. return Reservation{
  7. ok: true, //桶中有足够的令牌
  8. lim: lim,
  9. tokens: n,
  10. timeToAct: now,
  11. }
  12. }
  13. //更新令牌桶的状态,tokens为目前可用的令牌数量
  14. now, last, tokens := lim.advance(now)
  15. // Calculate the remaining number of tokens resulting from the request.
  16. //可用的令牌数tokens减去需要获取的令牌数(n)
  17. tokens -= float64(n)
  18. // Calculate the wait duration
  19. //如果tokens小于0,则说明桶中没有足够的令牌,计算出产生这些缺数的令牌需要多久(waitDuration)
  20. //计算出产生出缺数的令牌(即-tokens)需要多长时间
  21. var waitDuration time.Duration
  22. if tokens < 0 {
  23. waitDuration = lim.limit.durationFromTokens(-tokens)
  24. }
  25. // Decide result
  26. //如果n小于等于令牌桶的容量,并且可以等待到足够的令牌(即 waitDuration <= maxFutureReserve),则ok为true。表示可以获取到足够的令牌
  27. ok := n <= lim.burst && waitDuration <= maxFutureReserve
  28. // Prepare reservation
  29. r := Reservation{
  30. ok: ok,
  31. lim: lim,
  32. limit: lim.limit,
  33. }
  34. if ok {
  35. r.tokens = n // 需要的令牌数
  36. r.timeToAct = now.Add(waitDuration) //计算获取到足够令牌的结束时间点
  37. }
  38. // Update state
  39. if ok {
  40. lim.last = now //更新tokens的时间
  41. lim.tokens = tokens //更新令牌桶目前可用的令牌数tokens
  42. lim.lastEvent = r.timeToAct //下次事件时间(即获取到足够令牌的时刻)
  43. } else {
  44. lim.last = last
  45. }
  46. lim.mu.Unlock()
  47. return r
  48. }

参数:

now:当前时间

n: 想要获取的令牌数量

maxFutureReserve: 最大的等待时间

 

reserveN是 AllowN, ReserveN及 WaitN的辅助方法,用于判断在maxFutureReserve时间内是否有足够的令牌。

 

durationFromTokens和tokensFromDuration工具转换方法

  1. func (limit Limit) durationFromTokens(tokens float64) time.Duration {
  2. seconds := tokens / float64(limit)
  3. return time.Nanosecond * time.Duration(1e9*seconds)
  4. }

根据根据令牌数量tokens计算出产生该数量的令牌需要的时长

 

  1. // tokensFromDuration is a unit conversion function from a time duration to the number of tokens
  2. // which could be accumulated during that duration at a rate of limit tokens per second.
  3. func (limit Limit) tokensFromDuration(d time.Duration) float64 {
  4. // Split the integer and fractional parts ourself to minimize rounding errors.
  5. // See golang.org/issues/34861.
  6. sec := float64(d/time.Second) * float64(limit)
  7. nsec := float64(d%time.Second) * float64(limit)
  8. return sec + nsec/1e9
  9. }

获取指定期间d内产生的令牌数量

 

另外:

Limiter的Limit方法用于获取限流的速率即结构体中limit的值,Burst方法用于返回桶的最大容量。

 

 

2. Reservation相关的方法

 

Reservation相关的方法即预约令牌需要使用的方法

 

Reserve和ReserveN分别用于预约1个或者n个令牌

  1. func (lim *Limiter) Reserve() *Reservation {
  2. return lim.ReserveN(time.Now(), 1)
  3. }
  4. // ReserveN returns a Reservation that indicates how long the caller must wait before n events happen.
  5. // The Limiter takes this Reservation into account when allowing future events.
  6. // ReserveN returns false if n exceeds the Limiter's burst size.
  7. // Usage example:
  8. // r := lim.ReserveN(time.Now(), 1)
  9. // if !r.OK() {
  10. // // Not allowed to act! Did you remember to set lim.burst to be > 0 ?
  11. // return
  12. // }
  13. // time.Sleep(r.Delay())
  14. // Act()
  15. // Use this method if you wish to wait and slow down in accordance with the rate limit without dropping events.
  16. // If you need to respect a deadline or cancel the delay, use Wait instead.
  17. // To drop or skip events exceeding rate limit, use Allow instead.
  18. func (lim *Limiter) ReserveN(now time.Time, n int) *Reservation {
  19. r := lim.reserveN(now, n, InfDuration)
  20. return &r
  21. }

返回一个Reservation,该Reservation指示调用者在n个事件发生前需要等待多长事件

如果n超出了限流器的burst,则返回false

 

  1. func (r *Reservation) OK() bool {
  2. return r.ok
  3. }

返回限流器limiter是否可以在最大等待时间内提供请求数量的令牌。

如果Ok为false,则Delay返回InfDuration,Cancel不执行任何操作

 

  1. func (r *Reservation) Delay() time.Duration {
  2. return r.DelayFrom(time.Now())
  3. }

返回到截至时间的时间段 ,即需要等待的时间

 

  1. func (r *Reservation) DelayFrom(now time.Time) time.Duration {
  2. if !r.ok { //ok为false,则返回InfDuration
  3. return InfDuration
  4. }
  5. delay := r.timeToAct.Sub(now) //截止时间
  6. if delay < 0 { //如果截至时间已过,则返回0
  7. return 0
  8. }
  9. return delay
  10. }

DelayFrom方法用于返回当前时间now到截至时间的时间段

如果为0,表示有足够的令牌,需要立即执行

如果返回InfDuration,表示到截至时间时仍然没有足够的令牌

 

  1. func (r *Reservation) Cancel() {
  2. r.CancelAt(time.Now())
  3. return
  4. }
  5. func (r *Reservation) CancelAt(now time.Time) {
  6. if !r.ok {
  7. return
  8. }
  9. r.lim.mu.Lock()
  10. defer r.lim.mu.Unlock()
  11. /*
  12. 1.如果无需限流
  13. 2. tokens为0 (需要获取的令牌数量为0)
  14. 3. 已经过了截至时间
  15. 以上三种情况无需处理取消操作
  16. */
  17. if r.lim.limit == Inf || r.tokens == 0 || r.timeToAct.Before(now) {
  18. return
  19. }
  20. // calculate tokens to restore
  21. // The duration between lim.lastEvent and r.timeToAct tells us how many tokens were reserved
  22. // after r was obtained. These tokens should not be restored.
  23. //计算出出需要还原的令牌数量
  24. //这里的r.lim.lastEvent可能是本次Reservation的结束时间,也可能是后来的Reservation的结束时间,所以要把本次结束时间点(r.timeToAct)之后产生的令牌数减去
  25. restoreTokens := float64(r.tokens) - r.limit.tokensFromDuration(r.lim.lastEvent.Sub(r.timeToAct))
  26. if restoreTokens <= 0 {
  27. return
  28. }
  29. // advance time to now
  30. //从新计算令牌桶的状态
  31. now, _, tokens := r.lim.advance(now)
  32. // calculate new number of tokens
  33. //还原当前令牌桶的令牌数量,当前的令牌数tokens加上需要还原的令牌数restoreTokens
  34. tokens += restoreTokens
  35. //如果tokens大于桶的最大容量,则将tokens置为桶的最大容量
  36. if burst := float64(r.lim.burst); tokens > burst {
  37. tokens = burst
  38. }
  39. // update state
  40. r.lim.last = now //记录桶的更新时间
  41. r.lim.tokens = tokens //更新令牌数量
  42. //还原lastEvent,即上次速率受限制的时间
  43. if r.timeToAct == r.lim.lastEvent {
  44. prevEvent := r.timeToAct.Add(r.limit.durationFromTokens(float64(-r.tokens)))
  45. if !prevEvent.Before(now) {
  46. r.lim.lastEvent = prevEvent
  47. }
  48. }
  49. return
  50. }

CancelAt用于取消预约令牌操作,如果有需要还原的令牌,则将需要还原的令牌重新放入到令牌桶中。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/知新_RL/article/detail/724382
推荐阅读
相关标签
  

闽ICP备14008679号