当前位置:   article > 正文

RabbitMQ快速上手(包含秒杀案例)_spring.rabbitmq.listener.simple.acknowledge-mode=m

spring.rabbitmq.listener.simple.acknowledge-mode=manual spring.rabbitmq.list

1、 MQ概念

1.1 MQ 介绍

MQ(Message Queue)消息队列,是基础数据结构中“先进先出”的一种数据结构。一般用来解决应用解耦,异步消息,流量削峰等问题,实现高性能,高可用,可伸缩和最终一致性架构。

1.2 MQ应用场景

MQ的优势

1.2.1异步解耦

以电商应用为例,应用中有订单系统、库存系统、物流系统、支付系统。用户创建订单后,如果耦合 调用库存系统、物流系统、支付系统,任何一个子系统出了故障,都会造成下单操作异常。当转变成基于消息队列的方式后,系统间调用的问题会减少很多,比如物流系统因为发生故障,需要几分钟来修复。在 这几分钟的时间里,物流系统要处理的内存被缓存在消息队列中,用户的下单操作可以正常完成。当物流 系统恢复后,继续处理订单信息即可,中单用户感受不到物流系统的故障,提升系统的可用性。

1.2.2 削峰填谷

举个例子,如果订单系统最多能处理一万次订单,这个处理能力应付正常时段的下单时绰绰有余,正 常时段我们下单一秒后就能返回结果。但是在高峰期,如果有两万次下单操作系统是处理不了的,只能限 制订单超过一万后不允许用户下单。使用消息队列做缓冲,我们可以取消这个限制,把一秒内下的订单分 散成一段时间来处理,这时有些用户可能在下单十几秒后才能收到下单成功的操作,但是比不能下单的体 验要好。

1.2.3 消息分发

在实际开发中一个系统的数据有的时候需要分发个不同的系统中, 拿电商举例在双11的时候有很多会场,每一个会场可能都需要用到一个商品的数据,那么我们需要把数据分发到不同的会场中,假设有加了一个会场我们还需要把数据分发给新的会场

常见消息中间件对比图

1.4 RabbitMQ简介

AMQP,即 Advanced Message Queuing Protocol(高级消息队列协议),是一个网络协议,是应用层协议 的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中 间件不同产品,不同的开发语言等条件的限制。2006年,AMQP 规范发布。类比HTTP。

2007年,Rabbit 技术公司基于 AMQP 标准开发的 RabbitMQ 1.0 发布。RabbitMQ 采用 Erlang 语言开发。 Erlang 语言由 Ericson 设计,专门为开发高并发和分布式系统的一种语言,在电信领域使用广泛。

1.5 RabbitMQ中的核心概念

Broker:接收和分发消息的应用,RabbitMQ Server就是 Message Broker

Virtual host:出于多租户和安全因素设计的,把 AMQP 的基本组件划分到一个虚拟的分组中,类似于网 络中的 namespace 概念。当多个不同的用户使用同一个 RabbitMQ server 提供的服务时,可以划分出多个vhost,每个用户在自己的 vhost 创建 exchange/queue 等

Connection:publisher/consumer 和 broker 之间的 TCP 连接

Channel:如果每一次访问 RabbitMQ 都建立一个 Connection,在消息量大的时候建立 TCP Connection 的开销将是巨大的,效率也较低。Channel 是在 connection 内部建立的逻辑连接,如果应用程序支持多线 程,通常每个thread创建单独的 channel 进行通讯,AMQP method 包含了channel id 帮助客户端和 message broker 识别 channel,所以 channel 之间是完全隔离的。Channel 作为轻量级的 Connection 极大减少了操作系统建立 TCP connection 的开销

Exchange:message 到达 broker 的第一站,根据分发规则,匹配查询表中的 routing key,分发消息到 queue 中去。常用的类型有:direct (point-to-point), topic (publish-subscribe) and fanout (multicast)

Queue:消息最终被送到这里等待 consumer 取走

Binding:exchange 和 queue 之间的虚拟连接,binding 中可以包含 routing key。Binding 信息被保存 到 exchange 中的查询表中,用于 message 的分发依据

RabbitMQ架构图

2、RabbitMQ的快速入门

RabbitMQ 提供了 6 种工作模式:简单模式、work queues、Publish/Subscribe 发布与订阅模式、Routing 路由模式、Topics 主题模式、RPC 远程调用模式(远程调用,不太算 MQ;暂不作介绍)。

1、简单模式 HelloWorld

一个生产者、一个消费者,不需要设置交换机(使用默认的交换机)。

2、工作队列模式 Work Queue

一个生产者、多个消费者(竞争关系),不需要设置交换机(使用默认的交换机)。

3、发布订阅模式 Publish/subscribe

需要设置类型为 fanout 的交换机 ,并且交换机和队列进行绑定 ,当发送消息到交换机后,交换机会将消

息发送到绑定的队列。

4、路由模式 Routing

需要设置类型为 direct 的交换机 ,交换机和队列进行绑定 , 并且指定 routing key,当发送消息到交换机 后 ,交换机会根据 routing key 将消息发送到对应的队列。

5、通配符模式 Topic

需要设置类型为 topic 的交换机 ,交换机和队列进行绑定 ,并且指定通配符方式的 routing key ,当发送消息到交换机后 ,交换机会根据 routing key 将消息发送到对应的队列。

3、 Springboot环境快速集成RabbitMQ

3.1 入门案例

生产端操作步骤:

  1. 创建生产者SpringBoot工程

  2. 引入start,依赖坐标

  1. <parent>
  2. <groupId>org.springframework.boot</groupId>
  3. <artifactId>spring-boot-starter-parent</artifactId>
  4. <version>2.3.2.RELEASE</version>
  5. </parent>
  6. <dependencies>
  7. <dependency>
  8. <groupId>org.springframework.boot</groupId>
  9. <artifactId>spring-boot-starter-web</artifactId>
  10. </dependency>
  11. <dependency>
  12. <groupId>org.springframework.boot</groupId>
  13. <artifactId>spring-boot-starter-amqp</artifactId>
  14. </dependency>
  15. </dependencies>
  1. 编写yml配置,基本信息配置

  2. 定义交换机,队列以及绑定关系的配置类

  3. 注入RabbitTemplate,调用方法,完成消息发送

生产者

  1. package cn.wolfcode.java.rabbitmq._06boot_helloworld;
  2. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  3. import org.springframework.beans.factory.annotation.Autowired;
  4. import org.springframework.stereotype.Controller;
  5. import org.springframework.web.bind.annotation.RequestMapping;
  6. import org.springframework.web.bind.annotation.ResponseBody;
  7. @Controller
  8. public class QueueController {
  9. @Autowired
  10. private RabbitTemplate rabbitTemplate;
  11. @RequestMapping("/queue")
  12. @ResponseBody
  13. public String sendMsg(String msg){
  14. rabbitTemplate.convertAndSend("","boot_queue",msg);
  15. return "发送成功";
  16. }
  17. }

消费端操作步骤:

消费端

  1. 创建消费者SpringBoot工程

  2. 引入start,依赖坐标

  1. <dependency>
  2. <groupId>org.springframework.boot</groupId>
  3. <artifactId>spring-boot-starter-amqp</artifactId>
  4. </dependency>
  1. 编写yml配置,基本信息配置

  2. 定义监听类,使用@RabbitListener注解完成队列监听。

消费者

  1. package cn.wolfcode.java.rabbitmq._06boot_helloworld;
  2. import com.rabbitmq.client.Channel;
  3. import org.springframework.amqp.rabbit.annotation.Queue;
  4. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  5. import org.springframework.amqp.support.AmqpHeaders;
  6. import org.springframework.messaging.handler.annotation.Header;
  7. import org.springframework.stereotype.Component;
  8. @Component
  9. public class QueueListener {
  10. @RabbitListener(queuesToDeclare = @Queue("boot_queue"))
  11. public void receiveMsg(String msg, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag, Channel channel){
  12. System.out.println("收到消息:"+msg);
  13. }
  14. }

3.2 Work模式

配置项

#签收模式配置
spring.rabbitmq.listener.simple.acknowledge-mode=manual
spring.rabbitmq.listener.simple.prefetch=1

生产者

  1. package cn.wolfcode.java.rabbitmq._07boot_worker;
  2. import org.springframework.amqp.rabbit.core.RabbitMessagingTemplate;
  3. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  4. import org.springframework.beans.factory.annotation.Autowired;
  5. import org.springframework.stereotype.Controller;
  6. import org.springframework.web.bind.annotation.RequestMapping;
  7. import org.springframework.web.bind.annotation.ResponseBody;
  8. @Controller
  9. public class WorkerController {
  10. @Autowired
  11. private RabbitTemplate rabbitTemplate;
  12. @RequestMapping("/worker")
  13. @ResponseBody
  14. public String sendMsg(){
  15. for(int i= 0;i<20;i++){
  16. rabbitTemplate.convertAndSend("","boot_worker","msg:"+i);
  17. }
  18. return "发送成功";
  19. }
  20. }

消费者:

  1. package cn.wolfcode.java.rabbitmq._07boot_worker;
  2. import com.rabbitmq.client.Channel;
  3. import org.springframework.amqp.rabbit.annotation.Queue;
  4. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  5. import org.springframework.amqp.support.AmqpHeaders;
  6. import org.springframework.messaging.handler.annotation.Header;
  7. import org.springframework.stereotype.Component;
  8. import java.io.IOException;
  9. @Component
  10. public class Worker {
  11. @RabbitListener(queuesToDeclare = @Queue("boot_worker"))
  12. public void receiveMsg(String msg, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag, Channel channel) throws IOException {
  13. System.out.println("工作者1:"+msg);
  14. channel.basicAck(deliveryTag,false);
  15. }
  16. }

3.3 Pub/Sub模式

生产者:

  1. package cn.wolfcode.java.rabbitmq._08boot_pubsub;
  2. import com.rabbitmq.client.Channel;
  3. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  4. import org.springframework.amqp.support.AmqpHeaders;
  5. import org.springframework.beans.factory.annotation.Autowired;
  6. import org.springframework.messaging.handler.annotation.Header;
  7. import org.springframework.stereotype.Controller;
  8. import org.springframework.web.bind.annotation.RequestMapping;
  9. import org.springframework.web.bind.annotation.ResponseBody;
  10. @Controller
  11. public class PubSubController {
  12. @Autowired
  13. private RabbitTemplate rabbitTemplate;
  14. @RequestMapping("/pubsub")
  15. @ResponseBody
  16. public String sendMsg(String msg){
  17. rabbitTemplate.convertAndSend("boot_pubsub","","广播消息");
  18. return "发送成功";
  19. }
  20. }

消费者:

  1. package cn.wolfcode.java.rabbitmq._08boot_pubsub;
  2. import com.rabbitmq.client.BuiltinExchangeType;
  3. import com.rabbitmq.client.Channel;
  4. import org.springframework.amqp.rabbit.annotation.Exchange;
  5. import org.springframework.amqp.rabbit.annotation.Queue;
  6. import org.springframework.amqp.rabbit.annotation.QueueBinding;
  7. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  8. import org.springframework.amqp.support.AmqpHeaders;
  9. import org.springframework.messaging.handler.annotation.Header;
  10. import org.springframework.stereotype.Component;
  11. import java.io.IOException;
  12. @Component
  13. public class PubSubReceiver {
  14. @RabbitListener(bindings = @QueueBinding(value = @Queue,exchange = @Exchange(name = "boot_pubsub",type = "fanout")))
  15. public void receiveMsg(String msg, @Header(AmqpHeaders.DELIVERY_TAG)long deliveryTag, Channel channel) throws Exception {
  16. System.out.println("收到消息1:"+msg);
  17. channel.basicAck(deliveryTag,false);
  18. }
  19. }

3.4 Routing模式

生产者:

  1. package cn.wolfcode.java.rabbitmq._09boot_rounting;
  2. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  3. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  4. import org.springframework.beans.factory.annotation.Autowired;
  5. import org.springframework.stereotype.Controller;
  6. import org.springframework.web.bind.annotation.RequestMapping;
  7. import org.springframework.web.bind.annotation.ResponseBody;
  8. @Controller
  9. public class RountingController {
  10. @Autowired
  11. private RabbitTemplate rabbitTemplate;
  12. @RequestMapping("/rounting")
  13. @ResponseBody
  14. public String sendMsg(String key){
  15. rabbitTemplate.convertAndSend("boot_rounting_exchange",key,"rounting消息");
  16. return "发送成功";
  17. }
  18. }

消费者:

  1. package cn.wolfcode.java.rabbitmq._09boot_rounting;
  2. import com.rabbitmq.client.Channel;
  3. import org.springframework.amqp.rabbit.annotation.Exchange;
  4. import org.springframework.amqp.rabbit.annotation.Queue;
  5. import org.springframework.amqp.rabbit.annotation.QueueBinding;
  6. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  7. import org.springframework.amqp.support.AmqpHeaders;
  8. import org.springframework.messaging.handler.annotation.Header;
  9. import org.springframework.stereotype.Component;
  10. import java.io.IOException;
  11. @Component
  12. public class RountingReceiver {
  13. @RabbitListener(bindings = @QueueBinding(value = @Queue(name = "boot_rounting_queue01"),
  14. exchange = @Exchange(name = "boot_rounting_exchange",type = "direct"),
  15. key = {"error","info"}
  16. ))
  17. public void receiveMsg(String msg, @Header(AmqpHeaders.DELIVERY_TAG)long deliveryTag, Channel channel) throws Exception {
  18. System.out.println("error&info 收到消息:"+msg);
  19. channel.basicAck(deliveryTag,false);
  20. }
  21. }

3.5 Topic模式

生产者

  1. package cn.wolfcode.java.rabbitmq._10boot_topic;
  2. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  3. import org.springframework.beans.factory.annotation.Autowired;
  4. import org.springframework.stereotype.Controller;
  5. import org.springframework.web.bind.annotation.RequestMapping;
  6. import org.springframework.web.bind.annotation.ResponseBody;
  7. @Controller
  8. public class TopicController {
  9. @Autowired
  10. private RabbitTemplate rabbitTemplate;
  11. @RequestMapping("/topic")
  12. @ResponseBody
  13. public String sendMsg(String key){
  14. rabbitTemplate.convertAndSend("boot_topic_exchange",key,"topic消息");
  15. return "发送成功";
  16. }
  17. }

消费者:

  1. package cn.wolfcode.java.rabbitmq._10boot_topic;
  2. import com.rabbitmq.client.Channel;
  3. import org.springframework.amqp.rabbit.annotation.Exchange;
  4. import org.springframework.amqp.rabbit.annotation.Queue;
  5. import org.springframework.amqp.rabbit.annotation.QueueBinding;
  6. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  7. import org.springframework.amqp.support.AmqpHeaders;
  8. import org.springframework.messaging.handler.annotation.Header;
  9. import org.springframework.stereotype.Component;
  10. import java.io.IOException;
  11. @Component
  12. public class TopicReceiver01 {
  13. @RabbitListener(bindings = @QueueBinding(
  14. value = @Queue("boot_topic_queue01"),
  15. exchange = @Exchange(name = "boot_topic_exchange",type = "topic"),
  16. key = "order.*"
  17. ))
  18. public void receiveMsg(String msg, @Header(AmqpHeaders.DELIVERY_TAG)long deliveryTag, Channel channel) throws Exception {
  19. System.out.println("topic收取消息:"+msg);
  20. channel.basicAck(deliveryTag,false);
  21. }
  22. }

6 、 RabbitMQ实战案例

6.1 案例一 - 退款加积分

需求: 用户进行下单操作 , 下完单以后要进行加积分操作, 要求下完单以后加积分采用RabbitMQ 进行加积分

案例分析:

1 : 下完单以后把数据封装成消息利用生产者发送消息到RabbitMQ中。

2:在积分服务编写消费者实时监听RabbitMQ队列中消息,监听到取出消息消费加积分。

没有RabbitMQ之前

有了RabbitMQ以后

代码实现

6.2 案例二 - 秒杀下单操作

需求: 电商平台进行秒杀活动,用户点击下单秒杀商品进行下单,要求用RabbitMQ进行削峰填谷。

有了RabbitMQ以后

 

代码实现

7、 RabbitMQ高频面试题

7.1 RabbitMQ如果出现消息重复消费怎么解决

采用幂等性解决:幂等性指一次和多次请求某一个资源,对于资源本身应该具有同样的结果。也就是说,其任意多次执行对资源本身所产生的影响均与一次执行的影响相同。

在MQ中指,消费多条相同的消息,得到与消费该消息一次相同的结果。

7.2 RabbitMQ中的死信队列

死信队列,英文缩写:DLX 。Dead Letter Exchange(死信交换机),当消息成为Dead message后,可以被重新发送到另一个交换机,这个交换机就是DLX。

消息成为死信的三种情况:

  1. 队列消息长度到达限制;

  2. 消费者拒接消费消息,basicNack/basicReject,并且不把消息重新放入原目标队列,requeue=false;

  3. 原队列存在消息过期设置,消息到达超时时间未被消费;

7.3 RabbitMQ 怎么实现消息可靠性

7.3.1 生产者投递可靠性

在使用 RabbitMQ 的时候,作为消息发送方希望杜绝任何消息丢失或者投递失败场景。RabbitMQ 为我们提供了两种方式用来控制消息的投递可靠性模式。

confirm 确认模式

return 退回模式

rabbitmq 整个消息投递的路径为:

producer--->rabbitmq broker--->exchange--->queue--->consumer 消息从 producer 到 exchange 则会返回一个 confirmCallback 。 消息从 exchange-->queue 投递失败则会返回一个 returnCallback 。

我们将利用这两个 callback 控制消息的可靠性投递

7.3.2 消费者投递可靠性

ack指Acknowledge,确认。 表示消费端收到消息后的确认方式。

有三种确认方式:

• 自动确认:acknowledge="none"

• 手动确认:acknowledge="manual"

• 根据异常情况确认:acknowledge="auto",(这种方式使用麻烦,不作讲解)

其中自动确认是指,当消息一旦被Consumer接收到,则自动确认收到,并将相应 message 从 RabbitMQ 的消息缓存中移除。但是在实际业务处理中,很可能消息接收到,业务处理出现异常,那么该消息就会丢失。如果设置了手动确认方式,则需要在业务处理成功后,调用channel.basicAck(),手动签收,如果出现异常,则调用channel.basicNack()方法,让其自动重新发送消息。

7.4 RabbitMQ 如何实现延迟队列

延迟队列,即消息进入队列后不会立即被消费,只有到达指定时间后,才会被消费

需求:

  1. 下单后,30分钟未支付,取消订单,回滚库存。

  2. 新用户注册成功7天后,发送短信问候。

实现方式:

  1. 定时器

  2. 延迟队列

使用延时队列实现

  1. 延迟队列 指消息进入队列后,可以被延迟一定时间,再进行消费。

  2. RabbitMQ没有提供延迟队列功能,但是可以使用 : TTL + DLX 来实现延迟队列效果。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/小丑西瓜9/article/detail/513188
推荐阅读
相关标签
  

闽ICP备14008679号