赞
踩
在前面的学习中,我们实现了一个简单的日志系统。 可以把日志消息广播给多个接收者。
本篇中我们打算新增一个功能–使得它能够只订阅消息的字集。 列如我们只需要把严重的错误日志写入文件中,但同时也把所有的日志信息输入到控制台中。
绑定 bindings
前面的例子,我们已经创建绑定:
$channel->queue_bind($queue_name, 'logs');
绑定是指交换机和队列的关系,可以简单理解:这个队列对这个交换机的消息感兴趣。
绑定的时候可以带上一个额外的routing_key参数,为了避免与$channel::basic_publish的参数混淆,我们把它叫做绑定键(binding key)。以下是如何创建一个带绑定键的绑定。
$binding_key = 'black';
$channel->queue_bind($queue_name, $exchange_name, $binding_key);```
绑定键的意义取决于交换机的类型,我们之前使用过得扇形交换机会忽略这个值。
直连交换机
我们的日志系统广播所有的消息给所有的消费者,我们打算扩展它,使其基于日志的严重程度进行消息过滤。
列如我们也许只是希望将比较严重的错误日志写入磁盘,以免在警告或者信息info日志上浪费磁盘空间。
我们使用的扇形交换机fanout没有足够的灵活性–它能做的仅仅是广播。
我们将使用直连交换机来代替,路由的算法很简单–交换机将会绑定键和路由键进行精确匹配,从而确定消息改分发到哪个队列。
多个绑定
多个队列使用相同的绑定键是合法的,这个例子中,也就是说两个队列可以使用相同的绑定键,这样一来,直连交换机和扇形交换机的行为就一样了。会将消息广播到所有匹配的队列,带有相同路由键的消息会发送到其中的队列。
发送日志
我们将会发送消息到一个直连交换机,把日志的级别作为路由键,这样接收日志的脚本就可以根据严重级别来选择它想要处理的日志。
我们需要创建一个交换机:
$channel->exchange_declare('direct_logs', 'direct', false, false, false);
我们发送一条信息:
$channel->exchange_declare('direct_logs', 'direct', false, false, false);
$channel->basic_publish($msg, 'direct_logs', $severity);
我们先假设“severity”的值是info、warning、error中的一个。
订阅
处理接收消息的方式和之前的差不多。只有一个例外,我们将会为我们感兴趣的每个严重级别分别创建一个新的绑定。
foreach($severities as $severity) {
$channel->queue_bind($queue_name, 'direct_logs', $severity);
}
代码整合
rabbitmq-producer/emit-log-direct 的代码:
<?php /** * @link http://www.yiiframework.com/ * @copyright Copyright (c) 2008 Yii Software LLC * @license http://www.yiiframework.com/license/ * 生产者 发送信息 */ namespace yii\console\controllers; use Yii; use yii\base\InvalidConfigException; use yii\base\InvalidParamException; use yii\console\Controller; use yii\console\Exception; use yii\console\ExitCode; use yii\helpers\Console; use yii\helpers\FileHelper; use yii\test\FixtureTrait; use PhpAmqpLib\Connection\AMQPStreamConnection; use PhpAmqpLib\Message\AMQPMessage; /** * Manages fixture data loading and unloading. * * ``` * #load fixtures from UsersFixture class with default namespace "tests\unit\fixtures" * yii fixture/load User * * #also a short version of this command (generate action is default) * yii fixture User * * #load all fixtures * yii fixture "*" * * #load all fixtures except User * yii fixture "*, -User" * * #load fixtures with different namespace. * yii fixture/load User --namespace=alias\my\custom\namespace\goes\here * ``` * * The `unload` sub-command can be used similarly to unload fixtures. * * @author Mark Jebri <mark.github@yandex.ru> * @since 2.0 */ class ProducerController extends Controller { private $channel; private $connection; public function init () { $amqp = yii::$app->params['amqp']; //建立一个到RabbitMQ服务器的连接 $this->connection = new AMQPStreamConnection($amqp["host"], $amqp["port"], $amqp["user"], $amqp["password"]); $this->channel = $this->connection->channel(); } /** *发送指定日志信息 * **/ public function actionEmitLogDirect(){ try { //设置和发送者是一样的,我们打开一个连接和一个通道,然后声明我们将要消耗的队列。请注意,这与发送的队列中的队列相匹配。 //建立一个到RabbitMQ服务器的连接 $connection = $this->connection; $channel = $this->channel; //创建direct类型的交换机 $channel->exchange_declare('direct_logs','direct',false,false,false); //发送数据 $parms = func_get_args(); //设置路由键 $severity = isset($parms[0]) && !empty($parms[0]) ? $parms[0] : 'info'; $data = implode(' ', array_slice($parms, 1)); if(empty($data)) $data = "Hello world"; //处理信息 $msg = new AMQPMessage($data); //发送信息 $channel->basic_publish($msg,'direct_logs',$severity); echo " [x] Sent ",$severity,':',$data," \n"; $channel->close(); $connection->close(); } catch(\Exception $e) { echo $e->getMessage(); } } }
rabbitmq-consumer/receive-logs-direct的代码:
<?php /** * @link http://www.yiiframework.com/ * @copyright Copyright (c) 2008 Yii Software LLC * @license http://www.yiiframework.com/license/ * 消费者 接收信息 */ namespace yii\console\controllers; use Yii; use yii\base\InvalidConfigException; use yii\base\InvalidParamException; use yii\console\Controller; use yii\console\Exception; use yii\console\ExitCode; use yii\helpers\Console; use yii\helpers\FileHelper; use yii\test\FixtureTrait; use common\tools\Pusher; use PhpAmqpLib\Connection\AMQPStreamConnection; use PhpAmqpLib\Message\AMQPMessage; /** * Manages fixture data loading and unloading. * * ``` * #load fixtures from UsersFixture class with default namespace "tests\unit\fixtures" * yii fixture/load User * * #also a short version of this command (generate action is default) * yii fixture User * * #load all fixtures * yii fixture "*" * * #load all fixtures except User * yii fixture "*, -User" * * #load fixtures with different namespace. * yii fixture/load User --namespace=alias\my\custom\namespace\goes\here * ``` * * The `unload` sub-command can be used similarly to unload fixtures. * * @author Mark Jebri <mark.github@yandex.ru> * @since 2.0 */ class ConsumerController extends Controller { private $channel; private $connection; public function init () { $amqp = yii::$app->params['amqp']; //建立一个到RabbitMQ服务器的连接 $this->connection = new AMQPStreamConnection($amqp["host"], $amqp["port"], $amqp["user"], $amqp["password"]); $this->channel = $this->connection->channel(); } public function actionReceiveLogsDirect(){ try{ //设置和发送者是一样的,我们打开一个连接和一个通道,然后声明我们将要消耗的队列。请注意,这与发送的队列中的队列相匹配。 //建立一个到RabbitMQ服务器的连接 $connection = $this->connection; $channel = $this->channel; //创建direct类型的交换机 $channel->exchange_declare('direct_logs','direct',false,false,false); //随机创建信道 list($queue_name, ,) = $channel->queue_declare("",false,false,true,false); //发送数据 $parms = func_get_args(); //获取绑定键 $severities = array_slice($parms, 0); //绑定 foreach($severities as $severity) { $channel->queue_bind($queue_name, 'direct_logs', $severity); } //回调函数 $callback = function($msg){ echo ' [x] ',$msg->delivery_info['routing_key'], ':', $msg->body, "\n"; }; //接收信息 $channel->basic_consume($queue_name, '', false, true, false, false, $callback); while(count($channel->callbacks)) { $channel->wait(); } $channel->close(); $connection->close(); } catch(\Exception $e){ echo $e->getMessage(); } } }
如果你希望只是保存warning和error级别的日志到磁盘,只需要打开控制台并输入:
./yii rabbitmq-consumer/receive-logs-direct info warning error > logs_from_rabbit.log
如果你希望所有的日志信息都输出到屏幕中,打开一个新的终端,然后输入:
./yii rabbitmq-consumer/receive-logs-direct info warning error
如果要触发一个error级别的日志,只需要输入:
gongzgiyangdeMacBook-Air:yii2advanced gongzhiyang$ ./yii rabbitmq-producer/emit-log-direct error "Run. Run. Or it will explode2."
[x] Sent error:Run. Run. Or it will explode2.
gongzgiyangdeMacBook-Air:yii2advanced gongzhiyang$ ./yii rabbitmq-producer/emit-log-direct error "Run. Run. Or it will explode2."
[x] Sent error:Run. Run. Or it will explode2.
gongzgiyangdeMacBook-Air:yii2advanced gongzhiyang$ ./yii rabbitmq-producer/emit-log-direct error "Run. Run. Or it will explode2."
[x] Sent error:Run. Run. Or it will explode2.
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。