当前位置:   article > 正文

Apache RocketMQ知识点表格总结及示例

Apache RocketMQ知识点表格总结及示例

        RocketMQ的使用大概还是四五年前的时候,最近几年参与的项目基本上都是使用的RabbitMQ,就我个人来说,我也更喜欢使用RabbitMQ,不过消息队列的功能都差不多,大同小异。原理也类似。我喜欢使用RabbitMQ,是因为RabbitMQ安装部署和管理更方便,而且学习也很容易,上手容易,官方的示例也很友,而且功能也很强大,这篇先总结RocketMQ,然后再开一篇总结一下RabbitMQ,还会再来一篇Kafka的总结,不过Kafka我还没在项目中使用过,只是自己空闲时间学习过。

   RocketMQ是一款高性能、高吞吐量、低延迟的分布式消息中间件,由Alibaba开源,后捐赠给Apache软件基金会,成为Apache顶级项目.

目录

 1.RocketMQ基本概念

2. RocketMQ 架构组成

3. RocketMQ 生产者(Producer)

4.RocketMQ 消费者(Consumer)

5.RocketMQ 的消息队列(Queue)

6.RocketMQ的消息(Message)概念

7.RocketMQ 事务消息

8. RocketMQ 消息顺序性

9.RocketMQ 延时消息概念

10.RocketMQ 消息轨迹与监控

11.RocketMQ 集群模式

12.RocketMQ 消息过滤与标签订阅概念

13.RocketMQ 消息重试与死信队列

14.RocketMQ 消息持久化与存储

15.RocketMQ 高可用性(High Availability, HA)

16.RocketMQ 事务性

17.RocketMQ 广播与集群消费

18.RocketMQ NameServer与Broker角色

19.RocketMQ 消息批处理

20.RocketMQ 消息桥接与流控

21.Spring Boot整合RocketMQ

22. 一些面试问题


 1.RocketMQ基本概念
概念描述
消息模型基于发布/订阅模式,支持高吞吐量的消息传递。
高性能支持大规模分布式系统的消息传递,具有高吞吐量和低延迟。
可靠性提供持久化消息存储,确保消息不会因为系统故障而丢失。
灵活性支持不同的消息传递模式,如点对点、发布/订阅等。
容错性支持消息队列的高可用部署,能够在节点故障时自动恢复。
可扩展性可以水平扩展,通过增加更多的Broker来提高系统容量。
多样化的客户端提供了多种语言的客户端,如Java、C++、Python等。
事务消息支持事务消息,确保消息的原子性和一致性。
顺序消息支持按照特定顺序传递消息,满足严格的顺序要求。
延时消息允许设置消息的延迟时间,实现定时发送。
消息轨迹提供消息的发送和消费轨迹追踪,方便问题排查。
监控与报警集成了监控系统,可以实时监控集群状态,并设置报警阈值。
2. RocketMQ 架构组成
组件功能描述
NameServer轻量级的服务发现和路由注册中心,负责Broker集群的地址发现。
Broker消息存储服务器,负责消息的存储、投递和查询。
Producer消息生产者,负责发送消息到Broker。
Consumer消息消费者,负责从Broker订阅并消费消息。
Topic消息主题,是消息的分类单位,可以绑定一个或多个队列。
Queue消息队列,用于存储消息,每个Topic可以有多个Queue。
PushConsumer用于订阅Topic的消费者,支持集群消费和广播消费。
PullConsumer用于主动拉取消息的消费者。
Message消息实体,包含了消息体和消息属性。
Tag消息标签,用于对消息进行分类,方便消费者按标签订阅消息。
Cluster由多个Broker组成的集群,提供高可用和负载均衡。
Master/Slave主从模式,Master负责写操作,Slave负责读操作,支持数据同步。
3. RocketMQ 生产者(Producer)
概念描述
发送消息生产者将消息发送到Broker。
异步发送非阻塞发送,发送后立即返回,通过回调函数处理发送结果。
同步发送阻塞发送,发送后等待确认,确保消息发送成功。
批量发送一次发送多条消息,提高发送效率。
消息过滤通过消息标签(Tag)对消息进行过滤,只发送到指定标签的队列。
事务消息支持事务消息,确保消息发送的原子性。
延时消息设置消息的延时级别,实现定时发送。
消息重试发送失败时,支持消息重试机制。
消息确认通过回调确认消息是否成功发送。
消息压缩支持对消息体进行压缩,减少网络传输量。
消息序列化支持自定义消息序列化方式,以适应不同的数据格式。
负载均衡通过内置的负载均衡机制,将消息均匀分配到不同的队列。
自定义发送策略可以自定义发送策略,如轮询、随机等。

 生产者是RocketMQ中负责发送消息的组件,它支持多种发送方式和高级特性,以满足不同的业务需求。

4.RocketMQ 消费者(Consumer)
概念描述
消息消费消费者从Broker拉取或接收消息进行处理。
推模式(Push)消费者订阅主题,Broker将消息推送给消费者。
拉模式(Pull)消费者主动从Broker拉取消息。
集群消费多个消费者实例组成集群,共同消费消息,提高消费能力。
广播消费每个消费者实例独立消费所有消息,适用于需要每条消息都被处理的场景。
消息确认消费者处理完消息后,向Broker确认消息已被成功处理。
消费偏移量记录消费者消费到的位置,确保消息不会被重复消费。
幂等性保证消息处理的幂等性,避免因重复消息导致的问题。
消费过滤通过消息标签(Tag)对消息进行过滤,只消费特定标签的消息。
顺序消费保证消息按照发送顺序被消费。
事务消费支持事务消息的消费,确保数据的一致性。
消息重试处理失败的消息可以进行重试,直到成功或达到重试上限。
消费速率控制控制消息的消费速率,避免因消费速度过快导致的问题。
消费超时设置消息消费的超时时间,超时后Broker会将消息重新分配。
消费分组消费者可以分组,同一组内的消费者不会消费相同的消息。

 消费者是RocketMQ中负责接收和处理消息的组件,它支持多种消费模式和高级特性,以满足不同的消费需求。

5.RocketMQ 的消息队列(Queue)
概念描述
队列模型消息存储的基本单元,每个Topic下可以有多个队列。
负载均衡Broker可以创建多个队列,实现消息的负载均衡。
高可用通过主从复制,实现队列的高可用性。
持久化消息存储在磁盘上,保证消息不会因为Broker故障而丢失。
顺序消息保证同一个队列中的消息按照发送顺序被消费。
消息过滤支持通过消息标签(Tag)对消息进行过滤。
消息重试队列处理失败的消息可以被放入重试队列,进行重试处理。
死信队列重试超过一定次数后,消息可以被放入死信队列。
延迟队列支持设置消息的延迟时间,实现定时发送。
队列状态可以查询每个队列的状态,包括消息数量、消费者数量等。
队列配置可以对队列进行配置,如设置队列的读写权限、消息保留时间等。
队列监控提供队列的监控信息,如消息流入流出速率、队列大小等。
队列优先级可以设置队列的优先级,影响消息的投递顺序。

 消息队列是RocketMQ中用于存储消息的基本单元,它支持多种特性以满足不同的消息存储和消费需求。

6.RocketMQ的消息(Message)概念
概念描述
消息体消息的主要内容,可以是任何二进制数据。
消息属性描述消息特征的键值对,如消息ID、主题、标签、业务属性等。
消息标签用于对消息进行分类的标签,方便消费者根据标签订阅消息。
消息ID消息的唯一标识,用于追踪和识别消息。
消息重试消息发送或消费失败时,可以进行重试。
消息顺序保证同一个消息队列中的消息按照发送顺序被消费。
事务消息支持事务机制,确保消息发送的原子性和一致性。
延时消息可以设置消息的延时级别,实现定时发送。
消息确认消费者处理完消息后,向Broker确认消息已被成功处理。
消息过滤支持通过消息标签对消息进行过滤,只消费特定标签的消息。
消息压缩支持对消息体进行压缩,减少网络传输量和存储空间。
消息轨迹提供消息的发送和消费轨迹追踪,方便问题排查。
消息批处理支持批量发送和消费消息,提高效率。
消息大小限制单条消息有大小限制,通常为4MB。
消息生命周期可以设置消息在队列中的存活时间。

 消息是RocketMQ中数据传递的基本单位,它包含了消息体和一系列描述消息特征的属性。

7.RocketMQ 事务消息
概念描述
事务消息一种特殊的消息类型,用于确保消息发送与本地事务的原子性。
事务状态事务消息在发送时会附带一个状态,表明事务的进行状态。
半消息发送事务消息时,首先发送的是半消息,等待事务状态确认。
事务反悔如果事务失败,可以通过事务反悔机制回滚半消息。
事务提交本地事务执行成功后,确认半消息,将其转换为正常消息。
事务回滚本地事务执行失败时,可以回滚半消息,不将其发送给消费者。
事务超时如果事务状态确认超时,Broker会认为事务失败并回滚消息。
事务日志事务消息处理过程中,相关操作会被记录在事务日志中。
事务监听器用于监听本地事务状态,根据事务状态来确认或回滚半消息。
事务消息的重试事务消息在发送失败时可以进行重试。
事务消息的顺序事务消息可以保证在同一个队列中发送的顺序性。
事务消息的幂等性事务消息的设计保证了消息处理的幂等性,避免重复处理。
事务消息的监控提供事务消息的监控,可以追踪事务消息的状态。

事务消息是RocketMQ中非常重要的特性,它确保了本地事务的执行与消息发送能够原子性地完成。 

8. RocketMQ 消息顺序性
概念描述
顺序消息保证消息按照发送的顺序被消费。
全局顺序所有消息按照发送的全局顺序进行消费。
分区顺序每个队列(Partition)内的消息保持发送的顺序。
顺序消费消费者按照消息的发送顺序进行消费。
顺序标签用于标识顺序消息,通常是一个递增的序列号。
消息队列分配顺序消息通常需要在同一个队列中处理,以保持顺序性。
顺序消息的实现通过在发送和消费时附加顺序标签来实现顺序性。
顺序消息的性能顺序消息可能会因为队列瓶颈而影响整体性能。
顺序消息的应用场景需要严格保证事务顺序的业务场景,如交易系统。
顺序消息的可靠性即使在Broker故障的情况下,顺序消息也能保持顺序性。
顺序消息的重试如果消费失败,顺序消息支持重试机制,同时保持消息顺序。
顺序消息的事务可以结合事务消息使用,确保事务的顺序性和原子性。
顺序消息的监控提供顺序消息的监控,可以追踪消息的顺序状态。

 顺序性是RocketMQ中的一个重要特性,它在需要严格保证消息处理顺序的场景中非常重要。

9.RocketMQ 延时消息概念
概念描述
延时消息一种特殊类型的消息,可以指定一个延迟时间,在这个时间之后消息才可供消费。
延迟级别RocketMQ支持不同的延迟级别,以满足不同的延时需求。
定时轮转系统会根据消息的延迟级别将消息在不同的队列间轮转,直到可以被消费。
定时器使用定时器来触发延迟消息的投递。
消息状态延迟消息在系统中会经历不同的状态,包括等待、可投递等。
定时发送允许用户指定一个具体的时间点来发送消息。
定时消费消费者可以消费已经达到延迟时间的消息。
定时器线程通常有一个专门的线程来处理延迟消息的投递。
性能影响延迟消息可能会对系统的性能产生一定影响,尤其是在大量延迟消息时。
消息超时如果延迟时间设置过长,可能会导致消息超时。
定时器精度定时器的精度会影响延迟消息的准确投递。
消息重试延迟消息在投递失败时可以进行重试。
定时任务调度延迟消息的投递可以看作是一种定时任务调度。

 延时消息是RocketMQ中用于实现定时发送和消费消息的机制,它在需要定时触发业务逻辑的场景中非常有用

10.RocketMQ 消息轨迹与监控
概念描述
消息轨迹记录消息从发送到消费的整个生命周期,包括发送时间、消费时间、消费次数等。
消息追踪提供消息的发送和消费轨迹追踪,方便问题排查和性能分析。
消费重试记录消息消费失败和重试的次数和结果。
NameServer监控监控NameServer的状态,包括CPU、内存使用率等。
Broker监控监控Broker的状态,包括队列消息数、消息流入流出速率、磁盘使用情况等。
Producer监控监控Producer的消息发送情况,如发送速率、失败率等。
Consumer监控监控Consumer的消息消费情况,如消费速率、消费延迟等。
集群监控提供整个RocketMQ集群的监控视图,展示集群的健康状况。
告警系统可以设置告警阈值,当监控指标超过阈值时触发告警。
日志记录记录系统运行的日志,包括错误日志、操作日志等。
性能指标提供系统的性能指标,如吞吐量、延迟、系统负载等。
可视化界面提供图形化的监控界面,方便用户直观地查看系统状态。
自定义监控支持用户自定义监控项,以满足特定的监控需求。

 消息轨迹和监控是RocketMQ中用于确保系统稳定性和可观测性的重要特性,它们对于维护和优化消息系统非常关键

11.RocketMQ 集群模式
概念描述
主从模式一个Broker可以配置为主节点(Master)或从节点(Slave)。
故障转移在Master节点故障时,Slave节点可以接管,保证服务的可用性。
负载均衡集群可以分散消息负载,提高整体的处理能力。
数据同步Slave节点通过同步Master节点的数据来保持数据一致性。
读写分离通常Master负责写操作,Slave负责读操作。
集群部署可以部署多个Broker节点,形成集群以提高吞吐量和可用性。
NameServer集群NameServer也可以以集群模式运行,提高发现服务的可靠性。
Broker集群Broker节点组成集群,可以提供更好的负载均衡和故障转移。
高可用性通过集群模式,RocketMQ可以提供高可用的消息服务。
自动故障恢复集群可以自动处理节点故障,减少人工干预。
多数据中心支持跨数据中心部署,提高容灾能力。
集群监控提供集群级别的监控,可以查看整个集群的状态。
集群管理提供集群管理工具,方便对集群进行配置和管理。

 集群模式是RocketMQ中用于提高消息系统可用性和吞吐量的关键特性

12.RocketMQ 消息过滤与标签订阅概念
概念描述
消息过滤允许消费者根据特定的规则过滤消息,只消费感兴趣的消息。
标签订阅通过为消息设置标签,消费者可以订阅带有特定标签的消息。
SQL表达式支持使用SQL92标准的表达式进行更复杂的消息过滤。
消息属性利用消息的属性字段进行过滤,如消息头、消息体等。
标签匹配消费者可以通过指定标签与消息的标签进行匹配,实现订阅。
批量过滤支持批量过滤消息,提高消费效率。
过滤规则管理提供管理界面或API,方便用户设置和管理过滤规则。
过滤性能过滤操作可能会影响消息处理的性能,需要合理设计过滤规则。
过滤后的消息确认过滤掉的消息不会被确认,也不会被消费。
过滤表达式的优化合理编写过滤表达式,可以减少不必要的消息处理。
消息队列的选择过滤和订阅可以结合队列的概念,实现更细粒度的控制。
消息消费模式过滤和订阅机制支持Push和Pull两种消息消费模式。
消息过滤的事务性过滤操作不会影响事务消息的处理。

 消息过滤和标签订阅是RocketMQ中用于提高消息消费效率和灵活性的重要特性,它们允许消费者只处理相关的消息,从而优化资源利用和业务逻辑处理。

13.RocketMQ 消息重试与死信队列
概念描述
消息重试当消息消费失败时,系统会自动将消息重新放入队列进行重试。
重试次数可以设置消息的重试次数,超过重试次数后将不再重试。
死信队列超过重试次数的消息可以被放入死信队列,进行特殊处理。
消费异常消费者处理消息时出现的异常情况,如业务逻辑错误等。
重试策略可以自定义重试策略,如立即重试、延迟重试等。
重试延迟设置消息重试的时间间隔,避免频繁重试导致的性能问题。
死信队列的作用用于存储无法正常消费的消息,方便后续分析和处理。
死信队列的监控对死信队列进行监控,及时发现和处理消费异常。
消息状态跟踪跟踪消息的重试次数和状态,以便于问题排查。
重试队列除了死信队列,还可以设置重试队列,用于存放需要重试的消息。
业务幂等性确保消息重复消费时,业务逻辑能够正确处理,避免数据异常。
重试与事务消息事务消息的重试需要保证事务的最终一致性。
重试与顺序消息顺序消息的重试需要保持消息的顺序性。

 消息重试和死信队列是RocketMQ中处理消费失败消息的重要机制,它们帮助系统更有效地管理消息消费过程中的异常情况。

14.RocketMQ 消息持久化与存储
概念描述
持久化消息数据写入磁盘,保证消息不会因为Broker故障而丢失。
存储机制RocketMQ使用高效的存储机制,支持大数据量的持久化存储。
CommitLog消息主体内容存储在CommitLog中,它是消息存储的核心。
ConsumeQueue消息索引存储在ConsumeQueue中,提供快速的消息索引。
消息刷盘消息写入内存后,定期或达到一定量时写入磁盘。
同步刷盘消息写入磁盘的操作是同步进行的,确保数据的一致性。
异步刷盘消息写入磁盘的操作是异步进行的,提高消息发送的吞吐量。
存储空间分配可以根据需要动态地分配存储空间给不同的队列。
存储空间回收系统会自动回收已经消费确认的消息占用的存储空间。
存储空间监控提供存储空间使用情况的监控,避免空间耗尽导致服务中断。
数据一致性通过CommitLog和ConsumeQueue的配合,保证数据的一致性。
存储性能优化支持对存储系统进行优化,如使用SSD、调整刷盘策略等。
数据备份支持数据的备份和恢复,提高数据的安全性。
存储配置提供存储相关的配置选项,如刷盘方式、存储路径等。

 消息的持久化和存储是RocketMQ中确保消息可靠性的重要特性,它们对于保证消息不丢失和系统稳定运行非常关键。

15.RocketMQ 高可用性(High Availability, HA)
概念描述
主从架构Broker采用主从架构,主节点负责写操作,从节点负责读操作。
故障转移在主节点故障时,从节点可以自动接管,继续提供服务。
数据同步从节点实时同步主节点的数据,保证数据的一致性。
NameServer HANameServer可以组成集群,避免单点故障。
Broker HABroker也可以组成集群,提高消息存储的可靠性。
自动故障检测系统会实时监控节点状态,自动进行故障检测。
自动故障恢复一旦检测到故障,系统会自动进行故障恢复。
集群状态监控提供集群状态的监控,方便管理员了解集群的健康状况。
故障恢复时间故障恢复的时间非常短,对业务的影响很小。
数据不丢失即使在节点故障的情况下,也能保证数据不丢失。
无感知故障转移故障转移对消费者和生产者来说是透明的,无需人工干预。
多数据中心支持跨数据中心部署,提高容灾能力。
高可用性配置提供高可用性相关的配置选项,如同步/异步刷盘、主从切换策略等。

 高可用性是RocketMQ中确保消息服务稳定运行的关键特性,通过主从架构、故障转移、数据同步等机制,RocketMQ能够在节点故障的情况下快速恢复服务。

16.RocketMQ 事务性
概念描述
事务消息支持本地事务的执行与消息发送的原子性。
事务状态事务消息发送时会附带事务状态,可以是半消息、提交或回滚。
本地事件生产者本地事务的执行结果,决定事务消息的最终状态。
事务反悔如果本地事务失败,可以通过事务反悔机制回滚事务消息。
事务提交本地事务成功执行后,提交事务消息,使其对消费者可见。
事务回滚本地事务失败时,回滚事务消息,不提交给消费者。
事务日志记录事务消息的状态变更,用于事务状态的确认和恢复。
事务超时如果事务状态确认超时,Broker会将事务消息标记为回滚。
幂等性事务消息的设计保证了消息处理的幂等性,避免重复处理。
事务消息的重试事务消息在发送失败时可以进行重试。
事务消息的顺序性事务消息可以保证在同一个队列中发送的顺序性。
事务消息的监控提供事务消息的监控,可以追踪事务消息的状态。

 事务性是RocketMQ中用于确保消息发送与本地事务一致性的重要特性,它通过事务消息的机制来实现。

17.RocketMQ 广播与集群消费
概念描述
广播消费消息发送到所有的消费实例,每个实例都会独立消费消息。
集群消费消息在多个消费实例中负载均衡,每个实例消费不同的消息子集。
消费分配集群消费模式下,消费任务在消费者之间进行分配。
消费确认集群消费时,每个消费者都需要独立确认消息。
消费偏移量记录每个消费者消费到的位置,确保消息不会被重复消费。
消费重试消费失败时,可以重试,直到成功或达到重试上限。
消费幂等性保证消息即使被重复消费,也不会导致业务逻辑错误。
消费顺序性集群消费可能无法保证消息的全局顺序性,但可以保证单个消费者内的消息顺序性。
消费策略可以配置不同的消费策略,如广播、集群等。
消费实例管理需要管理消费实例的注册、发现和状态同步。
消费性能集群消费可以提高消费性能,但也可能增加资源消耗。
消息丢失在广播模式下,如果某个实例失败,可能会丢失消息。
消费监控需要监控每个消费实例的状态,及时发现和处理消费异常。

 广播消费和集群消费是RocketMQ中用于提高消息消费效率和可靠性的两种模式,它们适用于不同的业务场景。

18.RocketMQ NameServer与Broker角色
概念描述
NameServerNameServer在RocketMQ中充当服务发现和路由注册中心的角色。
BrokerBroker是实际存储消息的节点,负责消息的发送和接收。
服务发现NameServer允许客户端动态发现Broker地址,实现负载均衡。
路由注册Broker向NameServer注册自己的路由信息,包括地址和队列信息。
轻量级NameServer是一个轻量级的组件,不存储消息,只负责服务发现。
集群部署NameServer和Broker都支持集群部署,提高系统的可用性和可靠性。
NameServer集群多个NameServer实例可以组成集群,避免单点故障。
Broker集群多个Broker实例可以组成集群,实现消息存储的高可用。
故障转移NameServer可以感知Broker的故障,并更新路由信息。
动态更新NameServer可以动态更新Broker的路由信息,无需重启。
消息发送Producer通过NameServer获取Broker地址,然后直接发送消息到Broker。
消息消费Consumer通过NameServer获取Broker地址,然后从Broker拉取消息。
NameServer监控提供NameServer的监控信息,包括CPU、内存使用率等。
Broker监控提供Broker的监控信息,包括队列消息数、消息流入流出速率等。

 NameServer和Broker是RocketMQ架构中的关键组件,它们共同工作以实现消息的发送、接收和存储。

19.RocketMQ 消息批处理
概念描述
批量发送Producer可以一次发送多条消息,提高发送效率。
批量拉取Consumer可以一次拉取多条消息,减少网络请求次数。
批处理机制RocketMQ内部实现了批处理机制,优化消息处理性能。
消息缓冲Producer和Consumer可以缓冲一批消息后再进行发送或拉取。
事务消息批处理事务消息也支持批量发送,但需保证事务的一致性。
延时消息批处理延时消息同样可以批量发送,但需按照延时级别进行分类。
批处理大小可以配置每次批处理的消息数量,平衡性能和资源消耗。
批处理策略可以根据业务需求定制批处理策略,如消息大小、数量等。
性能优化批处理可以显著提高消息处理的性能,减少I/O操作。
内存管理需要合理管理内存,避免因批量处理过多消息导致的内存溢出。
批处理与顺序性批处理消息时需保持消息的顺序性,尤其是顺序消息。
批处理与事务性批处理事务消息时,需保证事务的原子性和一致性。
监控和跟踪对批处理的消息进行监控和跟踪,确保消息处理的正确性。

消息批处理是RocketMQ中用于提高消息发送和消费效率的重要特性,它通过减少网络请求和I/O操作来优化性能

20.RocketMQ 消息桥接与流控
概念描述
消息桥接RocketMQ支持与其他消息系统的桥接,实现消息互通。
异构系统整合可以作为中间件,整合不同的消息系统和服务。
消息转换在桥接过程中,支持不同消息格式之间的转换。
流控机制用于控制消息流入和流出的速率,防止系统过载。
流量整形通过流控,可以平滑流量波动,避免流量高峰时的系统压力。
令牌桶算法一种常见的流控算法,通过令牌的生成和消耗来控制流量。
消息队列大小限制可以限制消息队列的大小,避免内存或存储资源耗尽。
QoS保证提供服务质量保证,即使在高负载情况下也能保证消息的基本处理。
动态流控根据系统的实际负载动态调整流控策略。
消费速率限制可以限制消费者的最大消费速率,避免过快消费导致的问题。
生产者速率限制可以限制生产者的最大发送速率,避免消息积压。
资源隔离通过资源隔离,确保关键业务的消息处理不受影响。
监控和报警提供流控相关的监控指标,并可以设置报警阈值。

消息桥接和流控是RocketMQ中用于实现不同消息系统间的互联互通以及系统流量控制的重要特性。

21.Spring Boot整合RocketMQ

Spring Boot整合RocketMQ可以通过以下步骤实现:

  1. 添加依赖:在Spring Boot项目的pom.xml文件中添加RocketMQ的Spring Boot Starter依赖。
  1. <dependency>
  2. <groupId>org.apache.rocketmq</groupId>
  3. <artifactId>erocketmq-spring-boot-starter</artifactId>
  4. <version>版本号</version>
  5. </dependency>

     2.配置文件:在application.propertiesapplication.yml中配置RocketMQ的NameServer地址和其他相关配置。

rocketmq.name-server=127.0.0.1:9876
# 其他配置...

       3.生产者配置:配置生产者相关的属性,如组名、消息发送超时时间等。

rocketmq.producer.group=my-group
rocketmq.producer.sendMessageTimeout=10000

       4.消费者配置:配置消费者相关的属性,如消费组名、每次拉取的消息数量等。

rocketmq.consumer.group=my-consumer-group
rocketmq.consumer.pull-batch-size=10

         5. 编写生产者代码:使用RocketMQTemplate来发送消息。

  1. @RestController
  2. public class ProducerController {
  3. @Autowired
  4. private RocketMQTemplate rocketMQTemplate;
  5. @GetMapping("/send")
  6. public String send() {
  7. Message<String> message = MessageBuilder.withPayload("Hello, RocketMQ").build();
  8. SendResult sendResult = rocketMQTemplate.syncSend("my-topic", message);
  9. return "Send result: " + sendResult;
  10. }
  11. }

      6.编写消费者代码:使用@RocketMQMessageListener注解来创建消费者。

  1. @Service
  2. @RocketMQMessageListener(topic = "my-topic", consumerGroup = "my-consumer-group")
  3. public class MyConsumer implements RocketMQListener<String> {
  4. @Override
  5. public void onMessage(String message) {
  6. System.out.println("Received message: " + message);
  7. }
  8. }

        7. 启动应用:运行Spring Boot应用,生产者和消费者将会自动启动。

        8.消息轨迹和事务消息:根据需要配置和使用消息轨迹功能以及事务消息功能。

注意:当然正常启动使用的前提是你已经安装了RocketMQ的服务。去官网下载软件安装。

22. 一些面试问题

22.1.消息追踪和回溯功能在哪些场景下特别有用?

 消息追踪功能的应用场景:

  1. 问题诊断:当系统出现问题时,如消息处理失败,消息追踪可以帮助快速定位问题源头,了解消息在系统中的流动路径。

  2. 性能优化:通过追踪消息的传递时间,可以分析系统的性能瓶颈,如消息在某个Broker处理过慢,从而进行针对性的优化。

  3. 安全审计:在需要记录操作日志和审计的场景中,消息追踪可以提供消息的详细历史,包括谁发送了消息,谁消费了消息,以及处理的时间点。

  4. 数据一致性校验:在分布式事务中,追踪消息可以确保各个系统组件之间的数据一致性,尤其是在微服务架构中。

  5. 业务监控:业务分析师可以使用消息追踪来监控关键业务流程的执行情况,如订单处理流程。

消息回溯功能的应用场景:

  1. 数据恢复:在数据丢失或损坏的情况下,可以通过消息回溯将消息消费进度回溯到之前的状态,重新处理消息。

  2. 错误修正:如果由于消费端的逻辑错误导致数据处理不正确,可以利用消息回溯重新消费消息,修正错误。

  3. 系统回滚:在系统升级或变更后,如果发现新版本存在问题,可以回溯到系统变更前的状态,以减少对业务的影响。

  4. 数据核对:在财务或交易系统中,定期进行数据核对时,可能需要重新处理一段时间内的所有消息以确保数据的准确性。

  5. 审计要求:某些行业可能有严格的审计要求,需要定期或按需重新检查消息处理的历史记录。

  6. 复杂查询:在需要进行复杂数据分析或生成报告时,可以利用消息回溯功能重新处理特定时间段内的消息。

通过结合消息追踪和回溯功能,RocketMQ为用户提供了强大的工具来确保消息系统的可靠性和可审计性。这些功能对于维护系统稳定性、提高问题解决效率以及满足合规性要求都至关重要。

 22.2.RocketMQ是如何实现消息去重的

RocketMQ 支持消息去重,以确保消息不会因为网络波动、客户端重试等原因而被重复处理。实现消息去重通常涉及以下几个关键步骤:

1.唯一标识符(ID)

每个消息都有一个业务标识符,这个标识符通常是业务相关的,比如订单号、交易号等。这个标识符在发送消息时由生产者指定,并作为消息属性发送。

2.去重存储

消费者在处理消息之前,会将接收到的消息ID与去重存储中的ID进行比对。去重存储可以是一个外部数据库、内存数据结构、分布式缓存(如Redis)等。

3.检查与处理

  • 如果ID已存在:说明该消息已经被处理过,消费者可以忽略这条消息或将其标记为已消费,而不执行实际的业务逻辑处理。
  • 如果ID不存在:消费者将该消息ID存入去重存储,并正常处理这条消息。

4.去重策略

去重策略可以是:

  • 本地去重:在消费者本地进行去重,适用于单实例部署,但不支持集群消费。
  • 分布式去重:使用分布式存储进行去重,支持集群消费,但可能会增加延迟。

5.去重存储的持久化 

为了保证去重信息的持久性,去重存储需要定期持久化到稳定存储中,防止重启或故障导致去重信息丢失。

6.去重信息的清理

长时间运行后,去重存储中会积累大量已处理的消息ID。需要定期清理这些ID,以避免存储空间的无限增长。

示例实现:

假设使用Redis作为去重存储:

  1. String messageId = "order_12345";
  2. boolean isDuplicate = redis.exists(messageId);
  3. if (!isDuplicate) {
  4. // 处理消息
  5. redis.set(messageId, true);
  6. } else {
  7. // 消息已处理,忽略
  8. }

在上述示例中,redis.exists(messageId) 检查消息是否已经被处理过,如果没有,则处理消息并使用 redis.set(messageId, true) 将消息ID标记为已处理。

消息去重是一个重要的特性,特别是在需要保证消息处理幂等性的业务场景中。然而,去重也会带来额外的性能开销和复杂性,因此需要根据实际业务需求来权衡是否启用去重功能。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/weixin_40725706/article/detail/538389
推荐阅读
相关标签
  

闽ICP备14008679号