当前位置:   article > 正文

Go+Redis实现计数器限流和滑动窗口限流

Go+Redis实现计数器限流和滑动窗口限流

Go+Redis实现的并发安全限流器

限流常用有4种算法

  • 计数器
  • 滑动窗口
  • 漏桶算法
  • 令牌桶算法

本文的限流器实现了计数器限流滑动窗口限流,提供了非并发安全和并发安全的实现,方便两者的对比。

至于另外两个限流方式,漏桶算法的实现可以参考uber开源库github.com/uber-go/ratelimit
令牌桶算法可参考github.com/juju/ratelimit

这里就不介绍限流算法的原理,有需要则自己网上先过一遍大概原理,直接贴两种限流方式的设计思路和实现部分代码!!!

完整代码附在最后
计数器限流:

设计思路:核心是通过Redis的Incr和Expire设置过期时间

三个参数, key:限流的Api存储key值,count:限流上限个数,ttl:限流单位时间
1. 先Get key,判断有没有超过限流上限count
2. 没超过上限,可以直接放行,执行Incr。Incr为1的话则说明是限流单位时间区间内第一个请求,需要设置ttl过期时间
3. 超过上限,需要判断ttl是否没设置(因为存在第2步的Incr成功了,但是Expire失败了)
4. 设置了ttl的,说明在限定时间内超过上限,限流不放行
5. 未设置ttl的,用Set+px参数原子性操作设置为1,成功则放行,失败则限流
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

非并发安全代码:

reqCounts, _ := l.conn.Get(key).Int64()
if uint(reqCounts) < count {
	reqCounts, _ =l.conn.Incr(key).Result()
	if reqCounts == 1 {
		l.conn.Expire(key, time.Duration(ttl) * time.Second)
	}
	return true
}

if l.conn.TTL(key).Val() <= 0 {
	err := l.conn.Set(key, 1, time.Duration(ttl) * time.Second).Err()
	if err!=nil{
		log.Println("CountLimit Set Expire Err:", err)
		return false
	}

	return true
}
return false
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19

并发安全代码:

var luaScript string
luaScript =
	" local key = KEYS[1] " +
		" local ttl = ARGV[2] " +
		" local count = ARGV[1] " +
		" local reqCounts = redis.call('get', key) " +
		" if (not reqCounts or tonumber(reqCounts) < tonumber(count)) then " +
		"	 reqCounts = redis.call('incr', key) " +
		"	 if tonumber(reqCounts) == 1 then " +
		"		 redis.call('expire', key, tonumber(ttl)) " +
		"	 end " +
		"	 return 1 " +
		" end " +
		" if tonumber(redis.call('ttl', key)) <= 0 then " +
		"	 local res = redis.call('set', key, 1, 'ex', tonumber(ttl)) " +
		"	 redis.log(redis.LOG_NOTICE, key..\" not set expire\")	" +
		"	 if res.ok ~= \"OK\" then " +
		"	 	 redis.log(redis.LOG_NOTICE, key..\" set again err\") 	" +
		"		 return 2 " +
		"	 end " +
		"	 return 1 " +
		" end " +
		" return 2 "
result, err := l.conn.Eval(luaScript, []string{key}, count, ttl).Result()

if err !=nil{
	log.Println("SyncCountLimit error:", err)
	return false
}

if utils.GetInt(result) != 1 {
	return false
}

return true
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
滑动窗口限流:

设计思路:核心是利用list队列左进右出,个数占位推进代替时间推进(空间代替时间推进的转换)

三个参数, key:限流的Api存储key值,count:限流上限个数,windowTime:滑动窗口时间
1. 判断list队列长度是否超过上限count
2. 没超过上限,直接放行,把当前时间戳(秒)放进去队列
3. 超过上限,判断队列最右边占位的时间戳和当前时间戳的差值是否大于windowTime
4. 小于窗口时间,说明在窗口时间内达到上限,限流不放行
5. 大于窗口时间,说明已推进到新窗口,移除最右边的并且放入当前时间戳到最左边,放行
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

非并发安全代码:

time := time.Now().Unix()
len := l.conn.LLen(key).Val()
if uint(len) < count{
	l.conn.LPush(key, time)
	return true
}

earlyTime,_ := l.conn.LIndex(key, len - 1).Int64()
if time - earlyTime < windowTime{
	return false
}

l.conn.RPop(key)
l.conn.LPush(key, time)
return true
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15

并发安全代码:

time := time.Now().Unix()
var luaScript string
luaScript =
	"local key = KEYS[1] " +
		"local time = ARGV[3] " +
		"local windowTime = ARGV[2] " +
		"local count = ARGV[1] " +
		"local len = redis.call('llen', key) " +
		"if tonumber(len) < tonumber(count) then " +
		"   redis.call('lpush', key, time) " +
		"	return 1 " +
		"end " +
		"local earlyTime = redis.call('lindex', key, tonumber(len) - 1) " +
		"if tonumber(time) - tonumber(earlyTime) < tonumber(windowTime) then " +
		"	return 2 " +
		"end " +
		"redis.call('rpop', key) " +
		"redis.call('lpush', key, time) " +
		"return 1 "

result, err := l.conn.Eval(luaScript, []string{key}, count, windowTime, time).Result()

if err !=nil{
	log.Println("SyncWindowLimit error:", err)
	return false
}
if utils.GetInt(result) != 1 {
	return false
}
return true
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
总结

非并发安全方法,在并发请求操作时候存在竞争问题,导致结果与预期有出入。具体效果如下截图
测试代码,都是5秒内限流2个:

  	i := 10
  	var wait sync.WaitGroup
  	for i > 0{
  		wait.Add(1)
  		go func(index int){
  			res :=l.CountLimit("CountLimit", 2, 5)
  			log.Printf("CountLimit-Index:%v,RequestAllow:%v", index, res)
  			wait.Done()
		}(i)
  		i--
	}
	wait.Wait()
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12

测试结果如下:每个类型并发10个请求,设定5秒内限流放行2个,跑出来结果,只有SyncCountLimit和SyncWindowLimit是符合预期。
测试图

完整代码自行提取
github:github.com/toegg/ecurrent_limiter
gitee:gitee.com/toegg/ecurrent_limiter

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/小丑西瓜9/article/detail/468513
推荐阅读
相关标签
  

闽ICP备14008679号