当前位置:   article > 正文

限流-滑动窗口_滑动窗口限流

滑动窗口限流

1. 介绍

      目前主流的限流算法:令牌、漏桶、滑动窗口。Nginx都实现了漏桶算法,Springcloud Gateway和Guava Ratelimiter实现了令牌桶,阿里的 Sentinel实现了滑动窗口。

1.1 为什么需要限流

  • 大量正常用户高频访问导致服务器宕机
  • 恶意用户高频访问导致服务器宕机
  • 网页爬虫 ,对于这些情况我们需要对用户的访问进行限流访问

1.2 为什么引入滑动窗口

    固定窗口可能遇到的问题

  • 限流不均匀

  • 两倍的配置速率问题

假如限流设置为:

1秒钟1000个请求,在 第一秒的最后100ms,以及 第二秒 最开始100ms,都收到1000次请求。就会出现在这个 200ms 的周期中收到 2000次请求,并且限流通过,这就是 两倍的配置速率问题

1.3 定义

       滑动窗口为固定窗口的改良版,解决了固定窗口在窗口切换时会受到两倍于阈值数量的请求,滑动窗口在固定窗口的基础上,将一个窗口分为若干个等份的小窗口,每个小窗口对应不同的时间点,拥有独立的计数器,当请求的时间点大于当前窗口的最大时间点时,则将窗口向前平移一个小窗口(将第一个小窗口的数据舍弃,第二个小窗口变成第一个小窗口,当前请求放在最后一个小窗口),整个窗口的所有请求数相加不能大于阀值。

2. 源码

2.1 阿里Sentinel

 

2.1.1 系统级别限流SystemSlot

   系统自适应限流 —— 过载保护定义自适应限流规则需要提供多个参数

  • 系统的负载水平线,超过这个值时触发过载保护功能
  • 允许的最大线程数、最长响应时间和最大 QPS,可以不设置
  1. List<SystemRule> rules = new ArrayList<SystemRule>();
  2. SystemRule rule = new SystemRule();
  3. rule.setHighestSystemLoad(3.0);
  4. rule.setAvgRt(10);
  5. rule.setQps(20);
  6. rule.setMaxThread(10);
  7. rules.add(rule);
  8. SystemRuleManager.loadRules(Collections.singletonList(rule));

       从代码中也可以看出系统自适应限流规则不需要定义资源名称,因为它是全局的规则,会自动应用到所有的临界区。如果当负载超标时,所有临界区资源将一起勒紧裤腰带渡过难关。

(1)检查系统指标,qps、thread、rt、load、cpu

定时任务会每秒检查自身系统状况。

  1. public static void checkSystem(ResourceWrapper resourceWrapper) throws BlockException {
  2. if (resourceWrapper == null) {
  3. return;
  4. }
  5. // Ensure the checking switch is on.
  6. // 只有配置了系统自适应限流规则才能进入,SystemRuleManager.loadRules去加载
  7. if (!checkSystemStatus.get()) {
  8. return;
  9. }
  10. // for inbound traffic only
  11. // 只有入口流量才能进入
  12. if (resourceWrapper.getEntryType() != EntryType.IN) {
  13. return;
  14. }
  15. // total qps
  16. double currentQps = Constants.ENTRY_NODE == null ? 0.0 : Constants.ENTRY_NODE.successQps();
  17. if (currentQps > qps) {
  18. throw new SystemBlockException(resourceWrapper.getName(), "qps");
  19. }
  20. // total thread
  21. int currentThread = Constants.ENTRY_NODE == null ? 0 : Constants.ENTRY_NODE.curThreadNum();
  22. if (currentThread > maxThread) {
  23. throw new SystemBlockException(resourceWrapper.getName(), "thread");
  24. }
  25. double rt = Constants.ENTRY_NODE == null ? 0 : Constants.ENTRY_NODE.avgRt();
  26. if (rt > maxRt) {
  27. throw new SystemBlockException(resourceWrapper.getName(), "rt");
  28. }
  29. // load. BBR algorithm.
  30. if (highestSystemLoadIsSet && getCurrentSystemAvgLoad() > highestSystemLoad) {
  31. if (!checkBbr(currentThread)) {
  32. throw new SystemBlockException(resourceWrapper.getName(), "load");
  33. }
  34. }
  35. // cpu usage
  36. if (highestCpuUsageIsSet && getCurrentCpuUsage() > highestCpuUsage) {
  37. throw new SystemBlockException(resourceWrapper.getName(), "cpu");
  38. }
  39. }

(2) 如何加载系统规则

  1. List<SystemRule> rules = new ArrayList<SystemRule>();
  2. SystemRule rule = new SystemRule();
  3. rule.setHighestSystemLoad(3.0);
  4. rule.setAvgRt(10);
  5. rule.setQps(20);
  6. rule.setMaxThread(10);
  7. rules.add(rule);
  8. SystemRuleManager.loadRules(Collections.singletonList(rule));

2.1.2 流控规则FlowSlot

FlowSlot使用的核心思路是滑动窗口。

(1)定义窗口

  1. public class StatisticNode implements Node {
  2. /**
  3. * Holds statistics of the recent {@code INTERVAL} seconds. The {@code INTERVAL} is divided into time spans
  4. * by given {@code sampleCount}.
  5. */
  6. private transient volatile Metric rollingCounterInSecond = new ArrayMetric(SampleCountProperty.SAMPLE_COUNT,
  7. IntervalProperty.INTERVAL);
  8. /**
  9. * 滚动计数器:保存最近60秒的统计信息。windowLengthInMs故意设置为1000毫秒,意思是每一个桶每秒,这样我们就可以得到每一秒的准确统计。
  10. */
  11. private transient Metric rollingCounterInMinute = new ArrayMetric(60, 60 * 1000, false);
  12. /**
  13. * 线程数
  14. */
  15. private LongAdder curThreadNum = new LongAdder();
  16. /**
  17. * 获取度量时的最后一个时间戳
  18. */
  19. private long lastFetchTime = -1;
  20. // ...
  21. }
  1. public class ArrayMetric implements Metric {
  2. private final LeapArray<MetricBucket> data;
  3. public ArrayMetric(int sampleCount, int intervalInMs) {
  4. this.data = new OccupiableBucketLeapArray(sampleCount, intervalInMs);
  5. }
  6. public ArrayMetric(int sampleCount, int intervalInMs, boolean enableOccupy) {
  7. if (enableOccupy) {
  8. this.data = new OccupiableBucketLeapArray(sampleCount, intervalInMs);
  9. } else {
  10. this.data = new BucketLeapArray(sampleCount, intervalInMs);
  11. }
  12. }
  13. // ...
  14. }
  • sampleCount

要知道有多少个小窗口,在sentinel中也就是sampleCount,比如说我们有60个窗口。

  • intervalInMs(间隔-毫秒)

intervalInMs是用来计算这个窗口长度的,intervalInMs/窗口数量= 窗口长度。也就是我给你1分钟,你给我分成60个窗口,这个时候窗口长度就是1s了,那如果我给你1s,你给我分2个窗口,这个时候窗口长度就是500毫秒了,这个1分钟,就是intervalInMs。

  • enableOccupy

是否允许抢占,即当前时间戳已经达到限制后,是否可以占用下一个时间窗口的容量,这里对应 LeapArray 的两个实现类,如果允许抢占,则为 OccupiableBucketLeapArray,否则为 BucketLeapArray。

(2)判断是否触发限流标准

以qps为基准,qps的定义是每秒的流量,sentinel以滑动窗口作为核心,中qps的计算公式:每秒通过数量/每秒的间隔。本质上rollingCounterInSecond.pass()就代表了qps指标,因为默认的间隔为1s,只是窗口数量为2。

  1. public double passQps() {
  2. return rollingCounterInSecond.pass() / rollingCounterInSecond.getWindowIntervalInSec();
  3. }

判断qps是否超标

  1. public boolean canPass(Node node, int acquireCount, boolean prioritized) {
  2. int curCount = avgUsedTokens(node);
  3. if (curCount + acquireCount > count) {
  4. // 流量等级,VIP通道问题
  5. if (prioritized && grade == RuleConstant.FLOW_GRADE_QPS) {
  6. long currentTime;
  7. long waitInMs;
  8. currentTime = TimeUtil.currentTimeMillis();
  9. waitInMs = node.tryOccupyNext(currentTime, acquireCount, count);
  10. if (waitInMs < OccupyTimeoutProperty.getOccupyTimeout()) {
  11. node.addWaitingRequest(currentTime + waitInMs, acquireCount);
  12. node.addOccupiedPass(acquireCount);
  13. sleep(waitInMs);
  14. // PriorityWaitException indicates that the request will pass after waiting for {@link @waitInMs}.
  15. throw new PriorityWaitException(waitInMs);
  16. }
  17. }
  18. // 流量超标,直接拒绝
  19. return false;
  20. }
  21. return true;
  22. }

(3)滑动窗口的设计

计算每秒的通过数量

  1. public long pass() {
  2. // 锁定当前时间所在的滑动窗口,为什么没有返回是因为滑动窗口的新建或者更新是需要锁资源的,当无法竞争到资源时需要等待,等待是会释放CPU控制权,而不释放锁
  3. data.currentWindow();
  4. long pass = 0;
  5. // 根据当前时间筛选未过期的滑动窗口,如何判定:当前时间-窗口开始时间>间隔时间
  6. List<MetricBucket> list = data.values();
  7. for (MetricBucket window : list) {
  8. pass += window.pass();
  9. }
  10. return pass;
  11. }

 如何根据当前时间去锁定滑动窗口(CAS + 可重入锁)

  1. public WindowWrap<T> currentWindow(long timeMillis) {
  2. if (timeMillis < 0) {
  3. return null;
  4. }
  5. // 根据当前时间计算滑动窗口下标
  6. int idx = calculateTimeIdx(timeMillis);
  7. // 根据当前时间计算滑动窗口的开始时间
  8. long windowStart = calculateWindowStart(timeMillis);
  9. while (true) {
  10. WindowWrap<T> old = array.get(idx);
  11. if (old == null) {
  12. // CAS
  13. if (array.compareAndSet(idx, null, window)) {
  14. return window;
  15. } else {
  16. Thread.yield();
  17. }
  18. } else if (windowStart == old.windowStart()) {
  19. return old;
  20. } else if (windowStart > old.windowStart()) {
  21. if (updateLock.tryLock()) {
  22. try {
  23. return resetWindowTo(old, windowStart);
  24. } finally {
  25. updateLock.unlock();
  26. }
  27. } else {
  28. Thread.yield();
  29. }
  30. } else if (windowStart < old.windowStart()) {
  31. // 基本不可能到这里
  32. return new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
  33. }
  34. }
  35. }

前面分析了如何统计qps,现在分析qps如何往上加

  1. public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
  2. boolean prioritized, Object... args) throws Throwable {
  3. try {
  4. // Do some checking.
  5. fireEntry(context, resourceWrapper, node, count, prioritized, args);
  6. // 计数
  7. node.increaseThreadNum();
  8. node.addPassRequest(count);
  9. if (context.getCurEntry().getOriginNode() != null) {
  10. // Add count for origin node.
  11. context.getCurEntry().getOriginNode().increaseThreadNum();
  12. context.getCurEntry().getOriginNode().addPassRequest(count);
  13. }
  14. if (resourceWrapper.getEntryType() == EntryType.IN) {
  15. // Add count for global inbound entry node for global statistics.
  16. Constants.ENTRY_NODE.increaseThreadNum();
  17. Constants.ENTRY_NODE.addPassRequest(count);
  18. }
  19. // Handle pass event with registered entry callback handlers.
  20. for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
  21. handler.onPass(context, resourceWrapper, node, count, args);
  22. }
  23. } catch (PriorityWaitException ex) {
  24. // ...
  25. } catch (Throwable e) {
  26. // ...
  27. }
  28. }

3. 实战

4. FAQ

4.1 如何判断系统已经到达崩溃边缘,qps、thread、rt、load、cpu?

4.2 LongAdder和AtomicLong有什么区别?

并发场景下的number操作,都会选用java.util.concurrent.atomic包下的数据结构,比如典型的计数场景,而AtomicLong是其中一个,但是jdk1.8后Doug Lea大神又添加LongAdder和DoubleAdder,在大并发场景下性能会远高于AtomicLong。AtomicLong是以CAS保证原子性;而LongAdder是拆分锁的粒度,比如原先是多个线程争抢修改一个value的,变成多个线程争抢修改多个value。

4.3 根据当前时间筛选未过期的滑动窗口为什么不是:当前时间-窗口开始时间>窗口长度时间,而是大于间隔时间。按照样本数量为2计算,窗口长度时间为500ms,而间隔时间为1000ms?

A:pass数量是以整个窗口(包含N个滑动窗口)的长度为测量单位的,滑动窗口越多只能说明更精确。

4.4 固定窗口的两倍速率问题,滑动窗口也存在的吧,只是滑动窗口将大窗口拆细,来减少可能额外超出的量不至于太多。

5. 参考资料

Sentinel学习比较(一)SpringBoot集成alibaba-sentinel实现接口限流入门

阿里巴巴开源限流系统 Sentinel 全解析

Sentinel 集群限流设计原理

Sentinel源码解析之滑动窗口

AtomicLong和LongAdder的区别

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/羊村懒王/article/detail/274855
推荐阅读
相关标签
  

闽ICP备14008679号