当前位置:   article > 正文

RabbitMQ入门案例之Simple简单模式_mq中的simple模式

mq中的simple模式

前言

本文将介绍RabbitMQ的七种工作模式的第一种Simple模式的代码实现,编程工具使用的是IDEA,在RabbitMQ中的工作模式都是生产消费模型

生产者消费模型的介绍已经手写可以参考笔者的这篇文章:多线程实操&&阻塞队列

官网文档地址:https://rabbitmq.com/getstarted.html

什么是Simple模式

所谓Simple模式就是简单的一个生产者p与一个消费者c,一对一的关系,如下图所示:
在这里插入图片描述
在这个过程中,生产者会将消息通过channel通道放入到我们的消息队列queue中,消费者在察觉消息队列中有消息时,会从queue中获取消息。

Simple模式操作

要在IDEA中使用RabbitMQ,创建需要导入下面这些依赖

<!--Java原生RabbitMQ依赖-->
<dependency>
	<groupId>com.rabbitmq</groupId>
	<artifactId>amqp-client</artifactId>
	<version>5.10.0</version>
 </dependency>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

创建一个Simple包,在包里面创建两个类,Producer(生产者)和Consumer(消费者)

Producer代码如下

/**
 * 生产者
 */
public class Producer {

    public static void main(String[] args) {
        // 1: 创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        // 2: 设置连接属性
        connectionFactory.setHost(主机地址);//服务器地址
        connectionFactory.setPort(5672);//服务端口号
        connectionFactory.setVirtualHost("/");//消息隔离点,可以类比为数据库中的表
        connectionFactory.setUsername("guest");//登录的身份账号
        connectionFactory.setPassword("guest");//密码

        Connection connection = null;
        Channel channel = null;
        try {
            String queue1 = "queue1";// 消息存放的消息位置
            // 3: 从连接工厂中获取连接
            connection = connectionFactory.newConnection("生产者");
            // 4: 从连接中获取通道channel
            channel = connection.createChannel();
            // 5: 申明队列queue存储消息
            /*
             *  如果队列不存在,则会创建
             *  Rabbitmq不允许创建两个相同的队列名称,否则会报错。
             *
             *  @params1: queue 队列的名称
             *  @params2: durable 队列是否持久化
             *  @params3: exclusive 是否排他,即是否私有的,如果为true,会对当前队列加锁,其他的通道不能访问,并且连接自动关闭
             *  @params4: autoDelete 是否自动删除,当最后一个消费者断开连接之后是否自动删除消息。
             *  @params5: arguments 可以设置队列附加参数,设置队列的有效期,消息的最大长度,队列的消息生命周期等等。
             * */
            channel.queueDeclare(queue1, false, false, false, null);
            // 6: 准备发送消息的内容
            String message = "我宇宙超级无敌终极爱学习!!!";
            // 7: 发送消息给中间件rabbitmq-server
            // @params1: 交换机exchange,为空的话会使用默认的dirct模式交换机
            // @params2: 队列名称/routing
            // @params3: 属性配置
            // @params4: 发送消息的内容
            channel.basicPublish("",queue1,null,message.getBytes(StandardCharsets.UTF_8));
            System.out.println("信息发布成功");
        } catch (Exception e) {
            e.printStackTrace();
            System.out.println("信息发布失败.....");
        }finally {
            // 先释放通道再释放连接
            // 7: 释放连接关闭通道
            if(channel != null && channel.isOpen()){
                try {
                    channel.close();
                } catch (IOException e) {
                    e.printStackTrace();
                } catch (TimeoutException e) {
                    e.printStackTrace();
                }
            }
            if(connection != null   ){
                try {
                    connection.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69

执行结果如下图:
在这里插入图片描述
我们到RabbitMQ的管理界面看看消息是否被放进了queue1中(Queue->queue1–>getMessges),结果如下:
在这里插入图片描述

Consumer代码如下:

/**
 * 消费者
 */
public class Consumer {
    public static void main(String[] args) {
        // 1: 创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        // 2: 设置连接属性
        connectionFactory.setHost(主机地址);
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        Connection connection = null;
        Channel channel = null;
        try {
            String queue1 = "queue1";
            // 3: 从连接工厂中获取连接
            connection = connectionFactory.newConnection("消费者");
            // 4: 从连接中获取通道channel
            channel = connection.createChannel();
            // 5: 从队列中接收消息
            channel.basicConsume(queue1, true, new DeliverCallback() {
                @Override
                public void handle(String consumerTag, Delivery message) throws IOException {
                    System.out.println("接收到了消息是:" + new String(message.getBody(),"UTF-8"));
                }
            }, new CancelCallback() {
                @Override
                public void handle(String s) throws IOException {
                    System.out.println("消息接收失败~~");
                }
            });
            // 这里需要这样设置是因为防止信息还没有读取到程序就结束了,因为上面的代码是异步的,下面开始执行了,上面都不一定执行完毕
            System.out.println("开始接收消息...");
            System.in.read();
        } catch (Exception e) {
            e.printStackTrace();
            System.out.println("信息发布失败.....");
        } finally {
            // 先释放通道再释放连接
            // 6: 释放连接关闭通道
            if (channel != null && channel.isOpen()) {
                try {
                    channel.close();
                } catch (IOException e) {
                    e.printStackTrace();
                } catch (TimeoutException e) {
                    e.printStackTrace();
                }
            }
            if (connection != null) {
                try {
                    connection.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61

执行结果:
在这里插入图片描述
RabbitMQ服务界面查看一下是否还有消息
在这里插入图片描述
total已经为0了,所以刚刚的消息被读取到了

RabbitMQ管理界面的部分介绍

既然我们刚刚使用到了管理界面,那当然要做一下简单介绍,如下图:

Queue功能释义

在这里插入图片描述

queue1的详情页

在这里插入图片描述
消息获取模式说明:
在这里插入图片描述

  • NACK应答:不告诉RabbitMQ-Server消息被消费了,重新放入消息队列中,简单说就是盯了一眼就放回去
  • ACK应答:告诉RabbitMQ-Server消息被消费了,消息会重消息队列中去
以上是RabbitMQ入门案例的全部内容,感谢阅读~~
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/Gausst松鼠会/article/detail/515835
推荐阅读
相关标签
  

闽ICP备14008679号