赞
踩
源码地址:golang.org/x/time/rate
rate包是Google基于令牌桶的算法实现的限流器,可以在服务的限流中使用。
- //定义某个时间的最大频率
- //表示每秒的事件数
- type Limit float64
-
- //Inf表示无速率限制
- const Inf = Limit(math.MaxFloat64)
Limiter结构体
- type Limiter struct {
- limit Limit //每秒允许处理的事件数量,即每秒处理事件的频率
- burst int //令牌桶的最大数量, 如果burst为0,则除非limit == Inf,否则不允许处理任何事件。
-
- mu sync.Mutex
- tokens float64 //令牌桶中可用的令牌数量
- // last is the last time the limiter's tokens field was updated
- //记录上次limiter的tokens被更新的时间
- last time.Time
- // lastEvent is the latest time of a rate-limited event (past or future)
- //lastEvent记录速率受限制(桶中没有令牌)的时间点,该时间点可能是过去的,也可能是将来的(Reservation预定的结束时间点)
- lastEvent time.Time
- }
Limiter是限流器中最核心的结构体,用于限流(控制事件发生的频率),在初始化后默认是满的,并以每秒r个令牌的速率重新填充直到达到桶的容量(burst),如果r == Inf表示无限制速率。
Limiter有三个主要的方法 Allow、Reserve和Wait,最常用的是Wait和Allow方法
这三个方法每调用一次都会消耗一个令牌,这三个方法的区别在于没有令牌时,他们的处理方式不同
Allow: 如果没有令牌,则直接返回false
Reserve:如果没有令牌,则返回一个reservation,
Wait:如果没有令牌,则等待直到获取一个令牌或者其上下文被取消。
tokens更新的策略:
1). 成功获取到令牌或成功预约(Reserve)到令牌
2). 预约取消时(Cancel)并且需要还原令牌到令牌桶中时
3). 重新设置限流器的速率时(SetLimit)
4. 重新设置限流器的容量时(SetBurst)
lastEvent表示速率受限制的时间点,它可能时过去的时间,也可能时将来的时间。
如果没有预约令牌的话,该时间等于last,是过去的
如果有预约令牌的话,该时间等于最新的预约的截至时间。
注意:由于令牌桶的令牌可以预约,所有令牌桶中的tokens可能为负数。
Reservation结构体
- type Reservation struct {
- ok bool //到截至时间是否可以获取足够的令牌
- lim *Limiter
- tokens int //需要获取的令牌数量
- timeToAct time.Time //需要等待的时间点
- // This is the Limit at reservation time, it can change later.
- limit Limit
- }
Reservation可以理解成预定令牌的操作,timeToAct是本次预约需要等待到的指定时间点才有足够预约的令牌。
1) Limiter初始化
- func NewLimiter(r Limit, b int) *Limiter {
- return &Limiter{
- limit: r,
- burst: b,
- }
- }
初始化Limiter,指定每秒允许处理事件的上限为r,允许令牌桶的最大容量为b
- func Every(interval time.Duration) Limit {
- if interval <= 0 {
- return Inf
- }
- return 1 / Limit(interval.Seconds())
- }
Every将事件的最小时间间隔转换为限制
2)Limiter使用
- func (lim *Limiter) Allow() bool {
- return lim.AllowN(time.Now(), 1)
- }
从令牌桶中获取一个令牌,成功获取到则返回true
- func (lim *Limiter) AllowN(now time.Time, n int) bool {
- return lim.reserveN(now, n, 0).ok
- }
从令牌桶中获取n个令牌,成功获取到则返回true
- func (lim *Limiter) Wait(ctx context.Context) (err error) {
- return lim.WaitN(ctx, 1)
- }
获取一个令牌,如果没有则等待直到获取令牌或者上下文ctx取消
- func (lim *Limiter) WaitN(ctx context.Context, n int) (err error) {
- //同步获取令牌桶的最大容量burst和限流器的速率limit
- lim.mu.Lock()
- burst := lim.burst
- limit := lim.limit
- lim.mu.Unlock()
-
- //如果n大于令牌桶的最大容量,则返回error
- if n > burst && limit != Inf {
- return fmt.Errorf("rate: Wait(n=%d) exceeds limiter's burst %d", n, lim.burst)
- }
- // Check if ctx is already cancelled
- //判断上下文ctx是否已经被取消,如果已经取消则返回error
- select {
- case <-ctx.Done():
- return ctx.Err()
- default:
- }
-
- // Determine wait limit
- now := time.Now()
- waitLimit := InfDuration
- if deadline, ok := ctx.Deadline(); ok { //如果可以获取上下文的截至时间,则更新可以等待的时间waitLimit
- waitLimit = deadline.Sub(now)
- }
-
- // Reserve
- //调用reserveN获取Reversation
- r := lim.reserveN(now, n, waitLimit)
- if !r.ok { //没有足够的时间获取令牌,则返回error
- return fmt.Errorf("rate: Wait(n=%d) would exceed context deadline", n)
- }
- // Wait if necessary
- //需要等待的时间
- delay := r.DelayFrom(now)
- if delay == 0 {
- return nil
- }
- t := time.NewTimer(delay)
- defer t.Stop()
- select {
- case <-t.C:
- // We can proceed.
- return nil
- case <-ctx.Done():
- // Context was canceled before we could proceed. Cancel the
- // reservation, which may permit other events to proceed sooner.
- r.Cancel()
- return ctx.Err()
- }
- }
WaitN方法获取n个令牌,直到成功获取或者ctx取消
如果n大于令牌桶的最大容量则返回error
如果上下文被取消或者等待的时间大于上下文的截至时间,则返回error
如果速率限制为Inf则不会限流
无论是Wait、Allow或者Reserve其实都会调用advance和reserveN方法,所以这两个方法是整个限流器rate实现的核心。
advance方法
- func (lim *Limiter) advance(now time.Time) (newNow time.Time, newLast time.Time, newTokens float64) {
- //last不能在当前时间now之后,否则计算出来的elapsed为负数,会导致令牌桶数量减少
- last := lim.last
- if now.Before(last) {
- last = now
- }
-
- // Avoid making delta overflow below when last is very old.
- //根据令牌桶的缺数计算出令牌桶未进行更新的最大时间
- maxElapsed := lim.limit.durationFromTokens(float64(lim.burst) - lim.tokens)
- elapsed := now.Sub(last) //令牌桶未进行更新的时间段
- if elapsed > maxElapsed {
- elapsed = maxElapsed
- }
-
- // Calculate the new number of tokens, due to time that passed.
- //根据未更新的时间(未向桶中加入令牌的时间段)计算出产生的令牌数。
- delta := lim.limit.tokensFromDuration(elapsed)
- tokens := lim.tokens + delta //计算出可用的令牌数
- if burst := float64(lim.burst); tokens > burst {
- tokens = burst
- }
-
- return now, last, tokens
- }
advance方法的作用是更新令牌桶的状态,计算出令牌桶未更新的时间(elapsed),根据elapsed算出需要向桶中加入的令牌数delta,然后算出桶中可用的令牌数newTokens
now:当前时间
lastNow: 上次更新令牌的时间
newTokens:计算出桶中可用的令牌数
reserveN方法
- func (lim *Limiter) reserveN(now time.Time, n int, maxFutureReserve time.Duration) Reservation {
- lim.mu.Lock()
-
- //如果没有限流则直接返回
- if lim.limit == Inf {
- lim.mu.Unlock()
- return Reservation{
- ok: true, //桶中有足够的令牌
- lim: lim,
- tokens: n,
- timeToAct: now,
- }
- }
-
- //更新令牌桶的状态,tokens为目前可用的令牌数量
- now, last, tokens := lim.advance(now)
-
- // Calculate the remaining number of tokens resulting from the request.
- //可用的令牌数tokens减去需要获取的令牌数(n)
- tokens -= float64(n)
-
- // Calculate the wait duration
- //如果tokens小于0,则说明桶中没有足够的令牌,计算出产生这些缺数的令牌需要多久(waitDuration)
- //计算出产生出缺数的令牌(即-tokens)需要多长时间
- var waitDuration time.Duration
- if tokens < 0 {
- waitDuration = lim.limit.durationFromTokens(-tokens)
- }
-
- // Decide result
- //如果n小于等于令牌桶的容量,并且可以等待到足够的令牌(即 waitDuration <= maxFutureReserve),则ok为true。表示可以获取到足够的令牌
- ok := n <= lim.burst && waitDuration <= maxFutureReserve
-
- // Prepare reservation
- r := Reservation{
- ok: ok,
- lim: lim,
- limit: lim.limit,
- }
- if ok {
- r.tokens = n // 需要的令牌数
- r.timeToAct = now.Add(waitDuration) //计算获取到足够令牌的结束时间点
- }
-
- // Update state
- if ok {
- lim.last = now //更新tokens的时间
- lim.tokens = tokens //更新令牌桶目前可用的令牌数tokens
- lim.lastEvent = r.timeToAct //下次事件时间(即获取到足够令牌的时刻)
- } else {
- lim.last = last
- }
-
- lim.mu.Unlock()
- return r
- }
-
参数:
now:当前时间
n: 想要获取的令牌数量
maxFutureReserve: 最大的等待时间
reserveN是 AllowN, ReserveN及 WaitN的辅助方法,用于判断在maxFutureReserve时间内是否有足够的令牌。
durationFromTokens和tokensFromDuration工具转换方法
- func (limit Limit) durationFromTokens(tokens float64) time.Duration {
- seconds := tokens / float64(limit)
- return time.Nanosecond * time.Duration(1e9*seconds)
- }
根据根据令牌数量tokens计算出产生该数量的令牌需要的时长
- // tokensFromDuration is a unit conversion function from a time duration to the number of tokens
- // which could be accumulated during that duration at a rate of limit tokens per second.
- func (limit Limit) tokensFromDuration(d time.Duration) float64 {
- // Split the integer and fractional parts ourself to minimize rounding errors.
- // See golang.org/issues/34861.
- sec := float64(d/time.Second) * float64(limit)
- nsec := float64(d%time.Second) * float64(limit)
- return sec + nsec/1e9
- }
获取指定期间d内产生的令牌数量
另外:
Limiter的Limit方法用于获取限流的速率即结构体中limit的值,Burst方法用于返回桶的最大容量。
Reservation相关的方法即预约令牌需要使用的方法
Reserve和ReserveN分别用于预约1个或者n个令牌
- func (lim *Limiter) Reserve() *Reservation {
- return lim.ReserveN(time.Now(), 1)
- }
-
- // ReserveN returns a Reservation that indicates how long the caller must wait before n events happen.
- // The Limiter takes this Reservation into account when allowing future events.
- // ReserveN returns false if n exceeds the Limiter's burst size.
- // Usage example:
- // r := lim.ReserveN(time.Now(), 1)
- // if !r.OK() {
- // // Not allowed to act! Did you remember to set lim.burst to be > 0 ?
- // return
- // }
- // time.Sleep(r.Delay())
- // Act()
- // Use this method if you wish to wait and slow down in accordance with the rate limit without dropping events.
- // If you need to respect a deadline or cancel the delay, use Wait instead.
- // To drop or skip events exceeding rate limit, use Allow instead.
- func (lim *Limiter) ReserveN(now time.Time, n int) *Reservation {
- r := lim.reserveN(now, n, InfDuration)
- return &r
- }
返回一个Reservation,该Reservation指示调用者在n个事件发生前需要等待多长事件
如果n超出了限流器的burst,则返回false
- func (r *Reservation) OK() bool {
- return r.ok
- }
返回限流器limiter是否可以在最大等待时间内提供请求数量的令牌。
如果Ok为false,则Delay返回InfDuration,Cancel不执行任何操作
- func (r *Reservation) Delay() time.Duration {
- return r.DelayFrom(time.Now())
- }
返回到截至时间的时间段 ,即需要等待的时间
- func (r *Reservation) DelayFrom(now time.Time) time.Duration {
- if !r.ok { //ok为false,则返回InfDuration
- return InfDuration
- }
- delay := r.timeToAct.Sub(now) //截止时间
- if delay < 0 { //如果截至时间已过,则返回0
- return 0
- }
- return delay
- }
DelayFrom方法用于返回当前时间now到截至时间的时间段
如果为0,表示有足够的令牌,需要立即执行
如果返回InfDuration,表示到截至时间时仍然没有足够的令牌
- func (r *Reservation) Cancel() {
- r.CancelAt(time.Now())
- return
- }
-
-
- func (r *Reservation) CancelAt(now time.Time) {
- if !r.ok {
- return
- }
-
- r.lim.mu.Lock()
- defer r.lim.mu.Unlock()
-
- /*
- 1.如果无需限流
- 2. tokens为0 (需要获取的令牌数量为0)
- 3. 已经过了截至时间
- 以上三种情况无需处理取消操作
- */
- if r.lim.limit == Inf || r.tokens == 0 || r.timeToAct.Before(now) {
- return
- }
-
- // calculate tokens to restore
- // The duration between lim.lastEvent and r.timeToAct tells us how many tokens were reserved
- // after r was obtained. These tokens should not be restored.
- //计算出出需要还原的令牌数量
- //这里的r.lim.lastEvent可能是本次Reservation的结束时间,也可能是后来的Reservation的结束时间,所以要把本次结束时间点(r.timeToAct)之后产生的令牌数减去
- restoreTokens := float64(r.tokens) - r.limit.tokensFromDuration(r.lim.lastEvent.Sub(r.timeToAct))
- if restoreTokens <= 0 {
- return
- }
-
-
- // advance time to now
- //从新计算令牌桶的状态
- now, _, tokens := r.lim.advance(now)
- // calculate new number of tokens
- //还原当前令牌桶的令牌数量,当前的令牌数tokens加上需要还原的令牌数restoreTokens
- tokens += restoreTokens
- //如果tokens大于桶的最大容量,则将tokens置为桶的最大容量
- if burst := float64(r.lim.burst); tokens > burst {
- tokens = burst
- }
- // update state
- r.lim.last = now //记录桶的更新时间
- r.lim.tokens = tokens //更新令牌数量
- //还原lastEvent,即上次速率受限制的时间
- if r.timeToAct == r.lim.lastEvent {
- prevEvent := r.timeToAct.Add(r.limit.durationFromTokens(float64(-r.tokens)))
- if !prevEvent.Before(now) {
- r.lim.lastEvent = prevEvent
- }
- }
-
- return
- }
CancelAt用于取消预约令牌操作,如果有需要还原的令牌,则将需要还原的令牌重新放入到令牌桶中。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。