当前位置:   article > 正文

reids滑动窗口限流(踩坑记)_zrevrangebyscore +inf 限流

zrevrangebyscore +inf 限流

滑动窗口算法

        滑动窗口算法作为限流算法之一,运用非常广泛,最经典的运用当属于TCP协议的滑动窗口算法,加上拥塞控制,从而保证了高效的可靠传输,让世界得以互联(虽然我身处墙域网),让我等有口饭吃。

        说回正题,这里要讨论的仅仅是滑动窗口算法,生产上有这样的场景,业务上需要调用第三方,例如,我们需要批量发送短信,那么就需要将手机号批量发送到三方系统,但是由于三方系统在并发吞吐量上存在性能瓶颈,所以需要我们控制避免单位时间内送号过猛,压垮三方服务,所以在单位时间内(一分钟)我们需要限制最大手机号送号量,算法简图如下:

        因为是微服务分布式环境,同一服务存在多个节点,故而我们不能使用java内部的数据结构,需要借助于中间件,最容易想到的就是redis,这里我们讨论的也是如何使用redis时间滑动窗口限流。

        因为我们是按批发送手机号,因此最先想到的便是利用redis的list结构,在list中保存timestamp_num这样的数据结构,即每次发送的时间戳和数量,每次从左侧入队,那么一分钟内总共发送了多少只需要将一分钟内所有timestamp_num的num累加即可,因为要保证原子操作所以必然需要使用lua脚本的方式实现,脚本如下:

  1. local function myf(key, timestamp, window)
  2. local idx, curr = 0, 0
  3. while true do
  4. local entry = redis.call('lindex', key, idx)
  5. if (entry == nil or entry == false) then
  6. break
  7. end
  8. local entry_time_str, num_str = string.match(entry, "(%d+)_(%d+)");
  9. local entry_time = tonumber(entry_time_str);
  10. if (tonumber(timestamp) - tonumber(window) * 1000 < entry_time) then
  11. idx = idx + 1;
  12. curr = curr + tonumber(num_str);
  13. else
  14. return idx, curr
  15. end
  16. end
  17. end;
  18. local idx, curr = myf(KEYS[1], ARGV[1], ARGV[2])
  19. local result = math.min(tonumber(ARGV[3]), tonumber(ARGV[4]) - curr);
  20. if (result > 0) then
  21. local newElement = table.concat({ ARGV[1], '_', result });
  22. redis.pcall('lpush', KEYS[1], newElement);
  23. else
  24. result = 0
  25. end ;
  26. redis.call('ltrim', KEYS[1], 0, idx);
  27. redis.call('expire', KEYS[1], tonumber(ARGV[2]));
  28. return result;

此时我们用的时间戳是应用内部的时间戳,但是多个节点可能时间不同步,导致时间不一致,而且,可能导致list队列的时间不是单调递增,造成元素不是按时间的先后顺序,要达到顺序还需要从新排序,这无疑会增加脚本的复杂度,因此我们需要把获取时间放在脚本内部,用redis的服务器时间,从而脚本如下:

  1. local function myf(key, timestamp, window)
  2. local idx, curr = 0, 0
  3. while true do
  4. local entry = redis.call('lindex', key, idx)
  5. if (entry == nil or entry == false) then
  6. break
  7. end
  8. local entry_time_str, num_str = string.match(entry, "(%d+)_(%d+)");
  9. local entry_time = tonumber(entry_time_str);
  10. if (tonumber(timestamp) - tonumber(window) * 1000 < entry_time) then
  11. idx = idx + 1;
  12. curr = curr + tonumber(num_str);
  13. else
  14. return idx, curr
  15. end
  16. end
  17. end;
  18. local current_info = redis.call('time')
  19. -- 返回的是秒和微秒两个元素
  20. local curr_time = current_info[1] * 1000 + current_info[2] / 1000
  21. local idx, curr = myf(KEYS[1], curr_time, ARGV[2])
  22. local result = math.min(tonumber(ARGV[3]), tonumber(ARGV[4]) - curr);
  23. if (result > 0) then
  24. local newElement = table.concat({ curr_time, '_', result });
  25. redis.pcall('lpush', KEYS[1], newElement);
  26. else
  27. result = 0
  28. end ;
  29. redis.call('ltrim', KEYS[1], 0, idx);
  30. redis.call('expire', KEYS[1], tonumber(ARGV[2]));
  31. return result;

        这里我们看似完美实现了滑动窗口算法,但是呢,在生产中,我们的redis是集群部署,就会有主从节点之分,就会有主从复制,对于lua脚本,redis默认是采用传递脚本到从节点,想象以下,脚本里使用了redis.call(time)这样的命令,获取当前服务器时间,传递到从节点执行的时候这个时间肯定会不一样,那不就造成了主从节点数据不一致的情况么,设计redis的大叔为我们考虑到了这一点,所以,redis大叔设计的时候对于存在随机性的这种命令,再其后执行写操作,在集群环境下是不被允许的,会抛出如下异常:

Write commands not allowed after non deterministic commands

设计redis的大叔们也考虑到了这种直接拒绝执行的方式不太友好,所以在3.2开始又提供了另外一个命令,redis.replicate_commands() 

在脚本第一行执行这个函数,Redis会将修改数据的命令收集起来,然后用MULTI/EXEC包裹起来,这种方式称为script effects replication,这个类似于mysql中的基于行的复制模式,将非纯函数的值计算出来,用来持久化和主从复制。

 从而我们的脚本再次进化为:

  1. local function myf(key, timestamp, window)
  2. local idx, curr = 0, 0
  3. while true do
  4. local entry = redis.call('lindex', key, idx)
  5. if (entry == nil or entry == false) then
  6. break
  7. end
  8. local entry_time_str, num_str = string.match(entry, "(%d+)_(%d+)");
  9. local entry_time = tonumber(entry_time_str);
  10. if (tonumber(timestamp) - tonumber(window) * 1000 < entry_time) then
  11. idx = idx + 1;
  12. curr = curr + tonumber(num_str);
  13. else
  14. return idx, curr
  15. end
  16. end
  17. end;
  18. -- 告诉redis以redis命令的形式实现主从复制
  19. redis.replicate_commands()
  20. local current_info = redis.call('time')
  21. -- 返回的是秒和微秒两个元素
  22. local curr_time = current_info[1] * 1000 + current_info[2] / 1000
  23. local idx, curr = myf(KEYS[1], curr_time, ARGV[2])
  24. local result = math.min(tonumber(ARGV[3]), tonumber(ARGV[4]) - curr);
  25. if (result > 0) then
  26. local newElement = table.concat({ curr_time, '_', result });
  27. redis.pcall('lpush', KEYS[1], newElement);
  28. else
  29. result = 0
  30. end ;
  31. redis.call('ltrim', KEYS[1], 0, idx);
  32. redis.call('expire', KEYS[1], tonumber(ARGV[2]));
  33. return result;

        遗憾的是我们的生产环境依然使用的redis 3.0.5版本,对于这个新特性不支持,所以我们的时间戳参数只能从客户端传入,前文说了,外部传入时间戳不经有时间不一致问题,还有造成队列不是按时间单调递增的,要保持单调递增又得在脚本总增加定位排序逻辑,无疑增加复杂度。

        退而求其次,我们转向了使用zset这种自带排序功能的数据结构,同时获取时间戳也是都从redis服务端获取,单独调用redis的time命令,来保证多个节点使用同一个时间生成器。最终我们的限流脚本就是这样:

  1. local max = tonumber(ARGV[1])
  2. local max_entry = redis.call('zrevrange', KEYS[1], 0, 0, 'withscores')
  3. if (max_entry ~= nil and table.getn(max_entry) > 0 and max_entry[2] > max) then
  4. max = max_entry[2]
  5. end
  6. local min = max - tonumber(ARGV[2]) * 1000 - 1
  7. local members = redis.call('zrevrangebyscore', KEYS[1], max, min)
  8. local curr, len = 0, 0
  9. if (members ~= nil) then
  10. len = table.getn(members)
  11. if len > 0 then
  12. for i = 1, len do
  13. local num_str = string.match(members[i], "%d+_(%d+)");
  14. curr = curr + tonumber(num_str)
  15. end
  16. end
  17. end
  18. if (min > tonumber(ARGV[1])) then
  19. return table.concat({ 0, '_', len });
  20. end
  21. local result = math.min(tonumber(ARGV[3]), tonumber(ARGV[4]) - curr);
  22. if (result > 0) then
  23. local newElement = table.concat({ ARGV[1], '_', result });
  24. redis.pcall('zadd', KEYS[1], max, newElement);
  25. len = len + 1
  26. end ;
  27. redis.call('zremrangebyscore', KEYS[1], 0, min);
  28. return table.concat({ result, '_', len });

        至此,鉴于redis版本较低,升级redis版本成本太大,我们实现的滑动窗口限流算法依然不完美,但是也算基本能满足业务需求,故而在以后的环境搭建中最好还是使用较新版本的为好。这也是整个踩坑填坑的心路历程。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/很楠不爱3/article/detail/453799
推荐阅读
相关标签
  

闽ICP备14008679号