当前位置:   article > 正文

RabbitMQ_rabbitmq官网

rabbitmq官网

RabbitMQ简介


RabbitMQ是由erlang语言开发,基于AMQP(Advanced Message Queue 高级消息队列协议)协议实现的消息队列,它是一种应用程序之间的通信方法,消息队列在分布式系统开发中应用非常广泛。2007年,Rabbit 技术公司基于 AMQP 标准开发的 RabbitMQ 1.0 发布。 (Erlang 语言由 Ericson 设计,专门为开发高并发和分布式系统的一种语言,在电信领域使用广泛)

RabbitMQ官方地址:http://www.rabbitmq.com/

RabbitMQ 基础架构如下图:

上图说明:
1、Broker:接收和分发消息的应用,就是一个中介,RabbitMQ Server就是 Message Broker2、Virtual host:出于多租户和安全因素设计的,把 AMQP 的基本组件划分到一个虚拟的分组中,类似于网络中的 namespace 概念。当多个不同的用户使用同一个 RabbitMQ server 提供的服务时,可以划分出多个vhost,每个用户在自己的 vhost 创建 exchange/queue 等3、Connection:publisher/consumer 和 broker 之间的 TCP 连接4、Channel:如果每一次访问 RabbitMQ 都建立一个 Connection,在消息量大的时候建立 TCP Connection的开销将是巨大的,效率也较低。Channel 是在 connection 内部建立的逻辑连接,如果应用程序支持多线程,通常每个thread创建单独的 channel 进行通讯,AMQP method 包含了channel id 帮助客户端和message broker 识别 channel,所以 channel 之间是完全隔离的。Channel 作为轻量级的 Connection 极大减少了操作系统建立 TCP connection 的开销
5、Exchange:message 到达 broker 的第一站,根据分发规则,匹配查询表中的 routing key,分发消息到queue 中去。常用的类型有:direct (point-to-point), topic (publish-subscribe) and fanout (multicast)6、Queue:消息最终被送到这里等待 consumer 取走7、Binding:exchange 和 queue 之间的虚拟连接,binding 中可以包含 routing key。Binding 信息被保存到 exchange 中的查询表中,用于 message 的分发依据

RabbitMQ提供了6种模式:简单模式,work模式,Publish/Subscribe发布与订阅模式,Routing路由模式,Topics主题模式,RPC远程调用模式(远程调用,不太算MQ;暂不作介绍);官网对应模式介绍:https://www.rabbitmq.com/getstarted.html , 点击手册按钮 RabbitMQ Tutorials

2. 安装及配置RabbitMQ


RabbitMQ 官方地址:http://www.rabbitmq.com/ ,详见安装文档

http://erlang.org/download/

https://github.com/rabbitmq/rabbitmq-server/releases/tag/v3.11.4

国内源

rabbitmq:https://mirrors.huaweicloud.com/rabbitmq-server/

win10安装

管理WEB页面安装
rabbitmq-plugins.bat enable rabbitmq_management
登录URL:

http://localhost:15672/

用户名密码:guest/guest

问题:

登录用户是中文解决方案:

1、创建用户为英文,再安装相关环境

2、修改相应的目录

用管理员执行CMD

  1. rabbitmq-service.bat remove
  2. set RABBITMQ_BASE=D:\rabbitmq_server\data
  3. rabbitmq-service.bat install
  4. rabbitmq-plugins enable rabbitmq_management

3. RabbitMQ快速入门


3.1 生产方工程搭建

1.添加相关依赖

修改pom.xml文件内容为如下:

  1. <dependencies>
  2. <dependency>
  3. <groupId>org.springframework.boot</groupId>
  4. <artifactId>spring-boot-starter-amqp</artifactId>
  5. </dependency>
  6. </dependencies>

2.启动类

  1. package com.woniu.rabbitmq;
  2. import org.springframework.boot.SpringApplication;
  3. import org.springframework.boot.autoconfigure.SpringBootApplication;
  4. @SpringBootApplication
  5. public class RabbitApplication {
  6. public static void main(String[] args) {
  7. SpringApplication.run(RabbitApplication.class);
  8. }
  9. }

3.配置RabbitMQ

创建application.yml,内容如下:

  1. spring:
  2. rabbitmq:
  3. host: localhost
  4. port: 5672
  5. virtual-host: /woniu
  6. username: woniu
  7. password: woniu

创建队列参数说明:

参数

说明

name

字符串值,exchange 的名称。

durable

布尔值,表示该 queue 是否持久化。 持久化意味着当 RabbitMQ 重启后,该 queue 是否会恢复/仍存在。 另外,需要注意的是,queue 的持久化不等于其中的消息也会被持久化。

exclusive

布尔值,表示该 queue 是否排它式使用。排它式使用意味着仅声明他的连接可见/可用,其它连接不可见/不可用。

autoDelete

布尔值,表示当该 queue 没“人”(connection)用时,是否会被自动删除。

不指定 durable、exclusive 和 autoDelete 时,默认为 truefalsefalse 。表示持久化、非排它、不用自动删除。

创建交换机参数说明

参数

说明

name

字符串值,exchange 的名称。

durable

布尔值,表示该 exchage 是否持久化。 持久化意味着当 RabbitMQ 重启后,该 exchange 是否会恢复/仍存在。

autoDelete

布尔值,表示当该 exchange 没“人”(queue)用时,是否会被自动删除。

不指定 durable 和 autoDelete 时,默认为 truefalse 。表示持久化、不用自动删除

4.创建RabbitMQ队列与交换机绑定的配置类com..rabbitmq.config.RabbitMQConfig

  1. package com.woniu.rabbitmq.config;
  2. import org.springframework.amqp.core.*;
  3. import org.springframework.beans.factory.annotation.Qualifier;
  4. import org.springframework.context.annotation.Bean;
  5. import org.springframework.context.annotation.Configuration;
  6. @Configuration
  7. public class RabbitMQConfig {
  8. //队列名称
  9. public static final String ITEM_QUEUE = "item_queue";
  10. //声明队列
  11. @Bean("itemQueue")
  12. public Queue itemQueue(){
  13. return QueueBuilder.durable(ITEM_QUEUE).build();
  14. }
  15. }

5.创建ProducerController类,发送消息到消息队列

  1. package com.woniu.rabbitmq.controller;
  2. import com.woniu.rabbitmq.config.RabbitMQConfig;
  3. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  4. import org.springframework.beans.factory.annotation.Autowired;
  5. import org.springframework.web.bind.annotation.PathVariable;
  6. import org.springframework.web.bind.annotation.RequestMapping;
  7. import org.springframework.web.bind.annotation.RestController;
  8. @RestController
  9. public class ProducerController {
  10. @Autowired
  11. private RabbitTemplate rabbitTemplate;
  12. @RequestMapping("/senMq/{msg}")
  13. public String senMq(@PathVariable String msg){
  14. rabbitTemplate.convertAndSend(RabbitMQConfig.WONIU_QUEUE, msg);
  15. return "OK";
  16. }
  17. }

6.创建ConsumerController类,接收消息到消息队列

  1. package com.woniu.rabbitmq.controller;
  2. import com.woniu.rabbitmq.config.RabbitMQConfig;
  3. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  4. import org.springframework.beans.factory.annotation.Autowired;
  5. import org.springframework.web.bind.annotation.RequestMapping;
  6. import org.springframework.web.bind.annotation.RestController;
  7. @RestController
  8. public class ConsumerController {
  9. @Autowired
  10. private RabbitTemplate rabbitTemplate;
  11. @RequestMapping("/getMq")
  12. public Object getMq(){
  13. return rabbitTemplate.receiveAndConvert(RabbitMQConfig.WONIU_QUEUE);
  14. }
  15. }

或者用消息监听处理类

编写消息监听器com.woniu.rabbitmq.listener.MyListener

  1. package com..rabbitmq.listener;
  2. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  3. import org.springframework.stereotype.Component;
  4. @Component
  5. public class MyListener {
  6. @RabbitListener(queues = RabbitMQConfig.WONIU_QUEUE)
  7. public void myListener1(String message){
  8. System.out.println("消费者接收到的消息为:" + message);
  9. }
  10. }
  11. 方式二
  12. @Component
  13. @RabbitListener(queues = RabbitMQConfig.WONIU_QUEUE)
  14. public class HelloReceiver {
  15. @RabbitHandler
  16. public void process(String hello) {
  17. System.out.println(hello);
  18. }
  19. }

上述的入门案例中中其实使用的是如下的简单模式:

在上图的模型中,有以下概念:

  • P:生产者,也就是要发送消息的程序

  • C:消费者:消息的接受者,会一直等待消息到来。

  • queue:消息队列,图中红色部分。类似一个邮箱,可以缓存消息;生产者向其中投递消息,消费者从其中取出消息。

4. AMQP


4.1. 相关概念介绍

AMQP 一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。AMQP是一个二进制协议,拥有一些现代化特点:多信道、协商式,异步,安全,扩平台,中立,高效。

RabbitMQ是AMQP协议的Erlang的实现。

概念

说明

连接Connection

一个网络连接,比如TCP/IP套接字连接。

会话Session

端点之间的命名对话。在一个会话上下文中,保证“恰好传递一次”。

信道Channel

多路复用连接中的一条独立的双向数据流通道。为会话提供物理传输介质。

客户端Client

AMQP连接或者会话的发起者。AMQP是非对称的,客户端生产和消费消息,服务器存储和路由这些消息。

服务节点Broker

消息中间件的服务节点;一般情况下可以将一个RabbitMQ Broker看作一台RabbitMQ 服务器。

端点

AMQP对话的任意一方。一个AMQP连接包括两个端点(一个是客户端,一个是服务器)。

消费者Consumer

一个从消息队列里请求消息的客户端程序。

生产者Producer

一个向交换机发布消息的客户端应用程序。

4.2. RabbitMQ运转流程

在入门案例中:

  • 生产者发送消息

  1. 生产者创建连接(Connection),开启一个信道(Channel),连接到RabbitMQ Broker;

  1. 声明队列并设置属性;如是否排它,是否持久化,是否自动删除;

  1. 将路由键(空字符串)与队列绑定起来;

  1. 发送消息至RabbitMQ Broker;

  1. 关闭信道;

  1. 关闭连接;

  • 消费者接收消息

  1. 消费者创建连接(Connection),开启一个信道(Channel),连接到RabbitMQ Broker

  1. 向Broker 请求消费相应队列中的消息,设置相应的回调函数;

  1. 等待Broker回应投递队列中的消息,消费者接收消息;

  1. 确认(ack,自动确认)接收到的消息;

  1. RabbitMQ从队列中删除相应已经被确认的消息;

  1. 关闭信道;

  1. 关闭连接;

4.3. 生产者流转过程说明

  1. 客户端与代理服务器Broker建立连接。会调用newConnection() 方法,这个方法会进一步封装Protocol Header 0-9-1 的报文头发送给Broker ,以此通知Broker 本次交互采用的是AMQP0-9-1 协议,紧接着Broker 返回Connection.Start 来建立连接,在连接的过程中涉及Connection.Start/.Start-OK 、Connection.Tune/.Tune-Ok ,Connection.Open/ .Open-Ok 这6 个命令的交互。

  1. 客户端调用connection.createChannel方法。此方法开启信道,其包装的channel.open命令发送给Broker;

  1. channel.basicPublish方法对应的AMQP命令为Basic.Publish,这个命令包含了content Header 和content Body()。content Header 包含了消息体的属性,例如:投递模式,优先级等,content Body 包含了消息体本身。

  1. 客户端发送完消息需要关闭资源时,涉及到Channel.Close和Channel.Close-Ok 与Connetion.Close和Connection.Close-Ok的命令交互。

4.4. 消费者流转过程说明

  1. 消费者客户端与代理服务器Broker建立连接。会调用newConnection() 方法,这个方法会进一步封装Protocol Header 0-9-1 的报文头发送给Broker ,以此通知Broker 本次交互采用的是AMQP0-9-1 协议,紧接着Broker 返回Connection.Start 来建立连接,在连接的过程中涉及Connection.Start/.Start-OK 、Connection.Tune/.Tune-Ok ,Connection.Open/ .Open-Ok 这6 个命令的交互。

  1. 消费者客户端调用connection.createChannel方法。和生产者客户端一样,协议涉及Channel . Open/Open-Ok命令。

  1. 在真正消费之前,消费者客户端需要向Broker 发送Basic.Consume 命令(即调用channel.basicConsume 方法〉将Channel 置为接收模式,之后Broker响应Basic . Consume - Ok 以告诉消费者客户端准备好消费消息。

  1. Broker 向消费者客户端推送(Push) 消息,即Basic.Deliver 命令,这个命令和Basic.Publish 命令一样会携带Content Header 和Content Body。

  1. 消费者接收到消息并正确消费之后,向Broker 发送确认,即Basic.Ack 命令。

  1. 客户端发送完消息需要关闭资源时,涉及到Channel.Close和Channl.Close-Ok 与Connetion.Close和Connection.Close-Ok的命令交互。

5. RabbitMQ工作模式


5.1. Work queues工作队列模式

Work Queues与入门程序的简单模式相比,多了一个或一些消费端,多个消费端共同消费同一个队列中的消息。它们处于竞争者的关系,一条消息只会被一个消费者接收,rabbit采用轮询的方式将消息是平均发送给消费者的;消费者在处理完某条消息后,才会收到下一条消息。
应用场景:对于任务过重或任务较多情况使用工作队列可以提高任务处理的速度。如生产者生产一千条消息,那么c1和c2各消费500条,队列消费消息是均衡分配

1、生产者

  1. package com.woniu.rabbitmq.controller;
  2. import com.woniu.rabbitmq.config.RabbitMQConfig;
  3. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  4. import org.springframework.beans.factory.annotation.Autowired;
  5. import org.springframework.web.bind.annotation.PathVariable;
  6. import org.springframework.web.bind.annotation.RequestMapping;
  7. import org.springframework.web.bind.annotation.RestController;
  8. @RestController
  9. public class ProducerController {
  10. @Autowired
  11. private RabbitTemplate rabbitTemplate;
  12. @RequestMapping("/senMq/{msg}")
  13. public String senMq(@PathVariable String msg){
  14. rabbitTemplate.convertAndSend(RabbitMQConfig.WONIU_QUEUE, msg);
  15. return "OK";
  16. }
  17. }

2、消费者

  1. package com.woniu.rabbitmq.controller;
  2. import com.woniu.rabbitmq.config.RabbitMQConfig;
  3. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  4. import org.springframework.beans.factory.annotation.Autowired;
  5. import org.springframework.web.bind.annotation.RequestMapping;
  6. import org.springframework.web.bind.annotation.RestController;
  7. @RestController
  8. public class ConsumerController {
  9. @Autowired
  10. private RabbitTemplate rabbitTemplate;
  11. @RequestMapping("/getMq")
  12. public Object getMq(){
  13. return rabbitTemplate.receiveAndConvert(RabbitMQConfig.WONIU_QUEUE);
  14. }
  15. }

复制消费方代码,重新编写一个消费端,然后启动两个消费端,进行测试

5.2. 订阅模式类型

订阅模式示例图:

在订阅模型中,多了一个exchange角色,而且过程略有变化:

  • P:生产者,也就是要发送消息的程序,但是不再发送到队列中,而是发给X(交换机)

  • C:消费者,消息的接受者,会一直等待消息到来。

  • Queue:消息队列,接收消息、缓存消息。

  • Exchange:交换机,图中的X。一方面,接收生产者发送的消息。另一方面,知道如何处理消息,例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange的类型。Exchange有常见以下3种类型:

  • Fanout:广播,将消息交给所有绑定到交换机的队列

  • Direct:定向,把消息交给符合指定routing key 的队列

  • Topic:通配符,把消息交给符合routing pattern(路由模式) 的队列

Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失!

5.2.1 广播模式

1.创建RabbitMQ队列与交换机绑定的配置类com..rabbitmq.config.RabbitMQConfig

  1. package com.woniu.springconsumer.config;
  2. import org.springframework.amqp.core.*;
  3. import org.springframework.amqp.rabbit.connection.ConnectionFactory;
  4. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  5. import org.springframework.amqp.support.converter.SerializerMessageConverter;
  6. import org.springframework.context.annotation.Bean;
  7. import org.springframework.context.annotation.Configuration;
  8. import org.springframework.context.annotation.Scope;
  9. import java.util.HashMap;
  10. import java.util.Map;
  11. @Configuration
  12. public class RabbitConfig {
  13. // 广播模式
  14. @Bean
  15. public Queue gb(){
  16. return new Queue("gb");
  17. }
  18. @Bean
  19. public Queue gb01(){
  20. return new Queue("gb01");
  21. }
  22. @Bean
  23. public FanoutExchange fanoutExchange(){
  24. return new FanoutExchange("gbe");
  25. }
  26. }

1、实现生产者

  1. package com.woniu.rabbitmq.controller;
  2. import com.woniu.rabbitmq.config.RabbitMQConfig;
  3. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  4. import org.springframework.beans.factory.annotation.Autowired;
  5. import org.springframework.web.bind.annotation.PathVariable;
  6. import org.springframework.web.bind.annotation.RequestMapping;
  7. import org.springframework.web.bind.annotation.RestController;
  8. @RestController
  9. public class ProducerController {
  10. @Autowired
  11. private RabbitTemplate rabbitTemplate;
  12. @RequestMapping("/sendFM/{msg}")
  13. public String sendFM(@PathVariable String msg){
  14. rabbitTemplate.convertAndSend("gbe", "", msg);
  15. return "消息发送成功";
  16. }
  17. }

创建交换机参数说明:

参数

说明

exchange

字符串值,交换机名称

type

交换机的类型,有三种类型:FANOUT、DIRECT、TOPIC

durable

交换机是否持久化,表示当rabbitmq重启时或者意外宕机,这个交换机还在不在

autoDelete

是否自动删除,表示当该交换机没人发消息时,是否会被自动删除。

internal

内部使用,一般为false

arguments

其它参数

发送消息参数说明

参数

说明

exchange

字符串值,交换机名称

routingKey

如果交换机类型是fanout,则routingKey为""

props

消息基本属性配置

body

要发送的消息的内容

2、消费方实现

GbListener类

  1. package com.woniu.rabbitmq.mq;
  2. import org.springframework.amqp.rabbit.annotation.RabbitHandler;
  3. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  4. import org.springframework.stereotype.Component;
  5. @Component
  6. @RabbitListener(queues = "gb")
  7. public class GbListener {
  8. @RabbitHandler
  9. public void getMsg(String msg){
  10. System.out.println("广播消息:" + msg);
  11. }
  12. }

Gb01Listener 类

  1. package com.woniu.rabbitmq.mq;
  2. import org.springframework.amqp.rabbit.annotation.RabbitHandler;
  3. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  4. import org.springframework.stereotype.Component;
  5. @Component
  6. @RabbitListener(queues = "gb01")
  7. public class Gb01Listener {
  8. @RabbitHandler
  9. public void getMsg(String msg){
  10. System.out.println("广播01消息:" + msg);
  11. }
  12. }

发布订阅模式与工作队列模式的区别

1、工作队列模式不用定义交换机,而发布/订阅模式需要定义交换机。

2、发布/订阅模式的生产方是面向交换机发送消息,工作队列模式的生产方是面向队列发送消息(底层使用默认交换机)。

3、发布/订阅模式需要设置队列和交换机的绑定,工作队列模式不需要设置,实际上工作队列模式会将队列绑 定到默认的交换机 。

5.2.2 Routing路由模式

路由模式特点:

  • 队列与交换机的绑定,不能是任意绑定了,而是要指定一个RoutingKey(路由key)

  • 消息的发送方在 向 Exchange发送消息时,也必须指定消息的 RoutingKey。

  • Exchange不再把消息交给每一个绑定的队列,而是根据消息的Routing Key进行判断,只有队列的Routingkey与消息的 Routing key完全一致,才会接收到消息

在编码上与 Publish/Subscribe发布与订阅模式 的区别是交换机的类型为:Direct,还有队列绑定交换机的时候需要指定routing key。

1、创建RabbitMQ队列与交换机绑定的配置类com..rabbitmq.config.RabbitMQConfig

  1. package com.woniu.rabbitmq.config;
  2. import org.springframework.amqp.core.*;
  3. import org.springframework.context.annotation.Bean;
  4. import org.springframework.context.annotation.Configuration;
  5. @Configuration
  6. public class RabbitMQConfig {
  7. // Routing路由模式
  8. @Bean
  9. public Queue zl01(){
  10. return new Queue("zl01");
  11. }
  12. @Bean
  13. public Queue zl02(){
  14. return new Queue("zl02");
  15. }
  16. @Bean
  17. public DirectExchange directExchange(){
  18. return new DirectExchange("zle");
  19. }
  20. @Bean
  21. public Binding zlBinding01(){
  22. return BindingBuilder.bind(zl01()).to(directExchange()).with("01");
  23. }
  24. @Bean
  25. public Binding zlBinding02(){
  26. return BindingBuilder.bind(zl02()).to(directExchange()).with("02");
  27. }
  28. }

2、生产方实现

  1. package com.woniu.rabbitmq.controller;
  2. import com.woniu.rabbitmq.config.RabbitMQConfig;
  3. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  4. import org.springframework.beans.factory.annotation.Autowired;
  5. import org.springframework.web.bind.annotation.PathVariable;
  6. import org.springframework.web.bind.annotation.RequestMapping;
  7. import org.springframework.web.bind.annotation.RestController;
  8. @RestController
  9. public class ProducerController {
  10. @Autowired
  11. private RabbitTemplate rabbitTemplate;
  12. @RequestMapping("/sendD1M/{msg}")
  13. public String sendD1M(@PathVariable String msg){
  14. rabbitTemplate.convertAndSend("zle", "01", msg);
  15. return "success";
  16. }
  17. @RequestMapping("/sendD2M/{msg}")
  18. public String sendD2M(@PathVariable String msg){
  19. rabbitTemplate.convertAndSend("zle", "02", msg);
  20. return "success";
  21. }
  22. }

3.消费方实现

创建2个消费方并启动,然后使用生产者发送消息;在消费者对应的控制台可以查看到生产者发送对应routing key对应队列的消息;到达按照需要接收的效果

Zl01Listener类

  1. package com.woniu.rabbitmq.mq;
  2. import com.rabbitmq.client.Channel;
  3. import org.springframework.amqp.core.Message;
  4. import org.springframework.amqp.core.MessageProperties;
  5. import org.springframework.amqp.rabbit.annotation.RabbitHandler;
  6. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  7. import org.springframework.stereotype.Component;
  8. import java.io.IOException;
  9. @Component
  10. @RabbitListener(queues = "zl01")
  11. public class Zl01Listener {
  12. @RabbitHandler
  13. public void getMsg(String msg, Message message, Channel channel) throws IOException {
  14. try {
  15. System.out.println("zl01消息:" + msg);
  16. channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
  17. } catch (Exception e) {
  18. channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
  19. }
  20. }
  21. }

Zl02Listener类

  1. package com.woniu.rabbitmq.mq;
  2. import com.rabbitmq.client.Channel;
  3. import org.springframework.amqp.core.Message;
  4. import org.springframework.amqp.core.MessageProperties;
  5. import org.springframework.amqp.rabbit.annotation.RabbitHandler;
  6. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  7. import org.springframework.stereotype.Component;
  8. import java.io.IOException;
  9. @Component
  10. @RabbitListener(queues = "zl02")
  11. public class Zl02Listener {
  12. @RabbitHandler
  13. public void getMsg(String msg, Message message, Channel channel) throws IOException {
  14. try {
  15. System.out.println("zl02消息:" + msg);
  16. channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
  17. } catch (Exception e) {
  18. channel.basicNack(message.getMessageProperties().getDeliveryTag(), false,true);
  19. }
  20. }
  21. }
5.2.3. Topics通配符模式

Topic类型与Direct相比,都是可以根据RoutingKey把消息路由到不同的队列。只不过Topic类型Exchange可以让队列在绑定Routing key 的时候使用通配符!Routingkey 一般都是有一个或多个单词组成,多个单词之间以”.”分割,例如: item.insert

通配符规则:

#:匹配0个或多个词

*:匹配不多不少恰好1个词

举例:

item.#:能够匹配item.insert.abc 或者 item.insert

item.*:只能匹配item.insert

创建RabbitMQ队列与交换机绑定的配置类com..rabbitmq.config.RabbitMQConfig

  1. package com.woniu.rabbitmq.config;
  2. import org.springframework.amqp.core.*;
  3. import org.springframework.context.annotation.Bean;
  4. import org.springframework.context.annotation.Configuration;
  5. @Configuration
  6. public class RabbitMQConfig {
  7. // Topics通配符模式
  8. @Bean
  9. public Queue tt01(){
  10. return new Queue("tt01");
  11. }
  12. @Bean
  13. public Queue tt02(){
  14. return new Queue("tt02");
  15. }
  16. @Bean
  17. public TopicExchange topicExchange(){
  18. return new TopicExchange("tte");
  19. }
  20. @Bean
  21. public Binding ttBinding01(){
  22. return BindingBuilder.bind(tt01()).to(topicExchange()).with("#.error");
  23. }
  24. @Bean
  25. public Binding ttBinding02(){
  26. return BindingBuilder.bind(tt02()).to(topicExchange()).with("order.*");
  27. }
  28. }

1、生产方代码实现

使用topic类型的Exchange

  1. package com.woniu.rabbitmq.controller;
  2. import com.woniu.rabbitmq.config.RabbitMQConfig;
  3. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  4. import org.springframework.beans.factory.annotation.Autowired;
  5. import org.springframework.web.bind.annotation.PathVariable;
  6. import org.springframework.web.bind.annotation.RequestMapping;
  7. import org.springframework.web.bind.annotation.RestController;
  8. @RestController
  9. public class ProducerController {
  10. @RequestMapping("/sendT1MT1/{msg}")
  11. public String sendT1MT1(@PathVariable String msg){
  12. rabbitTemplate.convertAndSend("tte", "11.error", msg);
  13. return "success";
  14. }
  15. @RequestMapping("/sendT1MT2/{msg}")
  16. public String sendT1MT2(@PathVariable String msg){
  17. rabbitTemplate.convertAndSend("tte", "11.22.error", msg);
  18. return "success";
  19. }
  20. @RequestMapping("/sendT1MF/{msg}")
  21. public String sendT1MF(@PathVariable String msg){
  22. rabbitTemplate.convertAndSend("tte", "11.error.22", msg);
  23. return "success";
  24. }
  25. @RequestMapping("/sendT2MF/{msg}")
  26. public String sendT2MF(@PathVariable String msg){
  27. rabbitTemplate.convertAndSend("tte", "order.11.22", msg);
  28. return "success";
  29. }
  30. @RequestMapping("/sendT2MT/{msg}")
  31. public String sendT2MT(@PathVariable String msg){
  32. rabbitTemplate.convertAndSend("tte", "order.1", msg);
  33. return "success";
  34. }
  35. }

2、消费方实现

  1. package com.woniu.rabbitmq.mq;
  2. import com.rabbitmq.client.Channel;
  3. import org.springframework.amqp.core.Message;
  4. import org.springframework.amqp.rabbit.annotation.RabbitHandler;
  5. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  6. import org.springframework.stereotype.Component;
  7. import java.io.IOException;
  8. @Component
  9. @RabbitListener(queues = "tt01")
  10. public class Tt01Listener {
  11. @RabbitHandler
  12. public void getMsg(String msg, Message message, Channel channel) throws IOException {
  13. try {
  14. System.out.println("tt01消息:" + msg);
  15. channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
  16. } catch (Exception e) {
  17. channel.basicNack(message.getMessageProperties().getDeliveryTag(), false,true);
  18. }
  19. }
  20. }
  21. package com.woniu.rabbitmq.mq;
  22. import com.rabbitmq.client.Channel;
  23. import org.springframework.amqp.core.Message;
  24. import org.springframework.amqp.rabbit.annotation.RabbitHandler;
  25. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  26. import org.springframework.stereotype.Component;
  27. import java.io.IOException;
  28. @Component
  29. @RabbitListener(queues = "tt02")
  30. public class Tt02Listener {
  31. @RabbitHandler
  32. public void getMsg(String msg, Message message, Channel channel) throws IOException {
  33. try {
  34. System.out.println("tt02消息:" + msg);
  35. channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
  36. } catch (Exception e) {
  37. channel.basicNack(message.getMessageProperties().getDeliveryTag(), false,true);
  38. }
  39. }
  40. }

创建2个消费方并启动,然后使用生产者发送消息;在消费者对应的控制台可以查看到生产者发送对应routing key对应队列的消息;到达按照需要接收的效果;并且这些routing key可以使用通配符。

RabbitMQ高级


1.消息的可靠投递


在使用 RabbitMQ 的时候,作为消息发送方希望杜绝任何消息丢失或者投递失败场景。RabbitMQ 为我们提供了两种方式用来控制消息的投递可靠性模式。

  • confirm 确认模式

  • return 退回模式

rabbitmq 整个消息投递的路径为:producer--->rabbitmq broker--->exchange--->queue--->consumer

  • 消息从 producer 到 exchange,不管exchange是否收到生产者消息,都会返回一个 confirmCallback 。

  • 消息从 exchange-->queue 投递失败则会返回一个 returnCallback 。

我们将利用这两个 callback 控制消息的可靠性投递

1.1.confirmCallback确认模式

1.引入依赖

  1. <dependency>
  2. <groupId>org.springframework.boot</groupId>
  3. <artifactId>spring-boot-starter-amqp</artifactId>
  4. </dependency>

2.在配置文件中 添加publisher-confirm-type: correlated配置

  1. spring:
  2. rabbitmq:
  3. host: localhost
  4. port: 5672
  5. username: woniu
  6. password: woniu
  7. virtual-host: /woniu
  8. publisher-confirm-type: correlated
  9. publisher-returns: true
在springboot2.2.0.RELEASE版本之前(spring.rabbitmq.publisher-confirm发布确认属性配置)是amqp正式支持的属性,用来配置消息发送到交换器之后是否触发回调方法,在2.2.0及之后该属性过期使用spring.rabbitmq.publisher-confirm-type属性配置代替,用来配置更多的确认类型;
NONE值是禁用发布确认模式,是默认值; CORRELATED值是发布消息成功到交换器后会触发回调方法; SIMPLE值经测试有两种效果,其一效果和CORRELATED值一样会触发回调方法,其二在发布消息成功后使用rabbitTemplate调用waitForConfirms或waitForConfirmsOrDie方法等待broker节点返回发送结果,根据返回结果来判定下一步的逻辑,要注意的点是waitForConfirmsOrDie方法如果返回false则会关闭channel,则接下来无法发送消息到broker。

3.RabbitConfig增加代码

由于spring的Bean默认都是单例的,这个RabbitTemplate也不例外,既然每个RabbitTemplate对象只支持一个回调,那我们就在该Bean放入spring容器把该RabbitTemplate设置为原型的(也就是@Scope=“prototype”),RabbitTemplate 设置成多列模式,每次bean都是新的, 代码如下

  1. @Bean
  2. @Scope("prototype")
  3. public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
  4. RabbitTemplate template = new RabbitTemplate(connectionFactory);
  5. template.setMandatory(true);
  6. template.setMessageConverter(new SerializerMessageConverter());
  7. return template;
  8. }
注意:如果在controller中调用,还需要再controller添加
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)

4、编写ConsumerController类

  1. package com.woniu.springconsumer.controller;
  2. import okhttp3.OkHttpClient;
  3. import okhttp3.Request;
  4. import okhttp3.Response;
  5. import org.apache.commons.httpclient.HttpClient;
  6. import org.apache.commons.httpclient.methods.GetMethod;
  7. import org.springframework.amqp.core.Message;
  8. import org.springframework.amqp.rabbit.connection.CorrelationData;
  9. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  10. import org.springframework.beans.factory.annotation.Autowired;
  11. import org.springframework.beans.factory.config.ConfigurableBeanFactory;
  12. import org.springframework.cloud.client.ServiceInstance;
  13. import org.springframework.cloud.client.discovery.DiscoveryClient;
  14. import org.springframework.context.annotation.Scope;
  15. import org.springframework.web.bind.annotation.PathVariable;
  16. import org.springframework.web.bind.annotation.RequestMapping;
  17. import org.springframework.web.bind.annotation.RestController;
  18. import org.springframework.web.client.RestTemplate;
  19. import java.io.IOException;
  20. import java.util.List;
  21. @RestController
  22. @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
  23. public class ConsumerController {
  24. @Autowired
  25. private RabbitTemplate rabbitTemplate;
  26. @RequestMapping("/sendD2M/{msg}")
  27. public String sendD2M(@PathVariable String msg){
  28. /** 步骤:
  29. * 确认模式: 生产者把消息发送给交换机,交换机收到消息的确认
  30. * 1、确认模式的开启:publisher-confirm-type: correlated
  31. * 2、在rabbitTemplate定义confirmCallback回调函数
  32. */
  33. //定义回调
  34. rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
  35. /**
  36. * @param correlationData 相关配置信息
  37. * @param ack 表示交换机是否成功收到生产者发送的 消息,true成功,false 失败
  38. * @param s 失败原因,如果成功则该参数值为“”
  39. */
  40. @Override
  41. public void confirm(CorrelationData correlationData, boolean ack, String s) {
  42. System.out.println("confirm方法被执行.....");
  43. if(b){
  44. System.out.println("接收成功消息" + s);
  45. }else{
  46. //接收失败
  47. System.out.println("接收失败消息" + s);
  48. //做一些处理,让消息再次发送。
  49. }
  50. }
  51. });
  52. //该交换机zle01不存在,故回调方法的ack为false
  53. rabbitTemplate.convertAndSend("zle", "02", msg);
  54. return "消息发送成功";
  55. }
  56. }

1.2.ReturnCallBack确认模式

在上个例子的基础上,再添加一个测试方法testReturn

  1. package com.woniu.springconsumer.controller;
  2. import okhttp3.OkHttpClient;
  3. import okhttp3.Request;
  4. import okhttp3.Response;
  5. import org.apache.commons.httpclient.HttpClient;
  6. import org.apache.commons.httpclient.methods.GetMethod;
  7. import org.springframework.amqp.core.Message;
  8. import org.springframework.amqp.rabbit.connection.CorrelationData;
  9. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  10. import org.springframework.beans.factory.annotation.Autowired;
  11. import org.springframework.beans.factory.config.ConfigurableBeanFactory;
  12. import org.springframework.cloud.client.ServiceInstance;
  13. import org.springframework.cloud.client.discovery.DiscoveryClient;
  14. import org.springframework.context.annotation.Scope;
  15. import org.springframework.web.bind.annotation.PathVariable;
  16. import org.springframework.web.bind.annotation.RequestMapping;
  17. import org.springframework.web.bind.annotation.RestController;
  18. import org.springframework.web.client.RestTemplate;
  19. import java.io.IOException;
  20. import java.util.List;
  21. @RestController
  22. @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
  23. public class ConsumerController {
  24. @Autowired
  25. private RabbitTemplate rabbitTemplate;
  26. @RequestMapping("/sendD2M/{msg}")
  27. public String sendD2M(@PathVariable String msg){
  28. /**
  29. * 回退模式:当消息发送给Exchange,Exchange路由到Queue失败时,才执行ReturnCallback
  30. * 步骤
  31. * 1、开启回退模式:publisher-returns="true"
  32. * 2、设置ReturnCallBack
  33. * 3、设置Exchange处理消息的模式
  34. * 1、如果消息没有路由到queue,则丢弃消息(这是默认规则)
  35. * 2、如果消息没有路由到queue,返回给消息发送方ReturnCallBack
  36. * */
  37. //设置 ReturnCallBack
  38. rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
  39. /**
  40. *
  41. * @param message 消息对象
  42. * @param replyCode 错误码
  43. * @param replyText 错误信息
  44. * @param exchange 交换机名称
  45. * @param routingKey 路由键
  46. */
  47. @Override
  48. public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
  49. System.out.println("return 执行了....");
  50. System.out.println(message);
  51. System.out.println(replyCode);
  52. System.out.println(replyText);
  53. System.out.println(exchange);
  54. System.out.println(routingKey);
  55. //处理
  56. }
  57. });
  58. rabbitTemplate.convertAndSend("zle", "022", msg);
  59. return "消息发送成功";
  60. }
  61. }
由于路由键不正确 022,故交换机的消息无法发送到消息队列,setReturnCallback()方法,也就是Exchange路由到Queue失败时执行,这个前提是必须设置 rabbitTemplate.setMandatory(true);如果不加这句话,意味着交换机处理消息模式采用默认的模式,模式模式是直接丢掉该消息,不会执行setReturnCallback()方法。 当然如果交换机发送消息到队列,如果成功了也不会执行该方法,因为setReturnCallback是交换机发送消息到队列失败才执行的。

消息的可靠投递小结

设置ConnectionFactory的publisher-confirms="true" 开启 确认模式。使用rabbitTemplate.setConfirmCallback设置回调函数。当消息发送到exchange后回调confirm方法。在方法中判断ack,如果为true,则发送成功,如果为false,则发送失败,需要处理。

设置ConnectionFactory的publisher-returns="true" 开启 退回模式。使用rabbitTemplate.setReturnCallback设置退回函数,当消息从exchange路由到queue失败后,如果设置了rabbitTemplate.setMandatory(true)参数,则会将消息退回给producer。并执行回调函数returnedMessage。

在RabbitMQ中也提供了事务机制,但是性能较差,此处不做讲解。使用channel下列方法,完成事务控制: txSelect(), 用于将当前channel设置成transaction模式 txCommit(),用于提交事务 txRollback(),用于回滚事务

2.Consumer ACK


ack指Acknowledge(翻译为:应答),表示消费端收到消息后的确认方式。有三种确认方式:

  • 自动确认:acknowledge="none"

  • 手动确认:acknowledge="manual"

  • 根据异常情况确认:acknowledge="auto",(这种方式使用麻烦,不作讲解)

其中自动确认是指,当消息一旦被Consumer接收到,则自动确认收到,并将相应 message 从 RabbitMQ 的消息缓存中移除。但是在实际业务处理中,很可能消息接收到,业务处理出现异常,那么该消息就会丢失。如果设置了手动确认方式,则需要在业务处理成功后,调用channel.basicAck(),手动确认,如果出现异常,则调用channel.basicNack()方法,让其自动重新发送消息。

2.1 消费方工程搭建

1.引入相关依赖

  1. <dependency>
  2. <groupId>org.springframework.boot</groupId>
  3. <artifactId>spring-boot-starter-amqp</artifactId>
  4. </dependency>

2.在配置文件中 添加手动确认的配置

  1. spring:
  2. rabbitmq:
  3. listener:
  4. direct:
  5. acknowledge-mode: manual
  6. simple:
  7. acknowledge-mode: manual

3.编写Ack监听器

  1. package com.woniu.listener;
  2. import com.rabbitmq.client.Channel;
  3. import org.springframework.amqp.core.Message;
  4. import org.springframework.amqp.rabbit.connection.ChannelListener;
  5. import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
  6. /**
  7. * Consumer ACK机制:
  8. * 1、设置手动签收,在 listener-container容器中 添加 acknowledge = ”manual“
  9. * 2. @RabbitListener(queues = "zl01")和 @RabbitHandler
  10. * 3、如果消息成功处理,则调用channel的basicAck()签收
  11. * 4、如果消息处理失败,则调用channel的basicNack()拒绝签收,broker会重新发送消息给consumer
  12. */
  13. @Component
  14. @RabbitListener(queues = "zl01")
  15. public class Zl01Listener {
  16. @RabbitHandler
  17. public void getMsg(String msg, Message message, Channel channel) throws IOException {
  18. try {
  19. //1、接受转换的消息
  20. //2、处理业务逻辑
  21. System.out.println("zl01消息:" + msg);
  22. int i = 1/ 0;
  23. //3、手动签收
  24. channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
  25. } catch (Exception e) {
  26. //basicReject:是接收端告诉服务器这个消息我拒绝接收,不处理,可以设置是否放回到队列中还是丢掉,而且只能一次拒绝一个消息, 不能批量拒绝消息,第二个参数为true则不丢掉,为false表示丢掉
  27. channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
  28. }
  29. }
  30. }
basicAck的批量应答问题说明:
channel.basicAck(8,true) 如果前面还有4,6,7的deliveryTag未被确认,则会一起确认,减少网络流量,当然当前deliveryTag=8这条消息也会确认,如果没有前面没有未被确认的消息,则只会确认当前消息,也就是说可以一次性确认某个队列小于等于delivery_tag值的所有消息
basicNack的参数说明:
第一个参数为deliveryTag,也就是每个消息标记index,消息标记值从1开始,依次递增第二个参数为multiple,表示是否批量,如果为true,那么小于或者等于该消息标记的消息(如果还没有签收)都会拒绝签收第三个参数为requeue,表示被拒绝的消息是否重回队列,如果设置为true,则消息重新回到queue,那么broker会重新推送该消息给消费端,如果设置为false,则消息在队列中被删除,即消息会被直接丢失(当然如果为false,还有一种情况就是放到死信队列)

4.测试

  1. @RunWith(SpringJUnit4ClassRunner.class)
  2. @ContextConfiguration(locations = "classpath:spring-rabbitmq-consumer.xml")
  3. public class ConsumerTest {
  4. @Test
  5. public void test(){
  6. while(true){}
  7. }
  8. }

启动之前的生产者发送消息给test_queue_confirm队列,如果抛出异常则该消息一直重发

2.2 消息可靠性总结

持久化

  • exchange要持久化

  • queue要持久化

  • message要持久化

生产方确认Confirm消费方确认AckBroker高可用

3.消费端限流


消费端每次从队列中取一部分消息,然后消费者解决完业务处理,当业务处理完之后,消费者采用手动应答的方式,回应消息队列,然后继续取一部分消息处理,实现削峰填谷的效果

例如:多个生产者同时给MQ发送消息10000条,如果不做消费端限流,那么A系统请求瞬间增多 。限流就是让A系统每次从MQ取1000条,然后做业务处理,当处理完后,手动应答队列,然后队列在发1000条处理,反复10次即可处理完请求。

1.在消费端工程的配置文件配置如下

  1. spring:
  2. rabbitmq:
  3. listener:
  4. direct:
  5. prefetch: 1
rabbit:listener-container 中配置 prefetch属性设置消费端一次拉取多少消息,消费端的确认模式一定为手动确认。acknowledge="manual"

2.编写监听器

  1. package com.woniu.listener;
  2. import com.rabbitmq.client.Channel;
  3. import org.springframework.amqp.core.Message;
  4. import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
  5. import org.springframework.stereotype.Component;
  6. /**
  7. * Consumer 限流机制
  8. * 1、ack机制为手动确认
  9. * 2、listener-container配置属性
  10. * prefetch = 1 表示消费端每次从mq拉去一条消息来消费,直到手动确认消费完毕后,才会继续拉去下一条消息。
  11. */
  12. @Component
  13. public class QosListener implements ChannelAwareMessageListener {
  14. @Override
  15. public void onMessage(Message message, Channel channel) throws Exception {
  16. Thread.sleep(1000);
  17. //1、获取消息
  18. System.out.println(new String(message.getBody()));
  19. //2、处理业务逻辑
  20. //3、签收
  21. channel.basicAck(message.getMessageProperties().getDeliveryTag(),true);
  22. }
  23. }

3.启动消费端工程

  1. @RunWith(SpringJUnit4ClassRunner.class)
  2. @ContextConfiguration(locations = "classpath:spring-rabbitmq-consumer.xml")
  3. public class ConsumerTest {
  4. @Test
  5. public void test(){
  6. while(true){}
  7. }
  8. }

4.在生产者方,添加一个测试方法,给test_queue_confirm队列发送消息

  1. //限流测试
  2. @Test
  3. public void testSendQos(){
  4. for (int i = 0; i <10 ; i++) {
  5. rabbitTemplate.convertAndSend("test_exchange_confirm","confirm","message confirm ...");
  6. }
  7. }
注意:每次从队列中取一条处理,然后手动应答,如果注释掉手动应答(签收),那么消费者在第一次取一条消息后,不会从队列取消息了,因为这个时候队列的状态是Unacked(表示有一条为签收),ready为9,表示队列还有9条消息
如果发送到消费者的消息一直不确认,或者很长时间才确认,只有等到A消费者与rabbitmq的连接中断,rabbitmq才会考虑将A消费者未确认的消息重新投递给另一个消费者,而且每次也不会拉取固定的n条消息。确认多少则拉取多少

4.TTL消息生命周期


TTL 全称 Time To Live(存活时间/过期时间)。当消息到达存活时间后,还没有被消费,会被自动清除。RabbitMQ可以对消息设置过期时间,也可以对整个队列(Queue)设置过期时间。当消息超过过期时间还没有被消费,则丢弃

4.1 图形化设置

1、添加交换机

2、添加队列,设置队列的过期时间

3、交换机和消息队列的绑定

交换机发送消息

4.2 代码实现

代码实现:由于ttl表示消息在队列的存活时间,所以在生产者工程操作

1.配置文件

  1. <!--ttl配置-->
  2. <rabbit:queue id="test_queue_ttl" name="test_queue_ttl">
  3. <!--设置 queue的参数-->
  4. <rabbit:queue-arguments>
  5. <!--队列的消息 存活时间为10秒-->
  6. <entry key="x-message-ttl" value="10000" value-type="java.lang.Integer"></entry>
  7. </rabbit:queue-arguments>
  8. </rabbit:queue>
  9. <rabbit:topic-exchange name="test_exchange_ttl">
  10. <rabbit:bindings>
  11. <rabbit:binding pattern="ttl.#" queue="test_queue_ttl"></rabbit:binding>
  12. </rabbit:bindings>
  13. </rabbit:topic-exchange>

2.测试

情况1:发给test_queue_ttl 队列的消息统一设置过期时间,交换机发给 test_queue_ttl 队列后,10秒后,10条消息消失

  1. @Test
  2. public void testTtl(){
  3. for (int i = 0; i <10 ; i++) {
  4. rabbitTemplate.convertAndSend("test_exchange_ttl","ttl.haha"+i,"message ttl");
  5. }
  6. }

情况2:某条消息单独设置过期时间

  1. @Test
  2. public void testTtl(){
  3. MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {
  4. @Override
  5. public Message postProcessMessage(Message message) throws AmqpException {
  6. //刚才我们在配置文件设置的队列的消息是10秒,这里是5秒,注意:以时间短的为准
  7. message.getMessageProperties().setExpiration("5000"); //消息的过期时间
  8. return message;//消息一定要返回
  9. }
  10. };
  11. rabbitTemplate.convertAndSend("test_exchange_ttl","ttl.haha","message haha",messagePostProcessor);
  12. }

情况3:发送给队列的n条信息中,单独给某个消息设置过期

  1. @Test
  2. public void testTtl(){
  3. MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {
  4. @Override
  5. public Message postProcessMessage(Message message) throws AmqpException {
  6. //刚才我们在配置文件设置的队列的消息是10秒,这里是5秒,注意:以时间短的为准
  7. message.getMessageProperties().setExpiration("5000"); //消息的过期时间
  8. return message;//消息一定要返回
  9. }
  10. };
  11. for (int i = 0; i < 10; i++) {
  12. if(i==5){
  13. rabbitTemplate.convertAndSend("test_exchange_ttl","ttl.haha","message haha",messagePostProcessor); }else{
  14. rabbitTemplate.convertAndSend("test_exchange_ttl","ttl.haha","message haha");
  15. }
  16. }
  17. }
情况3:当i == 5 时,也就是给第五条消息设置过期时间是5秒,其它的还是10秒,发现失效,这里要注意一点,由于这条消息发送给队列的时候不是在队列的头部,故不会单独判断,而是和其它队列一样,10秒钟就消失,可以改成i==0,则第一条消息是5秒过期,或者i<3,即队列的头三条都是5秒的时间。
1、多条消息,要单独设置过期时间,则从第一条消息开始设置
2、配置文件和代码都设置了过期时间,以时间短的为准

5. 死信队列


死信队列,英文缩写:DLX 。Dead Letter Exchange(死信交换机),当消息在队列成为Dead message后,通过该队列把这条死信消息发给另一个交换机,这个交换机就是DLX。

消息成为死信的三种情况(面试常问):

  • 队列消息长度到达限制;

  • 消费者拒接消费消息,basicNack/basicReject,并且不把消息重新放入原目标队列,requeue=false;

  • 原队列存在消息过期设置,消息到达超时时间未被消费;

队列绑定死信交换机:给队列设置参数: x-dead-letter-exchange 和 x-dead-letter-routing-key

5.1 死信队列实现过程

1、配置交换机和队列

  1. <!--1、声明正常队列(test_queue_dlx)和交换机(test_exchange_dlx)-->
  2. <rabbit:queue name="test_queue_dlx" id="test_queue_dlx">
  3. <!--1、正常队列绑定死信交换机-->
  4. <rabbit:queue-arguments>
  5. <!--1.1 x-dead-letter-exchange 死信交换机的名称-->
  6. <entry key="x-dead-letter-exchange" value="exchange_dlx" />
  7. <!--1.2 x-dead-letter-routing-key 正常队列发送消息到死信 交换机的routingKey-->
  8. <!--注意:这个routingKey和死信交换机发送消息到死信队列 匹配一致 dlx.# 能匹配到 dlx.hehe-->
  9. <entry key="x-dead-letter-routing-key" value="dlx.hehe" />
  10. <!--2 消息成为死信的三种情况 -->
  11. <!-- 2.1 设置队列的过期时间 ttl x-message-ttl -->
  12. <entry key="x-message-ttl" value="10000" value-type="java.lang.Integer" />
  13. <!--2.2 设置队列的长度限制 x-max-length-->
  14. <entry key="x-max-length" value-type="java.lang.Integer" value="10" />
  15. <!--2.3 消费者拒接消费消息,并且不重回队列 这种情况后面在消费工程测试-->
  16. </rabbit:queue-arguments>
  17. </rabbit:queue>
  18. <rabbit:topic-exchange name="test_exchange_dlx">
  19. <rabbit:bindings>
  20. <!--正常交换机发给正常队列的routingkey-->
  21. <rabbit:binding pattern="test.dlx.#" queue="test_queue_dlx"></rabbit:binding>
  22. </rabbit:bindings>
  23. </rabbit:topic-exchange>
  24. <!--2、声明死信队列(queue_dlx)和死信交换机(exchange_dlx)-->
  25. <rabbit:queue name="queue_dlx"></rabbit:queue>
  26. <rabbit:topic-exchange name="exchange_dlx">
  27. <rabbit:bindings>
  28. <!--pattern 死信交换机发送给死信队列的 routingkey-->
  29. <rabbit:binding pattern="dlx.#" queue="queue_dlx"></rabbit:binding>
  30. </rabbit:bindings>
  31. </rabbit:topic-exchange>

2、生产者工程测试:

  1. //死信队列测试
  2. @Test
  3. public void testDlx(){
  4. //1、测试过期时间,死信消息
  5. //rabbitTemplate.convertAndSend("test_exchange_dlx","test.dlx.haha","我是一条消息,我会死吗");
  6. //2、测试队列长度限制,消息死信
  7. for (int i = 0; i < 20 ; i++) {
  8. rabbitTemplate.convertAndSend("test_exchange_dlx","test.dlx.haha","我是一条消息,我会死吗");
  9. }
  10. //前两步测试结果:死信队列会有21条记录 1(过期) + 10(限制)+10(正常队列过期后的10条)
  11. }

3、消息成为死信的第三种情况实现

1).打开消费端工程

  1. <!--定义监听器容器-->
  2. <!--acknowledge="manual" 表示手动应答,prefetch = 1 表示每次从队列取1条,当然也可以prefetch = 10
  3. 每次取10条-->
  4. <rabbit:listener-container connection-factory="connectionFactory"
  5. acknowledge="manual" prefetch = "1">
  6. <!--<rabbit:listener ref="ackListener" queue-names="test_queue_confirm" />-->
  7. <!--<rabbit:listener ref="qosListener" queue-names="test_queue_confirm" />-->
  8. <rabbit:listener ref="dlxListener" queue-names="test_queue_dlx" />
  9. </rabbit:listener-container>

2).添加正常队列的监听器

  1. package com.woniu.listener;
  2. import com.rabbitmq.client.Channel;
  3. import org.springframework.amqp.core.Message;
  4. import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
  5. @Component
  6. public class DlxListener implements ChannelAwareMessageListener {
  7. @Override
  8. public void onMessage(Message message, Channel channel) throws Exception {
  9. long deliveryTag = message.getMessageProperties().getDeliveryTag();
  10. try {
  11. System.out.println(new String(message.getBody()));
  12. System.out.println("处理业务逻辑");
  13. int m = 1/0;
  14. channel.basicAck(deliveryTag,true);
  15. }catch(Exception ex){
  16. /** basicNack(long deliveryTag, boolean multiple, boolean requeue)
  17. * multiple是否批量. true:将一次性拒绝所有小于或者等于deliveryTag的消息。
  18. * requeue:被拒绝的消息是否重回队列。如果设置为true,则消息重新回到queue,broker会重新发送该消息给消费端,如果 * 为 requeue=false,不重回队列,则消息发送最终到死信队列
  19. */
  20. channel.basicNack(deliveryTag,true,false);
  21. }
  22. }
  23. }

3).在生产端的testDlx方法再次给正常交换机发送消息

  1. //死信队列测试
  2. @Test
  3. public void testDlx(){
  4. rabbitTemplate.convertAndSend("test_exchange_dlx","test.dlx.haha","我是一条消息,我会死吗?");
  5. }

6. 延迟队列


延迟队列,即消息进入队列后不会立即被消费者调用,只有到达指定时间后,才会被调用者调用消费。

如下需求:

1) 下单后,30分钟未支付,取消订单,回滚库存。

当用户提交订单后,数据库保存订单信息,同时库存表相应的库存减少,然后消息队列保存订单的信息(如订单Id),此时库存系统监听队列,队列不会把消息立刻发送给库存,而是过30分钟再把信息发送给库存系统,库存系统去查询订单数据库,根据订单id查询,如果该订单还没有支付,则取消订单,回滚库存,如果支付过了,则库存表什么都不用做。也就是给用户30分钟的机会,一个订单在30分钟后还没有支付,则该订单的库存信息直接回滚。

2) 新用户注册成功7天后,发送短信问候。

实现方式:

  1. 定时器:我们可以写一段代码,在某个时间段查询订单表的支付情况。把提交订单的时间查出来和当前系统时间比较,30分钟之类如果订单状态为支付,则取消该订单,大家思考一下有什么问题?

  1. 延迟队列:很可惜,在RabbitMQ中并未提供延迟队列功能。但是可以使用:TTL+死信队列 组合实现延迟队列的效果。

6.1 延迟队列实现过程

1、延迟队列配置文件创建

  1. <!--延迟队列
  2. 1、定义正常交换机(order_exchange)和队列(order_queue)
  3. 2、定义死信交换机(order_exchange_dlx)和队列(order_queue_dlx)
  4. 3、绑定,设置正常队列过期时间为30分钟
  5. -->
  6. <!--1、定义正常交换机(order_exchange)和队列(order_queue)-->
  7. <rabbit:queue name="order_queue" id="order_queue">
  8. <rabbit:queue-arguments>
  9. <!--3、绑定,设置正常队列过期时间为30分钟。这里我们时间设置10秒 为了测试-->
  10. <entry key="x-dead-letter-exchange" value="order_exchange_dlx" />
  11. <entry key="x-dead-letter-routing-key" value="dlx.order.cancel" />
  12. <entry key="x-message-ttl" value="10000" value-type="java.lang.Integer" />
  13. </rabbit:queue-arguments>
  14. </rabbit:queue>
  15. <rabbit:topic-exchange name="order_exchange">
  16. <rabbit:bindings>
  17. <rabbit:binding pattern="order.#" queue="order_queue"></rabbit:binding>
  18. </rabbit:bindings>
  19. </rabbit:topic-exchange>
  20. <!--2、定义死信交换机(order_exchange_dlx)和队列(order_queue_dlx)-->
  21. <rabbit:queue id="order_queue_dlx" name="order_queue_dlx"></rabbit:queue>
  22. <rabbit:topic-exchange name="order_exchange_dlx">
  23. <rabbit:bindings>
  24. <rabbit:binding pattern="dlx.order.#" queue="order_queue_dlx"></rabbit:binding>
  25. </rabbit:bindings>
  26. </rabbit:topic-exchange>

2、消费者监听死信队列配置

  1. <!--定义监听器容器-->
  2. <!--acknowledge="manual" 表示手动应答,prefetch = 1 表示每次从队列取1条, -->
  3. <rabbit:listener-container connection-factory="connectionFactory" acknowledge="manual" prefetch = "1">
  4. <!--<rabbit:listener ref="ackListener" queue-names="test_queue_confirm" />-->
  5. <!--<rabbit:listener ref="qosListener" queue-names="test_queue_confirm" />-->
  6. <!-- <rabbit:listener ref="dlxListener" queue-names="test_queue_dlx" />-->
  7. <!--注意这个地方 是监听死信队列的 -->
  8. <rabbit:listener ref="orderListener" queue-names="order_queue_dlx" />
  9. </rabbit:listener-container>

3、消费方监听类

  1. package com.woniu.listener;
  2. import com.rabbitmq.client.Channel;
  3. import org.springframework.amqp.core.Message;
  4. import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
  5. @Component
  6. public class OrderListener implements ChannelAwareMessageListener {
  7. @Override
  8. public void onMessage(Message message, Channel channel) throws Exception {
  9. long deliveryTag = message.getMessageProperties().getDeliveryTag();
  10. try {
  11. //1.接收转换消息
  12. System.out.println(new String(message.getBody()));
  13. //2. 处理业务逻辑
  14. System.out.println("处理业务逻辑...");
  15. System.out.println("根据订单id查询其状态...");
  16. System.out.println("判断状态是否为支付成功");
  17. System.out.println("取消订单,回滚库存....");
  18. //3. 手动签收
  19. channel.basicAck(deliveryTag,true);
  20. } catch (Exception e) {
  21. System.out.println("出现异常,拒绝接受");
  22. //4.拒绝签收,不重回队列 requeue=false
  23. channel.basicNack(deliveryTag,true,false);
  24. }
  25. }
  26. }

4、生产者测试类

  1. //延迟队列测试
  2. @Test
  3. public void testDelay() throws InterruptedException {
  4. //1.发送订单消息。 将来是在订单系统中,下单成功后,发送消息
  5. rabbitTemplate.convertAndSend("order_exchange","order",
  6. "订单信息:id=1,time=2021年1月1日16:41:47");
  7. //2.打印倒计时10秒 10秒后 消息发送到死信队列,而监听器OrderListener是监听死信队列的
  8. for (int i = 10; i > 0 ; i--) {
  9. System.out.println(i+"...");
  10. Thread.sleep(1000);
  11. }
  12. }

5、启动消费者测试程序

7. 日志与监控


7.1 RabbitMQ日志

RabbitMQ默认日志存放路径: /var/log/rabbitmq/rabbit@xxx.log

日志包含了RabbitMQ的版本号、Erlang的版本号、RabbitMQ服务节点名称、cookie的hash值、RabbitMQ配置文件地址、内存限制、磁盘限制、默认账户guest的创建以及权限配置等等。

7.2 rabbitmq常用命令

1、查看队列

  1. rabbitmqctl list_queues #查看所有虚拟主机里面的队列
  2. rabbitmqctl list_queues -p /vhost #查看某个虚拟主机里面的队列

2、删除所有队列

  1. rabbitmqctl stop_app #关闭应用
  2. rabbitmqctl reset #清除队列中的消息
  3. rabbitmqctl start_app # 再次启动此应用
注意:此方式,会同时删除一些配置信息,需要慎用

3、查看rabbitmq中的交换机

rabbitmqctl list_exchanges [-p  vhost]

4、rabbitmq的用户操作命令

  1. rabbitmqctl list_users
  2. rabbitmqctl add_user 用户名 密码
  3. rabbitmqctl delete_user 用户名

5、查看未被确认的队列

rabbitmqctl list_queues  name messages_unacknowledged

6、查看队列环境变量

rabbitmqctl environment

7、查看队列消费者信息

rabbitmqctl list_consumers

8、查看队列连接

rabbitmqctl list_connections

9、查看准备就绪的队列

rabbitmqctl list_queues name messages_ready

10、查看单个队列的内存使用

rabbitmqctl list_queues name memory

11、列出所有虚拟主机

  1. rabbitmqctl list_vhosts
  2. rabbitmqctl status | grep rabbit ##查看rabbitmq的版本

8 消息追踪


在使用任何消息中间件的过程中,难免会出现某条消息异常丢失的情况。

对于RabbitMQ而言,可能是因为生产者或消费者与RabbitMQ断开了连接,而它们与RabbitMQ又采用了不同的确认机制;也有可能是因为交换器与队列之间不同的转发策略;甚至是交换器并没有与任何队列进行绑定,生产者又不感知或者没有采取相应的措施;另外RabbitMQ本身的集群策略也可能导致消息的丢失。这个时候就需要有一个较好的机制跟踪记录消息的投递过程,以此协助开发和运维人员进行问题的定位。

在RabbitMQ中可以使用Firehose和rabbitmq_tracing插件功能来实现消息追踪。

8.1 消息追踪-Firehose(了解)

firehose的机制是将生产者投递给队列的消息,以及队列投递给消费者的消息按照指定的格式发送到默认的exchange上。这个默认的exchange的名称为amq.rabbitmq.trace,它是一个topic类型的exchange。发送到这个exchange上的消息的routing key为 publish.exchangename 和 deliver.queuename。其中exchangename和queuename为实际交换机和队列的名称,分别对应生产者投递到exchange的消息,和消费者从queue上获取的消息。

1、打开trace 功能

rabbitmqctl trace_on [-p vhost]     ##开启Firehose命令
打开 trace 会影响消息写入功能,适当打开后请关闭,关闭Firehose命令:rabbitmqctl trace_off [-p vhost],打开后会多一个交换机,如下图

2、新建一个消息队列,并给该交换机绑定一个消息队列

3、打开任何一个其他的队列,并往队列发送一条消息,则这个test_trace队列也会有其他队列的消息

8.2 消息追踪-rabbitmq_tracing

rabbitmq_tracing和Firehose在实现上如出一辙,只不过rabbitmq_tracing的方式比Firehose多了一层GUI的包装,更容易使用和管理。

1、启用插件:

  1. [root@localhost ~]# rabbitmq-plugins list ###查询插件
  2. [root@localhost ~]# rabbitmq-plugins enable rabbitmq_tracing

1、新建一个trace,将来所有的消息都被trace保存起来,文件的默认路径为/var/tmp/rabbitmq-tracing

不管在哪个队列发送消息,都会保存到日志文件mytrace.log中

如果是用其它的用户创建这个消息日志。则需要在/etc/rabbitmq/rabbit.config配置文件添加如下内容:创建的用户名和密码

  1. {rabbitmq_tracing,
  2. [
  3. {directory, "/var/log/rabbitmq/rabbitmq_tracing"},
  4. {username, "woniu"},
  5. {password, "woniu"}
  6. ]
  7. }

重启消息队列服务器即可

9.RabbitMQ应用问题


消息可靠性保障、消息幂等性处理 、微服务中用消息队列实现微服务的异步调用,而用openfeign采用的同步

9.1 消息可靠性保障-消息补偿

  • 消息补偿机制

需求:100%确保消息发送成功

9.2 消息幂等性保障-乐观锁(了解)

幂等性指一次和多次请求某一个资源,对于资源本身应该具有同样的结果。也就是说,其任意多次执行对资源本身所产生的影响均与一次执行的影响相同。MQ中指,消费多条相同的消息,得到与消费该消息一次相同的结果。

  • 乐观锁解决方案

第一次生产者发送一条消息,但是消费方系统宕机,即不能立即消费,于是回调检查服务监听不到Q2的响应消息,也不会写入数据库MDB,当隔一段时间后,生产者又发送一条延迟消息到Q3队列,回调检查服务能监听到Q3队列消息,于是和MDB去比较是否有,由于消费方的失败,消息最终没有入库MDB,这个时候回调检查服务和MDB数据库比对失败,于是通知生产者,重新发送一条消息给消费者,那么这个时候Q1就有2条消息了,当消费方正常运行的时候,由于监听的Q1是两条2消息,怎么办呢?乐观锁

第一次执行:version=1update account set money = money - 500 , version = version + 1 where id = 1 and version = 1

第二次执行:version=2update account set money = money - 500 , version = version + 1 where id = 1 and version = 1

9.3 消息积压问题

实际场景可能有这样现象:大量消息在rabbitmq里积压了几个小时了还没消息,怎么办?

这种时候只好采用 “丢弃+批量重导” 的方式来解决了,临时写个程序,连接到mq里面消费数据,收到消息之后直接将其丢弃,快速消费掉积压的消息,降低MQ的压力。或者多启几个消费端。

声明:本文内容由网友自发贡献,转载请注明出处:【wpsshop博客】
推荐阅读
相关标签
  

闽ICP备14008679号