当前位置:   article > 正文

RabbitMQ_rabbitmq 消费

rabbitmq 消费

一、基础概念

RabbitMQ是一个消息代理 , 一个消息系统的媒介。它可以为你的应用提供一个通用的消息发送和接收平台,并且保证消息在传输过程中的安全。

1.1 技术亮点

1 可靠性

RabbitMQ提供了多种技术可以让你在性能和可靠性之间进行权衡。
这些技术包括持久性机制、投递确认、发布者证实和高可用性机制。

思考:高可用如何实现???

回答:通过集群的方式,防止单机问题。

2.灵活的路由

消息在到达队列前是通过交换机进行路由的。RabbitMQ为典型的路由逻辑提供了多种内置交换机类型。如果你有更复杂的路由需求,可以将这些交换机组合起来使用,你甚至可以实现自己的交换机类型,并且当做RabbitMQ的插件来使用。

比如常用的交换机:topic交换机

3.集群

在相同局域网中的多个RabbitMQ服务器可以聚合在一起,作为一个独立的逻辑代理来使用,保证了RabbitMQ服务器的高可用

RabbitMQ只有两种集群方式:普通集群和镜像队列。

4.高可用的队列

在同一个集群里,队列可以被镜像到多个机器中,以确保当其中某些硬件出现故障后,你的消息仍然安全。

5.多协议

RabbitMQ 支持多种消息协议的消息传递。rabbitmq是通过elang语言来开发的基于amqp协议的。

6.可视化管理工具

RabbitMQ附带了一个易于使用的可视化管理工具,它可以帮助你监控消息代理的每一个环节。

7.追踪
如果你的消息系统有异常行为,RabbitMQ还提供了追踪的支持,让你能够发现问题所在。

如何实现的?????

1.2 AMQP协议

1.AMQP是什么?
AMQP(高级消息队列协议)是一个网络协议。它支持符合要求的客户端应用(application)和消息中间件代理(messaging middleware broker)之间进行通信。

2.消息代理(broker)

消息代理及所扮演的角色

消息代理(message brokers)从发布者(publishers)亦称生产者(producers)那儿接收消息,并根据既定的路由规则把接收到的消息发送给处理消息的消费者(consumers)。

由于AMQP是一个网络协议,所以这个过程中的发布者,消费者,消息代理 可以存在于不同的设备上。

3.AMQP 0-9-1 模型简介

AMQP 0-9-1的工作过程如下图:消息(message)被发布者(publisher)发送给交换机(exchange),交换机常常被比喻成邮局或者邮箱。然后交换机将收到的消息根据路由规则分发给绑定的队列(queue)。最后AMQP代理会将消息投递给订阅了此队列的消费者,或者消费者按照需求自行获取。

在这里插入图片描述
队列,交换机和绑定统称为AMQP实体(AMQP entities),应用程序(Applications)声明AMQP实体,定义需要的路由方案,或者删除不再需要的AMQP实体。

AMQP的核心概念
1:server :又称为broker,接受客户端连接,实现amqp实体服务
2:Connection: 连接,应用程序与brokder建立网络连接
3:channel:网络通道,几乎所有的操作都是在channel中进行的,是进行消息对象的通道,客户端可以建立 多个通道,每一个channel表示一个会话任务
4:Message: 服务器和应用程序之间传递数据的载体,有properties(消息属性,用来修饰消息,比如消息的优 先级,延时投递)和Body(消息体)
5:virtual host(虚拟主机): 是一个逻辑概念,最上层的消息路由,一个虚拟主机中可以包含多个exhange 和 queue 但是一个虚拟主机中不能有名称相同的exchange 和queue
6:exchange 交换机: 消息直接投递到交换机上,然后交换机根据消息的路由key 来路由到对应绑定的队列上
7:baingding: 绑定 exchange 与queue的虚拟连接,bingding中可以包含route_key
8:route_key 路由key ,他的作用是在交换机上通过route_key来把消息路由到哪个队列上
9:queue:队列,用于来保存消息的载体,有消费者监听,然后消费消息

1.3 RabbitMQ高级特性

1.过期时间(TTL)
Time To Live,也就是生存时间,是一条消息在队列中的最大存活时间,单位是毫秒

2.消息确认

为了保证消息从队列可靠地到达消费者,RabbitMQ提供了消息确认机制。当autoAck为false的时候,RabbitMQ会等待消费者回复的确认信号,收到确认信号之后才从内存或者磁盘中删除消息。

思考:
1.autoAck为true的实现机制是怎样的???
回答:直接默认为已消费了该消息

3.持久化

持久化可以防止在异常情况下丢失数据。RabbitMQ的持久化分为三个部分:交换器持久化、队列持久化和消息的持久化。

持久化的利弊:

利:可以防止在异常情况下丢失数据
弊:因为磁盘的写入速度比内存的写入要慢得多,会影响RabbitMQ的性能降低了吞吐量。

4.死信队列

当消息在一个队列中变成死信之后,他能被重新发送到另一个交换器中,这个交换器成为死信交换器,与该交换器绑定的队列称为死信队列。

7.排他性队列(Exclusive Queue)

如果你想创建一个只有自己可见的队列,即不允许其它用户访问,RabbitMQ允许你将一个Queue声明成为排他性的(Exclusive Queue)。

该队列的特点是:

只对首次声明它的连接(Connection)可见,且在它连接断开的时候自动删除。

1.4 对比其他mq产品缺陷

1.没有消费组

消费速度较慢,一个队列对多个消费者轮训投递。

2.无法分区顺序

1.4 RabbitMQ实战

1.4.2 消费端:


    /**
     * 监听直播消息
     *
     * @param message mq消息内容
     */
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(value = "${mq.livereport.queue}", durable = "true",
                    exclusive = "false", autoDelete = "false"),
            exchange = @Exchange(value = "${mq.livereport.exchange}", type = "topic",
                    durable = "true"),
            key = "${livereport.routingkey}"))
    public void onMessage(Message message) {
        onMessageProxy(message);
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15

项目中大部分使用@RabbitmqListener注解的方式处理业务代码中MQ的消费,这个注解用于监听指定的队列,如果containerFactory未指定,默认使用SimpleRabbitListenerContainerFactory实例对象创建一个消息监听容器(SimpleMessageListenerContainer)
默认情况下,rabbitmq的消费者为单线程串行消费,这也是队列的特性,我们可以看看SimpleMessageListenerContainer的源码,从图中可以看到设置并发消费属性concurrentConsumers=1,从字面意义也可以分析出该字段是设置并发消费者的数量,默认为一个监听器设置一个消费者.

1.4.3 公共配置

@Configuration
public class RabbitMqConfig {


    /**
     * 
     * @param connectionFactory
     * @return
     */
    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
        return rabbitTemplate;
    }

    /**
     * 
     * @param connectionFactory
     * @return
     */
    @Bean
    public SimpleRabbitListenerContainerFactory jsonConvertListenerFactory(
            ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setMessageConverter(new Jackson2JsonMessageConverter());
        return factory;
    }

}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32

二、RabbitMQ架构

2.1 RabbitMQ应用架构

RabbitMQ应用架构图:

在这里插入图片描述

  • Broker:rabbitmq的服务节点.
    它提供一种传输服务,它的角色就是维护一条从生产者到消费者的中间路线,保证数据能按照指定的方式进行传输到队列中。

  • Exchange:消息交换机,它指定消息按什么规则,路由到哪个队列,常用的有三种交换机 。
    生产者将消息发送到自己配置的Exchange,由交换器将消息路由到一个或多个队列中。如果路由不到,或返回给生产者,或直接丢弃,或做其它处理。

  • Queue:消息的载体,每个消息都会被投到一个或多个队列。通过路由键路由到对应的队列中。

  • Binding:绑定,图中黄线部分。
    它的作用就是把exchange和queue按照路由规则绑定起来.
    通过绑定将交换器和队列关联起来,在绑定的时候一般会指定一个绑定键BindingKey绑定队列,这样RabbitMQ就可以指定如何正确的路由到队列了。

  • Routing Key:路由关键字,exchange根据这个关键字进行消息投递。
    生产者将消息发送给交换器的时候,一般会指定一个RoutingKey,用来指定这个消息的路由规则。这个路由Key需要与交换器类型和绑定键(BindingKey)联合使用才能最终生效。在交换器类型和绑定键固定的情况下,生产者可以在发送消息给交换器时通过指定RoutingKey来决定消息流向哪里。

  • Channel信道:信道是建立在Connection 之上的虚拟连接,其原理使用的io多路复用机制,信道可以对比一个事件处理器。
    当应用程序与Rabbit Broker建立TCP连接的时候,客户端紧接着可以创建一个AMQP 信道(Channel) ,每个信道都会被指派一个唯一的ID。RabbitMQ 处理的每条AMQP 指令都是通过信道完成的。
    信道就像电缆里的光纤束。一条电缆内含有许多光纤束,允许所有的连接通过多条光线束进行传输和接收。

  • vhost:虚拟主机,一个broker里可以有多个vhost,用作不同用户的权限分离。可以类别mysql对应的database库。

  • Producer:消息生产者,就是投递消息的程序.

  • Consumer:消息消费者,就是接受消息的程序.

1. 消息发布订阅流程

1.发送端将消息发送到交换机

RabbitMQ消息模型的核心理念是:发布者(producer)不会直接发送任何消息给队列。事实上,发布者(producer)甚至不知道消息是否已经被投递到队列。

发布者(producer)只需要把消息发送给一个交换机(exchange)。交换机非常简单,它一边从发布者方接收消息,一边把消息推送到队列。交换机必须知道如何处理它接收到的消息,是应该推送到指定的队列还是是多个队列,或者是直接忽略消息。这些规则是通过交换机类型(exchange type)来定义的。
在这里插入图片描述
2.交换机将消息路由到队列

交换机将会对绑定键(binding key)和路由键(routing key)进行精确匹配,从而确定消息该分发到哪个队列。

下图能够很好的描述这个场景:

在这里插入图片描述

在这个场景中,我们可以看到直连交换机 X和两个队列进行了绑定。第一个队列使用orange作为绑定键,第二个队列有两个绑定,一个使用black作为绑定键,另外一个使用green。

这样以来,当路由键为orange的消息发布到交换机,就会被路由到队列Q1。路由键为black或者green的消息就会路由到Q2。其他的所有消息都将会被丢弃。

3.消费者订阅队列里面的消息

队列里面收到了交换机路由到的消息后,根据负载均衡轮训策略分发给订阅了该队列的消费者

2.2 RabbitMQ交换机

交换机是用来发送消息的AMQP实体。交换机拿到一个消息之后将它路由给一个或零个队列。它使用哪种路由算法是由交换机类型和被称作绑定(bindings)的规则所决定的。

除交换机类型外,在声明交换机时还可以附带许多其他的属性,其中最重要的几个分别是:

Name
Durability (消息代理重启后,交换机是否还存在)
Auto-delete (当所有与之绑定的消息队列都完成了对此交换机的使用后,删掉它)
Arguments(依赖代理本身)
exclusive:exclusive=True表示当与消费者(consumer)断开连接的时候,这个队列应当被立即删除。

交换机持久化机制

交换机可以有两个状态:持久(durable)、暂存(transient)。持久化的交换机会在消息代理(broker)重启后依旧存在,而暂存的交换机则不会(它们需要在代理再次上线后重新被声明)。然而并不是所有的应用场景都需要持久化的交换机。

交换机四种类型

RabbitMQ常用的交换器类型有direct、topic、fanout、headers四种。

  1. Direct Exchange

该类型的交换器将所有发送到该交换器的消息被转发到RoutingKey指定的队列中,也就是说路由到BindingKey和RoutingKey完全匹配的队列中。
在这里插入图片描述

  1. Topic Exchange

该类型的交换器将所有发送到Topic Exchange的消息被转发到所有RoutingKey中指定的Topic的队列上面。即BindingKey模糊匹配。

Exchange将RoutingKey和某Topic进行模糊匹配,其中“”用来匹配一个词,“#”用于匹配一个或者多个词。例如“com.#”能匹配到“com.rabbitmq.oa”和“com.rabbitmq”;而"login."只能匹配到“com.rabbitmq”。
在这里插入图片描述

  1. Fanout Exchange

该类型不处理路由键,会把所有发送到交换器的消息路由到所有绑定的队列中。优点是转发消息最快,性能最好。

在这里插入图片描述

  1. Headers Exchange

该类型的交换器不依赖路由规则来路由消息,而是根据消息内容中的headers属性进行匹配。headers类型交换器性能差,在实际中并不常用。

2.3 RabbitMQ监听容器

核心参数
1.concurrentConsumers:concurrentConsumers是设置并发消费者的个数,可以进行初始化-最大值动态调整,并发消费者可以提高消息的消费能力,防止消息的堆积
2.prefetchCount:prefetchCount指一个消费者每次一次性从broker里面取出的待消费的消息个数,默认值比较吉利prefetchCount=250

2.4 消息转发到消费者机制

RabbitMQ中消息只能存储在队列中。生产者投递消息到队列,消费者从队列中获取消息并消费。

多个消费者可以订阅同一个队列,这时队列中的消息会被平均分摊(轮询)给多个消费者进行消费,而不是每个消费者都收到所有的消息进行消费。(注意:

实现广播消息

RabbitMQ不支持队列层面的广播消费,如果需要广播消费,可以采用一个交换器通过路由Key绑定多个队列即Fanout Exchange交换机类型,由多个消费者来依一来订阅队列的方式。

2.5 rabbitmq的工作模式

在这里插入图片描述

三、RabbitMQ核心机制原理

3.1 消息可靠性机制

信道需要设置为 confirm 模式,则所有在信道上发布的消息都会分配一个唯一 ID。

一旦消息被投递到queue(可持久化的消息需要写入磁盘),信道会发送一个确认给生产者(包含消息唯一 ID)。如果 RabbitMQ 发生内部错误从而导致消息丢失,会发送一条 nack(未确认)消息给生产者。

所有被发送的消息都将被 confirm(即 ack) 或者被nack一次。但是没有对消息被 confirm 的快慢做 任何保证,并且同一条消息不会既被 confirm又被nack。

发送方确认模式是异步的,生产者应用程序在等待确认的同时,可以继续发送消息。
当确认消息到达生产者, 生产者的回调方法会被触发。
ConfirmCallback接口:只确认是否正确到达 Exchange 中,成功到达则回调
ReturnCallback接口:消息路由失败返回时回调。

3.1.1 生产端confirm机制

confirm机制:消息的确认是指生产端投递消息后,若mq-server接受到消息,就会给生产者一个应答。

生产端根据mq broker返回应答来确认该条消息是否正常发送到了broker,这种方式是消息可靠性投递的核心保障

消息确认机制的流程图
在这里插入图片描述

代码实现
那代码如何来做消息的confirm

第一步:在channel上开启确认模式 channel.confirmSelect();
第二部;在channel上增加confirm监听,来监听成功和异常的confirm结果

生产者代码

public static void main(String[] args) 
throws IOException,TimeoutException { 
ConnectionFactory connectionFactory = new ConnectionFactory(); 
connectionFactory.setVirtualHost("cloudmall"); 
connectionFactory.setHost("47.104.128.12"); connectionFactory.setPort(5672); 
Connection connection = connectionFactory.newConnection(); 
Channel channel = connection.createChannel();
 //开启confirm channel.confirmSelect(); 
 channel.addConfirmListener(new ConfirmListener() 
 { /** * 接口成功 ** @param deliveryTag deliveryTag 消息id * @param multiple 是否批量 * @throws IOException */
 @Override 
 public void handleAck(long deliveryTag, boolean multiple) throws IOException {
  System.out.println("消息id" + deliveryTag + "............ack"); 
  }
 @Override 
 public void handleNack(long deliveryTag, boolean multiple) throws IOException {
  System.out.println("消息id" + deliveryTag + "............no ack"); }
});
 channel.basicPublish("test.confirm.exchange", "test.confirm.key", null, "confirm消息".getBytes()); }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19

消费者代码:

public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
 ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setVirtualHost("cloudmall"); connectionFactory.setHost("47.104.128.12"); connectionFactory.setPort(5672); Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare("test.confirm.exchange","topic",true,false,null); channel.queueDeclare("test.confirm.queue",true,false,false,null); channel.queueBind("test.confirm.queue","test.confirm.exchange","test.confirm.#"); QueueingConsumer queueingConsumer = new QueueingConsumer(channel); channel.basicConsume("test.confirm.queue",true,queueingConsumer); while (true) { QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery(); System.out.println(new String(delivery.getBody())); }}
  • 1
  • 2

3.1.2 return机制

return机制,用于处理一些不可路由的消息,在一些特殊的情况下,当前的exchange不存在或者指定的路由key路由不到,这时如果我们需要及时监听这种消息,就需要return机制。

其架构如下:

在这里插入图片描述
针对上述二种情况

我们就需要return listener来处理这种不可达的消息.

处理一;若在消息生产端 的mandatory设置为true 那么就会调用生产端ReturnListener 来处理。
处理二;若消息生产端的mandatory设置为false(默认值也是false) 那么mq-broker就会自动删除消息。

3.1.3 消费端ACK机制

1、什么是消息确认ACK。

答:如果在处理消息的过程中,消费者的服务器在处理消息的时候出现异常,那么可能这条正在处理的消息就没有完成消息消费,数据就会丢失。为了确保数据不会丢失,RabbitMQ支持消息确定-ACK。

2、ACK的消息确认机制。

答:ACK机制是消费者从RabbitMQ收到消息并处理完成后,反馈给RabbitMQ,RabbitMQ收到反馈后才将此消息从队列中删除。

如果一个消费者在处理消息出现了网络不稳定、服务器异常等现象,那么就不会有ACK反馈,RabbitMQ会认为这个消息没有正常消费,会将消息重新放入队列中,并重复投放。

如果在集群的情况下,RabbitMQ会立即将这个消息推送给这个在线的其他消费者。这种机制保证了在消费者服务端故障的时候,不丢失任何消息和任务。

消息永远不会从RabbitMQ中删除,只有当消费者正确发送ACK反馈,RabbitMQ确认收到后,消息才会从RabbitMQ服务器的数据中删除。

消息消费端的ACK确认机制默认是打开的。
    
ACK机制代码演示参考:

RabbitMQ的消息确认ACK机制 https://www.cnblogs.com/biehongli/p/11789098.html

思考

1.ACK频繁失败,将导致什么问题,如何解决?

答:如果忘记了ACK或者中途系统挂了,那么后果很严重。当Consumer退出时候,Message会一直重新分发。然后RabbitMQ会占用越来越多的内容,由于RabbitMQ会长时间运行,因此这个"内存泄漏"是致命的,即将导致消息堆积。

解决方案:

限制重试次数

2.消息的最终归宿有哪些?

1.成功消费,被删除
2.消费失败,不断在队列中重试
3.消费失败,到达消息的ttl后被丢进死信队列

3.2 消费端限流机制

场景:
首先,我们迎来了订单的高峰期,在mq的broker上堆积了成千上万条消息没有处理,这个时候,我们随便打开了 消费者,就会出现下面情景:
如此多的消息瞬间推送给消费者,我们的消费者不能处理这么多消息 就会导致消费者出现巨大压力,甚至服务器崩溃。

解决方案:
rabbitmq 提供一个钟qos(服务质量保证),也就是在关闭了消费端的自动ack的前提 下,我们可以设置阈值(出队)的消息数没有被确认(手动确认),那么就不会推送 消息过来.
限流的级别(consumer级别或者是channel级别)

运行结果:
在这里插入图片描述
上图中:队列共有5条消息,4条准备中,1条还未收到ack,当收到ack后,队列又可以向消费端推送一条新的消息。

3.3 高并发

根据队列的特性可知,如果阻塞队列中一个消息阻塞了,那么所有消息将会被阻塞,如果使用
默认设置,concurrentConsumer=1,prefetchCount=250,那么当消费者队列中有一个消息由于某种原因阻塞了,那么该消息的后续消息同样不能被消费.

为了防止这种情况的发生,我们可以增大concurrentConsumer的设置,使多个消费者可以并发消费.

而prefetchCount该如何设置呢?
假设conrrentConsumer=2,prefetchCount采用默认值,rabbitmq容器会初始化两个并发的消费者,每个消费者的阻塞队列大小为250,

rabbitmq的机制是将消息投递给consumer1,先为consumer1投递满250个message,再往consumer2投递250个message,如果consumer1的message一直小于250个,consumer2一直处于空闲状态,那么并发消费退化为单消费者.

3.4 死信队列

“死信”消息会被RabbitMQ进行特殊处理,如果配置了死信队列信息,那么该消息将会被丢进死信队列中,如果没有配置,则该消息将会被丢弃。

为每个需要使用死信的业务队列配置一个死信交换机,这里同一个项目的死信交换机可以共用一个,然后为每个业务队列分配一个单独的路由key,死信队列只不过是绑定在死信交换机上的队列,死信交换机也不是什么特殊的交换机,只不过是用来接受死信的交换机,所以可以为任何类型【Direct、Fanout、Topic】

1.消息变成死信的几种情况

  1. 消息被拒绝:(basic.reject/basic.nack)并且requeue(重回队列)的属性设置为 false表示不需要重回队列,那么该消息就是一个死信消息。
  2. 消息TTL过期: 消息在队列的存活时间超过设置的TTL时间。
  3. 队列达到最大长度:比如队列最大长度是3000 ,那么3001消息就会被送到死信队列上.

2.用途场景

最经典的应用场景:超时未支付订单处理

需求分析:

超过60分钟未支付的订单,我们需要进行超时订单的处理:先调用微信支付api,查询该订单的支付状态。如果未支付调用关闭业务订单的api,并修改订单状态为已关闭,并回滚库存数。如果该订单已经支付,做补偿操作(修改订单状态和记录)。

实现思路 :

如何获取超过60分钟的订单?我们目前有两种实现方案

(1)定时任务轮询方案

编写定时任务,查询所有60分钟前创建的订单列表。 循环此订单列表,查询每个订单的支付状态。如果已支付进行状态补偿,如果未支付则关闭订单。

这种实现方案缺点是时间精度不高,对系统压力比较大。 (为什么时间精度不高?因为时间段交叉了)

(2)使用延迟消息队列

所谓延迟消息队列,就是消息的生产者发送的消息并不会立刻被消费,而是在设定的时间之后才可以消费。

我们可以在订单创建时发送一个延迟消息,消息为订单号,系统会在60分钟后取出这个消息,然后查询订单的支付状态,根据结果做出相应的处理。

系统的设计流程图:
在这里插入图片描述

参考:RabbitMQ死信队列应用
https://blog.csdn.net/huxiang19851114/article/details/113695957

1.异常重试:
当发生未知异常的时候,消息不能够被消费者正常消费,则把他转发到其他队列里面并设置消息的ttl. 如果ttl到期了,则将加入到了死信队列中。

后续的程序可以进行异常分析和记录,并进行失败重试。死信队列的失败重试和正常队列是相互独立的,所以不会影响到正常主流程。故可以有效避免消息堆积。

例子:
1.积分获取如果发生来未知异常,则可以丢进死信队列,进行补偿重试。

3.5.延迟队列

一般的队列,消息一旦进入队列就会被消费者立即消费。

延迟队列就是进入该队列的消息会被消费者延迟消费,延迟队列中存储的对象是的延迟消息,“延迟消息”是指当消息被发送以后,等待特定的时间后,消费者才能拿到这个消息进行消费。

在RabbitMQ中不存在延时队列,但是我们可以通过设置消息的过期时间和死信队列来模拟出延时队列。

实现思路:

我们创建一个队列queue.temp,在Arguments 中添加x-message-ttl 为5000 (单位是 毫秒),那所在这个队列的消息在5秒后会消失,进入死信队列。

最后消费者监听死信交换器绑定的队列,而不要监听消息发送的队列,这样就可以实现消息的延迟消费。达到延迟队列的目的。

具体操作:

需要创建四个角色:

order_exchange:订单交换机

order_queue:订单队列

dlx.exchange:死信交换机

dlx.queue:死信队列(存放无法消费的消息)

绑定关系:

order_exchange和order_queue通过order.#路由key绑定;

dlx.exchange和dlx.queue通过#路由key绑定

唯一的区别在于order_queue的创建:

设置消息过期参数 x-message-ttl:10000(10秒);

设置死信交换机 x-dead-letter-exchange:dlx.exchange

在这里插入图片描述
测试:通过order_exchange交换机发送一个消息,等待10S,发现一开始消息进入了order_queue;10S后消息进入死信队列dlx.queue

用途场景:比如消费者从队列里面消费消息失败了,可以延迟一段时间以后进行重试。

参考链接:https://blog.csdn.net/huxiang19851114/article/details/113695957

3.6 事务消息

事务的实现主要是对信道(Channel)的设置。

  1. channel.txSelect();通知服务器开启事务模式;mq服务端会返回Tx.Select-Ok之后,才能发下一条消息。
  2. channel.basicPublish;发送消息,可以是多条,可以是消费消息提交ack
  3. channel.txCommit()提交事务;
  4. channel.txRollback()回滚事务;

消费者使用事务:

  1. autoAck=false,手动提交ack,以事务提交或回滚为准来判断是否删除消息
  2. autoAck=true,不支持事务的,也就是说你即使在收到消息之后在回滚事务也是于事无补的,队列已经把消息移除了。

如果其中任意一个环节出现问题,就会抛出IoException异常,用户可以拦截异常进行事务回滚,或决定要不要重复消息。

事务消息会降低rabbitmq的性能。

3.6 rabbitmq的普通集群

1.普通集群架构

普通集群模式就是将多台Rabbit MQ服务器连接组成一个集群,在连接过程中需要正确的Erlang Cookie和节点名称才能保证机器之间相互进行连接访问,并且集群需要要局域网内进行部署。
在这里插入图片描述

在这里插入图片描述

1.元数据

Rabbit MQ的集群不是每个节点都有所有队列的完全拷贝。

交换机的元数据信息在所有节点上都是一致的,但是存放消息的队列的完整信息都只存在它所创建的节点上,所有其他节点只知道队列的元数据和指向该队列存在的那个节点的指针,元数据信息包括以下内容:

  • 队列元数据:队列名称和它的属性
  • 交换器元数据:交换器名称、类型和属性
  • 绑定元数据:route key,一张简单的表格展示了如何将消息路由到队列
  • vhost元数据:为vhost内的队列、交换器和绑定提供命名空间和安全属性

因此,当用户访问其中任何一个RabbitMQ节点时,通过rabbitmqctl查询到的queue/user/exchange/ vhost等信息都是相同的。

注意:队列只同步元数据信息,不会同步存储的消息,消息只会存在于创建该队列的节点上,其它节点只知道这个队列的元数据信息和一个指向队列的owner node的地址。

2.为何RabbitMQ集群仅采用元数据同步的方式?

RabbitMQ这么设计主要是基于集群本身的性能和存储空间上来考虑。
第一,存储空间,如果每个集群节点都拥有所有Queue的完全数据拷贝,那么每个节点的存储空间会非常大,集群的消息积压能力会非常弱(无法通过集群节点的扩容提高消息积压能力);
第二,性能,消息的发布者需要将消息复制到每一个集群节点,对于持久化消息,网络和磁盘同步复制的开销都会明显增加。

故:普通集群模式无法解决高可用问题,因为一个节点挂了,它的队列消息如果没有持久化到磁盘,该队列的消息就丢失了。

2.集群读写策略

  • 客户端连接的是队列数据所在节点:直接根据路由规则路由到相应队列中
  • 客户端连接的是非队列数据所在节点:则该节点会进行路由转发,包括发送和消费,转发到含有队列消息的节点。

以三个节点(node1、node2、node3)的集群为例来进行说明。

消息实体是存在于队列之中的,而节点之间只有相同的元数据信息,假设消息存在于node1节点的A队列上,当消费者从node2节点上的B队列消费时,这时RabbitMQ会临时在node1和node2节点进行消息传输,把A队列上的消息实体传到B队列上,然后发送给消费者。

这个过程其实会对node1节点产生性能瓶颈,因为无论consumer连node1或node2,都会从node1拉取数据。

针对这种情况,有一个中庸的做法就是将consumer尽量连接每一个节点。

3.集群节点类型

磁盘节点:将配置信息和元信息存储在磁盘上。
内存节点:将配置信息和元信息存储在内存中。性能优于磁盘节点。依赖磁盘节点进行持久化,但如果节点挂了,里面的信息将丢失。

RabbitMQ要求集群中至少有一个磁盘节点,当节点加入和离开集群时,必须通知磁盘节点(如果集群中唯一的磁盘节点崩溃了,则不能进行创建队列、创建交换器、创建绑定、添加用户、更改权限、添加和删除集群节点)。

如果唯一磁盘的磁盘节点崩溃,集群是可以保持运行的,但不能更改任何东西。因此建议在集群中设置两个磁盘节点,只要一个可以,就能正常操作。

1.集群节点异常处理

当集群节点崩溃时,该节点的队列进程和关联的绑定都会消失。附加在那些队列上的消费者 会丢失其所订阅的信息 井且任何匹配该队列绑定信息的新消息也都会消失。那么面临这种情况应该如何处理呢:

1.持久化处理,当该节点重启的时候可以再次获取到该消息。
2.使用镜像模式,就是指创建一个镜像节点,镜像节点保存有崩溃节点的所有信息,那么该节点崩溃时,镜像节点可以接替它继续工作,直至崩溃节点重启。

4.集群优劣分析

优点

这种方式只是通过集群部署的方式提高了消息的吞吐量可以存储更多的消息,但是并没有考虑到高可用.

缺点:

1: 无法保证高可用,元数据节点宕机了,所在队列将无法进行消息的读写。
2.可能造成消息丢失,元数据节点宕机了但还没有同步到磁盘,但如果使用ack确认机制,还是可以保证消息的不丢失。

1.RabbitMQ集群(普通队列)原理详解 https://www.cnblogs.com/zhouganqing/p/14818122.html

思考

1.普通集群模式是否可以保证高可用?

不能,如果真正有数据的那个queue的实例宕机了. 那么其他的实例就无法进行数据的拉取.

3.6 rabbitmq的镜像集群

1.镜像集群架构

rabbitmq镜像集群依赖于普通集群,所以需要先搭建rabbitmq普通集群

镜像集群模式其实就是把需要的队列做成镜像队列,然后将镜像队列放在多个节点当中,这种镜像集群模式解决了普通集群模式没有做到的高可用性的缺点,镜像集群模式属于Rabbit MQ的高可用性的集群部署方案。

普通队列进程及其内容仅仅维持在单个节点之上,所以一个节点的失效表现为其对应的队列不可用。

引入镜像队列(Mirror Queue)的机制,可以将队列镜像到集群中的其他Broker节点之上,如果集群中的一个节点失效了,队列能够自动切换到镜像中的另一个节点上以保证服务的可用性。
针对每个队列的(以下简称镜像队列)都包含一个主节点(master)和若干个从节点(slave)

在这里插入图片描述
由图可知master和slave组成了一个链表结构。

1.GM消费组

GM负责消息的广播,所有的GM组成gm_group,形成链表结构,负责监听相邻节点的状态,以及传递消息到相邻节点,master的GM收到最后一个从节点消息时,代表消息同步完成。
在这里插入图片描述

2.mirror_queue_master/slave

mirror_queue_master/slave负责消息的处理,操作blockingQueue,Queue负责AMQP协议(commit、rollback、ack等)

2.集群读写策略

master提供读写服务,在slave上的操作都会路由到master上,slave只做备份-主备切换。

当消费者与master队列建立连接,消费者可以直接从master队列上获取信息,当消费者与slave队列建立连接呢?消费者是从slave队列直接获取数据的吗?当然不是的,消息的流转顺序如下所示:

1.slave队列先将消费者的请求转发给master队列
2.然后再由master队列准备好数据返回给slave队列
3.最后由slave队列将消息返回给消费者

那这样就会有一个疑问?消费者的请求都是由master队列进行处理的,那么消息的负载是不是不能够做到有效的均衡呢?

在这里插入图片描述
集群中的每个 Broker 节点都包含 1 个队列的 master 和 2 个队列的 slave, Q1 的负载大多都在 broker1 上,Q2 的负载大多都集中在 broker2 上,Q3 的负载大多都集中在 broker3 上,只要确保队列的 master 节点均匀散落在集群中的各个 Broker 节点即可确保很大程度的负载均衡。

3.主从同步策略

slave会准确地按照maste执行命令地顺序进行动作,故slave和master上维护的状态应该是相同的。

4.集群恢复机制

如果master由于某种原因失效,那么“资历最老”(基于slave加入cluster的时间排序,即最靠近master链表的slave)的slave会被提升为新的master。

发送到镜像队列的所有消息会被同时发往 master和所有的slave上,如果此时master挂掉了,消息还会在slave上,这样slave提升为 master的时候消息也不会丢失。

当slave挂掉之后,除了与slave相连的客户端连接全部断开,没有其他影响
当master挂掉之后,会有以下影响:

1.与master连接的客户端连接全部断开;

2.选举最老的slave作为新的master,因为最老的slave与旧的master之间的同步状态应该是最好的。如果此时所有slave处于未同步状态,则未同步的消息会丢失;

3.新的master重新入队所有unack的消息,因为新的slave无法区分这些unack的消息是否己经到达客户端,或者是ack信息丢失在老的master链路上,再或者是丢失在老的master 组播ack消息到所有slave的链路上,所以出于消息可靠性的考虑,重新入队所有unack的消息,不过此时客户端可能会有重复消息;

4.如果客户端连接着slave,并且Basic.Consume 消费时指定了x-cancel-on-ha-failover 参数,那么断开之时客户端会收到一个Consumer Cancellation Notification的通知,消费者客户端中会回调Consumer接口的handleCancel方法。如果未指定x-cancel-on-ha-failover参数,那么消费者将无法感知 master 宕机

当所有slave都出现未同步状态,并且ha-prornote-on-shutdown设置为when-synced(默认)时,如果master因为主动原因停掉,比如通过rabbitrnqctl stop命令或者优雅关闭操作系统,那么slave不会接管master,也就是此时镜像队列不可用。但是如果master因为被动原因停掉,比如Erlang虚拟机或者操作系统崩溃,那么slave会接管master。这个配置项隐含的价值取向是保证消息可靠不丢失,同时放弃了可用性。如果ha-prornote-on-shutdown设置为always ,那么不论master因为何种原因停止,slave都会接管master,优先保证可用性,不过消息可能会丢失。

5.负载均衡

Rabbit MQ的负载均衡是体现在物理机器层面上的,而不是体现在内存中的队列层面的。这样解释吧,现在有3台物理机,需要创建3个master队列和6个slave队列, 消息的请求负载都在3个master队列上,那么只需要将3个master队列和6个slave队列均匀的分布在3台物理机上,这样在很大程度上实现了每台机器的负载均衡。当然每个master队列消息请求的数量可能会有不同,无法保持绝对的负载均衡。

6.消息的可靠性

RabbitMQ的镜像队列使用 publisher confirm 和事务两种机制来保证其消息的可靠性。

在事务机制中,只有当前事务在全部镜像中执行之后,客户端才会收到 Tx Commit-Ok 的消息。

同样的,在 publisher confirm 机制中,生产者进行当前消息确认的前提是该消息被全部进行所接收了。

7.镜像集群优劣分析

优点

1.镜像队列的引入可以极大地提升 RabbitMQ 的可用性及可靠性,提供了数据冗余备份、避免单点故障的功能,因此推荐在实际应用中为每个重要的队列都配置镜像。

缺点:

1: 性能开销大: 因为需要进行整个集群内部所有实例的数据同步,因此写数据的性能较差,从而导致吞吐量相比其他mq产品较低。

2:提供了数据的冗余备份,会导致存储压力变大,可能会出现IO瓶颈。

3:无法线性扩容: 因为每一个服务器中都包含整个集群服务节点中的所有数据, 这样如果一旦单个服务器节点的容量无法容纳了怎么办?.所以相比于Kafka和rockermq,消息堆积能力更差,不适合大数据的场景。

1.RabbitMQ集群(镜像队列)原理详解 https://www.cnblogs.com/zhouganqing/p/14818939.html

四、应用

4.1 解决分布式事务

实现思路:

在这里插入图片描述

在这里插入图片描述

参考文献

1.rabbitmq官网文档中文 http://rabbitmq.mr-ping.com/

2.RabbitMQ并发消费者关键参数prefetch,concurrency
https://blog.csdn.net/u014439693/article/details/106898188/

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/很楠不爱3/article/detail/399214
推荐阅读
相关标签
  

闽ICP备14008679号