赞
踩
RabbitMQ在Windows上的详细安装教程。
我这里使用的系统环境如下:
windows系统:Windows 10 专业版
Erlang:24.1.7
RabbitMQ:3.11.6
RabbitMQ 是一个由 Erlang 语言开发的 AMQP 的开源实现。 AMQP :Advanced Message Queue,高级消息队列协议。它是应用层协议的一个开放标准,为面向消息的中间件设计,基于此协议的客户端与消息中间件可传递消息,并不受产品、开发语言等条件的限制。 RabbitMQ 最初起源于金融系统,用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。
具体特点包括:
1.可靠性(Reliability) RabbitMQ 使用一些机制来保证可靠性,如持久化、传输确认、发布确认。
2.灵活的路由(Flexible Routing) 在消息进入队列之前,通过 Exchange 来路由消息的。对于典型的路由功能,RabbitMQ已经提供了一些内置的 Exchange 来实现。针对更复杂的路由功能,可以将多个Exchange 绑定在一起,也通过插件机制实现自己的 Exchange 。
3.消息集群(Clustering) 多个 RabbitMQ 服务器可以组成一个集群,形成一个逻辑 Broker
4.高可用(Highly Available Queues) 队列可以在集群中的机器上进行镜像,使得在部分节点出问题的情况下队列仍然可用。
5.多种协议(Multi-protocol) RabbitMQ 支持多种消息队列协议,比如 STOMP、MQTT 等等。
6.多语言客户端(Many Clients) RabbitMQ 几乎支持所有常用语言,比如 Java、.NET、Ruby 等等。
7.管理界面(Management UI) RabbitMQ 提供了一个易用的用户界面,使得用户可以监控和管理消息 Broker 的许多方面。
8.跟踪机制(Tracing) 如果消息异常,RabbitMQ 提供了消息跟踪机制,使用者可以找出发生了什么。
9.插件机制(Plugin System) RabbitMQ 提供了许多插件,来从多方面进行扩展,也可以编写自己的插件。
这里通过官网下载需要的版本:RabbitMq官方地址
由于官网访问下载比较慢,贴一个云盘地址:百度网盘地址
进入官网,往下拉,找到Download+Installation
点击Download+Installation,找到下图箭头Install:Windows
如果需要找其他版本,可以点击release(如下图)找其他版本,这里就不做说明。
页面下拉,找到dependencies依赖关系:
选择windows对应版本
点击这里,可以查看rabbitmq与erlang版本对应表
这里我截图一部分版本参照表,其他的可自行查看
第一步找到我们软件下载的位置
第二步先安装otp_win64_25.2.exe
右键以管理员身份运行,选取安装的路径,然后一路傻瓜式安装next下一步即可。
注意,不要安装在中文或带空格的文件路径下
配置系统环境变量:
找到之前安装的路径,找到bin目录,复制bin目录,进入系统环境变量path,点击编辑
粘贴刚刚复制的内容。
至此,erlang环境已经配置好了。
第三步,安装rabbitMq
右键以管理员身份运行,选择安装路径,接下来全部next下一步,遇到弹窗允许,没有弹窗则继续。
第四步:安装完成后找到安装文件路径,找到 sbin 目录下,全选路径 输入 cmd
运行下面命令,安装网页管理页面插件。
rabbitmq-plugins enable rabbitmq_management
第四步、打开任务资源管理器。win11 快捷键 Ctrl+Shift+Esc,找到rabbitmq服务右键重新启动。
安装完成
该页面是rabbitMq主页面,共6个选项卡:
1.)overview
mq的概述情况,里面包括集群的各个节点信息、端口映射信息
totals:准备消费的消息数、待确认的消息数、消息总数以及消息的各种处理速率(发送速率、确认速率、写入硬盘速率等等)。
nodes:支撑 RabbitMQ 运行的一些机器,相当于集群的节点。点击每个节点,可以查看节点的详细信息。
churn statistics:展示的是 Connection、Channel 以及 Queue 的创建/关闭速率。这个里边展示了端口的映射信息以及 Web 的上下文信息。
Ports and contexts:端口信息
5672 :是 RabbitMQ 通信端口。
15672: 是 Web 管理页面端口。
25672:是集群通信端口。
Export definitions && Import definitions: 两个可以导入导出当前实例的一些配置信息
2.)connections
MQ运行中的当前保持连接的连接信息,其中包含了连接的虚拟主机路径(virtuil host),连接的主机及端口、那个用户连接的、状态等信息,在java代码中可以通过ConnectionFactory 的 newConnection() 后进行创建一个连接,无论是消息消费者,还是消息生产者,只要连接上了,都会显示在这里,当时用connection.close()之后,连接关闭。
3.)Channels
信道或者通道,Channel是在连接中存在的,一个Connection可以有多个Channel。在java代码中,通过连接来创建信道。当代码执行connection.createChannel(),该界面下面就会有信道信息,它和连接紧密相关,一个连接可以有多个信道,多个通道通过多线程实现,一般情况下,我们在通道中创建队列、交换机等。生产者的通道一般会立马关闭;消费者是一直监听的,通道几乎是会一直存在。
channel:通道的名称。
user name:该通道登录使用的用户名。
Model:通道确认模式,C表示confirm,T表示事务。
State:通道当前的状态,running表示正在运行中,idle表示空闲。
Unconfirmed:待确认的消息总数。
Prefetch:Prefetch 表示每个消费者最大的能承受的未确认消息数目,简单来说就是用来指定一个消费者一次可以从 RabbitMQ 中获取多少条消息并缓存在消费者中,一旦消费者的缓冲区满了,RabbitMQ 将会停止投递新的消息到该消费者中直到它发出有消息被 ack 了。总的来说,消费者负责不断处理消息,不断 ack,然后只要 unAcked 数少于 prefetch * consumer 数目,RabbitMQ 就不断将消息投递过去。
Unacked:带确认的消息总数。
publish:消息生产者发送消息的速率。
confirm:消息生产者确认消息的速率。
unroutable (drop):表示未被接收,且已经删除的消息
deliver / get:消息消费者获得消息的速率。
ack:消息消费者ack消息的速率。
4.)Exchanges
exchange表示交换机,他与队列进行绑定后,根据消息exchange类型,按照不同的绑定规则分发消息到消息队列中,可以是一个消息被分发给多个消息队列,也可以是一个消息分发到一个消息队列,和队列紧密相关
Type:交换机的类型。
Features:有两个取值,D和I,D 表示交换机持久化,将交换机的属性在服务器内部保存,当 MQ 的服务器发生意外或关闭之后,重启 RabbitMQ 时不需要重新手动或执行代码去建立交换机,交换机会自动建立,相当于一直存在。I 表示这个交换机不可以被消息生产者用来推送消息,仅用来进行交换机和交换机之间的绑定。
Message rate in:消息进入的速率。
Message rate out:消息出去的速率。
name:交换机的名称。
type:交换机的类型,有四种:
直连交换机:Direct exchange
扇形交换机:Fanout exchange
主题交换机:Topic exchange
首部交换机:Headers exchange
Durability:是否需要持久化,设置是否持久durab设置为ture表示持久化,反之是非持久化,设置为持久化则将Exchange存盘,即使服务重启数据也不会的丢失.
Auto delete:自动删除,当最后一个绑定到Exchange上的队列删除后,自动删除改exchange,也就是说如果该Exchange没有和任何队列Queue绑定则会自动删除。
Internal:rabbitmq是否为内部使用,json格式,可以设置消息最大数量等属性。
Bindings:交换机和队列建立绑定关系。
Publish message:发送消息,通过交换机发送消息到和它有绑定关系的队列中。
5.)Queue
queue是mq中的队列,是Message的落脚点和等待接收的地方,Queue很适合做负载均衡,RabbitMq可以在若干consumer中实现轮流调度(Round-Robin)
Features:表示消息队列的特性,D表示消息持久化。
State:表示当前队列的状态,running表示正在运行在中,idle表示空闲。
Ready:表示待消费的消息总数。
Unacked:表示待应答的消息总数。
Total:表示消息总数(Ready+Unacked)
incoming:消息进入的速率。
deliver/get:消息获取的速率。
ack:消息应答的速率。
Add a new queue:添加一个新的消息队列。
点击每一个消息队列的名称,可以进入到消息队列中。进入到消息队列后,可以完成对消息队列的进一步操作;发送消息。获取一条消息。移动一条消息(需要插件的支持)。删除消息队列。清空消息队列中的消息。
**
**
Durablity:持久化选项,Durable(持久化保存),Transient(即时保存), 持久化保存会在RabbitMQ宕机或者重启后,未消费的消息仍然存在,即时保存在RabbitMQ宕机或者重启后交换机会不存在。需要重新定义该Exchange。
即使保存可以理解为计算机内存,关机就没了。
持久化保存可以理解为硬盘,关机,文件还在。
Get messages:获取消息;
点击队列名称,查看队列的详细信息:
6.)Admin
用户信息
Name: 表示用户名称。
Tags: 表示角色标签,只能选取一个。
Can access virtual hosts: 表示允许进入的虚拟主机。
Has password: 表示这个用户是否设置了密码。常见的两个操作是管理用户和虚拟主机。
Add a user 可以添加一个新的用户,添加用户的时候需要给用户设置 Tags,其实就是用户角色,如下:
none: 不能访问 management plugin
management: 用户可以通过 AMQP 做的任何事 列出自己可以通过 AMQP 登入的 virtual hosts 查看自己的 virtual hosts 中的 queues,
exchanges 和 bindings 查看和关闭自己的 channels 和 connections 查看有关自己的 virtual hosts 的“全局”的统计信息,包含其他用户在这些 virtual hosts 中的活动
policymaker: management 可以做的任何事 查看、创建和删除自己的 virtual hosts 所属的 policies 和 parameters
monitoring:management 可以做的任何事 列出所有 virtual hosts,包括他们不能登录的 virtual hosts 查看其他用户的 connections 和 channels 查看节点级别的数据如 clustering 和 memory 使用情况 查看真正的关于所有 virtual hosts 的全局的统计信息
administrator:policymaker 和 monitoring 可以做的任何事 创建和删除 virtual hosts 查看、创建和删除 users 查看创建和删除 permissions 关闭其他用户的 connections
impersonator (模拟者) 模拟者,无法登录管理控制台。
我们以及了解了Exchange的四种类型,这里集成我们介绍以下常用的三种交换机类型:
Direct Exchange
直连型交换机,根据消息携带的路由键将消息投递给对应队列。
大致流程,一个队列绑定到直连交换机上,同时赋予一个路由键routing key。然后当一个消息携带着路由键值,这个消息通过生产者发送给交换机时,交换机就会根据这个路由键值去寻找绑定值与路由键值相同的队列。
FanoutExchange
扇形交换机,这个交换机没有路由键概念,就算你绑定了路由键也是无视的,这个交换机在接受到消息后会直接转发到所有绑定到它上面的队列。
Topic Exchange
主题交换机,这个交换机和直连交换机的流程差不多,但是它的特点就是在它的路由键和绑定键之间是有规则的。
简单介绍下它的规则:
*(星号)用来表示一个单词(必须出现的)
#(井号)用来表示任意数量(0个或多个)单词
通配的绑定键是跟队列进行绑定。
主题交换机是非常强大的,为啥这么膨胀?
当一个队列的绑定键为 “#”(井号) 的时候,这个队列将会无视消息的路由键,接收所有的消息。
当 * (星号) 和 # (井号) 这两个特殊字符都未在绑定键中出现的时候,此时主题交换机就拥有的直连交换机的行为。
所以主题交换机也就实现了扇形交换机的功能,和直连交换机的功能。
另外还有 Header Exchange 头交换机 ,Default Exchange 默认交换机,Dead Letter Exchange 死信交换机,这几个该篇暂不做讲述。
本次示例教程需要创建2个springboot项目,一个rabbitMqProviderDemo(生产者),一个rabbitmqConsumerDemo(消费者)。
首先创建rabbitMqProviderDemo。pom.xml中添加依赖:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
application.yml中添加配置:
server:
port: 7000
spring:
#配置rabbitmq 服务
rabbitmq:
host: 192.168.1.42
port: 5672
username: admin
password: mxkj1234..
#虚拟host 可以不设置,使用server默认host 我们这里使用默认host
# virtual-host: AndyHost
我们首先使用下direct exchange(直连型交换机),创建DirectRabbitConfig.java(对于队列和交换机持久化以及连接使用设置,在注释里有说明,后面的不同交换机的配置就不做同样说明了):
package com.adny.rabbitmqdemo.rabbitmq.directexchange; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.DirectExchange; import org.springframework.amqp.core.Queue; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * @author andyhoop * @data 2023/1/10 13:46 */ @Configuration public class DirectRabbitConfig { //队列 起名:TestDirectQueue @Bean public Queue TestDirectQueue() { // durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效 // exclusive:默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable // autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。 // return new Queue("TestDirectQueue",true,true,false); //一般设置一下队列的持久化就好,其余两个就是默认false return new Queue("TestDirectQueue",true); } //Direct交换机 起名:TestDirectExchange @Bean DirectExchange TestDirectExchange() { // return new DirectExchange("TestDirectExchange",true,true); return new DirectExchange("TestDirectExchange",true,false); } //绑定 将队列和交换机绑定, 并设置用于匹配键:TestDirectRouting @Bean Binding bindingDirect() { return BindingBuilder.bind(TestDirectQueue()).to(TestDirectExchange()).with("TestDirectRouting"); } }
然后写个简单的接口进行消息推送(根据需求也可以改为定时任务等等,具体看需求),SendMessageController.java
package com.adny.rabbitmqdemo.rabbitmq.directexchange; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import java.time.LocalDateTime; import java.time.format.DateTimeFormatter; import java.util.Optional; import java.util.UUID; /** * @author andyhoop * @data 2023/1/10 13:49 */ @RestController @RequestMapping("") public class SendMessageController { @Autowired private RabbitTemplate rabbitTemplate; @GetMapping("/sendMessage") public Optional sendMessage(){ UUID uuid = UUID.randomUUID(); String time = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")); String message = "发送了一条消息到mq" + uuid + "发送时间为:"+ time; rabbitTemplate.convertAndSend("TestDirectExchange","TestDirectRouting",message); return Optional.of("ok"); } }
项目启动之后,使用postman调用接口:
发送成功,查看rabbitmq管理页面,查看是否推送成功(这里我们没有弄消费者,如果推送成功,可以再队列中查看到一条数据)。
查看队列
此时TestDirectQueue队列中已经准备好了一条数据,这说明我们推送的数据已经成功。
接下来,创建rabbitMqConsumerDemo项目:
pom.xml中的依赖:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
application.yml配置:
server:
port: 7001
spring:
rabbitmq:
host: 192.168.1.42
port: 5672
username: admin
password: mxkj1234..
然后一样,创建DirectRabbitConfig.java(消费者单纯的使用,其实可以不用添加这个配置,直接建后面的监听就好,使用注解来让监听器监听对应的队列即可。配置上了的话,其实消费者也是生成者的身份,也能推送该消息。):
package com.andy.rabbitmqconsumerdemo.consumer; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.DirectExchange; import org.springframework.amqp.core.Queue; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * @author andyhoop * @data 2023/1/10 14:15 */ @Configuration public class DirectRabbitConfig { //队列 起名:TestDirectQueue @Bean public Queue TestDirectQueue() { return new Queue("TestDirectQueue",true); } //Direct交换机 起名:TestDirectExchange @Bean DirectExchange TestDirectExchange() { return new DirectExchange("TestDirectExchange"); } //绑定 将队列和交换机绑定, 并设置用于匹配键:TestDirectRouting @Bean Binding bindingDirect() { return BindingBuilder.bind(TestDirectQueue()).to(TestDirectExchange()).with("TestDirectRouting"); } }
创建消息接收监听类:
package com.andy.rabbitmqconsumerdemo.consumer; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; /** * @author andyhoop * @data 2023/1/10 14:14 */ @Component @RabbitListener(queues = "TestDirectQueue")//监听的队列名称 TestDirectQueue public class DirectReceiver { @RabbitHandler public void process(String testMessage) { System.out.println("DirectReceiver消费者收到消息 : " + testMessage); } }
服务运行。可以看到消费了之前推送的消息
这时候再去rabbitmq管理页面查看,队列中的消息已经被消费掉了
继续调用rabbitMqProviderDemo中的推送消息接口,可以看到会继续消费。
接着,我们使用Topic Exchange主题交换机。
在rabbitMqProviderDemo中创建TopicRabbitConfig.java:
package com.adny.rabbitmqdemo.rabbitmq.directexchange; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.Queue; import org.springframework.amqp.core.TopicExchange; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * @author andyhoop * @data 2023/1/10 14:42 */ @Configuration public class TopicRabbitConfig { //绑定键 public final static String man = "topic.man"; public final static String woman = "topic.woman"; @Bean public Queue firstQueue() { return new Queue(TopicRabbitConfig.man); } @Bean public Queue secondQueue() { return new Queue(TopicRabbitConfig.woman); } @Bean TopicExchange exchange() { return new TopicExchange("topicExchange"); } //将firstQueue和topicExchange绑定,而且绑定的键值为topic.man //这样只要是消息携带的路由键是topic.man,才会分发到该队列 @Bean Binding bindingExchangeMessage() { return BindingBuilder.bind(firstQueue()).to(exchange()).with(man); } //将secondQueue和topicExchange绑定,而且绑定的键值为用上通配路由键规则topic.# // 这样只要是消息携带的路由键是以topic.开头,都会分发到该队列 @Bean Binding bindingExchangeMessage2() { return BindingBuilder.bind(secondQueue()).to(exchange()).with("topic.#"); } }
然后在SendMessageController.java中添加2个推送消息接口;
@GetMapping("/sendTopicMessage1") public Optional sendTopicMessage1() { String messageId = String.valueOf(UUID.randomUUID()); String messageData = "message: M A N "; String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")); Map<String, Object> manMap = new HashMap<>(); manMap.put("messageId", messageId); manMap.put("messageData", messageData); manMap.put("createTime", createTime); rabbitTemplate.convertAndSend("topicExchange", "topic.man", manMap); return Optional.of("ok"); } @GetMapping("/sendTopicMessage2") public Optional sendTopicMessage2() { String messageId = String.valueOf(UUID.randomUUID()); String messageData = "message: woman is all "; String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")); Map<String, Object> womanMap = new HashMap<>(); womanMap.put("messageId", messageId); womanMap.put("messageData", messageData); womanMap.put("createTime", createTime); rabbitTemplate.convertAndSend("topicExchange", "topic.woman", womanMap); return Optional.of("ok"); }
然后启动服务,两个接口分别各调用一次。
查看rabbitmq管理页面,会发现Exchanges中出现topicExchange
Queues中两个队列topic.man和topic.woman中都有了数据。但是sendTopicMessage2接口只调用了一次,top.woman中为什么有两条数据呢?。
这是因为TopicRabbitConfig.java中的配置中的bindingExchangeMessage2绑定的路由键规则为topic.#,它既能匹配topic.man,也能匹配上topic.woman,因此会向topic.man和topic.woman两个队列中都发送数据。
很好,topicExchange数据推送成功。
生产者已经成功生产消息,接下来在rabbitMqConsumerDemo中,加主题交换机的相关配置,TopicRabbitConfig.java(消费者一定要加这个配置吗? 不需要的其实,理由在前面已经说过了。):
package com.andy.rabbitmqconsumerdemo.consumer; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.Queue; import org.springframework.amqp.core.TopicExchange; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * @author andyhoop * @data 2023/1/10 14:56 */ @Configuration public class TopicRabbitConfig { //绑定键 public final static String man = "topic.man"; public final static String woman = "topic.woman"; @Bean public Queue firstQueue() { return new Queue(TopicRabbitConfig.man); } @Bean public Queue secondQueue() { return new Queue(TopicRabbitConfig.woman); } @Bean TopicExchange exchange() { return new TopicExchange("topicExchange"); } //将firstQueue和topicExchange绑定,而且绑定的键值为topic.man //这样只要是消息携带的路由键是topic.man,才会分发到该队列 @Bean Binding bindingExchangeMessage() { return BindingBuilder.bind(firstQueue()).to(exchange()).with(man); } //将secondQueue和topicExchange绑定,而且绑定的键值为用上通配路由键规则topic.# // 这样只要是消息携带的路由键是以topic.开头,都会分发到该队列 @Bean Binding bindingExchangeMessage2() { return BindingBuilder.bind(secondQueue()).to(exchange()).with("topic.#"); } }
然后创建TopicManReceiver.java:
package com.andy.rabbitmqconsumerdemo.consumer; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; import java.util.Map; /** * @author andyhoop * @data 2023/1/10 14:56 */ @Component @RabbitListener(queues = "topic.man") public class TopicManReceiver { @RabbitHandler public void process(Map testMessage) { System.out.println("TopicManReceiver消费者收到消息 : " + testMessage.toString()); } }
在创建TopicWomanReceiver.java:
package com.andy.rabbitmqconsumerdemo.consumer; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; import java.util.Map; /** * @author andyhoop * @data 2023/1/10 15:00 */ @Component @RabbitListener(queues = "topic.woman") public class TopicWomanReceiver { @RabbitHandler public void process(Map testMessage) { System.out.println("TopicWomanReceiver消费者收到消息 : " + testMessage.toString()); } }
至此,消费者的代码已经写好了,启动消费者服务。
这时候你会发现,消费了消费者也成功消费了三次,这里是因为我们的TopicWomanReceiver配置的路由键是topic.#,而sendTopicMessage1推送的消息携带的路由键为:topic.man ,topic.#绑定键能与topic.man路由键匹配,同时也能与topic.woman路由键匹配
接下来是使用Fanout Exchang 扇型交换机。
同样的,先在rabbitMqProviderDemor项目上创建FanoutRabbitConfig.java:
package com.adny.rabbitmqdemo.rabbitmq.directexchange; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.FanoutExchange; import org.springframework.amqp.core.Queue; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * @author andyhoop * @data 2023/1/10 15:36 */ @Configuration public class FanoutRabbitConfig { /** * 创建三个队列 :fanout.A fanout.B fanout.C * 将三个队列都绑定在交换机 fanoutExchange 上 * 因为是扇型交换机, 路由键无需配置,配置也不起作用 */ @Bean public Queue queueA() { return new Queue("fanout.A"); } @Bean public Queue queueB() { return new Queue("fanout.B"); } @Bean public Queue queueC() { return new Queue("fanout.C"); } @Bean FanoutExchange fanoutExchange() { return new FanoutExchange("fanoutExchange"); } @Bean Binding bindingExchangeA() { return BindingBuilder.bind(queueA()).to(fanoutExchange()); } @Bean Binding bindingExchangeB() { return BindingBuilder.bind(queueB()).to(fanoutExchange()); } @Bean Binding bindingExchangeC() { return BindingBuilder.bind(queueC()).to(fanoutExchange()); } }
然后在SendMessageController中写一个接口用于推送消息:
@GetMapping("/sendFanoutMessage")
public String sendFanoutMessage() {
String messageId = String.valueOf(UUID.randomUUID());
String messageData = "message: testFanoutMessage ";
String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
Map<String, Object> map = new HashMap<>();
map.put("messageId", messageId);
map.put("messageData", messageData);
map.put("createTime", createTime);
rabbitTemplate.convertAndSend("fanoutExchange", null, map);
return "ok";
}
项目启动,调用接口查看rabbitmq管理页面。可以看到生成了三个队列,并且都有一份数据
接下来在rabbitMqConsumerDemo中创建FanoutRabbitConfig.java配置类(消费者中非必要)
package com.andy.rabbitmqconsumerdemo.consumer; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.FanoutExchange; import org.springframework.amqp.core.Queue; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * @author andyhoop * @data 2023/1/10 15:42 */ @Configuration public class FanoutRabbitConfig { /** * 创建三个队列 :fanout.A fanout.B fanout.C * 将三个队列都绑定在交换机 fanoutExchange 上 * 因为是扇型交换机, 路由键无需配置,配置也不起作用 */ @Bean public Queue queueA() { return new Queue("fanout.A"); } @Bean public Queue queueB() { return new Queue("fanout.B"); } @Bean public Queue queueC() { return new Queue("fanout.C"); } @Bean FanoutExchange fanoutExchange() { return new FanoutExchange("fanoutExchange"); } @Bean Binding bindingExchangeA() { return BindingBuilder.bind(queueA()).to(fanoutExchange()); } @Bean Binding bindingExchangeB() { return BindingBuilder.bind(queueB()).to(fanoutExchange()); } @Bean Binding bindingExchangeC() { return BindingBuilder.bind(queueC()).to(fanoutExchange()); } }
接着配置消费类:
FanoutReceiverA.java:
package com.andy.rabbitmqconsumerdemo.consumer; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; import java.util.Map; /** * @author andyhoop * @data 2023/1/10 15:43 */ @Component @RabbitListener(queues = "fanout.A") public class FanoutReceiverA { @RabbitHandler public void process(Map testMessage) { System.out.println("FanoutReceiverA消费者收到消息 : " +testMessage.toString()); } }
FanoutReceiverB.java
package com.andy.rabbitmqconsumerdemo.consumer; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; import java.util.Map; /** * @author andyhoop * @data 2023/1/10 15:44 */ @Component @RabbitListener(queues = "fanout.B") public class FanoutReceiverB { @RabbitHandler public void process(Map testMessage) { System.out.println("FanoutReceiverB消费者收到消息 : " +testMessage.toString()); } }
FanoutReceiverC.java:
package com.andy.rabbitmqconsumerdemo.consumer; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; import java.util.Map; /** * @author andyhoop * @data 2023/1/10 15:45 */ @Component @RabbitListener(queues = "fanout.C") public class FanoutReceiverC { @RabbitHandler public void process(Map testMessage) { System.out.println("FanoutReceiverC消费者收到消息 : " +testMessage.toString()); } }
代码编写完成,启动服务。
三条同样的消息被消费
到了这里其实三个常用的交换机的使用我们已经完毕了,那么接下来我们继续讲讲消息的回调,其实就是消息确认(生产者推送消息成功,消费者接收消息成功)。
本篇文章使用springboot版本为 :2.5.4
在rabbitmq-provider项目的application.yml文件上,加上消息确认的配置项后的application.yml文件为:
server: port: 7000 spring: #配置rabbitmq 服务 rabbitmq: host: 192.168.1.42 port: 5672 username: admin password: mxkj1234.. #虚拟host 可以不设置,使用server默认host 我们这里使用默认host # virtual-host: AndyHost #消息确认已发送到交换机 publisher-confirm-type: correlated #消息确认已发送到队列 publisher-returns: true
然后是配置相关的消息确认回调函数,RabbitConfig.java:
package com.adny.rabbitmqdemo.rabbitmq.directexchange; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * @author andyhoop * @data 2023/1/10 15:52 */ @Configuration public class RabbitConfig { @Bean public RabbitTemplate createRabbitTemplate(ConnectionFactory connectionFactory){ RabbitTemplate rabbitTemplate = new RabbitTemplate(); rabbitTemplate.setConnectionFactory(connectionFactory); //设置开启Mandatory,才能触发回调函数,无论消息推送结果怎么样都强制调用回调函数 rabbitTemplate.setMandatory(true); rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() { @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { System.out.println("ConfirmCallback: "+"相关数据:"+correlationData); System.out.println("ConfirmCallback: "+"确认情况:"+ack); System.out.println("ConfirmCallback: "+"原因:"+cause); } }); rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() { @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { System.out.println("ReturnCallback: "+"消息:"+message); System.out.println("ReturnCallback: "+"回应码:"+replyCode); System.out.println("ReturnCallback: "+"回应信息:"+replyText); System.out.println("ReturnCallback: "+"交换机:"+exchange); System.out.println("ReturnCallback: "+"路由键:"+routingKey); } }); return rabbitTemplate; } }
到这里,生产者推送消息的消息确认调用回调函数已经完毕。
可以看到上面写了两个回调函数,一个叫 ConfirmCallback ,一个叫 RetrunCallback;
那么以上这两种回调函数都是在什么情况会触发呢?
先从总体的情况分析,推送消息存在四种情况:
①消息推送到server,但是在server里找不到交换机
②消息推送到server,找到交换机了,但是没找到队列
③消息推送到sever,交换机和队列啥都没找到
④消息推送成功
那么我先写几个接口来分别测试和认证下以上4种情况,消息确认触发回调函数的情况:
1、消息推送到server,但是在server里找不到交换机
写个测试接口,把消息推送到名为‘non-existent-exchange’的交换机上(这个交换机是没有创建没有配置的):
@GetMapping("/TestMessageAck")
public String TestMessageAck() {
String messageId = String.valueOf(UUID.randomUUID());
String messageData = "message: non-existent-exchange test message ";
String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
Map<String, Object> map = new HashMap<>();
map.put("messageId", messageId);
map.put("messageData", messageData);
map.put("createTime", createTime);
rabbitTemplate.convertAndSend("non-existent-exchange", "TestDirectRouting", map);
return "ok";
}
接口调用后台打印日志为:
结论: 1、这种情况触发的是 ConfirmCallback 回调函数。
2、消息推送到server,找到交换机了,但是没找到队列
这种情况就是需要新增一个交换机,但是不给这个交换机绑定队列,我来简单地在DirectRabitConfig里面新增一个直连交换机,名叫‘lonelyDirectExchange’,但没给它做任何绑定配置操作:
@Bean
DirectExchange lonelyDirectExchange() {
return new DirectExchange("lonelyDirectExchange");
}
然后写个测试接口,把消息推送到名为‘lonelyDirectExchange’的交换机上(这个交换机是没有任何队列配置的):
@GetMapping("/TestMessageAck2")
public String TestMessageAck2() {
String messageId = String.valueOf(UUID.randomUUID());
String messageData = "message: lonelyDirectExchange test message ";
String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
Map<String, Object> map = new HashMap<>();
map.put("messageId", messageId);
map.put("messageData", messageData);
map.put("createTime", createTime);
rabbitTemplate.convertAndSend("lonelyDirectExchange", "TestDirectRouting", map);
return "ok";
}
可以看到这种情况,两个函数都被调用了;
这种情况下,消息是推送成功到服务器了的,所以ConfirmCallback对消息确认情况是true;
而在RetrunCallback回调函数的打印参数里面可以看到,消息是推送到了交换机成功了,但是在路由分发给队列的时候,找不到队列,所以报了错误 NO_ROUTE 。
结论:2、这种情况触发的是 ConfirmCallback和RetrunCallback两个回调函数。
3、消息推送到sever,交换机和队列啥都没找到
这种情况其实一看就觉得跟1、很像,没错 ,3、和1、情况回调是一致的,所以不做结果说明了。
结论:3、这种情况触发的是 ConfirmCallback 回调函数。
4、消息推送成功
那么测试下,按照正常调用之前消息推送的接口就行,就调用下 /sendFanoutMessage接口,可以看到控制台输出:
结论:4、这种情况触发的是 ConfirmCallback 回调函数。
以上是生产者推送消息的消息确认 回调函数的使用介绍(可以在回调函数根据需求做对应的扩展或者业务数据处理)。
接下来我们继续, 消费者接收到消息的消息确认机制。
和生产者的消息确认机制不同,因为消息接收本来就是在监听消息,符合条件的消息就会消费下来。
所以,消息接收的确认机制主要存在三种模式:
① 自动确认, 这也是默认的消息确认情况。 AcknowledgeMode.NONE
RabbitMQ成功将消息发出(即将消息成功写入TCP Socket)中立即认为本次投递已经被正确处理,不管消费者端是否成功处理本次投递。
所以这种情况如果消费端消费逻辑抛出异常,也就是消费端没有处理成功这条消息,那么就相当于丢失了消息。
一般这种情况我们都是使用try catch捕捉异常后,打印日志用于追踪数据,这样找出对应数据再做后续处理。
② 根据情况确认, 这个不做介绍
③ 手动确认 , 这个比较关键,也是我们配置接收消息确认机制时,多数选择的模式。
消费者收到消息后,手动调用basic.ack/basic.nack/basic.reject后,RabbitMQ收到这些消息后,才认为本次投递成功。
basic.ack用于肯定确认
basic.nack用于否定确认(注意:这是AMQP 0-9-1的RabbitMQ扩展)
basic.reject用于否定确认,但与basic.nack相比有一个限制:一次只能拒绝单条消息
消费者端以上的3个方法都表示消息已经被正确投递,但是basic.ack表示消息已经被正确处理。
而basic.nack,basic.reject表示没有被正确处理:
着重讲下 reject ,因为有时候一些场景是需要重新入列的。
channel.basicReject(deliveryTag, true); 拒绝消费当前消息,如果第二参数传入true,就是将数据重新丢回队列里,那么下次还会消费这消息。设置false,就是告诉服务器,我已经知道这条消息数据了,因为一些原因拒绝它,而且服务器也把这个消息丢掉就行。 下次不想再消费这条消息了。
使用拒绝后重新入列这个确认模式要谨慎,因为一般都是出现异常的时候,catch异常再拒绝入列,选择是否重入列。
但是如果使用不当会导致一些每次都被你重入列的消息一直消费-入列-消费-入列这样循环,会导致消息积压。
顺便也简单讲讲 nack ,这个也是相当于设置不消费某条消息。
channel.basicNack(deliveryTag, false, true);
第一个参数依然是当前消息到的数据的唯一id;
第二个参数是指是否针对多条消息;如果是true,也就是说一次性针对当前通道的消息的tagID小于当前这条消息的,都拒绝确认。
第三个参数是指是否重新入列,也就是指不确认的消息是否重新丢回到队列里面去。
同样使用不确认后重新入列这个确认模式要谨慎,因为这里也可能因为考虑不周出现消息一直被重新丢回去的情况,导致积压。
看了上面这么多介绍,接下来我们一起配置下,看看一般的消息接收 手动确认是怎么样的。
在消费者rabbitMqConsumerDemo项目里,新建MessageListenerConfig.java上添加代码相关的配置代码:
package com.andy.rabbitmqconsumerdemo.consumer; import org.springframework.amqp.core.AcknowledgeMode; import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * @author andyhoop * @data 2023/1/10 16:21 */ @Configuration public class MessageListenerConfig { @Autowired private CachingConnectionFactory connectionFactory; @Autowired private MyAckReceiver myAckReceiver;//消息接收处理类 @Bean public SimpleMessageListenerContainer simpleMessageListenerContainer() { SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory); container.setConcurrentConsumers(1); container.setMaxConcurrentConsumers(1); container.setAcknowledgeMode(AcknowledgeMode.MANUAL); // RabbitMQ默认是自动确认,这里改为手动确认消息 //设置一个队列 container.setQueueNames("TestDirectQueue"); //如果同时设置多个如下: 前提是队列都是必须已经创建存在的 // container.setQueueNames("TestDirectQueue","TestDirectQueue2","TestDirectQueue3"); //另一种设置队列的方法,如果使用这种情况,那么要设置多个,就使用addQueues //container.setQueues(new Queue("TestDirectQueue",true)); //container.addQueues(new Queue("TestDirectQueue2",true)); //container.addQueues(new Queue("TestDirectQueue3",true)); container.setMessageListener(myAckReceiver); return container; } }
对应的手动确认消息监听类,MyAckReceiver.java(手动确认模式需要实现 ChannelAwareMessageListener):
//之前的相关监听器可以先注释掉,以免造成多个同类型监听器都监听同一个队列。
package com.andy.rabbitmqconsumerdemo.consumer; import com.rabbitmq.client.Channel; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener; import org.springframework.stereotype.Component; import java.io.ByteArrayInputStream; import java.io.ObjectInputStream; import java.util.Map; /** * @author andyhoop * @data 2023/1/10 16:23 */ @Component public class MyAckReceiver implements ChannelAwareMessageListener { @Override public void onMessage(Message message, Channel channel) throws Exception { long deliveryTag = message.getMessageProperties().getDeliveryTag(); try { byte[] body = message.getBody(); ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(body)); Map<String,String> msgMap = (Map<String,String>) ois.readObject(); String messageId = msgMap.get("messageId"); String messageData = msgMap.get("messageData"); String createTime = msgMap.get("createTime"); ois.close(); System.out.println(" MyAckReceiver messageId:"+messageId+" messageData:"+messageData+" createTime:"+createTime); System.out.println("消费的主题消息来自:"+message.getMessageProperties().getConsumerQueue()); channel.basicAck(deliveryTag, true); //第二个参数,手动确认可以被批处理,当该参数为 true 时,则可以一次性确认 delivery_tag 小于等于传入值的所有消息 // channel.basicReject(deliveryTag, true);//第二个参数,true会重新放回队列,所以需要自己根据业务逻辑判断什么时候使用拒绝 } catch (Exception e) { channel.basicReject(deliveryTag, false); e.printStackTrace(); } } }
这时,我们先调用sendFanoutMessage接口,给扇形交换机fanoutExchange 的队列fanout.A队列 推送一条消息,可以看到监听器正常消费了下来:
到这里,我们已经掌握了怎么去使用消息消费的手动确认了。
但是如果我们的消费者项目里面,监听了多个队列,都需要变成手动确认模式,并且处理的业务逻辑都不一样。这时候我们便可以这么处理:
第一步,往SimpleMessageListenerContainer()方法里面里添加多个队列:
然后我们的手动确认消息监听类,MyAckReceiver.java 就可以同时将上面设置到的队列的消息都消费下来。
但是我们需要做不用的业务逻辑处理,那么只需要 根据消息来自的队列名进行区分处理即可,如:
package com.andy.rabbitmqconsumerdemo.consumer; import com.rabbitmq.client.Channel; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener; import org.springframework.stereotype.Component; import java.io.ByteArrayInputStream; import java.io.ObjectInputStream; import java.util.Map; /** * @author andyhoop * @data 2023/1/10 16:23 */ @Component public class MyAckReceiver implements ChannelAwareMessageListener { @Override public void onMessage(Message message, Channel channel) throws Exception { long deliveryTag = message.getMessageProperties().getDeliveryTag(); try { byte[] body = message.getBody(); ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(body)); Map<String,String> msgMap = (Map<String,String>) ois.readObject(); String messageId = msgMap.get("messageId"); String messageData = msgMap.get("messageData"); String createTime = msgMap.get("createTime"); ois.close(); if ("fanout.B".equals(message.getMessageProperties().getConsumerQueue())){ System.out.println("消费的消息来自的队列名为:"+message.getMessageProperties().getConsumerQueue()); System.out.println("消息成功消费到 messageId:"+messageId+" messageData:"+messageData+" createTime:"+createTime); System.out.println("执行TestDirectQueue中的消息的业务处理流程......"); } if ("fanout.A".equals(message.getMessageProperties().getConsumerQueue())){ System.out.println("消费的消息来自的队列名为:"+message.getMessageProperties().getConsumerQueue()); System.out.println("消息成功消费到 messageId:"+messageId+" messageData:"+messageData+" createTime:"+createTime); System.out.println("执行fanout.A中的消息的业务处理流程......"); } channel.basicAck(deliveryTag, true); // channel.basicReject(deliveryTag, true);//为true会重新放回队列 } catch (Exception e) { channel.basicReject(deliveryTag, false); e.printStackTrace(); } } }
这个时候我们可以调用sendFanoutMessage接口看看效果:
如果你还想新增其他的监听队列,也就是按照这种方式新增配置即可(或者完全可以分开多个消费者项目去监听处理)。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。