赞
踩
参考官网:https://www.rabbitmq.com/getstarted.html
简单模式 Simple, 参考RabbitMQ详解(二):消息模式 Simple(简单)模式
简单模式是最简单的消息模式,它包含一个生产者、一个消费者和一个队列。生产者向队列里发送消息,消费者从队列中获取消息并消费。
发布订阅模式 fanout
同时向多个消费者发送消息的模式(类似广播的形式)
路由模式 direct
根据路由键选择性给多个消费者发送消息的模式
主题模式 topic
是direct模式上的一种叠加,增加了模糊路由RoutingKey的模式
工作模式 work
分发机制
…
注意 type 类型为fanout
图像化管理页面新建queue02、queue03队列
点击交换器后,绑定创建的三个队列
绑定成功后会如图所示
package com.cn.fanout; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; /** * fanout(发布订阅) 生产者 */ public class Producer { public static void main(String[] args) { //1.创建连接工厂 ConnectionFactory factory = new ConnectionFactory(); //2.设置工厂属性 factory.setHost("请填写自己的ip地址"); factory.setPort(5672); factory.setUsername("admin"); factory.setPassword("admin"); factory.setVirtualHost("/"); Connection connection = null; Channel channel = null; try { //3.从连接工厂中获取连接 connection = factory.newConnection("生产者1"); //4.从连接中获取通道 channel = connection.createChannel(); //5.申请队列存储信息,此步骤不需要了,我们手动在图形管理页面创建好交换机及绑定好队列queue01、queue02、queue03 //6.准备发送消息的内容 String message = "hello,rabbitmq!"; //7.1.准备交换机 String exchangeName = "fanout-exchange"; //7.2.定义路由key,fanout模式没有routingKey参数 String routingKey = ""; // 7.3: 发送消息给中间件rabbitmq-server /* * @params1: 交换机exchange * @params2: 队列名称/routingkey * @params3: 属性配置 * @params4: 发送消息的内容 */ channel.basicPublish(exchangeName, routingKey, null, message.getBytes()); System.out.println("消息发送成功!"); } catch (Exception e) { e.printStackTrace(); System.out.println("发送消息出现异常..."); } finally { // 8: 释放连接关闭通道 if (channel != null && channel.isOpen()) { try { channel.close(); } catch (Exception ex) { ex.printStackTrace(); } } if (connection != null) { try { connection.close(); } catch (Exception ex) { ex.printStackTrace(); } } } } }
启动生产者, 会看到每个队列都投递了一条消息
package com.cn.fanout; import com.rabbitmq.client.*; import java.io.IOException; import java.nio.charset.Charset; /** * fanout(发布订阅) 消费者 */ public class Consumer { public static Runnable runnable = new Runnable(){ @Override public void run() { //1.创建连接工厂 ConnectionFactory factory = new ConnectionFactory(); //2.设置工厂属性 factory.setHost("请填写自己的ip地址"); factory.setPort(5672); factory.setUsername("admin"); factory.setPassword("admin"); factory.setVirtualHost("/"); final String queueName = Thread.currentThread().getName(); Connection connection = null; Channel channel = null; try { //3.从连接工厂中获取连接 connection = factory.newConnection("生产者1"); //4.从连接中获取通道 channel = connection.createChannel(); //5.接收消息 channel.basicConsume(queueName, true, new DeliverCallback() { public void handle(String s, Delivery delivery) throws IOException { System.out.println(queueName + "收到消息是:" + new String(delivery.getBody(), Charset.defaultCharset())); } }, new CancelCallback() { public void handle(String s) throws IOException { System.out.println("接收消息失败了..."); } }); System.out.println(queueName + "开始接收消息 "); System.in.read(); } catch (Exception e) { e.printStackTrace(); } finally { // 6: 释放连接关闭通道 if (channel != null && channel.isOpen()) { try { channel.close(); } catch (Exception ex) { ex.printStackTrace(); } } if (connection != null) { try { connection.close(); } catch (Exception ex) { ex.printStackTrace(); } } } } }; public static void main(String[] args) { // 启动三个线程去执行 new Thread(runnable, "queue01").start(); new Thread(runnable, "queue02").start(); new Thread(runnable, "queue03").start(); } }
启动消费者,会看到队列中消息已经被消费
查看控制台打印日志
package com.cn.direct; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; /** * direct(路由) 生产者 */ public class Producer { public static void main(String[] args) { //1.创建连接工厂 ConnectionFactory factory = new ConnectionFactory(); //2.设置工厂属性 factory.setHost("请填写自己的ip地址"); factory.setPort(5672); factory.setUsername("admin"); factory.setPassword("admin"); factory.setVirtualHost("/"); Connection connection = null; Channel channel = null; try { //3.从连接工厂中获取连接 connection = factory.newConnection("生产者1"); //4.从连接中获取通道 channel = connection.createChannel(); //5.申请队列存储信息,此步骤不需要了,我们手动在图形管理页面创建好交换机及绑定好队列queue01、queue02、queue03 //6.准备发送消息的内容 String message = "hello,rabbitmq,direct!"; //7.1.准备交换机 String exchangeName = "direct-exchange"; //7.2.定义路由key, direct需要增加routingKey1参数 String routingKey1 = "email"; // String routingKey2 = "sms"; // 7.3: 发送消息给中间件rabbitmq-server /* * @params1: 交换机exchange * @params2: 队列名称/routingkey * @params3: 属性配置 * @params4: 发送消息的内容 */ channel.basicPublish(exchangeName, routingKey1, null, message.getBytes()); // channel.basicPublish(exchangeName, routingKey2, null, message.getBytes()); System.out.println("消息发送成功!"); } catch (Exception e) { e.printStackTrace(); System.out.println("发送消息出现异常..."); } finally { // 8: 释放连接关闭通道 if (channel != null && channel.isOpen()) { try { channel.close(); } catch (Exception ex) { ex.printStackTrace(); } } if (connection != null) { try { connection.close(); } catch (Exception ex) { ex.printStackTrace(); } } } } }
启动生产者, 会看到只有quque01队列投递了一条消息
因为我们的routingKey指定为email,绑定的队列信息如下,所有只有queue01接收到了消息
//同fanout模式消费者代码相同
启动消费者,会看到队列中消息已经被消费
查看控制台打印日志
package com.cn.topic; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; /** * topic(主题) 生产者 */ public class Producer { public static void main(String[] args) { //1.创建连接工厂 ConnectionFactory factory = new ConnectionFactory(); //2.设置工厂属性 factory.setHost("请填写自己的ip地址"); factory.setPort(5672); factory.setUsername("admin"); factory.setPassword("admin"); factory.setVirtualHost("/"); Connection connection = null; Channel channel = null; try { //3.从连接工厂中获取连接 connection = factory.newConnection("生产者1"); //4.从连接中获取通道 channel = connection.createChannel(); //5.申请队列存储信息,此步骤不需要了,我们手动在图形管理页面创建好交换机及绑定好队列queue01、queue02、queue03 //6.准备发送消息的内容 String message = "hello,rabbitmq,topic!"; //7.1.准备交换机 String exchangeName = "topic-exchange"; //7.2.定义路由key, 模糊匹配 String routingKey1 = "com.order.xxx"; // 7.3: 发送消息给中间件rabbitmq-server /* * @params1: 交换机exchange * @params2: 队列名称/routingkey * @params3: 属性配置 * @params4: 发送消息的内容 */ channel.basicPublish(exchangeName, routingKey1, null, message.getBytes()); System.out.println("消息发送成功!"); } catch (Exception e) { e.printStackTrace(); System.out.println("发送消息出现异常..."); } finally { // 8: 释放连接关闭通道 if (channel != null && channel.isOpen()) { try { channel.close(); } catch (Exception ex) { ex.printStackTrace(); } } if (connection != null) { try { connection.close(); } catch (Exception ex) { ex.printStackTrace(); } } } } }
启动生产者, 会看到quque01、queue02队列分别投递了一条消息
因为我们的routingKey指定为com.order.xxx,绑定的队列信息如下,所有queue01、queue02接收到了消息
//同fanout模式消费者代码相同
启动消费者,会看到队列中消息已经被消费
查看控制台打印日志
上面操作的案例 我们都是在管理页面端进行交换机的创建以及绑定,现在我们使用纯代码的方式进行操作
package com.cn.all; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; /** * 完整 生产者 */ public class Producer { public static void main(String[] args) { //1.创建连接工厂 ConnectionFactory factory = new ConnectionFactory(); //2.设置工厂属性 factory.setHost("请填写自己的ip地址"); factory.setPort(5672); factory.setUsername("admin"); factory.setPassword("admin"); factory.setVirtualHost("/"); Connection connection = null; Channel channel = null; try { //3.从连接工厂中获取连接 connection = factory.newConnection("生产者1"); //4.从连接中获取通道 channel = connection.createChannel(); //5.准备发送消息的内容 String message = "hello,rabbitmq,all!"; //6.1.准备交换机 String exchangeName = "direct-message-exchange"; //6.2.交换机类型 String exchangeType = "direct"; //6.3.声明交换机(是否持久化,true代表交换机不会随着服务器重启丢失) channel.exchangeDeclare(exchangeName,exchangeType,true); //7.声明队列 channel.queueDeclare("queue04", true, false ,false, null); channel.queueDeclare("queue05", true, false ,false, null); channel.queueDeclare("queue06", true, false ,false, null); //8.定义路由key String routingKey1 = "order"; String routingKey2 = "course"; //9.队列和交换机进行绑定 channel.queueBind("queue04", exchangeName, routingKey1); channel.queueBind("queue05", exchangeName, routingKey1); channel.queueBind("queue06", exchangeName, routingKey2); //10: 发送消息给中间件rabbitmq-server /* * @params1: 交换机exchange * @params2: 队列名称/routingkey * @params3: 属性配置 * @params4: 发送消息的内容 */ channel.basicPublish(exchangeName, routingKey1, null, message.getBytes()); System.out.println("消息发送成功!"); } catch (Exception e) { e.printStackTrace(); System.out.println("发送消息出现异常..."); } finally { // 8: 释放连接关闭通道 if (channel != null && channel.isOpen()) { try { channel.close(); } catch (Exception ex) { ex.printStackTrace(); } } if (connection != null) { try { connection.close(); } catch (Exception ex) { ex.printStackTrace(); } } } } }
启动生产者, 会看到交换机和队列都已创建好,并且已经互相绑定好
同fanout模式消费者代码相同
启动消费者,会看到队列中消息已经被消费
查看控制台打印日志
当有多个消费者时,我们的消费会被哪个消费者消费呢?我们该如何均衡消费者消费信息的多少呢?
package com.cn.work.roundrobin; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; public class Producer { public static void main(String[] args) { //1.创建连接工厂 ConnectionFactory factory = new ConnectionFactory(); //2.设置工厂属性 factory.setHost("请填写自己的ip地址"); factory.setPort(5672); factory.setUsername("admin"); factory.setPassword("admin"); factory.setVirtualHost("/"); Connection connection = null; Channel channel = null; try { //3.从连接工厂中获取连接 connection = factory.newConnection("生产者7"); //4.从连接中获取通道 channel = connection.createChannel(); //5.申请队列存储信息 /* * 如果队列不存在,则会创建 * Rabbitmq不允许创建两个相同的队列名称,否则会报错。 * * @params1: queue 队列的名称 * @params2: durable 队列是否持久化 * @params3: exclusive 是否排他,即是否私有的,如果为true,会对当前队列加锁,其他的通道不能访问,并且连接自动关闭 * @params4: autoDelete 是否自动删除,当最后一个消费者断开连接之后是否自动删除消息。 * @params5: arguments 可以设置队列附加参数,设置队列的有效期,消息的最大长度,队列的消息生命周期等等。 */ channel.queueDeclare("queue07", true ,false,false, null); //6.准备发送消息的内容 for (int i = 0; i < 20; i++) { String message = "hello,rabbitmq,work!" + i; // 7: 发送消息给中间件rabbitmq-server /* * @params1: 交换机exchange * @params2: 队列名称/routing * @params3: 属性配置 * @params4: 发送消息的内容 */ channel.basicPublish("", "queue07", null, message.getBytes()); } System.out.println("消息发送成功!"); } catch (Exception e) { e.printStackTrace(); System.out.println("发送消息出现异常..."); } finally { // 8: 释放连接关闭通道 if (channel != null && channel.isOpen()) { try { channel.close(); } catch (Exception ex) { ex.printStackTrace(); } } if (connection != null) { try { connection.close(); } catch (Exception ex) { ex.printStackTrace(); } } } } }
package com.cn.work.roundrobin; import com.rabbitmq.client.*; import java.io.IOException; import java.nio.charset.Charset; public class Consumer1 { public static void main(String[] args) { //1.创建连接工厂 ConnectionFactory factory = new ConnectionFactory(); //2.设置工厂属性 factory.setHost("请填写自己的ip地址"); factory.setPort(5672); factory.setUsername("admin"); factory.setPassword("admin"); factory.setVirtualHost("/"); Connection connection = null; Channel channel = null; try { //3.从连接工厂中获取连接 connection = factory.newConnection("消费者1"); //4.从连接中获取通道 channel = connection.createChannel(); //5.接收消息(应答机制参数为true 自动应答) channel.basicConsume("queue07", true, new DeliverCallback() { public void handle(String s, Delivery delivery) throws IOException { System.out.println("Consumer1收到消息是:" + new String(delivery.getBody(), Charset.defaultCharset())); try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } } }, new CancelCallback() { public void handle(String s) throws IOException { System.out.println("Consumer1接收消息失败了..."); } }); System.out.println("Consumer1开始接收消息"); System.in.read(); } catch (Exception e) { e.printStackTrace(); } finally { // 6: 释放连接关闭通道 if (channel != null && channel.isOpen()) { try { channel.close(); } catch (Exception ex) { ex.printStackTrace(); } } if (connection != null) { try { connection.close(); } catch (Exception ex) { ex.printStackTrace(); } } } } }
同上,名称稍修改即可
try {
Thread.sleep(200);
} catch (InterruptedException e) {
e.printStackTrace();
}
先在管理页面创建好队列queue,然后启动消费者1和2,最后启动生产者看页面日志
消费者1和消费者2
work1和work2的消息处理能力不同,但是最后处理的消息条数相同,是“按均分配”。
//同上轮询模式的生产者代码相同
注意:
//设置消费消息指标
finalChannel.basicQos(1);
finalChannel.basicConsume(“queue1”, false, new DeliverCallback() { … })
//修改为手动应答
finalChannel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
package com.cn.work.fairdispatch; import com.rabbitmq.client.*; import java.io.IOException; import java.nio.charset.Charset; public class Consumer1 { public static void main(String[] args) { //1.创建连接工厂 ConnectionFactory factory = new ConnectionFactory(); //2.设置工厂属性 factory.setHost("请填写自己的ip地址"); factory.setPort(5672); factory.setUsername("admin"); factory.setPassword("admin"); factory.setVirtualHost("/"); Connection connection = null; Channel channel = null; try { //3.从连接工厂中获取连接 connection = factory.newConnection("消费者1"); //4.从连接中获取通道 channel = connection.createChannel(); //5.接收消息(应答机制参数为false 手动应答) final Channel finalChannel = channel; finalChannel.basicQos(1); finalChannel.basicConsume("queue07", false, new DeliverCallback() { public void handle(String s, Delivery delivery) throws IOException { System.out.println("Consumer1收到消息是:" + new String(delivery.getBody(), Charset.defaultCharset())); try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } //修改为手动应答 finalChannel.basicAck(delivery.getEnvelope().getDeliveryTag(),false); } }, new CancelCallback() { public void handle(String s) throws IOException { System.out.println("Consumer1接收消息失败了..."); } }); System.out.println("Consumer1开始接收消息"); System.in.read(); } catch (Exception e) { e.printStackTrace(); } finally { // 6: 释放连接关闭通道 if (channel != null && channel.isOpen()) { try { channel.close(); } catch (Exception ex) { ex.printStackTrace(); } } if (connection != null) { try { connection.close(); } catch (Exception ex) { ex.printStackTrace(); } } } } }
同上,名称稍修改即可
先在管理页面创建好队列queue,然后启动消费者1和2,最后启动生产者看页面日志
消费者1和消费者2
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。