当前位置:   article > 正文

高性能消息中间件 RabbitMQ_mq消息性能

mq消息性能

一、RabbitMQ概念

1.1 MQ是什么

消息队列

MQ全称Message Queue(消息队列),是在消息的传输过程中保存消息的容器。多用于系统之间的异步通信

  • 同步通信相当于两个人当面对话,你一言我一语。必须及时回复:

  • 异步通信相当于通过第三方转述对话,可能有消息的延迟,但不需要二人时刻保持联系。

消息

两台计算机间传送的数据单位。消息可以非常简单,例如只包含文本字符串;也可以更复杂,可能包含嵌入对象。

队列

数据结构中概念。在队列中,数据先进先出,后进后出。

 1.2  MQ的优势

应用解耦

在电商平台中,用户下订单需要调用订单系统,此时订单系统还需要调用库存系统、支付系统、物流系统完成业务。此时会产生两个问题:

  1. 如果库存系统出现故障,会造成整个订单系统崩溃。
  2. 如果需求修改,新增了一个X系统,此时必须修改订单系统的代码。

如果在系统中引入MQ,即订单系统将消息先发送到MQ中,MQ再转发到其他系统,则会解决以下问题:

  1. 由于订单系统只发消息给MQ,不直接对接其他系统,如果库存系统出现故障,不影响整个订单。
  2. 如果需求修改,新增了一个X系统,此时无需修改订单系统的代码,只需修改MQ将消息发送给X系统即可。

异步提速

如果订单系统同步访问每个系统,则用户下单等待时长如下:

削峰填谷

假设我们的系统每秒只能承载1000请求,如果请求瞬间增多到每秒5000,则会造成系统崩溃。此时引入mq即可解决该问题

 使用了MQ之后,限制消费消息的速度为1000,这样一来,高峰期产生的数据势必会被积压在MQ中,高峰就被“削”掉了,但是因为消息积压,在高峰期过后的一段时间内,消费消息的速度还是会维持在1000,直到消费完积压的消息,这就叫做“填谷”。

1.3 MQ的劣势 

  • 系统可用性降低 系统引入的外部依赖越多,系统稳定性越差。一旦MQ宕机,就会对业务造成影响。
  • 系统复杂度提高 MQ的加入大大增加了系统的复杂度,以前系统间是同步的远程调用,现在是通过MQ进行异步调用。
  • 一致性问题 A系统处理完业务,通过MQ给B、C、D三个系统发消息数据,如果B系统、C系统处理成功,D系统处理失败,则会造成数据处理的不一致。

 1.4 MQ的应用场景

  • 抢红包、秒杀活动、抢火车票等

这些业务场景都是短时间内需要处理大量请求,如果直接连接系统处理业务,会耗费大量资源,有可能造成系统瘫痪。

 而使用MQ后,可以先让用户将请求发送到MQ中,MQ会先保存请求消息,不会占用系统资源,且MQ会进行消息排序,先请求的秒杀成功,后请求的秒杀失败。

  • 消息分发

如电商网站要推送促销信息,该业务耗费时间较多,但对时效性要求不高,可以使用MQ做消息分发。

  • 数据同步

假如我们需要将数据保存到数据库之外,还需要一段时间将数据同步到缓存(如Redis)、搜索引擎(如Elasticsearch)中。此时可以将数据库的数据作为消息发送到MQ中,并同步到缓存、搜索引擎中。

  • 异步处理

在电商系统中,订单完成后,需要及时的通知子系统(进销存系统发货,用户服务积分,发送短信)进行下一步操作。为了保证订单系统的高性能,应该直接返回订单结果,之后让MQ通知子系统做其他非实时的业务操作。这样能保证核心业务的高效及时。

  • 离线处理

在银行系统中,如果要查询近十年的历史账单,这是非常耗时的操作。如果发送同步请求,则会花费大量时间等待响应。此时使用MQ发送异步请求,等到查询出结果后获取结果即可。

 1.5 AMQP协议

 RabbitMQ是由Erlang语言编写的基于AMQP的MQ产品。

AMQP

即Advanced Message Queuing Protocol(高级消息队列协议),是一个网络协议,专门为消息中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受不同中间件产品,不同开发语言等条件的限制。2006年AMQP规范发布,类比HTTP。

AMQP工作过程

生产者(Publisher)将消息发布到交换机(Exchange),交换机根据规则将消息分发给交换机绑定的队列(Queue),队列再将消息投递给订阅了此队列的消费者。

 就好比是报纸印刷厂(Publisher)将印刷出来的报纸交给报社(Exchange),报社再将报纸交给不同的邮递员(Queue),邮递员再将报纸交给用户(Consumer)。

1.6 RabbitMQ工作原理

  • Producer

    消息的生产者。也是一个向交换机发布消息的客户端应用程序。也就是Java代码。

  • Connection

    连接。生产者/消费者和RabbitMQ服务器之间建立的TCP连接。

  • Channel

    信道。是TCP里面的虚拟连接。例如:Connection相当于电缆,Channel相当于独立光纤束,一条TCP连接中可以创建多条信道,增加连接效率。无论是发布消息、接收消息、订阅队列都是通过信道完成的。

  • Broker

    消息队列服务器实体。即RabbitMQ服务器

  • Virtual host

    虚拟主机。出于多租户和安全因素设计的,把AMQP的基本组件划分到一个虚拟的分组中。每个vhost本质上就是一个mini版的RabbitMQ服务器,拥有自己的队列、交换机、绑定和权限机制。当多个不同的用户使用同一个RabbitMQ服务器时,可以划分出多个虚拟主机。RabbitMQ默认的虚拟主机路径是/

  • Exchange

    交换机。用来接收生产者发送的消息,并根据分发规则,将这些消息分发给服务器中的队列中。不同的交换机有不同的分发规则。

  • Queue

    消息队列。用来保存消息直到发送给消费者。它是消息的容器,也是消息的终点。消息一直在队列里面,等待消费者链接到这个队列将其取走。

  • Binding

    消息队列和交换机之间的虚拟连接,绑定中包含路由规则,绑定信息保存到交换机的路由表中,作为消息的分发依据。

  • Consumer

    消息的消费者。表示一个从消息队列中取得消息的客户端应用程序。 

(面试)RabbitMQ为什么使用信道而不直接使用TCP连接通信?

TCP连接的创建和销毁开销特别大。创建需要3次握手,销毁需要4次分手。高峰时每秒成千上万条TCP连接的创建会造成资源巨大的浪费。而且操作系统每秒处理TCP连接数也是有限制的,会造成性能瓶颈。而如果一条线程使用一条信道,一条TCP链接可以容纳无限的信道,即使每秒成千上万的请求也不会成为性能的瓶颈。

 二、RabbitMQ安装

 2.1 安装Erlang

RabbitMQ是使用Erlang语言编写的,所以在安装RabbitMQ前需要先安装Erlang环境 .

1、安装Erlang所需的依赖

yum install -y epel-release

2、添加存储库条目

  1. wget https://packages.erlang-solutions.com/erlang-solutions-1.0-1.noarch.rpm
  2. rpm -Uvh erlang-solutions-1.0-1.noarch.rpm

3、安装Erlang

yum install -y erlang24.2.1

4、查看Erlang是否安装成功

erl -version

2.2 安装RabbitMQ并启动

1、为了外部能够正常访问RabbitMQ服务,先关闭防火墙

  1. # 关闭运行的防火墙
  2. systemctl stop firewalld.service
  3. # 禁止防火墙自启动
  4. systemctl disable firewalld.service

2、RabbitMQ是通过主机名进行访问的,必须给服务器添加主机名

  1. # 修改文件
  2. vim /etc/sysconfig/network
  3. # 添加如下内容
  4. NETWORKING=yes
  5. HOSTNAME=zj
  6. # 修改文件
  7. vim /etc/hosts
  8. # 添加如下内容
  9. 服务器ip zj

3.使用xftp上传RabbitMQ压缩文件到根目录下

4.安装RabbitMQ

  1. # 解压RabbitMQ
  2. tar xf rabbitmq-server-generic-unix-3.9.13.tar.xz
  3. # 重命名:
  4. mv rabbitmq_server-3.9.13 rabbitmq
  5. # 移动文件夹:
  6. mv rabbitmq /usr/local/

5.配置环境变量

  1. # 编辑/etc/profile文件
  2. vim /etc/profile
  3. #添加如下内容
  4. export PATH=$PATH:/usr/local/rabbitmq/sbin
  5. # 运行文件,让修改内容生效
  6. source /etc/profile

6.开启管控台插件

rabbitmq-plugins enable rabbitmq_management

7.后台运行

  1. #启动rabbitmq
  2. rabbitmq-server -detached
  3. #停止rabbitmq
  4. rabbitmqctl stop

8.通过管控台访问RabbitMQ

路径:http://ip地址:15672,用户名:guest,密码:guest

管控台的端口是15672,MQ的端口是5672

 9.此时会提示guest账户只允许本地使用,我们可以配置允许使用guest远程访问

  1. # 创建配置文件夹
  2. mkdir -p /usr/local/rabbitmq/etc/rabbitmq
  3. # 创建配置文件
  4. vim /usr/local/rabbitmq/etc/rabbitmq/rabbitmq.conf
  5. # 添加如下内容
  6. loopback_users=none
  7. # 重启RabbitMQ
  8. rabbitmqctl stop_app
  9. rabbitmqctl reset
  10. rabbitmqctl start_app

此时就能进入控制台了。

 2.3 账户管理

guest账户默认只允许本地使用,我们可以创建新账户远程访问RabbitMQ(如2.2中),但是不推荐远程使用MQ.

1、创建账户

  1. # 创建账户
  2. rabbitmqctl add_user MQzhang(用户名) MQzhang(密码)

2、给用户授予管理员角色

rabbitmqctl set_user_tags 用户名 administrator

3、给用户授权

  1. # "/"表示虚拟机
  2. # zj表示用户名
  3. # ".*" ".*" ".*" 表示完整权限
  4. rabbitmqctl set_permissions -p "/" MQzhang".*" ".*" ".*"

4、通过管控台访问rabbitmq即可。

2.4 管控台

2.5 Docker安装

 1、关闭RabbitMQ服务

rabbitmqctl stop

2、在Centos7中安装docker

  1. # 安装Docker
  2. curl -fsSL https://get.docker.com | bash -s docker --mirror Aliyun
  3. # 启动docker
  4. systemctl start docker

3、拉取镜像

docker pull rabbitmq

4、启动MQ

docker run -d --hostname zj--name rabbit -p 15672:15672 -p 5672:5672 rabbitmq

三、RabbitMQ工作模式

RabbitMQ共有六种工作模式:简单模式(Simple)、工作队列模式(Work Queue)、发布订阅模式(Publish/Subscribe)、路由模式(Routing)、通配符模式(Topics)、远程调用模式(RPC,不常用)

3.1 简单模式

简介

特点

  1. 一个生产者对应一个消费者,通过队列进行消息传递。
  2. 该模式使用direct交换机,direct交换机是RabbitMQ默认交换机。

 项目搭建

接下来我们使用JAVA代码操作RabbitMQ,让其按照简单模式进行工作。 

JMS

由于MQ产品很多,操作方式各有不同,于是JAVA提供了一套规则——JMS,用于操作消息中间件。JMS即Java消息服务(JavaMessage Service)应用程序接口,是一个Java平台中关于面向消息中间件的API。JMS是JavaEE规范中的一种,类比JDBC。很多MQ产品都实现了JMS规范,例如ActiveMQ。RabbitMQ官方并没有实现JMS规范,但是开源社区有JMS的实现包。

1、启动RabbitMQ

  1. # 开启管控台插件
  2. rabbitmq-plugins enable rabbitmq_management
  3. # 启动rabbitmq
  4. rabbitmq-server -detached

2、创建普通maven项目,添加RabbitMQ依赖:

  1. <dependencies>
  2. <dependency>
  3. <groupId>com.rabbitmq</groupId>
  4. <artifactId>amqp-client</artifactId>
  5. <version>5.14.0</version>
  6. </dependency>
  7. </dependencies>

3、编写生产者

  1. package com.zj.mq.Simple;
  2. import com.rabbitmq.client.Channel;
  3. import com.rabbitmq.client.Connection;
  4. import com.rabbitmq.client.ConnectionFactory;
  5. import java.io.IOException;
  6. import java.util.concurrent.TimeoutException;
  7. /*生产者*/
  8. public class Producer {
  9. public static void main(String[] args) throws IOException, TimeoutException {
  10. //1.创建连接工厂
  11. ConnectionFactory connectionFactory = new ConnectionFactory();
  12. connectionFactory.setHost("192.168.66.100");
  13. connectionFactory.setPort(5672);
  14. connectionFactory.setUsername("MQzhang");
  15. connectionFactory.setPassword("MQzhang");
  16. connectionFactory.setVirtualHost("/");
  17. //2.创建连接
  18. Connection connection = connectionFactory.newConnection();
  19. //3.建立信道
  20. Channel channel = connection.createChannel();
  21. //4.创建队列(如果队列已经存在的话则使用该队列,也就是说队列只会创建一次)和交换机(简单模式下使用的是默认交换机direct)
  22. /* 参数1:队列名
  23. * 参数2:是否持久化,true表示MQ重启后队列还在。
  24. * 参数3:是否私有化,false表示所有消费者都可以访问,true表示只有第一次拥有它的消费者才能访问
  25. * 参数4:是否自动删除,true表示不再使用队列时自动删除队列
  26. * 参数5:其他额外参数*/
  27. channel.queueDeclare("simpleQueue",false,false,false,null);
  28. //5.发送消息
  29. String msg ="hello rabbitMQ";
  30. /*
  31. * 参数1:交换机名,""表示默认交换机
  32. * 参数2:路由键,简单模式就是队列名
  33. * 参数3:其他额外参数
  34. * 参数4:要传递的消息字节数组
  35. */
  36. channel.basicPublish("","simpleQueue",null,msg.getBytes());
  37. //6.关闭资源(信道和连接)
  38. channel.close();
  39. connection.close();
  40. System.out.println("OK");
  41. }
  42. }

 4.编写消费者

  1. package com.zj.mq.Simple;
  2. import com.rabbitmq.client.*;
  3. import java.io.IOException;
  4. import java.util.concurrent.TimeoutException;
  5. /*消费者*/
  6. public class Consumer {
  7. public static void main(String[] args) throws IOException, TimeoutException {
  8. //1.创建连接工厂
  9. ConnectionFactory connectionFactory = new ConnectionFactory();
  10. connectionFactory.setHost("192.168.66.100");
  11. connectionFactory.setPort(5672);
  12. connectionFactory.setUsername("MQzhang");
  13. connectionFactory.setPassword("MQzhang");
  14. connectionFactory.setVirtualHost("/");
  15. //2.创建连接
  16. Connection connection = connectionFactory.newConnection();
  17. //3.创建信道
  18. Channel channel = connection.createChannel();
  19. //4.监听队列(一直在连接不会关闭连接)
  20. /*
  21. * 参数一:监听的队列名
  22. * 参数二:是否自动签收(消费完消息后自动告诉MQ消息消费完了),如果设置为false需要手动确认消息,否则MQ会一直发送消息。
  23. * 参数三:Consumer的实现类,重写该类方法表示接受到消息后如何消费,body就是消息的字节数组。
  24. * */
  25. channel.basicConsume("simpleQueue",true,new DefaultConsumer(channel){
  26. @Override
  27. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
  28. String message = new String(body, "UTF-8");
  29. System.out.println("接受消息为:"+message);
  30. }
  31. });
  32. }
  33. }

 消费者随时在监听队列只要队列有消息就会被消费。

3.2 工作队列模式

与简单模式相比,工作队列模式(Work Queue)多了一些消费者,该模式也使用direct交换机,应用于处理消息较多的情况。特点如下:

  1. 一个队列对应多个消费者。
  2. 一条消息只会被一个消费者消费。
  3. 消息队列默认采用轮询的方式将消息平均发送给消费者。

1、编写生产者,并产生大量消息

  1. package com.zj.mq.work;
  2. import com.rabbitmq.client.Channel;
  3. import com.rabbitmq.client.Connection;
  4. import com.rabbitmq.client.ConnectionFactory;
  5. import com.rabbitmq.client.MessageProperties;
  6. import java.io.IOException;
  7. import java.util.concurrent.TimeoutException;
  8. /*生产者*/
  9. public class Producer {
  10. public static void main(String[] args) throws IOException, TimeoutException {
  11. //1.创建连接工厂
  12. ConnectionFactory connectionFactory = new ConnectionFactory();
  13. connectionFactory.setHost("192.168.66.100");
  14. connectionFactory.setPort(5672);
  15. connectionFactory.setUsername("MQzhang");
  16. connectionFactory.setPassword("MQzhang");
  17. connectionFactory.setVirtualHost("/");
  18. //2.创建连接
  19. Connection connection = connectionFactory.newConnection();
  20. //3.建立信道
  21. Channel channel = connection.createChannel();
  22. //4.创建队列(如果队列已经存在的话则使用该队列,也就是说队列只会创建一次)和交换机(简单模式下使用的是默认交换机direct)
  23. /* 参数1:队列名
  24. * 参数2:是否持久化,true表示MQ重启后队列还在。
  25. * 参数3:是否私有化,false表示所有消费者都可以访问,true表示只有第一次拥有它的消费者才能访问
  26. * 参数4:是否自动删除,true表示不再使用队列时自动删除队列
  27. * 参数5:其他额外参数*/
  28. channel.queueDeclare("WorkQueue",false,false,false,null);
  29. //5.发送大量消息
  30. for (int i = 0; i < 100; i++) {
  31. /*
  32. * 参数1:交换机名,""表示默认交换机
  33. * 参数2:路由键,简单模式就是队列名
  34. * 参数3:表示该消息是持久化消息,即保存到内存也会保存到磁盘
  35. * 参数4:要传递的消息字节数组
  36. */
  37. channel.basicPublish("","WorkQueue", MessageProperties.PERSISTENT_TEXT_PLAIN,("这是第"+i+"个消息").getBytes());
  38. }
  39. //6.关闭资源(信道和连接)
  40. channel.close();
  41. connection.close();
  42. }
  43. }

 2.编写消费者

 编写三个消费者,他们都监听的是一个队列。

  1. package com.zj.mq.work;
  2. import com.rabbitmq.client.*;
  3. import java.io.IOException;
  4. import java.util.concurrent.TimeoutException;
  5. /*消费者*/
  6. public class Consumer1 {
  7. public static void main(String[] args) throws IOException, TimeoutException {
  8. //1.创建连接工厂
  9. ConnectionFactory connectionFactory = new ConnectionFactory();
  10. connectionFactory.setHost("192.168.66.100");
  11. connectionFactory.setPort(5672);
  12. connectionFactory.setUsername("MQzhang");
  13. connectionFactory.setPassword("MQzhang");
  14. connectionFactory.setVirtualHost("/");
  15. //2.创建连接
  16. Connection connection = connectionFactory.newConnection();
  17. //3.创建信道
  18. Channel channel = connection.createChannel();
  19. //4.监听队列(一直在连接不会关闭连接)
  20. /*
  21. * 参数一:监听的队列名
  22. * 参数二:是否自动签收(消费完消息后自动告诉MQ消息消费完了),如果设置为false需要手动确认消息,否则MQ会一直发送消息。
  23. * 参数三:Consumer的实现类,重写该类方法表示接受到消息后如何消费,body就是消息的字节数组。
  24. * */
  25. channel.basicConsume("WorkQueue",true,new DefaultConsumer(channel){
  26. @Override
  27. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
  28. String message = new String(body, "UTF-8");
  29. System.out.println("消费者1接受消息为:"+message);
  30. }
  31. });
  32. }
  33. }
  1. package com.zj.mq.work;
  2. import com.rabbitmq.client.*;
  3. import java.io.IOException;
  4. import java.util.concurrent.TimeoutException;
  5. /*消费者*/
  6. public class Consumer2 {
  7. public static void main(String[] args) throws IOException, TimeoutException {
  8. //1.创建连接工厂
  9. ConnectionFactory connectionFactory = new ConnectionFactory();
  10. connectionFactory.setHost("192.168.66.100");
  11. connectionFactory.setPort(5672);
  12. connectionFactory.setUsername("MQzhang");
  13. connectionFactory.setPassword("MQzhang");
  14. connectionFactory.setVirtualHost("/");
  15. //2.创建连接
  16. Connection connection = connectionFactory.newConnection();
  17. //3.创建信道
  18. Channel channel = connection.createChannel();
  19. //4.监听队列(一直在连接不会关闭连接)
  20. /*
  21. * 参数一:监听的队列名
  22. * 参数二:是否自动签收(消费完消息后自动告诉MQ消息消费完了),如果设置为false需要手动确认消息,否则MQ会一直发送消息。
  23. * 参数三:Consumer的实现类,重写该类方法表示接受到消息后如何消费,body就是消息的字节数组。
  24. * */
  25. channel.basicConsume("WorkQueue",true,new DefaultConsumer(channel){
  26. @Override
  27. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
  28. String message = new String(body, "UTF-8");
  29. System.out.println("消费者2接受消息为:"+message);
  30. }
  31. });
  32. }
  33. }
  1. package com.zj.mq.work;
  2. import com.rabbitmq.client.*;
  3. import java.io.IOException;
  4. import java.util.concurrent.TimeoutException;
  5. /*消费者*/
  6. public class Consumer3 {
  7. public static void main(String[] args) throws IOException, TimeoutException {
  8. //1.创建连接工厂
  9. ConnectionFactory connectionFactory = new ConnectionFactory();
  10. connectionFactory.setHost("192.168.66.100");
  11. connectionFactory.setPort(5672);
  12. connectionFactory.setUsername("MQzhang");
  13. connectionFactory.setPassword("MQzhang");
  14. connectionFactory.setVirtualHost("/");
  15. //2.创建连接
  16. Connection connection = connectionFactory.newConnection();
  17. //3.创建信道
  18. Channel channel = connection.createChannel();
  19. //4.监听队列(一直在连接不会关闭连接)
  20. /*
  21. * 参数一:监听的队列名
  22. * 参数二:是否自动签收(消费完消息后自动告诉MQ消息消费完了),如果设置为false需要手动确认消息,否则MQ会一直发送消息。
  23. * 参数三:Consumer的实现类,重写该类方法表示接受到消息后如何消费,body就是消息的字节数组。
  24. * */
  25. channel.basicConsume("WorkQueue",true,new DefaultConsumer(channel){
  26. @Override
  27. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
  28. String message = new String(body, "UTF-8");
  29. System.out.println("消费者3接受消息为:"+message);
  30. }
  31. });
  32. }
  33. }
  1. 消费者1接受消息为:这是第0个消息
  2. 消费者1接受消息为:这是第3个消息
  3. 消费者1接受消息为:这是第6个消息
  4. 消费者1接受消息为:这是第9个消息
  5. 消费者1接受消息为:这是第12个消息
  6. 消费者1接受消息为:这是第15个消息
  7. 消费者1接受消息为:这是第18个消息
  8. 消费者1接受消息为:这是第21个消息
  9. ……
  10. 消费者2接受消息为:这是第1个消息
  11. 消费者2接受消息为:这是第4个消息
  12. 消费者2接受消息为:这是第7个消息
  13. 消费者2接受消息为:这是第10个消息
  14. 消费者2接受消息为:这是第13个消息
  15. 消费者2接受消息为:这是第16个消息
  16. 消费者2接受消息为:这是第19个消息
  17. 消费者2接受消息为:这是第22个消息
  18. 消费者2接受消息为:这是第25个消息
  19. ……
  20. 消费者3接受消息为:这是第2个消息
  21. 消费者3接受消息为:这是第5个消息
  22. 消费者3接受消息为:这是第8个消息
  23. 消费者3接受消息为:这是第11个消息
  24. 消费者3接受消息为:这是第14个消息
  25. 消费者3接受消息为:这是第17个消息
  26. 消费者3接受消息为:这是第20个消息
  27. 消费者3接受消息为:这是第23个消息
  28. 消费者3接受消息为:这是第26个消息
  29. 消费者3接受消息为:这是第29个消息
  30. 消费者3接受消息为:这是第32个消息

3.3 发布订阅模式 

在开发过程中,有一些消息需要不同消费者进行不同的处理,如电商网站的同一条促销信息需要短信发送、邮件发送、站内信发送等。此时可以使用发布订阅模式(Publish/Subscribe) 

特点

  1. 生产者将消息发送给交换机,交换机将消息转发到绑定此交换机的每个队列中。
  2. 工作队列模式的交换机只能将消息发送给一个队列,发布订阅模式的交换机能将消息发送给多个队列。发布订阅模式使用fanout交换机。

 1、编写生产者

  1. package com.zj.mq.publish;
  2. import com.rabbitmq.client.BuiltinExchangeType;
  3. import com.rabbitmq.client.Channel;
  4. import com.rabbitmq.client.Connection;
  5. import com.rabbitmq.client.ConnectionFactory;
  6. import java.io.IOException;
  7. import java.util.concurrent.TimeoutException;
  8. /*生产者*/
  9. public class Producer {
  10. public static void main(String[] args) throws IOException, TimeoutException {
  11. //1.创建连接工厂
  12. ConnectionFactory connectionFactory = new ConnectionFactory();
  13. connectionFactory.setHost("192.168.66.100");
  14. connectionFactory.setPort(5672);
  15. connectionFactory.setUsername("MQzhang");
  16. connectionFactory.setPassword("MQzhang");
  17. connectionFactory.setVirtualHost("/");
  18. //2.创建连接
  19. Connection connection = connectionFactory.newConnection();
  20. //3.建立信道
  21. Channel channel = connection.createChannel();
  22. //4.创建交换机fanout
  23. /*
  24. * 参数一:交换机名称
  25. * 参数二:交换机类型
  26. * 参数三: 交换机是否持久化(关闭控制台是否还存在)*/
  27. channel.exchangeDeclare("exchangeFanout", BuiltinExchangeType.FANOUT,false);
  28. //5.创建三个队列(分别模拟邮件发送、短信发送、站内信发送)
  29. channel.queueDeclare("mailQueue", false,false,false,null);
  30. channel.queueDeclare("messageQueue", false,false,false,null);
  31. channel.queueDeclare("stationQueue", false,false,false,null);
  32. //6.交换机绑定队列
  33. /*
  34. * 参数一:队列名称
  35. * 参数二:交换机名称
  36. * 参数三:路由关键字,发布订阅模式不存在路由关键字*/
  37. channel.queueBind("mailQueue","exchangeFanout","");
  38. channel.queueBind("messageQueue","exchangeFanout","");
  39. channel.queueBind("stationQueue","exchangeFanout","");
  40. //7.往交换机发送消息
  41. for (int i = 0; i < 10; i++) {
  42. channel.basicPublish("exchangeFanout","",null,("你好,MQ"+i).getBytes());
  43. }
  44. //8.关闭资源
  45. channel.close();
  46. connection.close();
  47. }
  48. }

 2、站内信消费者(其他同理)

  1. package com.zj.mq.publish;
  2. import com.rabbitmq.client.*;
  3. import com.sun.deploy.ui.AboutDialog;
  4. import java.io.IOException;
  5. import java.util.concurrent.TimeoutException;
  6. /*站内信消费者*/
  7. public class ConsumerStation {
  8. public static void main(String[] args) throws IOException, TimeoutException {
  9. //1.创建连接工厂
  10. ConnectionFactory connectionaFactory = new ConnectionFactory();
  11. connectionaFactory.setHost("192.168.66.100");
  12. connectionaFactory.setPort(5672);
  13. connectionaFactory.setUsername("MQzhang");
  14. connectionaFactory.setPassword("MQzhang");
  15. connectionaFactory.setVirtualHost("/");
  16. //2.创建连接
  17. Connection connection = connectionaFactory.newConnection();
  18. //3.创建信道
  19. Channel channel = connection.createChannel();
  20. //4.监听队列(一直在连接不会关闭连接)
  21. /*
  22. * 参数一:监听的队列名
  23. * 参数二:是否自动签收(消费完消息后自动告诉MQ消息消费完了),如果设置为false需要手动确认消息,否则MQ会一直发送消息。
  24. * 参数三:Consumer的实现类,重写该类方法表示接受到消息后如何消费,body就是消息的字节数组。
  25. * */
  26. channel.basicConsume("stationQueue",true,new DefaultConsumer(channel){
  27. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
  28. String message = new String(body, "UTF-8");
  29. System.out.println("发送站内信:"+message);
  30. }
  31. });
  32. }
  33. }
  1. 发送站内信:你好,MQ0
  2. 发送站内信:你好,MQ1
  3. 发送站内信:你好,MQ2
  4. 发送站内信:你好,MQ3
  5. 发送站内信:你好,MQ4
  6. 发送站内信:你好,MQ5
  7. 发送站内信:你好,MQ6
  8. 发送站内信:你好,MQ7
  9. 发送站内信:你好,MQ8
  10. 发送站内信:你好,MQ9

 当然也能创建多个消费者来监听同一个队列来提高消费速度。

 3.4 路由模式

 使用发布订阅模式时,所有消息都会发送到绑定的队列中(发送到绑定到交换机上的每个队列,队列再发送给消费者),但很多时候,不是所有消息都无差别的发布到所有队列中。比如电商网站的促销活动,双十一大促可能会发布到所有队列;而一些小的促销活动为了节约成本,只发布到站内信队列。此时需要使用路由模式(Routing)完成这一需求。

特点

  1. 每个队列绑定路由关键字RoutingKey
  2. 生产者将带有RoutingKey的消息发送给交换机,交换机根据RoutingKey转发到指定队列。路由模式使用direct交换机。

编写生产者

  1. package com.zj.mq.route;
  2. import com.rabbitmq.client.BuiltinExchangeType;
  3. import com.rabbitmq.client.Channel;
  4. import com.rabbitmq.client.Connection;
  5. import com.rabbitmq.client.ConnectionFactory;
  6. import java.io.IOException;
  7. import java.util.concurrent.TimeoutException;
  8. /*生产者*/
  9. public class Producer {
  10. public static void main(String[] args) throws IOException, TimeoutException {
  11. //1.创建连接工厂
  12. ConnectionFactory connectionFactory = new ConnectionFactory();
  13. connectionFactory.setHost("192.168.66.100");
  14. connectionFactory.setPort(5672);
  15. connectionFactory.setUsername("MQzhang");
  16. connectionFactory.setPassword("MQzhang");
  17. connectionFactory.setVirtualHost("/");
  18. //2.创建连接
  19. Connection connection = connectionFactory.newConnection();
  20. //3.建立信道
  21. Channel channel = connection.createChannel();
  22. //4.创建交换机fanout
  23. /*
  24. * 参数一:交换机名称
  25. * 参数二:交换机类型
  26. * 参数三: 交换机是否持久化(关闭控制台是否还存在)*/
  27. channel.exchangeDeclare("exchangeRoute", BuiltinExchangeType.DIRECT,false);
  28. //5.创建三个队列(分别模拟邮件发送、短信发送、站内信发送)
  29. /* 参数1:队列名
  30. * 参数2:是否持久化,true表示MQ重启后队列还在。
  31. * 参数3:是否私有化,false表示所有消费者都可以访问,true表示只有第一次拥有它的消费者才能访问
  32. * 参数4:是否自动删除,true表示不再使用队列时自动删除队列
  33. * 参数5:其他额外参数
  34. * */
  35. channel.queueDeclare("mailQueue", false,false,false,null);
  36. channel.queueDeclare("messageQueue", false,false,false,null);
  37. channel.queueDeclare("stationQueue", false,false,false,null);
  38. //6.交换机绑定队列
  39. /*
  40. * 参数一:队列名称
  41. * 参数二:交换机名称
  42. * 参数三:路由关键字,一个队列可以有多个路由关键字
  43. * */
  44. channel.queueBind("mailQueue","exchangeRoute","import");
  45. channel.queueBind("messageQueue","exchangeRoute","normal");
  46. channel.queueBind("stationQueue","exchangeRoute","import");
  47. //7.往交换机发送消息,路由关键字是import,表示交换机会将消息发送到带有import关键字的队列。
  48. channel.basicPublish("exchangeRoute","import",null,("你好,import MQ").getBytes());
  49. //路由关键字是normal表示交换机会将消息发送到带有normal关键字的队列
  50. channel.basicPublish("exchangeRoute","normal",null,("你好,normal MQ").getBytes());
  51. //8.关闭资源
  52. channel.close();
  53. connection.close();
  54. }
  55. }

 编写消费者

消费者还是和其他模式的消费者是一样的。这里以mailQuene举例子。

  1. package com.zj.mq.route;
  2. import com.rabbitmq.client.*;
  3. import java.io.IOException;
  4. import java.util.concurrent.TimeoutException;
  5. /*消费者*/
  6. public class Consumer {
  7. public static void main(String[] args) throws IOException, TimeoutException {
  8. //1.创建连接工厂
  9. ConnectionFactory connectionFactory = new ConnectionFactory();
  10. connectionFactory.setHost("192.168.66.100");
  11. connectionFactory.setPort(5672);
  12. connectionFactory.setUsername("MQzhang");
  13. connectionFactory.setPassword("MQzhang");
  14. connectionFactory.setVirtualHost("/");
  15. //2.创建连接
  16. Connection connection = connectionFactory.newConnection();
  17. //3.创建信道
  18. Channel channel = connection.createChannel();
  19. //4.监听队列(一直在连接不会关闭连接)
  20. /*
  21. * 参数一:监听的队列名
  22. * 参数二:是否自动签收(消费完消息后自动告诉MQ消息消费完了),如果设置为false需要手动确认消息,否则MQ会一直发送消息。
  23. * 参数三:Consumer的实现类,重写该类方法表示接受到消息后如何消费,body就是消息的字节数组。
  24. *
  25. */
  26. channel.basicConsume("mailQueue",true,new DefaultConsumer(channel){
  27. @Override
  28. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
  29. String message = new String(body, "UTF-8");
  30. System.out.println("接受消息为:"+message);
  31. }
  32. });
  33. }
  34. }

3.5 通配符模式

通配符模式(Topic)是在路由模式的基础上,给队列绑定带通配符的路由关键字,只要消息的RoutingKey能实现通配符匹配,就会将消息转发到该队列。通配符模式比路由模式更灵活,使用topic交换机。

通配符规则:

  1. 消息设置RoutingKey时,RoutingKey由多个单词构成,中间以.分割。
  2. 队列设置RoutingKey时,#可以匹配任意多个单词,*可以匹配任意一个单词。

编写生产者

  1. package com.zj.mq.topic;
  2. import com.rabbitmq.client.BuiltinExchangeType;
  3. import com.rabbitmq.client.Channel;
  4. import com.rabbitmq.client.Connection;
  5. import com.rabbitmq.client.ConnectionFactory;
  6. import java.io.IOException;
  7. import java.util.concurrent.TimeoutException;
  8. /*生产者*/
  9. public class Producer {
  10. public static void main(String[] args) throws IOException, TimeoutException {
  11. //1.创建连接工厂
  12. ConnectionFactory connectionFactory = new ConnectionFactory();
  13. connectionFactory.setHost("192.168.66.100");
  14. connectionFactory.setPort(5672);
  15. connectionFactory.setUsername("MQzhang");
  16. connectionFactory.setPassword("MQzhang");
  17. connectionFactory.setVirtualHost("/");
  18. //2.创建连接
  19. Connection connection = connectionFactory.newConnection();
  20. //3.建立信道
  21. Channel channel = connection.createChannel();
  22. //4.创建交换机fanout
  23. /*
  24. * 参数一:交换机名称
  25. * 参数二:交换机类型
  26. * 参数三: 交换机是否持久化(关闭控制台是否还存在)*/
  27. channel.exchangeDeclare("exchangeTopic", BuiltinExchangeType.TOPIC,false);
  28. //5.创建三个队列(分别模拟邮件发送、短信发送、站内信发送)
  29. channel.queueDeclare("mailQueue", false,false,false,null);
  30. channel.queueDeclare("messageQueue", false,false,false,null);
  31. channel.queueDeclare("stationQueue", false,false,false,null);
  32. //6.交换机绑定队列
  33. /*
  34. * 参数一:队列名称
  35. * 参数二:交换机名称
  36. * 参数三:路由关键字,【#.mail.#】 表示:mail前后可以匹配多个单词*/
  37. channel.queueBind("mailQueue","exchangeTopic","#.mail.#");
  38. channel.queueBind("messageQueue","exchangeTopic","#.message.#");
  39. channel.queueBind("stationQueue","exchangeTopic","#.station.#");
  40. //7.往交换机发送消息到三个队列
  41. channel.basicPublish("exchangeTopic","mail.message.station",null,("你好,MQ").getBytes());
  42. //8.关闭资源
  43. channel.close();
  44. connection.close();
  45. }
  46. }

 编写消费者

也是和其他模式的消费者是一样的只需要监听消费者。

  1. package com.zj.mq.topic;
  2. import com.rabbitmq.client.*;
  3. import java.io.IOException;
  4. import java.util.concurrent.TimeoutException;
  5. /*消费者*/
  6. public class Consumer {
  7. public static void main(String[] args) throws IOException, TimeoutException {
  8. //1.创建连接工厂
  9. ConnectionFactory connectionFactory = new ConnectionFactory();
  10. connectionFactory.setHost("192.168.66.100");
  11. connectionFactory.setPort(5672);
  12. connectionFactory.setUsername("MQzhang");
  13. connectionFactory.setPassword("MQzhang");
  14. connectionFactory.setVirtualHost("/");
  15. //2.创建连接
  16. Connection connection = connectionFactory.newConnection();
  17. //3.创建信道
  18. Channel channel = connection.createChannel();
  19. //4.监听队列(一直在连接不会关闭连接)
  20. /*
  21. * 参数一:监听的队列名
  22. * 参数二:是否自动签收(消费完消息后自动告诉MQ消息消费完了),如果设置为false需要手动确认消息,否则MQ会一直发送消息。
  23. * 参数三:Consumer的实现类,重写该类方法表示接受到消息后如何消费,body就是消息的字节数组。
  24. *
  25. */
  26. channel.basicConsume("mailQueue",true,new DefaultConsumer(channel){
  27. @Override
  28. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
  29. String message = new String(body, "UTF-8");
  30. System.out.println("接受消息为:"+message);
  31. }
  32. });
  33. }
  34. }

四、SpringBoot整合RabbitMQ

 之前我们使用原生JAVA操作RabbitMQ较为繁琐,接下来我们使用SpringBoot整合RabbitMQ,简化代码编写。

1.创建SpringBoot项目,引入RabbitMQ起步依赖(springboot版本是2.7.0)

  1. <!-- RabbitMQ起步依赖 -->
  2. <dependency>
  3. <groupId>org.springframework.boot</groupId>
  4. <artifactId>spring-boot-starter-amqp</artifactId>
  5. </dependency>

2.编写配置文件

  1. spring:
  2. rabbitmq:
  3. host: 192.168.66.100
  4. port: 5672
  5. username: MQzhang
  6. password: MQzhang
  7. virtual-host: /
  8. #日志格式
  9. logging:
  10. pattern:
  11. console: '%d{HH:mm:ss.SSS} %clr(%-5level) --- [%-15thread] %cyan(%-50logger{50}):%msg%n'

3.创建队列和交换机

 SpringBoot整合RabbitMQ时,需要在配置类创建队列和交换机,写法如下:

  1. package com.zj.config;
  2. import org.springframework.amqp.core.*;
  3. import org.springframework.beans.factory.annotation.Qualifier;
  4. import org.springframework.context.annotation.Bean;
  5. import org.springframework.context.annotation.Configuration;
  6. @Configuration
  7. public class RabbitConfig {
  8. private final String EXCHANGE_NAME = "boot_topic_exchange";
  9. private final String QUEUE_NAME = "boot_queue";
  10. // 创建交换机
  11. @Bean(EXCHANGE_NAME)
  12. public Exchange getExchange() {
  13. return ExchangeBuilder
  14. .topicExchange(EXCHANGE_NAME) // 交换机类型和名称
  15. .durable(true) // 是否持久化
  16. .build();
  17. }
  18. // 创建队列
  19. @Bean(QUEUE_NAME)
  20. public Queue getMessageQueue() {
  21. return new Queue(QUEUE_NAME); // 队列名
  22. }
  23. // 交换机绑定队列
  24. @Bean
  25. public Binding bindMessageQueue(@Qualifier(EXCHANGE_NAME) Exchange exchange,
  26. @Qualifier(QUEUE_NAME) Queue queue) {
  27. return BindingBuilder
  28. .bind(queue)
  29. .to(exchange)
  30. .with("#.message.#")
  31. .noargs();
  32. }
  33. }

4.编写生产者 

 SpringBoot整合RabbitMQ时,提供了工具类RabbitTemplate发送消息,编写生产者时只需要注入RabbitTemplate即可发送消息。

  1. package com.zj;
  2. import org.junit.jupiter.api.Test;
  3. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  4. import org.springframework.boot.test.context.SpringBootTest;
  5. import javax.annotation.Resource;
  6. @SpringBootTest
  7. class DemoApplicationTests {
  8. @Resource
  9. public RabbitTemplate rabbitTemplate;
  10. @Test
  11. public void testProducer(){
  12. /*
  13. * 参数一:交换机名称
  14. * 参数二:路由关键字
  15. * 参数三:要发送的消息
  16. */
  17. rabbitTemplate.convertAndSend("boot_topic_exchange","message","hello MQ");
  18. }
  19. }

 5.编写消费者

 我们编写另一个SpringBoot项目作为RabbitMQ的消费者,因为在同一个项目中的话直接方法调用就可以。

1、创建项目导入依赖。

2、编写配置文件,和生产者的相同

3、编写消费者,监听队列

  1. @Component
  2. public class Consumer {
  3. // 监听队列
  4. @RabbitListener(queues = "boot_queue")
  5. public void listen_message(String message){
  6. System.out.println("发送短信:"+message);
  7. }
  8. }

4、运行项目。观察管控台队列和控制台

五、消息的可靠性投递

5.1 概念

RabbitMQ消息投递的路径为:

生产者--->交换机--->队列--->消费者

在RabbitMQ工作的过程中,每个环节消息都可能传递失败,那么RabbitMQ是如何监听消息是否成功投递的呢?

  • 确认模式(confirm)可以监听消息是否从生产者成功传递到交换机。
  • 退回模式(return)可以监听消息是否从交换机成功传递到队列。
  • 消费者消息确认(Consumer Ack)可以监听消费者是否成功处理消息。

首先我们准备两个SpringBoot项目,分别代表生产者和消费者,配置文件如下:

  1. spring:
  2. rabbitmq:
  3. host: 192.168.66.100
  4. port: 5672
  5. username: MQzhang
  6. password: MQzhang
  7. virtual-host: /
  8. #日志格式
  9. logging:
  10. pattern:
  11. console: '%d{HH:mm:ss.SSS} %clr(%-5level) --- [%-15thread] %cyan(%-50logger{50}):%msg%n'

在生产者的配置类创建交换机和队列:

  1. package com.zj.config;
  2. import org.springframework.amqp.core.*;
  3. import org.springframework.beans.factory.annotation.Qualifier;
  4. import org.springframework.context.annotation.Bean;
  5. import org.springframework.context.annotation.Configuration;
  6. @Configuration
  7. public class RabbitConfig {
  8. private final String EXCHANGE_NAME = "boot_topic_exchange";
  9. private final String QUEUE_NAME = "boot_queue";
  10. // 创建交换机
  11. @Bean(EXCHANGE_NAME)
  12. public Exchange getExchange() {
  13. return ExchangeBuilder
  14. .topicExchange(EXCHANGE_NAME) // 交换机类型和名称
  15. .durable(true) // 是否持久化
  16. .build();
  17. }
  18. // 创建队列
  19. @Bean(QUEUE_NAME)
  20. public Queue getMessageQueue() {
  21. return new Queue(QUEUE_NAME); // 队列名
  22. }
  23. // 交换机绑定队列
  24. @Bean
  25. public Binding bindMessageQueue(@Qualifier(EXCHANGE_NAME) Exchange exchange,
  26. @Qualifier(QUEUE_NAME) Queue queue) {
  27. return BindingBuilder
  28. .bind(queue)
  29. .to(exchange)
  30. .with("#.message.#")
  31. .noargs();
  32. }
  33. }

创建生产者

  1. @Component
  2. public class Consumer {
  3. // 监听队列
  4. @RabbitListener(queues = "boot_queue")
  5. public void listen_message(String message){
  6. System.out.println("发送短信:"+message);
  7. }
  8. }

5.2 确认模式

确认模式(confirm)可以监听消息是否从生产者成功传递到交换机,使用方法如下:

1、生产者配置文件开启确认模式

  1. spring:
  2. rabbitmq:
  3. host: 192.168.66.100
  4. port: 5672
  5. username: MQzhang
  6. password: MQzhang
  7. virtual-host: /
  8. # 开启确认模式
  9. publisher-confirm-type: correlated

2、生产者定义确认模式的回调方法,并模拟向不存在的交换机aaa发送消息。

  1. package com.zj;
  2. import org.junit.jupiter.api.Test;
  3. import org.springframework.amqp.rabbit.connection.CorrelationData;
  4. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  5. import org.springframework.boot.test.context.SpringBootTest;
  6. import javax.annotation.Resource;
  7. @SpringBootTest
  8. class DemoApplicationTests {
  9. @Resource
  10. public RabbitTemplate rabbitTemplate;
  11. @Test
  12. public void testProducer(){
  13. //定义确认模式的回调方法,当消息向交换机发送后会调用confirm方法。
  14. rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
  15. /**
  16. * @param correlationData 相关配置信息
  17. * @param ack 交换机是否收到消息
  18. * @param cause 失败原因
  19. */
  20. @Override
  21. public void confirm(CorrelationData correlationData, boolean ack, String cause) {
  22. if(ack){
  23. System.out.println("消息接受成功");
  24. }else {
  25. System.out.println("消息接受失败:"+cause);
  26. //做一些处理让消息再次发送
  27. }
  28. }
  29. });
  30. rabbitTemplate.convertAndSend("aaa","message","hello MQ");
  31. }
  32. }

3、运行结果

消息接受失败:channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'aaa' in vhost '/', class-id=60, method-id=40)

5.3 退回模式

退回模式(return)可以监听消息是否从交换机成功传递到队列,使用方法如下:

 1、生产者配置文件开启退回模式

  1. spring:
  2. rabbitmq:
  3. host: 192.168.66.100
  4. port: 5672
  5. username: MQzhang
  6. password: MQzhang
  7. virtual-host: /
  8. # 开启确认模式
  9. publisher-confirm-type: correlated
  10. # 开启回退模式
  11. publisher-returns: true

2、生产者定义退回模式的回调方法,模拟向不存在的队列bbb发送消息。

  1. @SpringBootTest
  2. class DemoApplicationTests {
  3. @Resource
  4. public RabbitTemplate rabbitTemplate;
  5. @Test
  6. public void testProducer(){
  7. //定义退回模式的回调方法,只有交换机将消息发送到队列失败后才会执行该方法。
  8. rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
  9. /**
  10. *
  11. * @param returnedMessage 失败后将失败信息封装到该参数
  12. */
  13. @Override
  14. public void returnedMessage(ReturnedMessage returnedMessage) {
  15. System.out.println("消息对象:"+returnedMessage);
  16. System.out.println("错误码:"+returnedMessage.getReplyCode());
  17. System.out.println("错误信息:"+returnedMessage.getReplyText());
  18. System.out.println("交换机:"+returnedMessage.getExchange());
  19. System.out.println("路由键:"+returnedMessage.getRoutingKey());
  20. //处理消息……
  21. }
  22. });
  23. rabbitTemplate.convertAndSend("boot_topic_exchange","bbb","hello MQ");
  24. }
  25. }
  1. 消息对象:ReturnedMessage [message=(Body:'hello MQ' MessageProperties [headers={}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, deliveryTag=0]), replyCode=312, replyText=NO_ROUTE, exchange=boot_topic_exchange, routingKey=bbb]
  2. 错误码:312
  3. 错误信息:NO_ROUTE
  4. 交换机:boot_topic_exchange
  5. 路由键:bbb

5.4 Ack手动签收

在RabbitMQ中,消费者接收到消息后会向队列发送确认签收的消息,只有确认签收的消息才会被移除队列。这种机制称为消费者消息确认(Consumer Acknowledge,简称Ack)。类似快递员派送快递也需要我们签收,否则一直存在于快递公司的系统中。

消息分为自动确认和手动确认。自动确认指消息只要被消费者接收到,无论是否成功处理消息,则自动签收,并将消息从队列中移除。但是在实际开发中,收到消息后可能业务处理出现异常,那么消息就会丢失。此时需要设置手动签收,即在业务处理成功再通知签收消息,如果出现异常,则拒签消息,让消息依然保留在队列当中。

  • 自动确认:spring.rabbitmq.listener.simple.acknowledge="none"
  • 手动确认:spring.rabbitmq.listener.simple.acknowledge="manual"

 1.消费者配置开启手动签收

  1. spring:
  2. rabbitmq:
  3. host: 192.168.66.100
  4. port: 5672
  5. username: MQzhang
  6. password: MQzhang
  7. virtual-host: /
  8. # 开启手动签收
  9. listener:
  10. simple:
  11. acknowledge-mode: manual

2、消费者处理消息时定义手动签收和拒绝签收的情况

  1. package com.zj.consumer;
  2. import com.rabbitmq.client.Channel;
  3. import org.springframework.amqp.core.Message;
  4. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  5. import org.springframework.stereotype.Component;
  6. import java.io.IOException;
  7. @Component
  8. public class ACKConsumer {
  9. // 监听队列
  10. /**
  11. *
  12. * @param message 消息对象
  13. * @param channel 信道对象,用于手动接受消息
  14. */
  15. @RabbitListener(queues = "boot_queue")
  16. public void listen_message(Message message, Channel channel) throws IOException {
  17. //deliveryTag:消息投递序号,每次投递该值都会+1.
  18. long deliveryTag = message.getMessageProperties().getDeliveryTag();
  19. try {
  20. //签收消息
  21. /*
  22. * 参数一:消息投递序号
  23. * 参数二:一次是否可以签收多条消息
  24. */
  25. channel.basicAck(deliveryTag,true);
  26. }catch (Exception e){
  27. //拒签消息
  28. /*
  29. * 参数一:消息投递序号
  30. * 参数二:一次是否可以签收多条消息
  31. * 参数三:拒签后消息是否重回队列(处在队列中的消息会不断的再向消费者发送消息)
  32. */
  33. channel.basicNack(deliveryTag,true,true);
  34. System.out.println("消息消费失败");
  35. }
  36. }
  37. }

六、RabbitMQ高级特性

6.1 消费端限流

之前我们说MQ可以对请求进行“削峰填谷”,即通过消费端限流的方式限制消息的拉取速度,达到保护消费端的目的。

消费端限流的写法如下:

1.生产者批量发送消息

  1. @SpringBootTest
  2. class DemoApplicationTests {
  3. @Resource
  4. public RabbitTemplate rabbitTemplate;
  5. @Test
  6. public void testSendBatch() {
  7. // 发送十条消息
  8. for (int i = 0; i < 100; i++) {
  9. rabbitTemplate.convertAndSend("boot_topic_exchange", "message", "send message..."+i);
  10. }
  11. }
  12. }

2.消费端配置限流机制

  1. spring:
  2. rabbitmq:
  3. host: 192.168.66.100
  4. port: 5672
  5. username: MQzhang
  6. password: MQzhang
  7. virtual-host: /
  8. listener:
  9. simple:
  10. # 限流机制必须开启手动签收
  11. acknowledge-mode: manual
  12. # 消费端最多拉取20条消息消费,签收后不满20条才会继续拉取消息。
  13. prefetch: 20

3、消费者接受消息

  1. package com.zj.consumer;
  2. import com.rabbitmq.client.Channel;
  3. import org.springframework.amqp.core.Message;
  4. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  5. import org.springframework.stereotype.Component;
  6. import java.io.IOException;
  7. @Component
  8. public class OosConsumer {
  9. @RabbitListener(queues = "boot_queue")
  10. public void listenMessage(Message message, Channel channel) throws IOException, InterruptedException {
  11. //1.获取消息
  12. System.out.println("当前时间:"+new String(message.getBody()));
  13. //2.模拟业务处理
  14. Thread.sleep(2000);
  15. //3.签收消息
  16. long deliveryTag = message.getMessageProperties().getDeliveryTag();
  17. channel.basicAck(deliveryTag,true);
  18. }
  19. }

20230619

6.2 限流实现不公平分发

在RabbitMQ中,多个消费者监听同一条队列,则队列默认采用的轮询分发。但是在某种场景下这种策略并不是很好,例如消费者1处理任务的速度非常快,而其他消费者处理速度却很慢。此时如果采用公平分发,则消费者1有很大一部分时间处于空闲状态。此时可以采用不公平分发,即谁处理的快,谁处理的消息多。

 使用方法如下:

1.生产者批量发送消息

  1. @SpringBootTest
  2. class DemoApplicationTests {
  3. @Resource
  4. public RabbitTemplate rabbitTemplate;
  5. @Test
  6. public void testSendBatch() {
  7. // 发送十条消息
  8. for (int i = 0; i < 10; i++) {
  9. rabbitTemplate.convertAndSend("boot_topic_exchange", "message", "send message..."+i);
  10. }
  11. }
  12. }

2.消费端配置不公平分发

  1. spring:
  2. rabbitmq:
  3. host: 192.168.66.100
  4. port: 5672
  5. username: MQzhang
  6. password: MQzhang
  7. virtual-host: /
  8. listener:
  9. simple:
  10. # 限流机制必须开启手动签收
  11. acknowledge-mode: manual
  12. # 消费端最多拉取1条消息消费,这样谁处理的快谁拉取下一条消息,实现了不公平分发
  13. prefetch: 1

3、编写两个消费者消费相同的队列信息

  1. package com.zj.consumer;
  2. import com.rabbitmq.client.Channel;
  3. import org.springframework.amqp.core.Message;
  4. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  5. import org.springframework.stereotype.Component;
  6. @Component
  7. public class UnfairConsumer {
  8. // 消费者1
  9. @RabbitListener(queues = "boot_queue")
  10. public void listenMessage1(Message message, Channel channel) throws Exception {
  11. //1.获取消息
  12. System.out.println("消费者1:"+new String(message.getBody(),"UTF-8"));
  13. //2. 处理业务逻辑
  14. Thread.sleep(500); // 消费者1处理快
  15. //3. 手动签收
  16. channel.basicAck(message.getMessageProperties().getDeliveryTag(),true);
  17. }
  18. // 消费者2
  19. @RabbitListener(queues = "boot_queue")
  20. public void listenMessage2(Message message, Channel channel) throws Exception {
  21. //1.获取消息
  22. System.out.println("消费者2:"+new String(message.getBody(),"UTF-8"));
  23. //2. 处理业务逻辑
  24. Thread.sleep(3000);// 消费者2处理慢
  25. //3. 手动签收
  26. channel.basicAck(message.getMessageProperties().getDeliveryTag(),true);
  27. }
  28. }
  1. 消费者2:send message...1
  2. 消费者1:send message...0
  3. 19:53:21.676 INFO --- [main ] com.zj.DemoApplication :Started DemoApplication in 0.867 seconds (JVM running for 1.259)
  4. 消费者1:send message...3
  5. 消费者1:send message...4
  6. 消费者1:send message...2
  7. 消费者1:send message...5
  8. 消费者1:send message...6
  9. 消费者2:send message...7
  10. 消费者1:send message...8
  11. 消费者1:send message...9

发现消费者1消费的要比消费者2消费的多。能者多劳。

6.3 设置队列所有消息存活时间

 RabbitMQ可以设置消息的存活时间(Time To Live,简称TTL),当消息到达存活时间后还没有被消费,会被移出队列。RabbitMQ可以对队列的所有消息设置存活时间,也可以对某条消息设置存活时间。

1、在创建队列时设置其存活时间:

  1. // 创建队列
  2. @Bean(QUEUE_NAME)
  3. public Queue getMessageQueue() {
  4. return QueueBuilder
  5. .durable(QUEUE_NAME)// 队列名
  6. .ttl(10000) //队列存活时间10s单位毫秒
  7. .build();
  8. }

2、生产者生产消息

  1. @SpringBootTest
  2. class DemoApplicationTests {
  3. @Resource
  4. public RabbitTemplate rabbitTemplate;
  5. @Test
  6. public void testSendBatch() {
  7. // 发送十条消息
  8. for (int i = 0; i < 10; i++) {
  9. rabbitTemplate.convertAndSend("boot_topic_exchange", "message", "send message..."+i);
  10. }
  11. }
  12. }

十秒后,未被消费的消息会被移除。

6.4 设置单条消息存活时间

1、在消息发送的时候设置发送时间

  1. /*发送消息并设置消息的存活时间*/
  2. @Test
  3. public void testSend() {
  4. //1.创建消息属性
  5. MessageProperties messageProperties = new MessageProperties();
  6. //2.设置存活时间,单位毫秒
  7. messageProperties.setExpiration("10000");
  8. //3.创建消息对象
  9. Message message = new Message(("send message……").getBytes(), messageProperties);
  10. //4.发送消息
  11. rabbitTemplate.convertAndSend("boot_topic_exchange","message",message);
  12. }

注意:

  1. 如果设置了单条消息的存活时间,也设置了队列的存活时间,以时间短的为准。

  2. 消息过期后,并不会马上移除消息,只有消息消费到队列顶端时,才会移除该消息。

6.5 优先级队列

假设在电商系统中有一个订单催付的场景,即客户在一段时间内未付款会给用户推送一条短信提醒,但是系统中分为大型商家和小型商家。比如像苹果,小米这样大商家一年能给我们创造很大的利润,所以在订单量大时,他们的订单必须得到优先处理,此时就需要为不同的消息设置不同的优先级,此时我们要使用优先级队列。

 优先级队列用法如下:

1、设置队列的优先级

  1. // 创建队列
  2. @Bean(QUEUE_NAME)
  3. public Queue getMessageQueue() {
  4. return QueueBuilder
  5. .durable(QUEUE_NAME)// 队列名
  6. // .ttl(10000) //队列中消息存活时间10s单位毫秒
  7. .maxPriority(10) //设置队列的优先级越大优先级越高,最大255,推荐最大不超过10
  8. .build();
  9. }

2、编写生产者发送有优先级的消息

  1. @SpringBootTest
  2. class DemoApplicationTests {
  3. @Resource
  4. public RabbitTemplate rabbitTemplate;
  5. @Test
  6. public void testSend() {
  7. for (int i = 0; i < 10; i++) {
  8. if (i == 5) { // i为5时消息的优先级较高
  9. //1.创建消息属性
  10. MessageProperties messageProperties = new MessageProperties();
  11. //2.设置消息优先级
  12. messageProperties.setPriority(9);
  13. //3.创建消息对象
  14. Message message = new Message(("send message……" + i).getBytes(), messageProperties);
  15. rabbitTemplate.convertAndSend("boot_topic_exchange","message",message);
  16. }else {
  17. rabbitTemplate.convertAndSend("boot_topic_exchange","message","send message……" + i);
  18. }
  19. }
  20. }
  21. }

3、编写消费者测试是否是第五条消息最先被消费

  1. package com.zj.consumer;
  2. import com.rabbitmq.client.Channel;
  3. import org.springframework.amqp.core.Message;
  4. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  5. import org.springframework.stereotype.Component;
  6. import java.io.IOException;
  7. @Component
  8. public class Consumer {
  9. // 监听队列
  10. @RabbitListener(queues = "boot_queue")
  11. public void listen_message(Message message, Channel channel) throws IOException {
  12. //1.获取消息
  13. System.out.println(new String(message.getBody()));
  14. //2.手动签收
  15. channel.basicAck(message.getMessageProperties().getDeliveryTag(),true);
  16. }
  17. }
  1. . ____ _ __ _ _
  2. /\\ / ___'_ __ _ _(_)_ __ __ _ \ \ \ \
  3. ( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
  4. \\/ ___)| |_)| | | | | || (_| | ) ) ) )
  5. ' |____| .__|_| |_|_| |_\__, | / / / /
  6. =========|_|==============|___/=/_/_/_/
  7. :: Spring Boot :: (v2.7.0)
  8. 17:47:14.858 INFO --- [main ] com.zj.DemoApplication :Starting DemoApplication using Java 1.8.0_341 on ZHANGJIN with PID 26080 (D:\Java\code\springbootcode\sb_rabbitMQ_consumer\target\classes started by 张锦 in D:\Java\code\springbootcode\sb_rabbitMQ)
  9. 17:47:14.858 INFO --- [main ] com.zj.DemoApplication :No active profile set, falling back to 1 default profile: "default"
  10. 17:47:15.482 INFO --- [main ] o.s.a.rabbit.connection.CachingConnectionFactory :Attempting to connect to: [192.168.66.100:5672]
  11. 17:47:15.498 INFO --- [main ] o.s.a.rabbit.connection.CachingConnectionFactory :Created new connection: rabbitConnectionFactory#2f2bf0e2:0/SimpleConnection@27f0ad19 [delegate=amqp://MQzhang@192.168.66.100:5672/, localPort= 53985]
  12. 17:47:15.529 INFO --- [main ] com.zj.DemoApplication :Started DemoApplication in 0.893 seconds (JVM running for 1.338)
  13. send message……5
  14. send message……0
  15. send message……1
  16. send message……2
  17. send message……3
  18. send message……4
  19. send message……6
  20. send message……7
  21. send message……8
  22. send message……9

第五条消息首先被消费。

七、RabbitMQ死信队列

7.1 概念

在MQ中,当消息成为死信(Dead message)后,消息中间件可以将其从当前队列发送到另一个队列中,这个队列就是死信队列。而在RabbitMQ中,由于有交换机的概念,实际是将死信发送给了死信交换机(Dead Letter Exchange,简称DLX)。死信交换机和死信队列和普通的没有区别。

消息成为死信的情况:

  1. 队列消息长度到达限制。
  2. 消费者拒签消息,并且不把消息重新放入原队列。
  3. 消息到达存活时间未被消费。

 7.2 代码实现

1、创建死信交换机和死信队列

  1. package com.zj.config;
  2. import org.springframework.amqp.core.*;
  3. import org.springframework.beans.factory.annotation.Qualifier;
  4. import org.springframework.context.annotation.Bean;
  5. import org.springframework.context.annotation.Configuration;
  6. @Configuration
  7. public class RabbitConfig {
  8. //死信交换机和死信队列
  9. private final String DEAD_EXCHANGE = "dead_exchange";
  10. private final String DEAD_QUEUE = "dead_queue";
  11. //普通交换机和普通队列
  12. private final String NORMAL_EXCHANGE = "normal_exchange";
  13. private final String NORMAL_QUEUE = "normal_queue";
  14. // 创建死信交换机
  15. @Bean(DEAD_EXCHANGE)
  16. public Exchange deadExchange(){
  17. return ExchangeBuilder
  18. .topicExchange(DEAD_EXCHANGE) //死信交换机类型和名称
  19. .durable(false) //是否持久化
  20. .build();
  21. }
  22. // 创建死信队列
  23. @Bean(DEAD_QUEUE)
  24. public Queue deadQueue(){
  25. return QueueBuilder
  26. .durable(DEAD_QUEUE) //死信队列名称
  27. .build();
  28. }
  29. // 死信交换机绑定死信队列
  30. @Bean
  31. public Binding bindDeadQueue(@Qualifier(DEAD_EXCHANGE) Exchange exchange,
  32. @Qualifier(DEAD_QUEUE) Queue queue){
  33. return BindingBuilder
  34. .bind(queue)
  35. .to(exchange)
  36. .with("dead") //交换机路由键
  37. .noargs();
  38. }
  39. //创建普通交换机
  40. @Bean(NORMAL_EXCHANGE)
  41. public Exchange normalExchange(){
  42. return ExchangeBuilder
  43. .topicExchange(NORMAL_EXCHANGE) //普通交换机类型和名称
  44. .durable(false) //是否持久化
  45. .build();
  46. }
  47. //创建普通队列
  48. @Bean(NORMAL_QUEUE)
  49. public Queue normalQueue(){
  50. return QueueBuilder
  51. .durable(NORMAL_QUEUE) //普通信队列名称
  52. .deadLetterExchange(DEAD_EXCHANGE) //绑定死信交换机,因为队列中的无法消费的信息会被放到死信交换机上。
  53. .deadLetterRoutingKey("dead") //死信队列路由关键字
  54. .ttl(10000) //消息存活时间
  55. .maxLength(10) //消息最大长度
  56. .build();
  57. }
  58. //普通交换机绑定普通对列
  59. @Bean
  60. public Binding bindNormalQueue(@Qualifier(NORMAL_EXCHANGE) Exchange exchange,
  61. @Qualifier(NORMAL_QUEUE) Queue queue){
  62. return BindingBuilder
  63. .bind(queue)
  64. .to(exchange)
  65. .with("normal")
  66. .noargs();
  67. }
  68. }

2.创建生产者发送消息(测试存活时间过期变成死信)

  1. @Test
  2. public void testSend() {
  3. //存活时间过期或者超过消息的长度时消息会变成死信队列,消息被消费者退回后队列没有签收消息会变成死信
  4. //1.存活时间过期
  5. rabbitTemplate.convertAndSend("normal_exchange","normal","普通队列消息");
  6. }

十秒后

消息全部去了死信队列。

2.创建生产者(超过队列长度变成死信)

  1. @SpringBootTest
  2. class DemoApplicationTests {
  3. @Resource
  4. public RabbitTemplate rabbitTemplate;
  5. @Test
  6. public void testSend() {
  7. //存活时间过期或者超过消息的长度时消息会变成死信队列,消息被消费者退回后队列没有签收消息会变成死信
  8. //1.存活时间过期
  9. // rabbitTemplate.convertAndSend("normal_exchange","normal","普通队列消息");
  10. //2.超过队列长度变成死信
  11. for (int i = 0; i < 20; i++) {
  12. rabbitTemplate.convertAndSend("normal_exchange","normal","普通队列消息");
  13. }
  14. }
  15. }

因为设置了普通队列的长度,所以超出队列长度的那部分就去了死信队列。也设置了队列的存活时间,因此普通队列的消息在10秒后变成了死信。

 2.创建生产者和消费者(超过队列长度变成死信)

  1. @SpringBootTest
  2. class DemoApplicationTests {
  3. @Resource
  4. public RabbitTemplate rabbitTemplate;
  5. @Test
  6. public void testSend() {
  7. //存活时间过期或者超过消息的长度时消息会变成死信队列,消息被消费者退回后队列没有签收消息会变成死信
  8. //1.生产者拒签消息,消息变成死信。
  9. rabbitTemplate.convertAndSend("normal_exchange","normal","普通队列消息");
  10. }
  11. }
  12. @Component
  13. public class Consumer {
  14. // 监听队列
  15. @RabbitListener(queues = "normal_queue")
  16. public void listen_message(Message message, Channel channel) throws IOException {
  17. //拒签消息
  18. channel.basicNack(message.getMessageProperties().getDeliveryTag(),true,false);
  19. }
  20. }

拒签消息,消息变成了死信。

 八、RabbitMQ延迟队列

8.1 概念

延迟队列,即消息进入队列后不会立即被消费,只有到达指定时间后,才会被消费。

例如:用户下单后,30分钟后查询订单状态,未支付则会取消订单。

 但RabbitMQ中并未提供延迟队列功能,我们可以使用死信队列实现延迟队列的效果。

8.2 死信队列实现延迟队列

 1.创建SpringBoot订单模块,添加SpringMVC、RabbitMQ、lombok依赖。

  1. <dependency>
  2. <groupId>org.springframework.boot</groupId>
  3. <artifactId>spring-boot-starter-amqp</artifactId>
  4. </dependency>
  5. <dependency>
  6. <groupId>org.springframework.boot</groupId>
  7. <artifactId>spring-boot-starter-web</artifactId>
  8. </dependency>
  9. <dependency>
  10. <groupId>org.projectlombok</groupId>
  11. <artifactId>lombok</artifactId>
  12. </dependency>

2.编写配置文件

  1. spring:
  2. rabbitmq:
  3. host: 192.168.66.100
  4. port: 5672
  5. username: MQzhang
  6. password: MQzhang
  7. virtual-host: /
  8. #日志格式
  9. logging:
  10. pattern:
  11. console: '%d{HH:mm:ss.SSS} %clr(%-5level) --- [%-15thread] %cyan(%-50logger{50}):%msg%n'

 3.创建队列和交换机

  1. package com.zj.config;
  2. import org.springframework.amqp.core.*;
  3. import org.springframework.beans.factory.annotation.Qualifier;
  4. import org.springframework.context.annotation.Bean;
  5. import org.springframework.context.annotation.Configuration;
  6. @Configuration
  7. public class RabbitConfig {
  8. // 订单交换机和队列
  9. private final String ORDER_EXCHANGE = "order_exchange";
  10. private final String ORDER_QUEUE = "order_queue";
  11. // 过期订单交换机和队列(死信交换机和死信队列)
  12. private final String EXPIRE_EXCHANGE = "expire_exchange";
  13. private final String EXPIRE_QUEUE = "expire_queue";
  14. // 过期订单交换机
  15. @Bean(EXPIRE_EXCHANGE)
  16. public Exchange deadExchange(){
  17. return ExchangeBuilder
  18. .topicExchange(EXPIRE_EXCHANGE)
  19. .durable(false)
  20. .build();
  21. }
  22. // 过期订单队列
  23. @Bean(EXPIRE_QUEUE)
  24. public Queue deadQueue(){
  25. return QueueBuilder
  26. .durable(EXPIRE_QUEUE)
  27. .build();
  28. }
  29. // 将过期订单队列绑定到交换机
  30. @Bean
  31. public Binding bindDeadQueue(@Qualifier(EXPIRE_EXCHANGE) Exchange exchange,@Qualifier(EXPIRE_QUEUE) Queue queue){
  32. return BindingBuilder
  33. .bind(queue)
  34. .to(exchange)
  35. .with("expire_routing")
  36. .noargs();
  37. }
  38. // 订单交换机
  39. @Bean(ORDER_EXCHANGE)
  40. public Exchange normalExchange(){
  41. return ExchangeBuilder
  42. .topicExchange(ORDER_EXCHANGE)
  43. .durable(false)
  44. .build();
  45. }
  46. // 订单队列
  47. @Bean(ORDER_QUEUE)
  48. public Queue normalQueue(){
  49. return QueueBuilder
  50. .durable(ORDER_QUEUE)
  51. .ttl(10000) // 存活时间为10s,模拟30min
  52. .deadLetterExchange(EXPIRE_EXCHANGE) // 绑定死信交换机
  53. .deadLetterRoutingKey("expire_routing") // 死信交换机的路由关键字
  54. .build();
  55. }
  56. // 将订单队列绑定到交换机
  57. @Bean
  58. public Binding bindNormalQueue(@Qualifier(ORDER_EXCHANGE) Exchange exchange,
  59. @Qualifier(ORDER_QUEUE) Queue queue){
  60. return BindingBuilder
  61. .bind(queue)
  62. .to(exchange)
  63. .with("order_routing")
  64. .noargs();
  65. }
  66. }

4.编写下单的控制器方法,下单后向订单交换机发送消息

  1. @RestController
  2. public class OrderController {
  3. @Resource
  4. private RabbitTemplate rabbitTemplate;
  5. //下单
  6. @GetMapping("/place/{id}")
  7. public String placeOrder(@PathVariable("id") String id){
  8. System.out.println("处理订单数据");
  9. //将订单id发送到订单队列
  10. rabbitTemplate.convertAndSend("order_exchange","order_routing",id);
  11. return "下单成功,修改库存。";
  12. }
  13. }

 5.编写监听死信队列的消费者

  1. @Component
  2. public class Consumer {
  3. // 监听过期队列
  4. @RabbitListener(queues = "expire_queue")
  5. public void listen_message(String id) {
  6. System.out.println("查询订单号为:"+id+"的订单,如果已支付无需处理,未支付回退库存。 ");
  7. }
  8. }

8.3 插件实现延迟队列

 在使用死信队列实现延迟队列时,会遇到一个问题:RabbitMQ只会移除队列顶端的过期消息,如果第一个消息的存活时长较长,而第二个消息的存活时长较短,则第二个消息并不会及时执行。

RabbitMQ虽然本身不能使用延迟队列,但官方提供了延迟队列插件,安装后可直接使用延迟队列。

1、使用xftpj将延迟插件上传到虚拟机

2.安装插件

  1. # 将插件放入RabbitMQ插件目录中
  2. mv rabbitmq_delayed_message_exchange-3.9.0.ez /usr/local/rabbitmq/plugins/
  3. # 启用插件
  4. rabbitmq-plugins enable rabbitmq_delayed_message_exchange

3.重启RabbitMQ服务

  1. #停止rabbitmq
  2. rabbitmqctl stop
  3. #启动rabbitmq
  4. rabbitmq-server restart -detached

 此时登录管控台可以看到交换机类型多了延迟消息:

 4、创建延迟交换机和延迟队列

  1. package com.zj.config;
  2. import org.springframework.amqp.core.*;
  3. import org.springframework.beans.factory.annotation.Qualifier;
  4. import org.springframework.context.annotation.Bean;
  5. import org.springframework.context.annotation.Configuration;
  6. import java.util.HashMap;
  7. @Configuration
  8. public class RabbitConfig {
  9. // 创建延迟交换机和延迟队列
  10. private final String DELAYED_EXCHANGE = "delayed_exchange";
  11. private final String DELAYED_QUEUE = "delayed_queue";
  12. // 延迟交换机,ExchangeBuilder只能创建普通的交换机例如:topic、direct、fanout交换机。要创建延迟交换机只能创建自定义交换机。
  13. @Bean(DELAYED_EXCHANGE)
  14. public Exchange deadExchange(){
  15. HashMap<String, Object> args = new HashMap<>();
  16. args.put("x-delayed-type","topic"); //topic:延迟交换机的实际类型。
  17. return new CustomExchange(DELAYED_EXCHANGE,"x-delayed-message",false,true,args);
  18. }
  19. // 延迟队列
  20. @Bean(DELAYED_QUEUE)
  21. public Queue deadQueue(){
  22. return QueueBuilder
  23. .durable(DELAYED_QUEUE)
  24. .build();
  25. }
  26. // 将延迟队列绑定到延迟交换机
  27. @Bean
  28. public Binding bindExchangeQueue(@Qualifier(DELAYED_EXCHANGE) Exchange exchange,
  29. @Qualifier(DELAYED_QUEUE) Queue queue){
  30. return BindingBuilder
  31. .bind(queue)
  32. .to(exchange)
  33. .with("delayed-routing")
  34. .noargs();
  35. }
  36. }

5.编写下单的控制器方法

  1. package com.zj.controller;
  2. import org.springframework.amqp.AmqpException;
  3. import org.springframework.amqp.core.Message;
  4. import org.springframework.amqp.core.MessagePostProcessor;
  5. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  6. import org.springframework.web.bind.annotation.GetMapping;
  7. import org.springframework.web.bind.annotation.PathVariable;
  8. import org.springframework.web.bind.annotation.RestController;
  9. import javax.annotation.Resource;
  10. @RestController
  11. public class OrderController {
  12. @Resource
  13. private RabbitTemplate rabbitTemplate;
  14. //下单
  15. @GetMapping("/place/{id}")
  16. public String placeOrder(@PathVariable("id") String id){
  17. System.out.println("处理订单数据");
  18. //设置消息的延迟时间为10s
  19. MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {
  20. @Override
  21. public Message postProcessMessage(Message message) throws AmqpException {
  22. message.getMessageProperties().setDelay(10000);
  23. return message;
  24. }
  25. };
  26. rabbitTemplate.convertAndSend("delayed_exchange","delayed-routing",id,messagePostProcessor);
  27. return "下单成功,修改库存。";
  28. }
  29. }

6.编写延迟队列的消费者

  1. package com.zj.consumer;
  2. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  3. import org.springframework.stereotype.Component;
  4. @Component
  5. public class Consumer {
  6. // 监听延迟队列
  7. @RabbitListener(queues = "delayed_queue")
  8. public void listen_message(String id) {
  9. System.out.println("查询订单号为:"+id+"的订单,如果已支付无需处理,未支付回退库存。 ");
  10. }
  11. }

7、下单测试

 延迟队列中没有消息是因为消费者将消息消费了。

 九、RabbitMQ集群(暂略)

本文内容由网友自发贡献,转载请注明出处:https://www.wpsshop.cn/w/你好赵伟/article/detail/716789
推荐阅读
相关标签
  

闽ICP备14008679号