当前位置:   article > 正文

RabbitMQ 延迟队列-基于PHP实现_php rabbitmq 死信队列

php rabbitmq 死信队列

安装 RabbitMQ 延迟队列插件

RabbitMQ 延迟队列插件未安装直接使用的话,会报错:

unknown exchange type 'x-delayed-message'
  • 1

插件下载地址:https://www.rabbitmq.com/community-plugins.html 。下载 Erlang 可执行文件之后,复制到rabbit服务的插件目录(自己的安装目录,我的是 C:\Program Files\RabbitMQ Server\rabbitmq_server-3.7.14\plugins )中,然后开启插件服务:

rabbitmq-plugins enable rabbitmq_delayed_message_exchange
  • 1

测试步骤

  1. 创建测试项目的目录 mq
  2. 从 https://github.com/php-amqplib/php-amqplib 下载AMQP库(当然也可以通过 composer 安装,这里为了简单直接自己处理了),放入 mq 目录
  3. 编写 index.php,实现自动加载
  4. 创建 test 目录,里面分别创建 mqc.php 消费者和 mqp.php 生产者两个文件
  5. 跑脚本,测试消息的生产和消费:
    • php -f index.php delayP p 3 生产消息,延时3秒
    • php -f index.php delayC c 消费消息

目录结构

├─PhpAmqpLib
│  ├─Channel
│  ├─Connection
│  ├─Exception
│  ├─Exchange
│  ├─Helper
│  │  └─Protocol
│  ├─Message
│  └─Wire
│      └─IO
├─test
│  ├─delayP.php
│  └─delayC.php
└─index.php
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14

源码

index.php

<?php

function my_autoloader($cName) {
	include(__DIR__."/".$cName.".php");
}

spl_autoload_register("my_autoloader");

print_r($argv);

if (isset($argv[2])) {
	$cname = '\test\\'.$argv[1];
	if (!class_exists($cname)) {
		exit("class (".$cname.") not exists".PHP_EOL);
	}
	$c = new $cname();
	if (!method_exists($c, $argv[2])) {
		exit("method (".$argv[2].") not exists".PHP_EOL);
	}
	if (isset($argv[3])) {
		call_user_func(array($c, $argv[2]), $argv[3]);
	} else {
		call_user_func(array($c, $argv[2]));
	}
} else if (isset($argv[1])) {
	if (!function_exists($argv[1])) {
		exit("function (".$argv[1].") not exists".PHP_EOL);
	}
	$argv[1]();
} else {
	exit("please input at least one argument".PHP_EOL);
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32

delayP.php

<?php

namespace test;

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;

class delayP {
	private $host = 'localhost';
	private $port = 5672;
	private $user = 'guest';
	private $password = 'guest';
	
	// 可能丢失
	public function p($delayTime = 5) {
		$connection = new AMQPStreamConnection($this->host, $this->port, $this->user, $this->password, '/', false, 'AMQPLAIN', null, 'en_US', 3.0, 120.0, null, true, 60);
		$channel = $connection->channel();

		$channel->exchange_declare('delay.myExchange', 'x-delayed-message', false, true, false, false, false, new AMQPTable(['x-delayed-type' => 'direct']));
		$channel->queue_declare('delay.myQueue', false, true, false, false);
		$channel->queue_bind('delay.myQueue', 'delay.myExchange', 'my');

		// 准备消息
		$msg = new AMQPMessage(
			json_encode([
				'data' => "延迟队列数据".time(),
				'sendTime' => time(),
				'expectRunTime' => time() + $delayTime
			]), [
				'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT // 持久化
			]
		);		
		$msg->set('application_headers', new AMQPTable([
			'x-delay' => $delayTime * 1000 // 延迟时间,单位毫秒
		]));
		$channel->basic_publish($msg, 'delay.myExchange', 'my');
		echo "basic_publish success";

		$channel->close();
		$connection->close();
	}
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43

delayC.php

<?php

namespace test;

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;

class delayC {
	private $host = 'localhost';
	private $port = 5672;
	private $user = 'guest';
	private $password = 'guest';
	
	// 自动 ACK
	public function c() {
		$connection = new AMQPStreamConnection($this->host, $this->port, $this->user, $this->password, '/', false, 'AMQPLAIN', null, 'en_US', 3.0, 120.0, null, true, 60);
		$channel = $connection->channel();

		$channel->exchange_declare('delay.myExchange', 'x-delayed-message', false, true, false, false, false, new AMQPTable(['x-delayed-type' => 'direct']));
		$channel->queue_declare('delay.myQueue', false, true, false, false);
		
		//闭包回调函数
		$callback = function ($msg) {
			echo $msg->body.' '.time();
			echo PHP_EOL;
		};
		$channel->basic_qos(null, 1, null);
		$channel->basic_consume('delay.myQueue', '', false, true, false, false, $callback);

		while (count($channel->callbacks)) {
			$channel->wait();
		}
		$channel->close();
		$connection->close();
	}
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/很楠不爱3/article/detail/575040
推荐阅读
相关标签
  

闽ICP备14008679号