当前位置:   article > 正文

Redis 协议与异步方式

Redis 协议与异步方式

redis pipeline 模式

  • redis pipeline 是一个客户端提供的机制,与 redis 无关。
  • pipeline 不具备事务性。
  • 目的:节约网络传输时间。
  • 通过一次发送多条请求命令,从而减少网络传输时间。
    在这里插入图片描述
  • 时间窗口限流
    • 系统限定某个用户某个行为在指定的时间范围内(动态)只能发生 N 次
# 指定用户 user_id 的某个行为 action 在特定时间内 period 只允许发生该行为的最大次数 max_count
# 维护一次时间窗口,将窗口外的记录全部清理掉,只保留窗口内的记录
local function is_action_allowed(red, userid, action, period, max_count)
    local key = tab_concat({"hist", userid, action}, ":")
    local now = zv.time()
    red:init_pipeline()
    -- 记录行为
    red:zadd(key, now, now)
    -- 移除时间窗口之前的行为记录,剩下的都是时间窗口内的记录
    red:zremrangebyscore(key, 0, now - period * 100)
    -- 获取时间窗口内的行为数量
    red:zcard(key)
    -- 设置过期时间,避免冷用户持续占用内存 时间窗口的长度 + 1秒
    red:expire(key, period + 1)
    local res = red:commit_pipeline()
    return res[3] <= max_count
end
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17

redis 发布订阅模式

  • 为了支持消息的多播机制,redis 引入了发布订阅模块。
  • 作用:
    • 没有建立连接的服务器之间进行交互。
    • 第三方系统和与 redis 建立连接的服务器之间进行交互。
  • 缺点:不确定消息到达。kafka 分布式消息队列、redis stream 模式可以确保消息到达。
    • 发布订阅的生产者传递过来一个消息,redis 会直接找到相应的消费者并传递过去。假如没有消费者,消息直接丢弃。假如开始有 2 个消费者,一个消费者突然挂掉了,另一个消费者依然能接收到消息。但是,如果刚挂掉的消费者重新连上后,在断开连接期间的消息对于该消费者来说彻底丢失了。
    • redis 停机重启,pubsub 的消息是不会持久化的,所有的消息都被直接丢弃。
  • 使用场景
    • 业务可以接受消息丢失。
    • redis cluster 集群之间通信。
  • 服务器需要和 redis 建立多少连接
    • 5 种基本数据结构的处理,只需要一条连接,可以使用连接池。
    • 如果有阻塞连接的需求,另外建立一条连接。
    • 如果需要发布订阅模式,另外建立一条连接。

在这里插入图片描述

# 订阅频道
subscribe 频道
# 订阅模式频道
psubscribe 频道
# 取消订阅频道
unsubscribe 频道
# 取消订阅模式频道
punsubscribe 频道
# 发布具体频道或模式频道的内容
publish 频道 内容
# 客户端收到具体频道内容
message 具体频道 内容
# 客户端收到模式频道内容
pmessage 模式频道 具体频道 内容

subscribe news.A news.B news.C
psubscribe news.*
publish new.B 'zcoder is good'
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18

redis 事务

  • 前提:有并发连接。
  • 事务是用户定义的一系列的数据库操作,要么全部执行,要么全部不执行,是不可分割的单元。
  • redis 事务原理:当使用 MULTI 开启事务时,redis 会创建一个队列,后续的所有命令都会入队,直到使用 EXEC 提交事务。提交事务时会将队列中的所有命令出队执行。因为 redis 处理命令是单线程的,所以在处理队列中的命令时会阻塞其它连接的命令,直到队列中的命令全部处理完。使用 DISCARD 可以清空队列。使用 WATCH 可以观察 key,如果 key 对应的 value 变动,说明其它连接修改了 value,事务的逻辑一致性被破坏,那么调用 EXEC 就会清空该事务的队列,返回 nil。
# 开启事务
MUITI
# 提交事务 
EXEC
# 取消事务
DISCARD

# 检测 key 对应的 value 的变动,若在事务执行中,value 变动则取消事务并返回 nil。
# 在事务开启前调用,乐观锁实现(cas) 
WATCH
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 应用
    • 加倍操作
      WATCH score
      val = GET score
      MULTI
      SET score val * 2
      EXEC
      
      • 1
      • 2
      • 3
      • 4
      • 5

lua 脚本

  • redis 中加载了一个 lua 虚拟机,用来执行 lua 脚本。
  • redis lua 脚本的执行是原子性的,当某个脚本正在执行的时候,不会有其他命令或者脚本被执行。
  • lua 脚本中的命令会直接修改数据状态。
  • 先将 lua 脚本提交到 redis 中,redis 会通过 lua 虚拟机解析 lua 脚本,并返回一个 hash 值,这个 hash 值可以代替这个 lua 脚本(通过这个 hash 值去索引对应的 lua 脚本)。
    • 优点:使用较短的字符串代替复杂的 lua 脚本。意味着在网络传输的过程中可以减少发送数据的流量。其次,效率会更高,因为使用的是已经编译好的 lua 脚本。
      在这里插入图片描述
# 测试使用
EVAL script numkeys key [key ...] arg [arg ...]

# 线上使用
EVALSHA sha1 numkeys key [key ...] arg [arg ...]
  • 1
  • 2
  • 3
  • 4
  • 5
  • 应用
    • 项目启动时,建立 redis 连接并验证,通过 script load 加载项目中使用的 lua 脚本,script load 会返回对应 lua 脚本的 hash 值。
    • 项目中若需要热更新,通过 redis-cli 执行 script flush 然后可以使用订阅发布功能通知所有服务器重新加载 lua 脚本。
    • 若项目中 lua 脚本发生阻塞,可通过 script kill 暂停当前阻塞脚本的执行。
# 从文件中读取 lua脚本内容
cat test.lua | redis-cli script load --pipe
# 加载 lua脚本字符串 生成 sha1
> script load 'local val = KEYS[1]; return val'
"b8059ba43af6ffe8bed3db65bac35d452f8115d8"
# 检查脚本缓存中,是否有该 sha1 散列值的lua脚本
> script exists "b8059ba43af6ffe8bed3db65bac35d452f8115d8"
1) (integer) 1
# 清除所有脚本缓存
> script flush
OK
# 如果当前脚本运行时间过长(死循环),可以通过 script kill杀死当前运行的脚本
> script kill
(error) NOTBUSY No scripts in execution right now.
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14

ACID 特性

  • 原子性(A)
    • 事务是一个不可分割的单位,事务中的操作要么全部成功,要么全部失败。
    • redis 不支持回滚,即使事务队列中的某个命令在执行期间出现了错误,整个事务也会继续执行下去,直到将事务队列中的所有命令都执行完毕为止。
  • 一致性(C)
    • 事务的前后,所有的数据都保持一个一致的状态,不能违反数据的一致性检测。这里的一致性是指预期的一致性而不是异常后的一致性,所以 redis 也不满足。
      • 这个争议很大:redis 能确保事务执行前后的数据的完整约束,但是并不满足业务功能上的一致性,比如转账功能,一个扣钱一个加钱,可能出现扣钱执行错误,加钱执行正确,那么最终还是会加钱成功,系统凭空多了钱。
  • 隔离性(I)
    • 各个事务之间互相影响的程度,redis 是单线程执行,天然具备隔离性。
  • 持久性(D)
    • redis 只有在 aof 持久化策略的时候,并且需要在 redis.conf 中 appendfsync=always 才具备持久性,实际项目中几乎不会使用 aof 持久化策略。
  • lua 脚本满足原子性和隔离性,不满足一致性和持久性

redis 异步连接

  • 同步连接方案采用阻塞 io 实现。通常用多个线程实现线程池解决效率问题。
    • 优点:代码书写是同步的,业务逻辑没有割裂。
    • 缺点:阻塞当前线程,直至 redis 返回结果。
  • 异步连接方案采用非阻塞 io 实现。
    • 优点:没有阻塞当前线程,就算 redis 没有返回,依然可以往 redis 发送命令。
    • 缺点:代码书写是异步的(回调函数),业务逻辑割裂。
  • 基于 reactor 实现异步连接:
    1. 与 redis 建立连接
    a. 创建 socket, 设置 fd 为非阻塞 io
    b. 调用 connect(fd, &addr, &len)
    c. 将 fd 注册到 epoll, 注册写事件
    d. 如果连接建立成功, fd 的写事件会进行响应, 然后注销写事件
    
    • 1
    • 2
    • 3
    • 4
    1. 向 redis 发送数据(使用 redis 协议加密,然后通过 tcp 发送过去)
    a. int n = write(fd, buf, sz)
       如果 n < sz && n != -1 或者 n == -1 && errno = EWOULDBLOCK 
       说明 fd 对应的发送缓冲区已经满了
    b. 注册写事件, 如果写事件触发, 继续 write(fd, buf, sz),
       如果发送完毕, 注销写事件
    c. 注册读事件
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    1. 读取 redis 的返回(通过 tcp 接收数据并分割数据包,然后使用 redis 协议解密)
    a. 读事件触发, int n = read(fd, buf, sz)
    b. 根据 redis 协议分割数据包
    c. 使用 redis 协议解密
    
    • 1
    • 2
    • 3

声明:本文内容由网友自发贡献,转载请注明出处:【wpsshop】
推荐阅读
相关标签
  

闽ICP备14008679号