赞
踩
MQ(Message Queue)是一种提供消息队列服务的中间件,也称消息中间件,是一套提供了海量消息生产、存储、消费全过程 API 的软件系统。
MQ 的作用:
常见的 MQ:
MQ | 开发语言 | 单机吞吐量 | Topic | 社区活跃度 |
---|---|---|---|---|
ActiveMQ | Java 语言 | 万级 | - | 低 |
RabbitMQ | ErLang 语言 | 万级 | - | 高 |
Kafka | Scala/Java 语言 | 十万级 | 百级 Topic 会影响吞吐量 | 高 |
RocketMQ | Java 语言 | 十万级 | 千级 Topic 会影响吞吐量 | 高 |
MQ 协议:
协议 | 描述 |
---|---|
JMS | JMS(Java Message Service),是 Java 平台上有关 MOM(Message Oriented Middleware 面向消息的中间件) 的技术规范,它便于消息系统中的 Java 应用程序进行消息交换,并且通过提供标准的产生、发送、接受消息的接口,简化企业应用的开发。 ActiveMQ 是该协议的典型实现。 |
STOMP | STOMP(Streaming Text Orientated Message Protocol),是一种 MOM 的简单文本协议。STOMP 提供一个可互操作式的连接格式,允许客户端与任意 STOMP 消息代理进行交互。 ActiveMQ 是该协议的典型实现,RabbitMQ 通过插件可以支持该协议。 |
AMQP | AMQP(Advanced Message Queuing Protocol),是一个提供统一消息服务的应用层标准,是应用层协议的一个开放标准,是一种 MOM 设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品、不同开发语言等条件的现在。 RabbitMQ是该协议的典型实现。 |
MQTT | MQTT(Message Queuing Telemetry Transport),是 IBM 开发的一个即时通讯协议,是一种二进制协议,主要用于服务器和低功耗 loT 设备间的通信。该协议支持所有平台,几乎可以把所有物联网物品和外部链接起来,被用来做传感器和制动器的通信协议。 RabbitMq 通过插件可以支持该协议。 |
Apache RocketMQ 是一款由阿里巴巴提供的开源的分布式消息中间件,是一个分布式消息和流式处理平台,具有低时延、高性能和高可靠性、万亿级容量和灵活扩展的特点,适用于构建海量消息堆积和异步解耦功能的应用系统。
Apache RocketMQ 自诞生以来,因其架构简单、业务功能丰富、具备极强可扩展性等特点被众多企业开发者以及云厂商广泛采用。历经十余年的大规模场景打磨,RocketMQ 已经成为业内共识的金融级可靠业务消息首选方案,被广泛应用于互联网、大数据、移动互联网、物联网等领域的业务场景。
主题(Topic):
主题是 RocketMQ 中消息传输和存储的顶层容器,用于标识同一类业务逻辑的消息。主题通过 TopicName 来做唯一标识和区分。
消息(Message):
消息是指消息系统所传输信息的物理载体,生产和消费数据的最小单位,每条消息必须属于一个主题。 生产者将业务数据的负载和拓展属性包装成消息发送到服务端,服务端按照相关语义将消息投递到消费端进行消费。
消息类型(MessageType):
RocketMQ 中按照消息传输特性的不同而定义的分类,用于类型管理和安全校验。RocketMQ 支持的消息类 型有普通消息、顺序消息、事务消息和定时/延时消息。
RocketMQ 从5.0版本开始,支持强制校验消息类型,即每个主题 Topic 只允许发送一种消息类型的消息, 这样可以更好的运维和管理生产系统,避免混乱。但同时保证向下兼容4.x版本行为,强制校验功能默认开启。
消息队列(MessageQueue):
消息队列是 RocketMQ 中消息存储和传输的实际容器,也是消息的最小存储单元。RocketMQ 的所有主 题都是由多个队列组成,以此实现队列数量的水平拆分和队列内部的流式存储。队列通过 QueueId 来做唯一标识和区分。
消息视图(MessageView):
消息视图是 Apache RocketMQ 面向开发视角提供的一种消息只读接口。通过消息视图可以读取消息内部的多个属性和负载信息,但是不能对消息本身做任何修改。
消息标签(MessageTag):
消息标签 是 RocketMQ 提供的细粒度消息分类属性,可以在主题层级之下做消息类型的细分,消费者通过订阅特定的标签来实现细粒度过滤。
消息位点(MessageQueueOffset):
消息是按到 达 RocketMQ 服务端的先后顺序存储在指定主题的多个队列中,每条消息在队列中都有一个唯一的 Long 类型坐标,这个坐标被定义为消息位点。
消费位点(ConsumerOffset):
一条消息被 某个消费者消费完成后不会立即从队列中删除,RocketMQ 会基于每个消费者分组记录消费过的最新一条消息的位点,即消费位点。
消息索引(MessageKey):
消息索引 是 RocketMQ 提供的面向消息的索引属性,通过设置的消息索引可以快速查找到对应的消息内容。
生产者(Producer):
生产者 是 RocketMQ 系统中用来构建并传输消息到服务端的运行实体。生产者通常被集成在业务系统中,将业务消息按照要求封装成消息并发送至服务端。
事务检查器(TransactionChecker):
事务 检查器 RocketMQ 中生产者用来执行本地事务检查和异常事务恢复的监听器,事务检查器应该通过业务侧数据的状态来检查和判断事务消息的状态。
事务状态(TransactionResolution)
Rock etMQ 中事务消息发送过程中,事务提交的状态标识,服务端通过事务状态控制事务消息是否应该提交和投递。事务状态包括事务提交、事务回滚和事务未决。
消费者(Consumer):
消 费者是 RocketMQ 中用来接收并处理消息的运行实体。消费者通常被集成在业务系统中,从服务端获取消息,并将消息转化成业务可理解的信息,供业务逻辑处理。
消费者分组(ConsumerGroup):
消 费者分组是 RocketMQ 系统中承载多个消费行为一致的消费者的负载均衡分组。和消费者不同,消费者分组并不是运行实体,而是一个逻辑资源。在 RocketMQ 中,通过消费者分组内初始化多个消费者实现消费性能的水平扩展以及高可用容灾。
消费结果(ConsumeResult):
RocketMQ 中 PushConsumer 消费监听器处理消息完成后返回的处理结果,用来标识本次消息是否正确处理。消费结果包含消费成功和消费失败。
订阅关系(Subscription):
订阅关系是Apache RocketMQ 系统中消费者获取消息、处理消息的规则和状态配置。订阅关系由消费者分组动态注册到服务端系统,并在后续的消息传输中按照订阅关系定义的过滤规则进行消息匹配和消费进度维护。
消息过滤:
消费者可以通过订阅指定消息标签(Tag)对消息进行过滤,确保最终只接收被过滤后的消息合集。过滤规则的计算和匹配在 RocketMQ 的服务端完成。
重置消费位点:
以时间轴为坐标,在消息持久化存储的时间范围内,重新设置消费者分组对已订阅主题的消费进度,设置完成后消费者将接收设定时间点之后,由生产者发送到 RocketMQ 服务端的消息。
消息轨迹:
在一条消息从生产者发出到消费者接收并处理过程中,由各个相关节点的时间、地点等数据汇聚而成的完整链路信息。通过消息轨迹,您能清晰定位消息从生产者发出,经由 RocketMQ 服务端,投递给消费者的完整链路,方便定位排查问题。
消息堆积:
生产者已经将消息发送到 RocketMQ 的服务端,但由于消费者的消费能力有限,未能在短时间内将所有消息正确消费掉,此时在服务端保存着未被消费的消息,该状态即消息堆积。
事务消息:
事务消息是 RocketMQ 提供的一种高级消息类型,支持在分布式场景下保障消息生产和本地事务的最终一致性。
定时/延时消息:
定时/延时消息是 RocketMQ 提供的一种高级消息类型,消息被发送至服务端后,在指定时间后才能被消费者消费。通过设置一定的定时时间可以实现分布式场景的延时调度触发效果。
顺序消息:
顺序消息是 RocketMQ 提供的一种高级消息类型,支持消费者按照发送消息的先后顺序获取消息,从而实现业务场景中的顺序处理。
Producer 是消息生产者,负责生产消息。Producer 通过 MQ 的负载均衡模块选择相应的 Broker 集群队列进行消息投递,投递的过程支持快速失败并且低延迟。
RocketMQ 中的消息生产者都是以生产者组(Producer Group)的形式出现的,生产者组是同一类生产者的集合,这类 Producer 发送相同 Topic 类型的消息。
Consumer 是消息消费者,负责消费消息。一个消息消费者会从 Broker 服务器中获取到消息,并对消息进行相关业务处理。
RocketMQ 中的消息消费者都是以消费者组(Consumer Group)的形式出现的,消费者组是同一类消费者的集合,这类 Consumer 消费的是同一个 Topic 类型的消息。消费者组使得在消息消费方面,实现负载均衡和容错的目标变得非常容易。
消费者组中 Consumer 的数量应该小于等于订阅 Topic 的 Queue 数量,如果超出 Queue 数量,则多出的 Consumer 将不能消费消息。
不过,一个 Topic 类型的消息可以被多个消费者组同时消费。
NameServer 是一个 Broker 与 Topic 路由的注册中心,支持 Broker 的动态注册与发现。
NameServer 主要包括两个功能:
路由注册:
NameServer 通常也是以集群的方式部署,不过 NameServer 是无状态的,即 NameServer 集群中的各个节点间是无差异的,各节点间相互不进行信息通讯。在 Broker 节点启动时,轮询 NameServer 列表,与每个 NameServer 节点建立长连接,发起注册请求。在 NameServer 内部维护着一个 Broker 列表,用来动态存储 Broker 的信息。
Broker 节点为了证明自己是活着的,为了维护与 NameServer 间的长连接,会将最新的信息以心跳包的方式上报给 NameServer,每50秒发送一次心跳。心跳包中包含 BrokerIld、Broker 地址、Broker 名称、Broker 所属集群名称等等。NameServer 在接收到心跳包后,会更新心跳时间戳,记录这个 Broker 的最新存活时间。
路由剔除:
由于 Broker 关机、宕机或网络抖动等原因,NameServer 没有收到 Broker 的心跳,NameServer 可能会将其从 Broker 列表中剔除。
NameServer 中有一个定时任务,每隔10秒就会扫描一次 Broker 表,查看每一个 Broker 的最新心跳时间戳距离当前时间是否超过120秒,如果超过,则会判定 Broker 失效,然后将其从 Broker 列表中剔除。
路由发现:
RocketMQ 的路由发现采用的是 Pull 模型。当 Topic 路由信息出现变化时,NameServer 不会主动推送给客户端,而是客户端定时拉取主题最新的路由。默认客户端每30秒会拉取一次最新的路由。
客户端 NameServer 选择策略:
客户端在配置时必须要写上 NameServer集群的地址,那么客户端到底连接的是哪个 NameServer 节点呢?客户端首先会生成一个随机数,然后再与 NameServer 节点数量取模,此时得到的就是所要连接的节点索引,然后就会进行连接。如果连接失败,则会采用 round-robin 策略,逐个尝试着去连接其它节点。
首先采用的是随即策略,失败后采用轮询策略。
Broker 充当着消息中转角色,负责存储消息、转发消息。
Broker 在 RocketMQ 系统中负责接收并存储从生产者发送来的消息,同时为消费者的拉取请求作准备。Broker 同时也存储着消息相关的元数据,包括消费者组消费进度偏移 offset、主题、队列等。
Broker 功能模块:
集群部署:
为了增强 Broker 性能与吞吐量,Broker 一般都是以集群形式出现的。各集群节点中可能存放着相同 Topic 的不同 Queue。每个 Broker 集群节点可以进行横向扩展,即将 Broker 节点再建为一个 HA 集群,解决单点数据丢失问题。
Broker 节点集群是一个主从集群,即集群中具有 Master 与 Slave 两种角色。
(1)RocketMQ 下载
进入 RocketMQ 官网
点击 Download 进入下载界面,如下。选择对应的版本下载即可,如 rocketmq-all-4.8.0-bin-release.zip
将下载完成的文件上传到 Linux 服务器中,并使用以下命令完成解压
unzip rocketmq-all-4.8.0-bin-release.zip
解压后目录如下:
(2)修改配置
修改 RocketMQ 配置,需要修改 runserver.sh 文件和 runbroker.sh 文件中的初始内存。
runserver.sh 文件
修改为:
-server -Xms256m -Xmx256m -Xmn128m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
runbroker.sh 文件
修改为:
-server -Xms256m -Xmx256m -Xmn128m"
(3)启动 RocketMQ
启动 NameServer
- # 启动 nameserver
- nohup sh bin/mqnamesrv &
启动结果:
在 RocketMQ 根目录下会生成一个 nohup.out 文件,查看该文件如下:
出现 The Name Server boot success,则说明 NameServer 启动成功。可以使用如下命令查看NameServer 运行日志:
- # 查看日志
- tail -f ~/logs/rocketmqlogs/namesrv.log
日志如下:
启动 Broker
- # 启动 broker
- nohup sh bin/mqbroker -n localhost:9876 --enable-proxy &
-
- # 查看日志
- tail -f ~/logs/rocketmqlogs/proxy.log
启动结果:
查看 broker 运行日志:
查看 proxy.log 日志:
(4)测试消息发送接收
发送消息:
- $ export NAMESRV_ADDR=localhost:9876
- $ sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer
发送结果:
接收消息:
$ sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer
接收结果:
(5)关闭 RocketMQ
先关闭 Broker
sh bin/mqshutdown broker
关闭结果:
再关闭 NameServer
sh bin/mqshutdown namesrv
关闭结果:
(1)下载 RocketMQ 控制台
RocketMQ 原来的控制台为 rocketmq-console,现在已经改为 rocketmq-externals 并单独列为一个项目,现在取名:rocketmq-dashboard。
rocketmq-externals 地址:https://github.com/apache/rocketmq-externals。
下载 rocketmq-dashboard 可以直接在官网下载
rocketmq-dashboard 是一个 springboot 项目,下载后解压,进入项目配置文件 application.properties,修改端口号和 NameServer 地址,如下:
修改完成之后,进入到项目个根路径,使用 cmd 命令行进行项目的打包,打包命令如下:
mvn clean package -Dmaven.test.skip=true
打包执行结果:
打包完成,生成如下 jar 包:
使用 java -jar 命令执行 jar 包,启动项目,结果如下:
启动成功之后,本地访问 localhost:9999,结果如下:
注意:
安装 RocketMQ 之后,需要开放端口 9876,10911,10912,10909
9876:NameServer 的端口
10911:Broker 对外服务的监听端口
10912:haService 中使用
10909:主要用于 slave 同步 master
(1)复制策略
复制策略是 Broker 的 Master 与 Slave 间的数据同步方式,分为同步复制与异步复制:
(2)刷盘策略
刷盘策略指的是 broker 中消息的落盘方式,即消息发送到 broker 内存后消息持久化到磁盘的方式。分为同步刷盘与异步刷盘:
单 Master
只有一个 Broker,这种方式也只能是在测试时使用,生产环境下不能使用,因为存在单点问题。
多 Master
Broker 集群仅由多个 Master 构成,不存在Slave。同一 Topic 的各个 Queue 会平均分布在各个 Master 节点上。
多 Master 多 Slave-异步复制
Broker 集群由多个 Master 构成,每个 Master 又配置了多个 Slave (在配置了 RAID 磁盘阵列的情况下,一个 Master 一般配置一个 Slave 即可) 。Master 与 Slave 的关系是主备关系,即 Master 负责处理消息的读写请求,而 Slave 仅负责消息的备份与 Master 宕机后的角色切换。
异步复制即前面所讲的复制策略中的异步复制策略,即消息写入 Master 成功后,Master 立即向 producer 返回成功 ACK,无需等待 Slave 同步数据成功。
该模式的最大特点之一是,当 Master 宕机后 Slave 能够自动切换为 Master 。不过由于 Slave 从 Master 的同步具有短暂的延迟(毫秒级),所以当 Master 宕机后,这种异步复制方式可能会存在少量消息的丢失问题。
多 Master 多 Slave-同步双写
该模式是多 Master 多 Slave 模式的同步复制实现。所谓同步双写,指的是消息写入 Master 成功后,Master 会等待 Slave 同步数据成功后才向 producer 返回成功 ACK,即 Master 与 Slave 都要写入成功后才会返回成功 ACK,也即双写。
该模式与异步复制模式相比,优点是消息的安全性更高,不存在消息丢失的情况。但单个消息的 RT 略高,从而导致性能要略低(大约低10%)
该模式存在一个大的问题:对于目前的版本,Master 宕机后,Slave不能自动切换到 Master。
(1)RocketMQ 集群配置文件
以 2m-2s-async 集群模式为例,进入到 2m-2s-async 目录,如下:
进入到 Master 配置文件 broker-a.properties,如下:
进入到 Slave 配置文件 broker-a-s.properties,如下
(2)添加配置
服务器1上需要配置如下配置文件:
服务器2上需要添加如下配置文件:
Master 配置文件需要添加如下配置:
Slave 配置文件需要添加如下配置:
(3)启动服务器
启动 NameServer:分别启动两台服务器上的 NameServer,启动命令如下:
- # 启动 nameserver
- nohup sh bin/mqnamesrv &
-
- # 查看日志
- tail -f ~/logs/rocketmqlogs/namesrv.log
-
- # 查看根目录下的 nohup.out 文件
- tail -f nohup.out
启动结果:
启动 Master:分别启动两台服务器上的 broker master,需要指定配置文件,命令如下:
- # 服务器1
- # 启动 broker master
- nohup sh bin/mqbroker -c conf/2m-2s-async/broker-a.properties &
-
- # 查看日志
- tail -f ~/logs/rocketmqlogs/broker.log
-
-
-
- # 服务器2
- # 启动 broker master
- nohup sh bin/mqbroker -c conf/2m-2s-async/broker-b.properties &
-
- # 查看日志
- tail -f ~/logs/rocketmqlogs/broker.log
启动 Slave:分别启动两台服务器上的 broker slave,需要指定配置文件,命令如下:
- # 服务器1
- # 启动 broker master
- nohup sh bin/mqbroker -c conf/2m-2s-async/broker-b-s.properties &
-
- # 查看日志
- tail -f ~/logs/rocketmqlogs/broker.log
-
-
-
- # 服务器2
- # 启动 broker master
- nohup sh bin/mqbroker -c conf/2m-2s-async/broker-a-s.properties &
-
- # 查看日志
- tail -f ~/logs/rocketmqlogs/broker.log
启动完成后,通过 jps 命令可以查看运行进程
Producer 可以将消息写入到某 Broker 中的某 Queue 中,其经历了如下过程:
对于无序消息,其 Queue 选择算法,也称为消息投递算法,常见的有两种:
(1)store 目录的文件
RocketMQ 中的消息存储在本地文件系统中,这些相关文件默认在当前用户主目录下的 store 目录中。
文件说明:
(2)commitlog 文件
commitlog 目录中存放着很多的 mappedFile 文件,当前 Broker 中的所有消息都是落盘到这些 mappedFile 文件中的。mappedFile 文件大小为1G(小于等于1G),文件名由20位十进制数构成,表示当前文件的第一条消息的起始位移偏移量。
需要注意的是,一个 Broker 中仅包含一个 commitlog 目录,所有的 mappedFile 文件都是存放在该目录中的。即无论当前 Broker 中存放着多少 Topic 的消息,这些消息都是被顺序写入到了 mappedFile 文件中的,这些消息在 Broker 中存放时并没有被按照 Topic 进行分类存放。
消息单元:
mappedFile 文件内部消息存放结构示意图如下:
mappedFile 文件内容由一个个的消息单元构成,每个消息单元中包含一下内容:
(3)consumequeue 文件
为了提高效率,会为每个 Topic 在 -/store/consumequeue 中创建一个目录,目录名为 Topic 名称。在该 Topic 目录下,会再为每个该 Topic 的 Queue 建立一个目录,目录名为 queueld。每个目录中存放着若干 consumequeue文件,consumequeue 文件是 commitlog 的索引文件,可以根据 consumequeue 定位到具体的消息。
consumequeue 文件名也由20位数字构成,表示当前文件的第一个索引条目的起始位移偏移量。与 mappedFile 文件名不同的是,其后续文件名是固定的,因为 consumequeue 文件大小是固定不变的。
索引条目
每个 consumequeue 文件可以包含30w个索引条目,每个索引条目包含了三个消息重要属性
这三个属性占20个字节,所以每个文件的大小是固定的30w *20字节。
(4)文件读写
消息写入:
一条消息进入到 Broker 后经历了以下几个过程才最终被持久化:
消息拉取:
当 Consumer 来拉取消息时会经历以下几个步骤:
RocketMQ 读写性能:
RocketMQ 对文件的读写操作是通过 mmap 零拷贝进行的,将对文件的操作转化为直接对内存地址进行操作,从而极大地提高了文件的读写效率。
consumequeue 中的数据是顺序存放的,还引入了 PageCache 的预读取机制,使得对 consumequeue 文件的读取几乎接近于内存读取,即使在有消息堆积情况下也不会影响性能。RocketMQ 中可能会影响性能的是对 commitlog 文件的读取。因为对 commitlog 文件来说,读取消息时会产生大量的随机访问,而随机访问会严重影响性能。不过,如果选择合适的系统 IO 调度算法,比如设置调度算法为 Deadline(采用 SSD 固态硬盘的话),随机读的性能也会有所提升。
(5)indexFile
indexFile 文件位于 store/index 目录下:
除了通过通常的指定 Topic 进行消息消费外,RocketMQ 还提供了根据 key 进行消息查询的功能,该查询就是通过 indexFile 进行索引实现的快速查询。这个 indexFile 中的索引数据是在包含了 key 的消息被发送到 Broker 时写入的。如果消息中没有包含 key,则不会写入。
indexFile 文件结构:
每个 Broker 中会包含一组 indexFile,每个 indexFile 都是以一个时间戳命名的(indexFile 被创建时的时间戳)。
使用时间戳命名好处:
根据业务 key 进行查询时,查询条件除了 key 之外,还需要指定一个要查询的时间戳,表示要查询不大于该时间戳的最新的消息,即查询指定时间戳之前存储的最新消息。这个时间戳文件名可以简化查询,提高查询效率。
每个 indexFile 文件由三部分构成
每个 indexFile 文件中包含500w个 slot 槽,而每个 slot 槽又可能会挂载很多的 index 索引单元。
indexHeader 固定40个字节,其中存放着如下数据:
indexFile 中最复杂的是 Slots 与 Indexes 间的关系。在实际存储时,Indexes 是在 Slots 后面的,但为了便于理解,将它们的关系展示为如下形式:
key 的 hash 值 % 500w 的结果即为 slot 槽位,然后将该 slot 值修改为该 index 索引单元的 indexNo,根据这个 indexNo 可以计算出该 index 单元在 indexFile 中的位置。
该取模结果的重复率是很高的,为了解决该问题,在每个 index 索引单元中增加了 preIndexNo,用于指定该 slot 中当前 index 索引单元的前一个 index 索引单元。而 slot 中始终存放的是其下最新的 index 索引单元的 indexNo,这样的话,只要找到了slot 就可以找到其最新的 index 索引单元,而通过这个 index 索引单元就可以找到其之前的所有 index 索引单元。
index 索引单元默写20个字节,其结构如下:
index 索引单元包含以下四个属性:
(1)消息消费类型
消费者从 Broker 中获取消息的方式有两种:拉取方式 pull 和推动方式 push。
pull:拉取方式
push:推动方式
(2)消费模式
消费者组对于消息消费的模式又分为两种:集群消费 Clustering 和广播消费 Broadcasting。
广播模式
广播消费模式下,相同 Consumer Group 的每个 Consumer 实例都接收同一个 Topic 的全量消息,即每条消息都会被发送到 Consumer Group 中的每个 Consumer。
(3)消息进度保存
广播模式:
消费进度保存在 consumer 端。因为广播模式下 consumer group 中每个 consumer 都会消费所有消息,但它们的消费进度是不同,所以 consumer 各自保存各自的消费进度。
集群模式:
消费进度保存在 broker 中。consumer group 中的所有 consumer 共同消费同一个 Topic 中的消息,同一条消息只会被消费一次,消费进度会参与到了消费的负载均衡中,故消费进度是需要共享的。
Rebalance 即再均衡,指将一个 Topic 下的多个 Queue 在同一个 Consumer Group 中的多个 Consumer 间进行重新分配的过程。
当消费者所订阅的 Queue 数量发生变化,或消费者组中消费者的数量发生变化,就会产生 Rebalance。
Rebalance 机制的本意是为了提升消息的并行消费能力。例如,一个 Topic 下5个队列,在只有1个消费者的情况下,这个消费者将负责消费这5个队列的消息。如果此时我们增加一个消费者,那么就可以给其中一个消费者分配2个队列,给另一个分配3个队列,从而提升消息的并行消费能力。
Rebalance 限制:
由于一个队列最多分配给一个消费者,因此当某个消费者组下的消费者实例数量大于队列的数量时,多余的消费者实例将分配不到任何队列。
Rebalance 危害:
Rebalance 过程:
在 Broker 中维护着多个 Map 集合,这些集合中动态存放着当前 Topic 中 Queue 的信息、 Consumer Group 中 Consumer 实例的信息。一旦发现消费者所订阅的 Queue 数量发生变化,或消费者组中消费者的数量发生变化,立即向 Consumer Group 中的每个实例发出 Rebalance 通知。
Consumer 实例在接收到通知后会采用 Queue 分配算法自己获取到相应的 Queue,即由 Consumer 实例自主进行 Rebalance。
(1)Queue 分配算法
一个 Topic 中的 Queue 只能由 Consumer Group 中的一个 Consumer 进行消费,那么它们间的配对关系是根据算法策略确定的,常见的有四种策略:
(2)平均分配策略
该算法是要根据 avg= QueueCount / ConsumerCount 的计算结果进行分配的。如果能够整除,则按顺序将 avg 个 Queue 逐个分配 Consumer,如果不能整除,则将多余出的 Queue 按照 Consumer 顺序逐个分配。
(3)环形平均策略
环形平均算法是指,根据消费者的顺序,依次在由 queue 队列组成的环形图中逐个分配。
(4)一致性 hash 策略
该算法会将 consumer 的 hash 值作为 Node 节点存放到 hash 环上,然后将 queue 的 hash 值也放到 hash 环上,通过顺时针方向,距离 queue 最近的那个 consumer 就是该 queue 要分配的 consumer。
(5)同机房策略
该算法会根据 queue 的部署机房位置和 consumer 的位置,过滤出当前 consumer 相同机房的 queue。然后按照平均分配策略或环形平均策略对同机房 queue 进行分配。如果没有同机房 queue,则按照平均分配策略或环形平均策略对所有 queue 进行分配。
(6)至少一次原则
RocketMQ 有一个原则:每条消息必须要被成功消费一次。
Consumer 在消费完消息后会向其消费进度记录器提交其消费消息的 offset,offset 被成功记录到记录器中,那么这条消费就被成功消费了。
订阅关系的一致性指的是,同一个消费者组(Group ID 相同)下所有 Consumer 实例所订阅的 Topic与 Tag 及对消息的处理逻辑必须完全一致。否则,消息消费的逻辑就会混乱,甚至导致消息丢失。
消费进度 offset 是用来记录每个 Queue 的不同消费组的消费进度的。根据消费进度记录器的不同,可以分为两种模式:本地模式和沅程模式
(1)offset 本地模式
当消费模式为广播消息时,offset 使用本地模式存储。因为每条消息会被所有的消费者消费,每个消费者管理自己的消费进度,各个消费者之间不存在消费进度的交集。
Consumer 在广播消费模式下 offset 相关数据以 json 的形式持久化到 Consumer 本地磁盘文件中,默认文件路径为当前用户主目录下的 .rocketmqg_ offsets/${clientld}/${group} /Offsets.json。
(2)offset 远程管理模式
当消费模式为集群消费时,offset 使用远程模式管理。因为所有 Cosnumer 实例对消息采用的是均衡消费,所有 Consumer 共享 Queue 的消费进度。
Consumer 在集群消费模式下offset相关数据以 json 的形式持久化到 Broker 磁盘文件中,文件路径为当前用户主目录下的 store/config/consumerOffset.json。
Broker 启动时会加载这个文件,并写入到一个双层 Map。外层 map 的 key 为 topic@group,value 为内层 map。内层 map 的 key 为 queueld,value 为 offset。当发生 Rebalance 时,新的 Consumer 会从该 Map 中获取到相应的数据来继续消费。
(3)offset 用途
消费者要消费的第一条消息是通过 consumer.setConsumeFromWhere() 方法指定起始位置的。
在消费者启动后,其要消费的第一条消息的起始位置常用的有三种,这三种位置可以通过枚举类型常量设置,这个枚举类型为 ConsumeFromWhere。
当消费完一批消息后,Consumer 会提交其消费进度 offset 给 Broker,Broker 在收到消费进度后会将其更新到那个双层 Map 及 consumerOffset.json 文件中,然后向该 Consumer 进行 ACK,而 ACK 内容中包含三项数据:当前消费队列的最小 offset (minOffset)、最大 offset (maxOffset)、及下次消费的起始 offset (nextBeginOffset)。
(4)重试队列
当 RocketMQ 对消息的消费出现异常时,会将发生异常的消息的 offset 提交到 Broker 中的重试队列。系统在发生消息消费异常时会为当前的 topic@group 创建一个重试队列,该队列以 %RETRY% 开头,到达重试时间后进行消费重试。
(5)offset 的同步提交与异步提交
集群消费模式下,Consumer 消费完消息后会向 Broker 提交消费进度 offset,其提交方式分为两种:
同步提交:消费者在消费完一批消息后会向 Broker 提交这些消息的 offset,然后等待 Broker 的成功响应。若在等待超时之前收到了成功响应,则继续读取下一批消息进行消费。若没有收到响应,则会重新提交,直到获取到响应。而在这个等待过程中,消费者是阻塞的。其严重影响了消费者的吞吐量。
异步提交:消费者在消费完一批消息后向 Broker 提交 offset,但无需等待 Broker 的成功响应,可以继续读取并消费下一批消息。这种方式增加了消费者的吞吐量。但需要注意,Broker 在收到提交的 offset 后,还是会向消费者进行响应的。|
(1)消息幂等
当出现消费者对某条消息重复消费的情况时,重复消费的结果与消费一次的结果是相同的,并且多次消费并未对业务系统产生任何负面影响,那么这个消费过程就是消费幂等的。
在互联网应用中,尤其在网络不稳定的情况下,消息很有可能会出现重复发送或重复消费。如果重复的消息可能会影响业务处理,那么就应该对消息做幂等处理。
(2)消息重复的场景分析
(3)幂等解决方案
幂等解决方案的设计中涉及到两项要素:
对于常见的系统,幂等性操作的通用性解决方案是:
(4)消息幂等的实现
消费幂等的解决方案很简单:为消息指定不会重复的唯一标识。因为 Message ID 有可能出现重复的情况,所以真正安全的幂等处理,不建议以 Message ID 作为处理依据。最好的方式是以业务唯一标识作为幂等处理的关键依据,而业务的唯一标识可以通过消息 Key 设置。
以支付场景为例,可以将消息的 Key 设置为订单号,作为幂等处理的依据。具体代码示例如下:
- Message message = new Message();
- message.setKey("OrderId_0001");
- SendResult sendResult = producer.send(message);
消费者收到消息时可以根据消息的 Key 即订单号来实现消费幂等:
- consumer.registerMessageListener(new MessageListenerConcurrently(){
- @Override
- public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context){
- for(MessageExt msg: msgs){
- String key = msg.getKeys();
- // 具体根据key做幂等处理的逻辑
- ...
- }
- return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
- }
- });
消息处理流程中,如果 Consumer 的消费速度跟不上 Producer 的发送速度,MQ 中未处理的消息会越来越多,这部分消息就被称为堆积消息。消息出现堆积进而会造成消息的消费延迟。
以下场景需要重点关注消息堆积和消费延迟问题:
消息堆积产生原因:
Consumer 使用长轮询 Pull 模式消费消息时,分为以下两个阶段:
单机线程数据
一般情况下,消费者端的消费并发度由单节点线程数和节点数量共同决定,其值为单节点线程数*节点数量。不过,通常需要优先调整单节点的线程数,若单机硬件资源达到了上限,则需要通过横向扩展来提高消费并发度。
对于一台主机中线程池中线程数的设置需要谨慎,不能盲目直接调大线程数,设置过大的线程数反而会带来大量的线程切换的开销。理想环境下单节点的最优线程数计算模型为:C*(T1 +T2)/T1。
消息是被顺序存储在 commitlog 文件的,且消息大小不定长,所以消息的清理是不可能以消息为单位进行清理的,而是以 commitlog 文件为单位进行清理的。否则会急剧下降清理效率,并实现逻辑复杂。
commitlog 文件存在一个过期时间,默认为72小时。除了用户手动清理外,在以下情况下也会被自动清理,无论文件中的消息是否被消费过:
普通消息发送类型:
同步发送消息:
同步发送消息是指 Producer 发出一条消息后,会在收到 MQ 返回的 ACK 之后才发下一条消息。该方式的消息可靠性最高,但消息发送效率太低。
- public static void main(String[] args) throws Exception {
- DefaultMQProducer producer = new DefaultMQProducer("SyncProducer");
- producer.setNamesrvAddr("192.168.136.10:9876");
- producer.start();
-
- for (int i = 0; i < 10; i++) {
- byte[] msgBody = ("This is SyncProducer -- " + i).getBytes();
- Message message = new Message("SyncProducer-TopicA", "SyncProducer-tagA", msgBody);
- SendResult sendResult = producer.send(message);
- System.out.println(sendResult);
- }
- producer.shutdown();
- }
异步发送消息:
异步发送消息是指 Producer 发出消息后无需等待 MQ 返回 ACK,直接发送下一条消息。该方式的消息可靠性可以得到保障,消息发送效率也可以。
- public static void main(String[] args) throws Exception {
- DefaultMQProducer producer = new DefaultMQProducer("AsyncProducer");
- producer.setNamesrvAddr("192.168.136.10:9876");
- producer.start();
-
- for (int i = 0; i < 10; i++) {
- byte[] msgBody = ("This is AsyncProducer -- " + i).getBytes();
- Message message = new Message("AsyncProducer-TopicA", "AsyncProducer-tagA", msgBody);
- producer.send(message, new SendCallback() {
- @Override
- public void onSuccess(SendResult sendResult) {
- System.out.println(sendResult);
- }
-
- @Override
- public void onException(Throwable throwable) {
- throwable.printStackTrace();
- }
- });
- }
- // 注意:添加延时关闭,避免异步发送时producer被关闭
- TimeUnit.SECONDS.sleep(3);
- producer.shutdown();
- }

单项发送消息:
单向发送消息是指 Producer 仅负责发送消息,不等待、不处理 MQ 的 ACK,该发送方式时 MQ 也不返回 ACK。该方式的消息发送效率最高,但消息可靠性较差。
- public static void main(String[] args) throws Exception {
- DefaultMQProducer producer = new DefaultMQProducer("SingWayProducer");
- producer.setNamesrvAddr("192.168.136.10:9876");
- producer.start();
-
- for (int i = 0; i < 10; i++) {
- byte[] msgBody = ("This is SingWayProducer -- " + i).getBytes();
- Message message = new Message("SingWayProducer-TopicA", "SingWayProducer-tagA", msgBody);
-
- producer.sendOneway(message);
- }
- producer.shutdown();
- }
顺序消息指的是,严格按照消息的发送顺序进行消费的消息。
默认情况下生产者会把消息以 Round Robin 轮询方式发送到不同的 Queue 分区队列,消费者会从多个 Queue 上拉取消息,这种情况下的发送和消费是不能保证顺序的。如果将消息仅发送到同一个 Queue 中,消费时也只从这个 Queue 上拉取消息,就严格保证了消息的顺序性。
有序性分类:
根据有序范围的不同,RocketMQ 可以严格地保证两种消息的有序性:分区有序与全局有序。
全局有序:当发送和消费参与的Queue只有一个时所保证的有序是整个Topic中消息的顺序,称为全局有序。
分区有序:如果有多个Queue参与,其仅可保证在该Queue分区队列上的消息顺序,则称为分区有序。
示例:
- public static void main(String[] args) throws Exception {
- DefaultMQProducer producer = new DefaultMQProducer("OrderedProducer");
- producer.setNamesrvAddr("192.168.136.10:9876");
- producer.start();
-
- for (int i = 0; i < 10; i++) {
- Integer orderId = i;
- byte[] msgBody = ("This is OrderedProducer -- " + i).getBytes();
- Message message = new Message("OrderedProducer-TopicA", "OrderedProducer-tagD", msgBody);
- SendResult sendResult = producer.send(message, new MessageQueueSelector() {
- @Override
- public MessageQueue select(List<MessageQueue> list, Message message, Object obj) {
- Integer id = (Integer) obj;
- int index = id % list.size();
- return list.get(index);
- }
- }, orderId);
- System.out.println(sendResult);
- }
- producer.shutdown();
- }

当消息写入到 Broker 后,在指定的时长后才可被消费处理的消息,称为延时消息。
采用 RocketMQ 的延时消息可以实现定时任务的功能,而无需使用定时器。典型的应用场景是,电商交易中超时未支付关闭订单的场景,12306平台订票超时未支付取消订票的场景。
延时消息的延迟时长不支持随意时长的延迟,是通过特定的延迟等级来指定的。延时等级定义在 RocketMQ 服务端的 Messagestoreconfig 类中的如下变量中:
messageDelayLeve1 = 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h2h 1d
延时消息实现原理
Producer 将消息发送到 Broker 后,Broker 会首先将消息写入到 commitlog 文件,然后需要将其分发到相应的 consumequeue。在分发之前,系统会先判断消息中是否带有延时等级。若没有,则直接正常分发,若有则需要经历以下过程:
重新投递延时消息
Broker 内部有一个延迟消息服务类 ScheuleMessageService,其会消费 SCHEDULE_TOPIC_XXXX 中的消息,即按照每条消息的投递时间,将延时消息投递到目标 Topic 中。在投递之前会从 commitlog 中将原来写入的消息再次读出,并将其原来的延时等级设置为0,即原消息变为了一条不延迟的普通消息。然后再次将消息投递到目标 Topic 中。
延迟消息服务类将延迟消息再次发送给了 commitlog,并再次形成新的消息索引条目,分发到相应 Queue。
示例:
- public static void main(String[] args) throws Exception {
- DefaultMQProducer producer = new DefaultMQProducer("DelayProducer");
- producer.setNamesrvAddr("192.168.136.10:9876");
- producer.start();
-
- for (int i = 0; i < 10; i++) {
- Integer orderId = i;
- byte[] msgBody = ("This is DelayProducer -- " + i).getBytes();
- Message message = new Message("DelayProducer-TopicA", "DelayProducer-tagD", msgBody);
- // 设置消息延时等级,3对应10s
- message.setDelayTimeLevel(3);
- SendResult sendResult = producer.send(message);
- System.out.println(sendResult);
- }
- producer.shutdown();
- }

(1)相关概念
分布式事务:
对于分布式事务,通俗地说就是,一次操作由若干分支操作组成,这些分支操作分属不同应用,分布在不同服务器上。分布式事务需要保证这些分支操作要么全部成功,要么全部失败。分布式事务与普通事务一样,就是为了保证操作结果的一致性。.
事务消息:
RocketMQ 提供了类似 X/Open XA 的分布式事务功能,通过事务消息能达到分布式事务的最终一致 XA 是一种分布式事务解决方案,一种分布式事务处理模式。
半事务消息:
暂不能投递的消息,发送方已经成功地将消息发送到了 Broker,但是 Broker 未收到最终确认指令,此时该消息被标记成“暂不能投递”状态,即不能被消费者看到,处于该种状态下的消息即半事务消息。
本地事务状态:
Producer 回调操作执行的结果为本地事务状态,其会发送给 TC,而 TC 会再发送给 TM,TM 会根据 TC 发送来的本地事务状态来决定全局事务确认指令。
消息回查:
消息回查,即重新查询本地事务的执行状态。
RocketMQ中 的消息回查设置:
关于消息回查,有三个常见的属性设置,在 broker 加载的配置文件中设置:
(2)XA 模式
XA 协议:
XA(Unix Transaction)是一种分布式事务解决方案,一种分布式事务处理模式,基于 XA 协议。XA 协议由 Tuxedo (Transaction for Unix has been Extended for Distributed Operation,分布式操作扩展之后的 Unix 事务系统)首先提出的,并交给 X/Open 组织,作为资源管理器与事务管理器的接口标准。
XA 模式中有三个重要组件:TC、TM、RM。
XA 模式架构:
XA 模式是一个典型的 2PC,其执行原理如下:
注意:
- 事务消息不支持延时消息
- 事务消息需要做好幂等检查
生产者进行消息发送时可以一次发送多条消息,这可以大大提升 Producer 的发送效率。不过需要注意以下几点:
批量发送大小:
默认情况下,一批发送的消息总大小不能超过4MB字节。如果想超出该值,有两种解决方案:
生产者通过 send() 方法发送的 Message,并不是直接将 Message 序列化后发送到网络上的,而是通过这个 Message 生成了一个字符串发送出去的。这个字符串由四部分构成:Topic、消息Body、消息日志(占20字节),及用于描述消息的一堆属性 key-value。这些属性中包含例如生产者地址、生产时间、要发送的 Queueld 等。最终写入到 Broker中消息单元中的数据都是来自于这些属性。
Consumer 的 MessageListenerConcurrently 监听接口的 consumeMessage() 方法的第一个参数为消息列表,但默认情况下每次只能消费一条消息。若要使其一次可以消费多条消息,则可以通过修改 Consumer 的 consumeMessageBatchMaxSize 属性来指定。不过,该值不能超过32。因为默认情况下消费者每次可以拉取的消息最多是32条。若要修改一次拉取的最大值,则可通过修改 Consumer 的 pullBatchSize 属性来指定。
消费者示例:
- public static void main(String[] args) throws Exception {
- DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("SyncConsumer");
- consumer.setNamesrvAddr("192.168.136.10:9876");
- // 指定消费位置
- consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
- // 订阅topic和tag
- consumer.subscribe("SyncProducer-TopicA", "*");
- // 注册消息监听器
- consumer.registerMessageListener(new MessageListenerConcurrently() {
- @Override
- public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
- for (MessageExt msg : msgs) {
- System.out.println(msg);
- }
- return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
- }
- });
- consumer.start();
- }

DefaultLitePullConsumer:pull 消费者
DefaultMQPushConsumer:push 消费者
消息者在进行消息订阅时,除了可以指定要订阅消息的 Topic 外,还可以对指定 Topic 中的消息根据指定条件进行过滤,即可以订阅比 Topic 更加细粒度的消息类型。
对于指定 Topic 消息的过滤有两种过滤方式:Tag 过滤与 SQL 过滤。
Tag 过滤
通过 consumer 的 subscribe() 方法指定要订阅消息的 Tag,如果订阅多个 Tag 的消息,Tag 间使用或运算符(双竖线)连接。
- DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("TEST");
- consumer.subscribe("TOPIC-NAME","TagA || TagB");
代码示例:
- // Producer 端
- public static void main(String[] args) throws Exception {
- DefaultMQProducer producer = new DefaultMQProducer("TagFilterProducer");
- producer.setNamesrvAddr("192.168.136.10:9876");
- producer.start();
-
- String[] tags = {"TagA", "TagB", "TagC", "TagD", "TagE"};
- for (int i = 0; i < 10; i++) {
- byte[] msgBody = ("This is TagFilterProducer-- " + i).getBytes();
- String tag = tags[i % tags.length];
- Message message = new Message("TagFilterProducer-TopicA", tag, msgBody);
- SendResult sendResult = producer.send(message);
- System.out.println(sendResult);
- }
- producer.shutdown();
- }
-
-
- // Consumer 端
- public static void main(String[] args) throws Exception {
- DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("TagFilterConsumer");
- consumer.setNamesrvAddr("192.168.136.10:9876");
- // 指定消费位置
- consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
- // 订阅topic和tag
- consumer.subscribe("TagFilterConsumer-TopicA", "TagA || TagC");
- // 注册消息监听器
- consumer.registerMessageListener(new MessageListenerConcurrently() {
- @Override
- public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
- for (MessageExt msg : msgs) {
- System.out.println(msg);
- }
- return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
- }
- });
- consumer.start();
- }

SQL 过滤
SQL 过滤是一种通过特定表达式对事先埋入到消息中的用户属性进行筛选过滤的方式。通过 SQL 过滤,可以实现对消息的复杂过滤。
默认情况下 Broker 没有开启消息的 SQL 过滤,需要在 Broker 的配置文件中添加如下属性开启 SQL 过滤:
enablePropertyFilter=true
只有使用 PUSH 模式的消费者才能使用 SQL 过滤。
SOL 过滤表达式中支持多种常量类型与运算符。
支持的常量类型:
支持的运算符有:
代码示例:
- // Producer 端
- public static void main(String[] args) throws Exception {
- DefaultMQProducer producer = new DefaultMQProducer("SqlFilterProducer");
- producer.setNamesrvAddr("192.168.136.10:9876");
- producer.start();
-
- String[] tags = {"TagA", "TagB", "TagC", "TagD", "TagE"};
- for (int i = 0; i < 10; i++) {
- byte[] msgBody = ("This is SqlFilterProducer -- " + i).getBytes();
- Message message = new Message("SqlFilterProducer-TopicA", msgBody);
- // 事先埋入用户属性
- message.putUserProperty("num", String.valueOf(i));
- SendResult sendResult = producer.send(message);
- System.out.println(sendResult);
- }
- producer.shutdown();
- }
-
-
- // Consumer 端
- public static void main(String[] args) throws Exception {
- DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("SqlFilterConsumer");
- consumer.setNamesrvAddr("192.168.136.10:9876");
- // 指定消费位置
- consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
- // 订阅topic和 sql 过滤
- consumer.subscribe("SqlFilterConsumer-TopicA", MessageSelector.bySql("num > 8 or num <3"));
- // 注册消息监听器
- consumer.registerMessageListener(new MessageListenerConcurrently() {
- @Override
- public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
- for (MessageExt msg : msgs) {
- System.out.println(msg);
- }
- return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
- }
- });
- consumer.start();
- }

Producer 对发送失败的消息进行重新发送的机制,称为消息发送重试机制,也称为消息重投机制。
对于消息重投,需要注意以下几点:
消息发送重试有三种策略可以选择:同步发送失败策略、异步发送失败策略、消息刷盘失败策略。
同步发送失败策略:
对于普通消息,消息发送默认采用 round-robin 策略来选择所发送到的队列。如果发送失败,默认重试2次,但在重试时是不会选择上次发送失败的 Broker,而是选择其它 Broker。
- DefaultMQProducer producer = new DefaultMQProducer("TestProducer");
- producer.setNamesrvAddr("192.168.100.001:9876");
- // 设置发送失败时重试发送的次数,默认2次
- producer.setRetryTimesWhenSendFailed(3);
- // 设置发送超时时间,默认3s
- producer.setSendMsgTimeout(5000);
- producer.start();
Broker 还具有失败隔离功能,使 Producer 尽量选择未发生过发送失败的 Broker 作为目标 Broker。
如果超过重试次数,则抛出异常,由 Producer 去保证消息不丢失。当生产者出现 RemotingException,MQClientException 和 MQBrokerException 时,Producer 会自动重投消息。
异步发送失败策略:
异步发送失败重试时,异步重试不会选择其他 broker,仅在同一个 broker 上做重试,所以该策略无法保证消息不丢。
- DefaultMQProducer producer = new DefaultMQProducer("TestProducer");
- producer.setNamesrvAddr("192.168.110.001:9876");
- // 指定异步发送失败后不进行重试发送
- producer.setRetryTimesWhenSendAsyncFailed(0);
- producer.start();
消息刷盘失败策略:
消息刷盘超时或 slave 不可用(返回状态非 SEND_OK )时,默认是不会将消息尝试发送到其他 Broke r的。但是对于重要消息可以通过在 Broker 的配置文件设置 retryAnotherBrokerWhenNotStoreOK 属性为 true 来开启。
(1)消费者重试机制
顺序消息的消费重试
对于顺序消息,当 Consumer 消费消息失败后,为了保证消息的顺序性,其会自动不断地进行消息重试,直到消费成功。重试期间应用会出现消息消费被阻塞的情况。
顺序消息消费失败后,消费重试默认时间间隔为1000毫秒,取值范围为[10-30000]
无序消息的消费重试
对于无序消息,当 Consumer 消费消息失败时,可以通过设置返回状态达到消息重试的效果。不过需要注意,无序消息的重试只对集群消费方式生效,广播消费方式不提供失败重试特性。即对于广播消费,消费失败后,失败消息不再重试,继续消费后续消息。
对于无序消息集群消费下的重试消费,每条消息默认最多重试16次,但每次重试的间隔时间是不同的,会逐渐变长。每次重试的间隔时间如下:
重试次数 | 与上次重试间隔时间 | 重试次数 | 与上次重试间隔时间 |
---|---|---|---|
1 | 10s | 9 | 7m |
2 | 30s | 10 | 8m |
3 | 1m | 11 | 9m |
4 | 2m | 12 | 10m |
5 | 3m | 13 | 20m |
6 | 4m | 14 | 30m |
7 | 5m | 15 | 1h |
8 | 6m | 16 | 2h |
通过 setMaxReconsumeTimes 可以修改重试次数:
- 当修改后次数<16,则按照系统规定间隔时间执行重试
- 当修改后次数超过16,则超过16次的重试时间间隔均为2h
(2)重试队列
对于需要重试消费的消息,并不是 Consumer 在等待了指定时长后再次去拉取原来的消息进行消费,而是将这些需要重试消费的消息放入到了一个特殊 Topic 的队列中,而后进行再次消费的。这个特殊的队列就是重试队列。
当出现需要进行重试消费的消息时,Broker 会为每个消费组都设置一个 Topic 名称为 %RETRY%consumerGroup@consumerGroup 的重试队列。
Broker 对于重试消息的处理是通过延时消息实现的,先将消息保存到 SCHEDULE_TOPIC_XXXX 延迟队列中,延迟时间到后,会将消息投递到 %RETRY%consumerGroup@consumerGroup 重试队列中。
(3)消费重试配置
集群消费方式下,消息消费失败后若希望消费重试,则需要在消息监听器接口的实现中明确进行如下三种方式之一的配置:
- consumer.registerMessageListener(new MessageListenerConcurrently() {
- @Override
- public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
- try {
- for (MessageExt msg : msgs) {
- System.out.println(msg);
- }
- } catch (Exception e) {
- // 方式1:返回 ConsumeConcurrentlyStatus.RECONSUME_LATER
- // return ConsumeConcurrentlyStatus.RECONSUME_LATER;
-
- // 方式2:返回 Null
- // return null;
-
- // 方式3:抛出异常
- // throw new RuntimeException("消费异常");
- }
- return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
- }
- });

集群消费方式下,消息消费失败后若不希望消费重试,则在捕获到异常后同样也返回与消费成功后的相同的结果,即 ConsumeConcurrentlyStatus.CONSUME_SUCCESS,则不进行消费重试。
- consumer.registerMessageListener(new MessageListenerConcurrently() {
- @Override
- public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
- try {
- for (MessageExt msg : msgs) {
- System.out.println(msg);
- }
- } catch (Exception e) {
- // 异常也返回成功
- return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
- }
- return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
- }
- });
当一条消息初次消费失败,消息队列会自动进行消费重试,当达到最大重试次数后,若消费依然失败,则表明消费者在正常情况下无法正确地消费该消息。此时,消息队列不会立刻将消息丢弃,而是将其发送到该消费者对应的特殊队列中。这个队列就是死信队列(Dead-Letter Queue,DLQ),而其中的消息则称为死信消息(Dead-Letter Message,DLM)。|
死信队列的特征:
死信消息的处理:
当一条消息进入死信队列,就意味着系统中某些地方出现了问题,从而导致消费者无法正常消费该消息,比如代码中原本就存在 Bug。因此,对于死信消息,通常需要开发人员进行特殊处理。最关键的步骤是要排查可疑因素,解决代码中可能存在的 Bug,然后再将原来的死信消息再次进行投递消费。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。