赞
踩
目录
令牌桶算法是当流量进入系统前需要获取令牌,没有令牌那么就要进行限流
这个算法是怎么实现的呢
定义一个后台协程按照一定的频率去产生token
后台协程产生的token 放到固定大小容器里面
有流量进入系统尝试拿到token,没有token 就需要限流了
- type TokenBucketLimiter struct {
- token chan struct{}
- stop chan struct{}
- }
-
- func NewTokenBucket(capactity int, timeInternal time.Duration) *TokenBucketLimiter {
- te := make(chan struct{}, capactity)
- stop := make(chan struct{})
- ticker := time.NewTicker(timeInternal)
- go func() {
- defer ticker.Stop()
- for {
- select {
- case <-ticker.C:
- select {
- case te <- struct{}{}:
- default:
-
- }
- case <-stop:
- return
- }
- }
- }()
- return &TokenBucketLimiter{
- token: te,
- stop: stop,
- }
- }
-
- func (t *TokenBucketLimiter) BuildServerInterceptor() grpc.UnaryServerInterceptor {
- return func(ctx context.Context, req any, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp any, err error) {
- select {
- case <-ctx.Done():
- err = ctx.Err()
- return
- case <-t.token:
- return handler(ctx, req)
- case <-t.stop:
- err = errors.New("缺乏保护")
- return
- }
- }
- }
-
- func (t *TokenBucketLimiter) Stop() {
- close(t.stop)
- }
什么是固定窗口限流算法
固定窗口限流算法(Fixed Window Rate Limiting Algorithm)是一种最简单的限流算法,其原理是在固定时间窗口(单位时间)内限制请求的数量。该算法将时间分成固定的窗口,并在每个窗口内限制请求的数量。具体来说,算法将请求按照时间顺序放入时间窗口中,并计算该时间窗口内的请求数量,如果请求数量超出了限制,则拒绝该请求。
优点:实现简单
缺点:对于瞬时流量没发处理,也就是临界问题,比如下图在20t前后,在16t以及26t有大量流量进来,在这10t中,已经超过了流量限制,没法限流
实现如下
- type fixWindow1 struct {
- lastVistTime int64
- vistCount int64
- interval int64
- maxCount int64
- }
-
- func NewfixWindow1(macCount int64) *fixWindow1 {
- t := &fixWindow1{
- maxCount: macCount,
- }
- return t
- }
-
- func (f *fixWindow1) FixWindow1() grpc.UnaryServerInterceptor {
- return func(ctx context.Context, req any, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp any, err error) {
- current := time.Now().UnixNano()
- lasttime := atomic.LoadInt64(&f.lastVistTime)
- if lasttime+f.interval > current {
- if atomic.CompareAndSwapInt64(&f.lastVistTime, lasttime, current) {
- atomic.StoreInt64(&f.lastVistTime, current)
- atomic.StoreInt64(&f.maxCount, 0)
- }
- }
- count := atomic.AddInt64(&f.vistCount, 1)
- if count > f.maxCount {
- return gen.GetByIDResp{}, errors.New("触发限流")
- }
- return handler(ctx, req)
- }
- }
什么是滑动窗口算法:
滑动窗口限流算法是一种常用的限流算法,用于控制系统对外提供服务的速率,防止系统被过多的请求压垮。它将单位时间周期分为n
个小周期,分别记录每个小周期内接口的访问次数,并且根据时间滑动删除过期的小周期。它可以解决固定窗口临界值的问题。
type slideWindow struct {
-
- timeWindow *list.List
- interval int64
- maxCnt int
- lock sync.Mutex
- }
-
- func NewSlideWindow(interval time.Duration, maxCnt int) *slideWindow {
- t := &slideWindow{
- timeWindow: list.New(),
- interval: interval.Nanoseconds(),
- maxCnt: maxCnt,
- }
- return t
- }
-
- func (s *slideWindow) SlideWinowlimit() grpc.UnaryServerInterceptor {
- return func(ctx context.Context, req any, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp any, err error) {
- s.lock.Lock()
- now := time.Now().UnixNano()
- // 快路径
- if s.timeWindow.Len() < s.maxCnt {
- resp, err = handler(ctx, req)
- s.timeWindow.PushBack(now)
- s.lock.Unlock()
- return
- }
- front := s.timeWindow.Front()
- for front != nil && front.Value.(int64)+s.interval < now {
- s.timeWindow.Remove(front)
- front = s.timeWindow.Front()
- }
- if s.timeWindow.Len() >= s.maxCnt {
- s.lock.Unlock()
- return &gen.GetByIdReq{}, errors.New("触发限流")
- }
- s.lock.Unlock()
- resp, err = handler(ctx, req)
- s.timeWindow.PushBack(now)
- return
- }
- }
下面是分布式限流,为啥是分布式限流,单机限流只能对单台服务器进行限流,没发对集权进行限流,需要用分布式限流来进行集权限流
type redisFix struct {
-
- serName string
- interVal int
- limitCnt int
- redis redis.Cmdable
- }
-
- //go:embed lua/fixwindow.lua
- var lua_redis_fix string
-
- func NewRedisFix(serName string, interval int, limitCnt int, redis redis.Cmdable) *redisFix {
- t := &redisFix{
- serName: serName,
- interVal: interval,
- limitCnt: limitCnt,
- redis: redis,
- }
- return t
- }
-
- func (r *redisFix) RedisFix() grpc.UnaryServerInterceptor {
- return func(ctx context.Context, req any, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp any, err error) {
- res, err := r.limit(ctx)
- if err != nil {
- return &gen.GetByIDResp{}, err
- }
- if res {
- return &gen.GetByIdReq{}, errors.New("触发限流")
- }
- return handler(ctx, req)
- }
- }
-
- func (r *redisFix) limit(ctx context.Context) (res bool, err error) {
- keys := []string{r.serName}
- res, err = r.redis.Eval(ctx, lua_redis_fix, keys, r.interVal, r.limitCnt).Bool()
- return
- }
lua
local key = KEYS[1]
-
- local limitCnt = tonumber(ARGV[2])
- local val = redis.call('get',key)
- if val==false then
- if limitCnt<1 then
- return "true"
- else
- redis.call('set',key,1,'PX',ARGV[1])
- return "false"
- end
- elseif tonumber(val)<limitCnt then
- redis.call('incr',key)
- return "false"
- else
- return "true"
- end
//go:embed lua/slidewindow.lua
-
- var slideWindLua string
-
- type redisSlib struct {
- serverName string
- interVal time.Duration
- maxCnt int64
- redis redis.Cmdable
- }
-
- func NewRedisSlib(interval time.Duration, maxCnt int64, serverName string, clientCmd redis.Cmdable) *redisSlib {
- t := &redisSlib{
- serverName: serverName,
- interVal: interval,
- maxCnt: maxCnt,
- redis: clientCmd,
- }
- return t
- }
-
- func (r *redisSlib) RedisSlibLimt() grpc.UnaryServerInterceptor {
- return func(ctx context.Context, req any, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp any, err error) {
- limt, err := r.limt(ctx)
- if err != nil {
- return nil, err
- }
- if limt {
- return nil, errors.New("限流")
- }
- return handler(ctx, req)
- }
- }
-
- func (r *redisSlib) limt(ctx context.Context) (bool, error) {
- now := time.Now().UnixMilli()
- return r.redis.Eval(ctx, slideWindLua, []string{r.serverName}, r.interVal.Milliseconds(), r.maxCnt, now).Bool()
- }
lua
- local key = KEYS[1]
- local window = tonumber(ARGV[1])
- local maxCnt = tonumber(ARGV[2])
- local now = tonumber(ARGV[3])
-
- --- 窗口的最小边界
- local min = now-window
-
- redis.call('ZREMRANGEBYSCORE',key,'-inf',min)
-
- local cnt = redis.call('ZCOUNT',key,'-inf','+inf')
-
- if cnt>=maxCnt then
- return "true"
- else
- redis.call('ZADD',key,now,now)
- redis.call('PEXPIRE',key,window)
- return "false"
- end
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。