赞
踩
RabbitMQ 延迟队列插件未安装直接使用的话,会报错:
unknown exchange type 'x-delayed-message'
插件下载地址:https://www.rabbitmq.com/community-plugins.html 。下载与 RabbitMQ 版本对应的插件文件(.ez 格式的 Erlang 归档包)之后,将其复制到 RabbitMQ 服务的插件目录(即自己安装目录下的 plugins 目录,我的是 C:\Program Files\RabbitMQ Server\rabbitmq_server-3.7.14\plugins)中,然后启用插件:
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
├─PhpAmqpLib
│ ├─Channel
│ ├─Connection
│ ├─Exception
│ ├─Exchange
│ ├─Helper
│ │ └─Protocol
│ ├─Message
│ └─Wire
│   └─IO
├─test
│ ├─delayP.php
│ └─delayC.php
└─index.php
<?php

/**
 * Minimal CLI dispatcher for the demo classes.
 *
 * Usage:
 *   php index.php <class> <method> [arg]  -> calls \test\<class>::<method>([arg])
 *   php index.php <function>              -> calls the global function <function>()
 */

/**
 * Autoloader: maps a namespaced class name onto a PHP file below this directory.
 *
 * @param string $cName Class name handed over by the SPL autoload mechanism.
 */
function my_autoloader($cName)
{
    // Namespace separators are backslashes, which are only valid path
    // separators on Windows; translate them so the lookup also works elsewhere.
    $relativePath = str_replace('\\', DIRECTORY_SEPARATOR, $cName);
    $file = __DIR__ . DIRECTORY_SEPARATOR . $relativePath . '.php';

    // Stay silent for unknown classes so class_exists() can simply report
    // false instead of include raising a "failed to open stream" warning.
    if (is_file($file)) {
        include $file;
    }
}

spl_autoload_register("my_autoloader");

// Echo the raw arguments (kept from the original script as a debugging aid).
print_r($argv);

if (isset($argv[2])) {
    // Class + method mode: the class is always resolved inside the \test namespace.
    $cname = '\test\\' . $argv[1];
    if (!class_exists($cname)) {
        exit("class (" . $cname . ") not exists" . PHP_EOL);
    }

    $c = new $cname();
    if (!method_exists($c, $argv[2])) {
        exit("method (" . $argv[2] . ") not exists" . PHP_EOL);
    }

    // An optional third argument is forwarded to the method as a string.
    if (isset($argv[3])) {
        call_user_func(array($c, $argv[2]), $argv[3]);
    } else {
        call_user_func(array($c, $argv[2]));
    }
} else if (isset($argv[1])) {
    // Function mode: call a global function by name, without arguments.
    if (!function_exists($argv[1])) {
        exit("function (" . $argv[1] . ") not exists" . PHP_EOL);
    }
    $argv[1]();
} else {
    exit("please input at least one argument" . PHP_EOL);
}
<?php

namespace test;

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;

/**
 * Producer side of the delayed-message demo: publishes one persistent JSON
 * message to the 'delay.myExchange' exchange of type 'x-delayed-message'.
 */
class delayP
{
    private $host = 'localhost';
    private $port = 5672;
    private $user = 'guest';
    private $password = 'guest';

    /**
     * Publish a single message carrying an 'x-delay' header.
     *
     * NOTE: the message may be lost; nothing here waits for a broker
     * acknowledgement of the publish.
     *
     * @param int|float|string $delayTime Delay in seconds. A numeric string is
     *                                    accepted because the CLI dispatcher
     *                                    forwards arguments as strings.
     *                                    Negative values are treated as 0.
     *
     * @throws \InvalidArgumentException When $delayTime is not numeric.
     */
    public function p($delayTime = 5)
    {
        if (!is_numeric($delayTime)) {
            throw new \InvalidArgumentException('delayTime must be a numeric number of seconds');
        }

        // "+ 0" turns a numeric string into int/float without forcing a float.
        $delaySeconds = max(0, $delayTime + 0);
        // The header is sent as an integer number of milliseconds.
        $delayMs = (int) round($delaySeconds * 1000);
        // Take one timestamp so all payload fields agree with each other.
        $now = time();

        $connection = new AMQPStreamConnection(
            $this->host,
            $this->port,
            $this->user,
            $this->password,
            '/',
            false,
            'AMQPLAIN',
            null,
            'en_US',
            3.0,
            120.0,
            null,
            true,
            60
        );

        try {
            $channel = $connection->channel();

            // The exchange type is provided by the delayed-message plugin;
            // 'x-delayed-type' selects the routing behaviour applied after the delay.
            $channel->exchange_declare(
                'delay.myExchange',
                'x-delayed-message',
                false,
                true,
                false,
                false,
                false,
                new AMQPTable(['x-delayed-type' => 'direct'])
            );
            $channel->queue_declare('delay.myQueue', false, true, false, false);
            $channel->queue_bind('delay.myQueue', 'delay.myExchange', 'my');

            // Prepare the message.
            $msg = new AMQPMessage(
                json_encode([
                    'data' => "延迟队列数据" . $now,
                    'sendTime' => $now,
                    'expectRunTime' => $now + $delaySeconds
                ]),
                [
                    'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT // persistent message
                ]
            );
            $msg->set('application_headers', new AMQPTable([
                'x-delay' => $delayMs // delay in milliseconds
            ]));

            $channel->basic_publish($msg, 'delay.myExchange', 'my');
            echo "basic_publish success";

            $channel->close();
        } finally {
            // Release the connection even when declaring or publishing failed.
            $connection->close();
        }
    }
}
<?php

namespace test;

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;

/**
 * Consumer side of the delayed-message demo: prints every message that
 * arrives on 'delay.myQueue' together with the time it was received.
 */
class delayC
{
    private $host = 'localhost';
    private $port = 5672;
    private $user = 'guest';
    private $password = 'guest';

    /**
     * Consume 'delay.myQueue' with automatic acknowledgement and echo each
     * message body followed by the current Unix timestamp.
     *
     * Keeps waiting for deliveries while the channel has registered callbacks.
     */
    public function c()
    {
        $exchangeName = 'delay.myExchange';
        $queueName = 'delay.myQueue';

        $connection = new AMQPStreamConnection(
            $this->host,
            $this->port,
            $this->user,
            $this->password,
            '/',
            false,
            'AMQPLAIN',
            null,
            'en_US',
            3.0,
            120.0,
            null,
            true,
            60
        );
        $channel = $connection->channel();

        // Declared with the same settings the producer uses.
        $exchangeArguments = new AMQPTable(['x-delayed-type' => 'direct']);
        $channel->exchange_declare(
            $exchangeName,
            'x-delayed-message',
            false,
            true,
            false,
            false,
            false,
            $exchangeArguments
        );
        $channel->queue_declare($queueName, false, true, false, false);

        // Closure invoked for every delivery: "<body> <receive time>".
        $printDelivery = function ($delivery) {
            echo $delivery->body . ' ' . time() . PHP_EOL;
        };

        $channel->basic_qos(null, 1, null);
        // Fourth argument (no_ack) is true: deliveries are acknowledged automatically.
        $channel->basic_consume($queueName, '', false, true, false, false, $printDelivery);

        while (count($channel->callbacks) > 0) {
            $channel->wait();
        }

        $channel->close();
        $connection->close();
    }
}
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。