赞
踩
OpenResty使用Lua大全(一)Lua语法入门实战
OpenResty使用Lua大全(二)在OpenResty中使用Lua
OpenResty使用Lua大全(三)OpenResty使用Json模块解析json
OpenResty使用Lua大全(四)OpenResty中使用Redis
OpenResty使用Lua大全(五)OpenResty中使用MySQL
OpenResty使用Lua大全(六)OpenResty发送http请求
OpenResty使用Lua大全(七)OpenResty使用全局缓存
OpenResty使用Lua大全(八)OpenResty执行流程与阶段详解
OpenResty使用Lua大全(九)实战:nginx-lua-redis实现访问频率控制
OpenResty使用Lua大全(十)实战: Lua + Redis 实现动态封禁 IP
OpenResty使用Lua大全(十一)实战: nginx实现接口签名安全认证
OpenResty使用Lua大全(十二)实战: 动手实现一个网关框架
---定义 redis关闭连接的方法 local function close_redis(red) if not red then return end local ok, err = red:close() if not ok then ngx.say("close redis error : ", err) end end local redis = require "resty.redis" --引入redis模块 local red = redis:new() --创建一个对象,注意是用冒号调用的 --设置超时(毫秒) red:set_timeout(1000) --建立连接 local ip = "127.0.0.1" local port = 6379 local ok, err = red:connect(ip, port) if not ok then ngx.say("connect to redis error : ", err) return close_redis(red) end --调用API设置key ok, err = red:set("msg", "hello world") if not ok then ngx.say("set msg error : ", err) return close_redis(red) end --调用API获取key值 local resp, err = red:get("msg") if not resp then ngx.say("get msg error : ", err) return close_redis(red) end ngx.say("msg : ", resp) close_redis(red)
请求结果 msg : hello world
注意:得到的数据为空处理 ,redis返回的空 为null,所以不能用nil判断,而要用ngx.null判断
if resp == ngx.null then
resp = '' --比如默认值
end
-- 在red:connect成功后,调用red:auth认证密码
ok, err = red:auth("123456")
if not ok then
ngx.say("failed to auth: ", err)
return close_redis(red)
end
redis的连接是tcp连接,建立TCP连接需要三次握手,而释放TCP连接需要四次握手,而这些往返时延仅需要一次,
以后应该复用TCP连接,此时就可以考虑使用连接池,即连接池可以复用连接。
我们需要把close_redis函数改造一下
local function close_redis(red)
if not red then
return
end
--释放连接(连接池实现)
local pool_max_idle_time = 10000 --毫秒
local pool_size = 100 --连接池大小
local ok, err = red:set_keepalive(pool_max_idle_time, pool_size)
if not ok then
ngx.say("set keepalive error : ", err)
end
end
即设置空闲连接超时时间防止连接一直占用不释放;设置连接池大小来复用连接。
注意:
1、连接池是每Worker进程的,而不是每Server的;
2、当连接超过最大连接池大小时,会按照LRU算法回收空闲连接为新连接使用;
3、连接池中的空闲连接出现异常时会自动被移除;
4、连接池是通过ip和port标识的,即相同的ip和port会使用同一个连接池(即使是不同类型的客户端);
5、连接池第一次set_keepalive时连接池大小就确定下了,不会再变更;
注意:我们如何知道,redis连接对象是从连接池中获取的,还是新创建的连接呢??
使用 red:get_reused_times —>得到此连接被使用的次数
如果当前连接不是从内建连接池中获取的,该方法总是返回 0 ,也就是说,该连接还没有被使用过。
如果连接来自连接池,那么返回值永远都是非零。所以这个方法可以用来确认当前连接是否来自池子。
采用连接池,连接带认证的redis
---定义 redis关闭连接的方法 local function close_redis(red) if not red then return end --释放连接(连接池实现) local pool_max_idle_time = 10000 --毫秒 local pool_size = 100 --连接池大小 local ok, err = red:set_keepalive(pool_max_idle_time, pool_size) if not ok then ngx.say("set keepalive error : ", err) end end local redis = require "resty.redis" --引入redis模块 local red = redis:new() --创建一个对象,注意是用冒号调用的 --设置超时(毫秒) red:set_timeout(1000) --建立连接 local ip = "127.0.0.1" local port = 6379 local ok, err = red:connect(ip, port) if not ok then ngx.say("connect to redis error : ", err) return close_redis(red) end local count, err = red:get_reused_times() if 0 == count then ----新建连接,需要认证密码 ok, err = red:auth("123456") if not ok then ngx.say("failed to auth: ", err) return end elseif err then ----从连接池中获取连接,无需再次认证密码 ngx.say("failed to get reused times: ", err) return end --调用API设置key ok, err = red:set("msg", "hello world333333333") if not ok then ngx.say("set msg error : ", err) return close_redis(red) end --调用API获取key值 local resp, err = red:get("msg") if not resp then ngx.say("get msg error : ", err) return close_redis(red) end ngx.say("msg : ", resp) close_redis(red)
非常注意:连接池使用过程中,业务代码有select方法,会导致数据错乱
ok, err = red:select(1) --->选择db
if not ok then
ngx.say("failed to select db: ", err)
return
end
如:
A业务使用了db1,所以使用了 select(1);
B业务使用默认的db0,select(0)遗漏
但A,B业务共用了连接池,很有可能 B业务拿到的 A业务使用的连接,而此连接操作的数据库db1;
而B业务中代码没有指定select数据库,所以B业务操作数据到了db1中;导致数据错乱
在关于web+lua+openresty开发中,项目中会大量操作redis,
重复创建连接–>数据操作–>关闭连接(或放到连接池)这个完整的链路调用完毕,
甚至还要考虑不同的 return 情况做不同处理,就很快发现代码中有大量的重复
推荐一个二次封装的类库
在/usr/local/openresty/lualib/resty
目录创建redis_iresty.lua
文件:
local redis_c = require "resty.redis" local ok, new_tab = pcall(require, "table.new") if not ok or type(new_tab) ~= "function" then new_tab = function (narr, nrec) return {} end end local _M = new_tab(0, 155) _M._VERSION = '0.01' local commands = { "append", "auth", "bgrewriteaof", "bgsave", "bitcount", "bitop", "blpop", "brpop", "brpoplpush", "client", "config", "dbsize", "debug", "decr", "decrby", "del", "discard", "dump", "echo", "eval", "exec", "exists", "expire", "expireat", "flushall", "flushdb", "get", "getbit", "getrange", "getset", "hdel", "hexists", "hget", "hgetall", "hincrby", "hincrbyfloat", "hkeys", "hlen", "hmget", "hmset", "hscan", "hset", "hsetnx", "hvals", "incr", "incrby", "incrbyfloat", "info", "keys", "lastsave", "lindex", "linsert", "llen", "lpop", "lpush", "lpushx", "lrange", "lrem", "lset", "ltrim", "mget", "migrate", "monitor", "move", "mset", "msetnx", "multi", "object", "persist", "pexpire", "pexpireat", "ping", "psetex", "psubscribe", "pttl", "publish", --[[ "punsubscribe", ]] "pubsub", "quit", "randomkey", "rename", "renamenx", "restore", "rpop", "rpoplpush", "rpush", "rpushx", "sadd", "save", "scan", "scard", "script", "sdiff", "sdiffstore", "select", "set", "setbit", "setex", "setnx", "setrange", "shutdown", "sinter", "sinterstore", "sismember", "slaveof", "slowlog", "smembers", "smove", "sort", "spop", "srandmember", "srem", "sscan", "strlen", --[[ "subscribe", ]] "sunion", "sunionstore", "sync", "time", "ttl", "type", --[[ "unsubscribe", ]] "unwatch", "watch", "zadd", "zcard", "zcount", "zincrby", "zinterstore", "zrange", "zrangebyscore", "zrank", "zrem", "zremrangebyrank", "zremrangebyscore", "zrevrange", "zrevrangebyscore", "zrevrank", "zscan", "zscore", "zunionstore", "evalsha" } local mt = { __index = _M } local function is_redis_null( res ) if type(res) == "table" then for k,v in pairs(res) do if v ~= ngx.null then return false end end return true elseif res == ngx.null then return true elseif res == nil then return true end return false end function _M.close_redis(self, redis) if not redis then return end --释放连接(连接池实现) local pool_max_idle_time = self.pool_max_idle_time --最大空闲时间 毫秒 local pool_size = self.pool_size --连接池大小 local ok, err = redis:set_keepalive(pool_max_idle_time, pool_size) if not ok then ngx.say("set keepalive error : ", err) end end -- change connect address as you need function _M.connect_mod( self, redis ) redis:set_timeout(self.timeout) local ok, err = redis:connect(self.ip, self.port) if not ok then ngx.say("connect to redis error : ", err) return self:close_redis(redis) end if self.password then ----密码认证 local count, err = redis:get_reused_times() if 0 == count then ----新建连接,需要认证密码 ok, err = redis:auth(self.password) if not ok then ngx.say("failed to auth: ", err) return end elseif err then ----从连接池中获取连接,无需再次认证密码 ngx.say("failed to get reused times: ", err) return end end return ok,err; end function _M.init_pipeline( self ) self._reqs = {} end function _M.commit_pipeline( self ) local reqs = self._reqs if nil == reqs or 0 == #reqs then return {}, "no pipeline" else self._reqs = nil end local redis, err = redis_c:new() if not redis then return nil, err end local ok, err = self:connect_mod(redis) if not ok then return {}, err end redis:init_pipeline() for _, vals in ipairs(reqs) do local fun = redis[vals[1]] table.remove(vals , 1) fun(redis, unpack(vals)) end local results, err = redis:commit_pipeline() if not results or err then return {}, err end if is_redis_null(results) then results = {} ngx.log(ngx.WARN, "is null") end -- table.remove (results , 1) --self.set_keepalive_mod(redis) self:close_redis(redis) for i,value in ipairs(results) do if is_redis_null(value) then results[i] = nil end end return results, err end local function do_command(self, cmd, ... ) if self._reqs then table.insert(self._reqs, {cmd, ...}) return end local redis, err = redis_c:new() if not redis then return nil, err end local ok, err = self:connect_mod(redis) if not ok or err then return nil, err end redis:select(self.db_index) local fun = redis[cmd] local result, err = fun(redis, ...) if not result or err then -- ngx.log(ngx.ERR, "pipeline result:", result, " err:", err) return nil, err end if is_redis_null(result) then result = nil end --self.set_keepalive_mod(redis) self:close_redis(redis) return result, err end for i = 1, #commands do local cmd = commands[i] _M[cmd] = function (self, ...) return do_command(self, cmd, ...) end end function _M.new(self, opts) opts = opts or {} local timeout = (opts.timeout and opts.timeout * 1000) or 1000 local db_index= opts.db_index or 0 local ip = opts.ip or '127.0.0.1' local port = opts.port or 6379 local password = opts.password local pool_max_idle_time = opts.pool_max_idle_time or 60000 local pool_size = opts.pool_size or 100 return setmetatable({ timeout = timeout, db_index = db_index, ip = ip, port = port, password = password, pool_max_idle_time = pool_max_idle_time, pool_size = pool_size, _reqs = nil }, mt) end return _M
local redis = require "resty.redis_iresty" local opts = { ip = "127.0.0.1", port = "6379", password = "123456", db_index = 1 } local red = redis:new(opts) local ok, err = red:set("dog", "an animal11111") if not ok then ngx.say("failed to set dog: ", err) return end ngx.say("set result: ", ok)
每次set、get都会获取一个连接,可以使用管道的方式,将一组命令使用同一个连接进行处理。
red:init_pipeline()
red:set("cat", "Marry")
red:set("horse", "Bob")
red:get("cat")
red:get("horse")
local results, err = red:commit_pipeline()
if not results then
ngx.say("failed to commit the pipelined requests: ", err)
return
end
for i, res in ipairs(results) do
ngx.say(res,"<br/>");
end
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。