赞
踩
目前在redis想要实现消息队列的功能有如下的两种方案:
- 1:基于List的lpush和rpop
- 2:Streams
这里不将pub/sub考虑在内,因为其不具备持久化的能力,消息会丢失。
其中1是利用其有的先进先出特性实现,2是redis为了实现消息队列专门在redis5版本中定义的一种新的数据结构,这里注意,其也是一种数据结构,和String,Set等处于同等位置的数据结构,只不过内部增加了一些针对消息队列的一些特有操作来实现消息队列的功能,后续我们会详细分析其用法。
消息保序
即要保证消息发送的顺序和消费的顺序是一致的,不一致的话可能会导致业务上的错误。
重复消息处理
消息的重复处理,准确来说是应该通过消费者自身来实现的,在应用程序内部实现这些逻辑,但是对于消息中间件来说也要有此类机制,即对于一个已经被消费的消息(已经收到ACK)不能再次被消费。
消息可靠性
换句话说,要具有持久化的能力,避免消息丢失,这样当消费者异常宕机导致再次重启后需要重新消费消息时可以再次获取。
接下来我们先来看下使用List如何实现消息队列。
生产消息我们可以使用lpush,消费消息可以使用rpop,如下图生产和消费消息的过程:
但是注意,这里我们使用rpop消费消息时,如果没有消息则会直接返回, 且有新消息时也不会通知,此时就需要通过诸如while(true),for(;;)之类的死循环来不断查询,这样就会不断的消耗CPU资源,造成不必要的CPU资源浪费,为了解决这个问题,redis提供了rpop阻塞版本命令brpop,其中bblock就代表了阻塞的特性,当有消息可以消费时,这样就解决了浪费CPU资源的问题。
到这里,对于消息中间件的3个需求,List都满足哪些呢?首先第一个消息保序,是天然支持的,对于重复消息处理List本身是不支持的,但是我们在前面也分析了,这准确来说更应该是消费者本身来实现的,因此,生产者在生产消息时只要给每一个消息一个唯一的标识,然后消费者通过此来避免消息重复处理就可以了,比如LPUSH mq "101030001:stock:5",其中的101030001就是其唯一标识。最后是消息可靠性,为了满足消息可靠性,redis进一步对rpop进行了增强,提供了BRPOPLPUSH命令,消息弹出后,会重新压到一个新的队列中,这样当消费者异常重启后就可以从这里重新消费消息了,如下图:
到这里,List在一定程序上都满足了消息中间件需要满足的3个条件,这里还需要考虑另外一种情况,即当消费者的消费能力严重不足,或者是生产者生产的消息量非常大,即生产能力远大于消费能力的时候List就没有什么办法了,因为其只能一个一个的弹出消息,对于这个问题就需要依靠redis专门针对消息队列的场景实现的新数据结构Streams,注意这也是一种新的数据结构。
我们前面说了,Streams是一种redis专门为消息队列定义的一种数据结构,所以自然的我们是先要看如何定义这种数据结构了,和其它的数据结构一样,我们不需要显式的创建,在执行第一次数据添加的时候自动创建,添加数据的命令是XADD,语法格式是XADD key ID field value [field value ...],参数说明如下:
- key:redis的key
- ID:消息的唯一标识,可以指定,也可以设置为*,设置为*时id会自动生成,id是递增
- field value:消息的字段和值
如下生产(创建)若干条消息:
redis> XADD mystream * name Sara surname OConnor "1601372323627-0" redis> XADD mystream * field1 value1 field2 value2 field3 value3 "1601372323627-1" redis> XLEN mystream (integer) 2 redis> XRANGE mystream - + 1) 1) "1601372323627-0" 2) 1) "name" 2) "Sara" 3) "surname" 4) "OConnor" 2) 1) "1601372323627-1" 2) 1) "field1" 2) "value1" 3) "field2" 4) "value2" 5) "field3" 6) "value3" redis> 1234567891011121314151617181920
其中XLEN用来查看消息的个数,XRANGE用来通过范围查询基于递增ID获取消息,-相当于是负无穷,+相当于是正无穷,即获取所有消息。我们接着再来看下其它一些命令。
根据ID删除消息,如下测试:
> XADD mystream * a 1 1538561698944-0 > XADD mystream * b 2 1538561700640-0 > XADD mystream * c 3 1538561701744-0 > XDEL mystream 1538561700640-0 (integer) 1 127.0.0.1:6379> XRANGE mystream - + 1) 1) 1538561698944-0 2) 1) "a" 2) "1" 2) 1) 1538561701744-0 2) 1) "c" 2) "3" 123456789101112131415
获取消息的数量,语法格式xlen key,如下:
- redis> XADD mystream * item 1
- "1601372563177-0"
- redis> XADD mystream * item 2
- "1601372563178-0"
- redis> XADD mystream * item 3
- "1601372563178-1"
- redis> XLEN mystream
- (integer) 3
- redis>
- 123456789
查询指定范围的消息,语法格式XRANGE key start end [COUNT count],解释如下:
- key :队列名
- start :开始值, - 表示最小值
- end :结束值, + 表示最大值
- count :数量
- 1234
测试如下:
redis> XADD writers * name Virginia surname Woolf "1601372577811-0" redis> XADD writers * name Jane surname Austen "1601372577811-1" redis> XADD writers * name Toni surname Morrison "1601372577811-2" redis> XADD writers * name Agatha surname Christie "1601372577812-0" redis> XADD writers * name Ngozi surname Adichie "1601372577812-1" redis> XLEN writers (integer) 5 redis> XRANGE writers - + COUNT 2 1) 1) "1601372577811-0" 2) 1) "name" 2) "Virginia" 3) "surname" 4) "Woolf" 2) 1) "1601372577811-1" 2) 1) "name" 2) "Jane" 3) "surname" 4) "Austen" redis> 123456789101112131415161718192021222324
从后往前获取消息,语法格式XREVRANGE key end start [COUNT count],解释如下:
- key :队列名
- end :结束值, + 表示最大值
- start :开始值, - 表示最小值
- count :数量
- 1234
实例如下:
redis> XADD writers * name Virginia surname Woolf "1601372731458-0" redis> XADD writers * name Jane surname Austen "1601372731459-0" redis> XADD writers * name Toni surname Morrison "1601372731459-1" redis> XADD writers * name Agatha surname Christie "1601372731459-2" redis> XADD writers * name Ngozi surname Adichie "1601372731459-3" redis> XLEN writers (integer) 5 redis> XREVRANGE writers + - COUNT 1 1) 1) "1601372731459-3" 2) 1) "name" 2) "Ngozi" 3) "surname" 4) "Adichie" 123456789101112131415161718
以阻塞或者是非阻塞的方式获取消息,即消费消息的命令,语法格式XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] id [id ...],解释如下:
- count :数量
- milliseconds :可选,阻塞毫秒数,没有设置就是非阻塞模式
- key :队列名
- id :消息 ID
- 1234
测试如下:
# 从 Stream 头部读取两条消息 > XREAD COUNT 2 STREAMS mystream writers 0-0 0-0 1) 1) "mystream" 2) 1) 1) 1526984818136-0 2) 1) "duration" 2) "1532" 3) "event-id" 4) "5" 5) "user-id" 6) "7782813" 2) 1) 1526999352406-0 2) 1) "duration" 2) "812" 3) "event-id" 4) "9" 5) "user-id" 6) "388234" 2) 1) "writers" 2) 1) 1) 1526985676425-0 2) 1) "name" 2) "Virginia" 3) "surname" 4) "Woolf" 2) 1) 1526985685298-0 2) 1) "name" 2) "Jane" 3) "surname" 4) "Austen" 12345678910111213141516171819202122232425262728
创建消费者组,使用消费者可以对消息进行并发的消费,解决消费者消费能力不足的问题,语法格式为XGROUP [CREATE key groupname id-or-$] [SETID key groupname id-or-$] [DESTROY key groupname] [DELCONSUMER key groupname consumername],解释如下:
- key :队列名称,如果不存在就创建
- groupname :组名。
- $ : 表示从尾部开始消费,只接受新消息,当前 Stream 消息会全部忽略。
如下从头开始消费:
XGROUP CREATE mystream consumer-group-name 0-0
如下从尾部开始消费:
XGROUP CREATE mystream consumer-group-name $
在实际的场景中我们可以通过设置多个消费者组的不同开始消费的位置来实现并发消费的效果,此时可能如下图:
图中主要元素解释如下:
- 每个 Stream 都有唯一的名称,它就是 Redis 的 key,在我们首次使用 xadd 指令追加消息时自动创建。
- Consumer Group :消费组,使用 XGROUP CREATE 命令创建,一个消费组有多个消费者(Consumer)。
- last_delivered_id :游标,每个消费组会有个游标 last_delivered_id,任意一个消费者读取了消息都会使游标 last_delivered_id 往前移动。
- pending_ids :消费者(Consumer)的状态变量,作用是维护消费者的未确认的 id。 pending_ids 记录了当前已经被客户端读取的消息,但是还没有 ack (Acknowledge character:确认字符)。
读取消费者组中的消息,语法格式如下:
XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] ID [ID ...]
解释如下:
- group :消费组名
- consumer :消费者名。
- count : 读取数量。
- milliseconds : 阻塞毫秒数。
- key : 队列名。
- ID : 消息 ID。
- 123456
如下测试:
XREADGROUP GROUP consumer-group-name consumer-name COUNT 1 STREAMS mystream >
符号>标识从第一条尚未被消费的消息开始消费。
-
- XADD mystream * name Sara surname OConnor
-
-
-
- XGROUP CREATE mystream consumer-group-name 0-0
-
-
-
- XREADGROUP GROUP consumer-group-name consumer-name COUNT 1 STREAMS mystream >
参考文章列表:
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。