- local function myf(key, timestamp, window)
- local idx, curr = 0, 0
- while true do
- local entry = redis.call('lindex', key, idx)
- if (entry == nil or entry == false) then
- break
- end
- local entry_time_str, num_str = string.match(entry, "(%d+)_(%d+)");
- local entry_time = tonumber(entry_time_str);
- if (tonumber(timestamp) - tonumber(window) * 1000 < entry_time) then
- idx = idx + 1;
- curr = curr + tonumber(num_str);
- else
- return idx, curr
- end
- end
- end;
- local idx, curr = myf(KEYS[1], ARGV[1], ARGV[2])
- local result = math.min(tonumber(ARGV[3]), tonumber(ARGV[4]) - curr);
- if (result > 0) then
- local newElement = table.concat({ ARGV[1], '_', result });
- redis.pcall('lpush', KEYS[1], newElement);
- else
- result = 0
- end ;
- redis.call('ltrim', KEYS[1], 0, idx);
- redis.call('expire', KEYS[1], tonumber(ARGV[2]));
- return result;

- local function myf(key, timestamp, window)
- local idx, curr = 0, 0
- while true do
- local entry = redis.call('lindex', key, idx)
- if (entry == nil or entry == false) then
- break
- end
- local entry_time_str, num_str = string.match(entry, "(%d+)_(%d+)");
- local entry_time = tonumber(entry_time_str);
- if (tonumber(timestamp) - tonumber(window) * 1000 < entry_time) then
- idx = idx + 1;
- curr = curr + tonumber(num_str);
- else
- return idx, curr
- end
- end
- end;
- local current_info = redis.call('time')
- -- 返回的是秒和微秒两个元素
- local curr_time = current_info[1] * 1000 + current_info[2] / 1000
- local idx, curr = myf(KEYS[1], curr_time, ARGV[2])
- local result = math.min(tonumber(ARGV[3]), tonumber(ARGV[4]) - curr);
- if (result > 0) then
- local newElement = table.concat({ curr_time, '_', result });
- redis.pcall('lpush', KEYS[1], newElement);
- else
- result = 0
- end ;
- redis.call('ltrim', KEYS[1], 0, idx);
- redis.call('expire', KEYS[1], tonumber(ARGV[2]));
- return result;

Write commands not allowed after non deterministic commands
在脚本第一行执行这个函数,Redis会将修改数据的命令收集起来,然后用MULTI/EXEC包裹起来,这种方式称为script effects replication,这个类似于mysql中的基于行的复制模式,将非纯函数的值计算出来,用来持久化和主从复制。
- local function myf(key, timestamp, window)
- local idx, curr = 0, 0
- while true do
- local entry = redis.call('lindex', key, idx)
- if (entry == nil or entry == false) then
- break
- end
- local entry_time_str, num_str = string.match(entry, "(%d+)_(%d+)");
- local entry_time = tonumber(entry_time_str);
- if (tonumber(timestamp) - tonumber(window) * 1000 < entry_time) then
- idx = idx + 1;
- curr = curr + tonumber(num_str);
- else
- return idx, curr
- end
- end
- end;
- -- 告诉redis以redis命令的形式实现主从复制
- redis.replicate_commands()
- local current_info = redis.call('time')
- -- 返回的是秒和微秒两个元素
- local curr_time = current_info[1] * 1000 + current_info[2] / 1000
- local idx, curr = myf(KEYS[1], curr_time, ARGV[2])
- local result = math.min(tonumber(ARGV[3]), tonumber(ARGV[4]) - curr);
- if (result > 0) then
- local newElement = table.concat({ curr_time, '_', result });
- redis.pcall('lpush', KEYS[1], newElement);
- else
- result = 0
- end ;
- redis.call('ltrim', KEYS[1], 0, idx);
- redis.call('expire', KEYS[1], tonumber(ARGV[2]));
- return result;

遗憾的是我们的生产环境依然使用的redis 3.0.5版本,对于这个新特性不支持,所以我们的时间戳参数只能从客户端传入,前文说了,外部传入时间戳不经有时间不一致问题,还有造成队列不是按时间单调递增的,要保持单调递增又得在脚本总增加定位排序逻辑,无疑增加复杂度。
- local max = tonumber(ARGV[1])
- local max_entry = redis.call('zrevrange', KEYS[1], 0, 0, 'withscores')
- if (max_entry ~= nil and table.getn(max_entry) > 0 and max_entry[2] > max) then
- max = max_entry[2]
- end
- local min = max - tonumber(ARGV[2]) * 1000 - 1
- local members = redis.call('zrevrangebyscore', KEYS[1], max, min)
- local curr, len = 0, 0
- if (members ~= nil) then
- len = table.getn(members)
- if len > 0 then
- for i = 1, len do
- local num_str = string.match(members[i], "%d+_(%d+)");
- curr = curr + tonumber(num_str)
- end
- end
- end
- if (min > tonumber(ARGV[1])) then
- return table.concat({ 0, '_', len });
- end
- local result = math.min(tonumber(ARGV[3]), tonumber(ARGV[4]) - curr);
- if (result > 0) then
- local newElement = table.concat({ ARGV[1], '_', result });
- redis.pcall('zadd', KEYS[1], max, newElement);
- len = len + 1
- end ;
- redis.call('zremrangebyscore', KEYS[1], 0, min);
- return table.concat({ result, '_', len });

