当前位置:   article > 正文

rabbitMQ + yii2 (php)路由_yii2 rabbitmq

yii2 rabbitmq

在前面的学习中,我们实现了一个简单的日志系统。 可以把日志消息广播给多个接收者。

本篇中我们打算新增一个功能–使得它能够只订阅消息的字集。 列如我们只需要把严重的错误日志写入文件中,但同时也把所有的日志信息输入到控制台中。

绑定 bindings

前面的例子,我们已经创建绑定:

$channel->queue_bind($queue_name, 'logs');

  • 1
  • 2

绑定是指交换机和队列的关系,可以简单理解:这个队列对这个交换机的消息感兴趣。

绑定的时候可以带上一个额外的routing_key参数,为了避免与$channel::basic_publish的参数混淆,我们把它叫做绑定键(binding key)。以下是如何创建一个带绑定键的绑定。

$binding_key = 'black';
$channel->queue_bind($queue_name, $exchange_name, $binding_key);```

  • 1
  • 2
  • 3

绑定键的意义取决于交换机的类型,我们之前使用过得扇形交换机会忽略这个值。

直连交换机

我们的日志系统广播所有的消息给所有的消费者,我们打算扩展它,使其基于日志的严重程度进行消息过滤。
列如我们也许只是希望将比较严重的错误日志写入磁盘,以免在警告或者信息info日志上浪费磁盘空间。

我们使用的扇形交换机fanout没有足够的灵活性–它能做的仅仅是广播。

我们将使用直连交换机来代替,路由的算法很简单–交换机将会绑定键和路由键进行精确匹配,从而确定消息改分发到哪个队列。

多个绑定
多个队列使用相同的绑定键是合法的,这个例子中,也就是说两个队列可以使用相同的绑定键,这样一来,直连交换机和扇形交换机的行为就一样了。会将消息广播到所有匹配的队列,带有相同路由键的消息会发送到其中的队列。
发送日志

我们将会发送消息到一个直连交换机,把日志的级别作为路由键,这样接收日志的脚本就可以根据严重级别来选择它想要处理的日志。

我们需要创建一个交换机:

$channel->exchange_declare('direct_logs', 'direct', false, false, false);
  • 1

我们发送一条信息:

$channel->exchange_declare('direct_logs', 'direct', false, false, false);
$channel->basic_publish($msg, 'direct_logs', $severity);
  • 1
  • 2

我们先假设“severity”的值是info、warning、error中的一个。

订阅
处理接收消息的方式和之前的差不多。只有一个例外,我们将会为我们感兴趣的每个严重级别分别创建一个新的绑定。

foreach($severities as $severity) {
    $channel->queue_bind($queue_name, 'direct_logs', $severity);
}
  • 1
  • 2
  • 3

代码整合

在这里插入图片描述

rabbitmq-producer/emit-log-direct 的代码:

<?php
/**
 * @link http://www.yiiframework.com/
 * @copyright Copyright (c) 2008 Yii Software LLC
 * @license http://www.yiiframework.com/license/
 * 生产者 发送信息
 */

namespace yii\console\controllers;

use Yii;
use yii\base\InvalidConfigException;
use yii\base\InvalidParamException;
use yii\console\Controller;
use yii\console\Exception;
use yii\console\ExitCode;
use yii\helpers\Console;
use yii\helpers\FileHelper;
use yii\test\FixtureTrait;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

/**
 * Manages fixture data loading and unloading.
 *
 * ```
 * #load fixtures from UsersFixture class with default namespace "tests\unit\fixtures"
 * yii fixture/load User
 *
 * #also a short version of this command (generate action is default)
 * yii fixture User
 *
 * #load all fixtures
 * yii fixture "*"
 *
 * #load all fixtures except User
 * yii fixture "*, -User"
 *
 * #load fixtures with different namespace.
 * yii fixture/load User --namespace=alias\my\custom\namespace\goes\here
 * ```
 *
 * The `unload` sub-command can be used similarly to unload fixtures.
 *
 * @author Mark Jebri <mark.github@yandex.ru>
 * @since 2.0
 */
class ProducerController extends Controller
{

	private $channel;
	private  $connection;
	public function init ()
	{

		$amqp = yii::$app->params['amqp'];

		//建立一个到RabbitMQ服务器的连接
		$this->connection = new AMQPStreamConnection($amqp["host"], $amqp["port"], $amqp["user"], $amqp["password"]);
		$this->channel = $this->connection->channel();


	}


	/**
	*发送指定日志信息
	*
	**/
	public function actionEmitLogDirect(){

		try {

		//设置和发送者是一样的,我们打开一个连接和一个通道,然后声明我们将要消耗的队列。请注意,这与发送的队列中的队列相匹配。
		//建立一个到RabbitMQ服务器的连接
		$connection = $this->connection;
		$channel = $this->channel;
		//创建direct类型的交换机
		$channel->exchange_declare('direct_logs','direct',false,false,false);
	    //发送数据
		$parms = func_get_args();

		//设置路由键
        $severity = isset($parms[0]) && !empty($parms[0]) ? $parms[0] : 'info';
        $data = implode(' ', array_slice($parms, 1));       
        if(empty($data)) $data = "Hello world";

        //处理信息
        $msg = new AMQPMessage($data);

        //发送信息
        $channel->basic_publish($msg,'direct_logs',$severity);

        echo " [x] Sent ",$severity,':',$data," \n";

		$channel->close();
		$connection->close();




		} catch(\Exception $e) {

			echo $e->getMessage();
		}

	}

}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110

rabbitmq-consumer/receive-logs-direct的代码:

<?php
/**
 * @link http://www.yiiframework.com/
 * @copyright Copyright (c) 2008 Yii Software LLC
 * @license http://www.yiiframework.com/license/
 * 消费者 接收信息
 */

namespace yii\console\controllers;

use Yii;
use yii\base\InvalidConfigException;
use yii\base\InvalidParamException;
use yii\console\Controller;
use yii\console\Exception;
use yii\console\ExitCode;
use yii\helpers\Console;
use yii\helpers\FileHelper;
use yii\test\FixtureTrait;
use common\tools\Pusher;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

/**
 * Manages fixture data loading and unloading.
 *
 * ```
 * #load fixtures from UsersFixture class with default namespace "tests\unit\fixtures"
 * yii fixture/load User
 *
 * #also a short version of this command (generate action is default)
 * yii fixture User
 *
 * #load all fixtures
 * yii fixture "*"
 *
 * #load all fixtures except User
 * yii fixture "*, -User"
 *
 * #load fixtures with different namespace.
 * yii fixture/load User --namespace=alias\my\custom\namespace\goes\here
 * ```
 *
 * The `unload` sub-command can be used similarly to unload fixtures.
 *
 * @author Mark Jebri <mark.github@yandex.ru>
 * @since 2.0
 */
class ConsumerController extends Controller
{
	private $channel;
	private  $connection;
	public function init ()
	{


		$amqp = yii::$app->params['amqp'];

		//建立一个到RabbitMQ服务器的连接
		$this->connection = new AMQPStreamConnection($amqp["host"], $amqp["port"], $amqp["user"], $amqp["password"]);
		$this->channel = $this->connection->channel();


	}




	public function actionReceiveLogsDirect(){

		try{
			//设置和发送者是一样的,我们打开一个连接和一个通道,然后声明我们将要消耗的队列。请注意,这与发送的队列中的队列相匹配。
			//建立一个到RabbitMQ服务器的连接
			$connection = $this->connection;
			$channel = $this->channel;
			//创建direct类型的交换机
		    $channel->exchange_declare('direct_logs','direct',false,false,false);
			//随机创建信道
			list($queue_name, ,) = $channel->queue_declare("",false,false,true,false);
			 //发送数据
		    $parms = func_get_args();
			//获取绑定键
			$severities = array_slice($parms, 0);
           
            //绑定
            foreach($severities as $severity) {

    			$channel->queue_bind($queue_name, 'direct_logs', $severity);

			}

			//回调函数
			$callback = function($msg){

				echo ' [x] ',$msg->delivery_info['routing_key'], ':', $msg->body, "\n";

			};

			//接收信息
			$channel->basic_consume($queue_name, '', false, true, false, false, $callback);

			while(count($channel->callbacks)) {
				$channel->wait();
			}

			$channel->close();
			$connection->close();


 
		} catch(\Exception $e){

			echo $e->getMessage();
		}
	}



}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  • 112
  • 113
  • 114
  • 115
  • 116
  • 117
  • 118
  • 119
  • 120

如果你希望只是保存warning和error级别的日志到磁盘,只需要打开控制台并输入:

./yii  rabbitmq-consumer/receive-logs-direct info warning error > logs_from_rabbit.log
  • 1

如果你希望所有的日志信息都输出到屏幕中,打开一个新的终端,然后输入:

./yii  rabbitmq-consumer/receive-logs-direct info warning error
  • 1

如果要触发一个error级别的日志,只需要输入:

gongzgiyangdeMacBook-Air:yii2advanced gongzhiyang$ ./yii rabbitmq-producer/emit-log-direct error "Run. Run. Or it will explode2."
 [x] Sent error:Run. Run. Or it will explode2. 
gongzgiyangdeMacBook-Air:yii2advanced gongzhiyang$ ./yii rabbitmq-producer/emit-log-direct error "Run. Run. Or it will explode2."
 [x] Sent error:Run. Run. Or it will explode2. 
gongzgiyangdeMacBook-Air:yii2advanced gongzhiyang$ ./yii rabbitmq-producer/emit-log-direct error "Run. Run. Or it will explode2."
 [x] Sent error:Run. Run. Or it will explode2. 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
声明:本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:【wpsshop博客】
推荐阅读
相关标签
  

闽ICP备14008679号