赞
踩
固定计数器算法----
滑动窗口计数器算法
漏桶
固定计数器算法 改进版本 滑动窗口计数器
服务限流概念:限定固定请求数量 访问服务器端 保护服务接口
限流框架:1.nginx限流 2.谷歌 Guava限流 3.阿里巴巴 Sentinel限流 4.Redis+lua实现限流
本质限流算法底层都是基于 漏桶、令牌桶、滑动窗口/固定计数器实现。
谷歌 Guava限流 提供Api RateLimiter
补充概念:qps为1 表达意思就是 每s最多只能够处理1个请求
1.目前常见的算法是漏桶算法和令牌算法
2.关于限流 从线程个数(jdk1.5 Semaphore(信号量))和RateLimiter速率(guava)
Semaphore:从线程个数限流 底层采用AQS实现
RateLimiter:从速率限流 基于谷歌guava框架实现 基于QPS限流
令牌桶算法相比漏桶算法区别,令牌桶是会去匀速的生成令牌,拿到令牌才能够进行处理;
漏桶算法是:生产者消费者模型,生产者往木桶里生产数据,消费者按照定义的速度去消费数据
3.应用场景:
漏桶算法:限制服务器端同时处理线程数量 基于并发数限流 当调用该api的线程数达到阈值的时候,进行限流------限制api接口 最大线程数 限制 最多同时只会有2
个线程运行,如果在我们漏桶中没有足够的线程运行 就会被限流—保护服务器端
创建线程数。
令牌桶算法:限制服务器 qps 当调用该api的QPS达到阈值的时候,进行限流
限制 该接口每s 能够访问多少次
阿里巴巴 Sentinel 后台设置限流 基于线程数 (漏桶算法)或者QPS(令牌桶算法)
令牌桶的算法:
1.创建一个令牌桶 它的容量假设可以存放2个token;
2.以恒定的速度,生成令牌 如果令牌桶满了,则直接废弃 如果令牌桶
没有满 则存入到令牌桶中;
3.客户端请求过来,从令牌桶中获取令牌,如果能够获取到令牌就可以直接业务
代码,如果获取不到令牌就拒绝该请求。
令牌桶算法 底层是如何实现呢?
\1. 创建一个令牌桶 假设限制存放token数量 是为2;
\2. 单独创建一个线程每隔1秒 向 生成2个令牌,存放到
令牌桶中。在存放到令牌桶中之前判断下,如果令牌桶满了
则该令牌就直接丢弃,如果令牌桶没有满 则将该令牌存入到令牌桶中。
客户端qps 每秒=2,该接口每s 只能够支持2请求访问,
超过的话就会被限流。
\1. 客户端访问接口时,就会先去令牌桶取出该token,如果成功
取出该token,则该token是需要从令牌桶中移除。—没有满
2.单独有一个线程每隔1秒 向令牌桶中 存入token
\3. 如果客户端向令牌桶中 没有取到token,则认为 被限流呢。
Go c# 里面也是有 接口限流算法
这些Api底层都是基于漏桶、令牌桶、滑动窗口实现。
\1. 先学习微服务如何实现Api接口限流
\2. 学会如何封装Api接口限流框架
\3. 学会这些漏桶、令牌桶、滑动窗口实
public class MayiktRateLimiter { private LinkedBlockingDeque<String> tokenBlockingDeque; public MayiktRateLimiter(int permitsPerSecond) { // 创建令牌桶 设定固定的容量 tokenBlockingDeque = new LinkedBlockingDeque<String>(permitsPerSecond); // 初始化 init(permitsPerSecond); start(); } public static MayiktRateLimiter create(int permitsPerSecond) { return new MayiktRateLimiter(permitsPerSecond); } public void init(int permitsPerSecond) { for (int i = 0; i < permitsPerSecond; i++) { tokenBlockingDeque.offer("#"); } } /** * 从队列中获取token * * @return */ public boolean tryAcquire() { return tokenBlockingDeque.poll() == null ? false : true; } /** * 单独开启一个向令牌桶投递 token */ private void start() { /** * 每隔1s时间 向队列中投递固定容量大小的token */ Executors.newScheduledThreadPool(1).scheduleAtFixedRate(new Runnable() { @Override public void run() { addToken(); } }, 1, 1, TimeUnit.SECONDS); } public void addToken() { /** * 向队列投递token */ tokenBlockingDeque.offer("#"); } public static void main(String[] args) { MayiktRateLimiter mayiktRateLimiter = MayiktRateLimiter.create(1); MayiktRateLimiter mayiktRateLimiter2 = MayiktRateLimiter.create(2); System.out.println(mayiktRateLimiter2.tryAcquire()); System.out.println(mayiktRateLimiter2.tryAcquire()); try { Thread.sleep(1000); } catch (Exception e) { } System.out.println(mayiktRateLimiter2.tryAcquire()); } }
漏桶(Leaky Bucket)算法:请求先进入到漏桶里,漏桶以一定的速度出水,当水流入速度过大或者漏桶已满会直接溢,然后就拒绝请求,可以看出漏桶算法能强行限制数据的传输速率。
直接基于jdk1.5提供的 Semaphore(信号量) 实现 基于AQS实现
如果有同学不了解aqs的话 记得 看下第九期juc并发编程
1.测试代码
public class MayiktSemaphore { private Semaphore semaphore; public MayiktSemaphore(int permits) { semaphore = new Semaphore(permits); } public boolean tryAcquire() { return semaphore.tryAcquire(); } public void release() { semaphore.release(); } public static void main(String[] args) { MayiktSemaphore mayiktSemaphore = new MayiktSemaphore(2); for (int i = 0; i < 10; i++) { boolean result = mayiktSemaphore.tryAcquire(); if(!result){ System.out.println("被限流啦"); return; } System.out.println(Thread.currentThread().getName()); // mayiktSemaphore.release(); } } }
2.接口中使用漏桶算法
/** * 演示漏桶算法 * * @return */ @RequestMapping("/getMayiktLoophole") public String getMayiktLoophole() { boolean result = mayiktSemaphore.tryAcquire(); if (!result) { return "当前人数访问过多,请稍后重试!"; } log.info("正常执行业务逻辑代码..."); try { Thread.sleep(3000); } catch (Exception e) { } mayiktSemaphore.release(); return "mayikt"; }
1.自定义注解
import org.springframework.web.bind.annotation.Mapping; import java.lang.annotation.*; /** * @author 余胜军 * @ClassName MayiktLeakyBucket * @qq 644064779 * @addres www.mayikt.com * 微信:yushengjun644 */ @Target({ElementType.METHOD}) @Retention(RetentionPolicy.RUNTIME) @Documented public @interface MayiktLeakyBucket { String name() default ""; int threads(); String msg() default "当前人数访问过多,请稍后重试!"; }
2.定义Aop
import com.mayikt.service.ext.MayiktLeakyBucket; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang.StringUtils; import org.aspectj.lang.ProceedingJoinPoint; import org.aspectj.lang.annotation.Around; import org.aspectj.lang.annotation.Aspect; import org.aspectj.lang.reflect.MethodSignature; import org.springframework.stereotype.Component; import java.lang.reflect.Method; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Semaphore; /** * @author 余胜军 * @ClassName AopRateLimiter * @qq 644064779 * @addres www.mayikt.com * 微信:yushengjun644 */ @Aspect @Component @Slf4j public class AopLeakyBucketr { /** * 需要定义一个容器 */ private static ConcurrentHashMap<String, Semaphore> rateLimiters = new ConcurrentHashMap<>(); /** * 环绕通知 * * @param pjp * @return */ @Around(value = "@annotation(com.mayikt.service.ext.MayiktLeakyBucket)") public Object currentLimit(ProceedingJoinPoint pjp) throws Throwable { // 1.获取拦截到目标方法 MethodSignature methodSignature = (MethodSignature) pjp.getSignature(); // 2.获取方法 Method method = methodSignature.getMethod(); MayiktLeakyBucket mayiktLeakyBucket = method.getDeclaredAnnotation(MayiktLeakyBucket.class); // 3.获取注解配置限流的名称 String name = mayiktLeakyBucket.name(); String semaphoreName = StringUtils.isEmpty(name) ? method.getName() : name; //4.判断该限流的 信号量是否创建 Semaphore semaphore = rateLimiters.get(semaphoreName); // 5.使用双重检验锁 判断 初始化信号量线程安全性问题 if (semaphore == null) { synchronized (this) { if (semaphore == null) { semaphore = new Semaphore(mayiktLeakyBucket.threads()); } rateLimiters.put(semaphoreName, semaphore); } } try { // 6.调用tryAcquire 如果返回是为false 当前AQS状态=0 boolean result = semaphore.tryAcquire(); if (!result) { // 返回自定义注解上配置的 返回限流提示 return mayiktLeakyBucket.msg(); } Object proceed = pjp.proceed();//执行目标方法 getMayiktLeakyBucket // 被限流的请求 对AQS的状态+1 被限流请求 对AQS状态+1 semaphore.release(); return proceed; } catch (Exception e) { // 封装目标方法如果抛出异常 semaphore.release(); return "系统错误!"; } } }
3.相关测试代码
/** * 演示漏桶算法 纯手写注解 * * @return */ @RequestMapping("/getMayiktLeakyBucket") @MayiktLeakyBucket(name = "getMayiktLeakyBucket", threads = 2) public String getMayiktLeakyBucket() { log.info("正常执行业务逻辑代码..."); try { Thread.sleep(3000); } catch (Exception e) { } return "mayikt"; }
标题:纯手写滑动窗口计数器限流算法
课程内容:
1.微服务接口限流算法有哪些
2.令牌桶/漏桶/固定计数器/滑动窗口技术算法原理
3.固定计数器算法有哪些缺点
4.滑动窗口计数器算法设计原理
5.如何滑动窗口内对接口限流
6.纯手写滑动窗口限流算法
7.不同的限流算法应用场景有哪些
20点25分准时开始
1.规定我们单位时间处理的请求数量,例如我们规定我们的一个接口一分钟只能访问1000次的话,使用固定窗口计数器算法的话可以这样实现:给定一个变量 count来记录处理的请求数量,当1分钟之内处理一个请求一次就对count+1,1分钟之内的如果count=1000的话,后续的请求就会被全部拒绝。等到 1分钟结束后,将count回归成0,重新开始计数。
2.如果流量突然激增,比如我们限制一个接口一分钟只能访问1000次的话,用户恶意在第一分钟的50秒堆积发送1000次请求,第二分钟10s堆积发送1000次请求,这样的话相当于在1分钟内收到2000次请求,失去限流保护的意义。
public class MayiktCounter { /** * 限流次数 */ private Integer limiter; /** * 计数器限流间隔的时间 */ private Integer limiterTime; /** * 记录调用的次数 */ private AtomicInteger atomicInteger; /** * 记录每次重置的时间 */ private Long startTime; public MayiktCounter(Integer limiter, Integer limiterTime) { this.limiter = limiter; this.limiterTime = limiterTime; this.atomicInteger = new AtomicInteger(1); startTime = System.currentTimeMillis(); } public boolean tryAcquire() { // 1.获取系统的时间 long currentTimeMillis = System.currentTimeMillis(); if (currentTimeMillis - startTime > limiterTime) { // 重置 atomicInteger.set(1); startTime = System.currentTimeMillis(); } // 如果在时间内 判断调用的次数是否 超过限流的次数 int count = atomicInteger.get(); if (count > limiter) { // 被限流了 return false; } // 原子类+++ atomicInteger.incrementAndGet(); return true; } public static void main(String[] args) throws InterruptedException { MayiktCounter mayiktCounter = new MayiktCounter(2, 10000); for (int i = 0; i < 20; i++) { boolean result = mayiktCounter.tryAcquire(); System.out.println(result); } System.out.println("------------------"); Thread.sleep(10000); for (int i = 0; i < 20; i++) { boolean result = mayiktCounter.tryAcquire(); System.out.println(result); } } }
滑动窗口计数器算法 是固定窗口计数器的升级版本,固定计数器窗口算法缺陷,如果流量突然激增,比如我们限制一个接口一分钟只能访问1000次的话,用户恶意在第一分钟的50秒堆积发送1000次请求,第二分钟10s堆积发送1000次请求,这样的话相当于在1分钟内收到2000次请求,失去限流保护的意义。
import java.util.LinkedList; /** * @author 余胜军 * @ClassName MayiktSlideWindow * @qq 644064779 * @addres www.mayikt.com * 微信:yushengjun644 */ public class MayiktSlideWindow { /** * 定义一个 链表 */ private LinkedList<Long> linkedList; /* 限流的次数 例如:2次 */ private Integer count; /** * 窗口时间 例如 10s */ private Long timeWindow; public MayiktSlideWindow(Integer count, Long timeWindow) { this.count = count; this.timeWindow = timeWindow; linkedList = new LinkedList<>(); } public boolean tryAcquire() { // 获取到当前系统时间 long currentTimeMillis = System.currentTimeMillis(); if (linkedList.size() < count) { // 最新的 node 节点存放 在最前面 linkedList.add(0, currentTimeMillis); return true;// 没有被限流 } // 判断是否在10s 范围内 int farIndex = count - 1; Long farTime = linkedList.get(farIndex);// 取出尾结点 if (currentTimeMillis - farTime < timeWindow) { return false; } //已经超过限定时间范围内 linkedList.remove(farIndex);// 删除尾结点 linkedList.add(0, currentTimeMillis); return true; } public static void main(String[] args) throws InterruptedException { MayiktSlideWindow mayiktSlideWindow = new MayiktSlideWindow(2, 10000l); for (int i = 1; i <= 100; i++) { Thread.sleep(1000); // 10s 内只能够访问2次。 boolean result = mayiktSlideWindow.tryAcquire(); System.out.println("i" + i + "," + result); } } }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。