赞
踩
常见的限流算法
限流是对某一时间窗口内的请求数进行限制,保持系统的可用性和稳定性,防止因流量暴增而导致的系统运行缓慢或宕机。
常见的限流算法有三种:
「原理:」
计数器限流(固定窗口)
Code:
- import java.time.LocalTime;
- import java.util.concurrent.atomic.AtomicInteger;
-
- /**
- * @ClassName: FixedWindow
- * @Description: 固定窗口算法
- **/
- public class FixedWindow {
-
- /**
- * 阈值
- */
- private static Integer QPS = 2;
-
- /**
- * 时间窗口(毫秒)
- */
- private static long TIME_WINDOWS = 1000;
-
- /**
- * 计数器
- */
- private static AtomicInteger REQ_COUNT = new AtomicInteger();
-
- /**
- * 窗口开始时间
- */
- private static long START_TIME = System.currentTimeMillis();
-
- public synchronized static boolean tryAcquire() {
- //超时窗口
- if ((System.currentTimeMillis() - START_TIME) > TIME_WINDOWS) {
- REQ_COUNT.set(0);
- START_TIME = System.currentTimeMillis();
- }
- return REQ_COUNT.incrementAndGet() <= QPS;
- }
-
- public static void main(String[] args) throws InterruptedException {
- for (int i = 0; i < 10; i++) {
- Thread.sleep(250);
- LocalTime now = LocalTime.now();
- if (!tryAcquire()) {
- System.out.println(now + " 被限流");
- } else {
- System.out.println(now + " 做点什么");
- }
- }
- }
- }
虽然我们限制了 QPS 为 2,但是当遇到时间窗口的临界突变时,如 1s 中的后 500 ms 和第 2s 的前 500ms 时,虽然是加起来是 1s 时间,却可以被请求 4 次。
计数器限流(固定窗口)
滑动窗口算法是对固定窗口算法的改进
滑动窗口
将单位时间划分为多个区间,一般都是均分为多个小的时间段;
每一个区间内都有一个计数器,有一个请求落在该区间内,则该区间内的计数器就会加一;
每过一个时间段,时间窗口就会往右滑动一格,抛弃最老的一个区间,并纳入新的一个区间;
计算整个时间窗口内的请求总数时会累加所有的时间片段内的计数器,计数总和超过了限制数量,则本窗口内所有的请求都被丢弃。
滑动窗口
上图的示例中,每 500ms 滑动一次窗口,可以发现窗口滑动的间隔越短,时间窗口的临界突变问题发生的概率也就越小,不过只要有时间窗口的存在,还是有可能发生时间窗口的临界突变问题
【CODE】
-
-
- import lombok.Data;
-
- import java.time.LocalTime;
- import java.util.concurrent.atomic.AtomicInteger;
-
- /**
- * @ClassName: SlidingWindow
- * @Description: 滑动窗口
- **/
- public class SlidingWindow {
- /**
- * 阈值
- */
- private int qps = 2;
- /**
- * 时间窗口总大小(毫秒)
- */
- private long windowSize = 1000;
- /**
- * 多少个子窗口
- */
- private Integer windowCount = 10;
- /**
- * 窗口列表
- */
- private WindowInfo[] windowArray = new WindowInfo[windowCount];
-
- public SlidingWindow(int qps) {
- this.qps = qps;
- long currentTimeMillis = System.currentTimeMillis();
- for (int i = 0; i < windowArray.length; i++) {
- windowArray[i] = new WindowInfo(currentTimeMillis, new AtomicInteger(0));
- }
- }
-
- /**
- * 1. 计算当前时间窗口
- * 2. 更新当前窗口计数 & 重置过期窗口计数
- * 3. 当前 QPS 是否超过限制
- * @return 是否被限流
- */
- public synchronized boolean tryAcquire() {
- long currentTimeMillis = System.currentTimeMillis();
- // 1. 计算当前时间窗口
- int currentIndex = (int)(currentTimeMillis % windowSize / (windowSize / windowCount));
- // 2. 更新当前窗口计数 & 重置过期窗口计数
- int sum = 0;
- for (int i = 0; i < windowArray.length; i++) {
- WindowInfo windowInfo = windowArray[i];
- if ((currentTimeMillis - windowInfo.getTime()) > windowSize) {
- windowInfo.getNumber().set(0);
- windowInfo.setTime(currentTimeMillis);
- }
- if (currentIndex == i && windowInfo.getNumber().get() < qps) {
- windowInfo.getNumber().incrementAndGet();
- }
- sum = sum + windowInfo.getNumber().get();
- }
- // 3. 当前 QPS 是否超过限制
- return sum <= qps;
- }
-
- @Data
- private class WindowInfo {
- // 窗口开始时间
- private Long time;
- // 计数器
- private AtomicInteger number;
-
- public WindowInfo(long time, AtomicInteger number) {
- this.time = time;
- this.number = number;
- }
- // get...set...
- }
-
- public static void main(String[] args) throws InterruptedException {
- int qps = 2, count = 20, sleep = 300, success = count * sleep / 1000 * qps;
- System.out.println(String.format("当前QPS限制为:%d,当前测试次数:%d,间隔:%dms,预计成功次数:%d", qps, count, sleep, success));
- success = 0;
- SlidingWindow myRateLimiter = new SlidingWindow(qps);
- for (int i = 0; i < count; i++) {
- Thread.sleep(sleep);
- if (myRateLimiter.tryAcquire()) {
- success++;
- if (success % qps == 0) {
- System.out.println(LocalTime.now() + ": success, ");
- } else {
- System.out.print(LocalTime.now() + ": success, ");
- }
- } else {
- System.out.println(LocalTime.now() + ": fail");
- }
- }
- System.out.println();
- System.out.println("实际测试成功次数:" + success);
- }
- }
漏桶算法
漏桶算法思路很简单,我们把水比作是请求,漏桶比作是系统处理能力极限,水先进入到漏桶里,漏桶里的水按一定速率流出,当流出的速率小于流入的速率时,由于漏桶容量有限,后续进入的水直接溢出(拒绝请求),以此实现限流。
❝
由介绍可以知道,漏桶模式中的消费处理总是能以恒定的速度进行,可以很好的保护自身系统不被突如其来的流量冲垮;但是这也是漏桶模式的缺点,假设 QPS 为 2,同时 2 个请求进来,2 个请求并不能同时进行处理响应,因为每 1s / 2= 500ms 只能处理一个请求
- import java.time.LocalTime;
- import java.util.LinkedList;
- import java.util.Queue;
- import java.util.concurrent.ExecutorService;
- import java.util.concurrent.Executors;
- import java.util.concurrent.ScheduledExecutorService;
- import java.util.concurrent.TimeUnit;
-
- /**
- * @ClassName: LeakyBucket
- * @Description: 漏桶算法
- **/
- public class LeakyBucket {
-
- /**
- * 水桶的大小
- */
- private final int bucket;
-
- /**
- * qps,水露出的速度
- */
- private int qps;
-
- /**
- * 当前水量
- */
- private long water;
-
- private long timeStamp = System.currentTimeMillis();
-
- public LeakyBucket(int bucket, int qps) {
- this.bucket = bucket;
- this.qps = qps;
- }
-
- /**
- * 桶是否已经满了
- * @return true未满
- */
- public boolean tryAcquire(){
- //1.计算剩余水量
- long now = System.currentTimeMillis();
- long timeGap = (now - timeStamp)/1000;
- water = Math.max(0,water-timeGap*qps);
- timeStamp = now;
-
- // 如果未满,放行
- if(water< bucket){
- water += 1;
- return true;
- }
- return false;
- }
-
- public static void main(String[] args) throws InterruptedException {
- ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
- ExecutorService singleThread = Executors.newSingleThreadExecutor();
-
- LeakyBucket rateLimiter = new LeakyBucket(20, 2);
- // 存储流量的队列
- Queue<Integer> queue = new LinkedList<>();
- // 模拟请求 不确定速率注水
- singleThread.execute(() -> {
- int count = 0;
- while (true) {
- count++;
- boolean flag = rateLimiter.tryAcquire();
- if (flag) {
- queue.offer(count);
- System.out.println(count + "--------流量被放行--------");
- } else {
- System.out.println(count + "流量被限制");
- }
- try {
- Thread.sleep((long) (Math.random() * 1000));
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- });
-
- // 模拟处理请求 固定速率漏水
- scheduledExecutorService.scheduleAtFixedRate(() -> {
- if (!queue.isEmpty()) {
- System.out.println(queue.poll() + "被处理");
- }
- }, 0, 100, TimeUnit.MILLISECONDS);
-
- // 保证主线程不会退出
- while (true) {
- Thread.sleep(10000);
- }
- }
-
- }
令牌桶算法的原理也比较简单,我们可以理解成医院的挂号看病,只有拿到号以后才可以进行诊病。
系统会维护一个令牌(token)桶,以一个恒定的速度往桶里放入令牌(token),这时如果有请求进来想要被处理,则需要先从桶里获取一个令牌(token),当桶里没有令牌(token)可取时,则该请求将被拒绝服务。令牌桶算法通过控制桶的容量、发放令牌的速率,来达到对请求的限制。
原理:
思考:
- // 使用Google封装的令牌桶RateLimiter
-
- /**
- * 代码中限制 QPS 为 2,也就是每隔 500ms 生成一个令牌,但是程序每隔 250ms 获取一次令牌,所以两次获取中只有一次会成功。
- */
- public static void main(String[] args) throws InterruptedException {
- RateLimiter rateLimiter = RateLimiter.create(2);
-
- for (int i = 0; i < 10; i++) {
- String time = LocalDateTime.now().format(DateTimeFormatter.ISO_LOCAL_TIME);
- System.out.println(time + ":" + rateLimiter.tryAcquire());
- Thread.sleep(250);
- }
- }
限流有很多种方法实现:
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。