赞
踩
RabbitMQ中的一些角色:
RabbitMQ官方提供了5个不同的Demo示例,对应了不同的消息模型:
**Basic Queue:**简单队列模型
在publisher方编写一个测试方法,向rabbitmq发送消息。
@Test
void testSendMessage2SimpleQueue() {
String queueName = "simple.queue";
String message = "hello,spring amqp!";
rabbitTemplate.convertAndSend(queueName, message);
}
在consumer方的类下编写一个接收信息的方法。
@RabbitListener(queues = "simple.queue")
public void listenSimpleQueue(String msg) {
System.out.println("消费者接收到simple.queue的消息:【" + msg + "】");
}
注意::consumer方想要接收信息只能运行整个项目,而publisher方只需运行测试方法。
Work queues,也被称为(Task queues),任务模型。简单来说就是让多个消费者绑定到一个队列,共同消费队列中的消息。
使用 Basic Queue
时,如果消息的生产速度大于了消费速度,长此以往,就会导致队列堆积太多的消息而无法处理。那么就可以使用 Work Queue
来解决这一个问题,原理是 人多力量大
,多个消费者共同处理消息处理,速度就能大大提高了。
我们循环发送消息来模拟消息堆积的情况。
在publisher方编写一个测试方法,循环发送50次信息,每次间隔20ms,这样就能在1s内发送50条信息。
@Test
void testSendMessage2WorkQueue() throws InterruptedException {
String queueName = "simple.queue";
String message = "hello,message__";
for (int i = 1; i <= 50; i++) {
rabbitTemplate.convertAndSend(queueName, message + i);
Thread.sleep(20);
}
}
模拟2个性能不同的消费者共同接收消息,第一个消费者消费能力更强,假设他20ms就能接收一条消息,而第二个消费者消费能力较弱,200ms才能接收一条消息。
@RabbitListener(queues = "simple.queue")
public void listenSimpleQueue1(String msg) throws InterruptedException {
System.out.println("消费者1接收到simple.queue的消息:【" + msg + "】"+ LocalTime.now());
Thread.sleep(20);
}
@RabbitListener(queues = "simple.queue")
public void listenSimpleQueue2(String msg) throws InterruptedException {
System.err.println("消费者2接收到simple.queue的消息:【" + msg + "】"+LocalTime.now());
Thread.sleep(200);
}
我们的理想状态是消费者1处理的消息大约是消费者2的10倍。但是运行结果(展示部分)却出乎了意料:
我们发现消费者1接收的都是 奇数 位的消息,而消费者2接收的都是 偶数 位的消息。并且两者都接收了25条消息,刚好均分了。这说明消费者处理消息的默认原则是取到信息,先将信息均分给消费者,然后让他们各自去消费。
我们明显不是这个意思,我们想的是“能者多劳”,让处理速度更快的消费者处理更多的消息。
为此,我们只需要在消费者方的配置文件中添加这么一段配置:
spring:
rabbitmq:
listener:
simple:
prefetch: 1 # 每次只能获取一条消息,处理完成才能获取下一个消息
再次进行相同的测试,发现结果果然如我们预料,消费者1处理的信息数量大约是消费者2的十倍:
①多个消费者绑定一个队列,队列中的一条消息只能被一个消费者处理。
②可以通过设置prefetch来控制消费者预取的消息数量
交换机的三种类型:
routing key
的队列routing pattern
(路由模式)的队列注意: 交换机只负责转发消息,不具备存储消息的能力。所以如果没有队列和交换机绑定或者没有符合路由规则的队列,那么消息就会被丢失。
Fanout直译为”扇出“,在mq中叫”广播“更合适。
广播模式下,消息的发送流程:
需求:
1)创建一个交换机 itcast.fanout,类型是Fanout
2)创建两个队列fanout.queue1和fanout.queue2,绑定到交换机itcast.fanout
3)消费者订阅队列
4)生产者发送消息
Spring提供了一个接口Exchange,来表示所有不同类型的交换机:
实践:
1)在消费者的中新建一个配置类来声明队列和交换机:
@Configuration public class FanoutConfig { //声明一个广播交换机 @Bean public FanoutExchange fanoutExchange() { return new FanoutExchange("itcast.fanout"); } //队列一 @Bean public Queue fanoutQueue1() { return new Queue("fanout.queue1"); } //队列二 @Bean public Queue fanoutQueue2() { return new Queue("fanout.queue2"); } //绑定队列一和广播交换机 @Bean public Binding fanoutBinding1(Queue fanoutQueue1, FanoutExchange fanoutExchange) { return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange); } //绑定队列二和广播交换机 @Bean public Binding fanoutBinding2(Queue fanoutQueue2, FanoutExchange fanoutExchange) { return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange); } }
2)在消费者的消息接收类中添加两个方法:
//广播
@RabbitListener(queues = "fanout.queue1")
public void listenFanoutQueue1(String msg) throws InterruptedException {
System.out.println("消费者1接收到fanout.queue1的消息:【" + msg + "】"+LocalTime.now());
}
//广播
@RabbitListener(queues = "fanout.queue2")
public void listenFanoutQueue2(String msg) throws InterruptedException {
System.err.println("消费者2接收到fanout.queue2的消息:【" + msg + "】"+LocalTime.now());
}
3)消费者编写消息发送测试方法:
@Test
void testSendFanoutExchange() {
//交换机名称
String exchangeName = "itcast.fanout";
//消息
String message = "hello,every one";
//发送
rabbitTemplate.convertAndSend(exchangeName, "", message);
}
声明队列、交换机、绑定关系的Bean类型分别是什么?
Direct与Fanout的区别在于Direct可以通过特殊规则指定给哪个队列发送消息或者不给哪个队列发消息。
注意:
RoutingKey
(路由key)RoutingKey
。RoutingKey
和 队列的 RoutingKey
相同时,交换机才会将消息发送给该队列。需求:
1)利用注解 @RabbitListener
声明交换机,队列和 RoutingKey
。
2)在消费者方编写两个方法,监听direct.queue1和direct.queue2 。
3)在生产者方编写测试方法发送消息。
实践:
1)利用注解 @RabbitListener
声明交换机,队列和 RoutingKey
。
2)在消费者方编写两个方法,监听direct.queue1和direct.queue2 。
//路由 @RabbitListener(bindings =@QueueBinding( value = @Queue(name = "direct.queue1"), exchange = @Exchange(name = "itcast.direct",type = ExchangeTypes.DIRECT), key = {"red","blue"} )) public void listenDirectQueue1(String msg) throws InterruptedException { System.out.println("消费者1接收到direct.queue1的消息:【" + msg + "】"); } //路由 @RabbitListener(bindings =@QueueBinding( value = @Queue(name = "direct.queue2"), exchange = @Exchange(name = "itcast.direct",type = ExchangeTypes.DIRECT), key = {"red","yellow"} )) public void listenDirectQueue2(String msg) throws InterruptedException { System.out.println("消费者2接收到direct.queue2的消息:【" + msg + "】"); }
3)在生产者方编写测试方法发送消息,指定 RoutingKey
为red,那么两个队列都能接收到消息,若 RoutingKey
为yellow,那么只有队列2才能接收到消息。
@Test
void testSendDirectExchange() {
//交换机名称
String exchangeName = "itcast.direct";
//消息
String message = "hello,red";
//发送
rabbitTemplate.convertAndSend(exchangeName, "red", message);
}
Direct类型和Fanout类型的差异:
RoutingKey
将消息路由到指定队列RoutingKey
,那么Direct的功能和Fanout相同。基于@RabbitListener注解声明队列和交换机的常用注解:
Topic与Direct类似,都可以使用 RoutingKey
把消息路由到不同队列,但是 Topic
的 RoutingKey
还可以使用通配符来简化书写。
Topic的 RoutingKey
通常由多个单词组成,每个单词之间用 .
分隔。
通配符的规则;
#
表示匹配1个或多个单词*
表示只匹配1个单词如:china.#
可以表示 china.news.weather
或者 china.news
,而 china.*
只能表示 china.news
或 china.weather
解释:
china.#
,因此凡是以 china.
开头的routing key
都会被匹配到。包括china.news和china.weather#.news
,因此凡是以 .news
结尾的 routing key
都会被匹配。包括china.news和japan.news需求:
1)利用注解 @RabbitListener
声明交换机,队列和 RoutingKey
。
2)在消费者方编写两个方法,监听topic.queue1和topic.queue2 。
3)在生产者方编写测试方法发送消息。
实践:
1)利用注解 @RabbitListener
声明交换机,队列和 RoutingKey
。
2)在消费者方编写两个方法,监听topic.queue1和topic.queue2 。
//话题 @RabbitListener(bindings = @QueueBinding( value = @Queue(name = "topic.queue1"), exchange = @Exchange(name = "itcast.topic",type = ExchangeTypes.TOPIC), key = "china.#" )) public void listenTopicQueue1(String msg) throws InterruptedException { System.out.println("消费者1接收到topic.queue1的消息:【" + msg + "】"); } //话题 @RabbitListener(bindings = @QueueBinding( value = @Queue(name = "topic.queue2"), exchange = @Exchange(name = "itcast.topic",type = ExchangeTypes.TOPIC), key = "#.news" )) public void listenTopicQueue2(String msg) throws InterruptedException { System.out.println("消费者2接收到topic.queue2的消息:【" + msg + "】"); }
3)在生产者方编写测试方法发送消息。
@Test
void testSendTopicExchange() {
//交换机名称
String exchangeName = "itcast.topic";
//消息
String message = "hello,topic";
//发送
rabbitTemplate.convertAndSend(exchangeName, "china.weather", message);
}
Direct交换机与Topic交换机的差异:
**.**
分割#
:代表0个或多个词*
:代表1个词关闭消费者方服务,我们在生成者方编写一个测试方法发送一个Map对象:
@Test
void testSendObjectQueue() {
HashMap<String, Object> map = new HashMap<>();
map.put("name","刘亦菲");
map.put("age",16);
rabbitTemplate.convertAndSend("object.queue",map);
}
我们在mq的管理后台发现消息长这样:
这明显不是我们想要的,导致消息变成这样是因为序列化器的原因,mq消息仅支持字节传输,但amqp提供的接口支持对象,由此导致长串乱码。
解决这一问题,我们需要配置新的序列化器。
我们希望消息体的体积更小、可读性更高,因此可以使用JSON方式来做序列化和反序列化。
在生成者和消费者方都引入依赖,或者直接在两者的父工程中引入依赖:
<dependency>
<groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-xml</artifactId>
</dependency>
在消费者和生产者的配置类中都加入如下bean:
@Bean
public MessageConverter jsonMessageConverter(){
return new Jackson2JsonMessageConverter();
}
再次运行生产者的测试方法,结果如下:
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。