当前位置:   article > 正文

RabbitMQ面试题汇总

RabbitMQ面试题汇总

RabbitMQ面试题

一、RabbitMQ基础

1. 什么是RabbitMQ,它的基本架构是怎样的?

RabbitMQ 是一个开源的消息代理和队列服务器,用于通过异步消息传递方式在分布式系统或服务之间进行通信。RabbitMQ 实现了高级消息队列协议(AMQP),同时也支持其他消息协议,如 MQTT 和 STOMP。它由 Erlang 语言编写,因此继承了 Erlang 所具有的高并发、高可用和容错特性。

RabbitMQ 的基本架构包含了以下几个主要组件:

  1. 生产者(Producer):
    生产者是发送消息的应用程序。它创建消息,并可以将消息发送到交换器。

  2. 消费者(Consumer):
    消费者是接收消息的应用程序。它从队列中提取消息并进行处理。

  3. 交换器(Exchange):
    交换器负责接收生产者发送的消息,并根据路由规则将消息路由到一个或多个队列。RabbitMQ 支持几种不同类型的交换器:

    • 直接交换(Direct Exchange)
    • 扇出交换(Fanout Exchange)
    • 主题交换(Topic Exchange)
    • 头交换(Headers Exchange)
  4. 队列(Queue):
    队列是消息最终被消费者接收的地方,它是存放消息的缓冲区。生产者和消费者通常都不直接交互,而是通过队列进行通信。

  5. 绑定(Binding):
    绑定是交换器和队列之间的链接。它告诉交换器消息应该传递到哪些队列。

  6. 虚拟主机(Virtual Host):
    虚拟主机提供了一组独立的环境,每个环境都有自己的交换器、队列和绑定。它们用于逻辑上区分和隔离不同应用程序的消息环境。

  7. 连接(Connection):
    连接是一个 TCP 连接,生产者和消费者通过此连接到 RabbitMQ 服务器。在一个连接内部,可以创建多个通道。

  8. 通道(Channel):
    通道是在连接内部建立的多路传输会话。每个通道代表一个会话任务。

RabbitMQ 服务器(RabbitMQ broker)通常指的是运行RabbitMQ软件的物理服务器或集群。在分布式模式下,RabbitMQ 可以配置为集群模式,以实现负载均衡和高可用性。

RabbitMQ 是一个强大的中间件,它使得应用程序和服务能够通过消息队列以松耦合的形式交换数据,支持多种消息模式,如工作队列、发布订阅、路由和主题消息等。这为跨多个服务和应用程序的可靠消息传递提供了基础设施。

2. RabbitMQ支持哪些协议?

RabbitMQ支持AMQP协议,也支持MQTT、STOMP等其他协议。

3. 说一下AMQP协议?

在这里插入图片描述

4. 为什么要使用RabbitMQ?

  • 在分布式系统下具备异步,削峰,负载均衡等一系列高级功能;

  • 拥有持久化的机制,进程消息,队列中的信息也可以保存下来。

  • 实现消费者和生产者之间的解耦。

  • 对于高并发场景下,利用消息队列可以使得同步访问变为串行访问达到一定量的限流,利于数据库的操作。

  • 可以使用消息队列达到异步下单的效果,排队中,后台进行逻辑下单。

5. MQ的应用场景有哪些?

1. 异步处理

场景说明:用户注册后,需要发注册邮件和注册短信。传统的做法有两种1.串行的方式;2.并行方式。

(1)串行方式:将注册信息写入数据库成功后,发送注册邮件,再发送注册短信。以上三个任务全部完成后,返回给客户端。
(2)并行方式:将注册信息写入数据库成功后,发送注册邮件的同时,发送注册短信。以上三个任务完成后,返回给客户端。与串行的差别是,并行的方式可以提高处理的时间。

假设三个业务节点每个使用50毫秒钟,不考虑网络等其他开销,则串行方式的时间是150毫秒,并行的时间可能是100毫秒。
因为CPU在单位时间内处理的请求数是一定的,假设CPU1秒内吞吐量是100次。则串行方式1秒内CPU可处理的请求量是7次(1000/150)。并行方式处理的请求量是10次(1000/100)。

小结:如以上案例描述,传统的方式系统的性能(并发量,吞吐量,响应时间)会有瓶颈。如何解决这个问题呢?

引入消息队列,将不是必须的业务逻辑,异步处理。改造后的架构如下:
按照以上约定,用户的响应时间相当于是注册信息写入数据库的时间,也就是50毫秒。注册邮件,发送短信写入消息队列后,直接返回,因此写入消息队列的速度很快,基本可以忽略,因此用户的响应时间可能是50毫秒。因此架构改变后,系统的吞吐量提高到每秒20 QPS。比串行提高了3倍,比并行提高了两倍。

2. 应用解耦

场景说明:用户下单后,订单系统需要通知库存系统。传统的做法是,订单系统调用库存系统的接口。

传统模式的缺点:
1) 假如库存系统无法访问,则订单减库存将失败,从而导致订单失败;
2) 订单系统与库存系统耦合;

如何解决以上问题呢?引入应用消息队列后的方案,如下图:
在这里插入图片描述
订单系统:用户下单后,订单系统完成持久化处理,将消息写入消息队列,返回用户订单下单成功。
库存系统:订阅下单的消息,采用拉/推的方式,获取下单信息,库存系统根据下单信息,进行库存操作。
假如:在下单时库存系统不能正常使用。也不影响正常下单,因为下单后,订单系统写入消息队列就不再关心其他的后续操作了。实现订单系统与库存系统的应用解耦。

3. 请求削峰

流量削锋也是消息队列中的常用场景,一般在秒杀或团抢活动中使用广泛。

应用场景:秒杀活动,一般会因为流量过大,导致流量暴增,应用挂掉。为解决这个问题,一般需要在应用前端加入消息队列。
可以控制活动的人数;
可以缓解短时间内高流量压垮应用;

用户的请求,服务器接收后,首先写入消息队列。假如消息队列长度超过最大数量,则直接抛弃用户请求或跳转到错误页面;
秒杀业务根据消息队列中的请求信息,再做后续处理。

4. 日志处理

日志处理是指将消息队列用在日志处理中,比如Kafka的应用,解决大量日志传输的问题。架构简化如下:
日志采集客户端,负责日志数据采集,定时写受写入Kafka队列;
Kafka消息队列,负责日志数据的接收,存储和转发;
日志处理应用:订阅并消费kafka队列中的日志数据;

(1)Kafka:接收用户日志的消息队列。
(2)Logstash:做日志解析,统一成JSON输出给Elasticsearch。
(3)Elasticsearch:实时日志分析服务的核心技术,一个schemaless,实时的数据存储服务,通过index组织数据,兼具强大的搜索和统计功能。
(4)Kibana:基于Elasticsearch的数据可视化组件,超强的数据可视化能力是众多公司选择ELK stack的重要原因。

5. 消息通讯
消息通讯是指,消息队列一般都内置了高效的通信机制,因此也可以用在纯的消息通讯。比如实现点对点消息队列,或者聊天室等。

点对点通讯:
客户端A和客户端B使用同一队列,进行消息通讯。

聊天室通讯:
客户端A,客户端B,客户端N订阅同一主题,进行消息发布和接收。实现类似聊天室效果。

6. 解耦、异步、削峰是什么?

解耦:A 系统发送数据到 BCD 三个系统,通过接口调用发送。如果 E 系统也要这个数据呢?那如果 C 系统现在不需要了呢?A 系统负责人几乎崩溃…A 系统跟其它各种乱七八糟的系统严重耦合, A 系统产生一条比较关键的数据,很多系统都需要 A 系统将这个数据发送过来。如果使用 MQ,A系统产生一条数据,发送到 MQ 里面去,哪个系统需要数据自己去 MQ 里面消费。如果新系统需要数据,直接从 MQ 里消费即可;如果某个系统不需要这条数据了,就取消对 MQ 消息的消费即可。这样下来,A 系统压根儿不需要去考虑要给谁发送数据,不需要维护这个代码,也不需要考虑人家是否调用成功、失败超时等情况。

就是一个系统或者一个模块,调用了多个系统或者模块,互相之间的调用很复杂,维护起来很麻烦。但是其实这个调用是不需要直接同步调用接口的,如果用 MQ 给它异步化解耦。

异步:A 系统接收一个请求,需要在自己本地写库,还需要在 BCD 三个系统写库,自己本地写库要 3ms,BCD 三个系统分别写库要 300ms、450ms、200ms。最终请求总延时是 3 + 300 + 450 + 200 = 953ms,接近 1s,用户感觉搞个什么东西,慢死了慢死了。用户通过浏览器发起请求。如果使用 MQ,那么 A 系统连续发送 3 条消息到 MQ 队列中,假如耗时 5ms,A 系统从接受一个请求到返回响应给用户,总时长是 3 + 5 = 8ms。

削峰:减少高峰时期对服务器压力。

7. 消息队列有什么缺点?

  • 系统可用性降低
    本来系统运行好好的,现在你非要加入个消息队列进去,那消息队列挂了,你的系统不是呵呵了。因此,系统可用性会降低;

  • 系统复杂度提高
    加入了消息队列,要多考虑很多方面的问题,比如:一致性问题、如何保证消息不被重复消费、如何保证消息可靠性传输等。因此,需要考虑的东西更多,复杂性增大。

  • 一致性问题
    A 系统处理完了直接返回成功了,人都以为你这个请求就成功了;但是问题是,要是 BCD 三个系统那里,BD 两个系统写库成功了,结果 C 系统写库失败了,咋整?你这数据就不一致了。

所以消息队列实际是一种非常复杂的架构,你引入它有很多好处,但是也得针对它带来的坏处做各种额外的技术方案和架构来规避掉,做好之后,你会发现,妈呀,系统复杂度提升了一个数量级,也许是复杂了 10 倍。但是关键时刻,用还是得用的。

8. Kafka、ActiveMQ、RabbitMQ、RocketMQ 有什么优缺点?

在这里插入图片描述

综上,各种对比之后,有如下建议:

  • 一般的业务系统要引入 MQ,最早大家都用 ActiveMQ,但是现在确实大家用的不多了,没经过大规模吞吐量场景的验证,社区也不是很活跃,所以大家还是算了吧,我个人不推荐用这个了;后来大家开始用RabbitMQ,但是确实 erlang 语言阻止了大量的 Java 工程师去深入研究和掌控它,对公司而言,几乎处于不可控的状态,但是确实人家是开源的,比较稳定的支持,活跃度也高;
  • 不过现在确实越来越多的公司会去用 RocketMQ,确实很不错,毕竟是阿里出品,但社区可能有突然黄掉的风险(目前 RocketMQ 已捐给 Apache,但 GitHub 上的活跃度其实不算高)对自己公司技术实力有绝对自信的,推荐用 RocketMQ,否则回去老老实实用 RabbitMQ 吧,人家有活跃的开源社区,绝对不会黄。
  • 所以中小型公司,技术实力较为一般,技术挑战不是特别高,用 RabbitMQ 是不错的选择;大型公司,基础架构研发实力较强,用 RocketMQ 是很好的选择。
  • 如果是大数据领域的实时计算、日志采集等场景,用 Kafka 是业内标准的,绝对没问题,社区活跃度很高,绝对不会黄,何况几乎是全世界这个领域的事实性规范。

9. 简单说一下RabbitMQ的缺点?

  1. 性能问题
  • 当队列中堆积了大量消息时,RabbitMQ的性能会急剧下降。因为它需要处理和存储这些消息,尤其在内存和磁盘资源有限的情况下,性能影响更为明显。
  • 相比于Kafka和RocketMQ,它的性能通常是最差的,尽管它可以处理中小企业的需求,但在处理大规模消息时,效率并不高。
  1. 开发语言限制
    RabbitMQ是用Erlang语言开发的。对于希望通过添加自定义功能或者进行二次开发的团队来说,有点困难,毕竟语言比较小众,大部分程序员没有Erlang开发经验。
  2. 功能性不足
    相比于Kafka和RocketMQ,RabbitMQ在一些高级功能上可能不如他们丰富。例如,Kafka在大数据和实时计算场景中有更成熟的应用,而RocketMQ在处理消息顺序和事务消息方面表现更好。
    在这里插入图片描述

10. 说说RabbitMQ的工作模式?

1. 简单模式(Simple Queue)

作用: 生产者将消息发送到队列,消费者从队列中获取消息。每条消息只能被一个消费者消费。
适用场景: 适用于消息量较小且不需要复杂路由逻辑的场景。

2. 工作队列模式(Work Queue)

作用: 通过多个消费者分担任务,达到负载均衡的目的。消息在多个消费者之间分配,每条消息只能被一个消费者处理。
适用场景: 适用于需要并行处理大量独立任务的场景,如图像处理、视频转码等。

3. 发布/订阅模式(publish/subscribe)

作用: 消息发布到交换机,所有绑定到该交换机的队列都会收到消息。常用的交换机类型是fanout。
适用场景: 适用于广播消息的场景,如日志处理、事件通知等。

4. 路由模式(routing)

作用: 消息发布到交换机,并根据路由键将消息发送到相应的队列。常用的交换机类型是direct。
适用场景: 适用于需要对消息分类处理的场景,如日志系统中按严重级别分类的日志处理等。

5. 主题模式(topics)

作用: 消息发布到交换机,并根据路由键(通配符匹配)将消息发送到相应的队列。常用的交换机类型是topic。
适用场景: 适用于需要根据复杂的路由规则进行分类处理的场景,如日志系统中按模块和严重级别分类的日志处理等。

6. RPC模式

作用: 实现远程过程调用,客户端发送请求消息到队列,服务器端处理后返回响应消息。
适用场景: 适用于需要实现远程服务调用的场景,如微服务之间的通信。
在这里插入图片描述
7. 发布确认模式

作用: 用于确保消息已经成功地发布并被broker接收。它提供了一种轻量级的方法来确认消息的持久性和可靠性,适用于需要高可靠性消息传递的场景。
适用场景: 金融交易、订单处理等确保消息被可靠地传递和处理,防止丢失的场景。

11. RabbitMQ消息是怎么路由的?

在RabbitMQ中,消息的路由是通过交换机(Exchange)来实现的。交换机根据不同的路由规则将消息分发到一个或者多个队列。
消息路由的基本流程:

  1. 生产者将消息发送到指定的交换机
  2. 交换机根据路由键(Routing Key)和绑定键(Binding Key) 将消息发送到一个或者多个队列当中。

路由键(Routing Key) 可以是字符串,也可以根据交换机的类型以及绑定的规则进行匹配。
RabbitMQ中,有四种交换机类型,分别是直连交换机(Direct Exchange)、扇出交换机(Fanout Exchange)、主题交换机(Topic Exchange)和头交换机(Header Exchange)。

  • 直连交换机(Direct Exchange)
    直连交换机通过完全匹配路由键将消息路由到队列,消息的路由键必须与队列绑定时的绑定键完全匹配。
  • 扇出交换机(Fanout Exchange)
    扇出交换机将消息广播到所有绑定的队列,而不考虑路由键。适用于广播消息的场景。
  • 主题交换机(Topic Exchange)
    主题交换机通过模式匹配路由键,将消息路由到队列。路由键可以包含通配符* 和 # , * 匹配一个单词, # 匹配零个或者多个单词。
  • 头交换机(Header Exchange)
    头交换机通过匹配消息的头部属性(Headers) 将消息路由到队列。路由规则基于消息的头部属性,而不是路由键。

如果没有消息匹配的队列的话,消息会被丢弃或者返回给生产者,根据生产者的配置决定消息的去留。

12. 简单说一下RabbitMQ发送消息的过程。

生产者发送消息时需要指定exchange和route_key ,根据AMQP协议格式构建其他内容,接着序列化为二进制格式发送给Broker,由Broker的exchange根据路由规则将数据分发到不同的Queue中。

13. RabbitMQ中的Channel有了解过吗?

RabbitMQ在网络层有Connection和Channel两个概念。
Connection其实对应的就是一条TCP连接,而Channel是Connection中的虚拟链接,一个Connection可以创建多个虚拟连接。
在这里插入图片描述

在RabbitMQ中,客户端和Broker之间的通信都是基于Channel维度的,这样可以减少实际的TCP连接数,节省系统资源的使用。

二、RabbitMQ高级

1. RabbitMQ中如何保证消息的顺序性?

RabbitMQ如果要保证消息的顺序性,首先生产者需要按序发送,确认上一条消息发送成功后再发送下一条,然后需要使得这类消息仅发送到一个队列中,并且只有一一个消费者消费这个队列,且消费者必须要消费完一条消息后,再消费另一条消息,不能是多线程并发消费,这样才能保证消息的顺序性。

如果要全局消息顺序,那么就是全局只使用一个队列,但是一般业务场景不会要求全局顺序,仅要求部分顺序,例如按照订单或者用户维度顺序即可,那么就可以利用用户ID或者订单ID对消息进行分区,即相同的用户(订单)的消息发往一个队列,保证局部顺序可提高并发度。

2. RabbitMQ 如何保证消息不被重复消费?或者说,如何保证消息消费时的幂等性?

  • 先说为什么会重复消费:正常情况下,消费者在消费消息的时候,消费完毕后,会发送一个确认消息给消息队列,消息队列就知道该消息被消费了,就会将该消息从消息队列中删除;
  • 但是因为网络传输等等故障,确认信息没有传送到消息队列,导致消息队列不知道自己已经消费过该消息了,再次将消息分发给其他的消费者。
  • 针对以上问题,一个解决思路是:保证消息的唯一性,就算是多次传输,不要让消息的多次消费带来影响,保证消息等幂性。

如何实现幂等操作呢?

  1. 通过数据库:比如处理订单时,记录订单ID,在消费前,去数据库中进行查询该记录是否存在,如果存在则直接返回。
  2. 给消息设置一个全局唯一的ID,比如订单的订单号或者UUID之类的。
  • 消费者在消费消息的时候,添加分布式锁(保证互斥的都行),接着从数据库流水表中查看是否已经有这个唯一ID的消费记录。

  • 如果有的话说明已经被消费过了,即跳过后续的业务执行即可。

  • 如果没有的话说明是第一次消费这条消息,就执行业务逻辑,并将其写入数据库流水表中(和业务处理在同一个事务下,这样保证业务执行成功则流水表一定被插入),然后再释放锁。

    这样就能保证消息不会被重复消费!
    当然加锁不是必须的,通过数据库唯一索引也可以避免重复数的产生。只不过加锁可以解决高并发下的资源消耗问题(如果没有锁,每个线程都能执行完业务,然后等最后事务提交那一刻由数据库拦截,最后只有一个事务成功提交,其他都需要回滚,这挺费时间和浪费资源)

3. 如何确保消息正确地发送至 RabbitMQ? 如何确保消息接收方消费了消息?

发送方确认模式:

  • 将信道设置成 confirm 模式(发送方确认模式),则所有在信道上发布的消息都会被指派一个唯一的 ID。
  • 一旦消息被投递到目的队列后,或者消息被写入磁盘后(可持久化的消息),信道会发送一个确认给生产者(包含消息唯一 ID)。
  • 如果 RabbitMQ 发生内部错误从而导致消息丢失,会发送一条 nack(notacknowledged,未确认)消息。
  • 发送方确认模式是异步的,生产者应用程序在等待确认的同时,可以继续发送消息。当确认消息到达生产者应用程序,生产者应用程序的回调方法就会被触发来处理确认消息。

接收方确认机制:

  • 消费者接收每一条消息后都必须进行确认(消息接收和消息确认是两个不同操作)。只有消费者确认了消息,RabbitMQ 才能安全地把消息从队列中删除。
  • 这里并没有用到超时机制,RabbitMQ 仅通过 Consumer 的连接中断来确认是否需要重新发送消息。也就是说,只要连接不中断,RabbitMQ 给了 Consumer 足够长的时间来处理消息。保证数据的最终一致性;

几种特殊情况:

  • 如果消费者接收到消息,在确认之前断开了连接或取消订阅,RabbitMQ 会认为消息没有被分发,然后重新分发给下一个订阅的消费者。(可能存在消息重复消费的隐患,需要去重)。
  • 如果消费者接收到消息却没有确认消息,连接也未断开,则 RabbitMQ 认为该消费者繁忙,将不会给该消费者分发更多的消息。

3. 如何保证RabbitMQ消息的可靠传输?

消息不可靠的情况可能是消息丢失,劫持等原因;
丢失又分为:生产者丢失消息、消息列表丢失消息、消费者丢失消息;

1. 生产者丢失消息

从生产者弄丢数据这个角度来看,RabbitMQ提供transaction和confirm模式来确保生产者不丢消息。

  • transaction机制就是说:发送消息前,开启事务(channel.txSelect()),然后发送消息,如果发送过程中出现什么异常,事务就会回滚(channel.txRollback()),如果发送成功则提交事务(channel.txCommit())。然而,这种方式有个缺点:吞吐量下降;
  • confirm模式用的居多:一旦channel进入confirm模式,所有在该信道上发布的消息都将会被指派一个唯一的ID(从1开始),一旦消息被投递到所有匹配的队列之后;RabbitMQ就会发送一个ACK给生产者(包含消息的唯一ID),这就使得生产者知道消息已经正确到达目的队列了;如果rabbitMQ没能处理该消息,则会发送一个Nack消息给你,你可以进行重试操作。

2. 消息队列丢数据

处理消息队列丢数据的情况,一般是开启持久化磁盘的配置。这个持久化配置可以和confirm机制配合使用,你可以在消息持久化磁盘后,再给生产者发送一个Ack信号。这样,如果消息持久化磁盘之前,RabbitMQ阵亡了,那么生产者收不到Ack信号,生产者会自动重发。

那么如何持久化呢?这里顺便说一下吧,其实也很容易,就下面两步

将queue的持久化标识durable设置为true,则代表是一个持久的队列,发送消息的时候将deliveryMode=2,这样设置以后,即使RabbitMQ挂了,重启后也能恢复数据。

3. 消费者丢失消息

消费者丢数据一般是因为采用了自动确认消息模式,改为手动确认消息即可!

消费者在收到消息之后,处理消息之前,会自动回复RabbitMQ已收到消息;如果这时处理消息失败,就会丢失该消息;

解决方案:处理消息成功后,手动回复确认消息。

4. RabbitMQ中如何实现消息的持久化?

RabbitMQ的持久化分为队列/交换机的持久化与消息的持久化两种:

  1. 将队列和交换机的Durable设置为True,这样创建的队列和交换机都是持久化的,即RabbitMQ服务器重启后,队列和交换机的元数据会保留。
  2. 消息的持久化是在生产者发送消息时,将Delivery Mode设置为2,表示消息会持久化消息,这样的消息在发送到持久化队列后,服务器重启后消息也还存在着。
    这里扩展下Delivery Mode:
  • 默认此参数是1,表示非持久化消息,当消息发送到broker时,不是将其写入磁盘,如果消费者还未消费这条消息,broker宕机了,那么消息可能就丢失了。
  • 参数设置为2,表示为持久化消息,会写入到磁盘中,但是这个过程其实上是异步的,所以正常情况下即使broker宕机了,也是有可能丢失消息的,只能保证大部分消息的持久化。

5. 如何实现消息的优先级处理?

RabbitMQ支持消息的优先级处理。可以通过设置消息的优先级字段来实现,优先级高的消息会优先被消费者消费。此外,还可以通过使用优先级队列来实现更细粒度的优先级控制。

6. 如何在RabbitMQ中实现优先级队列?

RabbitMQ通过插件支持优先级队列的实现。优先级队列是指根据消息的优先级进行排序和处理的队列。要实现优先级队列,可以使用RabbitMQ的插件“rabbitmq_priority_queue”,它提供了一个优先级队列交换机(Priority Queue Exchange),可以将具有不同优先级的消息路由到不同的队列中。

7. RabbitMQ如何处理高并发场景下的性能问题?

RabbitMQ通过提供集群功能来处理高并发场景下的性能问题。通过将多个节点组成一个集群,可以实现负载均衡和分发,提高系统的整体处理能力。同时,还可以通过调整各种参数来优化性能,如调整消息的持久化设置、调整交换机和队列的匹配规则等。

8. 如何在RabbitMQ中实现延迟队列?

RabbitMQ通过插件支持延迟队列的实现。延迟队列是指将消息放入队列中等待一段时间后再进行处理。要实现延迟队列,可以使用RabbitMQ的插件“rabbitmq_delayed_message_exchange”,它提供了一个延迟交换器(Delayed Message Exchange),可以将消息路由到指定的队列中等待指定的延迟时间。

9. RabbitMQ中的消息什么时候会进入死信交换机?

主要有以下三种情况会导致消息进入死信交换机:

  • 消息被拒绝(basic.reject或basic.nack),且requeue参数为false。
  • 消息在队列中过期(TTL到期),即超时未被消费。
  • 队列达到最大长度(超过最大队列长度限制)

10. 谈一下对RabbitMQ死信队列的理解?

死信,顾名思义就是无法被消费的消息,一般来说 Producer 将消息投递到 broker 或者直接丢到 queue 中,Consumer 从 Queue 中取出消息进行消费,但是某些时候由于特定的原因导致 Queue 中的某些消息无法被消费,这样的消息如果没有后续的处理就变成了死信,有死信自然就有了死信队列。

死信队列有其特殊的应用场景,例如用户在商城下单成功并点击去支付的时候,如果在指定的时间内未支付,那么就可以将该下单消息投递到死信队列中,至于后续怎么处理死信队列需要结合具体的应用场景。

11. 可以对所有消息都持久化吗?

不可以, 持久化的操作是将数据写入磁盘中,效率上肯定比写入内存中要慢很多倍。而我们一般用mq会处理很多业务消息,若是所有消息都持久化,压力无疑是巨大的。所以持久化策略需要综合考虑,以及可能遇到的问题和解决方案,或者我们可以让一些必要数据持久化。

12. 说一下RabbitMQ事务机制

1、概述
在使用RabbitMQ的时候,我们可以通过消息持久化操作来解决因为服务器的异常奔溃导致的消息丢失,除此之外我们还会遇到一个问题,当消息的发布者在将消息发送出去之后,消息到底有没有正确到达broker代理服务器呢?如果不进行特殊配置的话,默认情况下发布操作是不会返回任何信息给生产者的,也就是默认情况下我们的生产者是不知道消息有没有正确到达broker的,如果在消息到达broker之前已经丢失的话,持久化操作也解决不了这个问题,因为消息根本就没到达代理服务器,你怎么进行持久化,那么这个问题该怎么解决呢?

RabbitMQ为我们提供了两种方式:

  • 通过AMQP事务机制实现,这也是AMQP协议层面提供的解决方案;
  • 通过将channel设置成confirm模式来实现;

想要保证发送者一定能把消息发送给RabbitMQ,一种是通过Confirm机制,另一种就是通过事务机制。

RabbitMQ的事务机制,允许生产者将一组操作打包成一个原子事务单元,要么全部执行成功,要么全部失败。事务提供了一种确保消息完整性的方法,但需要谨慎使用,因为他们对性能有一定的影响。

RabbitMQ是基于AMQP协议实现的,RabbitMQ中,事务是通过在通道(Channel)上启用的,与事务机制有关的方法有三个:

  • txSelstct():将当前channel设置成transaction模式。
  • txCommit():提交事务。
  • txRollback():回滚事务。

我们需要先通过txSelect开启事务,然后就可以发布消息给MQ了,如果txCommit提交成功了,则消息一定到达了RabbitMQ,如果在txCommit执行之前RabbitMQ实例异常崩溃或者抛出异常,那我们就可以捕获这个异常然后执行txRollback进行回滚事务。

所以,通过事务机制,我们也能保证消息一定可以发送给RabbitMQ。

以下,是一个通过事务发送消息的方法实例:

package com.example.demo.rabbitmq;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class RabbitMQTransactionExample {
    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {

            //启用事务
            channel.txSelect();


            String exchangeName = "my_exchange";
            String routingKey = "my_routing_key";

            try {
                //发送第一条消息
                String message1 = "Transaction Message 1";
                channel.basicPublish(exchangeName, routingKey, null, message1.getBytes());


                //发送第二条消息
                String message2 = "Transaction Message 2";
                channel.basicPublish(exchangeName, routingKey, null, message2.getBytes());

                //模拟一个错误
                int x = 1 / 0;

                //提交事务(如果没有发生错误)
                channel.txCommit();
                System.out.println("Transaction committed.");

            } catch (Exception e) {
                //发生错误,回滚事务
                channel.txRollback();
                System.out.println("Transaction rolled back.");
            }
        }
    }
}


  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47

13. 说说你对RabbitMQ确认机制(应答机制)的理解?

1. 消息应答机制
消费者完成一个任务可能需要一段时间,如果其中一个消费者处理一个长的任务并仅只完成了部分突然它挂掉,由于RabbitMQ 一旦向消费者传递了一条消息,便立即将该消息标记为删除。在这种情况下,突然有个消费者挂掉了,我们将丢失正在处理的消息,以及后续发送给该消费者的消息也无法接收到。

为了保证消息在发送过程中不丢失,rabbitmq 引入消息应答机制。

消费者在接收到消息并且处理该消息之后,告诉 rabbitmq 它已经处理了,rabbitmq 可以把该消息删除掉了。

2. 自动应答
消息发送后立即被认为已经传送成功,这种模式需要在高吞吐量和数据传输安全性方面做权衡,因为这种模式如果消息在接收到之前,消费者出现连接或者 channel 关闭,那么消息就丢失了,当然另一方面这种模式消费者可以传递过载的消息,没有对传递的消息数量进行限制,当然这样有可能使得消费者由于接收太多还来不及处理的消息,导致这些消息的积压,最终使得内存耗尽,最终这些消费者线程被操作系统杀死,所以这种模式仅适用在消费者可以高效并以某种速率能够处理这些消息的情况下使用。

3. 手动消息应答
//用于肯定确认,RabbitMQ 已知道该消息并且成功的处理消息,可以将其丢弃了
channel.basicAck();

//用于否定确认
channel.basicNack();

//用于否定确认,不处理该消息了直接拒绝,可以将其丢弃了
channel.basicReject();

如果消费者由于某些原因失去连接(其通道已关闭,连接已关闭或 TCP 连接丢失),导致消息未发送 ACK 确认,RabbitMQ 将了解到消息未完全处理,并将对其重新排队。如果此时其他消费者可以处理,它将很快将其重新分发给另一个消费者。这样,即使某个消费者偶尔死亡,也可以确保不会丢失任何消息。

13. 说一下消息确认机制和返回机制,有什么区别?

为了保证 RabbitMQ 中消息的可靠性投递,以及消息在发生特定异常时的补偿策略,RabbitMQ诞生了消息确认和返回机制。

消息确认机制(也叫应答机制)
消息确认机制,是保障消息与 RabbitMQ消息之间可靠传输消息一种保障机制
其主要内容就是用来监听RabbitMQ消息队列收到消息之后返回给生产端的ack确认消息
消息确认机制描述了一种消息是否已经被发送到 RabbitMQ消息队列中以及 RabbitMQ消息队列是否以及接收到生产端发送的消息。

消息确认机制的作用
监听生产者的消息是否已经发送到了 RabbitMQ消息队列中;
如果消息没有被发送到 RabbitMQ消息队列中,则消息确认机制不会给生产端返回任何确认应答,也就是没有发送成功。相反,如果消息被成功发送到了 RabbitMQ消息队列中,则消息确认机制会给生产端返回一个确认应答,以通知生产者,消息已经发送到了 RabbitMQ消息队列

消息返回机制
描述不可达的消息与生产者之间的一种保障策略
其主要是用来监听,RabbitMQ消息队列中是否存在不可达的消息,并根据监听结果返回给生产端的一种监听机制,消息返回机制描述了一种 RabbitMQ消息队列中的不可达消息与生产端的关系

什么是不可达的消息
消息在被成功发送到RabbitMQ消息队列中之后,如果消息在经过当前配置的 exchangeName 或 routingKey 没有找到指定的交换机,或没有匹配到对应的消息队列,那么这个消息就被称为不可达的消息,如果此时配置了消息返回机制,那么此时RabbitMQ消息队列会返回给生产者一个信号,信号中包括消息不可达的原因,以及消息本身的内容。

消息返回机制的作用
监听生产端发动到RabbitMQ消息队列中的消息是否可达。如果消息不可达,则返回一个信号通知生产端,相反,如果消息可达,则不会返回任何信号。

14. RabbitMQ如何保证高可用?

高可用的通俗解释是,能够7*24小时不间断的对外提供服务。由于单机存在单点故障,随时可能发生宕机,所以一般都是通过搭建集群来保证高可用的。
在这里插入图片描述
RabbitMQ有三种模式,单机模式就不做过多解释了,demo级别的,生产一般不可能使用单机模式,除非MQ只是用来打印日志。

单机模式一般用于demo搭建,不适合在生产环境中使用。剩下的集群模式和镜像模式都可以帮助我们实现不同程度的高可用。

1. 普通集群模式

  1. queue的元数据包含了queue的具体信息,例如queue放在哪台broker上,是否持久化等,但不包含发送到queue里的消息
  2. 单台broker只有集群中部分的queue(queue中包含消息)和集群中所有的queue元数据
  3. 订阅queue1的消费者若在queue2拉取,则消息会先从queue1传输到queue2,再由消费者进行消费
    在这里插入图片描述

2. 镜像模式

  1. 无论元数据还是 queue 里的消息都会存在于多个broker上
  2. 每个queue都想拥有多个镜像放在其他broker上,可以选择镜像队列的数量
  3. 由于每个broker上都具有近乎完整的数据,所以消费者消费的时候并不需要进行消息传输,但由于并不是想Kafka分布式消息队列那样的分片存储,所以性能并不高

在这里插入图片描述

15. RabbitMQ如何保证高可靠?

在这里插入图片描述
高可靠也分成三个方面,生产者高可靠、MQ高可靠和消费者高可靠

生产者的高可靠主要是依靠补偿机制来实现的,确保生产者能够将消息发送至MQ,如果发送失败,则需要进行重发。

1. 发送高可靠

  • 日志记录 + 定时任务健康检查 + 消息补偿
    在这里插入图片描述

    1. 发送消息前,先将消息存入消息表,状态为0
    2. 开启confirm机制,收到ack后,修改消息对应状态为1
    3. 定时轮询状态为0的消息进行重发,超过3次则标记为异常,人工补偿
  • 消息延迟投递 + 二次确认 + 回调检查
    在这里插入图片描述
    关于延迟队列,可以采用死信队列或者插件机制实现,这里不展开,这里只需要关注生产者发送就好了。

    1. 生产者发送消息的时候,同时也发送延迟消息,比如60min。
    2. 消费者订阅到消息后,会给回调检查服务订阅的confirm queue发送消息,消息数据库生成一条记录。
    3. 回调检查服务订阅了延迟queue,60min过后,消费延迟消息,如果数据库已经存在该消息,则什么都不做,如果数据库不存在,则表明消费者尚未消费该消息,则通知生产者重新发送消息。

2. 存储高可靠

RabbitMQ提供了持久化机制,交换器、队列、消息都可以进行持久化,具体的配置也不麻烦,大家自行查阅即可。

3. 消费高可靠

消费者高可靠,一是保证消息能够被消费者消费(消费者ack机制),二是保证消息重复消费的幂等性。

关于消息消费的幂等性

  • 主要是通过一个唯一的键进行查重,比如数据库主键ID
  • 消息消费的时候,判断该ID是否已经被记录
  • 如果未记录,则记录该ID,如果已记录,则什么都不做。
  • 数据库唯一ID、去重表、Redis,基本都是这个思路

多版本(乐观锁)控制
乐观锁的方案可以参考这篇 RabbitMQ应用问题消息的可靠性保障(消息补偿机制)和幂等性问题(乐观锁解决方案)思路

16. SpringBoot如何整合RabbitMQ?

  1. 导入spring-boot-starter-amqp的包
  2. 编写config配置类,配置相关交换机,队列,绑定键
  3. 利用@RabbitListener(要绑定的队列)+ @RabbitHandler 比如前一个注解标注在类上,后一个注解标注在方法上,用来监听某一个队列后,一旦队列有消息就做出相应操作。
  4. 注入RabbitmqTemplate组件,然后利用相应的api进行发送消息,比如convertAndSend方法
  5. 在处理订单超时这种情况便是如上方案: 先配置好队列的过期时间,一旦生成订单的消息在消息队列里没有被消费,那么这个时候就会被放入到死信队列里面,这个时候,我们就可以监听这个死信队列,一旦死信队列里面产生了消息,就执行dao的方法将相应的订单消息删除。
  6. 其次,如果你想保证消息的可靠性传输,可以设置应答机制和返回机制,利用rabbtemplate里的setConfirmCallback和setReturnCallback方法,前者是在消息成功到达之后会返回一个回调,后者是消息没有成功到达会返回一个回调,最后利用@PostConstruct让这两个机制发生在容器初始化的时候被调用就可以了。
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/我家小花儿/article/detail/971690
推荐阅读
相关标签
  

闽ICP备14008679号