/** * 最简单的计数器限流算法 */ public class Counter { public long timeStamp = System.currentTimeMillis(); // 当前时间 public int reqCount = 0; // 初始化计数器 public final int limit = 100; // 时间窗口内最大请求数 public final long interval = 1000 * 60; // 时间窗口ms public boolean limit() { long now = System.currentTimeMillis(); if (now < timeStamp + interval) { // 在时间窗口内 reqCount++; // 判断当前时间窗口内是否超过最大请求控制数 return reqCount <= limit; } else { timeStamp = now; // 超时后重置 reqCount = 1; return true; } } }
但是这种方法有一个问题:无法计算当前时间往前推的一段时间内的请求数,只能计算某个时间段,即timeStamp(/now) + interval这个时间段的值。统计的精度太低。
在上图中,整个红色的矩形框表示一个时间窗口,在我们的例子中,一个时间窗口就是一分钟。然后我们将时间窗口进行划分,比如图中,我们就将滑动窗口划成了6格,所以每格代表的是10秒钟。每过10秒钟,我们的时间窗口就会往右滑动一格。每一个格子都有自己独立的计数器counter,比如当一个请求 在0:35秒的时候到达,那么0:30~0:39对应的counter就会加1。计数器算法其实就是滑动窗口算法。只是它没有对时间窗口做进一步地划分,所以只有1格。由此可见,当滑动窗口的格子划分的越多,那么滑动窗口的滚动就越平滑,限流的统计就会越精确。
/**2 * 滑动时间窗口限流实现 3 * 假设某个服务最多只能每秒钟处理100个请求,我们可以设置一个1秒钟的滑动时间窗口, 4 * 窗口中有10个格子,每个格子100毫秒,每100毫秒移动一次,每次移动都需要记录当前服务请求的次数*/ public class SlidingTimeWindow { //服务访问次数,可以放在Redis中,实现分布式系统的访问计数8 Long counter = 0L; //使用LinkedList来记录滑动窗口的10个格子。 LinkedList<Long> slots = new LinkedList<Long>(); public static void main(String[] args) throws InterruptedException { SlidingTimeWindow timeWindow = new SlidingTimeWindow(); new Thread(new Runnable() { @Override public void run() { try { timeWindow.doCheck(); } catch (InterruptedException e) { e.printStackTrace(); } } }).start(); while (true){ //TODO 判断限流标记 timeWindow.counter++; Thread.sleep(new Random().nextInt(15)); } }
NodeSelectorSlot --> ClusterBuilderSlot --> LOgSlot --> StatisticSlot 。。。。
其调用逻辑是:fireEntry(…) 先执行后面的限流、垄断降级的Slot
@Override public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, boolean prioritized, Object... args) throws Throwable { try { // Do some checking. fireEntry(context, resourceWrapper, node, count, prioritized, args); // Request passed, add thread count and pass count. // 如果请求都通过了,增加线程数和通过的计数 node.increaseThreadNum(); node.addPassRequest(count); if (context.getCurEntry().getOriginNode() != null) { // Add count for origin node. context.getCurEntry().getOriginNode().increaseThreadNum(); context.getCurEntry().getOriginNode().addPassRequest(count); } if (resourceWrapper.getEntryType() == EntryType.IN) { // Add count for global inbound entry node for global statistics. Constants.ENTRY_NODE.increaseThreadNum(); Constants.ENTRY_NODE.addPassRequest(count); } // Handle pass event with registered entry callback handlers. for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) { handler.onPass(context, resourceWrapper, node, count, args); } } catch (PriorityWaitException ex) { node.increaseThreadNum(); if (context.getCurEntry().getOriginNode() != null) { // Add count for origin node. context.getCurEntry().getOriginNode().increaseThreadNum(); } if (resourceWrapper.getEntryType() == EntryType.IN) { // Add count for global inbound entry node for global statistics. Constants.ENTRY_NODE.increaseThreadNum(); } // Handle pass event with registered entry callback handlers. for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) { handler.onPass(context, resourceWrapper, node, count, args); } } catch (BlockException e) { // Blocked, set block exception to current entry. context.getCurEntry().setBlockError(e); // Add block count. node.increaseBlockQps(count); if (context.getCurEntry().getOriginNode() != null) { context.getCurEntry().getOriginNode().increaseBlockQps(count); } if (resourceWrapper.getEntryType() == EntryType.IN) { // Add count for global inbound entry node for global statistics. Constants.ENTRY_NODE.increaseBlockQps(count); } // Handle block event with registered entry callback handlers. for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) { handler.onBlocked(e, context, resourceWrapper, node, count, args); } throw e; } catch (Throwable e) { // Unexpected internal error, set error to current entry. context.getCurEntry().setError(e); throw e; } }
private transient Metric rollingCounterInMinute = new ArrayMetric(60, 60 * 1000, false);
private final LeapArray<MetricBucket> data;
public ArrayMetric(int sampleCount, int intervalInMs) {
this.data = new OccupiableBucketLeapArray(sampleCount, intervalInMs);
public ArrayMetric(int sampleCount, int intervalInMs, boolean enableOccupy) {
if (enableOccupy) {
this.data = new OccupiableBucketLeapArray(sampleCount, intervalInMs);
} else {
this.data = new BucketLeapArray(sampleCount, intervalInMs);
关注new OccupiableBucketLeapArray(sampleCount, intervalInMs); intervalInMs是时间间隔,而sampleCount则是滑动时间窗的格数
public LeapArray(int sampleCount, int intervalInMs) {
AssertUtil.isTrue(sampleCount > 0, "bucket count is invalid: " + sampleCount);
AssertUtil.isTrue(intervalInMs > 0, "total time interval of the sliding window should be positive");
AssertUtil.isTrue(intervalInMs % sampleCount == 0, "time span needs to be evenly divided");
this.windowLengthInMs = intervalInMs / sampleCount;//窗口的长度
this.intervalInMs = intervalInMs;
this.intervalInSecond = intervalInMs / 1000.0;
this.sampleCount = sampleCount;
this.array = new AtomicReferenceArray<>(sampleCount);
public void addRtAndSuccess(long rt, int successCount) {
public void addSuccess(int count) {
WindowWrap<MetricBucket> wrap = data.currentWindow();
public WindowWrap<T> currentWindow() {
return currentWindow(TimeUtil.currentTimeMillis());
public WindowWrap<T> currentWindow(long timeMillis) { if (timeMillis < 0) { return null; } /*long timeId = timeMillis / windowLengthInMs; return (int)(timeId % array.length()); 计算出当前时间应该落在那个滑动窗口下了,比如分成了多块的窗口,是1还是2 。。。*/ int idx = calculateTimeIdx(timeMillis); //计算是位于当前桶的哪个位置timeMillis - timeMillis % windowLengthInMs; // Calculate current bucket start time. long windowStart = calculateWindowStart(timeMillis); /* * Get bucket item at given time from the array. * * (1) Bucket is absent, then just create a new bucket and CAS update to circular array. * (2) Bucket is up-to-date, then just return the bucket. * (3) Bucket is deprecated, then reset current bucket and clean all deprecated buckets. */ while (true) { //根据坐标先拿出一个时间窗口 WindowWrap<T> old = array.get(idx); if (old == null) { /* * B0 B1 B2 NULL B4 * ||_______|_______|_______|_______|_______||___ * 200 400 600 800 1000 1200 timestamp * ^ * time=888 * bucket is empty, so create new and update * * If the old bucket is absent, then we create a new bucket at {@code windowStart}, * then try to update circular array via a CAS operation. Only one thread can * succeed to update, while other threads yield its time slice. */ WindowWrap<T> window = new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis)); if (array.compareAndSet(idx, null, window)) { // Successfully updated, return the created bucket. return window; } else { // Contention failed, the thread will yield its time slice to wait for bucket available. Thread.yield(); } } else if (windowStart == old.windowStart()) { /* * B0 B1 B2 B3 B4 * ||_______|_______|_______|_______|_______||___ * 200 400 600 800 1000 1200 timestamp * ^ * time=888 * startTime of Bucket 3: 800, so it's up-to-date * * If current {@code windowStart} is equal to the start timestamp of old bucket, * that means the time is within the bucket, so directly return the bucket. */ return old; } else if (windowStart > old.windowStart()) { /* * (old) * B0 B1 B2 NULL B4 * |_______||_______|_______|_______|_______|_______||___ * ... 1200 1400 1600 1800 2000 2200 timestamp * ^ * time=1676 * startTime of Bucket 2: 400, deprecated, should be reset * * If the start timestamp of old bucket is behind provided time, that means * the bucket is deprecated. We have to reset the bucket to current {@code windowStart}. * Note that the reset and clean-up operations are hard to be atomic, * so we need a update lock to guarantee the correctness of bucket update. * * The update lock is conditional (tiny scope) and will take effect only when * bucket is deprecated, so in most cases it won't lead to performance loss. */ if (updateLock.tryLock()) { try { // Successfully get the update lock, now we reset the bucket. return resetWindowTo(old, windowStart); } finally { updateLock.unlock(); } } else { // Contention failed, the thread will yield its time slice to wait for bucket available. Thread.yield(); } } else if (windowStart < old.windowStart()) { // Should not go through here, as the provided time is already behind. return new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis)); } } }
超时了,那么重置resetWindowTo(old, windowStart)
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
boolean prioritized, Object... args) throws Throwable {
checkFlow(resourceWrapper, context, node, count, prioritized);
fireEntry(context, resourceWrapper, node, count, prioritized, args);
void checkFlow(ResourceWrapper resource, Context context, DefaultNode node, int count, boolean prioritized)
throws BlockException {
checker.checkFlow(ruleProvider, resource, context, node, count, prioritized);
public void checkFlow(Function<String, Collection<FlowRule>> ruleProvider, ResourceWrapper resource,
Context context, DefaultNode node, int count, boolean prioritized) throws BlockException {
if (ruleProvider == null || resource == null) {
Collection<FlowRule> rules = ruleProvider.apply(resource.getName());
if (rules != null) {
for (FlowRule rule : rules) {
if (!canPassCheck(rule, context, node, count, prioritized)) {
throw new FlowException(rule.getLimitApp(), rule);
public boolean canPassCheck(/*@NonNull*/ FlowRule rule, Context context, DefaultNode node, int acquireCount,
boolean prioritized) {
String limitApp = rule.getLimitApp();
if (limitApp == null) {
return true;
if (rule.isClusterMode()) {
return passClusterCheck(rule, context, node, acquireCount, prioritized);
return passLocalCheck(rule, context, node, acquireCount, prioritized);
private static boolean passLocalCheck(FlowRule rule, Context context, DefaultNode node, int acquireCount,
boolean prioritized) {
Node selectedNode = selectNodeByRequesterAndStrategy(rule, context, node);
if (selectedNode == null) {
return true;
return rule.getRater().canPass(selectedNode, acquireCount, prioritized);
rule.getRater().canPass(selectedNode, acquireCount, prioritized);有三种策略,分别对应的就是快速失败,预热和排队等待。
avgUsedTokens获取到qps,然后curCount + acquireCount > count这一步做判断,是否需要限流
@Override public boolean canPass(Node node, int acquireCount, boolean prioritized) { int curCount = avgUsedTokens(node); if (curCount + acquireCount > count) { if (prioritized && grade == RuleConstant.FLOW_GRADE_QPS) { long currentTime; long waitInMs; currentTime = TimeUtil.currentTimeMillis(); waitInMs = node.tryOccupyNext(currentTime, acquireCount, count); if (waitInMs < OccupyTimeoutProperty.getOccupyTimeout()) { node.addWaitingRequest(currentTime + waitInMs, acquireCount); node.addOccupiedPass(acquireCount); sleep(waitInMs); // PriorityWaitException indicates that the request will pass after waiting for {@link @waitInMs}. throw new PriorityWaitException(waitInMs); } } return false; } return true; }
avgUsedTokens(node) 方法是获取到统计数据的
private int avgUsedTokens(Node node) {
if (node == null) {
return grade == RuleConstant.FLOW_GRADE_THREAD ? node.curThreadNum() : (int)(node.passQps());
node.passQps() 这里用到了统计数据!!!
public double passQps() {
return rollingCounterInSecond.pass() / rollingCounterInSecond.getWindowIntervalInSec();
public long pass() {
long pass = 0;
List<MetricBucket> list = data.values();
for (MetricBucket window : list) {
pass += window.pass();
return pass;
