赞
踩
保障服务稳定的三大利器:熔断、降级、服务限流。今天和大家谈谈限流算法的几种实现方式,本文所说的限流并非是Nginx、网关层面的限流,而是业务代码中的逻辑限流。
常见的限流算法:固定窗口、滑动窗口、漏桶、令牌桶
在一个固定的时间窗口内对请求计数,如果请求数达到阈值那么进行限流,到达时间临界点时重置计数
import ( "sync" "time" ) type CountLimitRate struct { duration time.Duration rate int32 mu sync.Mutex count int32 lastGetTime time.Time } func NewCountLimitRate(duration time.Duration, rate int32) *CountLimitRate { return &CountLimitRate{ duration: duration, rate: rate, lastGetTime: time.Now(), } } func (r *CountLimitRate) Acquire() bool { r.mu.Lock() defer r.mu.Unlock() now := time.Now() if now.Sub(r.lastGetTime) > r.duration { r.Reset(now) } if r.count >= r.rate { return false } r.count++ return true } func (r *CountLimitRate) Reset(getTime time.Time) { r.lastGetTime = getTime r.count = 0 }
固定窗口会有边界峰值问题即突刺问题
比如窗口大小是1S,限制是1S内最多100个请求
前一S的最后200ms出现100个请求,后一S的前200ms出现100个请求,对于固定窗口来说是符合规定的,但此时就出现了边界峰值,1S内有200个请求,可能会压垮后端服务
滑动窗口算法将一个大的时间窗口分成多个小窗口(timeSlot),每次大窗口向后滑动一个小窗口,并保证大的窗口内流量不会超出最大值,大窗口内的流量是所有小窗口流量之和。
对于滑动时间窗口,我们可以把1s的时间窗口划分成10个小窗口即窗口有10个时间插槽timeSlot, 每个timeSlot统计某个100ms的请求数量。每经过100ms,有一个新的timeSlot加入窗口,早于当前时间1s的timeSlot出窗口,窗口内最多维护10个time slot。
import ( "sync" "time" ) type timeSlot struct { startTime time.Time count int32 } type SlideWindowLimitRate struct { rate int32 windowDuration time.Duration slotDuration time.Duration mu sync.Mutex slotList []*timeSlot } type SlideWindowDuration struct { windowDuration time.Duration slotDuration time.Duration } func NewSlideWindowLimitRate(rate int32, slideWindowDuration *SlideWindowDuration) *SlideWindowLimitRate { return &SlideWindowLimitRate{ rate: rate, windowDuration: slideWindowDuration.windowDuration, slotDuration: slideWindowDuration.slotDuration, slotList: make([]*timeSlot, 0, slideWindowDuration.windowDuration/slideWindowDuration.slotDuration), } } func (r *SlideWindowLimitRate) Acquire() bool { r.mu.Lock() defer r.mu.Unlock() now := time.Now() discardSlotIdx := -1 for i := range r.slotList { slot := r.slotList[i] if slot.startTime.Add(r.slotDuration).After(now) { break } discardSlotIdx = i } if discardSlotIdx > -1 { r.slotList = r.slotList[discardSlotIdx+1:] } var reqCount int32 = 0 for i := range r.slotList { reqCount += r.slotList[i].count } if reqCount >= r.rate { return false } if len(r.slotList) > 0 { r.slotList[len(r.slotList)-1].count++ } else { r.slotList = append(r.slotList, r.newTimeSlot(now)) } return true } func (r *SlideWindowLimitRate) newTimeSlot(startTime time.Time) *timeSlot { return &timeSlot{startTime: startTime, count: 1} }
如上图
前一S的后200ms即800~1000ms来个100个请求,窗口内100个请求就满了
下一S的前200ms即1000~1200ms再来100个请求,就会被限流拒绝
想象有一个木桶,桶的容量是固定的。当有请求到来时先放到木桶中,处理请求的worker以固定的速度从木桶中取出请求进行相应。如果木桶已经满了,直接返回请求限流错误。
import ( "math" "sync" "time" ) type LeakBucketLimitRate struct { rate float64 //每秒速度 capacity float64 mu sync.Mutex lastLeakTime time.Time waterCapacity float64 } func NewLeadBucketLimitRate(rate float64, capacity float64) *LeakBucketLimitRate { return &LeakBucketLimitRate{ rate: rate, capacity: capacity, lastLeakTime: time.Now(), waterCapacity: 0, } } func (r *LeakBucketLimitRate) Acquire() bool { r.mu.Lock() defer r.mu.Unlock() now := time.Now() remainWaterCap := math.Max(0, r.waterCapacity-(now.Sub(r.lastLeakTime).Seconds()*r.rate)) r.lastLeakTime = now if remainWaterCap < r.capacity { r.waterCapacity = remainWaterCap + 1 return true } return false }
如果需要以固定的速率处理请求,不接收并发的高流量的请求,那么适合适用漏桶算法
想象有一个木桶,桶的容量是固定的。会以固定的速率往木桶中放入令牌,如果能从木桶中拿到令牌那么允许处理请求,否则返回限流错误。
import ( "math" "sync" "time" ) type TokenBucketLimitRate struct { rate float64 capacity float64 mu sync.Mutex tokenCount float64 lastAddTokenTime time.Time } func NewTokenBucketLimitRate(rate float64, capacity float64) *TokenBucketLimitRate { return &TokenBucketLimitRate{ rate: rate, capacity: capacity, tokenCount: 0, lastAddTokenTime: time.Now(), } } func (r *TokenBucketLimitRate) Acquire() bool { r.mu.Lock() defer r.mu.Unlock() now := time.Now() addTokenNum := now.Sub(r.lastAddTokenTime).Seconds() * r.rate r.tokenCount = math.Min(r.capacity, r.tokenCount+addTokenNum) r.lastAddTokenTime = now if r.tokenCount > 0 { r.tokenCount-- return true } return false }
因为令牌桶可以缓存令牌,所以可以应对突发的高并发流量,比如木桶最多可以存储500个令牌,那么就可以最多处理500并发量的请求
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。