赞
踩
Composer 依赖包使用官方的 php-amqplib/php-amqplib(需要启用 PHP 的 sockets 扩展,否则会报错)。
第一步 【安装延迟队列插件】:
RabbitMQ 默认不带延迟队列插件,需要手动安装;插件版本需要与所运行的 RabbitMQ 版本相匹配(下面以 3.9.0 为例)。
- wget https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/download/3.9.0/rabbitmq_delayed_message_exchange-3.9.0.ez
- cp rabbitmq_delayed_message_exchange-3.9.0.ez /opt/rabbitmq/plugins/
- rabbitmq-plugins enable rabbitmq_delayed_message_exchange
<?php

declare(strict_types=1);

// Resolve the autoloader relative to this file so the script can be started
// from any working directory.
require_once __DIR__ . '/../vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Exchange\AMQPExchangeType;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;

/**
 * Producer: publishes messages to the `delayed_exchange` exchange, whose
 * `x-delayed-message` type is provided by the
 * rabbitmq_delayed_message_exchange plugin.
 */
class SendMessage
{
    /** Name of the delayed-message exchange this producer publishes to. */
    const EXCHANGE_NAME = 'delayed_exchange';

    /** @var \PhpAmqpLib\Channel\AMQPChannel */
    protected $rabbitmq_channel;

    /** @var AMQPStreamConnection */
    protected $rabbitmq_connection;

    /**
     * Connects to localhost:5672 as guest/guest and opens a channel.
     */
    public function __construct()
    {
        $this->rabbitmq_connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
        $this->rabbitmq_channel = $this->rabbitmq_connection->channel();
    }

    /**
     * Publishes one persistent message carrying an `x-delay` header.
     *
     * @param string $msg     Message body.
     * @param int    $delayMs Value of the `x-delay` header, in milliseconds
     *                        (defaults to 7000, i.e. 7 seconds).
     *
     * @throws \InvalidArgumentException When $delayMs is negative.
     */
    public function sendMessage(string $msg, int $delayMs = 7000)
    {
        if ($delayMs < 0) {
            throw new \InvalidArgumentException('Delay must be a non-negative number of milliseconds.');
        }

        // Declare the delayed-message exchange (this is not a dead-letter
        // exchange); `x-delayed-type` selects fanout routing.
        $this->rabbitmq_channel->exchange_declare(
            self::EXCHANGE_NAME,
            'x-delayed-message',
            false, // passive
            true,  // durable
            false, // auto_delete
            false, // internal
            false, // nowait
            new AMQPTable(['x-delayed-type' => AMQPExchangeType::FANOUT])
        );

        $message = new AMQPMessage($msg, [
            'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
            'application_headers' => new AMQPTable(['x-delay' => $delayMs]),
        ]);
        $this->rabbitmq_channel->basic_publish($message, self::EXCHANGE_NAME);

        $datetime = date('Y/m/d H:i:s');
        echo "成功发送延迟消息 : {$msg} , {$datetime} \n";
    }

    /**
     * Closes the channel, then the connection.
     *
     * @throws Exception
     */
    public function __destruct()
    {
        // try/finally: the connection is still closed when closing the
        // channel throws.
        try {
            if ($this->rabbitmq_channel !== null) {
                $this->rabbitmq_channel->close();
            }
        } finally {
            if ($this->rabbitmq_connection !== null) {
                $this->rabbitmq_connection->close();
            }
        }
    }
}

try {
    $sendMessage = new SendMessage();
    $sendMessage->sendMessage("这是一条延迟消息");
} catch (\Throwable $e) {
    // Print the failure on its own line and report it through the exit status.
    echo $e->getMessage(), PHP_EOL;
    exit(1);
}
<?php

declare(strict_types=1);

// Resolve the autoloader relative to this file so the script can be started
// from any working directory.
require_once __DIR__ . '/../vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Exchange\AMQPExchangeType;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;

/**
 * Consumer: binds `delayed_queue` to the `delayed_exchange` exchange and
 * prints every message it receives together with the receive time.
 */
class ReceiveMessage
{
    /** Name of the delayed-message exchange the queue is bound to. */
    const EXCHANGE_NAME = 'delayed_exchange';

    /** Name of the queue this consumer reads from. */
    const QUEUE_NAME = 'delayed_queue';

    /** @var \PhpAmqpLib\Channel\AMQPChannel */
    protected $rabbitmq_channel;

    /** @var AMQPStreamConnection */
    protected $rabbitmq_connection;

    /**
     * Connects to localhost:5672 as guest/guest and opens a channel.
     */
    public function __construct()
    {
        $this->rabbitmq_connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
        $this->rabbitmq_channel = $this->rabbitmq_connection->channel();
    }

    /**
     * Declares the exchange and queue, binds them, and consumes messages
     * for as long as the channel has an active consumer.
     *
     * @throws ErrorException
     */
    public function receiveMessage()
    {
        // Declare the delayed-message exchange (this is not a dead-letter
        // exchange); `x-delayed-type` selects fanout routing.
        $this->rabbitmq_channel->exchange_declare(
            self::EXCHANGE_NAME,
            'x-delayed-message',
            false, // passive
            true,  // durable
            false, // auto_delete
            false, // internal
            false, // nowait
            new AMQPTable(['x-delayed-type' => AMQPExchangeType::FANOUT])
        );

        // Declare the durable queue that receives the delayed messages.
        // NOTE(review): `x-dead-letter-exchange` names an exchange called
        // 'delayed', which this script never declares. The argument is kept
        // unchanged because redeclaring an existing queue with different
        // arguments would presumably be rejected by the broker - confirm
        // whether it is needed at all.
        $this->rabbitmq_channel->queue_declare(
            self::QUEUE_NAME,
            false, // passive
            true,  // durable
            false, // exclusive
            false, // auto_delete
            false, // nowait
            new AMQPTable(['x-dead-letter-exchange' => 'delayed'])
        );

        // Bind the queue to the exchange.
        $this->rabbitmq_channel->queue_bind(self::QUEUE_NAME, self::EXCHANGE_NAME);

        echo "正在等待延迟队列消息, waiting... \n";

        // Print the body with the receive time, then acknowledge manually
        // (the consumer below is registered with no_ack = false).
        $callback = static function (AMQPMessage $message) {
            echo $message->body . '-------' . date('Y/m/d H:i:s') . "\n";
            $message->ack();
        };

        $this->rabbitmq_channel->basic_consume(
            self::QUEUE_NAME,
            '',    // consumer_tag
            false, // no_local
            false, // no_ack
            false, // exclusive
            false, // nowait
            $callback
        );

        while ($this->rabbitmq_channel->is_consuming()) {
            $this->rabbitmq_channel->wait();
        }
    }
}

try {
    $receiveMessage = new ReceiveMessage();
    $receiveMessage->receiveMessage();
} catch (\Throwable $e) {
    // Print the failure on its own line and report it through the exit status.
    echo $e->getMessage(), PHP_EOL;
    exit(1);
}
首先执行消费者进行监听队列
现在执行生产者来发送一条延迟消息
再次回到消费者控制台查看消费情况
由此可见,上面生产的延迟消息设置了 ['x-delay' => 7000](即 7 秒),消费者收到消息的时间正好是发送后的 7 秒。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。