赞
踩
**第一步:**下载安装erlang,RabbitMQ服务端代码是使用并发式语言Erlang编写的,安装Rabbit MQ的前提是安装Erlang。
下载地址:http://www.erlang.org/downloads
选择自己合适的版本进行下载,在安装时记得记住安装路径,安装完事儿后要记得配置一下系统的环境变量
此电脑–>鼠标右键“属性”–>高级系统设置–>环境变量–>“新建”系统环境变量
变量名:ERLANG_HOME,变量值就是刚才erlang的安装地址,点击确定。
双击系统变量path,点击“新建”,将%ERLANG_HOME%\bin加入到path中。
最后windows键+R键,输入cmd,再输入erl,看到版本号就说明erlang安装成功了。
第二步:安装rabbitmq
下载地址:http://www.rabbitmq.com/download.html
cmd到rabbitmq-server的 sbin目录下 打开cmd命令行工具
cd C:\Program Files\RabbitMQ Server\rabbitmq_server-3.7.4\sbin
执行命令:
rabbitmq-plugins.bat enable rabbitmq_management
重启rabbitmq服务器:
此电脑–》右击 点击管理界面–》双击服务和应用程序–》双击 服务–》点击重启按钮即可
浏览器中输入localhost:15672 输 guest/guest
其中的概念有:
admin选项卡中就代表的是用户界面:
其中下面的add a user
就可以添加用户:
我们创建一个名为zhangsan的用户
角色说明:
1、 超级管理员(administrator)
可登陆管理控制台,可查看所有的信息,并且可以对用户,策略(policy)进行操作。
2、 监控者(monitoring)
可登陆管理控制台,同时可以查看rabbitmq节点的相关信息(进程数,内存使用情况,磁盘使用情况等)
3、 策略制定者(policymaker)
可登陆管理控制台, 同时可以对policy进行管理。但无法查看节点的相关信息(上图红框标识的部分)。
4、 普通管理者(management)
仅可登陆管理控制台,无法看到节点信息,也无法对策略进行管理。
5、 其他
无法登陆管理控制台,通常就是普通的生产者和消费者。
然后上面的User列表中就会出现一个新的用户:
可以看见zhangsan用户没有自己虚拟主机,此时我们可以给zhangsan分配一个虚拟主机,名字是pay,也就是支付相关的:
点入pay中可以设置权限:
设置完后:
参数说明:
配置完后,zhangsan就有了pay虚拟主机:
添加依赖:
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.6.0</version>
</dependency>
生产者的创建分为如下几个步骤:
//创建链接工厂对象
//设置RabbitMQ服务主机地址,默认localhost
//设置RabbitMQ服务端口,默认5672
//设置虚拟主机名字,默认/
//设置用户连接名,默认guest
//设置链接密码,默认guest
//创建链接
//创建频道
//声明队列
//创建消息
//消息发送
//关闭资源
按照上面的步骤,我们创建一个消息生产者,
package com.yxinmiracle.simple; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * @version 1.0 * @author: YxinMiracle * @date: 2021-08-30 10:22 */ public class Producer { // 发送消息 public static void main(String[] args) throws IOException, TimeoutException { // 创建工厂对象 ConnectionFactory connectionFactory = new ConnectionFactory(); // 设置RabbitMq服务器主机 connectionFactory.setHost("localhost"); // 设置端口 connectionFactory.setPort(5672); // 设置虚拟主机名字 connectionFactory.setVirtualHost("/pay"); connectionFactory.setUsername("zhangsan"); connectionFactory.setPassword("zhangsan"); Connection connection = connectionFactory.newConnection(); // 创建频道 Channel channel = connection.createChannel(); // 创建队列 /* * 1. 参数1: 制定队列的名称 * 2. 参数2: 制定是否持久化,一般为true * 3. 参数3:是否独占这个通道,一般选择不独占 * 4. 参数4:指定是否自动删除 * 5. 参数5:指定额外的参数 * */ channel.queueDeclare("simple_queue1",true,false,false,null); // 创建消息 String msg = "hello i am from simple producer"; // 消息发送 /* * 1. 参数1: 指定交换机 简单模式中使用默认的交换机 指定空字符串 * 2. 参数2: 简单模式就是队列名称 * 3. 参数3: 指定携带的额外的参数 null * 4. 发送的消息本身 * */ channel.basicPublish("","simple_queue1",null,msg.getBytes()); channel.close(); connection.close(); } }
在执行上述的消息发送之后;可以登录rabbitMQ的管理控制台,可以发现队列和其消息:
如果想查看消息,可以点击队列名称->Get Messages
,如下图:
消费者创建可以按照如下步骤实现:
//创建链接工厂对象
//设置RabbitMQ服务主机地址,默认localhost
//设置RabbitMQ服务端口,默认5672
//设置虚拟主机名字,默认/
//设置用户连接名,默认guest
//设置链接密码,默认guest
//创建链接
//创建频道
//创建队列
//创建消费者,并设置消息处理
//消息监听
//关闭资源(不建议关闭,建议一直监听消息)
按照上面的步骤创建消息消费者:
package com.yxinmiracle.simple; import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * @version 1.0 * @author: YxinMiracle * @date: 2021-08-30 10:23 */ public class Consumer { public static void main(String[] args) throws IOException, TimeoutException { // 创建工厂对象 ConnectionFactory connectionFactory = new ConnectionFactory(); // 设置RabbitMq服务器主机 connectionFactory.setHost("localhost"); // 设置端口 connectionFactory.setPort(5672); // 设置虚拟主机名字 connectionFactory.setVirtualHost("/pay"); connectionFactory.setUsername("zhangsan"); connectionFactory.setPassword("zhangsan"); Connection connection = connectionFactory.newConnection(); // 创建频道 Channel channel = connection.createChannel(); // 监听队列 channel.queueDeclare("simple_queue1",true,false,false,null); // 创建消费者,并设置处理消息 DefaultConsumer consumer = new DefaultConsumer(channel){ // 重写父类方法,接收到消息之后要进行处理消息 /*** * @param consumerTag 消息者标签,在channel.basicConsume时候可以指定 * @param envelope 消息包的内容,可从中获取消息id,消息routingkey,交换机,消息和重传标志(收到消息失败后是否需要重新发送) * @param properties 属性信息 * @param body 消息 * @throws IOException */ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { //路由的key String routingKey = envelope.getRoutingKey(); //获取交换机信息 String exchange = envelope.getExchange(); //获取消息ID long deliveryTag = envelope.getDeliveryTag(); //获取消息信息 String message = new String(body,"UTF-8"); System.out.println("routingKey:"+routingKey+",exchange:"+exchange+",deliveryTag:"+deliveryTag+",message:"+message); } }; // 消息监听 channel.basicConsume("simple_queue1",true,consumer); } }
运行后控制台打印:
工具类抽取
无论是消费者,还是生产者,我们发现前面的几个步骤几乎一模一样,所以可以抽取一个工具类,将下面这段代码抽取出去。
创建工具类,用于创建Connection,代码如下:
package com.yxinmiracle.utils; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * @version 1.0 * @author: YxinMiracle * @date: 2021-08-30 11:23 */ public class ConnectionUtil { /*** * 创建链接对象 * @return * @throws IOException * @throws TimeoutException */ public static Connection getConnection() throws IOException, TimeoutException { //创建链接工厂对象 ConnectionFactory connectionFactory = new ConnectionFactory(); //设置RabbitMQ服务主机地址,默认localhost connectionFactory.setHost("localhost"); //设置RabbitMQ服务端口,默认5672 connectionFactory.setPort(5672); //设置虚拟主机名字,默认/ connectionFactory.setVirtualHost("/pay"); //设置用户连接名,默认guest connectionFactory.setUsername("zhangsan"); //设置链接密码,默认guest connectionFactory.setPassword("zhangsan"); //创建链接 Connection connection = connectionFactory.newConnection(); return connection; } }
链接:https://rabbitmq.com/getstarted.html,通过这个网站可以查看官方所说的所有模式:
Work Queues
与入门程序的简单模式
相比,多了一个或一些消费端,多个消费端共同消费同一个队列中的消息。
应用场景:对于 任务过重或任务较多情况使用工作队列可以提高任务处理的速度。
Work Queues
与入门程序的简单模式
的代码是几乎一样的;可以完全复制,并复制多一个消费者进行多个消费者同时消费消息的测试。
(1)生产者
创建消息生产者对象,代码如下:
package com.yxinmiracle.work; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.yxinmiracle.utils.ConnectionUtil; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * @version 1.0 * @author: YxinMiracle * @date: 2021-08-30 11:27 */ public class Producer { public static void main(String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtil.getConnection(); // 创建频道 Channel channel = connection.createChannel(); // 创建队列 /* * 1. 参数1: 制定队列的名称 * 2. 参数2: 制定是否持久化,一般为true * 3. 参数3:是否独占这个通道,一般选择不独占 * 4. 参数4:指定是否自动删除 * 5. 参数5:指定额外的参数 * */ channel.queueDeclare("simple_queue2",true,false,false,null); for (int i = 0; i < 20; i++) { // 创建消息 String msg = "hello i am from simple producer"; // 消息发送 /* * 1. 参数1: 指定交换机 简单模式中使用默认的交换机 指定空字符串 * 2. 参数2: 简单模式就是队列名称 * 3. 参数3: 指定携带的额外的参数 null * 4. 发送的消息本身 * */ channel.basicPublish("","simple_queue2",null,msg.getBytes()); } channel.close(); connection.close(); } }
(2)消费者One
创建第1个Work消费者,代码如下:
public class WorkConsumerOne { /*** * 消息消费者 * @param args * @throws IOException * @throws TimeoutException */ public static void main(String[] args) throws IOException, TimeoutException { //创建链接 Connection connection = ConnectionUtil.getConnection(); //创建频道 Channel channel = connection.createChannel(); //创建队列 channel.queueDeclare("work_queue",true,false,false,null); //创建消费者,并设置消息处理 DefaultConsumer defaultConsumer = new DefaultConsumer(channel){ /*** * @param consumerTag 消息者标签,在channel.basicConsume时候可以指定 * @param envelope 消息包的内容,可从中获取消息id,消息routingkey,交换机,消息和重传标志(收到消息失败后是否需要重新发送) * @param properties 属性信息 * @param body 消息 * @throws IOException */ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { //路由的key String routingKey = envelope.getRoutingKey(); //获取交换机信息 String exchange = envelope.getExchange(); //获取消息ID long deliveryTag = envelope.getDeliveryTag(); //获取消息信息 String message = new String(body,"UTF-8"); System.out.println("Work-One:routingKey:"+routingKey+",exchange:"+exchange+",deliveryTag:"+deliveryTag+",message:"+message); } }; /** * 消息监听 * 参数1:队列名称 * 参数2:是否自动确认,设置为true为表示消息接收到自动向mq回复接收到了,mq接收到回复会删除消息,设置为false则需要手动确认 * 参数3:消息接收到后回调 */ channel.basicConsume("work_queue",true,defaultConsumer); //关闭资源(不建议关闭,建议一直监听消息) //channel.close(); //connection.close(); } }
(3)消费者Two
创建第2个Work消费者,代码如下:
public class WorkConsumerTwo { /*** * 消息消费者 * @param args * @throws IOException * @throws TimeoutException */ public static void main(String[] args) throws IOException, TimeoutException { //创建链接 Connection connection = ConnectionUtil.getConnection(); //创建频道 Channel channel = connection.createChannel(); //创建队列 channel.queueDeclare("work_queue",true,false,false,null); //创建消费者,并设置消息处理 DefaultConsumer defaultConsumer = new DefaultConsumer(channel){ /*** * @param consumerTag 消息者标签,在channel.basicConsume时候可以指定 * @param envelope 消息包的内容,可从中获取消息id,消息routingkey,交换机,消息和重传标志(收到消息失败后是否需要重新发送) * @param properties 属性信息 * @param body 消息 * @throws IOException */ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { //路由的key String routingKey = envelope.getRoutingKey(); //获取交换机信息 String exchange = envelope.getExchange(); //获取消息ID long deliveryTag = envelope.getDeliveryTag(); //获取消息信息 String message = new String(body,"UTF-8"); System.out.println("Work-Two:routingKey:"+routingKey+",exchange:"+exchange+",deliveryTag:"+deliveryTag+",message:"+message); } }; /** * 消息监听 * 参数1:队列名称 * 参数2:是否自动确认,设置为true为表示消息接收到自动向mq回复接收到了,mq接收到回复会删除消息,设置为false则需要手动确认 * 参数3:消息接收到后回调 */ channel.basicConsume("work_queue",true,defaultConsumer); //关闭资源(不建议关闭,建议一直监听消息) //channel.close(); //connection.close(); } }
测试,可以看见两个消费者分别收到10条消息
(1)第一个消费者
(2)第二个消费者
在这个订阅模式中,多了一个exchange的角色,该角色的作用就是转发消息到队列中,他不做消息的存储,只做消息的转发,队列要想接受到交换机转发过来的信息,需要做一个绑定。
前面2个案例中,只有3个角色:
P:生产者,也就是要发送消息的程序
C:消费者:消息的接受者,会一直等待消息到来。
Queue:消息队列,图中红色部分
而在订阅模型中,多了一个exchange角色,而且过程略有变化:
P:生产者,也就是要发送消息的程序,但是不再发送到队列中,而是发给X(交换机)
C:消费者,消息的接受者,会一直等待消息到来。
Queue:消息队列,接收消息、缓存消息。
Exchange:交换机,图中的X。一方面,接收生产者发送的消息。另一方面,知道如何处理消息,例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange的类型。Exchange有常见以下3种类型:
Fanout:广播,将消息交给所有绑定到交换机的队列
Direct:定向,把消息交给符合指定routing key 的队列
Topic:通配符,把消息交给符合routing pattern(路由模式) 的队列
Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失!
package com.yxinmiracle.famout; import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.yxinmiracle.utils.ConnectionUtil; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * @version 1.0 * @author: YxinMiracle * @date: 2021-08-30 11:27 */ public class Producer { public static void main(String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtil.getConnection(); // 创建频道 Channel channel = connection.createChannel(); // 创建交换机 channel.exchangeDeclare("exchange_fanout", BuiltinExchangeType.FANOUT); // 声名两个队列 channel.queueDeclare("fanout_queue1",true,false,false,null); channel.queueDeclare("fanout_queue2",true,false,false,null); // 将队列绑定到指定的交换机上 // 第三个参数在广播模式中默认为空值 channel.queueBind("fanout_queue1","exchange_fanout",""); channel.queueBind("fanout_queue2","exchange_fanout",""); // 创建消息 String msg = "hello i am from fanout producer"; // 消息发送 /* * 1. 参数1: 指定交换机 简单模式中使用默认的交换机 指定空字符串 * 2. 参数2: 简单模式就是队列名称 * 3. 参数3: 指定携带的额外的参数 null * 4. 发送的消息本身 * */ channel.basicPublish("exchange_fanout","",null,msg.getBytes()); channel.close(); connection.close(); } }
package com.yxinmiracle.famout; import com.rabbitmq.client.*; import com.yxinmiracle.utils.ConnectionUtil; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * @version 1.0 * @author: YxinMiracle * @date: 2021-08-30 11:30 */ public class Consumer1 { public static void main(String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtil.getConnection(); // 创建频道 Channel channel = connection.createChannel(); // 监听队列 channel.queueDeclare("fanout_queue1",true,false,false,null); // 创建消费者,并设置处理消息 DefaultConsumer consumer = new DefaultConsumer(channel){ // 重写父类方法,接收到消息之后要进行处理消息 /*** * @param consumerTag 消息者标签,在channel.basicConsume时候可以指定 * @param envelope 消息包的内容,可从中获取消息id,消息routingkey,交换机,消息和重传标志(收到消息失败后是否需要重新发送) * @param properties 属性信息 * @param body 消息 * @throws IOException */ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { //路由的key String routingKey = envelope.getRoutingKey(); //获取交换机信息 String exchange = envelope.getExchange(); //获取消息ID long deliveryTag = envelope.getDeliveryTag(); //获取消息信息 String message = new String(body,"UTF-8"); System.out.println("routingKey:"+routingKey+",exchange:"+exchange+",deliveryTag:"+deliveryTag+",message:"+message); } }; // 消息监听 channel.basicConsume("fanout_queue1",true,consumer); } }
package com.yxinmiracle.famout; import com.rabbitmq.client.*; import com.yxinmiracle.utils.ConnectionUtil; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * @version 1.0 * @author: YxinMiracle * @date: 2021-08-30 11:30 */ public class Consumer2 { public static void main(String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtil.getConnection(); // 创建频道 Channel channel = connection.createChannel(); // 监听队列 channel.queueDeclare("fanout_queue2",true,false,false,null); // 创建消费者,并设置处理消息 DefaultConsumer consumer = new DefaultConsumer(channel){ // 重写父类方法,接收到消息之后要进行处理消息 /*** * @param consumerTag 消息者标签,在channel.basicConsume时候可以指定 * @param envelope 消息包的内容,可从中获取消息id,消息routingkey,交换机,消息和重传标志(收到消息失败后是否需要重新发送) * @param properties 属性信息 * @param body 消息 * @throws IOException */ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { //路由的key String routingKey = envelope.getRoutingKey(); //获取交换机信息 String exchange = envelope.getExchange(); //获取消息ID long deliveryTag = envelope.getDeliveryTag(); //获取消息信息 String message = new String(body,"UTF-8"); System.out.println("routingKey:"+routingKey+",exchange:"+exchange+",deliveryTag:"+deliveryTag+",message:"+message); } }; // 消息监听 channel.basicConsume("fanout_queue2",true,consumer); } }
生产者运行后:
消费者:
路由模式特点:
1.队列与交换机的绑定,不能是任意绑定了,而是要指定一个RoutingKey(路由key)
2.消息的发送方在 向 Exchange发送消息时,也必须指定消息的 RoutingKey。
3.Exchange不再把消息交给每一个绑定的队列,而是根据消息的Routing Key进行判断,只有队列的Routingkey与消息的 Routing key完全一致,才会接收到消息
图解:
P:生产者,向Exchange发送消息,发送消息时,会指定一个routing key。
X:Exchange(交换机),接收生产者的消息,然后把消息递交给 与routing key完全匹配的队列
C1:消费者,其所在队列指定了需要routing key 为 error 的消息
C2:消费者,其所在队列指定了需要routing key 为 info、error、warning 的消息
(1)生产者
生产者需要注意如下3点:
1.声明交换机
2.声明队列
3.队列需要绑定指定的交换机
创建消息生产者,代码如下:
package com.yxinmiracle.routing; import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.yxinmiracle.utils.ConnectionUtil; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * @version 1.0 * @author: YxinMiracle * @date: 2021-08-30 16:15 */ public class Producer { public static void main(String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtil.getConnection(); // 创建频道 Channel channel = connection.createChannel(); // 创建交换机 channel.exchangeDeclare("direct_exchange", BuiltinExchangeType.DIRECT); // 声名两个队列 channel.queueDeclare("direct_queue1",true,false,false,null); channel.queueDeclare("direct_queue2",true,false,false,null); // 将队列绑定到指定的交换机上 // 第三个参数在广播模式中默认为空值 // 路由模式中一定要制定一个routing key channel.queueBind("direct_queue1","direct_exchange","order.insert"); channel.queueBind("direct_queue2","direct_exchange","order.select"); // 创建消息 String msg1 = "hello i am from fanout producer insert"; String msg2 = "hello i am from fanout producer select"; // 消息发送 /* * 1. 参数1: 指定交换机 简单模式中使用默认的交换机 指定空字符串 * 2. 参数2: 简单模式就是队列名称 * 3. 参数3: 指定携带的额外的参数 null * 4. 发送的消息本身 * */ channel.basicPublish("direct_exchange","order.insert",null,msg1.getBytes()); channel.basicPublish("direct_exchange","order.select",null,msg2.getBytes()); channel.close(); connection.close(); } }
(2)消费者One
package com.yxinmiracle.routing; import com.rabbitmq.client.*; import com.yxinmiracle.utils.ConnectionUtil; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * @version 1.0 * @author: YxinMiracle * @date: 2021-08-30 11:30 */ public class Consumer1 { public static void main(String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtil.getConnection(); // 创建频道 Channel channel = connection.createChannel(); // 监听队列 channel.queueDeclare("direct_queue1",true,false,false,null); // 创建消费者,并设置处理消息 DefaultConsumer consumer = new DefaultConsumer(channel){ // 重写父类方法,接收到消息之后要进行处理消息 /*** * @param consumerTag 消息者标签,在channel.basicConsume时候可以指定 * @param envelope 消息包的内容,可从中获取消息id,消息routingkey,交换机,消息和重传标志(收到消息失败后是否需要重新发送) * @param properties 属性信息 * @param body 消息 * @throws IOException */ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { //路由的key String routingKey = envelope.getRoutingKey(); //获取交换机信息 String exchange = envelope.getExchange(); //获取消息ID long deliveryTag = envelope.getDeliveryTag(); //获取消息信息 String message = new String(body,"UTF-8"); System.out.println("routingKey:"+routingKey+",exchange:"+exchange+",deliveryTag:"+deliveryTag+",message:"+message); } }; // 消息监听 channel.basicConsume("direct_queue1",true,consumer); } }
(3)消费者Two
package com.yxinmiracle.routing; import com.rabbitmq.client.*; import com.yxinmiracle.utils.ConnectionUtil; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * @version 1.0 * @author: YxinMiracle * @date: 2021-08-30 11:30 */ public class Consumer2 { public static void main(String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtil.getConnection(); // 创建频道 Channel channel = connection.createChannel(); // 监听队列 channel.queueDeclare("direct_queue2",true,false,false,null); // 创建消费者,并设置处理消息 DefaultConsumer consumer = new DefaultConsumer(channel){ // 重写父类方法,接收到消息之后要进行处理消息 /*** * @param consumerTag 消息者标签,在channel.basicConsume时候可以指定 * @param envelope 消息包的内容,可从中获取消息id,消息routingkey,交换机,消息和重传标志(收到消息失败后是否需要重新发送) * @param properties 属性信息 * @param body 消息 * @throws IOException */ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { //路由的key String routingKey = envelope.getRoutingKey(); //获取交换机信息 String exchange = envelope.getExchange(); //获取消息ID long deliveryTag = envelope.getDeliveryTag(); //获取消息信息 String message = new String(body,"UTF-8"); System.out.println("routingKey:"+routingKey+",exchange:"+exchange+",deliveryTag:"+deliveryTag+",message:"+message); } }; // 消息监听 channel.basicConsume("direct_queue2",true,consumer); } }
Topic
类型与Direct
相比,都是可以根据RoutingKey
把消息路由到不同的队列。只不过Topic
类型Exchange
可以让队列在绑定Routing key
的时候使用通配符!
Routingkey
一般都是有一个或多个单词组成,多个单词之间以”.”分割,例如: item.insert
通配符规则:
#
:匹配一个或多个词
*
:匹配不多不少恰好1个词
举例:
item.#
:能够匹配item.insert.abc
或者 item.insert
item.*
:只能匹配item.insert
图解:
usa.#
,因此凡是以 usa.
开头的routing key
都会被匹配到#.news
,因此凡是以 .news
结尾的 routing key
都会被匹配添加依赖
<!--父工程--> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.1.4.RELEASE</version> </parent> <!--依赖--> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> </dependencies>
启动类:
package com.yxinmiracle; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.Queue; import org.springframework.amqp.core.TopicExchange; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.context.annotation.Bean; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; /** * @version 1.0 * @author: YxinMiracle * @date: 2021-08-30 18:32 */ @SpringBootApplication public class RabbitMqApplication { public static void main(String[] args) { SpringApplication.run(RabbitMqApplication.class,args); } @Bean public Queue queue(){ return new Queue("springboot_topic_queue"); } @Bean public TopicExchange exchange(){ return new TopicExchange("topic_exchange"); } @Bean public Binding binding(){ return BindingBuilder.bind(queue()).to(exchange()).with("order.*"); } // 发送消息 @RequestMapping("/order") @RestController class OrderController{ @Autowired private RabbitTemplate rabbitTemplate; @GetMapping("/add") public String addOrder(){ System.out.println("======下单中======"); try { Thread.sleep(5000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("======下单成功======"); rabbitTemplate.convertAndSend("topic_exchange","order.insert","消息本身,insert"); rabbitTemplate.convertAndSend("topic_exchange","order.delete","消息本身,delete"); return "success"; } } }
配置rabbitMq
spring:
rabbitmq:
host: localhost
port: 5672
virtual-host: /szitheima
username: admin
password: admin
消息监听处理类
package com.yxinmiracle; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.Queue; import org.springframework.amqp.core.TopicExchange; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.context.annotation.Bean; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; /** * @version 1.0 * @author: YxinMiracle * @date: 2021-08-30 18:32 */ @SpringBootApplication public class RabbitMqApplication { public static void main(String[] args) { SpringApplication.run(RabbitMqApplication.class,args); } @Bean public Queue queue(){ return new Queue("springboot_topic_queue"); } @Bean public TopicExchange exchange(){ return new TopicExchange("topic_exchange"); } @Bean public Binding binding(){ return BindingBuilder.bind(queue()).to(exchange()).with("order.*"); } // 发送消息 @RequestMapping("/order") @RestController class OrderController{ @Autowired private RabbitTemplate rabbitTemplate; @GetMapping("/add") public String addOrder(){ System.out.println("======下单中======"); try { Thread.sleep(5000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("======下单成功======"); rabbitTemplate.convertAndSend("topic_exchange","order.insert","消息本身,insert"); rabbitTemplate.convertAndSend("topic_exchange","order.delete","消息本身,delete"); return "success"; } } }
测试:
在消息的使用过程当中存在一些问题。比如发送消息我们如何确保消息的投递的可靠性呢?如何保证消费消息可靠性呢?如果不能保证在某些情况下可能会出现损失。比如当我们发送消息的时候和接收消息的时候能否根据消息的特性来实现某一些业务场景的模拟呢?订单30分钟过期等等,系统通信的确认等等。
可靠性消息
在使用 RabbitMQ 的时候,作为消息发送方希望杜绝任何消息丢失或者投递失败场景。RabbitMQ 为我们提供了两种方式用来控制消息的投递可靠性模式,mq提供了如下两种模式:
+ confirm模式
生产者发送消息到交换机的时机
+ return模式
交换机转发消息给queue的时机
MQ投递消息的流程如下:
1.生产者发送消息到交换机
2.交换机根据routingkey 转发消息给队列
3.消费者监控队列,获取队列中信息
4.消费成功删除队列中的消息
工程结构:
添加依赖
<dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> </dependency> </dependencies>
启动类:
package com.yxinmiracle; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.DirectExchange; import org.springframework.amqp.core.Queue; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.context.annotation.Bean; /** * @version 1.0 * @author: YxinMiracle * @date: 2021-08-30 20:17 */ @SpringBootApplication public class SpringRabbitApplication { public static void main(String[] args) { SpringApplication.run(SpringRabbitApplication.class,args); } @Bean public Queue queue(){ return new Queue("queue_demo01"); } @Bean public DirectExchange exchange(){ return new DirectExchange("exchange_direct_demo1"); } @Bean public Binding createBinding(){ return BindingBuilder.bind(queue()).to(exchange()).with("item.insert"); } }
创建application.yml文件,配置如下,配置开启confirms模式,默认为false
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
publisher-confirms: true
server:
port: 8080
创建controller 发送消息
package com.yxinmiracle.controller; import com.yxinmiracle.confirm.MyConfirmCallback; import com.yxinmiracle.confirm.MyReturnCallBack; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; /** * @version 1.0 * @author: YxinMiracle * @date: 2021-08-30 20:17 */ @RestController @RequestMapping("/test") public class TestController { @Autowired private RabbitTemplate rabbitTemplate; @Autowired private MyConfirmCallback myConfirmCallback; @RequestMapping("/send") public String send1(){ rabbitTemplate.setConfirmCallback(myConfirmCallback); rabbitTemplate.setReturnCallback(myReturnCallBack); rabbitTemplate.convertAndSend("exchange_direct_demo01","item.insert","hello,insert"); return "success"; } }
创建回调函数:
package com.yxinmiracle.confirm; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.context.annotation.Configuration; import org.springframework.stereotype.Component; /** * @version 1.0 * @author: YxinMiracle * @date: 2021-08-30 20:33 */ @Component public class MyConfirmCallback implements RabbitTemplate.ConfirmCallback { /** * * @param correlationData * @param ack 是否确认发送成功 如果是true就是成功 * @param cause 如果是成功 cause是null值 如果是失败,那就有失败的信息 */ @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { if(ack){ System.out.println("===成功==="); }else { System.out.println(cause); } } }
测试发送消息:
改为不存在的交换机:
再进行测试:
报错!
总结:
1.发送放可以根据confrim机制来确保是否消息已经发送到交换机
2.confirm机制能保证消息发送到交换机有回调,不能保证消息转发到queue有回调
如上,已经实现了消息发送到交换机上的内容,但是如果是,交换机发送成功,但是在路由转发到队列的时候,发送错误,此时就需要用到returncallback模式了。接下来我们实现下。
实现步骤如下:
1.开启returncallback模式
2.设置回调函数
3.发送消息
配置yml开启returncallback
编写returncallback代码:
package com.yxinmiracle.confirm; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.stereotype.Component; /** * @version 1.0 * @author: YxinMiracle * @date: 2021-08-30 20:57 */ @Component public class MyReturnCallBack implements RabbitTemplate.ReturnCallback { /** * 一但出现了错误就调用该方法 * @param message * @param replyCode * @param replyText 错误信息表述 * @param exchange 交换机 * @param routingKey */ @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { System.out.println(new String(message.getBody())); System.out.println("退回的replyCode是:"+replyCode); System.out.println("退回的replyText是:"+replyText); System.out.println("退回的exchange是:"+exchange); System.out.println("退回的routingKey是:"+routingKey); } }
发送消息
我们发送正确的交换机 ,但是发送错误的routingkey测试下
controller:
package com.yxinmiracle.controller; import com.yxinmiracle.confirm.MyConfirmCallback; import com.yxinmiracle.confirm.MyReturnCallBack; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; /** * @version 1.0 * @author: YxinMiracle * @date: 2021-08-30 20:17 */ @RestController @RequestMapping("/test") public class TestController { @Autowired private RabbitTemplate rabbitTemplate; @Autowired private MyConfirmCallback myConfirmCallback; @Autowired private MyReturnCallBack myReturnCallBack; @RequestMapping("/send") public String send1(){ rabbitTemplate.setConfirmCallback(myConfirmCallback); rabbitTemplate.setReturnCallback(myReturnCallBack); rabbitTemplate.convertAndSend("exchange_direct_demo01","itemxxx.insert","hello,insert"); return "success"; } }
测试:
可以看见报错了。
confirm模式用于在消息发送到交换机时机使用,return模式用于在消息被交换机路由到队列中发送错误时使用。
但是一般情况下我们使用confirm即可,因为路由key 由开发人员指定,一般不会出现错误。如果要保证消息在交换机和routingkey的时候那么需要结合两者的方式来进行设置。
上边我们学习了发送方的可靠性投递,但是在消费方也有可能出现问题,比如没有接受消息,比如接受到消息之后,在代码执行过程中出现了异常,这种情况下我们需要额外的处理,那么就需要手动进行确认签收消息。rabbtimq给我们提供了一个机制:ACK机制。
ACK机制:有三种方式
解释:
其中自动确认是指:
当消息一旦被Consumer接收到,则自动确认收到,并将相应 message 从 RabbitMQ 的消息缓存中移除。但是在实际业务处理中,很可能消息接收到,业务处理出现异常,那么该消息就会丢失。
其中手动确认方式是指:
则需要在业务处理成功后,调用channel.basicAck(),手动签收,如果出现异常,则调用channel.basicNack()等方法,让其按照业务功能进行处理,比如:重新发送,比如拒绝签收进入死信队列等等。
实现的步骤:
1.创建普通消息这监听器监听消息
2.修改controller 发送正确消息测试
3.设置配置文件开启ack手动确认,默认是自动确认
4.修改消息监听器进行手动确认业务判断逻辑
创建普通消息监听器:
package com.yxinmiracle.listener; import com.rabbitmq.client.Channel; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; import java.io.IOException; /** * @version 1.0 * @author: YxinMiracle * @date: 2021-08-30 21:11 正常则签收,不正常则进行丢弃处理。 */ @Component @RabbitListener(queues = "queue_demo01") public class MyRabbitListener { @RabbitHandler // 可以根据不同的类型触发不同的方法 public void msg(Message message, Channel channel, String msg) { System.out.println("接收到的消息是:" + msg); try { System.out.println("start============="); Thread.sleep(1000); int i = 1 / 0; System.out.println("end=============="); // 成功了 // 参数1:制定消息的序号 // 参数2:制定是否批量的进行签收 channel.basicAck(message.getMessageProperties().getDeliveryTag(),true); // 接受 } catch (Exception e) { e.printStackTrace(); // 异常 就要拒绝签收 try { // 参数3:是重回队列 //如果出现异常,则拒绝消息 可以重回队列 也可以丢弃 可以根据业务场景来 channel.basicNack(message.getMessageProperties().getDeliveryTag(),true,false); } catch (IOException ioException) { ioException.printStackTrace(); } } } }
第一种:签收
channel.basicAck()
第二种:拒绝签收 批量处理
channel.basicNack()
第三种:拒绝签收 不批量处理
channel.basicReject()
修改Testcontroller方法用于测试发送正确消息:
@RequestMapping("/send")
public String send1(){
rabbitTemplate.setConfirmCallback(myConfirmCallback);
rabbitTemplate.setReturnCallback(myReturnCallBack);
rabbitTemplate.convertAndSend("exchange_direct_demo01","itemxxx.insert","hello,insert");
return "success";
}
设置yml设置为手动确认模式
测试结果:
如果并发量大的情况下,生产方不停的发送消息,可能处理不了那么多消息,此时消息在队列中堆积很多,当消费端启动,瞬间就会涌入很多消息,消费端有可能瞬间垮掉,这时我们可以在消费端进行限流操作,每秒钟放行多少个消息。这样就可以进行并发量的控制,减轻系统的负载,提供系统的可用性,这种效果往往可以在秒杀和抢购中进行使用。在rabbitmq中也有限流的一些配置。
配置如下:
默认是250个。
测试:并发发送10个消息,此时,如下图所示,每一个都是一个处理一个只有等处理完成之后,才能继续处理。
也可以模拟多线程:
TTL 全称 Time To Live(存活时间/过期时间)。当消息到达存活时间后,还没有被消费,会被自动清除。
RabbitMQ设置过期时间有两种:
需要注意一点的是:
针对某一个特定的消息设置过期时间时,一定是消息在队列中在队头的时候进行计算,如果某一个消息A 设置过期时间5秒,消息B在队头,消息B没有设置过期时间,B此时过了已经5秒钟了还没被消费。注意,此时A消息并不会被删除,因为它并没有再队头。
一般在工作当中,单独使用TTL的情况较少。
演示TTL 代码步骤:
1.创建配置类配置 过期队列 交换机 和绑定
2.创建controller 测试发送消息
创建配置类:
package com.yxinmiracle.config; import org.springframework.amqp.core.*; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class TtlConfig { //创建过期队列 @Bean public Queue createqueuettl1(){ //设置队列过期时间为10000 10S钟 return QueueBuilder.durable("queue_demo02").withArgument("x-message-ttl",10000).build(); } //创建交换机 @Bean public DirectExchange createExchangettl(){ return new DirectExchange("exchange_direct_demo02"); } //创建绑定 @Bean public Binding createBindingttl(){ return BindingBuilder.bind(createqueuettl1()).to(createExchangettl()).with("item.ttl"); } }
创建controller测试:
@RequestMapping("/send2")
public String send2(){
rabbitTemplate.setConfirmCallback(myConfirmCallback);
rabbitTemplate.setReturnCallback(myReturnCallBack);
rabbitTemplate.convertAndSend("exchange_direct_demo02","item.ttl","hello,insert");
return "success";
}
测试:
10s后:
死信队列:当消息成为Dead message后,可以被重新发送到另一个交换机,这个交换机就是Dead Letter Exchange(死信交换机 简写:DLX)。
如下图的过程:
成为死信的三种条件:
DLX也是一个正常的Exchange,和一般的Exchange没有区别,它能在任何的队列上被指定,实际上就是设置某个队列的属性。
当这个队列中有死信时,RabbitMQ就会自动的将这个消息重新发布到设置的Exchange上去,进而被路由到另一个队列。
可以监听这个队列中的消息做相应的处理。
刚才说到死信队列也是一个正常的exchange.只需要设置一些参数即可。
给队列设置参数: x-dead-letter-exchange 和 x-dead-letter-routing-key。
步骤:
1.创建queue1 接收转发过来的死信
2.创建queue2 用来接收生产者发送过来的消息
3.创建交换机 死信交换机
4.绑定queue1到死信交换机
配置类代码:
package com.yxinmiracle.config; import org.springframework.amqp.core.*; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class DlxConfig { //创建队列 1 这个接收转发过来的死信 queue1 @Bean public Queue createqueuetdlq(){ return QueueBuilder.durable("queue_demo03").build(); } //创建队列 2 queue2 用来接收生产者发送过来的消息 然后要过期 变成死信 转发给了queue1 @Bean public Queue createqueuetdelq2(){ return QueueBuilder .durable("queue_demo03_deq") // 名字 .withArgument("x-max-length",1)//设置队列的长度 .withArgument("x-message-ttl",10000)//设置队列的消息过期时间 10S .withArgument("x-dead-letter-exchange","exchange_direct_demo03_dlx")//设置死信交换机名称 .withArgument("x-dead-letter-routing-key","item.dlx")//设置死信路由key item.dlx 就是routingkey .build(); } //创建死信交换机 @Bean public DirectExchange createExchangedel(){ return new DirectExchange("exchange_direct_demo03_dlx"); } // queue1 绑定给 死信交换机 routingkey 和 队列转发消息时指定的死信routingkey 要一致 @Bean public Binding createBindingdel(){ return BindingBuilder.bind(createqueuetdlq()).to(createExchangedel()).with("item.dlx"); } }
controller:
@RequestMapping("/send3")
public String send3(){
rabbitTemplate.convertAndSend("","queue_demo03_deq","hello,insert");
return "success";
}
变成死信队列后:
package com.yxinmiracle.listener; import com.rabbitmq.client.Channel; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; import java.io.IOException; /** * @version 1.0 * @author: YxinMiracle * @date: 2021-08-31 18:24 */ @Component @RabbitListener(queues = "queue_demo03_deq") public class DlxListner { @RabbitHandler public void jieshouMsg(Message message, Channel channel,String msg){ System.out.println(msg); try { channel.basicReject(message.getMessageProperties().getDeliveryTag(),false); } catch (IOException e) { e.printStackTrace(); } } }
查看控制台,不需要等待10S,立刻转发给死信队列
修改配置,添加队列长度参数
超过长度的消息会直接进入死信队列
延迟队列,即消息进入队列后不会立即被消费,只有到达指定时间后,才会被消费。在rabbitmq中,并没有延迟队列概念,但是我们可以使用ttl 和死信队列的方式进行达到延迟的效果。这种需求往往在某些应用场景中出现。当然还可以使用插件。
如图所示:
1.生产者产生一个消息发送到queue1
2.queue1中的消息过期则转发到queue2
3.消费者在queue2中获取消息进行消费
如上场景中 典型的案例:下订单之后,30分钟如果还未支付则,取消订单回滚库存。我们来模拟下需求:
创建配置类
package com.yxinmiracle.config; import org.springframework.amqp.core.*; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class DelayConfig { //正常的队列 接收死信队列转移过来的消息 @Bean public Queue createQueue2(){ return QueueBuilder.durable("queue_order_queue2").build(); } //死信队列 --->将来消息发送到这里 这里不设置过期时间,我们应该在发送消息时设置某一个消息(某一个用户下单的)的过期时间 @Bean public Queue createQueue1(){ return QueueBuilder .durable("queue_order_queue1") .withArgument("x-dead-letter-exchange","exchange_order_delay")//设置死信交换机 .withArgument("x-dead-letter-routing-key","item.order")//设置死信路由key .build(); } //创建交换机 @Bean public DirectExchange createOrderExchangeDelay(){ return new DirectExchange("exchange_order_delay"); } //创建绑定 将正常队列绑定到死信交换机上 @Bean public Binding createBindingDelay(){ return BindingBuilder.bind(createQueue2()).to(createOrderExchangeDelay()).with("item.order"); } }
修改controller
@RequestMapping("/send5") public String send5(){ // 模拟下单 System.out.println("下单成功"); // 模拟减库存 System.out.println("减库存成功"); // 生产者发送消息 rabbitTemplate.convertAndSend("queue_order_queue1", (Object) "延迟队列的消息:orderId的值为:12334545", new MessagePostProcessor() { @Override public Message postProcessMessage(Message message) throws AmqpException { message.getMessageProperties().setExpiration("10000");// 设置过期时间 return message; } }); return "success"; }
注意:发送消息要发送到queue1,监听消息要监听queue2
/** * 发送下单 * * @return */ @RequestMapping("/send6") public String send6() { //发送消息到死信队列 可以使用默认的交换机 指定ourtingkey为死信队列名即可 System.out.println("用户下单成功,10秒钟之后如果没有支付,则过期,回滚订单"); System.out.println("时间:"+new Date()); rabbitTemplate.convertAndSend("queue_order_queue1", (Object) "哈哈我要检查你是否有支付", new MessagePostProcessor() { @Override public Message postProcessMessage(Message message) throws AmqpException { message.getMessageProperties().setExpiration("10000");//设置该消息的过期时间 return message; } }); return "用户下单成功,10秒钟之后如果没有支付,则过期,回滚订单"; }
设置监听类
注意,监听消息要监听queue2 ,发送消息要发送queue1
@Component @RabbitListener(queues = "queue_order_queue2") public class OrderListener { @RabbitHandler public void orderhandler(Message message, Channel channel, String msg) { System.out.println("获取到消息:" + msg + ":时间为:" + new Date()); try { System.out.println("模拟检查开始=====start"); Thread.sleep(1000); System.out.println("模拟检查结束=====end"); System.out.println("用户没付款,检查没通过,进入回滚库存处理"); channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } catch (Exception e) { e.printStackTrace(); } } }
测试成功
代码结构:
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。