当前位置:   article > 正文

消息队列-RabbitMQ解决各类问题(堆积、消息丢失、幂等性、分布式事务)_rabbitmq队列堆积

rabbitmq队列堆积

消息堆积

消息堆积的产生场景:

  1. 生产者产生的消息速度大于消费者消费的速度。解决:增加消费者的数量或速度。
  2. 没有消费者进行消费的时候。解决:死信队列、设置消息有效期。相当于对我们的消息设置有效期,在规定的时间内如果没有消费的话,自动过期,过期的时候会执行客户端回调监听的方法将消息存放到数据库表记录,后期实现补偿。

保证消息不丢失

1、生产者使用消息确认机制保证消息百分之百能够将消息投递到MQ成功。
2、MQ服务器端应该将消息持久化到硬盘
3、消费者使用手动ack机制确认消息消费成功

如果MQ服务器容量满了怎么办?
使用死信队列将消息存到数据库中去,后期补偿消费。

死信队列

RabbitMQ死信队列俗称,备胎队列;消息中间件因为某种原因拒收该消息后,可以转移到死信队列中存放,死信队列也可以有交换机和路由key等。

产生背景:

  1. 消息投递到MQ中存放 消息已经过期
  2. 队列达到最大的长度 (队列容器已经满了)生产者拒绝接收消息
  3. 消费者消费多次消息失败,就会转移存放到死信队列中

代码案例:
maven依赖

<dependencies>

        <!-- springboot-web组件 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <!-- 添加springboot对amqp的支持 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
        </dependency>
        <!--fastjson -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.49</version>
        </dependency>
    </dependencies>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23

yml配置

server:
#  服务启动端口配置
  port: 8081
  servlet:
#    应用访问路径
    context-path: /

spring:
  #增加application.druid.yml 的配置文件
#  profiles:
#    active: rabbitmq

  rabbitmq:
    ####连接地址
    host: www.kaicostudy.com
    ####端口号
    port: 5672
    ####账号
    username: kaico
    ####密码
    password: kaico
    ### 地址
    virtual-host: /kaicoStudy

###模拟演示死信队列
kaico:
  dlx:
    exchange: kaico_order_dlx_exchange
    queue: kaico_order_dlx_queue
    routingKey: kaico.order.dlx
  ###备胎交换机
  order:
    exchange: kaico_order_exchange
    queue: kaico_order_queue
    routingKey: kaico.order
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35

队列配置类

@Configuration
public class DeadLetterMQConfig {
    /**
     * 订单交换机
     */
    @Value("${kaico.order.exchange}")
    private String orderExchange;

    /**
     * 订单队列
     */
    @Value("${kaico.order.queue}")
    private String orderQueue;

    /**
     * 订单路由key
     */
    @Value("${kaico.order.routingKey}")
    private String orderRoutingKey;
    /**
     * 死信交换机
     */
    @Value("${kaico.dlx.exchange}")
    private String dlxExchange;

    /**
     * 死信队列
     */
    @Value("${kaico.dlx.queue}")
    private String dlxQueue;
    /**
     * 死信路由
     */
    @Value("${kaico.dlx.routingKey}")
    private String dlxRoutingKey;

    /**
     * 声明死信交换机
     *
     * @return DirectExchange
     */
    @Bean
    public DirectExchange dlxExchange() {
        return new DirectExchange(dlxExchange);
    }

    /**
     * 声明死信队列
     *
     * @return Queue
     */
    @Bean
    public Queue dlxQueue() {
        return new Queue(dlxQueue);
    }

    /**
     * 声明订单业务交换机
     *
     * @return DirectExchange
     */
    @Bean
    public DirectExchange orderExchange() {
        return new DirectExchange(orderExchange);
    }

    /**
     * 绑定死信队列到死信交换机
     *
     * @return Binding
     */
    @Bean
    public Binding binding() {
        return BindingBuilder.bind(dlxQueue())
                .to(dlxExchange())
                .with(dlxRoutingKey);
    }

    /**
     * 声明订单队列,并且绑定死信队列
     *
     * @return Queue
     */
    @Bean
    public Queue orderQueue() {
        // 订单队列绑定我们的死信交换机
        Map<String, Object> arguments = new HashMap<>(2);

        arguments.put("x-dead-letter-exchange", dlxExchange);
        arguments.put("x-dead-letter-routing-key", dlxRoutingKey);
        return new Queue(orderQueue, true, false, false, arguments);
    }

    /**
     * 绑定订单队列到订单交换机
     *
     * @return Binding
     */
    @Bean
    public Binding orderBinding() {
        return BindingBuilder.bind(orderQueue())
                .to(orderExchange())
                .with(orderRoutingKey);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105

死信队列消费者

@Component
public class OrderDlxConsumer {
    /**
     * 死信队列监听队列回调的方法
     * @param msg
     */
    @RabbitListener(queues = "kaico_order_dlx_queue")
    public void orderDlxConsumer(String msg) {
        System.out.println("死信队列消费订单消息" + msg);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11

普通队列消费者

@Component
public class OrderConsumer {
    /**
     * 监听队列回调的方法
     *
     * @param msg
     */
    @RabbitListener(queues = "kaico_order_queue")
    public void orderConsumer(String msg) {
        System.out.println("正常订单消费者消息msg:" + msg);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12

后台队列管理页面如下:
在这里插入图片描述

部署方式:死信队列不能够和正常队列存在同一个服务器中,应该分服务器存放。

延迟队列

订单30分钟未支付,系统自动超时关闭的实现方案。

  1. 基于任务调度实现,效率是非常低。
  2. 基于redis过期key实现,key失效时会回调客户端一个方法。
    用户下单的时候,生成一个令牌(有效期)30分钟,存放到我们redis;缺点:非常冗余,会在表中存放一个冗余字段。
  3. 基于mq的延迟队列(最佳方案)rabbitmq情况下。
    原理:在我们下单的时候,往mq投递一个消息设置有效期为30分钟,但该消息失效的时候(没有被消费的情况下),执行我们客户端一个方法告诉我们该消息已经失效,这时候查询这笔订单是否已经支付。

实现逻辑:
主要使用死信队列来实现。
在这里插入图片描述
想要的代码:就是正常的消费者不消费消息,或者没有正常的消费者,在设置的时间后进入死信队列中,然后死信消费者实现相应的业务逻辑。

RabbitMQ消息幂等问题

RabbitMQ消息自动重试机制

  1. 当消费者业务逻辑代码中,抛出异常自动实现重试 (默认是无数次重试
  2. 应该对RabbitMQ重试次数实现限制,比如最多重试5次,每次间隔3s;重试多次还是失败的情况下,存放到死信队列或者存放到数据库表中记录后期人工补偿。因为重试失败次数之后,队列会自动删除这个消息。

消息重试原理: 在重试的过程中,使用aop拦截我们的消费监听方法,也不会打印这个错误日志。如果重试多次还是失败,达到最大失败次数的时候才会打印错误日志。

如果消费多次还是失败的情况下:
1、自动删除该消息;(消息可能丢失)
解决办法:

  1. 如果充实多次还是失败的情况下,最终存放到死信队列;
  2. 采用表日志记,消费失败错误日志的日志记录,后期人工自动对该消息实现补偿。

合理的选择重试机制

  1. 消费者获取消息后,调用第三方接口(HTTP请求),但是调用第三方接口失败呢?是否需要重试 ?
    答:有时是因为网络异常调用失败,应该需要重试几次。
  2. 消费者获取消息后,应该代码问题抛出数据异常,是否需要重试?
    答:不需要重试,代码异常需要重新修改代码发布项目。

消费者开启手动ack模式

第一步、springboot项目配置需要开启ack模式

acknowledge-mode: manual
  • 1

第二步、消费者Java代码

int result = orderMapper.addOrder(orderEntity);
if (result >= 0) {
    // 开启消息确认机制
    channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
  • 1
  • 2
  • 3
  • 4
  • 5

rabbitMQ如何解决消息幂等问题

什么是消息幂等性?MQ消费者如何保证幂等性?
产生的原因:就是因为消费者可能会开启自动重试,重试过程中可能会导致消费者业务逻辑代码重复执行。此刻消息已经消费了,因为业务报错导致消息重新消费,这时会出现
解决方案:采用消息全局id根据业务来定,根据业务id(全局唯一id)消费者可以判断这条消息已经消费了。

消费者代码逻辑:
在这里插入图片描述

RabbitMQ解决分布式事务问题

分布式事务:在分布式系统中,因为跨服务调用接口,存在多个不同的事务,每个事务都互不影响。就存在分布式事务的问题。

解决分布式事务核心思想:数据最终一致性。

分布式领域中名词:
强一致性 :要么同步速度非常快或者采用锁的机制 不允许出现脏读;
强一致性解决方案:要么数据库A非常迅速的将数据同步给数据B,或者数据库A没有同步完成之前数据库B不能够读取数据。

弱一致性: 允许读取的数据为原来的脏数据,允许读取的结果不一致性。

最终一致性: 在我们的分布式系统中,因为数据之间同步通过网络实现通讯,短暂的数据延迟是允许的,但是最终数据必须要一致性。

基于RabbitMQ解决分布式事务的思路

基于RabbitMQ解决分布式事务的思路:(采用最终一致性的方案)

  1. 确认我们的生产者消息一定要投递到MQ中(消息确认机制)投递失败 就继续重试
  2. 消费者采用手动ack的形式确认消息实现消费 注意幂等性问题,消费失败的情况下,mq自动帮消费者重试。
  3. 保证我们的生产者第一事务先执行,如果执行失败采用补单队列(给生产者自己事务补充,确保生产者第一事务执行完成【数据最终一致性】)。

解决思路图:核心是利用mq发送消息给其他系统将数据修改回来。
在这里插入图片描述

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/盐析白兔/article/detail/546334
推荐阅读
相关标签
  

闽ICP备14008679号