赞
踩
有句话叫什么来着?心似平原走马,易放难收!有空的时候抽点时间看看书什么的也没什么,不过,如果一周两周一直不看书,或者不学习,估计再想拿起书本就有点困难了。
当然,拖得越久,就变得更懒~,恶性循环就是这么来滴!或许某一刻,一个当头棒喝袭来,才能从懒惰中醒来!
子曰:吾日三省吾身!
尚书:知易行难!
得,废话不多说,看一看传说中的滑动时间窗格。
说实话,刚开始学习这个东东,对它的作用很明确:流控(流量控制)。比如:某个接口的对流量或者对数据的请求有限制,每秒只允许100个请求。
想着既然写文章,那就看下百科定义呗,于是搜索:滑动窗口。
原来tcp请求除了三次握手,四次挥手之外,还用了滑动窗口,提高传输的效率。
我们继续,那使用滑动时间窗怎么设置呢?简单,记录每个这一秒开始的时间点和结束的时间点,在这段时间内所有的请求进行计数,如果超过了某个数字,比如100,后面的请求就直接Fail或者降级处理,提示稍后再试试什么的。然后窗格滑动到下一秒,继续进行之前的操作。
来个图看看~
如图,记录第3s的开始时间和结束时间,中间请求不能大于100,然后到了第4s就重新开始记录第4s的开始时间和结束时间,当然请求也不能超过100。
实现其实很多种,这里贴一个简单的代码
- /**
- * desc: 简单的时间窗
- * @Author 笔下天地宽
- * @date 2022/2/22 16:46
- */
- public class EasyWindow {
- /**
- * 定一个map
- * 暂定存两个序列,0秒(0-1秒),1秒(1-2秒)。如果只使用AtomicInteger的话,
- * 来回清空处理重设比较繁琐。当然,完善的滑动时间窗还会有queue记录的
- */
- private static Map<Long,AtomicInteger> windowMap = new HashMap<Long, AtomicInteger>();
- /**
- * desc: 初始时间
- */
- private final static Long startTime = System.currentTimeMillis();
-
- /**
- * desc: 定时器线程池
- */
- private static ScheduledExecutorService executorService = newScheduledThreadPool(1);
-
- public static void main(String[] args) throws InterruptedException {
- // 窗格初始化
- windowMap.put(0L, new AtomicInteger(0));
- windowMap.put(1L, new AtomicInteger(0));
- //定时器开启,删除上一秒的,需要先执行
- switchSecond();
- TimeUnit.SECONDS.sleep(1);
-
- // 模拟
- for (int i = 0; i < 200; i++) {
- // 模拟请求数量
- int num = getNum();
- //当前时间
- Long time = (System.currentTimeMillis() - startTime )/1000;
- Long key = time & 1;
-
- // 模拟请求过来
- for (int j = 1; j <= num ; j++) {
- System.out.println("key:"+key);
- int newNum = windowMap.get(key).incrementAndGet();
- if (newNum > 100 ){
- System.out.println(newNum+"超出限制啦,数字:" + num+ " "+ i);
- }
- }
- // 休息下,不然太快结束了
- TimeUnit.MILLISECONDS.sleep(20);
- }
-
- }
-
- /**
- * 产生线程随机数(请求)
- */
- public static int getNum() {
- return ThreadLocalRandom.current().nextInt(20);
- }
-
- /**
- * desc: 窗格切换,上一个窗格初始化,我就定义个线程处理了,简单点来哈
- * @return
- */
- public static void switchSecond(){
- executorService.scheduleAtFixedRate(()->{
- // 减去1s 删掉上一秒的
- Long time = (System.currentTimeMillis() - startTime )/1000;
- Long t = (time-1 )& 1;
-
- AtomicInteger atomicInteger = windowMap.get(t);
- atomicInteger.set(0);
- System.out.println("初始化成功");
- }, 0, 1, TimeUnit.SECONDS);
- }
- }
代码没什么特别的,时间窗使用了一个Map保存,当然你如果只想要一个窗格,直接使用AtomicInteger 保存,甚至可以直接使用Int 保存都没啥,处理好逻辑就行。比较详细的滑动时间窗格里面Map里面应该还有一个队列,存储请求的一些信息等等。
诚然,这只是一个很简单的滑动时间窗,实现的并不是很完美,细心的小伙伴应该已经发现,这种直接使用时间点进行截取,不一定能保证限流吧?流量陡增陡降估计就被突破了,像这样:
这种情况,明显超过了限流的标准,生产中直接可能触发降级或者服务不可有的哟~
那么如何来优化这种滑动时间窗格呢?下面说下面几种方式吧,不过万法归一,思想上都有互通之处。
1、时间分段,窗格滑动更加频繁。
画个图展示下。
如图,滑动时间窗格每个100ms进行一次滑动,就是说,每100ms内的请求会做一次统计,统计滑动窗格内的数量,超过限制,就会限流。我上面的代码是直接每隔1s才滑动一次,相当于1s进行一次的请求统计,相比这个,明显粗犷了一些。
当然这种滑动设计还是无法避免0.99s和1.01s突然有大并发请求的限流。但是呢,0.99s和1.01s突然有大并发请求,这种只是极端状况,大部分时候请求的并发在100ms内的时间段内,不会有太大变化的。
可能有哥们说,我们系统就要处理这种极端状况!咋地?
好吧,这个先不急,咱们先来看看这种频繁滑动需要做些什么。
首先,每100ms的数据都要进行存储,本人以上代码中的Map存储的数据肯定会增加的。其次,每次滑动,都要对最初的100ms数据进行清理,以及创建一个新的时间段,存储和复杂度有个不小的提升。
可以说,是使用缓存来提高了滑动时间窗格的精确度。
你还要提高精确度?可以啊,在细分就是了,1s你分成100段,1000段,到时候1.01甚至1.001的请求数量都能给你算出来。
不过划分越细致,计算越频繁,高并发的情况下,本来就要压缩每一丢丢的性能,0.001s你就进行一次统计,这么搞是不是没那么的合适?总的来说吧,这个就是 精确度与缓存大小 两者之间的一个权衡吧,可以根据自己的需要,设计滑动时间窗格每次滑动的时间长度。
2、先看下dubbo的DefaultTPSLimiter。
dubbo这个TPS限流处理,有点滑动时间窗格的设计思想,不过处理起来是比较粗暴的(等下贴个源码)。主要的逻辑就是,根据设置的单位时间,限流数量,每次请求过来,限流数量进行减一,限流小于0的话,就不允许请求通过了。之后,过了单位时间,重置限流数量,这个和我上面的代码思想有点相似。贴个代码瞅瞅。
-
- private final ConcurrentMap<String, StatItem> stats = new ConcurrentHashMap<String, StatItem>();
-
- @Override
- public boolean isAllowable(URL url, Invocation invocation) {
- // 获取限流量,key为tps,从配置中心加载
- int rate = url.getParameter(TPS_LIMIT_RATE_KEY, -1);
- //tps.interval(tps间隔,默认60*1000毫秒),配置了就使用配置,否则默认一分钟
- long interval = url.getParameter(TPS_LIMIT_INTERVAL_KEY, DEFAULT_TPS_LIMIT_INTERVAL);
- String serviceKey = url.getServiceKey();
- if (rate > 0) {
- StatItem statItem = stats.get(serviceKey);
- if (statItem == null) {
- // key,限流数量,默认时间长度
- stats.putIfAbsent(serviceKey, new StatItem(serviceKey, rate, interval));
- statItem = stats.get(serviceKey);
- } else {
- //rate or interval has changed, rebuild
- // 有点极端哈,不过也合理,rate或者interval变动,重新判断处理
- if (statItem.getRate() != rate || statItem.getInterval() != interval) {
- stats.put(serviceKey, new StatItem(serviceKey, rate, interval));
- statItem = stats.get(serviceKey);
- }
- }
- return statItem.isAllowable();
- } else {
- // 数量不大于0,直接结束呗
- StatItem statItem = stats.get(serviceKey);
- if (statItem != null) {
- stats.remove(serviceKey);
- }
- }
-
- return true;
- }
两个比较关键的东西StatItem 和 statItem.isAllowable(),其他都是java基础,大家都懂的。
继续贴一下相关代码。
StatItem 的主要属性
- class StatItem {
- /**
- * 名称
- */
- private String name;
- /**
- * 上次重置时间,相当于滑动时间窗的起始时间
- */
- private long lastResetTime;
- /**
- * 单位时间,相当于时间窗格长度
- */
- private long interval;
- /**
- * 定位请求
- */
- private LongAdder token;
- /**
- * 速度,或者说单位时间的数量
- */
- private int rate;
-
- // ... 略
- }
下面是判断逻辑:
- /**
- * desc: 请求是否允许通过
- * @return
- */
- public boolean isAllowable() {
- long now = System.currentTimeMillis();
- // 当前时间大于最后一次时间点+窗口时间段,重置
- if (now > lastResetTime + interval) {
- token = buildLongAdder(rate);
- lastResetTime = now;
- }
- // 当前请求数量少于0,不允许呗
- if (token.sum() < 0) {
- return false;
- }
- // 允许的话,可以通过的请求数减1,
- token.decrement();
- return true;
- }
是不是感觉dubbo在限流这方面做的就是一般般?
不过这个不是人家限流的主打产品哟,组件sentinel才是人家的的王牌!看了之后,只想说一句,牛掰plus!
先说下大致流程,比如1000ms内,sentinel切分了两个时间窗格,每次请求根据当前的时间戳路由到不同的时间窗格,比如1501ms路由到下标为1的时间窗格,1499路由到下标为0的时间窗格。时间窗格中有个封装类,定位到窗格之后,获取封装类进行窗格计数修改,或者窗格重置等等。这种做法相当于把请求均匀分不到两个窗格之中,不会因为突然的并发,导致某一刻加大服务器压力而出现一系列问题。当然,时间窗格还可以细分,这个可以在配置中心直接配置,灵活变动。
来看看这个核心的代码逻辑。
- /**
- *com.alibaba.csp.sentinel.node.StatisticNode#addPassRequest
- *com.alibaba.csp.sentinel.slots.statistic.metric.occupy.OccupiableBucketLeapArray#addWaiting
- */
- @Override
- public void addWaiting(long time, int acquireCount) {
- WindowWrap<MetricBucket> window = borrowArray.currentWindow(time);
- window.value().add(MetricEvent.PASS, acquireCount);
- }
两步走,定位window,计算数据。直接了当!先看下定位window。
- /**
- * desc:入参是当前时间戳
- */
- public WindowWrap<T> currentWindow(long timeMillis) {
- if (timeMillis < 0) {
- return null;
- }
-
- // 假如idx为0,相当于下标为0
- int idx = calculateTimeIdx(timeMillis);
- // 计算当前Window的开始时间
- long windowStart = calculateWindowStart(timeMillis);
-
- while (true) {
- // 获取到窗格
- WindowWrap<T> old = array.get(idx);
- if (old == null) {
- //窗格为空,重新创建一个新窗格
- WindowWrap<T> window = new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
- if (array.compareAndSet(idx, null, window)) {
- // 成功则返回
- return window;
- } else {
- // 失败让出时间片,等下次分配再进来(下次应该就创建好了)
- Thread.yield();
- }
- } else if (windowStart == old.windowStart()) {
- //windowStart 是根据当前时间戳计算的下标为0窗格开始时间 old是之前记录的下标为0的窗格
- // 当前时间计算的窗格开始时间 和 之前记录的窗格开始时间相同,说明是同一个窗格
- return old;
- } else if (windowStart > old.windowStart()) {
- // windowStart 计算的下标为0窗格开始时间大于之前 记录的下标为0的窗格的开始时间,说明已经之前的下标为0的窗格已经过时,需要重置里面的开始时间和统计数量等
- // 加锁
- if (updateLock.tryLock()) {
- try {
- // 重置
- return resetWindowTo(old, windowStart);
- } finally {
- // 解锁
- updateLock.unlock();
- }
- } else {
- Thread.yield();
- }
- } else if (windowStart < old.windowStart()) {
- // 这种是,,时间倒退,相当于服务器时钟重置,从3点走到了2点,new一个主要是为了防止错误,统计什么的根本不会用
- return new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
- }
- }
- }
里面有两个小段,calculateTimeIdx和calculateWindowStart。贴下代码:
- /**
- * desc: 计算第几号窗格
- */
- private int calculateTimeIdx(long timeMillis) {
- // 当前时间戳 按500平分,然后求余,定位到属于哪个window的下标
- long timeId = timeMillis / windowLengthInMs;
- // Calculate current index so we can map the timestamp to the leap array.
- return (int)(timeId % array.length());
- }
-
- //windowLengthInMs 传入的是2,源头 rollingCounterInSecond对象中可以看到入参
- private transient volatile Metric rollingCounterInSecond = new ArrayMetric(SampleCountProperty.SAMPLE_COUNT,
- IntervalProperty.INTERVAL);
-
- public static volatile int SAMPLE_COUNT = 2;
- public static final int DEFAULT_WINDOW_INTERVAL_MS = 1000;
- /**
- * intervalInMs:1000
- * sampleCount: 2
- * windowLengthInMs: 500 ms
- */
- public LeapArray(int sampleCount, int intervalInMs) {
- AssertUtil.isTrue(sampleCount > 0, "bucket count is invalid: " + sampleCount);
- AssertUtil.isTrue(intervalInMs > 0, "total time interval of the sliding window should be positive");
- AssertUtil.isTrue(intervalInMs % sampleCount == 0, "time span needs to be evenly divided");
-
- this.windowLengthInMs = intervalInMs / sampleCount;
- this.intervalInMs = intervalInMs;
- this.sampleCount = sampleCount;
-
- this.array = new AtomicReferenceArray<>(sampleCount);
- }
- /*
- * 当前窗格的开始时间。当前时间除以windowLengthInMs求余,减去余数就是开始的整数
- */
- protected long calculateWindowStart(long timeMillis) {
- return timeMillis - timeMillis % windowLengthInMs;
- }
获取到当前window之后,就对数据进行更改
- public void addPass(int count) {
- WindowWrap<MetricBucket> wrap = data.currentWindow();
- wrap.value().addPass(count);
- }
而MetricBucket里面又有一个数组: private final LongAdder[] counters;数组中的不同下标又记录着pass,success等等等等,看下这个枚举。
- public enum MetricEvent {
- PASS,
- BLOCK,
- EXCEPTION,
- SUCCESS,
- RT,
- OCCUPIED_PASS
- }
不仅做了流量统计,还把所有的请求状态都进行了统计,小生道一声佩服!
这个设计最精妙的部分就是,两个时间窗格均匀分布的设计。这种设计使得滑动时间窗格哪怕滑动不是那么频繁,仍然能很好的做到限流的作用。其次,就是设计的适应性特别广,不仅仅是要做到限流,而且还可以进行统计,配置等等。
这一段精简的代码,直接在页面延伸出多个功能,而且有些东东看着贼高大上!大家有兴趣可以下载下阿里的sentinel的源码瞅瞅,sentinel的源码看起来可比Spring源码看起来舒服多了,没那么烧脑,而且直接从滑动时间窗格这里进行切入,绝对成就感满满!
好啦,时间窗格的话,就先说到这里~
no sacrifice no victory~
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。