当前位置:   article > 正文

600万订单每秒Disruptor +SpringBoot,如何解决消息不丢失?_disruptor 内容丢失

disruptor 内容丢失

尼恩说在前面

在40岁老架构师 尼恩的读者交流群(50+)中,最近有小伙伴拿到了一线互联网企业如得物、阿里、滴滴、极兔、有赞、shein 希音、百度、网易的面试资格,遇到很多很重要的面试题:

Disruptor 官方说能达到每秒600w OPS订单处理能力,怎么实现的?

Disruptor 什么情况下发生消费数据丢失? 该怎么解决?

小伙伴 没有回答好,导致面试挂了。

Disruptor 是队列之王,相关面试题也是一个非常常见的面试题,考察的是高性能的基本功。

如何才能回答得很漂亮,才能 让面试官刮目相看、口水直流呢?

这里,尼恩给大家做一下系统化、体系化的梳理,让面试官爱到 “不能自已、口水直流”,然后帮大家 实现 ”offer自由”

当然,这道面试题,以及参考答案,也会收入咱们的 《尼恩Java面试宝典》V175版本PDF集群,供后面的小伙伴参考,提升大家的 3高 架构、设计、开发水平。

注:本文以 PDF 持续更新,最新尼恩 架构笔记、面试题 的PDF文件,请关注本公众号【技术自由圈】获取。

队列之王 Disruptor 简介

Disruptor是英国外汇交易公司LMAX开发的一个高性能队列,研发的初衷是解决内存队列的延迟问题(在性能测试中发现竟然与I/O操作处于同样的数量级)。

基于Disruptor开发的系统单线程能支撑每秒600万订单,2010年在QCon演讲后,获得了业界关注。

在这里插入图片描述

2011年,企业应用软件专家Martin Fowler专门撰写长文介绍Disruptor。

2011年,Disruptor还获得了Oracle官方的Duke大奖

目前,包括Apache Storm、Camel、Log4j 2在内的很多知名项目都应用了Disruptor以获取高性能。

Disruptor通过以下设计来,来实现 单线程能支撑每秒600万订单的问题:

  • 核心架构1:无锁架构
    生产和消费,都是无锁架构。具体来说,生产者位点/消费者位点的操作,都是无锁操作,或者使用轻量级CAS原子操作。

    无锁架构好处是,既没有锁的竞争, 也没有线程的内核态、用户态切换的开销。 关于内核态、用户态的原理请参见尼恩的葵花宝典。

  • 核心架构2:环形数组架构

    数组元素不会被回收,避免频繁的GC,所以,为了避免垃圾回收,采用数组而非链表。

    同时,数组对处理器的缓存机制更加友好。

    数组长度2^n,通过位运算,加快定位的速度。

    下标采取递增的形式。不用担心index溢出的问题。

    index是long类型,即使100万QPS的处理速度,也需要30万年才能用完。

  • 核心架构3:cache line padding

    两个维度的CPU 缓存行加速,享受到 CPU Cache 那风驰电掣的速度带来的红利:

    第一维度: 对 位点等核心组件进行 CPU cache line padding,实现高并发访问(修改和取值)。

    第二个维度: ringbuffer 是一个数据,加载的时候一般也会塞满整个 CPU cache line。也就是说 从内存加载数据到 CPU Cache 里面的时候, 如果是加载数组里面的数据(如 Disruptor),那么 CPU 就会加载到数组里面连续的多个数据。
    所以,Disruptor 数组的遍历、还是位点的增长, 很容易享受到 CPU Cache 那风驰电掣的速度带来的红利。

SpringBoot + Disruptor 使用实战

有关 Disruptor的 简单实战,请参见 尼恩的 《Disruptor 学习圣经 V3》, 由于过于简单,这里不做啰嗦。

下面,来看一个SpringBoot + Disruptor的 使用实战

使用 Disruptor 实现一个生产消费模型步骤是:

  • 准备好简单的一个springboot应用

  • 定义事件(Event) : 你可以把 Event 理解为存放在队列中等待消费的消息对象。

  • 创建事件工厂 :事件工厂用于生产事件,我们在初始化 Disruptor 类的时候需要用到。

  • 创建处理事件的 Handler :Event 在对应的 Handler 中被处理,你可以将其理解为生产消费者模型中的消费者。

  • 创建并装配 Disruptor : 事件的生产和消费需要用到Disruptor 对象。

  • 定义生产者,并使用生产者发消息

  • 对简单的SpringBoot + Disruptor 进行扩容,实现 容量监控预警+ 动态扩容

定义一个Event和工厂

首先定义一个Event来包含需要传递的数据:

在这里插入图片描述

由于需要让Disruptor为我们创建事件,我们同时还声明了一个EventFactory来创建Event对象。

public class LongEventFactory implements EventFactory { 
    @Override 
    public Object newInstance() { 
        return new LongEvent(); 
    } 
} 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

在这里插入图片描述定义

事件处理器(消费者)

我们还需要一个事件消费者,也就是一个事件处理器。

这个例子中,事件处理器的工作,就是简单地把事件中存储的数据打印到终端:

    /** 
     * 类似于消费者
     *  disruptor会回调此处理器的方法
     */
    static class LongEventHandler implements EventHandler<LongEvent> {
        @Override
        public void onEvent(LongEvent longEvent, long l, boolean b) throws Exception {
            System.out.println(longEvent.getValue());
        }
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

disruptor会回调此处理器的方法

定义事件源(生产者)

事件都会有一个生成事件的源,类似于 生产者的角色.

注意,这是一个 600wqps 能力的 异步生产者。 这里定义两个版本:

在这里插入图片描述

生产者的角色的接口定义如下

在这里插入图片描述

入门级:一个简单 DisruptorProducer 生产者的定义和使用

定义一个简单 DisruptorProducer 生产者

大致的代码如下

package com.crazymaker.cloud.disruptor.demo.business.impl;


@Slf4j
public class DisruptorProducer implements AsyncProducer {

    //一个translator可以看做一个事件初始化器,publicEvent方法会调用它
    //填充Event
    private static final EventTranslatorOneArg<LongEvent, Long> TRANSLATOR =
            new EventTranslatorOneArg<LongEvent, Long>() {
                public void translateTo(LongEvent event, long sequence, Long data) {
                    event.setValue(data);
                }
            };
    private final RingBuffer<LongEvent> ringBuffer;

    public DisruptorProducer() {
        this.ringBuffer = disruptor().getRingBuffer();
    }

    public void publishData(Long data) {
        log.info("生产一个数据:" + data + " | ringBuffer.remainingCapacity()= " + ringBuffer.remainingCapacity());

        ringBuffer.publishEvent(TRANSLATOR, data);

    }

 
    private Disruptor<LongEvent> disruptor() {

        ThreadFactory namedThreadFactory = new ThreadFactoryBuilder().setNameFormat("DisruptorThreadPool").build();

        LongEventFactory eventFactory = new LongEventFactory();
        int bufferSize = 1024;
        Disruptor<LongEvent> disruptor = new Disruptor<>(eventFactory, bufferSize, namedThreadFactory,
                ProducerType.MULTI, new BlockingWaitStrategy());

        // 连接 消费者 处理器 ,两个消费者
        LongEventWorkHandler1 handler1 = new LongEventWorkHandler1();
        LongEventWorkHandler2 handler2 = new LongEventWorkHandler2();
        disruptor.handleEventsWith(handler1, handler2);
        //为消费者配置异常处理器
        disruptor.handleExceptionsFor(handler1).with(exceptionHandler);
        disruptor.handleExceptionsFor(handler2).with(exceptionHandler);

        // 开启 分裂者(事件分发)
        disruptor.start();
        return disruptor;
    }


    ExceptionHandler exceptionHandler =...//省略非核心代码  异常处理器实现
}


  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55

上面的代码,通过 disruptor() 方法创建和装配 一个Disruptor对象 ,Disruptor 里边有一个环形队列。然后 disruptor() 方法给 Disruptor对象设置消费者,并且为消费者设置异常处理器。

使用这一个简单 DisruptorProducer 生产者

定义一个配置类,用于实例化 生产者

在这里插入图片描述

定义controller, 注入这个 生产者,就可以异步发布数据 给消费者了

在这里插入图片描述

springboot应用启动之后, 可以通过 httpclient 工具,测试一下:

在这里插入图片描述

看一下测试数据

在这里插入图片描述

具体的代码和,演示过程,后面参考尼恩录制和发布《尼恩Java面试宝典》配套视频。

Disruptor:消费数据丢失问题的分析与解决

在处理高并发、大数据量等场景时,Disruptor虽然其高性能、低延迟,然而,在使用过程中,一些用户可能会遇到消费数据丢失问题。

为了解决这些问题,我们需要深入了解Disruptor的工作原理,并采取相应的解决方案。

消费数据丢失问题的根因

消费线程丢失问题通常发生在消费者处理速度跟不上生产者的时候。

由于Disruptor采用环形队列来存储数据,当队列满时,新的数据会覆盖旧的数据。

Disruptor 中,生产和消费的index是long类型,即使100万QPS的处理速度,也需要30万年才能用完。

生产和消费的index 下标采取递增的形式。不用担心index溢出的问题。

生产和消费的index 是通过 取模, 映射到 ring 环形数据的。

在这里插入图片描述

如果消费者速度慢, 生产者快,消费跟不上,生产的index(/Sequence)就会越来越大,取模的时候,又会从0开始,去覆盖ring前面的数据,从而导致没有消费的数据被丢失。

在这里插入图片描述

从上图可以看到,只要生产者 的Sequence 大于消费者 一个ring 的数量, 就开始 覆盖旧的数据,也就是开始丢失数据。

消费数据丢失问题解决方案:

  1. 增加消费者数量:增加消费者线程的数量,可以并行处理更多的数据,提高消费速度。

    同时,合理配置消费者与生产者的数量比例,确保队列生产者 的Sequence 不会大于消费者 一个ring 的数量。

  2. 增加ring环形数组的大小:通过增加数组的大小,从而保证一个环可以存放足够多的数据,但这个可能会导致OOM。

  3. 剩余容量监控与告警:
    通过Prometheus 对 remainingCapacity剩余容量 进行实时监控,当remainingCapacity 超过80%(阈值)及时发出告警通知相关人员处理,进行微服务实例的 HPA 横向扩容,或者进行 Disruptor 队列进行动态扩容。

  4. Disruptor 动态扩容
    对 Disruptor 框架进行合理封装,从单个Disruptor 队列模式,变成 ringbuffer 数组的形式,并且可以结合nacos 或者 Zookeeper 这种发布订阅组件, 对 ringbuffer 数组 进行在线扩容。

总之,通过增加消费者数量、增加ring环形数组的大小、剩余容量监控与告警, Disruptor 动态扩容等方式,可以有效解决 消费数据丢失问题。

高级版:Spring Boot + Prometheus 监控剩余容量 大小

我们的微服务项目中使用了 spring boot,集成 prometheus。

在这里插入图片描述

我们可以通过将remainingCapacity 作为指标暴露到 prometheus 中,通过如下代码:

在这里插入图片描述

增加这个代码之后,请求 /actuator/prometheus 之后,可以看到对应的返回:

在这里插入图片描述

这样,当这个值低于20%,我们就认为这个剩余空间不够,可以扩容了。

Disruptor 如何 动态扩容?

关于 Disruptor 动态扩容的方案,可以实现一个可以扩容的子类

在这里插入图片描述

定义一个环形队列的数据

    private Disruptor<LongEvent>[] executors;

  • 1
  • 2

在构造函数中,初始化为1:

    public ResizableDisruptorProducer() {
        executors = new Disruptor[1];
        executors[0] = disruptor();
        this.ringBuffer = executors[0].getRingBuffer();

    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

发布事件的时候,通过取模的方式,确定使用executors 数组 其中的一个RingBuffer

在这里插入图片描述

在next方法,执行indx 取模和 获取 ringbuffer 的操作

在这里插入图片描述

这里参考了netty源码里边 PowerOfTwoEventExecutorChooser 反应器选择的方式,使用位运算取模,从而实现高性能取模。

什么时候扩容呢? 当监控发现超过80%的阈值后,运维会收到告警,然后可以通过naocos、Zookeeper的发布订阅, 通知微服务进行扩容。

微服务 扩容要回调下面的risize方法

在这里插入图片描述

具体的代码和,演示过程,后面参考尼恩录制和发布《尼恩Java面试宝典》配套视频。

在使用Disruptor框架时,需要根据实际情况选择合适的监控和扩容解决方案,并不断优化和调整系统配置,以满足日益增长的业务需求。

说在最后

以上的内容,如果大家能对答如流,如数家珍,基本上 面试官会被你 震惊到、吸引到。最终,让面试官爱到 “不能自已、口水直流”。offer, 也就来了。

在面试之前,建议大家系统化的刷一波 5000页《尼恩Java面试宝典》V174,在刷题过程中,如果有啥问题,大家可以来 找 40岁老架构师尼恩交流。

另外,如果没有面试机会,可以找尼恩来帮扶、领路。

尼恩已经指导了大量的就业困难的小伙伴上岸,前段时间,帮助一个40岁+就业困难小伙伴拿到了一个年薪100W的offer,小伙伴实现了 逆天改命 。

尼恩技术圣经系列PDF

……完整版尼恩技术圣经PDF集群,请找尼恩领取

《尼恩 架构笔记》《尼恩高并发三部曲》《尼恩Java面试宝典》PDF,请到下面公号【技术自由圈】取↓↓↓

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/weixin_40725706/article/detail/546065
推荐阅读
相关标签
  

闽ICP备14008679号