当前位置:   article > 正文

八、SpringCloud-RabbitMQ + Spring AMQP 消息队列_rabbitmq 版本控制

rabbitmq 版本控制

RabbitMQ + Spring AMQP 消息队列

一、异步通讯

我们以购买商品为例,用户支付后需要调用订单服务完成订单状态修改,调用物流服务,从仓库分配响应的库存并准备发货。

在事件模式中,支付服务是事件发布者(publisher),在支付完成后只需要发布一个支付成功的事件(event),事件中带上订单id。

订单服务和物流服务是事件订阅者(Consumer),订阅支付成功的事件,监听到事件后完成自己业务即可。

为了解除事件发布者与订阅者之间的耦合,两者并不是直接通信,而是有一个中间人(Broker)。发布者发布事件到Broker,不关心谁来订阅事件。订阅者从Broker订阅事件,不关心谁发来的消息。

在这里插入图片描述
Broker 是一个像数据总线一样的东西,所有的服务要接收数据和发送数据都发到这个总线上,这个总线就像协议一样,让服务间的通讯变得标准和可控。

1、好处:

  • 吞吐量提升:无需等待订阅者处理完成,响应更快速

  • 故障隔离:服务没有直接调用,不存在级联失败问题

  • 调用间没有阻塞,不会造成无效的资源占用

  • 耦合度极低,每个服务都可以灵活插拔,可替换

  • 流量削峰:不管发布事件的流量波动多大,都由Broker接收,订阅者可以按照自己的速度去处理事件

2、缺点:

  • 架构复杂了,业务没有明显的流程线,不好管理
  • 需要依赖于Broker的可靠、安全、性能

在这里插入图片描述
RabbitMQ中的一些角色:

  • publisher:生产者
  • consumer:消费者
  • exchange个:交换机,负责消息路由
  • queue:队列,存储消息
  • virtualHost:虚拟主机,隔离不同租户的exchange、queue、消息的隔离

二、RabbitMQ消息模型

在这里插入图片描述

三、SpringAMQP

SpringAMQP是基于RabbitMQ封装的一套模板,并且还利用SpringBoot对其实现了自动装配,使用起来非常方便。

SpringAmqp的官方地址

SpringAMQP提供了三个功能:

  • 自动声明队列、交换机及其绑定关系
  • 基于注解的监听器模式,异步接收消息
  • 封装了RabbitTemplate工具,用于发送消息

四、使用SpringAMQP实现几种不同RabbitMQ消息模型的用法

1、Basic Queue 简单队列模型

1)创建子工程 publisher 和 consumer
1.1)在pom.xml 中引入版本控制依赖
<dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>com.alibaba.cloud</groupId>
            <artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
        </dependency>
        <!--amqp依赖-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
    </dependencies>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
1.2)编写启动类
@SpringBootApplication
public class PublisherApplication {
    public static void main(String[] args) {
        SpringApplication.run(PublisherApplication.class, args);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
@SpringBootApplication
public class ConsumerApplication {
    public static void main(String[] args) {
        SpringApplication.run(ConsumerApplication.class, args);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
1.3)添加配置文件 application.yaml
server:
  port: 8015
spring:
  application:
    name: publisher
  cloud:
    nacos:
      discovery:
        server-addr: http://127.0.0.1:8868   # 注册中心  http://ip:端口号
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: guest
    password: guest

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
server:
  port: 8016
spring:
  application:
    name: consumer
  cloud:
    nacos:
      discovery:
        server-addr: http://127.0.0.1:8868   # 注册中心  http://ip:端口号
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: guest
    password: guest
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
2)在 publisher 子工程中 编写controller
@RestController
@RequestMapping("/publisher/basic")
public class BasicQueueController {

   @Resource
   private BasicQueueMessage basicQueueMessage;

   @GetMapping("/queue")
   public void basicQueue(@RequestParam("message")String message) {

       basicQueueMessage.sendBasicQueueMessage(message);

   }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
3)编写message
3.1)编写 message 接口
public interface BasicQueueMessage {
   void sendBasicQueueMessage(String message);
}
  • 1
  • 2
  • 3
3.2)编写 message 实现类
@Service
public class BasicQueueMessageImpl implements BasicQueueMessage {

    String queueName = "basic_queue_message";

    @Resource
    private RabbitTemplate rabbitTemplate;

    @Override
    public void sendBasicQueueMessage(String message) {

        rabbitTemplate.convertAndSend(queueName,message);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
4) 在 consumer 子工程中 编写 listener
@Component
public class BasicQueueListener {
    
    @RabbitListener(queuesToDeclare = @Queue(name = "basic_queue_message"))
    public void basicQueueConsumer(String message){
        System.out.println("message = " + message);
    }

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
5) gateway 网关添加 publisher 路由规则
spring:
  cloud:
    nacos:
      server-addr: localhost:8868 # nacos地址
    gateway:
      routes: # 网关路由配置
        - id: publisher
          uri: lb://publisher
          predicates:
            - Path=/publisher/**
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
6)测试 Basic Queue 简单队列模型

在这里插入图片描述
在这里插入图片描述

2、WorkQueue 工作队列模型

在这里插入图片描述

1)在 publisher 子工程中 编写controller
@RestController
@RequestMapping("/publisher/work")
public class WorkQueueController {


    @Resource
    private WorkQueueMessage workQueueMessage;

    @GetMapping("/queue")
    public void basicQueue(@RequestParam("message")String message) throws Exception {

        workQueueMessage.sendWorkQueueMessage(message);

    }

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
2)编写message
2.1) 编写 message 接口
public interface WorkQueueMessage {
   void sendWorkQueueMessage(String message) throws Exception;
}
  • 1
  • 2
  • 3
2.2)编写 message 实现类
@Service
public class WorkQueueMessageImpl implements WorkQueueMessage {

   @Resource
   private RabbitTemplate rabbitTemplate;

   String queueName = "work_queue_message";

   @Override
   public void sendWorkQueueMessage(String message) throws Exception {
       for (int i = 0; i < 50; i++) {
           rabbitTemplate.convertAndSend(queueName,message +" -- "+i);
           Thread.sleep(20);
       }
   }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
3)在 consumer 子工程中 编写 listener
@Component
public class WorkQueueListener {


    @RabbitListener(queuesToDeclare = @Queue(name = "work_queue_message"))
    public void workQueueConsumer1(String message) throws Exception {
        System.out.println("message1 = " +" 【 "+ message +" 】 "+ LocalTime.now());
        Thread.sleep(20);
    }


    @RabbitListener(queuesToDeclare = @Queue(name = "work_queue_message"))
    public void workQueueConsumer2(String message) throws Exception {
        System.err.println("message2 = " +" 【 "+ message +" 】 "+ LocalTime.now());
        Thread.sleep(50);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
4)测试 WorkQueue 工作队列模型

在这里插入图片描述

3、发布/订阅介绍

在这里插入图片描述
在订阅模型中,多了一个exchange角色,而且过程略有变化:

  • Publisher:生产者,也就是要发送消息的程序,但是不再发送到队列中,而是发给X(交换机)
  • Exchange:交换机,图中的X。一方面,接收生产者发送的消息。另一方面,知道如何处理消息,例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange的类型。Exchange有以下3种类型:
    • Fanout:广播,将消息交给所有绑定到交换机的队列
    • Direct:定向,把消息交给符合指定routing key 的队列
    • Topic:通配符,把消息交给符合routing pattern(路由模式) 的队列
  • Consumer:消费者,与以前一样,订阅队列,没有变化
  • Queue:消息队列也与以前一样,接收消息、缓存消息。

Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失!

4、Fanout 广播模型

在这里插入图片描述

1)在 publisher 子工程中 编写controller
@RestController
@RequestMapping("/publisher/fanout")
public class FanoutQueueController {

    @Resource
    private FanoutQueueMessage fanoutQueueMessage;

    @GetMapping("/queue")
    public void fanoutQueue(@RequestParam("message")String message) throws Exception {

            fanoutQueueMessage.sendFanoutQueueMessage(message);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
2)编写 message
2.1)编写 message接口
public interface FanoutQueueMessage {
   void sendFanoutQueueMessage(String message) throws Exception;
}
  • 1
  • 2
  • 3
2.2)编写 message 实现类
@Service
public class FanoutQueueMessageImpl implements FanoutQueueMessage {

    final String fanoutExchangeName = "fanout_exchange";

    @Resource
    private RabbitTemplate rabbitTemplate;

    @Override
    public void sendFanoutQueueMessage(String message) throws Exception {
        for (int i = 0; i < 100; i++) {
            rabbitTemplate.convertAndSend(fanoutExchangeName,"",message+" -- "+i);
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
3)在 consumer 子工程中 编写 listener
@Component
public class FanoutQueueListener {


    @RabbitListener(bindings = @QueueBinding(
            value = @Queue("fanout_queue_message1"),
            exchange = @Exchange(name = "fanout_exchange",type = ExchangeTypes.FANOUT)
    ))
    public void fanoutQueueConsumer1(String message) throws Exception {
        System.out.println("message1 = " +" 【 "+ message +" 】 "+ LocalTime.now());

    }
    
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue("fanout_queue_message1"),
            exchange = @Exchange(name = "fanout_exchange",type = ExchangeTypes.FANOUT)
    ))
    public void fanoutQueueConsumer3(String message) throws Exception {
        System.out.println("message3 = " +" 【 "+ message +" 】 "+ LocalTime.now());

    }

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue("fanout_queue_message2"),
            exchange = @Exchange(name = "fanout_exchange",type = ExchangeTypes.FANOUT)
    ))
    public void fanoutQueueConsumer2(String message) throws Exception {
        System.err.println("message2 = " +" 【 "+ message +" 】 "+ LocalTime.now());

    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
4)测试 Fanout 广播模型

在这里插入图片描述

5、Direct 路由模型

在这里插入图片描述
在Direct模型下:

  • 队列与交换机的绑定,不能是任意绑定了,而是要指定一个RoutingKey(路由key)
  • 消息的发送方在 向 Exchange发送消息时,也必须指定消息的 RoutingKey
  • Exchange不再把消息交给每一个绑定的队列,而是根据消息的Routing Key进行判断,只有队列的Routingkey与消息的 Routing key完全一致,才会接收到消息
1)在 publisher 子工程中 编写controller
@RestController
@RequestMapping("/publisher/direct")
public class DirectQueueController {
    @Resource
    private DirectQueueMessage directQueueMessage;

    @GetMapping("/queue")
    public void directQueue(@RequestParam("message")String message) throws Exception {

        directQueueMessage.sendDirectQueueMessage(message);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
2)编写 message
2.1)编写 message 接口
public interface DirectQueueMessage {

   void sendDirectQueueMessage(String message);
}
  • 1
  • 2
  • 3
  • 4
2.2)编写 message 实现类
@Service
public class DirectQueueMessageImpl implements DirectQueueMessage {

   @Resource
   private RabbitTemplate rabbitTemplate;

   final String directExchangeName = "direct_exchange";

   @Override
   public void sendDirectQueueMessage(String message) {
       for (int i = 1; i <= 50; i++) {
           if (i<15) {
               rabbitTemplate.convertAndSend(directExchangeName, "red",message +"--routingKey-->" +"red  "+i);
           } else if (i > 17&& i<34) {
               rabbitTemplate.convertAndSend(directExchangeName, "yellow",message+"--routingKey-->" +"yellow  "+i);
           }else{
               rabbitTemplate.convertAndSend(directExchangeName, "blue",message+"--routingKey-->" +"blue  "+i);
           }


       }
   }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
3)在 consumer 子工程中 编写 listener
@Component
public class DirectQueueLitener {

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue("direct_queue_message1"),
            exchange = @Exchange(name = "direct_exchange",type = ExchangeTypes.DIRECT),
            key = {"red","blue"}))
    public void directQueueConsumer1(String message) {
        System.out.println("message1 -> consumer - red 、blue" +" 【 "+ message +" 】 "+ LocalTime.now());
    }

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue("direct_queue_message2"),
            exchange = @Exchange(name = "direct_exchange",type = ExchangeTypes.DIRECT),
            key = {"yellow","blue"}))
    public void directQueueConsumer2(String message) {
        System.err.println("message2 -> consumer - yellow 、blue" +" 【 "+ message +" 】 "+ LocalTime.now());
    }

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
4) 测试 Direct 路由模型

在这里插入图片描述

6、Topic 主题模型

1)Topic 主题模型通配符

Topic类型Exchange可以让队列在绑定Routing key 的时候使用通配符!

Routingkey 一般都是有一个或多个单词组成,多个单词之间以”.”分割,例如: item.insert

1.1)通配符规则:

#:匹配一个或多个词

*:匹配不多不少恰好1个词
在这里插入图片描述

2)在 publisher 子工程中 编写controller
@RestController
@RequestMapping("/publisher/topic")
public class TopicQueueController {

    @Resource
    private TopicQueueMessage topicQueueMessage;

    @GetMapping("/queue")
    public void topicQueue(@RequestParam("message")String message) throws Exception {

        topicQueueMessage.sendTopicQueueMessage(message);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
3)编写 message
3.1)编写 message 接口
public interface TopicQueueMessage {
   void sendTopicQueueMessage(String message);
}
  • 1
  • 2
  • 3
3.2)编写 message 实现类
@Service
public class TopicQueueMessageImpl implements TopicQueueMessage {

   @Resource
   private RabbitTemplate rabbitTemplate;

   final String topicExchangeName = "topic_exchange";

   @Override
   public void sendTopicQueueMessage(String message) {
       for (int i = 1; i <= 50; i++) {
           if (i < 17) {
               rabbitTemplate.convertAndSend(topicExchangeName, "new.message", message + "--routingKey-->" + "new.message  " + i);
           } else if (i > 37) {
               rabbitTemplate.convertAndSend(topicExchangeName, "old.message", message + "--routingKey-->" + "old.message  " + i);
           } else {
               rabbitTemplate.convertAndSend(topicExchangeName, "china.news", message + "--routingKey-->" + "china.news  " + i);

           }
       }
   }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
4)在 consumer 子工程中 编写 listener
@Component
public class TopicQueueListener {
    
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue("topic_queue_message1"),
            exchange = @Exchange(name = "topic_exchange",type = ExchangeTypes.TOPIC),
            key = {"old.#"}))
    public void directQueueConsumer1(String message) {
        System.out.println("message1 -> consumer - old.# " +" 【 "+ message +" 】 "+ LocalTime.now());
    }

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue("topic_queue_message2"),
            exchange = @Exchange(name = "topic_exchange",type = ExchangeTypes.TOPIC),
            key = {"new.#","china.#"}))
    public void directQueueConsumer2(String message) {
        System.err.println("message2 -> consumer - new.# 、china.#" +" 【 "+ message +" 】 "+ LocalTime.now());
    }

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
5)测试 Topic 主题模型

在这里插入图片描述

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/Cpp五条/article/detail/582176
推荐阅读
相关标签
  

闽ICP备14008679号