赞
踩
direct 类型的交换机路由规则是完全匹配 BindingKey 和 RoutingKey,但是这种严格的匹配方式在很多情况下不能满足实际业务的需求。
topic 类型的交换机在匹配规则上进行了扩展,它与 direct 类型的交换机类似,也是将消息路由到 BindingKey 和 RoutingKey 相匹配的队列中,但这里的匹配规则有些不同,它约定:
*
用于匹配一个单词,#
用于匹配多个单词(可以是零个)。模糊匹配举例:
Topic 模型如下图:
对上图进行举例说明:
Topic 在 direct 模式上面进一步筛选
新建3个队列:topic_queue1、topic_queue2、topic_queue3
新建一个交换机 topic_exchange
绑定交换机和队列的关系(需要添加绑定键。# 代表0个或多个;* 代表一个)
在交换机中发送消息(需要指定路由键)
只有路由键和绑定键相匹配的队列才收到消息
生产者
和路由模式不同:
public class Producer { private static final String TOPIC_EXCHANGE_NAME = "code_topic_exchange"; public static void main(String[] args) throws Exception{ // 1. 获取连接 Connection connection = RabbitMqUtil.getConnection("生产者"); // 2. 通过连接获取通道 Channel Channel channel = connection.createChannel(); // 3. 通过通道声明交换机,以及交换机类型为 direct /** * @param1:交换机名称 * @param2:交换机类型 */ channel.exchangeDeclare(TOPIC_EXCHANGE_NAME, BuiltinExchangeType.TOPIC); // 4. 消息内容 String message = "Hello RabbitMQ Topic!!"; String routingKey = "com.rabbitmq.client"; // 5. 发送消息到交换机,并指定路由键 RoutingKey 为 com.rabbitmq.client channel.basicPublish(TOPIC_EXCHANGE_NAME, routingKey, null, message.getBytes()); System.out.println("消息发送完成~~~发送的消息为:" + message); // 6. 关闭信道、连接 RabbitMqUtil.close(connection, channel); } }
消费者
和路由模式不同:
public class Consumer { private static final String TOPIC_QUEUE_NAME = "topic_queue1"; private static final String TOPIC_EXCHANGE_NAME = "code_topic_exchange"; public static void main(String[] args) throws Exception{ // 获取连接 Connection connection = RabbitMqUtil.getConnection("消费者"); // 获取通道 Channel channel = connection.createChannel(); String bindingKey = "*.rabbitmq.*"; // 绑定队列到交换机,并指定一个绑定键 BindingKey channel.queueBind(TOPIC_QUEUE_NAME, TOPIC_EXCHANGE_NAME, bindingKey); // 定义消费者 com.rabbitmq.client.Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { // body 消息体 String msg = new String(body,"utf-8"); System.out.println("收到消息:" + msg); } }; // 监听队列 channel.basicConsume(TOPIC_QUEUE_NAME, true, consumer); System.out.println("开始接收消息~~~"); System.in.read(); // 关闭信道、连接 RabbitMqUtil.close(connection, channel); } }
public class Consumer2 { private static final String TOPIC_QUEUE_NAME = "topic_queue2"; private static final String TOPIC_EXCHANGE_NAME = "code_topic_exchange"; public static void main(String[] args) throws Exception{ // 获取连接 Connection connection = RabbitMqUtil.getConnection("消费者"); // 获取通道 Channel channel = connection.createChannel(); String bindingKey = "*.*.client"; // 绑定队列到交换机,并指定一个绑定键 BindingKey channel.queueBind(TOPIC_QUEUE_NAME, TOPIC_EXCHANGE_NAME, bindingKey); // 定义消费者 com.rabbitmq.client.Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { // body 消息体 String msg = new String(body,"utf-8"); System.out.println("收到消息:" + msg); } }; // 监听队列 channel.basicConsume(TOPIC_QUEUE_NAME, true, consumer); System.out.println("开始接收消息~~~"); System.in.read(); // 关闭信道、连接 RabbitMqUtil.close(connection, channel); } }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。