当前位置:   article > 正文

JAVA面试题分享四百六十九:Sentinel底层滑动时间窗限流算法怎么实现的?_时间窗算法数据结构

时间窗算法数据结构

目录

美团面试:Sentinel底层滑动时间窗限流算法怎么实现的?

滑动窗口的核心数据结构

ArrayMetric 源码

LeapArray  源码

MetricBucket  源码

WindowWrap  源码

滑动窗口 统计 源码实现

如何 定位 Bucket?


美团面试:Sentinel底层滑动时间窗限流算法怎么实现的?

Sentinel是一个系统性的高可用保障工具,提供了限流、降级、熔断等一系列的能力,基于这些能力做了语意化概念抽象,这些概念对于理解实现机制特别有帮助,所以这里也复述一下。

流量控制有以下几个角度:

  • 资源的调用关系,例如资源的调用链路,资源和资源之间的关系;

  • 运行指标,例如 QPS、线程池、系统负载等;

  • 控制的效果,例如直接限流、冷启动、排队等。

Sentinel 的设计理念是让您自由选择控制的角度,并进行灵活组合,从而达到想要的效果。

Sentinel使用滑动时间窗口算法来实现流量控制,流量统计。

滑动时间窗算法的核心思想是将一段时间划分为多个时间窗口,并在每个时间窗口内对请求进行计数,以确定是否允许继续请求。

以下是Sentinel底层滑动时间窗口限流算法的简要实现步骤:

  • 时间窗口划分:将整个时间范围划分为多个固定大小的时间窗口(例如1秒一个窗口)。这些时间窗口会随着时间的流逝依次滑动。

  • 计数器:为每个时间窗口维护一个计数器,用于记录在该时间窗口内的请求数。

  • 请求计数:当有请求到来时,将其计入当前时间窗口的计数器中。

  • 滑动时间窗口:定期滑动时间窗口,将过期的时间窗口删除,并创建新的时间窗口。这样可以保持时间窗口的滚动。

  • 限流判断:当有请求到来时,Sentinel会检查当前时间窗口内的请求数是否超过了预设的限制阈值。如果超过了限制阈值,请求将被拒绝或执行降级策略。

  • 计数重置:定期重置过期时间窗口的计数器,以确保计数器不会无限增长。

这种滑动时间窗口算法允许在一段时间内平滑控制请求的流量,而不是仅基于瞬时请求速率进行限流。

它考虑了请求的历史分布,更适用于应对突发流量和周期性负载的情况。

我们知道,Sentinel可以用来帮助我们实现流量控制、服务降级、服务熔断,而这些功能的实现都离不开接口被调用的实时指标数据,本文便是关于 Sentinel 是如何实现指标数据统计的。

上图中的右上角就是滑动窗口的示意图,是 StatisticSlot 的具体实现。

StatisticSlot 是 Sentinel 的核心功能插槽之一,用于统计实时的调用数据。

Sentinel 是基于滑动窗口实现的实时指标数据收集统计,底层采用高性能的滑动窗口数据结构 LeapArray 来统计实时的秒级指标数据,可以很好地支撑写多于读的高并发场景。

图片

滑动窗口的核心数据结构

  • ArrayMetric:滑动窗口核心实现类。

  • LeapArray:滑动窗口顶层数据结构,包含一个一个的窗口数据。

  • WindowWrap:每一个滑动窗口的包装类,其内部的数据结构用 MetricBucket 表示。

  • MetricBucket:指标桶,例如通过数量、阻塞数量、异常数量、成功数量、响应时间,已通过未来配额(抢占下一个滑动窗口的数量)。

  • MetricEvent:指标类型,例如通过数量、阻塞数量、异常数量、成功数量、响应时间等。

ArrayMetric 源码

滑动窗口的入口类为 ArrayMetric,实现了 Metric 指标收集核心接口,该接口主要定义一个滑动窗口中成功的数量、异常数量、阻塞数量,TPS、响应时间等数据。

  1. public class ArrayMetric implements Metric {
  2.     private final LeapArray<MetricBucket> data;
  3.     public ArrayMetric(int sampleCount, int intervalInMs, boolean enableOccupy) {
  4.         if (enableOccupy) {
  5.             this.data = new OccupiableBucketLeapArray(sampleCount, intervalInMs);
  6.         } else {
  7.             this.data = new BucketLeapArray(sampleCount, intervalInMs);
  8.         }
  9.     }
  • int intervalInMs:表示一个采集的时间间隔,即滑动窗口的总时间,例如 1 分钟。

  • int sampleCount:在一个采集间隔中抽样的个数,默认为 2,即一个采集间隔中会包含两个相等的区间,一个区间就是一个窗口。

  • boolean enableOccupy:是否允许抢占,即当前时间戳已经达到限制后,是否可以占用下一个时间窗口的容量。

LeapArray  源码

LeapArray 用来承载滑动窗口,即成员变量 array,类型为 AtomicReferenceArray<WindowWrap<T>>,保证创建窗口的原子性(CAS)。

  1. public abstract class LeapArray<T> {
  2.     //每一个窗口的时间间隔,单位为毫秒
  3.     protected int windowLengthInMs;
  4.     //抽样个数,就一个统计时间间隔中包含的滑动窗口个数
  5.     protected int sampleCount;
  6.     //一个统计的时间间隔
  7.     protected int intervalInMs;
  8.     //滑动窗口的数组,滑动窗口类型为 WindowWrap<MetricBucket>
  9.     protected final AtomicReferenceArray<WindowWrap<T>> array;
  10.     private final ReentrantLock updateLock = new ReentrantLock();
  11.     
  12.     public LeapArray(int sampleCount, int intervalInMs) {
  13.         this.windowLengthInMs = intervalInMs / sampleCount;
  14.         this.intervalInMs = intervalInMs;
  15.         this.sampleCount = sampleCount;
  16.         this.array = new AtomicReferenceArray<>(sampleCount);
  17.     }

MetricBucket  源码

Sentinel 使用 MetricBucket 统计一个窗口时间内的各项指标数据,这些指标数据包括请求总数、成功总数、异常总数、总耗时、最小耗时、最大耗时等,而一个 Bucket 可以是记录一秒内的数据,也可以是 10 毫秒内的数据,这个时间长度称为窗口时间。

  1. public class MetricBucket {
  2.     /**
  3.      * 存储各事件的计数,比如异常总数、请求总数等
  4.      */
  5.     private final LongAdder[] counters;
  6.     /**
  7.      * 这段事件内的最小耗时
  8.      */
  9.     private volatile long minRt;
  10. }

Bucket 记录一段时间内的各项指标数据用的是一个 LongAdder 数组,数组的每个元素分别记录一个时间窗口内的请求总数、异常数、总耗时。也就是说:MetricBucket 包含一个 LongAdder 数组,数组的每个元素代表一类 MetricEvent。LongAdder 保证了数据修改的原子性,并且性能比 AtomicLong 表现更好。

  1. public enum MetricEvent {
  2.     PASS,
  3.     BLOCK,
  4.     EXCEPTION,
  5.     SUCCESS,
  6.     RT,
  7.     OCCUPIED_PASS
  8. }

当需要获取 Bucket 记录总的成功请求数或者异常总数、总的请求处理耗时,可根据事件类型 (MetricEvent) 从 Bucket 的 LongAdder 数组中获取对应的 LongAdder,并调用 sum 方法获取总数。

  1. public long get(MetricEvent event) {
  2.     return counters[event.ordinal()].sum();
  3. }

当需要 Bucket 记录一个成功请求或者一个异常请求、处理请求的耗时,可根据事件类型(MetricEvent)从 LongAdder 数组中获取对应的 LongAdder,并调用其 add 方法。

  1. public void add(MetricEvent event, long n) {
  2.      counters[event.ordinal()].add(n);
  3. }

WindowWrap  源码

因为 Bucket 自身并不保存时间窗口信息,所以 Sentinel 给 Bucket 加了一个包装类 WindowWrap。Bucket 用于统计各项指标数据,WindowWrap 用于记录 Bucket 的时间窗口信息(窗口的开始时间、窗口的大小),WindowWrap 数组就是一个滑动窗口。

  1. public class WindowWrap<T> {
  2.     /**
  3.      * 单个窗口的时间长度(毫秒)
  4.      */
  5.     private final long windowLengthInMs;
  6.     /**
  7.      * 窗口的开始时间戳(毫秒)
  8.      */
  9.     private long windowStart;
  10.     /**
  11.      * 统计数据
  12.      */
  13.     private T value;
  14. }

总的来说:

  • WindowWrap 用于包装 Bucket,随着 Bucket 一起创建。

  • WindowWrap 数组实现滑动窗口,Bucket 只负责统计各项指标数据,WindowWrap 用于记录 Bucket 的时间窗口信息。

  • 定位 Bucket 实际上是定位 WindowWrap,拿到 WindowWrap 就能拿到 Bucket。

滑动窗口 统计 源码实现

如果我们希望能够知道某个接口的每秒处理成功请求数(成功 QPS)、每秒处理失败请求数(失败 QPS),以及处理每个成功请求的平均耗时(avg RT),注意这里我们只需要控制 Bucket 统计一秒钟的指标数据即可,但如何才能确保 Bucket 存储的就是精确到 1 秒内的数据呢?

Sentinel 是这样实现的:定义一个 Bucket 数组,根据时间戳来定位到数组的下标。

由于只需要保存最近一分钟的数据。那么 Bucket 数组的大小就可以设置为 60,每个 Bucket 的 windowLengthInMs(窗口时间)大小就是 1 秒。

内存资源是有限的,而这个数组可以循环使用,并且永远只保存最近 1 分钟的数据,这样可以避免频繁的创建 Bucket,减少内存资源的占用。

那如何定位 Bucket 呢?我们只需要将当前时间戳减去毫秒部分,得到当前的秒数,再将得到的秒数与数组长度取余数,就能得到当前时间窗口的 Bucket 在数组中的位置(索引)。

calculateTimeIdx 方法中,取余数就是实现循环利用数组。

如果想要获取连续的一分钟的 Bucket 数据,就不能简单的从头开始遍历数组,而是指定一个开始时间和结束时间,从开始时间戳开始计算 Bucket 存放在数组中的下标,然后循环每次将开始时间戳加上 1 秒,直到开始时间等于结束时间。

  1. private int calculateTimeIdx(long timeMillis) {
  2.     long timeId = timeMillis / windowLengthInMs;
  3.     return (int)(timeId % array.length());
  4. }

由于循环使用的问题,当前时间戳与一分钟之前的时间戳和一分钟之后的时间戳都会映射到数组中的同一个 Bucket,

因此,必须要能够判断取得的 Bucket 是否是统计当前时间窗口内的指标数据,这便要数组每个元素都存储 Bucket 时间窗口的开始时间戳。

比如当前时间戳是 1577017626812,Bucket 统计一秒的数据,将时间戳的毫秒部分全部替换为 0,就能得到 Bucket 时间窗口的开始时间戳为 1577017626000。

  1. //计算时间窗口开始时间戳
  2. protected long calculateWindowStart(long timeMillis) {
  3.     return timeMillis - timeMillis % windowLengthInMs;
  4. }
  5. //判断时间戳是否在当前时间窗口内
  6. public boolean isTimeInWindow(long timeMillis) {
  7.     return windowStart <= timeMillis && timeMillis < windowStart + windowLengthInMs;
  8. }

如何 定位 Bucket?

通过时间戳 定位 Bucket的。

当接收到一个请求时,可根据接收到请求的时间戳计算出一个数组索引,从滑动窗口(WindowWrap 数组)中获取一个 WindowWrap,从而获取 WindowWrap 包装的 Bucket,调用 Bucket 的 add 方法记录相应的事件。

  1. /**
  2.  * 根据时间戳获取 bucket
  3.  * @param timeMillis 时间戳(毫秒)
  4.  * @return 如果时间有效,则在提供的时间戳处显示当前存储桶项;如果时间无效,则为空
  5.  */
  6. public WindowWrap<T> currentWindow(long timeMillis) {
  7.     if (timeMillis < 0) {
  8.         return null;
  9.     }
  10.     // 获取时间戳映射到的数组索引
  11.     int idx = calculateTimeIdx(timeMillis);
  12.     // 计算 bucket 时间窗口的开始时间
  13.     long windowStart = calculateWindowStart(timeMillis);
  14.     // 从数组中死循环查找当前的时间窗口,因为可能多个线程都在获取当前时间窗口
  15.     while (true) {
  16.         WindowWrap<T> old = array.get(idx);
  17.         // 一般是项目启动时,时间未到达一个周期,数组还没有存储满,没有到复用阶段,所以数组元素可能为空
  18.         if (old == null) {
  19.             // 创建新的 bucket,并创建一个 bucket 包装器
  20.             WindowWrap<T> window = new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
  21.             // cas 写入,确保线程安全,期望数组下标的元素是空的,否则就不写入,而是复用
  22.             if (array.compareAndSet(idx, null, window)) {
  23.                 return window;
  24.             } else {
  25.                 Thread.yield();
  26.             }
  27.         }
  28.         // 如果 WindowWrap 的 windowStart 正好是当前时间戳计算出的时间窗口的开始时间,则就是我们想要的 bucket
  29.         else if (windowStart == old.windowStart()) {
  30.             return old;
  31.         }
  32.         // 复用旧的 bucket
  33.         else if (windowStart > old.windowStart()) {
  34.             if (updateLock.tryLock()) {
  35.                 try {
  36.                     // 重置 bucket,并指定 bucket 的新时间窗口的开始时间
  37.                     return resetWindowTo(old, windowStart);
  38.                 } finally {
  39.                     updateLock.unlock();
  40.                 }
  41.             } else {
  42.                 Thread.yield();
  43.             }
  44.         }
  45.         // 计算出来的当前 bucket 时间窗口的开始时间比数组当前存储的 bucket 的时间窗口开始时间还小,
  46.         // 直接返回一个空的 bucket 就行
  47.         else if (windowStart < old.windowStart()) {
  48.             return new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
  49.         }
  50.     }
  51. }

上面代码的实现是:通过当前时间戳计算出当前时间窗口的 (new) Bucket 在数组中的索引,通过索引从数组中取得 (old) Bucket;并计算 (new) Bucket 时间窗口的开始时间,与 (old) Bucket 时间窗口的开始时间作对比。

  1. 如果旧的 bucket 不存在,那么我们在 windowStart 处创建一个新的 bucket,然后尝试通过 CAS 操作更新环形数组。只有一个线程可以成功更新,保证原子性。

  2. 如果当前 windowStart 等于旧桶的开始时间戳,表示时间在桶内,所以直接返回桶。

  3. 如果旧桶的开始时间戳落后于所提供的时间,这意味着桶已弃用,我们可以复用该桶,并将桶重置为当前的 windowStart。注意重置和清理操作很难是原子的,所以我们需要一个更新锁来保证桶更新的正确性。只有当 bucket 已弃用才会上锁,所以在大多数情况下它不会导致性能损失。

  4. 不应该通过这里,因为提供的时间已经落后了,一般是时钟回拨导致的。

 

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/很楠不爱3/article/detail/544037
推荐阅读
相关标签
  

闽ICP备14008679号