当前位置:   article > 正文

基于redis实现简单的分布式/单例令牌桶限流_client.getratelimite

client.getratelimite

   通常为了防止突然过多请求或系统处理能力有限时,为了保护下游接口,通常会对下游接口限流,下面说一下使用redis实现简单限流。
可以使用zset对象实现:

  • key:为对象的行为,即限流器的名字标识
  • score:用时间戳来实现计时,当前时间-窗口时长 即为当前时间段开始时间,结束时间就为当前时间
  • value:能体现一次唯一请求即可,但是要注意节省内存

下面看下Redisson的Redisson的RedissonRateLimiter基于rediss的zset是如何限流的:

1. 创建RedissonRateLimiter

使用RedissonClient客户端创建限流器:RRateLimiter rateLimiter = this.redissonClient.getRateLimiter(rateLimiterName);

初始化限流器的同时就给限流器起了名字,及对象的行为,如myRateLimit,下面用到限流器名的地名军用myRateLimit代替

# org.redisson.Redisson
//name:限流器名
@Override
public RRateLimiter getRateLimiter(String name) {
    return new RedissonRateLimiter(commandExecutor, name);
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

2. 配置限流器

创建限流器后要对其进行配置,并将配置存到redis总,主要配置包括

  • RateType:限流器类型:
    • OVERALL:分布式限流;
    • PER_CLIENT:单例限流
  • rate:速率
  • rateInterval: 窗口时长
  • RateIntervalUnit :时间单位,如毫秒
# org.redisson.RedissonRateLimiter

boolean trySetRate(RateType mode, long rate, long rateInterval, RateIntervalUnit rateIntervalUnit);

//将限流器配置设置到redis的hash中
@Override
public RFuture<Void> setRateAsync(RateType type, long rate, long rateInterval, RateIntervalUnit unit) {
     return commandExecutor.evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
                    // myRateLimt->rate:rateInterval内最多多少permits
            "redis.call('hset', KEYS[1], 'rate', ARGV[1]);"
                    //myRateLimt->interval:毫秒数
                    + "redis.call('hset', KEYS[1], 'interval', ARGV[2]);"
                    //myRateLimt->0/1
                    + "redis.call('hset', KEYS[1], 'type', ARGV[3]);"
                    //删除{myRateLimt}:value->{myRateLimt}:permits
                    + "redis.call('del', KEYS[2], KEYS[3]);",
            //(限流器名:myRateLimit ,{myRateLimt}:value ,{myRateLimt}:permits),rateInterval内最多多少permits,毫秒数,RateType在Enum位置
            Arrays.asList(getRawName(), getValueName(), getPermitsName()), rate, unit.toMilis(rateInterval), type.ordinal())
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18

3. 申请令牌

# org.redisson.RedissonRateLimiter

// permitsName:zset,key={myRateLimit}:permits,score为存放时的当前时间戳,value为(随机数+请求量)字符串
// valueName:string,key={myRateLimit}:value,当前窗口内期间剩余可请求量
private <T> RFuture<T> tryAcquireAsync(RedisCommand<T> command, Long value) {
        return commandExecutor.evalWriteAsync(getRawName(), LongCodec.INSTANCE, command,
		       //1.从redis中获取限流器配置:rate,interval,type,获取不到报错
                "local rate = redis.call('hget', KEYS[1], 'rate');"
              + "local interval = redis.call('hget', KEYS[1], 'interval');"
              + "local type = redis.call('hget', KEYS[1], 'type');"
              + "assert(rate ~= false and interval ~= false and type ~= false, 'RateLimiter is not initialized')"
              
			  //2.若type为PER_CLIENT类型,使用基于client的,key带有uuid
			   //1为单例,即不同实例的相同限流器名使用各实例的限流器;对应于服务相当于同一实例使用同一限流器,
			  //0为全局分布式,相同的限流器名在使用同一个限流器,对应于服务相当于同一服务使用同一限流器
			  //目前业务大多使用的是0
              + "local valueName = KEYS[2];"
              + "local permitsName = KEYS[4];"
              + "if type == '1' then "
                  + "valueName = KEYS[3];"
                  + "permitsName = KEYS[5];"
              + "end;"

              //3.rate < 此次请求的permits,报错
              + "assert(tonumber(rate) >= tonumber(ARGV[1]), 'Requested permits amount could not exceed defined rate'); "

              //4.获取{myRateLimit}:value值
              + "local currentValue = redis.call('get', valueName); "
              + "if currentValue ~= false then "
			        //5.1 不是第一次请求
					
					//5.1.1 获取已过期的请求数{myRateLimit}:permits,过期定义为:当前时间-interval
                     + "local expiredValues = redis.call('zrangebyscore', permitsName, 0, tonumber(ARGV[2]) - interval); "
                     + "local released = 0; "
                     + "for i, v in ipairs(expiredValues) do "
                          + "local random, permits = struct.unpack('fI', v);"
                          + "released = released + permits;"
                     + "end; "

                      //5.1.2 过期请求数 >0,说明之前有请求量,删除过期的请求,即为节省内存,只保留窗口内的记录,重设{myRateLimit}:value。。。
					 + "if released > 0 then "
                          + "redis.call('zremrangebyscore', permitsName, 0, tonumber(ARGV[2]) - interval); "
                          + "if tonumber(currentValue) + released > tonumber(rate) then "
                               + "currentValue = tonumber(rate) - redis.call('zcard', permitsName); "
                          + "else "
                               + "currentValue = tonumber(currentValue) + released; "
                          + "end; "
                          + "redis.call('set', valueName, currentValue);"
                     + "end;"

                     + "if tonumber(currentValue) < tonumber(ARGV[1]) then "
					   //5.1.3 本次被限流,返回剩余过期时间delay,即如果时block模式,delay后重试
						  + "local firstValue = redis.call('zrange', permitsName, 0, 0, 'withscores'); "
                         + "return 3 + interval - (tonumber(ARGV[2]) - tonumber(firstValue[2]));"
                     + "else "
					    //5.1.4 本次可以请求,本次请求添加到{myRateLimit}:permits,剩余可请求量{myRateLimit}:value -1,返回null
                         + "redis.call('zadd', permitsName, ARGV[2], struct.pack('fI', ARGV[3], ARGV[1])); "
                         + "redis.call('decrby', valueName, ARGV[1]); "
                         + "return nil; "
                     + "end; "
              + "else "
			       //5.2 即第一次请求,返回null
				   //设置String类型key{myRateLimit}:value为rate,
				   //往zset中添加key为{myRateLimit}:permits,score为当前时间戳,value为(随机数转成float,ratez转成Integer之后的字符数)
				   // 将{myRateLimit}:value 将去此次请求量permits
                     + "redis.call('set', valueName, rate); "
                     + "redis.call('zadd', permitsName, ARGV[2], struct.pack('fI', ARGV[3], ARGV[1])); "
                     + "redis.call('decrby', valueName, ARGV[1]); "
                     + "return nil; "
              + "end;",
			    //(myRateLimit,{myRateLimit}:value,{myRateLimit}:uuid,{myRateLimit}:permits,{{myRateLimit}:permits}:uuid
                Arrays.asList(getRawName(), getValueName(), getClientValueName(), getPermitsName(), getClientPermitsName()),
				//
                value, System.currentTimeMillis(), ThreadLocalRandom.current().nextLong());
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75

在这里插入图片描述

  1. 限流器配置存到hash结构的redis中,key为限流器名myRateLimt
  2. 时间段即窗口利用zset实现,score为请求时时间戳,这样就可以实现时间计算了,value为单次请求的令牌数
  3. 为了节约内存,将窗口外的数据删除,即当前时间戳-窗口时间段interval

4. 限流后处理

# org.redisson.RedissonRateLimiter

private void tryAcquireAsync(long permits, RPromise<Boolean> promise, long timeoutInMillis) {
    long s = System.currentTimeMillis();
    //1.请求令牌
    RFuture<Long> future = tryAcquireAsync(RedisCommands.EVAL_LONG, permits);
    future.onComplete((delay, e) -> {
        if (e != null) {
            //2.请求异常直接返回失败
            promise.tryFailure(e);
            return;
        }
        
        if (delay == null) {
            //3.令牌获取成功返回
            promise.trySuccess(true);
            return;
        }
        
        if (timeoutInMillis == -1) {
            //4.令牌获取时超过限制,且RedissonRateLimiter.Type为BLOCK,delay秒后重试
            commandExecutor.getConnectionManager().getGroup().schedule(() -> {
                tryAcquireAsync(permits, promise, timeoutInMillis);
            }, delay, TimeUnit.MILLISECONDS);
            return;
        }
        
        //5.timeout=执行开始到返回的时间
        //令牌获取时超过限制,且RedissonRateLimiter.Type为TRY,在未timeout之前且在窗口内重试,
        //否则返回失败
        long el = System.currentTimeMillis() - s;
        long remains = timeoutInMillis - el;
        if (remains <= 0) {
            promise.trySuccess(false);
            return;
        }
        if (remains < delay) {
            commandExecutor.getConnectionManager().getGroup().schedule(() -> {
                promise.trySuccess(false);
            }, remains, TimeUnit.MILLISECONDS);
        } else {
            long start = System.currentTimeMillis();
            commandExecutor.getConnectionManager().getGroup().schedule(() -> {
                long elapsed = System.currentTimeMillis() - start;
                if (remains <= elapsed) {
                    promise.trySuccess(false);
                    return;
                }
                
                tryAcquireAsync(permits, promise, remains - elapsed);
            }, delay, TimeUnit.MILLISECONDS);
        }
    });
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/你好赵伟/article/detail/519656
推荐阅读
相关标签
  

闽ICP备14008679号