当前位置:   article > 正文

Redis实现分布式限流(学习笔记

redis实现分布式限流

Redis实现分布式限流(学习笔记2022.07.09)

前言:

以下实现都是基于: spring-boot-starter-web + spring-boot-starter-data-redis (怎么配置连接Redis就不在这里描述)

单机的限流可以使用 Google 的 guava

Redis的大多限流算法原理是, 设置一个带有过期时间的 缓存, 缓存的值不断变化, 在过期前如果次数达到限流阀值, 则触发限流后的逻辑处理!

因为存在大量并发的情况下, 所以如何保证操作都是原子性的, 操作命令不可分割, 要么都成功, 要么都失败?

Redis单个命令都是原子性, 但如果需要组合多个命令都要原子性, 这就需要借助Redis2.6版本后Redis执行Lua脚本解决

Redis中使用EVAL命令来直接执行指定的Lua脚本

EVAL luascript numkeys key [key ...] arg [arg ...]
  • 1
  • EVAL 命令的关键字。
  • luascript Lua 脚本。
  • numkeys 指定的Lua脚本需要处理键的数量,其实就是 key数组的长度。
  • key 传递给Lua脚本零到多个键,空格隔开,在Lua 脚本中通过 KEYS[INDEX]来获取对应的值,其中1 <= INDEX <= numkeys
  • arg是传递给脚本的零到多个附加参数,空格隔开,在Lua脚本中通过ARGV[INDEX]来获取对应的值,其中1 <= INDEX <= numkeys

限流的常见算法有以下三种:

  • 滑动时间窗口算法
  • 漏桶算法
  • 令牌算法

1.0 滑动时间窗口算法Redis实现

滑动时间算法指的是以当前时间为截止时间,往前取一定的时间,比如往前取 60s 的时间,在这 60s 之内运行最大的访问数为 100,此时算法的执行逻辑为,先清除 60s 之前的所有请求记录,再计算当前集合内请求数量是否大于设定的最大请求数 100,如果大于则执行限流拒绝策略,否则插入本次请求记录并返回可以正常执行的标识给客户端。

这种算法实现起来较为复杂,并且需要记录窗口周期内的请求,如果限流阈值设置过大,窗口周期内记录的请求就会很多,就会比较占用内存

jwZ2pq.png

可以借助 Redis 的有序集合 ZSet 来实现时间窗口算法限流,

实现的过程是先使用 ZSet 的 key 存储限流的 score为毫秒时间戳,value也使用毫秒时间戳(比UUID更加节省内存) 用来存储请求的时间,每次有请求访问来了之后,先清空之前时间窗口的访问量,统计现在时间窗口的个数和最大允许访问量对比,如果大于等于最大访问量则返回 false 执行限流操作,true负责允许执行业务逻辑,并且在 ZSet 中添加一条有效的访问记录

1.0.1 编辑操作Redis的Lua 脚本 (菜鸟教程)

可以通过代码方式直接编写, 或者通过文件方式, 以下是通过文件方式

文件放在项目中的 /resources/redislua/slide-limit.lua, 使用classpath类路径加载

-- 下标从 1 开始 获取key
local key = KEYS[1]
-- 下标从 1 开始 获取参数,  Lua  tonumber这个函数会尝试将它的参数转换为数字
local now = tonumber(ARGV[1]) 			   -- 当前时间戳
local slideExpiredTime = tonumber(ARGV[2]) -- 当前key 滑动窗口之外过期时间
local max = tonumber(ARGV[3])              -- 最大限流次数
local expiredTime = tonumber(ARGV[4])      -- 缓存过期时间

-- 移除滑动窗口之外的数据 Redis ZREMRANGEBYSCORE 命令移除有序集key中,所有score值介于min和max之间(包括等于min或max)的成员。
-- 移除指定分数区间内的所有元素,expired 即已经过期的 score, 根据当前时间毫秒数 - 超时毫秒数,得到过期时间 slideExpiredTime
redis.call('zremrangebyscore', key, 0, slideExpiredTime)

-- 每次访问均重新设置 zset 的过期时间,单位毫秒
redis.call("pexpire", key, expiredTime)

-- 获取 zset 中的当前元素个数 , Redis ZCARD 命令用于返回有序集的成员个数
local current = tonumber(redis.call('zcard', key))
local next = current + 1
-- 判断当前集合内个数+1 后是否超出最大限制次数
if next > max then
  -- 达到限流大小 返回 0
  return 0
-- 否则添加进入zset集合
else
  -- 往 zset 中添加一个 Score、value均为当前时间戳的元素,[score,value]
  redis.call("zadd", key, now, now)

  -- PEXPIRE 命令和 EXPIRE 命令的作用类似,但是它以毫秒为单位设置 key 的生存时间,而不像 EXPIRE 命令那样,以秒为单位
  -- 返回当前次数
  return next
end
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31

1.0.2 配置脚本类

@Configuration
public class RedisConfig {

    @Bean
    public RedisScript<Long> slideLimitRedisScript() {
        DefaultRedisScript<Long> redisScript = new DefaultRedisScript<>();
//        redisScript.setScriptText();  直接设置脚本字符串
        redisScript.setResultType(Long.class);
        redisScript.setScriptSource(new ResourceScriptSource(new ClassPathResource("redislua/slide-limit.lua")));
        return redisScript;
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12

1.0.3 进行测试

@Autowired
private RedisTemplate<String, String> redisTemplate;
@Resource(name = "slideLimitRedisScript")
private RedisScript<Long> redisScript;
@Test
public void myEST() throws Exception {
    // 并发30个线程执行, 使用信号灯模拟同时调用  (预想效果是 前20个请求只有5个成功, 然后后面10个请求5个成功)
    Semaphore semaphore = new Semaphore(30);
    ScheduledExecutorService pool = Executors.newScheduledThreadPool(30);
    for (int i = 0; i < 30; i++) {
        int j = i;
        pool.execute(() -> {
            try {
                int i1;
                if (j > 20) {
                    i1 = j + 3100;
                    loggers.info("延迟3秒后执行的线程10个");
                } else {
                    i1 = j + 50;
                }

                // 睡眠
                ThreadUtil.sleep(1500 + i1);
                // 获取许可证
                semaphore.acquire();
                // 执行方法
                String result = this.executeMethod();
                loggers.info("executeMethod===, i:{}, ===, result:{}", j, result);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            } finally {
                semaphore.release();
            }
        });
    }
    TimeUnit.SECONDS.sleep(15);
}

private String executeMethod() {
    // 假设以下是通过注解获取到的配置, 或者其他地方获取到的配置
    // (限流时间内允许的) 最大请求
    long max = 5L;
    // 缓存key
    String key = "zhihao123";
    // 滑动窗口时间(限流时间范围)
    long period = 3L;
    // 限流时间单位, 默认秒
    TimeUnit timeUnit = TimeUnit.SECONDS;
    // 缓存的过期时间, 转成毫秒
    long expiredTime = timeUnit.toMillis(period);
    // 当前时间戳
    long currentTimeMillis = System.currentTimeMillis();
    // 当前时间 - 间隔时间 = 滑动窗口外数据过期时间
    long slideExpiredTime = currentTimeMillis - expiredTime;
    // 执行lua脚本判断是否达到限流    脚本对象,   keys集合,  参数集合(根据脚本索引放)
    Long result = redisTemplate.execute(redisScript, Collections.singletonList(key),
            String.valueOf(currentTimeMillis), String.valueOf(slideExpiredTime), String.valueOf(max),String.valueOf(expiredTime));
    result = Optional.ofNullable(result).orElse(-1L);
    if (result.intValue() == 0L) {
        return "限流了!!!";
    } else {
        return "正常执行!!!";
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64

1.0.4 结果:

2022-07-07 17:21:42.120  INFO 230408 --- [ool-1-thread-27] com.zhihao.demo.DemoApplicationTests     : 延迟3秒后执行的线程10个
2022-07-07 17:21:42.119  INFO 230408 --- [ool-1-thread-25] com.zhihao.demo.DemoApplicationTests     : 延迟3秒后执行的线程10个
2022-07-07 17:21:42.119  INFO 230408 --- [ool-1-thread-29] com.zhihao.demo.DemoApplicationTests     : 延迟3秒后执行的线程10个
2022-07-07 17:21:42.120  INFO 230408 --- [ool-1-thread-22] com.zhihao.demo.DemoApplicationTests     : 延迟3秒后执行的线程10个
2022-07-07 17:21:42.120  INFO 230408 --- [ool-1-thread-30] com.zhihao.demo.DemoApplicationTests     : 延迟3秒后执行的线程10个
2022-07-07 17:21:42.119  INFO 230408 --- [ool-1-thread-23] com.zhihao.demo.DemoApplicationTests     : 延迟3秒后执行的线程10个
2022-07-07 17:21:42.119  INFO 230408 --- [ool-1-thread-28] com.zhihao.demo.DemoApplicationTests     : 延迟3秒后执行的线程10个
2022-07-07 17:21:42.120  INFO 230408 --- [ool-1-thread-26] com.zhihao.demo.DemoApplicationTests     : 延迟3秒后执行的线程10个
2022-07-07 17:21:42.120  INFO 230408 --- [ool-1-thread-24] com.zhihao.demo.DemoApplicationTests     : 延迟3秒后执行的线程10个
2022-07-07 17:21:44.187  INFO 230408 --- [pool-1-thread-2] com.zhihao.demo.DemoApplicationTests     : executeMethod===, i:1, ===, result:限流了!!!
2022-07-07 17:21:44.187  INFO 230408 --- [ool-1-thread-19] com.zhihao.demo.DemoApplicationTests     : executeMethod===, i:18, ===, result:限流了!!!
2022-07-07 17:21:44.187  INFO 230408 --- [ool-1-thread-12] com.zhihao.demo.DemoApplicationTests     : executeMethod===, i:11, ===, result:限流了!!!
2022-07-07 17:21:44.187  INFO 230408 --- [ool-1-thread-18] com.zhihao.demo.DemoApplicationTests     : executeMethod===, i:17, ===, result:限流了!!!
2022-07-07 17:21:44.187  INFO 230408 --- [ool-1-thread-20] com.zhihao.demo.DemoApplicationTests     : executeMethod===, i:19, ===, result:正常执行!!!
2022-07-07 17:21:44.187  INFO 230408 --- [pool-1-thread-6] com.zhihao.demo.DemoApplicationTests     : executeMethod===, i:5, ===, result:限流了!!!
2022-07-07 17:21:44.187  INFO 230408 --- [ool-1-thread-17] com.zhihao.demo.DemoApplicationTests     : executeMethod===, i:16, ===, result:限流了!!!
2022-07-07 17:21:44.187  INFO 230408 --- [ool-1-thread-10] com.zhihao.demo.DemoApplicationTests     : executeMethod===, i:9, ===, result:限流了!!!
2022-07-07 17:21:44.187  INFO 230408 --- [pool-1-thread-8] com.zhihao.demo.DemoApplicationTests     : executeMethod===, i:7, ===, result:限流了!!!
2022-07-07 17:21:44.187  INFO 230408 --- [pool-1-thread-7] com.zhihao.demo.DemoApplicationTests     : executeMethod===, i:6, ===, result:限流了!!!
2022-07-07 17:21:44.187  INFO 230408 --- [ool-1-thread-13] com.zhihao.demo.DemoApplicationTests     : executeMethod===, i:12, ===, result:限流了!!!
2022-07-07 17:21:44.187  INFO 230408 --- [pool-1-thread-9] com.zhihao.demo.DemoApplicationTests     : executeMethod===, i:8, ===, result:正常执行!!!
2022-07-07 17:21:44.187  INFO 230408 --- [pool-1-thread-4] com.zhihao.demo.DemoApplicationTests     : executeMethod===, i:3, ===, result:限流了!!!
2022-07-07 17:21:44.187  INFO 230408 --- [ool-1-thread-21] com.zhihao.demo.DemoApplicationTests     : executeMethod===, i:20, ===, result:正常执行!!!
2022-07-07 17:21:44.187  INFO 230408 --- [pool-1-thread-3] com.zhihao.demo.DemoApplicationTests     : executeMethod===, i:2, ===, result:正常执行!!!
2022-07-07 17:21:44.187  INFO 230408 --- [ool-1-thread-14] com.zhihao.demo.DemoApplicationTests     : executeMethod===, i:13, ===, result:正常执行!!!
2022-07-07 17:21:44.187  INFO 230408 --- [pool-1-thread-5] com.zhihao.demo.DemoApplicationTests     : executeMethod===, i:4, ===, result:限流了!!!
2022-07-07 17:21:44.187  INFO 230408 --- [ool-1-thread-11] com.zhihao.demo.DemoApplicationTests     : executeMethod===, i:10, ===, result:限流了!!!
2022-07-07 17:21:44.187  INFO 230408 --- [pool-1-thread-1] com.zhihao.demo.DemoApplicationTests     : executeMethod===, i:0, ===, result:限流了!!!
2022-07-07 17:21:44.187  INFO 230408 --- [ool-1-thread-16] com.zhihao.demo.DemoApplicationTests     : executeMethod===, i:15, ===, result:限流了!!!
2022-07-07 17:21:44.187  INFO 230408 --- [ool-1-thread-15] com.zhihao.demo.DemoApplicationTests     : executeMethod===, i:14, ===, result:限流了!!!
## ===========================================分割 3秒后的线程=====================================
2022-07-07 17:21:46.743  INFO 230408 --- [ool-1-thread-22] com.zhihao.demo.DemoApplicationTests     : executeMethod===, i:21, ===, result:正常执行!!!
2022-07-07 17:21:46.744  INFO 230408 --- [ool-1-thread-23] com.zhihao.demo.DemoApplicationTests     : executeMethod===, i:22, ===, result:正常执行!!!
2022-07-07 17:21:46.744  INFO 230408 --- [ool-1-thread-24] com.zhihao.demo.DemoApplicationTests     : executeMethod===, i:23, ===, result:正常执行!!!
2022-07-07 17:21:46.745  INFO 230408 --- [ool-1-thread-25] com.zhihao.demo.DemoApplicationTests     : executeMethod===, i:24, ===, result:正常执行!!!
2022-07-07 17:21:46.746  INFO 230408 --- [ool-1-thread-26] com.zhihao.demo.DemoApplicationTests     : executeMethod===, i:25, ===, result:正常执行!!!
2022-07-07 17:21:46.747  INFO 230408 --- [ool-1-thread-27] com.zhihao.demo.DemoApplicationTests     : executeMethod===, i:26, ===, result:限流了!!!
2022-07-07 17:21:46.748  INFO 230408 --- [ool-1-thread-29] com.zhihao.demo.DemoApplicationTests     : executeMethod===, i:28, ===, result:限流了!!!
2022-07-07 17:21:46.748  INFO 230408 --- [ool-1-thread-28] com.zhihao.demo.DemoApplicationTests     : executeMethod===, i:27, ===, result:限流了!!!
2022-07-07 17:21:46.750  INFO 230408 --- [ool-1-thread-30] com.zhihao.demo.DemoApplicationTests     : executeMethod===, i:29, ===, result:限流了!!!
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40

2.0 漏桶算法Redis实现

滑动时间算法有一个问题就是在一定范围内,比如 60s 内只能有 10 个请求,当第一秒时就到达了 10 个请求,那么剩下的 59s 只能把所有的请求都给拒绝掉,而漏桶算法可以解决这个问题。

jwLDED.png

无论上面的水流倒入漏斗有多大,也就是无论请求有多少,它都是以均匀的速度慢慢流出的。当上面的水流速度大于下面的流出速度时,漏斗会慢慢变满,当漏斗满了之后就会丢弃新来的请求;当上面的水流速度小于下面流出的速度的话,漏斗永远不会被装满,并且可以一直流出。

漏桶算法的实现步骤是,先声明一个队列用来保存请求,这个队列相当于漏斗,当队列容量满了之后就放弃新来的请求,然后重新声明一个线程定期从任务队列中获取一个或多个任务进行执行,这样就实现了漏桶算法。

Nginx 中的 limit_req 模块的底层实现就是用的这种算法,具体可参考【NGINX 和 NGINX Plus 的速率限制】(https://www.nginx.com/blog/rate-limiting-nginx)

可以使用 Redis 4.0 以上版本中提供的 Redis-Cell 模块,该模块使用的是漏斗算法,并且提供了原子的限流指令,而且依靠 Redis 这个天生的分布式程序就可以实现比较完美的限流了。

PS: 该模块需自己安装 https://github.com/brandur/redis-cell 也是推荐使用 (这里就不做实现了, 人家实现的已经经过市场考验, 自己实现可能很多场景和特殊情况没有考虑到!)

网上的伪代码:

long timeStamp = getNowTime(); 
int capacity = 10000;// 桶的容量,即最大承载值
int rate = 1;//水漏出的速度,即服务器的处理请求的能力
int water = 100;//当前水量,即当前的即时请求压力
 
//当前请求线程进入漏桶方法,true则不被拒绝,false则说明当前服务器负载水量不足,则被拒绝
public static bool control() {
long  now = getNowTime();//当前请求时间
//先执行漏水代码
//rate是固定的代表服务器的处理能力,所以可以认为“时间间隔*rate”即为漏出的水量
    water = Math.max(0, water - (now - timeStamp) * rate);//请求时间-上次请求时间=时间间隔
    timeStamp = now;//更新时间,为下次请求计算间隔做准备
    if (water < capacity) { // 执行漏水代码后,发现漏桶未满,则可以继续加水,即没有到服务器可以承担的上线
        water ++; 
        return true; 
    } else { 
        return false;//水满,拒绝加水,到服务器可以承担的上线,拒绝请求
   } 
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19

3.0 令牌算法Redis实现

令牌桶算法以一个设定的速率产生令牌并放入令牌桶,每次用户请求都得申请令牌,如果令牌不足,则拒绝请求。
令牌桶算法中新请求到来时会从桶里拿走一个令牌,如果桶内没有令牌可拿,就拒绝服务。当然,令牌的数量也是有上限的。令牌的数量与时间和发放速率强相关,时间流逝的时间越长,会不断往桶里加入越多的令牌,如果令牌发放的速度比申请速度快,令牌桶会放满令牌,直到令牌占满整个令牌桶,如图所示

jD3uff.png

令牌的发送速率可以设置,从而可以对突发的出口流量进行有效的应对

实现方式:

1- 提供程序定时, 向桶中放入令牌的方法是启动一个线程,每隔Y单位时间增加一次令牌数量,或者在Timer中定时执行这一过程。

这种方式一旦有多个令牌限流器的情况下 : 一是浪费线程资源,二是因为调度的问题执行时间不精确。 假设以活跃用户维度做限流, 那是会同时存在几千个限流器, 那就需要上千个线程资源添加令牌。

2- 这种方式是记录添加令牌的时间, 每次获取令牌的时候, 先校验当前时间 减 添加令牌的时间 = 间隔时间, 符合添加令牌时间后进行触发计算需要添加多少个令牌, 如果间隔时间过长直接将桶的令牌添加为满桶状态, 添加完令牌了, 在触发取令牌, 没有请求自然也不会触发限流和添加令牌。

相比漏桶算法,令牌桶算法可以在运行时控制和调整数据处理的速率,处理某时的突发流量。放令牌的频率增加可以提升整体数据处理的速度,而通过每次获取令牌的个数增加或者放慢令牌的发放速度和降低整体数据处理速度。而漏桶算法不行,因为它的流出速率是固定的,程序处理速度也是固定的。

3.0.1 编写Lua脚本

返回值需要返回多个使用 local ret={} ret[1]=1 ret[1]="13" 代码接收使用List<Object>

-- 要进行限流的Key   (返回 0:限流, 1:没有限流) 				
local key = KEYS[1]
-- 请求消耗的令牌数,每个请求消耗一个
local consume_permits = tonumber(ARGV[1])
-- 当前时间戳 (毫秒)
local curr_time = tonumber(ARGV[2])
-- 获取令牌桶hash数据结构
local limiter_info = redis.pcall("HMGET", key, "last_grant_time", "curr_permits", "bucket_cap", "rate")
-- 返回-1是没有配置限流
if not limiter_info[3] then
    return -1
end
-- 以下和令牌桶相关的参数, 使用项目启动时候初始化或者对应key同步方式初始化缓存,
-- 但是这种方式会导致这个令牌桶的缓存一直存在, 如果限流的颗粒度比较细, 会造成一些没有使用的令牌缓存浪费,
-- 这个时候就可以将桶参数传递方式进来, 先获取, 不存在的情况下, 初始化并添加过期时间
-- 上一次令牌的发放时间
local last_grant_time = tonumber(limiter_info[1]) or 0
-- 当前令牌数量
local curr_permits = tonumber(limiter_info[2]) or 0
-- 令牌桶容量
local bucket_cap = tonumber(limiter_info[3]) or 0
-- 令牌每个时间单位生成多少个,  目前这里是  /s
local rate = tonumber(limiter_info[4]) or 0
-- 发送时间单位  秒    1000毫秒=1秒  到时候也可以配置
local inflow_unit = 1000

-- 是否触发了发放令牌, 默认发放
local is_grant = true
local past_time = curr_time-last_grant_time
-- 大于说明需要发放令牌了否则反之
if past_time < inflow_unit then
    is_grant = false
end
-- 计算最终更新回缓存的当前令牌数
local total_permits = curr_permits
-- 需要发放计算需要发放令牌数
if is_grant then
-- 当前时间戳 - 上一次发放令牌时间戳  *  发放速度率 = 需要发放的令牌数
-- 预计投放数量 = (距上次投放过去的时间差)/投放的时间间隔1000) *每单位时间投放的数量 1
    local new_permits = math.floor((curr_time-last_grant_time)/1000) * rate
    total_permits = new_permits + curr_permits
end

-- 计算的总令牌数  >  桶容量, 则以桶容量发满令牌
if total_permits > bucket_cap then
    total_permits = bucket_cap
end

-- 返回是否限流结果, 默认是没有限流
local result = 1
-- 判断: 总令牌数 > 需要消耗的令牌
if total_permits >= consume_permits then
    -- 进行扣除令牌
    total_permits = total_permits - consume_permits
else
    -- 否则说明没有令牌可扣, 限流, 桶内没有令牌可获取
    result = 0
end
-- 根据结果, 做只更减少令牌数,   还是发放令牌与记录发放时间戳
if is_grant then
    -- 更新缓存数据   HMSET 可以更新多个字段
    redis.pcall("HMSET", key, "curr_permits", total_permits, "last_grant_time", curr_time)
else
    -- 更新缓存数据   HSET 更新单个字段
    redis.pcall("HSET", key, "curr_permits", total_permits)
end
-- 返回
return result
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68

3.0.2 配置脚本类

@Configuration
public class RedisConfig {
    @Bean
    public RedisScript<Long> tokenRedisScript() {
        DefaultRedisScript<Long> redisScript = new DefaultRedisScript<>();
        redisScript.setResultType(Long.class);
        redisScript.setScriptSource(new ResourceScriptSource(new ClassPathResource("redislua/token-limit.lua")));
        return redisScript;
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

3.0.3 进行测试

	@Autowired
    private RedisTemplate<String, String> redisTemplate;
    @Resource(name = "tokenRedisScript")
    private RedisScript<Long> redisScript;

    @Test
    public void myEST() throws Exception {
        // 并发30个线程执行, 使用信号灯模拟同时调用  (预想效果是 前20个请求只有10个成功, 后10个因为只过去了5秒, 才补充了5个令牌所以是5个成功)
        Semaphore semaphore = new Semaphore(20);
        ExecutorService pool = Executors.newFixedThreadPool(30);
        // 初始化缓存,  项目启动完毕初始化或者分布式锁先初始化, 甚至你在这里可以设置过期, 传递过期时间, 脚本里面每次执行都重置一下过期时间, 以节约非热点限流器资源
//        lock.lock();
//        String key = "zhihao123";
//        // 不存在, 进行初始化
//        if (!redisTemplate.hasKey(key)){
//            BoundHashOperations<String, String, String> hashOps = redisTemplate.boundHashOps(key);
             // 上一次添加令牌时间
//            hashOps.put("last_grant_time",String.valueOf(System.currentTimeMillis()));
            // 当前令牌数
//            hashOps.put("curr_permits",String.valueOf(10L));
            // 令牌桶
//            hashOps.put("bucket_cap",String.valueOf(10L));
            // 每隔多少时间生成多少令牌 (目前脚本里面是  1/s)
//            hashOps.put("rate",String.valueOf(1L));
//        }
//        lock.unlock();

        CountDownLatch countDownLatch = new CountDownLatch(30);
        for (int i = 0; i < 30; i++) {
            int j = i;
            pool.execute(() -> {
                try {
                    int i1;
                    if (j >= 20) {
                        i1 = j + 5100;
                        loggers.info("延迟5秒后执行的线程10个");
                    } else {
                        i1 = j + 50;
                    }

                    // 睡眠
                    ThreadUtil.sleep(1500 + i1);
                    // 获取许可证
                    semaphore.acquire();
                    // 执行方法
                    String result = this.executeMethod();
                    loggers.info("executeMethod===, i:{}, ===, result:{}", j, result);
                    countDownLatch.countDown();
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                } finally {
                    semaphore.release();
                }
            });
        }
        countDownLatch.await();
    }

    private String executeMethod() {
        String key = "zhihao123";
        long currentTimeMillis = System.currentTimeMillis();
        Long result = redisTemplate.execute(redisScript, Collections.singletonList(key),
                String.valueOf(1L), String.valueOf(currentTimeMillis));
        result = Optional.ofNullable(result).orElse(-1L);
        if (result.intValue() == 0L) {
            return "限流了!!!";
        } else {
            return "正常执行!!!";
        }
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70

3.0.4 结果

2022-07-09 02:48:46.132  INFO 253832 --- [ool-1-thread-28] com.zhihao.demo.DemoApplicationTests     : 延迟5秒后执行的线程10个
2022-07-09 02:48:46.132  INFO 253832 --- [ool-1-thread-29] com.zhihao.demo.DemoApplicationTests     : 延迟5秒后执行的线程10个
2022-07-09 02:48:46.132  INFO 253832 --- [ool-1-thread-22] com.zhihao.demo.DemoApplicationTests     : 延迟5秒后执行的线程10个
2022-07-09 19:48:46.132  INFO 253832 --- [ool-1-thread-30] com.zhihao.demo.DemoApplicationTests     : 延迟5秒后执行的线程10个
2022-07-09 19:48:46.132  INFO 253832 --- [ool-1-thread-23] com.zhihao.demo.DemoApplicationTests     : 延迟5秒后执行的线程10个
2022-07-09 19:48:46.132  INFO 253832 --- [ool-1-thread-24] com.zhihao.demo.DemoApplicationTests     : 延迟5秒后执行的线程10个
2022-07-09 19:48:46.132  INFO 253832 --- [ool-1-thread-26] com.zhihao.demo.DemoApplicationTests     : 延迟5秒后执行的线程10个
2022-07-09 19:48:46.132  INFO 253832 --- [ool-1-thread-25] com.zhihao.demo.DemoApplicationTests     : 延迟5秒后执行的线程10个
2022-07-09 19:48:46.132  INFO 253832 --- [ool-1-thread-21] com.zhihao.demo.DemoApplicationTests     : 延迟5秒后执行的线程10个
2022-07-09 19:48:46.132  INFO 253832 --- [ool-1-thread-27] com.zhihao.demo.DemoApplicationTests     : 延迟5秒后执行的线程10个
2022-07-09 19:48:48.817  INFO 253832 --- [ool-1-thread-16] com.zhihao.demo.DemoApplicationTests     : executeMethod===, i:15, ===, result:正常执行!!!
2022-07-09 19:48:48.819  INFO 253832 --- [ool-1-thread-14] com.zhihao.demo.DemoApplicationTests     : executeMethod===, i:13, ===, result:正常执行!!!
2022-07-09 19:48:48.818  INFO 253832 --- [ool-1-thread-15] com.zhihao.demo.DemoApplicationTests     : executeMethod===, i:14, ===, result:限流了!!!
2022-07-09 19:48:48.817  INFO 253832 --- [ool-1-thread-17] com.zhihao.demo.DemoApplicationTests     : executeMethod===, i:16, ===, result:限流了!!!
2022-07-09 19:48:48.817  INFO 253832 --- [pool-1-thread-7] com.zhihao.demo.DemoApplicationTests     : executeMethod===, i:6, ===, result:正常执行!!!
2022-07-09 19:48:48.819  INFO 253832 --- [pool-1-thread-5] com.zhihao.demo.DemoApplicationTests     : executeMethod===, i:4, ===, result:限流了!!!
2022-07-09 19:48:48.818  INFO 253832 --- [pool-1-thread-2] com.zhihao.demo.DemoApplicationTests     : executeMethod===, i:1, ===, result:限流了!!!
2022-07-09 19:48:48.819  INFO 253832 --- [pool-1-thread-9] com.zhihao.demo.DemoApplicationTests     : executeMethod===, i:8, ===, result:正常执行!!!
2022-07-09 19:48:48.819  INFO 253832 --- [ool-1-thread-20] com.zhihao.demo.DemoApplicationTests     : executeMethod===, i:19, ===, result:限流了!!!
2022-07-09 19:48:48.819  INFO 253832 --- [ool-1-thread-10] com.zhihao.demo.DemoApplicationTests     : executeMethod===, i:9, ===, result:限流了!!!
2022-07-09 19:48:48.819  INFO 253832 --- [pool-1-thread-6] com.zhihao.demo.DemoApplicationTests     : executeMethod===, i:5, ===, result:正常执行!!!
2022-07-09 19:48:48.818  INFO 253832 --- [ool-1-thread-12] com.zhihao.demo.DemoApplicationTests     : executeMethod===, i:11, ===, result:限流了!!!
2022-07-09 19:48:48.817  INFO 253832 --- [pool-1-thread-4] com.zhihao.demo.DemoApplicationTests     : executeMethod===, i:3, ===, result:正常执行!!!
2022-07-09 19:48:48.818  INFO 253832 --- [ool-1-thread-18] com.zhihao.demo.DemoApplicationTests     : executeMethod===, i:17, ===, result:正常执行!!!
2022-07-09 19:48:48.819  INFO 253832 --- [pool-1-thread-1] com.zhihao.demo.DemoApplicationTests     : executeMethod===, i:0, ===, result:正常执行!!!
2022-07-09 19:48:48.817  INFO 253832 --- [ool-1-thread-13] com.zhihao.demo.DemoApplicationTests     : executeMethod===, i:12, ===, result:限流了!!!
2022-07-09 19:48:48.819  INFO 253832 --- [pool-1-thread-8] com.zhihao.demo.DemoApplicationTests     : executeMethod===, i:7, ===, result:正常执行!!!
2022-07-09 19:48:48.819  INFO 253832 --- [ool-1-thread-19] com.zhihao.demo.DemoApplicationTests     : executeMethod===, i:18, ===, result:正常执行!!!
2022-07-09 19:48:48.819  INFO 253832 --- [pool-1-thread-3] com.zhihao.demo.DemoApplicationTests     : executeMethod===, i:2, ===, result:限流了!!!
2022-07-09 19:48:48.818  INFO 253832 --- [ool-1-thread-11] com.zhihao.demo.DemoApplicationTests     : executeMethod===, i:10, ===, result:限流了!!!
2022-07-09 19:48:52.768  INFO 253832 --- [ool-1-thread-22] com.zhihao.demo.DemoApplicationTests     : executeMethod===, i:21, ===, result:正常执行!!!
2022-07-09 19:48:52.771  INFO 253832 --- [ool-1-thread-23] com.zhihao.demo.DemoApplicationTests     : executeMethod===, i:22, ===, result:正常执行!!!
2022-07-09 19:48:52.773  INFO 253832 --- [ool-1-thread-21] com.zhihao.demo.DemoApplicationTests     : executeMethod===, i:20, ===, result:正常执行!!!
2022-07-09 19:48:52.775  INFO 253832 --- [ool-1-thread-24] com.zhihao.demo.DemoApplicationTests     : executeMethod===, i:23, ===, result:正常执行!!!
2022-07-09 19:48:52.775  INFO 253832 --- [ool-1-thread-25] com.zhihao.demo.DemoApplicationTests     : executeMethod===, i:24, ===, result:正常执行!!!
2022-07-09 19:48:52.775  INFO 253832 --- [ool-1-thread-27] com.zhihao.demo.DemoApplicationTests     : executeMethod===, i:26, ===, result:限流了!!!
2022-07-09 19:48:52.775  INFO 253832 --- [ool-1-thread-28] com.zhihao.demo.DemoApplicationTests     : executeMethod===, i:27, ===, result:限流了!!!
2022-07-09 19:48:52.775  INFO 253832 --- [ool-1-thread-26] com.zhihao.demo.DemoApplicationTests     : executeMethod===, i:25, ===, result:限流了!!!
2022-07-09 19:48:52.776  INFO 253832 --- [ool-1-thread-29] com.zhihao.demo.DemoApplicationTests     : executeMethod===, i:28, ===, result:限流了!!!
2022-07-09 19:48:52.776  INFO 253832 --- [ool-1-thread-30] com.zhihao.demo.DemoApplicationTests     : executeMethod===, i:29, ===, result:限流了!!!
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40

4.0 计算器Redis实现

计数器是一种最简单的限流算法,其原理就是:在一段时间间隔内,对请求进行计数,与阀值进行比较判断是否需要限流,一旦到了时间临界点,将计数器清零。

这里确定令牌桶中令牌数量的方法是通过计算得出,首先算出从上次请求到这次请求经过了多长时间,是否达到发令牌的时间阈值,然后增加的令牌数是多少,这些令牌能够放到桶中的是多少。

缺点也很明显: 假设刚刚开始一轮新的限流, 突然一次性1秒内大量请求到了, 会给服务器造成很大的压力, 并且后面的时间都限流,

这种时候就需要改用其他限流算法, 比如漏桶算法

4.0.1 编写Lua脚本

-- 下标从 1 开始 获取key
local key = KEYS[1]
-- 下标从 1 开始 获取参数,  Lua  tonumber这个函数会尝试将它的参数转换为数字
local max = tonumber(ARGV[1])              -- 最大限流次数
local expiredTime = tonumber(ARGV[2])      -- 缓存过期时间

-- 获取 key 中的自增数 , Redis incr 命令用于自增, 并且不会重置过期时间
local current = tonumber(redis.call('incr',key))

-- 判断如果等于1, 说明是incr命令的初始化, 设置过期时间返回 1
if current == 1 then
  redis.call("pexpire", key, expiredTime)
  return 1;

-- 否则判断增数是否达到限流
else
  -- 达到限流返回 0
  if current > max then return 0 end
  -- 否则返回1
  return 1
end
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21

4.0.2 进行测试

@Autowired
private RedisTemplate<String, String> redisTemplate;
@Resource(name = "countLimitRedisScript")
private RedisScript<Long> redisScript;

@Test
public void myEST() throws Exception {
    // 并发20个线程执行, 使用信号灯模拟同时调用  (预想效果是 前10个请求只有5个成功, 后10个也是5个成功)
    Semaphore semaphore = new Semaphore(20);
    ExecutorService pool = Executors.newFixedThreadPool(20);
    for (int i = 0; i < 20; i++) {
        int j = i;
        pool.execute(() -> {
            try {
                int i1;
                if (j > 10) {
                    i1 = j + 6100;
                    loggers.info("延迟6秒后执行的线程10个");
                } else {
                    i1 = j + 50;
                }

                // 睡眠
                ThreadUtil.sleep(1500 + i1);
                // 获取许可证
                semaphore.acquire();
                // 执行方法
                String result = this.executeMethod();
                loggers.info("executeMethod===, i:{}, ===, result:{}", j, result);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            } finally {
                semaphore.release();
            }
        });
    }
    TimeUnit.SECONDS.sleep(35);
}

private String executeMethod() {
    String key = "zhihao123";
    // (限流时间内允许的) 最大请求
    long max = 5;
    // 滑动窗口时间(限流时间范围)
    long period = 5L;
    // 限流时间单位, 默认秒
    TimeUnit timeUnit = TimeUnit.SECONDS;
    // 缓存的过期时间, 转成毫秒
    long expiredTime = timeUnit.toMillis(period);

    Long result = redisTemplate.execute(redisScript, Collections.singletonList(key), String.valueOf(max), String.valueOf(expiredTime));
    result = Optional.ofNullable(result).orElse(-1L);
    if (result.intValue() == 0L) {
        return "限流了!!!";
    } else {
        return "正常执行!!!";
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58

4.0.3 结果

2022-07-07 23:21:39.744  INFO 11088 --- [ool-1-thread-16] com.zhihao.demo.DemoApplicationTests     : 延迟6秒后执行的线程10个
2022-07-07 23:21:39.744  INFO 11088 --- [ool-1-thread-19] com.zhihao.demo.DemoApplicationTests     : 延迟6秒后执行的线程10个
2022-07-07 23:21:39.744  INFO 11088 --- [ool-1-thread-20] com.zhihao.demo.DemoApplicationTests     : 延迟6秒后执行的线程10个
2022-07-07 23:21:39.744  INFO 11088 --- [ool-1-thread-14] com.zhihao.demo.DemoApplicationTests     : 延迟6秒后执行的线程10个
2022-07-07 23:21:39.744  INFO 11088 --- [ool-1-thread-13] com.zhihao.demo.DemoApplicationTests     : 延迟6秒后执行的线程10个
2022-07-07 23:21:39.744  INFO 11088 --- [ool-1-thread-12] com.zhihao.demo.DemoApplicationTests     : 延迟6秒后执行的线程10个
2022-07-07 23:21:39.744  INFO 11088 --- [ool-1-thread-17] com.zhihao.demo.DemoApplicationTests     : 延迟6秒后执行的线程10个
2022-07-07 23:21:39.744  INFO 11088 --- [ool-1-thread-15] com.zhihao.demo.DemoApplicationTests     : 延迟6秒后执行的线程10个
2022-07-07 23:21:39.744  INFO 11088 --- [ool-1-thread-18] com.zhihao.demo.DemoApplicationTests     : 延迟6秒后执行的线程10个
2022-07-07 23:21:42.365  INFO 11088 --- [ool-1-thread-10] com.zhihao.demo.DemoApplicationTests     : executeMethod===, i:9, ===, result:限流了!!!
2022-07-07 23:21:42.365  INFO 11088 --- [ool-1-thread-11] com.zhihao.demo.DemoApplicationTests     : executeMethod===, i:10, ===, result:限流了!!!
2022-07-07 23:21:42.365  INFO 11088 --- [pool-1-thread-2] com.zhihao.demo.DemoApplicationTests     : executeMethod===, i:1, ===, result:正常执行!!!
2022-07-07 23:21:42.365  INFO 11088 --- [pool-1-thread-6] com.zhihao.demo.DemoApplicationTests     : executeMethod===, i:5, ===, result:正常执行!!!
2022-07-07 23:21:42.365  INFO 11088 --- [pool-1-thread-9] com.zhihao.demo.DemoApplicationTests     : executeMethod===, i:8, ===, result:限流了!!!
2022-07-07 23:21:42.365  INFO 11088 --- [pool-1-thread-3] com.zhihao.demo.DemoApplicationTests     : executeMethod===, i:2, ===, result:正常执行!!!
2022-07-07 23:21:42.365  INFO 11088 --- [pool-1-thread-8] com.zhihao.demo.DemoApplicationTests     : executeMethod===, i:7, ===, result:正常执行!!!
2022-07-07 23:21:42.365  INFO 11088 --- [pool-1-thread-1] com.zhihao.demo.DemoApplicationTests     : executeMethod===, i:0, ===, result:正常执行!!!
2022-07-07 23:21:42.366  INFO 11088 --- [pool-1-thread-5] com.zhihao.demo.DemoApplicationTests     : executeMethod===, i:4, ===, result:限流了!!!
2022-07-07 23:21:42.366  INFO 11088 --- [pool-1-thread-7] com.zhihao.demo.DemoApplicationTests     : executeMethod===, i:6, ===, result:限流了!!!
2022-07-07 23:21:42.366  INFO 11088 --- [pool-1-thread-4] com.zhihao.demo.DemoApplicationTests     : executeMethod===, i:3, ===, result:限流了!!!
2022-07-07 23:21:47.369  INFO 11088 --- [ool-1-thread-13] com.zhihao.demo.DemoApplicationTests     : executeMethod===, i:12, ===, result:正常执行!!!
2022-07-07 23:21:47.369  INFO 11088 --- [ool-1-thread-15] com.zhihao.demo.DemoApplicationTests     : executeMethod===, i:14, ===, result:正常执行!!!
2022-07-07 23:21:47.370  INFO 11088 --- [ool-1-thread-14] com.zhihao.demo.DemoApplicationTests     : executeMethod===, i:13, ===, result:正常执行!!!
2022-07-07 23:21:47.369  INFO 11088 --- [ool-1-thread-12] com.zhihao.demo.DemoApplicationTests     : executeMethod===, i:11, ===, result:正常执行!!!
2022-07-07 23:21:47.374  INFO 11088 --- [ool-1-thread-16] com.zhihao.demo.DemoApplicationTests     : executeMethod===, i:15, ===, result:正常执行!!!
2022-07-07 23:21:47.375  INFO 11088 --- [ool-1-thread-17] com.zhihao.demo.DemoApplicationTests     : executeMethod===, i:16, ===, result:限流了!!!
2022-07-07 23:21:47.376  INFO 11088 --- [ool-1-thread-18] com.zhihao.demo.DemoApplicationTests     : executeMethod===, i:17, ===, result:限流了!!!
2022-07-07 23:21:47.380  INFO 11088 --- [ool-1-thread-20] com.zhihao.demo.DemoApplicationTests     : executeMethod===, i:19, ===, result:限流了!!!
2022-07-07 23:21:47.379  INFO 11088 --- [ool-1-thread-19] com.zhihao.demo.DemoApplicationTests     : executeMethod===, i:18, ===, result:限流了!!!
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29

扩展:

Redis 执行 Lua 脚本抛出 StatusOutput does not support set(long) 异常

原因: Redis中的integer对应Java中的Long。 需要java代码中接收类型修改为long

Spring Data Redis执行脚本

Redis 2.6 及更高版本支持通过eval和evalsha命令运行 Lua 脚本。Spring Data Redis 为处理序列化并自动使用 Redis 脚本缓存的运行脚本提供了高级抽象。

脚本可以通过调用 和 的方法execute来运行。两者都使用可配置(或)来运行提供的脚本。默认情况下,(or ) 负责序列化提供的键和参数并反序列化脚本结果。这是通过模板的键和值序列化器完成的。还有一个额外的重载允许您为脚本参数和结果传递自定义序列化程序。RedisTemplateReactiveRedisTemplateScriptExecutorReactiveScriptExecutorScriptExecutorReactiveScriptExecutor

默认ScriptExecutor通过检索脚本的 SHA1 并尝试首先运行来优化性能,如果脚本尚不存在于 Redis 脚本缓存中,则evalsha回退到eval

参考资料:

https://github.com/bosima/FireflySoft.RateLimit
1

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/凡人多烦事01/article/detail/547944
推荐阅读
相关标签
  

闽ICP备14008679号