当前位置:   article > 正文

StarRocks源码阅读系列(1) pipeline引擎资源队列与资源隔离

starrocks源码阅读

StarRocks源码阅读系列(1) pipeline引擎资源队列与资源隔离

前言

本文是基于StarRocks 2.3.3版本源码阅读总结,不同版本源码可能有较大变化,仅供参考。
由于StarRocks的be是用c++语言写的,我的c++水平一般,所以自己对源码的解读可能有不正确的地方,欢迎大佬们指正。

pipeline引擎与资源隔离

在StarRocks 2.2版本发布了资源隔离与pipeline引擎,在StarRocks2.3版本进行了公测,看上去这是两个完全不相关的功能,但是在实际运行时这两个是一回事,StarRocks的资源隔离方案是基于pipeline引擎实现的,如果没有打开pipeline引擎,即使配置了资源隔离相关参数,也不会有任何左右,接下来通过一些源码的阅读来更好的理解。

GlobalDriverExecutor::GlobalDriverExecutor(std::string name, std::unique_ptr<ThreadPool> thread_pool,
                                           bool enable_resource_group)
        : Base(name),
          _enable_resource_group(enable_resource_group),

          // 根据是否启动资源组,资源队列的类型也不一样
          _driver_queue(enable_resource_group ? std::unique_ptr<DriverQueue>(std::make_unique<WorkGroupDriverQueue>())
                                              : std::make_unique<QuerySharedDriverQueue>()),
          _thread_pool(std::move(thread_pool)),
          _blocked_driver_poller(new PipelineDriverPoller(_driver_queue.get())),
          _exec_state_reporter(new ExecStateReporter()) {}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11

首先要关注的是pipeline_driver_executor.cpp这个文件的这一段代码,这是确定StarRocks的资源队列类型的方法,它的核心部分我加了中文注释。
首先它对enable_resource_group这个配置进行了判断,如果为true则生成WorkGroupDriverQueue类型任务队列。
如果为false则生成QuerySharedDriverQueue类型资源队列。
关于enable_resource_group这个配置,就是资源隔离方案的资源组开关配置,这说明了开启与关闭资源隔离,pipeline引擎会使用完全不同的两种任务队列进行分配。

QuerySharedDriverQueue

首先阅读一些在不开启资源隔离时的资源队列,了解一下当有一大批查询需要进行计算时,它是以一种怎样的机制进行处理的。

入队

首先阅读一下入队代码。

// 不开资源组时的驱动放回队列
void QuerySharedDriverQueue::put_back_from_executor(const DriverRawPtr driver) {
    // QuerySharedDriverQueue::put_back_from_executor is identical to put_back.
    put_back(driver);
}
  • 1
  • 2
  • 3
  • 4
  • 5

这块没有什么重点,继续阅读put_back

void QuerySharedDriverQueue::put_back(const DriverRawPtr driver) {
    // 计算驱动优先级
    int level = _compute_driver_level(driver);
    // 将驱动按照优先级放回队列
    driver->set_driver_queue_level(level);
    {
        //锁
        std::lock_guard lock(_global_mutex);
        // 按照优先级放回队列
        _queues[level].put(driver);
        // 队列有内容时 ready为true
        driver->set_in_ready_queue(true);
        _cv.notify_one();
        ++_num_drivers;
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16

已经能看出大概的思路了,首先它会给待入队的任务计算出一个level指标,代表着driver的优先级,然后根据优先级将driver放入不同的队列中。
那么我们接下来就可以看2个地方:
优先级(level)是怎样计算的。
_queues共有多少个队列。
首先看优先级计算的方法_compute_driver_level

int QuerySharedDriverQueue::_compute_driver_level(const DriverRawPtr driver) const {
    int time_spent = driver->driver_acct().get_accumulated_time_spent();
    for (int i = driver->get_driver_queue_level(); i < QUEUE_SIZE; ++i) {
        if (time_spent < _level_time_slices[i]) {
            return i;
        }
    }

    return QUEUE_SIZE - 1;
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

可以看出,它就是用自己的已经消耗的时间和_level_time_slices数组的每一个值进行比较,达到了某个指标就放入某个队列,如果超出了所有指标就放入最后一个队列中。
接下来看一下_level_time_slices数组

static constexpr size_t QUEUE_SIZE = 8;
static constexpr double RATIO_OF_ADJACENT_QUEUE = 1.2;

int64_t _level_time_slices[QUEUE_SIZE];
static constexpr int64_t LEVEL_TIME_SLICE_BASE_NS = 200'000'000L;


/// QuerySharedDriverQueue.
QuerySharedDriverQueue::QuerySharedDriverQueue() {
    double factor = 1;
    for (int i = QUEUE_SIZE - 1; i >= 0; --i) {
        // initialize factor for every sub queue,
        // Higher priority queues have more execution time,
        // so they have a larger factor.
        _queues[i].factor_for_normal = factor;
        factor *= RATIO_OF_ADJACENT_QUEUE;
    }

    int64_t time_slice = 0;
    for (int i = 0; i < QUEUE_SIZE; ++i) {
        time_slice += LEVEL_TIME_SLICE_BASE_NS * (i + 1);
        _level_time_slices[i] = time_slice;
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24

这块有3个代码块,我为了方便阅读都放在了一起,可以看出,一共有8个队列
我就简单概述一下我对这段代码的理解,首先queues 一共有8个子队列,它的入队阈值是一个简单的等差数列求和结果Sn =n*a1 + n(n-1)d/2 其中a1和d均为200ms
就是100n^2 + 100n 单位为毫秒
取前8个值则为0.2s, 0.6s, 1.2s, 2s, 3s, 4.2s, 5.6s, 7.2s,
其次,每个字队列还有一个权重factor字段,它的计算方式为1.2^(n-1)
取前8个值则为1,1.2,1.44,1.72, 2.07, 2.49, 2.99, 3.58
权重的这个字段的作用在下面讲出队时会讲到。

入队总结
任务首先根据自身已经使用的时间与0.2s, 0.6s, 1.2s, 2s, 3s, 4.2s, 5.6s, 7.2s 做比较,属于某个区间的就进到某个队列里面去。

出队


StatusOr<DriverRawPtr> QuerySharedDriverQueue::take() {
    // -1 means no candidates; else has candidate.
    int queue_idx = -1;
    double target_accu_time = 0;
    DriverRawPtr driver_ptr;

    {
        std::unique_lock<std::mutex> lock(_global_mutex);
        while (true) {
            if (_is_closed) {
                return Status::Cancelled("Shutdown");
            }

            // Find the queue with the smallest execution time.
            for (int i = 0; i < QUEUE_SIZE; ++i) {
                // we just search for queue has element
                if (!_queues[i].empty()) {
                  // 这个地方比较重要
                    double local_target_time = _queues[i].accu_time_after_divisor();
                    if (queue_idx < 0 || local_target_time < target_accu_time) {
                        target_accu_time = local_target_time;
                        queue_idx = i;
                    }
                }
            }

            if (queue_idx >= 0) {
                break;
            }
            _cv.wait(lock);
        }
        // record queue's index to accumulate time for it.
        driver_ptr = _queues[queue_idx].take();
        driver_ptr->set_in_ready_queue(false);

        --_num_drivers;
    }

    // next pipeline driver to execute.
    return driver_ptr;
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42

出队的逻辑也很简单,首先循环前面提到的8个队列,找到不为空的队列,然后用accu_time_after_divisor这个值做比较,找到最小的值,从最小的值代表的队列中,取出一个任务进行出列操作。
接下来可以看一下accu_time_after_divisor值的计算规则。

double accu_time_after_divisor() { return _accu_consume_time.load() / factor_for_normal; }


void update_accu_time(const DriverRawPtr driver) {
  _accu_consume_time.fetch_add(driver->driver_acct().get_last_time_spent());
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

可以看到这个函数非常简单,就2个值相除,分子为该队列总用时,分母为咱们前面提到的权重。

总结

在不开启资源组的场景下,根据任务的已执行时间,分为了8个资源队列进行入队操作,任务在出队时会在加权场景下尽量平均每个队列的总计算时间,让大的计算任务拥有更多的时间进行计算,同时由于有限级的操作,让小的计算任务也能及时获得响应。
弊端:
8个资源队列是的分配阈值与权重是固定的,并不可以动态修改,在我的使用场景下,99%的计算任务会在200ms以内完成,此时该资源队列的出队权重会被拉的非常低,此时如果有1条大的慢查询接入,该慢查询在进入后面的资源队列时会有非常高的优先级进行运行,直至计算完成,小查询由于过低的权重只能一直等待资源直至自身变成慢查询。

WorkGroupDriverQueue

接下来阅读一下开启了资源隔离之后的任务队列源码。

入队

// 开启资源组时驱动放回队列
void WorkGroupDriverQueue::put_back_from_executor(const DriverRawPtr driver) {
    // 先加锁
    std::lock_guard lock(_global_mutex);
    _put_back(driver);
}


void WorkGroupDriverQueue::_put_back(const DriverRawPtr driver) {
    // 将驱动返回工作组的队列中
    auto* wg_entity = driver->workgroup()->driver_sched_entity();
    wg_entity->set_in_queue(this);
    wg_entity->queue()->put_back(driver);

    //
    if (_wg_entities.find(wg_entity) == _wg_entities.end()) {
        _enqueue_workgroup(wg_entity);
    }

    _cv.notify_one();
}

void WorkGroupDriverQueue::_enqueue_workgroup(workgroup::WorkGroupDriverSchedEntity* wg_entity) {
    _sum_cpu_limit += wg_entity->cpu_limit();
    // The runtime needn't be adjusted for the workgroup put back from executor thread,
    // because it has updated before executor thread put the workgroup back by update_statistics().
    if constexpr (!from_executor) {
        if (auto* min_wg_entity = _min_wg_entity.load(); min_wg_entity != nullptr) {
            // The workgroup maybe leaves for a long time, which results in that the runtime of it
            // may be much smaller than the other workgroups. If the runtime isn't adjusted, the others
            // will starve. Therefore, the runtime is adjusted according the minimum vruntime in _ready_wgs,
            // and give it half of ideal runtime in a schedule period as compensation.
            int64_t new_vruntime_ns = std::min(min_wg_entity->vruntime_ns() - _ideal_runtime_ns(wg_entity) / 2,
                                               min_wg_entity->runtime_ns() / int64_t(wg_entity->cpu_limit()));
            int64_t diff_vruntime_ns = new_vruntime_ns - wg_entity->vruntime_ns();
            if (diff_vruntime_ns > 0) {
                DCHECK(_wg_entities.find(wg_entity) == _wg_entities.end());
                wg_entity->incr_runtime_ns(diff_vruntime_ns * wg_entity->cpu_limit());
            }
        }
    }

    _wg_entities.emplace(wg_entity);
    _update_min_wg();
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45

可以看到入队的逻辑非常简单,直接找到待入队的任务所属的work group,然后将任务放到该work group的队列里。
其中_enqueue_workgroup 方法理解的不是很清楚,不过根据源码注释的意思,应该就是如果一个work group长时间没处理过任务,那么一旦它来了一条sql需要计算,这条sql的优先级可能非常高,导致其他work group拿不到资源,因此需要调整一下这种场景下的任务优先级。
处理方式就是强行增加这个work group 的累积运行时间,让它的任务优先级降低一些。

出队


StatusOr WorkGroupDriverQueue::take() {
    std::unique_lock lock(_global_mutex);

    workgroup::WorkGroupDriverSchedEntity* wg_entity = nullptr;
    while (wg_entity == nullptr) {
        if (_is_closed) {
            return Status::Cancelled("Shutdown");
        }

        _update_bandwidth_control_period();

        if (_wg_entities.empty()) {
            _cv.wait(lock);
        } else if (wg_entity = _take_next_wg(); wg_entity == nullptr) {
            int64_t cur_ns = MonotonicNanos();
            int64_t sleep_ns = _bandwidth_control_period_end_ns - cur_ns;
            if (sleep_ns <= 0) {
                continue;
            }

            // All the ready tasks are throttled, so wait until the new period or a new task comes.
            _cv.wait_for(lock, std::chrono::nanoseconds(sleep_ns));
        }
    }

    // 如果这个资源组只有1个待计算的任务了,那么这个资源组就从资源组列表出队,然后把最后的这个任务给计算了吧.
    // If wg only contains one ready driver, it will be not ready anymore
    // after taking away the only one driver.
    if (wg_entity->queue()->size() == 1) {
        _dequeue_workgroup(wg_entity);
    }

    return wg_entity->queue()->take();
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36

出队函数会循环所有work group
有3个地方需要关注
_update_bandwidth_control_period() 这个是更新带宽占用的,看看是否还有可用的带宽用来处理资源
_take_next_wg() 找到有计算任务的资源队列
_dequeue_workgroup 如果整个work_group队列已经空了,则把整个work group从work group的队列中踢出。


void WorkGroupDriverQueue::_update_bandwidth_control_period() {
    int64_t cur_ns = MonotonicNanos();
    if (_bandwidth_control_period_end_ns == 0 || _bandwidth_control_period_end_ns <= cur_ns) {
        _bandwidth_control_period_end_ns = cur_ns + BANDWIDTH_CONTROL_PERIOD_NS;

        int64_t bandwidth_quota = _bandwidth_quota_ns();
        int64_t bandwidth_usage = _bandwidth_usage_ns.load();
        if (bandwidth_usage <= bandwidth_quota) {
            _bandwidth_usage_ns = 0;
        } else if (bandwidth_usage < 2 * bandwidth_quota) {
            _bandwidth_usage_ns -= bandwidth_quota;
        } else {
            _bandwidth_usage_ns = bandwidth_quota;
        }
    }
}


int64_t WorkGroupDriverQueue::_bandwidth_quota_ns() const {
    return BANDWIDTH_CONTROL_PERIOD_NS * workgroup::WorkGroupManager::instance()->normal_workgroup_cpu_hard_limit();
}

size_t WorkGroupManager::normal_workgroup_cpu_hard_limit() const {
    static int num_hardware_cores = std::thread::hardware_concurrency();
    return std::max<int>(1, num_hardware_cores - _rt_cpu_limit);
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27

更新cpu带宽使用量的函数,如果当前cpu使用量小于限额,则初始化为0,随便用
如果已经超过限额,但没超过2倍,则只申请到2倍-当前已使用的剩余部分
如果超过了2倍,则默认已经分配到达了上限,无法再申请到新的cpu资源

BANDWIDTH_CONTROL_PERIOD_NS 的值为100ms
_bandwidth_quota_ns 的值为100ms * normal_workgroup_cpu_hard_limit()
normal_workgroup_cpu_hard_limit 为系统核数 - 为了short query预留的cpu核数

更新cpu带宽使用量的函数,如果当前cpu使用量小于限额,则初始化为0,随便用
如果已经超过限额,但没超过2倍,则只申请到2-当前已使用的剩余部分
如果超过了2倍,则默认已经分配到达了上限,无法再申请到新的cpu资源

BANDWIDTH_CONTROL_PERIOD_NS 的值为100ms
_bandwidth_quota_ns 的值为100ms * normal_workgroup_cpu_hard_limit()
normal_workgroup_cpu_hard_limit 为系统核数 - 为了short query预留的cpu核数


  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

_take_next_wg 函数: 循环所有的work group,依次判断work group是否属于节流状态,找到第一个不是节流状态的wrok group并出队。
节流状态的判断:
work group是否是short query,short query为实时资源组,它的特点就是优先级最高,有了就直接查询,性能越高越好,因此不会节流
判断是否有short query资源组的任务运行,如果没有则不需要节流
除此之外,在根据带宽进行校验,判断已使用的带宽是否大于限额带宽,如果大于了,则处于节流状态。

节流函数可以和前面的_update_bandwidth_control_period函数配合阅读,如果某个workgroup占用了过多的资源,第一次进入这个函数时,由于它使用的资源过多,会导致它被节流,申请不到任何资源,资源倾斜给了其他work group,但是第二次再次进入这个函数时,它的历史资源占用就可以被初始化为0重新申请到资源了。

总结

在WorkGroupDriverQueue场景下,各查询任务都是在自己的work group里面入队和出队的。
只要该work group还有剩余资源,计算任务就可以运行。
在出队函数里看到了关于short query的高优先级远离,也看到了cpu和节流的一些配置,但是没有看到在资源组自定义的内存和cpu是如何工作的,可能需要再深入学习在具体执行的时候的代码。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/喵喵爱编程/article/detail/993010
推荐阅读
相关标签
  

闽ICP备14008679号