赞
踩
docker搭建的redis集群,docker宿主机ip:192.168.75.103
redis是三主三从,ip和端口如下,密码是123456
- 192.168.75.103:7001
- 192.168.75.103:7002
- 192.168.75.103:7003
- 192.168.75.103:7004
- 192.168.75.103:7005
- 192.168.75.103:7006 ```
openresty,是在宿主机直接安装,ip是:192.168.75.103,端口是:80
反向代理是nginx,使用docker安装,ip是:192.168.75.103,端口是:8080
调用顺序是,请求先到宿主机上的nginx,然后转到openresty,openresty中使用lua脚本,先调用redis,如果没有内容,再发送http请求到tomcat中。
nginx反向代理服务器的nginx.conf
worker_processes 1; events { worker_connections 1024; } http { include mime.types; default_type application/json; sendfile on; keepalive_timeout 65; server { listen 8080; server_name localhost; # 指定前端项目所在的位置 location / { #root html/hmdp; root html; index index.html index.htm; } error_page 500 502 503 504 /50x.html; location = /50x.html { root html; } location /api { proxy_pass http://nginx-cluster; } } upstream nginx-cluster { #**此处将请求转到了openresty中** server 192.168.75.103 max_fails=5 fail_timeout=10s weight=1; } }
#user nobody; worker_processes 1; error_log logs/error.log; events { worker_connections 1024; } http { include mime.types; default_type application/octet-stream; sendfile on; keepalive_timeout 65; #lua 模块 lua_package_path "/usr/local/openresty/lualib/?.lua;;"; #c模块 lua_package_cpath "/usr/local/openresty/lualib/?.so;;"; lua_shared_dict redis_cluster_slot_locks 100k; upstream tomcat-cluster { hash $request_uri; server 192.168.3.42:8081; server 192.168.3.42:8082; } server { listen 80; server_name localhost; location ~ /api/item/(\d+) { default_type application/json; content_by_lua_file lua/item.lua; } location /item { # 这里是windows电脑的ip和Java服务端口,需要确保windows防火墙处于关闭状态 proxy_pass http://tomcat-cluster; } location / { root html; index index.html index.htm; } error_page 500 502 503 504 /50x.html; location = /50x.html { root html; } } }
其中的关键内容为:
这是写lua脚本必须配置的
#lua 模块
lua_package_path "/usr/local/openresty/lualib/?.lua;;";
#c模块
lua_package_cpath "/usr/local/openresty/lualib/?.so;;";
lua_shared_dict redis_cluster_slot_locks 100k;
这是lua脚本的路径
location ~ /api/item/(\d+) {
default_type application/json;
content_by_lua_file lua/item.lua;
}
lua脚本,item.lua的路径为/usr/local/openresty/nginx/lua/item.lua
local common = require "common" local read_http = common.read_http local read_redis = common.read_redis local cjson = require('cjson') local id = ngx.var[1] local function read_data(key, path, params) local val = read_redis(key) if not val then ngx.log(ngx.ERR, "redis查询失败,尝试查询http, key: ", key) val = read_http(path, params) end return val end local itemJson = read_data('item:id:'..id,'/item/' .. id, nil) local stockJson = read_data('item:stock:id:'..id, '/item/stock/' ..id, nil) local item = cjson.decode(itemJson) local stock = cjson.decode(stockJson) item.stock = stock.stock item.sold = stock.sold ngx.say(cjson.encode(item))
common.lua,是公共的调用类,路径为:openresty/lualib/common.lua
-- 封装函数,发送http请求,并解析响应 local function read_http(path, params) local resp = ngx.location.capture(path,{ method = ngx.HTTP_GET, args = params, }) if not resp then -- 记录错误信息,返回404 ngx.log(ngx.ERR, "http请求查询失败, path: ", path , ", args: ", args) ngx.exit(404) end return resp.body end local config = { name = "testCluster", --rediscluster name serv_list = { --redis cluster node list(host and port), { ip = "192.168.75.103", port = 7001 }, { ip = "192.168.75.103", port = 7002 }, { ip = "192.168.75.103", port = 7003 }, { ip = "192.168.75.103", port = 7004 }, { ip = "192.168.75.103", port = 7005 }, { ip = "192.168.75.103", port = 7006 } }, keepalive_timeout = 60000, --redis connection pool idle timeout keepalive_cons = 1000, --redis connection pool size connect_timeout = 1000, --timeout while connecting read_timeout = 1000, --timeout while reading send_timeout = 1000, --timeout while sending max_redirection = 5, --maximum retry attempts for redirection, max_connection_attempts = 1, --maximum retry attempts for connection auth = "123456" --set password while setting auth } local redis_cluster = require "rresty.ediscluster" local function read_redis(key) local red = redis_cluster:new(config) local resp,err = red:get(key) -- 查询失败处理 if not resp then ngx.log(ngx.ERR, "查询Redis失败: ", err, ", key = " , key) end --得到的数据为空处理 if resp == ngx.null then resp = nil ngx.log(ngx.ERR, "查询Redis数据为空, key = ", key) end return resp end -- 将方法导出 local _M = { read_http = read_http, read_redis = read_redis } return _M
common中引用的是github上的一个开源项目,要将此项目中的两个文件rediscluster.lua、xmodem.lua,放到openresty的openresty/lualib/resty目录中,
开源项目地址为:redis访问集群github地址
放进去之后,重启openresty的nginx,发送请求,即可成功。
因为github,需要科学上网才能访问,所以在最后,留下这两个文件:
rediscluster.lua
local redis = require "resty.redis" local resty_lock = require "resty.lock" local xmodem = require "resty.xmodem" local setmetatable = setmetatable local tostring = tostring local string = string local type = type local table = table local ngx = ngx local math = math local rawget = rawget local pairs = pairs local unpack = unpack local ipairs = ipairs local tonumber = tonumber local match = string.match local char = string.char local table_insert = table.insert local string_find = string.find local redis_crc = xmodem.redis_crc local DEFAULT_SHARED_DICT_NAME = "redis_cluster_slot_locks" local DEFAULT_REFRESH_DICT_NAME = "refresh_lock" local DEFAULT_MAX_REDIRECTION = 5 local DEFAULT_MAX_CONNECTION_ATTEMPTS = 3 local DEFAULT_KEEPALIVE_TIMEOUT = 55000 local DEFAULT_KEEPALIVE_CONS = 1000 local DEFAULT_CONNECTION_TIMEOUT = 1000 local DEFAULT_SEND_TIMEOUT = 1000 local DEFAULT_READ_TIMEOUT = 1000 local function parse_key(key_str) local left_tag_single_index = string_find(key_str, "{", 0) local right_tag_single_index = string_find(key_str, "}", 0) if left_tag_single_index and right_tag_single_index then --parse hashtag return key_str.sub(key_str, left_tag_single_index + 1, right_tag_single_index - 1) else return key_str end end local _M = {} local mt = { __index = _M } local slot_cache = {} local master_nodes = {} local cmds_for_all_master = { ["flushall"] = true, ["flushdb"] = true } local cluster_invalid_cmds = { ["config"] = true, ["shutdown"] = true } local function redis_slot(str) return redis_crc(parse_key(str)) end local function check_auth(self, redis_client) if type(self.config.auth) == "string" then local count, err = redis_client:get_reused_times() if count == 0 then local _ _, err = redis_client:auth(self.config.auth) end if not err then return true, nil else return nil, err end else return true, nil end end local function release_connection(red, config) local ok,err = red:set_keepalive(config.keepalive_timeout or DEFAULT_KEEPALIVE_TIMEOUT, config.keepalive_cons or DEFAULT_KEEPALIVE_CONS) if not ok then ngx.log(ngx.ERR,"set keepalive failed:", err) end end local function split(s, delimiter) local result = {}; for m in (s..delimiter):gmatch("(.-)"..delimiter) do table_insert(result, m); end return result; end local function try_hosts_slots(self, serv_list) local start_time = ngx.now() local errors = {} local config = self.config if #serv_list < 1 then return nil, "failed to fetch slots, serv_list config is empty" end for i = 1, #serv_list do local ip = serv_list[i].ip local port = serv_list[i].port local redis_client = redis:new() local ok, err, max_connection_timeout_err redis_client:set_timeouts(config.connect_timeout or DEFAULT_CONNECTION_TIMEOUT, config.send_timeout or DEFAULT_SEND_TIMEOUT, config.read_timeout or DEFAULT_READ_TIMEOUT) --attempt to connect DEFAULT_MAX_CONNECTION_ATTEMPTS times to redis for k = 1, config.max_connection_attempts or DEFAULT_MAX_CONNECTION_ATTEMPTS do local total_connection_time_ms = (ngx.now() - start_time) * 1000 if (config.max_connection_timeout and total_connection_time_ms > config.max_connection_timeout) then max_connection_timeout_err = "max_connection_timeout of " .. config.max_connection_timeout .. "ms reached." ngx.log(ngx.ERR, max_connection_timeout_err) table_insert(errors, max_connection_timeout_err) break end ok, err = redis_client:connect(ip, port, self.config.connect_opts) if ok then break end if err then ngx.log(ngx.ERR,"unable to connect, attempt nr ", k, " : error: ", err) table_insert(errors, err) end end if ok then local _, autherr = check_auth(self, redis_client) if autherr then table_insert(errors, autherr) return nil, errors end local slots_info slots_info, err = redis_client:cluster("slots") if slots_info then local slots = {} -- while slots are updated, create a list of servers present in cluster -- this can differ from self.config.serv_list if a cluster is resized (added/removed nodes) local servers = { serv_list = {} } for n = 1, #slots_info do local sub_info = slots_info[n] --slot info item 1 and 2 are the subrange start end slots local start_slot, end_slot = sub_info[1], sub_info[2] -- generate new list of servers for j = 3, #sub_info do servers.serv_list[#servers.serv_list + 1 ] = { ip = sub_info[j][1], port = sub_info[j][2] } end for slot = start_slot, end_slot do local list = { serv_list = {} } --from 3, here lists the host/port/nodeid of in charge nodes for j = 3, #sub_info do list.serv_list[#list.serv_list + 1] = { ip = sub_info[j][1], port = sub_info[j][2] } slots[slot] = list end end end --ngx.log(ngx.NOTICE, "finished initializing slotcache...") slot_cache[self.config.name] = slots slot_cache[self.config.name .. "serv_list"] = servers else table_insert(errors, err) end -- cache master nodes local nodes_res, nerr = redis_client:cluster("nodes") if nodes_res then local nodes_info = split(nodes_res, char(10)) for _, node in ipairs(nodes_info) do local node_info = split(node, " ") if #node_info > 2 then local is_master = match(node_info[3], "master") ~= nil if is_master then local ip_port = split(split(node_info[2], "@")[1], ":") table_insert(master_nodes, { ip = ip_port[1], port = tonumber(ip_port[2]) }) end end end else table_insert(errors, nerr) end release_connection(redis_client, config) -- refresh of slots and master nodes successful -- not required to connect/iterate over additional hosts if nodes_res and slots_info then return true, nil end elseif max_connection_timeout_err then break else table_insert(errors, err) end if #errors == 0 then return true, nil end end return nil, errors end function _M.fetch_slots(self) local serv_list = self.config.serv_list local serv_list_cached = slot_cache[self.config.name .. "serv_list"] local serv_list_combined -- if a cached serv_list is present, start with that if serv_list_cached then serv_list_combined = serv_list_cached.serv_list -- then append the serv_list from config, in the event that the entire -- cached serv_list no longer points to anything usable for _, s in ipairs(serv_list) do table_insert(serv_list_combined, s) end else -- otherwise we bootstrap with our serv_list from config serv_list_combined = serv_list end serv_list_cached = nil -- important! local _, errors = try_hosts_slots(self, serv_list_combined) if errors then local err = "failed to fetch slots: " .. table.concat(errors, ";") ngx.log(ngx.ERR, err) return nil, err end end function _M.refresh_slots(self) local worker_id = ngx.worker.id() local lock, err, elapsed, ok lock, err = resty_lock:new(self.config.dict_name or DEFAULT_SHARED_DICT_NAME, {time_out = 0}) if not lock then ngx.log(ngx.ERR, "failed to create lock in refresh slot cache: ", err) return nil, err end local refresh_lock_key = (self.config.refresh_lock_key or DEFAULT_REFRESH_DICT_NAME) .. worker_id elapsed, err = lock:lock(refresh_lock_key) if not elapsed then return nil, 'race refresh lock fail, ' .. err end self:fetch_slots() ok, err = lock:unlock() if not ok then ngx.log(ngx.ERR, "failed to unlock in refresh slot cache:", err) return nil, err end end function _M.init_slots(self) if slot_cache[self.config.name] then -- already initialized return true end local ok, lock, elapsed, err lock, err = resty_lock:new(self.config.dict_name or DEFAULT_SHARED_DICT_NAME) if not lock then ngx.log(ngx.ERR, "failed to create lock in initialization slot cache: ", err) return nil, err end elapsed, err = lock:lock("redis_cluster_slot_" .. self.config.name) if not elapsed then ngx.log(ngx.ERR, "failed to acquire the lock in initialization slot cache: ", err) return nil, err end if slot_cache[self.config.name] then ok, err = lock:unlock() if not ok then ngx.log(ngx.ERR, "failed to unlock in initialization slot cache: ", err) end -- already initialized return true end local _, errs = self:fetch_slots() if errs then ok, err = lock:unlock() if not ok then ngx.log(ngx.ERR, "failed to unlock in initialization slot cache:", err) end return nil, errs end ok, err = lock:unlock() if not ok then ngx.log(ngx.ERR, "failed to unlock in initialization slot cache:", err) end -- initialized return true end function _M.new(_, config) if not config.name then return nil, " redis cluster config name is empty" end if not config.serv_list or #config.serv_list < 1 then return nil, " redis cluster config serv_list is empty" end local inst = { config = config } inst = setmetatable(inst, mt) local _, err = inst:init_slots() if err then return nil, err end return inst end local function pick_node(self, serv_list, slot, magic_radom_seed) local host local port local slave local index if #serv_list < 1 then return nil, nil, nil, "serv_list for slot " .. slot .. " is empty" end if self.config.enable_slave_read then if magic_radom_seed then index = magic_radom_seed % #serv_list + 1 else index = math.random(#serv_list) end host = serv_list[index].ip port = serv_list[index].port --cluster slots will always put the master node as first if index > 1 then slave = true else slave = false end --ngx.log(ngx.NOTICE, "pickup node: ", c(serv_list[index])) else host = serv_list[1].ip port = serv_list[1].port slave = false --ngx.log(ngx.NOTICE, "pickup node: ", cjson.encode(serv_list[1])) end return host, port, slave end local ask_host_and_port = {} local function parse_ask_signal(res) --ask signal sample:ASK 12191 127.0.0.1:7008, so we need to parse and get 127.0.0.1, 7008 if res ~= ngx.null then if type(res) == "string" and string.sub(res, 1, 3) == "ASK" then local matched = ngx.re.match(res, [[^ASK [^ ]+ ([^:]+):([^ ]+)]], "jo", nil, ask_host_and_port) if not matched then return nil, nil end return matched[1], matched[2] end if type(res) == "table" then for i = 1, #res do if type(res[i]) == "string" and string.sub(res[i], 1, 3) == "ASK" then local matched = ngx.re.match(res[i], [[^ASK [^ ]+ ([^:]+):([^ ]+)]], "jo", nil, ask_host_and_port) if not matched then return nil, nil end return matched[1], matched[2] end end end end return nil, nil end local function has_moved_signal(res) if res ~= ngx.null then if type(res) == "string" and string.sub(res, 1, 5) == "MOVED" then return true else if type(res) == "table" then for i = 1, #res do if type(res[i]) == "string" and string.sub(res[i], 1, 5) == "MOVED" then return true end end end end end return false end local function handle_command_with_retry(self, target_ip, target_port, asking, cmd, key, ...) local config = self.config key = tostring(key) local slot = redis_slot(key) for k = 1, config.max_redirection or DEFAULT_MAX_REDIRECTION do if k > 1 then ngx.log(ngx.NOTICE, "handle retry attempts:" .. k .. " for cmd:" .. cmd .. " key:" .. key) end local slots = slot_cache[self.config.name] if slots == nil or slots[slot] == nil then return nil, "not slots information present, nginx might have never successfully executed cluster(\"slots\")" end local serv_list = slots[slot].serv_list -- We must empty local reference to slots cache, otherwise there will be memory issue while -- coroutine swich happens(eg. ngx.sleep, cosocket), very important! slots = nil local ip, port, slave, err if target_ip ~= nil and target_port ~= nil then -- asking redirection should only happens at master nodes ip, port, slave = target_ip, target_port, false else ip, port, slave, err = pick_node(self, serv_list, slot) if err then ngx.log(ngx.ERR, "pickup node failed, will return failed for this request, meanwhile refereshing slotcache " .. err) self:refresh_slots() return nil, err end end local redis_client = redis:new() redis_client:set_timeouts(config.connect_timeout or DEFAULT_CONNECTION_TIMEOUT, config.send_timeout or DEFAULT_SEND_TIMEOUT, config.read_timeout or DEFAULT_READ_TIMEOUT) local ok, connerr = redis_client:connect(ip, port, self.config.connect_opts) if ok then local authok, autherr = check_auth(self, redis_client) if autherr then return nil, autherr end if slave then --set readonly ok, err = redis_client:readonly() if not ok then self:refresh_slots() return nil, err end end if asking then --executing asking ok, err = redis_client:asking() if not ok then self:refresh_slots() return nil, err end end local need_to_retry = false local res if cmd == "eval" or cmd == "evalsha" then res, err = redis_client[cmd](redis_client, ...) else res, err = redis_client[cmd](redis_client, key, ...) end if err then if string.sub(err, 1, 5) == "MOVED" then --ngx.log(ngx.NOTICE, "find MOVED signal, trigger retry for normal commands, cmd:" .. cmd .. " key:" .. key) --if retry with moved, we will not asking to specific ip,port anymore release_connection(redis_client, config) target_ip = nil target_port = nil self:refresh_slots() need_to_retry = true elseif string.sub(err, 1, 3) == "ASK" then --ngx.log(ngx.NOTICE, "handle asking for normal commands, cmd:" .. cmd .. " key:" .. key) release_connection(redis_client, config) if asking then --Should not happen after asking target ip,port and still return ask, if so, return error. return nil, "nested asking redirection occurred, client cannot retry " else local ask_host, ask_port = parse_ask_signal(err) if ask_host ~= nil and ask_port ~= nil then return handle_command_with_retry(self, ask_host, ask_port, true, cmd, key, ...) else return nil, " cannot parse ask redirection host and port: msg is " .. err end end elseif string.sub(err, 1, 11) == "CLUSTERDOWN" then return nil, "Cannot executing command, cluster status is failed!" else --There might be node fail, we should also refresh slot cache self:refresh_slots() return nil, err end end if not need_to_retry then release_connection(redis_client, config) return res, err end else --There might be node fail, we should also refresh slot cache self:refresh_slots() if k == config.max_redirection or k == DEFAULT_MAX_REDIRECTION then -- only return after allowing for `k` attempts return nil, connerr end end end return nil, "failed to execute command, reaches maximum redirection attempts" end local function generate_magic_seed(self) --For pipeline, We don't want request to be forwarded to all channels, eg. if we have 3*3 cluster(3 master 2 replicas) we --alway want pick up specific 3 nodes for pipeline requests, instead of 9. --Currently we simply use (num of allnode)%count as a randomly fetch. Might consider a better way in the future. -- use the dynamic serv_list instead of the static config serv_list local nodeCount = #slot_cache[self.config.name .. "serv_list"].serv_list return math.random(nodeCount) end local function _do_cmd_master(self, cmd, key, ...) local errors = {} for _, master in ipairs(master_nodes) do local redis_client = redis:new() redis_client:set_timeouts(self.config.connect_timeout or DEFAULT_CONNECTION_TIMEOUT, self.config.send_timeout or DEFAULT_SEND_TIMEOUT, self.config.read_timeout or DEFAULT_READ_TIMEOUT) local ok, err = redis_client:connect(master.ip, master.port, self.config.connect_opts) if ok then _, err = redis_client[cmd](redis_client, key, ...) end if err then table_insert(errors, err) end release_connection(redis_client, self.config) end return #errors == 0, table.concat(errors, ";") end local function _do_cmd(self, cmd, key, ...) if cluster_invalid_cmds[cmd] == true then return nil, "command not supported" end local _reqs = rawget(self, "_reqs") if _reqs then local args = { ... } local t = { cmd = cmd, key = key, args = args } table_insert(_reqs, t) return end if cmds_for_all_master[cmd] then return _do_cmd_master(self, cmd, key, ...) end local res, err = handle_command_with_retry(self, nil, nil, false, cmd, key, ...) return res, err end local function construct_final_pipeline_resp(self, node_res_map, node_req_map) --construct final result with origin index local finalret = {} for k, v in pairs(node_res_map) do local reqs = node_req_map[k].reqs local res = v local need_to_fetch_slots = true for i = 1, #reqs do --deal with redis cluster ask redirection local ask_host, ask_port = parse_ask_signal(res[i]) if ask_host ~= nil and ask_port ~= nil then --ngx.log(ngx.NOTICE, "handle ask signal for cmd:" .. reqs[i]["cmd"] .. " key:" .. reqs[i]["key"] .. " target host:" .. ask_host .. " target port:" .. ask_port) local askres, err = handle_command_with_retry(self, ask_host, ask_port, true, reqs[i]["cmd"], reqs[i]["key"], unpack(reqs[i]["args"])) if err then return nil, err else finalret[reqs[i].origin_index] = askres end elseif has_moved_signal(res[i]) then --ngx.log(ngx.NOTICE, "handle moved signal for cmd:" .. reqs[i]["cmd"] .. " key:" .. reqs[i]["key"]) if need_to_fetch_slots then -- if there is multiple signal for moved, we just need to fetch slot cache once, and do retry. self:refresh_slots() need_to_fetch_slots = false end local movedres, err = handle_command_with_retry(self, nil, nil, false, reqs[i]["cmd"], reqs[i]["key"], unpack(reqs[i]["args"])) if err then return nil, err else finalret[reqs[i].origin_index] = movedres end else finalret[reqs[i].origin_index] = res[i] end end end return finalret end local function has_cluster_fail_signal_in_pipeline(res) for i = 1, #res do if res[i] ~= ngx.null and type(res[i]) == "table" then for j = 1, #res[i] do if type(res[i][j]) == "string" and string.sub(res[i][j], 1, 11) == "CLUSTERDOWN" then return true end end end end return false end function _M.init_pipeline(self) self._reqs = {} end function _M.commit_pipeline(self) local _reqs = rawget(self, "_reqs") if not _reqs or #_reqs == 0 then return end self._reqs = nil local config = self.config local slots = slot_cache[config.name] if slots == nil then return nil, "not slots information present, nginx might have never successfully executed cluster(\"slots\")" end local node_res_map = {} local node_req_map = {} local magicRandomPickupSeed = generate_magic_seed(self) --construct req to real node mapping for i = 1, #_reqs do -- Because we will forward req to different nodes, so the result will not be the origin order, -- we need to record the original index and finally we can construct the result with origin order _reqs[i].origin_index = i local key = _reqs[i].key local slot = redis_slot(tostring(key)) if slots[slot] == nil then return nil, "not slots information present, nginx might have never successfully executed cluster(\"slots\")" end local slot_item = slots[slot] local ip, port, slave, err = pick_node(self, slot_item.serv_list, slot, magicRandomPickupSeed) if err then -- We must empty local reference to slots cache, otherwise there will be memory issue while -- coroutine swich happens(eg. ngx.sleep, cosocket), very important! slots = nil self:refresh_slots() return nil, err end local node = ip .. tostring(port) if not node_req_map[node] then node_req_map[node] = { ip = ip, port = port, slave = slave, reqs = {} } node_res_map[node] = {} end local ins_req = node_req_map[node].reqs ins_req[#ins_req + 1] = _reqs[i] end -- We must empty local reference to slots cache, otherwise there will be memory issue while -- coroutine swich happens(eg. ngx.sleep, cosocket), very important! slots = nil for k, v in pairs(node_req_map) do local ip = v.ip local port = v.port local reqs = v.reqs local slave = v.slave local redis_client = redis:new() redis_client:set_timeouts(config.connect_timeout or DEFAULT_CONNECTION_TIMEOUT, config.send_timeout or DEFAULT_SEND_TIMEOUT, config.read_timeout or DEFAULT_READ_TIMEOUT) local ok, err = redis_client:connect(ip, port, self.config.connect_opts) if ok then local authok, autherr = check_auth(self, redis_client) if autherr then return nil, autherr end if slave then --set readonly local ok, err = redis_client:readonly() if not ok then self:refresh_slots() return nil, err end end redis_client:init_pipeline() for i = 1, #reqs do local req = reqs[i] if #req.args > 0 then if req.cmd == "eval" or req.cmd == "evalsha" then redis_client[req.cmd](redis_client, unpack(req.args)) else redis_client[req.cmd](redis_client, req.key, unpack(req.args)) end else redis_client[req.cmd](redis_client, req.key) end end local res, err = redis_client:commit_pipeline() if err then --There might be node fail, we should also refresh slot cache self:refresh_slots() return nil, err .. " return from " .. tostring(ip) .. ":" .. tostring(port) end if has_cluster_fail_signal_in_pipeline(res) then return nil, "Cannot executing pipeline command, cluster status is failed!" end release_connection(redis_client, config) node_res_map[k] = res else --There might be node fail, we should also refresh slot cache self:refresh_slots() return nil, err .. "pipeline commit failed while connecting to " .. tostring(ip) .. ":" .. tostring(port) end end --construct final result with origin index local final_res, err = construct_final_pipeline_resp(self, node_res_map, node_req_map) if not err then return final_res else return nil, err .. " failed to construct final pipeline result " end end function _M.cancel_pipeline(self) self._reqs = nil end local function _do_eval_cmd(self, cmd, ...) --[[ eval command usage: eval(script, 1, key, arg1, arg2 ...) eval(script, 0, arg1, arg2 ...) ]] local args = {...} local keys_num = args[2] if type(keys_num) ~= "number" then return nil, "Cannot execute eval without keys number" end if keys_num > 1 then return nil, "Cannot execute eval with more than one keys for redis cluster" end local key = args[3] or "no_key" return _do_cmd(self, cmd, key, ...) end -- dynamic cmd setmetatable(_M, { __index = function(_, cmd) local method = function(self, ...) if cmd == "eval" or cmd == "evalsha" then return _do_eval_cmd(self, cmd, ...) else return _do_cmd(self, cmd, ...) end end -- cache the lazily generated method in our -- module table _M[cmd] = method return method end }) return _M
xmodem.lua
-- -- This is the CRC16 algorithm used by Redis Cluster to hash keys. -- Implementation according to CCITT standards. -- -- This is actually the XMODEM CRC 16 algorithm, using the -- following parameters: -- -- Name : "XMODEM", also known as "ZMODEM", "CRC-16/ACORN" -- Width : 16 bit -- Poly : 1021 (That is actually x^16 + x^12 + x^5 + 1) -- Initialization : 0000 -- Reflect Input byte : False -- Reflect Output CRC : False -- Xor constant to output CRC : 0000 -- Output for "123456789" : 31C3 -- -- https://redis.io/topics/cluster-spec#appendix-a-crc16-reference-implementation-in-ansi-c local bit = require("bit") local band = bit.band local bxor = bit.bxor local lshift = bit.lshift local rshift = bit.rshift local byte = string.byte local XMODEMCRC16LOOKUP = { 0x0000,0x1021,0x2042,0x3063,0x4084,0x50a5,0x60c6,0x70e7, 0x8108,0x9129,0xa14a,0xb16b,0xc18c,0xd1ad,0xe1ce,0xf1ef, 0x1231,0x0210,0x3273,0x2252,0x52b5,0x4294,0x72f7,0x62d6, 0x9339,0x8318,0xb37b,0xa35a,0xd3bd,0xc39c,0xf3ff,0xe3de, 0x2462,0x3443,0x0420,0x1401,0x64e6,0x74c7,0x44a4,0x5485, 0xa56a,0xb54b,0x8528,0x9509,0xe5ee,0xf5cf,0xc5ac,0xd58d, 0x3653,0x2672,0x1611,0x0630,0x76d7,0x66f6,0x5695,0x46b4, 0xb75b,0xa77a,0x9719,0x8738,0xf7df,0xe7fe,0xd79d,0xc7bc, 0x48c4,0x58e5,0x6886,0x78a7,0x0840,0x1861,0x2802,0x3823, 0xc9cc,0xd9ed,0xe98e,0xf9af,0x8948,0x9969,0xa90a,0xb92b, 0x5af5,0x4ad4,0x7ab7,0x6a96,0x1a71,0x0a50,0x3a33,0x2a12, 0xdbfd,0xcbdc,0xfbbf,0xeb9e,0x9b79,0x8b58,0xbb3b,0xab1a, 0x6ca6,0x7c87,0x4ce4,0x5cc5,0x2c22,0x3c03,0x0c60,0x1c41, 0xedae,0xfd8f,0xcdec,0xddcd,0xad2a,0xbd0b,0x8d68,0x9d49, 0x7e97,0x6eb6,0x5ed5,0x4ef4,0x3e13,0x2e32,0x1e51,0x0e70, 0xff9f,0xefbe,0xdfdd,0xcffc,0xbf1b,0xaf3a,0x9f59,0x8f78, 0x9188,0x81a9,0xb1ca,0xa1eb,0xd10c,0xc12d,0xf14e,0xe16f, 0x1080,0x00a1,0x30c2,0x20e3,0x5004,0x4025,0x7046,0x6067, 0x83b9,0x9398,0xa3fb,0xb3da,0xc33d,0xd31c,0xe37f,0xf35e, 0x02b1,0x1290,0x22f3,0x32d2,0x4235,0x5214,0x6277,0x7256, 0xb5ea,0xa5cb,0x95a8,0x8589,0xf56e,0xe54f,0xd52c,0xc50d, 0x34e2,0x24c3,0x14a0,0x0481,0x7466,0x6447,0x5424,0x4405, 0xa7db,0xb7fa,0x8799,0x97b8,0xe75f,0xf77e,0xc71d,0xd73c, 0x26d3,0x36f2,0x0691,0x16b0,0x6657,0x7676,0x4615,0x5634, 0xd94c,0xc96d,0xf90e,0xe92f,0x99c8,0x89e9,0xb98a,0xa9ab, 0x5844,0x4865,0x7806,0x6827,0x18c0,0x08e1,0x3882,0x28a3, 0xcb7d,0xdb5c,0xeb3f,0xfb1e,0x8bf9,0x9bd8,0xabbb,0xbb9a, 0x4a75,0x5a54,0x6a37,0x7a16,0x0af1,0x1ad0,0x2ab3,0x3a92, 0xfd2e,0xed0f,0xdd6c,0xcd4d,0xbdaa,0xad8b,0x9de8,0x8dc9, 0x7c26,0x6c07,0x5c64,0x4c45,0x3ca2,0x2c83,0x1ce0,0x0cc1, 0xef1f,0xff3e,0xcf5d,0xdf7c,0xaf9b,0xbfba,0x8fd9,0x9ff8, 0x6e17,0x7e36,0x4e55,0x5e74,0x2e93,0x3eb2,0x0ed1,0x1ef0 } local _M = {} -- Depends on luajit bitop extension. local function crc16(str) local crc = 0 str = tostring(str) for i = 1, #str do local b = byte(str, i) crc = bxor(band(lshift(crc, 8), 0xffff), XMODEMCRC16LOOKUP[band(bxor(rshift(crc, 8), b), 0xff) + 1]) end return crc end function _M.redis_crc(str) return band(crc16(str), 0x3fff) end return _M
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。