赞
踩
redis是Remote Dictionary Server的简称,即远程字典服务。
可以把它想象成unordered_map<string,value>结构。节点通过TCP与redis建立连接交互,是一种请求回应模式(命令+key去请求操作redis,操作完后redis返回结果)。
redis是内存数据库、Key-Value数据库、数据结构数据库。
内存数据库是指数据一定在内存当中,不存在磁盘中有数据而内存中没有数据的现象;即数据都在内存当中,不可能出现数据不在内存当中,而磁盘有这个数据。
Key-Value数据库是描述redis的操作方式(通过Key操作或查询Value)以及存储方式(Key-Value由散列表组织)。
数据结构数据库是指Key-Value中的Value提供了丰富的数据结构(string、list、hash、zset、set等)。
Redis 应用非常广泛,如 Twitter、暴雪娱乐、Github、StackOverflow、腾讯、阿里巴巴、京东、华为、新浪微博等,很多中小型公司也在使用。
Redis 命令查看
(1)安装:
git clone https://gitee.com/mirrors/redis.git -b 6.2
cd redis
make
make test
sudo make install
# 默认安装在 /usr/local/bin
# redis-server 是服务端程序
# redis-cli 是客户端程序
(2)启动:
mkdir redis-data
# 把redis文件夹下 redis.conf 拷贝到 redis-data
# 修改 redis.conf
# requirepass 修改密码 123456
# daemonize yes
cd redis-data
redis-server redis.conf
# 通过 redis-cli 访问 redis-server
redis-cli -h 127.0.0.1 -a 123456
所有的key都是string类型。
127.0.0.1:6379> set role:1001 100
OK
127.0.0.1:6379> set role:1002 101
OK
redis提供丰富的数据结构,string、list、hash、zset、set、stream、hyperloglog等等。
redis存储结构(KV):
key-value是存储在一起的,value丰富的数据结构,每个value都可以指向不同的数据结构。key_value使用散列表组织起来。
(1)redis的string与C++中的string有些类似,它也是二进制安全字符串。C语言的字符串是以‘\0’作为分隔符,而二进制安全字符串是以长度作为分隔符,可以存储二进制字符串(图片、视频、字节流数据等)。
(2)redis的hash也是也散列表组织的,对顺序不关注,field 是唯一的。
127.0.0.1:6379> hset roleinfo:1001 age 20
(integer) 1
127.0.0.1:6379> hget roleinfo:1001 age
"20"
127.0.0.1:6379> hset roleinfo:1001 age 21
(integer) 1
127.0.0.1:6379> hget roleinfo:1001 age
"21"
(3)双端队列 (链表)list:有序(插入有序)。但是list不具备唯一性,即相同的key可以同时存在,它不会去重。
127.0.0.1:6379> lpush teacher fly1 fly2 fly3 (integer) 3 127.0.0.1:6379> lpop teacher "fly3" 127.0.0.1:6379> lrange teacher 0 -1 1) "fly2" 2) "fly1" 127.0.0.1:6379> lpush teacher fly fly fly (integer) 5 127.0.0.1:6379> lrange teacher 0 -1 1) "fly" 2) "fly" 3) "fly" 4) "fly2" 5) "fly1"
(4)无序集合 set:对顺序不关注,里面的值都是唯一的。set的value是string类型的。
127.0.0.1:6379> set role:1002 101
OK
(5)有序集合 zset:对顺序是关注的,里面的值是唯一的;根据member 来确定唯一;根据 score 来确定有序。
127.0.0.1:6379> zadd rank 80 fly1 (integer) 1 127.0.0.1:6379> zadd rank 90 fly2 (integer) 1 127.0.0.1:6379> zadd rank 100 fly3 (integer) 1 127.0.0.1:6379> Zrange rank 0 -1 1) "fly1" 2) "fly2" 3) "fly3" 127.0.0.1:6379> zadd rank 91 fly1 (integer) 0 127.0.0.1:6379> Zrange rank 0 -1 1) "fly2" 2) "fly1" 3) "fly3"
字符数组,该字符串是动态字符串 raw,字符串长度小于1M时,加倍扩容;超过 1M 每次只多扩 1M;字符串最大长度为512M。
注意:redis 字符串是二进制安全字符串;可以存储图片,二进制协议等二进制数据
string类型内部是动态字符串(sds.h)。
typedef char *sds; /* Note: sdshdr5 is never used, we just access the flags byte directly. * However is here to document the layout of type 5 SDS strings. */ struct __attribute__ ((__packed__)) sdshdr5 { unsigned char flags; /* 3 lsb of type, and 5 msb of string length */ char buf[]; }; struct __attribute__ ((__packed__)) sdshdr8 { uint8_t len; /* used */ uint8_t alloc; /* excluding the header and null terminator */ unsigned char flags; /* 3 lsb of type, 5 unused bits */ char buf[]; }; struct __attribute__ ((__packed__)) sdshdr16 { uint16_t len; /* used */ uint16_t alloc; /* excluding the header and null terminator */ unsigned char flags; /* 3 lsb of type, 5 unused bits */ char buf[]; }; struct __attribute__ ((__packed__)) sdshdr32 { uint32_t len; /* used */ uint32_t alloc; /* excluding the header and null terminator */ unsigned char flags; /* 3 lsb of type, 5 unused bits */ char buf[]; }; struct __attribute__ ((__packed__)) sdshdr64 { uint64_t len; /* used */ uint64_t alloc; /* excluding the header and null terminator */ unsigned char flags; /* 3 lsb of type, 5 unused bits */ char buf[]; };
其中,
len是字符串长度;
alloc是分配长度(为了进行惰性删除,因为c语言扩缩容会不断的申请释放内存,使用alloc来记录当前分配多少内存来减少频繁的内存申请和释放);
flag标识字符串类型;buf是柔性数组,初始的时候不占用空间,避免多层次的内存申请和释放。
分配存储字符串的大小时,是这样的:
malloc(sizeof(struct sdshdr8)+64)
// 这里以sdshdr8为例,存储字符串的数组长度就是64
释放的时候,由于柔性数组的存在,只需要释放一次就可以了。
# 设置 key 的 value 值 SET key val # 获取 key 的 value GET key # 执行原子加一的操作 INCR key # 执行原子加一个整数的操作 INCRBY key increment # 执行原子减一的操作 DECR key # 执行原子减一个整数的操作 DECRBY key decrement # 如果key不存在,这种情况下等同SET命令。 当key存在时,什 么也不做 # set Not eXist ok 这个命令是否执行了 0,1 是不是操 作结果是不是成功 SETNX key value # 删除 key val 键值对 DEL key # 设置或者清空key的value(字符串)在offset处的bit值。 setbit embstr raw int # 动态字符串 能够节约内存 SETBIT key offset value # 返回key对应的string在offset处的bit值 GETBIT key offset # 统计字符串被设置为1的bit数. BITCOUNT key
示例:
127.0.0.1:6379> set fly 12 OK 127.0.0.1:6379> get fly "12" 127.0.0.1:6379> incr fly1 (integer) 1 127.0.0.1:6379> incrby fly1 10 (integer) 11 127.0.0.1:6379> decr fly1 (integer) 10 127.0.0.1:6379> setnx fly2 100 (integer) 1 127.0.0.1:6379> setnx fly2 101 (integer) 0 127.0.0.1:6379> del fly (integer) 1 127.0.0.1:6379> type fly none 127.0.0.1:6379> setbit sign 1 1 (integer) 0 127.0.0.1:6379> setbit sign 3 1 (integer) 0 127.0.0.1:6379> setbit sign 2 0 (integer) 0 127.0.0.1:6379> getbit sign 3 (integer) 1 127.0.0.1:6379> getbit sign 2 (integer) 0 127.0.0.1:6379> bitcount sign (integer) 2
字符串长度小于等于 20 且能转成整数,则使用 int 存储。
字符串长度小于等于 44,则使用 embstr 存储。
字符串长度大于 44,则使用 raw 存储。
(1)对象存储。存储的数据不能改变,如果是需要改变的数据,使用hash来存储。
SET role:10001 '{["name"]:"mark",["sex"]:"male",["age"]:30}'
SET role:10002 '{["name"]:"darren",["sex"]:"male",["age"]:30}'
# 极少修改,对象属性字段很少改变的时候
GET role:10001
# key 如何来设置
# 1. 有意义的字段 role 有多行
# 2. role:10001 redis 客户端 role:10001:recharge role:10001:activity:10001
示例:
127.0.0.1:6379> SET role:10001 '{["name"]:"mark",["sex"]:"male",["age"]:30}'
OK
127.0.0.1:6379> get role:10001
"{[\"name\"]:\"mark\",[\"sex\"]:\"male\",[\"age\"]:30}"
(2)累加器。
# 统计阅读数 累计加1
incr reads
# 累计加100
incrby reads 100
(3)分布式锁。
# 加锁 加锁 和 解析 redis 实现是 非公平锁 ectd
zk 用来实现公平锁
# 阻塞等待 阻塞连接的方式
# 介绍简单的原理: 事务
setnx lock 1 # 不存在才能设置 定义加锁行为 占用锁
setnx lock uuid # expire 30 过期
set lock uuid nx ex 30
# 释放锁
del lock
# if (get(lock) == uuid)
# del(lock);
(4)位运算。比如游戏中的月签到功能。
# 猜测一下 string 是用的 int 类型 还是 string 类型
# 月签到功能 10001 用户id 202106 2021年6月份的签到 6月份的第1天
setbit sign:10001:202106 1 1
# 计算 2021年6月份 的签到情况
bitcount sign:10001:202106
# 获取 2021年6月份 第二天的签到情况 1 已签到 0 没有签到
getbit sign:10001:202106 2
双向链表实现,列表首尾操作(删除和增加)时间复杂度O(1);查找中间元素时间复杂度为O(n);
列表中数据是否压缩的依据:
# 从队列的左侧入队一个或多个元素 LPUSH key value [value ...] # 从队列的左侧弹出一个元素 LPOP key # 从队列的右侧入队一个或多个元素 RPUSH key value [value ...] # 从队列的右侧弹出一个元素 RPOP key # 返回从队列的 start 和 end 之间的元素 0, 1 2 负索引 LRANGE key start end # 从存于 key 的列表里移除前 count 次出现的值为 value 的元素 # list 没有去重功能 hash set zset LREM key count value # 裁剪 LTRIM key start end # 它是 RPOP 的阻塞版本,因为这个命令会在给定list无法弹出任何元素的时候阻塞连接 BRPOP key timeout # 超时时间 + 延时队列
示例:
127.0.0.1:6379> lpush list fly1 fly2 fly3 (integer) 3 127.0.0.1:6379> lrange list 0 -1 1) "fly3" 2) "fly2" 3) "fly1" 127.0.0.1:6379> lpop list "fly3" 127.0.0.1:6379> lrange list 0 -1 1) "fly2" 2) "fly1" 127.0.0.1:6379> rpush list right1 right2 right3 (integer) 5 127.0.0.1:6379> lrange list 0 -1 1) "fly2" 2) "fly1" 3) "right1" 4) "right2" 5) "right3" 127.0.0.1:6379> rpop list 2 1) "right3" 2) "right2" 127.0.0.1:6379> lrange list 0 -1 1) "fly2" 2) "fly1" 3) "right1" 127.0.0.1:6379> lpush list fly1 (integer) 4 127.0.0.1:6379> lrange list 0 -1 1) "fly1" 2) "fly2" 3) "fly1" 4) "right1" 127.0.0.1:6379> lrem list 1 fly1 (integer) 1 127.0.0.1:6379> lrange list 0 -1 1) "fly2" 2) "fly1" 3) "right1" 127.0.0.1:6379> ltrim list 0 1 OK 127.0.0.1:6379> lrange list 0 -1 1) "fly2" 2) "fly1"
typedef struct quicklistNode { struct quicklistNode *prev; struct quicklistNode *next; unsigned char *zl; unsigned int sz; /* ziplist size in bytes */ unsigned int count : 16; /* count of items in ziplist */ unsigned int encoding : 2; /* RAW==1 or LZF==2 */ unsigned int container : 2; /* NONE==1 or ZIPLIST==2 */ unsigned int recompress : 1; /* was this node previous compressed? */ unsigned int attempted_compress : 1; /* node can't compress; too small */ unsigned int extra : 10; /* more bits to steal for future usage */ } quicklistNode; /* quicklist is a 40 byte struct (on 64-bit systems) describing a quicklist. * 'count' is the number of total entries. * 'len' is the number of quicklist nodes. * 'compress' is: 0 if compression disabled, otherwise it's the number * of quicklistNodes to leave uncompressed at ends of quicklist. * 'fill' is the user-requested (or default) fill factor. * 'bookmakrs are an optional feature that is used by realloc this struct, * so that they don't consume memory when not used. */ typedef struct quicklist { quicklistNode *head; quicklistNode *tail; unsigned long count; /* total count of all entries in all ziplists */ unsigned long len; /* number of quicklistNodes */ int fill : QL_FILL_BITS; /* fill factor for individual nodes */ unsigned int compress : QL_COMP_BITS; /* depth of end nodes not to compress;0=off */ unsigned int bookmark_count: QL_BM_BITS; quicklistBookmark bookmarks[]; } quicklist;
注意,里面的节点有可能会进行压缩。
(1)栈(先进后出 FILO)。
LPUSH + LPOP
# 或者
RPUSH + RPOP
(2)队列(先进先出 FIFO)。
LPUSH + LPOP
# 或者
RPUSH + RPOP
(3)阻塞队列(blocking queue)。
LPUSH + BRPOP
# 或者
RPUSH + BLPOP
可以有多个,但是只有一个获取成功。
(3)异步消息队列。
操作与队列一样,但是在不同系统间;生成者和消费者。
(4)获取固定窗口记录(战绩)。
在某些业务场景下,需要获取固定数量的记录;比如获取最近50条战绩;这些记录需要按照插入的先后顺序返回。
lpush says fly1 fly2 fly3 fly4 fly5 fly6 fly7 fly8
# 裁剪最近5条记录 战绩 近50条
ltrim says 0 5
lrange says 0 -1
示例:
127.0.0.1:6379> lpush says fly1 fly2 fly3 fly4 fly5 fly6 fly7 fly8
(integer) 8
127.0.0.1:6379> ltrim says 0 4
OK
127.0.0.1:6379> lrange says 0 -1
1) "fly8"
2) "fly7"
3) "fly6"
4) "fly5"
5) "fly4"
实际项目中需要保证命令的原子性,所以一般用 lua 脚本 或者使用 pipeline 命令。
-- redis lua脚本
local record = KEYS[1]
redis.call("LPUSH", "says", record)
redis.call("LTRIM", "says", 0, 4)
散列表,在很多高级语言当中包含这种数据结构;c++unordered_map 通过 key 快速索引 value。值得注意的是,redis中最多允许有两层hash。
# 获取 key 对应 hash 中的 field 对应的值 HGET key field # 获取所有的元素 HGETALL key # 设置 key 对应 hash 中的 field 对应的值 HSET key field value # 设置多个hash键值对 HMSET key field1 value1 field2 value2 ... fieldn valuen # 获取多个field的值 HMGET key field1 field2 ... fieldn # 给 key 对应 hash 中的 field 对应的值加一个整数值 HINCRBY key field increment # 获取 key 对应的 hash 有多少个键值对 HLEN key # 删除 key 对应的 hash 的键值对,该键为field HDEL key field
节点数量大于 512(hash-max-ziplist-entries) 或所有字符串长度大于 64(hash-max-ziplist-value),则使用 dict 实现;节点数量小于等于 512 且有一个字符串长度小于 64,则使用
ziplist 实现;
(1)存储对象。
hmset hash:10001 name mark age 18 sex male
# 与 string 比较
set hash:10001 '{["name"]:"mark",["sex"]:"male",["age"]:18}'
# 假设现在修改 mark的年龄为19岁
# hash:
hset hash:10001 age 19
# string:
get role:10001
# 将得到的字符串调用json解密,取出字段,修改 age 值
# 再调用json加密
set role:10001 '{["name"]:"mark",["sex"]:"male",["age"]:19}'
(2)购物车。
# 将用户id作为 key # 商品id作为 field # 商品数量作为 value # 注意:这些物品是按照我们添加顺序来显示的; # 添加商品: hset MyCart:10001 40001 1 lpush MyItem:10001 40001 # 增加数量: hincrby MyCart:10001 40001 1 hincrby MyCart:10001 40001 -1 // 减少数量1 # 显示所有物品数量: hlen MyCart:10001 # 删除商品: hdel MyCart:10001 40001 lrem MyItem:10001 1 40001 # 获取所有物品: lrange MyItem:10001 # 40001 40002 40003 hget MyCart:10001 40001 hget MyCart:10001 40002 hget MyCart:10001 40003
集合,用来存储唯一性字段,不要求有序;存储不需要有序,操作(交并差集的时候排序)?
set是一个无序集合,虽然使用的时候作为一个无序集合,但是它的底层实现是一个有序的结构,存储是有序的。底层作为有序的原因,是为了集合做交并差集。如果set存储的是数字,使用的是整数数组存储;如果set存储的是字符串,那么使用字典来存储(hash)。使用字典存储是无序,但是做交并差集时会将所以的字符串取出来,然后排序,再做交并差集。
# 添加一个或多个指定的member元素到集合的 key中 SADD key member [member ...] # 计算集合元素个数 SCARD key # SMEMBERS key # 列出所有的数据 SMEMBERS key # 返回成员 member 是否是存储的集合 key的成员 SISMEMBER key member # 随机返回key集合中的一个或者多个元素,不删除这些元素 SRANDMEMBER key [count] # 从存储在key的集合中移除并返回一个或多个随机元素 SPOP key [count] # 返回一个集合与给定集合的差集的元素 SDIFF key [key ...] # 返回指定所有的集合的成员的交集 SINTER key [key ...] # 返回给定的多个集合的并集中的所有成员 SUNION key [key ...]
示例:
127.0.0.1:6379> sadd members fly1 fly2 fly3 fly (integer) 4 127.0.0.1:6379> scard members (integer) 4 127.0.0.1:6379> smembers members 1) "fly2" 2) "fly3" 3) "fly1" 4) "fly" 127.0.0.1:6379> sismember members fly (integer) 1 127.0.0.1:6379> sismember members fly4 (integer) 0 127.0.0.1:6379> SRANDMEMBER members "fly" 127.0.0.1:6379> SRANDMEMBER members 2 1) "fly2" 2) "fly3" 127.0.0.1:6379> SRANDMEMBER members "fly3" 127.0.0.1:6379> spop members "fly1" 127.0.0.1:6379> sadd follow:A mark king darren mole vico (integer) 5 127.0.0.1:6379> sadd follow:C mark king darren (integer) 3 127.0.0.1:6379> sinter follow:A follow:C 1) "darren" 2) "mark" 3) "king" 127.0.0.1:6379> sdiff follow:A follow:C 1) "mole" 2) "vico"
元素都为整数且节点数量小于等于 512(set-max-intset-entries),则使用整数数组存储。
元素当中有一个不是整数或者节点数量大于 512,则使用字典存储。
(1)抽奖。
# 添加抽奖用户
sadd Award:1 10001 10002 10003 10004 10005 10006
sadd Award:1 10009
# 查看所有抽奖用户
smembers Award:1
# 抽取多名幸运用户
srandmember Award:1 10
# 如果抽取一等奖1名,二等奖2名,三等奖3名,该如何操作?
(2)共同关注。
sadd follow:A mark king darren mole vico
sadd follow:C mark king darren
sinter follow:A follow:C
(3)推荐好友。
sadd follow:A mark king darren mole vico
sadd follow:C mark king darren
# C可能认识的人:
sdiff follow:A follow:C
有序集合;用来实现排行榜;它是一个有序唯一。zset是实时有序的,zset内部是使用调表实现的。
# 添加到键为key有序集合(sorted set)里面 ZADD key [NX|XX] [CH] [INCR] score member [scoremember ...] # 从键为key有序集合中删除 member 的键值对 ZREM key member [member ...] # 返回有序集key中,成员member的score值 ZSCORE key member # 为有序集key的成员member的score值加上增量increment ZINCRBY key increment member # 返回key的有序集元素个数 ZCARD key # 返回有序集key中成员member的排名 ZRANK key member # 返回存储在有序集合key中的指定范围的元素 order by id limit 1,100 ZRANGE key start stop [WITHSCORES] # 返回有序集key中,指定区间内的成员(逆序) ZREVRANGE key start stop [WITHSCORES]
节点数量大于 128 或者有一个字符串长度大于 64,则使用跳表(skiplist);
节点数量小于等于 128(zset-max-ziplist-entries)且所有字符串长度小于等于 64(zset-max-ziplist-value),则使用
ziplist 存储;
数据少的时候,节省空间。时间复杂度O(n)。
数量多的时候,访问性能;时间复杂度O(1)或者O(
l
o
g
2
n
log_2n
log2n)。
(1)百度热搜。
# 点击新闻:
zincrby hot:20210601 1 10001
zincrby hot:20210601 1 10002
zincrby hot:20210601 1 10003
zincrby hot:20210601 1 10004
zincrby hot:20210601 1 10005
zincrby hot:20210601 1 10006
zincrby hot:20210601 1 10007
zincrby hot:20210601 1 10008
zincrby hot:20210601 1 10009
zincrby hot:20210601 1 10010
# 获取排行榜:
zrevrange hot:20210601 0 9 withscores
(2)延时队列。
将消息序列化成一个字符串作为 zset 的 member;这个消息的到期处理时间作为 score,然后用多个线程轮询 zset 获取到期的任务进行处理。
(3)分布式定时器。
生产者将定时任务 hash 到不同的 redis 实体中,为每一个redis 实体分配一个 dispatcher 进程,用来定时获取 redis 中超时事件并发布到不同的消费者中。
(4)时间窗口限流。
系统限定用户的某个行为在指定的时间范围内(动态)只能发生 N 次。
-- 指定用户 user_id 的某个行为 action 在特定时间内period 只允许发生该行为做大次数 max_count local function is_action_allowed(red, userid,action, eriod, max_count) local key = tab_concat({"hist", userid,action}, ":") local now = zv.time() red:init_pipeline() -- 记录行为 red:zadd(key, now, now) -- 移除时间窗口之前的行为记录,剩下的都是时间窗口内的记录 red:zremrangebyscore(key, 0, now - period*100) -- 获取时间窗口内的行为数量 red:zcard(key) -- 设置过期时间,避免冷用户持续占用内存 时间窗口的长度+1秒 red:expire(key, period + 1) local res = red:commit_pipeline() return res[3] <= max_count end -- 维护一次时间窗口,将窗口外的记录全部清理掉,只保留窗口内的记录; -- 缺点:记录了所有时间窗口内的数据,如果这个量很大,不适合做这样的限流;漏斗限流 -- 注意:如果用 key + expire 操作也能实现,但是实现的是熔断限流,这里是时间窗口限流的功能;
redis 内存数据库:
127.0.0.1:6379> type sign
string
127.0.0.1:6379> keys *
1) "sign"
2) "fly2"
3) "role:10001"
4) "fly1"
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。