赞
踩
由于Redis的列表(List)是使用双向链表实现的,保存了头尾节点,所以在列表头尾两边插取元素都是非常快的。
所以可以直接使用Redis的List实现消息队列,只需简单的两个指令lpush和rpop或者rpush和lpop
Redis 提供了 PUBLISH / SUBSCRIBE 命令,来完成发布、订阅的操作。
Sortes Set(有序列表),类似于java的SortedSet和HashMap的结合体,一方面它是一个set,保证内部value的唯一性,另一方面它可以给每个value赋予一个score,代表这个value的排序权重。内部实现是“跳跃表”。
有序集合的方案是在自己确定消息顺序ID时比较常用,使用集合成员的Score来作为消息ID,保证顺序,还可以保证消息ID的单调递增。通常可以使用时间戳+序号的方案。确保了消息ID的单调递增,利用SortedSet的依据
Score排序的特征,就可以制作一个有序的消息队列了。
就是可以自定义消息ID,在消息ID有意义时,比较重要。
缺点也明显,不允许重复消息(因为是集合),同时消息ID确定有错误会导致消息的顺序出错。
首先,Stream通过XADD和XREAD完成最简单的生产、消费模型:
- // *表示让Redis自动生成消息ID
- 127.0.0.1:6379> XADD queue * name zhangsan
- "1618469123380-0"
使用 XADD 命令发布消息,其中的「*」表示让 Redis 自动生成唯一的消息 ID。
消费者拉取消息
- // 从开头读取5条消息,0-0表示从开头读取
- 127.0.0.1:6379> XREAD COUNT 5 STREAMS queue 0-0
- 1) 1) "queue"
- 2) 1) 1) "1618469123380-0"
- 2) 1) "name"
- 2) "zhangsan"
- 2) 1) "1618469127777-0"
- 2) 1) "name"
- 2) "lisi"
可以的,在读取消息时,只需要增加 BLOCK 参数即可。
- // BLOCK 0 表示阻塞等待,不设置超时时间
- 127.0.0.1:6379> XREAD COUNT 5 BLOCK 0 STREAMS queue 1618469127777-0
这时,消费者就会阻塞等待,直到生产者发布新的消息才会返回。
Stream 通过以下命令完成发布订阅:
除了上面拉取消息时用到了消息 ID,这里为了保证重新消费,也要用到这个消息 ID。
当一组消费者处理完消息后,需要执行 XACK 命令告知 Redis,这时 Redis 就会把这条消息标记为「处理完成」。
-
- // group1下的 1618472043089-0 消息已处理完成
- 127.0.0.1:6379> XACK queue group1 1618472043089-0
Stream 是新增加的数据类型,它与其它数据类型一样,每个写操作,也都会写入到 RDB 和 AOF 中。
当消息队列发生消息堆积时,一般只有 2 个解决方案:
发布消息时,你可以指定队列的最大长度,防止队列积压导致内存爆炸。
- // 队列长度最大10000
- 127.0.0.1:6379> XADD queue MAXLEN 10000 * name zhangsan
- "1618473015018-0"
当队列长度超过上限后,旧消息会被删除,只保留固定长度的新消息。
这么来看,Stream 在消息积压时,如果指定了最大长度,还是有可能丢失消息的。
消息丢失、消息服务不稳定的问题严重限制了pubsub的应用场景,所以Redis需要重新设计一套机制,来解决这些问题,这就有了后来的stream结构。
一个稳定的消息服务需要具备几个要点,要保证消息不会丢失,至少被消费一次,要具备削峰填谷的能力,来匹配生产者和消费者吞吐的差异。在2018年Redis 5.0加入了stream结构,这次考虑了list、pubsub在应用场景下的缺陷,对标kafka的模型重新设计全内存消息队列结构,从这时开始Redis消息队列功能算是能和主流消息队列产品pk一把了。
stream的改进分为多个方面
成本:
功能:
消息不丢失:
- #基于stream完成消息的生产和消费,并确保异常状态下消息至少被消费一次
-
- #创建mystream,并且创建一个consumergroup为mygroup
- XGROUP CREATE mystream mygroup $ MKSTREAM
- OK
-
- #写入一条消息,由redis自动生成消息id,消息的内容是一个kv数组,这里包含field1 value1 field2 value2
- XADD mystream * field1 value1 field2 value2
- "1645517760385-0"
-
- #消费者组mygroup中的消费者consumer1从mystream读取一条消息,>表示读取一条该消费者组从未读取过的消息
- XREADGROUP GROUP mygroup consumer1 COUNT 1 STREAMS mystream >
- 1) 1) "mystream"
- 2) 1) 1) "1645517760385-0"
- 2) 1) "field1"
- 2) "value1"
- 3) "field2"
- 4) "value2"
-
- #消费完成后ack确认消息
- xack mystream mygroup 1645517760385-0
- (integer) 1
-
- #如果消费者应用在ack前异常宕机,恢复后重新获取未处理的消息id。
- XPENDING mystream mygroup - + 10
- 1) 1) "1645517760385-0"
- 2) "consumer1"
- 3) (integer) 305356
- 4) (integer) 1
-
- #如果consumer1永远宕机,其他消费者可以把pending状态的消息移动到自己名下后继续消费
- #将消息id 1645517760385-0移动到consumer2下
- XCLAIM mystream mygroup consumer2 0 1645517760385-0
- 1) 1) "1645517760385-0"
- 2) 1) "field1"
- 2) "value1"
- 3) "field2"
- 4) "value2"
Redis stream保证了消息至少被处理一次,但如果想做到每条消息仅被处理一次还需要应用逻辑的介入。
消息被重复处理要么是生产者重复投递,要么是消费者重复消费。
stream的模型做到了消息的高效分发,而且保证了消息至少被处理一次,通过应用逻辑的改造能做到消息仅被处理一次,它的能力对标kafka,但吞吐高于kafka,在高吞吐场景下成本比kafka低,那它又有哪些不足了。
首先消息队列很重要的一个功能就是削峰填谷,来匹配生产者和消费者吞吐的差异,生产者和消费者吞吐差异越大,持续时间越长,就意味着steam中需要堆积更多的消息,而Redis作为一个全内存的产品,数据堆积的成本比磁盘高。
其次stream通过ack机制保证了消息至少被消费一次,但这有个前提就是存储在Redis中的消息本身不会丢失。Redis数据的持久化依赖aof和rdb文件,aof落盘方式有几种,通过配置appendfsync决定,通常我们不会配置为always来让每条命令执行完后都做一次fsync,线上配置一般为everysec,每秒做一次fsync,而rdb是全量备份时生成,这意味了宕机恢复可能会丢掉最近一秒的数据。另一方面线上生产环境的Redis都是高可用架构,当主节点宕机后通常不会走恢复逻辑,而是直接切换到备节点继续提供服务,而Redis的同步方式是异步同步,这意味着主节点上新写入的数据可能还没同步到备节点,在切换后这部分数据就丢失了。所以在故障恢复中Redis中的数据可能会丢失一部分,在这样的背景下无论stream的接口设计的多么完善,都不能保证消息至少被消费一次。
优势
不足
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。