当前位置:   article > 正文

04.手写服务限流算法: 令牌桶、漏桶、滑动窗口算法_手写令牌桶

手写令牌桶

服务限流实现方案

固定计数器算法----

滑动窗口计数器算法

令牌桶

漏桶

固定计数器算法 改进版本 滑动窗口计数器

服务限流概念:限定固定请求数量 访问服务器端 保护服务接口

限流框架:1.nginx限流 2.谷歌 Guava限流 3.阿里巴巴 Sentinel限流 4.Redis+lua实现限流

本质限流算法底层都是基于 漏桶、令牌桶、滑动窗口/固定计数器实现。

谷歌 Guava限流 提供Api RateLimiter

补充概念:qps为1 表达意思就是 每s最多只能够处理1个请求

服务限流算法的实现

1.目前常见的算法是漏桶算法和令牌算法

2.关于限流 从线程个数(jdk1.5 Semaphore(信号量))和RateLimiter速率(guava)

Semaphore:从线程个数限流 底层采用AQS实现

RateLimiter:从速率限流 基于谷歌guava框架实现 基于QPS限流

令牌桶算法相比漏桶算法区别,令牌桶是会去匀速的生成令牌,拿到令牌才能够进行处理;

漏桶算法是:生产者消费者模型,生产者往木桶里生产数据,消费者按照定义的速度去消费数据

3.应用场景:

漏桶算法:限制服务器端同时处理线程数量 基于并发数限流 当调用该api的线程数达到阈值的时候,进行限流------限制api接口 最大线程数 限制 最多同时只会有2

个线程运行,如果在我们漏桶中没有足够的线程运行 就会被限流—保护服务器端

创建线程数。

令牌桶算法:限制服务器 qps 当调用该api的QPS达到阈值的时候,进行限流

限制 该接口每s 能够访问多少次

阿里巴巴 Sentinel 后台设置限流 基于线程数 (漏桶算法)或者QPS(令牌桶算法)

img

令牌桶算法

令牌桶算法实现原理

img

令牌桶的算法:

1.创建一个令牌桶 它的容量假设可以存放2个token;

2.以恒定的速度,生成令牌 如果令牌桶满了,则直接废弃 如果令牌桶

没有满 则存入到令牌桶中;

3.客户端请求过来,从令牌桶中获取令牌,如果能够获取到令牌就可以直接业务

代码,如果获取不到令牌就拒绝该请求。

手写令牌桶代码思路

令牌桶算法 底层是如何实现呢?

\1. 创建一个令牌桶 假设限制存放token数量 是为2;

\2. 单独创建一个线程每隔1秒 向 生成2个令牌,存放到

令牌桶中。在存放到令牌桶中之前判断下,如果令牌桶满了

则该令牌就直接丢弃,如果令牌桶没有满 则将该令牌存入到令牌桶中。

客户端qps 每秒=2,该接口每s 只能够支持2请求访问,

超过的话就会被限流。

\1. 客户端访问接口时,就会先去令牌桶取出该token,如果成功

取出该token,则该token是需要从令牌桶中移除。—没有满

2.单独有一个线程每隔1秒 向令牌桶中 存入token

\3. 如果客户端向令牌桶中 没有取到token,则认为 被限流呢。

Go c# 里面也是有 接口限流算法

这些Api底层都是基于漏桶、令牌桶、滑动窗口实现。

\1. 先学习微服务如何实现Api接口限流

\2. 学会如何封装Api接口限流框架

\3. 学会这些漏桶、令牌桶、滑动窗口实

手写令牌桶算法相关代码

public class MayiktRateLimiter {
    private LinkedBlockingDeque<String> tokenBlockingDeque;

    public MayiktRateLimiter(int permitsPerSecond) {
        // 创建令牌桶 设定固定的容量
        tokenBlockingDeque = new LinkedBlockingDeque<String>(permitsPerSecond);
        // 初始化
        init(permitsPerSecond);
        start();
    }

    public static MayiktRateLimiter create(int permitsPerSecond) {
        return new MayiktRateLimiter(permitsPerSecond);
    }

    public void init(int permitsPerSecond) {
        for (int i = 0; i < permitsPerSecond; i++) {
            tokenBlockingDeque.offer("#");
        }
    }
    /**
     * 从队列中获取token
     *
     * @return
     */
    public boolean tryAcquire() {
        return tokenBlockingDeque.poll() == null ? false : true;
    }

    /**
     * 单独开启一个向令牌桶投递 token
     */
    private void start() {
        /**
         * 每隔1s时间  向队列中投递固定容量大小的token
         */
        Executors.newScheduledThreadPool(1).scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                addToken();
            }
        }, 1, 1, TimeUnit.SECONDS);

    }

    public void addToken() {
        /**
         * 向队列投递token
         */
        tokenBlockingDeque.offer("#");

    }

    public static void main(String[] args) {
        MayiktRateLimiter mayiktRateLimiter = MayiktRateLimiter.create(1);
        MayiktRateLimiter mayiktRateLimiter2 = MayiktRateLimiter.create(2);
        System.out.println(mayiktRateLimiter2.tryAcquire());
        System.out.println(mayiktRateLimiter2.tryAcquire());
        try {
            Thread.sleep(1000);
        } catch (Exception e) {

        }
        System.out.println(mayiktRateLimiter2.tryAcquire());
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66

漏桶算法

漏桶算法实现原理

漏桶(Leaky Bucket)算法:请求先进入到漏桶里,漏桶以一定的速度出水,当水流入速度过大或者漏桶已满会直接溢,然后就拒绝请求,可以看出漏桶算法能强行限制数据的传输速率。

直接基于jdk1.5提供的 Semaphore(信号量) 实现 基于AQS实现

如果有同学不了解aqs的话 记得 看下第九期juc并发编程

img

手写漏桶算法相关代码

1.测试代码

public class MayiktSemaphore {
    private Semaphore semaphore;

    public MayiktSemaphore(int permits) {
        semaphore = new Semaphore(permits);
    }

    public boolean tryAcquire() {

        return semaphore.tryAcquire();
    }

    public void release() {
        semaphore.release();
    }

    public static void main(String[] args) {
        MayiktSemaphore mayiktSemaphore = new MayiktSemaphore(2);
        for (int i = 0; i < 10; i++) {
            boolean result = mayiktSemaphore.tryAcquire();
            if(!result){
                System.out.println("被限流啦");
                return;
            }
            System.out.println(Thread.currentThread().getName());
//            mayiktSemaphore.release();
        }

    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30

2.接口中使用漏桶算法

    /**
     * 演示漏桶算法
     *
     * @return
     */
    @RequestMapping("/getMayiktLoophole")
    public String getMayiktLoophole() {
        boolean result = mayiktSemaphore.tryAcquire();
        if (!result) {
            return "当前人数访问过多,请稍后重试!";
        }
        log.info("正常执行业务逻辑代码...");
        try {
            Thread.sleep(3000);
        } catch (Exception e) {

        }
        mayiktSemaphore.release();
        return "mayikt";
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20

自定义注解封装漏桶算法

1.自定义注解

import org.springframework.web.bind.annotation.Mapping;

import java.lang.annotation.*;

/**
 * @author 余胜军
 * @ClassName MayiktLeakyBucket
 * @qq 644064779
 * @addres www.mayikt.com
 * 微信:yushengjun644
 */
@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface MayiktLeakyBucket {
    String name() default "";

    int threads();

    String msg() default "当前人数访问过多,请稍后重试!";
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21

2.定义Aop

import com.mayikt.service.ext.MayiktLeakyBucket;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang.StringUtils;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.reflect.MethodSignature;
import org.springframework.stereotype.Component;

import java.lang.reflect.Method;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Semaphore;

/**
 * @author 余胜军
 * @ClassName AopRateLimiter
 * @qq 644064779
 * @addres www.mayikt.com
 * 微信:yushengjun644
 */
@Aspect
@Component
@Slf4j
public class AopLeakyBucketr {

    /**
     * 需要定义一个容器
     */
    private static ConcurrentHashMap<String, Semaphore> rateLimiters = new ConcurrentHashMap<>();

    /**
     * 环绕通知
     *
     * @param pjp
     * @return
     */
    @Around(value = "@annotation(com.mayikt.service.ext.MayiktLeakyBucket)")
    public Object currentLimit(ProceedingJoinPoint pjp) throws Throwable {
        // 1.获取拦截到目标方法
        MethodSignature methodSignature = (MethodSignature) pjp.getSignature();
        // 2.获取方法
        Method method = methodSignature.getMethod();
        MayiktLeakyBucket mayiktLeakyBucket = method.getDeclaredAnnotation(MayiktLeakyBucket.class);
        // 3.获取注解配置限流的名称
        String name = mayiktLeakyBucket.name();
        String semaphoreName = StringUtils.isEmpty(name) ? method.getName() : name;
        //4.判断该限流的   信号量是否创建
        Semaphore semaphore = rateLimiters.get(semaphoreName);
        // 5.使用双重检验锁 判断 初始化信号量线程安全性问题
        if (semaphore == null) {
            synchronized (this) {
                if (semaphore == null) {
                    semaphore = new Semaphore(mayiktLeakyBucket.threads());
                }
                rateLimiters.put(semaphoreName, semaphore);
            }
        }
        try {
            // 6.调用tryAcquire 如果返回是为false  当前AQS状态=0
            boolean result = semaphore.tryAcquire();
            if (!result) {
                // 返回自定义注解上配置的 返回限流提示
                return mayiktLeakyBucket.msg();
            }
            Object proceed = pjp.proceed();//执行目标方法 getMayiktLeakyBucket
            // 被限流的请求 对AQS的状态+1 被限流请求 对AQS状态+1
            semaphore.release();
            return proceed;
        } catch (Exception e) {
            // 封装目标方法如果抛出异常
            semaphore.release();
            return "系统错误!";
        }

    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76

3.相关测试代码

/**
     * 演示漏桶算法 纯手写注解
     *
     * @return
     */
    @RequestMapping("/getMayiktLeakyBucket")
    @MayiktLeakyBucket(name = "getMayiktLeakyBucket", threads = 2)
    public String getMayiktLeakyBucket() {
        log.info("正常执行业务逻辑代码...");
        try {
            Thread.sleep(3000);
        } catch (Exception e) {

        }
        return "mayikt";
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16

固定窗口计数器算法

标题:纯手写滑动窗口计数器限流算法
课程内容:
1.微服务接口限流算法有哪些
2.令牌桶/漏桶/固定计数器/滑动窗口技术算法原理
3.固定计数器算法有哪些缺点
4.滑动窗口计数器算法设计原理
5.如何滑动窗口内对接口限流
6.纯手写滑动窗口限流算法
7.不同的限流算法应用场景有哪些

20点25分准时开始

固定窗口计数器算法实现原理

1.规定我们单位时间处理的请求数量,例如我们规定我们的一个接口一分钟只能访问1000次的话,使用固定窗口计数器算法的话可以这样实现:给定一个变量 count来记录处理的请求数量,当1分钟之内处理一个请求一次就对count+1,1分钟之内的如果count=1000的话,后续的请求就会被全部拒绝。等到 1分钟结束后,将count回归成0,重新开始计数。

2.如果流量突然激增,比如我们限制一个接口一分钟只能访问1000次的话,用户恶意在第一分钟的50秒堆积发送1000次请求,第二分钟10s堆积发送1000次请求,这样的话相当于在1分钟内收到2000次请求,失去限流保护的意义。

img

手写固定窗口计数器算法代码

public class MayiktCounter {
    /**
     * 限流次数
     */
    private Integer limiter;
    /**
     * 计数器限流间隔的时间
     */
    private Integer limiterTime;
    /**
     * 记录调用的次数
     */
    private AtomicInteger atomicInteger;

    /**
     * 记录每次重置的时间
     */
    private Long startTime;

    public MayiktCounter(Integer limiter, Integer limiterTime) {
        this.limiter = limiter;
        this.limiterTime = limiterTime;
        this.atomicInteger = new AtomicInteger(1);
        startTime = System.currentTimeMillis();
    }

    public boolean tryAcquire() {
        // 1.获取系统的时间
        long currentTimeMillis = System.currentTimeMillis();
        if (currentTimeMillis - startTime > limiterTime) {
            //  重置
            atomicInteger.set(1);
            startTime = System.currentTimeMillis();
        }
        // 如果在时间内 判断调用的次数是否 超过限流的次数
        int count = atomicInteger.get();
        if (count > limiter) {
            // 被限流了
            return false;
        }
        // 原子类+++
        atomicInteger.incrementAndGet();
        return true;
    }

    public static void main(String[] args) throws InterruptedException {
        MayiktCounter mayiktCounter = new MayiktCounter(2, 10000);
        for (int i = 0; i < 20; i++) {
            boolean result = mayiktCounter.tryAcquire();
            System.out.println(result);
        }
        System.out.println("------------------");
        Thread.sleep(10000);
        for (int i = 0; i < 20; i++) {
            boolean result = mayiktCounter.tryAcquire();
            System.out.println(result);
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59

滑动窗口技术器算法

滑动窗口技术器算法原理

滑动窗口计数器算法 是固定窗口计数器的升级版本,固定计数器窗口算法缺陷,如果流量突然激增,比如我们限制一个接口一分钟只能访问1000次的话,用户恶意在第一分钟的50秒堆积发送1000次请求,第二分钟10s堆积发送1000次请求,这样的话相当于在1分钟内收到2000次请求,失去限流保护的意义。

img

手写滑动窗口技术器算法相关代码

import java.util.LinkedList;

/**
 * @author 余胜军
 * @ClassName MayiktSlideWindow
 * @qq 644064779
 * @addres www.mayikt.com
 * 微信:yushengjun644
 */
public class MayiktSlideWindow {
    /**
     * 定义一个 链表
     */
    private LinkedList<Long> linkedList;
  
    /*
     限流的次数  例如:2次
     */
    private Integer count;
    /**
     * 窗口时间 例如 10s
     */
    private Long timeWindow;

    public MayiktSlideWindow(Integer count, Long timeWindow) {
        this.count = count;
        this.timeWindow = timeWindow;
        linkedList = new LinkedList<>();
    }


    public boolean tryAcquire() {
        // 获取到当前系统时间
        long currentTimeMillis = System.currentTimeMillis();
        if (linkedList.size() < count) {
            // 最新的 node 节点存放 在最前面
            linkedList.add(0, currentTimeMillis);
            return true;// 没有被限流
        }
        // 判断是否在10s 范围内
        int farIndex = count - 1;
        Long farTime = linkedList.get(farIndex);// 取出尾结点
        if (currentTimeMillis - farTime < timeWindow) {
            return false;
        }
        //已经超过限定时间范围内
        linkedList.remove(farIndex);// 删除尾结点
        linkedList.add(0, currentTimeMillis);
        return true;
    }

    public static void main(String[] args) throws InterruptedException {
        MayiktSlideWindow mayiktSlideWindow = new MayiktSlideWindow(2, 10000l);
        for (int i = 1; i <= 100; i++) {
            Thread.sleep(1000);
            // 10s 内只能够访问2次。
            boolean result = mayiktSlideWindow.tryAcquire();
            System.out.println("i" + i + "," + result);
        }
    }

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/AllinToyou/article/detail/628877
推荐阅读
相关标签
  

闽ICP备14008679号