赞
踩
在这篇文章中,荔枝围绕交换机梳理RabbitMQ中交换机相关知识,主要包括:扇出交换机、直接交换机、主题交换机,根据不同的交换机引出相应的RabbitMQ模式。同样的本篇文章也是主要围绕如何通过交换机定向群体发送消息这一个问题来梳理的,希望能够帮助到有需要的小伙伴~~~
在前面学习的内容中,没有涉及到有关交换机的内容,这时因为默认使用到了RabbitMQ提供的交换机。RabbitMQ消息传递模型的核心思想是:生产者生产的消息从不会直接发送到队列。生产者只需要将消息发送到交换机,由交换机来决定将消息转发到哪些消息队列中。交换机工作的内容非常简单,一方面它接收来自生产者的消息,另一方面将它们推入队列。交换机必须确切知道如何处理收到的消息,这由交换机的类型来决定。
交换机的类型总共有一下五种:默认(AMQP default)、direct(直接)、topic(主题)、headers(标题)、fanout(扇出 | 发布订阅)。
默认交换机通常是用一个空字符串来标识的,比如在前面文章中的用法
channel.basicPublish("",queueName,null,message.getBytes());
消息能路由发送到队列中其实是由routingKey(bingingKey)绑定的Key指定的,这个routingKey起了一个类似路由分发的作用。
临时队列在我们断开消费者的连接的时候队列会被自动删除。创建一个临时队列的方式比较简单:
String queueName = channel.queueDeclare().getQueue();
扇出交换机是指交换机所连接的队列的路由关键词routingKey是一样的,因此该交换机所连接的队列可以同时获得交换机转接的来自生产者的消息。关于该交换机模式的具体实现可以看看下面(二、发布订阅模式)中的内容(建议先跳过去看)。
直接交换机和扇出交换机相比最大的区别就是直接交换机可以根据路由关键词来实现消息的定向广播,在发送方发送的时候声明RoutingKey,交换机可以将发送过来的消息通过管道定向地输送到指定的消费者队列。下面看看这个demo:
发布者
- package com.crj.rabbitmq.direct;
-
- import com.crj.rabbitmq.utils.RabbitMqUtil;
- import com.rabbitmq.client.Channel;
-
- import java.util.Scanner;
-
- public class Publish {
- //交换机的名字
- public static final String EXCHANGE_NAME = "direct";
-
- public static void main(String[] args) throws Exception {
- Channel channel = RabbitMqUtil.getChannel();
- channel.exchangeDeclare(EXCHANGE_NAME,"direct");
- //控制台输出消息
- Scanner scanner = new Scanner(System.in);
-
- while (scanner.hasNext()) {
- String message = scanner.next();
- channel.basicPublish(EXCHANGE_NAME,"info",null,message.getBytes("UTF-8"));
- System.out.println("生产者发布消息:"+message);
- }
- }
- }
消息接收者
- package com.crj.rabbitmq.direct;
-
- import com.crj.rabbitmq.utils.RabbitMqUtil;
- import com.rabbitmq.client.Channel;
- import com.rabbitmq.client.DeliverCallback;
-
- public class Consumer2 {
- public static final String EXCHANGE_NAME = "direct";
- public static void main(String[] args) throws Exception {
- Channel channel = RabbitMqUtil.getChannel();
- //声明一个交换机
- channel.exchangeDeclare(EXCHANGE_NAME,"direct");
- //声明一个临时队列并随机生成名字
- channel.queueDeclare("disk",false,false,false,null);
-
- //绑定交换机与队列
- channel.queueBind("disk",EXCHANGE_NAME,"error");
- System.out.println("Consumer正在等待接收消息。。。。。");
-
- //接收消息
- DeliverCallback deliverCallback = (consumerTag, message)->{
- System.out.println("Consumer2控制台打印的信息:"+new String(message.getBody(),"UTF-8"));
- };
- //消费者开始消费消息
- channel.basicConsume("disk",true,deliverCallback,(consumerTag)->{});
- }
- }
- package com.crj.rabbitmq.direct;
-
- import com.crj.rabbitmq.utils.RabbitMqUtil;
- import com.rabbitmq.client.Channel;
- import com.rabbitmq.client.DeliverCallback;
-
- public class Consumer {
- public static final String EXCHANGE_NAME = "direct";
- public static void main(String[] args) throws Exception {
- Channel channel = RabbitMqUtil.getChannel();
- //声明一个交换机
- channel.exchangeDeclare(EXCHANGE_NAME,"direct");
- //声明一个临时队列并随机生成名字
- channel.queueDeclare("console",false,false,false,null);
- //绑定交换机与队列
- channel.queueBind("console",EXCHANGE_NAME,"info");
- //多重绑定
- channel.queueBind("console",EXCHANGE_NAME,"warning");
- System.out.println("Consumer正在等待接收消息。。。。。");
-
- //接收消息
- DeliverCallback deliverCallback = (consumerTag, message)->{
- System.out.println("Consumer控制台打印的信息:"+new String(message.getBody(),"UTF-8"));
- };
- //消费者开始消费消息
- channel.basicConsume("console",true,deliverCallback,(consumerTag)->{});
- }
- }
多重绑定:也就是消费者可以一次性绑定多个信道队列来接收来自不同交换机发送过来的消息。
发布订阅模式比较容易理解,其实就是交换机会根据RoutingKey将目标消息广播到相应的队列中。与之前文章不同在于,在这里我们定义了交换机的名字EXCHANGE_NAME,并通过Channel对象的exchangeDeclare方法来实现交换机模式的设置。
消息发布者:
区别于之前,这里我们需要在basicPublish为当前信道队列设置好交换机的名字以及相应的路由关键词,同时传入message内容。
- package com.crj.rabbitmq.PublishAndOrder;
-
- import com.crj.rabbitmq.utils.RabbitMqUtil;
- import com.rabbitmq.client.Channel;
-
- import java.util.Scanner;
-
- public class Publish {
- public static final String EXCHANGE_NAME = "LOGS";
-
- public static void main(String[] args) throws Exception {
- Channel channel = RabbitMqUtil.getChannel();
- channel.exchangeDeclare(EXCHANGE_NAME,"fanout");
- //控制台输出消息
- Scanner scanner = new Scanner(System.in);
-
- while (scanner.hasNext()) {
- String message = scanner.next();
- channel.basicPublish(EXCHANGE_NAME,"routingKey路由关键词",null,message.getBytes("UTF-8"));
- System.out.println("生产者发布消息:"+message);
- }
- }
- }
消息接收者
由于消息接收者的队列在接收完消息之后可以丢弃,因此我们可以随机生成队列名并开启自动清除,通过Channel对象的queueBind方法将交换机和队列进行绑定,之后就可以开始消费消息了。
- package com.crj.rabbitmq.PublishAndOrder;
-
- import com.crj.rabbitmq.utils.RabbitMqUtil;
- import com.rabbitmq.client.Channel;
- import com.rabbitmq.client.DeliverCallback;
-
- public class Receiver {
- //交换机的名称
- public static final String EXCHANGE_NAME = "LOGS";
- public static void main(String[] args) throws Exception {
- Channel channel = RabbitMqUtil.getChannel();
- channel.exchangeDeclare(EXCHANGE_NAME,"fanout");
- //声明一个临时队列并随机生成名字
- String queueName = channel.queueDeclare().getQueue();
-
- //绑定交换机与队列
- channel.queueBind(queueName,EXCHANGE_NAME,"routingKey路由关键词");
- System.out.println("等待接收消息。。。。。");
-
- //接收消息
- DeliverCallback deliverCallback = (consumerTag,message)->{
- System.out.println("控制台打印的信息:"+new String(message.getBody(),"UTF-8"));
- };
- //消费者开始消费消息
- channel.basicConsume(queueName,true,deliverCallback,(consumerTag)->{});
-
- }
- }
运行结果:
可以看到两个信道队列的消费者是同时获得生产者发送的消息的,这就是扇出交换机,也是发布确认模式。
在前面的扇出交换机和直接交换机分别实现了两种比较极端的模式:扇出是广播,不管对象是谁;直接交换机是指定定向的对象进行消息的发送,而不能同时发送消息给多个对象。为了实现将消息广播给指定的对象这一功能,我们引入了Topic交换机。在Topic交换机中,发送的类型是topic交换机的消息的routing_key不能随意写,必须满足一定的规范:它必须是一个单词列表,以点号分隔开。这些单词可以是任意单词,说:"stock.usd.nyse","nyse.vmw","quck.orange.rabbit".这种类型的。当然这个单词列表最多不能超过255个字节。
在这个规则列表中,其中有两个替换符是需要注意的:
在发送方和消息的接收方们之间可以执行一个约定,定义各自发布的消息的格式,这样就可以实现在指定消息接收群体对象中进行广播了。
下面我们可以看看demo示例:
生产者
生产者中我们定义了一个map结构来构造消息集合,并将map中的消息单独发出,map对象中记录了相应的routingKey和消息体。
- package com.crj.rabbitmq.topic;
-
- import com.crj.rabbitmq.utils.RabbitMqUtil;
- import com.rabbitmq.client.Channel;
-
- import java.util.HashMap;
- import java.util.Map;
- import java.util.Scanner;
-
- public class Publish {
- //交换机的名字
- public static final String EXCHANGE_NAME = "topic";
-
- public static void main(String[] args) throws Exception {
- Channel channel = RabbitMqUtil.getChannel();
- //创建消息集合
- Map<String,String> bindingKeyMap = new HashMap<>();
- bindingKeyMap.put("quick.orange.rabbit","被队列QlQ2接收到");
- bindingKeyMap.put("1azy.orange.elephant","被队列Q1Q2接收到");
- bindingKeyMap.put("quick.orange.fox","被队列Ql接收到");
- bindingKeyMap.put("lazy.brown.fox","被队列Q2接收到");
- bindingKeyMap.put("lazy.pink.rabbit","虽然满足两个绑定但只被队列Q2接收一次");
- bindingKeyMap.put("quick.brown..fox","不匹配任何绑定不会被任何队列接收到会被丢弃");
- bindingKeyMap.put("quick.orange.male.rabbit","是四个单词不匹配任何绑定会被丢弃");
- bindingKeyMap.put("lazy.orange.male.rabbit","是四个单词但匹配Q2");
-
- for (Map.Entry<String, String> bindingKeyEntry : bindingKeyMap.entrySet()) {
- String routingKey = bindingKeyEntry.getKey();
- String message = bindingKeyEntry.getValue();
- channel.basicPublish(EXCHANGE_NAME,routingKey,null,message.getBytes());
- System.out.println("发出消息:"+message);
- }
- }
- }
消费者
两个消费者分别定义不同的routingKey规则,
通过上面的学习中,我们对消息发送者和消息接收者二者的功能代码模块都比较熟悉了,同样也已经学习了五种核心模式了——简单、工作、发布确认、发布订阅、主题。虽然文章内容简单但还是值得作为笔记回顾的哈哈哈哈~~~
今朝已然成为过去,明日依然向往未来!我是荔枝,在技术成长之路上与您相伴~~~
如果博文对您有帮助的话,可以给荔枝一键三连嘿,您的支持和鼓励是荔枝最大的动力!
如果博文内容有误,也欢迎各位大佬在下方评论区批评指正!!!
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。