赞
踩
安装 rabbitmq_delayed_message_exchange 插件,请参考:Windows 10安装RabbitMQ及延时消息插件
<?php /** * Created by PhpStorm * User: Jason * Date: 2023-02-06 * Time: 11:23 */ return [ // 配置 'config' => [ 'host' => '127.0.0.1', 'port' => '5672', 'vhost' => '/', 'login' => 'guest', 'password' => 'guest', ], // 交换机名称 'exchange_name' => 'order_expire_exchange', // 队列名称 'queue_name' => 'order_expire_queue', // 路由key 'route_key_name' => 'order_expire_key', ];
<?php /** * Created by PhpStorm * User: Jason * Date: 2023-02-24 * Time: 15:17 */ $config = require 'config.php'; try { // 建立连接 $conn = new AMQPConnection($config); $conn->connect(); !$conn->isConnected() && die('connect failed'); // 创建管道 $channel = new AMQPChannel($conn); !$channel->isConnected() && die('declare channel failed'); // 创建交换机 $exchange = new AMQPExchange($channel); $exchange->setName($config['exchange_name']); $exchange->setType('x-delayed-message'); $exchange->setArgument('x-delayed-type', 'direct'); $exchange->declareExchange(); // 创建队列 $queue = new AMQPQueue($channel); $queue->setName($config['queue_name']); $queue->setFlags(AMQP_DURABLE); $queue->declareQueue(); // 交换机绑定队列 $queue->bind($config['exchange_name'], $config['route_key_name']); // 发布消息 $time = rand(1, 10); $message = 'time:' . date('Y-m-d H:i:s') . ',send message:' . 'hello' . $time; echo $message . PHP_EOL; $exchange->publish($message, $config['route_key_name'], AMQP_NOPARAM, ['headers' => ['x-delay' => 1000 * $time]]); // 关闭连接 $conn->disconnect(); } catch (Exception $exception) { echo $exception->getMessage(); }
<?php /** * Created by PhpStorm * User: Jason * Date: 2023-02-24 * Time: 15:27 */ $config = require 'config.php'; try { // 建立连接 $conn = new AMQPConnection($config); $conn->connect(); !$conn->isConnected() && die('connect failed'); // 创建管道 $channel = new AMQPChannel($conn); !$channel->isConnected() && die('declare channel failed'); // 声明交换机 $exchange = new AMQPExchange($channel); $exchange->setName($config['exchange_name']); $exchange->setType('x-delayed-message'); $exchange->setArgument('x-delayed-type', 'direct'); $exchange->declareExchange(); // 声明队列 $queue = new AMQPQueue($channel); $queue->setName($config['queue_name']); $queue->setFlags(AMQP_DURABLE); $queue->declareQueue(); // 绑定交换机与队列并指向到路由key $queue->bind($config['exchange_name'], $config['route_key_name']); // 阻塞模式接收消息 while (true) { $queue->consume(function ($envelope, $queue) { $msg = $envelope->getBody(); // todo:处理消息 echo date('Y-m-d H:i:s') . ' ' . 'receive message:' . $msg . PHP_EOL; // 手动发送ACK应答 $queue->ack($envelope->getDeliveryTag()); }); } } catch (Exception $exception) { echo $exception->getMessage(); }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。