赞
踩
上文我们介绍了RateLimiter文章路径针对IP来限流的方式,上文的限流方案,只针对单应用情况,分布式集群下就不能使用上文的方式,分布式下的限流方案有很多种,这边展示的是Redis的封装redission框架。
可以这么讲,jdk中的juc包提供的是单机版的并发业务。那么Redisson基本是基于juc实现的分布式的业务。
我们先去Redission官网喵喵
redission官方地址
我们可以看到wiki提供了很多功能介绍,
分布式锁等,我们这篇文章主要讲限流。进入正题
我们看到如下有个限流器,我们点进去看看
org.redisson.redisson 3.11.4
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson-spring-boot-starter</artifactId>
<version>3.11.4</version>
</dependency>
仔细看,它和谷歌guava实现的限流器(RateLimiter)名字很像,多了一个R。它支持在分布式环境下,现在调用方的请求频率,可以实现不同Redisson实例下的多线程限流,也适用于相同实例Redisson下的多线程限流。是非公平性阻塞。
当前版本提供了6个方法,其余都是重载
针对每个方法的注释讲解下作用
根据注释过来的意思:
从此RateLimiter处获取许可,直到获得一个许可为止。
可以知道,是一个阻塞限流,直到获取到令牌。那我们可以对其分析一下源代码
首先我们看下注释
仅在调用时可用时才获得许可。
如果有许可证,则获取许可证,然后立即返回,其值为true,将可用许可证的数量减少一个。
如果没有可用的许可,则此方法将立即返回值false。
返回值:
true如果获得许可证,false 否则
permits仅在调用时全部可用时才获取给定数量。
如果所有许可证都可用,则获取许可证,然后立即返回值true,将值减少给定许可证数量的可用许可证数量。
如果没有可用的许可证,则此方法将立即返回值false。
参数:
permits -获得许可证的数量
返回值:
true如果获得许可证,false 否则
分析之前我们先把依赖引入maven项目并分析,见步骤二:
根据官方文档,
添加测试类
@Autowired
private RedissonClient redissonClient;
RRateLimiter myRateLimter = redissonClient.getRateLimiter("myRateLimter");
myRateLimter.tryAcquire();
源码:
我们进入实现类看看:
我们这里可以看到都给到了一个默认值1,一个许可证的数量
我们从它内部调用的方法见:public RFutrue acquireAsync(long permits);分析一下
先创建异步计算对象promise ,然后再调用方法tryAcquireAsync(permits, -1, null),然后将其结果,通过RFutrue返回。
我们再看看它的重载方法
public RFuture<Boolean> tryAcquireAsync(long permits, long timeout, TimeUnit unit)
看内部的代码,该对象,基本上所有的方法都是最终指向了上方法,只是参数,我们都封装好了。便于直接调用。
我们现在来分析下如上的方法到底做了什么事情
我们设定的超时时间统一转换为毫秒值,如果是-1,则不转换,直接为-1.
再定义一个异步任务,传递到tryAcquireAsync(permits, promise, timeoutInMillis);
我们看看源码
首先我们注意到的是,当前方法是私有的
第一行代码,
long s = System.currentTimeMillis();
就是记录该方法的起始时间
第二行代码
RFuture<Long> future = this.tryAcquireAsync(RedisCommands.EVAL_LONG, permits);
进入该方法,发现这个内部代码执行的是一个lua脚本.
这个就是redis获取令牌的命令,其中会判断是否超出获取许可数量。然后将其获取结果放回。返回的结果就是一个Long类型数据,那么我们再通过异步计算,判断其返回结果是否为成功?是否等于NULL?
看源码的条件是if else if,多种条件多种处理。
1:首先判断c是否等于null
如果不等于null,则代表获取到许可证,则立即返回
2:判断delay是否等于null
判断是否延迟,如果delay等于null,则将其异步标记为成功,也立即返回
3:判断超时时间是否为-1
仔细查看代码
如果为-1,则进入递归任务,再获取一次许可,直至退出递归
4: 如果超时时间也不为-1,说明是有确切的超时时间
那看代码,有判断当前消耗的时间是否大于当前任务设定的超时时间
4.1:判断当前时间
如果当前时间已经大于,则将立即返回,并标记结果失败
4.2:如果没有超时,那判断当前剩余的超时时间,是否小于延迟时间(delay)
小于:则结束,并标记失败
大于:则判断当前过去时间是否小于等于剩余超时时间,如果小于等于,则返回,标记失败,否则,则进入下个递归,并且传入剩余超时时间为最新超时时间。
getConfig()
返回此RateLimiter对象的当前配置。
acquire()
从该速率限制器获取许可证,直到有许可证可用为止。
获得许可证(如果有),并立即返回,将可用许可证的数量减少一个
acquire(long permits)
从该速率限制器获取指定的许可证,直到有一个可用的许可证为止。
获取给定数量的许可证(如果可用),并立即返回,将可用许可证的数量减少给定的数量。
参数:
许可证-获得许可证的数量
仔细看看方法是不是如上面源码所说的,都封装统一调用方法
见源码:
trySetRate(RateType mode, long rate, long rateInterval, RateIntervalUnit rateIntervalUnit)
初始化RateLimiter的状态并将配置存储到Redis服务器。
参数:
mode–速率模式
rate–速率
rateInterval–速率时间间隔
rateIntervalUnit–速率时间间隔单位
退货:
如果设置了速率为true,否则为false
代码中的例子:
// var1设置访问速率实例,var2为访问数,var3为单位时间,var4为时间单位
// 每10秒产生1个令牌 总体限流
// 创建令牌桶数据模型
rateLimiter.trySetRate(limit.mode(), limit.count(), limit.period(), RateIntervalUnit.SECONDS);
所有实例共享(RateType.OVERALL所有实例共享、RateType.CLIENT单实例端共享)
pom文件添加如下依赖:
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson-spring-boot-starter</artifactId>
<version>3.11.4</version>
</dependency>
application.yml添加如下配置
redission-dev.yml
# 单节点设置 singleServerConfig: address: redis://127.0.0.1:6379 database: 2 password: 123456 # (连接空闲超时,单位:毫秒)如果当前连接池里的连接数量超过了最小空闲连接数,而同时有连接空闲时间超过了该数值,那么这些连接将会自动被关闭,并从连接池里去掉。时间单位是毫秒。 idleConnectionTimeout: 10000 # 未知 pingTimeout: 1000 #连接间隔 心跳 pingConnectionInterval: 1000 # (连接超时,单位:毫秒)同节点建立连接时的等待超时。时间单位是毫秒。 connectTimeout: 10000 # (命令等待超时,单位:毫秒)等待节点回复命令的时间。该时间从命令发送成功时开始计时。 timeout: 3000 # (命令失败重试次数)如果尝试达到 retryAttempts(命令失败重试次数) 仍然不能将命令发送至某个指定的节点时,将抛出错误。如果尝试在此限制之内发送成功,则开始启用 timeout(命令等待超时) 计时。 retryAttempts: 3 # (命令重试发送时间间隔,单位:毫秒)在某个节点执行相同或不同命令时,连续 失败 failedAttempts(执行失败最大次数) 时,该节点将被从可用节点列表里清除,直到 reconnectionTimeout(重新连接时间间隔) 超时以后再次尝试。 retryInterval: 1500 # (重新连接时间间隔 单位:毫秒) reconnectionTimeout: 3000 # (执行失败最大次数) failedAttempts: 3 # (客户端名称) clientName: null # 发布和订阅连接的最小空闲连接数 默认1 多从节点的环境里,每个 从服务节点里用于发布和订阅连接的最小保持连接数(长连接)。Redisson内部经常通过发布和订阅来实现许多功能。长期保持一定数量的发布订阅连接是必须的。 subscriptionConnectionMinimumIdleSize: 1 # 发布和订阅连接池大小 默认50 多从节点的环境里,每个 从服务节点里用于发布和订阅连接的连接池最大容量。连接池的连接数量自动弹性伸缩。 subscriptionConnectionPoolSize: 10 # 单个连接最大订阅数量 默认5 subscriptionsPerConnection: 5 # 最小空闲连接数 默认32,现在暂时不需要那么多的线程 多从节点的环境里,每个 从服务节点里用于普通操作(非 发布和订阅)的最小保持连接数(长连接)。长期保持一定数量的连接有利于提高瞬时读取反映速度。 connectionMinimumIdleSize: 5 # (连接池大小) 默认64,现在暂时不需要那么多的线程 在启用该功能以后,Redisson将会监测DNS的变化情况。 connectionPoolSize: 20 # (线程池数量)这个线程池数量被所有RTopic对象监听器,RRemoteService调用者和RExecutorService任务共同共享。 threads: 0 #(Netty线程池数量) 这个线程池数量是在一个Redisson实例内,被其创建的所有分布式数据类型和服务,以及底层客户端所一同共享的线程池里保存的线程数量。 nettyThreads: 0 #(编码)Redisson的对象编码类是用于将对象进行序列化和反序列化,以实现对该对象在Redis里的读取和存储。 # FstCodec 10倍于JDK序列化性能而且100%兼容的编码 #codec: # class: com.efh.otms.common.serializer.redisson.FstCodec # (传输模式)默认 NIO transportMode: NIO
添加AOP实现
我们定义一个限流类型的枚举
/** * @author Gaci * @className LimiterTypeEnum * @description 定义一个限流类型的枚举 * @date 2021/5/18 14:51 **/ public enum LimiterTypeEnum { /** * 传统类型 */ CUSTOMER, /** * 根据 IP地址限制 */ IP }
import com.peng.sentinel.enums.LimiterTypeEnum; import org.redisson.api.RateType; import java.lang.annotation.ElementType; import java.lang.annotation.Retention; import java.lang.annotation.RetentionPolicy; import java.lang.annotation.Target; /** * @author Gaci * @className RedisRLimiter * @description 限流自定义注解 * @date 2021/5/18 14:49 **/ @Target(ElementType.METHOD) @Retention(RetentionPolicy.RUNTIME) public @interface RedisRLimiter { /** * 资源名称,用于描述接口功能 */ String name() default ""; /** * 限制访问次数(单位时间内产生的令牌数) */ int count(); /** * 时间间隔,单位秒 */ int period(); /** * 资源 key */ String key() default ""; /** * 限制类型(ip/方法名) */ LimiterTypeEnum limitType() default LimiterTypeEnum.CUSTOMER; /** * RRateLimiter 速度类型 * OVERALL, // 所有客户端加总限流 * PER_CLIENT; // 每个客户端单独计算流量 * @return */ RateType mode() default RateType.PER_CLIENT;
import com.peng.sentinel.GaciException; import com.peng.sentinel.annotation.RedisRLimiter; import com.peng.sentinel.util.HttpContextUtils; import com.peng.sentinel.util.IPHelper; import org.apache.commons.lang3.StringUtils; import org.aspectj.lang.ProceedingJoinPoint; import org.aspectj.lang.annotation.Around; import org.aspectj.lang.annotation.Aspect; import org.aspectj.lang.annotation.Pointcut; import org.aspectj.lang.reflect.MethodSignature; import org.redisson.api.RRateLimiter; import org.redisson.api.RateIntervalUnit; import org.redisson.api.RedissonClient; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import javax.servlet.http.HttpServletRequest; import java.lang.reflect.Method; import java.util.concurrent.TimeUnit; /** * @author Gaci * @className RedisRLimiterAspect * @description 类描述 * @date 2021/5/18 14:55 **/ @Aspect @Component public class RedisRLimiterAspect { private static final String REDIS_LIMIT_KEY_HEAD = "limit"; private static Logger logger = LoggerFactory.getLogger(RedisRLimiterAspect.class); @Autowired private RedissonClient redisson; // 切入点 @Pointcut("@annotation(com.peng.sentinel.annotation.RedisRLimiter)") public void pointcut() { } @Around("pointcut()") public Object around(ProceedingJoinPoint point) throws Throwable { HttpServletRequest request = HttpContextUtils.getHttpServletRequest(); MethodSignature signature = (MethodSignature) point.getSignature(); Method method = signature.getMethod(); RedisRLimiter limit = method.getAnnotation(RedisRLimiter.class); String ip = IPHelper.getIpAddr(); String key; switch (limit.limitType()) { case IP: // ip类型 key = ip; break; case CUSTOMER: // 传统类型,采用注解提供key key = limit.key(); break; default: // 默认采用方法名 key = StringUtils.upperCase(method.getName()); } // ImmutableList keys = ImmutableList.of(StringUtils.join(REDIS_LIMIT_KEY_HEAD, limit.prefix(), ":", ip, key)); // 生成key final String ofRateLimiter = REDIS_LIMIT_KEY_HEAD + ip + key; RRateLimiter rateLimiter = redisson.getRateLimiter(ofRateLimiter); // 根据官方文档的描述 /* * RRateLimiter rateLimiter = redisson.getRateLimiter("myRateLimiter"); * // 初始化 * // 最大流速 = 每1秒钟产生10个令牌 * rateLimiter.trySetRate(RateType.OVERALL, 10, 1, RateIntervalUnit.SECONDS); * // 需要1个令牌 if(rateLimiter.tryAcquire(1)){ // TODO:Do something } */ // 设置访问速率,var2为访问数,var4为单位时间,var6为时间单位 // 每10秒产生1个令牌 总体限流 // 创建令牌桶数据模型 rateLimiter.trySetRate(limit.mode(), limit.count(), limit.period(), RateIntervalUnit.SECONDS); // permits 允许获得的许可数量 (如果获取失败,返回false) 1秒内不能获取到1个令牌,则返回,不阻塞 // 尝试访问数据,占数据计算值var1,设置等待时间var3 // acquire() 默认如下参数 如果超时时间为-1,则永不超时,则将线程阻塞,直至令牌补充 // 此处采用3秒超时方式,服务降级 if (!rateLimiter.tryAcquire(1, 2, TimeUnit.SECONDS)) { logger.error("IP【{}】访问接口【{}】超出频率限制,限制规则为[限流模式:{}; 限流数量:{}; 限流时间间隔:{};]", ip, method.getName(), limit.mode().toString(), limit.count(), limit.period()); throw new GaciException("接口访问超出频率限制,请稍后重试"); } return point.proceed(); }
相关代码
启动项目进行测试
访问页面测试
第一次访问
第二次访问:由于5秒生成一个令牌,导致5秒内令牌已经被获取,提示错误
更细节的功能以及要求,请自行根据需求编写代码
演示就到这里
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。