赞
踩
通常为了防止突然过多请求或系统处理能力有限时,为了保护下游接口,通常会对下游接口限流,下面说一下使用redis实现简单限流。
可以使用zset对象实现:
下面看下Redisson的Redisson的RedissonRateLimiter基于rediss的zset是如何限流的:
使用RedissonClient客户端创建限流器:RRateLimiter rateLimiter = this.redissonClient.getRateLimiter(rateLimiterName);
初始化限流器的同时就给限流器起了名字,及对象的行为,如myRateLimit,下面用到限流器名的地名军用myRateLimit代替
# org.redisson.Redisson
//name:限流器名
@Override
public RRateLimiter getRateLimiter(String name) {
return new RedissonRateLimiter(commandExecutor, name);
}
创建限流器后要对其进行配置,并将配置存到redis总,主要配置包括
# org.redisson.RedissonRateLimiter boolean trySetRate(RateType mode, long rate, long rateInterval, RateIntervalUnit rateIntervalUnit); //将限流器配置设置到redis的hash中 @Override public RFuture<Void> setRateAsync(RateType type, long rate, long rateInterval, RateIntervalUnit unit) { return commandExecutor.evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, // myRateLimt->rate:rateInterval内最多多少permits "redis.call('hset', KEYS[1], 'rate', ARGV[1]);" //myRateLimt->interval:毫秒数 + "redis.call('hset', KEYS[1], 'interval', ARGV[2]);" //myRateLimt->0/1 + "redis.call('hset', KEYS[1], 'type', ARGV[3]);" //删除{myRateLimt}:value->{myRateLimt}:permits + "redis.call('del', KEYS[2], KEYS[3]);", //(限流器名:myRateLimit ,{myRateLimt}:value ,{myRateLimt}:permits),rateInterval内最多多少permits,毫秒数,RateType在Enum位置 Arrays.asList(getRawName(), getValueName(), getPermitsName()), rate, unit.toMilis(rateInterval), type.ordinal())
# org.redisson.RedissonRateLimiter // permitsName:zset,key={myRateLimit}:permits,score为存放时的当前时间戳,value为(随机数+请求量)字符串 // valueName:string,key={myRateLimit}:value,当前窗口内期间剩余可请求量 private <T> RFuture<T> tryAcquireAsync(RedisCommand<T> command, Long value) { return commandExecutor.evalWriteAsync(getRawName(), LongCodec.INSTANCE, command, //1.从redis中获取限流器配置:rate,interval,type,获取不到报错 "local rate = redis.call('hget', KEYS[1], 'rate');" + "local interval = redis.call('hget', KEYS[1], 'interval');" + "local type = redis.call('hget', KEYS[1], 'type');" + "assert(rate ~= false and interval ~= false and type ~= false, 'RateLimiter is not initialized')" //2.若type为PER_CLIENT类型,使用基于client的,key带有uuid //1为单例,即不同实例的相同限流器名使用各实例的限流器;对应于服务相当于同一实例使用同一限流器, //0为全局分布式,相同的限流器名在使用同一个限流器,对应于服务相当于同一服务使用同一限流器 //目前业务大多使用的是0 + "local valueName = KEYS[2];" + "local permitsName = KEYS[4];" + "if type == '1' then " + "valueName = KEYS[3];" + "permitsName = KEYS[5];" + "end;" //3.rate < 此次请求的permits,报错 + "assert(tonumber(rate) >= tonumber(ARGV[1]), 'Requested permits amount could not exceed defined rate'); " //4.获取{myRateLimit}:value值 + "local currentValue = redis.call('get', valueName); " + "if currentValue ~= false then " //5.1 不是第一次请求 //5.1.1 获取已过期的请求数{myRateLimit}:permits,过期定义为:当前时间-interval + "local expiredValues = redis.call('zrangebyscore', permitsName, 0, tonumber(ARGV[2]) - interval); " + "local released = 0; " + "for i, v in ipairs(expiredValues) do " + "local random, permits = struct.unpack('fI', v);" + "released = released + permits;" + "end; " //5.1.2 过期请求数 >0,说明之前有请求量,删除过期的请求,即为节省内存,只保留窗口内的记录,重设{myRateLimit}:value。。。 + "if released > 0 then " + "redis.call('zremrangebyscore', permitsName, 0, tonumber(ARGV[2]) - interval); " + "if tonumber(currentValue) + released > tonumber(rate) then " + "currentValue = tonumber(rate) - redis.call('zcard', permitsName); " + "else " + "currentValue = tonumber(currentValue) + released; " + "end; " + "redis.call('set', valueName, currentValue);" + "end;" + "if tonumber(currentValue) < tonumber(ARGV[1]) then " //5.1.3 本次被限流,返回剩余过期时间delay,即如果时block模式,delay后重试 + "local firstValue = redis.call('zrange', permitsName, 0, 0, 'withscores'); " + "return 3 + interval - (tonumber(ARGV[2]) - tonumber(firstValue[2]));" + "else " //5.1.4 本次可以请求,本次请求添加到{myRateLimit}:permits,剩余可请求量{myRateLimit}:value -1,返回null + "redis.call('zadd', permitsName, ARGV[2], struct.pack('fI', ARGV[3], ARGV[1])); " + "redis.call('decrby', valueName, ARGV[1]); " + "return nil; " + "end; " + "else " //5.2 即第一次请求,返回null //设置String类型key{myRateLimit}:value为rate, //往zset中添加key为{myRateLimit}:permits,score为当前时间戳,value为(随机数转成float,ratez转成Integer之后的字符数) // 将{myRateLimit}:value 将去此次请求量permits + "redis.call('set', valueName, rate); " + "redis.call('zadd', permitsName, ARGV[2], struct.pack('fI', ARGV[3], ARGV[1])); " + "redis.call('decrby', valueName, ARGV[1]); " + "return nil; " + "end;", //(myRateLimit,{myRateLimit}:value,{myRateLimit}:uuid,{myRateLimit}:permits,{{myRateLimit}:permits}:uuid Arrays.asList(getRawName(), getValueName(), getClientValueName(), getPermitsName(), getClientPermitsName()), // value, System.currentTimeMillis(), ThreadLocalRandom.current().nextLong()); }
# org.redisson.RedissonRateLimiter private void tryAcquireAsync(long permits, RPromise<Boolean> promise, long timeoutInMillis) { long s = System.currentTimeMillis(); //1.请求令牌 RFuture<Long> future = tryAcquireAsync(RedisCommands.EVAL_LONG, permits); future.onComplete((delay, e) -> { if (e != null) { //2.请求异常直接返回失败 promise.tryFailure(e); return; } if (delay == null) { //3.令牌获取成功返回 promise.trySuccess(true); return; } if (timeoutInMillis == -1) { //4.令牌获取时超过限制,且RedissonRateLimiter.Type为BLOCK,delay秒后重试 commandExecutor.getConnectionManager().getGroup().schedule(() -> { tryAcquireAsync(permits, promise, timeoutInMillis); }, delay, TimeUnit.MILLISECONDS); return; } //5.timeout=执行开始到返回的时间 //令牌获取时超过限制,且RedissonRateLimiter.Type为TRY,在未timeout之前且在窗口内重试, //否则返回失败 long el = System.currentTimeMillis() - s; long remains = timeoutInMillis - el; if (remains <= 0) { promise.trySuccess(false); return; } if (remains < delay) { commandExecutor.getConnectionManager().getGroup().schedule(() -> { promise.trySuccess(false); }, remains, TimeUnit.MILLISECONDS); } else { long start = System.currentTimeMillis(); commandExecutor.getConnectionManager().getGroup().schedule(() -> { long elapsed = System.currentTimeMillis() - start; if (remains <= elapsed) { promise.trySuccess(false); return; } tryAcquireAsync(permits, promise, remains - elapsed); }, delay, TimeUnit.MILLISECONDS); } }); }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。