Recently, business needs led us to set up a StarRocks 2.5.3 cluster. While using it we found that some configuration items we had tuned on version 2.3 no longer work well on 2.5, so it seemed time to take a fresh look at the compaction mechanism in 2.5.
This article is based on a reading of the StarRocks 2.5.3 source code; for other versions it is for reference only.
void CompactionManager::_schedule() {
    LOG(INFO) << "start compaction scheduler";
    while (!_stop.load(std::memory_order_consume)) {
        ++_round;
        _wait_to_run();
        std::shared_ptr<CompactionTask> compaction_task = _try_get_next_compaction_task();
        if (!compaction_task) {
            std::unique_lock<std::mutex> lk(_mutex);
            _cv.wait_for(lk, 1000ms);
        } else {
            if (compaction_task->compaction_type() == CompactionType::BASE_COMPACTION) {
                StarRocksMetrics::instance()->tablet_base_max_compaction_score.set_value(
                        compaction_task->compaction_score());
            } else {
                StarRocksMetrics::instance()->tablet_cumulative_max_compaction_score.set_value(
                        compaction_task->compaction_score());
            }

            compaction_task->set_task_id(next_compaction_task_id());
            LOG(INFO) << "submit task to compaction pool"
                      << ", task_id:" << compaction_task->task_id()
                      << ", tablet_id:" << compaction_task->tablet()->tablet_id()
                      << ", compaction_type:" << starrocks::to_string(compaction_task->compaction_type())
                      << ", compaction_score:" << compaction_task->compaction_score()
                      << " for round:" << _round
                      << ", task_queue_size:" << candidates_size();
            auto st = _compaction_pool->submit_func([compaction_task] { compaction_task->start(); });
            if (!st.ok()) {
                LOG(WARNING) << "submit compaction task " << compaction_task->task_id()
                             << " to compaction pool failed. status:" << st.to_string();
                compaction_task->tablet()->reset_compaction();
                CompactionCandidate candidate;
                candidate.tablet = compaction_task->tablet();
                update_candidates({candidate});
            }
        }
    }
}
The first thing to notice is the _schedule function in compaction_manager.cpp (shown above). As the name suggests, it is the scheduler for all compactions and submits pending compaction tasks to the compaction pool. It obtains the next pending task through the _try_get_next_compaction_task() function.
std::shared_ptr<CompactionTask> CompactionManager::_try_get_next_compaction_task() {
    VLOG(2) << "try to get next qualified tablet for round:" << _round
            << ", current candidates size:" << candidates_size();
    CompactionCandidate compaction_candidate;
    std::shared_ptr<CompactionTask> compaction_task = nullptr;

    if (pick_candidate(&compaction_candidate)) {
        compaction_task = compaction_candidate.tablet->create_compaction_task();
    }

    return compaction_task;
}
This function is fairly short; it shows that the compaction candidate is selected by the pick_candidate function.
bool CompactionManager::pick_candidate(CompactionCandidate* candidate) {
    std::lock_guard lg(_candidates_mutex);
    if (_compaction_candidates.empty()) {
        return false;
    }
    auto iter = _compaction_candidates.begin();
    while (iter != _compaction_candidates.end()) {
        if (_check_precondition(*iter)) {
            *candidate = *iter;
            _compaction_candidates.erase(iter);
            return true;
        }
        iter++;
    }
    return false;
}
Next, look at the pick_candidate function. It works on the _compaction_candidates queue: it iterates over the queue and calls _check_precondition on each candidate, and the first candidate that satisfies the compaction preconditions is removed from the queue and the function returns true.
Two things deserve attention here: first, the design of the _compaction_candidates queue, i.e. how tablets waiting for compaction are enqueued; second, the decision logic of the _check_precondition function.
void CompactionManager::update_candidates(std::vector<CompactionCandidate> candidates) {
    size_t erase_num = 0;
    {
        std::lock_guard lg(_candidates_mutex);
        // TODO(meegoo): This is very inefficient to implement, just to fix bug, it will refactor later
        for (auto iter = _compaction_candidates.begin(); iter != _compaction_candidates.end();) {
            bool has_erase = false;
            for (auto& candidate : candidates) {
                if (candidate.tablet->tablet_id() == iter->tablet->tablet_id()) {
                    iter = _compaction_candidates.erase(iter);
                    erase_num++;
                    has_erase = true;
                    break;
                }
            }
            if (!has_erase) {
                iter++;
            }
        }
        for (auto& candidate : candidates) {
            if (candidate.tablet->enable_compaction()) {
                VLOG(1) << "update candidate " << candidate.tablet->tablet_id() << " type "
                        << starrocks::to_string(candidate.type) << " score " << candidate.score;
                _compaction_candidates.emplace(std::move(candidate));
            }
        }
    }
    _notify();
}
This is the enqueue function of the candidate queue. It first erases any existing entries for the given tablets, then loops over the candidates once and enqueues every tablet that has compaction enabled.
There is a TODO note in this code; according to the comment, this part will be refactored in the future.
Reading this far raises two more questions that require digging further:
how the candidates queue is maintained;
how update_candidates gets called.
void CompactionManager::update_tablet(TabletSharedPtr tablet) {
    if (_disable_update_tablet) {
        return;
    }
    VLOG(1) << "update tablet " << tablet->tablet_id();
    if (tablet->need_compaction()) {
        CompactionCandidate candidate;
        candidate.tablet = tablet;
        candidate.score = tablet->compaction_score();
        candidate.type = tablet->compaction_type();
        update_candidates({candidate});
    }
}
// Base compaction may be started by time(once every day now)
// Compaction checker will check whether to schedule base compaction for tablets
size_t StorageEngine::_compaction_check_one_round() {
    size_t batch_size = 1000;
    int batch_sleep_time_ms = 1000;
    std::vector<TabletSharedPtr> tablets;
    tablets.reserve(batch_size);
    size_t tablets_num_checked = 0;
    while (!bg_worker_stopped()) {
        bool finished = _tablet_manager->get_next_batch_tablets(batch_size, &tablets);
        for (auto& tablet : tablets) {
            _compaction_manager->update_tablet(tablet);
        }
        tablets_num_checked += tablets.size();
        tablets.clear();
        if (finished) {
            break;
        }
        std::unique_lock<std::mutex> lk(_checker_mutex);
        _checker_cv.wait_for(lk, std::chrono::milliseconds(batch_sleep_time_ms),
                             [this] { return bg_worker_stopped(); });
    }
    return tablets_num_checked;
}
void StorageEngine::compaction_check() {
    int checker_one_round_sleep_time_s = 1800;
    while (!bg_worker_stopped()) {
        MonotonicStopWatch stop_watch;
        stop_watch.start();
        LOG(INFO) << "start to check compaction";
        size_t num = _compaction_check_one_round();
        stop_watch.stop();
        LOG(INFO) << num << " tablets checked. time elapse:" << stop_watch.elapsed_time() / 1000000000
                  << " seconds."
                  << " compaction checker will be scheduled again in " << checker_one_round_sleep_time_s
                  << " seconds";
        std::unique_lock<std::mutex> lk(_checker_mutex);
        _checker_cv.wait_for(lk, std::chrono::seconds(checker_one_round_sleep_time_s),
                             [this] { return bg_worker_stopped(); });
    }
}
The calling mechanism of update_candidates is made up of the three functions above. A checker thread periodically runs compaction_check (every 1800 seconds by default), which calls _compaction_check_one_round; that function walks through all tablets of the storage engine in batches and calls update_tablet on each one; update_tablet calls need_compaction, and if the tablet needs compaction it calls update_candidates to put the tablet into the candidate queue.
bool Tablet::need_compaction() {
    std::lock_guard lock(_compaction_task_lock);
    if (_compaction_task == nullptr && _enable_compaction) {
        _compaction_context->type = INVALID_COMPACTION;
        if (_compaction_context != nullptr &&
            _compaction_context->policy->need_compaction(&_compaction_context->score, &_compaction_context->type)) {
            // if there is running task, return false
            // else, return true
            return true;
        }
    }
    return false;
}
Inside need_compaction there are two different handling paths, depending on whether enable_size_tiered_compaction_strategy is enabled (the policy selection is sketched below).
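The branch is not visible in Tablet::need_compaction itself, which simply delegates to _compaction_context->policy. As a minimal sketch only, assuming the policy object is chosen by a factory when the compaction context is built (the function name make_compaction_policy and the constructor arguments are hypothetical; the config flag and the two policy classes do appear in the source quoted in this article), the selection would look roughly like this:
// Hypothetical sketch -- not the actual StarRocks factory code.
// It only illustrates how the two policies relate to the config flag.
std::unique_ptr<CompactionPolicy> make_compaction_policy(Tablet* tablet) {
    if (config::enable_size_tiered_compaction_strategy) {
        // 2.5's new size-tiered strategy
        return std::make_unique<SizeTieredCompactionPolicy>(tablet);
    }
    // pre-2.5 default cumulative/base strategy
    return std::make_unique<DefaultCumulativeBaseCompactionPolicy>(tablet);
}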
bool SizeTieredCompactionPolicy::need_compaction(double* score, CompactionType* type) {
    if (_tablet->tablet_state() != TABLET_RUNNING) {
        return false;
    }
    bool force_base_compaction = false;
    if (type && *type == BASE_COMPACTION) {
        force_base_compaction = true;
    }
    auto st = _pick_rowsets_to_size_tiered_compact(force_base_compaction, &_rowsets, &_score);
    if (st.ok()) {
        if (_rowsets[0]->start_version() == 0) {
            _compaction_type = BASE_COMPACTION;
        } else {
            _compaction_type = CUMULATIVE_COMPACTION;
        }
        *score = _score;
    } else {
        _compaction_type = INVALID_COMPACTION;
        *score = 0;
    }
    *type = _compaction_type;
    return _compaction_type != INVALID_COMPACTION;
}
This is the new mechanism in 2.5. Its core logic lives in the _pick_rowsets_to_size_tiered_compact function.
Status SizeTieredCompactionPolicy::_pick_rowsets_to_size_tiered_compact(bool force_base_compaction,
                                                                        std::vector<RowsetSharedPtr>* input_rowsets,
                                                                        double* score) {
    input_rowsets->clear();
    *score = 0;
    std::vector<RowsetSharedPtr> candidate_rowsets;
    _tablet->pick_all_candicate_rowsets(&candidate_rowsets);

    if (candidate_rowsets.size() <= 1) {
        return Status::NotFound("compaction no suitable version error.");
    }

    std::sort(candidate_rowsets.begin(), candidate_rowsets.end(), Rowset::comparator);

    if (!force_base_compaction && candidate_rowsets.size() == 2 && candidate_rowsets[0]->end_version() == 1 &&
        candidate_rowsets[1]->rowset_meta()->get_compaction_score() <= 1) {
        // the tablet is with rowset: [0-1], [2-y]
        // and [0-1] has no data. in this situation, no need to do base compaction.
        return Status::NotFound("compaction no suitable version error.");
    }

    if (time(nullptr) - candidate_rowsets[0]->creation_time() >
        config::base_compaction_interval_seconds_since_last_operation) {
        force_base_compaction = true;
    }

    struct SizeTieredLevel {
        SizeTieredLevel(std::vector<RowsetSharedPtr> r, int64_t s, int64_t l, int64_t t, double sc)
                : rowsets(std::move(r)), segment_num(s), level_size(l), total_size(t), score(sc) {}

        std::vector<RowsetSharedPtr> rowsets;
        int64_t segment_num;
        int64_t level_size;
        int64_t total_size;
        double score;
    };

    struct LevelComparator {
        bool operator()(const SizeTieredLevel* left, const SizeTieredLevel* right) const {
            return left->score > right->score ||
                   (left->score == right->score &&
                    left->rowsets[0]->start_version() > right->rowsets[0]->start_version());
        }
    };

    std::vector<std::unique_ptr<SizeTieredLevel>> order_levels;
    std::set<SizeTieredLevel*, LevelComparator> priority_levels;
    std::vector<RowsetSharedPtr> transient_rowsets;
    size_t segment_num = 0;
    int64_t level_multiple = config::size_tiered_level_multiple;
    auto keys_type = _tablet->keys_type();

    bool reached_max_version = false;
    if (candidate_rowsets.size() > config::tablet_max_versions / 10 * 9) {
        reached_max_version = true;
    }

    int64_t level_size = -1;
    int64_t total_size = 0;
    int64_t prev_end_version = -1;
    for (auto rowset : candidate_rowsets) {
        int64_t rowset_size = rowset->data_disk_size() > 0 ? rowset->data_disk_size() : 1;
        if (level_size == -1) {
            level_size = rowset_size < _max_level_size ? rowset_size : _max_level_size;
            total_size = 0;
        }

        // meet missed version
        if (rowset->start_version() != prev_end_version + 1) {
            if (!transient_rowsets.empty()) {
                auto level = std::make_unique<SizeTieredLevel>(
                        transient_rowsets, segment_num, level_size, total_size,
                        _cal_compaction_score(segment_num, level_size, total_size, keys_type, reached_max_version));
                priority_levels.emplace(level.get());
                order_levels.emplace_back(std::move(level));
            }
            level_size = rowset_size < _max_level_size ? rowset_size : _max_level_size;
            segment_num = 0;
            total_size = 0;
            transient_rowsets.clear();
        }

        if (_tablet->version_for_delete_predicate(rowset->version())) {
            // meet a delete version
            // base compaction can handle delete condition
            if (!transient_rowsets.empty() && transient_rowsets[0]->start_version() == 0) {
            } else {
                // if upper level only has one rowset, we can merge into one level
                int64_t i = order_levels.size() - 1;
                while (i >= 0) {
                    if (order_levels[i]->rowsets.size() == 1 &&
                        transient_rowsets[0]->start_version() == order_levels[i]->rowsets[0]->end_version() + 1 &&
                        !_tablet->version_for_delete_predicate(order_levels[i]->rowsets[0]->version())) {
                        transient_rowsets.insert(transient_rowsets.begin(), order_levels[i]->rowsets[0]);
                        auto rs = order_levels[i]->rowsets[0]->data_disk_size() > 0
                                          ? order_levels[i]->rowsets[0]->data_disk_size()
                                          : 1;
                        level_size = rs < _max_level_size ? rs : _max_level_size;
                        segment_num += order_levels[i]->segment_num;
                        total_size += level_size;
                        priority_levels.erase(order_levels[i].get());
                        i--;
                    } else {
                        break;
                    }
                }
                order_levels.resize(i + 1);

                // after merge, check if we match base compaction condition
                if (!transient_rowsets.empty() && transient_rowsets[0]->start_version() != 0) {
                    auto level = std::make_unique<SizeTieredLevel>(
                            transient_rowsets, segment_num, level_size, total_size,
                            _cal_compaction_score(segment_num, level_size, total_size, keys_type,
                                                  reached_max_version));
                    priority_levels.emplace(level.get());
                    order_levels.emplace_back(std::move(level));
                }

                if (transient_rowsets.empty() ||
                    (!transient_rowsets.empty() && transient_rowsets[0]->start_version() != 0)) {
                    segment_num = 0;
                    transient_rowsets.clear();
                    level_size = -1;
                    continue;
                }
            }
        } else if (!force_base_compaction && level_size > config::size_tiered_min_level_size &&
                   rowset_size < level_size && level_size / rowset_size > (level_multiple - 1)) {
            if (!transient_rowsets.empty()) {
                auto level = std::make_unique<SizeTieredLevel>(
                        transient_rowsets, segment_num, level_size, total_size,
                        _cal_compaction_score(segment_num, level_size, total_size, keys_type, reached_max_version));
                priority_levels.emplace(level.get());
                order_levels.emplace_back(std::move(level));
            }
            segment_num = 0;
            transient_rowsets.clear();
            level_size = rowset_size < _max_level_size ? rowset_size : _max_level_size;
            total_size = 0;
        }

        segment_num += rowset->rowset_meta()->get_compaction_score();
        total_size += rowset_size;
        transient_rowsets.emplace_back(rowset);
        prev_end_version = rowset->end_version();
    }

    if (!transient_rowsets.empty()) {
        auto level = std::make_unique<SizeTieredLevel>(
                transient_rowsets, segment_num, level_size, total_size,
                _cal_compaction_score(segment_num, level_size, total_size, keys_type, reached_max_version));
        priority_levels.emplace(level.get());
        order_levels.emplace_back(std::move(level));
    }

    for (auto& level : order_levels) {
        *score += level->score;
    }

    SizeTieredLevel* selected_level = nullptr;
    if (!priority_levels.empty()) {
        // We need a minimum number of segments that trigger compaction to
        // avoid triggering compaction too frequently compared to the old version
        // But in the old version of compaction, the user may set a large min_cumulative_compaction_num_singleton_deltas
        // to avoid TOO_MANY_VERSION errors, it is unnecessary in size tiered compaction
        auto min_compaction_segment_num = std::max(
                static_cast<int64_t>(2),
                std::min(config::min_cumulative_compaction_num_singleton_deltas, config::size_tiered_level_multiple));

        selected_level = *priority_levels.begin();
        if (selected_level->segment_num >= min_compaction_segment_num) {
            *input_rowsets = selected_level->rowsets;
        }
    }

    // Cumulative compaction will process with at least 1 rowset.
    // So when there is no rowset being chosen, we should return
    // Status::NotFound("cumulative compaction no suitable version error.");
    if (input_rowsets->empty()) {
        return Status::NotFound("cumulative compaction no suitable version error.");
    }

    RETURN_IF_ERROR(_check_version_continuity(*input_rowsets));

    LOG(INFO) << "pick tablet " << _tablet->tablet_id()
              << " for size-tiered compaction rowset version=" << input_rowsets->front()->start_version() << "-"
              << input_rowsets->back()->end_version() << " score=" << selected_level->score
              << " level_size=" << selected_level->level_size << " total_size=" << selected_level->total_size
              << " segment_num=" << selected_level->segment_num << " force_base_compaction=" << force_base_compaction
              << " reached_max_versions=" << reached_max_version;

    return Status::OK();
}
This is the main function of the size-tiered compaction logic.
It is very long, but it boils down to a few selection rules.
Priority compaction
If the number of candidate rowsets reaches 90% of the maximum version count (tablet_max_versions), the tablet is at risk of exceeding the version limit and needs to be compacted with priority; see the excerpt below.
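The corresponding check, excerpted from _pick_rowsets_to_size_tiered_compact above, sets a flag that later doubles the compaction score:
// excerpt from _pick_rowsets_to_size_tiered_compact (see the full listing above)
bool reached_max_version = false;
if (candidate_rowsets.size() > config::tablet_max_versions / 10 * 9) {
    reached_max_version = true;
}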
Size-tiered level logic
Three default configurations are involved:
size_tiered_min_level_size = 128K
size_tiered_level_multiple = 5
size_tiered_level_num = 7
Under this model a tablet's data is divided into 7 levels by size, with a 5x size gap between adjacent levels. Each level keeps one rowset, and compaction only merges data into its corresponding level.
With the default configuration the 7 level sizes are as follows (the small sketch after the list reproduces them):
128 KB
640 KB
3,200 KB (about 3 MB)
16,000 KB (about 15.6 MB)
80,000 KB (about 78.1 MB)
400,000 KB (about 390.6 MB)
2,000,000 KB (about 1.9 GB)
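The level sizes above are simply size_tiered_min_level_size multiplied by successive powers of size_tiered_level_multiple. The standalone snippet below (illustration only, not StarRocks code) reproduces the table from the default config values:
#include <cstdio>

int main() {
    // default values of the three configs listed above
    long long level_size = 128;  // size_tiered_min_level_size, in KB
    const int multiple = 5;      // size_tiered_level_multiple
    const int level_num = 7;     // size_tiered_level_num

    for (int i = 1; i <= level_num; ++i) {
        printf("level %d: %lld KB\n", i, level_size);
        level_size *= multiple;
    }
    return 0;
}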
Compaction score logic
double SizeTieredCompactionPolicy::_cal_compaction_score(int64_t segment_num, int64_t level_size, int64_t total_size,
                                                         KeysType keys_type, bool reached_max_version) {
    // base score is segment num
    double score = segment_num;

    // data bonus
    if (keys_type == KeysType::DUP_KEYS) {
        // duplicate keys only has write amplification, so that we use more aggressive size-tiered strategy
        score += ((double)(total_size - level_size) / level_size) * 2;
    } else {
        // agg/unique key also has read amplification, segment num occupies a greater weight
        score += (segment_num - 1) * 2 + ((double)(total_size - level_size) / level_size);
    }
    // Normalized score, max data bouns limit to triple size_tiered_level_multiple
    score = std::min((double)config::size_tiered_level_multiple * 3 + segment_num, score);

    // level bonus: The lower the level means the smaller the data volume of the compaction, the higher the execution priority
    int64_t level_bonus = 0;
    for (int64_t v = level_size; v < _max_level_size && level_bonus <= 7; ++level_bonus) {
        v = v * config::size_tiered_level_multiple;
    }
    score += level_bonus;

    // version limit bonus: The version num of the tablet is about to exceed the limit, we let it perform compaction faster and reduce the version num
    if (reached_max_version) {
        score *= 2;
    }

    return score;
}
Duplicate key model score
Duplicate key tables only need plain merging with no aggregation logic, so their compaction priority is higher than that of the other models:
((double)(total_size - level_size) / level_size) * 2 + segment_num
Aggregate / unique key model score
Data size has less influence on the priority, while the segment count has more, because for these two models the data shrinks after segments are compacted and read efficiency improves:
((double)(total_size - level_size) / level_size) + segment_num * 3 - 2
Primary key model score
Only segment_num is used.
std::min((double)config::size_tiered_level_multiple * 3 + segment_num, score);
If the score computed above exceeds segment_num + 15 (size_tiered_level_multiple * 3 with the default of 5), it is capped at that value.
int64_t level_bonus = 0;
for (int64_t v = level_size; v < _max_level_size && level_bonus <= 7; ++level_bonus) {
    v = v * config::size_tiered_level_multiple;
}
score += level_bonus;
A level bonus is added based on the size level the data belongs to: the smaller the level, the higher the bonus and thus the priority.
if (reached_max_version) {
    score *= 2;
}
If the number of candidate rowsets has reached 90% of tablet_max_versions, the score is simply doubled. A worked example of the whole scoring follows below.
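Putting the pieces together, here is a worked example (illustration only, using the default configs and the level table above; the input values are hypothetical and _max_level_size is assumed to equal the top level size) of what _cal_compaction_score would return for a duplicate key level with 10 segments, level_size = 128 KB and total_size = 512 KB, far from the version limit:
#include <algorithm>
#include <cstdio>

int main() {
    // hypothetical inputs: duplicate key table, not near the version limit
    const long long segment_num = 10;
    const long long level_size = 128LL * 1024;          // 128 KB
    const long long total_size = 512LL * 1024;          // 512 KB
    const long long multiple = 5;                       // size_tiered_level_multiple
    const long long max_level_size = 2000000LL * 1024;  // top level from the table above (assumed)

    // base score (segment num) + data bonus for DUP_KEYS: 10 + ((512-128)/128) * 2 = 16
    double score = segment_num + ((double)(total_size - level_size) / level_size) * 2;
    // normalization cap: 5 * 3 + 10 = 25, so 16 is kept
    score = std::min((double)multiple * 3 + segment_num, score);
    // level bonus: 128 KB is the lowest level, the loop multiplies 6 times -> bonus = 6
    long long level_bonus = 0;
    for (long long v = level_size; v < max_level_size && level_bonus <= 7; ++level_bonus) {
        v = v * multiple;
    }
    score += level_bonus;            // 16 + 6 = 22
    // reached_max_version is false here, so no doubling
    printf("score = %.1f\n", score); // prints "score = 22.0"
    return 0;
}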
bool DefaultCumulativeBaseCompactionPolicy::need_compaction(double* score, CompactionType* type) {
    if (_tablet->tablet_state() != TABLET_RUNNING) {
        return false;
    }
    _tablet->calculate_cumulative_point();

    auto cumu_st = _pick_rowsets_to_cumulative_compact(&_cumulative_rowsets, &_cumulative_score);
    auto base_st = _pick_rowsets_to_base_compact(&_base_rowsets, &_base_score);

    if (cumu_st.ok() && base_st.ok()) {
        if (_cumulative_score >= _base_score) {
            _compaction_type = CUMULATIVE_COMPACTION;
            *score = _cumulative_score;
        } else {
            _compaction_type = BASE_COMPACTION;
            *score = _base_score;
        }
    } else if (cumu_st.ok()) {
        _compaction_type = CUMULATIVE_COMPACTION;
        *score = _cumulative_score;
    } else if (base_st.ok()) {
        _compaction_type = BASE_COMPACTION;
        *score = _base_score;
    } else {
        _compaction_type = INVALID_COMPACTION;
        *score = 0;
    }
    *type = _compaction_type;
    return _compaction_type != INVALID_COMPACTION;
}
The default (non-size-tiered) decision logic is the same as in 2.3, so I won't repeat it here; you can refer to my earlier articles.
bool CompactionManager::_check_precondition(const CompactionCandidate& candidate) {
    if (!candidate.tablet) {
        LOG(WARNING) << "candidate with null tablet";
        return false;
    }
    const TabletSharedPtr& tablet = candidate.tablet;
    if (tablet->tablet_state() != TABLET_RUNNING) {
        VLOG(2) << "skip tablet:" << tablet->tablet_id()
                << " because tablet state is:" << tablet->tablet_state() << ", not RUNNING";
        return false;
    }

    if (tablet->has_compaction_task()) {
        // tablet already has a running compaction task, skip it
        VLOG(2) << "skip tablet:" << tablet->tablet_id() << " because there is another running compaction task.";
        return false;
    }

    int64_t last_failure_ts = 0;
    DataDir* data_dir = tablet->data_dir();
    if (candidate.type == CUMULATIVE_COMPACTION) {
        std::unique_lock lk(tablet->get_cumulative_lock(), std::try_to_lock);
        if (!lk.owns_lock()) {
            VLOG(2) << "skip tablet:" << tablet->tablet_id() << " for cumulative lock";
            return false;
        }
        // control the concurrent running tasks's limit
        // allow overruns up to twice the configured limit
        uint16_t num = running_cumulative_tasks_num_for_dir(data_dir);
        if (config::cumulative_compaction_num_threads_per_disk > 0 &&
            num >= config::cumulative_compaction_num_threads_per_disk * 2) {
            VLOG(2) << "skip tablet:" << tablet->tablet_id()
                    << " for limit of cumulative compaction task per disk. disk path:" << data_dir->path()
                    << ", running num:" << num;
            return false;
        }
        last_failure_ts = tablet->last_cumu_compaction_failure_time();
    } else if (candidate.type == BASE_COMPACTION) {
        std::unique_lock lk(tablet->get_base_lock(), std::try_to_lock);
        if (!lk.owns_lock()) {
            VLOG(2) << "skip tablet:" << tablet->tablet_id() << " for base lock";
            return false;
        }
        uint16_t num = running_base_tasks_num_for_dir(data_dir);
        if (config::base_compaction_num_threads_per_disk > 0 &&
            num >= config::base_compaction_num_threads_per_disk * 2) {
            VLOG(2) << "skip tablet:" << tablet->tablet_id()
                    << " for limit of base compaction task per disk. disk path:" << data_dir->path()
                    << ", running num:" << num;
            return false;
        }
        last_failure_ts = tablet->last_base_compaction_failure_time();
    }

    int64_t now_ms = UnixMillis();
    if (candidate.type == CompactionType::CUMULATIVE_COMPACTION) {
        if (now_ms - last_failure_ts <= config::min_cumulative_compaction_failure_interval_sec * 1000) {
            VLOG(1) << "Too often to schedule failure compaction, skip it."
                    << "compaction_type=" << starrocks::to_string(candidate.type)
                    << ", min_cumulative_compaction_failure_interval_sec="
                    << config::min_cumulative_compaction_failure_interval_sec
                    << ", last_failure_timestamp=" << last_failure_ts / 1000
                    << ", tablet_id=" << tablet->tablet_id();
            return false;
        }
    } else if (candidate.type == CompactionType::BASE_COMPACTION) {
        if (now_ms - last_failure_ts <= config::min_compaction_failure_interval_sec * 1000) {
            VLOG(1) << "Too often to schedule failure compaction, skip it."
                    << "compaction_type=" << starrocks::to_string(candidate.type)
                    << ", min_compaction_failure_interval_sec=" << config::min_compaction_failure_interval_sec
                    << ", last_failure_timestamp=" << last_failure_ts / 1000
                    << ", tablet_id=" << tablet->tablet_id();
            return false;
        }
    }

    return true;
}
Before a compaction is scheduled, a series of precondition checks is performed; candidates that fail any of them are skipped.
In terms of compaction, the main change in StarRocks 2.5 is the introduction of a new model: size-tiered compaction.
Compared with the traditional compaction model, size-tiered compaction divides a tablet's data into 7 levels by size, and each compaction only merges data within the same level.
With this model, cumulative compaction no longer compacts data only once; data is promoted level by level, which reduces the number of base compaction runs and therefore improves compaction efficiency.