当前位置:   article > 正文

php+rabbitMQ操作之延迟队列(延迟插件)_php rabbitmq 延时队列

php rabbitmq 延时队列

composer依赖包是官方的php-amqplib/php-amqplib(需打开php的sockets扩展,否则会报错)

第一步 【安装延迟队列插件】:

rabbitMQ默认是没有延迟队列插件的,我们需要手动进行安装。

  1. wget https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/download/3.9.0/rabbitmq_delayed_message_exchange-3.9.0.ez
  2. cp rabbitmq_delayed_message_exchange-3.9.0.ez /opt/rabbitmq/plugins/
  3. rabbitmq-plugins enable rabbitmq_delayed_message_exchange

第二步 【创建延迟队列生产者】:

  1. <?php
  2. require_once "../vendor/autoload.php";
  3. use PhpAmqpLib\Connection\AMQPStreamConnection;
  4. use PhpAmqpLib\Message\AMQPMessage;
  5. class SendMessage {
  6. protected $rabbitmq_channel;
  7. protected $rabbitmq_connection;
  8. public function __construct() {
  9. $this->rabbitmq_connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
  10. $this->rabbitmq_channel = $this->rabbitmq_connection->channel();
  11. }
  12. /**
  13. * 生产延迟队列
  14. * @param $msg
  15. */
  16. public function sendMessage($msg) {
  17. //声明死信交换机
  18. $this->rabbitmq_channel->exchange_declare(
  19. 'delayed_exchange',
  20. 'x-delayed-message',
  21. false,
  22. true,
  23. false,
  24. false,
  25. false,
  26. new \PhpAmqpLib\Wire\AMQPTable([
  27. 'x-delayed-type' => \PhpAmqpLib\Exchange\AMQPExchangeType::FANOUT
  28. ])
  29. );
  30. $headers = new \PhpAmqpLib\Wire\AMQPTable(['x-delay' => 7000]);
  31. $message = new AMQPMessage($msg, ['delivery_mode' => 2]);
  32. $message->set('application_headers', $headers);
  33. $this->rabbitmq_channel->basic_publish($message, 'delayed_exchange');
  34. $datetime = date('Y/m/d H:i:s');
  35. echo "成功发送延迟消息 : {$msg} , {$datetime} \n";
  36. }
  37. /**
  38. * @throws Exception
  39. */
  40. public function __destruct()
  41. {
  42. $this->rabbitmq_channel->close();
  43. $this->rabbitmq_connection->close();
  44. }
  45. }
  46. try {
  47. $sendMessage = new SendMessage();
  48. $sendMessage->sendMessage("这是一条延迟消息");
  49. } catch (\Throwable $e) {
  50. echo $e->getMessage();
  51. }

第三步 【创建延迟队列消费者】:

  1. <?php
  2. require_once "../vendor/autoload.php";
  3. use PhpAmqpLib\Connection\AMQPStreamConnection;
  4. use PhpAmqpLib\Message\AMQPMessage;
  5. class ReceiveMessage {
  6. protected $rabbitmq_channel;
  7. protected $rabbitmq_connection;
  8. public function __construct() {
  9. $this->rabbitmq_connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
  10. $this->rabbitmq_channel = $this->rabbitmq_connection->channel();
  11. }
  12. /**
  13. * 消费延迟队列
  14. * @throws ErrorException
  15. */
  16. public function receiveMessage() {
  17. //声明死信交换机
  18. $this->rabbitmq_channel->exchange_declare(
  19. 'delayed_exchange',
  20. 'x-delayed-message',
  21. false,
  22. true,
  23. false,
  24. false,
  25. false,
  26. new \PhpAmqpLib\Wire\AMQPTable([
  27. 'x-delayed-type' => \PhpAmqpLib\Exchange\AMQPExchangeType::FANOUT
  28. ])
  29. );
  30. //声明死信队列
  31. $this->rabbitmq_channel->queue_declare(
  32. 'delayed_queue',
  33. false,
  34. true,
  35. false,
  36. false,
  37. false,
  38. new \PhpAmqpLib\Wire\AMQPTable([
  39. 'x-dead-letter-exchange' => 'delayed'
  40. ])
  41. );
  42. //绑定队列到交换机
  43. $this->rabbitmq_channel->queue_bind('delayed_queue', 'delayed_exchange');
  44. echo "正在等待延迟队列消息, waiting... \n";
  45. $callback = function (AMQPMessage $message) {
  46. //$headers = $message->get('application_headers');
  47. //$nativeData = $headers->getNativeData();
  48. echo $message->body . '-------' . date('Y/m/d H:i:s') . "\n";
  49. $message->ack();
  50. };
  51. $this->rabbitmq_channel->basic_consume(
  52. 'delayed_queue',
  53. '',
  54. false,
  55. false,
  56. false,
  57. false,
  58. $callback
  59. );
  60. while ($this->rabbitmq_channel->is_consuming()) {
  61. $this->rabbitmq_channel->wait();
  62. }
  63. }
  64. }
  65. try {
  66. $receiveMessage = new ReceiveMessage();
  67. $receiveMessage->receiveMessage();
  68. } catch (\Throwable $e) {
  69. echo $e->getMessage();
  70. }

第四步【测试】:

首先执行消费者进行监听队列

现在执行生产者来发送一条延迟消息

 再次回到消费者控制台查看消费情况

 由此可见,我们上面生产的延迟消息['x-delay' => 7000](7秒),消费时间则正是7秒后.

声明:本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:【wpsshop博客】
推荐阅读
相关标签
  

闽ICP备14008679号