赞
踩
个人网站:www.wangdatao.com
分布式系统中进场会提到限速和降级的概念。所谓限流,可以认为是服务降级的一种,限流就是限制系统的输入和输出流量, 以达到保护系统的目的 系统上线之前,一般都会进行压测,压测之后吞吐量是可以被测算的,为了保证系统的稳定运行, 一旦达到了设定限制的阔值,就需要限制流量并采取一些措施以完成限制流目的。常见的限流方案为:延迟处理、拒绝处理和部分拒绝处理等。
一般高并发系统常见的限流有:限制总并发数(比如:数据库连接池、线程池)、限制瞬时并发数、限制时间窗口内的平均速率;其他还有如限制远程接口调用速率、限制MQ的消费速率;另外还可以根据网络连接数、网络流量、CPU或内存负载等来限流。
算法思想:规定一个时间周期,统计在一个周期内总共请求的次数,如果时间超过一个周期,则次数计入到下一个周期内。
比如:当接收到第一个请求时,记录第一次的时间 beginTime, 计数器Counter设置为1,当第二个请求来时,判断第二个请求的时间与beginTime的间隔有没有超过最初设计的时间周期,如果没有超过,判断Counter是否超过设置的最大速度数,都满足时Counter加1;如果超过设计的时间周期,beginTime时间改为第二次请求的时间,Counter重置为1。
//不严谨的算法代码 public class CounterTest { public static long timeStamp = getNowTime(); public static int reqCount = 0; public static final int limit = 3; // 时间周期内最大请求数 public static final long interval = 1000; // 时间窗口ms public static boolean grant() { long now = getNowTime(); if (now < timeStamp + interval) { // 在时间窗口内 reqCount++; // 判断当前时间窗口内是否超过最大请求控制数 return reqCount <= limit; } else { timeStamp = now; // 超时后重置 reqCount = 1; return true; } } public static long getNowTime() { return System.currentTimeMillis(); } } //很不严谨的调用测试 @RestController @RequestMapping("/count") public class CounterTestController { @GetMapping("/test1") public boolean test() { return CounterTest.grant(); } }
问题分析:当服务器最大的请求压力为100次每分钟时,有用户第一次请求的时间为第一秒,在第5960秒时请求了99次。由于次数有重置机制,在60秒后的第一次请求,统计会重置为1,那么算法上6061秒也是允许的。此时在59~61秒实际放入了199个请求。
思考:如果每分钟最大允许100个请求,把时间周期设置为30秒,30S内只允许50个请求,当出现上述临界问题时一分钟也不会超过100,这样做会带来什么问题吗?
为了解决上述问题,我们引入了滑动窗口算法。把一个限速周期,分割成若干个时间小周期,每个小周期单独计数;当一个请求过来时,对应时间小周期的统计Counter加1。
到底怎么解决临界问题的?
时间周期的统计不以大周期统计,以小周期累加统计。比如每分钟限速100,把1分钟分为6个小周期,那么第二个大周期的计算为第10S-70S(1m10s),第三个周期计算标准为20S-80S 以此类推。
如果用户在第一个小周期请求一次触发计时,第六个周期(50~60)请求了99次,那么在第二个大周期统计的时间范围是10-70S,由于50-60S时已经请求了99次,那么在60-70S只允许请求1次,从而达到了限速效果。
import java.util.concurrent.atomic.AtomicInteger; //不严谨的代码实现 public class SlidingWindow { private AtomicInteger[] timeSlices; // 队列的总长度 private final int timeSliceSize; // 每个时间片的时长 private final long timeMillisPerSlice; // 窗口长度 private final int windowSize; // 最大允许访问次数 private int maxVisitor; // 当前所使用的时间片位置 private AtomicInteger cursor = new AtomicInteger(0); // 默认窗口数量 private final static int DEFAULT_WINDOW_SIZE = 5; // 默认时间片时长 private final static Time DEFAULT_TIME = Time.SECONDS; // 默认时间片时长 private final static int DEFAULT_MAX_VISITOR = 10; public static enum Time { MILLISECONDS(1), SECONDS(1000), MINUTES(SECONDS.getMillis() * 60), HOURS(MINUTES.getMillis() * 60), DAYS(HOURS.getMillis() * 24), WEEKS(DAYS.getMillis() * 7); private long millis; Time(long millis) { this.millis = millis; } public long getMillis() { return millis; } } public SlidingWindow() { this(DEFAULT_WINDOW_SIZE, DEFAULT_TIME, DEFAULT_MAX_VISITOR); } public SlidingWindow(int windowSize, Time timeSlice, int maxVisitor) { this.timeMillisPerSlice = timeSlice.millis; this.windowSize = windowSize; // 保证存储在至少两个window this.timeSliceSize = windowSize * 2 + 1; this.maxVisitor = maxVisitor; init(); } /** * 初始化 */ private void init() { AtomicInteger[] localTimeSlices = new AtomicInteger[timeSliceSize]; for (int i = 0; i < timeSliceSize; i++) { localTimeSlices[i] = new AtomicInteger(0); } timeSlices = localTimeSlices; } private int locationIndex() { long time = System.currentTimeMillis(); return (int) ((time / timeMillisPerSlice) % timeSliceSize); } /** * 判断是否允许进行访问,未超过阈值的话才会对某个时间片+1 * * @return */ public boolean allow() { int index = locationIndex(); int sum = 0; int oldCursor = cursor.getAndSet(index); if (oldCursor != index) { timeSlices[index].set(0); clearBetween(oldCursor, index); } for (int i = 0; i < windowSize; i++) { sum += timeSlices[(index - i + timeSliceSize) % timeSliceSize].get(); } // 阈值判断 if (sum < maxVisitor) { // 未超过阈值才+1 timeSlices[index].incrementAndGet(); return true; } return false; } /** * <p>将fromIndex~toIndex之间的时间片计数都清零 * <p>极端情况下,当循环队列已经走了超过1个timeSliceSize以上,这里的清零并不能如期望的进行 * * @param fromIndex 不包含 * @param toIndex 不包含 */ private void clearBetween(int fromIndex, int toIndex) { for (int index = (fromIndex + 1) % timeSliceSize; index != toIndex; index = (index + 1) % timeSliceSize) { timeSlices[index].set(0); } } public static void main(String[] args) { SlidingWindow window = new SlidingWindow(2, Time.MINUTES,5); for (int i = 0; i < 10; i++) { System.out.println(window.allow()); } } }
思考:优点-能解决临界问题。缺点-持续的高流量请求时,处理很容易集中在某一个时间格内。
算法思想:有一个空的漏桶,上面不断的往里面倒水、下面不断的往外面放水;当倒水速度小于放水速度时,桶永远不会满;当倒水速度大于放水速度时,随着时间,桶迟早会满。
漏桶算法能把来不及处理的请求暂存在“桶”中,接口实现层能够按照“节奏”处理请求。需要注意的是桶的容量如果很大,水要流很久才能流完,可能会出现超时。
package com.datao.text.util; import java.util.concurrent.atomic.AtomicInteger; //不严谨的代码实现 public class LeakyBucket { //桶的容量 private int capacity = 10; // 木桶剩余的水滴的量(初始化的时候的空的桶) private AtomicInteger water = new AtomicInteger(0); // 水滴的流出的速率 每1000毫秒流出1滴 private int leakRate; // 第一次请求之后,木桶在这个时间点开始漏水 private long leakTimeStamp; public LeakyBucket(int leakRate) { this.leakRate = leakRate; } public boolean acquire() { // 如果是空桶,就当前时间作为桶开是漏出的时间 if (water.get() == 0) { leakTimeStamp = System.currentTimeMillis(); water.addAndGet(1); return true; } // 计算时间间隔 float intervalTime = ((float) (System.currentTimeMillis() - leakTimeStamp)) / 1000; // 计算时间间隔内流出了多少水 int intervalTimeWater = Math.round(intervalTime * leakRate); // 计算剩余的水 int waterLeft = water.get() - intervalTimeWater; // 剩余的水不能为负 water.set(Math.max(0, waterLeft)); // 尝试加水,并且水还未满 if ((water.get()) < capacity) { water.addAndGet(1); // 重新更新leakTimeStamp leakTimeStamp = System.currentTimeMillis(); return true; } else { // 水满,拒绝加水 return false; } } public static void main(String[] args) throws InterruptedException { LeakyBucket leakyBucket = new LeakyBucket(3); for (int i = 0; i < 50; i++) { Thread.sleep(100); System.out.println(leakyBucket.acquire()); } } }
思考:为什么成功调用后更新时间,没成功调用不更新时间? 代码设计与漏洞思想有什么偏差,该怎么解决?
算法思想:有一个生产令牌车间,按照固定速率生产令牌,生成好放入到桶里;当桶满时,生产的令牌丢弃。每来一个请求,都会从桶里领一个令牌,如果拿不到令牌则等待或拒绝请求。
令牌桶算法能应对突发的桶内令牌个数的请求,当消耗完后,后续的处理速度就登录令牌产生的速度。与漏桶算法的区别是面大量突发请求时,令牌桶算法能及时处理对应令牌缓存的请求,漏桶算法只能等待慢慢“流出”。
<!-- https://mvnrepository.com/artifact/com.google.guava/guava -->
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>29.0-jre</version>
</dependency>
@Service
public class AccessLimitService {
//每秒只发出5个令牌
RateLimiter rateLimiter = RateLimiter.create(5.0);
/**
* 尝试获取令牌
* @return
*/
public boolean tryAcquire(){
return rateLimiter.tryAcquire();
}
}
@Controller public class HelloController { private static SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); @Autowired private AccessLimitService accessLimitService; @RequestMapping("/access") @ResponseBody public String access(){ //尝试获取令牌 if(accessLimitService.tryAcquire()){ //模拟业务执行500毫秒 try { Thread.sleep(500); // service.do(); }catch (InterruptedException e){ e.printStackTrace(); } return "aceess success [" + sdf.format(new Date()) + "]"; }else{ return "aceess limit [" + sdf.format(new Date()) + "]"; } } }
由于Spring 没有封装好的工具可用,咱们可以使用上述的工具或算法实现
①:自定义一个注解。
②:通过AOP切此注解,在切面里通过Guava实现限速功能。
③给需要限速的方法添加该注解。
<!-- https://mvnrepository.com/artifact/com.google.guava/guava -->
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>29.0-jre</version>
</dependency>
@Target({ElementType.PARAMETER, ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface ServiceLimit {
String description() default "";
}
@Component @Scope @Aspect public class LimitAspect { 每秒只发出5个令牌 private static RateLimiter rateLimiter = RateLimiter.create(5.0); //Service层切点 限流 @Pointcut("@annotation(com.datao.config.aop.ServiceLimit)") public void ServiceAspect() { } @Around("ServiceAspect()") public Object around(ProceedingJoinPoint joinPoint) { Boolean flag = rateLimiter.tryAcquire(); Object obj = null; try { if(flag){ obj = joinPoint.proceed(); } } catch (Throwable e) { e.printStackTrace(); } return obj; } }
@Override
@ServiceLimit
public Result test() {
//todo 操作
}
改进思路:
实现不同方法不同的限速功能:
- 在方法中加入枚举属性,不同的方法加入不同的枚举
- 使用时在注解参数中传入
- 在AOP中根据不同的属性实现不同的策略
实现思路:
- SpringCloud Gateway 过滤器
- 过滤器介绍及使用:https://blog.csdn.net/forezp/article/details/85057268
- 在过滤器中使用本文中上述算法或工具,根据自己的业务比如:IP限流、用户ID、接口等自定义实现限速限流。
- 当然除了比较流行Guava外,还有其他工具:https://github.com/bbeck/token-bucket 或自行找一些更优质的工具使用。
Spring Cloud Gateway官方就提供了RequestRateLimiterGatewayFilterFactory
这个类,使用Redis和lua脚本实现了令牌桶的方式。具体实现逻辑在RequestRateLimiterGatewayFilterFactory类中,lua脚本在如下图所示的文件夹中:
以IP、URL、自定义参数限流示例:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis-reactive</artifactId>
</dependency>
spring: application: name: gateway cloud: gateway: routes: - id: limit_route uri: lb://dt-text predicates: - Path= /text/** filters: - name: RequestRateLimiter args: key-resolver: '#{@taoKeyResolver}' redis-rate-limiter.replenishRate: 1 redis-rate-limiter.burstCapacity: 3 redis: host: localhost port: 6379 database: 0
配置关键词介绍:
key-resolver: 用于限流的键的解析器的 Bean 对象的名字。它使用 SpEL 表达式根据#{@beanName}从 Spring 容器中获取 Bean 对象。
redis-rate-limiter.replenishRate: 令牌桶每秒填充平均速率。
redis-rate-limiter.burstCapacity: 令牌桶总容量。
package com.datao.gateway.config; import org.springframework.cloud.gateway.filter.ratelimit.KeyResolver; import org.springframework.context.annotation.Bean; import org.springframework.web.server.ServerWebExchange; import reactor.core.publisher.Mono; import java.util.Objects; public class TaoKeyResolver implements KeyResolver { /** * 根据IP限流 * @param exchange * @return */ @Override public Mono<String> resolve(ServerWebExchange exchange) { return Mono.just(exchange.getRequest().getRemoteAddress().getAddress().getHostAddress()); } // /** // * 根据Url限流 // * @param exchange // * @return // */ // @Override // public Mono<String> resolve(ServerWebExchange exchange) { // return Mono.just(exchange.getRequest().getURI().getPath()); // } // // /** // * 根据自定义参数限流 // * @return // */ // @Bean // public Mono<String> resolve(ServerWebExchange exchange) { // return Mono.just(Objects.requireNonNull(exchange.getRequest().getQueryParams().getFirst("tao"))); // } @Bean public TaoKeyResolver taoKeyResolver() { return new TaoKeyResolver(); } }
local tokens_key = KEYS[1] #请求唯一标识 local timestamp_key = KEYS[2] #请求时间 local rate = tonumber(ARGV[1]) #速率,如上面例子里的 1 local capacity = tonumber(ARGV[2]) #容量,如上面例子里的 3 local now = tonumber(ARGV[3]) #当前时间 local requested = tonumber(ARGV[4])#请求数量,默认是1 local fill_time = capacity/rate local ttl = math.floor(fill_time*2) #得到过期时间 local last_tokens = tonumber(redis.call("get", tokens_key)) #剩余可用令牌,没有值则为桶的容量,上面例子里值范围是 0~3 if last_tokens == nil then last_tokens = capacity end local last_refreshed = tonumber(redis.call("get", timestamp_key)) #上次请求时间,没值则为0 if last_refreshed == nil then last_refreshed = 0 end local delta = math.max(0, now-last_refreshed) #单前时间与上次请求时间的差值,最小是0; local filled_tokens = math.min(capacity, last_tokens+(delta*rate)) #可用的令牌,最大为桶容量,范围是0~桶容量, 上面例子里是 0~3 local allowed = filled_tokens >= requested #是否允许请求, 可用令牌是否足够 local new_tokens = filled_tokens local allowed_num = 0 if allowed then new_tokens = filled_tokens - requested #可用令牌减去1 allowed_num = 1 end redis.call("setex", tokens_key, ttl, new_tokens) #缓存可用令牌 redis.call("setex", timestamp_key, ttl, now) #缓存当前时间 return { allowed_num, new_tokens }
实现思路:在过滤其中+算法,使用Guava,或自定义算法加Redis,实现分布式限速。
具体实现:可参考这篇文章 https://blog.csdn.net/justlpf/article/details/84528669
思路:Zuul 自带的没有限流的功能,需要引入第三方Jar包实现限速功能
工具:Spring-cloud-zuul-ratelimit https://github.com/marcosbarbero/spring-cloud-zuul-ratelimit
<dependency>
<groupId>com.marcosbarbero.cloud</groupId>
<artifactId>spring-cloud-zuul-ratelimit</artifactId>
<version>2.0.6.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
zuul: routes: dt_text: service-id: dt-text path: /text/** stripPrefix: false ratelimit: key-prefix: rate enabled: true repository: REDIS default-policy-list: # ########### 如下的配置就是说:每1秒内不能超过2个请求,2个请求时间加起来不能超过1秒(quota)############ - limit: 2 #在一个单位时间窗口的请求数量 quota: 1 1 #在一个单位时间窗口的请求时间限制(秒) refresh-interval: 60 #时间窗口 (in seconds) type: ##根据什么控制流量,可以组合使用,如url、httpmethod组合,就会把 /orders的get和post请求分开处理 - url - http_method #- user #根据用户控制需要Security支持,(一般不用) #- origin #根据客户端的ip控制
由于配置阈值比较低,手动在浏览器即可触发:
基本介绍:
官网地址:https://github.com/alibaba/Sentinel
随着微服务的流行,服务和服务之间的稳定性变得越来越重要。Sentinel 以流量为切入点,从流量控制、熔断降级、系统负载保护等多个维度保护服务的稳定性。
Sentinel 具有以下特征:
- 丰富的应用场景:Sentinel 承接了阿里巴巴近 10 年的双十一大促流量的核心场景,例如秒杀(即突发流量控制在系统容量可以承受的范围)、消息削峰填谷、集群流量控制、实时熔断下游不可用应用等。
完备的实时监控:Sentinel 同时提供实时的监控功能。您可以在控制台中看到接入应用的单台机器秒级数据,甚至 500 台以下规模的集群的汇总运行情况。- 广泛的开源生态:Sentinel 提供开箱即用的与其它开源框架/库的整合模块,例如与 Spring Cloud、Dubbo、gRPC 的整合。您只需要引入相应的依赖并进行简单的配置即可快速地接入 Sentinel。
- 完善的 SPI 扩展点:Sentinel 提供简单易用、完善的 SPI 扩展接口。您可以通过实现扩展接口来快速地定制逻辑。例如定制规则管理、适配动态数据源等。
- 介绍:https://github.com/alibaba/Sentinel/wiki/%E4%BB%8B%E7%BB%8D
没有比官方文档更清晰的文档了,不介绍了。
https://github.com/alibaba/Sentinel/wiki/新手指南
Nginx主要有两种限速方式:
请各位大侠不要压测小弟的服务器,1核、1G、1M的非常脆弱,跪谢!
按连接数限速
是指限制单个IP(或者其他的key)同时发起的连接数,超出这个限制后,Nginx将直接拒绝更多的连接。可以对于一些服务器流量异常、负载过大,甚至是大流量的恶意攻击访问等,进行并发数的限制。
-Nginx配置
http {
limit_conn_zone $binary_remote_addr zone=addr:10m;
limit_conn_status 429;
...
server {
...
location /coon.html {
limit_conn addr 1;
}
- 使用
limit_conn_zone
关键字,我们定义了一个名为addr
大小为10MB的共享内存区域(zone),用来存放限速相关的统计信息,限速的key值为二进制的IP地址`$binary_remote_addr。- 使用
limit_req_status
关键字,是更改默认限速后返回的状态码。默认返回的是503(服务暂时不可用)。limit_conn addr 1;
允许最大1个并发
按请求速率限速
是指限制单个IP(或者其他的key)发送请求的速率,超出指定速率后,Nginx将直接拒绝更多的请求。采用漏桶算法
(leaky bucket)算法实现。
下面简单的实现:
http {
limit_req_zone $binary_remote_addr zone=mylimit:10m rate=1r/s;
limit_req_status 429;
...
server {
...
location /req.html {
limit_req zone=mylimit burst=2 nodelay;
}
error_page 429 /429.html;
...
- 使用
limit_req_zone
关键字,我们定义了一个名为mylimit
大小为10MB的共享内存区域(zone),用来存放限速相关的统计信息,限速的key值为二进制的IP地址$binary_remote_addr
,限速上限rate
为1r/s(1次每秒);接着我们使用limit_req
关键字将上述规则作用到/
所有请求上。- 使用
limit_req_status
关键字,是更改默认限速后返回的状态码。默认返回的是503(服务不可用)。burst
可以理解为漏桶的容量,不设置这个值时速度达到rate
后就报错,设置后有对应的缓冲。nodelay
关键字,因为在”桶“中会缓存一些请求等待处理,这时在用户看来请求的时间就边长了,不是很友好。加上这个关键词就可以理解为:既然能进入桶中,那么这个请求应该就是有效的(超过桶的容量直接就报429错误了),那么就不用排队直接处理就好了。nodelay
与burst
共同使用时才生效
失败时:
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。