赞
踩
尽管云原生网关里有统一入口的限流(根据ip、userID来控制),但是有的微服务需要有自己的限流策略(比如根据不同的算法任务、不同的子产品来限制),所以封装了一个限流器公共包,可以在多个微服务中复用这个功能。直接原因是有一次某个子功能流量激增,大量任务失败。
关键步骤:
针对ip限流,例如1s限制50个请求,考虑到公共ip的情况;
针对某个算法任务,不限制vip用户,普通用户1s限制创建10个创建任务的请求。
目前的做法是限流了就直接拒绝,抛出错误提示,还有其他的做法:
我们公司有完善的监控,所以我可以通过观测到的性能数据来确定阈值。比如说观察线上的数据,如果在业务高峰期整个集群的 QPS 都没超过 1000,那么就可以考虑将阈值设定在 1200,多出来的 200 就是余量。
不过这种方式有一个要求,就是服务必须先上线,有了线上的观测数据才能确定阈值。并且,整个阈值很有可能是偏低的。因为业务巅峰并不意味着是集群性能的瓶颈。如果集群本身可以承受每秒 3000 个请求,但是因为业务量不够,每秒只有 1000 个请求,那么我这里预估出来的阈值是显著低于集群真实瓶颈 QPS 的。
不过我个人觉得,最好的方式应该是在线上执行全链路压测,测试出瓶颈。即便不能做全链路压测,也可以考虑模拟线上环境进行压测,再差也应该在测试环境做一个压力测试。
不过如果真的做不了,或者来不及,或者没资源,那么还可以考虑参考类似服务的阈值。比如说如果 A、B 服务是紧密相关的,也就是通常调用了 A 服务就会调用 B 服务,那么可以用 A 已经确定的阈值作为 B 的阈值。又或者 A 服务到 B 服务之间有一个转化关系。比如说创建订单到支付,会有一个转化率,假如说是 90%,如果创建订单的接口阈值是 100,那么支付的接口就可以设置为 90。
实在没办法了,就只能手动计算了。也就是沿着整条调用链路统计出现了多少次数据库查询、多少次微服务调用、多少次第三方中间件访问,如 Redis,Kafka 等。举一个最简单的例子,假如说一个非常简单的服务,整个链路只有一次数据库查询,这是一个会回表的数据库查询,根据公司的平均数据这一次查询会耗时 10ms,那么再增加 10 ms 作为 CPU 计算耗时。也就是说这一个接口预期的响应时间是 20ms。如果一个实例是 4 核,那么就可以简单用 1000ms÷20ms×4=200 得到阈值。
系统会以一个恒定的速率产生令牌,这些令牌会放到一个桶里面,每个请求只有拿到了令牌才会被执行。每当一个请求过来的时候,就需要尝试从桶里面拿一个令牌。如果拿到了令牌,那么请求就会被处理;如果没有拿到,那么这个请求就被限流了。(当令牌桶已满时,新生成的令牌会被丢弃,不会增加桶中的令牌数量。)
你需要注意,本身令牌桶是可以积攒一定数量的令牌的。比如说桶的容量是 100,也就是这里面最多积攒 100 个令牌。那么当某一时刻突然来了 100 个请求,它们都能拿到令牌。
漏桶是指当请求以不均匀的速度到达服务器之后,限流器会以固定的速率转交给业务逻辑。
某种程度上,你可以将漏桶算法看作是令牌桶算法的一种特殊形态。你将令牌桶中桶的容量设想为 0,就是漏桶了。
所以你可以看到,在漏桶里面,令牌产生之后你就需要取走,没取走的话也不会积攒下来。因此漏桶是绝对均匀的,而令牌桶不是绝对均匀的。
固定窗口是指在一个固定时间段,只允许执行固定数量的请求。比如说在一秒钟之内只能执行 100 个请求。滑动窗口类似于固定窗口,也是指在一个固定时间段内,只允许执行固定数量的请求。区别就在于,滑动窗口是平滑地挪动窗口,而不像固定窗口那样突然地挪动窗口。假设窗口大小是一分钟。此时时间是 t1,那么窗口的起始位置是 t1-1 分钟。过了 2 秒之后,窗口大小依旧是 1 分钟,但是窗口的起始位置也向后挪动了 2 秒,变成了 t1 - 1 分钟 + 2 秒。这也就是滑动的含义。
参考:
https://blog.csdn.net/z3551906947/article/details/140477024,并且里面阐述了各个算法的优缺点,漏桶是可以用来处理突发流量的。
package limiter import ( "sync" "time" ) // TokenBucketLimiter 令牌桶限流器 type TokenBucketLimiter struct { capacity int // 容量 currentTokens int // 令牌数量 rate int // 发放令牌速率/秒 lastTime time.Time // 上次发放令牌时间 mutex sync.Mutex // 避免并发问题 } // NewTokenBucketLimiter 创建一个新的令牌桶限流器实例。 func NewTokenBucketLimiter(capacity, rate int) *TokenBucketLimiter { return &TokenBucketLimiter{ capacity: capacity, rate: rate, lastTime: time.Now(), currentTokens: 0, // 初始化时桶中没有令牌 } } // TryAcquire 尝试从令牌桶中获取一个令牌。 func (l *TokenBucketLimiter) TryAcquire() bool { l.mutex.Lock() defer l.mutex.Unlock() now := time.Now() interval := now.Sub(l.lastTime) // 计算时间间隔 // 如果距离上次发放令牌超过 1/rate 秒,则发放新的令牌 if float64(interval) >= float64(time.Second)/float64(l.rate) { // 计算应该发放的令牌数量,但不超过桶的容量 newTokens := int(float64(interval)/float64(time.Second)* l.rate) l.currentTokens = minInt(l.capacity, l.currentTokens+newTokens) // 更新上次发放令牌的时间 l.lastTime = now } // 如果桶中没有令牌,则请求失败 if l.currentTokens == 0 { return false } // 桶中有令牌,消费一个令牌 l.currentTokens-- return true } // minInt 返回两个整数中的较小值。 func minInt(a, b int) int { if a < b { return a } return b } func TestName(t *testing.T) { tokenBucket := NewTokenBucketLimiter(5, 10) for i := 0; i < 10; i++ { fmt.Println(tokenBucket.TryAcquire()) } time.Sleep(100 * time.Millisecond) fmt.Println(tokenBucket.TryAcquire()) }
package limiter import ( "fmt" "math" "sync" "testing" "time" ) // LeakyBucketLimiter 漏桶限流器 type LeakyBucketLimiter struct { capacity int // 桶容量 currentLevel int // 当前水位 rate int // 水流速度/秒 lastTime time.Time // 上次放水时间 mutex sync.Mutex // 避免并发问题 } // NewLeakyBucketLimiter 初始化漏桶限流器 func NewLeakyBucketLimiter(capacity, rate int) *LeakyBucketLimiter { return &LeakyBucketLimiter{ capacity: capacity, currentLevel: 0, // 初始化时水位为0 rate: rate, lastTime: time.Now(), } } // TryAcquire 尝试获取处理请求的权限 func (l *LeakyBucketLimiter) TryAcquire() bool { l.mutex.Lock() // 直接获取写锁 defer l.mutex.Unlock() // 如果上次放水时间距今不到 1/rate 秒,不需要放水 now := time.Now() interval := now.Sub(l.lastTime) // 计算放水后的水位 if float64(interval) >= float64(time.Second)/float64(l.rate) { l.currentLevel = int(math.Max(0, float64(l.currentLevel)-float64(interval)/float64(time.Second)*float64(l.rate))) l.lastTime = now } // 尝试增加水位 if l.currentLevel < l.capacity { l.currentLevel++ return true } return false } func TestName(t *testing.T) { tokenBucket := NewLeakyBucketLimiter(5, 10) for i := 0; i < 10; i++ { fmt.Println(tokenBucket.TryAcquire()) } time.Sleep(100 * time.Millisecond) fmt.Println(tokenBucket.TryAcquire()) }
package main import ( "sync" "time" ) // FixedWindowRateLimiter 定义固定窗口限流器 type FixedWindowRateLimiter struct { mu sync.Mutex maxRequests int requestCount int window time.Time // 窗口的起始点,每个窗口长度1s } // NewFixedWindowRateLimiter 创建一个新的固定窗口限流器实例 func NewFixedWindowRateLimiter(maxRequests int) *FixedWindowRateLimiter { return &FixedWindowRateLimiter{ maxRequests: maxRequests, window: time.Now().Truncate(time.Second), } } // TryAcquire 尝试获取请求许可 func (f *FixedWindowRateLimiter) TryAcquire() bool { f.mu.Lock() defer f.mu.Unlock() // 检查是否需要重置窗口 if time.Now().After(f.window.Add(time.Second)) { f.requestCount = 0 f.window = time.Now().Truncate(time.Second) } // 检查是否达到最大请求次数 if f.requestCount >= f.maxRequests { return false } // 请求成功,递增计数器 f.requestCount++ return true } func main() { limiter := NewFixedWindowRateLimiter(5) for i := 0; i < 10; i++ { if limiter.TryAcquire() { fmt.Println("请求通过") } else { fmt.Println("请求被拒绝") } time.Sleep(100 * time.Millisecond) } }
package main import ( "sync" "time" ) // SlidingWindowRateLimiter 定义滑动窗口限流器 type SlidingWindowRateLimiter struct { mu sync.Mutex maxRequests int windowSize time.Duration // 窗口长度 windows []int windowIndex int currentTime time.Time // 上个滑窗的起始点 } // NewSlidingWindowRateLimiter 创建一个新的滑动窗口限流器实例 func NewSlidingWindowRateLimiter(maxRequests int, windowSize time.Duration) *SlidingWindowRateLimiter { numWindows := int(windowSize.Seconds()) return &SlidingWindowRateLimiter{ maxRequests: maxRequests, windowSize: windowSize, windows: make([]int, numWindows), currentTime: time.Now().Truncate(time.Second), windowIndex: 0, } } // TryAcquire 尝试获取请求许可 func (s *SlidingWindowRateLimiter) TryAcquire() bool { s.mu.Lock() defer s.mu.Unlock() // 更新当前时间 currentTime := time.Now().Truncate(time.Second) // 检查是否需要更新窗口 if currentTime.After(s.currentTime.Add(s.windowSize)) { s.currentTime = currentTime s.windowIndex = 0 } else if currentTime.After(s.currentTime.Add(time.Second)) { s.windowIndex = (s.windowIndex + 1) % len(s.windows) } // 清除过期窗口 for i := range s.windows { if currentTime.Before(s.currentTime.Add(time.Duration(i+1)*time.Second)) { break } s.windows[i] = 0 } // 检查是否达到最大请求次数 totalRequests := 0 for _, count := range s.windows { totalRequests += count } if totalRequests >= s.maxRequests { return false } // 请求成功,递增计数器 s.windows[s.windowIndex]++ return true } func main() { limiter := NewSlidingWindowRateLimiter(5, 10*time.Second) for i := 0; i < 10; i++ { if limiter.TryAcquire() { fmt.Println("请求通过") } else { fmt.Println("请求被拒绝") } time.Sleep(100 * time.Millisecond) } }
从单机或者集群的角度看,可以分为单机限流或者集群限流。集群限流一般需要借助 Redis 之类的中间件来记录流量和阈值。换句话说,就是你需要用 Redis 等工具来实现前面提到的限流算法。当然如果是利用网关来实现集群限流,那么可以摆脱 Redis。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。