赞
踩
MQ全称Message Queue(消息队列),是在消息的传输过程中保存消息的容器。多用于系统之间的异步通信。
- 同步通信相当于两个人当面对话,你一言我一语。必须及时回复:
- 异步通信相当于通过第三方转述对话,可能有消息的延迟,但不需要二人时刻保持联系。
两台计算机间传送的数据单位。消息可以非常简单,例如只包含文本字符串;也可以更复杂,可能包含嵌入对象。
数据结构中概念。在队列中,数据先进先出,后进后出。
在电商平台中,用户下订单需要调用订单系统,此时订单系统还需要调用库存系统、支付系统、物流系统完成业务。此时会产生两个问题:
如果在系统中引入MQ,即订单系统将消息先发送到MQ中,MQ再转发到其他系统,则会解决以下问题:
如果订单系统同步访问每个系统,则用户下单等待时长如下:
假设我们的系统每秒只能承载1000请求,如果请求瞬间增多到每秒5000,则会造成系统崩溃。此时引入mq即可解决该问题
使用了MQ之后,限制消费消息的速度为1000,这样一来,高峰期产生的数据势必会被积压在MQ中,高峰就被“削”掉了,但是因为消息积压,在高峰期过后的一段时间内,消费消息的速度还是会维持在1000,直到消费完积压的消息,这就叫做“填谷”。
这些业务场景都是短时间内需要处理大量请求,如果直接连接系统处理业务,会耗费大量资源,有可能造成系统瘫痪。
而使用MQ后,可以先让用户将请求发送到MQ中,MQ会先保存请求消息,不会占用系统资源,且MQ会进行消息排序,先请求的秒杀成功,后请求的秒杀失败。
如电商网站要推送促销信息,该业务耗费时间较多,但对时效性要求不高,可以使用MQ做消息分发。
假如我们需要将数据保存到数据库之外,还需要一段时间将数据同步到缓存(如Redis)、搜索引擎(如Elasticsearch)中。此时可以将数据库的数据作为消息发送到MQ中,并同步到缓存、搜索引擎中。
在电商系统中,订单完成后,需要及时的通知子系统(进销存系统发货,用户服务积分,发送短信)进行下一步操作。为了保证订单系统的高性能,应该直接返回订单结果,之后让MQ通知子系统做其他非实时的业务操作。这样能保证核心业务的高效及时。
在银行系统中,如果要查询近十年的历史账单,这是非常耗时的操作。如果发送同步请求,则会花费大量时间等待响应。此时使用MQ发送异步请求,等到查询出结果后获取结果即可。
RabbitMQ是由Erlang语言编写的基于AMQP的MQ产品。
即Advanced Message Queuing Protocol(高级消息队列协议),是一个网络协议,专门为消息中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受不同中间件产品,不同开发语言等条件的限制。2006年AMQP规范发布,类比HTTP。
生产者(Publisher)将消息发布到交换机(Exchange),交换机根据规则将消息分发给交换机绑定的队列(Queue),队列再将消息投递给订阅了此队列的消费者。
就好比是报纸印刷厂(Publisher)将印刷出来的报纸交给报社(Exchange),报社再将报纸交给不同的邮递员(Queue),邮递员再将报纸交给用户(Consumer)。
消息的生产者。也是一个向交换机发布消息的客户端应用程序。也就是Java代码。
连接。生产者/消费者和RabbitMQ服务器之间建立的TCP连接。
信道。是TCP里面的虚拟连接。例如:Connection相当于电缆,Channel相当于独立光纤束,一条TCP连接中可以创建多条信道,增加连接效率。无论是发布消息、接收消息、订阅队列都是通过信道完成的。
消息队列服务器实体。即RabbitMQ服务器
虚拟主机。出于多租户和安全因素设计的,把AMQP的基本组件划分到一个虚拟的分组中。每个vhost本质上就是一个mini版的RabbitMQ服务器,拥有自己的队列、交换机、绑定和权限机制。当多个不同的用户使用同一个RabbitMQ服务器时,可以划分出多个虚拟主机。RabbitMQ默认的虚拟主机路径是/
交换机。用来接收生产者发送的消息,并根据分发规则,将这些消息分发给服务器中的队列中。不同的交换机有不同的分发规则。
消息队列。用来保存消息直到发送给消费者。它是消息的容器,也是消息的终点。消息一直在队列里面,等待消费者链接到这个队列将其取走。
消息队列和交换机之间的虚拟连接,绑定中包含路由规则,绑定信息保存到交换机的路由表中,作为消息的分发依据。
消息的消费者。表示一个从消息队列中取得消息的客户端应用程序。
(面试)RabbitMQ为什么使用信道而不直接使用TCP连接通信?
TCP连接的创建和销毁开销特别大。创建需要3次握手,销毁需要4次分手。高峰时每秒成千上万条TCP连接的创建会造成资源巨大的浪费。而且操作系统每秒处理TCP连接数也是有限制的,会造成性能瓶颈。而如果一条线程使用一条信道,一条TCP链接可以容纳无限的信道,即使每秒成千上万的请求也不会成为性能的瓶颈。
RabbitMQ是使用Erlang语言编写的,所以在安装RabbitMQ前需要先安装Erlang环境 .
yum install -y epel-release
- wget https://packages.erlang-solutions.com/erlang-solutions-1.0-1.noarch.rpm
- rpm -Uvh erlang-solutions-1.0-1.noarch.rpm
yum install -y erlang24.2.1
erl -version
- # 关闭运行的防火墙
- systemctl stop firewalld.service
- # 禁止防火墙自启动
- systemctl disable firewalld.service
- # 修改文件
- vim /etc/sysconfig/network
- # 添加如下内容
- NETWORKING=yes
- HOSTNAME=zj
-
-
- # 修改文件
- vim /etc/hosts
- # 添加如下内容
- 服务器ip zj
- # 解压RabbitMQ
- tar xf rabbitmq-server-generic-unix-3.9.13.tar.xz
-
-
- # 重命名:
- mv rabbitmq_server-3.9.13 rabbitmq
-
-
- # 移动文件夹:
- mv rabbitmq /usr/local/
- # 编辑/etc/profile文件
- vim /etc/profile
-
-
- #添加如下内容
- export PATH=$PATH:/usr/local/rabbitmq/sbin
-
-
- # 运行文件,让修改内容生效
- source /etc/profile
rabbitmq-plugins enable rabbitmq_management
- #启动rabbitmq
- rabbitmq-server -detached
-
-
- #停止rabbitmq
- rabbitmqctl stop
路径:http://ip地址:15672
,用户名:guest
,密码:guest
管控台的端口是15672,MQ的端口是5672
- # 创建配置文件夹
- mkdir -p /usr/local/rabbitmq/etc/rabbitmq
- # 创建配置文件
- vim /usr/local/rabbitmq/etc/rabbitmq/rabbitmq.conf
- # 添加如下内容
- loopback_users=none
-
-
- # 重启RabbitMQ
- rabbitmqctl stop_app
- rabbitmqctl reset
- rabbitmqctl start_app
此时就能进入控制台了。
guest账户默认只允许本地使用,我们可以创建新账户远程访问RabbitMQ(如2.2中),但是不推荐远程使用MQ.
- # 创建账户
- rabbitmqctl add_user MQzhang(用户名) MQzhang(密码)
rabbitmqctl set_user_tags 用户名 administrator
- # "/"表示虚拟机
- # zj表示用户名
- # ".*" ".*" ".*" 表示完整权限
- rabbitmqctl set_permissions -p "/" MQzhang".*" ".*" ".*"
rabbitmqctl stop
- # 安装Docker
- curl -fsSL https://get.docker.com | bash -s docker --mirror Aliyun
-
-
- # 启动docker
- systemctl start docker
docker pull rabbitmq
docker run -d --hostname zj--name rabbit -p 15672:15672 -p 5672:5672 rabbitmq
RabbitMQ共有六种工作模式:简单模式(Simple)、工作队列模式(Work Queue)、发布订阅模式(Publish/Subscribe)、路由模式(Routing)、通配符模式(Topics)、远程调用模式(RPC,不常用)
接下来我们使用JAVA代码操作RabbitMQ,让其按照简单模式进行工作。
JMS
由于MQ产品很多,操作方式各有不同,于是JAVA提供了一套规则——JMS,用于操作消息中间件。JMS即Java消息服务(JavaMessage Service)应用程序接口,是一个Java平台中关于面向消息中间件的API。JMS是JavaEE规范中的一种,类比JDBC。很多MQ产品都实现了JMS规范,例如ActiveMQ。RabbitMQ官方并没有实现JMS规范,但是开源社区有JMS的实现包。
- # 开启管控台插件
- rabbitmq-plugins enable rabbitmq_management
- # 启动rabbitmq
- rabbitmq-server -detached
- <dependencies>
- <dependency>
- <groupId>com.rabbitmq</groupId>
- <artifactId>amqp-client</artifactId>
- <version>5.14.0</version>
- </dependency>
- </dependencies>
- package com.zj.mq.Simple;
-
- import com.rabbitmq.client.Channel;
- import com.rabbitmq.client.Connection;
- import com.rabbitmq.client.ConnectionFactory;
-
- import java.io.IOException;
- import java.util.concurrent.TimeoutException;
-
- /*生产者*/
- public class Producer {
- public static void main(String[] args) throws IOException, TimeoutException {
- //1.创建连接工厂
- ConnectionFactory connectionFactory = new ConnectionFactory();
- connectionFactory.setHost("192.168.66.100");
- connectionFactory.setPort(5672);
- connectionFactory.setUsername("MQzhang");
- connectionFactory.setPassword("MQzhang");
- connectionFactory.setVirtualHost("/");
- //2.创建连接
- Connection connection = connectionFactory.newConnection();
- //3.建立信道
- Channel channel = connection.createChannel();
- //4.创建队列(如果队列已经存在的话则使用该队列,也就是说队列只会创建一次)和交换机(简单模式下使用的是默认交换机direct)
- /* 参数1:队列名
- * 参数2:是否持久化,true表示MQ重启后队列还在。
- * 参数3:是否私有化,false表示所有消费者都可以访问,true表示只有第一次拥有它的消费者才能访问
- * 参数4:是否自动删除,true表示不再使用队列时自动删除队列
- * 参数5:其他额外参数*/
- channel.queueDeclare("simpleQueue",false,false,false,null);
- //5.发送消息
- String msg ="hello rabbitMQ";
- /*
- * 参数1:交换机名,""表示默认交换机
- * 参数2:路由键,简单模式就是队列名
- * 参数3:其他额外参数
- * 参数4:要传递的消息字节数组
- */
- channel.basicPublish("","simpleQueue",null,msg.getBytes());
- //6.关闭资源(信道和连接)
- channel.close();
- connection.close();
- System.out.println("OK");
- }
- }
- package com.zj.mq.Simple;
-
- import com.rabbitmq.client.*;
-
- import java.io.IOException;
- import java.util.concurrent.TimeoutException;
-
- /*消费者*/
- public class Consumer {
- public static void main(String[] args) throws IOException, TimeoutException {
- //1.创建连接工厂
- ConnectionFactory connectionFactory = new ConnectionFactory();
- connectionFactory.setHost("192.168.66.100");
- connectionFactory.setPort(5672);
- connectionFactory.setUsername("MQzhang");
- connectionFactory.setPassword("MQzhang");
- connectionFactory.setVirtualHost("/");
- //2.创建连接
- Connection connection = connectionFactory.newConnection();
- //3.创建信道
- Channel channel = connection.createChannel();
- //4.监听队列(一直在连接不会关闭连接)
- /*
- * 参数一:监听的队列名
- * 参数二:是否自动签收(消费完消息后自动告诉MQ消息消费完了),如果设置为false需要手动确认消息,否则MQ会一直发送消息。
- * 参数三:Consumer的实现类,重写该类方法表示接受到消息后如何消费,body就是消息的字节数组。
- * */
- channel.basicConsume("simpleQueue",true,new DefaultConsumer(channel){
- @Override
- public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
- String message = new String(body, "UTF-8");
- System.out.println("接受消息为:"+message);
- }
- });
- }
- }
消费者随时在监听队列只要队列有消息就会被消费。
与简单模式相比,工作队列模式(Work Queue)多了一些消费者,该模式也使用direct交换机,应用于处理消息较多的情况。特点如下:
- package com.zj.mq.work;
-
- import com.rabbitmq.client.Channel;
- import com.rabbitmq.client.Connection;
- import com.rabbitmq.client.ConnectionFactory;
- import com.rabbitmq.client.MessageProperties;
-
- import java.io.IOException;
- import java.util.concurrent.TimeoutException;
-
- /*生产者*/
- public class Producer {
- public static void main(String[] args) throws IOException, TimeoutException {
- //1.创建连接工厂
- ConnectionFactory connectionFactory = new ConnectionFactory();
- connectionFactory.setHost("192.168.66.100");
- connectionFactory.setPort(5672);
- connectionFactory.setUsername("MQzhang");
- connectionFactory.setPassword("MQzhang");
- connectionFactory.setVirtualHost("/");
- //2.创建连接
- Connection connection = connectionFactory.newConnection();
- //3.建立信道
- Channel channel = connection.createChannel();
- //4.创建队列(如果队列已经存在的话则使用该队列,也就是说队列只会创建一次)和交换机(简单模式下使用的是默认交换机direct)
- /* 参数1:队列名
- * 参数2:是否持久化,true表示MQ重启后队列还在。
- * 参数3:是否私有化,false表示所有消费者都可以访问,true表示只有第一次拥有它的消费者才能访问
- * 参数4:是否自动删除,true表示不再使用队列时自动删除队列
- * 参数5:其他额外参数*/
- channel.queueDeclare("WorkQueue",false,false,false,null);
- //5.发送大量消息
- for (int i = 0; i < 100; i++) {
- /*
- * 参数1:交换机名,""表示默认交换机
- * 参数2:路由键,简单模式就是队列名
- * 参数3:表示该消息是持久化消息,即保存到内存也会保存到磁盘
- * 参数4:要传递的消息字节数组
- */
- channel.basicPublish("","WorkQueue", MessageProperties.PERSISTENT_TEXT_PLAIN,("这是第"+i+"个消息").getBytes());
- }
-
- //6.关闭资源(信道和连接)
- channel.close();
- connection.close();
- }
- }
编写三个消费者,他们都监听的是一个队列。
- package com.zj.mq.work;
-
- import com.rabbitmq.client.*;
-
- import java.io.IOException;
- import java.util.concurrent.TimeoutException;
-
- /*消费者*/
- public class Consumer1 {
- public static void main(String[] args) throws IOException, TimeoutException {
- //1.创建连接工厂
- ConnectionFactory connectionFactory = new ConnectionFactory();
- connectionFactory.setHost("192.168.66.100");
- connectionFactory.setPort(5672);
- connectionFactory.setUsername("MQzhang");
- connectionFactory.setPassword("MQzhang");
- connectionFactory.setVirtualHost("/");
- //2.创建连接
- Connection connection = connectionFactory.newConnection();
- //3.创建信道
- Channel channel = connection.createChannel();
- //4.监听队列(一直在连接不会关闭连接)
- /*
- * 参数一:监听的队列名
- * 参数二:是否自动签收(消费完消息后自动告诉MQ消息消费完了),如果设置为false需要手动确认消息,否则MQ会一直发送消息。
- * 参数三:Consumer的实现类,重写该类方法表示接受到消息后如何消费,body就是消息的字节数组。
- * */
- channel.basicConsume("WorkQueue",true,new DefaultConsumer(channel){
- @Override
- public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
- String message = new String(body, "UTF-8");
- System.out.println("消费者1接受消息为:"+message);
- }
- });
- }
- }
- package com.zj.mq.work;
-
- import com.rabbitmq.client.*;
-
- import java.io.IOException;
- import java.util.concurrent.TimeoutException;
-
- /*消费者*/
- public class Consumer2 {
- public static void main(String[] args) throws IOException, TimeoutException {
- //1.创建连接工厂
- ConnectionFactory connectionFactory = new ConnectionFactory();
- connectionFactory.setHost("192.168.66.100");
- connectionFactory.setPort(5672);
- connectionFactory.setUsername("MQzhang");
- connectionFactory.setPassword("MQzhang");
- connectionFactory.setVirtualHost("/");
- //2.创建连接
- Connection connection = connectionFactory.newConnection();
- //3.创建信道
- Channel channel = connection.createChannel();
- //4.监听队列(一直在连接不会关闭连接)
- /*
- * 参数一:监听的队列名
- * 参数二:是否自动签收(消费完消息后自动告诉MQ消息消费完了),如果设置为false需要手动确认消息,否则MQ会一直发送消息。
- * 参数三:Consumer的实现类,重写该类方法表示接受到消息后如何消费,body就是消息的字节数组。
- * */
- channel.basicConsume("WorkQueue",true,new DefaultConsumer(channel){
- @Override
- public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
- String message = new String(body, "UTF-8");
- System.out.println("消费者2接受消息为:"+message);
- }
- });
- }
- }
- package com.zj.mq.work;
-
- import com.rabbitmq.client.*;
-
- import java.io.IOException;
- import java.util.concurrent.TimeoutException;
-
- /*消费者*/
- public class Consumer3 {
- public static void main(String[] args) throws IOException, TimeoutException {
- //1.创建连接工厂
- ConnectionFactory connectionFactory = new ConnectionFactory();
- connectionFactory.setHost("192.168.66.100");
- connectionFactory.setPort(5672);
- connectionFactory.setUsername("MQzhang");
- connectionFactory.setPassword("MQzhang");
- connectionFactory.setVirtualHost("/");
- //2.创建连接
- Connection connection = connectionFactory.newConnection();
- //3.创建信道
- Channel channel = connection.createChannel();
- //4.监听队列(一直在连接不会关闭连接)
- /*
- * 参数一:监听的队列名
- * 参数二:是否自动签收(消费完消息后自动告诉MQ消息消费完了),如果设置为false需要手动确认消息,否则MQ会一直发送消息。
- * 参数三:Consumer的实现类,重写该类方法表示接受到消息后如何消费,body就是消息的字节数组。
- * */
- channel.basicConsume("WorkQueue",true,new DefaultConsumer(channel){
- @Override
- public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
- String message = new String(body, "UTF-8");
- System.out.println("消费者3接受消息为:"+message);
- }
- });
- }
- }
- 消费者1接受消息为:这是第0个消息
- 消费者1接受消息为:这是第3个消息
- 消费者1接受消息为:这是第6个消息
- 消费者1接受消息为:这是第9个消息
- 消费者1接受消息为:这是第12个消息
- 消费者1接受消息为:这是第15个消息
- 消费者1接受消息为:这是第18个消息
- 消费者1接受消息为:这是第21个消息
- ……
-
-
- 消费者2接受消息为:这是第1个消息
- 消费者2接受消息为:这是第4个消息
- 消费者2接受消息为:这是第7个消息
- 消费者2接受消息为:这是第10个消息
- 消费者2接受消息为:这是第13个消息
- 消费者2接受消息为:这是第16个消息
- 消费者2接受消息为:这是第19个消息
- 消费者2接受消息为:这是第22个消息
- 消费者2接受消息为:这是第25个消息
- ……
-
-
- 消费者3接受消息为:这是第2个消息
- 消费者3接受消息为:这是第5个消息
- 消费者3接受消息为:这是第8个消息
- 消费者3接受消息为:这是第11个消息
- 消费者3接受消息为:这是第14个消息
- 消费者3接受消息为:这是第17个消息
- 消费者3接受消息为:这是第20个消息
- 消费者3接受消息为:这是第23个消息
- 消费者3接受消息为:这是第26个消息
- 消费者3接受消息为:这是第29个消息
- 消费者3接受消息为:这是第32个消息
在开发过程中,有一些消息需要不同消费者进行不同的处理,如电商网站的同一条促销信息需要短信发送、邮件发送、站内信发送等。此时可以使用发布订阅模式(Publish/Subscribe)
- package com.zj.mq.publish;
-
- import com.rabbitmq.client.BuiltinExchangeType;
- import com.rabbitmq.client.Channel;
- import com.rabbitmq.client.Connection;
- import com.rabbitmq.client.ConnectionFactory;
-
- import java.io.IOException;
- import java.util.concurrent.TimeoutException;
-
- /*生产者*/
- public class Producer {
- public static void main(String[] args) throws IOException, TimeoutException {
- //1.创建连接工厂
- ConnectionFactory connectionFactory = new ConnectionFactory();
- connectionFactory.setHost("192.168.66.100");
- connectionFactory.setPort(5672);
- connectionFactory.setUsername("MQzhang");
- connectionFactory.setPassword("MQzhang");
- connectionFactory.setVirtualHost("/");
- //2.创建连接
- Connection connection = connectionFactory.newConnection();
- //3.建立信道
- Channel channel = connection.createChannel();
- //4.创建交换机fanout
- /*
- * 参数一:交换机名称
- * 参数二:交换机类型
- * 参数三: 交换机是否持久化(关闭控制台是否还存在)*/
- channel.exchangeDeclare("exchangeFanout", BuiltinExchangeType.FANOUT,false);
- //5.创建三个队列(分别模拟邮件发送、短信发送、站内信发送)
- channel.queueDeclare("mailQueue", false,false,false,null);
- channel.queueDeclare("messageQueue", false,false,false,null);
- channel.queueDeclare("stationQueue", false,false,false,null);
- //6.交换机绑定队列
- /*
- * 参数一:队列名称
- * 参数二:交换机名称
- * 参数三:路由关键字,发布订阅模式不存在路由关键字*/
- channel.queueBind("mailQueue","exchangeFanout","");
- channel.queueBind("messageQueue","exchangeFanout","");
- channel.queueBind("stationQueue","exchangeFanout","");
- //7.往交换机发送消息
- for (int i = 0; i < 10; i++) {
- channel.basicPublish("exchangeFanout","",null,("你好,MQ"+i).getBytes());
- }
- //8.关闭资源
- channel.close();
- connection.close();
-
- }
- }
- package com.zj.mq.publish;
-
- import com.rabbitmq.client.*;
- import com.sun.deploy.ui.AboutDialog;
-
-
- import java.io.IOException;
- import java.util.concurrent.TimeoutException;
-
- /*站内信消费者*/
- public class ConsumerStation {
- public static void main(String[] args) throws IOException, TimeoutException {
- //1.创建连接工厂
- ConnectionFactory connectionaFactory = new ConnectionFactory();
- connectionaFactory.setHost("192.168.66.100");
- connectionaFactory.setPort(5672);
- connectionaFactory.setUsername("MQzhang");
- connectionaFactory.setPassword("MQzhang");
- connectionaFactory.setVirtualHost("/");
- //2.创建连接
- Connection connection = connectionaFactory.newConnection();
- //3.创建信道
- Channel channel = connection.createChannel();
- //4.监听队列(一直在连接不会关闭连接)
- /*
- * 参数一:监听的队列名
- * 参数二:是否自动签收(消费完消息后自动告诉MQ消息消费完了),如果设置为false需要手动确认消息,否则MQ会一直发送消息。
- * 参数三:Consumer的实现类,重写该类方法表示接受到消息后如何消费,body就是消息的字节数组。
- * */
- channel.basicConsume("stationQueue",true,new DefaultConsumer(channel){
- public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
- String message = new String(body, "UTF-8");
- System.out.println("发送站内信:"+message);
- }
- });
- }
- }
- 发送站内信:你好,MQ0
- 发送站内信:你好,MQ1
- 发送站内信:你好,MQ2
- 发送站内信:你好,MQ3
- 发送站内信:你好,MQ4
- 发送站内信:你好,MQ5
- 发送站内信:你好,MQ6
- 发送站内信:你好,MQ7
- 发送站内信:你好,MQ8
- 发送站内信:你好,MQ9
当然也能创建多个消费者来监听同一个队列来提高消费速度。
使用发布订阅模式时,所有消息都会发送到绑定的队列中(发送到绑定到交换机上的每个队列,队列再发送给消费者),但很多时候,不是所有消息都无差别的发布到所有队列中。比如电商网站的促销活动,双十一大促可能会发布到所有队列;而一些小的促销活动为了节约成本,只发布到站内信队列。此时需要使用路由模式(Routing)完成这一需求。
- package com.zj.mq.route;
-
- import com.rabbitmq.client.BuiltinExchangeType;
- import com.rabbitmq.client.Channel;
- import com.rabbitmq.client.Connection;
- import com.rabbitmq.client.ConnectionFactory;
-
- import java.io.IOException;
- import java.util.concurrent.TimeoutException;
-
- /*生产者*/
- public class Producer {
- public static void main(String[] args) throws IOException, TimeoutException {
- //1.创建连接工厂
- ConnectionFactory connectionFactory = new ConnectionFactory();
- connectionFactory.setHost("192.168.66.100");
- connectionFactory.setPort(5672);
- connectionFactory.setUsername("MQzhang");
- connectionFactory.setPassword("MQzhang");
- connectionFactory.setVirtualHost("/");
- //2.创建连接
- Connection connection = connectionFactory.newConnection();
- //3.建立信道
- Channel channel = connection.createChannel();
- //4.创建交换机fanout
- /*
- * 参数一:交换机名称
- * 参数二:交换机类型
- * 参数三: 交换机是否持久化(关闭控制台是否还存在)*/
- channel.exchangeDeclare("exchangeRoute", BuiltinExchangeType.DIRECT,false);
- //5.创建三个队列(分别模拟邮件发送、短信发送、站内信发送)
- /* 参数1:队列名
- * 参数2:是否持久化,true表示MQ重启后队列还在。
- * 参数3:是否私有化,false表示所有消费者都可以访问,true表示只有第一次拥有它的消费者才能访问
- * 参数4:是否自动删除,true表示不再使用队列时自动删除队列
- * 参数5:其他额外参数
- * */
- channel.queueDeclare("mailQueue", false,false,false,null);
- channel.queueDeclare("messageQueue", false,false,false,null);
- channel.queueDeclare("stationQueue", false,false,false,null);
- //6.交换机绑定队列
- /*
- * 参数一:队列名称
- * 参数二:交换机名称
- * 参数三:路由关键字,一个队列可以有多个路由关键字
- * */
- channel.queueBind("mailQueue","exchangeRoute","import");
- channel.queueBind("messageQueue","exchangeRoute","normal");
- channel.queueBind("stationQueue","exchangeRoute","import");
- //7.往交换机发送消息,路由关键字是import,表示交换机会将消息发送到带有import关键字的队列。
- channel.basicPublish("exchangeRoute","import",null,("你好,import MQ").getBytes());
- //路由关键字是normal表示交换机会将消息发送到带有normal关键字的队列
- channel.basicPublish("exchangeRoute","normal",null,("你好,normal MQ").getBytes());
- //8.关闭资源
- channel.close();
- connection.close();
-
- }
- }
消费者还是和其他模式的消费者是一样的。这里以mailQuene举例子。
- package com.zj.mq.route;
-
- import com.rabbitmq.client.*;
-
- import java.io.IOException;
- import java.util.concurrent.TimeoutException;
-
- /*消费者*/
- public class Consumer {
- public static void main(String[] args) throws IOException, TimeoutException {
- //1.创建连接工厂
- ConnectionFactory connectionFactory = new ConnectionFactory();
- connectionFactory.setHost("192.168.66.100");
- connectionFactory.setPort(5672);
- connectionFactory.setUsername("MQzhang");
- connectionFactory.setPassword("MQzhang");
- connectionFactory.setVirtualHost("/");
- //2.创建连接
- Connection connection = connectionFactory.newConnection();
- //3.创建信道
- Channel channel = connection.createChannel();
- //4.监听队列(一直在连接不会关闭连接)
- /*
- * 参数一:监听的队列名
- * 参数二:是否自动签收(消费完消息后自动告诉MQ消息消费完了),如果设置为false需要手动确认消息,否则MQ会一直发送消息。
- * 参数三:Consumer的实现类,重写该类方法表示接受到消息后如何消费,body就是消息的字节数组。
- *
- */
- channel.basicConsume("mailQueue",true,new DefaultConsumer(channel){
- @Override
- public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
- String message = new String(body, "UTF-8");
- System.out.println("接受消息为:"+message);
- }
- });
- }
- }
通配符模式(Topic)是在路由模式的基础上,给队列绑定带通配符的路由关键字,只要消息的RoutingKey能实现通配符匹配,就会将消息转发到该队列。通配符模式比路由模式更灵活,使用topic交换机。
通配符规则:
.
分割。#
可以匹配任意多个单词,*
可以匹配任意一个单词。- package com.zj.mq.topic;
-
- import com.rabbitmq.client.BuiltinExchangeType;
- import com.rabbitmq.client.Channel;
- import com.rabbitmq.client.Connection;
- import com.rabbitmq.client.ConnectionFactory;
-
- import java.io.IOException;
- import java.util.concurrent.TimeoutException;
-
- /*生产者*/
- public class Producer {
- public static void main(String[] args) throws IOException, TimeoutException {
- //1.创建连接工厂
- ConnectionFactory connectionFactory = new ConnectionFactory();
- connectionFactory.setHost("192.168.66.100");
- connectionFactory.setPort(5672);
- connectionFactory.setUsername("MQzhang");
- connectionFactory.setPassword("MQzhang");
- connectionFactory.setVirtualHost("/");
- //2.创建连接
- Connection connection = connectionFactory.newConnection();
- //3.建立信道
- Channel channel = connection.createChannel();
- //4.创建交换机fanout
- /*
- * 参数一:交换机名称
- * 参数二:交换机类型
- * 参数三: 交换机是否持久化(关闭控制台是否还存在)*/
- channel.exchangeDeclare("exchangeTopic", BuiltinExchangeType.TOPIC,false);
- //5.创建三个队列(分别模拟邮件发送、短信发送、站内信发送)
- channel.queueDeclare("mailQueue", false,false,false,null);
- channel.queueDeclare("messageQueue", false,false,false,null);
- channel.queueDeclare("stationQueue", false,false,false,null);
- //6.交换机绑定队列
- /*
- * 参数一:队列名称
- * 参数二:交换机名称
- * 参数三:路由关键字,【#.mail.#】 表示:mail前后可以匹配多个单词*/
- channel.queueBind("mailQueue","exchangeTopic","#.mail.#");
- channel.queueBind("messageQueue","exchangeTopic","#.message.#");
- channel.queueBind("stationQueue","exchangeTopic","#.station.#");
- //7.往交换机发送消息到三个队列
- channel.basicPublish("exchangeTopic","mail.message.station",null,("你好,MQ").getBytes());
- //8.关闭资源
- channel.close();
- connection.close();
- }
- }
也是和其他模式的消费者是一样的只需要监听消费者。
- package com.zj.mq.topic;
-
- import com.rabbitmq.client.*;
-
- import java.io.IOException;
- import java.util.concurrent.TimeoutException;
-
- /*消费者*/
- public class Consumer {
- public static void main(String[] args) throws IOException, TimeoutException {
- //1.创建连接工厂
- ConnectionFactory connectionFactory = new ConnectionFactory();
- connectionFactory.setHost("192.168.66.100");
- connectionFactory.setPort(5672);
- connectionFactory.setUsername("MQzhang");
- connectionFactory.setPassword("MQzhang");
- connectionFactory.setVirtualHost("/");
- //2.创建连接
- Connection connection = connectionFactory.newConnection();
- //3.创建信道
- Channel channel = connection.createChannel();
- //4.监听队列(一直在连接不会关闭连接)
- /*
- * 参数一:监听的队列名
- * 参数二:是否自动签收(消费完消息后自动告诉MQ消息消费完了),如果设置为false需要手动确认消息,否则MQ会一直发送消息。
- * 参数三:Consumer的实现类,重写该类方法表示接受到消息后如何消费,body就是消息的字节数组。
- *
- */
- channel.basicConsume("mailQueue",true,new DefaultConsumer(channel){
- @Override
- public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
- String message = new String(body, "UTF-8");
- System.out.println("接受消息为:"+message);
- }
- });
- }
- }
之前我们使用原生JAVA操作RabbitMQ较为繁琐,接下来我们使用SpringBoot整合RabbitMQ,简化代码编写。
- <!-- RabbitMQ起步依赖 -->
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-amqp</artifactId>
- </dependency>
- spring:
- rabbitmq:
- host: 192.168.66.100
- port: 5672
- username: MQzhang
- password: MQzhang
- virtual-host: /
-
-
- #日志格式
- logging:
- pattern:
- console: '%d{HH:mm:ss.SSS} %clr(%-5level) --- [%-15thread] %cyan(%-50logger{50}):%msg%n'
-
SpringBoot整合RabbitMQ时,需要在配置类创建队列和交换机,写法如下:
- package com.zj.config;
-
- import org.springframework.amqp.core.*;
- import org.springframework.beans.factory.annotation.Qualifier;
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
-
- @Configuration
- public class RabbitConfig {
-
- private final String EXCHANGE_NAME = "boot_topic_exchange";
- private final String QUEUE_NAME = "boot_queue";
-
-
- // 创建交换机
- @Bean(EXCHANGE_NAME)
- public Exchange getExchange() {
- return ExchangeBuilder
- .topicExchange(EXCHANGE_NAME) // 交换机类型和名称
- .durable(true) // 是否持久化
- .build();
- }
-
-
- // 创建队列
- @Bean(QUEUE_NAME)
- public Queue getMessageQueue() {
- return new Queue(QUEUE_NAME); // 队列名
- }
-
-
- // 交换机绑定队列
- @Bean
- public Binding bindMessageQueue(@Qualifier(EXCHANGE_NAME) Exchange exchange,
- @Qualifier(QUEUE_NAME) Queue queue) {
- return BindingBuilder
- .bind(queue)
- .to(exchange)
- .with("#.message.#")
- .noargs();
- }
- }
SpringBoot整合RabbitMQ时,提供了工具类RabbitTemplate发送消息,编写生产者时只需要注入RabbitTemplate即可发送消息。
- package com.zj;
-
- import org.junit.jupiter.api.Test;
- import org.springframework.amqp.rabbit.core.RabbitTemplate;
- import org.springframework.boot.test.context.SpringBootTest;
-
- import javax.annotation.Resource;
-
- @SpringBootTest
- class DemoApplicationTests {
-
-
- @Resource
- public RabbitTemplate rabbitTemplate;
-
- @Test
- public void testProducer(){
- /*
- * 参数一:交换机名称
- * 参数二:路由关键字
- * 参数三:要发送的消息
- */
- rabbitTemplate.convertAndSend("boot_topic_exchange","message","hello MQ");
- }
-
- }
我们编写另一个SpringBoot项目作为RabbitMQ的消费者,因为在同一个项目中的话直接方法调用就可以。
1、创建项目导入依赖。
2、编写配置文件,和生产者的相同
3、编写消费者,监听队列
- @Component
- public class Consumer {
- // 监听队列
- @RabbitListener(queues = "boot_queue")
- public void listen_message(String message){
- System.out.println("发送短信:"+message);
- }
- }
4、运行项目。观察管控台队列和控制台
RabbitMQ消息投递的路径为:
生产者
--->交换机
--->队列
--->消费者
在RabbitMQ工作的过程中,每个环节消息都可能传递失败,那么RabbitMQ是如何监听消息是否成功投递的呢?
首先我们准备两个SpringBoot项目,分别代表生产者和消费者,配置文件如下:
- spring:
- rabbitmq:
- host: 192.168.66.100
- port: 5672
- username: MQzhang
- password: MQzhang
- virtual-host: /
-
-
- #日志格式
- logging:
- pattern:
- console: '%d{HH:mm:ss.SSS} %clr(%-5level) --- [%-15thread] %cyan(%-50logger{50}):%msg%n'
-
在生产者的配置类创建交换机和队列:
- package com.zj.config;
-
- import org.springframework.amqp.core.*;
- import org.springframework.beans.factory.annotation.Qualifier;
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
-
- @Configuration
- public class RabbitConfig {
-
- private final String EXCHANGE_NAME = "boot_topic_exchange";
- private final String QUEUE_NAME = "boot_queue";
-
-
- // 创建交换机
- @Bean(EXCHANGE_NAME)
- public Exchange getExchange() {
- return ExchangeBuilder
- .topicExchange(EXCHANGE_NAME) // 交换机类型和名称
- .durable(true) // 是否持久化
- .build();
- }
-
-
- // 创建队列
- @Bean(QUEUE_NAME)
- public Queue getMessageQueue() {
- return new Queue(QUEUE_NAME); // 队列名
- }
-
-
- // 交换机绑定队列
- @Bean
- public Binding bindMessageQueue(@Qualifier(EXCHANGE_NAME) Exchange exchange,
- @Qualifier(QUEUE_NAME) Queue queue) {
- return BindingBuilder
- .bind(queue)
- .to(exchange)
- .with("#.message.#")
- .noargs();
- }
- }
创建生产者
- @Component
- public class Consumer {
- // 监听队列
- @RabbitListener(queues = "boot_queue")
- public void listen_message(String message){
- System.out.println("发送短信:"+message);
- }
- }
确认模式(confirm)可以监听消息是否从生产者成功传递到交换机,使用方法如下:
1、生产者配置文件开启确认模式
- spring:
- rabbitmq:
- host: 192.168.66.100
- port: 5672
- username: MQzhang
- password: MQzhang
- virtual-host: /
- # 开启确认模式
- publisher-confirm-type: correlated
2、生产者定义确认模式的回调方法,并模拟向不存在的交换机aaa发送消息。
- package com.zj;
-
- import org.junit.jupiter.api.Test;
- import org.springframework.amqp.rabbit.connection.CorrelationData;
- import org.springframework.amqp.rabbit.core.RabbitTemplate;
- import org.springframework.boot.test.context.SpringBootTest;
-
- import javax.annotation.Resource;
-
- @SpringBootTest
- class DemoApplicationTests {
-
-
- @Resource
- public RabbitTemplate rabbitTemplate;
-
- @Test
- public void testProducer(){
-
- //定义确认模式的回调方法,当消息向交换机发送后会调用confirm方法。
- rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
-
- /**
- * @param correlationData 相关配置信息
- * @param ack 交换机是否收到消息
- * @param cause 失败原因
- */
- @Override
- public void confirm(CorrelationData correlationData, boolean ack, String cause) {
- if(ack){
- System.out.println("消息接受成功");
- }else {
- System.out.println("消息接受失败:"+cause);
- //做一些处理让消息再次发送
- }
-
- }
- });
-
- rabbitTemplate.convertAndSend("aaa","message","hello MQ");
- }
-
- }
3、运行结果
消息接受失败:channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'aaa' in vhost '/', class-id=60, method-id=40)
退回模式(return)可以监听消息是否从交换机成功传递到队列,使用方法如下:
1、生产者配置文件开启退回模式
- spring:
- rabbitmq:
- host: 192.168.66.100
- port: 5672
- username: MQzhang
- password: MQzhang
- virtual-host: /
- # 开启确认模式
- publisher-confirm-type: correlated
- # 开启回退模式
- publisher-returns: true
2、生产者定义退回模式的回调方法,模拟向不存在的队列bbb发送消息。
-
- @SpringBootTest
- class DemoApplicationTests {
-
-
- @Resource
- public RabbitTemplate rabbitTemplate;
-
- @Test
- public void testProducer(){
-
- //定义退回模式的回调方法,只有交换机将消息发送到队列失败后才会执行该方法。
- rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
- /**
- *
- * @param returnedMessage 失败后将失败信息封装到该参数
- */
- @Override
- public void returnedMessage(ReturnedMessage returnedMessage) {
- System.out.println("消息对象:"+returnedMessage);
- System.out.println("错误码:"+returnedMessage.getReplyCode());
- System.out.println("错误信息:"+returnedMessage.getReplyText());
- System.out.println("交换机:"+returnedMessage.getExchange());
- System.out.println("路由键:"+returnedMessage.getRoutingKey());
-
- //处理消息……
- }
- });
-
- rabbitTemplate.convertAndSend("boot_topic_exchange","bbb","hello MQ");
- }
- }
- 消息对象:ReturnedMessage [message=(Body:'hello MQ' MessageProperties [headers={}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, deliveryTag=0]), replyCode=312, replyText=NO_ROUTE, exchange=boot_topic_exchange, routingKey=bbb]
- 错误码:312
- 错误信息:NO_ROUTE
- 交换机:boot_topic_exchange
- 路由键:bbb
在RabbitMQ中,消费者接收到消息后会向队列发送确认签收的消息,只有确认签收的消息才会被移除队列。这种机制称为消费者消息确认(Consumer Acknowledge,简称Ack)。类似快递员派送快递也需要我们签收,否则一直存在于快递公司的系统中。
消息分为自动确认和手动确认。自动确认指消息只要被消费者接收到,无论是否成功处理消息,则自动签收,并将消息从队列中移除。但是在实际开发中,收到消息后可能业务处理出现异常,那么消息就会丢失。此时需要设置手动签收,即在业务处理成功再通知签收消息,如果出现异常,则拒签消息,让消息依然保留在队列当中。
1.消费者配置开启手动签收
- spring:
- rabbitmq:
- host: 192.168.66.100
- port: 5672
- username: MQzhang
- password: MQzhang
- virtual-host: /
- # 开启手动签收
- listener:
- simple:
- acknowledge-mode: manual
2、消费者处理消息时定义手动签收和拒绝签收的情况
- package com.zj.consumer;
-
- import com.rabbitmq.client.Channel;
- import org.springframework.amqp.core.Message;
- import org.springframework.amqp.rabbit.annotation.RabbitListener;
- import org.springframework.stereotype.Component;
-
- import java.io.IOException;
-
-
- @Component
- public class ACKConsumer {
-
- // 监听队列
- /**
- *
- * @param message 消息对象
- * @param channel 信道对象,用于手动接受消息
- */
- @RabbitListener(queues = "boot_queue")
- public void listen_message(Message message, Channel channel) throws IOException {
- //deliveryTag:消息投递序号,每次投递该值都会+1.
- long deliveryTag = message.getMessageProperties().getDeliveryTag();
- try {
- //签收消息
- /*
- * 参数一:消息投递序号
- * 参数二:一次是否可以签收多条消息
- */
- channel.basicAck(deliveryTag,true);
- }catch (Exception e){
- //拒签消息
- /*
- * 参数一:消息投递序号
- * 参数二:一次是否可以签收多条消息
- * 参数三:拒签后消息是否重回队列(处在队列中的消息会不断的再向消费者发送消息)
- */
- channel.basicNack(deliveryTag,true,true);
- System.out.println("消息消费失败");
- }
-
- }
- }
之前我们说MQ可以对请求进行“削峰填谷”,即通过消费端限流的方式限制消息的拉取速度,达到保护消费端的目的。
消费端限流的写法如下:
1.生产者批量发送消息
- @SpringBootTest
- class DemoApplicationTests {
-
-
- @Resource
- public RabbitTemplate rabbitTemplate;
-
- @Test
- public void testSendBatch() {
- // 发送十条消息
- for (int i = 0; i < 100; i++) {
- rabbitTemplate.convertAndSend("boot_topic_exchange", "message", "send message..."+i);
- }
- }
- }
2.消费端配置限流机制
- spring:
- rabbitmq:
- host: 192.168.66.100
- port: 5672
- username: MQzhang
- password: MQzhang
- virtual-host: /
- listener:
- simple:
- # 限流机制必须开启手动签收
- acknowledge-mode: manual
- # 消费端最多拉取20条消息消费,签收后不满20条才会继续拉取消息。
- prefetch: 20
-
3、消费者接受消息
- package com.zj.consumer;
-
- import com.rabbitmq.client.Channel;
- import org.springframework.amqp.core.Message;
- import org.springframework.amqp.rabbit.annotation.RabbitListener;
- import org.springframework.stereotype.Component;
-
- import java.io.IOException;
-
- @Component
- public class OosConsumer {
- @RabbitListener(queues = "boot_queue")
- public void listenMessage(Message message, Channel channel) throws IOException, InterruptedException {
- //1.获取消息
- System.out.println("当前时间:"+new String(message.getBody()));
- //2.模拟业务处理
- Thread.sleep(2000);
- //3.签收消息
- long deliveryTag = message.getMessageProperties().getDeliveryTag();
- channel.basicAck(deliveryTag,true);
- }
- }
20230619
在RabbitMQ中,多个消费者监听同一条队列,则队列默认采用的轮询分发。但是在某种场景下这种策略并不是很好,例如消费者1处理任务的速度非常快,而其他消费者处理速度却很慢。此时如果采用公平分发,则消费者1有很大一部分时间处于空闲状态。此时可以采用不公平分发,即谁处理的快,谁处理的消息多。
使用方法如下:
1.生产者批量发送消息
-
- @SpringBootTest
- class DemoApplicationTests {
-
-
- @Resource
- public RabbitTemplate rabbitTemplate;
-
- @Test
- public void testSendBatch() {
- // 发送十条消息
- for (int i = 0; i < 10; i++) {
- rabbitTemplate.convertAndSend("boot_topic_exchange", "message", "send message..."+i);
- }
- }
- }
2.消费端配置不公平分发
- spring:
- rabbitmq:
- host: 192.168.66.100
- port: 5672
- username: MQzhang
- password: MQzhang
- virtual-host: /
- listener:
- simple:
- # 限流机制必须开启手动签收
- acknowledge-mode: manual
- # 消费端最多拉取1条消息消费,这样谁处理的快谁拉取下一条消息,实现了不公平分发
- prefetch: 1
3、编写两个消费者消费相同的队列信息
- package com.zj.consumer;
-
- import com.rabbitmq.client.Channel;
- import org.springframework.amqp.core.Message;
- import org.springframework.amqp.rabbit.annotation.RabbitListener;
- import org.springframework.stereotype.Component;
-
- @Component
- public class UnfairConsumer {
- // 消费者1
- @RabbitListener(queues = "boot_queue")
- public void listenMessage1(Message message, Channel channel) throws Exception {
- //1.获取消息
- System.out.println("消费者1:"+new String(message.getBody(),"UTF-8"));
- //2. 处理业务逻辑
- Thread.sleep(500); // 消费者1处理快
- //3. 手动签收
- channel.basicAck(message.getMessageProperties().getDeliveryTag(),true);
- }
-
-
- // 消费者2
- @RabbitListener(queues = "boot_queue")
- public void listenMessage2(Message message, Channel channel) throws Exception {
- //1.获取消息
- System.out.println("消费者2:"+new String(message.getBody(),"UTF-8"));
- //2. 处理业务逻辑
- Thread.sleep(3000);// 消费者2处理慢
- //3. 手动签收
- channel.basicAck(message.getMessageProperties().getDeliveryTag(),true);
- }
- }
- 消费者2:send message...1
- 消费者1:send message...0
- 19:53:21.676 INFO --- [main ] com.zj.DemoApplication :Started DemoApplication in 0.867 seconds (JVM running for 1.259)
- 消费者1:send message...3
- 消费者1:send message...4
- 消费者1:send message...2
- 消费者1:send message...5
- 消费者1:send message...6
- 消费者2:send message...7
- 消费者1:send message...8
- 消费者1:send message...9
发现消费者1消费的要比消费者2消费的多。能者多劳。
RabbitMQ可以设置消息的存活时间(Time To Live,简称TTL),当消息到达存活时间后还没有被消费,会被移出队列。RabbitMQ可以对队列的所有消息设置存活时间,也可以对某条消息设置存活时间。
1、在创建队列时设置其存活时间:
- // 创建队列
- @Bean(QUEUE_NAME)
- public Queue getMessageQueue() {
- return QueueBuilder
- .durable(QUEUE_NAME)// 队列名
- .ttl(10000) //队列存活时间10s单位毫秒
- .build();
- }
2、生产者生产消息
- @SpringBootTest
- class DemoApplicationTests {
-
-
- @Resource
- public RabbitTemplate rabbitTemplate;
-
- @Test
- public void testSendBatch() {
- // 发送十条消息
- for (int i = 0; i < 10; i++) {
- rabbitTemplate.convertAndSend("boot_topic_exchange", "message", "send message..."+i);
- }
- }
- }
十秒后,未被消费的消息会被移除。
1、在消息发送的时候设置发送时间
- /*发送消息并设置消息的存活时间*/
- @Test
- public void testSend() {
- //1.创建消息属性
- MessageProperties messageProperties = new MessageProperties();
-
- //2.设置存活时间,单位毫秒
- messageProperties.setExpiration("10000");
-
- //3.创建消息对象
- Message message = new Message(("send message……").getBytes(), messageProperties);
-
- //4.发送消息
- rabbitTemplate.convertAndSend("boot_topic_exchange","message",message);
- }
注意:
如果设置了单条消息的存活时间,也设置了队列的存活时间,以时间短的为准。
消息过期后,并不会马上移除消息,只有消息消费到队列顶端时,才会移除该消息。
假设在电商系统中有一个订单催付的场景,即客户在一段时间内未付款会给用户推送一条短信提醒,但是系统中分为大型商家和小型商家。比如像苹果,小米这样大商家一年能给我们创造很大的利润,所以在订单量大时,他们的订单必须得到优先处理,此时就需要为不同的消息设置不同的优先级,此时我们要使用优先级队列。
优先级队列用法如下:
1、设置队列的优先级
- // 创建队列
- @Bean(QUEUE_NAME)
- public Queue getMessageQueue() {
- return QueueBuilder
- .durable(QUEUE_NAME)// 队列名
- // .ttl(10000) //队列中消息存活时间10s单位毫秒
- .maxPriority(10) //设置队列的优先级越大优先级越高,最大255,推荐最大不超过10
- .build();
- }
2、编写生产者发送有优先级的消息
- @SpringBootTest
- class DemoApplicationTests {
-
-
- @Resource
- public RabbitTemplate rabbitTemplate;
-
-
- @Test
- public void testSend() {
- for (int i = 0; i < 10; i++) {
- if (i == 5) { // i为5时消息的优先级较高
- //1.创建消息属性
- MessageProperties messageProperties = new MessageProperties();
- //2.设置消息优先级
- messageProperties.setPriority(9);
- //3.创建消息对象
- Message message = new Message(("send message……" + i).getBytes(), messageProperties);
- rabbitTemplate.convertAndSend("boot_topic_exchange","message",message);
- }else {
- rabbitTemplate.convertAndSend("boot_topic_exchange","message","send message……" + i);
- }
- }
- }
- }
3、编写消费者测试是否是第五条消息最先被消费
- package com.zj.consumer;
-
- import com.rabbitmq.client.Channel;
- import org.springframework.amqp.core.Message;
- import org.springframework.amqp.rabbit.annotation.RabbitListener;
- import org.springframework.stereotype.Component;
-
- import java.io.IOException;
-
-
- @Component
- public class Consumer {
- // 监听队列
- @RabbitListener(queues = "boot_queue")
- public void listen_message(Message message, Channel channel) throws IOException {
- //1.获取消息
- System.out.println(new String(message.getBody()));
- //2.手动签收
- channel.basicAck(message.getMessageProperties().getDeliveryTag(),true);
- }
- }
- . ____ _ __ _ _
- /\\ / ___'_ __ _ _(_)_ __ __ _ \ \ \ \
- ( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
- \\/ ___)| |_)| | | | | || (_| | ) ) ) )
- ' |____| .__|_| |_|_| |_\__, | / / / /
- =========|_|==============|___/=/_/_/_/
- :: Spring Boot :: (v2.7.0)
- 17:47:14.858 INFO --- [main ] com.zj.DemoApplication :Starting DemoApplication using Java 1.8.0_341 on ZHANGJIN with PID 26080 (D:\Java\code\springbootcode\sb_rabbitMQ_consumer\target\classes started by 张锦 in D:\Java\code\springbootcode\sb_rabbitMQ)
- 17:47:14.858 INFO --- [main ] com.zj.DemoApplication :No active profile set, falling back to 1 default profile: "default"
- 17:47:15.482 INFO --- [main ] o.s.a.rabbit.connection.CachingConnectionFactory :Attempting to connect to: [192.168.66.100:5672]
- 17:47:15.498 INFO --- [main ] o.s.a.rabbit.connection.CachingConnectionFactory :Created new connection: rabbitConnectionFactory#2f2bf0e2:0/SimpleConnection@27f0ad19 [delegate=amqp://MQzhang@192.168.66.100:5672/, localPort= 53985]
- 17:47:15.529 INFO --- [main ] com.zj.DemoApplication :Started DemoApplication in 0.893 seconds (JVM running for 1.338)
- send message……5
- send message……0
- send message……1
- send message……2
- send message……3
- send message……4
- send message……6
- send message……7
- send message……8
- send message……9
第五条消息首先被消费。
在MQ中,当消息成为死信(Dead message)后,消息中间件可以将其从当前队列发送到另一个队列中,这个队列就是死信队列。而在RabbitMQ中,由于有交换机的概念,实际是将死信发送给了死信交换机(Dead Letter Exchange,简称DLX)。死信交换机和死信队列和普通的没有区别。
消息成为死信的情况:
1、创建死信交换机和死信队列
- package com.zj.config;
-
- import org.springframework.amqp.core.*;
- import org.springframework.beans.factory.annotation.Qualifier;
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
-
- @Configuration
- public class RabbitConfig {
-
- //死信交换机和死信队列
- private final String DEAD_EXCHANGE = "dead_exchange";
- private final String DEAD_QUEUE = "dead_queue";
-
-
- //普通交换机和普通队列
- private final String NORMAL_EXCHANGE = "normal_exchange";
- private final String NORMAL_QUEUE = "normal_queue";
-
-
- // 创建死信交换机
- @Bean(DEAD_EXCHANGE)
- public Exchange deadExchange(){
- return ExchangeBuilder
- .topicExchange(DEAD_EXCHANGE) //死信交换机类型和名称
- .durable(false) //是否持久化
- .build();
- }
-
- // 创建死信队列
- @Bean(DEAD_QUEUE)
- public Queue deadQueue(){
- return QueueBuilder
- .durable(DEAD_QUEUE) //死信队列名称
- .build();
- }
-
- // 死信交换机绑定死信队列
- @Bean
- public Binding bindDeadQueue(@Qualifier(DEAD_EXCHANGE) Exchange exchange,
- @Qualifier(DEAD_QUEUE) Queue queue){
- return BindingBuilder
- .bind(queue)
- .to(exchange)
- .with("dead") //交换机路由键
- .noargs();
- }
-
- //创建普通交换机
- @Bean(NORMAL_EXCHANGE)
- public Exchange normalExchange(){
- return ExchangeBuilder
- .topicExchange(NORMAL_EXCHANGE) //普通交换机类型和名称
- .durable(false) //是否持久化
- .build();
- }
- //创建普通队列
- @Bean(NORMAL_QUEUE)
- public Queue normalQueue(){
- return QueueBuilder
- .durable(NORMAL_QUEUE) //普通信队列名称
- .deadLetterExchange(DEAD_EXCHANGE) //绑定死信交换机,因为队列中的无法消费的信息会被放到死信交换机上。
- .deadLetterRoutingKey("dead") //死信队列路由关键字
- .ttl(10000) //消息存活时间
- .maxLength(10) //消息最大长度
- .build();
- }
- //普通交换机绑定普通对列
- @Bean
- public Binding bindNormalQueue(@Qualifier(NORMAL_EXCHANGE) Exchange exchange,
- @Qualifier(NORMAL_QUEUE) Queue queue){
- return BindingBuilder
- .bind(queue)
- .to(exchange)
- .with("normal")
- .noargs();
- }
- }
2.创建生产者发送消息(测试存活时间过期变成死信)
- @Test
- public void testSend() {
- //存活时间过期或者超过消息的长度时消息会变成死信队列,消息被消费者退回后队列没有签收消息会变成死信
- //1.存活时间过期
- rabbitTemplate.convertAndSend("normal_exchange","normal","普通队列消息");
- }
十秒后
消息全部去了死信队列。
2.创建生产者(超过队列长度变成死信)
- @SpringBootTest
- class DemoApplicationTests {
-
-
- @Resource
- public RabbitTemplate rabbitTemplate;
-
-
- @Test
- public void testSend() {
- //存活时间过期或者超过消息的长度时消息会变成死信队列,消息被消费者退回后队列没有签收消息会变成死信
- //1.存活时间过期
- // rabbitTemplate.convertAndSend("normal_exchange","normal","普通队列消息");
- //2.超过队列长度变成死信
- for (int i = 0; i < 20; i++) {
- rabbitTemplate.convertAndSend("normal_exchange","normal","普通队列消息");
- }
- }
- }
因为设置了普通队列的长度,所以超出队列长度的那部分就去了死信队列。也设置了队列的存活时间,因此普通队列的消息在10秒后变成了死信。
2.创建生产者和消费者(超过队列长度变成死信)
- @SpringBootTest
- class DemoApplicationTests {
-
-
- @Resource
- public RabbitTemplate rabbitTemplate;
-
-
- @Test
- public void testSend() {
- //存活时间过期或者超过消息的长度时消息会变成死信队列,消息被消费者退回后队列没有签收消息会变成死信
- //1.生产者拒签消息,消息变成死信。
- rabbitTemplate.convertAndSend("normal_exchange","normal","普通队列消息");
- }
- }
-
-
-
-
- @Component
- public class Consumer {
-
- // 监听队列
- @RabbitListener(queues = "normal_queue")
- public void listen_message(Message message, Channel channel) throws IOException {
- //拒签消息
- channel.basicNack(message.getMessageProperties().getDeliveryTag(),true,false);
- }
- }
拒签消息,消息变成了死信。
延迟队列,即消息进入队列后不会立即被消费,只有到达指定时间后,才会被消费。
例如:用户下单后,30分钟后查询订单状态,未支付则会取消订单。
但RabbitMQ中并未提供延迟队列功能,我们可以使用死信队列实现延迟队列的效果。
1.创建SpringBoot订单模块,添加SpringMVC、RabbitMQ、lombok依赖。
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-amqp</artifactId>
- </dependency>
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-web</artifactId>
- </dependency>
- <dependency>
- <groupId>org.projectlombok</groupId>
- <artifactId>lombok</artifactId>
- </dependency>
2.编写配置文件
- spring:
- rabbitmq:
- host: 192.168.66.100
- port: 5672
- username: MQzhang
- password: MQzhang
- virtual-host: /
-
-
-
- #日志格式
- logging:
- pattern:
- console: '%d{HH:mm:ss.SSS} %clr(%-5level) --- [%-15thread] %cyan(%-50logger{50}):%msg%n'
-
3.创建队列和交换机
- package com.zj.config;
-
- import org.springframework.amqp.core.*;
- import org.springframework.beans.factory.annotation.Qualifier;
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
-
- @Configuration
- public class RabbitConfig {
- // 订单交换机和队列
- private final String ORDER_EXCHANGE = "order_exchange";
- private final String ORDER_QUEUE = "order_queue";
- // 过期订单交换机和队列(死信交换机和死信队列)
- private final String EXPIRE_EXCHANGE = "expire_exchange";
- private final String EXPIRE_QUEUE = "expire_queue";
-
- // 过期订单交换机
- @Bean(EXPIRE_EXCHANGE)
- public Exchange deadExchange(){
- return ExchangeBuilder
- .topicExchange(EXPIRE_EXCHANGE)
- .durable(false)
- .build();
- }
- // 过期订单队列
- @Bean(EXPIRE_QUEUE)
- public Queue deadQueue(){
- return QueueBuilder
- .durable(EXPIRE_QUEUE)
- .build();
- }
- // 将过期订单队列绑定到交换机
- @Bean
- public Binding bindDeadQueue(@Qualifier(EXPIRE_EXCHANGE) Exchange exchange,@Qualifier(EXPIRE_QUEUE) Queue queue){
- return BindingBuilder
- .bind(queue)
- .to(exchange)
- .with("expire_routing")
- .noargs();
- }
-
- // 订单交换机
- @Bean(ORDER_EXCHANGE)
- public Exchange normalExchange(){
- return ExchangeBuilder
- .topicExchange(ORDER_EXCHANGE)
- .durable(false)
- .build();
- }
-
- // 订单队列
- @Bean(ORDER_QUEUE)
- public Queue normalQueue(){
- return QueueBuilder
- .durable(ORDER_QUEUE)
- .ttl(10000) // 存活时间为10s,模拟30min
- .deadLetterExchange(EXPIRE_EXCHANGE) // 绑定死信交换机
- .deadLetterRoutingKey("expire_routing") // 死信交换机的路由关键字
- .build();
- }
- // 将订单队列绑定到交换机
- @Bean
- public Binding bindNormalQueue(@Qualifier(ORDER_EXCHANGE) Exchange exchange,
- @Qualifier(ORDER_QUEUE) Queue queue){
- return BindingBuilder
- .bind(queue)
- .to(exchange)
- .with("order_routing")
- .noargs();
- }
- }
4.编写下单的控制器方法,下单后向订单交换机发送消息
- @RestController
- public class OrderController {
-
- @Resource
- private RabbitTemplate rabbitTemplate;
-
- //下单
- @GetMapping("/place/{id}")
- public String placeOrder(@PathVariable("id") String id){
- System.out.println("处理订单数据");
- //将订单id发送到订单队列
- rabbitTemplate.convertAndSend("order_exchange","order_routing",id);
- return "下单成功,修改库存。";
- }
- }
5.编写监听死信队列的消费者
- @Component
- public class Consumer {
-
- // 监听过期队列
- @RabbitListener(queues = "expire_queue")
- public void listen_message(String id) {
- System.out.println("查询订单号为:"+id+"的订单,如果已支付无需处理,未支付回退库存。 ");
- }
- }
在使用死信队列实现延迟队列时,会遇到一个问题:RabbitMQ只会移除队列顶端的过期消息,如果第一个消息的存活时长较长,而第二个消息的存活时长较短,则第二个消息并不会及时执行。
RabbitMQ虽然本身不能使用延迟队列,但官方提供了延迟队列插件,安装后可直接使用延迟队列。
1、使用xftpj将延迟插件上传到虚拟机
2.安装插件
- # 将插件放入RabbitMQ插件目录中
- mv rabbitmq_delayed_message_exchange-3.9.0.ez /usr/local/rabbitmq/plugins/
- # 启用插件
- rabbitmq-plugins enable rabbitmq_delayed_message_exchange
3.重启RabbitMQ服务
- #停止rabbitmq
- rabbitmqctl stop
-
-
- #启动rabbitmq
- rabbitmq-server restart -detached
此时登录管控台可以看到交换机类型多了延迟消息:
4、创建延迟交换机和延迟队列
- package com.zj.config;
-
-
- import org.springframework.amqp.core.*;
- import org.springframework.beans.factory.annotation.Qualifier;
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
-
- import java.util.HashMap;
-
- @Configuration
- public class RabbitConfig {
-
- // 创建延迟交换机和延迟队列
- private final String DELAYED_EXCHANGE = "delayed_exchange";
- private final String DELAYED_QUEUE = "delayed_queue";
-
- // 延迟交换机,ExchangeBuilder只能创建普通的交换机例如:topic、direct、fanout交换机。要创建延迟交换机只能创建自定义交换机。
- @Bean(DELAYED_EXCHANGE)
- public Exchange deadExchange(){
- HashMap<String, Object> args = new HashMap<>();
- args.put("x-delayed-type","topic"); //topic:延迟交换机的实际类型。
- return new CustomExchange(DELAYED_EXCHANGE,"x-delayed-message",false,true,args);
- }
- // 延迟队列
- @Bean(DELAYED_QUEUE)
- public Queue deadQueue(){
- return QueueBuilder
- .durable(DELAYED_QUEUE)
- .build();
- }
-
- // 将延迟队列绑定到延迟交换机
- @Bean
- public Binding bindExchangeQueue(@Qualifier(DELAYED_EXCHANGE) Exchange exchange,
- @Qualifier(DELAYED_QUEUE) Queue queue){
- return BindingBuilder
- .bind(queue)
- .to(exchange)
- .with("delayed-routing")
- .noargs();
- }
-
- }
5.编写下单的控制器方法
- package com.zj.controller;
-
- import org.springframework.amqp.AmqpException;
- import org.springframework.amqp.core.Message;
- import org.springframework.amqp.core.MessagePostProcessor;
- import org.springframework.amqp.rabbit.core.RabbitTemplate;
- import org.springframework.web.bind.annotation.GetMapping;
- import org.springframework.web.bind.annotation.PathVariable;
- import org.springframework.web.bind.annotation.RestController;
-
- import javax.annotation.Resource;
-
- @RestController
- public class OrderController {
-
- @Resource
- private RabbitTemplate rabbitTemplate;
-
- //下单
- @GetMapping("/place/{id}")
- public String placeOrder(@PathVariable("id") String id){
- System.out.println("处理订单数据");
- //设置消息的延迟时间为10s
- MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {
-
- @Override
- public Message postProcessMessage(Message message) throws AmqpException {
- message.getMessageProperties().setDelay(10000);
- return message;
- }
- };
-
- rabbitTemplate.convertAndSend("delayed_exchange","delayed-routing",id,messagePostProcessor);
- return "下单成功,修改库存。";
- }
- }
6.编写延迟队列的消费者
- package com.zj.consumer;
-
- import org.springframework.amqp.rabbit.annotation.RabbitListener;
- import org.springframework.stereotype.Component;
-
-
- @Component
- public class Consumer {
-
- // 监听延迟队列
- @RabbitListener(queues = "delayed_queue")
- public void listen_message(String id) {
- System.out.println("查询订单号为:"+id+"的订单,如果已支付无需处理,未支付回退库存。 ");
- }
- }
7、下单测试
延迟队列中没有消息是因为消费者将消息消费了。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。