赞
踩
限流可以认为服务降级的一种,限流就是限制系统的输入和输出流量已达到保护系统的目的。一般来说系统的吞吐量是可以被测算的,为了保证系统的稳定运行,一旦达到的需要限制的阈值,就需要限制流量并采取一些措施以完成限制流量的目的。比如:延迟处理,拒绝处理,或者部分拒绝处理等等。
限流的对象:
计数器法是限流算法里最简单的一种算法。
定义,对于A接口来说,1分钟的访问次数不能超过100个。设置一个计数器counter,效时间为1分钟(即每分钟计数器会被重置为0),每当一个请求过来,counter就加1,如果counter的值大于100,则说明请求数过多,限制后续请求访问;
劣势:临界时间点产生突发流量,统计数量不准确。
假设在 00:01 时发生一个请求,在 00:01-00:58 之间不在发送请求,在 00:59 时发送剩下的所有请求 n-1
(n 为限流请求数量),在下一分钟的 00:01 发送 n 个请求,这样在 2 秒钟内请求到达了 2n - 1
个。
设每分钟请求数量为 60 个,每秒可以处理 1 个请求,用户在 00:59 发送 60 个请求,在 01:00 发送 60 个请求 此时 2 秒钟有 120 个请求(每秒 60 个请求),远远大于了每秒钟处理数量的阈值。
import java.util.concurrent.atomic.AtomicInteger; public class Counter { /** * 最大访问数量 */ private final int limit = 10; /** * 访问时间差 */ private final long timeout = 1000; /** * 请求时间 */ private long time; /** * 当前计数器 */ private AtomicInteger reqCount = new AtomicInteger(0); public boolean limit() { long now = System.currentTimeMillis(); if (now < time + timeout) { // 单位时间内 reqCount.addAndGet(1); return reqCount.get() <= limit; } else { // 超出单位时间 time = now; reqCount = new AtomicInteger(0); return true; } } }
滑动窗口是对计数器方式的改进,增加一个时间粒度的度量单位,把一分钟分成若干等分(6 份,每份 10 秒),在每一份上设置独立计数器,在 00:00-00:09 之间发生请求计数器累加 1。当等分数量越大限流统计就越详细。
/** 队列id和队列的映射关系,队列里面存储的是每一次通过时候的时间戳,这样可以使得程序里有多个限流队列 */ private volatile static Map<String, List<Long>> MAP = new ConcurrentHashMap<>(); private SlideWindow() {} public static void main(String[] args) throws InterruptedException { while (true) { // 任意10秒内,只允许2次通过 System.out.println(LocalTime.now().toString() + SlideWindow.isGo("ListId", 2, 10000L)); // 睡眠0-10秒 Thread.sleep(1000 * new Random().nextInt(10)); } } /** * 滑动时间窗口限流算法 * 在指定时间窗口,指定限制次数内,是否允许通过 * * @param listId 队列id * @param count 限制次数 * @param timeWindow 时间窗口大小 * @return 是否允许通过 */ public static synchronized boolean isGo(String listId, int count, long timeWindow) { // 获取当前时间 long nowTime = System.currentTimeMillis(); // 根据队列id,取出对应的限流队列,若没有则创建 List<Long> list = MAP.computeIfAbsent(listId, k -> new LinkedList<>()); // 如果队列还没满,则允许通过,并添加当前时间戳到队列开始位置 if (list.size() < count) { list.add(0, nowTime); return true; } // 队列已满(达到限制次数),则获取队列中最早添加的时间戳 Long farTime = list.get(count - 1); // 用当前时间戳 减去 最早添加的时间戳 if (nowTime - farTime <= timeWindow) { // 若结果小于等于timeWindow,则说明在timeWindow内,通过的次数大于count // 不允许通过 return false; } else { // 若结果大于timeWindow,则说明在timeWindow内,通过的次数小于等于count // 允许通过,并删除最早添加的时间戳,将当前时间添加到队列开始位置 list.remove(count - 1); list.add(0, nowTime); return true; } }
漏桶(Leaky Bucket)算法思路:
规定固定容量的桶,有水进入,有水流出。对于流进的水我们无法估计进来的数量、速度,对于流出的水我们可以控制速度。
用白话具体说明:假设漏斗总支持并发100个最大请求,如果当前处理速率超过100,那么拒绝超出的请求
在Nginx限流中,有使用该算法的配置。
示例代码:
可见这里有两个变量,一个是桶的大小,支持流量突发增多时可以存多少的水(total),另一个是水桶漏洞的大小(rate),伪代码如下:
import lombok.extern.slf4j.Slf4j; import org.junit.Test; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicInteger; // 漏桶 限流 @Slf4j public class LeakBucketLimiter { // 计算的起始时间 private static long lastOutTime = System.currentTimeMillis(); // 流出速率 每秒 2 次 private static int leakRate = 2; // 桶的容量 private static int capacity = 2; //剩余的水量 private static AtomicInteger water = new AtomicInteger(0); //返回值说明: // false 没有被限制到 // true 被限流 public static synchronized boolean isLimit(long taskId, int turn) { // 如果是空桶,就当前时间作为漏出的时间 if (water.get() == 0) { lastOutTime = System.currentTimeMillis(); water.addAndGet(1); return false; } // 执行漏水 int waterLeaked = ((int) ((System.currentTimeMillis() - lastOutTime) / 1000)) * leakRate; // 计算剩余水量 int waterLeft = water.get() - waterLeaked; water.set(Math.max(0, waterLeft)); // 重新更新leakTimeStamp lastOutTime = System.currentTimeMillis(); // 尝试加水,并且水还未满 ,放行 if ((water.get()) < capacity) { water.addAndGet(1); return false; } else { // 水满,拒绝加水, 限流 return true; } } //线程池,用于多线程模拟测试 private ExecutorService pool = Executors.newFixedThreadPool(10); @Test public void testLimit() { // 被限制的次数 AtomicInteger limited = new AtomicInteger(0); // 线程数 final int threads = 2; // 每条线程的执行轮数 只测试一秒 final int turns = 5; // 线程同步器 CountDownLatch countDownLatch = new CountDownLatch(threads); long start = System.currentTimeMillis(); for (int i = 0; i < threads; i++) { pool.submit(() -> { try { for (int j = 0; j < turns; j++) { long taskId = Thread.currentThread().getId(); boolean intercepted = isLimit(taskId, j); if (intercepted) { // 被限制的次数累积 limited.getAndIncrement(); } Thread.sleep(200); } } catch (Exception e) { e.printStackTrace(); } //等待所有线程结束 countDownLatch.countDown(); }); } try { countDownLatch.await(); } catch (InterruptedException e) { e.printStackTrace(); } float time = (System.currentTimeMillis() - start) / 1000F; //输出统计结果 log.info("限制的次数为:" + limited.get() +",通过的次数为:" + (threads * turns - limited.get())); log.info("限制的比例为:" + (float) limited.get() / (float) (threads * turns)); log.info("运行的时长为:" + time); } }
令牌桶(Token Bucket)算法思路:
漏桶和令牌桶的比较:
开源实现场景:
Guava提供了限流工具类RateLimiter,该类基于令牌桶算法来完成限流,RateLimiter 是单机(单进程)的限流,是JVM级别的的限流,所有的令牌生成与消费都是在内存中,
示例代码:
// 令牌桶 限速 @Slf4j public class TokenBucketLimiter { // 上一次令牌发放时间 public long lastTime = System.currentTimeMillis(); // 桶的容量 public int capacity = 2; // 令牌生成速度 /s public int rate = 2; // 当前令牌数量 public AtomicInteger tokens = new AtomicInteger(0); ; //返回值说明: // false 没有被限制到 // true 被限流 public synchronized boolean isLimited(long taskId, int applyCount) { long now = System.currentTimeMillis(); //时间间隔,单位为 ms long gap = now - lastTime; //计算时间段内的令牌数 int reverse_permits = (int) (gap * rate / 1000); int all_permits = tokens.get() + reverse_permits; // 当前令牌数 tokens.set(Math.min(capacity, all_permits)); log.info("tokens {} capacity {} gap {} ", tokens, capacity, gap); if (tokens.get() < applyCount) { // 若拿不到令牌,则拒绝 // log.info("被限流了.." + taskId + ", applyCount: " + applyCount); return true; } else { // 还有令牌,领取令牌 tokens.getAndAdd( - applyCount); lastTime = now; // log.info("剩余令牌.." + tokens); return false; } } //线程池,用于多线程模拟测试 private ExecutorService pool = Executors.newFixedThreadPool(10); @Test public void testLimit() { // 被限制的次数 AtomicInteger limited = new AtomicInteger(0); // 线程数 final int threads = 2; // 每条线程的执行轮数 final int turns = 20; // 同步器 CountDownLatch countDownLatch = new CountDownLatch(threads); long start = System.currentTimeMillis(); for (int i = 0; i < threads; i++) { pool.submit(() -> { try { for (int j = 0; j < turns; j++) { long taskId = Thread.currentThread().getId(); boolean intercepted = isLimited(taskId, 1); if (intercepted) { // 被限制的次数累积 limited.getAndIncrement(); } Thread.sleep(200); } } catch (Exception e) { e.printStackTrace(); } //等待所有线程结束 countDownLatch.countDown(); }); } try { countDownLatch.await(); } catch (InterruptedException e) { e.printStackTrace(); } float time = (System.currentTimeMillis() - start) / 1000F; //输出统计结果 log.info("限制的次数为:" + limited.get() +",通过的次数为:" + (threads * turns - limited.get())); log.info("限制的比例为:" + (float) limited.get() / (float) (threads * turns)); log.info("运行的时长为:" + time); } }
Guava的RateLimiter是一个基于令牌桶算法实现的限流器,常用于控制网站的QPS。与Semaphore不同,Semaphore控制的是某一时刻的访问量,RateLimiter控制的是某一时间间隔的访问量。
代码解析:基于guava-31.1-jre版本
测试demo
public void testLimit2() { RateLimiter rateLimiter = RateLimiter.create(5); // 被限制的次数 AtomicInteger limited = new AtomicInteger(0); // 线程数 final int threads = 5; // 每条线程的执行轮数 final int turns = 200; // 线程同步器 CountDownLatch countDownLatch = new CountDownLatch(threads); long start = System.currentTimeMillis(); for (int i = 0; i < threads; i++) { pool.submit(() -> { try { for (int j = 0; j < turns; j++) { long taskId = Thread.currentThread().getId(); boolean isAcquire = rateLimiter.tryAcquire(); if (isAcquire) { log.info("可以运行:taskId={} j={}", taskId, j); } else { log.info("被拒绝:taskId={} j={}", taskId, j); // 被限制的次数累积 limited.getAndIncrement(); } Thread.sleep(200); } } catch (Exception e) { e.printStackTrace(); } //等待所有线程结束 countDownLatch.countDown(); }); } try { countDownLatch.await(); } catch (InterruptedException e) { e.printStackTrace(); } float time = (System.currentTimeMillis() - start) / 1000F; //输出统计结果 log.info("限制的次数为:" + limited.get() + ",通过的次数为:" + (threads * turns - limited.get())); log.info("限制的比例为:" + (float) limited.get() / (float) (threads * turns)); log.info("运行的时长为:" + time); } // 运行结果 [main] INFO com.conpany.project.junittest.SimpleTest - 限制的次数为:796,通过的次数为:204 [main] INFO com.conpany.project.junittest.SimpleTest - 限制的比例为:0.796 [main] INFO com.conpany.project.junittest.SimpleTest - 运行的时长为:40.864
https://www.cnblogs.com/duanxz/p/4123068.html 几种限流算法
https://www.cnblogs.com/crazymakercircle/p/15187184.html
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。