当前位置:   article > 正文

RabbitMQ高级之消息可靠性投递与消费(含源码)_rabbitmq消息重新投递

rabbitmq消息重新投递

原创 程序猿羊 程序猿小杨 2023-11-17 07:45 发表于上海

收录于合集

#消息中间件MQ9个

#近期热推20个

程序猿小杨

分享Java相关技术、数据库、Python、职场、感悟、视频资源等干货和学习心得。 如:kettle、ES、redis\mongoDB、springboot、Zookeeper、高并发多线程、中间件、JVM、程序员攻略等。

86篇原创内容

公众号

             

图片

                 

近期热推文章:

    1、SpringBoot用线程池ThreadPoolTaskExecutor异步处理百万级数据;

    2、SpringBoot整合多数据源,并支持动态新增与切换(详细教程)

    3、SpringBoot使用@Async实现多线程异步

    4、研发必会-异步编程利器之CompletableFuture(含源码 中)

    5、SpringBoot常见异步编程,你会多少?

    6、SpringBoot整合RabbitMQ(含源码)

    7、RabbitMQ发送和接收消息的几种方式

图片

一、生产者重连

    有时候由于网路波动,可能会出现客户端连接MQ失败的情况,通过配置可以开启连接失败后的重试机制

spring:  rabbitmq:    host: 127.0.0.1    port: 5672    username: guest    password: guest    #virtual-host: /tech-sharing    publisher-returns: false   #一般不需要开启    publisher-confirm-type: correlated   #发布消息成功到交换器后会触发回调方法 也即:发送确认confirm    connection-timeout: 1s  #设置MQ的连接超时时间    template:      mandatory: false  #必须设置成true 消息路由失败通知监听者,而不是将消息丢弃      retry:        enabled: true  #开启超时重试机制        initial-interval: 1000ms  #失败后的初始等待时间        multiplier: 1  #失败后下次的等待时长倍数,下次等待时长= initial-interval * multiplier        max-attempts: 3  #最大重试次数    listener:      simple:        prefetch: 1  #消费者每次只能获取一条消息,才处理完才能获取下一条(可实现能者多劳)        acknowledge-mode: NONE  #1、NONE:全自动 2、AUTO:半自动 3、MANUAL:手动 开启消费端消息确认机制

注意:当网络不稳定的时候,利用重试机制可以有效提高消息发送的成功率。不过SpringAMQP提供的重试机制是阻塞式的重试,也就是说多次重试等待的过程中,当前线程是被阻塞的,会影响业务性能。

    如果对于业务性能有要求,建议禁用重试机制。如果一定要使用,请合理配置等待时长和重试次数,当然可以考虑使用异步线程来执行发送消息的代码。

图片

二、生产者确认

    RabbitMQ有Publisher Confirm和Publisher Return两种确认机制。开启确机制认后,在MQ成功收到消息后会返回确认消息给生产者。返回的结果有以下几种情况:

      1、消息投递到了MQ,但是路由失败。此时会通过PublisherReturn返回路由异常原因,然后返回ACK,告知投递成功;

      2、临时消息投递到了MQ,并且入队成功,返回ACK,告知投递成功;

       3、持久消息投递到了MO,并且入队完成持久化,返回ACK,告知投递成功;

       4、其它情况都会返回NACK,告知投递失败。

图片

yml中开启生效:

图片

详细配置如下:

spring:  rabbitmq:    host: 127.0.0.1    port: 5672    username: guest    password: guest    #virtual-host: /tech-sharing    publisher-returns: true   #一般不需要开启    publisher-confirm-type: correlated   #发布消息成功到交换器后会触发回调方法 也即:发送确认confirm    connection-timeout: 1s  #设置MQ的连接超时时间    template:      mandatory: true  #必须设置成true 消息路由失败通知监听者,而不是将消息丢弃      retry:        enabled: true  #开启超时重试机制        initial-interval: 1000ms  #失败后的初始等待时间        multiplier: 1  #失败后下次的等待时长倍数,下次等待时长= initial-interval * multiplier        max-attempts: 3  #最大重试次数    listener:      simple:        prefetch: 1  #消费者每次只能获取一条消息,才处理完才能获取下一条(可实现能者多劳)        acknowledge-mode: NONE  #1、NONE:全自动 2、AUTO:半自动 3、MANUAL:手动 开启消费端消息确认机制

2.1、开启Return机制:

@Component@Slf4jpublic class MQConfirmConfig implements ApplicationContextAware {    @Override    public void setApplicationContext(ApplicationContext applicationContext)            throws BeansException {        //获取RabbitTemplate        RabbitTemplate rabbitTemplate=applicationContext.getBean(RabbitTemplate.class);        //设置        rabbitTemplate.setReturnsCallback(returnCallback->{            log.info("发送消息失败,返回结果为returnCallback:{}",returnCallback);        });
    }}

注意:每个RabbitTemplate只能配置一个ReturnCallback,因此需要在项目启动过程中配置。

2.2、开启Confirm机制:

  /********使用confirm确认机制发送消息*******************************/    public void sendMsgByConfirm(String message){        CorrelationData correlationData=new CorrelationData();        //给Future添加confirmcallbancj        correlationData.getFuture().addCallback(new ListenableFutureCallback<CorrelationData.Confirm>() {            @Override            public void onFailure(Throwable ex) {                log.info("发生异常.....");  //基本不会触发            }            @Override            public void onSuccess(@Nullable CorrelationData.Confirm result) {                //result.isAck() , boolean类型,true代表ack回执,false 代表nack回执                if(result.isAck()){                    log.info("发送消息投递成功....");                }else{                    log.error("发送消息投递失败:{}",result.getReason());                }            }        });        //发送消息        rabbitTemplate.convertAndSend("TestDirectExchange001",                "TestDirectRouting001", message,correlationData);    }

图片

总结:

图片

最后说明:大多数情况下,不需要开启消息确认机制,如果业务场景要求比较高的情况下,再考虑开启。

三、MQ的可靠性

图片

3.1、数据持久化

图片

说明:3.6版本以后数据持久化已经默认。3.6版本以前需要自行设置。

--spring中交换机与队列持久化源码:

图片

发送消息,非持久化会出现paged out,会阻塞IO,性能下降:

图片

持久化消息,不会出现阻塞情况,性能下降的比较小:

图片

3.2、Lazy Queue

图片

(3.12版本之前)需要设置一个队列为惰性队列,只需要在声明队列时,指定x-queue-mode属性为lazy即可:

图片

图片

设置后效果如下:

图片

总结:

RabbitMQ如何保证消息的可靠性:

    首先通过配置可以让交换机、队列、以及发送的消息都持久化。这样队列中的消息会持久化到磁盘,MQ重启消息依然存在。

    RabbitMQ在3.6版本引入了LazyQueue,并且在3.12版本后会称为队列的默认模式LazyQueue会将所有消息都持久化

开启持久化和生产者确认时,RabbitMQ只有在消息持久化完成后才会给生产者返回ACK回执。

3.3、消息落库的方式

   消息投递成功,根据ID更新消息状态;消息发送失败时,进行消息重发,如果没有超过最大尝试次数,进行消息重发的同时更新DB消息状态。(具体的见一下章节)。

四、消费者可靠性

    保证消费者的可靠性主要有三种手段:1、消费者确认机制;2、消费失败处理;3、业务幂等性。

4.1、消费者确认机制

    为了确认消费者是否成功处理消息,RabbitMQ提供了消费者确认机制(Consumer Acknowledgement)。当消费者处理消息结束后,应该向RabbitMQ发送一个回执,告知RabbitMQ自己消息处理状态。

回执有三种可选值:
    ack:成功处理消息,RabbitMQ从队列中删除该消息。
    nack:消息处理失败,RabbitMQ需要再次投递消息  。
    reject:消息处理失败并拒绝该消息,RabbitMQ从队列中删除该消息(一般是消息的参数不正确) 。

Spring  AMQP已经实现了消息确认功能。并允许通过配置文件选择ACK处理方式,有三种方式

    none:不处理。即消息投递给消费者后立刻ack,消息会立刻从MQ删除。非常不安全,不建议使用。

    manual:手动模式。需要自己在业务代码中调用api,发送ack或reject,存在业务入侵,但更灵活

    auto:自动模式。Spring AMQP利用AOP对我们的消息处理逻辑做了环绕增强,当业务正常执行时则自动返回ack。当业务出现异常时,根据异常判断返回不同结果:1、如果是业务异常,会自动返回nack;2、如果是消息处理或校验异常,自动返回reject。

开启消费者确认其机制:

spring:  rabbitmq:    listener: # 开启消费者确认其机制      simple:        prefetch: 1  #消费者每次只能获取一条消息,才处理完才能获取下一条(可实现能者多劳)        acknowledge-mode: AUTO  # none:关闭ack;manual:手动ack;auto:自动ack

消费者业务模拟异常:

 @RabbitListener(bindings = @QueueBinding(value=@Queue(name="BatchDirectQueue02"),            exchange=@Exchange(name="BatchDirectExchange02",type = ExchangeTypes.DIRECT,ignoreDeclarationExceptions = "true")    ))    public void receiveBatchMsg(String message){        log.info("接收发送消息....."+message);        throw new RuntimeException("测试异常");    }

 auto模式下,消息未被处理会保留消息,尝试重新投递给消费者。如下模拟的异常情况下,消息会一直被投递。

图片

4.2、消息失败处理策略

    当消费者出现异常后,消息会不断requeue(重新入队)到队列,再重新发送给消费者,然后再次异常,再次requeue无限循环,导致mg的消息处理飙升,带来不必要的压力。

失败消息处理策略:

    在开启重试模式后,重试次数耗尽,如果消息依然失败,则需要有MessageRecoverer接口来处理,它包含三种不同的实现:

    RejectAndDontRequeueRecoverer: 重试耗尽后,直接reject,丢弃消息。默认就是这种方式。

    ImmediateRequeueMessageRecoverer:重试耗尽后,返回nack,消息重新入队。

    RepublishMessageRecoverer: 重试耗尽后,将失败消息投递到指定的交换机。

RepublishMessageRecoverer原理:

图片

定义接收失败消息的交换机、队列及其绑定关系:

 /**     * 功能描述:定义接收错误消费的日志     * @MethodName: receiveErrorMessage     * @MethodParam: [message]     * @Return: void     * @Author: yyalin     * @CreateDate: 2023/11/15 9:55     */    @RabbitListener(bindings = @QueueBinding(value = @Queue(name = "errorQueue"),            exchange = @Exchange(name = "errorExchange", type = ExchangeTypes.DIRECT, ignoreDeclarationExceptions = "true"),            key = "errorRouting"    ))    public void receiveErrorMessage(String message) {        log.info("消费者收到发送错误的消息: " + message);    }

定义RepublishMessageRecoverer:

/** * @Description: TODO:定义错误消息接收 * @Author: yyalin * @CreateDate: 2023/11/15 9:58 * @Version: V1.0 */@Configuration@Slf4jpublic class ErrorConfig {    @Bean    public MessageRecoverer messageRecoverer(RabbitTemplate rabbitTemplate){        log.debug("加载RepublishMessageRecoverer");        return new RepublishMessageRecoverer(rabbitTemplate,"errorExchange","errorRouting");    }}

测试结果:

图片

消费者如何保证消息一定被消费?

    1、开启消费者确认机制为auto,由spring确认消息处理成功后返回ack,异常时返回nack。如果一直处理异常会一直重试。

    2、开启消费者失败重试机制,并设置MessageRecoverer,多次重试失败后将消息投递到异常交换机,交由人工处理。

4.3、业务幂等性

   指同一个业务,执行一次或多次对业务状态的影响是一致的。

图片

方案一:是给每个消息都设置一个唯一id,利用id区分是否是重复消息。

步骤:

    1、每一条消息都生成一个唯一的id,与消息一起投递给消费者。

/**     * 功能描述:生成唯一ID配置     * @MethodName: jacksonMessageConvertor     * @MethodParam: []     * @Return: org.springframework.amqp.support.converter.MessageConverter     * @Author: yyalin     * @CreateDate: 2023/11/16 13:42     */    @Bean    public MessageConverter jacksonMessageConvertor() {        //1.定义消息转换器        Jackson2JsonMessageConverter jjmc = new Jackson2JsonMessageConverter();        //2.配置自动创建消息id,用于识别不同消息,也可以在业务中基于ID判断是否是重复消息        jjmc.setCreateMessageIds(true);        return jjmc;    }

    2、消费者接收到消息后处理自己的业务,业务处理成功后将消息ID保存到数据库。(对业务有侵入)

 @RabbitListener(bindings = @QueueBinding(value=@Queue(name="BatchDirectQueue02"),            exchange=@Exchange(name="BatchDirectExchange02",type = ExchangeTypes.DIRECT,ignoreDeclarationExceptions = "true"),            key = "BatchDirectRouting02"    ))    public void receiveBatchMsg(Message message){        log.info("接收发送消息:message:{},消息体body:{}",message,message.getBody());        log.info("获取消息id:"+ message.getMessageProperties().getMessageId());    }

    3、如果下次又收到相同消息,去数据库查询判断是否存在,存在则为重复消息放弃处理

图片

方案二:是结合业务逻辑,基于业务本身做判断。以我们的业务为例:我们要在支付后修改订单状态为已支付,应该在修改订单状态前先查询订单状态,判断状态是否是未支付。只有未支付订单才需要修改,其它状态不做处理。

图片

五、源码获取方式

     更多优秀文章,请关注个人微信公众号或搜索“程序猿小杨”查阅。然后回复:源码,可以获取对应的源码,开箱即可使用。

图片

图片

说明:后期中间件MQ相关代码都会放到此文件夹中,请大家前去下载,开箱即可使用。

参考网站:

https://blog.csdn.net/jinkkkkkkk/article/details/133933737

本文内容由网友自发贡献,转载请注明出处:【wpsshop博客】
推荐阅读
相关标签
  

闽ICP备14008679号