当前位置:   article > 正文

SpringAMQP消息队列(SpringBoot集成RabbitMQ)_spring-boot-starter-amqp

spring-boot-starter-amqp

一、初始配置

1、导入maven坐标

  1. <!--rabbitmq-->
  2.     <dependency>
  3.         <groupId>org.springframework.boot</groupId>
  4.         <artifactId>spring-boot-starter-amqp</artifactId>
  5.     </dependency>

2、yml配置

  1. spring:
  2.     rabbitmq:
  3.     host: 你的rabbitmq的ip
  4.     port: 5672
  5.     username: guest
  6.     password: guest

二、基本消息队列

1、创建队列

访问接口:http://localhost:15672,账号密码都为guest

进入后左下角有Add queue添加队列,我已添加队列为MqTest1

2、发布消息

  1. @SpringBootTest
  2. class RabbitMQDemoPublishApplicationTests {
  3. @Autowired
  4. private RabbitTemplate rabbitTemplate;
  5. @Test
  6. void contextLoads() {
  7. String queue="MqTest1";
  8. String message="message1";
  9. rabbitTemplate.convertAndSend(queue,message);
  10. }
  11. }

此时可以看到队列有一个消息

3、接受消息

  1. package com.rabbitmqdemoconsumer.rabbitmq;
  2. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  3. import org.springframework.stereotype.Component;
  4. @Component
  5. public class SpringRabbitLeistener {
  6. @RabbitListener(queues = "MqTest1")
  7. public void listenSimpleQueueMessage(String msg){
  8. System.out.println("接收到的消息:"+msg);
  9. }
  10. }

此时控制台输出接收到的消息

三、工作消息队列(Work Queue)

可以提高消息处理速度,避免队列消息堆积

1、发布消息

  1. @SpringBootTest
  2. class RabbitMQDemoPublishApplicationTests {
  3. @Autowired
  4. private RabbitTemplate rabbitTemplate;
  5. @Test
  6. void contextLoads() {
  7. String queue="MqTest1";
  8. String message="message1";
  9. for (int i=0;i<10;i++){
  10. rabbitTemplate.convertAndSend(queue,message);
  11. }
  12. }
  13. }

此时队列有10条消息

2、接受消息

  1. package com.rabbitmqdemoconsumer.rabbitmq;
  2. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  3. import org.springframework.stereotype.Component;
  4. @Component
  5. public class SpringRabbitLeistener {
  6. @RabbitListener(queues = "MqTest1")
  7. public void listenSimpleQueueMessage1(String msg){
  8. System.out.println("consume1接收到的消息:"+msg);
  9. }
  10. @RabbitListener(queues = "MqTest1")
  11. public void listenSimpleQueueMessage2(String msg){
  12. System.out.println("consume2接收到的消息:"+msg);
  13. }
  14. }

3、控制台输出结果

  1. consume1接收到的消息:message1
  2. consume2接收到的消息:message1
  3. consume1接收到的消息:message1
  4. consume2接收到的消息:message1
  5. consume1接收到的消息:message1
  6. consume2接收到的消息:message1
  7. consume1接收到的消息:message1
  8. consume2接收到的消息:message1
  9. consume1接收到的消息:message1
  10. consume2接收到的消息:message1

4、消息预取问题

但是此时有一个问题就是消息预取,比如队列有10条消息,两个消费者各自直接先预取5个消息,如果一个消费者接受消息的速度慢,一个快,就会导致一个消费者已经完成工作,另一个还在慢慢处理,会造成消息堆积消费者身上,要解决这个问题需要在yml文件配置相关配置

  1. rabbitmq:
  2. host: 43.140.244.236
  3. port: 5672
  4. username: guest
  5. password: guest
  6. virtual-host: /
  7. listener:
  8. simple:
  9. prefetch: 1 #每次只能取一个,处理完才能取下一个消息

这样可以避免消息预取导致堆积

四、发布订阅模式

exchange是交换机,负责消息路由,但不存储消息,路由失败则消息丢失

五、发布订阅模式之广播模式(Fanout)

1、Fanout配置类(@Bean声明)

  1. package com.rabbitmqdemoconsumer.config;
  2. import org.springframework.amqp.core.Binding;
  3. import org.springframework.amqp.core.BindingBuilder;
  4. import org.springframework.amqp.core.FanoutExchange;
  5. import org.springframework.amqp.core.Queue;
  6. import org.springframework.context.annotation.Bean;
  7. import org.springframework.context.annotation.Configuration;
  8. @Configuration
  9. public class FanountConfig {
  10. //交换机声明
  11. @Bean
  12. public FanoutExchange fanoutExchange(){
  13. return new FanoutExchange("FanountExchange");
  14. }
  15. //声明队列1
  16. @Bean
  17. public Queue Fanount_Qeueue1(){
  18. return new Queue("Fanount_Qeueue1");
  19. }
  20. //声明队列2
  21. @Bean
  22. public Queue Fanount_Qeueue2(){
  23. return new Queue("Fanount_Qeueue2");
  24. }
  25. //绑定交换机和队列
  26. @Bean
  27. public Binding bindingFanount_Qeueue1(Queue Fanount_Qeueue1,FanoutExchange fanoutExchange){
  28. return BindingBuilder.bind(Fanount_Qeueue1).to(fanoutExchange);
  29. }
  30. @Bean
  31. public Binding bindingFanount_Qeueue2(Queue Fanount_Qeueue2,FanoutExchange fanoutExchange){
  32. return BindingBuilder.bind(Fanount_Qeueue2).to(fanoutExchange);
  33. }
  34. }

可以看到声明的队列

已经声明的交换机(第一个)

绑定关系

2、发送消息

首先发送10条消息,经过交换机转发到队列

  1. @SpringBootTest
  2. class RabbitMQDemoPublishApplicationTests {
  3. @Autowired
  4. private RabbitTemplate rabbitTemplate;
  5. @Test
  6. void contextLoads2() {
  7. String exchange="FanountExchange";
  8. String message="message";
  9. for (int i=0;i<10;i++){
  10. rabbitTemplate.convertAndSend(exchange,"",message);
  11. }
  12. }
  13. }

此时可以看到两个队列各自有十条消息

3、接受消息

  1. //监听交换机Fanount_Qeueue1
  2. @RabbitListener(queues = "Fanount_Qeueue1")
  3. public void listenFanountQeueue1(String msg){
  4. System.out.println("Fanount_Qeueue1接收到的消息:"+msg);
  5. }
  6. //监听交换机Fanount_Qeueue2
  7. @RabbitListener(queues = "Fanount_Qeueue2")
  8. public void listenFanountQeueue2(String msg){
  9. System.out.println("Fanount_Qeueue2接收到的消息:"+msg);
  10. }

控制台结果如下(共发送20条,每个队列10条)

  1. Fanount_Qeueue1接收到的消息:message
  2. Fanount_Qeueue1接收到的消息:message
  3. Fanount_Qeueue1接收到的消息:message
  4. Fanount_Qeueue1接收到的消息:message
  5. Fanount_Qeueue2接收到的消息:message
  6. Fanount_Qeueue1接收到的消息:message
  7. Fanount_Qeueue2接收到的消息:message
  8. Fanount_Qeueue1接收到的消息:message
  9. Fanount_Qeueue2接收到的消息:message
  10. Fanount_Qeueue1接收到的消息:message
  11. Fanount_Qeueue2接收到的消息:message
  12. Fanount_Qeueue1接收到的消息:message
  13. Fanount_Qeueue2接收到的消息:message
  14. Fanount_Qeueue1接收到的消息:message
  15. Fanount_Qeueue2接收到的消息:message
  16. Fanount_Qeueue2接收到的消息:message
  17. Fanount_Qeueue2接收到的消息:message
  18. Fanount_Qeueue2接收到的消息:message

六、发布订阅模式之路由模式(Direct)

会将消息根据规则路由到指定的队列

1、声明(基于@RabbitListener声明)

  1. package com.rabbitmqdemoconsumer.rabbitmq;
  2. import org.springframework.amqp.core.ExchangeTypes;
  3. import org.springframework.amqp.rabbit.annotation.Exchange;
  4. import org.springframework.amqp.rabbit.annotation.Queue;
  5. import org.springframework.amqp.rabbit.annotation.QueueBinding;
  6. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  7. import org.springframework.stereotype.Component;
  8. @Component
  9. public class SpringRabbitLeistener {
  10. /**
  11. * 绑定交换机和队列,并为key赋值
  12. * @param msg
  13. */
  14. @RabbitListener(bindings = @QueueBinding(
  15. value = @Queue(name = "DirectQueue1"),
  16. exchange = @Exchange(name = "DirectExchange",type = ExchangeTypes.DIRECT),
  17. key = {"red","blue"}
  18. ))
  19. public void listenDirectQueue1(String msg){
  20. System.out.println("listenDirectQueue1接收到的消息:"+msg);
  21. }
  22. @RabbitListener(bindings = @QueueBinding(
  23. value = @Queue(name = "DirectQueue2"),
  24. exchange = @Exchange(name = "DirectExchange",type = ExchangeTypes.DIRECT),
  25. key = {"red","yellow"}
  26. ))
  27. public void listenDirectQueue2(String msg){
  28. System.out.println("listenDirectQueue2接收到的消息:"+msg);
  29. }
  30. }

此时可以看到声明的队列

声明的交换机(第一个)

绑定关系

2、发送给blue

发送消息

  1. @SpringBootTest
  2. class RabbitMQDemoPublishApplicationTests {
  3. @Autowired
  4. private RabbitTemplate rabbitTemplate;
  5. @Test
  6. void contextLoads2() {
  7. String exchange="DirectExchange";
  8. String message="HelloWorld";
  9. for (int i=0;i<10;i++){
  10. rabbitTemplate.convertAndSend(exchange,"blue",message);
  11. }
  12. }
  13. }

接收消息

  1. listenDirectQueue1(red,blue)接收到的消息:HelloWorld
  2. listenDirectQueue1(red,blue)接收到的消息:HelloWorld
  3. listenDirectQueue1(red,blue)接收到的消息:HelloWorld
  4. listenDirectQueue1(red,blue)接收到的消息:HelloWorld
  5. listenDirectQueue1(red,blue)接收到的消息:HelloWorld
  6. listenDirectQueue1(red,blue)接收到的消息:HelloWorld
  7. listenDirectQueue1(red,blue)接收到的消息:HelloWorld
  8. listenDirectQueue1(red,blue)接收到的消息:HelloWorld
  9. listenDirectQueue1(red,blue)接收到的消息:HelloWorld
  10. listenDirectQueue1(red,blue)接收到的消息:HelloWorld

3、发送给red

发送消息

  1. @SpringBootTest
  2. class RabbitMQDemoPublishApplicationTests {
  3. @Autowired
  4. private RabbitTemplate rabbitTemplate;
  5. @Test
  6. void contextLoads2() {
  7. String exchange="DirectExchange";
  8. String message="HelloWorld";
  9. for (int i=0;i<10;i++){
  10. rabbitTemplate.convertAndSend(exchange,"blue",message);
  11. }
  12. }
  13. }

接收消息

  1. listenDirectQueue1(red,blue)接收到的消息:HelloWorld
  2. listenDirectQueue2(red,yellow)接收到的消息:HelloWorld
  3. listenDirectQueue2(red,yellow)接收到的消息:HelloWorld
  4. listenDirectQueue1(red,blue)接收到的消息:HelloWorld
  5. listenDirectQueue1(red,blue)接收到的消息:HelloWorld
  6. listenDirectQueue2(red,yellow)接收到的消息:HelloWorld
  7. listenDirectQueue1(red,blue)接收到的消息:HelloWorld
  8. listenDirectQueue2(red,yellow)接收到的消息:HelloWorld
  9. listenDirectQueue2(red,yellow)接收到的消息:HelloWorld
  10. listenDirectQueue1(red,blue)接收到的消息:HelloWorld
  11. listenDirectQueue2(red,yellow)接收到的消息:HelloWorld
  12. listenDirectQueue1(red,blue)接收到的消息:HelloWorld
  13. listenDirectQueue2(red,yellow)接收到的消息:HelloWorld
  14. listenDirectQueue1(red,blue)接收到的消息:HelloWorld
  15. listenDirectQueue1(red,blue)接收到的消息:HelloWorld
  16. listenDirectQueue2(red,yellow)接收到的消息:HelloWorld
  17. listenDirectQueue2(red,yellow)接收到的消息:HelloWorld
  18. listenDirectQueue1(red,blue)接收到的消息:HelloWorld
  19. listenDirectQueue2(red,yellow)接收到的消息:HelloWorld
  20. listenDirectQueue1(red,blue)接收到的消息:HelloWorld

七、发布订阅模式之广播模式(Topic)

Queue与Exchange指定BindingKey可以使用通配符:

#:代指0个或多个单词

*:代指一个单词

比如:

bindingkey: china.# ->中国的所有消息

bindingkey: #.weather ->所以国家的天气

1、声明

  1. @RabbitListener(bindings = @QueueBinding(
  2. value = @Queue(name = "TopicQueue1"),
  3. exchange = @Exchange(name = "TopicExchange",type = ExchangeTypes.TOPIC),
  4. key = {"china.#"}
  5. ))
  6. public void listenTopicQueue1(String msg){
  7. System.out.println("listenTopicQueue1接收到的消息:"+msg);
  8. }
  9. @RabbitListener(bindings = @QueueBinding(
  10. value = @Queue(name = "TopicQueue2"),
  11. exchange = @Exchange(name = "TopicExchange",type = ExchangeTypes.TOPIC),
  12. key = {"#.news"}
  13. ))
  14. public void listenTopicQueue2(String msg){
  15. System.out.println("listenTopicQueue2接收到的消息:"+msg);
  16. }

队列

交换机(第四个)

绑定关系

2、发送消息(测试1)

  1. package com.rabbitmqdemo;
  2. import org.junit.jupiter.api.Test;
  3. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  4. import org.springframework.beans.factory.annotation.Autowired;
  5. import org.springframework.boot.test.context.SpringBootTest;
  6. @SpringBootTest
  7. class RabbitMQDemoPublishApplicationTests {
  8. @Autowired
  9. private RabbitTemplate rabbitTemplate;
  10. @Test
  11. void contextLoads2() {
  12. String exchange="TopicExchange";
  13. String message="HelloWorld";
  14. for (int i=0;i<10;i++){
  15. rabbitTemplate.convertAndSend(exchange,"china.news",message);
  16. }
  17. }
  18. }

接收消息

  1. TopicQueue2接收到的消息:HelloWorld
  2. TopicQueue1接收到的消息:HelloWorld
  3. TopicQueue2接收到的消息:HelloWorld
  4. TopicQueue1接收到的消息:HelloWorld
  5. TopicQueue2接收到的消息:HelloWorld
  6. TopicQueue1接收到的消息:HelloWorld
  7. TopicQueue2接收到的消息:HelloWorld
  8. TopicQueue1接收到的消息:HelloWorld
  9. TopicQueue2接收到的消息:HelloWorld
  10. TopicQueue1接收到的消息:HelloWorld
  11. TopicQueue2接收到的消息:HelloWorld
  12. TopicQueue1接收到的消息:HelloWorld
  13. TopicQueue2接收到的消息:HelloWorld
  14. TopicQueue1接收到的消息:HelloWorld
  15. TopicQueue2接收到的消息:HelloWorld
  16. TopicQueue1接收到的消息:HelloWorld
  17. TopicQueue2接收到的消息:HelloWorld
  18. TopicQueue1接收到的消息:HelloWorld
  19. TopicQueue1接收到的消息:HelloWorld
  20. TopicQueue2接收到的消息:HelloWorld

3、发送消息(测试2)

发送消息

  1. package com.rabbitmqdemo;
  2. import org.junit.jupiter.api.Test;
  3. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  4. import org.springframework.beans.factory.annotation.Autowired;
  5. import org.springframework.boot.test.context.SpringBootTest;
  6. @SpringBootTest
  7. class RabbitMQDemoPublishApplicationTests {
  8. @Autowired
  9. private RabbitTemplate rabbitTemplate;
  10. @Test
  11. void contextLoads2() {
  12. String exchange="TopicExchange";
  13. String message="HelloWorld";
  14. for (int i=0;i<10;i++){
  15. rabbitTemplate.convertAndSend(exchange,"china.weather",message);
  16. }
  17. }
  18. }

接收消息

  1. TopicQueue1接收到的消息:HelloWorld
  2. TopicQueue1接收到的消息:HelloWorld
  3. TopicQueue1接收到的消息:HelloWorld
  4. TopicQueue1接收到的消息:HelloWorld
  5. TopicQueue1接收到的消息:HelloWorld
  6. TopicQueue1接收到的消息:HelloWorld
  7. TopicQueue1接收到的消息:HelloWorld
  8. TopicQueue1接收到的消息:HelloWorld
  9. TopicQueue1接收到的消息:HelloWorld
  10. TopicQueue1接收到的消息:HelloWorld

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/小丑西瓜9/article/detail/139294
推荐阅读
相关标签
  

闽ICP备14008679号

        
cppcmd=keepalive&