当前位置:   article > 正文

15. RocketMQ面试题汇总

15. RocketMQ面试题汇总

Java全栈面试题汇总目录-CSDN博客

1. RocketMQ是什么?

RocketMQ是阿里开源的分布式消息中间件。具有高性能、低延时和高可靠等特性。主要用来提升性能、系统解耦、流量削峰等。

2. RocketMQ有什么特点?

1. 灵活可扩展性

RocketMQ天然支持集群,其核心四组件(Name Server、Broker、Producer、Consumer)每一个都可以在没有单点故障的情况下进行水平扩展。

2. 海量消息堆积能力

RocketMQ采用零拷贝原理实现超大的消息的堆积能力,据说单机已可以支持亿级消息堆积,而且在堆积了这么多消息后依然保持写入低延迟。

3. 支持顺序消息

可以保证消息消费者按照消息发送的顺序对消息进行消费。顺序消息分为全局有序和局部有序,一般推荐使用局部有序,即生产者通过将某一类消息按顺序发送至同一个队列来实现。

4. 多种消息过滤方式

消息过滤分为在服务器端过滤和在消费端过滤。服务器端过滤时可以按照消息消费者的要求做过滤,优点是减少不必要消息传输,缺点是增加了消息服务器的负担,实现相对复杂。消费端过滤则完全由具体应用自定义实现,这种方式更加灵活,缺点是很多无用的消息会传输给消息消费者。

5. 支持事务消息

RocketMQ除了支持普通消息,顺序消息之外还支持事务消息,这个特性对于分布式事务来说提供了又一种解决思路。

6. 回溯消费

回溯消费是指消费者已经消费成功的消息,由于业务上需求需要重新消费,RocketMQ支持按照时间回溯消费,时间维度精确到毫秒,可以向前回溯,也可以向后回溯。

3. 你们为什么使用MQ,具体的使用场景是什么?

MQ的作用很简单,削峰填谷。以电商交易下单的场景来说,正向交易的过程可能涉及到创建订单、扣减库存、扣减活动预算、扣减积分等等。每个接口的耗时如果是100ms,那么理论上整个下单的链路就需要耗费400ms,这个时间显然是太长了。

如果这些操作全部同步处理的话,首先调用链路太长影响接口性能,其次分布式事务的问题很难处理,这时候像扣减预算和积分这种对实时一致性要求没有那么高的请求,完全就可以通过MQ异步的方式去处理了。同时,考虑到异步带来的不一致的问题,我们可以通过job去重试保证接口调用成功,而且一般公司都会有核对的平台,比如下单成功但是未扣减积分的这种问题可以通过核对作为兜底的处理方案。

使用MQ之后我们的链路变简单了,同时异步发送消息我们的整个系统的抗压能力也上升了。 

4. 那你们使用什么MQ,基于什么做的选型?

  1. 由于我们系统的QPS压力比较大,所以性能是首要考虑的要素。
  2. 开发语言,由于我们的开发语言是java,主要是为了方便二次开发。
  3. 对于高并发的业务场景是必须的,所以需要支持分布式架构的设计。
  4. 功能全面,由于不同的业务场景,可能会用到顺序消息、事务消息等。

基于以上几个考虑,我们最终选择了RocketMQ

Kafka

RocketMQ

RabbitMQ

ActiveMQ

单机吞吐量

10万级

10万级

万级

万级

开发语言

Scala

Java

Erlang

Java

高可用

分布式架构

分布式架构

主从架构

主从架构

性能

ms级

ms级

us级

ms级

功能

只支持主要的MQ功能

顺序消息、事务消息等功能完善

并发强、性能好、延时低

成熟的社区产品、文档丰富

5. 你上面提到异步发送,那消息可靠性怎么保证?

消息丢失可能发生在生产者发送消息、MQ本身丢失消息、消费者丢失消息3个方面。

生产者丢失

生产者丢失消息的可能点在于程序发送失败抛异常了没有重试处理,或者发送的过程成功但是过程中网络闪断MQ没收到,消息就丢失了。

由于同步发送的一般不会出现这样使用方式,所以我们就不考虑同步发送的问题,我们基于异步发送的场景来说。

异步发送分为两个方式:异步有回调和异步无回调,无回调的方式,生产者发送完后不管结果可能就会造成消息丢失,而通过异步发送+回调通知+本地消息表的形式我们就可以做出一个解决方案。以下单的场景举例。

  1. 下单后先保存本地数据和MQ消息表,这时候消息的状态是发送中,如果本地事务失败,那么下单失败,事务回滚
  2. 下单成功,直接返回客户端成功,异步发送MQ消息
  3. MQ回调通知消息发送结果,对应更新数据库MQ发送状态
  4. JOB轮询超过一定时间(时间根据业务配置)还未发送成功的消息去重试
  5. 在监控平台配置或者JOB程序处理超过一定次数一直发送不成功的消息,告警,人工介入

一般而言,对于大部分场景来说异步回调的形式就可以了,只有那种需要完全保证不能丢失消息的场景我们做一套完整的解决方案。

MQ丢失

如果生产者保证消息发送到MQ,而MQ收到消息后还在内存中,这时候宕机了又没来得及同步给从节点,就有可能导致消息丢失。比如RocketMQ:

RocketMQ分为同步刷盘和异步刷盘两种方式,默认的是异步刷盘,就有可能导致消息还未刷到硬盘上就丢失了,可以通过设置为同步刷盘的方式来保证消息可靠性,这样即使MQ挂了,恢复的时候也可以从磁盘中去恢复消息。

消费者丢失

消费者丢失消息的场景:消费者刚收到消息,此时服务器宕机,MQ认为消费者已经消费,不会重复发送消息,消息丢失。

RocketMQ默认是需要消费者回复ack确认。

消费方不返回ack确认,重发的机制根据MQ类型的不同发送时间间隔、次数都不尽相同,如果重试超过次数之后会进入死信队列,需要手工来处理了。

6. 你说到消费者消费失败的问题,那么如果一直消费失败导致消息积压怎么处理?

因为考虑到时消费者消费一直出错的问题,那么我们可以从以下几个角度来考虑:

消费者出错,肯定是程序或者其他问题导致的,如果容易修复,先把问题修复,让consumer恢复正常消费

如果时间来不及处理很麻烦,做转发处理,写一个临时的consumer消费方案,先把消息消费,然后再转发到一个新的topic和MQ资源,这个新的topic的机器资源单独申请,要能承载住当前积压的消息

处理完积压数据后,修复consumer,去消费新的MQ和现有的MQ数据,新MQ消费完成后恢复原状

7. 如果消息积压达到磁盘上限,消息被删除了怎么办?

最初,我们发送的消息记录是落库保存了的,而转发发送的数据也保存了,那么我们就可以通过这部分数据来找到丢失的那部分数据,再单独跑个脚本重发就可以了。如果转发的程序没有落库,那就和消费方的记录去做对比,只是过程会更艰难一点。

8. RocketMQ Broker中的消息被消费后会立即删除吗?

不会,每条消息都会持久化到CommitLog中,每个Consumer连接到Broker后会维持消费进度信息,当有消息消费后只是当前Consumer的消费进度(CommitLogoffset)更新了。

满足如下条件,默认48小时后会删除不再使用的CommitLog文件

  • 指定时间删除,默认凌晨4点
  • 磁盘容量不足

9. 说了这么多,那你说说RocketMQ实现原理吧?

RocketMQ由NameServer注册中心集群、Producer生产者集群、Consumer消费者集群和若干Broker(RocketMQ进程)组成

  • producer集群:拥有相同的producerGroup,一般来讲,Producer不必要有集群的概念,这里的集群仅仅在RocketMQ的分布式事务中有用到
  • Name Server集群:提供topic的路由信息,路由信息数据存储在内存中,broker会定时的发送路由信息到name server中的每一个机器,来进行更新,节点之间无任何信息同步,所以name server集群可以简单理解为无状态(实际情况下可能存在每个name server机器上的数据有短暂的不一致现象,但是通过定时更新,大部分情况下都是一致的)
  • broker集群:一个集群有一个统一的名字,即brokerClusterName,默认是DefaultCluster。一个集群下有多个master,每个master下有多个slave。master和slave算是一组,拥有相同的brokerName,不同的brokerId,master的brokerId是0,而slave则是大于0的值。master和slave之间可以进行同步复制或者是异步复制
  • consumer集群:拥有相同的consumerGroup

它的架构原理是这样的:

  1. Broker在启动的时候去向所有的NameServer注册,并保持长连接,每30s发送一次心跳,如果某个Broker超过120s都没发送心跳了,那么就认为这个Broker已经挂掉了
  2. Producer在发送消息的时候从NameServer获取Broker服务器地址,根据负载均衡算法选择一台服务器来发送消息
  3. Consumer消费消息的时候同样从NameServer获取Broker地址,然后主动拉取消息来消费

10. 那Broker是怎么保存数据的呢?

RocketMQ主要的存储文件包括commitlog文件、consumequeue文件、indexfile文件。

Broker在收到消息之后,会把消息保存到commitlog的文件当中,而同时在分布式的存储当中,每个broker都会保存一部分topic的数据,同时,每个topic对应的messagequeue下都会生成consumequeue文件用于保存commitlog的物理位置偏移量offset,indexfile中会保存key和offset的对应关系。

CommitLog文件保存于${Rocket_Home}/store/commitlog目录中,从图中我们可以明显看出来文件名的偏移量,每个文件默认1G,写满后自动生成一个新的文件。

特点:

顺序写

我们知道,操作系统每次从磁盘读写数据的时候,都需要找到数据在磁盘上的地址,再进行读写。而如果是机械硬盘,寻址需要的时间往往会比较长而一般来说,如果把数据存储在内存上面,少了寻址的过程,性能会好很多;但Kafka的数据存储在磁盘上面,依然性能很好,这是为什么呢?这是因为,Kafka采用的是顺序写,直接追加数据到末尾。实际上,磁盘顺序写的性能极高,在磁盘个数一定,转数一定的情况下,基本和内存速度一致因此,磁盘的顺序写这一机制,极大地保证了Kafka本身的性能。

零拷贝

比如:读取文件,再用socket发送出去这一过程

  1. buffer = File.read
  2. Socket.send(buffer)

传统方式实现:

先读取、再发送,实际会经过以下四次复制

  1. 将磁盘文件,读取到操作系统内核缓冲区Read Buffer
  2. 将内核缓冲区的数据,复制到应用程序缓冲区Application Buffer
  3. 将应用程序缓冲区Application Buffer中的数据,复制到socket网络发送缓冲区
  4. 将Socket buffer的数据,复制到网卡,由网卡进行网络传输

传统方式,读取磁盘文件并进行网络发送,经过的四次数据copy是非常繁琐的

那么采用零拷贝的方式发送消息,必定会大大减少读取的开销,使得RocketMQ读取消息的性能有一个质的提升

此外,还需要再提一点,零拷贝技术采用了MappedByteBuffer内存映射技术,采用这种技术有一些限制,其中有一条就是传输的文件不能超过2G,这也就是为什么RocketMQ的存储消息的文件CommitLog的大小规定为1G的原因。

11. Master和Slave之间是怎么同步数据的呢?

消息在master和slave之间的同步是根据raft协议来进行的:

  1. 在broker收到消息后,会被标记为uncommitted状态
  2. 然后会把消息发送给所有的slave
  3. slave在收到消息之后返回ack响应给master
  4. master在收到超过半数的ack之后,把消息标记为committed
  5. 发送committed消息给所有slave,slave也修改状态为committed

12. 你知道RocketMQ为什么速度快吗?

是因为使用了顺序存储、Page Cache和异步刷盘。

  1. 我们在写入commitlog的时候是顺序写入的,这样比随机写入的性能就会提高很多
  2. 写入commitlog的时候并不是直接写入磁盘,而是先写入操作系统的Page Cache
  3. 最后由操作系统异步将缓存中的数据刷到磁盘

13. 什么是事务、半事务消息,怎么实现的?

事务消息就是MQ提供的类似XA的分布式事务能力,通过事务消息可以达到分布式事务的最终一致性。

半事务消息就是MQ收到了生产者的消息,但是没有收到二次确认,不能投递的消息。

实现原理如下:

  1. 生产者先发送一条半事务消息到MQ
  2. MQ收到消息后返回ack确认
  3. 生产者开始执行本地事务
  4. 如果事务执行成功发送commit到MQ,失败发送rollback
  5. 如果MQ长时间未收到生产者的二次确认commit或者rollback,MQ对生产者发起消息回查
  6. 生产者查询事务执行最终状态
  7. 根据查询事务状态再次提交二次确认

最终,如果MQ收到二次确认commit,就可以把消息投递给消费者,反之如果是rollback,消息会保存下来并且在3天后被删除。

14. RocketMQ对生产者有哪些容错机制?

基于Dledger实现RocketMQ高可用自动切换

RocketMQ 4.5之后支持了一种叫做Dledger机制,基于Raft协议实现的一个机制。

我们可以让一个Master Broker对应多个Slave Broker,一旦Master Broker宕机了,在多个Slave中通过Dledger技术将一个Slave Broker选为新的Master Broker对外提供服务。

在生产环境中可以是用Dledger机制实现自动故障切换,只要10秒或者几十秒的时间就可以完成。

15. RocketMQ是如何保证数据的高容错性的?

在不开启容错的情况下,轮询队列进行发送,如果失败了,重试的时候过滤失败的Broker

如果开启了容错策略,会通过RocketMQ的预测机制来预测一个Broker是否可用。

如果上次失败的Broker可用那么还是会选择该Broker的队列。

如果上述情况失败,则随机选择一个进行发送。

在发送消息的时候会记录一下调用的时间与是否报错,根据该时间去预测broker的可用时间。

16. Producer发送消息到Broker,是随机选择一个Broker还是有一定的规则?

一般是负载均衡做随机选择,但也可以走其他策略,比如根据某个字段来hash

17. RocketMQ如何负载均衡?

  1. producer发送消息的负载均衡:默认会轮询向Topic的所有queue发送消息,以达到消息平均落到不同的queue上;而由于queue可以落在不同的broker上,就可以发到不同broker上(当然也可以指定发送到某个特定的queue上)
  2. consumer订阅消息的负载均衡:假设有5个队列,两个消费者,则第一个消费者消费3个队列,第二个则消费2个队列,以达到平均消费的效果。而需要注意的是,当consumer的数量大于队列的数量的话,根据RocketMQ的机制,多出来的队列不会去消费数据,因此建议consumer的数量小于或者等于queue的数量,避免不必要的浪费

18. 生产者发送消息的方式?

  • 同步发送

同步发送指消息发送方发出数据后会在收到接收方发回响应之后才发下一个数据包。一般用于重要通知消息,例如重要通知邮件、营销短信。

  • 异步发送

异步发送指发送方发出数据后,不等接收方发回响应,接着发送下个数据包,一般用于可能链路耗时较长而对响应时间敏感的业务场景,例如用户视频上传后通知启动转码服务。

  • 单向发送

单向发送是指只负责发送消息而不等待服务器回应且没有回调函数触发,适用于某些耗时非常短但对可靠性要求并不高的场景,例如日志收集。

19. 消费者消费模式有几种?

1. 集群消费

消费者的一种消费模式。一个Consumer Group中的各个Consumer实例分摊去消费消息,即一条消息只会投递到一个Consumer Group下面的一个实例。

2. 广播消费

消费者的一种消费模式。消息将对一个Consumer Group下的各个Consumer实例都投递一遍。即即使这些Consumer属于同一个Consumer Group,消息也会被Consumer Group中的每个Consumer都消费一次。

20. 消费者获取消息有几种模式?

消费者获取消息有两种模式:推送模式和拉取模式。

1. PushConsumer

推送模式(虽然RocketMQ使用的是长轮询)的消费者。消息的能及时被消费。使用非常简单,内部已处理如线程池消费、流控、负载均衡、异常处理等等的各种场景。

2. PullConsumer

拉取模式的消费者。应用主动控制拉取的时机,怎么拉取,怎么消费等。主动权更高。但要自己处理各种场景。

21. RocketMQ如何保证消息的顺序消费?

首先多个queue只能保证单个queue里的顺序,queue是典型的FIFO,天然顺序。多个queue同时消费是无法绝对保证消息的有序性的。

可以使用同一topic,同一个queue,发消息的时候一个线程去发送消息,消费的时候一个线程去消费一个queue里的消息。

22. 高吞吐量下如何优化生产者和消费者的性能?

开发

  • 同一group下,多机部署,并行消费
  • 单个Consumer提高消费线程个数
  • 批量消费
  1. 消息批量拉取
  2. 业务逻辑批量处理

运维

  • 网卡调优
  • JVM调优
  • 多线程与CPU调优
  • Page Cache

23. 如何保证RocketMQ不丢失消息?

消息的发送流程

一条消息从生产到被消费,将会经历三个阶段:

生产阶段,Producer新建消息,然后通过网络将消息投递给MQ Broker

存储阶段,消息将会存储在Broker端磁盘中

消息阶段,Consumer将会从Broker拉取消息

以上任一阶段都可能会丢失消息,我们只要找到这三个阶段丢失消息原因,采用合理的办法避免丢失,就可以彻底解决消息丢失的问题。

生产阶段

生产者(Producer)通过网络发送消息给Broker,当Broker收到之后,将会返回确认响应信息给Producer。所以生产者只要接收到返回的确认响应,就代表消息在生产阶段未丢失。

send方法是一个同步操作,只要这个方法不抛出任何异常,就代表消息已经发送成功。

消息发送成功仅代表消息已经到了Broker端,Broker在不同配置下,可能会返回不同响应状态:

  • SendStatus.SEND_OK
  • SendStatus.FLUSH_DISK_TIMEOUT
  • SendStatus.FLUSH_SLAVE_TIMEOUT
  • SendStatus.SLAVE_NOT_AVAILABLE

引用官方状态说明:

  • SEND_OK

消息发送成功。要注意的是消息发送成功也不意味着它是可靠的。要确保不会丢失任何消息,还应启用同步Master服务器或同步刷盘,即SYNC_MASTER或SYNC_FLUSH。

  • FLUSH_DISK_TIMEOUT

消息发送成功但是服务器刷盘超时。此时消息已经进入服务器队列(内存),只有服务器宕机,消息才会丢失。消息存储配置参数中可以设置刷盘方式和同步刷盘时间长度。

如果Broker服务器设置了刷盘方式为同步刷盘,即FlushDiskType = SYNC_FLUSH(默认为异步刷盘方式),当Broker服务器未在同步刷盘时间内(默认为5s)完成刷盘,则将返回该状态——刷盘超时。

  • FLUSH_SLAVE_TIMEOUT

消息发送成功,但是服务器同步到Slave时超时。此时消息已经进入服务器队列,只有服务器宕机,消息才会丢失。

如果Broker服务器的角色是同步Master,即SYNC_MASTER(默认是异步Master即ASYNC_MASTER),并且从Broker服务器未在同步刷盘时间(默认为5秒)内完成与主服务器的同步,则将返回该状态——数据同步到Slave服务器超时。

  • SLAVE_NOT_AVAILABLE

消息发送成功,但是此时Slave不可用。

如果Broker服务器的角色是同步Master,即SYNC_MASTER(默认是异步Master服务器即ASYNC_MASTER),但没有配置slave Broker服务器,则将返回该状态——无Slave服务器可用。

另外RocketMQ还提供异步的发送的方式,适合于链路耗时较长,对响应时间较为敏感的业务场景。

异步发送消息一定要注意重写回调方法,在回调方法中检查发送结果。

不管是同步还是异步的方式,都会碰到网络问题导致发送失败的情况。针对这种情况,我们可以设置合理的重试次数,当出现网络问题,可以自动重试。

Broker存储阶段

默认情况下,消息只要到了Broker端,将会优先保存到内存中,然后立刻返回确认响应给生产者。随后Broker定期批量的将一组消息从内存异步刷入磁盘。

这种方式减少I/O次数,可以取得更好的性能,但是如果发生机器掉电,异常宕机等情况,消息还未及时刷入磁盘,就会出现丢失消息的情况。

若想保证Broker端不丢消息,保证消息的可靠性,我们需要将消息保存机制修改为同步刷盘方式,即消息存储磁盘成功,才会返回响应。

修改Broker端配置如下:

  1. # 默认情况为ASYNC_FLUSH
  2. flushDiskType = SYNC_FLUSH

若Broker未在同步刷盘时间内(默认为5s)完成刷盘,将会返回SendStatus.FLUSH_DISK_TIMEOUT状态给生产者。

集群部署

默认方式下,消息写入master成功,就可以返回确认响应给生产者,接着消息将会异步复制到slave节点。

注:master配置:flushDiskType = SYNC_FLUSH

此时若master突然宕机且不可恢复,那么还未复制到slave的消息将会丢失。

为了进一步提高消息的可靠性,我们可以采用同步的复制方式,master节点将会同步等待slave节点复制完成,才会返回确认响应。

Broker master节点同步复制配置如下:

  1. # 默认情况为ASYNC_FLUSH
  2. flushDiskType = SYNC_FLUSH

如果slave节点未在指定时间内同步返回响应,生产者将会收到SendStatus.FLUSH_SLAVE_TIMEOUT返回状态。

消费阶段

消费者从broker拉取消息,然后执行相应的业务逻辑。一旦执行成功,将会返回ConsumeConcurrentlyStatus.CONSUME_SUCCESS状态给Broker。

如果Broker未收到消费确认响应或收到其他状态,消费者下次还会再次拉取到该条消息,进行重试。这样的方式有效避免了消费者消费过程发生异常,或者消息在网络传输中丢失的情况。

以上消费消息过程的,我们需要注意返回消息状态。只有当业务逻辑真正执行成功,我们才能返回ConsumeConcurrentlyStatus.CONSUME_SUCCESS。否则需返回ConsumeConcurrentlyStatus.RECONSUME_LATER,稍后再重试。

总结

最后我们还可以说出我们的思考,虽然提高消息可靠性,但是可能导致消息重发,重复消费。所以对于消费客户端,需要注意保证幂等性。

24. 解决RocketMQ的幂等性问题?

造成重复消费的原因

  1. 当系统的调用链路比较长的时候,比如系统A调用系统B,系统B再把消息发送到RocketMQ中,在系统A调用系统B的时候,如果系统B处理成功,但是迟迟没有将调用成功的结果返回给系统A的时候,系统A就会尝试重新发起请求给系统B,造成系统B重复处理,发起多条消息给RocketMQ造成重复消费;
  2. 在系统B发送给RocketMQ的时候,也有可能会发生和上面一样的问题,消息发送超时,结果系统B重试,导致RocketMQ接收到了重读消息;
  3. 当RocketMQ成功接收到消息,并将消息交给消费者处理,如果消费者消费完成后还没来得及提交CONSUME_SUCCESS给RocketMQ,自己宕机或者重启了,那么RocketMQ没有接收到CONSUME_SUCCESS,就会认为消费失败了,会重发消息给消费者再次消费;

通过幂等性来保证,只要保证重复消息不对结果产生影响,就完美地解决这个问题。

解决方法

生产者端

  1. RocketMQ支持消息查询的功能,只要去RocketMQ查询一下是否已经发送过该条消息就可以了,不存在则发送,存在则不发送,也就是message.setKeys()
  2. 引入Redis,在发送消息到RocketMQ成功之后,向Redis中插入一条数据,如果发送重试,则先去Redis查询一个该条消息是否已经发送过了,存在的话就不重复发送消息了

缺点

方法一:RocketMQ消息查询的性能不是特别好,如果在高并发的场景下,每条消息在发送到RocketMQ时都去查询一下,可能会影响接口的性能;

方法二:在一些极端的场景下,Redis也无法保证消息发送成功之后,就一定能写入Redis成功,比如写入消息成功而Redis此时宕机,那么再次查询Redis判断消息是否已经发送过,是无法得到正确结果的;

消费者端

  1. 建立一个消息表,拿到这个消息做数据库的insert操作。给这个消息做一个唯一主键(primary key)或者唯一约束,那么就算出现重复消费的情况,就会导致主键冲突。
  2. 拿到这个消息做redis的set的操作.redis就是天然幂等性

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/Monodyee/article/detail/602193
推荐阅读
相关标签
  

闽ICP备14008679号