赞
踩
第一步:使用@RabbitListener注解声明交换机、队列、BindingKey
因此解决这种我们发送的对象消息被序列化的问题,我们就需要使用消息转换器了:
什么是SpringAAMQP:
一句话,就是简化刚才消息提供者/消费者向MQ队列发送/订阅获取消息的代码的(我们前面的笔记知道,那些代码步骤太杂了,还要创建连接,创建通道等......)
就是简化刚才我们写的那个消息提供者向RabbitMQ消息队列发送消息,和消息消费者向RabbitMQ消息队列中订阅获取暂存在RabbitMQ消息队列中的消息代码 (简化建立联系、创建通道等那些代码)
第一步:
- <!--AMQP依赖,包含RabbitMQ-->
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-amqp</artifactId>
- </dependency>
第二步:
操作如下所示:
消息发送之后,就会发现我们simple.queue队列中有了这个hello,spring amqp! 消息了:
值得注意的是:如果没有这个simple.queue队列的话,那么我们消息发送者向simple.queue队列中发送消息是发送不成功的,不会给我们自动新建simple.queue队列。
同理第一步也是先看看父工程中是否导入了SpringAMQP的依赖。
第二步:
具体代码演示如下所示:
然后运行我们的消息消费者启动类,会发现确实拿到了RabbitMQ消息队列中的消息了:
演示如下所示:
- @Autowired
- private RabbitTemplate rabbitTemplate;
-
- /**
- * 回忆基础: 首先我们要知道 Thread.sleep(20); 中 20对应的是毫秒, 换算单位:20毫秒 = 0.02秒 ( 毫秒 × 1000 = 秒 )
- *
- * 我们下面知道循环50次向队列中发送消息,并且每发送一条消息睡眠0.02秒,那么【理论】上我们消息提供者
- * 向队列中发送完这50次消息需要耗时1s (也就是说一秒中把这五十条消息发送到队列当中去)
- *
- */
- @Test
- public void testSendMessage2WorkQueue() throws InterruptedException {
- String queueName = "simple.queue"; // 也就是说消息发送者向这个simple.queue队列中发送下面的消息
- String message = "hello, message__";
- for (int i = 1; i <= 50; i++) {
- rabbitTemplate.convertAndSend(queueName, message + i); // 把 message + i 消息发送到simple.queue队列当中
- Thread.sleep(20);
- }
- }
最终我们会发现simple.queue队列中确实暂存了消息提供者发送的50条消息了:
- @Component
- public class SpringRabbitListener {
-
- /**
- * 假设这个是消费者1:
- * 消费者1每0.02s消费simple.queue队列中一条消息
- *
- * (队列中50条消息的话理论上消费者1用1s的时间就能消费完了,理论上说
- * 消息提供者每1s发送50条消息,而消费者1每1s消费50条消息,理论上队列中不会堆积消息了)
- */
- @RabbitListener(queues = "simple.queue") // 也就是说这个消息消费者就消费这个simple.queue队列中的消息数据
- public void listenWorkQueue1(String msg) throws InterruptedException {
- System.out.println("消费者1接收到消息:【" + msg + "】" + LocalTime.now());
- Thread.sleep(20);
- }
-
-
- /**
- * 假设这个是消费者2:
- * 消费者2每0.2s才消费simple.queue队列中的一条消息,就算是1s钟也就才消费了队列中5条消息
- *
- * 也就是说我们现在simple.queue队列中有50条消息,按理说这50条消息,消费者1消费的快,那么消费者1就应该
- * 打印获取更多的队列中的消息吧,毕竟比消费者2消费的快,能者多劳嘛。
- */
- @RabbitListener(queues = "simple.queue") // 也就是说这个消息消费者就消费这个simple.queue队列中的消息数据
- public void listenWorkQueue2(String msg) throws InterruptedException {
- System.err.println("消费者2........接收到消息:【" + msg + "】" + LocalTime.now());
- Thread.sleep(200);
- }
- }
我们从上面的代码中可以知道,消费者1每0.02秒就能消费一条队列中的消息,而消费者2每0.2秒才消费队列中一条消息,那么消费者1明显比消费者2消费的快,现在我们消息队列中
知道了已经暂存了50条消息了,按理说我们消费者1拿到的消息数据肯定会比消费者2拿到的多。
我们直接开启两个消息消费者进行消费测试(注意:先开启消息消费者,消息提供者再向队列中发送消息,要不然这五十条消息全被消费者1消费掉了):
会发现好家伙,消费者1的消费能力明明比消费者2的能力高的多,但这两个消息消费者还是平均分配这队列中的50条消息了,这是怎么回事呢,这就是消息预取机制。
也就是说上面本来消息消费者1的能力比消息消费者2的消费能力高,但是这两个消息消费者同时消费队列中暂存的消息的时候却还是平均分配消费了,这就是消息预取机制的作用了。
什么是消息预取机制呢:
现在我们假如有这样一个问题:就是能者多劳,明明消息消费者1的消费队列中的消息能力高于消息消费者2的消费能力,消费者1说了凭什么我消费能力比他强还让我们俩个平均消费消息呢,因此我们怎么解决这种平均分配消息呢,也就是说谁消费能力高谁就消费消息的多,
做法如下所示:
这个配置成1:也就是说你俩都别预取了,你俩谁消费完1个后(如果设置的是2那么就消费完2个后)再来我队列中拿消息把,这样消费者1的消费能力高那么消费者1肯定就会去队列中拿的消息数据多了。
首先我们前面讲过的hello world模型和 Work Queue 工作模型实现消息发送和消费的过程都是如下所示的:(都是消息提供者先把消息发送到queue队列当中,然后消息消费者获取消费queue队列中暂存的消息 【工作模型的话消费者是两个】)
总结:
也就是说订阅模型相比着前面两种模型来说:
前面两种模型都是消息提供者先把消息发送到queue队列中,而订阅模型的话则是消息提供者先把消息发送给exchange交换机,然后exchange交换机通过路由的方式把消息路由发送给队列中。其他的过程都是一样的(也就是说只有消息提供者是把消息发送给exchange交换机然后路由给绑定的队列当中了,而消息消费者消费数据的时候还是直接从队列当中拿消息进行消费的)。
注意1:exchange交换机只负责路由消息提供者发送的消息到队列中,而不储存消息数据,如果路由失败的话,那么消息数据就直接丢失了。
注意2:exchange交换机有三个类型:Fanout(广播)、Direct(路由)、Topic(话题)
总结一句话:
也就是说我们知道了订阅模型的话是通过先把消息发送者发送的消息发送给交换机中,然后通过路由到队列当中的进行暂存数据的,如何路由到队列中:只有队列和交换机绑定了关系那么队列才能收到交换机中路由的消息数据(比如上面的queue1、queue2两个队列,只要这两个队列都和交换机绑定了关系了,那么这两个队列中就能收到暂存到消息发送者publisher发送的消息了,最终消息消费者就可以在queue1或者queue2队列中获取到消息发送者发送的消息了,也就是说在这两个队列中都可以获取到消息发送者发送的消息咯)
首先我们知道消息发送者是先把消息发送到交换机当中的,并且把消息数据路由到和这个交换机绑定了关系的队列当中的,因此我们第一步就是先在消息消费者中把交换机和队列创建声明出来,并且让这个交换机和队列绑定好关系。
- package cn.itcast.mq.config;
-
- import org.springframework.amqp.core.Binding;
- import org.springframework.amqp.core.BindingBuilder;
- import org.springframework.amqp.core.FanoutExchange;
- import org.springframework.amqp.core.Queue;
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
-
- @Configuration // 这个配置类注解一定要加上
- public class FanoutConfig {
- /**
- * : 也就是说声明创建一个itcast.fanout为名的fanout类型的交换机
- */
- @Bean
- public FanoutExchange fanoutExchange(){
- return new FanoutExchange("itcast.fanout"); // 这里面填写的就是创建的交换机的名字
- }
-
- /**
- * : 也就是说声明创建一个fanout.queue1为名的队列
- */
- @Bean
- public Queue fanoutQueue1(){
- return new Queue("fanout.queue1"); // 这里面填写的就是创建的队列的名字
- }
-
- /**
- * : 也就是说把fanout.queue1为名的队列和itcast.fanout为名的fanout类型的交换机绑定上关系。
- * (绑定上关系的话,那么这个队列跟着交换机大哥混,等交换机大哥收到消息发送者发送的消息后,交换机大哥就会把这个消息路由给这个队列中进行暂存消息数据)
- */
- @Bean
- public Binding fanoutBinding1(Queue fanoutQueue1, FanoutExchange fanoutExchange){ // 注意:这里的参数一个是队列的方法名,一个是交换机的方法名
- return BindingBuilder // 然后通过这个BindingBuilder对象就可以把上面的队列和交换机进行绑定了。
- .bind(fanoutQueue1)
- .to(fanoutExchange);
- }
-
- /**
- * : 也就是说再声明创建一个fanout.queue2为名的队列
- */
- @Bean
- public Queue fanoutQueue2(){
- return new Queue("fanout.queue2");
- }
-
- /**
- * : 也就是说把fanout.queue2为名的队列和itcast.fanout为名的fanout类型的交换机绑定上关系。
- * (同理绑定上关系的话,那么这个队列跟着交换机大哥混,等交换机大哥收到消息发送者发送的消息后,交换机大哥就会把这个消息路由给这个队列中进行暂存消息数据)
- */
- @Bean
- public Binding fanoutBinding2(Queue fanoutQueue2, FanoutExchange fanoutExchange){
- return BindingBuilder
- .bind(fanoutQueue2)
- .to(fanoutExchange);
- }
-
- }
通过上面写的这个配置类,那么我们就知道了现在交换机和两个队列都创建成功了,并且这两个队列和交换机也绑定上关系了。
绑定上关系后:也就是说比如消息发送者发送了一个hello every one的消息,那么此时这个消息就会发送到交换机上了,然后因为这两个队列和交换机绑定了关系,因此这两个队列都能路由到交换机中的消息发送者发送的消息(也就是说此时这两个队列当中都暂存到了消息发送者发送的hello every one的消息数据了)。然后最终消息消费者就可以对任意一个队列中订阅获取队列中暂存的hello every one的消息数据咯。
然后我们启动消息消费者的启动类:
就会发现,我们声明创建的itcast.fanout为名的交换机和fanout.queue1、fanout.queue2为名的两个队列在RabbitMQ上确实给我们创建成功了并且这个交换机确实也和这两个队列绑定好关系了。
我们知道两个队列创建好之后,那么我们消息消费者就可以向队列中订阅消费队列中的暂存的消息数据了,我们这里假设有两个消费者A、B,A消息消费者消费queue1队列中的消息数据,B消息消费者消费queue2队列中的消息数据
(会发现这两个消息消费者都能获取到消息发送者发送的消息数据,因为这两个队列都绑定了交换机,那么交换机路由时会把消息发送者的数据分别给这两个队列各发送一份)
因此就在消息消费者中写代码逻辑:
注意:一定要先保证消费者是开启状态,然后再写消息发送者进行向队列中发送消息数据(等队列中有消息的时候,这里消费者就可以直接消费队列中的消息数据了)
我们知道交换机和队列还有消息消费者都准备好之后,那么我们就可以新建一个消息发送者向交换机当中发送消息数据了(然后路由给我们绑定关系的队列中),最后供消息消费者消费消息数据。
因此我们就要在消息发送者模块中写逻辑了:
- @Autowired
- private RabbitTemplate rabbitTemplate;
-
-
- @Test
- public void testSendFanoutExchange() {
- // 交换机名称
- String exchangeName = "itcast.fanout"; // 向刚才我们创建的itcast.fanout为名的交换机中发送"hello, every one!"
- // 的消息数据
- // 消息
- String message = "hello, every one!";
- // 发送消息
- rabbitTemplate.convertAndSend(exchangeName, "", message); // 向交换机中发送消息
- }
因此通过上面的三个步骤的操作,我们知道消费者是开启状态的,等着队列中有消息数据的时候就会消费队列中的消息数据,那么我们第三步通过向队列中发送消息后,那么交换机就会把这条消息发送者发送的消息路由给queue1和queue2绑定的两个队列当中了,那么我们消费者A和消费者B此时就获取消费到了队列中的消息发送者发送的消息数据了:
修改:上面利用 @RabbitListener 声明的是Bindingkey
- @Component
- public class SpringRabbitListener {
-
- /**
- * 第一步:使用@RabbitListener注解声明一个名为itcast.direct1的direct路由模式的交换机、
- * 交换机有了之后,我们就可以使用@Queue注解声明创建队列了,我们这里就创建了一个direct.queue1为名的队列
- * 并且这个队列绑定key(也就是说BindingKey)绑定了red和blue两种。
- *
- * 也就是说等会只要交换机接收的消息对应的RoutingKey是blue或者red,那么这个交换机就会把这个消息路由给这个direct.queue1为名的队列
- * (因为这个队列BindingKey也为blue和red,因此只要和交换机接收的消息对应的RoutingKey一致的话,那么该队列就能路由到消息)
- */
- @RabbitListener(bindings = @QueueBinding(
- value = @Queue(name = "direct.queue1"),
- exchange = @Exchange(name = "itcast.direct1", type = ExchangeTypes.DIRECT), // type的作用就是声明这个交换机是direct路由的模式
- key = {"red", "blue"} // 该direct.queue1队列的BindingKey为red和blue // 还可以用 type = ”direct“ 和上面用枚举的效果是一样的
- ))
-
- // 这个消息消费者直接就对这个direct.queue1为名的队列中暂存的消息进行消费 (可以理解为上面的direct.queue1队列注解在这个消费者方法上
- // 所以就对这个direct.queue1队列进行消费)
- public void listenDirectQueue1(String msg){ // 注意:这个消费接收到的消息类型要保证和队列中的消息数据类型一致(String类型)
- System.out.println("消费者接收到direct.queue1的消息:【" + msg + "】");
- }
-
-
- @RabbitListener(bindings = @QueueBinding(
- value = @Queue(name = "direct.queue2"), // 同理这里又声明创建了一个direct.queue2为名的队列
- exchange = @Exchange(name = "itcast.direct1", type = ExchangeTypes.DIRECT), // 绑定声明的交换机还是itcast.direct1为名的交换机
- key = {"red", "yellow"} // 并且为这个direct.queue2为名的队列设置的BindingKey为red和yellow
- ))
- public void listenDirectQueue2(String msg){
- System.out.println("消费者接收到direct.queue2的消息:【" + msg + "】");
- }
- }
通过使用这个@RabbitListener注解声明,我们知道了我们声明创建成功了一个"itcast.direct1"为名的交换机了,并且两个队列direct.queue1、direct.queue2也都声明创建成功了,而且这个direct.queue1队列我们绑定的BindingKey为blue和red,direct.queue2队列
我们绑定的BindingKey为red和yellow:
我们刚才上面分析知道了,只有当交换机接收到的消息对应的RoutingKey和队列绑定的BindingKey一致的时候,那么该队列才能接收到交换机路由的消息数据
我们现在假定消息发送者向 itcast.direct1"为名的交换机中发送的消息对应的RoutingKey为blue,我们知道两个队列中只有direct.queue1队列我们绑定的BindingKey有这个blue,因此这个队列和消息的RoutingKey是一致的,所以这个队列能接收到交换机路由的这个消息发送者发送的消息数据。(而direct.queue2这个队列中绑定的BindingKey没有和这个消息的RoutingKey一致的key数据,因此这个direct.queue2队列就接收不到交换机路由的消息数据了。
也就是说:交换机说了你们队列谁和我的规则一样我就把消息路由给谁,不一样你还想要消息数据?)
代码演示如下所示:
会发现还真的只有direct.queue1队列接收到了交换机路由的消息发送者发送的消息数据了:
我们现在假定消息发送者向 itcast.direct1"为名的交换机中发送的消息对应的RoutingKey为red,我们知道两个队列中绑定的BindingKey都有这个red,因此这两个队列和消息的RoutingKey都是一致的,所以这两个队列都能接收到交换机路由的这个消息发送者发送的消息数据。
总结:TopicExchange交换机和上面的DirectExchange交换机的区别两点:
1、TopicExchange交换机消息对应的Routingkey必须是多个单词列表,并且以.分割
2、TopicExchange交换机中Bindingkey可以使用通配符的方式
除了这两点外其他的代码逻辑都是一样的。
- @Component
- public class SpringRabbitListener {
-
- /**
- * 第一步:使用@RabbitListener注解声明一个名为itcast.topic1的topic模式的交换机、
- * 交换机有了之后,我们就可以使用@Queue注解声明创建队列了,我们这里就创建了一个topic.queue1为名的队列
- * 并且这个队列绑定key(也就是说BindingKey)绑定了red和blue两种。
- */
- @RabbitListener(bindings = @QueueBinding(
- value = @Queue(name = "topic.queue1"),
- exchange = @Exchange(name = "itcast.topic1", type = ExchangeTypes.TOPIC), // type的作用就是声明这个交换机是topic1模式
- key = "china.#" // 该topic.queue1队列的BindingKey为"china.#" // 还可以用 type = ”TOPIC“ 和上面用枚举的效果是一样的
- ))
-
- // 这个消息消费者直接就对这个direct.queue1为名的队列中暂存的消息进行消费 (可以理解为上面的direct.queue1队列注解在这个消费者方法上
- // 所以就对这个direct.queue1队列进行消费)
- public void listenDirectQueue1(String msg){ // 注意:这个消费接收到的消息类型要保证和队列中的消息数据类型一致(String类型)
- System.out.println("消费者接收到topic.queue1的消息:【" + msg + "】");
- }
-
-
- @RabbitListener(bindings = @QueueBinding(
- value = @Queue(name = "topic.queue2"), // 同理这里又声明创建了一个direct.queue2为名的队列
- exchange = @Exchange(name = "itcast.topic1", type = ExchangeTypes.TOPIC), // 绑定声明的交换机还是itcast.topic1为名的交换机
- key = "#.news" // 并且为这个topic.queue2为名的队列设置的BindingKey为 "#.news"
- ))
- public void listenDirectQueue2(String msg){
- System.out.println("消费者接收到topic.queue2的消息:【" + msg + "】");
- }
- }
注:queue2队列对应的bindingkey为 #.news的格式,因此消息发送者发送的消息RoutingKey格式为 china.weather格式的时候,这个queue2对应的bindingkey和RoutingKey是不会相同的,只有queue1队列对应的bindingkey为 china.#的格式才和这个消息RoutingKey相同一致,因此这条消息交换机就只会路由到queue1队列当中,不会路由到queue2队列当中
我们就拿简单的hello world 模型进行演示问题所在:
我们知道hello world模型,我们消息发送者只需要直接把消息发送给队列即可,然后消息消费者直接在队列中订阅获取数据就可以了,因此相比着需要和那几个绑定交换机的简单多了,因此我们就拿这个hello world模型进行代码演示问题把:
问题就是我们上面五种模型中,我们都是往队列中发送的是String字符串类型的消息数据(如下面的hello spring amqp!),
这里仅以hello world最简单的模型为例演示:
现在呢就是说我们能不能向队列中发送消息的时候以对象的形式发送呢,比如说以map集合对象的形式往队列中发送消息 :
原因就是我们RabbitMQ只支持字节的形式消息数据,而我们刚才代码中向队列中发送消息时spring是支持向队列中发送对象形式的消息的,而RabbitMQ只支持字节的形式的数据因此我们对象形式的数据就被序列化成字节形式的数据了,因此在队列中查看到的数据就是字节的形式了。(也就是说对象序列化成字节了)
注意:虽然现在在队列中展示的是对象形式的消息数据了,但是其实还是字节的形式数据
因此,我们消息消费者如果想要接收获取这个队列中上面这个对象形式的消息的话(实际上还是字节的形式数据),还需要用到消息转换器:
消息转换器: 把队列中字节形式消息 ---- 序列化-----成对象的形式供消息消费者使用
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。