当前位置:   article > 正文

基于PHP使用集成RabbitMQ_php 对接rebitmq

php 对接rebitmq

一。RabbitMQ简要概括

1、AMQP:Advanced Message Queuing Protocol,是一个提供统一消息服务的应用层标准协议。

2、IPC(单一系统进程间通信) -> socket(不同机器间进程通信) -> AMQP(解决大型系统模块与组件间通信)

3、RabbitMQ 基于 Erlang 开发,是 AMQP 的一个开源实现。

4、RabbitMQ 系统架构图:

5、名词术语:

  • RabbitMQ Server(broker server):维护一条从 Producer 到 Consumer 的路线,保证数据能够按照指定的方式进行传输;
  • Client A & B:数据发送方,Producers create messages and publish (send) them to a broker server (RabbitMQ),一个有效的 Message 包含 payload 和 label 两部分
  • Client 1、2、3:数据消费方,Consumers attach to a broker server (RabbitMQ) and subscribe to a queue
  • Exchange:Exchanges are where producers publish their messages
  • Queue: Queues are where the messages end up and are received by consumers
  • Binding:Bindings are how the messages get routed from the exchange to particular queues

还有几个隐式的概念:

  • Connection:Producer 和 Consumer 通过 TCP 连接到 RabbitMQ
  • Channel:它建立在上述的 TCP 连接中,数据流动都是在 Channel 中进行的

此外,Exchanges 分三种类型:

  • direct:如果 routing key 匹配,那么 Message 就会被传递到相应的 queue
  • fanout:会向响应的 queue 广播
  • topic:对 key 进行模式匹配,比如 ab* 可以传递到所有 ab* 的 queue

二、安装PHP-amqp扩展

https://pecl.php.net/package/amqp/1.4.0/windows

先查看自己的php版本

接下来下载dll文件 地址http://pecl.php.net/package/amqp

下载稳定版的,点击DLL

php版本 ,X86 和X64 根据自己情况  , NTS  和 TS 就是那个thread safty 的状态 这个大家都会看吧 就不多说了

下载解压

将php_amqp.dll文件放到php目录的ext文件夹下  见下图:

 将rabbitmq.1.dll文件放到php根目录 见下图:

 php.ini里面添加    

之后重启NGinx

报错:Fatal error: Uncaught exception 'AMQPConnectionException' with message 'Library error: connection closed unexpectedly - Potential login failure.' in 

已解决:下载的dll 版本不匹配,重新下载低版本即可

 

 

 

php 中使用Rabbitmq实现实现消息发送和接收

1,建立一个send.php文件用来发送消息

2,建立一个 receive.php 文件用来接收消息

代码如下

send.php

  1. <?php
  2. // 发送端
  3. //echo phpinfo();
  4. /**
  5. * 发送消息
  6. */
  7. $exchangeName = 'AMQP default'; //交换机名
  8. $routeKey = 'delay-lxw'; //路由key
  9. $message = 'Hello World!'; //消息内容
  10. // 建立TCP连接,配置信息
  11. $connection = new AMQPConnection([
  12. 'host' => 'localhost',
  13. 'port' => '5672',
  14. 'vhost' => '/',
  15. 'login' => 'guest',
  16. 'password' => 'guest',
  17. 'heartbeat'=>30,
  18. ]);
  19. var_dump($connection->connect());
  20. //创建链接
  21. $connection->connect() or die("Cannot connect to the broker!\n");
  22. try {
  23. $channel = new AMQPChannel($connection);
  24. //创建交换机对象
  25. $exchange = new AMQPExchange($channel);
  26. $exchange->setName($exchangeName);
  27. $exchange->setType(AMQP_EX_TYPE_DIRECT);
  28. $exchange->declareExchange();
  29. //发送消息
  30. echo 'Send Message: ' . $exchange->publish($message, $routeKey) . "\n";
  31. echo "Message Is Sent: " . $message . "\n";
  32. } catch (\AMQPConnectionException $e) {
  33. var_dump($e);
  34. }
  35. $connection->disconnect();// 断开连接

receive.php

  1. <?php
  2. //echo phpinfo();
  3. /**
  4. * 接收消息
  5. */
  6. $exchangeName = 'AMQP default';
  7. $queueName = 'delay-lxw';
  8. $routeKey = 'delay-lxw';
  9. // 建立TCP连接
  10. $connection = new AMQPConnection([
  11. 'host' => 'localhost',
  12. 'port' => '5672',
  13. 'vhost' => '/',
  14. 'login' => 'guest',
  15. 'password' => 'guest',
  16. 'prefetch_count'=>3,
  17. 'read_timeout'=>0,
  18. 'write_timeout'=>0,
  19. 'heartbeat'=>30,
  20. ]);
  21. $connection->connect() or die("Cannot connect to the broker!\n");
  22. $channel = new AMQPChannel($connection);
  23. $exchange = new AMQPExchange($channel);
  24. $exchange->setName($exchangeName);
  25. $exchange->setType(AMQP_EX_TYPE_DIRECT);
  26. echo 'Exchange Status: ' . $exchange->declareExchange() . "\n";
  27. $queue = new AMQPQueue($channel);
  28. $queue->setName($queueName);
  29. echo 'Message Total: ' . $queue->declareQueue() . "\n";
  30. echo 'Queue Bind: ' . $queue->bind($exchangeName, $routeKey) . "\n";
  31. var_dump("Waiting for message...");
  32. // 消费队列消息
  33. while (TRUE) {
  34. $queue->consume('processMessage');
  35. }
  36. // 断开连接
  37. $connection->disconnect();
  38. function processMessage($envelope, $queue)
  39. {
  40. $msg = $envelope->getBody();
  41. var_dump("Received: " . $msg);
  42. $queue->ack($envelope->getDeliveryTag()); // 手动发送ACK应答
  43. }

打开两个终端,先运行接收者脚本监听消息发送:

php receive.php

在另一个终端中运行消息发送脚本:

php send.php

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/从前慢现在也慢/article/detail/575061
推荐阅读
相关标签
  

闽ICP备14008679号