当前位置:   article > 正文

MQ的相关的使用_mq使用

mq使用

1.1. MQ 的相关概念

1.1.1. 什么是MQ

MQ(message queue),从字面意思上看,本质是个队列,FIFO 先入先出,只不过队列中存放的内容是message 而已,还是一种跨进程的通信机制,用于上下游传递消息。在互联网架构中,MQ 是一种非常常见的上下游“逻辑解耦+物理解耦”的消息通信服务。使用了 MQ 之后,消息发送上游只需要依赖 MQ,不用依赖其他服务。

1.1.2. 为什么要用MQ【业务场景】

1、异步

如: 用户注册发送,注册邮件、注册短信

传统做法 

1、串行 (先发送邮件、再发短信)。问题:持续时间长

2、并行(将注册信息写入数据库后,同时发送邮件、短信),速度快、但不能满足高吞吐需求

消息队列做法

将数据写入数据库、同时发送消息给发送邮件和注册,异步处理 

2、应用解耦

如:双十一购物节,用户下单后、订单系统通知库存系统。

传统做法:

订单系统调用库存系统接口。问题:库存接口故障,订单就会失败,而损失大量订单 

 

消息队列做法

订单系统:下单,订单系统完成持久化,将消息写入队列,返回下单成功给用户

库存系统:订阅下单的消息,获取下单消息,进行库操作,就算库存系统故障,消息队列也能保证消息可靠投递,不会导致消息丢失。

3、流量削峰

 

如:秒杀活动、一般会因为流量过大,导致应用挂掉,一般在应用前端加入消息队列。

作用

1、可以控制活动人数,超过一定阈值,订单直接丢弃

2、可以缓解短时间的高流量压垮应用(应用程序按自己的最大处理能力获取订单)

 

1.1.3. MQ 的分类

1. ActiveMQ

 优点:单机吞吐量万级,时效性 ms 级,可用性高,基于主从架构实现高可用性,消息可靠性较高,概率丢失数据低

缺点: 官方社区现在对 ActiveMQ 5.x 维护越来越少,高吞吐量场景较少使用

2. Kafka

  大数据的杀手锏,谈到大数据领域内的消息传输,则绕不开 Kafka,这款为大数据而生的消息中间件, 以其百万级 TPS 的吞吐量名声大噪,迅速成为大数据领域的宠儿,在数据采集、传输、存储的过程中发挥着举足轻重的作用。目前已经被 LinkedIn,Uber, Twitter, Netflix 等大公司所采纳。

  优点: 性能卓越,单机写入 TPS 约在百万条/秒,最大的优点,就是吞吐量高。时效性 ms 级可用性非常高,kafka 是分布式的,一个数据多个副本,少数机器宕机,不会丢失数据,不会导致不可用,消费者采用 Pull 方式获取消息, 消息有序, 通过控制能够保证所有消息被消费且仅被消费一次; 有优秀的第三方Kafka Web 管理界面 Kafka-Manager;在日志领域比较成熟,被多家公司和多个开源项目使用;功能支持: 功能较为简单,主要支持简单的 MQ 功能,在大数据领域的实时计算以及日志采集被大规模使用

  缺点:Kafka 单机超过 64 个队列/分区,Load 会发生明显的飙高现象,队列越多,load 越高,发送消息响应时间变长,使用短轮询方式,实时性取决于轮询间隔时间,消费失败不支持重试;支持消息顺序, 但是一台代理宕机后,就会产生消息乱序,社区更新较慢

3. RocketMQ

  RocketMQ 出自阿里巴巴的开源产品,用 Java 语言实现,在设计时参考了 Kafka,并做出了自己的一些改进。被阿里巴巴广泛应用在订单,交易,充值,流计算,消息推送,日志流式处理,binglog 分发等场景。

       优点:  单机吞吐量十万级, 可用性非常高,分布式架构,消息可以做到 0 丢失MQ 功能较为完善,还是分布式的,扩展性好, 支持 10 亿级别的消息堆积,不会因为堆积导致性能下降,源码是 java 我们可以自己阅读源码,定制自己公司的 MQ

  缺点:支持的客户端语言不多,目前是 java 及 c++,其中 c++不成熟;社区活跃度一般,没有在MQ核心中去实现 JMS 等接口,有些系统要迁移需要修改大量代码

4. RabbitMQ

  2007 年发布,是一个在AMQP(高级消息队列协议)基础上完成的,可复用的企业消息系统,是当前最主流的消息中间件之一

  优点: 由于 erlang 语言的高并发特性,性能较好;吞吐量到万级,MQ 功能比较完备,健壮、稳定、易用、跨平台、支持多种语言 如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP 等,支持 AJAX 文档齐全;开源提供的管理界面非常棒,用起来很好用,社区活跃度高;更新频率相当高

https://www.rabbitmq.com/news.html

  缺点:商业版需要收费,学习成本较高

1.1.4. MQ 的选择

1. Kafka

  Kafka 主要特点是基于Pull 的模式来处理消息消费,追求高吞吐量,一开始的目的就是用于日志收集和传输,适合产生大量数据的互联网服务的数据收集业务。大型公司建议可以选用,如果有日志采集功能, 肯定是首选 kafka 了。

2. RocketMQ

天生为金融互联网领域而生,对于可靠性要求很高的场景,尤其是电商里面的订单扣款,以及业务削峰,在大量交易涌入时,后端可能无法及时处理的情况。RoketMQ 在稳定性上可能更值得信赖,这些业务场景在阿里双 11 已经经历了多次考验,如果你的业务有上述并发场景,建议可以选择 RocketMQ。

3. RabbitMQ

  结合 erlang 语言本身的并发优势,性能好时效性微秒级社区活跃度也比较高,管理界面用起来十分方便,如果你的数据量没有那么大,中小型公司优先选择功能比较完备RabbitMQ。

1.2. RabbitMQ

1.1.1. RabbitMQ 的概念

   RabbitMQ 是一个消息中间件:它接受并转发消息。你可以把它当做一个快递站点,当你要发送一个包裹时,你把你的包裹放到快递站,快递员最终会把你的快递送到收件人那里,按照这种逻辑 RabbitMQ 是一个快递站,一个快递员帮你传递快件。RabbitMQ 与快递站的主要区别在于,它不处理快件而是接收, 存储和转发消息数据。

1.1.2. 四大核心概念

  1. 生产者

产生数据发送消息的程序是生产者

  1. 交换机

交换机是 RabbitMQ 非常重要的一个部件,一方面它接收来自生产者的消息,另一方面它将消息推送到队列中。交换机必须确切知道如何处理它接收到的消息,是将这些消息推送到特定队列还是推送到多个队列,亦或者是把消息丢弃,这个得有交换机类型决定

  1. 队列

队列是 RabbitMQ 内部使用的一种数据结构,尽管消息流经 RabbitMQ 和应用程序,但它们只能存储在队列中。队列仅受主机的内存和磁盘限制的约束,本质上是一个大的消息缓冲区。许多生产者可以将消息发送到一个队列,许多消费者可以尝试从一个队列接收数据。这就是我们使用队列的方式

  1. 消费者

消费与接收具有相似的含义。消费者大多时候是一个等待接收消息的程序。请注意生产者,消费者和消息中间件很多时候并不在同一机器上。同一个应用程序既可以是生产者又是可以是消费者。

1.1.3. RabbitMQ 核心部分

 

  1. Hello Wold               简单模式
  1. Work queues           工作队列模式
  1. Publish/Subscribe    发布订阅模式
  1. Routing                    路由模式
  1. Topics                      主题模式
  1. Publisher Confirms  发布确认模式

1.1.4 RabbitMQ工作原理

 

 

  1. Broker:接收和分发消息的应用,RabbitMQ Server 就是 Message Broker
  1. Virtual host:出于多租户和安全因素设计的,把 AMQP 的基本组件划分到一个虚拟的分组中,类似于网络中的 namespace 概念。当多个不同的用户使用同一个 RabbitMQ server 提供的服务时,可以划分出多个 vhost,每个用户在自己的 vhost 创建 exchange/queue 等
  1. Connection:publisher/consumer 和 broker 之间的 TCP 连接

 

  1. Channel:如果每一次访问 RabbitMQ 都建立一个 Connection,在消息量大的时候建立 TCP Connection 的开销将是巨大的,效率也较低。Channel 是在 connection 内部建立的逻辑连接,如果应用程序支持多线程,通常每个 thread 创建单独的 channel 进行通讯,AMQP method 包含了 channel id 帮助客户端和 message broker 识别 channel,所以 channel 之间是完全隔离的。Channel 作为轻量级的Connection 极大减少了操作系统建立 TCP connection 的开销 
  1. Exchange:message 到达 broker 的第一站,根据分发规则,匹配查询表中的 routing key,分发消息到 queue 中去。常用的类型有:direct (point-to-point), topic (publish-subscribe) and fanout (multicast)
  1. Queue:消息最终被送到这里等待 consumer 取走
  1. Binding:exchange 和queue 之间的虚拟连接,binding 中可以包含 routing key,Binding 信息被保存到 exchange 中的查询表中,用于 message 的分发依据 

1.3 下载安装

  docker 安装 RabbitMQ

1.4 界面认识

1、概要

 

2、连接

3、通道

 

4、交换机

交换器,用来接收生产者发送的消息并将这些消息路由给服务器中的队列

 

 

 

5、队列

 

点击名称进去,可以看到队列的详细信息

 

1.5 五种模型示例

0、springboot依赖配置

  1. 依赖
  2. <!-- amqp依赖,包含Rabbitmq-->
  3. <dependency>
  4.    <groupId>org.springframework.boot</groupId>
  5.    <artifactId>spring-boot-starter-amqp</artifactId>
  6. </dependency>
  7. yml配置
  8. spring:
  9.   application:
  10.     name: rabbitmq
  11.   rabbitmq:
  12.     host: 127.0.0.1
  13.     port: 5672
  14.     username: guest
  15.     password: guest
  16.     virtual-host: /

1、Hello World简单模型

 

一对一消费,只有一个消费者能接收到

  1. 消费者 
  2. @Component
  3. public class HolloWordListener {
  4. // @RabbitListener(queues = ("simple.queue")) // queues需手动先创建队列
  5. @RabbitListener(queuesToDeclare = @Queue("simple.queue"))  // queuesToDeclare 自动声明队列
  6. public void holloWordListener(String message){
  7. System.out.println("message = " + message);
  8. }
  9. }
  10. 生产者 
  11. @Autowired
  12. private RabbitTemplate rabbitTemplate;
  13. @Test
  14. public void testSimpleQueue() {
  15. String queueName = "simple.queue"; // 队列名称
  16. String message = "heel,simple.queue"; // 要发送的消息
  17. rabbitTemplate.convertAndSend(queueName, message);
  18. }

2、Work queues 工作队列

多个消费者,你一个我一个分配消费消息,有预取机制,默认公平消费,可配置能者多劳模式,谁完成的快,谁多做一点

  1. 消费者 
  2. @Component
  3. public class WoekWordListener {
  4. @RabbitListener(queuesToDeclare = @Queue("workQueue")) // queuesToDeclare 自动声明队列
  5. public void holloWordListener(String message) throws InterruptedException {
  6. Thread.sleep(200);
  7. System.out.println("message1 = " + message);
  8. }
  9. @RabbitListener(queuesToDeclare = @Queue("workQueue")) // queuesToDeclare 自动声明队列
  10. public void holloWordListener1(String message) throws InterruptedException {
  11. Thread.sleep(400);
  12. System.out.println("message2 = " + message);
  13. }
  14. }
  15. 生产者 
  16. @Autowired
  17. private RabbitTemplate rabbitTemplate;
  18. @Test
  19. public void testWorkQueue(){
  20. String queueName = "workQueue";
  21. String message = "hello,work.queue__";
  22. for (int i = 0; i < 10; i++) {
  23. rabbitTemplate.convertAndSend(queueName,message+i);
  24. System.out.println("i = " + i);
  25. }
  26. }
  27. 取消预取机制,能者多劳配置
  28. spring:
  29.   rabbitmq:
  30.     host: 127.0.0.1
  31.     port: 5672
  32.     username: guest
  33.     password: guest
  34.     virtual-host: /
  35.     listener: 
  36.       simple:
  37.         prefetch: 1 # 每次只能获取一条,处理完成才能获取下一条

3、Publish/Subscribe发布订阅模型

发布订阅模式与之前案例的区别就是允许将同一消息发送给多个消费者。

实现方式是加入了exchange(交换机),注意:交换机是不缓存消息的

 

使用fanout交换机,会将接收到的消息路由到每一个跟其绑定的queue(队列) 

 

简单队列模式和工作模式只能消费同种消息

门户网站,用户在注册完后一般都会发送消息通知用户注册成功(失败)。

如果在一个系统中,用户注册信息有邮箱、手机号,那么在注册完后会向邮箱和手机号都发送注册完成信息(假设都发送)。

利用 MQ 实现业务异步处理,如果是用工作队列的话,就会声明一个注册信息队列。注册完成之后生产者会向队列提交一条注册数据,消费者取出数据同时向邮箱以及手机号发送两条消息。但是实际上邮箱和手机号信息发送实际上是不同的业务逻辑,不应该放在一块处理。

  1. 消费者 
  2. // 消费者直接绑定交换机,指定类型为fanout
  3. @Component
  4. public class FanoutExchangeListener {
  5. // 不指定队列,消息过了就没了
  6. //  @RabbitListener(bindings = {@QueueBinding(value = @Queue,exchange = @Exchange(value = "fanoutTest",type = ExchangeTypes.FANOUT))})
  7. // 指定队列,可以接收缓存到队列里的消息
  8. @RabbitListener(bindings = {@QueueBinding(value = @Queue(value ="test",durable = "true" ),
  9.         exchange = @Exchange(value = "fanoutTest",type = ExchangeTypes.FANOUT))})
  10. public void reveivel(String message){
  11. System.out.println("message = " + message);
  12. }
  13. @RabbitListener(bindings = {@QueueBinding(value = @Queue,
  14.        exchange = @Exchange(value = "fanoutTest",type = ExchangeTypes.FANOUT))})
  15. public void reveivel2(String message){
  16. System.out.println("message1 = " + message);
  17. }
  18. }
  19. 生产者 
  20. @Autowired
  21. private RabbitTemplate rabbitTemplate;
  22. @Test
  23. public void tesyPubSubQueue(){
  24. // 参数1:交换机名称 ,
  25.         // 参数2 routingKey,(fanout类型可不写) ,
  26.         // 参数3,消息内容
  27. rabbitTemplate.convertAndSend("fanoutTest","","消息内容");
  28. }

这个时候就可以利用发布/订阅模式将消息发送到转换机(EXCHANGE),声明两个不同的队列(邮箱、手机),并绑定到交换机。这样生产者只需要发布一次消息,两个队列都会接收到消息发给对应的消费者,大致如下图所示。

 

4、Routing路由模型

routing模型也是将消息发送到交换机

使用的是Direct类型的交换机,会将接收到的消息根据规则路由到指定的Queue(队列),因此称为路由模式

 

  1. 消费者
  2. // 消费者直接绑定交换机,指定类型为direct,并指定key表示能消费的key
  3. @Component
  4. public class RoutingExchangeListener {
  5. // 不指定队列,消息过了就没了
  6. //  @RabbitListener(bindings = {@QueueBinding(value = @Queue,exchange = @Exchange(value = "direstTest",type = ExchangeTypes.DIRECT),key = {"info","error"})})
  7. // 指定队列,可以接收缓存到队列里的消息
  8. // key = {"info","error"} 表示我能接收到routingKey为 info和error的消息
  9. @RabbitListener(bindings = {@QueueBinding(value = @Queue(value ="test1",durable = "true" ),
  10.        exchange = @Exchange(value = "direstTest",type = ExchangeTypes.DIRECT),
  11.        key = {"info", "error"})})
  12. public void receivel(String message){
  13. System.out.println("message = " + message);
  14. }
  15. // key = {"error"} 表示我只能接收到routingKey为 error的消息
  16. @RabbitListener(bindings = {@QueueBinding(value = @Queue,
  17.         exchange = @Exchange(value = "direstTest",type = ExchangeTypes.DIRECT),
  18.         key = {"error"})})
  19. public void receivel1(String message){
  20. System.out.println("message1 = " + message);
  21. }
  22. }
  23. 生产者
  24. @Autowired
  25. private RabbitTemplate rabbitTemplate;
  26. // 路由模型
  27. @Test
  28. public void direstExchangeTest(){
  29. rabbitTemplate.convertAndSend("direstTest","info","发送info的key的路由消息");
  30. }
  31. // 路由模型
  32. @Test
  33. public void direstExchangeTest1(){
  34. rabbitTemplate.convertAndSend("direstTest","error","发送error的key的路由消息");
  35. }

5、Topics主题模型

topicExchange与directExchange类型,区别在于routingKey必须是多个单词的列表,并且以 . 分隔

*(代表通配符,任意一个字段)  user.name  user.*  [user.age,  user.xxx]

#(号代表一个或多个字段   user.name  user.name.age) 

  1. 消费者
  2. @Component
  3. public class TopicsExchangeListener {
  4. // 不指定队列,消息过了就没了
  5. //  @RabbitListener(bindings = {@QueueBinding(value = @Queue,exchange = @Exchange(name = "topicList",type = ExchangeTypes.TOPIC),key = {"user.save","user.*"})})
  6. // 指定队列,可以接收缓存到队列里的消息
  7. // key = {"user.save","user.*"} 表示能消费 routingkey为  user.save 和 user.任意一个字符  的消息
  8. @RabbitListener(bindings = {@QueueBinding(value = @Queue(value ="test2",durable = "true" ),exchange = @Exchange(name = "topicList",type = ExchangeTypes.TOPIC),key = {"user.save","user.*"})})
  9. public void recevicel(String message){
  10. System.out.println("message = " + message);
  11. }
  12. // key = {"order.#","user.*"} 表示能消费 routingkey为  order.一个或多个字符   和  user.任意一个字符  的消息
  13. @RabbitListener(bindings = {@QueueBinding(value = @Queue,exchange = @Exchange(name = "topicList",type = ExchangeTypes.TOPIC),key = {"order.#","user.*"})})
  14. public void recevicel1(String message){
  15. System.out.println("message1 = " + message);
  16. }
  17. }
  18. 生产者
  19. @Autowired
  20. private RabbitTemplate rabbitTemplate;
  21. @Test
  22. public void topicTest(){
  23. rabbitTemplate.convertAndSend("topicTest","user.save","topic路由消息,use.save");
  24. }
  25. @Test
  26. public void topicTest1(){
  27. rabbitTemplate.convertAndSend("topicTest","order.select.getone","topic路由消息,order.select.getone");
  28. }

6、消息转换器

代码里直接发送对象,虽然接收的到消息,但是rabbitmq的界面上看到的消息会是乱码 

  1. 依赖
  2.  <dependency>
  3.      <groupId>com.fasterxml.jackson.dataformat</groupId>
  4.      <artifactId>jackson-dataformat-xml</artifactId>
  5.      <version>2.9.10</version>
  6.  </dependency>
  7. 配置
  8. @Configuration
  9. public class RabbitmqConfig {
  10.   // 消息转换配置
  11. @Bean
  12. public MessageConverter jsonMessageConverter(){
  13. return new Jackson2JsonMessageConverter();
  14. }
  15. }
  16. 再次发送就会是转换好的消息

1.6 进阶

1、基于插件延迟队列

延时队列,队列内部是有序的,最重要的特性就体现在它的延时属性上,延时队列中的元素是希望在指定时间到了以后或之前取出和处理,简单来说,延时队列就是用来存放需要在指定时间被处理的元素的队列。

使用场景:

 1. 订单在十分钟之内未支付则自动取消

 2. 新创建的店铺,如果在十天内都没有上传过商品,则自动发送消息提醒。

 3. 用户注册成功后,如果三天内没有登陆则进行短信提醒。

 4. 用户发起退款,如果三天内没有得到处理则通知相关运营人员。

      5. 预定会议后,需要在预定的时间点前十分钟通知各个与会人员参加会议

这些场景都有一个特点,需要在某个事件发生之后或者之前的指定时间点完成某一项任务,如: 发生订单生成事件,在十分钟之后检查该订单支付状态,然后将未支付的订单进行关闭;看起来似乎使用定时任务,一直轮询数据,每秒查一次,取出需要被处理的数据,然后处理不就完事了吗?

如果数据量比较少,确实可以这样做,比如:对于“如果账单一周内未支付则进行自动结算”这样的需求, 如果对于时间不是严格限制,而是宽松意义上的一周,那么每天晚上跑个定时任务检查一下所有未支付的账单,确实也是一个可行的方案。

但对于数据量比较大,并且时效性较强的场景,如:“订单十分钟内未支付则关闭“,短期内未支付的订单数据可能会有很多,活动期间甚至会达到百万甚至千万级别,对这么庞大的数据量仍旧使用轮询的方式显然是不可取的,很可能在一秒内无法完成所有订单的检查,同时会给数据库带来很大压力,无法满足业务要求而且性能低下。

下载延迟插件

Releases · rabbitmq/rabbitmq-delayed-message-exchange · GitHub

rabbitmq_delayed_message_exchange-3.9.0.ez

上传到 任意位置

  1. 将插件拷贝到RabbitMQ
  2. docker cp rabbitmq_delayed_message_exchange-3.9.0.ez rabbitmq:/plugins
  3. 进入容器并安装延迟队列插件
  4. docker exec -it 自己rabbitMQ容器id /bin/bash
  5. 启动插件并重启容器
  6. rabbitmq-plugins enable rabbitmq_delayed_message_exchange
  7. 重启容器
  8. 在容器重启完成后,我们可以登录RabbitMQ的Web端管理界面,在Exchanges选项卡下,点击Add a new exchange,在Type里面看是否出现了x-delayed-message选项,如下图所示

RabbitAdmin配置 

RabbitAdmin 是用于对交换机和队列进行管理,用于创建、绑定、删除队列与交换机,发送消息的组件。

  1. import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
  2. import org.springframework.amqp.rabbit.connection.ConnectionFactory;
  3. import org.springframework.amqp.rabbit.core.RabbitAdmin;
  4. import org.springframework.beans.factory.annotation.Value;
  5. import org.springframework.context.annotation.Bean;
  6. import org.springframework.context.annotation.Configuration;
  7. @Configuration
  8. public class RabbitAdminConfig {
  9. @Value("${spring.rabbitmq.host}")
  10. private String host;
  11. @Value("${spring.rabbitmq.username}")
  12. private String username;
  13. @Value("${spring.rabbitmq.password}")
  14. private String password;
  15. @Value("${spring.rabbitmq.virtualhost}")
  16. private String virtualhost;
  17. @Bean
  18. public ConnectionFactory connectionFactory(){
  19. CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
  20. connectionFactory.setAddresses(host);
  21. connectionFactory.setUsername(username);
  22. connectionFactory.setPassword(password);
  23. connectionFactory.setVirtualHost(virtualhost);
  24. return connectionFactory;
  25. }
  26. @Bean
  27. public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory){
  28. RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
  29. rabbitAdmin.setAutoStartup(true);
  30. return rabbitAdmin;
  31. }
  32. }

封装发送延迟队列工具类 

  1. import org.springframework.amqp.core.Queue;
  2. import org.springframework.amqp.core.Binding;
  3. import org.springframework.amqp.core.BindingBuilder;
  4. import org.springframework.amqp.core.CustomExchange;
  5. import org.springframework.amqp.rabbit.core.RabbitAdmin;
  6. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  7. import org.springframework.beans.factory.annotation.Autowired;
  8. import org.springframework.stereotype.Component;
  9. import javax.annotation.Resource;
  10. import java.util.HashMap;
  11. import java.util.Map;
  12. @Component
  13. public class DelayedQueue {
  14. // routingKey
  15. private static final String DELAYED_ROUTING_KEY = "delayed.routingkey";
  16. // 延迟队列交换机
  17. private static final String DELAYED_EXCHANGE = "delayed.exchange";
  18.     @Autowired
  19. RabbitTemplate rabbitTemplate;
  20.     @Resource
  21. RabbitAdmin rabbitAdmin;
  22. /**
  23. * 发送延迟队列
  24. * @param queueName 队列名称
  25. * @param params 消息内容
  26. * @param expiration 延迟时间 毫秒
  27. */
  28. public void sendDelayedQueue(String queueName, Object params, Integer expiration) {
  29. // 先创建一个队列
  30. Queue queue = new Queue(queueName);
  31. rabbitAdmin.declareQueue(queue);
  32. // 创建延迟队列交换机
  33. CustomExchange customExchange = createCustomExchange();
  34. rabbitAdmin.declareExchange(customExchange);
  35. // 将队列和交换机绑定
  36. Binding binding = BindingBuilder.bind(queue).to(customExchange).with(DELAYED_ROUTING_KEY).noargs();
  37. rabbitAdmin.declareBinding(binding);
  38. // 发送延迟消息
  39. rabbitTemplate.convertAndSend(DELAYED_EXCHANGE, DELAYED_ROUTING_KEY, params, msg -> {
  40. // 发送消息的时候 延迟时长
  41. msg.getMessageProperties().setDelay(expiration);
  42. return msg;
  43. });
  44. }
  45. public CustomExchange createCustomExchange() {
  46. Map<String, Object> arguments = new HashMap<>();
  47. /**
  48. * 参数说明:
  49. * 1.交换机的名称
  50. * 2.交换机的类型
  51. * 3.是否需要持久化
  52. * 4.是否自动删除
  53. * 5.其它参数
  54. */
  55. arguments.put("x-delayed-type", "direct");
  56. return new CustomExchange(DELAYED_EXCHANGE,"x-delayed-message", true, false, arguments);
  57. }
  58. }
  59. 生产者
  60. @Autowired
  61. private DelayedQueue delayedQueue;
  62. /**
  63. * 发送延迟队列
  64. * @param queueName 队列名称
  65. * @param params 消息内容
  66. * @param expiration 延迟时间 毫秒
  67. */
  68. @GetMapping("/test9")
  69. public void topicTest8() {
  70. delayedQueue.sendDelayedQueue("delayTest2","这是消息",5000);
  71. }
  72. 消费者
  73. @RabbitListener(queuesToDeclare = @Queue(value = "delayTest2",durable = "true"))
  74. public void declareExchange2(String message){
  75. System.out.println("delayTest2 = " + message);
  76. }

2、TTL队列

TTL是time to live的缩写,生存时间,RabbitMQ支持消息的过期时间,单位是毫秒,消息发送时可以指定,从消息入队列开始计算,只要超过队列的超时时间配置,消息没被接收,消息就会自动清除 ,如果配置了死信队列被丢到死信队列中【则会成为"死信"】

封装发送TTL队列工具类 

  1. import org.springframework.amqp.core.*;
  2. import org.springframework.amqp.rabbit.core.RabbitAdmin;
  3. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  4. import org.springframework.beans.factory.annotation.Autowired;
  5. import org.springframework.stereotype.Component;
  6. import javax.annotation.Resource;
  7. import java.util.HashMap;
  8. import java.util.Map;
  9. @Component
  10. public class TtlQueue {
  11. // routingKey
  12. private static final String TTL_KEY = "ttl.routingkey";
  13. private static final String TTL_EXCHANGE = "ttl.exchange";
  14. @Autowired
  15. RabbitTemplate rabbitTemplate;
  16. @Resource
  17. RabbitAdmin rabbitAdmin;
  18. /**
  19. * 发送TTL队列
  20. * @param queueName 队列名称
  21. * @param params 消息内容
  22. * @param expiration 过期时间 毫秒
  23. */
  24. public void sendTtlQueue(String queueName, Object params, Integer expiration) {
  25. /**
  26. * ----------------------------------先创建一个ttl队列--------------------------------------------
  27. */
  28. Map<String, Object> map = new HashMap<>();
  29. // 队列设置存活时间,单位ms,必须是整形数据。
  30. map.put("x-message-ttl",expiration);
  31. /*参数1:队列名称  参数2:持久化  参数3:是否排他 参数4:自动删除队列  参数5:队列参数*/
  32. Queue queue = new Queue(queueName,true,false,false,map);
  33. rabbitAdmin.declareQueue(queue);
  34. /**
  35. * ---------------------------------创建交换机---------------------------------------------
  36. */
  37. DirectExchange directExchange = new DirectExchange(TTL_EXCHANGE, true, false);
  38. rabbitAdmin.declareExchange(directExchange);
  39. /**
  40. * ---------------------------------队列绑定交换机---------------------------------------------
  41. */
  42. // 将队列和交换机绑定
  43. Binding binding = BindingBuilder.bind(queue).to(directExchange).with(TTL_KEY);
  44. rabbitAdmin.declareBinding(binding);
  45. // 发送消息
  46. rabbitTemplate.convertAndSend(TTL_EXCHANGE,TTL_KEY,params);
  47. }
  48. }
  49. 生产者
  50. @Autowired
  51. private TtlQueue ttlQueue;
  52. /**
  53. * 发送TTL队列
  54. * @param queueName 队列名称
  55. * @param params 消息内容
  56. * @param expiration 过期时间 毫秒
  57. */
  58. @GetMapping("/test10")
  59. public void topicTest10() {
  60. ttlQueue.sendTtlQueue("ttlQueue", "这是消息内容", 5000);
  61. }
  62. 消费者
  63. @RabbitListener(queues = "ttlQueue" )
  64. public void ttlQueue(String message){
  65.   System.out.println("message = " + message);
  66. }

3、死信队列

先从概念解释上搞清楚这个定义,死信,顾名思义就是无法被消费的消息,字面意思可以这样理解,一般来说,producer 将消息投递到 broker 或者直接到queue 里了,consumer 从 queue 取出消息进行消费,但某些时候由于特定的原因导致 queue 中的某些消息无法被消费,这样的消息如果没有后续的处理,就变成了死信,有死信自然就有了死信队列。

应用场景: 为了保证订单业务的消息数据不丢失,需要使用到 RabbitMQ 的死信队列机制,当消息消费发生异常时,将消息投入死信队列中. 还有比如说: 用户在商城下单成功并点击去支付后在指定时间未支付时自动失效。

DLX,全称为Dead-Letter-Exchange,可以称之为死信交换器。队列消息变成死信(deadmessage)之后,它能被重新被发送到另一个交换器中,这个交换器就是DLX,绑定DLX的队列就称之为死信队列。

消息变成死信的几种情况:

1、消息被拒绝(basic.reject/ basic.nack)并且requeue=false

2、 消息TTL过期

3、 队列达到最大长度

流程:发送消息,消息过期后进入到另一个队列(这个队列设置持久化,不过期)的过程。

封装发送死信队列工具类 

  1. import org.springframework.amqp.core.Binding;
  2. import org.springframework.amqp.core.BindingBuilder;
  3. import org.springframework.amqp.core.DirectExchange;
  4. import org.springframework.amqp.core.Queue;
  5. import org.springframework.amqp.rabbit.core.RabbitAdmin;
  6. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  7. import org.springframework.beans.factory.annotation.Autowired;
  8. import org.springframework.stereotype.Component;
  9. import javax.annotation.Resource;
  10. import java.util.HashMap;
  11. import java.util.Map;
  12. @Component
  13. public class DLXQueue {
  14. // routingKey
  15. private static final String DEAD_ROUTING_KEY = "dead.routingkey";
  16. private static final String ROUTING_KEY = "routingkey";
  17. private static final String DEAD_EXCHANGE = "dead.exchange";
  18. private static final String EXCHANGE = "common.exchange";
  19. @Autowired
  20. RabbitTemplate rabbitTemplate;
  21. @Resource
  22. RabbitAdmin rabbitAdmin;
  23. /**
  24. * 发送死信队列,过期后进入死信交换机,进入死信队列
  25. * @param queueName 队列名称
  26. * @param deadQueueName 死信队列名称
  27. * @param params 消息内容
  28. * @param expiration 过期时间 毫秒
  29. */
  30. public void sendDLXQueue(String queueName, String deadQueueName,Object params, Integer expiration){
  31. /**
  32. * ----------------------------------先创建一个ttl队列和死信队列--------------------------------------------
  33. */
  34. Map<String, Object> map = new HashMap<>();
  35. // 队列设置存活时间,单位ms,必须是整形数据。
  36. map.put("x-message-ttl",expiration);
  37. // 设置死信交换机
  38. map.put("x-dead-letter-exchange",DEAD_EXCHANGE);
  39. // 设置死信交换器路由键
  40. map.put("x-dead-letter-routing-key", DEAD_ROUTING_KEY);
  41. /*参数1:队列名称  参数2:持久化  参数3:是否排他 参数4:自动删除队列  参数5:队列参数*/
  42. Queue queue = new Queue(queueName,true,false,false,map);
  43. rabbitAdmin.declareQueue(queue);
  44. /**
  45. * ---------------------------------创建交换机---------------------------------------------
  46. */
  47. DirectExchange directExchange = new DirectExchange(EXCHANGE, true, false);
  48. rabbitAdmin.declareExchange(directExchange);
  49. /**
  50. * ---------------------------------队列绑定交换机---------------------------------------------
  51. */
  52. Binding binding = BindingBuilder.bind(queue).to(directExchange).with(ROUTING_KEY);
  53. rabbitAdmin.declareBinding(binding);
  54. /**
  55. * ---------------------------------在创建一个死信交换机和队列,接收死信队列---------------------------------------------
  56. */
  57. DirectExchange deadExchange = new DirectExchange(DEAD_EXCHANGE, true, false);
  58. rabbitAdmin.declareExchange(deadExchange);
  59. Queue deadQueue = new Queue(deadQueueName,true,false,false);
  60. rabbitAdmin.declareQueue(deadQueue);
  61. /**
  62. * ---------------------------------队列绑定死信交换机---------------------------------------------
  63. */
  64. // 将队列和交换机绑定
  65. Binding deadbinding = BindingBuilder.bind(deadQueue).to(deadExchange).with(DEAD_ROUTING_KEY);
  66. rabbitAdmin.declareBinding(deadbinding);
  67. // 发送消息
  68. rabbitTemplate.convertAndSend(EXCHANGE,ROUTING_KEY,params);
  69. }
  70. }
  71. 生产者
  72. @Autowired
  73. private DLXQueue dlxQueue;
  74. /**
  75. * 发送死信队列,过期后进入死信交换机,进入死信队列
  76. * @param queueName 队列名称
  77. * @param deadQueueName 死信队列名称
  78. * @param params 消息内容
  79. * @param expiration 过期时间 毫秒
  80. */
  81. @GetMapping("/test11")
  82. public void topicTest11() {
  83. dlxQueue.sendDLXQueue("queue","deadQueue","这是消息内容",5000);
  84. }
  85. 消费者
  86.   // 接收转移后的队列消息
  87. @RabbitListener(queuesToDeclare = @Queue(value = "deadQueue",durable = "true"))
  88. public void ttlQueue(String message){
  89. System.out.println("message = " + message);
  90. }

4、消息确认

1、发送消息确认机制

为确保消息发送有真的发送出去,设置发布时确认,确认消息是否到达 Broker 服务器

配置 

  1. spring:
  2.   rabbitmq:
  3.     host: 47.99.110.29
  4.     port: 5672
  5.     username: guest
  6.     password: guest
  7.     virtual-host: /
  8.     listener:
  9.       simple:
  10.         prefetch: 1 # 每次只能获取一条,处理完成才能获取下一条
  11.     publisher-confirm-type: correlated   #确认消息已发送到交换机(Exchange)
  12.     publisher-returns: true  #确认消息已发送到队列(Queue)

如果有使用rabbitAdmin配置的话,那里也需要加配置

修改RabbitAdmin配置 

  1. import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
  2. import org.springframework.amqp.rabbit.connection.ConnectionFactory;
  3. import org.springframework.amqp.rabbit.core.RabbitAdmin;
  4. import org.springframework.beans.factory.annotation.Value;
  5. import org.springframework.context.annotation.Bean;
  6. import org.springframework.context.annotation.Configuration;
  7. @Configuration
  8. public class RabbitAdminConfig {
  9. @Value("${spring.rabbitmq.host}")
  10. private String host;
  11. @Value("${spring.rabbitmq.username}")
  12. private String username;
  13. @Value("${spring.rabbitmq.password}")
  14. private String password;
  15. @Value("${spring.rabbitmq.virtualhost}")
  16. private String virtualhost;
  17. @Bean
  18. public ConnectionFactory connectionFactory(){
  19. CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
  20. connectionFactory.setAddresses(host);
  21. connectionFactory.setUsername(username);
  22. connectionFactory.setPassword(password);
  23. connectionFactory.setVirtualHost(virtualhost);
  24. // 配置发送确认回调时,次配置必须配置,否则即使在RabbitTemplate配置了ConfirmCallback也不会生效
  25. connectionFactory.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED);
  26. connectionFactory.setPublisherReturns(true);
  27. return connectionFactory;
  28. }
  29. @Bean
  30. public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory){
  31. RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
  32. rabbitAdmin.setAutoStartup(true);
  33. return rabbitAdmin;
  34. }
  35. }

实现发送消息确认接口 

消息只要被 rabbitmq broker 接收到就会触发 confirmCallback 回调

  1. /**
  2.  * 消息发送确认配置
  3.  */
  4. @Component
  5. public class ConfirmCallbackConfig implements RabbitTemplate.ConfirmCallback{
  6. @Autowired
  7. private RabbitTemplate rabbitTemplate;
  8. @PostConstruct // @PostContruct是spring框架的注解,在⽅法上加该注解会在项⽬启动的时候执⾏该⽅法,也可以理解为在spring容器初始化的时候执
  9. public void init(){
  10. rabbitTemplate.setConfirmCallback(this);
  11. }
  12. /**
  13. * 交换机不管是否收到消息的一个回调方法
  14. * @param correlationData 消息相关数据
  15. * @param ack 交换机是否收到消息
  16. * @param cause 失败原因
  17. */
  18. @Override
  19. public void confirm(CorrelationData correlationData, boolean ack, String cause) {
  20. if (ack){ // 消息投递到broker 的状态,true表示成功
  21. System.out.println("消息发送成功!");
  22. }else { // 发送异常
  23. System.out.println("发送异常原因 = " + cause);
  24. }
  25. }
  26. }

实现发送消息回调接口 

如果消息未能投递到目标queue里将触发回调 returnCallback ,一旦向 queue 投递消息未成功,这里一般会记录下当前消息的详细投递数据,方便后续做重发或者补偿等操作。 

  1. @Component
  2. public class ReturnCallbackConfig implements RabbitTemplate.ReturnsCallback {
  3. @Autowired
  4. private RabbitTemplate rabbitTemplate;
  5. @PostConstruct // @PostContruct是spring框架的注解,在⽅法上加该注解会在项⽬启动的时候执⾏该⽅法,也可以理解为在spring容器初始化的时候执
  6. public void init(){
  7. rabbitTemplate.setReturnsCallback(this);
  8. }
  9. @Override
  10. public void returnedMessage(ReturnedMessage returnedMessage) {
  11. System.out.println("消息"+returnedMessage.getMessage().toString()+"被交换机"+returnedMessage.getExchange()+"回退!"
  12. +"退回原因为:"+returnedMessage.getReplyText());
  13. // 回退了所有的信息,可做补偿机制
  14. }
  15. }

2、消费者消息确认机制

为确保消息消费成功,需设置消费者消息确认机制,如果消费失败或异常了,可做补偿机制。

配置 

  1. spring:
  2.   rabbitmq:
  3.     host: 47.99.110.29
  4.     port: 5672
  5.     username: guest
  6.     password: guest
  7.     virtual-host: /
  8.     # 消费者配置
  9.     listener:
  10.       simple:
  11.         prefetch: 1 # 每次只能获取一条,处理完成才能获取下一条
  12.         acknowledge-mode: manual # 设置消费端手动ack确认
  13.         retry:
  14.           enabled: true # 是否支持重试
  15.     # 生产者配置      
  16.     publisher-confirm-type: correlated   #确认消息已发送到交换机(Exchange)
  17.     publisher-returns: true  #确认消息已发送到队列(Queue)
  18. channel.basicAck消息确认 
  19. 消费者修改,利用消费者参数Channel 进行消息确认操作
  20. @RabbitListener(queuesToDeclare = @Queue(value = "simple.queue",durable = "true")) // queuesToDeclare 自动声明队列
  21. public void holloWordListener(String msg, Channel channel, Message message) throws IOException {
  22. // 消息
  23. System.out.println("msg = " + msg);
  24. /**
  25.  * 确认
  26.  * deliveryTag:表示消息投递序号,每次消费消息或者消息重新投递后,deliveryTag都会增加
  27.  * multiple:是否批量确认,值为 true 则会一次性 ack所有小于当前消息 deliveryTag 的消息。
  28.  */ 
  29. channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
  30. }

channel.basicNack消息回退 

将消息重返队列

  1. @RabbitListener(queuesToDeclare = @Queue(value = "simple.queue",durable = "true")) // queuesToDeclare 自动声明队列
  2. public void holloWordListener(String msg, Channel channel, Message message) throws IOException {
  3. try {
  4. // 消息
  5. System.out.println("msg = " + msg);
  6. throw new RuntimeException("来个异常");
  7. } catch (Exception e) {
  8. e.printStackTrace();
  9. System.out.println("消息消费异常,重回队列");
  10. /**
  11. * deliveryTag:表示消息投递序号。
  12. * multiple:是否批量确认。
  13. * requeue:值为 true 消息将重新入队列。
  14. */
  15. channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,true);
  16. }
  17. // 确认
  18. /**
  19. * deliveryTag:表示消息投递序号,每次消费消息或者消息重新投递后,deliveryTag都会增加
  20. * multiple:是否批量确认,值为 true 则会一次性 ack所有小于当前消息 deliveryTag 的消息。
  21. */
  22. channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
  23. }

channel.basicReject消息拒绝 

  1. 拒绝消息,与basicNack区别在于不能进行批量操作,其他用法很相似。
  2. /**
  3.  * 消息拒绝
  4.  * deliveryTag:表示消息投递序号。
  5.  * requeue:值为 true 消息将重新入队列。
  6.  */
  7. channel.basicReject(message.getMessageProperties().getDeliveryTag(),true);

封装消息确认处理类

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/菜鸟追梦旅行/article/detail/515751
推荐阅读
相关标签
  

闽ICP备14008679号