赞
踩
本文是基于StarRocks 2.3.3版本源码阅读总结,不同版本源码可能有较大变化,仅供参考。
由于StarRocks的be是用c++语言写的,我的c++水平一般,所以自己对源码的解读可能有不正确的地方,欢迎大佬们指正。
在StarRocks 2.2版本发布了资源隔离与pipeline引擎,在StarRocks2.3版本进行了公测,看上去这是两个完全不相关的功能,但是在实际运行时这两个是一回事,StarRocks的资源隔离方案是基于pipeline引擎实现的,如果没有打开pipeline引擎,即使配置了资源隔离相关参数,也不会有任何左右,接下来通过一些源码的阅读来更好的理解。
GlobalDriverExecutor::GlobalDriverExecutor(std::string name, std::unique_ptr<ThreadPool> thread_pool,
bool enable_resource_group)
: Base(name),
_enable_resource_group(enable_resource_group),
// 根据是否启动资源组,资源队列的类型也不一样
_driver_queue(enable_resource_group ? std::unique_ptr<DriverQueue>(std::make_unique<WorkGroupDriverQueue>())
: std::make_unique<QuerySharedDriverQueue>()),
_thread_pool(std::move(thread_pool)),
_blocked_driver_poller(new PipelineDriverPoller(_driver_queue.get())),
_exec_state_reporter(new ExecStateReporter()) {}
首先要关注的是pipeline_driver_executor.cpp这个文件的这一段代码,这是确定StarRocks的资源队列类型的方法,它的核心部分我加了中文注释。
首先它对enable_resource_group这个配置进行了判断,如果为true则生成WorkGroupDriverQueue类型任务队列。
如果为false则生成QuerySharedDriverQueue类型资源队列。
关于enable_resource_group这个配置,就是资源隔离方案的资源组开关配置,这说明了开启与关闭资源隔离,pipeline引擎会使用完全不同的两种任务队列进行分配。
首先阅读一些在不开启资源隔离时的资源队列,了解一下当有一大批查询需要进行计算时,它是以一种怎样的机制进行处理的。
首先阅读一下入队代码。
// 不开资源组时的驱动放回队列
void QuerySharedDriverQueue::put_back_from_executor(const DriverRawPtr driver) {
// QuerySharedDriverQueue::put_back_from_executor is identical to put_back.
put_back(driver);
}
这块没有什么重点,继续阅读put_back
void QuerySharedDriverQueue::put_back(const DriverRawPtr driver) { // 计算驱动优先级 int level = _compute_driver_level(driver); // 将驱动按照优先级放回队列 driver->set_driver_queue_level(level); { //锁 std::lock_guard lock(_global_mutex); // 按照优先级放回队列 _queues[level].put(driver); // 队列有内容时 ready为true driver->set_in_ready_queue(true); _cv.notify_one(); ++_num_drivers; } }
已经能看出大概的思路了,首先它会给待入队的任务计算出一个level指标,代表着driver的优先级,然后根据优先级将driver放入不同的队列中。
那么我们接下来就可以看2个地方:
优先级(level)是怎样计算的。
_queues共有多少个队列。
首先看优先级计算的方法_compute_driver_level
int QuerySharedDriverQueue::_compute_driver_level(const DriverRawPtr driver) const {
int time_spent = driver->driver_acct().get_accumulated_time_spent();
for (int i = driver->get_driver_queue_level(); i < QUEUE_SIZE; ++i) {
if (time_spent < _level_time_slices[i]) {
return i;
}
}
return QUEUE_SIZE - 1;
}
可以看出,它就是用自己的已经消耗的时间和_level_time_slices数组的每一个值进行比较,达到了某个指标就放入某个队列,如果超出了所有指标就放入最后一个队列中。
接下来看一下_level_time_slices数组
static constexpr size_t QUEUE_SIZE = 8; static constexpr double RATIO_OF_ADJACENT_QUEUE = 1.2; int64_t _level_time_slices[QUEUE_SIZE]; static constexpr int64_t LEVEL_TIME_SLICE_BASE_NS = 200'000'000L; /// QuerySharedDriverQueue. QuerySharedDriverQueue::QuerySharedDriverQueue() { double factor = 1; for (int i = QUEUE_SIZE - 1; i >= 0; --i) { // initialize factor for every sub queue, // Higher priority queues have more execution time, // so they have a larger factor. _queues[i].factor_for_normal = factor; factor *= RATIO_OF_ADJACENT_QUEUE; } int64_t time_slice = 0; for (int i = 0; i < QUEUE_SIZE; ++i) { time_slice += LEVEL_TIME_SLICE_BASE_NS * (i + 1); _level_time_slices[i] = time_slice; } }
这块有3个代码块,我为了方便阅读都放在了一起,可以看出,一共有8个队列
我就简单概述一下我对这段代码的理解,首先queues 一共有8个子队列,它的入队阈值是一个简单的等差数列求和结果Sn =n*a1 + n(n-1)d/2 其中a1和d均为200ms
就是100n^2 + 100n 单位为毫秒
取前8个值则为0.2s, 0.6s, 1.2s, 2s, 3s, 4.2s, 5.6s, 7.2s,
其次,每个字队列还有一个权重factor字段,它的计算方式为1.2^(n-1)
取前8个值则为1,1.2,1.44,1.72, 2.07, 2.49, 2.99, 3.58
权重的这个字段的作用在下面讲出队时会讲到。
入队总结
任务首先根据自身已经使用的时间与0.2s, 0.6s, 1.2s, 2s, 3s, 4.2s, 5.6s, 7.2s 做比较,属于某个区间的就进到某个队列里面去。
StatusOr<DriverRawPtr> QuerySharedDriverQueue::take() { // -1 means no candidates; else has candidate. int queue_idx = -1; double target_accu_time = 0; DriverRawPtr driver_ptr; { std::unique_lock<std::mutex> lock(_global_mutex); while (true) { if (_is_closed) { return Status::Cancelled("Shutdown"); } // Find the queue with the smallest execution time. for (int i = 0; i < QUEUE_SIZE; ++i) { // we just search for queue has element if (!_queues[i].empty()) { // 这个地方比较重要 double local_target_time = _queues[i].accu_time_after_divisor(); if (queue_idx < 0 || local_target_time < target_accu_time) { target_accu_time = local_target_time; queue_idx = i; } } } if (queue_idx >= 0) { break; } _cv.wait(lock); } // record queue's index to accumulate time for it. driver_ptr = _queues[queue_idx].take(); driver_ptr->set_in_ready_queue(false); --_num_drivers; } // next pipeline driver to execute. return driver_ptr; }
出队的逻辑也很简单,首先循环前面提到的8个队列,找到不为空的队列,然后用accu_time_after_divisor这个值做比较,找到最小的值,从最小的值代表的队列中,取出一个任务进行出列操作。
接下来可以看一下accu_time_after_divisor值的计算规则。
double accu_time_after_divisor() { return _accu_consume_time.load() / factor_for_normal; }
void update_accu_time(const DriverRawPtr driver) {
_accu_consume_time.fetch_add(driver->driver_acct().get_last_time_spent());
}
可以看到这个函数非常简单,就2个值相除,分子为该队列总用时,分母为咱们前面提到的权重。
在不开启资源组的场景下,根据任务的已执行时间,分为了8个资源队列进行入队操作,任务在出队时会在加权场景下尽量平均每个队列的总计算时间,让大的计算任务拥有更多的时间进行计算,同时由于有限级的操作,让小的计算任务也能及时获得响应。
弊端:
8个资源队列是的分配阈值与权重是固定的,并不可以动态修改,在我的使用场景下,99%的计算任务会在200ms以内完成,此时该资源队列的出队权重会被拉的非常低,此时如果有1条大的慢查询接入,该慢查询在进入后面的资源队列时会有非常高的优先级进行运行,直至计算完成,小查询由于过低的权重只能一直等待资源直至自身变成慢查询。
接下来阅读一下开启了资源隔离之后的任务队列源码。
// 开启资源组时驱动放回队列 void WorkGroupDriverQueue::put_back_from_executor(const DriverRawPtr driver) { // 先加锁 std::lock_guard lock(_global_mutex); _put_back(driver); } void WorkGroupDriverQueue::_put_back(const DriverRawPtr driver) { // 将驱动返回工作组的队列中 auto* wg_entity = driver->workgroup()->driver_sched_entity(); wg_entity->set_in_queue(this); wg_entity->queue()->put_back(driver); // if (_wg_entities.find(wg_entity) == _wg_entities.end()) { _enqueue_workgroup(wg_entity); } _cv.notify_one(); } void WorkGroupDriverQueue::_enqueue_workgroup(workgroup::WorkGroupDriverSchedEntity* wg_entity) { _sum_cpu_limit += wg_entity->cpu_limit(); // The runtime needn't be adjusted for the workgroup put back from executor thread, // because it has updated before executor thread put the workgroup back by update_statistics(). if constexpr (!from_executor) { if (auto* min_wg_entity = _min_wg_entity.load(); min_wg_entity != nullptr) { // The workgroup maybe leaves for a long time, which results in that the runtime of it // may be much smaller than the other workgroups. If the runtime isn't adjusted, the others // will starve. Therefore, the runtime is adjusted according the minimum vruntime in _ready_wgs, // and give it half of ideal runtime in a schedule period as compensation. int64_t new_vruntime_ns = std::min(min_wg_entity->vruntime_ns() - _ideal_runtime_ns(wg_entity) / 2, min_wg_entity->runtime_ns() / int64_t(wg_entity->cpu_limit())); int64_t diff_vruntime_ns = new_vruntime_ns - wg_entity->vruntime_ns(); if (diff_vruntime_ns > 0) { DCHECK(_wg_entities.find(wg_entity) == _wg_entities.end()); wg_entity->incr_runtime_ns(diff_vruntime_ns * wg_entity->cpu_limit()); } } } _wg_entities.emplace(wg_entity); _update_min_wg(); }
可以看到入队的逻辑非常简单,直接找到待入队的任务所属的work group,然后将任务放到该work group的队列里。
其中_enqueue_workgroup 方法理解的不是很清楚,不过根据源码注释的意思,应该就是如果一个work group长时间没处理过任务,那么一旦它来了一条sql需要计算,这条sql的优先级可能非常高,导致其他work group拿不到资源,因此需要调整一下这种场景下的任务优先级。
处理方式就是强行增加这个work group 的累积运行时间,让它的任务优先级降低一些。
StatusOr WorkGroupDriverQueue::take() { std::unique_lock lock(_global_mutex); workgroup::WorkGroupDriverSchedEntity* wg_entity = nullptr; while (wg_entity == nullptr) { if (_is_closed) { return Status::Cancelled("Shutdown"); } _update_bandwidth_control_period(); if (_wg_entities.empty()) { _cv.wait(lock); } else if (wg_entity = _take_next_wg(); wg_entity == nullptr) { int64_t cur_ns = MonotonicNanos(); int64_t sleep_ns = _bandwidth_control_period_end_ns - cur_ns; if (sleep_ns <= 0) { continue; } // All the ready tasks are throttled, so wait until the new period or a new task comes. _cv.wait_for(lock, std::chrono::nanoseconds(sleep_ns)); } } // 如果这个资源组只有1个待计算的任务了,那么这个资源组就从资源组列表出队,然后把最后的这个任务给计算了吧. // If wg only contains one ready driver, it will be not ready anymore // after taking away the only one driver. if (wg_entity->queue()->size() == 1) { _dequeue_workgroup(wg_entity); } return wg_entity->queue()->take(); }
出队函数会循环所有work group
有3个地方需要关注
_update_bandwidth_control_period() 这个是更新带宽占用的,看看是否还有可用的带宽用来处理资源
_take_next_wg() 找到有计算任务的资源队列
_dequeue_workgroup 如果整个work_group队列已经空了,则把整个work group从work group的队列中踢出。
void WorkGroupDriverQueue::_update_bandwidth_control_period() { int64_t cur_ns = MonotonicNanos(); if (_bandwidth_control_period_end_ns == 0 || _bandwidth_control_period_end_ns <= cur_ns) { _bandwidth_control_period_end_ns = cur_ns + BANDWIDTH_CONTROL_PERIOD_NS; int64_t bandwidth_quota = _bandwidth_quota_ns(); int64_t bandwidth_usage = _bandwidth_usage_ns.load(); if (bandwidth_usage <= bandwidth_quota) { _bandwidth_usage_ns = 0; } else if (bandwidth_usage < 2 * bandwidth_quota) { _bandwidth_usage_ns -= bandwidth_quota; } else { _bandwidth_usage_ns = bandwidth_quota; } } } int64_t WorkGroupDriverQueue::_bandwidth_quota_ns() const { return BANDWIDTH_CONTROL_PERIOD_NS * workgroup::WorkGroupManager::instance()->normal_workgroup_cpu_hard_limit(); } size_t WorkGroupManager::normal_workgroup_cpu_hard_limit() const { static int num_hardware_cores = std::thread::hardware_concurrency(); return std::max<int>(1, num_hardware_cores - _rt_cpu_limit); }
更新cpu带宽使用量的函数,如果当前cpu使用量小于限额,则初始化为0,随便用
如果已经超过限额,但没超过2倍,则只申请到2倍-当前已使用的剩余部分
如果超过了2倍,则默认已经分配到达了上限,无法再申请到新的cpu资源
BANDWIDTH_CONTROL_PERIOD_NS 的值为100ms
_bandwidth_quota_ns 的值为100ms * normal_workgroup_cpu_hard_limit()
normal_workgroup_cpu_hard_limit 为系统核数 - 为了short query预留的cpu核数
更新cpu带宽使用量的函数,如果当前cpu使用量小于限额,则初始化为0,随便用
如果已经超过限额,但没超过2倍,则只申请到2倍-当前已使用的剩余部分
如果超过了2倍,则默认已经分配到达了上限,无法再申请到新的cpu资源
BANDWIDTH_CONTROL_PERIOD_NS 的值为100ms
_bandwidth_quota_ns 的值为100ms * normal_workgroup_cpu_hard_limit()
normal_workgroup_cpu_hard_limit 为系统核数 - 为了short query预留的cpu核数
_take_next_wg 函数: 循环所有的work group,依次判断work group是否属于节流状态,找到第一个不是节流状态的wrok group并出队。
节流状态的判断:
work group是否是short query,short query为实时资源组,它的特点就是优先级最高,有了就直接查询,性能越高越好,因此不会节流
判断是否有short query资源组的任务运行,如果没有则不需要节流
除此之外,在根据带宽进行校验,判断已使用的带宽是否大于限额带宽,如果大于了,则处于节流状态。
节流函数可以和前面的_update_bandwidth_control_period函数配合阅读,如果某个workgroup占用了过多的资源,第一次进入这个函数时,由于它使用的资源过多,会导致它被节流,申请不到任何资源,资源倾斜给了其他work group,但是第二次再次进入这个函数时,它的历史资源占用就可以被初始化为0重新申请到资源了。
在WorkGroupDriverQueue场景下,各查询任务都是在自己的work group里面入队和出队的。
只要该work group还有剩余资源,计算任务就可以运行。
在出队函数里看到了关于short query的高优先级远离,也看到了cpu和节流的一些配置,但是没有看到在资源组自定义的内存和cpu是如何工作的,可能需要再深入学习在具体执行的时候的代码。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。