赞
踩
本篇文章主要来讲Spring Boot 整合 Redisson 实现限流,之前我们讲过使用Redis的Lua脚本方式,我们今天主要讲使用 Redisson 提供的方法实现限流。本文中主要用到 org.redisson.api.RRateLimiter ,它的方法比较多,比如:
pom.xml
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.6.0</version> <relativePath/> <!-- lookup parent from repository --> </parent> <groupId>com.alian</groupId> <artifactId>redis-limit-java</artifactId> <version>0.0.1-SNAPSHOT</version> <name>redisCache</name> <description>redis-limit-java</description> <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding> <project.package.directory>target</project.package.directory> <java.version>1.8</java.version> <!--com.fasterxml.jackson 版本--> <jackson.version>2.9.10</jackson.version> <!--lombok 版本--> <lombok.version>1.16.14</lombok.version> <!--阿里巴巴fastjson 版本--> <fastjson.version>1.2.68</fastjson.version> <!--junit 版本--> <junit.version>4.12</junit.version> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <!--aop--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-aop</artifactId> </dependency> <!--redis依赖--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> </dependency> <!--redisson依赖--> <dependency> <groupId>org.redisson</groupId> <artifactId>redisson-spring-boot-starter</artifactId> <version>3.17.0</version> <exclusions> <exclusion> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-actuator</artifactId> </exclusion> </exclusions> </dependency> <!--用于序列化--> <dependency> <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-databind</artifactId> <version>${jackson.version}</version> </dependency> <!--java 8时间序列化--> <dependency> <groupId>com.fasterxml.jackson.datatype</groupId> <artifactId>jackson-datatype-jsr310</artifactId> <version>${jackson.version}</version> </dependency> <!--JSON--> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>${fastjson.version}</version> </dependency> <dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-lang3</artifactId> <version>3.12.0</version> </dependency> <!--日志输出--> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <version>${lombok.version}</version> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> </plugins> </build> </project>
因为我平常一直是使用 SpringBoot2.6.0 这个版本,它整合 redisson-spring-boot-starter 时会有点问题,本文的关键限流使用,不去深究这个问题,直接排除了健康检查就好了
<!--redisson依赖-->
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson-spring-boot-starter</artifactId>
<version>3.17.0</version>
<exclusions>
<exclusion>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</exclusion>
</exclusions>
</dependency>
application.properties
# 端口 server.port=8090 # 上下文路径 server.servlet.context-path=/rateLimit # Redis数据库索引(默认为0) spring.redis.database=0 # Redis服务器地址 #spring.redis.host=192.168.0.193 spring.redis.host=127.0.0.1 # Redis服务器连接端口 spring.redis.port=6379 # Redis服务器连接密码(默认为空) spring.redis.password=123456 # 连接池最大连接数(使用负值表示没有限制) spring.redis.jedis.pool.max-active=20 # 连接池中的最小空闲连接 spring.redis.jedis.pool.min-idle=10 # 连接池中的最大空闲连接 spring.redis.jedis.pool.max-idle=20 # 连接池最大阻塞等待时间(使用负值表示没有限制) spring.redis.jedis.pool.max-wait=20000 # 读时间(毫秒) spring.redis.timeout=10000 # 连接超时时间(毫秒) spring.redis.connect-timeout=10000 # redisson配置文件位置 spring.redis.redisson.file=classpath:redisson-single.yml
redisson-single.yml
# 单节点设置 singleServerConfig: # redis数据库索引 database: 0 # redis地址 address: redis://127.0.0.1:6379 # redis密码 password: 123456 # 连接超时 connectTimeout: 10000 # 读超时 timeout: 3000 # 命令失败重试次数 retryAttempts: 3 # 命令重试发送时间间隔 retryInterval: 1500 # 最小空闲连接数 默认24 connectionMinimumIdleSize: 10 # 连接池大小,默认64 connectionPoolSize: 20
因为我本机redis是单节点的,所以是单节点配置相关的,还有很多配置,大家参照下面这两个类进行配置
如果你是集群模式,则参照下面这三个类进行配置
RedisConfiguration.java
package com.alian.redissonLimit.config; import com.fasterxml.jackson.annotation.JsonAutoDetect; import com.fasterxml.jackson.annotation.PropertyAccessor; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.SerializationFeature; import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; import com.fasterxml.jackson.datatype.jsr310.deser.LocalDateDeserializer; import com.fasterxml.jackson.datatype.jsr310.deser.LocalDateTimeDeserializer; import com.fasterxml.jackson.datatype.jsr310.deser.LocalTimeDeserializer; import com.fasterxml.jackson.datatype.jsr310.ser.LocalDateSerializer; import com.fasterxml.jackson.datatype.jsr310.ser.LocalDateTimeSerializer; import com.fasterxml.jackson.datatype.jsr310.ser.LocalTimeSerializer; import lombok.extern.slf4j.Slf4j; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.data.redis.connection.RedisConnectionFactory; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer; import org.springframework.data.redis.serializer.RedisSerializer; import org.springframework.data.redis.serializer.StringRedisSerializer; import java.time.LocalDate; import java.time.LocalDateTime; import java.time.LocalTime; import java.time.format.DateTimeFormatter; @Slf4j @Configuration public class RedisConfiguration { /** * redis配置 * * @param redisConnectionFactory * @return */ @Bean public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) { // 实例化redisTemplate RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>(); //设置连接工厂 redisTemplate.setConnectionFactory(redisConnectionFactory); // key采用String的序列化 redisTemplate.setKeySerializer(keySerializer()); // value采用jackson序列化 redisTemplate.setValueSerializer(valueSerializer()); // Hash key采用String的序列化 redisTemplate.setHashKeySerializer(keySerializer()); // Hash value采用jackson序列化 redisTemplate.setHashValueSerializer(valueSerializer()); //执行函数,初始化RedisTemplate redisTemplate.afterPropertiesSet(); return redisTemplate; } /** * key类型采用String序列化 * * @return */ private RedisSerializer<String> keySerializer() { return new StringRedisSerializer(); } /** * value采用JSON序列化 * * @return */ private RedisSerializer<Object> valueSerializer() { //设置jackson序列化 Jackson2JsonRedisSerializer<Object> jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer<>(Object.class); //设置序列化对象 jackson2JsonRedisSerializer.setObjectMapper(getMapper()); return jackson2JsonRedisSerializer; } /** * 使用com.fasterxml.jackson.databind.ObjectMapper * 对数据进行处理包括java8里的时间 * * @return */ private ObjectMapper getMapper() { ObjectMapper mapper = new ObjectMapper(); //设置可见性 mapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY); //默认键入对象 mapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL); //设置Java 8 时间序列化 JavaTimeModule timeModule = new JavaTimeModule(); timeModule.addSerializer(LocalDateTime.class, new LocalDateTimeSerializer(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"))); timeModule.addSerializer(LocalDate.class, new LocalDateSerializer(DateTimeFormatter.ofPattern("yyyy-MM-dd"))); timeModule.addSerializer(LocalTime.class, new LocalTimeSerializer(DateTimeFormatter.ofPattern("HH:mm:ss"))); timeModule.addDeserializer(LocalDateTime.class, new LocalDateTimeDeserializer(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"))); timeModule.addDeserializer(LocalDate.class, new LocalDateDeserializer(DateTimeFormatter.ofPattern("yyyy-MM-dd"))); timeModule.addDeserializer(LocalTime.class, new LocalTimeDeserializer(DateTimeFormatter.ofPattern("HH:mm:ss"))); //禁用把时间转为时间戳 mapper.configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS, false); mapper.registerModule(timeModule); return mapper; } }
提过很多次了,就是Redis的整合。
RateLimiter.java
package com.alian.redissonLimit.annotate; import org.redisson.api.RateType; import java.lang.annotation.*; @Documented @Target({ElementType.METHOD}) @Retention(RetentionPolicy.RUNTIME) public @interface RedissonRateLimiter { /** * RRateLimiter 限流模式 * OVERALL 所有客户端加总限流 * PER_CLIENT 每个客户端单独计算流量 */ RateType mode() default RateType.PER_CLIENT; /** * Spel表达式 */ String[] keys() default {}; /** * 单位时间产生的令牌数,默认100 */ long rate() default 100; /** * 时间间隔,默认1秒 */ long rateInterval() default 1; /** * 拒绝请求时的提示信息 */ String showPromptMsg() default "服务器繁忙,请稍候再试"; }
自定义注解也没有什么好说的,主要是定义了:@RateLimiter
RateLimiters.java
package com.alian.redisLimit.annotate;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
public @interface RateLimiters {
RateLimiter[] value();
}
这个就不多说了
RateLimiterAspectHandler.java
package com.alian.redissonLimit.aop; import com.alian.redissonLimit.annotate.RedissonRateLimiter; import com.alian.redissonLimit.annotate.RedissonRateLimiters; import com.alian.redissonLimit.exception.RateLimiterException; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.aspectj.lang.ProceedingJoinPoint; import org.aspectj.lang.annotation.Around; import org.aspectj.lang.annotation.Aspect; import org.aspectj.lang.reflect.MethodSignature; import org.redisson.api.RRateLimiter; import org.redisson.api.RateIntervalUnit; import org.redisson.api.RedissonClient; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import org.springframework.util.DigestUtils; import java.nio.charset.StandardCharsets; import java.util.concurrent.TimeUnit; @Slf4j @Component @Aspect public class RateLimiterAspectHandler { @Autowired private RateLimiterKeyProvider keyProvider; @Autowired private RedissonClient redissonClient; @Around(value = "@annotation(redissonRateLimiter)", argNames = "point,redissonRateLimiter") public Object around(ProceedingJoinPoint point, RedissonRateLimiter redissonRateLimiter) throws Throwable { isAllow(point, redissonRateLimiter); return point.proceed(); } @Around(value = " @annotation(redissonRateLimiters)", argNames = "point,redissonRateLimiters") public Object around(ProceedingJoinPoint point, RedissonRateLimiters redissonRateLimiters) throws Throwable { RedissonRateLimiter[] limiters = redissonRateLimiters.value(); for (RedissonRateLimiter rateLimiter : limiters) { isAllow(point, rateLimiter); } return point.proceed(); } private void isAllow(ProceedingJoinPoint point, RedissonRateLimiter redissonRateLimiter) { // 获取key String key = keyProvider.getKey(point, redissonRateLimiter); // 此处是为了日志输出 boolean flag = StringUtils.isNotBlank(key); // 类路径+方法,然后计算md5 String uniqueKey = getUniqueKey((MethodSignature) point.getSignature()); // key名称 key = StringUtils.isNotBlank(key) ? uniqueKey + "." + key : uniqueKey; // 获取RRateLimiter实例 RRateLimiter limiter = redissonClient.getRateLimiter(key); // 创建令牌桶数据模型,单位时间内产生多少令牌 limiter.trySetRate(redissonRateLimiter.mode(), redissonRateLimiter.rate(), redissonRateLimiter.rateInterval(), RateIntervalUnit.SECONDS); // 尝试访问数据,timeout 时间内,允许获得的数量permits(如果获取失败,返回false) // 单位时间内不能获取到1个令牌,则返回,不阻塞 boolean tryAcquire = limiter.tryAcquire(1, redissonRateLimiter.rateInterval(), TimeUnit.SECONDS); log.info("【{}】请求,线程:【{}】,获取令牌的结果:{}", flag ? "单用户" : "多用户", Thread.currentThread().getName(), tryAcquire); if (!tryAcquire) { log.error("限流模式:{}; 限流数量:{}; 限流时间间隔:{}", redissonRateLimiter.mode().toString(), redissonRateLimiter.rate(), redissonRateLimiter.rateInterval()); throw new RateLimiterException(redissonRateLimiter.showPromptMsg()); } } private String getUniqueKey(MethodSignature signature) { String format = String.format("%s.%s", signature.getDeclaringTypeName(), signature.getMethod().getName()); return DigestUtils.md5DigestAsHex(format.getBytes(StandardCharsets.UTF_8)); } }
我们这里的两个方法:
RateLimiterKeyProvider.java
package com.alian.redissonLimit.aop; import com.alian.redissonLimit.annotate.RedissonRateLimiter; import lombok.extern.slf4j.Slf4j; import org.aspectj.lang.JoinPoint; import org.aspectj.lang.reflect.MethodSignature; import org.springframework.context.expression.MethodBasedEvaluationContext; import org.springframework.core.DefaultParameterNameDiscoverer; import org.springframework.core.ParameterNameDiscoverer; import org.springframework.expression.EvaluationContext; import org.springframework.expression.ExpressionParser; import org.springframework.expression.spel.standard.SpelExpressionParser; import org.springframework.stereotype.Component; import org.springframework.util.StringUtils; import java.lang.reflect.Method; import java.util.ArrayList; import java.util.List; @Slf4j @Component public class RateLimiterKeyProvider { private ParameterNameDiscoverer discoverer = new DefaultParameterNameDiscoverer(); private ExpressionParser parser = new SpelExpressionParser(); public String getKey(JoinPoint joinPoint, RedissonRateLimiter redissonRateLimiter) { List<String> keyList = new ArrayList<>(); Method method = getMethod(joinPoint); List<String> definitionKeys = getSpelDefinitionKey(redissonRateLimiter.keys(), method, joinPoint.getArgs()); keyList.addAll(definitionKeys); return StringUtils.collectionToDelimitedString(keyList,".","",""); } private Method getMethod(JoinPoint joinPoint) { MethodSignature signature = (MethodSignature) joinPoint.getSignature(); Method method = signature.getMethod(); if (method.getDeclaringClass().isInterface()) { try { method = joinPoint.getTarget().getClass().getDeclaredMethod(signature.getName(), method.getParameterTypes()); } catch (Exception e) { log.error(null,e); } } return method; } private List<String> getSpelDefinitionKey(String[] definitionKeys, Method method, Object[] parameterValues) { List<String> definitionKeyList = new ArrayList<>(); for (String definitionKey : definitionKeys) { if (definitionKey != null && !definitionKey.isEmpty()) { EvaluationContext context = new MethodBasedEvaluationContext(null, method, parameterValues, discoverer); String key = parser.parseExpression(definitionKey).getValue(context).toString(); definitionKeyList.add(key); } } return definitionKeyList; } }
RateLimiterException.java
package com.alian.redissonLimit.exception;
public class RateLimiterException extends RuntimeException {
public RateLimiterException(String message) {
super(message);
}
}
自定义异常类,也没啥好说的,下面就是全局异常,为了省篇幅没有把所有的异常都列出来,小伙伴可以自行添加,主要是对我们RateLimiterException 进行处理。
GlobalExceptionHandler.java
package com.alian.redissonLimit.exception; import com.alian.redissonLimit.dto.ApiResponseDto; import lombok.extern.slf4j.Slf4j; import org.springframework.http.HttpStatus; import org.springframework.stereotype.Component; import org.springframework.web.HttpRequestMethodNotSupportedException; import org.springframework.web.bind.MissingServletRequestParameterException; import org.springframework.web.bind.annotation.ControllerAdvice; import org.springframework.web.bind.annotation.ExceptionHandler; import org.springframework.web.bind.annotation.ResponseBody; import org.springframework.web.bind.annotation.ResponseStatus; import javax.servlet.http.HttpServletRequest; @Slf4j @Component @ControllerAdvice public class GlobalExceptionHandler { @ExceptionHandler @ResponseBody @ResponseStatus(HttpStatus.OK) public ApiResponseDto<?> handle(HttpRequestMethodNotSupportedException exception, HttpServletRequest request) { return logWarn(request.getRequestURI() + " " + exception.getMessage(), null, ApiResponseDto.errRequestMethod("请求方法错误")); } @ExceptionHandler @ResponseBody @ResponseStatus(HttpStatus.OK) public ApiResponseDto handle(MissingServletRequestParameterException exception) { return logWarn(exception.getMessage(), null, ApiResponseDto.errParam("参数错误")); } @ExceptionHandler @ResponseBody @ResponseStatus(HttpStatus.OK) public ApiResponseDto handle(RateLimiterException exception) { return ApiResponseDto.fail(exception.getMessage()); } @ExceptionHandler @ResponseBody @ResponseStatus(HttpStatus.OK) public ApiResponseDto handle(Exception exception) { log.info("异常类:{}", exception.getClass().getCanonicalName()); return logError(null, exception, ApiResponseDto.exception("系统异常")); } private static ApiResponseDto logWarn(String msg, Exception e, ApiResponseDto responseDto) { long timestamp = responseDto.getTimestamp(); String m = "timestamp is " + timestamp; if (msg != null) { m += ", " + msg; } if (e == null) { log.warn(m); } else { log.warn(m, e); } return responseDto; } private static ApiResponseDto logError(String msg, Exception e, ApiResponseDto responseDto) { long timestamp = responseDto.getTimestamp(); String m = "timestamp is " + timestamp; if (msg != null) { m += ", " + msg; } log.error(m, e); return responseDto; } }
对应的统一返回封装如下:
ApiResponseDto.java
package com.alian.redissonLimit.dto; import lombok.*; import lombok.experimental.Accessors; @Setter @Getter @Accessors(chain = true) @NoArgsConstructor @AllArgsConstructor @ToString(exclude = "content") public class ApiResponseDto<T> { /** 成功 */ public static String CODE_SUCCESS="0000"; /** 失败 */ public static String CODE_FAIL="1000"; /** 系统异常 */ public static String CODE_EXCEPTION="1001"; /** 签名错误 */ public static String CODE_ERR_SIGN="1002"; /** 参数错误 */ public static String CODE_ERR_PARAM="1003"; /** 业务异常 */ public static String CODE_BIZ_ERR="1004"; /** 查询无数据,使用明确的参数(如id)进行查询时未找到记录时返回此错误码 */ public static String CODE_NO_DATA="1005"; /** 错误的请求方法 */ public static String CODE_ERR_REQUEST_METHOD="1006"; /** 错误的请求内容类型 */ public static String CODE_ERR_CONTENT_TYPE="1007"; /** 系统繁忙 */ public static String CODE_SYS_BUSY="1008"; /** 显示提示 */ public static String CODE_SHOW_TIP="1009"; /** 根据bizCode进行处理 */ public static String CODE_DEAL_BIZ_CODE="1012"; /** 未找到请求 */ public static String CODE_NOT_FOUND_CODE="1013"; public final static ApiResponseDto SUCCESS=new ApiResponseDto(); private String code =CODE_SUCCESS; /** 状态说明 */ private String msg ="success"; /** 请求是否成功 */ public boolean isSuccess(){ return CODE_SUCCESS.equals(code); } /** 结果内容 */ private T content; /** 时间戳 */ private long timestamp=System.currentTimeMillis(); /** 业务状态码,由业务接口定义 */ private String bizCode; /** 业务状态说明 */ private String bizMsg; public ApiResponseDto(T content) { this.content=content; } public static <T> ApiResponseDto<T> success(){ return SUCCESS; } public static <T> ApiResponseDto<T> success(T content){ return new ApiResponseDto<T>(content); } public static <T> ApiResponseDto<T> fail(String msg){ ApiResponseDto<T> response = new ApiResponseDto<>(); response.setCode(CODE_FAIL); response.setMsg(msg); return response; } public static <T> ApiResponseDto<T> exception(String msg){ ApiResponseDto<T> response = new ApiResponseDto<>(); response.setCode(CODE_EXCEPTION); response.setMsg(msg); return response; } public static <T> ApiResponseDto<T> errSign(String msg){ ApiResponseDto<T> response = new ApiResponseDto<>(); response.setCode(CODE_ERR_SIGN); response.setMsg(msg); return response; } public static <T> ApiResponseDto<T> errParam(String msg){ ApiResponseDto<T> response = new ApiResponseDto<>(); response.setCode(CODE_ERR_PARAM); response.setMsg(msg); return response; } public static <T> ApiResponseDto<T> bizErr(String msg){ ApiResponseDto<T> response = new ApiResponseDto<>(); response.setCode(CODE_BIZ_ERR); response.setMsg(msg); return response; } public static <T> ApiResponseDto<T> notFound(String msg){ ApiResponseDto<T> response = new ApiResponseDto<>(); response.setCode(CODE_NOT_FOUND_CODE); response.setMsg(msg); return response; } public static <T> ApiResponseDto<T> noData(String msg){ ApiResponseDto<T> response = new ApiResponseDto<>(); response.setCode(CODE_NO_DATA); response.setMsg(msg); return response; } public static <T> ApiResponseDto<T> errRequestMethod(String msg){ ApiResponseDto<T> response = new ApiResponseDto<>(); response.setCode(CODE_ERR_REQUEST_METHOD); response.setMsg(msg); return response; } public static <T> ApiResponseDto<T> errContentType(){ ApiResponseDto<T> response = new ApiResponseDto<>(); response.setCode(CODE_ERR_CONTENT_TYPE); response.setMsg("错误的请求内容类型"); return response; } public static <T> ApiResponseDto<T> sysBusy(){ ApiResponseDto<T> response = new ApiResponseDto<>(); response.setCode(CODE_SYS_BUSY); response.setMsg("系统繁忙"); return response; } public static <T> ApiResponseDto<T> showTip(String tip){ ApiResponseDto<T> response = new ApiResponseDto<>(); response.setCode(CODE_SHOW_TIP); response.setMsg(tip); return response; } public ApiResponseDto<T> bizInfo(String bizCode,String bizMsg){ this.code=bizCode; this.msg=bizMsg; return this; } public static <T> ApiResponseDto<T> dealBizCode(String bizCode,String bizMsg,T content){ ApiResponseDto<T> response = new ApiResponseDto<>(content); response.setCode(CODE_DEAL_BIZ_CODE); response.setMsg("根据bizCode进行处理"); response.setBizCode(bizCode); response.setBizMsg(bizMsg); return response; } }
UserController.java
package com.alian.redissonLimit.controller; import com.alian.redissonLimit.annotate.RedissonRateLimiter; import com.alian.redissonLimit.annotate.RedissonRateLimiters; import com.alian.redissonLimit.dto.ApiResponseDto; import com.alian.redissonLimit.dto.UserDto; import lombok.extern.slf4j.Slf4j; import org.redisson.api.RateType; import org.springframework.web.bind.annotation.PathVariable; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import java.time.LocalDateTime; import java.util.HashMap; import java.util.Map; @Slf4j @RequestMapping("/user") @RestController public class UserController { private static Map<String, UserDto> map = new HashMap<String, UserDto>() {{ put("BAT001", new UserDto("BAT001", "梁南生", 27, "研发部", 18000.0, LocalDateTime.of(2020, 5, 20, 9, 0, 0))); put("BAT002", new UserDto("BAT002", "包雅馨", 25, "财务部", 8800.0, LocalDateTime.of(2016, 11, 10, 8, 30, 0))); put("BAT003", new UserDto("BAT003", "罗考聪", 35, "测试部", 6400.0, LocalDateTime.of(2017, 3, 20, 14, 0, 0))); }}; @RedissonRateLimiters(value = { @RedissonRateLimiter(keys = {"#id"}, mode = RateType.OVERALL, rate = 1, rateInterval = 1, showPromptMsg = "您查询太快了,请稍后再试"), @RedissonRateLimiter(mode = RateType.OVERALL, rate = 3, rateInterval = 1, showPromptMsg = "系统繁忙,请稍后再试") }) @RequestMapping("/findById/{id}") public ApiResponseDto<UserDto> findById(@PathVariable("id") String id) { UserDto userDto = map.getOrDefault(id, null); if (userDto != null) { return ApiResponseDto.success(userDto); } return ApiResponseDto.noData("未查询到数据"); } }
简单模拟根据用户编号查询用户的接口,关键是我们使用注解@RateLimiter 的方法可以做限流,看是否能达到我们的要求。这里有两层意思:
虽说和我们上一篇的设计是一样的,但是得到的结果可能就不一样了,具体的我们来看看测试结果,然后了解下为啥可能会不一样。
写个简单的单元测试方法。
测试方法
package com.alian.redissonLimit; import com.alian.redissonLimit.controller.UserController; import com.alian.redissonLimit.dto.ApiResponseDto; import com.alian.redissonLimit.dto.UserDto; import lombok.extern.slf4j.Slf4j; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; import java.util.concurrent.CountDownLatch; @Slf4j @RunWith(SpringJUnit4ClassRunner.class) @SpringBootTest public class RedisLimitTest { @Autowired private UserController userController; @Test public void singleUserRequest() { final CountDownLatch countDownLatch = new CountDownLatch(5); for (int i = 0; i < 5; i++) { new Thread(() -> { try { // 使当前线程在锁存器倒计数至零之前一直等待,除非线程被中断或超出了指定的等待时间。如果当前计数为零,则此方法立刻返回true值 countDownLatch.await(); //获得锁 ApiResponseDto<UserDto> responseDto = userController.findById("BAT001"); log.info("线程【{}】执行完,结果信息:{}", Thread.currentThread().getName(), responseDto.getMsg()); } catch (InterruptedException e) { e.printStackTrace(); } }, "Thread" + i).start(); // 递减锁存器的计数,如果计数到达零,则释放所有等待的线程。如果当前计数大于零,则将计数减少. countDownLatch.countDown(); } try { Thread.sleep(10000); } catch (InterruptedException e) { e.printStackTrace(); } } }
单用户后台结果1:
11:39:17 953 INFO [Thread3]:【单用户】请求,线程:【Thread3】,获取令牌的结果:true
11:39:17 953 INFO [Thread3]:【多用户】请求,线程:【Thread3】,获取令牌的结果:true
11:39:17 963 INFO [Thread3]:线程【Thread3】执行完,结果信息:success
11:39:18 969 INFO [Thread2]:【单用户】请求,线程:【Thread2】,获取令牌的结果:false
11:39:18 969 INFO [Thread1]:【单用户】请求,线程:【Thread1】,获取令牌的结果:false
11:39:18 969 INFO [Thread4]:【单用户】请求,线程:【Thread4】,获取令牌的结果:false
11:39:18 969 INFO [Thread0]:【单用户】请求,线程:【Thread0】,获取令牌的结果:false
11:39:18 969 ERROR [Thread1]:限流模式:OVERALL; 限流数量:1; 限流时间间隔:1
11:39:18 969 ERROR [Thread4]:限流模式:OVERALL; 限流数量:1; 限流时间间隔:1
11:39:18 969 ERROR [Thread2]:限流模式:OVERALL; 限流数量:1; 限流时间间隔:1
11:39:18 969 ERROR [Thread0]:限流模式:OVERALL; 限流数量:1; 限流时间间隔:1
从上面的结果我们可以看到同时1秒内同时接收5个请求,只有1个请求拿到了令牌,我们之前就说了,可能结果还能不同,我们这里的间隔是1秒钟,但是如果我们把获取令牌的时间改成2秒呢?
boolean tryAcquire = limiter.tryAcquire(1, redissonRateLimiter.rateInterval(), TimeUnit.SECONDS);
如果改成
boolean tryAcquire = limiter.tryAcquire(1, 2, TimeUnit.SECONDS);
单用户后台结果2:
13:19:43 617 INFO [Thread4]:【单用户】请求,线程:【Thread4】,获取令牌的结果:true
13:19:43 623 INFO [Thread4]:线程【Thread4】执行完,结果信息:success
13:19:44 627 INFO [Thread3]:【单用户】请求,线程:【Thread3】,获取令牌的结果:true
13:19:44 627 INFO [Thread3]:线程【Thread3】执行完,结果信息:success
13:19:45 617 INFO [Thread1]:【单用户】请求,线程:【Thread1】,获取令牌的结果:false
13:19:45 617 INFO [Thread0]:【单用户】请求,线程:【Thread0】,获取令牌的结果:false
13:19:45 617 INFO [Thread2]:【单用户】请求,线程:【Thread2】,获取令牌的结果:false
13:19:45 617 ERROR [Thread1]:限流模式:OVERALL; 限流数量:1; 限流时间间隔:1
13:19:45 617 ERROR [Thread2]:限流模式:OVERALL; 限流数量:1; 限流时间间隔:1
13:19:45 617 ERROR [Thread0]:限流模式:OVERALL; 限流数量:1; 限流时间间隔:1
从这里我们可以看到单用户请求5次,最后有2个拿到了令牌,tryAcquire(1, 2, TimeUnit.SECONDS),这个意思就是在2秒内获取1个令牌即可,虽说第一秒只有1个令牌,但是到第二秒内又产生了1个令牌,所以5个请求,有2个请求拿到了令牌。其实这种也有一定的好处,就是先请求进来的,有一定的概率会分配到锁,也就是先到先得的概率大一点,类似在排队一样。
我们还是把上面的修改,改回去(tryAcquire(1, redissonRateLimiter.rateInterval(), TimeUnit.SECONDS))。
测试方法
package com.alian.redissonLimit; import com.alian.redissonLimit.controller.UserController; import com.alian.redissonLimit.dto.ApiResponseDto; import com.alian.redissonLimit.dto.UserDto; import lombok.extern.slf4j.Slf4j; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; import java.util.concurrent.CountDownLatch; @Slf4j @RunWith(SpringJUnit4ClassRunner.class) @SpringBootTest public class RedisLimitTest { @Autowired private UserController userController; @Test public void multiUserRequest() { final CountDownLatch countDownLatch = new CountDownLatch(5); for (int i = 1; i < 6; i++) { String id = "BAT00" + i; new Thread(() -> { try { // 使当前线程在锁存器倒计数至零之前一直等待,除非线程被中断或超出了指定的等待时间。如果当前计数为零,则此方法立刻返回true值 countDownLatch.await(); //获得锁 ApiResponseDto<UserDto> responseDto = userController.findById(id); log.info("线程【{}】执行完,结果信息:{}", Thread.currentThread().getName(), responseDto.getMsg()); } catch (InterruptedException e) { e.printStackTrace(); } }, "Thread" + i).start(); // 递减锁存器的计数,如果计数到达零,则释放所有等待的线程。如果当前计数大于零,则将计数减少. countDownLatch.countDown(); } try { Thread.sleep(10000); } catch (InterruptedException e) { e.printStackTrace(); } } }
我们测试方法是5个用户,5个线程同时请求我们的接口,所以单用户限流都是能拿到令牌的,5个请求都拿到了令牌,而接口限流是每秒3个令牌,所以有2个触发了限流。
单用户后台结果1:
13:38:02 763 INFO [Thread4]:【单用户】请求,线程:【Thread4】,获取令牌的结果:true
13:38:02 763 INFO [Thread1]:【单用户】请求,线程:【Thread1】,获取令牌的结果:true
13:38:02 763 INFO [Thread2]:【单用户】请求,线程:【Thread2】,获取令牌的结果:true
13:38:02 763 INFO [Thread5]:【单用户】请求,线程:【Thread5】,获取令牌的结果:true
13:38:02 763 INFO [Thread3]:【单用户】请求,线程:【Thread3】,获取令牌的结果:true
13:38:02 764 INFO [Thread5]:【多用户】请求,线程:【Thread5】,获取令牌的结果:true
13:38:02 764 INFO [Thread4]:【多用户】请求,线程:【Thread4】,获取令牌的结果:true
13:38:02 764 INFO [Thread2]:【多用户】请求,线程:【Thread2】,获取令牌的结果:true
13:38:02 768 INFO [Thread5]:线程【Thread5】执行完,结果信息:未查询到数据
13:38:02 768 INFO [Thread2]:线程【Thread2】执行完,结果信息:success
13:38:02 768 INFO [Thread4]:线程【Thread4】执行完,结果信息:未查询到数据
13:38:03 777 INFO [Thread1]:【多用户】请求,线程:【Thread1】,获取令牌的结果:false
13:38:03 777 INFO [Thread3]:【多用户】请求,线程:【Thread3】,获取令牌的结果:false
13:38:03 777 ERROR [Thread1]:限流模式:OVERALL; 限流数量:3; 限流时间间隔:1
13:38:03 777 ERROR [Thread3]:限流模式:OVERALL; 限流数量:3; 限流时间间隔:1
从结果上可以看出,先是用户限流执行,然后接口限流执行。其实这里还和我们的注解顺序有关系,如果我们把com.alian.redissonLimit.controller的 findById 方法上面的组合注解
@RedissonRateLimiters(value = {
@RedissonRateLimiter(keys = {"#id"}, mode = RateType.OVERALL, rate = 1, rateInterval = 1, showPromptMsg = "您查询太快了,请稍后再试"),
@RedissonRateLimiter(mode = RateType.OVERALL, rate = 3, rateInterval = 1, showPromptMsg = "系统繁忙,请稍后再试")
})
改成(注解顺序改变)
@RedissonRateLimiters(value = {
@RedissonRateLimiter(mode = RateType.OVERALL, rate = 3, rateInterval = 1, showPromptMsg = "系统繁忙,请稍后再试"),
@RedissonRateLimiter(keys = {"#id"}, mode = RateType.OVERALL, rate = 1, rateInterval = 1, showPromptMsg = "您查询太快了,请稍后再试")
})
单用户后台结果2:
13:40:18 786 INFO [Thread3]:【多用户】请求,线程:【Thread3】,获取令牌的结果:true
13:40:18 786 INFO [Thread5]:【多用户】请求,线程:【Thread5】,获取令牌的结果:true
13:40:18 786 INFO [Thread4]:【多用户】请求,线程:【Thread4】,获取令牌的结果:true
13:40:18 805 INFO [Thread4]:【单用户】请求,线程:【Thread4】,获取令牌的结果:true
13:40:18 805 INFO [Thread5]:【单用户】请求,线程:【Thread5】,获取令牌的结果:true
13:40:18 805 INFO [Thread3]:【单用户】请求,线程:【Thread3】,获取令牌的结果:true
13:40:18 807 INFO [Thread4]:线程【Thread4】执行完,结果信息:未查询到数据
13:40:18 807 INFO [Thread3]:线程【Thread3】执行完,结果信息:success
13:40:18 807 INFO [Thread5]:线程【Thread5】执行完,结果信息:未查询到数据
13:40:19 797 INFO [Thread2]:【多用户】请求,线程:【Thread2】,获取令牌的结果:false
13:40:19 797 INFO [Thread1]:【多用户】请求,线程:【Thread1】,获取令牌的结果:false
13:40:19 797 ERROR [Thread2]:限流模式:OVERALL; 限流数量:3; 限流时间间隔:1
13:40:19 797 ERROR [Thread1]:限流模式:OVERALL; 限流数量:3; 限流时间间隔:1
从结果上可以看出,先是接口限流执行,然后用户限流执行,和之前的执行顺序相比就是相反的了。
所以还是建议使用我之前介绍的采用Lua脚本:Spring Boot 整合Redis使用Lua脚本实现限流
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。