当前位置:   article > 正文

Redis消息队列_redis 消息队列

redis 消息队列

消息队列:字面意思就是存放消息的队列。使用队列的好处在于解耦 。最简单的消息队列模型包括3个角色:

  • 消息队列:存储和管理消息,也被称为消息代理(Message Broker)

  • 生产者:发送消息到消息队列

  • 消费者:从消息队列获取消息并处理消息

Redis消息队列

  • 基于List实现消息队列
  • 基于PubSub的消息队列
  • 基于Stream的消息队列

基于List结构模拟消息队列

Redis的list数据结构是一个双向链表,很容易模拟出队列效果。

队列是入口和出口不在一边,因此我们可以利用:LPUSH 结合 RPOP、或者 RPUSH 结合 LPOP来实现。不过要注意的是,当队列中没有消息时RPOP或LPOP操作会返回null

优点:

  • 利用Redis存储

  • 基于Redis的持久化机制,数据安全性有保证

  • 可以满足消息有序性

缺点:

  • 无法避免消息丢失

  • 不能重复消费

  • 只支持单消费者

基于PubSub的消息队列

PubSub(发布订阅)消息传递模型。消费者可以订阅一个或多个channel,生产者向对应channel发送消息后,所有订阅者都能收到相关消息。

  1. SUBSCRIBE channel [channel] :订阅一个或多个频道
  2. PUBLISH channel msg :向一个频道发送消息
  3. PSUBSCRIBE pattern[pattern] :订阅与pattern格式匹配的所有频道

发布订阅机制存在以下缺点,都是跟丢失数据有关:

  1. 发布/订阅机制没有基于任何数据类型实现,所以不具备【数据持久化】的能力,也就是发布/订阅机制的相关操作,不会写入到 RDB 和 AOF 中,当 Redis 宕机重启,发布/订阅机制的数据也会全部丢失。
  2. 发布订阅模式是“发后既忘”的工作模式,如果有订阅者离线重连之后不能消费之前的历史消息。
  3. 当消费端有一定的消息积压时,也就是生产者发送的消息,消费者消费不过来时,如果超过 32M 或者是 60s 内持续保持在 8M 以上,消费端会被强行断开,这个参数是在配置文件中设置的,默认值是 client-output-buffer-limit pubsub 32mb 8mb 60

所以,发布/订阅机制只适合即时通讯的场景

基于Stream的消息队列

支持消息的持久化、支持自动生成全局唯一 ID、支持 ack 确认消息的模式、支持消费组模式等,让消息队列更加的稳定和可靠。大致流程如下

  1. 消息保序:XADD/XREAD
  2. 阻塞读取:XREAD block
  3. 重复消息处理:Stream 在使用 XADD 命令,会自动生成全局唯一 ID;
  4. 消息可靠性:内部使用 PENDING List 自动保存消息,使用 XPENDING 命令查看消费组已经读取但是未被确认的消息,消费者使用 XACK 确认消息;
  5. 支持消费组形式消费数据

Stream 消息队列操作命令

  • XADD:插入消息,保证有序,可以自动生成全局唯一 ID;
  • XLEN :查询消息长度;
  • XREAD:用于读取消息,可以按 ID 读取数据;
  • XDEL : 根据消息 ID 删除消息;
  • DEL :删除整个 Stream;
  • XRANGE :读取区间消息
  • XREADGROUP:按消费组形式读取消息;
  • XPENDING 命令可以用来查询每个消费组内所有消费者「已读取、但尚未确认」的消息
  • XACK 命令用于向消息队列确认消息处理已完成

Stream 可以以使用 XGROUP 创建消费组,创建消费组之后,Stream 可以使用 XREADGROUP 命令让消费组内的消费者读取消息

从消费者组读取消息:

XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] ID [ID ...]
  • group:消费组名称

  • consumer:消费者名称,如果消费者不存在,会自动创建一个消费者

  • count:本次查询的最大数量

  • BLOCK milliseconds:当没有消息时最长等待时间

  • NOACK:无需手动ACK,获取到消息后自动确认

  • STREAMS key:指定队列名称

  • ID:获取消息的起始ID:

">":从下一个未消费的消息开始 其它:根据指定id从pending-list中获取已消费但未确认的消息,例如0,是从pending-list中的第一个消息开始。

PENDING List

pending-list确保消费者在发生故障或宕机再次重启后,仍然可以读取未处理完的消息

Streams 会自动使用内部队列(也称为 PENDING List)留存消费组里每个消费者读取的消息,直到消费者使用 XACK 命令通知 Streams“消息已经处理完成”。

消费确认增加了消息的可靠性,一般在业务处理完成之后,需要执行 XACK 命令确认消息已经被消费完成,整个流程的执行如下图所示

消费者可以在重启后,用 XPENDING 命令查看已读取、但尚未确认处理完成的消息

  1. #查看一下 group 中各个消费者已读取、但尚未确认的消息个数
  2. XPENDING KEY GROUP
  3. #想查看某个消费者具体读取了哪些数据
  4. XPENDING KEY GROUP - + start end count
  5. #XACK 命令通知 Streams
  6. XACK KEY GROUP consumerid

消息队列中的消息一旦被消费组里的一个消费者读取了,就不能再被该消费组内的其他消费者读取了,即同一个消费组里的消费者不能消费同一条消息
比如说,我们执行完刚才的 XREADGROUP 命令后,再执行一次同样的命令,此时读到的就是空值了:

  1. > XREADGROUP GROUP group1 consumer1 STREAMS mymq >
  2. (nil)

但是,不同消费组的消费者可以消费同一条消息(但是有前提条件,创建消息组的时候,不同消费组指定了相同位置开始读取消息)

本文内容由网友自发贡献,转载请注明出处:https://www.wpsshop.cn/w/盐析白兔/article/detail/395922
推荐阅读
相关标签
  

闽ICP备14008679号