当前位置:   article > 正文

Rabbitmq集成与使用_集成rabbitmq

集成rabbitmq

Springboot集成rabbitmq

pom.xml 依赖

		<dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
            <version>2.2.5.RELEASE</version>
        </dependency>
  • 1
  • 2
  • 3
  • 4
  • 5

application.yml

通用配置,实际根据需要添加修改

spring:
  #配置rabbitMq 服务器
  rabbitmq:
    host: localhost
    port: 5672
    username: admin
    password: admin
    # 重试次数,默认为3次
    listener:
      simple:
        retry:
          enabled: true
          max-attempts: 5
        # 手动ack
        acknowledge-mode: manual
    # ack
    publisher-confirm-type: correlated
    # 发送失败返回
    publisher-returns: true
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19

DirectRabbitConfig

队列配置类,以direct模式为例,其他模式类似。定义queue,定义exchange,绑定queue与exchange。

生产者,交换机,多个消息队列,多个消费者

package cloud.lcx.learn.common.config.mq;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;


/**
 * @ClassName DirectRabbitConfig
 * @Description DirectRabbitConfig
 * @Author jocker
 * @Date 2020/9/2
 */
@Configuration
public class DirectRabbitConfig {

    public static  final  String MY_DIRECT_QUEUE = "MyDirectQueue";
    public static  final  String MY_DIRECT_EXCHANGE = "MyDirectExchange";
    public static  final  String ROUTINGKEY = "hadoop";


    // direct queue
    @Bean
    public Queue myDirectQueue(){
        // durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效
        // exclusive:默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable
        // autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。
        //一般设置一下队列的持久化就好,其余两个就是默认false
        return new Queue(MY_DIRECT_QUEUE,true,true,false);
    }

    //Direct交换机 起名:TestDirectExchange
    @Bean
    DirectExchange myDirectExchange() {
        return new DirectExchange(MY_DIRECT_EXCHANGE,true,false);
    }

    //绑定  将队列和交换机绑定, 并设置用于匹配键:TestDirectRouting
    @Bean
    Binding bindingDirect() {
        return BindingBuilder.bind(myDirectQueue()).to(myDirectExchange()).with(ROUTINGKEY);
    }

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • direct 如果路由键完全匹配的话,消息才会被投放到相应的队列,应用最多。

  • fanout 当发送一条消息到fanout交换器上时,它会把消息投放到所有附加在此交换器上的队列。

  • topic 设置模糊的绑定方式,“*”操作符将“.”视为分隔符,匹配单个字符;“#”操作符没有分块的概念,它将任意“.”均视为关键字的匹配部分,能够匹配多个字符,应用较多。

  • header 交换器允许匹配 AMQP 消息的 header 而非路由键,除此之外,header 交换器和 direct 交换器完全一致,但是性能却差很多,因此基本上不会用到该交换器

这里说的是消息怎么从交换机(exchange)到队列(queue)的过程,按以上的规则进行。而消息怎么从队列中被消费是竞争的,比如说在应用程序多节点部署情况下,会存在多个节点监听某个队列,一般情况下消息只会被消费一次,默认情况下是轮询的。

MqAckConfig

全局ACK配置,配置消息确认消费处理器,配置消息发送失败处理器。(非必须)

package cloud.lcx.learn.common.config.mq;

import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import javax.annotation.PostConstruct;

/**
 * @ClassName MqAckConfig
 * @Description 消息消费确认配置
 * @Author jocker
 * @Date 2020/9/3
 */
@Configuration
public class MqAckConfig {

    @Autowired
    RabbitTemplate rabbitTemplate;

    //初始化加载方法,对RabbitTemplate进行配置
    @PostConstruct
    void rabbitTemplate(){
        //消息发送确认,发送到交换器Exchange后触发回调
        rabbitTemplate.setConfirmCallback(new ReturnCallBackHandler());
        //消息发送确认,如果消息从交换器发送到对应队列失败时触发(比如根据发送消息时指定的routingKey找不到队列时会触发)
        rabbitTemplate.setReturnCallback(new ReturnHandler());
        //自定义格式转换
        rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
    }

    @Bean
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        // 设置为自动提交,即使配置文件添加了配置
        factory.setConnectionFactory(connectionFactory);
        // 手动设置手动ACK
        factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        factory.setMessageConverter(new Jackson2JsonMessageConverter());
        return factory;
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47

ConfirmCallback

定义消息确认消费处理器(非必须)

package cloud.lcx.learn.common.config.mq;


import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;

/* *
 * @Description 设置生产者消息confirm回调函数
 * @Param
 * @Returns
 * @Author jocker
 * @Date 2020/9/3 9:50
 */
@Slf4j
public class  ReturnCallBackHandler implements RabbitTemplate.ConfirmCallback {
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        if(ack){
            log.info("rabbitMq Ack : {} recved",correlationData);
        }else{
            log.info("rabbitMq Ack : {} unrecved,cause {}",correlationData,cause);
        }
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26

这此处处理消费者Ack或Nack的回调逻辑。

ReturnCallback

定义消息发送失败处理器(非必须)

package cloud.lcx.learn.common.config.mq;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.RabbitTemplate;

/* *
 * @Description 设置生产者消息returns回调函数
 * @Param 通过实现 ReturnCallback 接口,启动消息失败返回,比如路由不到队列时触发回调
 * @Returns
 * @Author jocker
 * @Date 2020/9/3 9:44
 */
@Slf4j
public class  ReturnHandler implements RabbitTemplate.ReturnCallback {
    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        log.error("rabbitmq : {} returned in {} with {},{}",message,exchange,replyCode,replyText);
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21

与ConfirmCallback区别在于,此处一般处理的是事消息发送异常事件,消息未曾到达消费者那边。

MqMessage

可以定义一个统一的消息格式(非必须)

package cloud.lcx.learn.common.config.mq.model;

import lombok.Data;

/**
 * @ClassName MqMessage
 * @Description
 * @Author jocker
 * @Date 2020/9/3
 */
@Data
public class MqMessage {
    String appId;
    String msgBody;
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16

生产者发送消息

使用RabbitTemplate发送消息,和redis类似

//使用RabbitTemplate,这提供了接收/发送等等方法
@Autowired
RabbitTemplate rabbitTemplate;
  • 1
  • 2
  • 3
MqMessage message = new MqMessage();
message.setAppId("rabbitMq application");
message.setMsgBody("hello rabbit!");
//将消息携带绑定键值:TestDirectRouting 发送到交换机TestDirectExchange,指定消息唯一ID
rabbitTemplate.convertAndSend(DirectRabbitConfig.MY_DIRECT_EXCHANGE, DirectRabbitConfig.ROUTINGKEY, message,new CorrelationData(UUID.randomUUID().toString()) );
  • 1
  • 2
  • 3
  • 4
  • 5

消费者消费消息

@Component
@Slf4j
public class RabbitDirectReceiver {

    /* *
     * 1. direct 下多个消费者只有一个消费者能消费消息,类似于负载,默认是轮询。
     * 2. 重复消费,中心思想,在redis或者db中存储消息唯一值,消费前判断下,多线程环境下需要考虑线程安全。
     *
     */

    @RabbitListener(queues = DirectRabbitConfig.MY_DIRECT_QUEUE, containerFactory="rabbitListenerContainerFactory")
    public void process(Message message, Channel channel) throws IOException {
        try {
            // 业务代码
            // .......
            // multiple: false 只确认当前消费者已消费消息
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
            // multiple: true  确认改消息已被所有消费者消费
            //channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);
        }catch (Exception e){
            log.error("消息处理异常!",e);
            channel.basicNack(message.getMessageProperties().getDeliveryTag(), false,true);
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25

一般情况下,为防止消息人为丢失(代码错误导致消息消费异常而引起消息丢失),会开启手动消费确认,在客户端在取出消息处理完成后,手动回复确认是否消费成功,api如下:

成功消费

 // multiple: false 只确认当前消费者已消费消息
 channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
  • 1
  • 2

消费失败

// 消费失败,消息回到队列
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false,true);
// 消费失败,删除消息
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false,false);
  • 1
  • 2
  • 3
  • 4

multiple批量确认,我用的比较少,一般都是只确认自己节点,可能在一些特殊场景下会用。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/weixin_40725706/article/detail/728285
推荐阅读
相关标签
  

闽ICP备14008679号