当前位置:   article > 正文

微服务(SpringCloud)第四篇之RabbitMQ(消息队列基础篇)_springcloud rabbitmq

springcloud rabbitmq

作者简介:☕️大家好,我是intelligent_M,一个Java后端开发者!
当前专栏:intelligent_M—— 微服务(SpringCloud) ,CSDN博客。

后续会更新Java相关技术栈以及链表哈希表二叉树…,回溯算法贪心算法…等等算法题。
创作不易 欢迎点赞评论!!!


RabbitMQ(基础篇)

同步和异步

  • RabbitMQ是高性能的异步通讯组件
  • 微服务一旦拆分,必然涉及到服务之间的相互调用,目前我们服务之间调用采用的都是基于OpenFeign的调用。这种调用中,调用者发起请求后需要等待服务提供者执行业务返回结果后,才能继续执行后面的业务。也就是说调用者在调用过程中处于阻塞状态,因此我们成这种调用方式为同步调用,也可以叫同步通讯。但在很多场景下,我们可能需要采用异步通讯的方式,为什么呢?

我们先来看看什么是同步通讯和异步通讯。如图:
在这里插入图片描述

  • 同步通讯:就如同打视频电话,双方的交互都是实时的。因此同一时刻你只能跟一个人打视频电话。
    在这里插入图片描述

  • 异步通讯:就如同发微信聊天,双方的交互不是实时的,你不需要立刻给对方回应。因此你可以多线操作,同时跟多人聊天。
    在这里插入图片描述

两种方式各有优劣,打电话可以立即得到响应,但是你却不能跟多个人同时通话。发微信可以同时与多个人收发微信,但是往往响应会有延迟。

所以,如果我们的业务需要实时得到服务提供方的响应,则应该选择同步通讯(同步调用)。而如果我们追求更高的效率,并且不需要实时响应,则应该选择异步通讯(异步调用)。

同步调用

在这里插入图片描述

  • 同步调用的优势是什么?
    • 时效性强,等待到结果后才返回
  • 同步调用的问题是什么?
  • 1.拓展性差
    在这里插入图片描述

目前的业务相对简单,但是随着业务规模扩大,产品的功能也在不断完善。也就是说每次有新的需求,现有支付逻辑都要跟着变化,代码经常变动,不符合开闭原则,拓展性不好。

  • 2.性能下降

由于我们采用了同步调用,调用者需要等待服务提供者执行完返回结果后,才能继续向下执行,也就是说每次远程调用,调用者都是阻塞等待状态。最终整个业务的响应时长就是每次远程调用的执行时长之和:
在这里插入图片描述
假如每个微服务的执行时长都是50ms,则最终整个业务的耗时可能高达300ms,性能太差了。

  • 3.级联失败问题
    由于我们是基于OpenFeign调用交易服务、通知服务。当交易服务、通知服务出现故障时,整个事务都会回滚,交易失败。
    这其实就是同步调用的级联失败问题。

但是大家思考一下,我们假设用户余额充足,扣款已经成功,此时我们应该确保支付流水单更新为已支付,确保交易成功。毕竟收到手里的钱没道理再退回去吧。因此,这里不能因为短信通知、更新订单状态失败而回滚整个事务。

异步调用

  • 异步调用方式其实就是基于消息通知的方式,一般包含三个角色:
    • 消息发送者:投递消息的人,就是原来的调用方
    • 消息代理:管理,暂存,转发消息,你可以把它理解成微信服务器
    • 消息接收者:接收和处理消息的人,就是原来的服务提供方
      在这里插入图片描述
  • 支付服务不在同步调用业务关联度低的服务,而是发送消息通知到Broker。
  • 具备下列优势:
  • 解除耦合,拓展性强
  • 无需等待,性能好
  • 故障隔离
  • 缓存消息,流量削峰填谷
    在这里插入图片描述
    QPS为每秒钟的访问量
    在这里插入图片描述
  • 异步调用的优势是什么?
    • 耦合度低,拓展性强
    • 异步调用,无需等待,性能好
    • 故障隔离,下游服务故障不影响上游业务
    • 缓存消息,流量削峰填谷
  • 异步调用的问题是什么?
    • 不能立即得到调用结果,时效性差
    • 不确定下游业务执行是否成功
    • 业务安全依赖于Broker(消息代理)的可靠性

MQ技术选型

  • MQ(MessageQueue),中文是消息队列,字面意思就是存放消息的队列。也就是异步调用中的Broker
    在这里插入图片描述
  • RabbitMQ的消息可靠性比较高很多公司也在用RabbitMQ,所以这里我们以RabbitMQ为例讲解,其他的MQ在OpenFeign的调用下都大同小异
    在这里插入图片描述
    在这里插入图片描述
  • RabbitMQ的整体架构及核心概念:
  • virtual-host:虚拟主机,起到数据隔离的作用
  • publisher:消息发送者
  • consumer:消息的消费者
  • queue:队列,存储消息
  • exchange:交换机,负责路由转发消息(没有存储消息的能力)
    在这里插入图片描述
  • 快速入门
  • 1.创建一个消息队列
    在这里插入图片描述
    在这里插入图片描述
  • 2.选择交换机并绑定消息队列
    在这里插入图片描述
    在这里插入图片描述
    可以看到队列和交换机绑定成功了
    在这里插入图片描述
    在这里插入图片描述
  • 3.可以给利用交换机发送消息
    在这里插入图片描述
  • 4.队列可以查看消息
    在这里插入图片描述

数据隔离

  • 需求:在RabbitMQ的控制台完成下列操作:
  • 新建一个用户hmall
  • 为hmall用户创建一个virtual host
  • 测试不同virtual host之间的数据隔离现象
    在这里插入图片描述
    用hmall用户登录创建虚拟主机
    在这里插入图片描述
    选择自己的虚拟主机就看不到别的用户的队列(保证了数据隔离)
    在这里插入图片描述
    交换机也是隔离的在这里插入图片描述
  • 所以当你有多个项目的时候,可以创建不同的用户以及不同的虚拟主机实现数据隔离

SpringAMQP(Java客户端)

快速入门

  • SpringAmqp的官方地址
    在这里插入图片描述
    在这里插入图片描述
  • 1.引入spring-amqp依赖
  • 在工程中引入spring-amqp依赖,这样publisher和consumer服务都可以使用:
    在这里插入图片描述
  • 2.配置RabbitMQ服务信息
  • 在每个微服务中引入MQ服务端信息,这样微服务才能连接到RabbitMQ
    在这里插入图片描述
  • 3.发送消息
  • SpringAMQP提供了RabbitTemplate工具类,方便我们发送消息。发送消息代码如下:
  @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    void testSendMessage2Queue() {
        //队列名称
        String queueName = "simple.queue";
        //消息
        String msg = "hello, amqp!";
        //发送消息
        rabbitTemplate.convertAndSend(queueName, msg);
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 4.接收消息
  • SpringAMQP提供声明式的消息监听,我们只需要通过注解在方法上声明要监听的队列名称,将来SpringAMQP就会把消息传递给当前方法:
@Slf4j//日志
@Component//注册为一个Bean
public class MqListener {

    @RabbitListener(queues = "simple.queue")
    public void listenSimpleQueue(String msg){
        System.out.println("消费者收到了simple.queue的消息:【" + msg +"】");
    }
 }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

在这里插入图片描述

  • SpringAMQP如何发消息?
    • 1.引入spring-boot-starter-amqp依赖
    • 2.配置rabbitmq服务端信息
    • 3.利用RabbitTemplate发送消息
    • 4.利用@RabbitListener注解声明要监听的队列,监听消息

work模式(Work Queues)

  • Work queues,任务模型简单来说就是让多个消费者绑定到一个队列,共同消费队列中的消息
    在这里插入图片描述
  • 消费者消息推送限制
    • 默认情况下,RabbitMQ的会将消息依次轮询投递给绑定在队列上的每一个消费者,但者并没有考虑到消费者是否已经处理完消息,可能出现消息堆积。
    • 因此我们需要修改application.yaml,设置preFetch值为1,确保同一时刻最多投递给消费者1条消息:
      在这里插入图片描述
  • Work模型的使用:
  • 多个消费者绑定到一个队列,可以加快消息处理速度
  • 通一条消息只会被一个消费者处理
  • 通过设置prefetch来控制消费者预取的消息数量,处理完一条再处理下一条,实现能者多劳

交换机

  • 真正生产环境都会经过exchange来发送消息,而不是直接发送到队列,交换机的类型有以下三种:
  • Fanout:广播
  • Direct:定向
  • Topic:话题
    在这里插入图片描述

Fanout交换机

  • FanoutExchange会将接收到的消息广播到每一个跟其绑定的queue,所以也叫广播模式
    在这里插入图片描述
  • 利用SpringAMQP演示FanoutExchange的使用
  • 实现思路如下:
  • 1.在RabbitMQ控制台中,声明队列fanout.queue1和fanout.queue2
  • 2.在RabbitMQ控制台中,声明交换机hmall.fanout,将两个队列与其绑定
  • 3.在consumer服务中,编写两个消费者方法,分别监听fanout.queue1和fanout.queue2
@RabbitListener(queues = "fanout.queue1")
    public void listenFanoutQueue1(String msg) throws InterruptedException {
        System.out.println("消费者1 收到了 fanout.queue1的消息:【" + msg +"】");
    }

    @RabbitListener(queues = "fanout.queue2")
    public void listenFanoutQueue2(String msg) throws InterruptedException {
        System.out.println("消费者2 收到了 fanout.queue2的消息:【" + msg +"】");
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 4.在publisher中编写测试方法,向hmall.fanout发送消息
 @Test
    void testSendFanout() {
        String exchangeName = "hmall.fanout";
        String msg = "hello, everyone!";
        rabbitTemplate.convertAndSend(exchangeName, null, msg);
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

在这里插入图片描述

  • 交换机的作用是什么?
    • 接收publisher发送的消息
    • 将消息按照规则路由到与之绑定的队列
    • FanoutExchange会将消息路由到每个绑定的队列

Direct交换机

  • DirectExchange会将接收到的消息根据规则路由到指定的Queue,因此称为定向路由
    • 每一个Queue都与Exchange设置一个Bindingkey
    • 发布者发送消息时,指定消息的RoutingKey
    • Exchange将消息路由到BindingKey与消息RoutingKey一致的队列
      在这里插入图片描述
      在这里插入图片描述
  • 描述下Direct交换机与Fanout交换机的差异
    • Fanout交换机将消息路由给每一个与之绑定的队列
    • Direct交换机根据RoutingKey判断路由给哪个队列
    • 如果多个队列具有相同RoutingKey,则与Fanout功能类似

Topic交换机(推荐使用,功能最强大)

  • TopicExchange与DirectExchange类似,区别在于routingKey可以时多个单词的列表,并且以 . 分割,Queue与Exchange指定BindingKey时可以使用通配符:
  • #:代指0个或多个单词
  • *:代指一个单词
    在这里插入图片描述
  • 利用SpringAMQP演示DirectExchange的使用
  • 1.在RabbitMQ控制台中,声明队列topic.queue1和topic.queue2
    在这里插入图片描述
  • 2.在RabbitMQ控制台中,声明交换机hmall.topic,将两个队列与其绑定
    在这里插入图片描述
  • 3.在consumer服务中,编写两个消费者方法,分别监听topic.queue1和topic.queue2
@RabbitListener(queues = "topic.queue1")
    public void listenTopicQueue1(String msg) throws InterruptedException {
        System.out.println("消费者1 收到了 topic.queue1的消息:【" + msg +"】");
    }

    @RabbitListener(queues = "topic.queue2")
    public void listenTopicQueue2(String msg) throws InterruptedException {
        System.out.println("消费者2 收到了 topic.queue2的消息:【" + msg +"】");
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 4.在publisher中编写测试方法,利用不同的RoutingKey向hmall.topic发送消息
 @Test
    void testSendTopic() {
        String exchangeName = "hmall.topic";
        String msg = "今天天气挺不错,我的心情的挺好的";
        rabbitTemplate.convertAndSend(exchangeName, "china.weather", msg);
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 描述下Direct交换机和Topic交换机的差异?
    • Topic交换机接收的消息RoutingKey可以时多个单词,以点分割
    • Topic交换机与队列绑定时的bindingKey可以指定通配符
    • #:代表0个或多个词
    • *:代表1个单词

声明队列交换机

  • SpringAMQP提供了几个类,用来声明队列,交换机及其绑定关系:
    • Queue:用于声明队列,可以用工厂类QueueBuilder构建
    • Exchange:用于声明交换机,可以用工厂类ExchangeBuilder构建
    • Binding:用于声明队列和交换机的绑定关系,可以用工厂类BindingBuilder构建
      在这里插入图片描述
  • 例如,声明(基于Bean的声明方式)一个Fanout类型的交换机,并创建队列与其绑定:
  • 该方式需要写多个Bean比较麻烦不推荐
@Configuration
public class FanoutConfiguration {

    @Bean//声明fanout交换机
    public FanoutExchange fanoutExchange(){
        // ExchangeBuilder.fanoutExchange("").build();
        return new FanoutExchange("hmall.fanout2");
    }

    @Bean//声明队列
    public Queue fanoutQueue3(){
        // QueueBuilder.durable("ff").build();//durable 耐用的持久的(持久化把队列写入磁盘)
        return new Queue("fanout.queue3");
    }

    @Bean//声明绑定关系
    public Binding fanoutBinding3(Queue fanoutQueue3, FanoutExchange fanoutExchange){
        return BindingBuilder.bind(fanoutQueue3).to(fanoutExchange);
    }

    @Bean//声明队列
    public Queue fanoutQueue4(){
        return new Queue("fanout.queue4");
    }

    @Bean//声明绑定关系
    public Binding fanoutBinding4(){
        return BindingBuilder.bind(fanoutQueue4()).to(fanoutExchange());
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • SpringAMQP还提供了基于@RabbitListener注解来声明队列和交换机的方式:
  • 该方式大大简化了创建Bean的过程简化了代码推荐使用这种方式
@RabbitListener(bindings = @QueueBinding(
           value = @Queue(name = "direct.queue1", durable = "true"),
           exchange = @Exchange(name = "hmall.direct", type = ExchangeTypes.DIRECT),
           key = {"red", "blue"}
   ))
   public void listenDirectQueue1(String msg) throws InterruptedException {
       System.out.println("消费者1 收到了 direct.queue1的消息:【" + msg +"】");
   }

   @RabbitListener(bindings = @QueueBinding(
           value = @Queue(name = "direct.queue2", durable = "true"),
           exchange = @Exchange(name = "hmall.direct", type = ExchangeTypes.DIRECT),
           key = {"red", "yellow"}
   ))
   public void listenDirectQueue2(String msg) throws InterruptedException {
       System.out.println("消费者2 收到了 direct.queue2的消息:【" + msg +"】");
   }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 声明队列,交换机,绑定关系的Bean是什么?
    • Queue
    • FanoutExchange,DirectExchange,TopicExchange
    • Binding
  • 基于@RabbitListener注解声明队列和交换机有哪些常见注解?
    • @Queue
    • @Exchanger

消息转换器

  • 需求:测试利用SpringAMQP发送对象类型的消息

  • 1.声明一个队列,名为object.queue

  • 2.编写单元测试,向队列中直接发送一条消息,消息类型为Map

  • 3.在控制台查看消息,总结问题
    在这里插入图片描述
    在这里插入图片描述

  • 这里可以看到它对我们发送的消息做了序列化,jdk自带的对象字节流给我们的消息转成字节了

  • Spring的对消息对象的处理是由org.springframework.amqp.support.converter.MessageConverter来处理的。而默认实现是SimpleMessageConverter,基于JDK的ObjectOutputStream完成序列化。

  • 存在以下问题

    • 1.JDK的序列化有安全风险
    • 2.JDK序列化的消息太大占用空间
    • 3.JDK序列化的消息可读性差
  • 建议采用JSON序列化代替默认的JDK序列化,需要做两件事情

  • 1.在publisher和consumer中都要引入依赖:

 <!--Jackson-->
        <dependency>
            <groupId>com.fasterxml.jackson.dataformat</groupId>
            <artifactId>jackson-dataformat-xml</artifactId>
        </dependency>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 2.在publisher和consumer中都要配置MessageConverter:
@Bean
    public MessageConverter jacksonMessageConvertor(){
        return new Jackson2JsonMessageConverter();
    }
  • 1
  • 2
  • 3
  • 4
  • 重新发送刚才的消息,效果如下
    在这里插入图片描述

看完本篇请前往SpringCloud微服务第五篇

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/很楠不爱3/article/detail/390893
推荐阅读
相关标签
  

闽ICP备14008679号