赞
踩
rabbitmq 的路由模式图是这样子的, 其实就是 交换机的模式改为 direct 定向模式
上面的 error info warning 就是不同的routing_key
上面的使用场景是这样的, 在消费者 C1中, 我们只关注 error 的错误日志, 其它的我们不关心, 而消费者C2中, 我们要把所有的日志都处理掉
代码说明, 我们在前一章中, 交换机是在生产者中创建的, 队列是在消费者中绑定的, 可以运行, 在做这个 routing 模式的时候, 我发现,我们可以把所有的声明, 创建,绑定代码都写在生产者中, 不影响程序运行, 而且更易管理
routing_sender.php
<?php require __DIR__."/../vendor/autoload.php"; $connection = new \PhpAmqpLib\Connection\AMQPStreamConnection("127.0.0.1",5672,"admin","123456"); $channel = $connection->channel(); //这里创建了一个交换机,并且指定交换机的模式是 direct 定向的 $channel->exchange_declare("my_exchange","direct",false,false,false,false); //创建了两个队列 $channel->queue_declare("queue1",false,false,false,false,false); $channel->queue_declare("queue2",false,false,false,false,false); //绑定队列到交换机 只绑定了一个 routing_key $channel->queue_bind("queue1","my_exchange","error"); //这里是队列2, 绑定了三种不同的 route_key $channel->queue_bind("queue2","my_exchange","error"); $channel->queue_bind("queue2","my_exchange","info"); $channel->queue_bind("queue2","my_exchange","warning"); for($i=0;$i<50;$i++){ if($i%3 == 0){ //这个信息的 routing_key 是error $channel->basic_publish(new \PhpAmqpLib\Message\AMQPMessage("这是一个Error的信息,两个队列都应该有,这是每".$i."条"),"my_exchange","error"); } if($i%3 ==1){ //这个信息的 routing_key 是info $channel->basic_publish(new \PhpAmqpLib\Message\AMQPMessage("这是一个Info的信息,只有queue2队列才会有,这是第".$i."条"),"my_exchange","info"); } if($i%3 ==2){ //这个信息的 routing_key 是warning $channel->basic_publish(new \PhpAmqpLib\Message\AMQPMessage("这是一个wraning的信息,只有queue2队列才会有,这是第".$i."条"),"my_exchange","warning"); } } $channel->close(); $connection->close();
routing_receive1.php
<?php
require __DIR__."/../vendor/autoload.php";
$connection = new \PhpAmqpLib\Connection\AMQPStreamConnection("127.0.0.1",5672,"admin","123456");
$channel = $connection->channel();
//我感觉下面这一句不要也可以, 没有试过 因这这个队列在生产者中已经有了
$channel->queue_declare("queue1",false,false,false,false);
$callback = function($msg){
echo "这是queue1收到的队列消息".$msg->body.PHP_EOL;
};
$channel->basic_consume("queue1","",false,false,false,false,$callback);
while(count($channel->callbacks)){
$channel->wait();
}
routing_receive2.php
require __DIR__."/../vendor/autoload.php";
$connection = new \PhpAmqpLib\Connection\AMQPStreamConnection("127.0.0.1",5672,"admin","123456");
$channel = $connection->channel();
$channel->queue_declare("queue2",false,false,false,false);
$callback = function($msg){
echo "这是queue2收到的队列消息".$msg->body.PHP_EOL;
};
$channel->basic_consume("queue2","",false,false,false,false,$callback);
while(count($channel->callbacks)){
$channel->wait();
}
我们来看一下运得结果
完美
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。