赞
踩
提示:以下是本篇文章正文内容,不足之处欢迎指出
MQ (Message Quene):翻译为 消息队列 ,通过典型的 生产者 和 消费者 模型生产者不断向消息队列中生产消息,消费者不断的从队列中获取消息。因为消息的生产和消费都是异步的,而且只关心消息的发送和接收,没有业务逻辑的侵入轻松的实现系统间解耦。别名为消息中间件—通过利用高效可靠的消息传递机制进行平台无关的数据交流,并基于数据通信来进行分布式系统的集成。
当今市面上有很多主流的消息中间件,如老牌的 ActiveMQ、 RabbitMQ,炙手可热的 Kafka,阿里巴巴自主开发 RocketMQ 等。
总结:RabbitMQ比Kafka可靠,Kafka更适合IO高吞吐的处理,一般应用在大数据日志处理或对实时性〈少量延迟),可靠性〔少量丢数据)要求稍低的场景使用,比如ELK日志收集。
RabbitMQ,基于 AMQP 协议,erlang语言开发,是部署最广泛的开源消息中间件,是最受欢迎的开源消息中间件之一。
官网下载地址:https://www.rabbitmq.com/download.html
我们直接选择Linux环境下进行安装,官网可能出现下载网速慢的情况,这里已经准备好了安装包和相关依赖
rpm -ivh erlang-22.0.7-1.e17.x86_64.rpm
(若安装过程中报错,显示libcrypto.so.10(OPENSSL_1.0.2)(64bit) is needed by erlang-22.0.7-1.el7.x86_64错误,则先安装openssl-libs-1.0.2k-19.el7.x86_64.rpm依赖,在进行erlang安装)
rpm -ivh openssl-libs-1.0.2k-19.el7.x86_64.rpm --force
rpm -ivh socat-1.7.3.2-2.el7.x86_64.rpm
rpm -ivh --nodeps rabbitmq-server-3.7.18-1.el7.noarch.rpm
find / -name rabbitmq.config.example
cp /usr/share/doc/rabbitmq-server-3.7.18/rabbitmq.config.example /etc/rabbitmq/rabbitmq.config
vim /etc/rabbitmq/rabbitmq.config
修改为如下图所示
rabbitmq-plugins enable rabbitmq_management 出现如下说明: Enabling plugins on node rabbit@localhost: rabbitmq_management The following plugins have been configured: rabbitmq_management rabbitmq_management_agent rabbitmq_web_dispatch Applying plugin configuration to rabbit@localhost... The following plugins have been enabled: rabbitmq_management rabbitmq_management_agent rabbitmq_web_dispatch set 3 plugins. Offline change; changes will take effect at broker restart.
启动:systemctl start rabbitmq-server
重启:systemctl restart rabbitmq-server
停止:systemctl stop rabbitmq-server
systemctl status rabbitmq-server
systemctl stop firewalld
1. 服务启动相关
systemctl start|restart|stop status rabbitmq-server
2. 管理命令行 用来在不使用web管理界面情况下命令操作RabbitMQ
rabbitmqctl help 可以查看更多命令
3. 插件管理命令行
rabbitmqplugins enable|list|disable
connections:无论生产者还是消费者,都需要与RabbitMQ建立连接后才
可以完成消息的生产和消费,在这里可以查看连接情况`
channels:通道,建立连接后,会形成通道,消息的投递获取依赖通道。
Exchanges:交换机,用来实现消息的路由
Queues:队列,即消息队列,消息存放在队列中,等待消费,
消费后被移除队列。
上面的Tags选项,其实是指定用户的角色,可选的有以下几个: 超级管理员(administrator) 可登陆管理控制台,可查看所有的信息,并且可以对用户, 策略(policy)进行操作。 监控者(monitoring) 可登陆管理控制台,同时可以查看rabbitmq节点的相关信息 (进程数,内存使用情况,磁盘使用情况等) 策略制定者(policymaker) 可登陆管理控制台, 同时可以对policy进行管理。但无法查看节点的 相关信息(上图红框标识的部分)。 普通管理者(management) 仅可登陆管理控制台,无法看到节点信息,也无法对策略进行管理。 其他 无法登陆管理控制台,通常就是普通的生产者和消费者。
虚拟主机
为了让各个用户可以互不干扰的工作,RabbitMQ添加了
虚拟主机(Virtual Hosts)的概念。其实就是一个独立的
访问路径,不同用户使用不同路径,各自有自己的
队列、交换机,互相不会影响。相当于关系型中的数据库
可上官网查看具体详情
<!--引入rabbitmq的相关依赖-->
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.7.2</version>
</dependency>
在上图的模型中,有以下概念:
P:生产者,也就是要发送消息的程序
C:消费者:消息的接受者,会一直等待消息到来。
queue:消息队列,图中红色部分。类似一个邮箱,
可以缓存消息;生产者向其中投递消息,消费者从其中取出消息。
public class Provider { // 生产消息 @Test public void testSendMessage() throws IOException, TimeoutException { // 创建连接mq的连接工厂对象 ConnectionFactory connectionFactory = new ConnectionFactory(); // 设置连接rabbitmq主机 connectionFactory.setHost("192.168.88.100"); // 设置端口号 connectionFactory.setPort(5672); // 设置连接哪个虚拟机 connectionFactory.setVirtualHost("/emsVirtual"); // 设置访问虚拟主机的用户名和密码 connectionFactory.setUsername("ems"); connectionFactory.setPassword("123456"); // 获取连接对象 Connection connection = connectionFactory.newConnection(); // 获取连接中的通道 Channel channel = connection.createChannel(); // 通道绑定对应消息队列 // 参数1:队列名称 如果队列不存在则自动创建 // 参数2:用来定义队列特性是否需要持久化 true 持久化队列 false 不持久化 // 参数3:exclusive:是否独占队列 true 独占队列 false 不独占 // 参数4:autoDelete:是否在消费完成后自动删除队列 true 自动删除 false 不自动删除 // 参数5:额外附加参数 channel.queueDeclare("hello", false, false, false, null); // 发布消息 // 参数1:交换机名称 参数2:队列名称 参数3:传递消息额外设置 参数4:消息的具体内容 channel.basicPublish("","hello",null,"hello rabbitmq".getBytes()); // 关闭通道 channel.close(); // 关闭连接 connection.close(); } }
// 消费者 public class Customer { public static void main(String[] args) throws IOException, TimeoutException { // 创建连接mq的连接工厂对象 ConnectionFactory connectionFactory = new ConnectionFactory(); // 设置连接rabbitmq主机 connectionFactory.setHost("192.168.88.100"); // 设置端口号 connectionFactory.setPort(5672); // 设置连接哪个虚拟机 connectionFactory.setVirtualHost("/emsVirtual"); // 设置访问虚拟主机的用户名和密码 connectionFactory.setUsername("ems"); connectionFactory.setPassword("123456"); // 获取连接对象 Connection connection = connectionFactory.newConnection(); // 获取连接中的通道 Channel channel = connection.createChannel(); channel.queueDeclare("hello", false, false, false, null); // 消费消息 // 参数1:消费哪个队列的消息 队列名称 // 参数2:开始消息的自动确认机制 // 参数3:消费时的回调接口 channel.basicConsume("hello", true, new DefaultConsumer(channel) { // 最后一个参数:消息队列中取出的消息 @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println(new String(body)); } }); } }
public class RabbitMQUtils { private static ConnectionFactory connectionFactory; // 静态代码块,在类加载时执行,只执行一次 static { connectionFactory = new ConnectionFactory(); // 设置连接rabbitmq主机 connectionFactory.setHost("192.168.88.100"); // 设置端口号 connectionFactory.setPort(5672); // 设置连接哪个虚拟机 connectionFactory.setVirtualHost("/emsVirtual"); // 设置访问虚拟主机的用户名和密码 connectionFactory.setUsername("ems"); connectionFactory.setPassword("123456"); } // 定义提供连接对象的方法 public static Connection getConnection() { try { // 获取连接对象 return connectionFactory.newConnection(); } catch (Exception e) { e.printStackTrace(); } return null; } // 关闭通道和连接 public static void closeConnectionAndChanel(Channel channel, Connection connection) { try { if (channel != null) { channel.close(); } if (connection != null) { connection.close(); } } catch (Exception e) { e.printStackTrace(); } } }
Work queues,也被称为(Task queues),任务模型。当消息处理比较耗时的时候,可能生产消息的速度会远远大于消息的消耗速度。长此以往,消息就会堆积越来越多,无法及时处理。此时就可以使用 work 模型:让多个消费者绑定到一个队列,共同消费队列中的消息。队列中的消息一旦消费,就会消失,因此任务是不会被重复执行的。
角色:
P:生产者:任务的发布者
C1:消费者1,领取任务并且完成任务,假设完成速度较慢
C2:消费者2,领取任务并且完成任务,假设完成速度快
public class Provider { public static void main(String[] args) throws IOException { // 获取连接对象 Connection connection = RabbitMQUtils.getConnection(); // 获取通道对象 Channel channel = connection.createChannel(); // 通过通道声明队列 channel.queueDeclare("work", true, false, false, null); for (int i = 0; i < 10; i++) { // 生产消息 channel.basicPublish("", "work", null, (i + "hello work quene").getBytes()); } // 关闭资源 RabbitMQUtils.closeConnectionAndChanel(channel, connection); } }
public class Customer1 { public static void main(String[] args) throws IOException { // 获取连接 Connection connection = RabbitMQUtils.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare("work", true, false, false, null); channel.basicConsume("work", true, new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("消费者1-" + new String(body)); } }); } }
public class Customer2 { public static void main(String[] args) throws IOException { // 获取连接 Connection connection = RabbitMQUtils.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare("work", true, false, false, null); channel.basicConsume("work", true, new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("消费者2-" + new String(body)); } }); } }
测试结果
总结:默认情况下,RabbitMQ将按顺序将每个消息发送给下一个使用者。平均而言,每个消费者都会收到相同数量的消息。这种分发消息的方式称为循环。
消息确认机制:
完成一项任务可能需要几秒钟。您可能想知道,如果其中一个消费者开始了一项长期任务,但只完成了一部分就死了,会发生什么情况。在我们当前的代码中,一旦RabbitMQ将消息传递给使用者,它就会立即将其标记为删除。在这种情况下,如果您杀死一个worker,我们将丢失它刚刚处理的消息。我们还将丢失发送给该特定工作进程但尚未处理的所有消息。但我们不想失去任何任务。如果一个worker死了,我们希望把任务交给另一个工人。
// 开发消费者1 public class Customer1 { public static void main(String[] args) throws IOException { // 获取连接 Connection connection = RabbitMQUtils.getConnection(); Channel channel = connection.createChannel(); // 保证消息队列中每次只能处理一个 channel.basicQos(1); channel.queueDeclare("work", true, false, false, null); channel.basicConsume("work", false, new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { try { Thread.sleep(2000); } catch (Exception e) { e.printStackTrace(); } System.out.println("消费者1-" + new String(body)); // 参数1:确认队列中那个具体消息 参数2:是否开启多个消息同时确实 channel.basicAck(envelope.getDeliveryTag(),false); } }); } }
// 开发消费者2 public class Customer2 { public static void main(String[] args) throws IOException { // 获取连接 Connection connection = RabbitMQUtils.getConnection(); Channel channel = connection.createChannel(); // 保证消息队列中每次只能处理一个 channel.basicQos(1); channel.queueDeclare("work", true, false, false, null); channel.basicConsume("work", true, new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("消费者2-" + new String(body)); //手动确认 参数1:手动确认消息标识 参数2:false 每次确认一个 channel.basicAck(envelope.getDeliveryTag(), false); } }); } }
设置通道一次只能消费一个消息
关闭消息的自动确认,开启手动确认消息
fanout 扇出 也称为广播
在广播模式下,消息发送流程是这样的:
1.可以有多个消费者
2.每个消费者有自己的queue(队列)
3.每个队列都要绑定到Exchange(交换机)
4.生产者发送的消息,只能发送到交换机,交换机来决定要发给哪个队列,生产者无法决定
5.交换机把消息发送给绑定过的所有队列
6.队列的消费者都能拿到消息。实现一条消息被多个消费者消费
public class Provider { public static void main(String[] args) throws IOException { // 获取连接对象 Connection connection = RabbitMQUtils.getConnection(); // 获取通道对象 Channel channel = connection.createChannel(); // 将通道声明指定交换机 // 参数1:交换机名称 参数2:交换机类型 fanout 广播类型 channel.exchangeDeclare("logs", "fanout"); // 发送消息 channel.basicPublish("logs", "", null, "fanout type message".getBytes()); // 关闭资源 RabbitMQUtils.closeConnectionAndChanel(channel, connection); } }
public class Customer1 { public static void main(String[] args) throws IOException { // 获取连接对象 Connection connection = RabbitMQUtils.getConnection(); Channel channel = connection.createChannel(); // 通道绑定交换机 channel.exchangeDeclare("logs", "fanout"); // 临时队列 String queueName = channel.queueDeclare().getQueue(); // 绑定交换机和队列 channel.queueBind(queueName, "logs", ""); // 消费消息 channel.basicConsume(queueName, true, new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("消费者1:" + new String(body)); } }); } }
public class Customer2 { public static void main(String[] args) throws IOException { // 获取连接对象 Connection connection = RabbitMQUtils.getConnection(); Channel channel = connection.createChannel(); // 通道绑定交换机 channel.exchangeDeclare("logs", "fanout"); // 临时队列 String queueName = channel.queueDeclare().getQueue(); // 绑定交换机和队列 channel.queueBind(queueName, "logs", ""); // 消费消息 channel.basicConsume(queueName, true, new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("消费者2:" + new String(body)); } }); } }
在Fanout模式中。一条消息,会被所有订阅的队列都消费。但是,在某些场景下,我们希望不同的消息被不同的队列消费。
这时就要用到 Direct 类型的 Exchange
在Direct模型下:
队列与交换机的绑定,不能在是任意绑定了,而是要指定一个 RoutingKey (路由key);
消息的发送方在向Exchange发送消息时,也必须指定消息的 RoutingKey;
Exchange不再把消息交给每一个绑定的队列,而是根据消息的 RoutingKey 进行判断,只有队列的 RoutingKey 与消息的RoutingKey也完全一致,才会接收到消息。
图解:
P:生产者,向Exchange发送消息,发送消息时,会指定一个routing key。
X:Exchange(交换机),接收生产者的消息,然后把消息递交给 与routing key完全匹配的队列
C1:消费者,其所在队列指定了需要routing key 为 error 的消息
C2:消费者,其所在队列指定了需要routing key 为 info、error、warning 的消息
public class Provider { public static void main(String[] args) throws IOException { // 获取连接对象 Connection connection = RabbitMQUtils.getConnection(); // 获取通道对象 Channel channel = connection.createChannel(); // 通过通道声明交换机 参数1:交换机名称 参数2:direct 路由模式 channel.exchangeDeclare("logs_direct", "direct"); // 发送消息 String routingkey = "info"; channel.basicPublish("logs_direct", routingkey, null, ("这是direct模型发布的基于route key:[" + routingkey + "]发送的消息").getBytes()); // 关闭资源 RabbitMQUtils.closeConnectionAndChanel(channel, connection); } }
public class Customer1 { public static void main(String[] args) throws IOException { Connection connection = RabbitMQUtils.getConnection(); Channel channel = connection.createChannel(); // 声明交换机名称 String exchangeName = "logs_direct"; // 通道声明交换机以及交换的类型 channel.exchangeDeclare("logs_direct", "direct"); // 创建一个临时队列 String queue = channel.queueDeclare().getQueue(); // 基于route key绑定队列和交换机 channel.queueBind(queue, exchangeName, "error"); // 获取消费的消息 channel.basicConsume(queue, true, new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("消费者1:" + new String(body)); } }); } }
public class Customer2 { public static void main(String[] args) throws IOException { Connection connection = RabbitMQUtils.getConnection(); Channel channel = connection.createChannel(); // 声明交换机名称 String exchangeName = "logs_direct"; // 通道声明交换机以及交换的类型 channel.exchangeDeclare("logs_direct", "direct"); // 创建一个临时队列 String queue = channel.queueDeclare().getQueue(); // 基于route key绑定队列和交换机 channel.queueBind(queue, exchangeName, "info"); channel.queueBind(queue, exchangeName, "error"); channel.queueBind(queue, exchangeName, "warning"); // 获取消费的消息 channel.basicConsume(queue, true, new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("消费者1:" + new String(body)); } }); } }
测试生产者发送Route key为error的消息时
测试生产者发送Route key为info的消息时
Topic 类型的 Exchange 与 Direct 相比,都是可以根据 RoutingKey 把消息路由到不同的队列。只不过 Topic 类型 Exchange 可以让队列在绑定 RoutingKey 的时候使用通配符!这种模型 RoutingKey 一般都是由一个或多个单词组成,多个单词之间以“.”分割,例如:‘item.insert’
# 统配符
* (star) can substitute for exactly one word. 匹配不多不少恰好1个词
# (hash) can substitute for zero or more words. 匹配零个、一个或多个词
# 如:
audit.# 匹配audit、audit.irs 、或者audit.irs.corporate等
audit.* 只能匹配 audit.irs
public class Provider { public static void main(String[] args) throws IOException { // 获取连接对象 Connection connection = RabbitMQUtils.getConnection(); // 获取通道对象 Channel channel = connection.createChannel(); // 通过通道声明交换机以及交换机类型 参数1:交换机名称 参数2:topic channel.exchangeDeclare("topics", "topic"); // 发送消息 String routingkey = "user.save"; channel.basicPublish("topics", routingkey, null, ("这是topic模型发布的基于route key:[" + routingkey + "]发送的消息").getBytes()); // 关闭资源 RabbitMQUtils.closeConnectionAndChanel(channel, connection); } }
// Routing Key中使用*通配符方式 public class Customer1 { public static void main(String[] args) throws IOException { Connection connection = RabbitMQUtils.getConnection(); Channel channel = connection.createChannel(); // 通道声明交换机以及交换的类型 channel.exchangeDeclare("topics", "topic"); // 创建一个临时队列 String queue = channel.queueDeclare().getQueue(); // 基于route key绑定队列和交换机 动态通配符形式 channel.queueBind(queue, "topics", "user.*"); // 获取消费的消息 channel.basicConsume(queue, true, new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("消费者1:" + new String(body)); } }); } }
// Routing Key中使用#通配符方式 public class Customer2 { public static void main(String[] args) throws IOException { Connection connection = RabbitMQUtils.getConnection(); Channel channel = connection.createChannel(); // 通道声明交换机以及交换的类型 channel.exchangeDeclare("topics", "topic"); // 创建一个临时队列 String queue = channel.queueDeclare().getQueue(); // 基于route key绑定队列和交换机 动态通配符形式 channel.queueBind(queue, "topics", "user.#"); // 获取消费的消息 channel.basicConsume(queue, true, new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("消费者2:" + new String(body)); } }); } }
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
spring:
application:
name: springboot_rabbitmq
rabbitmq:
host: 192.168.88.100
port: 5672
username: ems
password: 123456
virtual-host: /emsVirtual
@SpringBootTest(classes = RabbitMqDemo01Application.class) @RunWith(SpringRunner.class) public class TestRabbitMQ { // 注入rabbitTemplate @Autowired private RabbitTemplate rabbitTemplate; // hello world @Test public void test01() { rabbitTemplate.convertAndSend("hello","hello world"); } // 生产端没有指定交换机只有routingKey和Object。 //消费方产生hello队列,放在默认的交换机(AMQP default)上。 //而默认的交换机有一个特点,只要你的routerKey的名字与这个 //交换机的队列有相同的名字,他就会自动路由上。 //生产端routingKey 叫hello ,消费端生产hello队列。 //他们就路由上了 }
@Component
@RabbitListener(queuesToDeclare = @Queue("hello"))
public class HelloCustomer {
@RabbitHandler
public void receive(String message) {
System.out.println("message = " + message);
}
}
@SpringBootTest(classes = RabbitMqDemo01Application.class) @RunWith(SpringRunner.class) public class TestRabbitMQ { // 注入rabbitTemplate @Autowired private RabbitTemplate rabbitTemplate; // work @Test public void testWork() { for (int i = 0; i < 10; i++) { rabbitTemplate.convertAndSend("work", i + " work模型"); } } }
@Component public class WorkCustomer { // 消费者1 @RabbitListener(queuesToDeclare = @Queue("work")) public void receive1(String message) { System.out.println("messge1 = " + message); } // 消费者1 @RabbitListener(queuesToDeclare = @Queue("work")) public void receive2(String message) { System.out.println("messge2 = " + message); } }
@SpringBootTest(classes = RabbitMqDemo01Application.class)
@RunWith(SpringRunner.class)
public class TestRabbitMQ {
// 注入rabbitTemplate
@Autowired
private RabbitTemplate rabbitTemplate;
// fanout 广播
@Test
public void testFanout() {
rabbitTemplate.convertAndSend("logs","","Fanout的模型发送的消息");
}
}
@Component public class FanoutCustomer { @RabbitListener(bindings = { @QueueBinding( value = @Queue, // 创建临时队列 exchange = @Exchange(value = "logs" , type = "fanout") // 绑定的交换机 ) }) public void receive1(String message) { System.out.println("message1 = " +message); } @RabbitListener(bindings = { @QueueBinding( value = @Queue, // 创建临时队列 exchange = @Exchange(value = "logs" , type = "fanout") // 绑定的交换机 ) }) public void receive2(String message) { System.out.println("message2 = " +message); } }
@SpringBootTest(classes = RabbitMqDemo01Application.class)
@RunWith(SpringRunner.class)
public class TestRabbitMQ {
// 注入rabbitTemplate
@Autowired
private RabbitTemplate rabbitTemplate;
//route 路由模式
@Test
public void testRoute() {
rabbitTemplate.convertAndSend("directs", "info", "发送info的key的路由信息");
}
}
@Component public class RouteCustomer { @RabbitListener(bindings = { @QueueBinding( value = @Queue, // 创建临时队列 exchange = @Exchange(value = "directs", type = "direct"), //自定义交换机名称和类型 key = {"info", "error", "warning"} ) }) public void receive1(String message) { System.out.println("message1 = " + message); } @RabbitListener(bindings = { @QueueBinding( value = @Queue, // 创建临时队列 exchange = @Exchange(value = "directs", type = "direct"), //自定义交换机名称和类型 key = {"info"} ) }) public void receive2(String message) { System.out.println("message2 = " + message); } }
测试结果
发送info的key的路由信息
发送error的key的路由信息
@SpringBootTest(classes = RabbitMqDemo01Application.class)
@RunWith(SpringRunner.class)
public class TestRabbitMQ {
// 注入rabbitTemplate
@Autowired
private RabbitTemplate rabbitTemplate;
//topic 动态路由 订阅模式
public void testTopic() {
rabbitTemplate.convertAndSend("topics", "suer.save", "user.save 路由消息");
}
}
@Component public class TopicCustomer { @RabbitListener(bindings = { @QueueBinding( value = @Queue, exchange = @Exchange(name = "topics", type = "topic"), key = {"user.save", "user.*"} ) }) public void receive1(String message) { System.out.println("message1 = " + message); } @RabbitListener(bindings = { @QueueBinding( value = @Queue, exchange = @Exchange(name = "topics", type = "topic"), key = {"order.#", "product.#", "user.*"} ) }) public void receive2(String message) { System.out.println("message2 = " + message); } }
测试结果
user.save 路由消息
order.save 路由消息
场景说明:用户注册后,需要发注册邮件和注册短信,传统的做法有两种。1、串行的方式 2、并行的方式
串行方式:将注册信息写入数据库后,发送注册邮件,再发送注册短信,以上三个任务全部完成后才返回给客户端。这有一个问题是,邮件、短信并不是必须的,它只是一个通知,而这种做法让客户端等待没有必要等待的东西。
并行方式:将注册信息写入数据库后,发送邮件的同时,发送短信,以上三个任务完成后,返回给客户端,并行的方式可能提高处理的时间。
消息队列:假设三个业务节点分别使用50ms,串行方式使用时间150ms,并行使用时间100ms。虽然并行已经提高了处理时间,但是,邮件和短信对我们正常使用没有任何影响,客户端没有必要等着其发送完成才显示注册成功,应该是写入数据库后就返回,消息队列:引入消息队列后,把发送邮件、短信不是必须的业务逻辑异步处理。
场景:双11是购物狂阶,用户下单后,订单系统需要通知库存系统,传统的做法就是订单系统调用库存系统的接口。
这种做法有一个缺点:当库存系统出现故障时,订单就会失败。订单系统和库存系统高耦合,引入消息队列
场景:秒杀活动,一般会因为流量过大,导致应用挂掉,为了解决这个问题,一般在应用前端加入消息队列。
作用:
至此,RabbitMQ的学习以及整合SpringBoot已经完成!
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。