当前位置:   article > 正文

RabbitMQ入门案例之发布订阅模式_订阅 mq

订阅 mq

前言

本文章主要介绍RabbitMQ的发布订阅模式,该模式下,消息为广播形式,一经发布则会进入交换机绑定的队列中,详细介绍可以阅读官方文档。

官网文档地址:https://rabbitmq.com/getstarted.html

什么是发布与订阅模式

RabbitMQ中的发布与订阅模式是一种消息传递的方式,用于在分布式系统中传递消息。

在该模式中,发送者(发布者)通过将消息发送到一个称为Exchange(交换机)的组件,消息将被路由到一个或多个称为Queue(队列)的组件。每个队列都有一个名称和一组绑定(bindings),指定接收哪些消息。消费者(订阅者)可以在指定的队列上进行侦听,以获取消息。

该模式的一个重要特点是支持多个消费者接收相同的消息,通过使用多个队列和绑定实现。这种方式进一步提高了系统的可伸缩性和健壮性,从而能够满足大规模分布式系统对高效消息传递的需求。
在这里插入图片描述

代码实操

在实操之前,我们可以先到RabbitMQ的管理界面先定义好该模式的交换机与绑定交换机的消息队列。
定义交换机如下图:
在这里插入图片描述
然后点击All Exchange查看是否创建成功
在这里插入图片描述
点击定义好的交换机fanout_exchange,会出现下图中的界面:
在这里插入图片描述
这时我们就可以进行绑定队列了,下面是字段解释
在这里插入图片描述
在配置好queue消息后,点击绑定即可。
在这里插入图片描述
需要注意的是,我们绑定的队列是必须在Queue模块已经创建好的对列,不然是无法绑定的,也就是说,交换机和队列是两个分离开的模块,Exchange只负责将信息送到Queue中,而且Queue负责消息的存储和消费。

到这里交换机的工作就准备好了,接下来就是创建项目和导入依赖了。要使用RabbitMQ我们可以导入下面的这个依赖

<!--RabbitMQ依赖-->
<dependency>
	<groupId>com.rabbitmq</groupId>
	<artifactId>amqp-client</artifactId>
	<version>5.10.0</version>
</dependency>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

生产者代码

public class Producer {
    public static void main(String[] args) {
        // 1: 创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        // 2: 设置连接属性
        connectionFactory.setHost("IP地址");
        connectionFactory.setPort(端口号);
        connectionFactory.setVirtualHost("/");
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        Connection connection = null;
        Channel channel = null;
        try {
            // 3: 从连接工厂中获取连接
            connection = connectionFactory.newConnection("生产者");
            // 4: 从连接中获取通道channel
            channel = connection.createChannel();
            // 6: 准备发送消息的内容
            String message = "宇宙无敌爱学习!!!";
            String  exchangeName = "fanout_exchange";
            String routingKey = "";
            // 7: 发送消息给中间件rabbitmq-server
            // @params1: 交换机exchange
            // @params2: 队列名称/routingkey
            // @params3: 属性配置
            // @params4: 发送消息的内容
            channel.basicPublish(exchangeName, routingKey, null, message.getBytes());
            System.out.println("消息发送成功!");
        } catch (Exception ex) {
            ex.printStackTrace();
            System.out.println("发送消息出现异常...");
        } finally {
            // 7: 释放连接关闭通道
            if (channel != null && channel.isOpen()) {
                try {
                    channel.close();
                } catch (Exception ex) {
                    ex.printStackTrace();
                }
            }
            if (connection != null) {
                try {
                    connection.close();
                } catch (Exception ex) {
                    ex.printStackTrace();
                }
            }
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50

执行代码效果如下:在这里插入图片描述
我们到管理界面查看是否有消息被写入了,结果如下:
在这里插入图片描述
发现在这个模式下,三个队列都被放入了同一个消息,这就是所谓的发布与订阅,类似广播,一经发布就全部人都收到

接下来我执行生产者代码,生产者代码使用了线程来进行处理,代码如下:

public class Consumer {
    private static Runnable runnable = () -> {
        // 1: 创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        // 2: 设置连接属性
        connectionFactory.setHost("IP地址");
        connectionFactory.setPort(端口号);
        connectionFactory.setVirtualHost("/");
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        //获取队列的名称
        final String queueName = Thread.currentThread().getName();
        Connection connection = null;
        Channel channel = null;
        try {
            // 3: 从连接工厂中获取连接
            connection = connectionFactory.newConnection("生产者");
            // 4: 从连接中获取通道channel
            channel = connection.createChannel();
            // 5: 申明队列queue存储消息
            /*
             *  如果队列不存在,则会创建
             *  Rabbitmq不允许创建两个相同的队列名称,否则会报错。
             *
             *  @params1: queue 队列的名称
             *  @params2: durable 队列是否持久化
             *  @params3: exclusive 是否排他,即是否私有的,如果为true,会对当前队列加锁,其他的通道不能访问,并且连接自动关闭
             *  @params4: autoDelete 是否自动删除,当最后一个消费者断开连接之后是否自动删除消息。
             *  @params5: arguments 可以设置队列附加参数,设置队列的有效期,消息的最大长度,队列的消息生命周期等等。
             * */
            // 这里如果queue已经被创建过一次了,可以不需要定义
            //channel.queueDeclare("queue1", false, false, false, null);
            // 6: 定义接受消息的回调
            Channel finalChannel = channel;
            finalChannel.basicConsume(queueName, true, new DeliverCallback() {
                @Override
                public void handle(String s, Delivery delivery) throws IOException {
                    System.out.println(queueName + ":收到消息是:" + new String(delivery.getBody(), "UTF-8"));
                }
            }, new CancelCallback() {
                @Override
                public void handle(String s) throws IOException {
                }
            });
            System.out.println(queueName + ":开始接受消息");
            System.in.read();
        } catch (Exception ex) {
            ex.printStackTrace();
            System.out.println("发送消息出现异常...");
        } finally {
            // 7: 释放连接关闭通道
            if (channel != null && channel.isOpen()) {
                try {
                    channel.close();
                } catch (Exception ex) {
                    ex.printStackTrace();
                }
            }
            if (connection != null && connection.isOpen()) {
                try {
                    connection.close();
                } catch (Exception ex) {
                    ex.printStackTrace();
                }
            }
        }
    };
    public static void main(String[] args) {
        // 启动三个线程去执行
        new Thread(runnable, "queue1").start();
        new Thread(runnable, "queue2").start();
        new Thread(runnable, "queue3").start();
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74

结果如下:
在这里插入图片描述在这里插入图片描述
可以看出,消息被消费掉了。

以上便是发布与订阅的全部内容,仅作为个学习笔记使用
感谢观看
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/Monodyee/article/detail/694405
推荐阅读
相关标签
  

闽ICP备14008679号