赞
踩
direct 类型的交换机路由规则很简单,它会把消息路由到那些 BindingKey 和 RoutingKey 完全匹配的队列中
路由模式结构图如下:
P:生产者,向交换机 Exchange 发送消息,发送消息时,会指定一个路由键 routing key;
X:Exchange(交换机),接收生产者的消息,然后把消息递交给 与 routing key 完全匹配的队列;
Q1:队列(消费者监听此队列),指定了需要与交换机 Exchange 进行绑定的绑定键 BindingKey 为 warning 的消息
Q2:队列(消费者监听此队列),指定了需要与交换机 Exchange 进行绑定的绑定键 BindingKey 为 warning、info 的消息
Q3:队列(消费者监听此队列),指定了需要与交换机 Exchange 进行绑定的绑定键 BindingKey 为 debug、info 的消息
举例说明:
Direct 模式在 Fanout 模式之上做了一个路由键 RoutingKey,对发送给交换机 Exchange 的消息进行筛选
新建3个队列:direct_queue1
、direct_queue2
、direct_queue3
新建一个交换机 direct_exchange
,类型为 direct
绑定交换机与队列的关系(绑定时,明确绑定键 BindingKey)
绑定的关系如上图所示
在交换机 direct_exchange
中发送消息(需要指定一个路由键)
上述的路由键为 “warning”,则会收到的队列有:direct_queue1、direct_queue2
只有路由键和绑定键匹配的队列收到消息
生产者
相对于工作队列模式:
public class Producer { private static final String DIRECT_EXCHANGE_NAME = "code_direct_exchange"; public static void main(String[] args) throws Exception{ // 1. 获取连接 Connection connection = RabbitMqUtil.getConnection("生产者"); // 2. 通过连接获取通道 Channel Channel channel = connection.createChannel(); // 3. 通过通道声明交换机,以及交换机类型为 direct /** * @param1:交换机名称 * @param2:交换机类型 */ channel.exchangeDeclare(DIRECT_EXCHANGE_NAME, BuiltinExchangeType.DIRECT); // 4. 消息内容 String message = "Hello RabbitMQ Fanout!!"; String routingKey = "warning"; // 5. 发送消息到交换机,并指定路由键 RoutingKey 为 warning channel.basicPublish(DIRECT_EXCHANGE_NAME, routingKey, null, message.getBytes()); System.out.println("消息发送完成~~~发送的消息为:" + message); // 6. 关闭信道、连接 RabbitMqUtil.close(connection, channel); } }
消费者1
相对于工作队列模式:
public class Consumer { private static final String DIRECT_QUEUE_NAME = "direct_queue1"; private static final String DIRECT_EXCHANGE_NAME = "code_direct_exchange"; public static void main(String[] args) throws Exception{ // 获取连接 Connection connection = RabbitMqUtil.getConnection("消费者"); // 获取通道 Channel channel = connection.createChannel(); String bindingKey = "warning"; // 绑定队列到交换机,并指定一个绑定键 BindingKey channel.queueBind(DIRECT_QUEUE_NAME, DIRECT_EXCHANGE_NAME, bindingKey); // 定义消费者 com.rabbitmq.client.Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { // body 消息体 String msg = new String(body,"utf-8"); System.out.println("收到消息:" + msg); } }; // 监听队列 channel.basicConsume(DIRECT_QUEUE_NAME, true, consumer); System.out.println("开始接收消息~~~"); System.in.read(); // 关闭信道、连接 RabbitMqUtil.close(connection, channel); } }
消费者2
public class Consumer2 { private static final String DIRECT_QUEUE_NAME = "direct_queue2"; private static final String DIRECT_EXCHANGE_NAME = "code_direct_exchange"; public static void main(String[] args) throws Exception{ // 获取连接 Connection connection = RabbitMqUtil.getConnection("消费者"); // 获取通道 Channel channel = connection.createChannel(); String bindingKey = "warning"; String bindingKey2 = "info"; // 绑定队列到交换机,并指定一个绑定键 BindingKey channel.queueBind(DIRECT_QUEUE_NAME, DIRECT_EXCHANGE_NAME, bindingKey); channel.queueBind(DIRECT_QUEUE_NAME, DIRECT_EXCHANGE_NAME, bindingKey2); // 定义消费者 com.rabbitmq.client.Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { // body 消息体 String msg = new String(body,"utf-8"); System.out.println("收到消息:" + msg); } }; // 监听队列 channel.basicConsume(DIRECT_QUEUE_NAME, true, consumer); System.out.println("开始接收消息~~~"); System.in.read(); // 关闭信道、连接 RabbitMqUtil.close(connection, channel); } }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。