当前位置:   article > 正文

Spring Boot 集成 RabbitMQ(二)

Spring Boot 集成 RabbitMQ(二)

什么是Spring Boot:

Spring Boot是Spring Framework的简化版,其目标是使用尽可能少的配置来简化Spring应用的开发、部署和维护。它根据项目的依赖关系自动配置Spring环境,开箱即用。

什么是RabbitMQ:

RabbitMQ是一种开源的、实现了高级消息队列协议(AMQP)的、可靠的消息队列系统,它旨在支持复杂的事务处理和强大的消息模型。

为什么要将Spring Boot集成RabbitMQ:

Spring Boot集成RabbitMQ后,可以充分利用RabbitMQ的消息队列特性,实现异步处理、流量削峰、解耦等功能,可以有效地提升系统的延展性和稳定性。

Spring Boot基础:

Spring Boot有许多方便的特性,如无需部署、独立运行、内嵌服务器等,这些都可以使开发人员专注于编写业务逻辑。Spring Boot还提供了许多“starter”依赖包,方便进行项目的依赖管理。

Spring Boot的基本使用:

Spring Boot的基本使用包括创建Spring Boot项目,配置Spring Boot属性,创建Controler、Service、Dao等类进行开发,运行Spring Boot项目等。

Spring Boot项目实战:创建一个简单的Spring Boot应用:

创建一个简单的Spring Boot应用通常可以由Spring Initializr工具进行项目的初始化,进行相关配置。

RabbitMQ基础:

RabbitMQ的基本工作流程是生产者发出消息,通过交换机(Exchange)分配到多个队列中,然后交给一个或多个消费者进行处理。

RabbitMQ的Exchange类型和队列绑定:

RabbitMQ有四种Exchange类型:Direct(直接)、Fanout(扇出)、Topic(主题)、Headers(头部),每种类型对应不同的路由策略。而队列绑定则是将某个队列和某个交换器绑定起来。

Spring Boot集成RabbitMQ:

Spring Boot与RabbitMQ的集成方法:

可以使用Spring Boot的"spring-boot-starter-amqp" 来进行RabbitMQ的集成。其主要步骤包括:

  • 在Spring Boot的pom.xml文件中添加相关依赖。
  • 在application.properties中添加RabbitMQ的连接信息,如主机名、端口、用户名和密码等。

创建Spring Boot与RabbitMQ的项目框架:

在创建好Spring Boot项目后,可以添加RabbitMQ相关的配置类,这个配置类给出了如何连接RabbitMQ,并定义了消息队列、交换器和绑定。

配置RabbitMQ连接和消息队列:

在配置文件中配置RabbitMQ连接信息,然后在配置类中设置队列,交换器,绑定(如果需要)。

发布和接收消息:

  • 发布消息:创建一个用于发布消息到特定队列的消息发送类。
  • 接收消息:创建一个监听特定队列并处理消息的消息接收类。

Spring Boot和RabbitMQ进阶特性:

消息确认和消息持久化:

Spring Boot支持消息的确认和持久化,分别可以确保消息不会在处理过程中丢失,并且即使RabbitMQ服务重启,消息也能从磁盘中恢复。

死信队列和延迟队列:

死信队列用于存放无法处理的消息,延迟队列则用于延迟消息的处理。Spring Boot+RabbitMQ可以设置这两个队列,处理相关问题。

高并发处理:

通过设置RabbitMQ的QoS参数,以及Spring Boot的ConnectionFactory的concurrency参数,可以实现在高并发环境中的效率处理。

Spring Boot和RabbitMQ项目实战:

设计和实现一个实用的Spring Boot和RabbitMQ应用:

一个常见的Spring Boot+RabbitMQ应用为,当有大量请求涌入系统时,将请求数据发送到RabbitMQ,然后由多个消费者在后台进行处理。

分析和解决在项目开发过程中遇到的问题:

这部分主要记录在开发过程中遇到的问题,以及如何解决。如消息未被正确消费等。

项目部署和测试:

最后,项目部署到实际环境,并进行必要的压力测试和功能测试,保证服务的稳定性和可用性。

首先,在Spring Boot的pom.xml文件中添加RabbitMQ的相关依赖:

xml

  1. <dependency>
  2. <groupId>org.springframework.boot</groupId>
  3. <artifactId>spring-boot-starter-amqp</artifactId>
  4. </dependency>

然后在application.properties中添加RabbitMQ的相关配置:

properties

  1. spring.rabbitmq.host=your_host
  2. spring.rabbitmq.port=your_port
  3. spring.rabbitmq.username=your_username
  4. spring.rabbitmq.password=your_password

接着在Java配置文件中创建连接工厂,RabbitTemplate,以及创建消息队列:

java

  1. @Configuration
  2. public class RabbitMqConfiguration {
  3. @Bean
  4. public Queue exampleQueue() {
  5. return new Queue("ExampleQueue", false);
  6. }
  7. @Bean
  8. public RabbitTemplate rabbitTemplate(final ConnectionFactory connectionFactory) {
  9. final var rabbitTemplate = new RabbitTemplate(connectionFactory);
  10. rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
  11. return rabbitTemplate;
  12. }
  13. }

最后在发送者和接收者类中,我们可以定义消息的发送和接收:

java

  1. @Service
  2. public class MessageSender {
  3. private final RabbitTemplate rabbitTemplate;
  4. @Autowired
  5. public MessageSender(RabbitTemplate rabbitTemplate) {
  6. this.rabbitTemplate = rabbitTemplate;
  7. }
  8. public void sendToRabbitmq(String message) {
  9. this.rabbitTemplate.convertAndSend("ExampleQueue", message);
  10. }
  11. }
  12. @Service
  13. public class MessageReceiver {
  14. @RabbitListener(queues = "ExampleQueue")
  15. public void receiveMessageFromRabbitmq(String message) {
  16. // 处理接收到的消息
  17. }
  18. }

2、处理消息确认和消息持久化

以下是如何在Spring AMQP中实现消息确认和消息持久化的示例:

java

  1. @RabbitListener(queues = "queue_name")
  2. public void receiveMessage(Message message, Channel channel) throws IOException {
  3. try {
  4. // 同步处理消息
  5. System.out.println(new String(message.getBody()));
  6. // 确认一条消息已经被消费
  7. channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
  8. } catch (Exception e) {
  9. // 如果报错,消息会被重新发送给broker
  10. channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
  11. }
  12. }

同时,在声明Queue和Exchange时,可以设置其durable属性为true来持久化它们:

java

  1. @Bean
  2. public Queue durableQueue() {
  3. return new Queue("durableQueue", true);
  4. }
  5. @Bean
  6. public Exchange durableExchange() {
  7. return new DirectExchange("durableExchange", true, false);
  8. }

处理死信队列和延迟队列

以下是如何在Spring AMQP中声明死信队列,并设置消息的过期时间来实现延迟队列:

java

  1. @Bean
  2. public Queue deadLetterQueue() {
  3. return new Queue("deadLetterQueue", true);
  4. }
  5. @Bean
  6. public Queue delayQueue() {
  7. Map<String, Object> args = new HashMap<>();
  8. args.put("x-dead-letter-exchange", "");
  9. args.put("x-dead-letter-routing-key", "deadLetterQueue");
  10. args.put("x-message-ttl", 5000); // 设置消息过期时间为5秒
  11. return new Queue("delayQueue", true, false, false, args);
  12. }

以上代码会在消息被发送到delayQueue后,如果5秒内没有被消费,消息会被发送到deadLetterQueue

 

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/知新_RL/article/detail/384326
推荐阅读
相关标签
  

闽ICP备14008679号