赞
踩
令牌桶的一个主要使用场景是限流。
程序以一定的速率生产令牌加入到令牌桶中。
每个请求到达时都会尝试从令牌桶中获取一块令牌, 如果获取令牌失败(令牌桶为空)则不处理该请求, 以此达到限流的目的。
juju/ratelimit是开源的、golang语言实现的高效令牌桶,代码简洁,在本文写作时该项目有2.4k的start。
项目地址是 github.com/juju/ratelimit
先看三个创建令牌锁的方法及它们的区别:
func NewBucket(fillInterval time.Duration, capacity int64) *Bucket
//令牌桶容量为1,每10ms填充一块令牌
bucket := ratelimit.NewBucket(10 * time.Millisecond, 1)
func NewBucketWithQuantum(fillInterval time.Duration, capacity, quantum int64) *Bucket
//令牌桶容量为3, 每10ms填充3块令牌
bucket := ratelimit.NewBucketWithQuantum(10 * time.Millisecond, 3, 3)
func NewBucketWithRate(rate float64, capacity int64) *Bucket
//令牌桶容量为1, 每秒限速100次
bucket := ratelimit.NewBucketWithRate(100, 1)
Take 非阻塞, 返回需等待的时间
func (tb *Bucket) Take(count int64) time.Duration
TakeAvailable 非阻塞, 令牌数不满足需求时, 返回可用的令牌数
func (tb *Bucket) TakeAvailable(count int64) int64
TakeMaxDuration 非阻塞, 返回需等待时间, 超过最大时间返回 0, false
func (tb *Bucket) TakeMaxDuration(count int64, maxWait time.Duration) (time.Duration, bool)
Wait 阻塞, 直到拿到令牌
func (tb *Bucket) Wait(count int64)
WaitMaxDuration 阻塞 若在最大等待时间内能拿到令牌则阻塞, 否则立即返回false
func (tb *Bucket) WaitMaxDuration(count int64, maxWait time.Duration) bool
Available 返回当前可用令牌数
func (tb *Bucket) Available() int64
Rate 返回每秒限流速率
func (tb *Bucket) Rate() float64
type Bucket struct { //Clock提供了获取当前时间、sleep指定时间的方法 clock Clock //桶被创建的时间, 当前属于第几个tick也是基于这个起始时间来计算 startTime time.Time // 桶容量 capacity int64 // 每个tick向桶填充的令牌数 quantum int64 // 填充间隔 fillInterval time.Duration // 桶方法使用的互斥锁, 保障线程安全 mu sync.Mutex // 桶内可用的令牌数 // 当有操作等待令牌的时候, 这个数值会变成负数 availableTokens int64 // 上一个填充tick // 计算当前tick与上一个tick的差值 得出需要填充的令牌数 latestTick int64 }
juju/ratelimit没有启用额外的线程定时向桶中填充令牌, 而是在外部调用令牌锁的方法时触发一次填充方法,根据当前时间和令牌锁的创建时间的差值计算出是否需要填充、需要填充的数量。
填充令牌的方法, 外部调用令牌锁的方法时会触发
func (tb *Bucket) adjustavailableTokens(tick int64) { //令牌桶结构中记录了上一个填充周期的值 lastTick := tb.latestTick tb.latestTick = tick //如果桶是满的直接返回 if tb.availableTokens >= tb.capacity { return } //需要填充的数量是 (本周期数 - 上次填充周期数) * 单周期填充数 tb.availableTokens += (tick - lastTick) * tb.quantum //填充数量不得超过桶的容量 if tb.availableTokens > tb.capacity { tb.availableTokens = tb.capacity } return }
Take(), TakeMaxDuration(), Wait(), WaitMaxDuration()这几个方法都是通过调用take()这个内部方法实现的
func (tb *Bucket) take(now time.Time, count int64, maxWait time.Duration) (time.Duration, bool) { if count <= 0 { return 0, true } //计算当前tick, 调用adjustavailableTokens填充令牌 tick := tb.currentTick(now) tb.adjustavailableTokens(tick) //用可用令牌数(availableTokens)减去需要获取的令牌数(count),这里计算出的avail可能为负值 avail := tb.availableTokens - count //令牌充足, 返回0(不需要等待), true(获取令牌成功) if avail >= 0 { tb.availableTokens = avail return 0, true } //计算出一个endTick, 在未来的endTick到达时,令牌数将不再是负的 endTick := tick + (-avail+tb.quantum-1)/tb.quantum //计算endTick的时间点 endTime := tb.startTime.Add(time.Duration(endTick) * tb.fillInterval) //需要等待的时间时endTime - now waitTime := endTime.Sub(now) if waitTime > maxWait { return 0, false } //更新availableTokens, 可能为负值 tb.availableTokens = avail //返回等待时间, 获取成功 return waitTime, true }
import ( "github.com/juju/ratelimit" "time" ) func main() { //创建一个令牌桶初始容量为1, 每10ms填充3个令牌 bucket := ratelimit.NewBucketWithQuantum(10 * time.Millisecond, 1, 3) //程序运行长3秒 endTime := time.Now().Add(3 * time.Second) //打印桶的每秒限速频率(预期300/s) println("bucket rate:" , bucket.Rate(), "/s") //使用一个变量记录获取令牌的总数 var tockensCount int64 = 0 for { //每次拿1块令牌, 成功返回1, 失败返回0 tocken := bucket.TakeAvailable(1) tockensCount += tocken if(time.Now().After(endTime)) { println("tockensCount: ", tockensCount) return; } time.Sleep(5 * time.Millisecond) } }
bucket rate: +3.000000e+002 /s
tockensCount: 301
tockensCount为什么是301而不是300:
因为创建的令牌桶初始容量为1。桶初始化完成后里面已经有一块令牌了, 可以立即拿到这块令牌不需要等待填充。
在后面的3000ms共填充了300块令牌。
使用令牌桶时要注意, 由于令牌桶是有"容量"的, 允许一定的瞬时流量, 对限制速率有严格要求的时候要小心设置容量与填充速度, 并进行实测验证。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。