当前位置:   article > 正文

基于Redis组件之分布式RateLimiter限流_redisratelimiter

redisratelimiter

上文我们介绍了RateLimiter文章路径针对IP来限流的方式,上文的限流方案,只针对单应用情况,分布式集群下就不能使用上文的方式,分布式下的限流方案有很多种,这边展示的是Redis的封装redission框架。
可以这么讲,jdk中的juc包提供的是单机版的并发业务。那么Redisson基本是基于juc实现的分布式的业务。

一:Redission官网

我们先去Redission官网喵喵
redission官方地址
在这里插入图片描述
我们可以看到wiki提供了很多功能介绍,
分布式锁等,我们这篇文章主要讲限流。进入正题
在这里插入图片描述
我们看到如下有个限流器,我们点进去看看
在这里插入图片描述
在这里插入图片描述

1:依赖版本

org.redisson.redisson 3.11.4

<dependency>
    <groupId>org.redisson</groupId>
    <artifactId>redisson-spring-boot-starter</artifactId>
    <version>3.11.4</version>
</dependency>
  • 1
  • 2
  • 3
  • 4
  • 5

2:限流器(RRateLimiter)

仔细看,它和谷歌guava实现的限流器(RateLimiter)名字很像,多了一个R。它支持在分布式环境下,现在调用方的请求频率,可以实现不同Redisson实例下的多线程限流,也适用于相同实例Redisson下的多线程限流。是非公平性阻塞。

3:API接口文档

在这里插入图片描述
当前版本提供了6个方法,其余都是重载
针对每个方法的注释讲解下作用

3.1:acquire()

根据注释过来的意思:
从此RateLimiter处获取许可,直到获得一个许可为止。
可以知道,是一个阻塞限流,直到获取到令牌。那我们可以对其分析一下源代码
在这里插入图片描述
首先我们看下注释

3.1.1:tryAcquire()

仅在调用时可用时才获得许可。
如果有许可证,则获取许可证,然后立即返回,其值为true,将可用许可证的数量减少一个。
如果没有可用的许可,则此方法将立即返回值false。
返回值:
true如果获得许可证,false 否则

3.1.2:tryAcquire(long permits)

permits仅在调用时全部可用时才获取给定数量。
如果所有许可证都可用,则获取许可证,然后立即返回值true,将值减少给定许可证数量的可用许可证数量。
如果没有可用的许可证,则此方法将立即返回值false。
参数:
permits -获得许可证的数量
返回值:
true如果获得许可证,false 否则

4:源码分析

分析之前我们先把依赖引入maven项目并分析,见步骤二:
根据官方文档,
添加测试类


@Autowired
private RedissonClient redissonClient;

RRateLimiter myRateLimter = redissonClient.getRateLimiter("myRateLimter");
myRateLimter.tryAcquire();
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

在这里插入图片描述
源码:
在这里插入图片描述
我们进入实现类看看:
在这里插入图片描述

在这里插入图片描述
我们这里可以看到都给到了一个默认值1,一个许可证的数量
我们从它内部调用的方法见:public RFutrue acquireAsync(long permits);分析一下
在这里插入图片描述
先创建异步计算对象promise ,然后再调用方法tryAcquireAsync(permits, -1, null),然后将其结果,通过RFutrue返回。
我们再看看它的重载方法

public RFuture<Boolean> tryAcquireAsync(long permits, long timeout, TimeUnit unit) 
  • 1

在这里插入图片描述
看内部的代码,该对象,基本上所有的方法都是最终指向了上方法,只是参数,我们都封装好了。便于直接调用。

我们现在来分析下如上的方法到底做了什么事情
我们设定的超时时间统一转换为毫秒值,如果是-1,则不转换,直接为-1.
再定义一个异步任务,传递到tryAcquireAsync(permits, promise, timeoutInMillis);
我们看看源码
在这里插入图片描述首先我们注意到的是,当前方法是私有的
第一行代码,

long s = System.currentTimeMillis();
就是记录该方法的起始时间

  • 1
  • 2
  • 3

第二行代码


RFuture<Long> future = this.tryAcquireAsync(RedisCommands.EVAL_LONG, permits);
进入该方法,发现这个内部代码执行的是一个lua脚本.
  • 1
  • 2
  • 3

在这里插入图片描述
这个就是redis获取令牌的命令,其中会判断是否超出获取许可数量。然后将其获取结果放回。返回的结果就是一个Long类型数据,那么我们再通过异步计算,判断其返回结果是否为成功?是否等于NULL?

在这里插入图片描述

看源码的条件是if else if,多种条件多种处理。
1:首先判断c是否等于null

如果不等于null,则代表获取到许可证,则立即返回

2:判断delay是否等于null
判断是否延迟,如果delay等于null,则将其异步标记为成功,也立即返回

3:判断超时时间是否为-1
在这里插入图片描述
仔细查看代码
如果为-1,则进入递归任务,再获取一次许可,直至退出递归

4: 如果超时时间也不为-1,说明是有确切的超时时间
在这里插入图片描述
那看代码,有判断当前消耗的时间是否大于当前任务设定的超时时间

4.1:判断当前时间
在这里插入图片描述
如果当前时间已经大于,则将立即返回,并标记结果失败

4.2:如果没有超时,那判断当前剩余的超时时间,是否小于延迟时间(delay)
在这里插入图片描述
小于:则结束,并标记失败
在这里插入图片描述
大于:则判断当前过去时间是否小于等于剩余超时时间,如果小于等于,则返回,标记失败,否则,则进入下个递归,并且传入剩余超时时间为最新超时时间。

getConfig()
返回此RateLimiter对象的当前配置。
在这里插入图片描述

acquire()
从该速率限制器获取许可证,直到有许可证可用为止。
获得许可证(如果有),并立即返回,将可用许可证的数量减少一个

acquire(long permits)
从该速率限制器获取指定的许可证,直到有一个可用的许可证为止。
获取给定数量的许可证(如果可用),并立即返回,将可用许可证的数量减少给定的数量。
参数:
许可证-获得许可证的数量

仔细看看方法是不是如上面源码所说的,都封装统一调用方法

见源码:

在这里插入图片描述

trySetRate(RateType mode, long rate, long rateInterval, RateIntervalUnit rateIntervalUnit)
  • 1

初始化RateLimiter的状态并将配置存储到Redis服务器。
参数:
mode–速率模式
rate–速率
rateInterval–速率时间间隔
rateIntervalUnit–速率时间间隔单位
退货:
如果设置了速率为true,否则为false
代码中的例子:


// var1设置访问速率实例,var2为访问数,var3为单位时间,var4为时间单位
// 每10秒产生1个令牌 总体限流
// 创建令牌桶数据模型
rateLimiter.trySetRate(limit.mode(), limit.count(), limit.period(), RateIntervalUnit.SECONDS);
  • 1
  • 2
  • 3
  • 4
  • 5

在这里插入图片描述
所有实例共享(RateType.OVERALL所有实例共享、RateType.CLIENT单实例端共享)

二:集成Redission

pom文件添加如下依赖:

<dependency>
    <groupId>org.redisson</groupId>
    <artifactId>redisson-spring-boot-starter</artifactId>
    <version>3.11.4</version>
</dependency>
  • 1
  • 2
  • 3
  • 4
  • 5

在这里插入图片描述

application.yml添加如下配置
在这里插入图片描述
redission-dev.yml
在这里插入图片描述

# 单节点设置
singleServerConfig:
  address: redis://127.0.0.1:6379
  database: 2
  password: 123456
  #  (连接空闲超时,单位:毫秒)如果当前连接池里的连接数量超过了最小空闲连接数,而同时有连接空闲时间超过了该数值,那么这些连接将会自动被关闭,并从连接池里去掉。时间单位是毫秒。
  idleConnectionTimeout: 10000
  #  未知
  pingTimeout: 1000
  #连接间隔 心跳
  pingConnectionInterval: 1000
  #  (连接超时,单位:毫秒)同节点建立连接时的等待超时。时间单位是毫秒。
  connectTimeout: 10000
  #  (命令等待超时,单位:毫秒)等待节点回复命令的时间。该时间从命令发送成功时开始计时。
  timeout: 3000
  #  (命令失败重试次数)如果尝试达到 retryAttempts(命令失败重试次数) 仍然不能将命令发送至某个指定的节点时,将抛出错误。如果尝试在此限制之内发送成功,则开始启用 timeout(命令等待超时) 计时。
  retryAttempts: 3
  #  (命令重试发送时间间隔,单位:毫秒)在某个节点执行相同或不同命令时,连续 失败 failedAttempts(执行失败最大次数) 时,该节点将被从可用节点列表里清除,直到 reconnectionTimeout(重新连接时间间隔) 超时以后再次尝试。
  retryInterval: 1500
  #  (重新连接时间间隔 单位:毫秒)
  reconnectionTimeout: 3000
  #  (执行失败最大次数)
  failedAttempts: 3
  #  (客户端名称)
  clientName: null
  # 发布和订阅连接的最小空闲连接数 默认1 多从节点的环境里,每个 从服务节点里用于发布和订阅连接的最小保持连接数(长连接)。Redisson内部经常通过发布和订阅来实现许多功能。长期保持一定数量的发布订阅连接是必须的。
  subscriptionConnectionMinimumIdleSize: 1
  # 发布和订阅连接池大小 默认50  多从节点的环境里,每个 从服务节点里用于发布和订阅连接的连接池最大容量。连接池的连接数量自动弹性伸缩。
  subscriptionConnectionPoolSize: 10
  # 单个连接最大订阅数量 默认5
  subscriptionsPerConnection: 5
  # 最小空闲连接数 默认32,现在暂时不需要那么多的线程 多从节点的环境里,每个 从服务节点里用于普通操作(非 发布和订阅)的最小保持连接数(长连接)。长期保持一定数量的连接有利于提高瞬时读取反映速度。
  connectionMinimumIdleSize: 5
  # (连接池大小) 默认64,现在暂时不需要那么多的线程 在启用该功能以后,Redisson将会监测DNS的变化情况。
  connectionPoolSize: 20
  # (线程池数量)这个线程池数量被所有RTopic对象监听器,RRemoteService调用者和RExecutorService任务共同共享。

threads: 0
  #(Netty线程池数量) 这个线程池数量是在一个Redisson实例内,被其创建的所有分布式数据类型和服务,以及底层客户端所一同共享的线程池里保存的线程数量。
nettyThreads: 0
  #(编码)Redisson的对象编码类是用于将对象进行序列化和反序列化,以实现对该对象在Redis里的读取和存储。
  # FstCodec  10倍于JDK序列化性能而且100%兼容的编码
#codec:
#  class: com.efh.otms.common.serializer.redisson.FstCodec
  #  (传输模式)默认 NIO
transportMode: NIO
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46

三:编写代码实现

添加AOP实现
我们定义一个限流类型的枚举

3.1 枚举

在这里插入图片描述

/**
 * @author Gaci
 * @className LimiterTypeEnum
 * @description 定义一个限流类型的枚举
 * @date 2021/5/18 14:51
 **/
public enum LimiterTypeEnum {
    /**
     * 传统类型
     */
    CUSTOMER,

    /**
     *  根据 IP地址限制
     */
    IP
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17

3.2 定义一个自定义的限流注解

在这里插入图片描述

import com.peng.sentinel.enums.LimiterTypeEnum;
import org.redisson.api.RateType;

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

/**
 * @author Gaci
 * @className RedisRLimiter
 * @description 限流自定义注解
 * @date 2021/5/18 14:49
 **/
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface RedisRLimiter {

    /**
     * 资源名称,用于描述接口功能
     */
    String name() default "";

    /**
     * 限制访问次数(单位时间内产生的令牌数)
     */
    int count();

    /**
     * 时间间隔,单位秒
     */
    int period();

    /**
     * 资源 key
     */
    String key() default "";

    /**
     * 限制类型(ip/方法名)
     */
    LimiterTypeEnum limitType() default LimiterTypeEnum.CUSTOMER;

    /**
     * RRateLimiter 速度类型
     * OVERALL,    // 所有客户端加总限流
     * PER_CLIENT; // 每个客户端单独计算流量
     * @return
     */
    RateType mode() default RateType.PER_CLIENT;
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50

3.3 切面类

在这里插入图片描述

import com.peng.sentinel.GaciException;
import com.peng.sentinel.annotation.RedisRLimiter;
import com.peng.sentinel.util.HttpContextUtils;
import com.peng.sentinel.util.IPHelper;
import org.apache.commons.lang3.StringUtils;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Pointcut;
import org.aspectj.lang.reflect.MethodSignature;
import org.redisson.api.RRateLimiter;
import org.redisson.api.RateIntervalUnit;
import org.redisson.api.RedissonClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.servlet.http.HttpServletRequest;
import java.lang.reflect.Method;
import java.util.concurrent.TimeUnit;

/**
 * @author Gaci
 * @className RedisRLimiterAspect
 * @description 类描述
 * @date 2021/5/18 14:55
 **/
@Aspect
@Component
public class RedisRLimiterAspect {
    private static final String REDIS_LIMIT_KEY_HEAD = "limit";
    private static Logger logger = LoggerFactory.getLogger(RedisRLimiterAspect.class);
    @Autowired
    private RedissonClient redisson;

    // 切入点
    @Pointcut("@annotation(com.peng.sentinel.annotation.RedisRLimiter)")
    public void pointcut() {
    }

    @Around("pointcut()")
    public Object around(ProceedingJoinPoint point) throws Throwable {
        HttpServletRequest request = HttpContextUtils.getHttpServletRequest();
        MethodSignature signature = (MethodSignature) point.getSignature();
        Method method = signature.getMethod();
        RedisRLimiter limit = method.getAnnotation(RedisRLimiter.class);
        String ip = IPHelper.getIpAddr();
        String key;
        switch (limit.limitType()) {
            case IP:
                // ip类型
                key = ip;
                break;
            case CUSTOMER:
                // 传统类型,采用注解提供key
                key = limit.key();
                break;
            default:
                // 默认采用方法名
                key = StringUtils.upperCase(method.getName());
        }
//        ImmutableList keys = ImmutableList.of(StringUtils.join(REDIS_LIMIT_KEY_HEAD, limit.prefix(), ":", ip, key));
        // 生成key
        final String ofRateLimiter = REDIS_LIMIT_KEY_HEAD + ip + key;
        RRateLimiter rateLimiter = redisson.getRateLimiter(ofRateLimiter);

        // 根据官方文档的描述
        /*
         * RRateLimiter rateLimiter = redisson.getRateLimiter("myRateLimiter");
         * // 初始化
         * // 最大流速 = 每1秒钟产生10个令牌
         * rateLimiter.trySetRate(RateType.OVERALL, 10, 1, RateIntervalUnit.SECONDS);
         * // 需要1个令牌
           if(rateLimiter.tryAcquire(1)){
                // TODO:Do something
           }
         */

        // 设置访问速率,var2为访问数,var4为单位时间,var6为时间单位
        // 每10秒产生1个令牌 总体限流
        // 创建令牌桶数据模型
        rateLimiter.trySetRate(limit.mode(), limit.count(), limit.period(), RateIntervalUnit.SECONDS);

        // permits 允许获得的许可数量 (如果获取失败,返回false) 1秒内不能获取到1个令牌,则返回,不阻塞
        // 尝试访问数据,占数据计算值var1,设置等待时间var3
        // acquire() 默认如下参数 如果超时时间为-1,则永不超时,则将线程阻塞,直至令牌补充
        // 此处采用3秒超时方式,服务降级
        if (!rateLimiter.tryAcquire(1, 2, TimeUnit.SECONDS)) {
            logger.error("IP【{}】访问接口【{}】超出频率限制,限制规则为[限流模式:{}; 限流数量:{}; 限流时间间隔:{};]",
                    ip, method.getName(), limit.mode().toString(), limit.count(), limit.period());
            throw new GaciException("接口访问超出频率限制,请稍后重试");
        }
        return point.proceed();
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95

3.4 controller

在这里插入图片描述

相关代码
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
启动项目进行测试
访问页面测试
第一次访问
在这里插入图片描述
第二次访问:由于5秒生成一个令牌,导致5秒内令牌已经被获取,提示错误
在这里插入图片描述
更细节的功能以及要求,请自行根据需求编写代码

演示就到这里

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/小蓝xlanll/article/detail/224994
推荐阅读
相关标签
  

闽ICP备14008679号