赞
踩
消息队列(Message Queue,简称MQ),从字面意思上看,本质是个队列,FIFO先入先出,只不过队列中存放的内容是message而已。
其主要用途:不同进程Process/线程Thread之间通信。
那么为什么会产生消息队列呢?有几个原因:
不同进程(process)之间传递消息时,两个进程之间耦合程度过高,改动一个进程,引发必须修改另一个进程,为了隔离这两个进程,在两进程间抽离出一层(一个模块),所有两进程之间传递的消息,都必须通过消息队列来传递,单独修改某一个进程,不会影响另一个;
不同进程(process)之间传递消息时,为了实现标准化,将消息的格式规范化了,并且,某一个进程接受的消息太多,一下子无法处理完,并且也有先后顺序,必须对收到的消息进行排队,因此诞生了事实上的消息队列;
MQ框架非常之多,比较流行的有RabbitMq、ActiveMq、ZeroMq、kafka,以及阿里开源的RocketMQ。本文主要介绍RabbitMq。
RabbitMQ 是一个消息代理:它接受和转发消息。您可以将其视为邮局:当您将要寄的邮件放入邮箱时,您可以确信信使最终会将邮件发送给您的收件人。在本例中,RabbitMQ 是邮箱、邮局和信使。
RabbitMQ 与邮局的主要区别在于它不处理纸张,而是接受、存储和转发二进制数据块——消息。
RabbitMQ 和一般意义上的消息传递使用了一些术语。
生产仅仅意味着发送。发送消息的程序称为生产者
队列是 RabbitMQ 中邮箱的名称。虽然消息会流经 RabbitMQ 和您的应用程序,但它们只能存储在队列中。队列仅受主机内存和磁盘限制的约束,它本质上是一个大型消息缓冲区。许多生产者可以发送消息到一个队列,并且许多消费者可以尝试从一个队列中接收数据。这就是我们表示队列的方式
其是实现 AMQP(高级消息队列协议)的消息中间件的一种,最初起源于金融系统,用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。 RabbitMQ 主要是为了实现系统之间的双向解耦而实现的。当生产者大量产生数据时,消费者无法快速消费,那么需要一个中间层。保存这个数据。
AMQP,即 Advanced Message Queuing Protocol,高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。消息中间件主要用于组件之间的解耦,消息的发送者无需知道消息使用者的存在,反之亦然。AMQP 的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。
RabbitMQ 是一个开源的 AMQP 实现,服务器端用Erlang语言编写,支持多种客户端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP 等,支持 AJAX。用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。
通常我们谈到队列服务,会有三个概念:发消息者、队列、收消息者,RabbitMQ 在这个基本概念之上,多做了一层抽象,在发消息者和队列之间,加入了交换器 (Exchange)。这样发消息者和队列就没有直接联系,转而变成发消息者把消息给交换器,交换器根据调度策略再把消息给队列。那么,其中比较重要的概念有4个,分别为:虚拟主机,交换机,队列,和绑定。
Spring Boot 集成 RabbitMQ 非常简单,如果只是简单的使用配置非常少,Spring Boot 提供了spring-boot-starter-amqp 项目对消息各种支持。
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-amqp</artifactId>
- </dependency>
配置 RabbitMQ 的安装地址、端口以及账户信息
- # 配置文件
- spring:
- datasource:
- driver-class-name: com.mysql.cj.jdbc.Driver
- url: jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=utf-8&useSSL=false
- username: root
- password: 123456
- # RabbitMQ配置
- rabbitmq:
- host: 192.168.146.1
- port: 5672
- username: admin
- password: 123456
我这里还配置了数据库
- package com.nianxi.mybatisplus.config;
-
- import org.springframework.amqp.core.Queue;
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
-
- @Configuration
- public class RabbitConfig {
-
- @Bean
- public Queue Queue() {
- return new Queue("hello");
- }
- }
rabbitTemplate 是 Spring Boot 提供的默认实现
- package com.nianxi.mybatisplus.mapper;
-
- import org.springframework.amqp.core.AmqpTemplate;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.stereotype.Component;
-
- import java.util.Date;
-
- @Component
- public class HelloSender {
-
- @Autowired
- private AmqpTemplate rabbitTemplate;
-
- public void send() {
- String context = "hello " + new Date();
- System.out.println("Sender : " + context);
- this.rabbitTemplate.convertAndSend("hello", context);
- }
- }
- package com.nianxi.mybatisplus.mapper;
-
- import org.springframework.amqp.rabbit.annotation.RabbitHandler;
- import org.springframework.amqp.rabbit.annotation.RabbitListener;
- import org.springframework.stereotype.Component;
-
- @Component
- @RabbitListener(queues = "hello")
- public class HelloReceiver {
-
- @RabbitHandler
- public void process(String hello) {
- System.out.println("Receiver : " + hello);
- }
- }
- package com.nianxi.mybatisplus;
-
- import com.nianxi.mybatisplus.mapper.HelloSender;
- import org.junit.jupiter.api.Test;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.boot.test.context.SpringBootTest;
-
- @SpringBootTest
- public class RabbitMqHelloTest {
-
- @Autowired
- private HelloSender helloSender;
-
- @Test
- public void hello() throws Exception {
- helloSender.send();
- }
- }
注意:发送者和接收者的 queue name 必须一致,不然不能接收
RabbitTemplate
是Spring AMQP提供的一个高级消息操作模板,用于在与RabbitMQ进行交互时进行消息的发送和接收操作。它是对底层AMQP协议的封装,简化了与RabbitMQ的交互过程, 是SpringAMQP中的核心类,提供声明式方式处理RabbitMQ,包括发送和接收消息、消息转换、属性设置及回调机制。通过配置和正确使用,简化了RabbitMQ的集成与操作。
RabbitTemplate
提供了多种发送消息的方法,包括同步发送和异步发送。通过指定交换机、路由键和消息体,我们可以将消息发送到 RabbitMQ 服务器上的指定位置。此外,RabbitTemplate
还支持消息的确认机制,以确保消息被成功发送和接收。
rabbitTemplate.convertAndSend("exchangeName", "routingKey", message);
除了发送消息外,RabbitTemplate
还提供了接收消息的功能。通过调用相关方法,我们可以从指定的队列中接收消息,并进行相应的处理。这通常涉及到监听队列、处理消息和确认消息接收等步骤。
- Message receivedMessage = rabbitTemplate.receive("queueName");
- MyMessage myMessage = rabbitTemplate.receiveAndConvert("queueName", MyMessage.class);
RabbitTemplate
支持消息的自动转换。这意味着我们可以将 Java 对象作为消息体发送,而 RabbitTemplate
会自动将其转换为可序列化的格式(如 JSON 或 XML)。同样地,当从队列中接收消息时,RabbitTemplate
也可以自动将消息体转换回 Java 对象。
- Jackson2JsonMessageConverter messageConverter = new Jackson2JsonMessageConverter();
- rabbitTemplate.setMessageConverter(messageConverter);
在发送消息时,我们可以设置各种消息属性,如消息的优先级、持久化标志、过期时间等。这些属性可以通过 MessageProperties
对象进行设置,并在发送消息时传递给 RabbitTemplate
。
- import org.springframework.amqp.core.Message;
- import org.springframework.amqp.core.MessageBuilder;
- import org.springframework.amqp.core.MessageProperties;
- import org.springframework.amqp.rabbit.core.RabbitTemplate;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.stereotype.Service;
-
- @Service
- public class MessageSender {
-
- @Autowired
- private RabbitTemplate rabbitTemplate;
-
- public void sendMessage(String exchange, String routingKey, String message, int priority, boolean persistent, int ttl) {
- // 创建MessageProperties
- MessageProperties properties = new MessageProperties();
- // 设置优先级,值范围0-9,其中0为最低优先级,9为最高优先级
- properties.setPriority(priority);
- // 设置消息持久化
- properties.setDeliveryMode(persistent ? MessageDeliveryMode.PERSISTENT : MessageDeliveryMode.NON_PERSISTENT);
- // 设置消息的过期时间,单位为毫秒
- properties.setExpiration(String.valueOf(ttl));
-
- // 使用MessageBuilder构建Message对象
- Message msg = MessageBuilder.withBody(message.getBytes())
- .setContentEncoding("UTF-8")
- .setContentType("text/plain")
- .setMessageId(UUID.randomUUID().toString()) // 可选,设置消息ID
- .setTimestamp(new Date()) // 可选,设置时间戳
- .setHeaders(Collections.singletonMap("x-custom-header", "value")) // 可选,设置自定义头
- .andProperties(properties)
- .build();
-
- // 发送消息
- rabbitTemplate.convertAndSend(exchange, routingKey, msg);
- }
- }
RabbitTemplate
支持发送消息时的回调机制。这意味着在发送消息后,我们可以注册一个回调函数来处理发送结果或接收响应。这对于需要异步处理发送结果或接收响应的场景非常有用。
setConfirmCallback
方法是RabbitTemplate
类中的一个回调方法,用于处理消息的确认(acknowledgment)结果。当消息成功发送到RabbitMQ的交换机时,会触发确认回调,你可以在该回调中处理相应的逻辑。
correlationData
:关联数据,可以是任意类型的对象,通常用于唯一标识消息。ack
:布尔值,表示消息是否成功发送到交换机。true
表示成功,false
表示失败。cause
:失败的原因,当ack
为false
时,此参数会提供一个可选的异常信息。- rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
- if (ack) {
- // 消息发送成功
- System.out.println("Message sent successfully");
- } else {
- // 消息发送失败,进行处理
- System.out.println("Message sent failed: " + cause);
- }
- });
RabbitTemplate
支持异步消息处理,你可以注册ConfirmCallback
和ReturnCallback
来处理消息的确认和返回结果。ConfirmCallback
用于确认消息是否成功发送到交换机,ReturnCallback
用于处理无法路由到队列的消息。
- rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
- if (ack) {
- // 消息发送成功
- } else {
- // 消息发送失败,进行处理
- }
- });
-
- rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
- // 处理无法路由到队列的消息
- });
在使用 RabbitTemplate
之前,我们需要对其进行配置。这通常涉及到设置连接工厂、交换机、队列和绑定等。这些配置可以通过 XML 配置或 Java 配置完成。
一旦配置完成,我们可以创建一个 RabbitTemplate
实例。这个实例将使用我们提供的配置来与 RabbitMQ 服务器进行交互。
使用 RabbitTemplate
的发送方法,我们可以将消息发送到指定的交换机和路由键。我们可以指定消息体、消息属性和其他发送选项。
要接收消息,我们可以使用 RabbitTemplate
的接收方法或结合监听器来监听指定的队列。当消息到达时,我们可以处理消息并执行相应的业务逻辑。
在使用 RabbitTemplate
时,我们还需要考虑异常和错误处理。例如,当发送消息失败或接收消息时发生异常时,我们需要有相应的处理机制来确保系统的稳定性和可靠性。
RabbitTemplate
封装了底层细节,使得开发者能够专注于业务逻辑的实现,而无需关心底层的消息传输细节。RabbitTemplate
提供了丰富的配置选项和扩展点,使得开发者能够根据实际需求进行定制和优化。RabbitTemplate
内部进行了性能优化,如连接池管理、消息缓存等,以提高消息传输的效率和可靠性。RabbitTemplate
的配置正确无误,包括连接工厂、交换机、队列和绑定等的设置。错误的配置可能导致消息无法正确发送或接收。RabbitTemplate
时,要充分考虑异常处理机制,确保在发生异常时能够及时发现并处理。RabbitTemplate
后,要确保释放相关资源,如关闭连接、释放连接池中的连接等,以避免资源泄漏和性能问题。Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。