赞
踩
这篇文章,主要介绍消息队列RabbitMQ七种模式之发布订阅模式(Publish/Subscribe)。
目录
(2)Exchange如何知道消息应该分发到哪个Queue里面?
前面介绍的两个模式下,生产者都是直接和Queue消息队列交互的,这种方式存在一个问题,那就是如果要向不同的Queue消息队列里面发送数据,还得重新写一个生产者出来,为了解决这个问题,RabbitMQ引入了Exchange交换机。
Exchange交换机,它主要作用就是接收生产者发送的消息,并且根据消息的RouteKey将消息映射到不同的Queue消息队列里面,通过这种方式,就可以实现不同的消息分发到不同的Queue消息队列里面,并且这种模式下,生产者是不会直接和Queue消息队列交互的,生产者也不需要知道消息应该放入哪个队列,它只要告诉Exchange即可,剩下的事情都是Exchange来处理。
为了能够让Exchange知道某一条消息应该分发到哪个消息队列里面,RabbitMQ需要给Exchange和Queue之间添加映射关系,这个关系叫做:Binding(绑定)。虽然绑定在一起了,但是还是区分不了哪个消息应该分发到哪个Queue队列里面,所以还需要一个唯一标识,这个标识就是RouteKey路由键,也可以认为是Queue的唯一标识。采用上面这种模式,大致的逻辑图就如下所示啦:
RabbitMQ中有四种常见的Exchange交换机类型,分别是:
RabbitMQ中,发布订阅模式就是指定Exchange交换机的类型是【fanout】,然后RabbitMQ就会将消息分发到所有和这个Exchange交换机绑定的Queue消息队列里面,此时所有的消费者都可以接收到这一条消息。
注意:对于发布订阅模式来说,消息的路由键RouteKey是没有作用的,可以不写。
- <!-- 引入 RabbitMQ 依赖 -->
- <dependency>
- <groupId>com.rabbitmq</groupId>
- <artifactId>amqp-client</artifactId>
- <version>5.16.0</version>
- </dependency>
- package com.rabbitmq.demo.pub;
-
- import com.rabbitmq.client.Channel;
- import com.rabbitmq.client.Connection;
- import com.rabbitmq.client.ConnectionFactory;
-
- /**
- * @version 1.0.0
- * @Date: 2023/2/25 16:23
- * @Copyright (C) ZhuYouBin
- * @Description: 消息生产者
- */
- public class Producer {
- public static void main(String[] args) {
- // 1、创建连接工厂
- ConnectionFactory factory = new ConnectionFactory();
- // 2、设置连接的 RabbitMQ 服务地址
- factory.setHost("127.0.0.1"); // 默认就是本机
- factory.setPort(5672); // 默认就是 5672 端口
- // 3、获取连接
- Connection connection = null; // 连接
- Channel channel = null; // 通道
- try {
- connection = factory.newConnection();
- // 4、获取通道
- channel = connection.createChannel();
- // 5、声明 Exchange,如果不存在,则会创建
- String exchangeName = "exchange_demo_2023";
- channel.exchangeDeclare(exchangeName, "fanout");
- // 6、发送消息
- String message = "这是发布订阅模式,发送的消息数据";
- channel.basicPublish(exchangeName, "", null, message.getBytes());
- } catch (Exception e) {
- e.printStackTrace();
- } finally {
- if (null != channel) {
- try {
- channel.close();
- } catch (Exception e) {}
- }
- if (null != connection) {
- try {
- connection.close();
- } catch (Exception e) {}
- }
- }
- }
- }
- package com.rabbitmq.demo.pub;
-
- import com.rabbitmq.client.*;
-
- import java.io.IOException;
-
- /**
- * @version 1.0.0
- * @Date: 2023/2/25 16:30
- * @Copyright (C) ZhuYouBin
- * @Description: 消息消费者
- */
- public class Consumer {
- public static void main(String[] args) {
- // 1、创建连接工厂
- ConnectionFactory factory = new ConnectionFactory();
- // 2、设置连接的 RabbitMQ 服务地址
- factory.setHost("127.0.0.1"); // 默认就是本机
- factory.setPort(5672); // 默认就是 5672 端口
- // 3、获取连接
- Connection connection = null; // 连接
- Channel channel = null; // 通道
- try {
- connection = factory.newConnection();
- // 4、获取通道
- channel = connection.createChannel();
- // 5、声明 Exchange,如果不存在,则会创建
- String exchangeName = "exchange_demo_2023";
- channel.exchangeDeclare(exchangeName, "fanout");
- // 6、指定需要操作的消息队列,如果队列不存在,则会创建
- String queueName = "queue_demo_2023";
- channel.queueDeclare(queueName, false, false, false, null);
- // 7、绑定 Exchange 和 Queue
- channel.queueBind(queueName, exchangeName, "");
- // 8、消费消息
- DeliverCallback callback = new DeliverCallback() {
- public void handle(String s, Delivery delivery) throws IOException {
- // 接收消息
- System.out.println("这是接收的消息:" + new String(delivery.getBody()));
- }
- };
- channel.basicConsume(queueName, true, callback, i->{});
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- }
运行消费者、生产者,查看控制台,消费者日志输出内容,可以发现两个绑定不同Queue队列的消费者,接收到了相同的消息,这就是发布订阅模式。
到此,RabbitMQ消息队列中的发布订阅模式就介绍完啦。
综上,这篇文章结束了,主要介绍消息队列RabbitMQ七种模式之发布订阅模式(Publish/Subscribe)。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。