赞
踩
做研发的,可能距离成功一步之遥,别因为一叶障目而放弃。
AMQP(Advanced Message Queuing Protocol),是一种用于消息传递协议,类比成HTTP、TCP、UDP这种不同的协议就行。它定义了消息中间件客户端与服务端(买家买家对驿站的沟通)的通信规则(怎么运快递),包括消息格式(什么类型的快递)、消息发布(怎么发快递)、消息订阅(怎么收快递)、队列管理(怎么处理快递)等。
AMQP高效就高效在把通信(物流阶段)能遇到的问题都解决了。
无法被消费的消息。
都有,生产者发数据到MQ是推,消费者消费消息是拉。
推或拉是两种通信的方向选择,跟MQ无关,但是类似MQ,顺便提一下。
个人认为:
RabbitMQ内部不可以直连队列,但是操作上可以直连队列。
就算是常规(Hello World)模式,没有声明交换机,也会经过一个默认交换机。
不过这样丧失了交换机灵活的路由分发功能,适用于简单的场景。
Docker安装
docker run -it --rm --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3.13-management
浏览器访问:http://192.168.xxx.xxx:15672
普通安装
CentOS7的安装RabbitMQ的教程,已经被官网删除了,支持CentOS8,CentOS需要借助外力。 安装Erlang curl -s https://packagecloud.io/install/repositories/rabbitmq/erlang/script.rpm.sh | sudo bash yum -y install erlang 安装RabbitMQ curl -s https://packagecloud.io/install/repositories/rabbitmq/rabbitmq-server/script.rpm.sh | sudo bash yum install rabbitmq-server 开启服务,并设置开机自启 systemctl start rabbitmq-server systemctl enable rabbitmq-server 检查状态 systemctl status rabbitmq-server 启动网页端控制台 rabbitmq-plugins enable rabbitmq_management 开启防火墙 firewall-cmd --zone=public --add-port=80/tcp --permanent systemctl restart firewalld 新建网页端登录用户,并配置角色与权限 rabbitmqctl add_user admin 12345678 rabbitmqctl set_user_tags admin administrator rabbitmqctl set_permissions -p / admin ".*" ".*" ".*" rabbitmqctl set_permissions -p <virtual_host> <username> <configure_permission> <write_permission> <read_permission> -p <virtual_host>:指定虚拟主机的名称,例如/(默认虚拟主机)。 <username>:要为其设置权限的用户名。 <configure_permission>:配置权限的正则表达式。允许用户对队列、交换机等进行配置。 <write_permission>:写权限的正则表达式。允许用户发布消息。 <read_permission>:读权限的正则表达式。允许用户获取消息。 浏览器访问 http://192.168.xxx.xxx:15672 用户名admin,密码12345678
systemctl start/stop/restart/status/enable rabbitmq-server # RabbitMQ Server开启、关停、重启、状态查看、开机自启
rabbitmq-plugins enable 插件名 # RabbitMQ Server安装插件
rabbitmq-plugins list # 插件列表
rabbitmqctl version # 查看RabbitMQ Server版本
rabbitmqctl list_exchanges # 查看交换机列表
rabbitmqctl list_queues # 查看队列列表
rabbitmqctl list_bindings # 查看绑定列表
while (count($channel->callbacks)) {
$channel->wait();
}
//或者
$channel->consume();
require_once __DIR__ . '/vendor/autoload.php'; use PhpAmqpLib\Connection\AMQPStreamConnection; use PhpAmqpLib\Message\AMQPMessage; //初始化连接 $connection = new AMQPStreamConnection('192.168.0.180', 5672, 'admin', '12345678'); //初始化信道 $channel = $connection->channel(); /* 参数1:队列名 参数2:在声明队列时指定是否启用passively模式,passively模式用于检查队列是否存在,而不是实际创建一个新队列。如果队列不存在,则会返回一个通知,而不会创建新队列。 参数3:指定队列的持久性。在这里,它是false,表示队列不是持久的。如果设置为true,则队列将在服务器重启时或宕机后保留下来。 参数4:指定队列的排他性。如果设置为 true,则该队列只能被声明它的连接使用,一般用于临时队列。false表示队列不是排它的。 参数5:指定队列的自动删除,如果设置为 true,则在队列不再被使用时将自动删除。在这里,它是 false,表示队列不会自动删除。 */ $channel->queue_declare('hello', false, false, false, false); //编辑消息 $msg = new AMQPMessage('Hello World!'); //发送消息,交换机用不上,所以留空。这方法没有返回值 $channel->basic_publish($msg, '', 'hello'); //用完了就关闭,释放资源 $channel->close(); $connection->close();
require_once __DIR__ . '/vendor/autoload.php'; use PhpAmqpLib\Connection\AMQPStreamConnection; use PhpAmqpLib\Message\AMQPMessage; //初始化连接 $connection = new AMQPStreamConnection('192.168.0.180', 5672, 'admin', '12345678'); //初始化信道 $channel = $connection->channel(); /* 参数1:队列名 参数2:在声明队列时指定是否启用passively模式,passively模式用于检查队列是否存在,而不是实际创建一个新队列。如果队列不存在,则会返回一个通知,而不会创建新队列。 参数3:指定队列的持久性。在这里,它是false,表示队列不是持久的。如果设置为true,则队列将在服务器重启后保留下来。 参数4:指定队列的排他性。如果设置为 true,则该队列只能被声明它的连接使用,一般用于临时队列。false表示队列不是排它的。 参数5:指定队列的自动删除,如果设置为 true,则在队列不再被使用时将自动删除。在这里,它是 false,表示队列不会自动删除。 */ $channel->queue_declare('hello', false, false, false, false); /* 参数1:队列名称 参数2:这是消费者标签(consumer tag),用于唯一标识消费者。在这里,它是空字符串,表示不为消费者指定任何特定的标签。 参数3:如果设置了无本地字段,则服务器将不会向发布消息的连接发送消息。 参数4:是指定是否自动确认消息(auto-ack)。设置为true则表示消费者在接收到消息后会立即确认消息,RabbitMQ将会将消息标记为已处理并从队列中删除。false表示消费者会手动确认消息,即在处理消息后,通过调用 $channel->basic_ack($deliveryTag) 手动确认消息。 参数5:指定是否独占消费者。如果设置为true,则表示只有当前连接能够使用该消费者。在这里,它是true,表示只有当前连接可以使用这个消费者。 参数6:如果设置了,服务器将不会对该方法作出响应。客户端不应等待答复方法。如果服务器无法完成该方法,它将引发通道或连接异常。 参数7:回调参数,拿到数据怎样处理。 */ $channel->basic_consume('hello', '', false, true, false, false, function ($msg) { echo $msg->body; }); //通过死循环持久化当前进程,实时消费 $channel->consume();
生产者代码,在cli模式下,依次输入1~10,执行10次
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
$connection = new AMQPStreamConnection('192.168.0.180', 5672, 'admin', '12345678');
$channel = $connection->channel();
$channel->queue_declare('hello', false, false, false, false);
//获取命令行参数
$msg = new AMQPMessage($argv[1]);
$channel->basic_publish($msg, '', 'hello');
$channel->close();
$connection->close();
消费者1代码,cli模式下运行,依次返回1、3、5、7、9,可见RabbitMQ不管消费节点处理的时间,只会根据消费者数量轮询处理,哪怕其中任意几个队列任务重,其它队列任务轻松。
<?php require_once __DIR__ . '/vendor/autoload.php'; use PhpAmqpLib\Connection\AMQPStreamConnection; use PhpAmqpLib\Message\AMQPMessage; $connection = new AMQPStreamConnection('192.168.0.180', 5672, 'admin', '12345678'); $channel = $connection->channel(); $channel->basic_consume('hello', '', false, true, false, false, function ($msg) { echo "收到消息,内容为{$msg->getBody()}\n"; sleep(5); echo "成功处理消息\n"; } ); $channel->consume();
消费者2代码,(与消费者1代码唯一不同的,就是sleep函数的时间),cli模式下运行,依次返回2、4、6、8、10,可见RabbitMQ不管消费节点处理的时间,只会根据消费者数量轮询处理,哪怕其中任意几个队列任务重,其它队列任务轻松。
<?php require_once __DIR__ . '/vendor/autoload.php'; use PhpAmqpLib\Connection\AMQPStreamConnection; use PhpAmqpLib\Message\AMQPMessage; $connection = new AMQPStreamConnection('192.168.0.180', 5672, 'admin', '12345678'); $channel = $connection->channel(); $channel->basic_consume('hello', '', false, true, false, false, function ($msg) { echo "收到消息,内容为{$msg->getBody()}\n"; sleep(10); echo "成功处理消息\n"; } ); $channel->consume();
生产者代码:
<?php require_once __DIR__ . '/vendor/autoload.php'; use PhpAmqpLib\Connection\AMQPStreamConnection; use PhpAmqpLib\Message\AMQPMessage; $connection = new AMQPStreamConnection('192.168.0.180', 5672, 'admin', '12345678'); $channel = $connection->channel(); /** 参数1:交换机名称。 参数2:交换机类型,这里是扇出。 参数3:当passive参数设置为true时,表示不会实际创建新的交换机或队列,而是用来检查已经存在的交换机或队列是否已经存在。如果存在,则返回成功,如果不存在,则返回失败。passive参数主要用于检查交换机或队列是否存在,而不是实际创建新的实体 参数4:交换机是否持久化,即当RabbitMQ服务器重启时,交换机会不会被重新创建。 参数5:当所有绑定的队列都与交换机解绑后,是否自动删除交换机。 */ $channel->exchange_declare('fanout_test', 'fanout', false, false, false); $msg = new AMQPMessage('扇出测试'); //发送给指定的交换机 $channel->basic_publish($msg, 'fanout_test'); $channel->close(); $connection->close();
消费者1代码:
<?php require_once __DIR__ . '/vendor/autoload.php'; use PhpAmqpLib\Connection\AMQPStreamConnection; $connection = new AMQPStreamConnection('192.168.0.180', 5672, 'admin', '12345678'); $channel = $connection->channel(); //交换机初始化 $channel->exchange_declare('fanout_test', 'fanout', false, false, false); //创建临时队列,用于接受队列的消息 $queue_info = $channel->queue_declare("", false, false, true, false); $queue_name = $queue_info[0]; //队列绑定指定的交换机 $channel->queue_bind($queue_name, 'fanout_test'); $callback = function ($msg) { echo $msg->getBody(); }; $channel->basic_consume($queue_name, '', false, true, false, false, $callback); $channel->consume(); $channel->close(); $connection->close();
消费者2代码(同1):
<?php require_once __DIR__ . '/vendor/autoload.php'; use PhpAmqpLib\Connection\AMQPStreamConnection; $connection = new AMQPStreamConnection('192.168.0.180', 5672, 'admin', '12345678'); $channel = $connection->channel(); //交换机初始化 $channel->exchange_declare('fanout_test', 'fanout', false, false, false); //创建临时队列,用于接受队列的消息 $queue_info = $channel->queue_declare("", false, false, true, false); $queue_name = $queue_info[0]; //队列绑定指定的交换机 $channel->queue_bind($queue_name, 'fanout_test'); $callback = function ($msg) { echo $msg->getBody(); }; $channel->basic_consume($queue_name, '', false, true, false, false, $callback); $channel->consume(); $channel->close(); $connection->close();
生产端代码,只让消费者1消费。
<?php require_once __DIR__ . '/vendor/autoload.php'; use PhpAmqpLib\Connection\AMQPStreamConnection; use PhpAmqpLib\Message\AMQPMessage; $connection = new AMQPStreamConnection('192.168.0.180', 5672, 'admin', '12345678'); $channel = $connection->channel(); $channel->exchange_declare('direct_test', 'direct', false, false, false); $msg = new AMQPMessage('扇出测试'); //发送给指定的交换机,并指定路由键 $channel->basic_publish($msg, 'direct_test', 'consumer1'); $channel->close(); $connection->close();
消费者1代码
<?php require_once __DIR__ . '/vendor/autoload.php'; use PhpAmqpLib\Connection\AMQPStreamConnection; $connection = new AMQPStreamConnection('192.168.0.180', 5672, 'admin', '12345678'); $channel = $connection->channel(); //交换机初始化 $channel->exchange_declare('direct_test', 'direct', false, false, false); //创建临时队列,用于接受队列的消息 $queue_info = $channel->queue_declare("", false, false, true, false); $queue_name = $queue_info[0]; //队列绑定指定的交换机,并声明路由键 $channel->queue_bind($queue_name, 'direct_test', 'consumer1'); $callback = function ($msg) { echo $msg->getBody(); }; $channel->basic_consume($queue_name, '', false, true, false, false, $callback); $channel->consume(); $channel->close(); $connection->close();
消费者2代码
<?php require_once __DIR__ . '/vendor/autoload.php'; use PhpAmqpLib\Connection\AMQPStreamConnection; $connection = new AMQPStreamConnection('192.168.0.180', 5672, 'admin', '12345678'); $channel = $connection->channel(); //交换机初始化 $channel->exchange_declare('direct_test', 'direct', false, false, false); //创建临时队列,用于接受队列的消息 $queue_info = $channel->queue_declare("", false, false, true, false); $queue_name = $queue_info[0]; //队列绑定指定的交换机,并声明路由键 $channel->queue_bind($queue_name, 'direct_test', 'consumer2'); $callback = function ($msg) { echo $msg->getBody(); }; $channel->basic_consume($queue_name, '', false, true, false, false, $callback); $channel->consume(); $channel->close(); $connection->close();
生产者代码
<?php require_once __DIR__ . '/vendor/autoload.php'; use PhpAmqpLib\Connection\AMQPStreamConnection; use PhpAmqpLib\Message\AMQPMessage; $connection = new AMQPStreamConnection('192.168.0.180', 5672, 'admin', '12345678'); $channel = $connection->channel(); //声明队列类型为主题 $channel->exchange_declare('topic_test', 'topic', false, false, false); $msg = new AMQPMessage('topic测试数据'); /* 以下路由键可以接受到消息 a.b a.*.* a.*.*.* #.z z a.x.y.z abc.z */ $arr = ['a.b.c', 'aa.bb.cc', 'a.b.c.d', 'a.b', 'a.*.*', 'a.*.*.*', '#.z', 'x', 'y', 'z', 'a', 'ab', 'ac', 'ad','a.x.y.z', 'abc.z']; foreach($arr as $v) { $channel->basic_publish($msg, 'topic_logs', $v); } $channel->close(); $connection->close();
消费者代码
<?php require_once __DIR__ . '/vendor/autoload.php'; use PhpAmqpLib\Connection\AMQPStreamConnection; $connection = new AMQPStreamConnection('192.168.0.180', 5672, 'admin', '12345678'); $channel = $connection->channel(); //声明topic模式的交换机 $channel->exchange_declare('topic_test', 'topic', false, false, false); //创建临时队列 $queue_name = $channel->queue_declare("", false, false, true, false)[0]; $binding_keys = ['a.b.c', 'aa.bb.cc', 'a.b.c.d', 'a.b', 'a.*.*', 'a.*.*.*', '#.z']; //绑定多个路由键 foreach ($binding_keys as $binding_key) { $channel->queue_bind($queue_name, 'topic_logs', $binding_key); } $callback = function ($msg) { echo 'RoutingKey:', $msg->getRoutingKey(), ' --- Msg:', $msg->getBody(), "\n"; }; $channel->basic_consume($queue_name, '', false, true, false, false, $callback); try { $channel->consume(); } catch (\Throwable $exception) { echo $exception->getMessage(); } $channel->close(); $connection->close();
调用端代码
<?php require_once __DIR__ . '/vendor/autoload.php'; use PhpAmqpLib\Connection\AMQPStreamConnection; use PhpAmqpLib\Message\AMQPMessage; class RpcClient { private $connection; private $channel; private $queue_name; private $response; private $corr_id; public function __construct() { $this->connection = new AMQPStreamConnection('192.168.0.180', 5672, 'admin', '12345678'); $this->channel = $this->connection->channel(); $this->queue_name = $this->channel->queue_declare("", false, false, true, false)[0]; $this->channel->basic_consume($this->queue_name, '', false, true, false, false, array($this, 'onResponse')); } public function onResponse($rep) { if ($rep->get('correlation_id') == $this->corr_id) { $this->response = $rep->body; } } public function call($str) { $this->response = null; $this->corr_id = uniqid(); $this->channel->basic_publish(new AMQPMessage($str, [ 'correlation_id' => $this->corr_id, 'reply_to' => $this->queue_name ]), '', 'rpc_queue'); while (! $this->response) { $this->channel->wait(); } return $this->response; } } $fibonacci_rpc = new RpcClient(); $response = $fibonacci_rpc->call('客户端向服务端发送数据'); echo $response, "\n";
服务端代码
<?php require_once __DIR__ . '/vendor/autoload.php'; use PhpAmqpLib\Connection\AMQPStreamConnection; use PhpAmqpLib\Message\AMQPMessage; $connection = new AMQPStreamConnection('192.168.0.180', 5672, 'admin', '12345678'); $channel = $connection->channel(); $channel->queue_declare('rpc_queue', false, false, false, false); $callback = function ($req) { echo $req->getBody(), "\n"; $msg = new AMQPMessage('服务端成功接收:'. $req->getBody(), ['correlation_id' => $req->get('correlation_id')]); $req->getChannel()->basic_publish($msg, '', $req->get('reply_to')); $req->ack(); }; $channel->basic_consume('rpc_queue', '', false, false, false, false, $callback); $channel->consume(); $channel->close(); $connection->close();
<?php require_once __DIR__ . '/vendor/autoload.php'; use PhpAmqpLib\Connection\AMQPStreamConnection; use PhpAmqpLib\Wire\AMQPTable; $connection = new AMQPStreamConnection('192.168.0.180', 5672, 'admin', '12345678'); $channel = $connection->channel(); //声明普通交换机名称 $normal_exchange = 'normal_exchange'; //声明死信交换机名称 $dead_exchange = 'dead_exchange'; //声明普通队列名称 $normal_queue = 'normal_queue'; //声明死信队列名称 $dead_queue = 'dead_queue'; //声明普通路由键 $normal_routing_key = 'normal_routing_key'; //声明死信路由键 $dead_routing_key = 'dead_routing_key'; //声明普通交换 $channel->exchange_declare($normal_exchange, 'direct', false, false); //声明死信交换机 $channel->exchange_declare($dead_exchange, 'direct', false, false); //配置普通队列异常时,转发给死信队列的荷载参数,就指望着这个普通队列有问题了,才会把消息数据转发到死信队列,转发到那里,肯定是要配置的。 $payload = new AMQPTable(); //设置消息生存时间为15秒 $payload->set('x-message-ttl', 10000); //定位普通队列出异常了,要转发的交换机 $payload->set('x-dead-letter-exchange', $dead_exchange); //定位了要转发的交换机还不够,还得知道那个队列,不然交换机不知道路由那个消息到达那个队列 $payload->set('x-dead-letter-routing-key', $dead_routing_key); //声明普通队,就是等普通队列出问题了,才把数据丢给死信队列,所以普通(注意是普通队列)队列,要额外的配置。 $channel->queue_declare($normal_queue, false, false, false, false, false, $payload); //声明死信队列,其实死信队列本身是一个普通队列。 $channel->queue_declare($dead_queue, false, false, false, false); echo '正在等待接受消息...'; //绑定普通交换机与普通队列 $channel->queue_bind($normal_queue, $normal_exchange, $normal_routing_key); //绑定死信交换机与死信队列 $channel->queue_bind($dead_queue, $dead_exchange, $dead_routing_key); $channel->basic_consume($normal_queue, '', false, true, false, false, function($msg) { echo $msg->getBody(), "\n"; }); //常驻进程 $channel->consume(); $channel->close(); $connection->close();
然后再执行生产者代码,模拟发消息。
<?php require_once __DIR__ . '/vendor/autoload.php'; use PhpAmqpLib\Connection\AMQPStreamConnection; use PhpAmqpLib\Message\AMQPMessage; $connection = new AMQPStreamConnection('192.168.0.180', 5672, 'admin', '12345678'); $channel = $connection->channel(); $channel->exchange_declare('normal_exchange', 'direct', false, false, true); for($i = 0; $i< 10; $i++) { $msg = new AMQPMessage($i, [ //配置过期时间为10秒,让生产者控制过期时间 'expiration' => '10000' ]); $channel->basic_publish($msg, 'normal_exchange', 'normal_routing_key'); } $channel->close(); $connection->close();
极简死信队列消费示例
<?php require_once __DIR__ . '/vendor/autoload.php'; use PhpAmqpLib\Connection\AMQPStreamConnection; $connection = new AMQPStreamConnection('192.168.0.180', 5672, 'admin', '12345678'); $channel = $connection->channel(); $callback = function ($msg) { echo $msg->getBody(); }; $channel->basic_consume('dead_queue', '', false, true, false, false, $callback); $channel->consume(); $channel->close(); $connection->close();
消费者端代码,这一步是为了初始化普通和死信交换机、队列、路由键,并且需要执行后Ctrl + C强制停止,保证生产者生产的消息,不被能正常的消费(不然怎么演示不正常现象时的死信队列?)。
<?php require_once __DIR__ . '/vendor/autoload.php'; use PhpAmqpLib\Connection\AMQPStreamConnection; use PhpAmqpLib\Wire\AMQPTable; $connection = new AMQPStreamConnection('192.168.0.180', 5672, 'admin', '12345678'); $channel = $connection->channel(); //声明普通交换机名称 $normal_exchange = 'normal_exchange'; //声明死信交换机名称 $dead_exchange = 'dead_exchange'; //声明普通队列名称 $normal_queue = 'normal_queue'; //声明死信队列名称 $dead_queue = 'dead_queue'; //声明普通路由键 $normal_routing_key = 'normal_routing_key'; //声明死信路由键 $dead_routing_key = 'dead_routing_key'; //声明普通交换 $channel->exchange_declare($normal_exchange, 'direct', false, false); //声明死信交换机 $channel->exchange_declare($dead_exchange, 'direct', false, false); //配置普通队列异常时,转发给死信队列的荷载参数,就指望着这个普通队列有问题了,才会把消息数据转发到死信队列,转发到那里,肯定是要配置的。 $payload = new AMQPTable(); //设置最多存储8条消息 $payload->set('x-max-length', 8); //定位普通队列出异常了,要转发的交换机 $payload->set('x-dead-letter-exchange', $dead_exchange); //定位了要转发的交换机还不够,还得知道那个队列,不然交换机不知道路由那个消息到达那个队列 $payload->set('x-dead-letter-routing-key', $dead_routing_key); //声明普通队,就是等普通队列出问题了,才把数据丢给死信队列,所以普通(注意是普通队列)队列,要额外的配置。 $channel->queue_declare($normal_queue, false, false, false, false, false, $payload); //声明死信队列,其实死信队列本身是一个普通队列。 $channel->queue_declare($dead_queue, false, false, false, false); echo '正在等待接受消息...'; //绑定普通交换机与普通队列 $channel->queue_bind($normal_queue, $normal_exchange, $normal_routing_key); //绑定死信交换机与死信队列 $channel->queue_bind($dead_queue, $dead_exchange, $dead_routing_key); $channel->basic_consume($normal_queue, '', false, true, false, false, function($msg) { echo $msg->getBody(), "\n"; }); //常驻进程 $channel->consume(); $channel->close(); $connection->close();
生产者代码
<?php require_once __DIR__ . '/vendor/autoload.php'; use PhpAmqpLib\Connection\AMQPStreamConnection; use PhpAmqpLib\Message\AMQPMessage; $connection = new AMQPStreamConnection('192.168.0.180', 5672, 'admin', '12345678'); $channel = $connection->channel(); $channel->exchange_declare('normal_exchange', 'direct', false, false, true); for($i = 0; $i< 10; $i++) { $msg = new AMQPMessage($i, [ //配置过期时间为10秒,让生产者控制过期时间 'expiration' => '10000' ]); $channel->basic_publish($msg, 'normal_exchange', 'normal_routing_key'); } $channel->close(); $connection->close();
极简死信队列消费示例
<?php require_once __DIR__ . '/vendor/autoload.php'; use PhpAmqpLib\Connection\AMQPStreamConnection; $connection = new AMQPStreamConnection('192.168.0.180', 5672, 'admin', '12345678'); $channel = $connection->channel(); $callback = function ($msg) { echo $msg->getBody(); }; $channel->basic_consume('dead_queue', '', false, true, false, false, $callback); $channel->consume(); $channel->close(); $connection->close();
这次让消费者正常消费代码即可,不用Ctrl + C强制中断,正常接受生产者者的数据。
<?php require_once __DIR__ . '/vendor/autoload.php'; use PhpAmqpLib\Connection\AMQPStreamConnection; use PhpAmqpLib\Wire\AMQPTable; $connection = new AMQPStreamConnection('192.168.0.180', 5672, 'admin', '12345678'); $channel = $connection->channel(); //声明普通交换机名称 $normal_exchange = 'normal_exchange'; //声明死信交换机名称 $dead_exchange = 'dead_exchange'; //声明普通队列名称 $normal_queue = 'normal_queue'; //声明死信队列名称 $dead_queue = 'dead_queue'; //声明普通路由键 $normal_routing_key = 'normal_routing_key'; //声明死信路由键 $dead_routing_key = 'dead_routing_key'; //声明普通交换 $channel->exchange_declare($normal_exchange, 'direct', false, false); //声明死信交换机 $channel->exchange_declare($dead_exchange, 'direct', false, false); //配置普通队列异常时,转发给死信队列的荷载参数,就指望着这个普通队列有问题了,才会把消息数据转发到死信队列,转发到那里,肯定是要配置的。 $payload = new AMQPTable(); //定位普通队列出异常了,要转发的交换机 $payload->set('x-dead-letter-exchange', $dead_exchange); //定位了要转发的交换机还不够,还得知道那个队列,不然交换机不知道路由那个消息到达那个队列 $payload->set('x-dead-letter-routing-key', $dead_routing_key); //声明普通队,就是等普通队列出问题了,才把数据丢给死信队列,所以普通(注意是普通队列)队列,要额外的配置。 $channel->queue_declare($normal_queue, false, false, false, false, false, $payload); //声明死信队列,其实死信队列本身是一个普通队列。 $channel->queue_declare($dead_queue, false, false, false, false); echo '正在等待接受消息...'; //绑定普通交换机与普通队列 $channel->queue_bind($normal_queue, $normal_exchange, $normal_routing_key); //绑定死信交换机与死信队列 $channel->queue_bind($dead_queue, $dead_exchange, $dead_routing_key); $channel->basic_consume($normal_queue, '', false, false, false, false, function($msg) { if($msg->getBody() > 6) { //手动拒绝消息,不批量,且不让重入队列 $msg->delivery_info['channel']->basic_nack($msg->delivery_info['delivery_tag'], false, false); } else { //手动确认消息 $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']); } }); //常驻进程 $channel->consume(); $channel->close(); $connection->close();
生产者代码
<?php require_once __DIR__ . '/vendor/autoload.php'; use PhpAmqpLib\Connection\AMQPStreamConnection; use PhpAmqpLib\Message\AMQPMessage; $connection = new AMQPStreamConnection('192.168.0.180', 5672, 'admin', '12345678'); $channel = $connection->channel(); $channel->exchange_declare('normal_exchange', 'direct', false, false, true); for($i = 0; $i< 10; $i++) { $msg = new AMQPMessage($i, [ //配置过期时间为10秒,让生产者控制过期时间 'expiration' => '10000' ]); $channel->basic_publish($msg, 'normal_exchange', 'normal_routing_key'); } $channel->close(); $connection->close();
极简死信队列消费示例
<?php require_once __DIR__ . '/vendor/autoload.php'; use PhpAmqpLib\Connection\AMQPStreamConnection; $connection = new AMQPStreamConnection('192.168.0.180', 5672, 'admin', '12345678'); $channel = $connection->channel(); $callback = function ($msg) { echo $msg->getBody(); }; $channel->basic_consume('dead_queue', '', false, true, false, false, $callback); $channel->consume(); $channel->close(); $connection->close();
初始化普通、死信交换机、队列、路由键。
<?php require_once __DIR__ . '/vendor/autoload.php'; use PhpAmqpLib\Connection\AMQPStreamConnection; use PhpAmqpLib\Wire\AMQPTable; $connection = new AMQPStreamConnection('192.168.0.180', 5672, 'admin', '12345678'); $channel = $connection->channel(); //声明普通交换机名称 $normal_exchange = 'normal_exchange'; //声明死信交换机名称 $dead_exchange = 'dead_exchange'; //声明普通队列名称 $normal_queue = 'normal_queue'; //声明死信队列名称 $dead_queue = 'dead_queue'; //声明普通路由键 $normal_routing_key = 'normal_routing_key'; //声明死信路由键 $dead_routing_key = 'dead_routing_key'; //声明普通交换 $channel->exchange_declare($normal_exchange, 'direct', false, false); //声明死信交换机 $channel->exchange_declare($dead_exchange, 'direct', false, false); //配置普通队列异常时,转发给死信队列的荷载参数,就指望着这个普通队列有问题了,才会把消息数据转发到死信队列,转发到那里,肯定是要配置的。 $payload = new AMQPTable(); //定位普通队列出异常了,要转发的交换机 $payload->set('x-dead-letter-exchange', $dead_exchange); //定位了要转发的交换机还不够,还得知道那个队列,不然交换机不知道路由那个消息到达那个队列 $payload->set('x-dead-letter-routing-key', $dead_routing_key); //声明普通队,就是等普通队列出问题了,才把数据丢给死信队列,所以普通(注意是普通队列)队列,要额外的配置。 $channel->queue_declare($normal_queue, false, false, false, false, false, $payload); //声明死信队列,其实死信队列本身是一个普通队列。 $channel->queue_declare($dead_queue, false, false, false, false); echo '正在等待接受消息...'; //绑定普通交换机与普通队列 $channel->queue_bind($normal_queue, $normal_exchange, $normal_routing_key); //绑定死信交换机与死信队列 $channel->queue_bind($dead_queue, $dead_exchange, $dead_routing_key); $channel->basic_consume($normal_queue, '', false, true, false, false, function($msg) { });
生产者代码,产生延时任务
<?php require_once __DIR__ . '/vendor/autoload.php'; use PhpAmqpLib\Connection\AMQPStreamConnection; use PhpAmqpLib\Message\AMQPMessage; $connection = new AMQPStreamConnection('192.168.0.180', 5672, 'admin', '12345678'); $channel = $connection->channel(); $channel->exchange_declare('normal_exchange', 'direct', false, false, true); for($i = 0;$i < 10; $i++) { $msg = new AMQPMessage(microtime(true), [ 'expiration' => '3000' ]); $channel->basic_publish($msg, 'normal_exchange', 'normal_routing_key'); } $channel->close(); $connection->close();
延时任务处理代码
<?php require_once __DIR__ . '/vendor/autoload.php'; use PhpAmqpLib\Connection\AMQPStreamConnection; $connection = new AMQPStreamConnection('192.168.0.180', 5672, 'admin', '12345678'); $channel = $connection->channel(); $callback = function ($msg) { $consumer_time = microtime(true); $product_time = $msg->getBody(); echo '时间处理误差:', bcsub($consumer_time, $product_time, 4), "\n"; }; $channel->basic_consume('dead_queue', '', false, true, false, false, $callback); $channel->consume(); $channel->close(); $connection->close();
rabbitmqctl version
发现是3.10.0,rabbitmq-delayed-message-exchange release页也有说明:This release has no functional changes but lists RabbitMQ 3.10.x as supported in plugin metadata.cd /usr/lib/rabbitmq/lib/rabbitmq_server-3.10.0/plugins wget https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/download/3.10.0/rabbitmq_delayed_message_exchange-3.10.0.ez rabbitmq-plugins enable rabbitmq_delayed_message_exchange 发现如下字样,就说明安装成功。 Enabling plugins on node rabbit@lnmp: rabbitmq_delayed_message_exchange The following plugins have been configured: rabbitmq_delayed_message_exchange rabbitmq_management rabbitmq_management_agent rabbitmq_web_dispatch Applying plugin configuration to rabbit@lnmp... The following plugins have been enabled: rabbitmq_delayed_message_exchange started 1 plugins. systemctl restart rabbitmq-server
生产者代码
<?php require_once __DIR__ . '/vendor/autoload.php'; use PhpAmqpLib\Connection\AMQPStreamConnection; use PhpAmqpLib\Message\AMQPMessage; use PhpAmqpLib\Wire\AMQPTable; $connection = new AMQPStreamConnection('192.168.0.180', 5672, 'admin', '12345678'); $channel = $connection->channel(); $delay_exchange = 'delay_exchange'; $delay_queue = 'delay_queue'; $delay_routing_key = 'delay_routing_key'; //声明交换机类型为direct $payload = new AMQPTable(); $payload->set('x-delayed-type', 'direct'); //声明延时队列交换机,参数2的类型,是安装插件后才有的,固定值 $channel->exchange_declare($delay_exchange, 'x-delayed-message', false, false, true, false, false, $payload); //声明一个自定义延迟队列 $channel->queue_declare($delay_queue, false, false, false, false); //队列绑定交换机 $channel->queue_bind($delay_queue, $delay_exchange, $delay_routing_key); //发送延迟消息 $msg = new AMQPMessage(microtime(true), [ 'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT, //这里配置超时时间,固定格式。 'application_headers' => new AMQPTable(['x-delay' => 5000]) ]); $channel->basic_publish($msg, $delay_exchange, $delay_routing_key); $channel->close(); $connection->close();
消费者代码
<?php require_once __DIR__ . '/vendor/autoload.php'; use PhpAmqpLib\Connection\AMQPStreamConnection; $connection = new AMQPStreamConnection('192.168.0.180', 5672, 'admin', '12345678'); $channel = $connection->channel(); $callback = function ($msg) { $consumer_time = microtime(true); $product_time = $msg->getBody(); echo '时间处理误差:', bcsub($consumer_time, $product_time, 4), "\n"; }; $channel->basic_consume('delay_queue', '', false, true, false, false, $callback); $channel->consume(); $channel->close(); $connection->close();
某些业务场景,可能需要提前触发,或者延期处理,这就需要一些外的操作,才能完成。
首先启动生产者代码:
<?php require_once __DIR__ . '/vendor/autoload.php'; use PhpAmqpLib\Connection\AMQPStreamConnection; use PhpAmqpLib\Message\AMQPMessage; use PhpAmqpLib\Wire\AMQPTable; $connection = new AMQPStreamConnection('192.168.0.180', 5672, 'admin', '12345678'); $channel = $connection->channel(); $test_exchange = 'test_exchange'; $test_queue = 'test_queue'; $test_routing_key = 'test_routing_key'; $channel->exchange_declare($test_exchange, 'direct', false, false, true, false, false); $payload = new AMQPTable(); $payload->set('x-max-priority', 20); //声明一个自定义延迟队列 $channel->queue_declare($test_queue, false, false, false, false, false,$payload); //队列绑定交换机 $channel->queue_bind($test_queue, $test_exchange, $test_routing_key); for($i =0; $i < 10; $i++) { $msg = new AMQPMessage($i, [ //为每个消息设置不同的优先级 'priority' => $i, ]); $channel->basic_publish($msg, $test_exchange, $test_routing_key, true); } $channel->close(); $connection->close();
消费者代码:
<?php require_once __DIR__ . '/vendor/autoload.php'; use PhpAmqpLib\Connection\AMQPStreamConnection; $connection = new AMQPStreamConnection('192.168.0.180', 5672, 'admin', '12345678'); $channel = $connection->channel(); $callback = function ($msg) { echo $msg . "\n"; }; $channel->basic_consume('test_queue', '', false, true, false, false, $callback); $channel->consume(); $channel->close(); $connection->close();
惰性队列用的极少,因为是存储在磁盘中的。在消费者挂掉的情况下,避免RabbitMQ积累了太多的消息,消耗内存去采取的策略。
加了RabbitMQ组件可以看做是分布式系统,分布式系统有有一个CAP定理,CAP只能同时满足两个。
对于RabbitMQ消息丢失,或者重复消费的问题,若当前需求不能容忍,就需要额外的环节来弥补。如果当前需求(小概率)能容忍,去掉高可用环节,那么在性能上就会提升。
例如支付宝,支付时总是会延迟几秒钟,一是走支付宝安全风控系统,二是要极致要求稳定高可用的环节必然性能下降,三是等支付回调,然而其它接口内容却是瞬间加载。
所以没有最好的架构,只有最合适的架构,所以要结合业务场景,判断业务容忍度下限,与开发复杂度,在高可用与性能之间权衡,这是架构师必备思维。
/*
参数1:预取大小,通常情况下为null,表示不限制消息大小。
参数2:预取数量,表示消费者每次最多接收的消息数量,值为权重比例,可以为任意正整数。
参数3:是否将预取限制应用到channel级别(true)或者消费者级别(false)。
*/
$channel->basic_qos(null, 1, false);
极简概括:是用来保证消息不丢失的的重要功能,生产者将消息发送给MQ,MQ把数据保存在磁盘上之后,会返回生产者成功保存的反馈。需要生产者端的队列以及消息持久化的前提,就是为了防止队列或者小写将要持久化的时候RabbitMQ出故障的间隙情况发生。
队列持久化、消息持久化、发布确认3个因素加起来,才能保证消息不丢失。
发布确认模式有3种:
单个确认,发送100条数据,耗时0.337秒,大部分场景够用,高并发场景除外。
<?php require_once __DIR__ . '/vendor/autoload.php'; use PhpAmqpLib\Connection\AMQPStreamConnection; use PhpAmqpLib\Message\AMQPMessage; $connection = new AMQPStreamConnection('192.168.0.180', 5672, 'admin', '12345678'); $channel = $connection->channel(); $channel->queue_declare('hello', false, true, false, false); //开启发布确认模式 $channel->confirm_select(); $start = microtime(true); for($i = 0; $i < 100; $i++) { $msg = new AMQPMessage($i, ['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]); $channel->basic_publish($msg, '', 'hello'); //生产一条数据,发送一条确认消息,参数值为超时时间,单位:秒 $channel->wait_for_pending_acks(10.000); } echo microtime(true) - $start; // 0.337秒 $channel->close(); $connection->close();
<?php require_once __DIR__ . '/vendor/autoload.php'; use PhpAmqpLib\Connection\AMQPStreamConnection; use PhpAmqpLib\Message\AMQPMessage; $connection = new AMQPStreamConnection('192.168.0.180', 5672, 'admin', '12345678'); $channel = $connection->channel(); $channel->queue_declare('hello', false, true, false, false); //开启发布确认模式 $channel->confirm_select(); $start = microtime(true); for($i = 0; $i < 100; $i++) { $msg = new AMQPMessage($i, ['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]); $channel->basic_publish($msg, '', 'hello'); } //生产一条数据,发送一条确认消息,参数值为超时时间,单位:秒 $channel->wait_for_pending_acks(10.000); echo microtime(true) - $start; // 0.048秒 $channel->close(); $connection->close();
<?php require_once __DIR__ . '/vendor/autoload.php'; use PhpAmqpLib\Connection\AMQPStreamConnection; use PhpAmqpLib\Message\AMQPMessage; $connection = new AMQPStreamConnection('192.168.0.180', 5672, 'admin', '12345678'); $channel = $connection->channel(); $channel->set_ack_handler( function (\PhpAmqpLib\Message\AMQPMessage $message){ // echo $message->getBody(); } ); $channel->set_nack_handler( function (\PhpAmqpLib\Message\AMQPMessage $message){ // echo $message->getBody(); } ); $channel->queue_declare('hello', false, true, false, false); //开启发布确认模式 $channel->confirm_select(); $start = microtime(true); for($i = 0; $i < 100; $i++) { $msg = new AMQPMessage($i, ['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]); $channel->basic_publish($msg, '', 'hello'); } echo microtime(true) - $start; // 0.012秒 $channel->close(); $connection->close();
消费者代码,首先启动
<?php require_once __DIR__ . '/vendor/autoload.php'; use PhpAmqpLib\Connection\AMQPStreamConnection; $connection = new AMQPStreamConnection('192.168.0.180', 5672, 'admin', '12345678'); $channel = $connection->channel(); $callback = function ($msg) { echo $msg->getBody() . "\n"; $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']); }; //手动确认 $channel->basic_consume('test_queue', '', false, false, false, false, $callback); $channel->consume(); $channel->close(); $connection->close();
先测试生产者,不用事务的情况,看看生产者异常情况下,消息是否会回滚。
命令行提示出异常了,消费者返回012三条消息,说明没有回滚。
<?php require_once __DIR__ . '/vendor/autoload.php'; use PhpAmqpLib\Connection\AMQPStreamConnection; use PhpAmqpLib\Message\AMQPMessage; use PhpAmqpLib\Wire\AMQPTable; $connection = new AMQPStreamConnection('192.168.0.180', 5672, 'admin', '12345678'); $channel = $connection->channel(); $test_exchange = 'test_exchange'; $test_queue = 'test_queue'; $test_routing_key = 'test_routing_key'; $channel->exchange_declare($test_exchange, 'direct', false, false, true, false, false); $payload = new AMQPTable(); $payload->set('x-max-priority', 20); //声明一个自定义延迟队列 $channel->queue_declare($test_queue, false, false, false, false, false,$payload); //队列绑定交换机 $channel->queue_bind($test_queue, $test_exchange, $test_routing_key); try { for($i =0; $i < 10; $i++) { if($i == 3) { throw new Exception('测试异常'); } $msg = new AMQPMessage($i); $channel->basic_publish($msg, $test_exchange, $test_routing_key, true); } echo '成功执行'; } catch(\Exception $e) { echo '出异常了'; } $channel->close(); $connection->close();
再测试生产者,用事务的情况,看看生产者异常情况下,消息是否会回滚。
命令行提示出异常了,消费者返回0条消息,说明生产端异常,也会回滚。
<?php require_once __DIR__ . '/vendor/autoload.php'; use PhpAmqpLib\Connection\AMQPStreamConnection; use PhpAmqpLib\Message\AMQPMessage; use PhpAmqpLib\Wire\AMQPTable; $connection = new AMQPStreamConnection('192.168.0.180', 5672, 'admin', '12345678'); $channel = $connection->channel(); $test_exchange = 'test_exchange'; $test_queue = 'test_queue'; $test_routing_key = 'test_routing_key'; $channel->exchange_declare($test_exchange, 'direct', false, false, true, false, false); $payload = new AMQPTable(); $payload->set('x-max-priority', 20); //声明一个自定义延迟队列 $channel->queue_declare($test_queue, false, false, false, false, false,$payload); //队列绑定交换机 $channel->queue_bind($test_queue, $test_exchange, $test_routing_key); try { $channel->tx_select(); for($i =0; $i < 10; $i++) { if($i == 3) { throw new Exception('测试异常'); } $msg = new AMQPMessage($i); $channel->basic_publish($msg, $test_exchange, $test_routing_key, true); } echo '成功执行'; //提交事务 $channel->tx_commit(); } catch(\Exception $e) { echo '出异常了'; //回滚事务 $channel->tx_rollback(); } $channel->close(); $connection->close();
测试手动确认或拒绝应答(无论是确认,或者拒绝,RabbitMQ都任务此消息已经消费过了)
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
$connection = new AMQPStreamConnection('192.168.0.180', 5672, 'admin', '12345678');
$channel = $connection->channel();
$channel->queue_declare('hello', false, false, false, false);
//获取命令行参数
$msg = new AMQPMessage($argv[1]);
$channel->basic_publish($msg, '', 'hello');
$channel->close();
$connection->close();
这里只有消费者1,手动拒绝消息
<?php require_once __DIR__ . '/vendor/autoload.php'; use PhpAmqpLib\Connection\AMQPStreamConnection; $connection = new AMQPStreamConnection('192.168.0.180', 5672, 'admin', '12345678'); $channel = $connection->channel(); $channel->basic_consume('hello', '', false, false, false, false, function ($msg) { echo "收到消息,内容为{$msg->getBody()}\n"; sleep(5); echo "拒绝处理\n"; /* basic_nack和basic_ack参数一致 参数1:string 消息唯一标识符 参数2:bool 是否确认接受或拒绝多个消息 参数3:bool 表示是否重新放入消息队列 */ $msg->delivery_info['channel']->basic_nack($msg->delivery_info['delivery_tag']); } ); $channel->consume();
这里只有消费者1,手动确认消息,basic_nack(拒绝)方法改为basic_ack(同意)方法即可。
require_once __DIR__ . '/vendor/autoload.php'; use PhpAmqpLib\Connection\AMQPStreamConnection; $connection = new AMQPStreamConnection('192.168.0.180', 5672, 'admin', '12345678'); $channel = $connection->channel(); $channel->basic_consume('hello', '', false, false, false, false, function ($msg) { echo "收到消息,内容为{$msg->getBody()}\n"; sleep(5); echo "成功处理消息\n"; $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']); } ); $channel->consume();
测试故障转移,生产者生产出一条消息,消费者1会立即捕获这条消息,但是处理时间为5秒钟,如果在这5秒钟内Ctrl + C强行终止,则RabbitMQ立即会让消费者2去消费这条数据,进而保证消息不丢失。
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
$connection = new AMQPStreamConnection('192.168.0.180', 5672, 'admin', '12345678');
$channel = $connection->channel();
$channel->queue_declare('hello', false, false, false, false);
//获取命令行参数
$msg = new AMQPMessage($argv[1]);
$channel->basic_publish($msg, '', 'hello');
$channel->close();
$connection->close();
消费者1代码如下:
require_once __DIR__ . '/vendor/autoload.php'; use PhpAmqpLib\Connection\AMQPStreamConnection; $connection = new AMQPStreamConnection('192.168.0.180', 5672, 'admin', '12345678'); $channel = $connection->channel(); $channel->basic_consume('hello', '', false, false, false, false, function ($msg) { echo "收到消息,内容为{$msg->getBody()}\n"; sleep(5); echo "成功处理消息\n"; $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']); } ); $channel->consume();
消费者2代码如下
<?php require_once __DIR__ . '/vendor/autoload.php'; use PhpAmqpLib\Connection\AMQPStreamConnection; $connection = new AMQPStreamConnection('192.168.0.180', 5672, 'admin', '12345678'); $channel = $connection->channel(); $channel->basic_consume('hello', '', false, false, false, false, function ($msg) { echo "收到消息,内容为{$msg->getBody()}\n"; sleep(10); echo "成功处理消息\n"; $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']); } ); $channel->consume();
以php-amqplib/php-amqplib为例,
交换机持久化:exchange_declare()方法参数4设置为true即可。
队列持久化:exchange_declare()方法参数3设置为true即可。
消息持久化:AMQPMessage()对象参数2添加[‘delivery_mode’ => AMQPMessage::DELIVERY_MODE_PERSISTENT]即可。
对于设置为持久化存储的情况:RabbitMQ会开启一个进程,采用追加写的方式,对数据进行实时持久化存储。
对于设置为非持久化存储的:RabbitMQ会开启一个进程,当内存不够时,采用追加写的方式,对数据进行实时持久化存储。
存储方式:持久化存储时,RabbitMQ会内部维护一张ETS的表,用于记录消息的(id、偏移量、有效数据、左边文件、右边文件的元数据)。
注意:如果一个队列已经存在且没有配置持久化,若再次配置持久化,会报错。如果一个队列持久化了,在网页端控制台->queues->队列堆在表格->所在队列的Features项,会显示D。
将消息标记为持久化并不能完全保证消息不会丢失。虽然它告诉RabbitMQ将消息保存到磁盘,但是当RabbitMQ已经设置为持久化,不能完全保证不会丢失消息,接受了一个消息但还没有保存它时,仍然有一个很短的时间空隙,这个间隙出问题了,还是有丢失的可能。
composer require vladimir-yuldashev/laravel-queue-rabbitmq vim laravel/config/queue.php,添加以下配置,并修改'driver' => 'rabbitmq'。 'rabbitmq' => [ 'driver' => 'rabbitmq', 'hosts' => [ [ 'host' => '192.168.0.180', 'port' => '5672', 'user' => 'admin', 'password' => '12345678', 'vhost' => '/', ] ], 'lazy' => false, 'options' => [ 'queue' => [ 'prioritize_delayed' => false, 'queue_max_priority' => 10, ], ], ],
大部分公司仍旧用的是框架自带的基于Redis实现的队列或延时队列,RabbitMQ都很少用,更别说用到集群。
集群方面的,小公司用不上,大公司有专门的运维。
后期更新……
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。