当前位置:   article > 正文

滑动窗口计算器如何对多个接口限流_分布式高并发系统限流原理与实践

滑动窗口算法实现接口限流

668d9962472ddf4045034d25e0735499.png

分布式三大利器

随着业务的发展壮大,对后端服务的压力也会越来越大,为了打造高效稳定的系统, 产生了分布式,微服务等等系统设计,因为这个原因,设计复杂度也随之增加,基于此 诞生了高并发系统三大利器限流,缓存,降级/熔断

4ec6903970fed0592a8dc158e5c391b0.png

限流: 从系统的流量入口考虑,从进入的流量上进行限制,达到保护系统的作用;

缓存: 将数据库中的数据缓存起来,提升系统访问速度和并发度,保护数据库资源。

降级和熔断比较类似,都是属于过载保护机制,但是实现上有着如下区别

降级: 从系统内部的平级服务或者业务的维度考虑,流量大了,可以暂停或延迟一些非重要服务,如日志收集等等,保护其他正常使用;

熔断: 当某一服务出现了过载现象,为防止整个系统故障,直接关闭该服务或者保证部分请求成功,另一部分返回失败。比如10s内连续请求失败次数达到20次, 触发熔断机制,过滤60%请求。

e1160f4343902884347b47f7285a8e90.png

限流的必要性

本文主要介绍第一利器限流,后续会有详细的文章介绍缓存和降级。

应对秒杀,大促等高性能压力的场景时,为了保证系统的平稳运行,必须针对超过预期的流量,通过预先设定的限流规则选择性的对某些请求进行限流。

在分布式系统中,其接口的调用来自多个系统,有的调用方可能会请求数量突增,过去争夺服务器资源, 而来自其他调用方的接口请求因此来不及响应而排队等待,微服务整体的请求响应时间变长甚至超时。 所以为了防止接口被过度调用,需要对每个调用方进行细粒度的访问限流。

限流方案

网关层限流

acd04c06d069ac62329b41b2327603a1.png

上图是一个请求流量漏斗模型,执行过程依次是 1. 用户请求经过网关将请求下发到服务层 2. 服务层针对每条请求获取缓存数据 3. 缓存没有命中,直接请求数据

网关作为服务的入口,承接了系统整个系统的所有流量,是整个访问链路的源头,具有"一夫当关,万夫莫开"的角色,所以是最适合做限流的的途径。

另外,引入网关层还可以解决分布式系统中的如下问题: 客户端会多次请求不同的微服务,增加了客户端的复杂性 存在跨域请求,在一定场景下处理相对复杂 认证复杂,每个服务都需要独立认证 难以重构,随着项目的迭代,可能需要重新划分微服务。例如,可能将多个服务合并成一个或者将一个服务拆分成多个。如果客户端直接与微服务通信, 那么重构将会很难实施 某些微服务可能使用了防火墙 / 浏览器不友好的协议,直接访问会有一定的困难 恶意请求,存在安全问题

目前主流的网关层有以软件为代表的Nginx,还有Spring Cloud中的Gateway和Zuul这类网关层组件,也有以硬件+软件为代表的F5(F5价钱贵到你怀疑人生)。 除此之外,还有许多大厂开发的组件,比如腾讯里约。

中间件限流

提到中间件首先想到的就是消息队列(Message queue),简称MQ, 顾名思义,消息队列就是一个 存放“消息”的“队列”,有着 FIFO 的特性。这里的“消息”实际就是“数据”的意思,因此消息队列本身就是一个简单的数据结构——队列。

在设计模式上,消息队列是“生产者-消费者”模式的一个经典实现。 一般而言,用户就是请求的“生产者”,而后台服务就是请求的“消费者”,kafka、RabbitMQ,RocketMQ等等都是目前系统中常见的MQ, 在系统中可以达到异步解耦,削峰填谷的作用。

b411b0cbeacf8dc7c2f746f6c69dd93d.png

算法

限流算法常见有三种,分别是计数器、漏桶、令牌桶。

计数器(固定窗口)

计数器算法是限流算法里最简单也是最容易实现的一种算法。比如对于A接口来说,我们1分钟的访问次数不能超过100个。 我们可以设置一个计数器counter,每当一个请求过来的时候,counter就加1,如果counter的值大于100并且该请求与第一个请求的间隔时间还在1分钟之内, 那么说明请求数过多;如果该请求与第一个请求的间隔时间大于1分钟,且counter的值还在限流范围内,那么就重置 counter。

927cabff31cf4df1fd9c3cfc51d3ccff.png

在对计数器进行技术的问题,要注意原子性,防止并发问题,导致计数不准。 代码设计如下:

  1. // CounterLimit 计数器
  2. type CounterLimit struct {
  3. counter int64 //计数器
  4. limit int64 //指定时间窗口内允许的最大请求数
  5. intervalNano int64 //指定的时间窗口
  6. unixNano int64 //unix时间戳,单位为纳秒
  7. }
  8. // NewCounterLimit 初始化
  9. func NewCounterLimit(interval time.Duration, limit int64) *CounterLimit {
  10. return &CounterLimit{
  11. counter: 0,
  12. limit: limit,
  13. intervalNano: int64(interval),
  14. unixNano: time.Now().UnixNano(),
  15. }
  16. }
  17. // Allow 判断当前时间窗口是否允许请求
  18. func (c *CounterLimit) Allow() bool {
  19. now := time.Now().UnixNano()
  20. if now-c.unixNano > c.intervalNano { //如果当前过了当前的时间窗口,则重新进行计数
  21. atomic.StoreInt64(&c.counter, 0)
  22. atomic.StoreInt64(&c.unixNano, now)
  23. return true
  24. }
  25. atomic.AddInt64(&c.counter, 1)
  26. return c.counter < c.limit //判断是否要进行限流
  27. }

滑动窗口

滑动窗口固定窗口的优化版本,主要解决临界问题。

6a8db7a484a81f95252632fc65f50998.png

如上图,假设0:59时,瞬间收到100个请求,并且1:00时候又瞬间收到了100个请求,那么其实这个服务在 1秒里面,收到了200个请求。 我们刚才规定的是1分钟最多100个请求,也就是每秒钟最多1.7个请求,用户通过在时间窗口的重置节点处突发请求,可以瞬间超过我们的速率限制,压垮我们的应用。

滑动窗口协议(Sliding Window Protocol),属于TCP协议的一种应用,用于网络数据传输时的流量控制,以避免拥塞的发生。 该协议允许发送方在停止并等待确认前发送多个数据分组。由于发送方不必每发一个分组就停下来等待确认。因此该协议可以加速数据的传输,提高网络吞吐量。

9b55312e43bc241bf0d3489dd1307c89.png

滑动窗口计数器是通过将窗口再细分,并且按照时间"滑动",这种算法避免了固定窗口计数器带来的双倍突发请求,但时间区间的精度越高,算法所需的空间容量就越大。

在上图中,整个红色的矩形框表示一个时间窗口,也就是一分钟。然后我们将时间窗口进行划分,每格代表的是10秒钟,总共6格。每过10秒钟,我们的时间窗口就会 往右滑动一格。每一个格子都有自己独立的计数器counter,假设一个请求 在0:25秒的时候到达,那么0:20~0:29对应的counter就会加1。

滑动窗口怎么解决刚才的临界问题的呢?

0:59到达的100个请求会落在灰色的格子中,而1:00到达的请求会落在橘黄色的格子中。当时间到达1:00时,我们的窗口会往右移动一格,那么此时时间窗口内的 总请求数量一共是200个,超过了限定的100个,所以此时能够检测出来触发了限流。

当滑动窗口的格子划分的越多,那么滑动窗口的滚动就越平滑,限流的统计就会越精确。

  1. var (
  2. limitCount int = 10 // 6s限频
  3. limitBucket int = 6 // 滑动窗口个数
  4. curCount int32 = 0 // 记录限频数量
  5. head *ring.Ring // 环形队列(链表)
  6. )
  7. func main() {
  8. tcpAddr, err := net.ResolveTCPAddr("tcp4", "0.0.0.0:9090") //获取一个tcpAddr
  9. checkError(err)
  10. listener, err := net.ListenTCP("tcp", tcpAddr) //监听一个端口
  11. checkError(err)
  12. defer listener.Close()
  13. // 初始化滑动窗口
  14. head = ring.New(limitBucket)
  15. for i := 0; i < limitBucket; i++ {
  16. head.Value = 0
  17. head = head.Next()
  18. }
  19. // 启动执行器
  20. go func() {
  21. timer := time.NewTicker(time.Second * 1)
  22. for range timer.C { // 定时每隔1秒刷新一次滑动窗口数据
  23. subCount := int32(0 - head.Value.(int))
  24. newCount := atomic.AddInt32(&curCount, subCount)
  25. arr := [6]int{}
  26. for i := 0; i < limitBucket; i++ { // 这里是为了方便打印
  27. arr[i] = head.Value.(int)
  28. head = head.Next()
  29. }
  30. fmt.Println("move subCount,newCount,arr", subCount, newCount, arr)
  31. head.Value = 0
  32. head = head.Next()
  33. }
  34. }()
  35. for {
  36. conn, err := listener.Accept() // 在此处阻塞,每次来一个请求才往下运行handle函数
  37. if err != nil {
  38. fmt.Println(err)
  39. continue
  40. }
  41. go handle(&conn) // 起一个单独的协程处理,有多少个请求,就起多少个协程,协程之间共享同一个全局变量limiting,对其进行原子操作。
  42. }
  43. }
  44. func handle(conn *net.Conn) {
  45. defer (*conn).Close()
  46. n := atomic.AddInt32(&curCount, 1)
  47. //fmt.Println("handler n:", n)
  48. if n > int32(limitCount) { // 超出限频
  49. atomic.AddInt32(&curCount, -1) // add 1 by atomic,业务处理完毕,放回令牌
  50. (*conn).Write([]byte("HTTP/1.1 404 NOT FOUNDrnrnError, too many request, please try again."))
  51. } else {
  52. mu := sync.Mutex{}
  53. mu.Lock()
  54. pos := head.Prev()
  55. val := pos.Value.(int)
  56. val++
  57. pos.Value = val
  58. mu.Unlock()
  59. time.Sleep(1 * time.Second) // 假设我们的应用处理业务用了1s的时间
  60. (*conn).Write([]byte("HTTP/1.1 200 OKrnrnI can change the world!")) // 业务处理结束后,回复200成功。
  61. }
  62. }
  63. // 异常报错的处理
  64. func checkError(err error) {
  65. if err != nil {
  66. fmt.Fprintf(os.Stderr, "Fatal error: %s", err.Error())
  67. os.Exit(1)
  68. }
  69. }

由于滑动窗口比较难于理解,打印了如下压测日志,压测工具地址 https://github.com/rakyll/hey

  1. $ ./hey -c 6 -n 300 -q 6 -t 80 http://localhost:9090
  2. move subCount,newCount,arr 0 0 [0 0 0 0 0 0]
  3. move subCount,newCount,arr 0 0 [0 0 0 0 0 0]
  4. move subCount,newCount,arr 0 6 [0 0 0 0 0 6]
  5. move subCount,newCount,arr 0 10 [0 0 0 0 6 4]
  6. move subCount,newCount,arr 0 10 [0 0 0 6 4 0]
  7. move subCount,newCount,arr 0 10 [0 0 6 4 0 0]
  8. move subCount,newCount,arr 0 10 [0 6 4 0 0 0]
  9. move subCount,newCount,arr -6 4 [6 4 0 0 0 0]
  10. move subCount,newCount,arr -4 6 [4 0 0 0 0 6]
  11. move subCount,newCount,arr 0 10 [0 0 0 0 6 4]
  12. move subCount,newCount,arr 0 10 [0 0 0 6 4 0]
  13. move subCount,newCount,arr 0 10 [0 0 6 4 0 0]
  14. move subCount,newCount,arr 0 10 [0 6 4 0 0 0]
  15. move subCount,newCount,arr -6 4 [6 4 0 0 0 0]
  16. move subCount,newCount,arr -4 3 [4 0 0 0 0 3]
  17. move subCount,newCount,arr 0 3 [0 0 0 0 3 0]
  18. move subCount,newCount,arr 0 3 [0 0 0 3 0 0]
  19. move subCount,newCount,arr 0 3 [0 0 3 0 0 0]
  20. move subCount,newCount,arr 0 3 [0 3 0 0 0 0]
  21. move subCount,newCount,arr -3 0 [3 0 0 0 0 0]
  22. move subCount,newCount,arr 0 0 [0 0 0 0 0 0]
  23. move subCount,newCount,arr 0 0 [0 0 0 0 0 0]

漏桶

c351d31d15d1b7c66ad19af9a9a54b2c.png

漏桶算法概念如下:

  • 将每个请求视作"水滴"放入"漏桶"进行存储;
  • “漏桶"以固定速率向外"漏"出请求来执行如果"漏桶"空了则停止"漏水”;
  • 如果"漏桶"满了则多余的"水滴"会被直接丢弃。
  1. // BucketLimit 漏桶结构
  2. type BucketLimit struct {
  3. rate float64 //漏桶中水的漏出速率
  4. bucketSize float64 //漏桶最多能装的水大小
  5. unixNano int64 //unix时间戳
  6. curWater float64 //当前桶里面的水
  7. }
  8. // NewBucketLimit 初始化
  9. func NewBucketLimit(rate float64, bucketSize int64) *BucketLimit {
  10. return &BucketLimit{
  11. bucketSize: float64(bucketSize),
  12. rate: rate,
  13. unixNano: time.Now().UnixNano(),
  14. curWater: 0,
  15. }
  16. }
  17. // refresh 更新当前桶的容量
  18. func (b *BucketLimit) refresh() {
  19. now := time.Now().UnixNano()
  20. //时间差, 把纳秒换成秒
  21. diffSec := float64(now-b.unixNano) / 1000 / 1000 / 1000
  22. b.curWater = math.Max(0, b.curWater-diffSec*b.rate)
  23. b.unixNano = now
  24. return
  25. }
  26. // Allow 允许请求,是否超过桶的容量
  27. func (b *BucketLimit) Allow() bool {
  28. b.refresh()
  29. if b.curWater < b.bucketSize {
  30. b.curWater = b.curWater + 1
  31. return true
  32. }
  33. return false
  34. }

漏桶算法也存在着明显缺陷,当短时间内有大量的突发请求时,即便此时服务器没有任何负载,每个请求也都得在队列中等待一段时间才能被响应。

令牌桶

对于很多应用场景来说,除了要求能够限制数据的平均传输速率外,还要求允许某种程度的突发传输。这时候漏桶算法可能就不合适了,令牌桶算法更为适合。 令牌桶算法的原理是系统会以一个恒定的速度往桶里放入令牌,而如果请求需要被处理,则需要先从桶里获取一个令牌,当桶里没有令牌可取时,则拒绝服务。

fd7be947736022ef5b2f3d1087ddbeb2.png
  1. // TokenBucket 令牌桶
  2. type TokenBucket struct {
  3. rate int // 令牌放入速度
  4. tokenSize int // 令牌桶的容量
  5. curNum int // 当前桶中token
  6. }
  7. // NewTokenBucket 初始化
  8. func NewTokenBucket(rate, tokenSize int) *TokenBucket {
  9. return &TokenBucket{
  10. rate: rate,
  11. tokenSize: tokenSize,
  12. }
  13. }
  14. // PushToken 在桶中存放token
  15. func (t *TokenBucket) PushToken() chan struct{} {
  16. tb := make(chan struct{}, t.tokenSize)
  17. ticker := time.NewTicker(time.Duration(1000) * time.Microsecond)
  18. //初始化token
  19. for i := 0; i < t.tokenSize; i++ {
  20. tb <- struct{}{}
  21. }
  22. t.curNum = t.tokenSize
  23. // 指定速率放入token
  24. go func() {
  25. for {
  26. for i := 0; i < t.rate; i++ {
  27. tb <- struct{}{}
  28. }
  29. t.curNum += t.rate
  30. if t.curNum > t.tokenSize {
  31. t.curNum = t.tokenSize
  32. }
  33. <-ticker.C
  34. }
  35. }()
  36. return tb
  37. }
  38. // popToken 取出token
  39. func (t *TokenBucket) PopToken(bucket chan struct{}, n int) {
  40. for i := 0; i < n; i++ {
  41. _, ok := <-bucket
  42. if ok {
  43. t.curNum -= 1
  44. fmt.Println("get token success")
  45. } else {
  46. fmt.Println("get token fail")
  47. }
  48. }
  49. }

总结

文中涉及代码已上传至代码库,参见 https://github.com/MerlinFeng/codenote/tree/main/rate_limit

在实际的系统设计中,令牌桶和漏桶算法的应用是比较广泛的,分别有着不同的使用场景。

令牌桶可以用来保护自己,主要用来对上游调用者频率进行限流,为的是让自己不被打垮。 所以如果自己本身有处理能力的时候,如果流量突发(实际消费能力强于配置的流量限制),那么实际处理速率可以超过配置的限制。

漏桶算法,用来保护他人,也就是保护他所调用的下游系统。主要场景是,当调用的第三方系统本身没有保护机制,或者有流量限制的时候, 我们的调用速度不能超过他的限制,由于我们不能更改第三方系统,所以只有在主调方控制。这个时候,即使流量突发,也必须舍弃。 因为消费能力是第三方决定的。

442d074281fb0403b24e9ddbfdcbc91c.png
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/weixin_40725706/article/detail/274862
推荐阅读
相关标签
  

闽ICP备14008679号