当前位置:   article > 正文

RabbitMQ — SpringBoot集成RabbitMQ消息队列原理及详细配置

RabbitMQ — SpringBoot集成RabbitMQ消息队列原理及详细配置

1、RabbitMQ消息队列原理剖析

RabbitMQ 是具有代表性的开源消息中间件,当前较多地应用于企业系统内,用于对数据一致性、稳定性和可靠性要求较高的场景中。 RabbitMQ使用Erlang语言来编写的, 并且RabbitMQ是基于AMQP协议的;(AMQP是二进制协议, 提供统一消息服务的应用层标准高级消息队列协议, 是应用层协议的一个开放标准, 为面向消息的中间件设计

mq消息中间件主要在需要进行异步通信的应用情景中使用,例如:

  1. 应用需要确保消息的可靠传递,即使发送消息时接收者由于断电、宕机或 CPU 负载过高等原因不可用,消息也可以在接收者可用时被送达。

  2. 需要在访问量与日俱增、囤积在队列中的消息数日益增长的情况下也能正常运转。

  3. 两个服务在网络不能互通或者应用的路由信息(例如 IP 和端口)不确定的情况下需要通信。例如,两个微服务在不知道对方地址的情况下需要进行通信,则可以通过约定队列名,一个向队列发送消息,一个从队列中收取消息而实现。

  4. 系统组件之间或者应用之间通信较多,需要组件或者应用自身维护彼此的网络连接,而且通信的内容不仅一种。

AMQP协议模型
在这里插入图片描述
AMQP核心概念:
Server : 又称Broker, 接受客户端连接, 实现AMQP实体服务
Connection : 连接, 应用程序与Broker的网络连接
Channel : 网络信道, 几乎所有的操作都在Channel中进行, Channel是进行消息读写的通道。客户端可以建立多个Channel, 每个Channel代表一个会话任务。
Message : 消息, 服务器和应用程序之间传送的数据, 有Properties和Body组成。Properties可以对消息进行修饰, 比如消息的优先级, 延迟等高级特性; Body就是消息体内容。
Virtual Host : 虚拟地址, 用于进行逻辑隔离, 最上层的消息路由。一个Virtual Host里面可以有若干个Exchange和Queue, 同一个Virtual Host里面不能有相同名称的Exchange或Queue
Exchange : 交换机, 用于接收消息, 根据路由键转发消息到绑定的队列
Binding : Exchange和Queue之间的虚拟连接, binding中可以包含routing key
Routing Key : 一个路由规则, 虚拟机可用它来确定如何路由一个特定消息
Queue : 也成Message Queue, 消息队列, 用于保存消息并将它们转发给消费者

总体原理图如下:
在这里插入图片描述

  1. 生产者只需要将消息发送到Exchange即可
  2. 消费者只需要监听对应的消息队列即可
  3. Exchange绑定多个Queue时, 要通过Routing Key进行路由

2、RabbitMQ核心组件以及消息模式

1、Exchange:
如果不指定Exchange的话,RabbitMQ默认使用,(AMQP default)注意一下,需要将routing key等于queue name相同

2、交换机类型:
fanout(效率最好,不需要routing key,routing key如何设置都可以)、direct、topic(#一个或多个,*一个)、headers

3、Auto Delete:
当最后一个Binding到Exchange的Queue删除之后,自动删除该Exchange

4、Binding:
Exchange和Queue之间的连接关系,Exchange之间也可以Binding

5、Queue:
实际物理上存储消息的

6、Durability:
是否持久化,Durable:是,即使服务器重启,这个队列也不会消失,Transient:否

7、Exclusive:
这个queue只能由一个exchange监听restricted to this connection,使用场景:顺序消费

8、Message:
由properties(有消息优先级、延迟等特性)和Body(Payload消息内容)组成,还有content_type、content_encoding、priority、correlation_id、reply_to、expiration、message_id等属性

3、SpringBoot集成RabbitMQ

3.1 引入依赖:spring-boot-starter-amqp
<!-- rabbitmq已经被spring-boot做了整合访问实现。spring cloud也对springboot做了整合逻辑。所以rabbitmq的依赖可以在spring cloud中直接使用。
 -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
3.2 application.yml配置
spring:
  rabbitmq:
    username: guest
    password: guest
    addresses: localhost:5672
    listener:
      type: simple
      simple:
        concurrency: 5
        max-concurrency: 20
        acknowledge-mode: manual #设置手动确认
        retry:
          enabled: true
          max-attempts: 3
          initial-interval: 1000ms #尝试时间间隔
        default-requeue-rejected: false #重试失败后是否回队
        prefetch: 2
    connection-timeout: 5000ms
    cache:
      channel:
        size: 5
    publisher-confirms: true #发布者消息确认
    publisher-returns: true  #发布者消息回调
    template:
      retry:
        enabled: true
        max-attempts: 3
        initial-interval: 1000ms #尝试时间间隔
    virtual-host: /

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
3.3 定义RabbitMqConfig

此类定义了公用的mq常量和队列,交换机,死信队列等bean初始化和绑定

@Configuration
public class RabbitMqConfig{

    /**
     * Queue Name
     */
    public static final String TEST_QUEUE = "test.rabbit.queue";

    /**
     * The Topic Exchange Name
     */
    public static final String TEST_TOPIC_EXCHANGE = "test-topic-exchange";

    /**
     * Route Key
     */
    public static final String TEST_ROUTE_KEY = "test.rabbit.route";

    /**
     * Dead Queue Name
     */
    public static final String DEAD_QUEUE_NAME = "default.dead.queue";

    /**
     * The Direct Exchange Name of Dead
     */
    public static final String DEAD_EXCHANGE_NAME = "default.dead.exchange";

    /**
     * Default Dead Route
     */
    public static final String DEAD_ROUTE_KEY = "default.dead.route";
    
    /*--------------------------------------------------*/
    
    /**
     * 创建一个持久的队列,用于死信队列
     * @return
     */
    @Bean
    public Queue deadLetterQueue (){
        return new Queue(DEAD_QUEUE_NAME,true);
    }

    /**
     * 创建死信队列交换机
     * @return
     */
    @Bean
    public DirectExchange deadLetterExchange() {
        return new DirectExchange(DEAD_EXCHANGE_NAME,true,false);
    }
	/**
	* 当消息在一个队列中变成死信(dead message)之后, 它能被重新publish到另一个Exchange, 这个Exchange就是DLX
	* 设置死信队列的exchange和queue, 然后进行绑定
	*/
    @Bean
    public Binding deadQueueBinding (@Qualifier("deadLetterQueue") Queue deadLetterQueue,
                                     @Qualifier("deadLetterExchange") DirectExchange deadLetterExchange){
        return BindingBuilder.bind(deadLetterQueue).to(deadLetterExchange).with(DEAD_ROUTE_KEY);
    }

  	/*--------------------------------------------------*/


    /**
     * 队列
     * @return
     */
    @Bean(name = "testQueue")
    public Queue flightSearchQueue() {
        Map<String, Object> deadLetterMap = new HashMap<>();
        //设置死信交换机
        deadLetterMap.put("x-dead-letter-exchange", DEAD_EXCHANGE_NAME);
        //设置死信routingKey
        deadLetterMap.put("x-dead-letter-routing-key", DEAD_ROUTE_KEY);
        return new Queue(QUEUE_NAME,false,false,false,deadLetterMap);
    }

    /**
     * 交换机
     * @return
     */
    @Bean(name = "testExchange")
    public TopicExchange exchange() {
        return new TopicExchange(TOPIC_EXCHANGE_NAME);
    }

    @Bean
    public Binding bindingExchangeMessage(@Qualifier("testQueue") Queue testQueue,
                                          @Qualifier("testExchange") TopicExchange testExchange) {
        return BindingBuilder.bind(testQueue).to(testExchange).with(TEST_ROUTE_KEY );
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
3.4 消息确认与投递失败处理

在使用 RabbitMQ 的时候,作为消息发送方希望杜绝任何消息丢失或者投递失败场景。RabbitMQ 为我们提供了两个选项用来控制消息的投递可靠性模式。

rabbitmq 整个消息投递的路径为:

producer->rabbitmq broker cluster->exchange->queue->consumer
message 从 producer 到 rabbitmq broker cluster 则会返回一个 confirmCallback 。
message 从 exchange->queue 投递失败则会返回一个 returnCallback 。
  • 1
  • 2
  • 3

我们将利用这两个 来callback 控制消息的最终一致性和记录能力;

  • confirmCallback 确认模式
    每个发送的消息都需要配备一个 CorrelationData 相关数据对象,CorrelationData 对象内部只有一个 id 属性,用来表示当前消息唯一性。
    消息只要被 rabbitmq broker 接收到就会执行 confirmCallback,如果是 cluster 模式,需要所有 broker 接收到才会调用 confirmCallback。
    被 broker 接收到只能表示 message 已经到达服务器,并不能保证消息一定会被投递到目标 queue 里。所以需要用到接下来的 returnCallback 。
  • returnCallback 未投递到queue退回模式
    confrim 模式只能保证消息到达 broker,不能保证消息准确投递到目标 queue 里。在有些业务场景下,我们需要保证消息一定要投递到目标 queue 里,此时如果未能投递到目标 queue 里将调用 returnCallback ,可以记录下详细到投递数据,定期的巡检或者自动纠错都需要这些数据。

在这里我们定义RabbitTemplateConfig类作为统一投递处理,单独处理可直接实现RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback 进行代码实现:

@Slf4j
@Configuration
public class RabbitTemplateConfig {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Bean(name = "definedRabbitTemplate")
    public RabbitTemplate rabbitTemplateWithConfirmAndCallBack (){
    
        rabbitTemplate.setConfirmCallback((CorrelationData correlationData, boolean ack, String cause)
                ->{
                    if(ack){
                        log.debug("RabbitMq send message success");
                    }else {
                        log.error("RabbitMq send message failed, cause:{}", cause);
                    }
                }
        );

        rabbitTemplate.setReturnCallback((Message message, int replyCode, String replyText, String exchange, String routingKey)
                -> log.error("Send message:{} to Queue:{} failed",message,routingKey));
        return rabbitTemplate;
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
3.5 生产者provider

我们在需要发送消息的服务上定义一个消息投递者,如下demo:

@Component
@Slf4j
public class RabbitMqSender{

    @Autowired
    RabbitTemplate rabbitTemplate;
	//业务类处理,此处模拟流程
    @Autowired
    TestService testService;

    /**
     * 发送消息demo
     * testMessage 模拟业务消息体,作为消息数据载体的类型,必须是Serializable的
     * 如果消息数据载体类型未实现Serializable,在收发消息的时候,都会有异常发生。
     */
    public void sendMsgRequest(TestMessage testMessage) throws IOException {
        try {
            rabbitTemplate.convertAndSend(RabbitConfig.TEST_TOPIC_EXCHANGE,
                    RabbitConfig.TEST_ROUTE_KEY,
                    JsonUtils.serialize(testMessage), //消息进行序列化处理
                    new CorrelationData(UUID.randomUUID().toString()));
            log.debug("RabbitMq send Message success,testMessage content:{}", testMessage);
        } catch (Exception e) {
            log.error("RabbitMq send Message ->{} failed,{}", testMessage, e);
        }
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
3.5 消费者consumer

作为消费端,需要考虑到消息的可靠性送达,消息回溯,顺序消费以及日志记录,补偿机制等一系列问题,此处不涉及过多场景,消费端处理还需要基于业务模型进行控制,规划;

  • consumer监听配置
  1. 首先签收模式为手工签收, 用于ACK的手工处理, 这样我们可以保证消息的可靠性送达, 或者在消费端消费失败的时候做一些日志记录, 补偿机制等
  2. 可以设置消费端的监听个数和最大个数, 用于控制消费端的并发情况
  3. @RabbitListener注解 : 消费端监听, 是一个组合注解, 里面可以配置注解 : @Queue, @QueueBinding, @Exchange, 可以通过这个组合注解一次性搞定消费端交换机, 队列, 绑定, 路由, 并且配置监听功能等
@Component
@Slf4j
public class RabbitMqReceive {

    @Autowired
    private TestService testService;
	
    @RabbitListener(queues = {"${test.rabbit.queue}"}, errorHandler = "myRabbitListenerErrorHandler")
    public void receiveMsgRequest(String message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws Exception {

        try {
            TestMessage  testMessage  = JsonUtils.deserialize(message, TestMessage  .class);
            log.debug(MessageFormat.format("Consume the queue {0}", RabbitConfig.TEST_QUEUE));
			//进行相关业务处理
			var response = testService......
			//......
			//......
            if (response.getData() != null) {
                //手动消费成功
                channel.basicAck(tag, false);
            } else {
                //手动消费失败,消息不回发
                channel.basicNack(tag, false, false);
            }
        } catch (Exception e) {
            //手动消费失败,消息不回发
            channel.basicNack(tag, false, false);
        }
    }

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • consumer失败处理机制
    定义类MyRabbitListenerErrorHandler 实现 RabbitListenerErrorHandler,进行消费端异常处理handler:
@Component
@Slf4j
public class MyRabbitListenerErrorHandler implements RabbitListenerErrorHandler {

    @Override
    public Object handleError(Message amqpMessage, org.springframework.messaging.Message<?> message, ListenerExecutionFailedException exception) {
        log.error("RabbitMQ execute message->{} failed,cause->{}",amqpMessage,exception.getCause());
        return new Object();
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

4、RabbitMQ消息可靠性处理

前面内容,如果consumer未启动,而producer发送了消息。则消息会丢失。如果consumer先启动,创建queue后,producer发送消息可以正常消费。那么当所有的consumer宕机的时候,queue会auto-delete,消息仍旧会丢失。这种情况,消息不可靠。有丢失的可能。

Rabbitmq的消息可靠性处理,分为两部分。

  • 消息不丢失。当consumer全部宕机后,消息不能丢失。 ------持久化解决
  • 消息不会错误消费。当consumer获取消息后,万一consumer在消费消息的过程中发生了异常,如果rabbitmq一旦发送消息给consumer后立刻删除消息,也会有消息丢失的可能。 -------确认机制解决
4.1、消息持久化

@Queue注解中的属性 - autoDelete:当所有消费客户端连接断开后,是否自动删除队列 。true:删除 false:不删除
@Exchange注解中的属性 - autoDelete:当交换器所有的绑定队列都不再使用时,是否自动删除交换器(更粗粒度,不建议)。true:删除 false:不删除

4.2、消息确认机制 ACK - acknowledge

如果在消息处理过程中,消费者的服务器在处理消息时发生异常,那么这条正在处理的消息就很可能没有完成消息的消费,如果RabbitMQ在Consumer消费消息后立刻删除消息,则可能造成数据丢失。为了保证数据的可靠性,RabbitMQ引入了消息确认机制。

  • 消息确认机制是消费者Consumer从RabbitMQ中收到消息并处理完成后,反馈给RabbitMQ的,当RabbitMQ收到确认反馈后才会将此消息从队列中删除。

  • 如果某Consumer在处理消息时出现了网络不稳定,服务器异常等现象时,那么就不会有消息确认反馈,RabbitMQ会认为这个消息没有正常消费,会将消息重新放入队列中。

  • 如果在Consumer集群环境下,RabbitMQ未接收到Consumer的确认消息时,会立即将这个消息推送给集群中的其他Consumer,保证不丢失消息。

  • 如果Consumer没有确认反馈,RabbitMQ将永久保存消息。

      消息确认机制默认都是开启状态的,同时不推荐关闭消息确认机制。
      注意:如果Consumer没有处理消息确认,将导致严重后果。
      如:所有的Consumer都没有正常反馈确认信息,并退出监听状态,消息则会永久保存,并处于锁定状态,直到消息被正常消费为止。
      消息的发送者Producer如果持续发送消息到RabbitMQ,那么消息将会堆积,持续占用RabbitMQ所在服务器的内存,导致“内存泄漏”问题。
    
    • 1
    • 2
    • 3
    • 4
4.3、消息确认机制处理方案:
  • 编码异常处理(推荐)

通过编码处理异常的方式,保证消息确认机制正常执行。这种处理方案也可以有效避免消息的重复消费。

异常处理,不是让Consumer编码catch异常后,直接丢弃消息,或反馈ACK确认消息。而是做异常处理的。该抛的异常,还得抛,保证ACK机制的正常执行。或者使用其他的手法,实现消息的再次处理。如:catch代码块中,将未处理成功的消息,重新发送给MQ。如:catch代码中,本地逻辑的重试(使用定时线程池重复执行任务3次。)

  • 配置重试次数处理

通常来说,消息重试3次以上未处理成功,就是Consumer开发出现了严重问题。需要修改Consumer代码,提升版本/打补丁之类的处理方案。

通过全局配置文件,开启消息消费重试机制,配置重试次数。当RabbitMQ未收到Consumer的确认反馈时,会根据配置来决定重试推送消息的次数,当重试次数使用完毕,无论是否收到确认反馈,RabbitMQ都会删除消息,避免内存泄漏的可能。具体配置如下:

#开启重试
spring.rabbitmq.listener.retry.enabled=true
#重试次数,默认为3次
spring.rabbitmq.listener.retry.max-attempts=5
  • 1
  • 2
  • 3
  • 4
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/weixin_40725706/article/detail/617115
推荐阅读
相关标签
  

闽ICP备14008679号