赞
踩
滑动窗口算法
滑动窗口算法作为限流算法之一,运用非常广泛,最经典的运用当属于TCP协议的滑动窗口算法,加上拥塞控制,从而保证了高效的可靠传输,让世界得以互联(虽然我身处墙域网),让我等有口饭吃。
说回正题,这里要讨论的仅仅是滑动窗口算法,生产上有这样的场景,业务上需要调用第三方,例如,我们需要批量发送短信,那么就需要将手机号批量发送到三方系统,但是由于三方系统在并发吞吐量上存在性能瓶颈,所以需要我们控制避免单位时间内送号过猛,压垮三方服务,所以在单位时间内(一分钟)我们需要限制最大手机号送号量,算法简图如下:
因为是微服务分布式环境,同一服务存在多个节点,故而我们不能使用java内部的数据结构,需要借助于中间件,最容易想到的就是redis,这里我们讨论的也是如何使用redis时间滑动窗口限流。
因为我们是按批发送手机号,因此最先想到的便是利用redis的list结构,在list中保存timestamp_num这样的数据结构,即每次发送的时间戳和数量,每次从左侧入队,那么一分钟内总共发送了多少只需要将一分钟内所有timestamp_num的num累加即可,因为要保证原子操作所以必然需要使用lua脚本的方式实现,脚本如下:
- local function myf(key, timestamp, window)
- local idx, curr = 0, 0
- while true do
- local entry = redis.call('lindex', key, idx)
- if (entry == nil or entry == false) then
- break
- end
- local entry_time_str, num_str = string.match(entry, "(%d+)_(%d+)");
- local entry_time = tonumber(entry_time_str);
- if (tonumber(timestamp) - tonumber(window) * 1000 < entry_time) then
- idx = idx + 1;
- curr = curr + tonumber(num_str);
- else
- return idx, curr
- end
- end
- end;
-
- local idx, curr = myf(KEYS[1], ARGV[1], ARGV[2])
- local result = math.min(tonumber(ARGV[3]), tonumber(ARGV[4]) - curr);
- if (result > 0) then
- local newElement = table.concat({ ARGV[1], '_', result });
- redis.pcall('lpush', KEYS[1], newElement);
- else
- result = 0
- end ;
- redis.call('ltrim', KEYS[1], 0, idx);
- redis.call('expire', KEYS[1], tonumber(ARGV[2]));
- return result;
此时我们用的时间戳是应用内部的时间戳,但是多个节点可能时间不同步,导致时间不一致,而且,可能导致list队列的时间不是单调递增,造成元素不是按时间的先后顺序,要达到顺序还需要从新排序,这无疑会增加脚本的复杂度,因此我们需要把获取时间放在脚本内部,用redis的服务器时间,从而脚本如下:
- local function myf(key, timestamp, window)
- local idx, curr = 0, 0
- while true do
- local entry = redis.call('lindex', key, idx)
- if (entry == nil or entry == false) then
- break
- end
- local entry_time_str, num_str = string.match(entry, "(%d+)_(%d+)");
- local entry_time = tonumber(entry_time_str);
- if (tonumber(timestamp) - tonumber(window) * 1000 < entry_time) then
- idx = idx + 1;
- curr = curr + tonumber(num_str);
- else
- return idx, curr
- end
- end
- end;
-
- local current_info = redis.call('time')
- -- 返回的是秒和微秒两个元素
- local curr_time = current_info[1] * 1000 + current_info[2] / 1000
- local idx, curr = myf(KEYS[1], curr_time, ARGV[2])
- local result = math.min(tonumber(ARGV[3]), tonumber(ARGV[4]) - curr);
- if (result > 0) then
- local newElement = table.concat({ curr_time, '_', result });
- redis.pcall('lpush', KEYS[1], newElement);
- else
- result = 0
- end ;
- redis.call('ltrim', KEYS[1], 0, idx);
- redis.call('expire', KEYS[1], tonumber(ARGV[2]));
- return result;
这里我们看似完美实现了滑动窗口算法,但是呢,在生产中,我们的redis是集群部署,就会有主从节点之分,就会有主从复制,对于lua脚本,redis默认是采用传递脚本到从节点,想象以下,脚本里使用了redis.call(time)这样的命令,获取当前服务器时间,传递到从节点执行的时候这个时间肯定会不一样,那不就造成了主从节点数据不一致的情况么,设计redis的大叔为我们考虑到了这一点,所以,redis大叔设计的时候对于存在随机性的这种命令,再其后执行写操作,在集群环境下是不被允许的,会抛出如下异常:
Write commands not allowed after non deterministic commands
设计redis的大叔们也考虑到了这种直接拒绝执行的方式不太友好,所以在3.2开始又提供了另外一个命令,redis.replicate_commands()
在脚本第一行执行这个函数,Redis会将修改数据的命令收集起来,然后用MULTI/EXEC包裹起来,这种方式称为script effects replication,这个类似于mysql中的基于行的复制模式,将非纯函数的值计算出来,用来持久化和主从复制。
从而我们的脚本再次进化为:
- local function myf(key, timestamp, window)
- local idx, curr = 0, 0
- while true do
- local entry = redis.call('lindex', key, idx)
- if (entry == nil or entry == false) then
- break
- end
- local entry_time_str, num_str = string.match(entry, "(%d+)_(%d+)");
- local entry_time = tonumber(entry_time_str);
- if (tonumber(timestamp) - tonumber(window) * 1000 < entry_time) then
- idx = idx + 1;
- curr = curr + tonumber(num_str);
- else
- return idx, curr
- end
- end
- end;
- -- 告诉redis以redis命令的形式实现主从复制
- redis.replicate_commands()
- local current_info = redis.call('time')
- -- 返回的是秒和微秒两个元素
- local curr_time = current_info[1] * 1000 + current_info[2] / 1000
- local idx, curr = myf(KEYS[1], curr_time, ARGV[2])
- local result = math.min(tonumber(ARGV[3]), tonumber(ARGV[4]) - curr);
- if (result > 0) then
- local newElement = table.concat({ curr_time, '_', result });
- redis.pcall('lpush', KEYS[1], newElement);
- else
- result = 0
- end ;
- redis.call('ltrim', KEYS[1], 0, idx);
- redis.call('expire', KEYS[1], tonumber(ARGV[2]));
- return result;
遗憾的是我们的生产环境依然使用的redis 3.0.5版本,对于这个新特性不支持,所以我们的时间戳参数只能从客户端传入,前文说了,外部传入时间戳不经有时间不一致问题,还有造成队列不是按时间单调递增的,要保持单调递增又得在脚本总增加定位排序逻辑,无疑增加复杂度。
退而求其次,我们转向了使用zset这种自带排序功能的数据结构,同时获取时间戳也是都从redis服务端获取,单独调用redis的time命令,来保证多个节点使用同一个时间生成器。最终我们的限流脚本就是这样:
- local max = tonumber(ARGV[1])
- local max_entry = redis.call('zrevrange', KEYS[1], 0, 0, 'withscores')
- if (max_entry ~= nil and table.getn(max_entry) > 0 and max_entry[2] > max) then
- max = max_entry[2]
- end
- local min = max - tonumber(ARGV[2]) * 1000 - 1
- local members = redis.call('zrevrangebyscore', KEYS[1], max, min)
- local curr, len = 0, 0
- if (members ~= nil) then
- len = table.getn(members)
- if len > 0 then
- for i = 1, len do
- local num_str = string.match(members[i], "%d+_(%d+)");
- curr = curr + tonumber(num_str)
- end
- end
- end
- if (min > tonumber(ARGV[1])) then
- return table.concat({ 0, '_', len });
- end
- local result = math.min(tonumber(ARGV[3]), tonumber(ARGV[4]) - curr);
- if (result > 0) then
- local newElement = table.concat({ ARGV[1], '_', result });
- redis.pcall('zadd', KEYS[1], max, newElement);
- len = len + 1
- end ;
- redis.call('zremrangebyscore', KEYS[1], 0, min);
- return table.concat({ result, '_', len });
至此,鉴于redis版本较低,升级redis版本成本太大,我们实现的滑动窗口限流算法依然不完美,但是也算基本能满足业务需求,故而在以后的环境搭建中最好还是使用较新版本的为好。这也是整个踩坑填坑的心路历程。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。