当前位置:   article > 正文

Java简单实现滑动窗口_java 滑动窗口算法

java 滑动窗口算法

由于最近有一个统计单位时间内某key的访问次数的需求,譬如每5秒访问了redis的某key超过100次,就取出该key单独处理。

这样的单位时间统计,很明显我们都知道有个边界问题,譬如5秒内100次的限制。刚好前4.99秒访问都是0,最后0.01秒来了100次,5.01秒又来了100次。也就是访问有明显的毛刺情况出现,为了弱化这个毛刺情况,我们可以采用滑动窗口

滑动窗口

滑动窗口的主要原理比较简单,就是将这个单位时间进行拆分,譬如5秒的统计范围,我们将它划分成5个1秒。

当请求进来时,先判断当前请求属于这5个1秒的时间片中的哪个,然后将对应的时间片对应的统计值加1,再判断当前加上前4个时间片的次数总和是否已经超过了设置的阈值。

当时间已经到达第6个时间片时,就把第一个时间片给干掉,因为无论第一片是多少个统计值,它都不会再参与后续的计算了。

就这样,随着时间的推移,统计值就随着各个时间片的滚动,不断地进行统计。

具体要将单位时间拆分为多少片,要根据实际情况来决定。当然,毫无疑问的是切分的越小,毛刺现象也越少。系统统计也越准确,随之就是内存占用会越大,因为你的这个窗口的数组会更大。

代码实现思路就是定义好分片数量,每个分片都有一个独立的计数器,所有的分片合计为一个数组。当请求来时,按照分片规则,判断请求应该划分到哪个分片中去。要判断是否超过阈值,就将前N个统计值相加,对比定义的阈值即可。

代码我直接引用别人写好的了,源代码在https://www.iteye.com/blog/go12345-1744728

  1. import java.util.concurrent.atomic.AtomicInteger;
  2. /**
  3. * 滑动窗口。该窗口同样的key,都是单线程计算。
  4. *
  5. * @author wuweifeng wrote on 2019-12-04.
  6. */
  7. public class SlidingWindow {
  8. /**
  9. * 循环队列,就是装多个窗口用,该数量是windowSize的2倍
  10. */
  11. private AtomicInteger[] timeSlices;
  12. /**
  13. * 队列的总长度
  14. */
  15. private int timeSliceSize;
  16. /**
  17. * 每个时间片的时长,以毫秒为单位
  18. */
  19. private int timeMillisPerSlice;
  20. /**
  21. * 共有多少个时间片(即窗口长度)
  22. */
  23. private int windowSize;
  24. /**
  25. * 在一个完整窗口期内允许通过的最大阈值
  26. */
  27. private int threshold;
  28. /**
  29. * 该滑窗的起始创建时间,也就是第一个数据
  30. */
  31. private long beginTimestamp;
  32. /**
  33. * 最后一个数据的时间戳
  34. */
  35. private long lastAddTimestamp;
  36. public static void main(String[] args) {
  37. //1秒一个时间片,窗口共5
  38. SlidingWindow window = new SlidingWindow(100, 4, 8);
  39. for (int i = 0; i < 100; i++) {
  40. System.out.println(window.addCount(2));
  41. window.print();
  42. System.out.println("--------------------------");
  43. try {
  44. Thread.sleep(102);
  45. } catch (InterruptedException e) {
  46. e.printStackTrace();
  47. }
  48. }
  49. }
  50. public SlidingWindow(int duration, int threshold) {
  51. //超过10分钟的按10分钟
  52. if (duration > 600) {
  53. duration = 600;
  54. }
  55. //要求5秒内探测出来的,
  56. if (duration <= 5) {
  57. this.windowSize = 5;
  58. this.timeMillisPerSlice = duration * 200;
  59. } else {
  60. this.windowSize = 10;
  61. this.timeMillisPerSlice = duration * 100;
  62. }
  63. this.threshold = threshold;
  64. // 保证存储在至少两个window
  65. this.timeSliceSize = windowSize * 2;
  66. reset();
  67. }
  68. public SlidingWindow(int timeMillisPerSlice, int windowSize, int threshold) {
  69. this.timeMillisPerSlice = timeMillisPerSlice;
  70. this.windowSize = windowSize;
  71. this.threshold = threshold;
  72. // 保证存储在至少两个window
  73. this.timeSliceSize = windowSize * 2;
  74. reset();
  75. }
  76. /**
  77. * 初始化
  78. */
  79. private void reset() {
  80. beginTimestamp = SystemClock.now();
  81. //窗口个数
  82. AtomicInteger[] localTimeSlices = new AtomicInteger[timeSliceSize];
  83. for (int i = 0; i < timeSliceSize; i++) {
  84. localTimeSlices[i] = new AtomicInteger(0);
  85. }
  86. timeSlices = localTimeSlices;
  87. }
  88. private void print() {
  89. for (AtomicInteger integer : timeSlices) {
  90. System.out.print(integer + "-");
  91. }
  92. }
  93. /**
  94. * 计算当前所在的时间片的位置
  95. */
  96. private int locationIndex() {
  97. long now = SystemClock.now();
  98. //如果当前的key已经超出一整个时间片了,那么就直接初始化就行了,不用去计算了
  99. if (now - lastAddTimestamp > timeMillisPerSlice * windowSize) {
  100. reset();
  101. }
  102. return (int) (((now - beginTimestamp) / timeMillisPerSlice) % timeSliceSize);
  103. }
  104. /**
  105. * 增加count个数量
  106. */
  107. public boolean addCount(int count) {
  108. //当前自己所在的位置,是哪个小时间窗
  109. int index = locationIndex();
  110. // System.out.println("index:" + index);
  111. //然后清空自己前面windowSize到2*windowSize之间的数据格的数据
  112. //譬如1秒分4个窗口,那么数组共计8个窗口
  113. //当前index5时,就清空6781。然后把2345的加起来就是该窗口内的总和
  114. clearFromIndex(index);
  115. int sum = 0;
  116. // 在当前时间片里继续+1
  117. sum += timeSlices[index].addAndGet(count);
  118. //加上前面几个时间片
  119. for (int i = 1; i < windowSize; i++) {
  120. sum += timeSlices[(index - i + timeSliceSize) % timeSliceSize].get();
  121. }
  122. System.out.println(sum + "---" + threshold);
  123. lastAddTimestamp = SystemClock.now();
  124. return sum >= threshold;
  125. }
  126. private void clearFromIndex(int index) {
  127. for (int i = 1; i <= windowSize; i++) {
  128. int j = index + i;
  129. if (j >= windowSize * 2) {
  130. j -= windowSize * 2;
  131. }
  132. timeSlices[j].set(0);
  133. }
  134. }
  135. }
  1. import java.util.concurrent.Executors;
  2. import java.util.concurrent.ScheduledExecutorService;
  3. import java.util.concurrent.ThreadFactory;
  4. import java.util.concurrent.TimeUnit;
  5. import java.util.concurrent.atomic.AtomicLong;
  6. /**
  7. * 用于解决高并发下System.currentTimeMillis卡顿
  8. * @author lry
  9. */
  10. public class SystemClock {
  11. private final int period;
  12. private final AtomicLong now;
  13. private static class InstanceHolder {
  14. private static final SystemClock INSTANCE = new SystemClock(1);
  15. }
  16. private SystemClock(int period) {
  17. this.period = period;
  18. this.now = new AtomicLong(System.currentTimeMillis());
  19. scheduleClockUpdating();
  20. }
  21. private static SystemClock instance() {
  22. return InstanceHolder.INSTANCE;
  23. }
  24. private void scheduleClockUpdating() {
  25. ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
  26. @Override
  27. public Thread newThread(Runnable runnable) {
  28. Thread thread = new Thread(runnable, "System Clock");
  29. thread.setDaemon(true);
  30. return thread;
  31. }
  32. });
  33. scheduler.scheduleAtFixedRate(() -> now.set(System.currentTimeMillis()), period, period, TimeUnit.MILLISECONDS);
  34. }
  35. private long currentTimeMillis() {
  36. return now.get();
  37. }
  38. /**
  39. * 用来替换原来的System.currentTimeMillis()
  40. */
  41. public static long now() {
  42. return instance().currentTimeMillis();
  43. }
  44. }

 

参照代码main方法,通过修改每个时间片的时间,窗口数量,阈值,来进行测试。

这就是简单实现了。

 

 

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/Cpp五条/article/detail/274711
推荐阅读
相关标签
  

闽ICP备14008679号