当前位置:   article > 正文

RabbitMQ - 4种交换机详解_rabbitmq 默认交换机

rabbitmq 默认交换机

1、  默认的交换机

实质是一个没有设置名称的直连交换机,它有一个特殊的属性使得它对于简单应用特别有用处:那就是每个新建队列(queue)都会自动绑定到默认交换机上,绑定的路由键(routing key)名称与队列名称相同

所以,当你声明一个 fanout 的交换机,绑定 routing key = "testqueue" 的时候,default exchange 也会多一个队列名为 "testqueue" 的队列。

2、直连交换机( Direct Exchange)

  • 选择 type 为 Direct (type: ExchangeType.Direct)的交换机 

  • 把 直连交换机(Direct Exchange)  和 Queue 绑定(routing key)

  • 消息代理(message brokere )会根据生产者指定的路由(routing key)将消息发送给相应的消费者(消息处理器),这个消息处理器可以是单个也可以多个。

  • 直连交换机一般用来处理单播路由

  • 为什么直连交换机一般用于处理单播路由 ? 因为用扇形或主题交换机做 【多播路由更加方便】,因为扇形交换机不用再绑定路由值(routing key),就能做到把消息广播给所有绑定在这个交换机的队列(queue)中

  • 创建信道,声明路由,插入消息

        using (var connection = factory.CreateConnection())
        {
            using (IModel channel = connection.CreateModel())
            { 
                //声明一个路由 type: ExchangeType.Direct Direct类型的路由                           channel.ExchangeDeclare(exchange: "DirectExChange", type: ExchangeType.Direct, durable: true, autoDelete: false, arguments: null);

                //声明一个队列用来接受Error类型的日志
                channel.QueueDeclare(queue: "DirectExchangeErrorQueue", durable: true, exclusive: false, autoDelete: false, arguments: null); 
                
                //路由绑定 key
                channel.QueueBind(queue: "DirectExchangeErrorQueue",
                              exchange: "DirectExChange",
                              routingKey: "error");
                    
                 List<LogMsgModel> logList = new List<LogMsgModel>();
                 var i = 1;
                 logList.Add(new { LogType = "error", Msg = Encoding.UTF8.GetBytes($"error第{i}条信息") });
                //插入信息
                 channel.BasicPublish(exchange: "DirectExChange",
                                            routingKey: log.LogType,
                                            basicProperties: null,
                                            body: log.Msg);
                }
             }

3、  扇形交换机(FanoutExchange)

  • 相对于直连交换机(deriect exchange),扇形交换机(fanout exhcange)只和队列(queue)建立绑定没有 routing key

  • 当生产者给该扇形交换机(fanout exchange)发送消息时,所有的订阅这个 (交换机+队列)绑定的消费者都会接收到信息

  • 综合两面两点扇形交换机更适合处理广播路由(broadcast routing)

//生产者:

 using (var connection = RabbitMQHelper.GetConnection())
            {
                using(var channel = connection.CreateModel())
                {
                    // 声明交换机对象
                    channel.ExchangeDeclare("fanout_exchange", "fanout");
                    // 创建队列
                    string queueName1 = "fanout_queue1";
                    channel.QueueDeclare(queueName1, false, false, false, null);
                    string queueName2 = "fanout_queue2";
                    channel.QueueDeclare(queueName2, false, false, false, null);
                    string queueName3 = "fanout_queue3";
                    channel.QueueDeclare(queueName3, false, false, false, null);
                    // 绑定到交互机
                    // fanout_exchange 绑定了 3个队列 
                    channel.QueueBind(queue: queueName1, exchange: "fanout_exchange", routingKey: "");
                    channel.QueueBind(queue: queueName2, exchange: "fanout_exchange", routingKey: "");
                    channel.QueueBind(queue: queueName3, exchange: "fanout_exchange", routingKey: "");

                    for (int i = 0; i < 10; i++)
                    {
                        string message = $"RabbitMQ Fanout {i + 1} Message";
                        var body = Encoding.UTF8.GetBytes(message);
                        channel.BasicPublish("fanout_exchange", "", null, body);
                        Console.WriteLine("Send Fanout {0} message",i + 1);
                    }
                }
            }

//消费者 

 var connection = RabbitMQHelper.GetConnection();
            {
                var channel = connection.CreateModel();
                {
                    //申明exchange
                    channel.ExchangeDeclare(exchange: "fanout_exchange", type: "fanout");
                    // 创建队列
                    string queueName1 = "fanout_queue1";
                    channel.QueueDeclare(queueName1, false, false, false, null);
                    string queueName2 = "fanout_queue2";
                    channel.QueueDeclare(queueName2, false, false, false, null);
                    string queueName3 = "fanout_queue3";
                    channel.QueueDeclare(queueName3, false, false, false, null);
                    // 绑定到交互机
                    channel.QueueBind(queue: queueName1, exchange: "fanout_exchange", routingKey: "");
                    channel.QueueBind(queue: queueName2, exchange: "fanout_exchange", routingKey: "");
                    channel.QueueBind(queue: queueName3, exchange: "fanout_exchange", routingKey: "");


                    Console.WriteLine("[*] Waitting for fanout logs.");
                    //申明consumer
                    var consumer = new EventingBasicConsumer(channel);
                    //绑定消息接收后的事件委托
                    consumer.Received += (model, ea) => {
                        var body = ea.Body;
                        var message = Encoding.UTF8.GetString(body.ToArray());
                        Console.WriteLine("[x] {0}", message);

                    };

                    channel.BasicConsume(queue: queueName1, autoAck: true, consumer: consumer);
                    Console.WriteLine(" Press [enter] to exit.");
                    Console.ReadLine();
                }
            }

4、主题路由(topic exchange)

  • 路由绑定队列需要指定 Key

  • Key 有自己的规则,可以有占位符, * 匹配单个单词、#匹配多个单词,可以模糊匹配

  //声明一个ExchangeType.Topic类型的路由
  channel.ExchangeDeclare(exchange: "TopicExchange", type: ExchangeType.Topic, durable: true, autoDelete: false, arguments: null); 
  
//声明 queue
channel.QueueDeclare(queue: "ChinaQueue", durable: true, exclusive: false, autoDelete: false, arguments: null); 
​
//路由和消息队列绑定====》模糊匹配 Chain. 开头的消息
 channel.QueueBind(queue: "ChinaQueue", exchange: "TopicExchange", routingKey: "China.#", arguments: null); 
 

5、头交换机(Header Exchange)

  • headers头路由模型中,消息是根据prop即请求头中key-value来匹配的。

  • 消费方指定的headers中必须包含一个"x-match"的键。

  • 键"x-match"的值有2个:all和any。

  • all:表示消费方指定的所有key-value都必须在消息header中出现并匹配。

  • any:表示消费方指定的key-value至少有一个在消息header中出现并匹配即可。

    //生成端代码    
    Map<String, Object> header = new HashMap<String, Object>();
    header.put("name", "张三");
    header.put("idcard","123321"); 
    header.put("phone","13567655555");
    AMQP.BasicProperties.Builder properties = new AMQP.BasicProperties().builder().headers(header);
    channel.basicPublish(EXCHANGE_NAME, "", properties.build(), message.getBytes("UTF-8"));
​
​
  //消费端代码
  Map<String, Object> header = new HashMap<String, Object>();
  header.put("x-match", "all");  //x-match: all表所有key-value全部匹配才匹配成功 ,any表只需要匹配任意一个key-value 即匹配成功。
  header.put("name", "张三");
  header.put("idcard","123321"); 
  
  channel.queueBind(queueName, EXCHANGE_NAME, "", header);
  //处理消息
 channel.BasicConsume(queue: "DirectExchangeErrorQueue",
                      autoAck: true,
                      consumer: consumer);

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/酷酷是懒虫/article/detail/830028
推荐阅读
相关标签
  

闽ICP备14008679号