赞
踩
本文将介绍RabbitMQ的七种工作模式的第一种Simple模式的代码实现,编程工具使用的是IDEA,在RabbitMQ中的工作模式都是生产消费模型
生产者消费模型的介绍已经手写可以参考笔者的这篇文章:多线程实操&&阻塞队列
所谓Simple模式就是简单的一个生产者p与一个消费者c,一对一的关系,如下图所示:
在这个过程中,生产者会将消息通过channel通道放入到我们的消息队列queue中,消费者在察觉消息队列中有消息时,会从queue中获取消息。
要在IDEA中使用RabbitMQ,创建需要导入下面这些依赖
<!--Java原生RabbitMQ依赖-->
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.10.0</version>
</dependency>
创建一个Simple包,在包里面创建两个类,Producer(生产者)和Consumer(消费者)
Producer代码如下
/** * 生产者 */ public class Producer { public static void main(String[] args) { // 1: 创建连接工厂 ConnectionFactory connectionFactory = new ConnectionFactory(); // 2: 设置连接属性 connectionFactory.setHost(主机地址);//服务器地址 connectionFactory.setPort(5672);//服务端口号 connectionFactory.setVirtualHost("/");//消息隔离点,可以类比为数据库中的表 connectionFactory.setUsername("guest");//登录的身份账号 connectionFactory.setPassword("guest");//密码 Connection connection = null; Channel channel = null; try { String queue1 = "queue1";// 消息存放的消息位置 // 3: 从连接工厂中获取连接 connection = connectionFactory.newConnection("生产者"); // 4: 从连接中获取通道channel channel = connection.createChannel(); // 5: 申明队列queue存储消息 /* * 如果队列不存在,则会创建 * Rabbitmq不允许创建两个相同的队列名称,否则会报错。 * * @params1: queue 队列的名称 * @params2: durable 队列是否持久化 * @params3: exclusive 是否排他,即是否私有的,如果为true,会对当前队列加锁,其他的通道不能访问,并且连接自动关闭 * @params4: autoDelete 是否自动删除,当最后一个消费者断开连接之后是否自动删除消息。 * @params5: arguments 可以设置队列附加参数,设置队列的有效期,消息的最大长度,队列的消息生命周期等等。 * */ channel.queueDeclare(queue1, false, false, false, null); // 6: 准备发送消息的内容 String message = "我宇宙超级无敌终极爱学习!!!"; // 7: 发送消息给中间件rabbitmq-server // @params1: 交换机exchange,为空的话会使用默认的dirct模式交换机 // @params2: 队列名称/routing // @params3: 属性配置 // @params4: 发送消息的内容 channel.basicPublish("",queue1,null,message.getBytes(StandardCharsets.UTF_8)); System.out.println("信息发布成功"); } catch (Exception e) { e.printStackTrace(); System.out.println("信息发布失败....."); }finally { // 先释放通道再释放连接 // 7: 释放连接关闭通道 if(channel != null && channel.isOpen()){ try { channel.close(); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } } if(connection != null ){ try { connection.close(); } catch (IOException e) { e.printStackTrace(); } } } } }
执行结果如下图:
我们到RabbitMQ的管理界面看看消息是否被放进了queue1中(Queue->queue1–>getMessges),结果如下:
Consumer代码如下:
/** * 消费者 */ public class Consumer { public static void main(String[] args) { // 1: 创建连接工厂 ConnectionFactory connectionFactory = new ConnectionFactory(); // 2: 设置连接属性 connectionFactory.setHost(主机地址); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/"); connectionFactory.setUsername("guest"); connectionFactory.setPassword("guest"); Connection connection = null; Channel channel = null; try { String queue1 = "queue1"; // 3: 从连接工厂中获取连接 connection = connectionFactory.newConnection("消费者"); // 4: 从连接中获取通道channel channel = connection.createChannel(); // 5: 从队列中接收消息 channel.basicConsume(queue1, true, new DeliverCallback() { @Override public void handle(String consumerTag, Delivery message) throws IOException { System.out.println("接收到了消息是:" + new String(message.getBody(),"UTF-8")); } }, new CancelCallback() { @Override public void handle(String s) throws IOException { System.out.println("消息接收失败~~"); } }); // 这里需要这样设置是因为防止信息还没有读取到程序就结束了,因为上面的代码是异步的,下面开始执行了,上面都不一定执行完毕 System.out.println("开始接收消息..."); System.in.read(); } catch (Exception e) { e.printStackTrace(); System.out.println("信息发布失败....."); } finally { // 先释放通道再释放连接 // 6: 释放连接关闭通道 if (channel != null && channel.isOpen()) { try { channel.close(); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } } if (connection != null) { try { connection.close(); } catch (IOException e) { e.printStackTrace(); } } } } }
执行结果:
RabbitMQ服务界面查看一下是否还有消息
total已经为0了,所以刚刚的消息被读取到了
既然我们刚刚使用到了管理界面,那当然要做一下简单介绍,如下图:
Queue功能释义
queue1的详情页
消息获取模式说明:
- NACK应答:不告诉RabbitMQ-Server消息被消费了,重新放入消息队列中,简单说就是盯了一眼就放回去
- ACK应答:告诉RabbitMQ-Server消息被消费了,消息会重消息队列中去
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。