当前位置:   article > 正文

rabbitmq+springboot实现消息可靠投递_rabbitmq可靠投递springboot

rabbitmq可靠投递springboot

前言

文章最后附带rabbitmq生产环境的一些参数设置参考。
实现消息可靠性投递有两种方法,一个是开启事务,一个是使用确认机制,具体参考下图讲解。
在这里插入图片描述
通过上图可知消息投递失败将会发生在三个地方,生产者到交换机,交换机到队列,队列到消费者。
所以为了保证消息的可靠性,需要开启消息确认机制(confirmCallback、returnCallback)以及消费端手动确认模式(手动ack)或者消费者重试机制。

一、 配置消息确认回调机制

yml开启配置

spring:
  rabbitmq:
    host: 192.168.25.131
    port: 5672
    virtual-host: /
    #开启发送端确认,mq服务是否收到消息,收到消息触发confirmCallback方法回调
    publisher-confirm-type: correlated
    #开启发送端消息抵达队列的确认,消息是否从【交换机/其他】到达指定队列,触发returnCallback进行回调,详情看配置文件描述
    publisher-returns: true
    #只要抵达队列,以异步发送优先回调我们这个return confirm【跟publisher-returns是一组都设置成true】
    template:
      mandatory: true
    username: admin
    password: 123456
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14

参数讲解:

  • publish-confirm-type
    开启消息从生产者到broker(mq)的确认机制,可选值:
    1、simple:同步等待confirm结果,直到超时;
    2、correlated:异步回调,定义ConfirmCallback,mq返回结果时会回调这个ConfirmCallback。
  • publish-returns
    开启消息从交换机到队列的确认机制,可选:false、true。
  • template.mandatory
    定义消息投递失败的策略,可选值:
    1、true:调用ReturnCallback;
    2、false:直接丢弃消息。

定义失败回调配置

ConfirmCallback:生产者—broker;
ReturnCallback:交换机—队列;

    public void initRabbitTemplate(){
        //设置确认回调
        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
            /**
             * @param correlationData 当前消息的唯一关联数据(这个是消息的唯一id)
             * @param ack 消息是否成功收到,只要消息抵达Broker服务器,ack就等于true,不管消息之后的状态是否成功
             * @param cause 失败的原因
             */
            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                /**
                 * 1、做好消息确认机制(publisher、consumer【手动ack】)
                 * 2、每一个发送的消息都在数据库做好记录。定期将失败的消息再次发送一遍
                 */
                if (!ack){
                    System.out.println("confirm...correlationData["+correlationData+"]==>ack["+ack+"]==>cause["+cause+"]");
                }
            }
        });

        //设置消息抵达队列的确认回调
        rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
            /**
             * 只要消息没有投递给指定的队列,就触发这个失败回调
             * @param message   投递失败的消息详细信息
             * @param replyCode 回复的状态码
             * @param replyText 回复的文本内容
             * @param exchange  当时这个消息发给哪个交换机
             * @param routingKey 当时这个消息用的哪个路由键
             */
            @Override
            public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
                //报错误了。修改数据库当前消息的状态-》错误。
                System.out.println("Fail Message["+message+"]==>replyCode["+replyCode+"]==>replyText["+replyText+"]==>exchange["+exchange+"]==>routingKey["+routingKey+"]");
            }
        });
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37

配置完之消息投递失败就会进入对应回调方法,根据业务需求,可在方法里进行补偿策略。

测试消息确认机制

这里直接测试,不再进行项目搭建准备的工作,队列交换机的创建可参考个人主页里的其他文章。

准备一个交换机和一个队列

	@Bean("order_event_exchange") //直接注入名字,后边绑定队列和交换机直接使用注解更方便
	public Exchange oderEventExchange(){
	     //durable:是否持久化 autoDelete:是否自动删除
	     DirectExchange directExchange = new DirectExchange("order_event_exchange",true, false);
	     return directExchange;
	 }
	@Bean("order_release_queue")
	public Queue orderReleaseQueue() {
	     Queue queue = new Queue("order_release_queue", true, false, false);
	     return queue;
	 }
	 @Bean
	 public Binding orderReleaseOrderBinging(@Qualifier("order_release_queue") Queue queue,
	                                         @Qualifier("order_event_exchange") Exchange exchange) {
	     return BindingBuilder.bind(queue).to(exchange).with("order.release.order").noargs();
	 }    
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16

分别测试两个阶段:生产者-broker、交换机-队列。

  • 生产者到broker:指定错误的队列或者交换机
    编写接口逻辑,这里故意把交换机指定错误
@PostMapping("/sendOrder")
    public String sentOrder(@RequestBody Order order){
        //CorrelationData:给消息设置一个唯一id
        rabbitTemplate.convertAndSend("aaaaa","order.release.order", order,new CorrelationData(order.getId()));
        return "ok";
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

测试结果
在这里插入图片描述
因为找不到对应的交换机,所以投递消息到broker失败,触发消息确认回调方法。

  • 交换机到队列:指定错误的路由键
    编写接口,指定错误的runingkey
@PostMapping("/sendOrder")
    public String sentOrder(@RequestBody Order order){
        rabbitTemplate.convertAndSend("order_event_exchange","222222", order,new CorrelationData(order.getId()));
        return "ok";
    }
  • 1
  • 2
  • 3
  • 4
  • 5

测试结果,收到返回消息
在这里插入图片描述

二、手动确认机制

消息消费确认方法

消息消费确认主要会用到三个方法:
basicAck()、basicNack()/basicReject();
basicAck是确认消息收到,另两个方法是拒绝消息。
basicNack()和basicReject()方法的区别就是basicNack可以批量拒绝消息,比如:由于某些原因造成队列里有其他未确认消息,使用basicNack设置是否批量拒绝参数为true,就会把本次及之前的所有未确认消息全部拒绝。
具体看代码,我们可以在消费端这样写:

public void handleStockLockedRelease(SkuUnlockInfoTo skuUnlockInfoTo, Message message, Channel channel) throws IOException {
        try {
            orderService.unLockStock(skuUnlockInfoTo);
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        } catch (Exception e) {
            e.printStackTrace();
            //是否是被重试的消息
            if (message.getMessageProperties().isRedelivered()) {
                /*
                 * @param deliveryTag:消息的唯一标签,按消息入队顺序生成,比如:1、2、3
                 * @param multiple:是否开启消息批量拒绝
                 * @param requeue:是否将消息重新放入队列
                 */
                channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false);
                //TODO 防止异常死循环,最好放在数据库或记录一下,用定时任务定时重试
            } else {
                channel.basicReject(message.getMessageProperties().getDeliveryTag(),  true);
            }
        }
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20

利用try…catch即可针对异常来做出处理方法。如果想更精细一些,那么我们就可以利用消费者重试机制了,它可以设置重试次数,每次重试的间隔时间等。

三、消费者重试机制

说明

1、重试机制是基于本地的,跟rabbitmq队列没有任何关系,并不会将消息反复投递到队列,然后重复消费。
2、消费者端的代码中不能使用try{}catch(){}捕获异常,否则异常无法抛出,也就无法触发重试机制,非得捕获已知异常,也要在catch中重新抛出未知异常来触发重试机制。
3、消息重试次数用完之后,消息就需要被处理,这里rabbit提供了3个恢复器。
在这里插入图片描述
各恢复器含义:

RejectAndDontRequeueRecoverer:拒绝并且不会将消息重新发回队列;
RepublishMessageRecoverer:重新发布消息(自定义将消息转发到指定的其他交换机、队列);
ImmediateRequeueMessageRecoverer:立即重新返回原队列。
  • 1
  • 2
  • 3

常用 RepublishMessageRecoverer;
默认 RejectAndDontRequeueRecoverer。
4、除了可以使用mq提供的恢复器来实现消息的自定义处理,也可以利用死信队列来达到目的。
接下来,我们分别利用mq恢复器和死信队列来实现消息的转发。

yml配置

spring:
  rabbitmq:
    host: 192.168.25.131
    port: 5672
    virtual-host: /
    #开启发送端确认,mq服务是否收到消息,收到消息触发confirmCallback方法回调
    publisher-confirm-type: correlated
    #开启发送端消息抵达队列的确认,消息是否从【交换机/其他】到达指定队列,触发returnCallback进行回调,详情看配置文件描述
    publisher-returns: true
    #只要抵达队列,以异步发送优先回调我们这个return confirm【跟publisher-returns是一组都设置成true】
    template:
      mandatory: true
    listener:
      simple:
        # 消息确认模式 manual:手动ack  auto:自动
        acknowledge-mode: auto
        # 消费者消息重试机制配置
        retry:
          enabled: true # 开启消费者失败重试
          initial-interval: 3000 # 初识的失败等待时长为1秒
          multiplier: 1 # 失败的等待时长倍数,下次等待时长 = multiplier * last-interval
          max-attempts: 3 # 最大重试次数
          stateless: true # true无状态;false有状态。如果业务中包含事务,这里改为false
    username: admin
    password: 123456
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25

RepublishMessageRecoverer实现转发

流程

需要再多定义一个交换机和队列,专门用来接收处理超出重试机制次数的消息,通过MessageRecoverer的实现类RepublishMessageRecoverer来设置。

配置

在上文正常的交换机(order_event_exchange)和队列(order_release_queue)的基础下,再多创建一套

	@Bean("error_event_exchange")
    public DirectExchange errorMessageExchange(){
        DirectExchange directExchange = new DirectExchange("error_event_exchange",true, false);
        return directExchange;
    }
    @Bean("error_queue")
    public Queue errorQueue(){
        Queue queue = new Queue("error_queue", true, false, false);
        return queue;
    }

    @Bean
    public Binding errorBinding(@Qualifier("error_queue") Queue queue,
                                @Qualifier("error_event_exchange") Exchange exchange){
        return BindingBuilder.bind(queue).to(exchange).with("error").noargs();
    }
	/**
     * 设置消息重试机制完成之后,异常依然存在,消息应该到达哪里。
     */
    @Bean
    public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){
        return new RepublishMessageRecoverer(rabbitTemplate, "error_event_exchange", "error");
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23

测试

  • 正常消费端接口代码
@Service
@RabbitListener(queues = {"order_release_queue"})
public class OrderCloseListener {

    @RabbitHandler
    public void listener(Order order) {
        System.out.println("收到order信息");
        System.out.println(order);
        //模拟异常
        int a = 1 / 0;
    }
 }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 异常接收消费端代码
@Service
@RabbitListener(queues = {"error_queue"})
public class OrderErrorListener {

    @RabbitHandler
    public void listener(Order order) throws IOException {
        System.out.println("收到error-order信息");
        System.out.println(order);
    }
 }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 队列详情

在这里插入图片描述

  • 测试结果
    在这里插入图片描述
    3次重试之后,消息转发到error交换机、队列,然后被消费。

死信队列实现

说明及流程

  • 说明

死信队列就是普通的队列,换了一种叫法而已,当然,也要清楚什么情况下,消息才会被转发到死信队列:
1、过期的消息;2、超出队列长度的消息;3、被拒绝且不会被重新入队的消息。

  • 流程

这里,我们就要利用死信的第三个特点来实现消息自动转发效果,上文我们说过,消息重试机制的默认恢复器是RejectAndDontRequeueRecoverer,而这个恢复器的特点就是将消息拒绝并删除掉(不会重新入队)。所以,与其说用死信队列来实现,倒不如说用消息重试机制的默认恢复器+死信队列来实现消息的转发。

配置

首先注释掉前边配置的RepublishMessageRecoverer,否则会覆盖默认恢复器,另外,给正常队列order_release_queue设置死信交换机和队列,当然,这里的死信交换机和队列就是我们之前创建的error系列。
完整代码

@Configuration
public class MyMQConfig {

    /**
     *功能描述: 创建交换机
     * @author zhouwenjie
     * @date 2022/5/6 0:36
     * @param
     * @return org.springframework.amqp.core.Exchange
     */
    @Bean("order_event_exchange") //直接注入名字,后边绑定队列和交换机直接使用注解更方便
    public Exchange oderEventExchange(){
        //durable:是否持久化 autoDelete:是否自动删除
        DirectExchange directExchange = new DirectExchange("order_event_exchange",true, false);
        return directExchange;
    }

    /**
     * 功能描述: 【订单相关】声明正常收取队列
     *
     * @param
     * @return org.springframework.amqp.core.Queue
     * @author zhouwenjie
     */
    @Bean("order_release_queue")
    public Queue orderReleaseQueue() {
        Map<String, Object> arguments = new HashMap<>();
        //配置队列的死信应该发送给哪个交换机
        arguments.put("x-dead-letter-exchange", "error_event_exchange");
        //发送给交换机使用的路由key
        arguments.put("x-dead-letter-routing-key", "error");
        Queue queue = new Queue("order_release_queue", true, false, false,arguments);
        return queue;
    }

    @Bean
    public Binding orderReleaseOrderBinging(@Qualifier("order_release_queue") Queue queue,
                                            @Qualifier("order_event_exchange") Exchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with("order.release.order").noargs();
    }


    @Bean("error_event_exchange")
    public DirectExchange errorMessageExchange(){
        DirectExchange directExchange = new DirectExchange("error_event_exchange",true, false);
        return directExchange;
    }
    @Bean("error_queue")
    public Queue errorQueue(){
        Queue queue = new Queue("error_queue", true, false, false);
        return queue;
    }

    @Bean
    public Binding errorBinding(@Qualifier("error_queue") Queue queue,
                                @Qualifier("error_event_exchange") Exchange exchange){
        return BindingBuilder.bind(queue).to(exchange).with("error").noargs();
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59

测试

基于之前的监听代码不动,接口也一样,另外,注意一下,重启项目之前一定要先删除原来的正常队列,让程序自动基于现在的配置重建,否则会报异常,如下图即可。
在这里插入图片描述

  • 测试结果
    因为此种方式控制台会输出一堆异常信息,正常的 /by zero 异常,所以分开截取。
    在这里插入图片描述
    在这里插入图片描述

注意事项

  • 重试机制和确认模式的搭配建议
    这里提前说明一下,如果使用了消费者重试机制(retry),那么消费端确认模式最好使用AUTO模式,程序正常执行完或者抛出异常才删除消息,这样才能保证在程序抛出异常的时候,消息才能正确被处理。
    在这里插入图片描述
    原因很简单,如果使用手动模式MANUAL,程序在执行途中就出现异常,就不会执行到最后一行手动确认消息的代码,消费端就没有机会用ack去响应MQ。因此 ,在MQ管理端就会一直存在一条消息没有被消费(实验了很多次,决定放弃这个模式),就如下图一样,永远无法执行到basicAck。
    在这里插入图片描述
  • 消息自定义序列化和确认机制的兼容
    这里说明一下我遇到的问题场景:
    因为rabbit默认使用jdk自带的序列化机制,所以我想换成Jackson2,所以就要重新定义RabbitTemplate和的bean来进行序列化和反序列化,如下图。
    在这里插入图片描述在这里插入图片描述
    但是这样之后,发现这个配置会覆盖rabbit的其他配置,比如下边的这些。
    在这里插入图片描述
    导致这些配置都不生效,后来经过一系列的操作,也是解决了这个问题,具体请参考这里:
    rabbitmq自定义消息序列化与反序列化

四、rabbitmq生产环境一些参数设置

参考这篇文章

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/小丑西瓜9/article/detail/471944
推荐阅读
相关标签
  

闽ICP备14008679号