
StarRocks Source Code Reading (4): The Compaction Mechanism in Version 2.5

Preface

I recently set up a StarRocks 2.5.3 cluster for business needs, but found that some configuration items I had tuned on 2.3 no longer behave well on 2.5, so it was time to work through the 2.5 compaction mechanism again.
This article is based on a read-through of the StarRocks 2.5.3 source code; for other versions the content is for reference only.

Source Code Walkthrough

_schedule


void CompactionManager::_schedule() {
    LOG(INFO) << "start compaction scheduler";
    while (!_stop.load(std::memory_order_consume)) {
        ++_round;
        _wait_to_run();
        std::shared_ptr<CompactionTask> compaction_task = _try_get_next_compaction_task();
        if (!compaction_task) {
            std::unique_lock<std::mutex> lk(_mutex);
            _cv.wait_for(lk, 1000ms);
        } else {
            if (compaction_task->compaction_type() == CompactionType::BASE_COMPACTION) {
                StarRocksMetrics::instance()->tablet_base_max_compaction_score.set_value(
                        compaction_task->compaction_score());
            } else {
                StarRocksMetrics::instance()->tablet_cumulative_max_compaction_score.set_value(
                        compaction_task->compaction_score());
            }

            compaction_task->set_task_id(next_compaction_task_id());
            LOG(INFO) << "submit task to compaction pool"
                      << ", task_id:" << compaction_task->task_id()
                      << ", tablet_id:" << compaction_task->tablet()->tablet_id()
                      << ", compaction_type:" << starrocks::to_string(compaction_task->compaction_type())
                      << ", compaction_score:" << compaction_task->compaction_score() << " for round:" << _round
                      << ", task_queue_size:" << candidates_size();
            auto st = _compaction_pool->submit_func([compaction_task] { compaction_task->start(); });
            if (!st.ok()) {
                LOG(WARNING) << "submit compaction task " << compaction_task->task_id()
                             << " to compaction pool failed. status:" << st.to_string();
                compaction_task->tablet()->reset_compaction();
                CompactionCandidate candidate;
                candidate.tablet = compaction_task->tablet();
                update_candidates({candidate});
            }
        }
    }
}


The first place to look is the _schedule function in compaction_manager.cpp. As the name suggests, it is the scheduler for all compactions: it submits pending compaction tasks to the compaction pool.

It fetches the next pending compaction task through _try_get_next_compaction_task().

_try_get_next_compaction_task


std::shared_ptr<CompactionTask> CompactionManager::_try_get_next_compaction_task() {
    VLOG(2) << "try to get next qualified tablet for round:" << _round
            << ", current candidates size:" << candidates_size();
    CompactionCandidate compaction_candidate;
    std::shared_ptr<CompactionTask> compaction_task = nullptr;

    if (pick_candidate(&compaction_candidate)) {
        compaction_task = compaction_candidate.tablet->create_compaction_task();
    }

    return compaction_task;
}

This function is short: the compaction candidate is picked by pick_candidate.

pick_candidate

bool CompactionManager::pick_candidate(CompactionCandidate* candidate) {
    std::lock_guard lg(_candidates_mutex);
    if (_compaction_candidates.empty()) {
        return false;
    }

    auto iter = _compaction_candidates.begin();
    while (iter != _compaction_candidates.end()) {
        if (_check_precondition(*iter)) {
            *candidate = *iter;
            _compaction_candidates.erase(iter);
            return true;
        }
        iter++;
    }

    return false;
}

Next, look at pick_candidate. It iterates over the _compaction_candidates container and calls _check_precondition on each entry; the first candidate that satisfies the compaction preconditions is removed from the container and returned with true.
Two things are worth digging into here (a sketch of the first follows this list):

  1. How the _compaction_candidates container is designed, i.e. how tablets waiting for compaction get enqueued.
  2. The checks performed by _check_precondition.
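The declaration of _compaction_candidates is not quoted in this article, but the emplace()/begin() usage above and the erase-by-tablet_id pass in update_candidates below suggest an ordered set that keeps the highest-scoring candidate at the front, so pick_candidate only has to walk it from begin(). Below is a minimal sketch of that ordering idea; DemoCandidate and its comparator are illustrative stand-ins, not the actual StarRocks declarations.

#include <cstdint>
#include <set>

// Stand-in for CompactionCandidate: the real one carries a TabletSharedPtr,
// a compaction type and a score.
struct DemoCandidate {
    int64_t tablet_id;
    double score;
};

// Assumed ordering: higher score first; ties broken by tablet_id so distinct
// tablets with equal scores can coexist in the set.
struct DemoCandidateComparator {
    bool operator()(const DemoCandidate& l, const DemoCandidate& r) const {
        if (l.score != r.score) {
            return l.score > r.score;
        }
        return l.tablet_id < r.tablet_id;
    }
};

// pick_candidate() can then start at begin() (the best score) and take the
// first element that passes _check_precondition().
using DemoCandidateSet = std::set<DemoCandidate, DemoCandidateComparator>;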

update_candidates


void CompactionManager::update_candidates(std::vector<CompactionCandidate> candidates) {
    size_t erase_num = 0;
    {
        std::lock_guard lg(_candidates_mutex);
        // TODO(meegoo): This is very inefficient to implement, just to fix bug, it will refactor later
        for (auto iter = _compaction_candidates.begin(); iter != _compaction_candidates.end();) {
            bool has_erase = false;
            for (auto& candidate : candidates) {
                if (candidate.tablet->tablet_id() == iter->tablet->tablet_id()) {
                    iter = _compaction_candidates.erase(iter);
                    erase_num++;
                    has_erase = true;
                    break;
                }
            }
            if (!has_erase) {
                iter++;
            }
        }
        for (auto& candidate : candidates) {
            if (candidate.tablet->enable_compaction()) {
                VLOG(1) << "update candidate " << candidate.tablet->tablet_id() << " type "
                        << starrocks::to_string(candidate.type) << " score " << candidate.score;
                _compaction_candidates.emplace(std::move(candidate));
            }
        }
    }
    _notify();
}

This is the enqueue side of the candidate set. It first erases any existing entries belonging to the same tablets, then loops over the incoming candidates and enqueues every one whose tablet has compaction enabled.
There is a TODO in this code noting that the current implementation is inefficient and will be refactored later.
Reading this raises two more questions, answered by the functions below:

  1. How the candidate set is maintained.
  2. When update_candidates gets called.

update_tablet



void CompactionManager::update_tablet(TabletSharedPtr tablet) {
    if (_disable_update_tablet) {
        return;
    }
    VLOG(1) << "update tablet " << tablet->tablet_id();
    if (tablet->need_compaction()) {
        CompactionCandidate candidate;
        candidate.tablet = tablet;
        candidate.score = tablet->compaction_score();
        candidate.type = tablet->compaction_type();
        update_candidates({candidate});
    }
}

_compaction_check_one_round


// Base compaction may be started by time(once every day now)
// Compaction checker will check whether to schedule base compaction for tablets
size_t StorageEngine::_compaction_check_one_round() {
    size_t batch_size = 1000;
    int batch_sleep_time_ms = 1000;
    std::vector<TabletSharedPtr> tablets;
    tablets.reserve(batch_size);
    size_t tablets_num_checked = 0;
    while (!bg_worker_stopped()) {
        bool finished = _tablet_manager->get_next_batch_tablets(batch_size, &tablets);
        for (auto& tablet : tablets) {
            _compaction_manager->update_tablet(tablet);
        }
        tablets_num_checked += tablets.size();
        tablets.clear();
        if (finished) {
            break;
        }
        std::unique_lock<std::mutex> lk(_checker_mutex);
        _checker_cv.wait_for(lk, std::chrono::milliseconds(batch_sleep_time_ms),
                             [this] { return bg_worker_stopped(); });
    }
    return tablets_num_checked;
}

compaction_check


void StorageEngine::compaction_check() {
    int checker_one_round_sleep_time_s = 1800;
    while (!bg_worker_stopped()) {
        MonotonicStopWatch stop_watch;
        stop_watch.start();
        LOG(INFO) << "start to check compaction";
        size_t num = _compaction_check_one_round();
        stop_watch.stop();
        LOG(INFO) << num << " tablets checked. time elapse:" << stop_watch.elapsed_time() / 1000000000 << " seconds."
                  << " compaction checker will be scheduled again in " << checker_one_round_sleep_time_s << " seconds";
        std::unique_lock<std::mutex> lk(_checker_mutex);
        _checker_cv.wait_for(lk, std::chrono::seconds(checker_one_round_sleep_time_s),
                             [this] { return bg_worker_stopped(); });
    }
}

The call path into update_candidates is made up of the three functions above:
the system periodically runs compaction_check (the interval is hard-coded to 1800 seconds), which calls _compaction_check_one_round; that function walks all tablets of the storage engine in batches of 1000 (pausing 1 second between batches) and calls update_tablet on each one; update_tablet in turn calls need_compaction, and if the tablet needs compaction it is pushed into the candidate set via update_candidates.

need_compaction


bool Tablet::need_compaction() {
    std::lock_guard lock(_compaction_task_lock);
    if (_compaction_task == nullptr && _enable_compaction) {
        _compaction_context->type = INVALID_COMPACTION;
        if (_compaction_context != nullptr &&
            _compaction_context->policy->need_compaction(&_compaction_context->score, &_compaction_context->type)) {
            // if there is running task, return false
            // else, return true
            return true;
        }
    }
    return false;
}

Inside need_compaction, the check is delegated to _compaction_context->policy->need_compaction(); depending on whether enable_size_tiered_compaction_strategy is turned on, one of two policies handles it.
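The code that picks between the two policies is not quoted here; judging by the class names that appear in the following sections, the choice presumably hinges on config::enable_size_tiered_compaction_strategy when the tablet's compaction context is built. A rough, self-contained sketch of that dispatch follows; the stubs and the create_compaction_policy helper are illustrative, not the real declarations.

#include <memory>

// Minimal stubs so the sketch compiles on its own; the real classes live in
// the StarRocks BE storage module.
struct Tablet;
struct CompactionPolicy {
    virtual ~CompactionPolicy() = default;
};
struct SizeTieredCompactionPolicy : CompactionPolicy {
    explicit SizeTieredCompactionPolicy(Tablet*) {}
};
struct DefaultCumulativeBaseCompactionPolicy : CompactionPolicy {
    explicit DefaultCumulativeBaseCompactionPolicy(Tablet*) {}
};
namespace config {
inline bool enable_size_tiered_compaction_strategy = true; // BE config flag
} // namespace config

// Assumed dispatch: the flag decides which policy object later answers
// need_compaction() through _compaction_context->policy.
std::unique_ptr<CompactionPolicy> create_compaction_policy(Tablet* tablet) {
    if (config::enable_size_tiered_compaction_strategy) {
        return std::make_unique<SizeTieredCompactionPolicy>(tablet); // new 2.5 path
    }
    return std::make_unique<DefaultCumulativeBaseCompactionPolicy>(tablet); // 2.3-style path
}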

size tiered compaction


bool SizeTieredCompactionPolicy::need_compaction(double* score, CompactionType* type) {
    if (_tablet->tablet_state() != TABLET_RUNNING) {
        return false;
    }

    bool force_base_compaction = false;
    if (type && *type == BASE_COMPACTION) {
        force_base_compaction = true;
    }
    auto st = _pick_rowsets_to_size_tiered_compact(force_base_compaction, &_rowsets, &_score);

    if (st.ok()) {
        if (_rowsets[0]->start_version() == 0) {
            _compaction_type = BASE_COMPACTION;
        } else {
            _compaction_type = CUMULATIVE_COMPACTION;
        }
        *score = _score;
    } else {
        _compaction_type = INVALID_COMPACTION;
        *score = 0;
    }
    *type = _compaction_type;

    return _compaction_type != INVALID_COMPACTION;
}

This is the new mechanism in 2.5.
The core logic sits in _pick_rowsets_to_size_tiered_compact.

_pick_rowsets_to_size_tiered_compact

Status SizeTieredCompactionPolicy::_pick_rowsets_to_size_tiered_compact(bool force_base_compaction,
                                                                        std::vector<RowsetSharedPtr>* input_rowsets,
                                                                        double* score) {
    input_rowsets->clear();
    *score = 0;
    std::vector<RowsetSharedPtr> candidate_rowsets;
    _tablet->pick_all_candicate_rowsets(&candidate_rowsets);

    if (candidate_rowsets.size() <= 1) {
        return Status::NotFound("compaction no suitable version error.");
    }

    std::sort(candidate_rowsets.begin(), candidate_rowsets.end(), Rowset::comparator);

    if (!force_base_compaction && candidate_rowsets.size() == 2 && candidate_rowsets[0]->end_version() == 1 &&
        candidate_rowsets[1]->rowset_meta()->get_compaction_score() <= 1) {
        // the tablet is with rowset: [0-1], [2-y]
        // and [0-1] has no data. in this situation, no need to do base compaction.
        return Status::NotFound("compaction no suitable version error.");
    }

    if (time(nullptr) - candidate_rowsets[0]->creation_time() >
        config::base_compaction_interval_seconds_since_last_operation) {
        force_base_compaction = true;
    }

    struct SizeTieredLevel {
        SizeTieredLevel(std::vector<RowsetSharedPtr> r, int64_t s, int64_t l, int64_t t, double sc)
                : rowsets(std::move(r)), segment_num(s), level_size(l), total_size(t), score(sc) {}

        std::vector<RowsetSharedPtr> rowsets;
        int64_t segment_num;
        int64_t level_size;
        int64_t total_size;
        double score;
    };

    struct LevelComparator {
        bool operator()(const SizeTieredLevel* left, const SizeTieredLevel* right) const {
            return left->score > right->score ||
                   (left->score == right->score &&
                    left->rowsets[0]->start_version() > right->rowsets[0]->start_version());
        }
    };

    std::vector<std::unique_ptr<SizeTieredLevel>> order_levels;
    std::set<SizeTieredLevel*, LevelComparator> priority_levels;
    std::vector<RowsetSharedPtr> transient_rowsets;
    size_t segment_num = 0;
    int64_t level_multiple = config::size_tiered_level_multiple;
    auto keys_type = _tablet->keys_type();

    bool reached_max_version = false;
    if (candidate_rowsets.size() > config::tablet_max_versions / 10 * 9) {
        reached_max_version = true;
    }

    int64_t level_size = -1;
    int64_t total_size = 0;
    int64_t prev_end_version = -1;
    for (auto rowset : candidate_rowsets) {
        int64_t rowset_size = rowset->data_disk_size() > 0 ? rowset->data_disk_size() : 1;
        if (level_size == -1) {
            level_size = rowset_size < _max_level_size ? rowset_size : _max_level_size;
            total_size = 0;
        }

        // meet missed version
        if (rowset->start_version() != prev_end_version + 1) {
            if (!transient_rowsets.empty()) {
                auto level = std::make_unique<SizeTieredLevel>(
                        transient_rowsets, segment_num, level_size, total_size,
                        _cal_compaction_score(segment_num, level_size, total_size, keys_type, reached_max_version));
                priority_levels.emplace(level.get());
                order_levels.emplace_back(std::move(level));
            }
            level_size = rowset_size < _max_level_size ? rowset_size : _max_level_size;
            segment_num = 0;
            total_size = 0;
            transient_rowsets.clear();
        }

        if (_tablet->version_for_delete_predicate(rowset->version())) {
            // meet a delete version
            // base compaction can handle delete condition
            if (!transient_rowsets.empty() && transient_rowsets[0]->start_version() == 0) {
            } else {
                // if upper level only has one rowset, we can merge into one level
                int64_t i = order_levels.size() - 1;
                while (i >= 0) {
                    if (order_levels[i]->rowsets.size() == 1 &&
                        transient_rowsets[0]->start_version() == order_levels[i]->rowsets[0]->end_version() + 1 &&
                        !_tablet->version_for_delete_predicate(order_levels[i]->rowsets[0]->version())) {
                        transient_rowsets.insert(transient_rowsets.begin(), order_levels[i]->rowsets[0]);
                        auto rs = order_levels[i]->rowsets[0]->data_disk_size() > 0
                                          ? order_levels[i]->rowsets[0]->data_disk_size()
                                          : 1;
                        level_size = rs < _max_level_size ? rs : _max_level_size;
                        segment_num += order_levels[i]->segment_num;
                        total_size += level_size;
                        priority_levels.erase(order_levels[i].get());
                        i--;
                    } else {
                        break;
                    }
                }
                order_levels.resize(i + 1);

                // after merge, check if we match base compaction condition
                if (!transient_rowsets.empty() && transient_rowsets[0]->start_version() != 0) {
                    auto level = std::make_unique<SizeTieredLevel>(
                            transient_rowsets, segment_num, level_size, total_size,
                            _cal_compaction_score(segment_num, level_size, total_size, keys_type, reached_max_version));
                    priority_levels.emplace(level.get());
                    order_levels.emplace_back(std::move(level));
                }

                if (transient_rowsets.empty() ||
                    (!transient_rowsets.empty() && transient_rowsets[0]->start_version() != 0)) {
                    segment_num = 0;
                    transient_rowsets.clear();
                    level_size = -1;
                    continue;
                }
            }
        } else if (!force_base_compaction && level_size > config::size_tiered_min_level_size &&
                   rowset_size < level_size && level_size / rowset_size > (level_multiple - 1)) {
            if (!transient_rowsets.empty()) {
                auto level = std::make_unique<SizeTieredLevel>(
                        transient_rowsets, segment_num, level_size, total_size,
                        _cal_compaction_score(segment_num, level_size, total_size, keys_type, reached_max_version));
                priority_levels.emplace(level.get());
                order_levels.emplace_back(std::move(level));
            }
            segment_num = 0;
            transient_rowsets.clear();
            level_size = rowset_size < _max_level_size ? rowset_size : _max_level_size;
            total_size = 0;
        }

        segment_num += rowset->rowset_meta()->get_compaction_score();
        total_size += rowset_size;
        transient_rowsets.emplace_back(rowset);
        prev_end_version = rowset->end_version();
    }

    if (!transient_rowsets.empty()) {
        auto level = std::make_unique<SizeTieredLevel>(
                transient_rowsets, segment_num, level_size, total_size,
                _cal_compaction_score(segment_num, level_size, total_size, keys_type, reached_max_version));
        priority_levels.emplace(level.get());
        order_levels.emplace_back(std::move(level));
    }

    for (auto& level : order_levels) {
        *score += level->score;
    }

    SizeTieredLevel* selected_level = nullptr;
    if (!priority_levels.empty()) {
        // We need a minimum number of segments that trigger compaction to
        // avoid triggering compaction too frequently compared to the old version
        // But in the old version of compaction, the user may set a large min_cumulative_compaction_num_singleton_deltas
        // to avoid TOO_MANY_VERSION errors, it is unnecessary in size tiered compaction
        auto min_compaction_segment_num = std::max(
                static_cast<int64_t>(2),
                std::min(config::min_cumulative_compaction_num_singleton_deltas, config::size_tiered_level_multiple));
        selected_level = *priority_levels.begin();
        if (selected_level->segment_num >= min_compaction_segment_num) {
            *input_rowsets = selected_level->rowsets;
        }
    }

    // Cumulative compaction will process with at least 1 rowset.
    // So when there is no rowset being chosen, we should return Status::NotFound("cumulative compaction no suitable version error.");
    if (input_rowsets->empty()) {
        return Status::NotFound("cumulative compaction no suitable version error.");
    }

    RETURN_IF_ERROR(_check_version_continuity(*input_rowsets));

    LOG(INFO) << "pick tablet " << _tablet->tablet_id()
              << " for size-tiered compaction rowset version=" << input_rowsets->front()->start_version() << "-"
              << input_rowsets->back()->end_version() << " score=" << selected_level->score
              << " level_size=" << selected_level->level_size << " total_size=" << selected_level->total_size
              << " segment_num=" << selected_level->segment_num << " force_base_compaction=" << force_base_compaction
              << " reached_max_versions=" << reached_max_version;

    return Status::OK();
}

This function is the heart of the size-tiered compaction logic.
It is very long, but it can be boiled down to a few selection rules:

  1. Priority compaction
    If the number of candidate rowsets has reached 90% of tablet_max_versions, the tablet is at risk of running out of versions and must be compacted with priority.

  2. Tiered layout
    Three defaults control the layout:
    size_tiered_min_level_size = 128 KB
    size_tiered_level_multiple = 5
    size_tiered_level_num = 7
    Under this model a tablet's rowsets are grouped into 7 levels by size, each level 5x larger than the one below it (level k = 128 KB x 5^k); each level keeps one rowset, and compaction only merges data within its matching level. With the defaults the 7 level sizes are:
    128 KB
    640 KB
    3,200 KB (~3.1 MB)
    16,000 KB (~15.6 MB)
    80,000 KB (~78.1 MB)
    400,000 KB (~390.6 MB)
    2,000,000 KB (~1.9 GB)

  3. Compaction score logic (computed by _cal_compaction_score below)


double SizeTieredCompactionPolicy::_cal_compaction_score(int64_t segment_num, int64_t level_size, int64_t total_size,
                                                         KeysType keys_type, bool reached_max_version) {
    // base score is segment num
    double score = segment_num;

    // data bonus
    if (keys_type == KeysType::DUP_KEYS) {
        // duplicate keys only has write amplification, so that we use more aggressive size-tiered strategy
        score += ((double)(total_size - level_size) / level_size) * 2;
    } else {
        // agg/unique key also has read amplification, segment num occupies a greater weight
        score += (segment_num - 1) * 2 + ((double)(total_size - level_size) / level_size);
    }
    // Normalized score, max data bouns limit to triple size_tiered_level_multiple
    score = std::min((double)config::size_tiered_level_multiple * 3 + segment_num, score);

    // level bonus: The lower the level means the smaller the data volume of the compaction, the higher the execution priority
    int64_t level_bonus = 0;
    for (int64_t v = level_size; v < _max_level_size && level_bonus <= 7; ++level_bonus) {
        v = v * config::size_tiered_level_multiple;
    }
    score += level_bonus;

    // version limit bonus: The version num of the tablet is about to exceed the limit, we let it perform compaction faster and reduce the version num
    if (reached_max_version) {
        score *= 2;
    }

    return score;
}


Duplicate key model score
Compaction here is a plain merge with no aggregation logic, so only write amplification matters and the data-size bonus gets double weight, giving a more aggressive size-tiered strategy:
((double)(total_size - level_size) / level_size) * 2 + segment_num

Aggregate/unique key model score
Data size has less influence and the segment count has more, because compaction for these models also reduces read amplification: fewer, smaller segments make reads cheaper:
((double)(total_size - level_size) / level_size) + segment_num * 3 - 2

Primary key model score
Only segment_num.

std::min((double)config::size_tiered_level_multiple * 3 + segment_num, score);
If the score computed above is larger than 15 + segment_num (with the default size_tiered_level_multiple = 5), it is clamped down to that value.

   int64_t level_bonus = 0;
    for (int64_t v = level_size; v < _max_level_size && level_bonus <= 7; ++level_bonus) {
        v = v * config::size_tiered_level_multiple;
    }
    score += level_bonus;

A level bonus is then added according to which size tier the level falls into: the smaller the level, the more x5 steps it is away from the top level, so the larger the bonus and the higher the priority.

if (reached_max_version) {
   score *= 2;
}

Finally, if the tablet's candidate rowset count has reached 90% of tablet_max_versions (the reached_max_version flag above), the score is simply doubled.
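To make the scoring concrete, here is a small standalone re-implementation of the rules above with the default values plugged in (size_tiered_level_multiple = 5, top level ~2,000,000 KB). It is a worked example only, not the StarRocks code; demo_score is a hypothetical helper.

#include <algorithm>
#include <cstdint>
#include <cstdio>

// Worked-example re-implementation of the score described above.
// dup_keys selects the DUP_KEYS branch; max_level_size plays the role of _max_level_size.
static double demo_score(int64_t segment_num, int64_t level_size, int64_t total_size,
                         bool dup_keys, bool reached_max_version, int64_t max_level_size) {
    const int64_t multiple = 5; // config::size_tiered_level_multiple default
    double score = segment_num;
    if (dup_keys) {
        // write amplification only: data bonus weighted x2
        score += ((double)(total_size - level_size) / level_size) * 2;
    } else {
        // agg/unique: segment count weighted higher (3 * segment_num - 2 + data bonus)
        score += (segment_num - 1) * 2 + (double)(total_size - level_size) / level_size;
    }
    // cap at 15 + segment_num under the default multiple
    score = std::min((double)multiple * 3 + segment_num, score);
    // level bonus: smaller levels need more x5 steps to reach the top level
    int64_t level_bonus = 0;
    for (int64_t v = level_size; v < max_level_size && level_bonus <= 7; ++level_bonus) {
        v *= multiple;
    }
    score += level_bonus;
    if (reached_max_version) {
        score *= 2; // close to tablet_max_versions: double the priority
    }
    return score;
}

int main() {
    const int64_t kb = 1024;
    const int64_t max_level_size = 2000000 * kb; // top level under the defaults, ~1.9 GB
    // DUP_KEYS level: six 128 KB rowsets, level_size = 128 KB, total_size = 768 KB.
    // data bonus = ((768 - 128) / 128) * 2 = 10 -> 16; cap 15 + 6 = 21 is not hit;
    // level bonus = 6 (smallest tier) -> final score 22.
    printf("%.1f\n", demo_score(6, 128 * kb, 768 * kb, true, false, max_level_size));
    return 0;
}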

default compaction


bool DefaultCumulativeBaseCompactionPolicy::need_compaction(double* score, CompactionType* type) {
    if (_tablet->tablet_state() != TABLET_RUNNING) {
        return false;
    }

    _tablet->calculate_cumulative_point();
    auto cumu_st = _pick_rowsets_to_cumulative_compact(&_cumulative_rowsets, &_cumulative_score);
    auto base_st = _pick_rowsets_to_base_compact(&_base_rowsets, &_base_score);

    if (cumu_st.ok() && base_st.ok()) {
        if (_cumulative_score >= _base_score) {
            _compaction_type = CUMULATIVE_COMPACTION;
            *score = _cumulative_score;
        } else {
            _compaction_type = BASE_COMPACTION;
            *score = _base_score;
        }
    } else if (cumu_st.ok()) {
        _compaction_type = CUMULATIVE_COMPACTION;
        *score = _cumulative_score;
    } else if (base_st.ok()) {
        _compaction_type = BASE_COMPACTION;
        *score = _base_score;
    } else {
        _compaction_type = INVALID_COMPACTION;
        *score = 0;
    }
    *type = _compaction_type;

    return _compaction_type != INVALID_COMPACTION;
}


The default policy's decision logic is the same as in 2.3 and will not be repeated here; see my earlier articles in this series.

_check_precondition


bool CompactionManager::_check_precondition(const CompactionCandidate& candidate) {
    if (!candidate.tablet) {
        LOG(WARNING) << "candidate with null tablet";
        return false;
    }
    const TabletSharedPtr& tablet = candidate.tablet;
    if (tablet->tablet_state() != TABLET_RUNNING) {
        VLOG(2) << "skip tablet:" << tablet->tablet_id() << " because tablet state is:" << tablet->tablet_state()
                << ", not RUNNING";
        return false;
    }

    if (tablet->has_compaction_task()) {
        // tablet already has a running compaction task, skip it
        VLOG(2) << "skip tablet:" << tablet->tablet_id() << " because there is another running compaction task.";
        return false;
    }

    int64_t last_failure_ts = 0;
    DataDir* data_dir = tablet->data_dir();
    if (candidate.type == CUMULATIVE_COMPACTION) {
        std::unique_lock lk(tablet->get_cumulative_lock(), std::try_to_lock);
        if (!lk.owns_lock()) {
            VLOG(2) << "skip tablet:" << tablet->tablet_id() << " for cumulative lock";
            return false;
        }
        // control the concurrent running tasks's limit
        // allow overruns up to twice the configured limit
        uint16_t num = running_cumulative_tasks_num_for_dir(data_dir);
        if (config::cumulative_compaction_num_threads_per_disk > 0 &&
            num >= config::cumulative_compaction_num_threads_per_disk * 2) {
            VLOG(2) << "skip tablet:" << tablet->tablet_id()
                    << " for limit of cumulative compaction task per disk. disk path:" << data_dir->path()
                    << ", running num:" << num;
            return false;
        }
        last_failure_ts = tablet->last_cumu_compaction_failure_time();
    } else if (candidate.type == BASE_COMPACTION) {
        std::unique_lock lk(tablet->get_base_lock(), std::try_to_lock);
        if (!lk.owns_lock()) {
            VLOG(2) << "skip tablet:" << tablet->tablet_id() << " for base lock";
            return false;
        }
        uint16_t num = running_base_tasks_num_for_dir(data_dir);
        if (config::base_compaction_num_threads_per_disk > 0 &&
            num >= config::base_compaction_num_threads_per_disk * 2) {
            VLOG(2) << "skip tablet:" << tablet->tablet_id()
                    << " for limit of base compaction task per disk. disk path:" << data_dir->path()
                    << ", running num:" << num;
            return false;
        }
        last_failure_ts = tablet->last_base_compaction_failure_time();
    }

    int64_t now_ms = UnixMillis();
    if (candidate.type == CompactionType::CUMULATIVE_COMPACTION) {
        if (now_ms - last_failure_ts <= config::min_cumulative_compaction_failure_interval_sec * 1000) {
            VLOG(1) << "Too often to schedule failure compaction, skip it."
                    << "compaction_type=" << starrocks::to_string(candidate.type)
                    << ", min_cumulative_compaction_failure_interval_sec="
                    << config::min_cumulative_compaction_failure_interval_sec
                    << ", last_failure_timestamp=" << last_failure_ts / 1000 << ", tablet_id=" << tablet->tablet_id();
            return false;
        }
    } else if (candidate.type == CompactionType::BASE_COMPACTION) {
        if (now_ms - last_failure_ts <= config::min_compaction_failure_interval_sec * 1000) {
            VLOG(1) << "Too often to schedule failure compaction, skip it."
                    << "compaction_type=" << starrocks::to_string(candidate.type)
                    << ", min_compaction_failure_interval_sec=" << config::min_compaction_failure_interval_sec
                    << ", last_failure_timestamp=" << last_failure_ts / 1000 << ", tablet_id=" << tablet->tablet_id();
            return false;
        }
    }

    return true;
}

A series of checks runs before a candidate is handed to compaction; if any of them fails, the candidate is skipped:

  1. The candidate's tablet is null.
  2. The tablet is not in the RUNNING state.
  3. The tablet already has a running compaction task.
  4. The tablet's cumulative/base compaction lock cannot be acquired.
  5. The number of tasks already running on the disk has reached the per-disk concurrency limit (twice the configured threads-per-disk value).
  6. The tablet failed a compaction recently and the configured failure interval (30 s by default) has not yet elapsed.

Summary

The main compaction change in StarRocks 2.5 is the new size-tiered model.
Compared with the traditional scheme, size-tiered compaction groups a tablet's rowsets into 7 size levels and each compaction only merges data within the same level.
With this model, cumulative-style compaction no longer compacts data just once; data is promoted level by level, which reduces how often base compaction is triggered and improves overall compaction efficiency.
