赞
踩
源码地址:https://github.com/Tellsea/springboot-learn/tree/master/springboot-rabbitmq
RabbitMQ 提供了 6 种消息模型,但是第 6 种其实是 RPC,并不是 MQ,因此不予学习。那么也就剩下 5 种。
rotingKey
的队列routing pattern
(路由模式) 的队列但是其实 3、4、5 这三种都属于订阅模型,只不过进行路由的方式不同。
我已经安装好了 RabbitMQ,Erlang,RabbitMQ 图形界面插件。创建了用户:tellsea,和虚拟主机:/tellsea-host,并设置了使用权,下面给出下载地址。
相关软件的安装
或者群文件夹下载,QQ 群:957406675
依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
application.yml
spring:
rabbitmq:
host: 127.0.0.1
username: tellsea
password: 123456
virtual-host: /tellsea-host
Spring AMQP 提供的‘template’扮演者关键的角色。定义主要操作的接口是 AmqpTemplate。
@Autowired
private AmqpTemplate amqpTemplate;
发送消息
@Test
public void simple() throws InterruptedException {
String msg = "RabbitMQ simple ...";
for (int i = 0; i < 10; i++) {
amqpTemplate.convertAndSend("spring.simple.queue", msg + i);
}
Thread.sleep(5000);
}
接收消息
/**
* simple:生产者-->队列-->消费者
*/
@Component
public class SimpleListener {
// 通过注解自动创建 spring.simple.queue 队列
@RabbitListener(queuesToDeclare = @Queue("spring.simple.queue"))
public void listen(String msg) {
System.out.println("SimpleListener listen 接收到消息:" + msg);
}
}
在刚才的基本模型中,一个生产者,一个消费者,生产的消息直接被消费者消费。比较简单。
Work queues,也被称为(Task queues),任务模型。
当消息处理比较耗时的时候,可能生产消息的速度会远远大于消息的消费速度。长此以往,消息就会堆积越来越多,无法及时处理。此时就可以使用 work 模型:让多个消费者绑定到一个队列,共同消费队列中的消息。队列中的消息一旦消费,就会消失,因此任务是不会被重复执行的。
发送消息
@Test
public void work() throws InterruptedException {
String msg = "RabbitMQ simple ...";
for (int i = 0; i < 10; i++) {
amqpTemplate.convertAndSend("spring.work.queue", msg + i);
}
Thread.sleep(5000);
}
接收消息
@Component
public class WorkListener {
// 通过注解自动创建 spring.work.queue 队列
@RabbitListener(queuesToDeclare = @Queue("spring.work.queue"))
public void listen(String msg) {
System.out.println("WorkListener listen 接收到消息:" + msg);
}
// 创建两个队列共同消费
@RabbitListener(queuesToDeclare = @Queue("spring.work.queue"))
public void listen2(String msg) {
System.out.println("WorkListener listen2 接收到消息:" + msg);
}
}
Fanout,也称为广播。在广播模式下,消息发送流程是这样的:
发送消息
@Test
public void fanout() throws InterruptedException {
String msg = "RabbitMQ fanout ...";
for (int i = 0; i < 10; i++) {
// 这里注意细节,第二个参数需要写,不然第一个参数就变成routingKey了
amqpTemplate.convertAndSend("spring.fanout.exchange", "", msg + i);
}
Thread.sleep(5000);
}
接收消息
/**
* Fanout:广播,将消息交给所有绑定到交换机的队列,每个消费者都会收到同一条消息
*/
@Component
public class FanoutListener {
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = "spring.fanout.queue", durable = "true"),
exchange = @Exchange(
value = "spring.fanout.exchange",
ignoreDeclarationExceptions = "true",
type = ExchangeTypes.FANOUT
)
))
public void listen(String msg) {
System.out.println("FanoutListener listen 接收到消息:" + msg);
}
// 队列2(第二个人),同样能接收到消息
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = "spring.fanout2.queue", durable = "true"),
exchange = @Exchange(
value = "spring.fanout.exchange",
ignoreDeclarationExceptions = "true",
type = ExchangeTypes.FANOUT
)
))
public void listen2(String msg) {
System.out.println("FanoutListener listen2 接收到消息:" + msg);
}
}
在 Fanout 模式中,一条消息,会被所有订阅的队列都消费。但是,在某些场景下,我们希望不同的消息被不同的队列消费。这时就要用到 Direct 类型的 Exchange。
在 Direct 模型下:
RoutingKey
(路由 key)RoutingKey
。Routing Key
进行判断,只有队列的Routingkey
与消息的 Routing key
完全一致,才会接收到消息发送消息
@Test
public void direct() throws InterruptedException {
String msg = "RabbitMQ direct ...";
for (int i = 0; i < 10; i++) {
amqpTemplate.convertAndSend("spring.direct.exchange", "direct", msg + i);
}
Thread.sleep(5000);
}
接收消息
/**
* Direct:定向,把消息交给符合指定routing key 的队列
*/
@Component
public class DirectListener {
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = "spring.direct.queue", durable = "true"),
exchange = @Exchange(
value = "spring.direct.exchange",
ignoreDeclarationExceptions = "true"
),
key = {"direct"}
))
public void listen(String msg) {
System.out.println("DirectListener listen 接收到消息:" + msg);
}
// 队列2(第二个人),key值不同,接收不到消息
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = "spring.direct2.queue", durable = "true"),
exchange = @Exchange(
value = "spring.direct.exchange",
ignoreDeclarationExceptions = "true"
),
key = {"direct-test"}
))
public void listen2(String msg) {
System.out.println("DirectListener listen2 接收到消息:" + msg);
}
}
Topic
类型的Exchange
与Direct
相比,都是可以根据RoutingKey
把消息路由到不同的队列。只不过Topic
类型Exchange
可以让队列在绑定Routing key
的时候使用通配符!
Routingkey
一般都是有一个或多个单词组成,多个单词之间以”.”分割,例如: user.insert
通配符规则 | 举例 |
---|---|
# :匹配一个或多个词 | user.# :能够匹配user.insert.save 或者 user.insert |
* :匹配不多不少恰好 1 个词 | user.* :只能匹配user.insert |
发送消息
@Test
public void topic() throws InterruptedException {
amqpTemplate.convertAndSend("spring.topic.exchange", "user.insert", "新增用户");
amqpTemplate.convertAndSend("spring.topic.exchange", "user.delete", "删除用户");
amqpTemplate.convertAndSend("spring.topic.exchange", "student.insert", "新增学生");
amqpTemplate.convertAndSend("spring.topic.exchange", "student.delete", "删除学生");
Thread.sleep(5000);
}
接收消息
/**
* Topic:通配符,把消息交给符合routing pattern(路由模式) 的队列
*/
@Component
public class TopicListener {
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = "spring.topic.queue", durable = "true"),
exchange = @Exchange(
value = "spring.topic.exchange",
ignoreDeclarationExceptions = "true",
type = ExchangeTypes.TOPIC
),
key = {"user.*"}
))
public void listen(String msg) {
System.out.println("TopicListener User 接收到消息:" + msg);
}
// 通配规则不同,接收不到消息
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = "spring.topic.queue", durable = "true"),
exchange = @Exchange(
value = "spring.topic.exchange",
ignoreDeclarationExceptions = "true",
type = ExchangeTypes.TOPIC
),
key = {"student.*"}
))
public void listen2(String msg) {
System.out.println("TopicListener Student 接收到消息:" + msg);
}
}
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。