赞
踩
在高并发系统中我们通常需要考虑当请求量过大时,如果进行限流、降级,这里我们讨论下常用的限流方案,最后给出合理的实例
计数器法是实现起来最简单的一种算法。其思路是,比如比如我们规定某个接口在一分钟之内只能处理100个请求,那么每次有请求进来的时候我们按每分钟进行计数,当请求大于100个的时候就拒绝请求,如果到了第二分钟则重新从0开始计数,代码示例如下
//固定map大小为5,超出最大数量时抛弃较早的元素 static Map<String, AtomicInteger> cache = Collections.synchronizedMap(new LinkedHashMap<String, AtomicInteger>() { @Override protected boolean removeEldestEntry(final Map.Entry eldest) { return size() > 5; } }); public static boolean hasGrant() { String currentMinute = LocalDateTime.now().format(DateTimeFormatter.ofPattern("HH:mm")); AtomicInteger integer = cache.get(currentMinute); if (integer == null) { integer = new AtomicInteger(); cache.put(currentMinute,integer); }else{ integer.incrementAndGet(); } if(integer.get() > 100){ return false; }else{ return true; } }
这样实现虽然简单,但有临界的问题,比如在某分钟的最后一秒、及下一份的第一秒瞬时有大量的请求,此时系统请求量会突然暴增,超过我们限制的每分钟100个请求的初衷,因此有了第二种方案
在滑动窗口法中,我们对时间进行跟细力度的划分,如把一分钟分为6个时间窗口,每格代表10s,每隔各自都有自己独立的计数器,然后每隔10s窗口会滑动一格,这样就解决了临界的问题
由此可见,我们对每隔时间窗口划分的越精细,限流的统计就会越精确,且计数器法是滑动窗口的一种只有一个窗口的实现
代码代码示例如下:
public class SlidingWindowLimiter { //循环队列,就是装多个窗口用,数量是windowSize的2倍 private AtomicInteger[] timeSlices; //队列的总长度 private int timeSliceSize; //每个时间片的时长,以毫秒为单位 private int timeMillisPerSlice; //共有多少个时间片(即窗口长度 private int windowSize; //在一个完整窗口期内允许通过的最大阈值 private int threshold; //该滑窗的起始创建时间,也就是第一个数据 private long beginTimestamp; //最后一个数据的时间戳 private long lastAddTimestamp; public SlidingWindowLimiter(int timeMillisPerSlice, int windowSize, int threshold) { this.timeMillisPerSlice = timeMillisPerSlice; this.windowSize = windowSize; this.threshold = threshold; // 保证存储在至少两个window,也就是让窗口滑动起来 this.timeSliceSize = windowSize * 2; reset(); } private void reset() { beginTimestamp = System.currentTimeMillis(); //窗口个数 AtomicInteger[] localTimeSlices = new AtomicInteger[timeSliceSize]; for (int i = 0; i < timeSliceSize; i++) { localTimeSlices[i] = new AtomicInteger(0); } timeSlices = localTimeSlices; } private void clearFromIndex(int index) { for (int i = 1; i <= windowSize; i++) { int j = index + i; if (j >= windowSize * 2) { j -= windowSize * 2; } timeSlices[j].set(0); } } //计算时间窗口的索引 private int locationIndex() { long now = System.currentTimeMillis(); //如果当前的key已经超出一整个时间片了,那么就直接初始化就行了,不用去计算了 if (now - lastAddTimestamp > timeMillisPerSlice * windowSize) { reset(); } return (int) (((now - beginTimestamp) / timeMillisPerSlice) % timeSliceSize); } public boolean hasGrant() { int index = locationIndex(); //然后清空自己前面windowSize到2*windowSize之间的数据格的数据, //如当前index为5时,就清空6、7、8、1。然后把2、3、4、5的加起来就是该窗口内的总和 clearFromIndex(index); int sum = 0; // 在当前时间片里继续+1 sum += timeSlices[index].incrementAndGet(); //加上前面几个时间片 for (int i = 1; i < windowSize; i++) { sum += timeSlices[(index - i + timeSliceSize) % timeSliceSize].get(); } lastAddTimestamp = System.currentTimeMillis(); return sum >= threshold; } private void print() { for (AtomicInteger integer : timeSlices) { System.out.print(integer + "-"); } } public static void main(String[] args) { //1秒一个时间片,窗口共6个,即6秒允许8个请求 SlidingWindowLimiter limiter = new SlidingWindowLimiter(1000, 6, 8); for (int i = 0; i < 100; i++) { System.out.println(limiter.hasGrant()); limiter.print(); System.out.println("--------------------------"); try { TimeUnit.MILLISECONDS.sleep(300); } catch (InterruptedException e) { e.printStackTrace(); } } } }
漏桶算法的思路就是有一个固定的桶,桶里面的水会以固定的速度流出去;当流入的水(即请求)没有超过桶的容量时,请求正常,当桶满了即被限流
漏桶算法限制了请求的速度,会让一个借口匀速的处理请求,所以不会有临界的情况,适用于接口处理速度有限的情况
代码示例如下
//桶容量,也就是说当处理速度有限时,我们能够接受多少请求,大于这个值的请求则直接拒绝 public AtomicInteger capacity = new AtomicInteger(10); //当前缓存的请求数量 public AtomicInteger water = new AtomicInteger(0); //请求处理速度,结合下方 10/1000,可以理解为1秒允许10个请求 public int rate = 10; //最后一次请求时间 public long lastTime = System.currentTimeMillis(); public boolean hasGrant() { long now = System.currentTimeMillis(); //计算当前水量 water = new AtomicInteger(Math.max(0, (int) (water.get() - (now - lastTime) * rate / 1000))); lastTime = now; if (capacity.get() - water.get() < 1) { // 若桶满,则拒绝 return false; } else { // 还有容量 water.incrementAndGet(); return true; } }
令牌桶算法的思路是,首先我们有一个固定容量的桶用于存放令牌(token),一开始桶是空的,token会以固定的速度往桶内填充,知道桶满了,多余的token会被丢弃;有请求时会尝试从桶内获取token,获取失败则被拒绝
这种算法稍微有点复杂,实现可以参考guava包中的RateLimiter类的实现
上面介绍了集中常用的算法,下面讲一下在实际项目中,我们如何做限流降级
首先定义一个注解
@Target({ElementType.METHOD, ElementType.TYPE}) @Retention(RetentionPolicy.RUNTIME) public @interface RateLimiterAnno { /** * 每秒允许的请求数量 * @return */ double permitsPerSecond() default 30; /** * 超时时间,单位毫秒 * 如果N秒内不能成功访问,则进入失败逻辑 * @return */ long timeout() default 2L; }
此注解可以加在我们想要限流的方法上,如
@RateLimiterAnno(permitsPerSecond = 0.1)
@RequestMapping(value = "/adinfo/bonusRule")
public List bonusRule() {
//do something
return new ArrayList();
}
然后我们定义一个切面用于限流
@Slf4j @Aspect @Component public class RateLimiterAdvice { private Map<String, RateLimiter> rateLimiterMap = new ConcurrentHashMap<>(); @Around(value = "execution(* com.kevindai.core.controller.*Controller.*(..))") public Object rateLimit(ProceedingJoinPoint joinPoint) throws Throwable { MethodSignature methodSignature = (MethodSignature) joinPoint.getSignature(); Method method = methodSignature.getMethod(); RateLimiterAnno limiter = method.getAnnotation(RateLimiterAnno.class); if (limiter != null) { RequestAttributes ra = RequestContextHolder.getRequestAttributes(); ServletRequestAttributes sra = (ServletRequestAttributes) ra; HttpServletRequest request = sra.getRequest(); String requestURI = request.getRequestURI(); String contentType = request.getContentType(); String path = requestURI + "_" + contentType; RateLimiter rateLimiter = null; if (rateLimiterMap.containsKey(path)) { rateLimiter = rateLimiterMap.get(path); } else { rateLimiter = RateLimiter.create(limiter.permitsPerSecond()); rateLimiterMap.put(path, rateLimiter); } boolean tryAcquire = rateLimiter.tryAcquire(limiter.timeout(), TimeUnit.MILLISECONDS); if (tryAcquire) { return joinPoint.proceed(); } else { log.warn("method {} cann't accept so many request", requestURI); throw new SystemBussyException(ErrorCode.SYSTEM_BUSY); } } return joinPoint.proceed(); } }
这样就完成了限流的功能了
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。