当前位置:   article > 正文

限流算法,基于go的gRPC 实现的_grpc 限流

grpc 限流

目录

一、单机限流

1、令牌桶算法

3、固定窗口限流算法

4、滑动窗口

二、集群限流

1、分布式固定窗口 (基于redis)

2、分布式滑动窗口


一、单机限流

1、令牌桶算法

令牌桶算法是当流量进入系统前需要获取令牌,没有令牌那么就要进行限流

这个算法是怎么实现的呢

  1. 定义一个后台协程按照一定的频率去产生token

  2. 后台协程产生的token 放到固定大小容器里面

  3. 有流量进入系统尝试拿到token,没有token 就需要限流了


  1. type TokenBucketLimiter struct {
  2.   token chan struct{}
  3.   stop  chan struct{}
  4. }
  5. func NewTokenBucket(capactity int, timeInternal time.Duration) *TokenBucketLimiter {
  6.   te := make(chan struct{}, capactity)
  7.   stop := make(chan struct{})
  8.   ticker := time.NewTicker(timeInternal)
  9.   go func() {
  10.      defer ticker.Stop()
  11.      for {
  12.         select {
  13.         case <-ticker.C:
  14.            select {
  15.            case te <- struct{}{}:
  16.            default:
  17.           }
  18.         case <-stop:
  19.            return
  20.         }
  21.     }
  22.   }()
  23.   return &TokenBucketLimiter{
  24.      token: te,
  25.      stop:  stop,
  26.   }
  27. }
  28. func (t *TokenBucketLimiter) BuildServerInterceptor() grpc.UnaryServerInterceptor {
  29.   return func(ctx context.Context, req any, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp any, err error) {
  30.      select {
  31.      case <-ctx.Done():
  32.         err = ctx.Err()
  33.         return
  34.      case <-t.token:
  35.         return handler(ctx, req)
  36.      case <-t.stop:
  37.         err = errors.New("缺乏保护")
  38.         return
  39.     }
  40.   }
  41. }
  42. func (t *TokenBucketLimiter) Stop() {
  43.   close(t.stop)
  44. }

3、固定窗口限流算法

什么是固定窗口限流算法

固定窗口限流算法(Fixed Window Rate Limiting Algorithm)是一种最简单的限流算法,其原理是在固定时间窗口(单位时间)内限制请求的数量。该算法将时间分成固定的窗口,并在每个窗口内限制请求的数量。具体来说,算法将请求按照时间顺序放入时间窗口中,并计算该时间窗口内的请求数量,如果请求数量超出了限制,则拒绝该请求。

优点:实现简单

缺点:对于瞬时流量没发处理,也就是临界问题,比如下图在20t前后,在16t以及26t有大量流量进来,在这10t中,已经超过了流量限制,没法限流

实现如下

  1. type fixWindow1 struct {
  2.   lastVistTime int64
  3.   vistCount    int64
  4.   interval     int64
  5.   maxCount     int64
  6. }
  7. func NewfixWindow1(macCount int64) *fixWindow1 {
  8.   t := &fixWindow1{
  9.      maxCount: macCount,
  10.   }
  11.   return t
  12. }
  13. func (f *fixWindow1) FixWindow1() grpc.UnaryServerInterceptor {
  14.   return func(ctx context.Context, req any, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp any, err error) {
  15.      current := time.Now().UnixNano()
  16.      lasttime := atomic.LoadInt64(&f.lastVistTime)
  17.      if lasttime+f.interval > current {
  18.         if atomic.CompareAndSwapInt64(&f.lastVistTime, lasttime, current) {
  19.            atomic.StoreInt64(&f.lastVistTime, current)
  20.            atomic.StoreInt64(&f.maxCount, 0)
  21.         }
  22.     }
  23.      count := atomic.AddInt64(&f.vistCount, 1)
  24.      if count > f.maxCount {
  25.         return gen.GetByIDResp{}, errors.New("触发限流")
  26.     }
  27.      return handler(ctx, req)
  28.   }
  29. }

4、滑动窗口

什么是滑动窗口算法:

滑动窗口限流算法是一种常用的限流算法,用于控制系统对外提供服务的速率,防止系统被过多的请求压垮。它将单位时间周期分为n个小周期,分别记录每个小周期内接口的访问次数,并且根据时间滑动删除过期的小周期。它可以解决固定窗口临界值的问题

type slideWindow struct {
  1.   timeWindow *list.List
  2.   interval   int64
  3.   maxCnt     int
  4.   lock       sync.Mutex
  5. }
  6. func NewSlideWindow(interval time.Duration, maxCnt int) *slideWindow {
  7.   t := &slideWindow{
  8.      timeWindow: list.New(),
  9.      interval:   interval.Nanoseconds(),
  10.      maxCnt:     maxCnt,
  11.   }
  12.   return t
  13. }
  14. func (s *slideWindow) SlideWinowlimit() grpc.UnaryServerInterceptor {
  15.   return func(ctx context.Context, req any, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp any, err error) {
  16.      s.lock.Lock()
  17.      now := time.Now().UnixNano()
  18.      // 快路径
  19.      if s.timeWindow.Len() < s.maxCnt {
  20.         resp, err = handler(ctx, req)
  21.         s.timeWindow.PushBack(now)
  22.         s.lock.Unlock()
  23.         return
  24.     }
  25.      front := s.timeWindow.Front()
  26.      for front != nil && front.Value.(int64)+s.interval < now {
  27.         s.timeWindow.Remove(front)
  28.         front = s.timeWindow.Front()
  29.     }
  30.      if s.timeWindow.Len() >= s.maxCnt {
  31.         s.lock.Unlock()
  32.         return &gen.GetByIdReq{}, errors.New("触发限流")
  33.     }
  34.      s.lock.Unlock()
  35.      resp, err = handler(ctx, req)
  36.      s.timeWindow.PushBack(now)
  37.      return
  38.   }
  39. }

二、集群限流

下面是分布式限流,为啥是分布式限流,单机限流只能对单台服务器进行限流,没发对集权进行限流,需要用分布式限流来进行集权限流

1、分布式固定窗口 (基于redis)
type redisFix struct {
  1.   serName  string
  2.   interVal int
  3.   limitCnt int
  4.   redis    redis.Cmdable
  5. }
  6. //go:embed lua/fixwindow.lua
  7. var lua_redis_fix string
  8. func NewRedisFix(serName string, interval int, limitCnt int, redis redis.Cmdable) *redisFix {
  9.   t := &redisFix{
  10.      serName:  serName,
  11.      interVal: interval,
  12.      limitCnt: limitCnt,
  13.      redis:    redis,
  14.   }
  15.   return t
  16. }
  17. func (r *redisFix) RedisFix() grpc.UnaryServerInterceptor {
  18.   return func(ctx context.Context, req any, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp any, err error) {
  19.      res, err := r.limit(ctx)
  20.      if err != nil {
  21.         return &gen.GetByIDResp{}, err
  22.     }
  23.      if res {
  24.         return &gen.GetByIdReq{}, errors.New("触发限流")
  25.     }
  26.      return handler(ctx, req)
  27.   }
  28. }
  29. func (r *redisFix) limit(ctx context.Context) (res bool, err error) {
  30.   keys := []string{r.serName}
  31.   res, err = r.redis.Eval(ctx, lua_redis_fix, keys, r.interVal, r.limitCnt).Bool()
  32.   return
  33. }

lua

local key = KEYS[1]
  1. local limitCnt = tonumber(ARGV[2])
  2. local val = redis.call('get',key)
  3. if val==false then
  4.    if limitCnt<1 then
  5.        return "true"
  6.    else
  7.        redis.call('set',key,1,'PX',ARGV[1])
  8.        return "false"
  9.    end
  10. elseif tonumber(val)<limitCnt then
  11.    redis.call('incr',key)
  12.    return "false"
  13. else
  14.    return "true"
  15. end
2、分布式滑动窗口
//go:embed lua/slidewindow.lua
  1. var slideWindLua string
  2. type redisSlib struct {
  3.   serverName string
  4.   interVal   time.Duration
  5.   maxCnt     int64
  6.   redis      redis.Cmdable
  7. }
  8. func NewRedisSlib(interval time.Duration, maxCnt int64, serverName string, clientCmd redis.Cmdable) *redisSlib {
  9.   t := &redisSlib{
  10.      serverName: serverName,
  11.      interVal:   interval,
  12.      maxCnt:     maxCnt,
  13.      redis:      clientCmd,
  14.   }
  15.   return t
  16. }
  17. func (r *redisSlib) RedisSlibLimt() grpc.UnaryServerInterceptor {
  18.   return func(ctx context.Context, req any, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp any, err error) {
  19.      limt, err := r.limt(ctx)
  20.      if err != nil {
  21.         return nil, err
  22.     }
  23.      if limt {
  24.         return nil, errors.New("限流")
  25.     }
  26.      return handler(ctx, req)
  27.   }
  28. }
  29. func (r *redisSlib) limt(ctx context.Context) (bool, error) {
  30.   now := time.Now().UnixMilli()
  31.   return r.redis.Eval(ctx, slideWindLua, []string{r.serverName}, r.interVal.Milliseconds(), r.maxCnt, now).Bool()
  32. }

lua

  1. local key = KEYS[1]
  2. local window = tonumber(ARGV[1])
  3. local maxCnt = tonumber(ARGV[2])
  4. local now = tonumber(ARGV[3])
  5. --- 窗口的最小边界
  6. local min = now-window
  7. redis.call('ZREMRANGEBYSCORE',key,'-inf',min)
  8. local cnt = redis.call('ZCOUNT',key,'-inf','+inf')
  9. if cnt>=maxCnt then
  10.    return "true"
  11. else
  12.    redis.call('ZADD',key,now,now)
  13.    redis.call('PEXPIRE',key,window)
  14.    return "false"
  15. end

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/很楠不爱3/article/detail/468543
推荐阅读
相关标签
  

闽ICP备14008679号