赞
踩
RabbitMQ 消息传递模型的核心思想是: 生产者生产的消息从不会直接发送到队列。实际上,通常生产者甚至都不知道这些消息传递传递到了哪些队列中。
相反,生产者只能将消息发送到交换机(exchange),交换机工作的内容非常简单,一方面它接收来自生产者的消息,另一方面将它们推入队列。交换机必须确切知道如何处理收到的消息。是应该把这些消息放到特定队列还是说把他们到许多队列中还是说应该丢弃它们。这就的由交换机的类型来决定。
总共有一下类型:
无名交换机也就是默认交换机,当交换机名填空字符串时默认使用的是此交换机。
当我们第一次接触RabbitMQ时(如下模型),直接使用队列连接生产者和消费者,但其实是使用了默认交换机。
// 生产者 channel.queueDeclare(QUEUE_NAME, false, false, false, null); channel.basicPublish("", QUEUE_NAME, null, "Hello World!".getBytes()); // 消费者 channel.basicConsume(QUEUE_NAME, true, new DeliverCallback() { @Override public void handle(String consumerTag, Delivery message) throws IOException { System.out.println(new String(message.getBody(), StandardCharsets.UTF_8)); } }, new CancelCallback() { @Override public void handle(String consumerTag) throws IOException { System.out.println(consumerTag); } });
在编写代码时并没有声明交换机,basicPublish方法第一个参数传入"",第二个参数指定队列名。basicConsume方法第一个参数指定从哪个队列中取消息。这就是使用无名交换机的例子。
补充:如何创建临时队列
每当我们连接到 Rabbit 时,我们都需要一个全新的空队列,为此我们可以创建一个具有随机名称的队列,或者能让服务器为我们选择一个随机队列名称那就更好了。其次一旦我们断开了消费者的连接,队列将被自动删除。
String queueName = channel.queueDeclare().getQueue();
Fanout 这种类型非常简单。正如从名称中猜到的那样“扇出”,它是将接收到的所有消息广播到它知道的所有队列中。
补充:绑定(bindings)
什么是 bingding 呢,binding 其实是 exchange 和 queue 之间的桥梁,它告诉我们 exchange 和那个队列进行了绑定关系。比如说下面这张图告诉我们的就是 X 与 Q1 和 Q2 进行了绑定。
Fanout 实战
一个生产者,两个消费者,每个消费者对应一个队列,将两个队列和同一个类型为fanout的交换机进行绑定,生产者只用向这个交换机发送消息就可以,不需要关心交换机会把消息发给谁。
消费者1
public class ReceiveLogs01 { public static final String DECLARE_NAME = "logs"; public static void main(String[] args) throws Exception { Channel channel = RabbitMQUtils.getChannel(); // 声明一个队列,队列的名称是随机的,当消费者断开与队列的连接,就自动删除 String queueName = channel.queueDeclare().getQueue(); // 声明一个交换机 channel.exchangeDeclare(DECLARE_NAME, BuiltinExchangeType.FANOUT); // 绑定交换机与队列 channel.queueBind(queueName, DECLARE_NAME, ""); System.out.println("ReceiveLogs01等待接收消息:"); channel.basicConsume(queueName, true, (consumerTag, message) -> { System.out.println("123: " + new String(message.getBody(), StandardCharsets.UTF_8)); }, (CancelCallback) null); } }
消费者2
public class ReceiveLogs02 { public static final String DECLARE_NAME = "logs"; public static void main(String[] args) throws Exception { Channel channel = RabbitMQUtils.getChannel(); // 声明一个队列,队列的名称是随机的,当消费者断开与队列的连接,就自动删除 String queueName = channel.queueDeclare().getQueue(); // 声明一个交换机 channel.exchangeDeclare(DECLARE_NAME, BuiltinExchangeType.FANOUT); //绑定交换机与队列 channel.queueBind(queueName, DECLARE_NAME, ""); System.out.println("ReceiveLogs02等待接收消息:"); channel.basicConsume(queueName, true, (consumerTag, message) -> { System.out.println("123: " + new String(message.getBody(), StandardCharsets.UTF_8)); }, (CancelCallback) null); } }
生产者
public class EmitLog {
public static final String DECLARE_NAME = "logs";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMQUtils.getChannel();
Scanner scanner = new Scanner(System.in);
while (scanner.hasNext()) {
String message = scanner.next();
// 把消息发送给指定交换机,fanout类型不需要指定routingKey
channel.basicPublish(DECLARE_NAME, "", null, message.getBytes(StandardCharsets.UTF_8));
}
}
}
交换机可以通过路由(routingKey)与队列进行绑定,在接收到生产者发来消息后,通过路由发送给指定队列,从而达到指定消费者消费。
在这种绑定情况下,生产者发布消息到 exchange 上,绑定键为 orange 的消息会被发布到队列Q1。绑定键为 blackgreen 和的消息会被发布到队列 Q2,其他消息类型的消息将被丢弃。
Direct实战
消费者1
public class ReceiveLogsDirect01 { public static final String EXCHANGE_NAME = "direct_logs"; public static void main(String[] args) throws Exception { // 通过自定义的工具类获取信道 Channel channel = RabbitMQUtils.getChannel(); // 声明一个交换机 channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT); // 声明一个队列 channel.queueDeclare("console", false, false, false, null); // 绑定交换机 channel.queueBind("console", EXCHANGE_NAME, "info"); channel.queueBind("console", EXCHANGE_NAME, "warning"); // 是否自动应答 boolean autoAck = true; channel.basicConsume("console", autoAck, (x, y) -> { System.out.println("消息:" + new String(y.getBody(), StandardCharsets.UTF_8)); }, System.out::println); } }
消费者2
public class ReceiveLogsDirect02 { public static final String EXCHANGE_NAME = "direct_logs"; public static void main(String[] args) throws Exception { Channel channel = RabbitMQUtils.getChannel(); // 声明一个交换机 channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT); // 声明一个队列 channel.queueDeclare("disk", false, false, false, null); // 绑定交换机 channel.queueBind("disk", EXCHANGE_NAME, "error"); channel.queueBind("disk", EXCHANGE_NAME, "warning"); channel.basicConsume("disk", true, (x, y) -> { System.out.println("消息:" + new String(y.getBody(), StandardCharsets.UTF_8)); }, System.out::println); } }
生产者
public class DirectLogs { public static final String EXCHANGE_NAME = "direct_logs"; public static void main(String[] args) throws Exception { Channel channel = RabbitMQUtils.getChannel(); Scanner scanner = new Scanner(System.in); System.out.println("请输入消息[格式:routingKey message]:"); while (scanner.hasNext()) { String routingKey = scanner.next(); String message = scanner.next(); channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes(StandardCharsets.UTF_8)); } } }
发送到类型是 topic 交换机的消息的 routing_key 不能随意写,必须满足一定的要求,它必须是一个单词列表,以点号分隔开。这些单词可以是任意单词,比如说:“stock.usd.nyse”, “nyse.vmw”, “quick.orange.rabbit”.这种类型的。当然这个单词列表最多不能超过 255 个字节。
在这个规则列表中,其中有两个替换符是大家需要注意的
Topics 实战
消费者1
public class ReceiveLogsTopic01 { public static final String EXCHANGE_NAME = "topic_logs"; public static void main(String[] args) throws Exception { Channel channel = RabbitMQUtils.getChannel(); channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC); channel.queueDeclare("Q1", false, false, false, null); channel.queueBind("Q1", EXCHANGE_NAME, "*.orange.*"); System.out.println("C1等到接收消息..."); channel.basicConsume("Q1", true, (x, y) -> { System.out.println("消息:" + new String(y.getBody(), StandardCharsets.UTF_8)); }, System.out::println); } }
消费者2
public class ReceiveLogsTopic02 { public static final String EXCHANGE_NAME = "topic_logs"; public static void main(String[] args) throws Exception { Channel channel = RabbitMQUtils.getChannel(); channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC); channel.queueDeclare("Q2", false, false, false, null); channel.queueBind("Q2", EXCHANGE_NAME, "lazy.#"); channel.queueBind("Q2", EXCHANGE_NAME, "*.*.rabbit"); System.out.println("C1等到接收消息..."); channel.basicConsume("Q2", true, (x, y) -> { System.out.println("消息:" + new String(y.getBody(), StandardCharsets.UTF_8)); }, System.out::println); } }
生产者
public class TopicLogs { public static final String EXCHANGE_NAME = "topic_logs"; public static void main(String[] args) throws Exception { Channel channel = RabbitMQUtils.getChannel(); channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC); Scanner scanner = new Scanner(System.in); while (scanner.hasNext()) { String routingKey = scanner.next(); String message = scanner.next(); System.out.println(routingKey); channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes(StandardCharsets.UTF_8)); } } }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。