赞
踩
https://blog.csdn.net/l1090739767/article/details/116302051
rabbitmq-server
https://repo.huaweicloud.com/rabbitmq-server/v3.12.1/
erlang
https://erlang.org/download/
rabbitmq-service.bat start # 启动服务
service rabbitmq-server start
service rabbitmq-server stop # 停止服务
service rabbitmq-server restart # 重启服务
如果环境变量不生效,可以进行强制设置
#设置ERLANG_HOME的路径地址
set ERLANG_HOME=E:\app\MQ\Erlang OTP
http://localhost:15672/
码云仓库
https://gitee.com/zhang_w_b/rabbit-mq
(1)RabbitMQ五种工作模式
rabbitmq-producer
rabbitmq-consumer
(2)SpringBoot 整合RabbitMQ
rabbitmq-producer
rabbitmq-consumer
小结
优势:
劣势:
没有使用MQ:
- 系统的耦合性越高,容错性就越低,可维护性就越低。
使用MQ:
没有使用MQ:
使用MQ:
没有使用MQ:
使用MQ:
小结:
系统引入的外部依赖越多,系统稳定性越差。一旦 MQ 宕机,就会对业务造成影响。如何保证MQ的高可用?
MQ 的加入大大增加了系统的复杂度,以前系统间是同步的远程调用,现在是通过 MQ 进行异步调用。如何保证消息没有被重复消费?怎么处理消息丢失情况?那么保证消息传递的顺序性?
A 系统处理完业务,通过 MQ 给B、C、D三个系统发消息数据,如果 B 系统、C 系统处理成功,D 系统处理失败。如何保证消息数据处理的一致性?
小结
既然 MQ 有优势也有劣势,那么使用 MQ 需要满足什么条件呢?
RabbitMQ 中的相关概念:
RabbitMQ 提供了 6 种工作模式:简单模式、work queues、Publish/Subscribe 发布与订阅模式、Routing 路由模式、Topics 主题模式、RPC 远程调用模式(远程调用,不太算 MQ;暂不作介绍)。
小结
简单模式、work queues、Publish/Subscribe 发布与订阅模式、Routing 路由模式、Topics 主题模式、RPC 远程调用模式
上述的入门案例中其实使用的是如下的简单模式:
在上图的模型中,有以下概念:
代码示例:
依赖
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.6.0</version>
</dependency>
生产者:
package com.zwb.producer; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.io.IOException; import java.util.concurrent.TimeoutException; public class Producer_HelloWorld { public static void main(String[] args) throws IOException, TimeoutException { //1、创建连接工厂 ConnectionFactory factory = new ConnectionFactory(); //2、设置参数 //factory.setHost("192.168.1.100"); //默认localhost factory.setPort(5672); //默认5672 factory.setVirtualHost("/itcast"); //设置RabbitMQ服务器中的虚拟机,类似mysql数据库中的某一个库 factory.setUsername("heima"); factory.setPassword("heima"); //3、创建连接Connection Connection connection = factory.newConnection(); //4、创建Channel Channel channel = connection.createChannel(); //5.创建队列Queue 暂时不用交换机 /* String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments) 1.queue 队列名称 2.durable 是否持久化,当mq重启之后,还在 3.exclusive: *是否独占:只能有一个消费者监听队列 *当connection关闭时,是否删除队列 4.autoDelete: 是否自动删除,当没有consumer时,自动删除掉 5.arguments: 参数 */ //如果没有一个名字为hello_world的队列,则会创建队列,如果有则不会创建 channel.queueDeclare("hello_world",true,false,false,null); //6.发送消息 /* String exchange, String routingKey, boolean mandatory, byte[] body 参数: 1. exchange:交换机名称,简单模式下交换机会使用默认的 "" 2. routingKey: 路由名称 3. props: 配置信息 4. body: 发送消息数据 */ String body="hello rabbitmq----测试内容222"; channel.basicPublish("","hello_world",null,body.getBytes()); //7、释放资源 /* channel.close(); connection.close(); */ } }
消费者:
package com.zwb.consumer; import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; public class Consumer_HelloWorld { public static void main(String[] args) throws IOException, TimeoutException { //1、创建工厂 ConnectionFactory factory = new ConnectionFactory(); //2、设置参数 //factory.setHost("192.168.1.100"); //默认localhost factory.setPort(5672);//默认5672 factory.setVirtualHost("/itcast");// 默认/ factory.setUsername("heima");// heima factory.setPassword("heima");// heima //3、创建连接Connection Connection connection = factory.newConnection(); //4、创建Channel Channel channel = connection.createChannel(); //5、创建队列Queue 暂时不用交换机 /* String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments) 1.queue 队列名称 2.durable 是否持久化,当mq重启之后,还在 3.exclusive: *是否独占:只能有一个消费者监听队列 *当connection关闭时,是否删除队列 4.autoDelete: 是否自动删除,当没有consumer时,自动删除掉 5.arguments: 参数 */ //如果没有一个名字为hello_world的队列,则会创建队列,如果有则不会创建 channel.queueDeclare("hello_world",true,false,false,null); //6、接收消息 /* basicConsume(String queue, boolean autoAck, Consumer callback) 参数: 1. queue:队列名称 2. autoAck: 是否自动确认 3. callback: 回调对象 */ Consumer consumer = new DefaultConsumer(channel) { /* 回调方法,当收到消息后,会自动执行该方法 1.consumerTag:标识 2.envelope:获取一些信息。交换机,路由key... 3.properties: 配置信息 4.body: 数据 */ public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("consumerTag:"+consumerTag); System.out.println("exchange:"+envelope.getExchange()); System.out.println("RoutingKey:"+envelope.getRoutingKey()); System.out.println("properties:"+properties); System.out.println("body:"+new String(body));//byte数组转string字符串 } }; channel.basicConsume("hello_world",true,consumer); //关闭资源?不要,因为消费者要监听 } }
(1)模式说明
(2)代码编写
Work Queues 与入门程序的简单模式的代码几乎是一样的。可以完全复制,并多复制一个消费者进行多个消费者同时对消费消息的测试。
Producer_WorkQueues
package com.itheima.producer; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.io.IOException; import java.util.concurrent.TimeoutException; //简单模式 //发送消息 public class Producer_WorkQueues { public static void main(String[] args) throws IOException, TimeoutException { //1.创建连接工厂 ConnectionFactory factory=new ConnectionFactory(); //2.设置参数 factory.setHost("192.168.101.22");//ip地址 默认localhost factory.setPort(5672);//端口 默认值5672 factory.setVirtualHost("/itcast");//虚拟机默认值 /虚拟机 factory.setUsername("heima");//用户名 默认guest factory.setPassword("heima");//密码 默认guest //3.创建连接 connection Connection connection = factory.newConnection(); //4.创建channel Channel channel = connection.createChannel(); //5.创建队列Queue 暂时不用交换机 /* String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments) 1.queue 队列名称 2.durable 是否持久化,当mq重启之后,还在 3.exclusive: *是否独占:只能有一个消费者监听队列 *当connection关闭时,是否删除队列 4.autoDelete: 是否自动删除,当没有consumer时,自动删除掉 5.arguments: 参数 */ //如果没有一个名字为hello_world的队列,则会创建队列,如果有则不会创建 channel.queueDeclare("work_queues",true,false,false,null); //6.发送消息 /* String exchange, String routingKey, boolean mandatory, byte[] body 参数: 1. exchange:交换机名称,简单模式下交换机会使用默认的 "" 2. routingKey: 路由名称 3. props: 配置信息 4. body: 发送消息数据 */ for (int i = 1; i <=10; i++) { String body=i+"hello rabbitmq----"; channel.basicPublish("","work_queues",null,body.getBytes()); } //7.释放资源 /*channel.close(); connection.close();*/ } }
Consumer_WorkQueues1
package com.itheima.consumer; import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; public class Consumer_WorkQueues1 { public static void main(String[] args) throws IOException, TimeoutException { //1.创建连接工厂 ConnectionFactory factory=new ConnectionFactory(); //2.设置参数 factory.setHost("192.168.101.22");//ip地址 默认localhost factory.setPort(5672);//端口 默认值5672 factory.setVirtualHost("/itcast");//虚拟机默认值 /虚拟机 factory.setUsername("heima");//用户名 默认guest factory.setPassword("heima");//密码 默认guest //3.创建连接 connection Connection connection = factory.newConnection(); //4.创建channel Channel channel = connection.createChannel(); //5.创建队列Queue 暂时不用交换机 /* String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments) 1.queue 队列名称 2.durable 是否持久化,当mq重启之后,还在 3.exclusive: *是否独占:只能有一个消费者监听队列 *当connection关闭时,是否删除队列 4.autoDelete: 是否自动删除,当没有consumer时,自动删除掉 5.arguments: 参数 */ //如果没有一个名字为hello_world的队列,则会创建队列,如果有则不会创建 channel.queueDeclare("work_queues",true,false,false,null); //6.接收消息 /* basicConsume(String queue, boolean autoAck, Consumer callback) 参数: 1. queue:队列名称 2. autoAck: 是否自动确认 3. callback: 回调对象 */ Consumer consumer=new DefaultConsumer(channel){ /* 回调方法,当收到消息后,会自动执行该方法 1.consumerTag:标识 2.envelope:获取一些信息。交换机,路由key... 3.properties: 配置信息 4.body: 数据 */ public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { /* System.out.println("consumerTag:"+consumerTag); System.out.println("exchange:"+envelope.getExchange()); System.out.println("RoutingKey:"+envelope.getRoutingKey()); System.out.println("properties:"+properties);*/ System.out.println("body:"+new String(body));//byte数组转string字符串 } }; channel.basicConsume("work_queues",true,consumer); //关闭资源?不要,因为消费者要监听 } }
Consumer_WorkQueues2
package com.itheima.consumer; import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; public class Consumer_WorkQueues2 { public static void main(String[] args) throws IOException, TimeoutException { //1.创建连接工厂 ConnectionFactory factory=new ConnectionFactory(); //2.设置参数 factory.setHost("192.168.101.22");//ip地址 默认localhost factory.setPort(5672);//端口 默认值5672 factory.setVirtualHost("/itcast");//虚拟机默认值 /虚拟机 factory.setUsername("heima");//用户名 默认guest factory.setPassword("heima");//密码 默认guest //3.创建连接 connection Connection connection = factory.newConnection(); //4.创建channel Channel channel = connection.createChannel(); //5.创建队列Queue 暂时不用交换机 /* String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments) 1.queue 队列名称 2.durable 是否持久化,当mq重启之后,还在 3.exclusive: *是否独占:只能有一个消费者监听队列 *当connection关闭时,是否删除队列 4.autoDelete: 是否自动删除,当没有consumer时,自动删除掉 5.arguments: 参数 */ //如果没有一个名字为hello_world的队列,则会创建队列,如果有则不会创建 channel.queueDeclare("work_queues",true,false,false,null); //6.接收消息 /* basicConsume(String queue, boolean autoAck, Consumer callback) 参数: 1. queue:队列名称 2. autoAck: 是否自动确认 3. callback: 回调对象 */ Consumer consumer=new DefaultConsumer(channel){ /* 回调方法,当收到消息后,会自动执行该方法 1.consumerTag:标识 2.envelope:获取一些信息。交换机,路由key... 3.properties: 配置信息 4.body: 数据 */ public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { /* System.out.println("consumerTag:"+consumerTag); System.out.println("exchange:"+envelope.getExchange()); System.out.println("RoutingKey:"+envelope.getRoutingKey()); System.out.println("properties:"+properties);*/ System.out.println("body:"+new String(body));//byte数组转string字符串 } }; channel.basicConsume("work_queues",true,consumer); //关闭资源?不要,因为消费者要监听 } }
(3)测试
注意:先运行消费者1和消费者2,再运行生产者
结果如下:消费者1消费消息序号 1,3,5,7,9
消费者2消费消息序号 2,4,6,8,10
(4) 小结
(1)模式说明
在订阅模型中,多了一个 Exchange 角色,而且过程略有变化:
P:生产者,也就是要发送消息的程序,但是不再发送到队列中,而是发给X(交换机)
C:消费者,消息的接收者,会一直等待消息到来
Queue:消息队列,接收消息、缓存消息
Exchange:交换机(X)。一方面,
接收生产者发送的消息
。另一方面,
知道如何处理消息
,例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange的类型。Exchange有常见以下3种类型:
Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与 Exchange 绑定,或者没有符合路由规则的队列,那么消息会丢失!
(2)代码编写
生产者Producer_PubSub
package com.itheima.producer; import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.io.IOException; import java.util.concurrent.TimeoutException; //发布订阅模式 //发送消息 public class Producer_PubSub { public static void main(String[] args) throws IOException, TimeoutException { //1.创建连接工厂 ConnectionFactory factory=new ConnectionFactory(); //2.设置参数 factory.setHost("192.168.101.22");//ip地址 默认localhost factory.setPort(5672);//端口 默认值5672 factory.setVirtualHost("/itcast");//虚拟机默认值 /虚拟机 factory.setUsername("heima");//用户名 默认guest factory.setPassword("heima");//密码 默认guest //3.创建连接 connection Connection connection = factory.newConnection(); //4.创建channel Channel channel = connection.createChannel(); //5.创建交换机 /* exchangeDeclare (String exchange, 交换机名称 BuiltinExchangeType type, 交换机类型 枚举类型 DIRECT("direct"), :定向 FANOUT("fanout"), :扇形(广播),发送消息到每一个与之绑定的队列 TOPIC("topic"),:通配符方式 HEADERS("headers");参数匹配 boolean durable, 是否持久化 boolean autoDelete, 是否自动删除 boolean internal, 内部使用一般false Map<String, Object> arguments) 参数列表 */ String exchangeName="test_fanout"; channel.exchangeDeclare(exchangeName, BuiltinExchangeType.FANOUT,true,false,false,null); //6.创建两个队列 String queue1Name="test_fanout_queue1"; String queue2Name="test_fanout_queue2"; /* String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments) 1.queue 队列名称 2.durable 是否持久化,当mq重启之后,还在 3.exclusive: *是否独占:只能有一个消费者监听队列 *当connection关闭时,是否删除队列 4.autoDelete: 是否自动删除,当没有consumer时,自动删除掉 5.arguments: 参数 */ channel.queueDeclare(queue1Name,true,false,false,null); channel.queueDeclare(queue2Name,true,false,false,null); //7,绑定队列和交换机 /* queueBind(String queue, String exchange, String routingKey) 参数: 1.queue: 队列名称 2.exchange:交换机 3.routingKey:路由键:绑定规则 如果交换机的类型为fanout,routingKey设置为"" */ channel.queueBind(queue1Name,exchangeName,""); channel.queueBind(queue2Name,exchangeName,""); //8.发送消息 String body="日志信息:张三调用了findAll方法...日志级别:info"; channel.basicPublish(exchangeName,"",null,body.getBytes()); //9.释放资源 channel.close(); connection.close(); } } 12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576
Consumer_PubSub1
package com.itheima.consumer; import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; public class Consumer_PubSub1 { public static void main(String[] args) throws IOException, TimeoutException { //1.创建连接工厂 ConnectionFactory factory=new ConnectionFactory(); //2.设置参数 factory.setHost("192.168.101.22");//ip地址 默认localhost factory.setPort(5672);//端口 默认值5672 factory.setVirtualHost("/itcast");//虚拟机默认值 /虚拟机 factory.setUsername("heima");//用户名 默认guest factory.setPassword("heima");//密码 默认guest //3.创建连接 connection Connection connection = factory.newConnection(); //4.创建channel Channel channel = connection.createChannel(); //5.队列已经声明一次了,这里不用再声明队列 String queue1Name="test_fanout_queue1"; String queue2Name="test_fanout_queue2"; //6.接收消息 /* basicConsume(String queue, boolean autoAck, Consumer callback) 参数: 1. queue:队列名称 2. autoAck: 是否自动确认 3. callback: 回调对象 */ Consumer consumer=new DefaultConsumer(channel){ /* 回调方法,当收到消息后,会自动执行该方法 1.consumerTag:标识 2.envelope:获取一些信息。交换机,路由key... 3.properties: 配置信息 4.body: 数据 */ public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { /* System.out.println("consumerTag:"+consumerTag); System.out.println("exchange:"+envelope.getExchange()); System.out.println("RoutingKey:"+envelope.getRoutingKey()); System.out.println("properties:"+properties);*/ System.out.println("body:"+new String(body));//byte数组转string字符串 System.out.println("将日志信息打印到控制台"); } }; channel.basicConsume(queue1Name,true,consumer); //关闭资源?不要,因为消费者要监听 } }
Consumer_PubSub2
package com.itheima.consumer; import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; public class Consumer_PubSub2 { public static void main(String[] args) throws IOException, TimeoutException { //1.创建连接工厂 ConnectionFactory factory=new ConnectionFactory(); //2.设置参数 factory.setHost("192.168.101.22");//ip地址 默认localhost factory.setPort(5672);//端口 默认值5672 factory.setVirtualHost("/itcast");//虚拟机默认值 /虚拟机 factory.setUsername("heima");//用户名 默认guest factory.setPassword("heima");//密码 默认guest //3.创建连接 connection Connection connection = factory.newConnection(); //4.创建channel Channel channel = connection.createChannel(); //5.队列已经声明一次了,这里不用再声明队列 String queue1Name="test_fanout_queue1"; String queue2Name="test_fanout_queue2"; //6.接收消息 /* basicConsume(String queue, boolean autoAck, Consumer callback) 参数: 1. queue:队列名称 2. autoAck: 是否自动确认 3. callback: 回调对象 */ Consumer consumer=new DefaultConsumer(channel){ /* 回调方法,当收到消息后,会自动执行该方法 1.consumerTag:标识 2.envelope:获取一些信息。交换机,路由key... 3.properties: 配置信息 4.body: 数据 */ public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { /* System.out.println("consumerTag:"+consumerTag); System.out.println("exchange:"+envelope.getExchange()); System.out.println("RoutingKey:"+envelope.getRoutingKey()); System.out.println("properties:"+properties);*/ System.out.println("body:"+new String(body));//byte数组转string字符串 System.out.println("将日志信息保存数据库"); } }; channel.basicConsume(queue2Name,true,consumer); //关闭资源?不要,因为消费者要监听 } }
运行结果:
(3)小结
交换机需要与队列进行绑定,绑定之后;一个消息可以被多个消费者都收到。
发布订阅模式与工作队列模式的区别:
工作队列模式不用定义交换机,而发布/订阅模式需要定义交换机
发布/订阅模式的生产方是面向交换机发送消息,工作队列模式的生产方是面向队列发送消息(底层使用默认交换机)
发布/订阅模式需要设置队列和交换机的绑定,工作队列模式不需要设置,实际上工作队列模式会将队列绑 定到默认的交换机
(1)模式说明
图解:
(2)代码编写
Producer_Routing
package com.zwb.producer.routing; import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.io.IOException; import java.util.concurrent.TimeoutException; public class Producer_Routing { public static void main(String[] args) throws IOException, TimeoutException { //1、创建连接工厂 ConnectionFactory factory = new ConnectionFactory(); //2、设置参数 //factory.setHost("192.168.1.100"); //默认localhost factory.setPort(5672); //默认5672 factory.setVirtualHost("/itcast"); //设置RabbitMQ服务器中的虚拟机,类似mysql数据库中的某一个库 factory.setUsername("heima"); factory.setPassword("heima"); //3、创建连接Connection Connection connection = factory.newConnection(); //4、创建Channel Channel channel = connection.createChannel(); //5、创建交换机 /* exchangeDeclare (String exchange, 交换机名称 BuiltinExchangeType type, 交换机类型 枚举类型 DIRECT("direct"), :定向 FANOUT("fanout"), :扇形(广播),发送消息到每一个与之绑定的队列 TOPIC("topic"),:通配符方式 HEADERS("headers");参数匹配 boolean durable, 是否持久化 boolean autoDelete, 是否自动删除 boolean internal, 内部使用一般false Map<String, Object> arguments) 参数列表 */ String exchangeName = "test_direct"; channel.exchangeDeclare(exchangeName, BuiltinExchangeType.DIRECT,true,false,null); //6、创建两个队列 String queue1Name="test_direct_queue1"; String queue2Name="test_direct_queue2"; /* String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments) 1.queue 队列名称 2.durable 是否持久化,当mq重启之后,还在 3.exclusive: *是否独占:只能有一个消费者监听队列 *当connection关闭时,是否删除队列 4.autoDelete: 是否自动删除,当没有consumer时,自动删除掉 5.arguments: 参数 */ channel.queueDeclare(queue1Name,true,false,false,null); channel.queueDeclare(queue2Name,true,false,false,null); //7、绑定队列和交换机 /* queueBind(String queue, String exchange, String routingKey) 参数: 1.queue: 队列名称 2.exchange:交换机 3.routingKey:路由键:绑定规则 如果交换机的类型为fanout,routingKey设置为"" */ //队列1 channel.queueBind(queue1Name,exchangeName,"error"); //队列2 channel.queueBind(queue2Name,exchangeName,"info"); channel.queueBind(queue2Name,exchangeName,"warning"); channel.queueBind(queue2Name,exchangeName,"error"); //8.发送消息 String body="日志信息:张三调用了delete方法出错误了...日志级别:error"; channel.basicPublish(exchangeName,"error",null,body.getBytes()); //9.释放资源 channel.close(); connection.close(); } }
Consumer_Routing1
package com.zwb.consumer.routing; import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; public class Consumer_Routing1 { public static void main(String[] args) throws IOException, TimeoutException { //1、创建工厂 ConnectionFactory factory = new ConnectionFactory(); //2、设置参数 //factory.setHost("192.168.1.100"); //默认localhost factory.setPort(5672);//默认5672 factory.setVirtualHost("/itcast");// 默认/ factory.setUsername("heima");// heima factory.setPassword("heima");// heima //3、创建连接Connection Connection connection = factory.newConnection(); //4、创建Channel Channel channel = connection.createChannel(); //5.队列已经声明一次了,这里不用再声明队列 String queue1Name="test_direct_queue1"; String queue2Name="test_direct_queue2"; //6.接收消息 /* basicConsume(String queue, boolean autoAck, Consumer callback) 参数: 1. queue:队列名称 2. autoAck: 是否自动确认 3. callback: 回调对象 */ Consumer consumer=new DefaultConsumer(channel){ /* 回调方法,当收到消息后,会自动执行该方法 1.consumerTag:标识 2.envelope:获取一些信息。交换机,路由key... 3.properties: 配置信息 4.body: 数据 */ public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { /* System.out.println("consumerTag:"+consumerTag); System.out.println("exchange:"+envelope.getExchange()); System.out.println("RoutingKey:"+envelope.getRoutingKey()); System.out.println("properties:"+properties);*/ System.out.println("body:"+new String(body));//byte数组转string字符串 System.out.println("将日志信息打印到控制台"); } }; channel.basicConsume(queue2Name,true,consumer); //关闭资源?不要,因为消费者要监听 } }
Consumer_Routing2
package com.zwb.consumer.routing; import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; public class Consumer_Routing2 { public static void main(String[] args) throws IOException, TimeoutException { //1、创建工厂 ConnectionFactory factory = new ConnectionFactory(); //2、设置参数 //factory.setHost("192.168.1.100"); //默认localhost factory.setPort(5672);//默认5672 factory.setVirtualHost("/itcast");// 默认/ factory.setUsername("heima");// heima factory.setPassword("heima");// heima //3、创建连接Connection Connection connection = factory.newConnection(); //4、创建Channel Channel channel = connection.createChannel(); //5.队列已经声明一次了,这里不用再声明队列 String queue1Name="test_direct_queue1"; String queue2Name="test_direct_queue2"; //6.接收消息 /* basicConsume(String queue, boolean autoAck, Consumer callback) 参数: 1. queue:队列名称 2. autoAck: 是否自动确认 3. callback: 回调对象 */ Consumer consumer=new DefaultConsumer(channel){ /* 回调方法,当收到消息后,会自动执行该方法 1.consumerTag:标识 2.envelope:获取一些信息。交换机,路由key... 3.properties: 配置信息 4.body: 数据 */ public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { /* System.out.println("consumerTag:"+consumerTag); System.out.println("exchange:"+envelope.getExchange()); System.out.println("RoutingKey:"+envelope.getRoutingKey()); System.out.println("properties:"+properties);*/ System.out.println("body:"+new String(body));//byte数组转string字符串 System.out.println("将日志信息打印到控制台"); } }; channel.basicConsume(queue1Name,true,consumer); //关闭资源?不要,因为消费者要监听 } }
(3) 运行结果
小结
(1)模式说明
图解:
(2)代码实现
Producer_Topics
package com.zwb.producer.topics; import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.io.IOException; import java.util.concurrent.TimeoutException; public class Producer_Topics { public static void main(String[] args) throws IOException, TimeoutException { //1、创建连接工厂 ConnectionFactory factory = new ConnectionFactory(); //2、设置参数 //factory.setHost("192.168.1.100"); //默认localhost factory.setPort(5672); //默认5672 factory.setVirtualHost("/itcast"); //设置RabbitMQ服务器中的虚拟机,类似mysql数据库中的某一个库 factory.setUsername("heima"); factory.setPassword("heima"); //3、创建连接Connection Connection connection = factory.newConnection(); //4、创建Channel Channel channel = connection.createChannel(); //5、创建交换机 /* exchangeDeclare (String exchange, 交换机名称 BuiltinExchangeType type, 交换机类型 枚举类型 DIRECT("direct"), :定向 FANOUT("fanout"), :扇形(广播),发送消息到每一个与之绑定的队列 TOPIC("topic"),:通配符方式 HEADERS("headers");参数匹配 boolean durable, 是否持久化 boolean autoDelete, 是否自动删除 boolean internal, 内部使用一般false Map<String, Object> arguments) 参数列表 */ String exchangeName = "test_topic"; channel.exchangeDeclare(exchangeName, BuiltinExchangeType.TOPIC,true,false,null); //6、创建两个队列 String queue1Name="test_topic_queue1"; String queue2Name="test_topic_queue2"; /* String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments) 1.queue 队列名称 2.durable 是否持久化,当mq重启之后,还在 3.exclusive: *是否独占:只能有一个消费者监听队列 *当connection关闭时,是否删除队列 4.autoDelete: 是否自动删除,当没有consumer时,自动删除掉 5.arguments: 参数 */ channel.queueDeclare(queue1Name,true,false,false,null); channel.queueDeclare(queue2Name,true,false,false,null); //7、绑定队列和交换机 /* queueBind(String queue, String exchange, String routingKey) 参数: 1.queue: 队列名称 2.exchange:交换机 3.routingKey:路由键:绑定规则 如果交换机的类型为fanout,routingKey设置为"" */ //routing key:系统的名称,日志级别 //--需求:所有error级别的日志存入数据库,所有order系统的日志存入数据库 channel.queueBind(queue1Name,exchangeName,"#.error"); channel.queueBind(queue1Name,exchangeName,"order.*"); channel.queueBind(queue2Name,exchangeName,"*.*"); //8.发送消息 String body="日志信息:张三调用了findAll方法...日志级别:info"; channel.basicPublish(exchangeName,"order.info",null,body.getBytes()); //9.释放资源 channel.close(); connection.close(); } }
Consumer_Topic1
package com.zwb.consumer.topics; import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; public class Consumer_Topic1 { public static void main(String[] args) throws IOException, TimeoutException { //1、创建工厂 ConnectionFactory factory = new ConnectionFactory(); //2、设置参数 //factory.setHost("192.168.1.100"); //默认localhost factory.setPort(5672);//默认5672 factory.setVirtualHost("/itcast");// 默认/ factory.setUsername("heima");// heima factory.setPassword("heima");// heima //3、创建连接Connection Connection connection = factory.newConnection(); //4、创建Channel Channel channel = connection.createChannel(); //5.队列已经声明一次了,这里不用再声明队列 String queue1Name="test_topic_queue1"; String queue2Name="test_topic_queue2"; //6.接收消息 /* basicConsume(String queue, boolean autoAck, Consumer callback) 参数: 1. queue:队列名称 2. autoAck: 是否自动确认 3. callback: 回调对象 */ Consumer consumer=new DefaultConsumer(channel){ /* 回调方法,当收到消息后,会自动执行该方法 1.consumerTag:标识 2.envelope:获取一些信息。交换机,路由key... 3.properties: 配置信息 4.body: 数据 */ public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { /* System.out.println("consumerTag:"+consumerTag); System.out.println("exchange:"+envelope.getExchange()); System.out.println("RoutingKey:"+envelope.getRoutingKey()); System.out.println("properties:"+properties);*/ System.out.println("body:"+new String(body));//byte数组转string字符串 System.out.println("将日志信息打印到控制台"); } }; channel.basicConsume(queue2Name,true,consumer); //关闭资源?不要,因为消费者要监听 } }
Consumer_Topic2
package com.zwb.consumer.topics; import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; public class Consumer_Topic2 { public static void main(String[] args) throws IOException, TimeoutException { //1、创建工厂 ConnectionFactory factory = new ConnectionFactory(); //2、设置参数 //factory.setHost("192.168.1.100"); //默认localhost factory.setPort(5672);//默认5672 factory.setVirtualHost("/itcast");// 默认/ factory.setUsername("heima");// heima factory.setPassword("heima");// heima //3、创建连接Connection Connection connection = factory.newConnection(); //4、创建Channel Channel channel = connection.createChannel(); //5.队列已经声明一次了,这里不用再声明队列 String queue1Name="test_topic_queue1"; String queue2Name="test_topic_queue2"; //6.接收消息 /* basicConsume(String queue, boolean autoAck, Consumer callback) 参数: 1. queue:队列名称 2. autoAck: 是否自动确认 3. callback: 回调对象 */ Consumer consumer=new DefaultConsumer(channel){ /* 回调方法,当收到消息后,会自动执行该方法 1.consumerTag:标识 2.envelope:获取一些信息。交换机,路由key... 3.properties: 配置信息 4.body: 数据 */ public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { /* System.out.println("consumerTag:"+consumerTag); System.out.println("exchange:"+envelope.getExchange()); System.out.println("RoutingKey:"+envelope.getRoutingKey()); System.out.println("properties:"+properties);*/ System.out.println("body:"+new String(body));//byte数组转string字符串 System.out.println("将日志信息打印到控制台"); } }; channel.basicConsume(queue1Name,true,consumer); //关闭资源?不要,因为消费者要监听 } }
结果
小结
创建生产者SpringBoot工程
引入start,依赖坐标
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
编写yml配置,基本信息配置
定义交换机,队列以及绑定关系的配置类
注入RabbitTemplate,调用方法,完成消息发送
pom.xml
<parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.5.15</version> <relativePath/> <!-- lookup parent from repository --> </parent> <groupId>com.rabiitmq</groupId> <artifactId>producer-springboot</artifactId> <version>0.0.1-SNAPSHOT</version> <name>producer-springboot</name> <description>producer-springboot</description> <properties> <java.version>8</java.version> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <optional>true</optional> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.13.1</version> <scope>test</scope> </dependency> </dependencies>
RabbitMQConfig.java
import org.springframework.amqp.core.*; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class RabbitMQConfig { public static final String EXCHANGE_NAME="boot_topic_exchange"; public static final String QUEUE_NAME="boot_quque"; /* * 参数说明: * id: bean的名称 * name:quque名称 * auto-declare:自动创建 * auto-delete: 自动删除,最后一个消费者和该队列断开连接后,自动删除队列 * durable:是否持久化 * exclusive:是否独占 */ //1.交换机 @Bean("bootExchange") public Exchange bootExchange(){ return ExchangeBuilder.topicExchange(EXCHANGE_NAME).durable(true).build(); } //2.队列 @Bean("bootQueue") public Queue bootQueue(){ return QueueBuilder.durable(QUEUE_NAME).build(); } //3.队列和交换机的绑定关系 @Bean public Binding binQueueExchange(@Qualifier("bootQueue") Queue queue, @Qualifier("bootExchange") Exchange exchange){ return BindingBuilder.bind(queue).to(exchange).with("boot.#").noargs(); } }
ProducerSpringbootApplication
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class ProducerSpringbootApplication {
public static void main(String[] args) {
SpringApplication.run(ProducerSpringbootApplication.class, args);
}
}
application.yml
# 配置rabbitmq的基本信息 IP端口 username passwordspring:
spring:
rabbitmq:
host: localhost
port: 5672
virtual-host: /itcast
username: heima
password: heima
ProducerTest
import com.rabiitmq.config.RabbitMQConfig; import org.junit.jupiter.api.Test; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; @SpringBootTest class ProducerSpringbootApplicationTests { //1.注入rabbitTemplate @Autowired private RabbitTemplate rabbitTemplate; @Test public void testSend(){ rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME,"boot.haha","这是springboot-rabbitmq的测试消息22"); } }
测试:运行Junit Test
创建消费者SpringBoot工程
引入start,依赖坐标
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
编写yml配置,基本信息配置
定义监听类,使用@RabbitListener注解完成队列监听。
ConsumerSpringbootApplication
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class ConsumerSpringbootApplication {
public static void main(String[] args) {
SpringApplication.run(ConsumerSpringbootApplication.class, args);
}
}
RabbitMQListener.java
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class RabbitMQListener {
@RabbitListener(queues = "boot_quque")
public void ListenerQueue(Message message){
System.out.println(new String(message.getBody()));
}
}
application.yml
# 配置rabbitmq的基本信息 IP端口 username passwordspring:
spring:
rabbitmq:
host: localhost
port: 5672
virtual-host: /itcast
username: heima
password: heima
pom.xml
<parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.5.15</version> <relativePath/> <!-- lookup parent from repository --> </parent> <groupId>com.rabbitmq</groupId> <artifactId>consumer-springboot</artifactId> <version>0.0.1-SNAPSHOT</version> <name>consumer-springboot</name> <description>consumer-springboot</description> <properties> <java.version>8</java.version> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <optional>true</optional> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.13.1</version> <scope>test</scope> </dependency> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.13.1</version> <scope>test</scope> </dependency> </dependencies>
运行ConsumerSpringbootApplication,消费者从MQ中拿到消息
https://blog.csdn.net/weixin_62319133/article/details/127925299
在安装RabbitMQ是会出现无法读取ErlangOTP的情况,多数是因为版本不匹配
1.在官方文档中找到RabbitMQ版本对应的Erlang版本重新下载安装包
文档RabbitMQ Erlang Version Requirements — RabbitMQ
2.下载完成后停止Erlang和RabbitMQ服务并进入原本Erlang和RabbitMQ的安装目录,运行Uninstall.exe
3.卸载完成后检查Erlang和RabbitMQ的安装目录是否删干净
4.删除RabbitMQ和Erlang的所有安装目录。
5.运行CMD–>sc delete RabbitMQ
6.删除目录C:\Windows\System32\config\systemprofile中的.erlang.cookie文件(如果有的话)。
7.删除目录C:\Users\用户名 中的.erlang.cookie文件(如果有的话)。
8.删除目录C:\Users\用户名\AppData\Roaming目录下的RabbitMQ文件夹。
9.打开注册表编辑器,删除表
计算机\HKEY_LOCAL_MACHINE\SOFTWARE\WOW6432Node\Ericsson\Erlang下的子项
10.以管理员运行分别运行Erlang和RabbitMQ安装包即可
rabbitmq启动的时候报错:Please either set ERLANG_HOME to point to your Erlang installation or place
https://blog.csdn.net/weixin_42409107/article/details/100105905
https://www.rabbitmq.com/which-erlang.html
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。