赞
踩
2021最新Java面经整理 | 中间件篇(三)RabbitMQ
目录
RabbitMQ是一个开源的AMQP实现,服务器端用Erlang语言编写,支持多种客户端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持AJAX。用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。RabbitMQ有如下特点,
我们先看下 RabbitMQ 的结构图,如下,图重分两大块:RabbitMQ Server 和 RabbitMQ Client,其中 Client,可以包含生产者(Publisher或Provider)和消费者(Consumer)。
RabbitMQ 结构图中有几个重要的概念:VHost(虚机主机)、Exchange(交换机)、Queue(队列)、Binding(绑定)。整个消息传递的流程都是围绕着几个组件来进行的。生产者把消息发布到Exchange上,然后Exchange把消息路由到与Exchange绑定的Queue队列中,消费者建立与Queue的连接后,消费消息。
1、Broker(服务节点)
RabbitMQ Server服务器,服务节点称为Broker。
虚拟主机,标识一批交换机、消息队列和相关对象。虚拟主机是共享相同的身份认证和加密环境的独立服务器域。每个vhost本质上就是一个mini版的RabbitMQ服务器,拥有自己的队列、交换器、绑定和权限机制。vhost是AMQP概念的基础,必须在链接时指定,RabbitMQ默认的vhost是 /。
交换机,用来接收生产者(producer)发送的消息,并将这些消息路由给服务器中的队列(Queue)。
消息队列,用来保存消息直到发送给消费者。它是消息的容器,也是消息的终点。一个消息可投入一个或多个队列。消息一直在队列里面,等待消费者连接到这个队列将其取走。
绑定,用于交换机(Exchange)和消息队列(Queue)之间的关联。一个绑定就是基于路由键将交换机和消息队列连接起来的路由规则,所以可以将交换器理解成一个由绑定构成的路由表。
信道,多路复用连接中的一条独立的双向数据流通道。新到是建立在真实的TCP连接内地虚拟链接,AMQP命令都是通过新到发出去的,不管是发布消息、订阅队列还是接收消息,这些动作都是通过信道完成。因为对于操作系统来说,建立和销毁TCP都是非常昂贵的开销,所以引入了信道的概念,以复用一条TCP连接(一个TCP链接,可以包含多个信道)。
Producer消息生产者,也是一个向交换器发布消息的客户端应用程序。
消息消费者,表示一个从一个消息队列中取得消息的客户端应用程序。
网络连接,比如一个TCP连接。一个consumer需要和broker建立连接,以获取队列中的消息。
消息,消息是不具名的,它是由消息头和消息体组成。消息体是不透明的,而消息头则是由一系列的可选属性组成,这些属性包括routing-key(路由键)、priority(优先级)、delivery-mode(消息的路由模式)等。
Exchange分发消息时,根据交换机的类型不同,分发策略也不同。常见的交换机类型有四种:direct、fanout、topic、headers(headers匹配AMQP消息的header而不是路由键(Routing-key),此外headers交换器和direct交换器完全一致,但是性能差了很多,很少使用)。
fanout类型的Exchange路由规则非常简单,它会把所有发送到该Exchange的消息路由到所有与它绑定的Queue中。
上图所示,生产者(P)生产消息1将消息1推送到Exchange,由于Exchange Type=fanout这时候会遵循fanout的规则将消息推送到所有与它绑定Queue,也就是图上的两个Queue最后两个消费者消费。
direct类型的Exchange路由规则也很简单,它会把消息路由到那些binding key与routing key完全匹配的Queue中。
当生产者(P)发送消息时Rotuing key=booking时,这时候将消息传送给Exchange,Exchange获取到生产者发送过来消息后,会根据自身的规则进行与匹配相应的Queue,这时发现Queue1和Queue2都符合,就会将消息传送给这两个队列,如果我们以Rotuing key=create和Rotuing key=confirm发送消息时,这时消息只会被推送到Queue2队列中,其他Routing Key的消息将会被丢弃。
前面提到的direct规则是严格意义上的匹配,换言之Routing Key必须与Binding Key相匹配的时候才将消息传送给Queue,那么topic这个规则就是模糊匹配,可以通过通配符满足一部分规则就可以传送。它的约定是:
当生产者发送消息Routing Key=F.C.E的时候,这时候只满足Queue1,所以会被路由到Queue1中,如果Routing Key=A.C.E这时候会被同是路由到Queue1和Queue2中,如果Routing Key=A.F.B时,这里只会发送一条消息到Queue2中。
headers类型的Exchange不依赖于routing key与binding key的匹配规则来路由消息,而是根据发送的消息内容中的headers属性进行匹配。
在绑定Queue与Exchange时指定一组键值对;当消息发送到Exchange时,RabbitMQ会取到该消息的headers(也是一个键值对的形式),对比其中的键值对是否完全匹配Queue与Exchange绑定时指定的键值对;如果完全匹配则消息会路由到该Queue,否则不会路由到该Queue。
此外headers交换器和direct交换器完全一致,但是性能差了很多,很少使用。
RabbitMQ 的消息策略是怎样的呢?消息是怎样传递的呢?首先明确一点就是生产者产生的消息并不是直接发送给消息队列Queue的,而是要经过Exchange(交换器),由Exchange再将消息路由到一个或多个Queue,这里还会对不符合路由规则(由Exchange Type决定规则)的消息进行丢弃掉。RabbitMQ 通过Binding将Exchange和Queue链接在一起,然后将消息准确的推送到对应的Queue中。
1)消息的传递
2)结构图
要解释清楚 RabbitMQ 整个消息的传递过程,我们需要上面两张图来看,分两步:发送消息和消费消息。
Producer(生产者)向 RabbitMQ Server(Broker)发送消息,
生产者需要关心:Exchange、Queue、Binding。
Consumer(消费者)向 RabbitMQ Server(Broker)订阅消息,
消费者只需要关心:Queue。
重启RabbitMQ后,队列和交换器都会丢失(随同里面的消息),原因在于每个队列和交换器的durable属性,该属性默认为false。RabbitMQ提供了durable属性来实现持久化,保证断电后消息不丢失。RabbitMQ 的持久化分:交换机、队列持久化和消息持久化。
所以,RabbitMQ 的消息持久化,需要做到以下三点:
注意,如果原先有非持久的交换器或者队列,需要删除后才可重新创建,否则就创建其他名称的交换器或者队列,代码如下:
- //声明持久交换器
- channel.ExchangeDeclare(
- "HelloExchange", //交换器名称
- ExchangeType.Direct,//交换器类型
- true, //是否持久话
- false, //是否自动删除
- null //关于交换器的详细设置,键值对形式
- );
- //声明持久队列
- channel.QueueDeclare(
- "HelloQueue",//队列名称
- true, //是否持久化
- false, //是否只对首次声明的队列可见
- false, //是否自动删除
- null 关于队列和队列内消息的详细设置,键值对形式
- );
- //发布持久消息
- string msg_str = "这是生产者第一次发布的消息";
- IBasicProperties msg_pro = channel.CreateBasicProperties();
- msg_pro.ContentType = "text/plain";//发布的数据类型
- msg_pro.DeliveryMode = 2;//标记持久化
RabbitMQ 的持久化机制是:把持久化的数据写入磁盘上的一个持久化日志文件,在做数据恢复时,从磁盘读取持久化的数据重建。当发布一条持久化的消息到持久化的交换机上时,RabbitMQ 会在消息提交到日志文件后才发送响应。如果RabbitMQ重启,服务器会自动重建交换机和队列,重播持久性日志文件中的消息到合适的队列或者交换机上。
消息持久化对RabbitMQ的性能有较大影响,写入磁盘要比写入内存慢很多,而且会极大的减少RabbitMQ服务器每秒可处理的消息总数,导致消息吞度量降低至少10倍的情况并不少见。持久化消息在RabbitMQ内建集群环境中工作的并不好,实际上集群上的队列均匀分布在各个节点上而且没有冗余,如果运行a队列的节点崩溃了,那么直到节点恢复前,这个队列就从整个集群消失了,而且这个节点上的所有队列不可用,而且持久化队列也无法重建。
RabbitMQ 支持 AMQP 事务,来处理消息丢失的情况(确认机制比事务更轻量)。AMQP事务与数据库事务不同。
AMQP事务:提供的一种保证消息成功投递的方式,通过将信道开启事务模式后,利用信道 Channel 的三个命令来实现以事务方式发送消息,若发送失败,通过异常处理回滚事务,确保消息成功投递。
RabbitMQ的事务非常消耗性能,不但会降低大约2-10倍的消息吞度量,而且会使生产者应用程序之间产生同步,与使用MQ解耦异步系统的初衷相背离。
相比较事务模式,RabbitMQ 提供了更好的方案来保证消息投递:发送方确认模式。
和事务类似,我们需要将信道 channel 设置为 confirm 模式,而且只能通过重新创建信道来关闭该设置。一旦信道进入 confirm 模式,所有的信道上发布的消息都会被指派一个唯一的ID。当消息被投递到队列后,信道就会发送一个发送方确认模式给生产者程序,使得生产者知道消息安全到达队列了。如果发送的消息丢失,RabbitMQ会发送一条nack消息,告诉生产者消息丢失,生产者会再次发送消息(re-publish)。发送发确认模式最大的好处是它们是异步的,没有回滚的概念,更加轻量级,对性能的影响也几乎忽略不计。
生产者将信道设置成confirm模式,一旦信道进入confirm模式,所有在该信道上面发布的消息都会被指派一个唯一的ID(从1开始),一旦消息被投递到所有匹配的队列之后,broker就会发送一个确认给生产者 (包含消息的唯一ID) ,这就使得生产者知道消息已经正确到达目的队列了,如果消息和队列是可持久化的,那么确认消息会将消息写入磁盘之后发出,broker回传给生产者的确认消息中deliver-tag 域包含了确认消息的序列号,此外broker也可以设置basic.ack的multiple 域,表示到这个序列号之前的所有消息都已经得到了处理。
开启confirm模式,
- // 创建连接
- ConnectionFactory factory = new ConnectionFactory();
- factory.setUsername(config.UserName);
- factory.setPassword(config.Password);
- factory.setVirtualHost(config.VHost);
- factory.setHost(config.Host);
- factory.setPort(config.Port);
- Connection conn = factory.newConnection();
- // 创建信道
- Channel channel = conn.createChannel();
- // 声明队列
- channel.queueDeclare(config.QueueName, false, false, false, null);
- // 开启发送方确认模式
- channel.confirmSelect();
1)Confirm 普通模式(单条)
每发送一条消息,调用 channel.waitForConfirms() 方法等待服务端confirm,这实际上是一种串行的confirm,每publish一条消息之后就等待服务端confirm,如果服务端返回false或者超时时间内未返回,客户端进行消息重传。
- // 开启发送方确认模式
- channel.confirmSelect();
- String message = String.format("时间 => %s", new Date().getTime());
- channel.basicPublish("", config.QueueName, null, message.getBytes("UTF-8"));
- if (channel.waitForConfirms()) { // confirm 普通单条
- System.out.println("消息发送成功" );
- }
2)Confirm 批量模式(批量)
每发送一批消息之后,调用 channel.waitForConfirmsOrDie() 方法,等待服务端confirm,这种批量确认的模式极大的提高了confirm效率,但是如果一旦出现confirm返回false或者超时的情况,客户端需要将这一批次的消息全部重发,这会带来明显的重复消息,如果这种情况频繁发生的话,效率也会不升反降。
- // 开启发送方确认模式
- channel.confirmSelect();
- for (int i = 0; i < 10; i++) {
- String message = String.format("时间 => %s", new Date().getTime());
- channel.basicPublish("", config.QueueName, null, message.getBytes("UTF-8"));
- }
- channel.waitForConfirmsOrDie(); //直到所有信息都发布,只要有一个未确认就会IOException
- System.out.println("全部执行完成");
3)Confirm 异步模式(异步)
RabbitMQ 使用 channel.addConfirmListener()异步监听,异步模式的优点,就是执行效率高,不需要等待消息执行完,只需要监听消息即可。消息确认有可能是批量确认的,是否批量确认在于返回的multiple的参数,此参数为bool值,如果true表示批量执行了deliveryTag这个值以前的所有消息,如果为false的话表示单条确认。
- // 开启发送方确认模式
- channel.confirmSelect();
- for (int i = 0; i < 10; i++) {
- String message = String.format("时间 => %s", new Date().getTime());
- channel.basicPublish("", config.QueueName, null, message.getBytes("UTF-8"));
- }
- //异步监听确认和未确认的消息
- channel.addConfirmListener(new ConfirmListener() {
- @Override
- public void handleNack(long deliveryTag, boolean multiple) throws IOException {
- System.out.println("未确认消息,标识:" + deliveryTag);
- }
- @Override
- public void handleAck(long deliveryTag, boolean multiple) throws IOException {
- System.out.println(String.format("已确认消息,标识:%d,多个消息:%b", deliveryTag, multiple));
- }
- });
有测试总结:Confirm批量确定和Confirm异步模式性能相差不大,Confirm模式要比事务快10倍左右。
“死信”是RabbitMQ中的一种消息机制。死信队列(死信交换机),又称 dead-letter-exchange(DLX)。当一条消息在一个队列中变成死信后,它会被重新发布到一个交换机中,这个交换机就是 DLX。
队列中的消息在以下三种情况下会变成死信,
消息进入死信队列的过程:消息 -> 队列 (触发以上条件)-> DLX交换机 -> DLK队列。
RabbitMQ 中存在两种方可设置消息的过期时间,
如果同时使用这两种方法,那么以过期时间小的那个数值为准。当消息达到过期时间还没有被消费,那么那个消息就成为了一个死信消息。
RabbitMQ 中不存在延时队列,但是我们可以通过设置消息的过期时间和死信队列来模拟出延时队列。消费者监听死信交换器绑定的队列,而不要监听消息发送的队列。
1)场景
订单下单10秒后,若用户没有付款,则系统自动取消订单。
2)分析
以上适合使用延时队列解决,RabbitMQ 的延时队列可以由 过期消息+死信队列 来实现。
不使用传统的轮询方式,优势:若数据库数据量大,则定时轮询就会特别消耗资源,拖垮服务器,且响应慢。
3)实现
大致流程,
有的业务场景需要我们,保证RabbitMQ消息的顺序性。当 RabbitMQ 中一个queue对应多个consumer的时候,无法保证消息消费的顺序性。解决方案:
1)原queue拆分成多个queue,一个queue对应一个consumer,就是多一些queue而已,确实是麻烦点;
2)或者就一个queue但是对应一个consumer,然后这个consumer内部用内存队列做排队,然后分发给底层不同的worker来处理;
其他:针对这个问题,通过某种算法,将需要保持先后顺序的消息放到同一个消息队列中。然后只用一个消费者去消费该队列。同一个queue里的消息一定是顺序消息的。我的观点是保证入队有序就行,出队以后的顺序交给消费者自己去保证,没有固定套路。例如B消息的业务应该保证在A消息后业务后执行,那么我们保证A消息先进queueA,B消息后进queueB就可以了。
如何保证消息不被重复消费?保证消息不被重复消费的关键是保证消息队列的幂等性,这个问题针对业务场景来答分以下几点:
1)如果你拿到这个消息做数据库的insert操作。那就容易了,给这个消息做一个唯一主键,那么就算出现重复消费的情况,就会导致主键冲突,避免数据库出现脏数据。
2)如果你拿到这个消息做redis的set的操作,那就容易了,不用解决,因为你无论set几次结果都是一样的,set操作本来就算幂等操作。
3)如果上面两种情况还不行,上大招。准备一个第三方介质,来做消费记录。以redis为例,给消息分配一个全局id,只要消费过该消息,将<id,message>以K-V形式写入redis。那消费者开始消费前,先去redis中查询有没消费记录即可。
综上,消息队列出现消息重复的原因有多种,消息队列并不能保证消息的唯一,所以我们只能在业务层面上做这些控制。
总之解决的方案很多,要看具体业务场景。
如何解决丢数据的问题?
1)生产者丢数据
生产者的消息没有投递到MQ中怎么办?从生产者弄丢数据这个角度来看,RabbitMQ提供transaction和confirm模式来确保生产者不丢消息。
transaction机制就是说,发送消息前,开启事物(channel.txSelect()),然后发送消息,如果发送过程中出现什么异常,事物就会回滚(channel.txRollback()),如果发送成功则提交事物(channel.txCommit())。
然而缺点就是吞吐量下降了。因此,按照博主的经验,生产上用confirm模式的居多。一旦channel进入confirm模式,所有在该信道上面发布的消息都将会被指派一个唯一的ID(从1开始),一旦消息被投递到所有匹配的队列之后,rabbitMQ就会发送一个Ack给生产者(包含消息的唯一ID),这就使得生产者知道消息已经正确到达目的队列了.如果rabiitMQ没能处理该消息,则会发送一个Nack消息给你,你可以进行重试操作。
2)消息队列丢数据
处理消息队列丢数据的情况,一般是开启消息持久化。这个持久化配置可以和confirm机制配合使用,你可以在消息持久化磁盘后,再给生产者发送一个Ack信号。这样,如果消息持久化磁盘之前,rabbitMQ阵亡了,那么生产者收不到Ack信号,生产者会自动重发。
那么如何持久化呢,这里顺便说一下吧,其实也很容易,就下面两步:
这样设置以后,rabbitMQ就算挂了,重启后也能恢复数据。在消息还没有持久化到硬盘时,可能服务已经死掉,这种情况可以通过引入mirrored-queue即镜像队列,但也不能保证消息百分百不丢失(整个集群都挂掉)
3)消费者丢数据
启用手动确认模式可以解决这个问题,
- 自动确认模式,消费者挂掉,待ack的消息回归到队列中。消费者抛出异常,消息会不断的被重发,直到处理成功。不会丢失消息,即便服务挂掉,没有处理完成的消息会重回队列,但是异常会让消息不断重试。
- 手动确认模式,如果消费者来不及处理就死掉时,没有响应ack时会重复发送一条信息给其他消费者;如果监听程序处理异常了,且未对异常进行捕获,会一直重复接收消息,然后一直抛异常;如果对异常进行了捕获,但是没有在finally里ack,也会一直重复发送消息(重试机制)。
- 不确认模式,acknowledge="none" 不使用确认机制,只要消息发送完成会立即在队列移除,无论客户端异常还是断开,只要发送完就移除,不会重发。
1、什么是 rabbitmq
采用 AMQP 高级消息队列协议的一种消息队列技术,最大的特点就是消费并不需要确保提供方存在,实现了服务之间的高度解耦
2、为什么要使用 rabbitmq
3、使用 rabbitmq 的场景
4、如何确保消息正确地发送至 RabbitMQ? 如何确保消息接收方消费了消息?
1)发送方确认模式
将信道设置成 confirm 模式(发送方确认模式),则所有在信道上发布的消息都会被指派一个唯一的 ID。
一旦消息被投递到目的队列后,或者消息被写入磁盘后(可持久化的消息),信道会发送一个确认给生产者(包含消息唯一 ID)。
如果 RabbitMQ 发生内部错误从而导致消息丢失,会发送一条 nack(notacknowledged,未确认)消息。
发送方确认模式是异步的,生产者应用程序在等待确认的同时,可以继续发送消息。当确认消息到达生产者应用程序,生产者应用程序的回调方法就会被触发来处理确认消息。
2)接收方确认机制
消费者接收每一条消息后都必须进行确认(消息接收和消息确认是两个不同操作)。只有消费者确认了消息,RabbitMQ 才能安全地把消息从队列中删除。
这里并没有用到超时机制,RabbitMQ 仅通过 Consumer 的连接中断来确认是否需要重新发送消息。也就是说,只要连接不中断,RabbitMQ 给了 Consumer 足够长的时间来处理消息。保证数据的最终一致性;
下面罗列几种特殊情况:
(1)如果消费者接收到消息,在确认之前断开了连接或取消订阅,RabbitMQ 会认为消息没有被分发,然后重新分发给下一个订阅的消费者。(可能存在消息重复消费的隐患,需要去重)
(1)2如果消费者接收到消息却没有确认消息,连接也未断开,则 RabbitMQ 认为该消费者繁忙,将不会给该消费者分发更多的消息。
5、如何避免消息重复投递或重复消费?
在消息生产时,MQ 内部针对每条生产者发送的消息生成一个 inner-msg-id,作为去重的依据(消息投递失败并重传),避免重复的消息进入队列;在消息消费时,要求消息体中必须要有一个 bizId(对于同一业务全局唯一,如支付 ID、订单 ID、帖子 ID 等)作为去重的依据,避免同一条消息被重复消费。
6、消息基于什么传输?
由于 TCP 连接的创建和销毁开销较大,且并发数受系统资源限制,会造成性能瓶颈。RabbitMQ 使用信道的方式来传输数据。信道是建立在真实的 TCP 连接内的虚拟连接,且每条 TCP 连接上的信道数量没有限制。
7、消息如何分发?
若该队列至少有一个消费者订阅,消息将以循环(round-robin)的方式发送给消费者。每条消息只会分发给一个订阅的消费者(前提是消费者能够正常处理消息并进行确认)。通过路由可实现多消费的功能。
8、消息怎么路由?
消息提供方->路由->一至多个队列消息发布到交换器时,消息将拥有一个路由键(routing key),在消息创建时设定。通过队列路由键,可以把队列绑定到交换器上。消息到达交换器后,RabbitMQ 会将消息的路由键与队列的路由键进行匹配(针对不同的交换器有不同的路由规则)。
常用的交换器主要分为一下三种:
9、如何确保消息不丢失?
消息持久化,当然前提是队列必须持久化。
RabbitMQ 确保持久性消息能从服务器重启中恢复的方式是,将它们写入磁盘上的一个持久化日志文件,当发布一条持久性消息到持久交换器上时,Rabbit 会在消息提交到日志文件后才发送响应。一旦消费者从持久队列中消费了一条持久化消息,RabbitMQ 会在持久化日志中把这条消息标记为等待垃圾收集。如果持久化消息在被消费之前 RabbitMQ 重启,那么 Rabbit 会自动重建交换器和队列(以及绑定),并重新发布持久化日志文件中的消息到合适的队列。
10、RabbitMQ 有几种广播类型?
11、RabbitMQ 持久化有什么缺点?
持久化的缺地就是降低了服务器的吞吐量,因为使用的是磁盘而非内存存储,从而降低了吞吐量。可尽量使用 ssd 硬盘来缓解吞吐量的问题。
12、RabbitMQ 节点的类型有哪些?
磁盘节点:消息会存储到磁盘。
内存节点:消息都存储在内存中,重启服务器消息丢失,性能高于磁盘类型。
13、RabbitMQ 集群中唯一一个磁盘节点崩溃了会发生什么情况?
如果唯一磁盘的磁盘节点崩溃了,不能进行以下操作:
唯一磁盘节点崩溃了,集群是可以保持运行的,但你不能更改任何东西。
14、RabbitMQ 怎么保证消息的稳定性?
提供了事务的功能和confirm(确认)模式,推荐confirm模式。
15、要保证消息持久化成功的条件有哪些?
以上四个条件都满足才能保证消息持久化成功。
16、RabbitMQ 每个节点是其他节点的完整拷贝吗?为什么?
存储空间的考虑:如果每个节点都拥有所有队列的完全拷贝,这样新增节点不但没有新增存储空间,反而增加了更多的冗余数据;
性能的考虑:如果每条消息都需要完整拷贝到每一个集群节点,那新增节点并没有提升处理消息的能力,最多是保持和单节点相同的性能甚至是更糟。
17、RabbitMQ 中 vhost 的作用是什么?
vhost:每个 RabbitMQ 都能创建很多 vhost,我们称之为虚拟主机,每个虚拟主机其实都是 mini 版的RabbitMQ,它拥有自己的队列,交换器和绑定,拥有自己的权限机制。
18、RabbitMQ 集群搭建需要注意哪些问题?
各节点之间使用“–link”连接,此属性不能忽略。
各节点使用的 erlang cookie 值必须相同,此值相当于“秘钥”的功能,用于各节点的认证。
整个集群中必须包含一个磁盘节点。
19、RabbitMQ 对集群节点停止顺序有要求吗?
RabbitMQ 对集群的停止的顺序是有要求的,应该先关闭内存节点,最后再关闭磁盘节点。如果顺序恰好相反的话,可能会造成消息的丢失。
20、RabbitMQ 的消息是怎么发送的?
首先客户端必须连接到 RabbitMQ 服务器才能发布和消费消息,客户端和 rabbit server 之间会创建一个 tcp 连接,一旦 tcp 打开并通过了认证(认证就是你发送给 rabbit 服务器的用户名和密码),你的客户端和 RabbitMQ 就创建了一条 amqp 信道(channel),信道是创建在“真实” tcp 上的虚拟连接,amqp 命令都是通过信道发送出去的,每个信道都会有一个唯一的 id,不论是发布消息,订阅队列都是通过这个信道完成的。
21、RabbitMQ 怎么避免消息丢失?
把消息持久化磁盘,保证服务器重启消息不丢失。 每个集群中至少有一个物理磁盘,保证消息落入磁盘。
22、RabbitMQ 有哪些重要的组件?
23、如何保证高可用的?RabbitMQ 的集群
1)主备模式
也称为 Warren (兔子窝) 模式。实现 rabbitMQ 的高可用集群,一般在并发和数据量不高的情况下,这种模式非常的好用且简单。也就是一个主/备方案,主节点提供读写,备用节点不提供读写。如果主节点挂了,就切换到备用节点,原来的备用节点升级为主节点提供读写服务,当原来的主节点恢复运行后,原来的主节点就变成备用节点,和 activeMQ 利用 zookeeper 做主/备一样,也可以一主多备。
2)远程模式
远程模式可以实现双活的一种模式,简称 shovel 模式,所谓的 shovel 就是把消息进行不同数据中心的复制工作,可以跨地域的让两个 MQ 集群互联,远距离通信和复制。 Shovel 就是我们可以把消息进行数据中心的复制工作,我们可以跨地域的让两个 MQ 集群互联。
3)镜像模式
非常经典的 mirror 镜像模式,保证 100% 数据不丢失。在实际工作中也是用得最多的,并且实现非常的简单,一般互联网大厂都会构建这种镜像集群模式。mirror 镜像队列,目的是为了保证 rabbitMQ 数据的高可靠性解决方案,主要就是实现数据的同步,一般来讲是 2 - 3 个节点实现数据同步。对于 100% 数据可靠性解决方案,一般是采用 3 个节点。
4)多活模式
也是实现异地数据复制的主流模式,因为 shovel 模式配置比较复杂,所以一般来说,实现异地集群的都是采用这种双活 或者 多活模型来实现的。这种模式需要依赖 rabbitMQ 的 federation 插件,可以实现持续的,可靠的 AMQP 数据通信,多活模式在实际配置与应用非常的简单。rabbitMQ 部署架构采用双中心模式(多中心),那么在两套(或多套)数据中心各部署一套 rabbitMQ 集群,各中心的rabbitMQ 服务除了需要为业务提供正常的消息服务外,中心之间还需要实现部分队列消息共享。
24、如何解决消息队列的延时以及过期失效问题?消息队列满了以后该怎么处理?有几百万消息持续积压几小时,说说怎么解决?
消息积压处理办法:临时紧急扩容:
先修复 consumer 的问题,确保其恢复消费速度,然后将现有 cnosumer 都停掉。
新建一个 topic,partition 是原来的 10 倍,临时建立好原先 10 倍的 queue 数量。
然后写一个临时的分发数据的 consumer 程序,这个程序部署上去消费积压的数据,消费之后不做耗时的处理,直接均匀轮询写入临时建立好的 10 倍数量的 queue。
接着临时征用 10 倍的机器来部署 consumer,每一批 consumer 消费一个临时 queue 的数据。这种做法相当于是临时将 queue 资源和 consumer 资源扩大 10 倍,以正常的 10 倍速度来消费数据。
等快速消费完积压数据之后,得恢复原先部署的架构,重新用原先的 consumer 机器来消费消息。
MQ中消息失效:假设你用的是 RabbitMQ,RabbtiMQ 是可以设置过期时间的,也就是 TTL。如果消息在 queue 中积压超过一定的时间就会被 RabbitMQ 给清理掉,这个数据就没了。那这就是第二个坑了。这就不是说数据会大量积压在 mq 里,而是大量的数据会直接搞丢。我们可以采取一个方案,就是批量重导,这个我们之前线上也有类似的场景干过。就是大量积压的时候,我们当时就直接丢弃数据了,然后等过了高峰期以后,比如大家一起喝咖啡熬夜到晚上12点以后,用户都睡觉了。这个时候我们就开始写程序,将丢失的那批数据,写个临时程序,一点一点的查出来,然后重新灌入 mq 里面去,把白天丢的数据给他补回来。也只能是这样了。假设 1 万个订单积压在 mq 里面,没有处理,其中 1000 个订单都丢了,你只能手动写程序把那 1000 个订单给查出来,手动发到 mq 里去再补一次。mq消息队列块满了:如果消息积压在 mq 里,你很长时间都没有处理掉,此时导致 mq 都快写满了,咋办?这个还有别的办法吗?没有,谁让你第一个方案执行的太慢了,你临时写程序,接入数据来消费,消费一个丢弃一个,都不要了,快速消费掉所有的消息。然后走第二个方案,到了晚上再补数据吧。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。