赞
踩
不指定交换机直接发送到队列时,多个消费者之间存在的是竞争关系,一个消息只能被一个消费者接收,其他的消费者不能够再次接收;交换机可以绑定多个不同的队列,但是其Routingkkey是相同的,这样就可以从多个不同的队列发送相同的消息给多个消费者。
不指定交换机的情况,称为简单(工作)模式:
使用交换机的情况,称为发布、订阅模式:
在RabbitMQ中生产者不会讲消息直接发送到队列(不指定有默认交换机);交换机接收来自生产者的消息并推送给队列。
交换机类型:
无名类型:使用空字符串来指定。
临时队列是没有持久化的队列。也可以直接随机给队列起一个名字,当消费者断开连接时,队列也会自动删除。
创建随机临时队列:
String queueName = channel.queueDeclare().getQueue();
绑定就是交换机和队列之间的绑定关系。
它将接收到的所有消息广播到它知道的所有队列中。
模拟测试场景:
/** * @Description fanout 发布订阅模式 * @date 2022/3/8 9:01 */ public class Producer { private static final String EXCHANGE_NAME = "fanout_mq"; public static void main(String[] args) throws Exception { Channel channel = RabbitMQUtils.getChannel(); // 声明交换机 channel.exchangeDeclare(EXCHANGE_NAME,"fanout"); Scanner scanner = new Scanner(System.in); while (scanner.hasNext()){ String msg = scanner.next(); channel.basicPublish(EXCHANGE_NAME,"",null,msg.getBytes(StandardCharsets.UTF_8)); System.out.println("发送消息:" + msg); } } }
/** * @Description 接收发布订阅模式2 * @date 2022/3/8 9:01 */ public class Worker1 { private static final String EXCHANGE_NAME = "fanout_mq"; public static void main(String[] args) throws Exception { Channel channel = RabbitMQUtils.getChannel(); // 声明交换机 channel.exchangeDeclare(EXCHANGE_NAME,"fanout"); // 声明临时队列 String queueName = channel.queueDeclare().getQueue(); // 绑定交换机和队列 channel.queueBind(queueName,EXCHANGE_NAME,""); System.out.println("线程A等待接收消息..."); DeliverCallback callback = (consumerTag, message) -> { System.out.println("线程A接收到:" + new String(message.getBody(), StandardCharsets.UTF_8)); }; channel.basicConsume(queueName,true,callback,consumerTag -> {}); } }
两个消费者除了线程名字不同外没有其他区别。
生产者发送:
两个消费者分别接收到:
指定RoutingKey使得不同的key接收队列的消息。
测试:
生产者:
/** * @Description direct 路由模式 * @date 2022/3/8 9:01 */ public class Producer { private static final String EXCHANGE_NAME = "direct_mq"; public static void main(String[] args) throws Exception { Channel channel = RabbitMQUtils.getChannel(); // 声明交换机 channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT); Scanner scanner = new Scanner(System.in); int i = 1; while (scanner.hasNext()){ i++; String msg = scanner.next(); if ( i % 2 == 0){ channel.basicPublish(EXCHANGE_NAME,"info",null,msg.getBytes(StandardCharsets.UTF_8)); }else { channel.basicPublish(EXCHANGE_NAME,"error",null,msg.getBytes(StandardCharsets.UTF_8)); } System.out.println("发送消息:" + msg); } } }
消费者A:
/** * @Description 接收 direct路由模式1 * @date 2022/3/8 9:01 */ public class Worker1 { private static final String EXCHANGE_NAME = "direct_mq"; public static void main(String[] args) throws Exception { Channel channel = RabbitMQUtils.getChannel(); // 声明交换机 channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT); // 声明一个队列 channel.queueDeclare("console",false,false,false,null); // 绑定交换机和队列 channel.queueBind("console",EXCHANGE_NAME,"info"); System.out.println("线程A等待接收消息..."); DeliverCallback callback = (consumerTag, message) -> { System.out.println("线程A接收到:" + new String(message.getBody(), StandardCharsets.UTF_8)); }; channel.basicConsume("console",true,callback,consumerTag -> {}); } }
消费者B:
修改名称和指定RoutingKey为error。
测试结果:
上方路由模式中无法同时发送多个队列,在当前topic模式中可以指定多个RoutingKey中间使用英文句号隔开。通配符*代替一个单词;#代表零个或多个单词。
生产者:
/** * @Description topic 主题模式 * @date 2022/3/8 9:01 */ public class Producer { private static final String EXCHANGE_NAME = "topic_logs"; public static void main(String[] args) throws Exception { Channel channel = RabbitMQUtils.getChannel(); channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC); /** * Q1-->绑定的是 * 中间带 orange 带 3 个单词的字符串(*.orange.*) * Q2-->绑定的是 * 最后一个单词是 rabbit 的 3 个单词(*.*.rabbit) * 第一个单词是 lazy 的多个单词(lazy.#) * */ Map<String, String> bindingKeyMap = new HashMap<>(); bindingKeyMap.put("quick.orange.rabbit", "被队列 Q1Q2 接收到"); bindingKeyMap.put("lazy.orange.elephant", "被队列 Q1Q2 接收到"); bindingKeyMap.put("quick.orange.fox", "被队列 Q1 接收到"); bindingKeyMap.put("lazy.brown.fox", "被队列 Q2 接收到"); bindingKeyMap.put("lazy.pink.rabbit", "虽然满足两个绑定但只被队列 Q2 接收一次"); bindingKeyMap.put("quick.brown.fox", "不匹配任何绑定不会被任何队列接收到会被丢弃"); bindingKeyMap.put("quick.orange.male.rabbit", "是四个单词不匹配任何绑定会被丢弃"); bindingKeyMap.put("lazy.orange.male.rabbit", "是四个单词但匹配 Q2"); for (Map.Entry<String, String> bindingKeyEntry : bindingKeyMap.entrySet()) { String bindingKey = bindingKeyEntry.getKey(); String message = bindingKeyEntry.getValue(); channel.basicPublish(EXCHANGE_NAME, bindingKey, null, message.getBytes("UTF-8")); System.out.println("生产者发出消息:" + message); } } }
消费者1:
/** * @Description topic 主题消费者1 * @date 2022/3/8 9:01 */ public class Worker1 { private static final String EXCHANGE_NAME = "topic_logs"; public static void main(String[] args) throws Exception { Channel channel = RabbitMQUtils.getChannel(); channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC); //声明 Q1 队列与绑定关系 String queueName = "Q1"; //声明 channel.queueDeclare(queueName, false, false, false, null); //绑定 channel.queueBind(queueName, EXCHANGE_NAME, "*.orange.*"); System.out.println("等待接收消息........... "); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println(" 接收队列:" + queueName + " 绑定键:" + delivery.getEnvelope().getRoutingKey() + ",消息:" + message); }; channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { }); } }
消费者2
/** * @Description topic 主题模式消费者2 * @date 2022/3/8 9:01 */ public class Worker2 { private static final String EXCHANGE_NAME = "topic_logs"; public static void main(String[] args) throws Exception { Channel channel = RabbitMQUtils.getChannel(); channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC); //声明 Q2 队列与绑定关系 String queueName = "Q2"; //声明 channel.queueDeclare(queueName, false, false, false, null); //绑定 channel.queueBind(queueName, EXCHANGE_NAME, "*.*.rabbit"); channel.queueBind(queueName, EXCHANGE_NAME, "lazy.#"); System.out.println("等待接收消息........... "); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println(" 接收队列:" + queueName + " 绑定键:" + delivery.getEnvelope().getRoutingKey() + ",消息:" + message); }; channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { }); } }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。