赞
踩
MQ消息中间件,这几年逐渐火爆起来,用处越来越多,消息、削峰限流等。MQ一般遵循AMQP协议。AMQP,即Advanced Message Queuing Protocol,一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同的开发语言等条件的限制。
当前市面上常见的MQ有,ActiveMQ、RabbitMQ、Kafka、RocketMQ等。她们中有标准的MQ(遵循AMQP协议,如RabbitMQ),也有非标准的MQ(如Kafka)。
RabbitMQ是一个开源的AMQP实现,服务器端用Erlang语言编写,支持多种客户端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持AJAX。用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。RabbitMQ有如下特点,
我们先看下 RabbitMQ 的结构图,如下,图重分两大块:RabbitMQ Server 和 RabbitMQ Client,其中 Client,可以包含生产者(Publisher或Provider)和消费者(Consumer)。
RabbitMQ 结构图中有几个重要的概念:VHost(虚机主机)、Exchange(交换机)、Queue(队列)、Binding(绑定)。整个消息传递的流程都是围绕着几个组件来进行的。生产者把消息发布到Exchange上,然后Exchange把消息路由到与Exchange绑定的Queue队列中,消费者建立与Queue的连接后,消费消息。
RabbitMQ Server服务器,服务节点称为Broker。
虚拟主机,标识一批交换机、消息队列和相关对象。虚拟主机是共享相同的身份认证和加密环境的独立服务器域。每个vhost本质上就是一个mini版的RabbitMQ服务器,拥有自己的队列、交换器、绑定和权限机制。vhost是AMQP概念的基础,必须在链接时指定,RabbitMQ默认的vhost是 /。
交换机,用来接收生产者(producer)发送的消息,并将这些消息路由给服务器中的队列(Queue)。
消息队列,用来保存消息直到发送给消费者。它是消息的容器,也是消息的终点。一个消息可投入一个或多个队列。消息一直在队列里面,等待消费者连接到这个队列将其取走。
绑定,用于交换机(Exchange)和消息队列(Queue)之间的关联。一个绑定就是基于路由键将交换机和消息队列连接起来的路由规则,所以可以将交换器理解成一个由绑定构成的路由表。
信道,多路复用连接中的一条独立的双向数据流通道。新到是建立在真实的TCP连接内地虚拟链接,AMQP命令都是通过新到发出去的,不管是发布消息、订阅队列还是接收消息,这些动作都是通过信道完成。因为对于操作系统来说,建立和销毁TCP都是非常昂贵的开销,所以引入了信道的概念,以复用一条TCP连接(一个TCP链接,可以包含多个信道)。
Producer消息生产者,也是一个向交换器发布消息的客户端应用程序。
消息消费者,表示一个从一个消息队列中取得消息的客户端应用程序。
网络连接,比如一个TCP连接。一个consumer需要和broker建立连接,以获取队列中的消息。
消息,消息是不具名的,它是由消息头和消息体组成。消息体是不透明的,而消息头则是由一系列的可选属性组成,这些属性包括routing-key(路由键)、priority(优先级)、delivery-mode(消息的路由模式)等。
Exchange分发消息时,根据交换机的类型不同,分发策略也不同。常见的交换机类型有四种:direct、fanout、topic、headers(headers匹配AMQP消息的header而不是路由键(Routing-key),此外headers交换器和direct交换器完全一致,但是性能差了很多,很少使用)。
fanout类型的Exchange路由规则非常简单,它会把所有发送到该Exchange的消息路由到所有与它绑定的Queue中。
上图所示,生产者(P)生产消息1将消息1推送到Exchange,由于Exchange Type=fanout这时候会遵循fanout的规则将消息推送到所有与它绑定Queue,也就是图上的两个Queue最后两个消费者消费。
direct类型的Exchange路由规则也很简单,它会把消息路由到那些binding key与routing key完全匹配的Queue中。
当生产者(P)发送消息时Rotuing key=booking时,这时候将消息传送给Exchange,Exchange获取到生产者发送过来消息后,会根据自身的规则进行与匹配相应的Queue,这时发现Queue1和Queue2都符合,就会将消息传送给这两个队列,如果我们以Rotuing key=create和Rotuing key=confirm发送消息时,这时消息只会被推送到Queue2队列中,其他Routing Key的消息将会被丢弃。
前面提到的direct规则是严格意义上的匹配,换言之Routing Key必须与Binding Key相匹配的时候才将消息传送给Queue,那么topic这个规则就是模糊匹配,可以通过通配符满足一部分规则就可以传送。它的约定是:
当生产者发送消息Routing Key=F.C.E的时候,这时候只满足Queue1,所以会被路由到Queue1中,如果Routing Key=A.C.E这时候会被同是路由到Queue1和Queue2中,如果Routing Key=A.F.B时,这里只会发送一条消息到Queue2中。
headers类型的Exchange不依赖于routing key与binding key的匹配规则来路由消息,而是根据发送的消息内容中的headers属性进行匹配。
在绑定Queue与Exchange时指定一组键值对;当消息发送到Exchange时,RabbitMQ会取到该消息的headers(也是一个键值对的形式),对比其中的键值对是否完全匹配Queue与Exchange绑定时指定的键值对;如果完全匹配则消息会路由到该Queue,否则不会路由到该Queue。
此外headers交换器和direct交换器完全一致,但是性能差了很多,很少使用。
RabbitMQ 的消息策略是怎样的呢?消息是怎样传递的呢?首先明确一点就是生产者产生的消息并不是直接发送给消息队列Queue的,而是要经过Exchange(交换器),由Exchange再将消息路由到一个或多个Queue,这里还会对不符合路由规则(由Exchange Type决定规则)的消息进行丢弃掉。RabbitMQ 通过Binding将Exchange和Queue链接在一起,然后将消息准确的推送到对应的Queue中。
1)消息的传递
2)结构图
要解释清楚 RabbitMQ 整个消息的传递过程,我们需要上面两张图来看,分两步:发送消息和消费消息。
Producer(生产者)向 RabbitMQ Server(Broker)发送消息,
生产者需要关心:Exchange、Queue、Binding。
Consumer(消费者)向 RabbitMQ Server(Broker)订阅消息,
消费者只需要关心:Queue。
重启RabbitMQ后,队列和交换器都会丢失(随同里面的消息),原因在于每个队列和交换器的durable属性,该属性默认为false。RabbitMQ提供了durable属性来实现持久化,保证断电后消息不丢失。RabbitMQ 的持久化分:交换机、队列持久化和消息持久化。
所以,RabbitMQ 的消息持久化,需要做到以下三点:
注意,如果原先有非持久的交换器或者队列,需要删除后才可重新创建,否则就创建其他名称的交换器或者队列,代码如下:
- //声明持久交换器
- channel.ExchangeDeclare(
- "HelloExchange", //交换器名称
- ExchangeType.Direct,//交换器类型
- true, //是否持久话
- false, //是否自动删除
- null //关于交换器的详细设置,键值对形式
- );
- //声明持久队列
- channel.QueueDeclare(
- "HelloQueue",//队列名称
- true, //是否持久化
- false, //是否只对首次声明的队列可见
- false, //是否自动删除
- null 关于队列和队列内消息的详细设置,键值对形式
- );
- //发布持久消息
- string msg_str = "这是生产者第一次发布的消息";
- IBasicProperties msg_pro = channel.CreateBasicProperties();
- msg_pro.ContentType = "text/plain";//发布的数据类型
- msg_pro.DeliveryMode = 2;//标记持久化

RabbitMQ 的持久化机制是:把持久化的数据写入磁盘上的一个持久化日志文件,在做数据恢复时,从磁盘读取持久化的数据重建。当发布一条持久化的消息到持久化的交换机上时,RabbitMQ 会在消息提交到日志文件后才发送响应。如果RabbitMQ重启,服务器会自动重建交换机和队列,重播持久性日志文件中的消息到合适的队列或者交换机上。
消息持久化对RabbitMQ的性能有较大影响,写入磁盘要比写入内存慢很多,而且会极大的减少RabbitMQ服务器每秒可处理的消息总数,导致消息吞度量降低至少10倍的情况并不少见。持久化消息在RabbitMQ内建集群环境中工作的并不好,实际上集群上的队列均匀分布在各个节点上而且没有冗余,如果运行a队列的节点崩溃了,那么直到节点恢复前,这个队列就从整个集群消失了,而且这个节点上的所有队列不可用,而且持久化队列也无法重建。
RabbitMQ 支持 AMQP 事务,来处理消息丢失的情况(确认机制比事务更轻量)。AMQP事务与数据库事务不同。
AMQP事务:提供的一种保证消息成功投递的方式,通过将信道开启事务模式后,利用信道 Channel 的三个命令来实现以事务方式发送消息,若发送失败,通过异常处理回滚事务,确保消息成功投递。
RabbitMQ的事务非常消耗性能,不但会降低大约2-10倍的消息吞度量,而且会使生产者应用程序之间产生同步,与使用MQ解耦异步系统的初衷相背离。
相比较事务模式,RabbitMQ 提供了更好的方案来保证消息投递:发送方确认模式。
和事务类似,我们需要将信道 channel 设置为 confirm 模式,而且只能通过重新创建信道来关闭该设置。一旦信道进入 confirm 模式,所有的信道上发布的消息都会被指派一个唯一的ID。当消息被投递到队列后,信道就会发送一个发送方确认模式给生产者程序,使得生产者知道消息安全到达队列了。如果发送的消息丢失,RabbitMQ会发送一条nack消息,告诉生产者消息丢失,生产者会再次发送消息(re-publish)。发送发确认模式最大的好处是它们是异步的,没有回滚的概念,更加轻量级,对性能的影响也几乎忽略不计。
生产者将信道设置成confirm模式,一旦信道进入confirm模式,所有在该信道上面发布的消息都会被指派一个唯一的ID(从1开始),一旦消息被投递到所有匹配的队列之后,broker就会发送一个确认给生产者 (包含消息的唯一ID) ,这就使得生产者知道消息已经正确到达目的队列了,如果消息和队列是可持久化的,那么确认消息会将消息写入磁盘之后发出,broker回传给生产者的确认消息中deliver-tag 域包含了确认消息的序列号,此外broker也可以设置basic.ack的multiple 域,表示到这个序列号之前的所有消息都已经得到了处理。
开启confirm模式,
- // 创建连接
- ConnectionFactory factory = new ConnectionFactory();
- factory.setUsername(config.UserName);
- factory.setPassword(config.Password);
- factory.setVirtualHost(config.VHost);
- factory.setHost(config.Host);
- factory.setPort(config.Port);
- Connection conn = factory.newConnection();
- // 创建信道
- Channel channel = conn.createChannel();
- // 声明队列
- channel.queueDeclare(config.QueueName, false, false, false, null);
- // 开启发送方确认模式
- channel.confirmSelect();
1)Confirm 普通模式(单条)
每发送一条消息,调用 channel.waitForConfirms() 方法等待服务端confirm,这实际上是一种串行的confirm,每publish一条消息之后就等待服务端confirm,如果服务端返回false或者超时时间内未返回,客户端进行消息重传。
- // 开启发送方确认模式
- channel.confirmSelect();
- String message = String.format("时间 => %s", new Date().getTime());
- channel.basicPublish("", config.QueueName, null, message.getBytes("UTF-8"));
- if (channel.waitForConfirms()) { // confirm 普通单条
- System.out.println("消息发送成功" );
- }
2)Confirm 批量模式(批量)
每发送一批消息之后,调用 channel.waitForConfirmsOrDie() 方法,等待服务端confirm,这种批量确认的模式极大的提高了confirm效率,但是如果一旦出现confirm返回false或者超时的情况,客户端需要将这一批次的消息全部重发,这会带来明显的重复消息,如果这种情况频繁发生的话,效率也会不升反降。
- // 开启发送方确认模式
- channel.confirmSelect();
- for (int i = 0; i < 10; i++) {
- String message = String.format("时间 => %s", new Date().getTime());
- channel.basicPublish("", config.QueueName, null, message.getBytes("UTF-8"));
- }
- channel.waitForConfirmsOrDie(); //直到所有信息都发布,只要有一个未确认就会IOException
- System.out.println("全部执行完成");
3)Confirm 异步模式(异步)
RabbitMQ 使用 channel.addConfirmListener()异步监听,异步模式的优点,就是执行效率高,不需要等待消息执行完,只需要监听消息即可。消息确认有可能是批量确认的,是否批量确认在于返回的multiple的参数,此参数为bool值,如果true表示批量执行了deliveryTag这个值以前的所有消息,如果为false的话表示单条确认。
- // 开启发送方确认模式
- channel.confirmSelect();
- for (int i = 0; i < 10; i++) {
- String message = String.format("时间 => %s", new Date().getTime());
- channel.basicPublish("", config.QueueName, null, message.getBytes("UTF-8"));
- }
- //异步监听确认和未确认的消息
- channel.addConfirmListener(new ConfirmListener() {
- @Override
- public void handleNack(long deliveryTag, boolean multiple) throws IOException {
- System.out.println("未确认消息,标识:" + deliveryTag);
- }
- @Override
- public void handleAck(long deliveryTag, boolean multiple) throws IOException {
- System.out.println(String.format("已确认消息,标识:%d,多个消息:%b", deliveryTag, multiple));
- }
- });

有测试总结:Confirm批量确定和Confirm异步模式性能相差不大,Confirm模式要比事务快10倍左右。
“死信”是RabbitMQ中的一种消息机制。死信队列(死信交换机),又称 dead-letter-exchange(DLX)。当一条消息在一个队列中变成死信后,它会被重新发布到一个交换机中,这个交换机就是 DLX。
队列中的消息在以下三种情况下会变成死信,
消息进入死信队列的过程:消息 -> 队列 (触发以上条件)-> DLX交换机 -> DLK队列。
RabbitMQ 中存在两种方可设置消息的过期时间,
如果同时使用这两种方法,那么以过期时间小的那个数值为准。当消息达到过期时间还没有被消费,那么那个消息就成为了一个死信消息。
RabbitMQ 中不存在延时队列,但是我们可以通过设置消息的过期时间和死信队列来模拟出延时队列。消费者监听死信交换器绑定的队列,而不要监听消息发送的队列。
1)场景
订单下单10秒后,若用户没有付款,则系统自动取消订单。
2)分析
以上适合使用延时队列解决,RabbitMQ 的延时队列可以由 过期消息+死信队列 来实现。
不使用传统的轮询方式,优势:若数据库数据量大,则定时轮询就会特别消耗资源,拖垮服务器,且响应慢。
3)实现
大致流程,
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。