当前位置:   article > 正文

面试:消息中间件RabbitMQ和Kafka(保证消息不丢失、重复消费、死信交换机、延迟队列、消息堆积、高可用机制、消费的顺序性、数据清理机制、高性能设计)_mq如何避免消息重复和消息丢失

mq如何避免消息重复和消息丢失

目录

一、RabbitMQ

1、如何保证消息不丢失

(1)生产者确认机制

(2)消息持久化

(3)消费者确认

2、解决RabbitMQ的重复消费

(1)出现重复消费的原因

(2)解决方案

3、RabbitMQ中死信交换机、延迟队列

(1)延时队列的使用场景

(2)死信交换机

(3)TTL

(4)延迟队列插件

4、如何解决消息堆积

(1)惰性队列

5、高可用机制

(1)普通集群

(2)镜像集群

(3)仲裁队列

二、Kafka

1、Kafka是如何保证消息不丢失的

(1)生产者发送消息到Brocker丢失

(2)消息在Brocker中存储丢失

2、Kafka如何保证消费的顺序性

3、Kafka的高可用机制

(1)集群模式

(2)分区备份机制

4、Kafka的数据清理机制

(1)Kafka文件存储机制

(2)数据清理机制

5、Kafka的高性能设计

(1)零拷贝


一、RabbitMQ

1、如何保证消息不丢失


(1)生产者确认机制

RabbitMQ提供了publisher confirm机制来避免消息发送到MQ过程中丢失。消息发送到MQ以后,会返回一个结果给发送者,表示消息是否处理成功

消息失败后如何处理?

  • 回调方法即时重发
  • 记录日志
  • 保存到数据库然后定时重发,成功发送后即刻删除表中的数据
(2)消息持久化

MQ默认是内存存储消息,开启持久化功能可以确保缓存在MQ中的消息不丢失。

a.交换机持久化


b.队列持久化


c.消息持久化

(SpringAMQP中的的消息默认是持久的,可以通过MessageProperties中的DeliveryMode来指定)

(3)消费者确认

RabbitMQ支持消费者确认机制,即:消费者处理消息后可以向MQ发送ack回执,MQ收到ack回执后才会删除该消息。

manual:手动ack,需要在业务代码结束后,调用api发送ack;

auto:自动ack,由spring监测listener代码是否出现异常,没有异常则返回ack;抛出异常则返回nack;

none:关闭ack,MQ假定消费者获取消息后会成功处理,因此消息投递后立即被删除;

我们可以利用Spring的retry机制,在消费者出现异常时利用本地重试,设置重试次数,当次数达到了以后,如果消息依然失败,将消息投递到异常交换机,交由人工处理。


2、解决RabbitMQ的重复消费

(1)出现重复消费的原因

网络抖动、消费者挂了。

(2)解决方案

  • 每条消息设置一个唯一的标识id(推荐)
  • 幂等方案:【分布式锁、数据库锁(悲观锁、乐观锁)】

3、RabbitMQ中死信交换机、延迟队列

(1)延时队列的使用场景

进入队列的消息会被延迟消费的队列(延迟队列 = 死信交换机 + TTL(生存时间))

场景:超时订单、限时优惠、定时发布

(2)死信交换机

当一个队列中的消息满足下列情况之一时,可以成为死信(dead letter、不会被消费) :

  • 消费者使用basic.rejectbasic.nack声明消费失败,并且消息的requeue参数设置为false
  • 消息是一个过期消息,超时无人消费
  • 要投递的队列消息堆积满了,最早的消息可能成为死信

如果该队列配置了dead-letter-exchange属性,指定了一个交换机,那么队列中的死信就会投递到这个交换机中,而这个交换机称为死信交换机(Dead Letter Exchange,简称DLX)。

要给simple.queue加上dead-letter-exchange = dl.direct;dead-letter-routing-key = dl;

指定那个交换机是死信交换机,而且死信交换机也要配置dl,绑定其他消息队列。

(3)TTL

TTL,也就是Time-To-Live。如果一个队列中的消息TTL结束仍未消费,则会变为死信,ttl超时分为两种情况:(以时间短的为准

  • 消息所在的队列设置了存活时间
  • 消息本身设置了存活时间
(4)延迟队列插件

DelayExchange插件,需要安装在RabbitMQ中
RabbitMQ有一个官方的插件社区,地址为: 官方文档

DelayExchange的本质还是官方的三种交换机,只是添加了延迟功能。因此使用时只需要声明一个交换机,交换机的类型可以是任意类型,然后设定delayed属性为true即可。


4、如何解决消息堆积

生产者发送消息的速度超过消费者处理消息的速度,就会导致队列中的消息堆积,直到队列存储消息达到上限。之后发送的消息就会成为死信,可能会被丢弃,这就是消息堆积问题。

解决消息堆积有三种种思路:

  1. 增加更多消费者,提高消费速度
  2. 在消费者内开启线程池加快消息处理速度
  3. 扩大队列容积,提高堆积上限
(1)惰性队列

  • 接收到消息后直接存入磁盘而非内存
  • 消费者要消费消息时才会从磁盘中读取并加载到内存
  • 支持数百万条的消息存储

5、高可用机制

在生产环境下,使用集群来保证高可用性普通集群镜像集群仲裁队列

(1)普通集群

普通集群,或者叫标准集群(classic cluster),具备下列特征:

  • 会在集群的各个节点间共享部分数据,包括:交换机、队列元信息(其他队列的索引)。不包含队列中的消息。
  • 当访问集群某节点时,如果队列不在该节点,会从数据所在节点传递到当前节点并返回。
  • 队列所在节点宕机,队列中的消息就会丢失
(2)镜像集群

镜像集群:本质是主从模式,具备下面的特征:

  • 交换机、队列、队列中的消息会在各个mq的镜像节点之间同步备份
  • 创建队列的节点被称为该队列的主节点,备份到的其它节点叫做该队列的镜像节点
  • 一个队列的主节点可能是另一个队列的镜像节点
  • 所有操作都是主节点完成,然后同步给镜像节点
  • 主节点宕机后,镜像节点会替代成新的主节点
(3)仲裁队列

仲裁队列:仲裁队列是3.8版本以后才有的新功能,用来替代镜像队列,具备下列特征:

  • 与镜像队列一样,都是主从模式,支持主从数据同步。
  • 使用非常简单,没有复杂的配置。
  • 主从同步基于Raft协议,强一致。


二、Kafka

1、Kafka是如何保证消息不丢失的

  • 生产者发送消息到Brocker丢失
  • 消息在Brocker中存储丢失
  • 消费者从Brocker接收消息丢失

(1)生产者发送消息到Brocker丢失
  • 设置异步发送

  • 消息重试

(2)消息在Brocker中存储丢失
  • 确认发送机制acks

类似于主从机制

  • Kafka中的分区机制指的是将每个主题划分成多个分区(Partition)
  • topic分区中消息只能由消费者组中的唯一一个消费者处理不同的分区分配给不同的消费者(同一个消费者组)

消费者默认是自动按期提交已经消费的偏移量(每消费一个就偏移量加一),默认是每隔5s提交一次如果出现重平衡的情况,可能会重复消费或丢失数据。

重平衡出现的情况:

  • 消费者2在消费完第6的消息后宕机了,但是只提交了3个偏移量,其他消费者就会从3开始继续消费,这样就造成了重复消费。
  • 消费者读取消息和提交偏移量不是同步的,可能读取到3了,然后提交偏移量是3,然后开始消费1-3,消费完1,宕机了,造成消息的丢失

解决方案:

  • 禁用自动提交偏移量,改为手动
  • 同步提交
  • 异步提交
  • 同步+异步组合提交

2、Kafka如何保证消费的顺序性

问题原因:

一个topic的数据可能存储在不同的分区中,每个分区都有一个按照顺序的存储的偏移量,如果消费者关联了多个分区不能保证顺序性

解决方案:

  • 发送消息时指定分区号
  • 发送消息时按照相同的业务设置相同的key

3、Kafka的高可用机制

(1)集群模式

  • Kafka 的服务器端由被称为Broker的服务进程构成,即一个Kafka集群由多个Broker组成
  • 这样如果集群中某一台机器宕机,其他机器上的 Broker也依然能够对外提供服务。这其实就是Kafka 提供高可用的手段之一

(2)分区备份机制

  • 一个topic有多个分区,每个分区有多个副本,其中有一个leader,其余的是follower,副本存储在不同的broker中
  • 所有的分区副本的内容是都是相同的,如果leader发生故障时,会自动将其中一个follower提升为leader

follower有两种:

  • lSR (in-sync replica)需要同步复制保存的follower
  • 普通:异步的follower

如果leader失效后,需要选出新的leader,选举的原则如下: (优先ISR)

  • 第一:选举时优先从ISR中选定,因为这个列表中follower的数据是与leader同步的
  • 第二:如果ISR列表中的follower都不行了,就只能从其他follower中选取

4、Kafka的数据清理机制

(1)Kafka文件存储机制

(2)数据清理机制

日志的清理策略有两个:

  • 根据消息的保留时间,当消息在kafka中保存的时间超过了指定的时间,就会触发清理过程。
  • 根据topic存储的数据大小,当topic所占的日志文件大小大于一定的阈值,则开始删除最久的消息。需手动开启。

5、Kafka的高性能设计

  • 消息分区:不受单台服务器的限制,可以不受限的处理更多的数据。
  • 顺序读写:磁盘顺序读写,提升读写效率。
  • 页缓存:把磁盘中的数据缓存到内存中,把对磁盘的访问变为对内存的访问。
  • 零拷贝:减少上下文切换及数据拷贝。
  • 消息压缩:减少磁盘IO和网络lO
  • 分批发送:将消息打包批量发送减少网络开销。
(1)零拷贝

  • 在一般流程中,消息要通过websocket连接网卡发送给消费者;
  • 而在Kafka将发送这一流程交给了系统去做,直接将消息从页缓存拷贝给了网卡减少了两次拷贝

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/黑客灵魂/article/detail/809491
推荐阅读
相关标签
  

闽ICP备14008679号