赞
踩
版本 v2.2.6.RELEASE
代码地址:https://gitee.com/wyusig/spring-webflux-demo.git
环境需求:jdk1.8、Nacos、Redis
在sc-gateway-registry模块的application.yml文件中,放开关于限流的配置(和另一个负载均衡例子写同一个例子里面了,所以配置都注释起来了)
启动Nacos(Nacos在本系列-基于注册中心的动态路由有提供下载链接)
启动Redis
运行sc-user-service和sc-gateway-registry模块
访问http://localhost:8888/user-service/user/get?id=10,观察该ip是否已被限流成每秒一个请求
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis-reactive</artifactId>
</dependency>
/**
* 使用请求 IP 作为限流键的 KeyResolver
**/
public class RemoteAddrKeyResolver implements KeyResolver {
public static final String BEAN_NAME = "remoteAddrKeyResolver";
@Override
public Mono<String> resolve(ServerWebExchange exchange) {
return Mono.just(exchange.getRequest().getRemoteAddress().getAddress().getHostAddress());
}
}
@Bean(name = RemoteAddrKeyResolver.BEAN_NAME)
@ConditionalOnBean(RateLimiter.class)
public RemoteAddrKeyResolver remoteAddrKeyResolver() {
return new RemoteAddrKeyResolver();
}
default-filters:
- name: RequestRateLimiter
args:
redis-rate-limiter.replenishRate: 1 #令牌桶填充速率,单位个/秒
redis-rate-limiter.burstCapacity: 1 #令牌桶容量
redis-rate-limiter.requestedTokens: 1 #一次请求需要消耗多少个令牌
key-resolver: "#{@remoteAddrKeyResolver}" #限流依据,如根据ip限流
根据本系列-Spring-Cloud-Gateway基本组件可以知道,限流通过RequestRateLimiterGatewayFilterFactory生产一个限流GatewayFilter,我们来看一下RequestRateLimiterGatewayFilterFactory Bean注册的代码
class GatewayRedisAutoConfiguration { //RedisScript,lua脚本文件在META-INF/scripts/request_rate_limiter.lua @Bean @SuppressWarnings("unchecked") public RedisScript redisRequestRateLimiterScript() { DefaultRedisScript redisScript = new DefaultRedisScript<>(); redisScript.setScriptSource(new ResourceScriptSource( new ClassPathResource("META-INF/scripts/request_rate_limiter.lua"))); redisScript.setResultType(List.class); return redisScript; } //需要ReactiveStringRedisTemplate、DefaultRedisScript等 @Bean @ConditionalOnMissingBean public RedisRateLimiter redisRateLimiter(ReactiveStringRedisTemplate redisTemplate, @Qualifier(RedisRateLimiter.REDIS_SCRIPT_NAME) RedisScript<List<Long>> redisScript, ConfigurationService configurationService) { return new RedisRateLimiter(redisTemplate, redisScript, configurationService); } }
public class GatewayAutoConfiguration { //如果容器内没有KeyResolver及其子类,就注册一个PrincipalNameKeyResolver @Bean(name = PrincipalNameKeyResolver.BEAN_NAME) @ConditionalOnBean(RateLimiter.class) @ConditionalOnMissingBean(KeyResolver.class) public PrincipalNameKeyResolver principalNameKeyResolver() { return new PrincipalNameKeyResolver(); } //RequestRateLimiterGatewayFilterFactory @Bean @ConditionalOnBean({ RateLimiter.class, KeyResolver.class }) @ConditionalOnEnabledFilter public RequestRateLimiterGatewayFilterFactory requestRateLimiterGatewayFilterFactory( RateLimiter rateLimiter, KeyResolver resolver) { return new RequestRateLimiterGatewayFilterFactory(rateLimiter, resolver); } }
Spring-Cloud-Gateway使用RedisTemplate、Lua脚本等,基于令牌桶算法来进行限流
public interface RateLimiter<C> extends StatefulConfigurable<C> { //判断是否被限流 Mono<Response> isAllowed(String routeId, String id); class Response { //判断是否被限流,true为未被限流,false为被限流 private final boolean allowed; //令牌桶令牌剩余数量 private final long tokensRemaining; private final Map<String, String> headers; } }
@ConfigurationProperties("spring.cloud.gateway.redis-rate-limiter") public class RedisRateLimiter extends AbstractRateLimiter<RedisRateLimiter.Config> implements ApplicationContextAware{ @Override @SuppressWarnings("unchecked") public Mono<Response> isAllowed(String routeId, String id) { //如果初始化还未完成,抛出异常 if (!this.initialized.get()) { throw new IllegalStateException("RedisRateLimiter is not initialized"); } //获取Route配置,主要是为了获取限流Filter Config routeConfig = loadConfiguration(routeId); //获取配置的令牌桶填充速率 // How many requests per second do you want a user to be allowed to do? int replenishRate = routeConfig.getReplenishRate(); //获取令牌桶容量 // How much bursting do you want to allow? int burstCapacity = routeConfig.getBurstCapacity(); //获取一个请求需要使用多少个令牌 // How many tokens are requested per request? int requestedTokens = routeConfig.getRequestedTokens(); try { //获得tokenKey、timestampKey List<String> keys = getKeys(id); // The arguments to the LUA script. time() returns unixtime in seconds. //构造Lua脚本参数 List<String> scriptArgs = Arrays.asList(replenishRate + "", burstCapacity + "", Instant.now().getEpochSecond() + "", requestedTokens + ""); // allowed, tokens_left = redis.eval(SCRIPT, keys, args) //执行Lua脚本,返回[是否获取令牌成功,剩余令牌数量] Flux<List<Long>> flux = this.redisTemplate.execute(this.script, keys, scriptArgs); // .log("redisratelimiter", Level.FINER); return flux.onErrorResume(throwable -> { if (log.isDebugEnabled()) { log.debug("Error calling rate limiter lua", throwable); } //如果执行Luau脚本发生错误,返回获取令牌成功,剩余令牌-1个 return Flux.just(Arrays.asList(1L, -1L)); }).reduce(new ArrayList<Long>(), (longs, l) -> { //正常执行,把返回值装到一个ArrayList里面 longs.addAll(l); return longs; }).map(results -> { //判断是否获取令牌成功 boolean allowed = results.get(0) == 1L; //剩余令牌数 Long tokensLeft = results.get(1); //构造Response Response response = new Response(allowed, getHeaders(routeConfig, tokensLeft)); if (log.isDebugEnabled()) { log.debug("response: " + response); } //返回 return response; }); } catch (Exception e) { /* * We don't want a hard dependency on Redis to allow traffic. Make sure to set * an alert so you know if this is happening too much. Stripe's observed * failure rate is 0.01%. */ log.error("Error determining if user allowed from redis", e); } return Mono.just(new Response(true, getHeaders(routeConfig, -1L))); } }
--获取传进来的第一个keys参数 local tokens_key = KEYS[1] --获取传进来的第二个keys参数 local timestamp_key = KEYS[2] --redis.log(redis.LOG_WARNING, "tokens_key " .. tokens_key) --获取第一个arg参数:令牌桶填充速率 local rate = tonumber(ARGV[1]) --获取第二个arg参数:令牌桶容量 local capacity = tonumber(ARGV[2]) --获取第三个arg参数:当时时间戳(秒) local now = tonumber(ARGV[3]) --获取第四个arg参数:一次请求会消耗多少个令牌 local requested = tonumber(ARGV[4]) --计算填充满令牌桶需要的总时间 local fill_time = capacity/rate --乘与2保证时间充足 local ttl = math.floor(fill_time*2) --redis.log(redis.LOG_WARNING, "rate " .. ARGV[1]) --redis.log(redis.LOG_WARNING, "capacity " .. ARGV[2]) --redis.log(redis.LOG_WARNING, "now " .. ARGV[3]) --redis.log(redis.LOG_WARNING, "requested " .. ARGV[4]) --redis.log(redis.LOG_WARNING, "filltime " .. fill_time) --redis.log(redis.LOG_WARNING, "ttl " .. ttl) --获取令牌桶剩余令牌数 local last_tokens = tonumber(redis.call("get", tokens_key)) --令牌桶剩余令牌数默认为令牌桶容量 if last_tokens == nil then last_tokens = capacity end --redis.log(redis.LOG_WARNING, "last_tokens " .. last_tokens) --获取最后填充令牌时间戳(秒) local last_refreshed = tonumber(redis.call("get", timestamp_key)) --最后填充令牌时间戳(秒)为0 if last_refreshed == nil then last_refreshed = 0 end --redis.log(redis.LOG_WARNING, "last_refreshed " .. last_refreshed) --计算从现在到距离上次填充令牌花了多少时间 local delta = math.max(0, now-last_refreshed) --计算距离上次填充过了delta时间后,应该往令牌桶填充多少令牌,得到现在令牌桶令牌数(不能超过令牌桶容量) local filled_tokens = math.min(capacity, last_tokens+(delta*rate)) --判断当前令牌桶令牌数能否满足一次请求需要的令牌数 local allowed = filled_tokens >= requested --现在令牌桶令牌数 local new_tokens = filled_tokens local allowed_num = 0 --如果当前令牌桶能满足一次请求需要的令牌数,则扣除一次请求需要的令牌数,得到新的令牌桶剩余令牌数 if allowed then new_tokens = filled_tokens - requested --1代表获取令牌成功 allowed_num = 1 end --redis.log(redis.LOG_WARNING, "delta " .. delta) --redis.log(redis.LOG_WARNING, "filled_tokens " .. filled_tokens) --redis.log(redis.LOG_WARNING, "allowed_num " .. allowed_num) --redis.log(redis.LOG_WARNING, "new_tokens " .. new_tokens) --把令牌桶剩余令牌数、最后一次填充令牌时间戳(秒)存进redis,该key的生存时间为ttl,也就是2*填充满整个令牌桶需要时间 if ttl > 0 then redis.call("setex", tokens_key, ttl, new_tokens) redis.call("setex", timestamp_key, ttl, now) end -- return { allowed_num, new_tokens, capacity, filled_tokens, requested, new_tokens } --返回是否获取令牌成功,令牌桶剩余令牌数 return { allowed_num, new_tokens }
@ConfigurationProperties("spring.cloud.gateway.filter.request-rate-limiter") public class RequestRateLimiterGatewayFilterFactory extends AbstractGatewayFilterFactory<RequestRateLimiterGatewayFilterFactory.Config> { @Override public GatewayFilter apply(Config config) { //获取KeyResolver KeyResolver resolver = getOrDefault(config.keyResolver, defaultKeyResolver); //获取RateLimiter RateLimiter<Object> limiter = getOrDefault(config.rateLimiter, defaultRateLimiter); boolean denyEmpty = getOrDefault(config.denyEmptyKey, this.denyEmptyKey); HttpStatusHolder emptyKeyStatus = HttpStatusHolder .parse(getOrDefault(config.emptyKeyStatus, this.emptyKeyStatusCode)); return (exchange, chain) -> resolver.resolve(exchange).defaultIfEmpty(EMPTY_KEY) .flatMap(key -> { //执行KeyResolver拿到key if (EMPTY_KEY.equals(key)) { if (denyEmpty) { //设置状态码为403 setResponseStatus(exchange, emptyKeyStatus); //设置流为完成 return exchange.getResponse().setComplete(); } return chain.filter(exchange); } String routeId = config.getRouteId(); if (routeId == null) { Route route = exchange .getAttribute(ServerWebExchangeUtils.GATEWAY_ROUTE_ATTR); routeId = route.getId(); } //使用RateLimiter执行lua脚本,把routeId和key传给RateLimiter#isAllowed,达到按key限流目的 return limiter.isAllowed(routeId, key).flatMap(response -> { //把RateLimiter返回的Header添加到exchange.getResponse()的Header for (Map.Entry<String, String> header : response.getHeaders() .entrySet()) { exchange.getResponse().getHeaders().add(header.getKey(), header.getValue()); } if (response.isAllowed()) { //如果获取令牌成功,下一个filter return chain.filter(exchange); } //否则设置状态码为429,被限流 setResponseStatus(exchange, config.getStatusCode()); //设置流为完成 return exchange.getResponse().setComplete(); }); }); } }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。