当前位置:   article > 正文

RabbitMQ的详解和使用

rabbitmq

前言:消息队列中间件是分布式系统中重要的组件,主要解决应用耦合,异步消息,流量削锋等问题。实现高性能,高可用,可伸缩和最终一致性架构。是大型分布式系统不可缺少的中间件。

目前在生产环境,使用较多的消息队列有 ActiveMQ,RabbitMQ,ZeroMQ,Kafka,MetaMQ,RocketMQ 等,本文主要介绍 RabbitMQ 的原理和使用

目录

一、AMQP协议

1、AMQP协议介绍

2、功能范围

二、RabbitMQ的原理

1、RabbitMQ概念介绍

2、Exchange消息调度策略

3.消息持久化:Message durability 

4.事务

5. Confirm机制

6.priority queue(优先级队列)

7.延迟队列 

三、RabbitMQ的使用 

1、RabbitMQ的安装

 2、PHP对RabbitMQ的使用【php-amqplib扩展】


一、AMQP协议

1、AMQP协议介绍

AMQP,即Advanced Message Queuing Protocol,一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同的开发语言等条件的限制。Erlang中的实现有RabbitMQ等。

2、功能范围

存储转发(多个消息发送者,单个消息接收者)
分布式事务(多个消息发送者,多个消息接收者)
发布订阅(多个消息发送者,多个消息接收者)
基于内容的路由(多个消息发送者,多个消息接收者)
文件传输队列(多个消息发送者,多个消息接收者)
点对点连接(单个消息发送者,单个消息接收者)

二、RabbitMQ的原理

1、RabbitMQ概念介绍

通常我们谈到消息队列服务, 会有三个概念: 发消息者、消息队列、收消息者。RabbitMQ 在这个基本概念之上, 多做了⼀层抽象, 在发消息者和队列之间, 加⼊了交换器 (Exchange)。这样发消息者和消息队列就没有直接联系,转⽽变成发消息者把消息发给交换器,交换器根据调度策略再把消息转发给消息队列。


消息⽣产者并没有直接将消息发送给消息队列,⽽是通过建⽴与Exchange的Channel,将消息发送给Exchange。Exchange根据路由规则,将消息转发给指定的消息队列。消息队列储存消息,等待消费者取出消息。消费者通过建⽴与消息队列相连的Channel,从消息队列中获取消息。

watermark,type_d3F5LXplbmhlaQ,shadow_50,text_Q1NETiBAbTBfNjg5NDkwNjQ=,size_20,color_FFFFFF,t_70,g_se,x_16

1) Channel(信道):多路复⽤连接中的⼀条独⽴的双向数据流通道。信道是建⽴在真实的TCP连接内的虚拟连接,复⽤TCP连接的通道。
2) Producer(消息的⽣产者):向消息队列发布消息的客户端应⽤程序。
3) Consumer(消息的消费者):从消息队列取得消息的客户端应⽤程序。
4) Message(消息):消息由消息头和消息体组成。消息体是不透明的,⽽消息头则由⼀系列的可选属性组成,这些属性包括routing-key(路由键)、priority(消息优先权)、delivery-mode(是否持久性存储)等。
5) Routing Key(路由键):消息头的⼀个属性,⽤于标记消息的路由规则,决定了交换机的转发路径。最⼤长度255 字节。
6) Queue(消息队列):存储消息的⼀种数据结构,⽤来保存消息,直到消息发送给消费者。它是消息的容器,也是消息的终点。⼀个消息可投⼊⼀个或多个队列。消息⼀直在队列⾥⾯,等待消费者连接到这个队列将消息取⾛。需要注意,当多个消费者订阅同⼀个Queue,这时Queue中的消息会被平均分摊给多个消费者进⾏处理,⽽不是每个消费者都收到所有的消息并处理,每⼀条消息只能被⼀个订阅者接收。
7) 交换(交换路由器):提供生产者和队列之间的匹配,接收生产者发送的消息,并根据路由规则将这些消息转发到消息队列柱。交换用于转发消息。它不会存储消息。如果没有绑定到exchange的队列,它将直接丢弃生产者发送的消息。交出交换机有四种消息调度策略(将在下一节中介绍),即扇出、直接、主题和头。
8) Binding(绑定):⽤于建⽴Exchange和Queue之间的关联。⼀个绑定就是基于Binding Key将Exchange和Queue连接起来的路由规
则,所以可以将交换器理解成⼀个由Binding构成的路由表。
9) Binding Key(绑定键):Exchange与Queue的绑定关系,⽤于匹配Routing Key。最⼤长度255 字节。
10) Broker:RabbitMQ Server,服务器实体。

2、Exchange消息调度策略

1) Direct(路由模式) ,即直接交换机

精确匹配:当消息的Routing Key 与 Exchange和Queue之间的Binding Key完全匹配,如果匹配成功,将消息分发到该Queue。只有当Routing Key和Binding Key完全匹配的时候,消息队列才可以获取消息。Direct是Exchange的默认模式

watermark,type_d3F5LXplbmhlaQ,shadow_50,text_Q1NETiBAbTBfNjg5NDkwNjQ=,size_20,color_FFFFFF,t_70,g_se,x_16

 2) Fanout(订阅模式|广播模式),即扇形交换机

交换器会把所有发送到该交换器的消息路由到所有与该交换器绑定的消息队列中。订阅模式与Binding Key和Routing Key⽆关,交换器将接受到的消息分发给有绑定关系的所有消息队列队列(不论Binding Key和Routing Key是什么)。类似于子网⼴播,子网内的每台主机都获得了⼀份复制的消息。Fanout交换机转发消息是最快的。 

watermark,type_d3F5LXplbmhlaQ,shadow_50,text_Q1NETiBAbTBfNjg5NDkwNjQ=,size_20,color_FFFFFF,t_70,g_se,x_16

3) Topic (通配符模式) ,即主题交换机

按照正则表达式模糊匹配:⽤消息的Routing Key 与 Exchange和Queue之间的Binding Key进⾏模糊匹配,如果匹配成功,将消息分发到该Queue。
Routing Key是⼀个句点号“. ”分隔的字符串。Binding Key与Routing Key⼀样也是句点号“. ”分隔的字符串。Binding Key中可以存在两种特殊字符“ * ”与“#”,⽤于做模糊匹配,其中“*”⽤于匹配⼀个单词,“#”⽤于匹配多个单词(可以是零个)。 

watermark,type_d3F5LXplbmhlaQ,shadow_50,text_Q1NETiBAbTBfNjg5NDkwNjQ=,size_20,color_FFFFFF,t_70,g_se,x_16

4) Headers(键值对模式) ,即头交换机

Headers不依赖于Routing Key与Binding Key的匹配规则来转发消息,交换器的路由规则是通过消息头的Headers属性来进⾏匹配转发的,类似HTTP请求的Headers。

3.消息持久化:Message durability 

如果我们希望即使在RabbitMQ服务重启的情况下,也不会丢失消息,我们可以将Queue与Message都设置为可持久化的(durable),这样可以保证绝⼤部分情况下我们的RabbitMQ消息不会丢失。但依然解决不了⼩概率丢失事件的发⽣(⽐如RabbitMQ服务器已经接收到⽣产者的消息,但还没来得及持久化该消息时RabbitMQ服务器就断电了),如果我们需要对这种⼩概率事件也要管理起来,那么我们要⽤到事务。 

4.事务

对事务的⽀持是AMQP协议的⼀个重要特性。假设当⽣产者将⼀个持久化消息发送给服务器时,因为consume命令本⾝没有任何Response返回,所以即使服务器崩溃,没有持久化该消息,⽣产者也⽆法获知该消息已经丢失。如果此时使⽤事务,即通过txSelect()开启⼀个事务,然后发送消息给服务器,然后通过txCommit()提交该事务,即可以保证,如果txCommit()提交了,则该消息⼀定会持久化,如果txCommit()还未提交即服务器崩溃,则该消息不会服务器接收。当然Rabbit MQ也提供了txRollback()命令⽤于回滚某⼀个事务 

5. Confirm机制

使⽤事务固然可以保证只有提交的事务,才会被服务器执⾏。但是这样同时也将客户端与消息服务器同步起来,这背离了消息队列解耦的本质。Rabbit MQ提供了⼀个更加轻量级的机制来保证⽣产者可以感知服务器消息是否已被路由到正确的队列中——Confirm。如果设置channel为confirm状态,则通过该channel发送的消息都会被分配⼀个唯⼀的ID,然后⼀旦该消息被正确的路由到匹配的队列中后,服务器会返回给⽣产者⼀个Confirm,该Confirm包含该消息的ID,这样⽣产者就会知道该消息已被正确分发。对于持久化消息,只有该消息被持久化后,才会返回Confirm。Confirm机制的最⼤优点在于异步,⽣产者在发送消息以后,即可继续执⾏其他任务。⽽服务器返回Confirm后,会触发⽣产者的回调函数,⽣产者在回调函数中处理Confirm信息。如果消息服务器发⽣异常,导致该消息丢失,会返回给⽣产者⼀个nack,表⽰消息已经丢失,这样⽣产者就可以通过重发消息,保证消息不丢失。Confirm机制在性能上要⽐事务优越很多。但是Confirm机制,⽆法进⾏回滚,就是⼀旦服务器崩溃,⽣产者⽆法得到Confirm信息,⽣产者其实本⾝也不知道该消息是否已经被持久化,只有继续重发来保证消息不丢失,但是如果原先已经持久化的消息,并不会被回滚,这样队列中就会存在两条相同的消息,系统需要⽀持去重。

6.priority queue(优先级队列)

声明队列时需要指定x-max-priority属性,并设置⼀个优先级数值

如果设置的优先级⼩于等于队列设置的x-max-priority属性,优先级有效。
如果设置的优先级⼤于队列设置的x-max-priority属性,则优先级失效。


创建优先级队列,需要增加x-max-priority参数,指定⼀个数字。表⽰最⼤的优先级,建议优先级设置为1~10之间。
发送消息的时候,需要设置priority属性,最好不要超过上⾯指定的最⼤的优先级。
如果⽣产端发送很慢,消费者消息很快,则有可能不会严格的按照优先级来进⾏消费。
第⼀,如果发送的消息的优先级属性⼩于设置的队列属性x-max-priority值,则按优先级的⾼低进⾏消费,数字越⾼则优先级越⾼。
第⼆,如果发送的消息的优先级属性都⼤于设置的队列属性x-max-priority值,则设置的优先级失效,按照⼊队列的顺序进⾏消费。
第三,如果消费端⼀直进⾏监听,⽽发送端⼀条条的发送消息,优先级属性也会失效。
RabbitMQ不能保证消息的严格的顺序消费。 

7.延迟队列 

顾名思义,延迟队列就是进⼊该队列的消息会被延迟消费的队列。⽽⼀般的队列,消息⼀旦⼊队了之后就会被消费者马上消费。
延迟队列多⽤于需要延迟⼯作的场景。最常见的是以下两种场景:
①延迟消费。⽐如:
⽤户⽣成订单之后,需要过⼀段时间校验订单的⽀付状态,如果订单仍未⽀付则需要及时地关闭订单。
⽤户注册成功之后,需要过⼀段时间⽐如⼀周后校验⽤户的使⽤情况,如果发现⽤户活跃度较低,则发送邮件或者短信来提醒⽤户使⽤。
②延迟重试。⽐如消费者从队列⾥消费消息时失败了,但是想要延迟⼀段时间后⾃动重试。


我们可以利⽤RabbitMQ的两个特性,⼀个是Time-To-Live Extensions,另⼀个是Dead Letter Exchanges。实现延迟队列。


Time-To-Live Extensions
RabbitMQ允许我们为消息或者队列设置TTL(time to live),也就是过期时间。TTL表明了⼀条消息可在队列中存活的最⼤时间,单位为毫秒。也就是说,当某条消息被设置了TTL或者当某条消息进⼊了设置了TTL的队列时,这条消息会在经过TTL秒后“死亡”,成为Dead Letter。如果既配置了消息的TTL,⼜配置了队列的TTL,那么较⼩的那个值会被取⽤。


Dead Letter Exchange
刚才提到了,被设置了TTL的消息在过期后会成为Dead Letter。其实在RabbitMQ中,⼀共有三种消息的“死亡”形式:
1.消息被拒绝。通过调⽤basic.reject或者basic.nack并且设置的requeue参数为false。
2.消息因为设置了TTL⽽过期。
3.消息进⼊了⼀条已经达到最⼤长度的队列。
如果队列设置了Dead Letter Exchange(DLX),那么这些Dead Letter就会被重新publish到Dead Letter Exchange,通过Dead Letter Exchange路由到其他队列。 

三、RabbitMQ的使用 

1、RabbitMQ的安装

1) 第一步:下载并安装erlang,并配置环境变量【说明:这个软件安装后才能安装RabbitMq Server软件】

下载地址:http://www.erlang.org/downloads

2) 第二步:下载并安装RabbitMQ

下载地址:Downloading and Installing RabbitMQ — RabbitMQ

3) 开启web管理插件

rabbitmq-plugins enable rabbitmq_management

4) 启动server服务

  1. # 开启服务
  2. rabbitmq-service start
  3. # 关闭服务
  4. rabbitmq-service stop

watermark,type_d3F5LXplbmhlaQ,shadow_50,text_Q1NETiBAbTBfNjg5NDkwNjQ=,size_20,color_FFFFFF,t_70,g_se,x_16

开启后浏览器访问http://127.0.0.1:15672出现下面图片展示页面即可:

默认账号密码:guest guest

watermark,type_d3F5LXplbmhlaQ,shadow_50,text_Q1NETiBAbTBfNjg5NDkwNjQ=,size_17,color_FFFFFF,t_70,g_se,x_16
 

 2、PHP对RabbitMQ的使用【php-amqplib扩展】

1) 先安装php-amqplib扩展

composer update php-amqplib/php-amqplib

2) php-amqplib的使用说明

1.1 建立连接

$conn = new AMQPStreamConnection($host, $port, $user, $password, $vhost);
$host: RabbitMQ服务器主机IP地址
$port: RabbitMQ服务器端口
$user: 连接RabbitMQ服务器的用户名
$password: 连接RabbitMQ服务器的用户密码
$vhost: 连接RabbitMQ服务器的vhost(虚拟主机)

1.2 建立通道

$channel = $conn->channel($channel_id); //在已连接基础上建立生产者与mq之间的通道
$channel_id:信道id,不传则获取$channel[“”]信道,再无则循环$this->channle数组,下标从1到最大信道数找第一个不是AMQPChannel对象的下标,实例化并返回AMQPChannel对象,无则抛出异常No free channel ids

1.3 初始化交换机

$channel->exchange_declare($exchangeName, $type, $passive, $durable, $auto_delete, $internal, $nowait, $arguments, $ticket); 
$exhcangeName:交换器名字
$type:交换器类型,具体介绍看上文【direct(默认),fanout, topic, 和headers】
$passive:是否检测同名交换机
$durable:交换机是否开启持久化
$auto_delete:通道关闭后是否删除交换机
$internal:设置是否是内置的,设置true表示是内置的交换器
$nowait:设置true则表示不等待服务器回执信息.函数将返回NULL,可以提高访问速度
$arguments:其他一些结构化参数
$ticket:null

1.4 初始化队列

$channel->queue_declare($queueName, $passive, $durable, $exclusive, $auto_delete, $nowait);
$queueName: 队列名称
$passive: 是否检测同名队列
$durable: 是否开启队列持久化
$exclusive: 队列是否可以被其他队列访问
$auto_delete: 通道关闭后是否删除队列
$nowait:设置为true则表示不等待服务器回执信息.函数将返回NULL,可以提高访问速度


$arguments:其他一些结构化参数(可用于设置死信队列

$arguments = new AMQPTable([
    'x-message-ttl' => 10000, // 延迟时间 (毫秒)创建queue时设置该参数可指定消息在该queue中待多久,可根据x-dead-letter-routing-key和x-dead-letter-exchange生成可延迟的死信队列。
    'x-expires' => 26000, // 队列存活时间 如果一个队列开始没有设置存活时间,后面又设置是无效的
    'x-dead-letter-exchange' => 'exchangeName', // 延迟结束后指向交换机(死信收容交换机)
    'x-dead-letter-queue' => 'queueName', // 延迟结束后指向队列(死信收容队列)
    'x-dead-letter-routing-key' => 'routingKey', // 设置routing-key
    'x-max-priority'=>'10' //声明优先级队列.表示队列应该支持的最大优先级。建议使用1到10之间.该参数会造成额外的CPU消耗。
]);

$ticket:null

1.5 路由绑定

$channel->queue_bind($queueName, $exchangeName, $routingKey, $nowait, $arguments, $ticket);
$queueName:队列名称
$exchangeName:交换机名称
$routingKey:路由键
$nowait:设置为true则表示不等待服务器回执信息.函数将返回NULL,可以提高访问速度


$arguments:定义绑定的一些参数,若设置headers交换机可使用下面方法设置键值

        $bindArguments = [
            'type'    => 'log',
            'level'   => 'log_level',
            'x-match' => 'all' //默认any
        ]; 
        $arguments = new AMQPTable($bindArguments);

$ticket:null

1.6 创建信息对象

$channel->basic_publish($msg, $exchangeName, $routingKey, $mandatory, $immediate, $ticket);


$msg:object AMQPMessage对象,若使用headers交换机可使用通过下面设置键值

        $bindArguments = [
            'level' => 'log_level',
            'type'  => 'log'
         ]; 
        $headers = new AMQPTable($bindArguments);
        $msg->set('application_headers', $bindArguments);

$exchangeName:string 交换机名字
$routingKey:string 路由键 如果交换机类型
$mandatory:设置为true时,交换器无法根据自身的类型和路由键找到一个符合条件的队列,那么RabbitMQ会调用Basic.Return命令将消息返回给生产者。当设置为false的时,出现上述问题,则消息直接被丢弃
$immediate:RabbitMQ3.0版本开始去掉对immediate参数的支持
$ticket:null

1.7 关闭信道和链接

$channel->close();
$connection->close();

1.8.1 设置消费者(Consumer)客户端同时只处理一条队列

$channel->basic_qos(0, 1, false);

1.8.2 消费消息

$channel->basic_consume($queueName, $consumer_tag, $no_local, $no_ack, $exclusive, $nowait, $callback, $ticket, $arguments);
$queueName:队列名称
$consumer_tag :消费者客户端身份标识,用于区分多个客户端
$no_local:这个功能属于AMQP的标准,但是RabbitMQ并没有做实现
$no_ack:收到消息后,是否不需要回复确认即被认为被消费
$exclusive:是否排他,即这个队列只能由一个消费者消费。适用于任务不允许进行并发处理的情况下
$nowait:不返回执行结果,但是如果排他开启的话,则必须需要等待结果的,如果两个一起开就会报错
$callback:回调函数 

$ticket:null


$arguments:定义绑定的一些参数,若设置headers交换机可使用下面方法接收对应消息

        $bindArguments = [
            'type'    => 'log',
            'level'   => 'log_level',
            'x-match' => 'all' //默认any
        ]; 
        $arguments = new AMQPTable($bindArguments);

1.8.3 监听消息

while(count($channel->callbacks)) {
     $channel->wait();
 }

3)php-amqplib的使用案例

  1. <?php
  2. /**
  3. * 生产者-发布消息
  4. */
  5. require_once './vendor/autoload.php';
  6. use PhpAmqpLib\Connection\AMQPStreamConnection;
  7. use PhpAmqpLib\Exchange\AMQPExchangeType;
  8. use PhpAmqpLib\Message\AMQPMessage;
  9. $exchangeName = "ex_";
  10. $queueName = 'que_';
  11. $routingKey = 'key_';
  12. $data = 'Hello World!222';
  13. $connection = new AMQPStreamConnection("127.0.0.1", 5672, "guest", "guest", "/"); //连接server
  14. $channel = $connection->channel(); //创建通道
  15. $channel->exchange_declare($exchangeName, AMQPExchangeType::DIRECT, false, true, true); //初始化交换机
  16. $channel->queue_declare($queueName, false, false, false, false); //声明队列
  17. $channel->queue_bind($queueName, $exchangeName, $routingKey); //路由绑定
  18. $msg = new AMQPMessage($data, ['content_type' => 'text/plain', 'delivery_mode' => 2]); //发送消息,delivery_mode=2 用于做消息持久化
  19. $channel->basic_publish($msg, $exchangeName, $routingKey); //发送消息
  20. $channel->close(); //关闭管道
  21. $connection->close(); //关闭链接
  1. <?php
  2. /**
  3. * 消费者1-接受消息
  4. */
  5. require_once './vendor/autoload.php';
  6. use PhpAmqpLib\Connection\AMQPStreamConnection;
  7. use PhpAmqpLib\Exchange\AMQPExchangeType;
  8. use PhpAmqpLib\Message\AMQPMessage;
  9. $exchangeName = "ex_";
  10. $queueName = 'que_';
  11. $routingKey = 'key_';
  12. $connection = new AMQPStreamConnection("127.0.0.1", 5672, "guest", "guest", "/"); //连接server
  13. $channel = $connection->channel(); //创建通道
  14. $channel->exchange_declare($exchangeName, AMQPExchangeType::DIRECT, false, true, true); //初始化交换机
  15. echo ' [*] Waiting for messages. To exit press CTRL+C', "\n";
  16. $callback = function($msg) {
  17. echo " [x] Received ", $msg->body, "\n";
  18. };
  19. $channel->basic_qos(0, 1, false);
  20. $channel->basic_consume($queueName, 1, false, true, false, false, $callback);
  21. while(count($channel->callbacks)) {
  22. $channel->wait();
  23. }
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/酷酷是懒虫/article/detail/736837
推荐阅读
相关标签
  

闽ICP备14008679号