赞
踩
一。RabbitMQ简要概括
1、AMQP:Advanced Message Queuing Protocol,是一个提供统一消息服务的应用层标准协议。
2、IPC(单一系统进程间通信) -> socket(不同机器间进程通信) -> AMQP(解决大型系统模块与组件间通信)
3、RabbitMQ 基于 Erlang 开发,是 AMQP 的一个开源实现。
4、RabbitMQ 系统架构图:
5、名词术语:
还有几个隐式的概念:
此外,Exchanges 分三种类型:
二、安装PHP-amqp扩展
https://pecl.php.net/package/amqp/1.4.0/windows
先查看自己的php版本
接下来下载dll文件 地址http://pecl.php.net/package/amqp
下载稳定版的,点击DLL
php版本 ,X86 和X64 根据自己情况 , NTS 和 TS 就是那个thread safty 的状态 这个大家都会看吧 就不多说了
下载解压
将php_amqp.dll文件放到php目录的ext文件夹下 见下图:
将rabbitmq.1.dll文件放到php根目录 见下图:
php.ini里面添加
之后重启NGinx
报错:Fatal error: Uncaught exception 'AMQPConnectionException' with message 'Library error: connection closed unexpectedly - Potential login failure.' in
已解决:下载的dll 版本不匹配,重新下载低版本即可
1,建立一个send.php文件用来发送消息
2,建立一个 receive.php 文件用来接收消息
代码如下
send.php
- <?php
- // 发送端
- //echo phpinfo();
-
- /**
- * 发送消息
- */
- $exchangeName = 'AMQP default'; //交换机名
- $routeKey = 'delay-lxw'; //路由key
- $message = 'Hello World!'; //消息内容
-
- // 建立TCP连接,配置信息
- $connection = new AMQPConnection([
- 'host' => 'localhost',
- 'port' => '5672',
- 'vhost' => '/',
- 'login' => 'guest',
- 'password' => 'guest',
- 'heartbeat'=>30,
- ]);
-
- var_dump($connection->connect());
- //创建链接
- $connection->connect() or die("Cannot connect to the broker!\n");
-
- try {
- $channel = new AMQPChannel($connection);
- //创建交换机对象
- $exchange = new AMQPExchange($channel);
- $exchange->setName($exchangeName);
- $exchange->setType(AMQP_EX_TYPE_DIRECT);
- $exchange->declareExchange();
-
- //发送消息
- echo 'Send Message: ' . $exchange->publish($message, $routeKey) . "\n";
- echo "Message Is Sent: " . $message . "\n";
- } catch (\AMQPConnectionException $e) {
- var_dump($e);
- }
-
- $connection->disconnect();// 断开连接
receive.php
- <?php
- //echo phpinfo();
-
- /**
- * 接收消息
- */
- $exchangeName = 'AMQP default';
- $queueName = 'delay-lxw';
- $routeKey = 'delay-lxw';
- // 建立TCP连接
- $connection = new AMQPConnection([
- 'host' => 'localhost',
- 'port' => '5672',
- 'vhost' => '/',
- 'login' => 'guest',
- 'password' => 'guest',
- 'prefetch_count'=>3,
- 'read_timeout'=>0,
- 'write_timeout'=>0,
- 'heartbeat'=>30,
- ]);
- $connection->connect() or die("Cannot connect to the broker!\n");
- $channel = new AMQPChannel($connection);
- $exchange = new AMQPExchange($channel);
- $exchange->setName($exchangeName);
- $exchange->setType(AMQP_EX_TYPE_DIRECT);
- echo 'Exchange Status: ' . $exchange->declareExchange() . "\n";
- $queue = new AMQPQueue($channel);
- $queue->setName($queueName);
- echo 'Message Total: ' . $queue->declareQueue() . "\n";
- echo 'Queue Bind: ' . $queue->bind($exchangeName, $routeKey) . "\n";
- var_dump("Waiting for message...");
- // 消费队列消息
- while (TRUE) {
- $queue->consume('processMessage');
- }
-
- // 断开连接
- $connection->disconnect();
-
- function processMessage($envelope, $queue)
- {
- $msg = $envelope->getBody();
- var_dump("Received: " . $msg);
- $queue->ack($envelope->getDeliveryTag()); // 手动发送ACK应答
- }
打开两个终端,先运行接收者脚本监听消息发送:
php receive.php
在另一个终端中运行消息发送脚本:
php send.php
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。