赞
踩
实质是一个没有设置名称的直连交换机,它有一个特殊的属性使得它对于简单应用特别有用处:那就是每个新建队列(queue)都会自动绑定到默认交换机上,绑定的路由键(routing key)名称与队列名称相同。
所以,当你声明一个 fanout 的交换机,绑定 routing key = "testqueue" 的时候,default exchange 也会多一个队列名为 "testqueue" 的队列。
选择 type 为 Direct (type: ExchangeType.Direct)的交换机
把 直连交换机(Direct Exchange) 和 Queue 绑定(routing key)
消息代理(message brokere )会根据生产者指定的路由(routing key)将消息发送给相应的消费者(消息处理器),这个消息处理器可以是单个也可以多个。
直连交换机一般用来处理单播路由
为什么直连交换机一般用于处理单播路由 ? 因为用扇形或主题交换机做 【多播路由更加方便】,因为扇形交换机不用再绑定路由值(routing key),就能做到把消息广播给所有绑定在这个交换机的队列(queue)中
创建信道,声明路由,插入消息
using (var connection = factory.CreateConnection()) { using (IModel channel = connection.CreateModel()) { //声明一个路由 type: ExchangeType.Direct Direct类型的路由 channel.ExchangeDeclare(exchange: "DirectExChange", type: ExchangeType.Direct, durable: true, autoDelete: false, arguments: null); //声明一个队列用来接受Error类型的日志 channel.QueueDeclare(queue: "DirectExchangeErrorQueue", durable: true, exclusive: false, autoDelete: false, arguments: null); //路由绑定 key channel.QueueBind(queue: "DirectExchangeErrorQueue", exchange: "DirectExChange", routingKey: "error"); List<LogMsgModel> logList = new List<LogMsgModel>(); var i = 1; logList.Add(new { LogType = "error", Msg = Encoding.UTF8.GetBytes($"error第{i}条信息") }); //插入信息 channel.BasicPublish(exchange: "DirectExChange", routingKey: log.LogType, basicProperties: null, body: log.Msg); } }
相对于直连交换机(deriect exchange),扇形交换机(fanout exhcange)只和队列(queue)建立绑定没有 routing key
当生产者给该扇形交换机(fanout exchange)发送消息时,所有的订阅这个 (交换机+队列)绑定的消费者都会接收到信息
综合两面两点扇形交换机更适合处理广播路由(broadcast routing)
using (var connection = RabbitMQHelper.GetConnection())
{
using(var channel = connection.CreateModel())
{
// 声明交换机对象
channel.ExchangeDeclare("fanout_exchange", "fanout");
// 创建队列
string queueName1 = "fanout_queue1";
channel.QueueDeclare(queueName1, false, false, false, null);
string queueName2 = "fanout_queue2";
channel.QueueDeclare(queueName2, false, false, false, null);
string queueName3 = "fanout_queue3";
channel.QueueDeclare(queueName3, false, false, false, null);
// 绑定到交互机
// fanout_exchange 绑定了 3个队列
channel.QueueBind(queue: queueName1, exchange: "fanout_exchange", routingKey: "");
channel.QueueBind(queue: queueName2, exchange: "fanout_exchange", routingKey: "");
channel.QueueBind(queue: queueName3, exchange: "fanout_exchange", routingKey: "");for (int i = 0; i < 10; i++)
{
string message = $"RabbitMQ Fanout {i + 1} Message";
var body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish("fanout_exchange", "", null, body);
Console.WriteLine("Send Fanout {0} message",i + 1);
}
}
}
var connection = RabbitMQHelper.GetConnection();
{
var channel = connection.CreateModel();
{
//申明exchange
channel.ExchangeDeclare(exchange: "fanout_exchange", type: "fanout");
// 创建队列
string queueName1 = "fanout_queue1";
channel.QueueDeclare(queueName1, false, false, false, null);
string queueName2 = "fanout_queue2";
channel.QueueDeclare(queueName2, false, false, false, null);
string queueName3 = "fanout_queue3";
channel.QueueDeclare(queueName3, false, false, false, null);
// 绑定到交互机
channel.QueueBind(queue: queueName1, exchange: "fanout_exchange", routingKey: "");
channel.QueueBind(queue: queueName2, exchange: "fanout_exchange", routingKey: "");
channel.QueueBind(queue: queueName3, exchange: "fanout_exchange", routingKey: "");
Console.WriteLine("[*] Waitting for fanout logs.");
//申明consumer
var consumer = new EventingBasicConsumer(channel);
//绑定消息接收后的事件委托
consumer.Received += (model, ea) => {
var body = ea.Body;
var message = Encoding.UTF8.GetString(body.ToArray());
Console.WriteLine("[x] {0}", message);};
channel.BasicConsume(queue: queueName1, autoAck: true, consumer: consumer);
Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();
}
}
路由绑定队列需要指定 Key
Key 有自己的规则,可以有占位符, * 匹配单个单词、#匹配多个单词,可以模糊匹配
//声明一个ExchangeType.Topic类型的路由 channel.ExchangeDeclare(exchange: "TopicExchange", type: ExchangeType.Topic, durable: true, autoDelete: false, arguments: null); //声明 queue channel.QueueDeclare(queue: "ChinaQueue", durable: true, exclusive: false, autoDelete: false, arguments: null); //路由和消息队列绑定====》模糊匹配 Chain. 开头的消息 channel.QueueBind(queue: "ChinaQueue", exchange: "TopicExchange", routingKey: "China.#", arguments: null);
headers头路由模型中,消息是根据prop即请求头中key-value来匹配的。
消费方指定的headers中必须包含一个"x-match"的键。
键"x-match"的值有2个:all和any。
all:表示消费方指定的所有key-value都必须在消息header中出现并匹配。
any:表示消费方指定的key-value至少有一个在消息header中出现并匹配即可。
//生成端代码 Map<String, Object> header = new HashMap<String, Object>(); header.put("name", "张三"); header.put("idcard","123321"); header.put("phone","13567655555"); AMQP.BasicProperties.Builder properties = new AMQP.BasicProperties().builder().headers(header); channel.basicPublish(EXCHANGE_NAME, "", properties.build(), message.getBytes("UTF-8")); //消费端代码 Map<String, Object> header = new HashMap<String, Object>(); header.put("x-match", "all"); //x-match: all表所有key-value全部匹配才匹配成功 ,any表只需要匹配任意一个key-value 即匹配成功。 header.put("name", "张三"); header.put("idcard","123321"); channel.queueBind(queueName, EXCHANGE_NAME, "", header); //处理消息 channel.BasicConsume(queue: "DirectExchangeErrorQueue", autoAck: true, consumer: consumer);
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。