当前位置:   article > 正文

php实现rabbitmq延迟队列_延迟队列 phpamqplib

延迟队列 phpamqplib

1.安装插件
地址:https://www.rabbitmq.com/community-plugins.html
在这里插入图片描述
2.将其放到rabbitmq的插件目录plug

mv /www/wwwroot/rabbitmq_delayed_message_exchange-3.8.0.ez /usr/local/rabbitmq/plugins/
  • 1

3.开启插件

[root@dcdfa0e9eb71 sbin]# ./rabbitmq-plugins enable rabbitmq_delayed_message_exchange
Enabling plugins on node rabbit@dcdfa0e9eb71:
rabbitmq_delayed_message_exchange
The following plugins have been configured:
  rabbitmq_delayed_message_exchange
  rabbitmq_management
  rabbitmq_management_agent
  rabbitmq_web_dispatch
Applying plugin configuration to rabbit@dcdfa0e9eb71...
The following plugins have been enabled:
  rabbitmq_delayed_message_exchange

started 1 plugins
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13

查看插件安装列表

在这里插入图片描述
在ui界面查看

在这里插入图片描述

  • 管理控制台声明 x-delayed-message 交换机
    在开始代码之前先打开 RabbitMQ 的管理 UI 界面,声明一个 x-delayed-message 类型的交换机,否则你会遇到下面的错误:
  • 交换机名和代码的交换机名称要一样
atal error:  Uncaught exception 'PhpAmqpLib\Exception\AMQPProtocolChannelException' with message 'PRECONDITION_FAILED - Invalid argument, 'x-delayed-type' must be an existing exchange type' in /www/wwwroot/default/example/delay-mq/vendor/php-amqplib/php-amqplib/PhpAmqpLib/Channel/AMQPChannel.php:216
  • 1

详情可见 Github Issues rabbitmq-delayed-message-exchange/issues/19,正确操作如下图所示

在这里插入图片描述

2.代码
安装扩展包

composer require php-amqplib/php-amqplib:2.12.3
  • 1

publish.php

<?php
require_once __DIR__.'/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;



$exchange = "delayed-order";//交换机名称
$queue = "delayQueue"; //队列名
$routing_key = 'dead-x-key'; //交换机路由key
$ttl = 20000; //单位豪秒

//1.建立连接
$connection = new AMQPStreamConnection('127.0.0.1','5672','zk','123456','/');
//2.创建信道
$channel = $connection->channel();
//2.声明交换机
$args = array('x-delayed-type' => 'direct');
$channel->exchange_declare($exchange,"x-delayed-message",false, true, false,false,$args);
//3.声明创建队列
$channel->queue_declare($queue, false, true, false, false,false);
//4.绑定交换机和队列
$channel->queue_bind($queue,$exchange,$routing_key);
//5.创建消息内容
$data = ['orderId'=>rand(10001,99999)];
$options = array(
    'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,   //消息持久化
    'application_headers' => new AMQPTable(['x-delay' => $ttl])
);
$msg = new AMQPMessage(json_encode($data),$options);
//发送消息
$channel->basic_publish($msg,$exchange,$routing_key);
$channel->close();
$connection->close();
var_dump("发送成功",$data);


  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38

consume.php

<?php
require_once __DIR__.'/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;

$exchange = "delayed-order";//交换机名称
$queue = "delayQueue"; //队列名
$routing_key = 'dead-x-key'; //交换机路由key
$ttl = 20000; //单位豪秒


//1.建立连接
$connection = new AMQPStreamConnection('127.0.0.1','5672','zk','123456','/');
//2.创建信道
$channel = $connection->channel();
//2.声明交换机
$channel->exchange_declare($exchange,"x-delayed-message",false, true, false);
//3.绑定交换机和队列
$channel->queue_bind($queue,$exchange,$routing_key);

//4.消费队列中的数据
$callback = function ($msg){
    $data = json_decode($msg->body,true);
    var_dump($msg->body);
    $id = $data['orderId'];
    //todo 做一些数据库或者业务逻辑的处理
//    sleep(3);
    echo "\n已完成".$id."的处理\n";
    $msg->ack();
};

//公平调度,新的消息将发送到一个处于空闲的消费者。
$channel->basic_qos(null, 1, null);

$channel->basic_consume($queue,'',false,false,false,false,$callback);

//没有消息时候等待
while(count($channel->callbacks)){
    $channel->wait();
}


//关闭信道
$channel->close();
//关闭连接
$connection->close();

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48

结果:等待20s后进入队列
在这里插入图片描述

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/2023面试高手/article/detail/575008
推荐阅读
相关标签
  

闽ICP备14008679号