当前位置:   article > 正文

RabbitMQ 学习教程(七)路由模式 Direct

RabbitMQ 学习教程(七)路由模式 Direct

路由模式

direct 类型的交换机路由规则很简单,它会把消息路由到那些 BindingKey 和 RoutingKey 完全匹配的队列中

路由模式结构图如下:
在这里插入图片描述
P:生产者,向交换机 Exchange 发送消息,发送消息时,会指定一个路由键 routing key;

X:Exchange(交换机),接收生产者的消息,然后把消息递交给 与 routing key 完全匹配的队列;

Q1:队列(消费者监听此队列),指定了需要与交换机 Exchange 进行绑定的绑定键 BindingKey 为 warning 的消息

Q2:队列(消费者监听此队列),指定了需要与交换机 Exchange 进行绑定的绑定键 BindingKey 为 warning、info 的消息

Q3:队列(消费者监听此队列),指定了需要与交换机 Exchange 进行绑定的绑定键 BindingKey 为 debug、info 的消息

举例说明:

  1. 交换机的类型为 direct,如果生产者发送一条消息,并在发送消息的时候设置路由键为 warning,则消息会路由到 Q1、Q2;
  2. 如果在发送消息时,设置路由键为 info,则消息会路由到 Q2、Q3;
  3. 如果设置路由键为 debug,则消息只会路由到 Q3

后台页面操作

Direct 模式在 Fanout 模式之上做了一个路由键 RoutingKey,对发送给交换机 Exchange 的消息进行筛选

新建3个队列:direct_queue1direct_queue2direct_queue3
在这里插入图片描述
新建一个交换机 direct_exchange,类型为 direct
在这里插入图片描述
绑定交换机与队列的关系(绑定时,明确绑定键 BindingKey)
在这里插入图片描述
绑定的关系如上图所示

在交换机 direct_exchange 中发送消息(需要指定一个路由键

在这里插入图片描述
上述的路由键为 “warning”,则会收到的队列有:direct_queue1、direct_queue2

只有路由键和绑定键匹配的队列收到消息
在这里插入图片描述

代码实现

生产者

相对于工作队列模式:

  1. 交换机类型不同,工作队列模式的交换机类型为 默认的交换机,而路由模式的交换机类型为 direct
  2. 给交换机发送消息时,指定了一个路由键 RountingKey
public class Producer {

    private static final String DIRECT_EXCHANGE_NAME = "code_direct_exchange";

    public static void main(String[] args) throws Exception{
        // 1. 获取连接
        Connection connection = RabbitMqUtil.getConnection("生产者");

        // 2. 通过连接获取通道 Channel
        Channel channel = connection.createChannel();
        // 3. 通过通道声明交换机,以及交换机类型为 direct
        /**
         * @param1:交换机名称
         * @param2:交换机类型
         */
        channel.exchangeDeclare(DIRECT_EXCHANGE_NAME, BuiltinExchangeType.DIRECT);

        // 4. 消息内容
        String message = "Hello RabbitMQ Fanout!!";
        String routingKey = "warning";
        // 5. 发送消息到交换机,并指定路由键 RoutingKey 为 warning
        channel.basicPublish(DIRECT_EXCHANGE_NAME, routingKey, null, message.getBytes());
        System.out.println("消息发送完成~~~发送的消息为:" + message);
        // 6. 关闭信道、连接
        RabbitMqUtil.close(connection, channel);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27

消费者1

相对于工作队列模式:

  1. 绑定队列和交换机时,指定了一个绑定键 BindingKey
public class Consumer {

    private static final String DIRECT_QUEUE_NAME = "direct_queue1";
    private static final String DIRECT_EXCHANGE_NAME = "code_direct_exchange";

    public static void main(String[] args) throws Exception{
        // 获取连接
        Connection connection = RabbitMqUtil.getConnection("消费者");
        // 获取通道
        Channel channel = connection.createChannel();
        String bindingKey = "warning";
        // 绑定队列到交换机,并指定一个绑定键 BindingKey
        channel.queueBind(DIRECT_QUEUE_NAME, DIRECT_EXCHANGE_NAME, bindingKey);

        // 定义消费者
        com.rabbitmq.client.Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                // body 消息体
                String msg = new String(body,"utf-8");
                System.out.println("收到消息:" + msg);
            }
        };

        // 监听队列
        channel.basicConsume(DIRECT_QUEUE_NAME, true, consumer);

        System.out.println("开始接收消息~~~");
        System.in.read();
        // 关闭信道、连接
        RabbitMqUtil.close(connection, channel);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33

消费者2

  1. 添加了两个绑定键 BindingKey
public class Consumer2 {

    private static final String DIRECT_QUEUE_NAME = "direct_queue2";
    private static final String DIRECT_EXCHANGE_NAME = "code_direct_exchange";

    public static void main(String[] args) throws Exception{
        // 获取连接
        Connection connection = RabbitMqUtil.getConnection("消费者");
        // 获取通道
        Channel channel = connection.createChannel();
        String bindingKey = "warning";
        String bindingKey2 = "info";
        // 绑定队列到交换机,并指定一个绑定键 BindingKey
        channel.queueBind(DIRECT_QUEUE_NAME, DIRECT_EXCHANGE_NAME, bindingKey);
        channel.queueBind(DIRECT_QUEUE_NAME, DIRECT_EXCHANGE_NAME, bindingKey2);

        // 定义消费者
        com.rabbitmq.client.Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                // body 消息体
                String msg = new String(body,"utf-8");
                System.out.println("收到消息:" + msg);
            }
        };

        // 监听队列
        channel.basicConsume(DIRECT_QUEUE_NAME, true, consumer);

        System.out.println("开始接收消息~~~");
        System.in.read();
        // 关闭信道、连接
        RabbitMqUtil.close(connection, channel);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/凡人多烦事01/article/detail/598164
推荐阅读
相关标签
  

闽ICP备14008679号