当前位置:   article > 正文

MQ面试题

MQ面试题

为什么要使用消息队列?

优点:解耦、异步、流量削峰
缺点:可用性降低、复杂性提高、一致性问题

为什么选择了RabbitMQ而不是其它的MQ?

kafka是以吞吐量高而闻名,不过其数据稳定性一般,而且无法保证消息有序性。我们公司的日志收集也有使用,业务模块中则使用的RabbitMQ。
阿里巴巴的RocketMQ基于Kafka的原理,弥补了Kafka的缺点,继承了其高吞吐的优势,其客户端目前以Java为主。但是我们担心阿里巴巴开源产品的稳定性,所以就没有使用。
RabbitMQ基于面向并发的语言Erlang开发,吞吐量不如Kafka,但是对我们公司来讲够用了。而且消息可靠性较好,并且消息延迟极低,集群搭建比较方便。支持多种协议,并且有各种语言的客户端,比较灵活。Spring对RabbitMQ的支持也比较好,使用起来比较方便,比较符合我们公司的需求。
综合考虑我们公司的并发需求以及稳定性需求,我们选择了RabbitMQ。

activeMQRabbitMQRocketMQKafka
性能6000/单机12000/单机10万/单机100万/单机
持久化都支持(性能会下降)都支持(性能会下降)天生支持天生支持
多语言支持主流都支持主流都支持只支持Java主流都支持
优缺点缺乏大规模运用,不推荐消息可靠性高,功能全面,吞吐量比较低,消息积累会影响性能高吞吐,高性能,高可用,功能全面。缺点:会丢数据、功能单一
使用场景缺乏大规模运用,不推荐企业内部小规模系统调用几乎全场景,特别适合金融级mq场景日志分析、大数据采集

RabbitMQ一个queue中存放的message是否有数量限制?

默认情况下一般无限制。
但是可以通过x-max-length对队列中消息的条数进行限制;
x-max-length-bytes对队列中消息的总量进行限制,比如200mb。

RabbitMQ事务机制?

RabbitMQ事务机制和确认机制

public class RabbitMqTransactionExample {
    private static final String QUEUE_NAME = "queue_name";

    public static void main(String[] args) {
        try {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();

            // 开启事务
            channel.txSelect();

            String message = "Hello, RabbitMQ!";

            // 发布消息到队列
            channel.basicPublish("", QUEUE_NAME, null, message.getBytes());

            // 提交事务
            channel.txCommit();
            System.out.println("Message sent successfully");
        } catch (Exception e) {
            // 回滚事务
            channel.txRollback();
            System.out.println("Failed to send message");
            e.printStackTrace();
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29

RabbitMQ交换机类型?

交换机类型解释举例
Fanout广播模式将消息交给所有绑定这个交换机的队列不同queue绑定一个Exchange
Direct定向模式把消息交给符合指定routing key的队列比如一个消费端key = {“red”, “blue”},另一个消费端key = {“red”, “yellow”},当发送端rabbitTemplate.convertAndSend(exchangeName, “red”, message)时,两个消费端都可以收到消息
Topic通配符模式判断routing key的规则是模糊匹配模式消费端1:key = “china.#” 消费端2:key = “#.news”,发送端:rabbitTemplate.convertAndSend(exchangeName, “china.news”, message); #:代表0个或多个词*:代表1个词

RabbitMQ确保消息可靠性传输?

Rabbitmq 的持久化分为队列持久化、消息持久化和交换机持久化。

1,队列持久化

在定义队列时的通过 durable 参数来决定的
在这里插入图片描述

2,交换机持久化

durable:持久话标志位, durable 设置为 true 表示持久化, 反之为非持久,与队列持久化相同

3,消息持久化

deliveryMode=1 代表不持久化,deliveryMode=2 代表持久化

@Component
public class MessageSender {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void sendMessage(String message) {
        MessageProperties properties = new MessageProperties();
        properties.setDeliveryMode(MessageDeliveryMode.PERSISTENT); // 设置消息持久化
        Message rabbitMessage = new Message(message.getBytes(), properties);
        rabbitTemplate.send("exchangeName", "routingKey", rabbitMessage);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13

4,主从备份

消息到达队列后,MQ宕机也可能导致丢失消息,RabbitMQ提供了持久化功能,集群的主从备份功能

5,生产者消息确认:publisher confirm机制、publisher return机制

5.1,修改publisher服务中的application.yml文件

spring:
   rabbitmq:
	 username: guest
	 password: guest
	 virtual-host: /
	 host: 123.123.123.123
	 port: 5672
	 publisher-confirm-type: simple
	 publisher-returns: true
	 listener:
	   simple:
	     acknowledge-mode: auto # 手动应答
	     prefetch: 1 #每次从队列中取一个,轮询分发,默认是公平分发
	     retry:
	       enabled: true # 开启重试
	       initial-interval: 1000 # 初识的失败等待时长为1秒
	       multiplier: 2 # 失败的等待时长倍数
	       max-attempts: 5 # 重试次数
	   

publish-confirm-type:开启publisher-confirm,这里支持两种类型:
simple:同步等待confirm结果,直到超时
correlated:异步回调,定义ConfirmCallbackMQ返回结果时会回调这个ConfirmCallback

publish-returns:开启publish-return功能,同样是基于callback机制,不过是定义ReturnCallback
template.mandatory:定义消息路由失败时的策略。true,则调用ReturnCallbackfalse:则直接丢弃消息
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26

5.2,ReturnCallback,交换机到队列

@Slf4j
@Configuration
public class CommonConfig implements ApplicationContextAware {
    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        // 获取RabbitTemplate
        RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
        // 设置ReturnCallback
        //alt+enter 可以将匿名内部类改成lamda表达式
        rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
            // 投递失败,记录日志
            //当发送时设置错误的routingKey;成功到交换机但未到队列
            // 消息成功投递到交换机!消息ID: 4ccec7ec-a95e-4660-afe3-1370bcde7904
            // 消息发送到队列失败,响应码:312, 失败原因:NO_ROUTE, 交换机: amq.topic, 路由key:ssimple.test
            log.info("消息发送失败,应答码{},原因{},交换机{},路由键{},消息{}",
                     replyCode, replyText, exchange, routingKey, message.toString());
            // 如果有业务需要,可以重发消息
        });
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20

5.3,定义ConfirmCallback,发送端到交换机

@Component
public class RabbitMQMessageSender  implements MQMessageSender, RabbitTemplate.ConfirmCallback{
  private final RabbitTemplate rabbitTemplate;
  Log log = LogFactory.getLog(RabbitMQMessageSender.class);
  @Autowired
  public RabbitMQMessageSender(RabbitTemplate rabbitTemplate) {
    this.rabbitTemplate = rabbitTemplate;
  }

  @PostConstruct
  public void init(){
    rabbitTemplate.setConfirmCallback(this);
  }

  @Override
  public void confirm(CorrelationData correlationData, boolean ack, String cause) {
    if(!ack){
      log.error("消息接收失败" + cause);
      // 我们这里要做一些消息补发的措施
      System.out.println("id="+correlationData.getId());
    }
  }

  public void send(String routingKey, MQMessage msg) {
    String jsonString = JsonConverter.bean2Json(msg);
    if (jsonString != null) {
      try {
        rabbitTemplate.convertAndSend(routingKey, jsonString);
      } catch (Exception e) {
        // 连接异常,发送日志
        log.error("Failed to send message RabbitMQ Exception: " + e.getMessage());
      }
    }
  }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35

6,消费者消息确认机制

acknowledge-mode消费端发送ack信号后,消息才会从队列删除,继续消费下一条消息。

spring:
   rabbitmq:
	 username: guest
	 password: guest
	 virtual-host: /
	 host: 123.123.123.123
	 port: 5672
	 publisher-confirm-type: simple
	 publisher-returns: true

	 # 消费者确认机制
	 listener:
	   simple:
	   	 # 将消费者确认机制设置成auto,会利用aop原理,当mq消息发送失败时重试
	     acknowledge-mode: auto
	     prefetch: 1 #每次从队列中取一个,轮询分发,默认是公平分发
	     retry:
	       enabled: true # 开启重试
	       initial-interval: 1000 # 初识的失败等待时长为1秒
	       multiplier: 2 # 失败的等待时长倍数
	       max-attempts: 5 # 重试次数
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
6.1,消费失败重试机制

当消费端连接断开后,mq会给消费端重复发送消息

# 重试达到最大次数后,Spring会返回ack,消息会被丢弃
spring:
  rabbitmq:
    listener:
      simple:
      	prefetch: 1
        acknowledge-mode: auto
        retry:
          enabled: true # 开启消费者失败重试
          initial-interval: 1000 # 初识的失败等待时长为1秒
          multiplier: 2 # 失败的等待时长倍数,下次等待时长 = multiplier * last-interval,相当于第1秒重试,第3秒重试,第7秒重试
          max-attempts: 3 # 最大重试次数
          stateless: true # true无状态;false有状态。如果业务中包含事务,这里改为false
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
6.2,达到最大重试次数后,失败策略

多次重试失败后将消息投递到异常交换机(死信交换机),交由人工处理

失败策略详解
RejectAndDontRequeueRecoverer重试耗尽后,直接reject,丢弃消息。默认就是这种方式
ImmediateRequeueMessageRecoverer重试耗尽后,返回nack,消息重新入队
RepublishMessageRecoverer重试耗尽后,将失败消息投递到指定的交换机

死信(dead letter)?

消费者使用basic.reject或 basic.nack来声明消息消费失败,并且消息的requeue参数设置为false
消息是一个过期消息,超时无人消费
要投递的队列消息满了,无法投递

声明普通的simple.queue队列,并且指定死信交换机dl.direct
@Bean
public Queue simpleQueue2(){
    return QueueBuilder.durable("simple.queue") // 指定队列名称,并持久化
        .deadLetterExchange("dl.direct") // 指定死信交换机
        .build();
}
声明死信交换机dl.direct
@Bean
public DirectExchange dlExchange(){
    return new DirectExchange("dl.direct", true, false);
}
声明存储死信的队列dl.queue
@Bean
public Queue dlQueue(){
    return new Queue("dl.queue", true);
}
将死信队列与死信交换机绑定
@Bean
public Binding dlBinding(){
    return BindingBuilder.bind(dlQueue()).to(dlExchange()).with("simple");
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22

延迟队列

利用TTL(消息过期时间)结合死信交换机,实现消息发出后,消费者延迟收到消息的效果。这种消息模式就称为延迟队列(Delay Queue)模式。

RabbitMQ是否可以直接将消息推送到队列?

可以,但不推荐,丧失灵活性

RabbitMQ如何避免消息堆积?

消息堆积问题产生的原因往往是因为消息发送的速度超过了消费者消息处理的速度。因此解决方案无外乎以下三点:
1,提高消费者处理速度 2,增加更多消费者 3,增加队列消息存储上限

1,提高消费者处理速度
消费者处理速度是由业务代码决定的,所以我们能做的事情包括:
尽可能优化业务代码,提高业务性能
接收到消息后,开启线程池,并发处理多个消息
优点:成本低,改改代码即可
缺点:开启线程池会带来额外的性能开销,对于高频、低时延的任务不合适。适合任务执行周期较长的业务。

2,增加更多消费者
一个队列绑定多个消费者,共同争抢任务,自然可以提供消息处理的速度。
优点:能用钱解决的问题都不是问题。实现简单粗暴
缺点:问题是没有钱。成本太高

3,增加队列消息存储上限
在RabbitMQ的1.8版本后,加入了新的队列模式:Lazy Queue惰性队列
这种队列不会将消息保存在内存中,而是在收到消息后直接写入磁盘中,理论上没有存储上限。可以解决消息堆积问题。
优点:磁盘存储更安全;存储无上限;避免内存存储带来的Page Out问题,性能更稳定;
缺点:磁盘存储受到IO性能的限制,消息时效性不如内存模式,但影响不大。

RabbitMQ如何保证消息的有序性?

其实RabbitMQ是队列存储,天然具备先进先出的特点,只要消息的发送是有序的,那么理论上接收也是有序的。不过当一个队列绑定了多个消费者时,可能出现消息轮询投递给消费者的情况,而消费者的处理顺序就无法保证了。
因此,要保证消息的有序性,需要做的下面几点:
保证消息发送的有序性
保证一组有序的消息都发送到同一个队列
保证一个队列只包含一个消费者

如何防止MQ消息被重复消费?

消息重复消费的原因多种多样,不可避免。所以只能从消费者端入手,只要能保证消息处理的幂等性就可以确保消息不被重复消费。
而幂等性的保证又有很多方案:
给每一条消息都添加一个唯一id,在本地记录消息表及消息状态,处理消息时基于数据库表的id唯一性做判断
同样是记录消息表,利用消息状态字段实现基于乐观锁的判断,保证幂等
基于业务本身的幂等性。比如根据id的删除、查询业务天生幂等;新增、修改等业务可以考虑基于数据库id唯一性、或者乐观锁机制确保幂等。本质与消息表方案类似。
解决 RabbitMQ 重复消费问题可以使用消息幂等性和消息去重这两种方法。选择哪种方法取决于具体的应用场景和需求。在实际应用中,通常需要考虑消息的唯一标识、消息存储、消息状态管理等方面的问题来有效地解决重复消费问题。

消息幂等性

消息幂等性是一种处理重复消息的有效方法。它要求消息的处理逻辑保持幂等性,即多次处理相同消息的效果与处理一次相同。这意味着如果消息已经成功处理过一次,再次处理相同消息时不会产生副作用。

public class MessageConsumer {
    private static final String QUEUE_NAME = "my_queue";

    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");

        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);

            DeliverCallback deliverCallback = (consumerTag, delivery) -> {
                String message = new String(delivery.getBody(), "UTF-8");
                
                // 检查消息是否已经处理过,如果处理过则不再处理
                if (!isMessageProcessed(message)) {
                    processMessage(message);
                    markMessageAsProcessed(message);
                }

                System.out.println("Received: " + message);
            };

            channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> {});
        }
    }

    private static boolean isMessageProcessed(String message) {
        // 检查消息是否已经被处理
        // 可以使用数据库、缓存或文件等方式记录已处理的消息
        return false;
    }

    private static void markMessageAsProcessed(String message) {
        // 标记消息为已处理
        // 同样可以使用数据库、缓存或文件等方式记录已处理的消息
    }

    private static void processMessage(String message) {
        // 实际消息处理逻辑
        System.out.println("Processing message: " + message);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43

消息去重

另一种解决重复消费问题的方法是使用消息去重机制。这种方法通过记录已经消费过的消息,然后在消息到达时检查它是否已经在记录中存在,从而避免重复处理。

public class MessageConsumer {
    private static final String QUEUE_NAME = "my_queue";
    private static Set<String> processedMessages = new HashSet<>();

    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");

        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);

            DeliverCallback deliverCallback = (consumerTag, delivery) -> {
                String message = new String(delivery.getBody(), "UTF-8");

                // 检查消息是否已经处理过,如果处理过则不再处理
                if (!processedMessages.contains(message)) {
                    processMessage(message);
                    processedMessages.add(message);
                }

                System.out.println("Received: " + message);
            };

            channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> {});
        }
    }

    private static void processMessage(String message) {
        // 实际消息处理逻辑
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32

如何保证RabbitMQ的高可用?

要实现RabbitMQ的高可用无外乎下面两点:
做好交换机、队列、消息的持久化
搭建RabbitMQ的镜像集群,做好主从备份。当然也可以使用仲裁队列代替镜像集群。

使用MQ可以解决那些问题?

RabbitMQ能解决的问题很多,例如:
解耦合:将几个业务关联的微服务调用修改为基于MQ的异步通知,可以解除微服务之间的业务耦合。同时还提高了业务性能。
流量削峰:将突发的业务请求放入MQ中,作为缓冲区。后端的业务根据自己的处理能力从MQ中获取消息,逐个处理任务。流量曲线变的平滑很多
延迟队列:基于RabbitMQ的死信队列或者DelayExchange插件,可以实现消息发送后,延迟接收的效果。

RabbitMQ模型?

以下两种模型一条消息,只能被一个consumer消费

队列类型解释举例
BasicQueue 简单队列模型只有1个消费者
WorkQueue任务模型多个消费者 但是只有1个消费者消费消息多个消费端绑定一个queue,@RabbitListener(queues = “simple.queue”) listener1和@RabbitListener(queues = “simple.queue”) listener2

发布/订阅(以下三种模式可以多个消费者同时消费

交换机类型解释举例
Fanout广播模式将消息交给所有绑定这个交换机的队列不同queue绑定一个Exchange
Direct定向模式把消息交给符合指定routing key的队列比如一个消费端key = {“red”, “blue”},另一个消费端key = {“red”, “yellow”},当发送端rabbitTemplate.convertAndSend(exchangeName, “red”, message)时,两个消费端都可以收到消息
Topic通配符模式判断routing key的规则是模糊匹配模式消费端1:key = “china.#” 消费端2:key = “#.news”,发送端:rabbitTemplate.convertAndSend(exchangeName, “china.news”, message); #:代表0个或多个词*:代表1个词
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/weixin_40725706/article/detail/513271
推荐阅读
相关标签
  

闽ICP备14008679号