赞
踩
RabbitMQ 是一个开源的消息代理和队列服务器,用于通过异步消息传递方式在分布式系统或服务之间进行通信。RabbitMQ 实现了高级消息队列协议(AMQP),同时也支持其他消息协议,如 MQTT 和 STOMP。它由 Erlang 语言编写,因此继承了 Erlang 所具有的高并发、高可用和容错特性。
RabbitMQ 的基本架构包含了以下几个主要组件:
生产者(Producer):
生产者是发送消息的应用程序。它创建消息,并可以将消息发送到交换器。
消费者(Consumer):
消费者是接收消息的应用程序。它从队列中提取消息并进行处理。
交换器(Exchange):
交换器负责接收生产者发送的消息,并根据路由规则将消息路由到一个或多个队列。RabbitMQ 支持几种不同类型的交换器:
队列(Queue):
队列是消息最终被消费者接收的地方,它是存放消息的缓冲区。生产者和消费者通常都不直接交互,而是通过队列进行通信。
绑定(Binding):
绑定是交换器和队列之间的链接。它告诉交换器消息应该传递到哪些队列。
虚拟主机(Virtual Host):
虚拟主机提供了一组独立的环境,每个环境都有自己的交换器、队列和绑定。它们用于逻辑上区分和隔离不同应用程序的消息环境。
连接(Connection):
连接是一个 TCP 连接,生产者和消费者通过此连接到 RabbitMQ 服务器。在一个连接内部,可以创建多个通道。
通道(Channel):
通道是在连接内部建立的多路传输会话。每个通道代表一个会话任务。
RabbitMQ 服务器(RabbitMQ broker)通常指的是运行RabbitMQ软件的物理服务器或集群。在分布式模式下,RabbitMQ 可以配置为集群模式,以实现负载均衡和高可用性。
RabbitMQ 是一个强大的中间件,它使得应用程序和服务能够通过消息队列以松耦合的形式交换数据,支持多种消息模式,如工作队列、发布订阅、路由和主题消息等。这为跨多个服务和应用程序的可靠消息传递提供了基础设施。
RabbitMQ支持AMQP协议,也支持MQTT、STOMP等其他协议。
在分布式系统下具备异步,削峰,负载均衡等一系列高级功能;
拥有持久化的机制,进程消息,队列中的信息也可以保存下来。
实现消费者和生产者之间的解耦。
对于高并发场景下,利用消息队列可以使得同步访问变为串行访问达到一定量的限流,利于数据库的操作。
可以使用消息队列达到异步下单的效果,排队中,后台进行逻辑下单。
1. 异步处理
场景说明:用户注册后,需要发注册邮件和注册短信。传统的做法有两种1.串行的方式;2.并行方式。
(1)串行方式:将注册信息写入数据库成功后,发送注册邮件,再发送注册短信。以上三个任务全部完成后,返回给客户端。
(2)并行方式:将注册信息写入数据库成功后,发送注册邮件的同时,发送注册短信。以上三个任务完成后,返回给客户端。与串行的差别是,并行的方式可以提高处理的时间。
假设三个业务节点每个使用50毫秒钟,不考虑网络等其他开销,则串行方式的时间是150毫秒,并行的时间可能是100毫秒。
因为CPU在单位时间内处理的请求数是一定的,假设CPU1秒内吞吐量是100次。则串行方式1秒内CPU可处理的请求量是7次(1000/150)。并行方式处理的请求量是10次(1000/100)。
小结:如以上案例描述,传统的方式系统的性能(并发量,吞吐量,响应时间)会有瓶颈。如何解决这个问题呢?
引入消息队列,将不是必须的业务逻辑,异步处理。改造后的架构如下:
按照以上约定,用户的响应时间相当于是注册信息写入数据库的时间,也就是50毫秒。注册邮件,发送短信写入消息队列后,直接返回,因此写入消息队列的速度很快,基本可以忽略,因此用户的响应时间可能是50毫秒。因此架构改变后,系统的吞吐量提高到每秒20 QPS。比串行提高了3倍,比并行提高了两倍。
2. 应用解耦
场景说明:用户下单后,订单系统需要通知库存系统。传统的做法是,订单系统调用库存系统的接口。
传统模式的缺点:
1) 假如库存系统无法访问,则订单减库存将失败,从而导致订单失败;
2) 订单系统与库存系统耦合;
如何解决以上问题呢?引入应用消息队列后的方案,如下图:
订单系统:用户下单后,订单系统完成持久化处理,将消息写入消息队列,返回用户订单下单成功。
库存系统:订阅下单的消息,采用拉/推的方式,获取下单信息,库存系统根据下单信息,进行库存操作。
假如:在下单时库存系统不能正常使用。也不影响正常下单,因为下单后,订单系统写入消息队列就不再关心其他的后续操作了。实现订单系统与库存系统的应用解耦。
3. 请求削峰
流量削锋也是消息队列中的常用场景,一般在秒杀或团抢活动中使用广泛。
应用场景:秒杀活动,一般会因为流量过大,导致流量暴增,应用挂掉。为解决这个问题,一般需要在应用前端加入消息队列。
可以控制活动的人数;
可以缓解短时间内高流量压垮应用;
用户的请求,服务器接收后,首先写入消息队列。假如消息队列长度超过最大数量,则直接抛弃用户请求或跳转到错误页面;
秒杀业务根据消息队列中的请求信息,再做后续处理。
4. 日志处理
日志处理是指将消息队列用在日志处理中,比如Kafka的应用,解决大量日志传输的问题。架构简化如下:
日志采集客户端,负责日志数据采集,定时写受写入Kafka队列;
Kafka消息队列,负责日志数据的接收,存储和转发;
日志处理应用:订阅并消费kafka队列中的日志数据;
(1)Kafka:接收用户日志的消息队列。
(2)Logstash:做日志解析,统一成JSON输出给Elasticsearch。
(3)Elasticsearch:实时日志分析服务的核心技术,一个schemaless,实时的数据存储服务,通过index组织数据,兼具强大的搜索和统计功能。
(4)Kibana:基于Elasticsearch的数据可视化组件,超强的数据可视化能力是众多公司选择ELK stack的重要原因。
5. 消息通讯
消息通讯是指,消息队列一般都内置了高效的通信机制,因此也可以用在纯的消息通讯。比如实现点对点消息队列,或者聊天室等。
点对点通讯:
客户端A和客户端B使用同一队列,进行消息通讯。
聊天室通讯:
客户端A,客户端B,客户端N订阅同一主题,进行消息发布和接收。实现类似聊天室效果。
解耦:A 系统发送数据到 BCD 三个系统,通过接口调用发送。如果 E 系统也要这个数据呢?那如果 C 系统现在不需要了呢?A 系统负责人几乎崩溃…A 系统跟其它各种乱七八糟的系统严重耦合, A 系统产生一条比较关键的数据,很多系统都需要 A 系统将这个数据发送过来。如果使用 MQ,A系统产生一条数据,发送到 MQ 里面去,哪个系统需要数据自己去 MQ 里面消费。如果新系统需要数据,直接从 MQ 里消费即可;如果某个系统不需要这条数据了,就取消对 MQ 消息的消费即可。这样下来,A 系统压根儿不需要去考虑要给谁发送数据,不需要维护这个代码,也不需要考虑人家是否调用成功、失败超时等情况。
就是一个系统或者一个模块,调用了多个系统或者模块,互相之间的调用很复杂,维护起来很麻烦。但是其实这个调用是不需要直接同步调用接口的,如果用 MQ 给它异步化解耦。
异步:A 系统接收一个请求,需要在自己本地写库,还需要在 BCD 三个系统写库,自己本地写库要 3ms,BCD 三个系统分别写库要 300ms、450ms、200ms。最终请求总延时是 3 + 300 + 450 + 200 = 953ms,接近 1s,用户感觉搞个什么东西,慢死了慢死了。用户通过浏览器发起请求。如果使用 MQ,那么 A 系统连续发送 3 条消息到 MQ 队列中,假如耗时 5ms,A 系统从接受一个请求到返回响应给用户,总时长是 3 + 5 = 8ms。
削峰:减少高峰时期对服务器压力。
系统可用性降低
本来系统运行好好的,现在你非要加入个消息队列进去,那消息队列挂了,你的系统不是呵呵了。因此,系统可用性会降低;
系统复杂度提高
加入了消息队列,要多考虑很多方面的问题,比如:一致性问题、如何保证消息不被重复消费、如何保证消息可靠性传输等。因此,需要考虑的东西更多,复杂性增大。
一致性问题
A 系统处理完了直接返回成功了,人都以为你这个请求就成功了;但是问题是,要是 BCD 三个系统那里,BD 两个系统写库成功了,结果 C 系统写库失败了,咋整?你这数据就不一致了。
所以消息队列实际是一种非常复杂的架构,你引入它有很多好处,但是也得针对它带来的坏处做各种额外的技术方案和架构来规避掉,做好之后,你会发现,妈呀,系统复杂度提升了一个数量级,也许是复杂了 10 倍。但是关键时刻,用还是得用的。
综上,各种对比之后,有如下建议:
1. 简单模式(Simple Queue)
作用: 生产者将消息发送到队列,消费者从队列中获取消息。每条消息只能被一个消费者消费。
适用场景: 适用于消息量较小且不需要复杂路由逻辑的场景。
2. 工作队列模式(Work Queue)
作用: 通过多个消费者分担任务,达到负载均衡的目的。消息在多个消费者之间分配,每条消息只能被一个消费者处理。
适用场景: 适用于需要并行处理大量独立任务的场景,如图像处理、视频转码等。
3. 发布/订阅模式(publish/subscribe)
作用: 消息发布到交换机,所有绑定到该交换机的队列都会收到消息。常用的交换机类型是fanout。
适用场景: 适用于广播消息的场景,如日志处理、事件通知等。
4. 路由模式(routing)
作用: 消息发布到交换机,并根据路由键将消息发送到相应的队列。常用的交换机类型是direct。
适用场景: 适用于需要对消息分类处理的场景,如日志系统中按严重级别分类的日志处理等。
5. 主题模式(topics)
作用: 消息发布到交换机,并根据路由键(通配符匹配)将消息发送到相应的队列。常用的交换机类型是topic。
适用场景: 适用于需要根据复杂的路由规则进行分类处理的场景,如日志系统中按模块和严重级别分类的日志处理等。
6. RPC模式
作用: 实现远程过程调用,客户端发送请求消息到队列,服务器端处理后返回响应消息。
适用场景: 适用于需要实现远程服务调用的场景,如微服务之间的通信。
7. 发布确认模式
作用: 用于确保消息已经成功地发布并被broker接收。它提供了一种轻量级的方法来确认消息的持久性和可靠性,适用于需要高可靠性消息传递的场景。
适用场景: 金融交易、订单处理等确保消息被可靠地传递和处理,防止丢失的场景。
在RabbitMQ中,消息的路由是通过交换机(Exchange)来实现的。交换机根据不同的路由规则将消息分发到一个或者多个队列。
消息路由的基本流程:
路由键(Routing Key) 可以是字符串,也可以根据交换机的类型以及绑定的规则进行匹配。
RabbitMQ中,有四种交换机类型,分别是直连交换机(Direct Exchange)、扇出交换机(Fanout Exchange)、主题交换机(Topic Exchange)和头交换机(Header Exchange)。
如果没有消息匹配的队列的话,消息会被丢弃或者返回给生产者,根据生产者的配置决定消息的去留。
生产者发送消息时需要指定exchange和route_key ,根据AMQP协议格式构建其他内容,接着序列化为二进制格式发送给Broker,由Broker的exchange根据路由规则将数据分发到不同的Queue中。
RabbitMQ在网络层有Connection和Channel两个概念。
Connection其实对应的就是一条TCP连接,而Channel是Connection中的虚拟链接,一个Connection可以创建多个虚拟连接。
在RabbitMQ中,客户端和Broker之间的通信都是基于Channel维度的,这样可以减少实际的TCP连接数,节省系统资源的使用。
RabbitMQ如果要保证消息的顺序性,首先生产者需要按序发送,确认上一条消息发送成功后再发送下一条,然后需要使得这类消息仅发送到一个队列中,并且只有一一个消费者消费这个队列,且消费者必须要消费完一条消息后,再消费另一条消息,不能是多线程并发消费,这样才能保证消息的顺序性。
如果要全局消息顺序,那么就是全局只使用一个队列,但是一般业务场景不会要求全局顺序,仅要求部分顺序,例如按照订单或者用户维度顺序即可,那么就可以利用用户ID或者订单ID对消息进行分区,即相同的用户(订单)的消息发往一个队列,保证局部顺序可提高并发度。
如何实现幂等操作呢?
消费者在消费消息的时候,添加分布式锁(保证互斥的都行),接着从数据库流水表中查看是否已经有这个唯一ID的消费记录。
如果有的话说明已经被消费过了,即跳过后续的业务执行即可。
如果没有的话说明是第一次消费这条消息,就执行业务逻辑,并将其写入数据库流水表中(和业务处理在同一个事务下,这样保证业务执行成功则流水表一定被插入),然后再释放锁。
这样就能保证消息不会被重复消费!
当然加锁不是必须的,通过数据库唯一索引也可以避免重复数的产生。只不过加锁可以解决高并发下的资源消耗问题(如果没有锁,每个线程都能执行完业务,然后等最后事务提交那一刻由数据库拦截,最后只有一个事务成功提交,其他都需要回滚,这挺费时间和浪费资源)
发送方确认模式:
接收方确认机制:
几种特殊情况:
消息不可靠的情况可能是消息丢失,劫持等原因;
丢失又分为:生产者丢失消息、消息列表丢失消息、消费者丢失消息;
1. 生产者丢失消息
从生产者弄丢数据这个角度来看,RabbitMQ提供transaction和confirm模式来确保生产者不丢消息。
2. 消息队列丢数据
处理消息队列丢数据的情况,一般是开启持久化磁盘的配置。这个持久化配置可以和confirm机制配合使用,你可以在消息持久化磁盘后,再给生产者发送一个Ack信号。这样,如果消息持久化磁盘之前,RabbitMQ阵亡了,那么生产者收不到Ack信号,生产者会自动重发。
那么如何持久化呢?这里顺便说一下吧,其实也很容易,就下面两步
将queue的持久化标识durable设置为true,则代表是一个持久的队列,发送消息的时候将deliveryMode=2,这样设置以后,即使RabbitMQ挂了,重启后也能恢复数据。
3. 消费者丢失消息
消费者丢数据一般是因为采用了自动确认消息模式,改为手动确认消息即可!
消费者在收到消息之后,处理消息之前,会自动回复RabbitMQ已收到消息;如果这时处理消息失败,就会丢失该消息;
解决方案:处理消息成功后,手动回复确认消息。
RabbitMQ的持久化分为队列/交换机的持久化与消息的持久化两种:
RabbitMQ支持消息的优先级处理。可以通过设置消息的优先级字段来实现,优先级高的消息会优先被消费者消费。此外,还可以通过使用优先级队列来实现更细粒度的优先级控制。
RabbitMQ通过插件支持优先级队列的实现。优先级队列是指根据消息的优先级进行排序和处理的队列。要实现优先级队列,可以使用RabbitMQ的插件“rabbitmq_priority_queue”,它提供了一个优先级队列交换机(Priority Queue Exchange),可以将具有不同优先级的消息路由到不同的队列中。
RabbitMQ通过提供集群功能来处理高并发场景下的性能问题。通过将多个节点组成一个集群,可以实现负载均衡和分发,提高系统的整体处理能力。同时,还可以通过调整各种参数来优化性能,如调整消息的持久化设置、调整交换机和队列的匹配规则等。
RabbitMQ通过插件支持延迟队列的实现。延迟队列是指将消息放入队列中等待一段时间后再进行处理。要实现延迟队列,可以使用RabbitMQ的插件“rabbitmq_delayed_message_exchange”,它提供了一个延迟交换器(Delayed Message Exchange),可以将消息路由到指定的队列中等待指定的延迟时间。
主要有以下三种情况会导致消息进入死信交换机:
死信,顾名思义就是无法被消费的消息,一般来说 Producer 将消息投递到 broker 或者直接丢到 queue 中,Consumer 从 Queue 中取出消息进行消费,但是某些时候由于特定的原因导致 Queue 中的某些消息无法被消费,这样的消息如果没有后续的处理就变成了死信,有死信自然就有了死信队列。
死信队列有其特殊的应用场景,例如用户在商城下单成功并点击去支付的时候,如果在指定的时间内未支付,那么就可以将该下单消息投递到死信队列中,至于后续怎么处理死信队列需要结合具体的应用场景。
不可以, 持久化的操作是将数据写入磁盘中,效率上肯定比写入内存中要慢很多倍。而我们一般用mq会处理很多业务消息,若是所有消息都持久化,压力无疑是巨大的。所以持久化策略需要综合考虑,以及可能遇到的问题和解决方案,或者我们可以让一些必要数据持久化。
1、概述
在使用RabbitMQ的时候,我们可以通过消息持久化操作来解决因为服务器的异常奔溃导致的消息丢失,除此之外我们还会遇到一个问题,当消息的发布者在将消息发送出去之后,消息到底有没有正确到达broker代理服务器呢?如果不进行特殊配置的话,默认情况下发布操作是不会返回任何信息给生产者的,也就是默认情况下我们的生产者是不知道消息有没有正确到达broker的,如果在消息到达broker之前已经丢失的话,持久化操作也解决不了这个问题,因为消息根本就没到达代理服务器,你怎么进行持久化,那么这个问题该怎么解决呢?
RabbitMQ为我们提供了两种方式:
想要保证发送者一定能把消息发送给RabbitMQ,一种是通过Confirm机制,另一种就是通过事务机制。
RabbitMQ的事务机制,允许生产者将一组操作打包成一个原子事务单元,要么全部执行成功,要么全部失败。事务提供了一种确保消息完整性的方法,但需要谨慎使用,因为他们对性能有一定的影响。
RabbitMQ是基于AMQP协议实现的,RabbitMQ中,事务是通过在通道(Channel)上启用的,与事务机制有关的方法有三个:
我们需要先通过txSelect开启事务,然后就可以发布消息给MQ了,如果txCommit提交成功了,则消息一定到达了RabbitMQ,如果在txCommit执行之前RabbitMQ实例异常崩溃或者抛出异常,那我们就可以捕获这个异常然后执行txRollback进行回滚事务。
所以,通过事务机制,我们也能保证消息一定可以发送给RabbitMQ。
以下,是一个通过事务发送消息的方法实例:
package com.example.demo.rabbitmq; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; public class RabbitMQTransactionExample { public static void main(String[] args) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) { //启用事务 channel.txSelect(); String exchangeName = "my_exchange"; String routingKey = "my_routing_key"; try { //发送第一条消息 String message1 = "Transaction Message 1"; channel.basicPublish(exchangeName, routingKey, null, message1.getBytes()); //发送第二条消息 String message2 = "Transaction Message 2"; channel.basicPublish(exchangeName, routingKey, null, message2.getBytes()); //模拟一个错误 int x = 1 / 0; //提交事务(如果没有发生错误) channel.txCommit(); System.out.println("Transaction committed."); } catch (Exception e) { //发生错误,回滚事务 channel.txRollback(); System.out.println("Transaction rolled back."); } } } }
1. 消息应答机制
消费者完成一个任务可能需要一段时间,如果其中一个消费者处理一个长的任务并仅只完成了部分突然它挂掉,由于RabbitMQ 一旦向消费者传递了一条消息,便立即将该消息标记为删除。在这种情况下,突然有个消费者挂掉了,我们将丢失正在处理的消息,以及后续发送给该消费者的消息也无法接收到。
为了保证消息在发送过程中不丢失,rabbitmq 引入消息应答机制。
消费者在接收到消息并且处理该消息之后,告诉 rabbitmq 它已经处理了,rabbitmq 可以把该消息删除掉了。
2. 自动应答
消息发送后立即被认为已经传送成功,这种模式需要在高吞吐量和数据传输安全性方面做权衡,因为这种模式如果消息在接收到之前,消费者出现连接或者 channel 关闭,那么消息就丢失了,当然另一方面这种模式消费者可以传递过载的消息,没有对传递的消息数量进行限制,当然这样有可能使得消费者由于接收太多还来不及处理的消息,导致这些消息的积压,最终使得内存耗尽,最终这些消费者线程被操作系统杀死,所以这种模式仅适用在消费者可以高效并以某种速率能够处理这些消息的情况下使用。
3. 手动消息应答
//用于肯定确认,RabbitMQ 已知道该消息并且成功的处理消息,可以将其丢弃了
channel.basicAck();
//用于否定确认
channel.basicNack();
//用于否定确认,不处理该消息了直接拒绝,可以将其丢弃了
channel.basicReject();
如果消费者由于某些原因失去连接(其通道已关闭,连接已关闭或 TCP 连接丢失),导致消息未发送 ACK 确认,RabbitMQ 将了解到消息未完全处理,并将对其重新排队。如果此时其他消费者可以处理,它将很快将其重新分发给另一个消费者。这样,即使某个消费者偶尔死亡,也可以确保不会丢失任何消息。
为了保证 RabbitMQ 中消息的可靠性投递,以及消息在发生特定异常时的补偿策略,RabbitMQ诞生了消息确认和返回机制。
消息确认机制(也叫应答机制)
消息确认机制,是保障消息与 RabbitMQ消息之间可靠传输消息一种保障机制
其主要内容就是用来监听RabbitMQ消息队列收到消息之后返回给生产端的ack确认消息
消息确认机制描述了一种消息是否已经被发送到 RabbitMQ消息队列中以及 RabbitMQ消息队列是否以及接收到生产端发送的消息。
消息确认机制的作用
监听生产者的消息是否已经发送到了 RabbitMQ消息队列中;
如果消息没有被发送到 RabbitMQ消息队列中,则消息确认机制不会给生产端返回任何确认应答,也就是没有发送成功。相反,如果消息被成功发送到了 RabbitMQ消息队列中,则消息确认机制会给生产端返回一个确认应答,以通知生产者,消息已经发送到了 RabbitMQ消息队列
消息返回机制
描述不可达的消息与生产者之间的一种保障策略
其主要是用来监听,RabbitMQ消息队列中是否存在不可达的消息,并根据监听结果返回给生产端的一种监听机制,消息返回机制描述了一种 RabbitMQ消息队列中的不可达消息与生产端的关系
什么是不可达的消息
消息在被成功发送到RabbitMQ消息队列中之后,如果消息在经过当前配置的 exchangeName 或 routingKey 没有找到指定的交换机,或没有匹配到对应的消息队列,那么这个消息就被称为不可达的消息,如果此时配置了消息返回机制,那么此时RabbitMQ消息队列会返回给生产者一个信号,信号中包括消息不可达的原因,以及消息本身的内容。
消息返回机制的作用
监听生产端发动到RabbitMQ消息队列中的消息是否可达。如果消息不可达,则返回一个信号通知生产端,相反,如果消息可达,则不会返回任何信号。
高可用的通俗解释是,能够7*24小时不间断的对外提供服务。由于单机存在单点故障,随时可能发生宕机,所以一般都是通过搭建集群来保证高可用的。
RabbitMQ有三种模式,单机模式就不做过多解释了,demo级别的,生产一般不可能使用单机模式,除非MQ只是用来打印日志。
单机模式一般用于demo搭建,不适合在生产环境中使用。剩下的集群模式和镜像模式都可以帮助我们实现不同程度的高可用。
1. 普通集群模式
2. 镜像模式
高可靠也分成三个方面,生产者高可靠、MQ高可靠和消费者高可靠
生产者的高可靠主要是依靠补偿机制来实现的,确保生产者能够将消息发送至MQ,如果发送失败,则需要进行重发。
1. 发送高可靠
日志记录 + 定时任务健康检查 + 消息补偿
消息延迟投递 + 二次确认 + 回调检查
关于延迟队列,可以采用死信队列或者插件机制实现,这里不展开,这里只需要关注生产者发送就好了。
2. 存储高可靠
RabbitMQ提供了持久化机制,交换器、队列、消息都可以进行持久化,具体的配置也不麻烦,大家自行查阅即可。
3. 消费高可靠
消费者高可靠,一是保证消息能够被消费者消费(消费者ack机制),二是保证消息重复消费的幂等性。
关于消息消费的幂等性
多版本(乐观锁)控制
乐观锁的方案可以参考这篇 RabbitMQ应用问题消息的可靠性保障(消息补偿机制)和幂等性问题(乐观锁解决方案)思路
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。