赞
踩
1.安装插件
地址:https://www.rabbitmq.com/community-plugins.html
2.将其放到rabbitmq的插件目录plug
mv /www/wwwroot/rabbitmq_delayed_message_exchange-3.8.0.ez /usr/local/rabbitmq/plugins/
3.开启插件
[root@dcdfa0e9eb71 sbin]# ./rabbitmq-plugins enable rabbitmq_delayed_message_exchange
Enabling plugins on node rabbit@dcdfa0e9eb71:
rabbitmq_delayed_message_exchange
The following plugins have been configured:
rabbitmq_delayed_message_exchange
rabbitmq_management
rabbitmq_management_agent
rabbitmq_web_dispatch
Applying plugin configuration to rabbit@dcdfa0e9eb71...
The following plugins have been enabled:
rabbitmq_delayed_message_exchange
started 1 plugins
查看插件安装列表
在ui界面查看
atal error: Uncaught exception 'PhpAmqpLib\Exception\AMQPProtocolChannelException' with message 'PRECONDITION_FAILED - Invalid argument, 'x-delayed-type' must be an existing exchange type' in /www/wwwroot/default/example/delay-mq/vendor/php-amqplib/php-amqplib/PhpAmqpLib/Channel/AMQPChannel.php:216
详情可见 Github Issues rabbitmq-delayed-message-exchange/issues/19,正确操作如下图所示
2.代码
安装扩展包
composer require php-amqplib/php-amqplib:2.12.3
publish.php
<?php require_once __DIR__.'/vendor/autoload.php'; use PhpAmqpLib\Connection\AMQPStreamConnection; use PhpAmqpLib\Message\AMQPMessage; use PhpAmqpLib\Wire\AMQPTable; $exchange = "delayed-order";//交换机名称 $queue = "delayQueue"; //队列名 $routing_key = 'dead-x-key'; //交换机路由key $ttl = 20000; //单位豪秒 //1.建立连接 $connection = new AMQPStreamConnection('127.0.0.1','5672','zk','123456','/'); //2.创建信道 $channel = $connection->channel(); //2.声明交换机 $args = array('x-delayed-type' => 'direct'); $channel->exchange_declare($exchange,"x-delayed-message",false, true, false,false,$args); //3.声明创建队列 $channel->queue_declare($queue, false, true, false, false,false); //4.绑定交换机和队列 $channel->queue_bind($queue,$exchange,$routing_key); //5.创建消息内容 $data = ['orderId'=>rand(10001,99999)]; $options = array( 'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT, //消息持久化 'application_headers' => new AMQPTable(['x-delay' => $ttl]) ); $msg = new AMQPMessage(json_encode($data),$options); //发送消息 $channel->basic_publish($msg,$exchange,$routing_key); $channel->close(); $connection->close(); var_dump("发送成功",$data);
consume.php
<?php require_once __DIR__.'/vendor/autoload.php'; use PhpAmqpLib\Connection\AMQPStreamConnection; use PhpAmqpLib\Message\AMQPMessage; use PhpAmqpLib\Wire\AMQPTable; $exchange = "delayed-order";//交换机名称 $queue = "delayQueue"; //队列名 $routing_key = 'dead-x-key'; //交换机路由key $ttl = 20000; //单位豪秒 //1.建立连接 $connection = new AMQPStreamConnection('127.0.0.1','5672','zk','123456','/'); //2.创建信道 $channel = $connection->channel(); //2.声明交换机 $channel->exchange_declare($exchange,"x-delayed-message",false, true, false); //3.绑定交换机和队列 $channel->queue_bind($queue,$exchange,$routing_key); //4.消费队列中的数据 $callback = function ($msg){ $data = json_decode($msg->body,true); var_dump($msg->body); $id = $data['orderId']; //todo 做一些数据库或者业务逻辑的处理 // sleep(3); echo "\n已完成".$id."的处理\n"; $msg->ack(); }; //公平调度,新的消息将发送到一个处于空闲的消费者。 $channel->basic_qos(null, 1, null); $channel->basic_consume($queue,'',false,false,false,false,$callback); //没有消息时候等待 while(count($channel->callbacks)){ $channel->wait(); } //关闭信道 $channel->close(); //关闭连接 $connection->close();
结果:等待20s后进入队列
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。