当前位置:   article > 正文

Spring Cloud 集成 RabbitMQ_spring cloud 整合rabbitmq

spring cloud 整合rabbitmq

前言

在当今的微服务架构盛行的时代,消息队列作为一种重要的通信机制,在分布式系统中扮演着不可或缺的角色。RabbitMQ,作为一款开源的消息代理和队列服务器,以其高可用性、易扩展性、灵活的路由机制以及多协议支持等特点,深受开发者们的青睐。而Spring Cloud,作为Spring生态中针对微服务架构的一套集成解决方案,也提供了与RabbitMQ的集成支持,使得在Spring Cloud环境下使用RabbitMQ变得更加简单高效。
Spring Cloud集成RabbitMQ,不仅继承了RabbitMQ本身的诸多优点,还充分利用了Spring Cloud的自动配置和声明式编程特性,极大地简化了消息队列的配置和使用过程。开发者可以通过简单的配置和注解,轻松实现消息的发布、订阅、路由和持久化等功能,从而构建出稳定可靠、高性能的分布式系统。
此外,Spring Cloud集成RabbitMQ还提供了丰富的消息处理机制,如消息确认、死信队列、延迟队列等,这些机制可以帮助开发者更好地处理消息丢失、重复消费、消息堆积等问题,提升系统的健壮性和可靠性。

步骤

引入相关maven依赖

<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
  • 1
  • 2
  • 3
  • 4

添加相关配置

spring:  
  rabbitmq:  
    # RabbitMQ服务器的地址  
    host: 127.0.0.1  
    # RabbitMQ服务器的端口号  
    port: 5672  
    # RabbitMQ服务器的用户名  
    username: guest  
    # RabbitMQ服务器的密码  
    password: guest  
    # 消息监听器的配置  
    listener:  
      simple:  
        # 确认模式,这里设置为手动,意味着需要手动确认消息处理成功  
        acknowledge-mode: manual  
        # 消息重试的配置  
        retry:  
          # 是否启用重试机制  
          enabled: true  
          # 最大重试次数  
          max-attempts: 5  
          # 最大重试间隔,单位是毫秒  
          max-interval: 20000ms  
          # 初始重试间隔,单位是毫秒  
          initial-interval: 2000ms  
          # 重试间隔的倍增系数  
          multiplier: 2
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27

使用方法

配置消息序列化

// 定义一个Bean,返回一个MessageConverter实例,用于消息的序列化和反序列化  
@Bean  
public MessageConverter messageConverter() {  
    // 使用Jackson2JsonMessageConverter作为消息转换器,它基于Jackson库进行JSON格式转换  
    return new Jackson2JsonMessageConverter();  
}  
  
// 定义一个Bean,返回一个RabbitTemplate实例,用于发送和接收消息  
@Bean  
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory, MessageConverter messageConverter) {  
    // 创建一个RabbitTemplate实例,传入连接工厂,用于建立与RabbitMQ的连接  
    RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);  
    // 设置RabbitTemplate的消息转换器,用于将Java对象转换为消息,以及将消息转换为Java对象  
    rabbitTemplate.setMessageConverter(messageConverter);  
    // 返回配置好的RabbitTemplate实例,供其他组件使用  
    return rabbitTemplate;  
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17

创建第一个消息队列和交换机

// 定义actuator队列的名称常量  
public static final String ACTUATOR_QUEUE_NAME = "actuator_queue";  
// 定义actuator交换机的名称常量  
public static final String ACTUATOR_EXCHANGE_NAME = "actuator_exchange";  
// 定义actuator路由键的常量  
public static final String ACTUATOR_ROUTING_KEY = "actuator_routing_key";  
  
// 定义一个Bean,用于创建actuator队列  
@Bean  
public Queue actuatorQueue() {  
    // 创建一个持久化队列,队列名称为ACTUATOR_QUEUE_NAME常量定义的值  
    return new Queue(ACTUATOR_QUEUE_NAME, true);  
}  
  
// 定义一个Bean,用于创建actuator交换机  
@Bean  
public DirectExchange actuatorExchange() {  
    // 创建一个Direct类型的交换机,交换机名称为ACTUATOR_EXCHANGE_NAME常量定义的值  
    return new DirectExchange(ACTUATOR_EXCHANGE_NAME);  
}  
  
// 定义一个Bean,用于绑定actuator队列和交换机  
@Bean  
public Binding actuatorBinding(@Qualifier("actuatorQueue") Queue queue, @Qualifier("actuatorExchange") DirectExchange exchange) {  
    // 使用BindingBuilder来绑定actuator队列和actuator交换机  
    // 并指定ACTUATOR_ROUTING_KEY作为路由键  
    return BindingBuilder.bind(queue).to(exchange).with(ACTUATOR_ROUTING_KEY);  
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28

使用方法

// 使用RabbitListener注解,指定监听ACTUATOR_QUEUE_NAME队列中的消息  
@RabbitListener(queues = ActuatorBind.ACTUATOR_QUEUE_NAME)  
public void onMessage(TaskInfoBo taskInfoBo, Message message, Channel channel) throws IOException {  
    // 获取消息的属性,并从中提取消息的deliveryTag  
    long deliveryTag = message.getMessageProperties().getDeliveryTag();  
    try {  
        // 执行一些业务逻辑代码......  
        // ...(业务代码省略)  
  
        // 如果业务代码执行成功,则手动确认消息处理成功  
        // deliveryTag用于标识需要确认的消息,false表示是否进行批量确认(这里不进行批量确认)  
        channel.basicAck(deliveryTag, false);  
    } catch (Exception e) {  
        // 如果在业务代码执行过程中出现异常,则手动拒绝该消息  
        // deliveryTag用于标识需要拒绝的消息,false表示是否将消息重新放回队列(这里不重新放回队列)  
        channel.basicReject(deliveryTag, false);  
    }  
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18

在这个方法中,我们使用了@RabbitListener注解来声明这个方法是一个消息监听器,它监听ActuatorBind.ACTUATOR_QUEUE_NAME队列中的消息。当消息到达这个队列时,Spring会调用这个方法来处理消息。
方法接受三个参数:TaskInfoBo taskInfoBo(用于接收消息体中的信息,并自动转换为TaskInfoBo对象)、Message message(代表原始的RabbitMQ消息)和Channel channel(用于与RabbitMQ进行交互的通道)。
在方法体中,我们首先通过message对象获取消息的deliveryTag,这个deliveryTag用于后续的消息确认或拒绝操作。
然后,我们尝试执行一些业务逻辑代码。如果业务代码执行成功,我们使用channel.basicAck方法手动确认消息已被成功处理,从而从队列中移除该消息。如果业务代码执行过程中出现异常,我们使用channel.basicReject方法手动拒绝该消息,这里拒绝时不将消息重新放回队列(第二个参数为false)。
这种手动确认消息的方式确保了消息处理的可靠性和健壮性,只有当消息被成功处理后才从队列中移除,否则会被拒绝或重新尝试处理。

总结

完成上诉步骤我们完成了Spring Cloud 集成RabbitMQ。还有很多其他的模式,等到业务用到后再进行补充。


声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/很楠不爱3/article/detail/430573
推荐阅读
相关标签
  

闽ICP备14008679号