This article is based on reading the StarRocks 2.3.3 source code. Other versions may differ significantly, so treat it as a reference only.
Since the StarRocks BE is written in C++ and my C++ is only average, my reading of the source may contain mistakes; corrections are welcome.
Before diving into the source, here is a quick overview of StarRocks' compaction mechanism.
To keep writes fast, StarRocks never appends newly ingested data to existing data files; each new batch is written to a separate new file, called a single file.
If a data directory accumulates too many single files, query performance inevitably drops sharply, so StarRocks has two compaction mechanisms to deal with these newly written files.
The first, cumulative compaction, is a lightweight mechanism for dealing with an excess of small files. It does not compact single files together with the base file in the data directory, because the base file is so large that merging into it could cause an IO spike and hurt cluster performance. Instead, it aggregates multiple single files into a single cumulative file.
By default, every 5 newly generated single files trigger one cumulative compaction.
It is governed by the following configs:
Parameter | Default | Notes |
---|---|---|
cumulative_compaction_check_interval_seconds | 1 | polling interval of the check thread, 1s by default |
min_cumulative_compaction_num_singleton_deltas | 5 | minimum number of singleton files required to trigger a cumulative compaction |
max_cumulative_compaction_num_singleton_deltas | 1000 | maximum number of files merged in a single compaction |
cumulative_compaction_num_threads_per_disk | 1 | number of cumulative compaction threads per disk |
cumulative_compaction_skip_window_seconds | 30 | skip single files newer than this window; freshly written data may be queried right away, so it is left uncompacted for now |
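As an illustration of the trigger rule, here is a minimal sketch with hypothetical types; this is not the BE's actual policy code, just the idea the two thresholds above express:

#include <cstdint>
#include <vector>

// Hypothetical, simplified view of one rowset's metadata.
struct RowsetInfo {
    int64_t creation_time; // unix seconds
    bool compacted;        // already merged into a cumulative file?
};

// Count the singleton files that are old enough (outside the skip window)
// and compare against the minimum threshold.
bool should_trigger_cumulative(const std::vector<RowsetInfo>& rowsets, int64_t now,
                               int64_t min_singleton_deltas = 5,
                               int64_t skip_window_seconds = 30) {
    int64_t pending = 0;
    for (const auto& rs : rowsets) {
        if (rs.compacted) continue;                         // already cumulated
        if (now - rs.creation_time < skip_window_seconds) continue; // freshly written, skip
        ++pending;
    }
    return pending >= min_singleton_deltas;
}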
After several rounds of cumulative compaction, the fine-grained small-file problem is relieved, but a new small-file problem appears: there are now too many cumulative files, which still hurts queries.
Therefore, once certain conditions are met, the system runs the second mechanism, base compaction, which merges all cumulative files into a single file.
Base compaction is a heavy operation with high disk IO, so it generally runs at a fairly low frequency.
It is governed by the following configs:
Parameter | Default | Notes |
---|---|---|
base_compaction_check_interval_seconds | 60 | polling interval of the check thread, 60s |
min_base_compaction_num_singleton_deltas | 5 | minimum number of single files, meaning files that have already been through cumulative compaction |
max_base_compaction_num_singleton_deltas | 100 | maximum number of segments merged by a single base compaction |
base_compaction_num_threads_per_disk | 1 | number of base compaction threads per disk |
base_cumulative_delta_ratio | 0.3 | threshold ratio of cumulative file size to base file size |
base_compaction_interval_seconds_since_last_operation | 86400 | interval since the last base compaction; one of the base compaction triggers |
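Putting the three OR-ed triggers together, here is a compact illustrative predicate with the default thresholds hardcoded; the real checks live in BaseCompaction::pick_rowsets_to_compact, shown later:

#include <cstdint>

// Hypothetical input struct; names mirror the config items above.
struct BaseCompactionInput {
    int64_t cumulative_rowset_score;  // segments accumulated since the last base compaction
    int64_t cumulative_total_size;    // bytes of all cumulative files
    int64_t base_size;                // bytes of the base file
    int64_t seconds_since_last_base;  // time since the base rowset was created
};

bool should_trigger_base(const BaseCompactionInput& in) {
    // 1. enough cumulative files piled up (min_base_compaction_num_singleton_deltas)
    if (in.cumulative_rowset_score >= 5) return true;
    // 2. cumulative data grew past 30% of the base file (base_cumulative_delta_ratio)
    double base = in.base_size == 0 ? 1.0 : static_cast<double>(in.base_size);
    if (static_cast<double>(in.cumulative_total_size) / base > 0.3) return true;
    // 3. a day has passed since the last base compaction
    //    (base_compaction_interval_seconds_since_last_operation)
    return in.seconds_since_last_base > 86400;
}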
Status StorageEngine::start_bg_threads() {
    _update_cache_expire_thread = std::thread([this] { _update_cache_expire_thread_callback(nullptr); });
    Thread::set_thread_name(_update_cache_expire_thread, "cache_expire");
    LOG(INFO) << "update cache expire thread started";

    _unused_rowset_monitor_thread = std::thread([this] { _unused_rowset_monitor_thread_callback(nullptr); });
    Thread::set_thread_name(_unused_rowset_monitor_thread, "rowset_monitor");
    LOG(INFO) << "unused rowset monitor thread started";

    // start thread for monitoring the snapshot and trash folder
    _garbage_sweeper_thread = std::thread([this] { _garbage_sweeper_thread_callback(nullptr); });
    Thread::set_thread_name(_garbage_sweeper_thread, "garbage_sweeper");
    LOG(INFO) << "garbage sweeper thread started";

    // start thread for monitoring the tablet with io error
    _disk_stat_monitor_thread = std::thread([this] { _disk_stat_monitor_thread_callback(nullptr); });
    Thread::set_thread_name(_disk_stat_monitor_thread, "disk_monitor");
    LOG(INFO) << "disk stat monitor thread started";

    // convert store map to vector
    std::vector<DataDir*> data_dirs;
    for (auto& tmp_store : _store_map) {
        data_dirs.push_back(tmp_store.second);
    }
    int32_t data_dir_num = data_dirs.size();

    if (!config::enable_event_based_compaction_framework) {
        // base and cumulative compaction threads
        int32_t base_compaction_num_threads_per_disk =
                std::max<int32_t>(1, config::base_compaction_num_threads_per_disk);
        int32_t cumulative_compaction_num_threads_per_disk =
                std::max<int32_t>(1, config::cumulative_compaction_num_threads_per_disk);
        int32_t base_compaction_num_threads = base_compaction_num_threads_per_disk * data_dir_num;
        int32_t cumulative_compaction_num_threads = cumulative_compaction_num_threads_per_disk * data_dir_num;

        // calc the max concurrency of compaction tasks
        int32_t max_compaction_concurrency = config::max_compaction_concurrency;
        if (max_compaction_concurrency < 0 ||
            max_compaction_concurrency > base_compaction_num_threads + cumulative_compaction_num_threads) {
            max_compaction_concurrency = base_compaction_num_threads + cumulative_compaction_num_threads;
        }
        vectorized::Compaction::init(max_compaction_concurrency);

        _base_compaction_threads.reserve(base_compaction_num_threads);
        for (uint32_t i = 0; i < base_compaction_num_threads; ++i) {
            _base_compaction_threads.emplace_back([this, data_dir_num, data_dirs, i] {
                _base_compaction_thread_callback(nullptr, data_dirs[i % data_dir_num]);
            });
            Thread::set_thread_name(_base_compaction_threads.back(), "base_compact");
        }
        LOG(INFO) << "base compaction threads started. number: " << base_compaction_num_threads;

        _cumulative_compaction_threads.reserve(cumulative_compaction_num_threads);
        for (uint32_t i = 0; i < cumulative_compaction_num_threads; ++i) {
            _cumulative_compaction_threads.emplace_back([this, data_dir_num, data_dirs, i] {
                _cumulative_compaction_thread_callback(nullptr, data_dirs[i % data_dir_num]);
            });
            Thread::set_thread_name(_cumulative_compaction_threads.back(), "cumulat_compact");
        }
        LOG(INFO) << "cumulative compaction threads started. number: " << cumulative_compaction_num_threads;
    } else { // new compaction framework
        // compaction_manager must init_max_task_num() before any comapction_scheduler starts
        _compaction_manager->init_max_task_num();
        _compaction_scheduler = std::thread([] {
            CompactionScheduler compaction_scheduler;
            compaction_scheduler.schedule();
        });
        Thread::set_thread_name(_compaction_scheduler, "compact_sched");
        LOG(INFO) << "compaction scheduler started";

        _compaction_checker_thread = std::thread([this] { compaction_check(); });
        Thread::set_thread_name(_compaction_checker_thread, "compact_check");
        LOG(INFO) << "compaction checker started";
    }

    int32_t update_compaction_num_threads_per_disk =
            std::max<int32_t>(1, config::update_compaction_num_threads_per_disk);
    int32_t update_compaction_num_threads = update_compaction_num_threads_per_disk * data_dir_num;
    _update_compaction_threads.reserve(update_compaction_num_threads);
    for (uint32_t i = 0; i < update_compaction_num_threads; ++i) {
        _update_compaction_threads.emplace_back([this, data_dir_num, data_dirs, i] {
            _update_compaction_thread_callback(nullptr, data_dirs[i % data_dir_num]);
        });
        Thread::set_thread_name(_update_compaction_threads.back(), "update_compact");
    }
    LOG(INFO) << "update compaction threads started. number: " << update_compaction_num_threads;

    // tablet checkpoint thread
    for (auto data_dir : data_dirs) {
        _tablet_checkpoint_threads.emplace_back([this, data_dir] { _tablet_checkpoint_callback((void*)data_dir); });
        Thread::set_thread_name(_tablet_checkpoint_threads.back(), "tablet_check_pt");
    }
    LOG(INFO) << "tablet checkpoint thread started";

    // fd cache clean thread
    _fd_cache_clean_thread = std::thread([this] { _fd_cache_clean_callback(nullptr); });
    Thread::set_thread_name(_fd_cache_clean_thread, "fd_cache_clean");
    LOG(INFO) << "fd cache clean thread started";

    // path scan and gc thread
    if (config::path_gc_check) {
        for (auto data_dir : get_stores()) {
            _path_scan_threads.emplace_back([this, data_dir] { _path_scan_thread_callback((void*)data_dir); });
            _path_gc_threads.emplace_back([this, data_dir] { _path_gc_thread_callback((void*)data_dir); });
            Thread::set_thread_name(_path_scan_threads.back(), "path_scan");
            Thread::set_thread_name(_path_gc_threads.back(), "path_gc");
        }
        LOG(INFO) << "path scan/gc threads started. number:" << get_stores().size();
    }

    LOG(INFO) << "all storage engine's background threads are started.";
    return Status::OK();
}
Above is the function that creates the BE's background threads at startup.
Start reading from the line if (!config::enable_event_based_compaction_framework) {.
The first thing we hit is the check on enable_event_based_compaction_framework, whose default in the source is false. As the name suggests, StarRocks is introducing a new, event-based compaction framework; below we read through the old and the new frameworks in turn.
The Old Compaction Framework
// base and cumulative compaction threads
int32_t base_compaction_num_threads_per_disk =
std::max<int32_t>(1, config::base_compaction_num_threads_per_disk);
int32_t cumulative_compaction_num_threads_per_disk =
std::max<int32_t>(1, config::cumulative_compaction_num_threads_per_disk);
int32_t base_compaction_num_threads = base_compaction_num_threads_per_disk * data_dir_num;
int32_t cumulative_compaction_num_threads = cumulative_compaction_num_threads_per_disk * data_dir_num;
// calc the max concurrency of compaction tasks
int32_t max_compaction_concurrency = config::max_compaction_concurrency;
if (max_compaction_concurrency < 0 ||
max_compaction_concurrency > base_compaction_num_threads + cumulative_compaction_num_threads) {
max_compaction_concurrency = base_compaction_num_threads + cumulative_compaction_num_threads;
}
These lines are straightforward: they read the configs to get the thread counts for base compaction and cumulative compaction, and derive the maximum number of concurrent compaction tasks, max_compaction_concurrency, from the config value and the sum of those thread counts.

vectorized::Compaction::init(max_compaction_concurrency);

This call stores the maximum number of concurrent compactions in a variable. Every time a new compaction task is about to run, the current number of running compactions is checked against this value, and if the limit has been reached, the task waits.
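The idea behind that gate can be sketched with a tiny counting semaphore. This is illustrative only; the BE uses its own semaphore class:

#include <condition_variable>
#include <mutex>

// Every compaction task acquires a slot before running and releases it
// afterwards, so at most `max_concurrency` compactions run at once.
class ConcurrencyLimiter {
public:
    explicit ConcurrencyLimiter(int max_concurrency) : _slots(max_concurrency) {}

    void acquire() {
        std::unique_lock<std::mutex> lk(_mu);
        _cv.wait(lk, [this] { return _slots > 0; });
        --_slots;
    }
    void release() {
        {
            std::lock_guard<std::mutex> lk(_mu);
            ++_slots;
        }
        _cv.notify_one();
    }

private:
    std::mutex _mu;
    std::condition_variable _cv;
    int _slots;
};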
for (uint32_t i = 0; i < base_compaction_num_threads; ++i) {
    _base_compaction_threads.emplace_back([this, data_dir_num, data_dirs, i] {
        _base_compaction_thread_callback(nullptr, data_dirs[i % data_dir_num]);
    });
    Thread::set_thread_name(_base_compaction_threads.back(), "base_compact");
}
LOG(INFO) << "base compaction threads started. number: " << base_compaction_num_threads;

_cumulative_compaction_threads.reserve(cumulative_compaction_num_threads);
for (uint32_t i = 0; i < cumulative_compaction_num_threads; ++i) {
    _cumulative_compaction_threads.emplace_back([this, data_dir_num, data_dirs, i] {
        _cumulative_compaction_thread_callback(nullptr, data_dirs[i % data_dir_num]);
    });
    Thread::set_thread_name(_cumulative_compaction_threads.back(), "cumulat_compact");
}
LOG(INFO) << "cumulative compaction threads started. number: " << cumulative_compaction_num_threads;
These loops create the actual compaction threads, assigning each thread the data directory it scans.
The New Compaction Framework
// compaction_manager must init_max_task_num() before any comapction_scheduler starts
_compaction_manager->init_max_task_num();
_compaction_scheduler = std::thread([] {
CompactionScheduler compaction_scheduler;
compaction_scheduler.schedule();
});
Thread::set_thread_name(_compaction_scheduler, "compact_sched");
LOG(INFO) << "compaction scheduler started";
_compaction_checker_thread = std::thread([this] { compaction_check(); });
Thread::set_thread_name(_compaction_checker_thread, "compact_check");
LOG(INFO) << "compaction checker started";
The new framework first calls init_max_task_num to initialize the maximum number of compaction tasks.
It then does not create all the compaction threads up front; instead it creates one scheduler thread and one checker thread.
The scheduler thread periodically evaluates the compaction conditions; each time a compaction task qualifies and the number of running compaction tasks is below the maximum, it starts a temporary thread to run that compaction.
The benefit is that when compaction is infrequent, the idle compaction threads are gone. When compaction is very frequent, though, this model may reduce compaction throughput, because a single thread now schedules all compaction tasks.
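As a rough, illustrative sketch of that dispatch model (not the real CompactionScheduler, whose task picking, wake-up, and error handling are far richer):

#include <atomic>
#include <chrono>
#include <functional>
#include <queue>
#include <thread>

// One scheduler thread pops ready compaction tasks and spawns a worker per
// task, as long as the number of running tasks stays below the limit.
// The queue is only touched by the scheduler thread here.
void scheduler_loop(std::queue<std::function<void()>>& ready_tasks,
                    std::atomic<int>& running, int max_task_num) {
    while (true) {
        if (!ready_tasks.empty() && running.load() < max_task_num) {
            auto task = std::move(ready_tasks.front());
            ready_tasks.pop();
            ++running;
            std::thread([task = std::move(task), &running] {
                task();      // do the actual compaction
                --running;   // free the slot for the scheduler
            }).detach();
        } else {
            std::this_thread::sleep_for(std::chrono::milliseconds(100));
        }
    }
}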
void CompactionManager::init_max_task_num() {
if (config::base_compaction_num_threads_per_disk >= 0 && config::cumulative_compaction_num_threads_per_disk >= 0) {
_max_task_num = static_cast<int32_t>(
StorageEngine::instance()->get_store_num() *
(config::cumulative_compaction_num_threads_per_disk + config::base_compaction_num_threads_per_disk));
} else {
// When cumulative_compaction_num_threads_per_disk or config::base_compaction_num_threads_per_disk is less than 0,
// there is no limit to _max_task_num if max_compaction_concurrency is also less than 0, and here we set maximum value to be 20.
_max_task_num = std::min(20, static_cast<int32_t>(StorageEngine::instance()->get_store_num() * 5));
}
if (config::max_compaction_concurrency > 0 && config::max_compaction_concurrency < _max_task_num) {
_max_task_num = config::max_compaction_concurrency;
}
}
This is the code that determines the maximum number of concurrent tasks. From it we can tell:
If both the per-disk cumulative compaction and per-disk base compaction thread counts are configured, the instance sets the task limit to the sum of the two multiplied by the number of disks. Note that both must be configured; setting only one is not enough.
If those configs don't qualify, the default limit is the smaller of 20 and (number of disks * 5).
If max_compaction_concurrency is also configured to cap concurrent compactions, and its value is smaller than the number computed above, that value is used as the limit instead.
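To make the rules concrete, here is a small standalone program that reproduces the same arithmetic with made-up inputs (4 data dirs):

#include <algorithm>
#include <cstdint>
#include <iostream>

// Reproduction of the init_max_task_num() arithmetic for a quick sanity check.
int32_t max_task_num(int32_t store_num, int32_t cumu_threads_per_disk,
                     int32_t base_threads_per_disk, int32_t max_concurrency) {
    int32_t n;
    if (base_threads_per_disk >= 0 && cumu_threads_per_disk >= 0) {
        n = store_num * (cumu_threads_per_disk + base_threads_per_disk);
    } else {
        n = std::min(20, store_num * 5);
    }
    if (max_concurrency > 0 && max_concurrency < n) n = max_concurrency;
    return n;
}

int main() {
    std::cout << max_task_num(4, 1, 1, -1) << "\n"; // both configured: 4 * (1 + 1) = 8
    std::cout << max_task_num(4, -1, 1, -1) << "\n"; // only one configured: min(20, 4 * 5) = 20
    std::cout << max_task_num(4, 1, 1, 6) << "\n";  // capped by max_compaction_concurrency: 6
    return 0;
}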
Primary Key Model Compaction
int32_t update_compaction_num_threads_per_disk =
std::max<int32_t>(1, config::update_compaction_num_threads_per_disk);
int32_t update_compaction_num_threads = update_compaction_num_threads_per_disk * data_dir_num;
_update_compaction_threads.reserve(update_compaction_num_threads);
for (uint32_t i = 0; i < update_compaction_num_threads; ++i) {
_update_compaction_threads.emplace_back([this, data_dir_num, data_dirs, i] {
_update_compaction_thread_callback(nullptr, data_dirs[i % data_dir_num]);
});
Thread::set_thread_name(_update_compaction_threads.back(), "update_compact");
}
LOG(INFO) << "update compaction threads started. number: " << update_compaction_num_threads;
Updates of the primary key model are compacted by dedicated update compaction threads. This thread isolation improves the performance of the primary key model, preventing slow compaction of other data from stalling primary key updates. Nothing else here needs elaboration.
Base Compaction

_base_compaction_thread_callback

void* StorageEngine::_base_compaction_thread_callback(void* arg, DataDir* data_dir) {
#ifdef GOOGLE_PROFILER
    ProfilerRegisterThread();
#endif
    //string last_base_compaction_fs;
    //TTabletId last_base_compaction_tablet_id = -1;
    Status status = Status::OK();
    while (!_bg_worker_stopped.load(std::memory_order_consume)) {
        // must be here, because this thread is start on start and
        if (!data_dir->reach_capacity_limit(0)) {
            status = _perform_base_compaction(data_dir);
        } else {
            status = Status::InternalError("data dir out of capacity");
        }
        if (status.ok()) {
            continue;
        }

        int32_t interval = config::base_compaction_check_interval_seconds;
        if (interval <= 0) {
            LOG(WARNING) << "base compaction check interval config is illegal: " << interval << ", force set to 1";
            interval = 1;
        }
        do {
            SLEEP_IN_BG_WORKER(interval);
            if (!_options.compaction_mem_tracker->any_limit_exceeded()) {
                break;
            }
        } while (true);
    }

    return nullptr;
}
Let's look at the _base_compaction_thread_callback function first.
At startup the BE creates many base compaction threads; each gets this function as its callback, with a data directory passed as the argument.
The overall logic is simple: two nested while loops.
Whenever a pass finds nothing to compact (or fails), the thread first sleeps for base_compaction_check_interval_seconds, then checks memory: compaction_mem_tracker tells it whether there is enough memory for compaction, and if the BE is short on memory, the compaction thread keeps sleeping.
Once resources are available, it calls _perform_base_compaction, which searches for a tablet that satisfies the compaction conditions and compacts it.
The following code determines how much memory is available for compaction:
MemTracker* compaction_mem_tracker() { return _compaction_mem_tracker; }

int64_t compaction_mem_limit = calc_max_compaction_memory(_mem_tracker->limit());
_compaction_mem_tracker = new MemTracker(compaction_mem_limit, "compaction", _mem_tracker);

static int64_t calc_max_compaction_memory(int64_t process_mem_limit) {
    int64_t limit = config::compaction_max_memory_limit;
    int64_t percent = config::compaction_max_memory_limit_percent;

    if (config::compaction_memory_limit_per_worker < 0) {
        config::compaction_memory_limit_per_worker = 2147483648; // 2G
    }
    if (process_mem_limit < 0) {
        return -1;
    }
    if (limit < 0) {
        limit = process_mem_limit;
    }
    if (percent < 0 || percent > 100) {
        percent = 100;
    }
    return std::min<int64_t>(limit, process_mem_limit * percent / 100);
}
calc_max_compaction_memory computes the overall memory budget for compaction: the smaller of compaction_max_memory_limit and process memory times compaction_max_memory_limit_percent, which with default configs works out to the full process memory limit. Separately, each compaction worker is capped at 2G via compaction_memory_limit_per_worker.
It can be tuned through the compaction_max_memory_limit and compaction_max_memory_limit_percent configs.
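A quick, self-contained trace of that arithmetic with made-up numbers (a 64 GB process limit):

#include <algorithm>
#include <cstdint>
#include <iostream>

// Same min(limit, process_mem * percent / 100) arithmetic as
// calc_max_compaction_memory, with illustrative inputs.
int64_t compaction_mem_budget(int64_t process_mem_limit, int64_t limit, int64_t percent) {
    if (process_mem_limit < 0) return -1; // no process limit -> no compaction limit
    if (limit < 0) limit = process_mem_limit;
    if (percent < 0 || percent > 100) percent = 100;
    return std::min<int64_t>(limit, process_mem_limit * percent / 100);
}

int main() {
    const int64_t GB = 1024LL * 1024 * 1024;
    // defaults (limit = -1, percent = 100): the whole 64 GB process limit
    std::cout << compaction_mem_budget(64 * GB, -1, 100) / GB << " GB\n"; // 64 GB
    // compaction_max_memory_limit_percent = 10 -> 6 GB (integer division)
    std::cout << compaction_mem_budget(64 * GB, -1, 10) / GB << " GB\n";  // 6 GB
    return 0;
}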
Status StorageEngine::_perform_base_compaction(DataDir* data_dir) {
    scoped_refptr<Trace> trace(new Trace);
    MonotonicStopWatch watch;
    watch.start();
    SCOPED_CLEANUP({
        // a compaction that runs longer than compaction_trace_threshold (60s by default) logs its trace at INFO
        if (watch.elapsed_time() / 1e9 > config::compaction_trace_threshold) {
            LOG(INFO) << "Trace:" << std::endl << trace->DumpToString(Trace::INCLUDE_ALL);
        }
    });
    ADOPT_TRACE(trace.get());
    TRACE("start to perform base compaction");
    TabletSharedPtr best_tablet = // find_best_tablet_to_compaction picks the tablet with the highest compaction priority
            _tablet_manager->find_best_tablet_to_compaction(CompactionType::BASE_COMPACTION, data_dir);
    if (best_tablet == nullptr) {
        return Status::NotFound("there are no suitable tablets");
    }
    TRACE("found best tablet $0", best_tablet->get_tablet_info().tablet_id);

    StarRocksMetrics::instance()->base_compaction_request_total.increment(1);

    std::unique_ptr<MemTracker> mem_tracker =
            std::make_unique<MemTracker>(MemTracker::COMPACTION, -1, "", _options.compaction_mem_tracker);
    vectorized::BaseCompaction base_compaction(mem_tracker.get(), best_tablet);

    // *** the compact() call performs the compaction ***
    Status res = base_compaction.compact();
    if (!res.ok()) {
        // the compaction failed: record some bookkeeping information
        best_tablet->set_last_base_compaction_failure_time(UnixMillis());
        if (!res.is_not_found()) {
            StarRocksMetrics::instance()->base_compaction_request_failed.increment(1);
            LOG(WARNING) << "failed to init vectorized base compaction. res=" << res.to_string()
                         << ", table=" << best_tablet->full_name();
        }
        return res;
    }

    best_tablet->set_last_base_compaction_failure_time(0);
    return Status::OK();
}
This is the function that runs a compaction; since it is long, I've put my reading notes directly in the comments.
The key parts are the find_best_tablet_to_compaction and compact functions.
TabletSharedPtr TabletManager::find_best_tablet_to_compaction(CompactionType compaction_type, DataDir* data_dir) {
    int64_t now_ms = UnixMillis();
    // this method is shared, so first check the compaction type to handle base and cumulative separately
    const std::string& compaction_type_str = compaction_type == CompactionType::BASE_COMPACTION ? "base" : "cumulative";
    // only do compaction if compaction #rowset > 1
    // with a single rowset there is nothing to compact
    uint32_t highest_score = 1;
    TabletSharedPtr best_tablet;
    // iterate over all _tablets_shards
    for (const auto& tablets_shard : _tablets_shards) {
        std::shared_lock rlock(tablets_shard.lock);
        for (auto [tablet_id, tablet_ptr] : tablets_shard.tablet_map) {
            // skip primary-key tablets; they are compacted by update compaction
            if (tablet_ptr->keys_type() == PRIMARY_KEYS) {
                continue;
            }
            // if the tablet is in the middle of a schema change or materialized-view creation, skip compaction for now
            AlterTabletTaskSharedPtr cur_alter_task = tablet_ptr->alter_task();
            if (cur_alter_task != nullptr && cur_alter_task->alter_state() != ALTER_FINISHED &&
                cur_alter_task->alter_state() != ALTER_FAILED) {
                TabletSharedPtr related_tablet = _get_tablet_unlocked(cur_alter_task->related_tablet_id());
                if (related_tablet != nullptr && tablet_ptr->creation_time() > related_tablet->creation_time()) {
                    // Current tablet is newly created during schema-change or rollup, skip it
                    continue;
                }
            }
            // A not-ready tablet maybe a newly created tablet under schema-change, skip it
            if (tablet_ptr->tablet_state() == TABLET_NOTREADY) {
                continue;
            }

            // skip tablets that don't live on the disk this compaction thread is responsible for.
            // There is a small performance issue here: each thread walks the global tablet list rather than
            // only the tablets on its own disk, so with 12 disks at least 11/12 of the checks are wasted.
            // A possible optimization: keep a per-disk tablet queue so each compaction thread only polls its own disk.
            if (tablet_ptr->data_dir()->path_hash() != data_dir->path_hash() || !tablet_ptr->is_used() ||
                !tablet_ptr->init_succeeded() || !tablet_ptr->can_do_compaction()) {
                continue;
            }

            // if compaction just failed on this tablet, don't retry it for a while
            // (min_compaction_failure_interval_sec, 2 min by default, tunable)
            int64_t last_failure_ms = tablet_ptr->last_cumu_compaction_failure_time();
            if (compaction_type == CompactionType::BASE_COMPACTION) {
                last_failure_ms = tablet_ptr->last_base_compaction_failure_time();
            }
            if (now_ms - last_failure_ms <= config::min_compaction_failure_interval_sec * 1000) {
                VLOG(1) << "Too often to check compaction, skip it."
                        << "compaction_type=" << compaction_type_str << ", last_failure_time_ms=" << last_failure_ms
                        << ", tablet_id=" << tablet_ptr->tablet_id();
                continue;
            }

            // if the lock cannot be acquired, skip this tablet;
            // generally a tablet is locked because a query is running on it.
            // Note, first, that base compaction and cumulative compaction use different locks.
            // Second, the lock is released right after try_lock succeeds; if the polled list is long,
            // the tablet may be locked again by the time the highest-priority tablet actually gets compacted.
            if (compaction_type == CompactionType::BASE_COMPACTION) {
                if (!tablet_ptr->get_base_lock().try_lock()) {
                    continue;
                }
                tablet_ptr->get_base_lock().unlock();
            } else {
                if (!tablet_ptr->get_cumulative_lock().try_lock()) {
                    continue;
                }
                tablet_ptr->get_cumulative_lock().unlock();
            }

            // every qualifying tablet gets a score, and the tablet with the highest score wins.
            // calc_base_compaction_score and calc_cumulative_compaction_score are covered later; in short:
            // - calc_base_compaction_score is the total segment count of files that have been through cumulative compaction
            // - calc_cumulative_compaction_score is the total segment count of files not yet compacted at all
            // With default config a segment file is 1G, so a 1.5G rowset is stored as 2 segments and its compaction score is 2
            uint32_t table_score = 0;
            {
                std::shared_lock rdlock(tablet_ptr->get_header_lock());
                if (compaction_type == CompactionType::BASE_COMPACTION) {
                    table_score = tablet_ptr->calc_base_compaction_score();
                } else if (compaction_type == CompactionType::CUMULATIVE_COMPACTION) {
                    table_score = tablet_ptr->calc_cumulative_compaction_score();
                }
            }
            if (table_score > highest_score) {
                highest_score = table_score;
                best_tablet = tablet_ptr;
            }
        }
    }

    // log and return the tablet with the highest score;
    // this does not mean compaction will actually run: the winner still has to pass the compaction-policy checks
    if (best_tablet != nullptr) {
        LOG(INFO) << "Found the best tablet to compact. "
                  << "compaction_type=" << compaction_type_str << " tablet_id=" << best_tablet->tablet_id()
                  << " highest_score=" << highest_score;
        // TODO(lingbin): Remove 'max' from metric name, it would be misunderstood as the
        // biggest in history(like peak), but it is really just the value at current moment.
        if (compaction_type == CompactionType::BASE_COMPACTION) {
            StarRocksMetrics::instance()->tablet_base_max_compaction_score.set_value(highest_score);
        } else {
            StarRocksMetrics::instance()->tablet_cumulative_max_compaction_score.set_value(highest_score);
        }
    }
    return best_tablet;
}
const uint32_t Tablet::calc_base_compaction_score() const {
    uint32_t score = 0;
    // find the point where cumulative compaction ends
    const int64_t point = cumulative_layer_point();
    bool base_rowset_exist = false;
    for (auto& rs_meta : _tablet_meta->all_rs_metas()) {
        // a rowset whose start_version is 0 means the base rowset exists;
        // this mainly filters out empty tablets, which need no compaction
        if (rs_meta->start_version() == 0) {
            base_rowset_exist = true;
        }
        // skip all rowsets that have not been through cumulative compaction yet
        if (rs_meta->start_version() >= point) {
            // all_rs_metas() is not sorted, so we use _continue_ other than _break_ here.
            continue;
        }

        score += rs_meta->get_compaction_score();
    }

    return base_rowset_exist ? score : 0;
}
const uint32_t Tablet::calc_cumulative_compaction_score() const {
    uint32_t score = 0;
    bool base_rowset_exist = false;
    // the point is the version up to which cumulative compaction has already run,
    // so this function sums up all rowsets that have not been through cumulative compaction yet
    const int64_t point = cumulative_layer_point();
    for (auto& rs_meta : _tablet_meta->all_rs_metas()) {
        if (rs_meta->start_version() == 0) {
            base_rowset_exist = true;
        }
        if (rs_meta->start_version() < point) {
            // all_rs_metas() is not sorted, so we use _continue_ other than _break_ here.
            continue;
        }

        score += rs_meta->get_compaction_score();
    }

    // If base doesn't exist, tablet may be altering, skip it, set score to 0
    return base_rowset_exist ? score : 0;
}
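To make the two scores concrete, here is a small standalone trace over a hypothetical rowset layout: a base rowset starting at version 0 with 2 segments, a cumulative rowset starting at version 6 with 3 segments, three singleton rowsets from version 9 on, and the cumulative layer point at 9:

#include <cstdint>
#include <iostream>
#include <vector>

// Hypothetical, simplified rowset metadata.
struct Rowset {
    int64_t start_version;
    uint32_t compaction_score; // roughly the rowset's segment count
};

int main() {
    const int64_t point = 9; // cumulative layer point
    std::vector<Rowset> rowsets = {
        {0, 2},  // base rowset, 2 segments
        {6, 3},  // cumulative rowset, 3 segments
        {9, 1}, {10, 1}, {11, 1}, // singleton files, 1 segment each
    };

    uint32_t base_score = 0, cumu_score = 0;
    bool base_exists = false;
    for (const auto& rs : rowsets) {
        if (rs.start_version == 0) base_exists = true;
        if (rs.start_version < point) base_score += rs.compaction_score;
        else cumu_score += rs.compaction_score;
    }
    std::cout << "base score = " << (base_exists ? base_score : 0)            // 5
              << ", cumulative score = " << (base_exists ? cumu_score : 0)    // 3
              << "\n";
    return 0;
}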
With the next function we get to the heart of the matter.
Status BaseCompaction::compact() {
    // not important: sanity-check the tablet's health
    if (!_tablet->init_succeeded()) {
        return Status::InvalidArgument("base compaction input parameter error.");
    }

    // take the base lock; if it cannot be acquired, return immediately
    std::unique_lock lock(_tablet->get_base_lock(), std::try_to_lock);
    if (!lock.owns_lock()) {
        return Status::OK();
    }
    TRACE("got base compaction lock");

    // pick all the rowsets to merge; this function is examined in detail below
    // 1. pick rowsets to compact
    RETURN_IF_ERROR(pick_rowsets_to_compact());
    TRACE("rowsets picked");
    TRACE_COUNTER_INCREMENT("input_rowsets_count", _input_rowsets.size());

    MemTracker* prev_tracker = tls_thread_status.set_mem_tracker(_mem_tracker);
    DeferOp op([&] { tls_thread_status.set_mem_tracker(prev_tracker); });

    // run the compaction
    // 2. do base compaction, merge rowsets
    RETURN_IF_ERROR(do_compaction());
    TRACE("compaction finished");

    // compaction succeeded
    // 3. set state to success
    _state = CompactionState::SUCCESS;

    // report metrics
    // 4. add metric to base compaction
    StarRocksMetrics::instance()->base_compaction_deltas_total.increment(_input_rowsets.size());
    StarRocksMetrics::instance()->base_compaction_bytes_total.increment(_input_rowsets_size);
    TRACE("save base compaction metrics");

    return Status::OK();
}
At its core, only two functions deserve a deep dive:
1. pick_rowsets_to_compact, which selects all the rowsets to compact
2. do_compaction, which performs the compaction

Picking the rowsets to merge
Status BaseCompaction::pick_rowsets_to_compact() {
    std::vector<RowsetSharedPtr> candidate_rowsets;
    _input_rowsets.clear();
    // this call populates candidate_rowsets
    _tablet->pick_candicate_rowsets_to_base_compaction(&candidate_rowsets);
    // with at most one candidate rowset there is nothing to compact
    if (candidate_rowsets.size() <= 1) {
        return Status::NotFound("base compaction no suitable version error.");
    }

    // check that the rowset versions are continuous and do not overlap
    std::sort(candidate_rowsets.begin(), candidate_rowsets.end(), Rowset::comparator);
    RETURN_IF_ERROR(check_version_continuity(candidate_rowsets));
    RETURN_IF_ERROR(_check_rowset_overlapping(candidate_rowsets));

    // with exactly two files where the first one ends at version 1, no compaction is needed,
    // because in that case the first file is empty
    if (candidate_rowsets.size() == 2 && candidate_rowsets[0]->end_version() == 1) {
        // the tablet is with rowset: [0-1], [2-y]
        // and [0-1] has no data. in this situation, no need to do base compaction.
        return Status::NotFound("base compaction no suitable version error.");
    }

    // transient rowsets
    std::vector<RowsetSharedPtr> transient_rowsets;
    size_t compaction_score = 0;
    // loop over all candidates
    for (auto& rowset : candidate_rowsets) {
        // stop once the maximum number of versions per compaction has been reached
        if (compaction_score >= config::max_base_compaction_num_singleton_deltas) {
            // got enough segments
            break;
        }
        // otherwise accumulate the candidates' segment counts and stage the rowset in the transient list
        compaction_score += rowset->rowset_meta()->get_compaction_score();
        transient_rowsets.push_back(rowset);
    }

    // if the transient list is not empty, it becomes the input rowsets
    if (!transient_rowsets.empty()) {
        _input_rowsets = transient_rowsets;
    }

    // 1. check whether the compaction score reaches the minimum for base compaction, 5 by default
    if (compaction_score >= config::min_base_compaction_num_singleton_deltas) {
        LOG(INFO) << "satisfy the base compaction policy. tablet=" << _tablet->full_name()
                  << ", num_cumulative_rowsets=" << _input_rowsets.size() - 1
                  << ", min_base_compaction_num_singleton_deltas=" << config::min_base_compaction_num_singleton_deltas;
        return Status::OK();
    }

    // check whether the size ratio of cumulative files to the base file reaches the configured threshold, 30% by default
    // 2. the ratio between base rowset and all input cumulative rowsets reachs the threshold
    int64_t base_size = 0;
    int64_t cumulative_total_size = 0;
    for (auto& rowset : _input_rowsets) {
        if (rowset->start_version() != 0) {
            cumulative_total_size += rowset->data_disk_size();
        } else {
            base_size = rowset->data_disk_size();
        }
    }

    double base_cumulative_delta_ratio = config::base_cumulative_delta_ratio;
    if (base_size == 0) {
        // base_size == 0 means this may be a base version [0-1], which has no data.
        // set to 1 to void devide by zero
        base_size = 1;
    }
    double cumulative_base_ratio = static_cast<double>(cumulative_total_size) / base_size;
    if (cumulative_base_ratio > base_cumulative_delta_ratio) {
        LOG(INFO) << "satisfy the base compaction policy. tablet=" << _tablet->full_name()
                  << ", cumulative_total_size=" << cumulative_total_size << ", base_size=" << base_size
                  << ", cumulative_base_ratio=" << cumulative_base_ratio
                  << ", policy_ratio=" << base_cumulative_delta_ratio;
        return Status::OK();
    }

    // check whether enough time has passed since this tablet's last base compaction, 86400s by default
    // 3. the interval since last base compaction reachs the threshold
    int64_t base_creation_time = _input_rowsets[0]->creation_time();
    int64_t interval_threshold = config::base_compaction_interval_seconds_since_last_operation;
    int64_t interval_since_last_base_compaction = time(nullptr) - base_creation_time;
    if (interval_since_last_base_compaction > interval_threshold) {
        LOG(INFO) << "satisfy the base compaction policy. tablet=" << _tablet->full_name()
                  << ", interval_since_last_base_compaction=" << interval_since_last_base_compaction
                  << ", interval_threshold=" << interval_threshold;
        return Status::OK();
    }

    LOG(INFO) << "don't satisfy the base compaction policy. tablet=" << _tablet->full_name()
              << ", num_cumulative_rowsets=" << _input_rowsets.size() - 1
              << ", cumulative_base_ratio=" << cumulative_base_ratio
              << ", interval_since_last_base_compaction=" << interval_since_last_base_compaction;

    // if none of the conditions is met, base compaction is not executed
    return Status::NotFound("base compaction no suitable version error.");
}
Now the main event: the function that executes the compaction.
// first check the concurrency limit, then start
Status Compaction::do_compaction() {
    _concurrency_sem.wait();
    TRACE("got concurrency lock and start to do compaction");
    Status st = do_compaction_impl();
    _concurrency_sem.signal();
    return st;
}

Status Compaction::do_compaction_impl() {
    OlapStopWatch watch;

    // first collect the total number of segments, rows and bytes
    // 1. prepare input and output parameters
    int64_t segments_num = 0;
    int64_t total_row_size = 0;
    for (auto& rowset : _input_rowsets) {
        _input_rowsets_size += rowset->data_disk_size();
        _input_row_num += rowset->num_rows();
        segments_num += rowset->num_segments();
        total_row_size += rowset->total_row_size();
    }
    TRACE_COUNTER_INCREMENT("input_rowsets_data_size", _input_rowsets_size);
    TRACE_COUNTER_INCREMENT("input_row_num", _input_row_num);
    TRACE_COUNTER_INCREMENT("input_segments_num", segments_num);

    _output_version = Version(_input_rowsets.front()->start_version(), _input_rowsets.back()->end_version());

    // choose vertical or horizontal compaction algorithm
    auto iterator_num_res = Rowset::get_segment_num(_input_rowsets);
    if (!iterator_num_res.ok()) {
        LOG(WARNING) << "fail to get segment iterator num. tablet=" << _tablet->tablet_id()
                     << ", err=" << iterator_num_res.status().to_string();
        return iterator_num_res.status();
    }
    size_t segment_iterator_num = iterator_num_res.value();
    int64_t max_columns_per_group = config::vertical_compaction_max_columns_per_group;
    size_t num_columns = _tablet->num_columns();
    // in short: 5 or fewer columns means horizontal compaction, otherwise vertical
    CompactionAlgorithm algorithm =
            CompactionUtils::choose_compaction_algorithm(num_columns, max_columns_per_group, segment_iterator_num);
    if (algorithm == VERTICAL_COMPACTION) {
        CompactionUtils::split_column_into_groups(_tablet->num_columns(), _tablet->num_key_columns(),
                                                  max_columns_per_group, &_column_groups);
    }

    // max rows per segment
    // computed as 1G / average bytes per row, capped at INT32_MAX
    int64_t max_rows_per_segment =
            CompactionUtils::get_segment_max_rows(config::max_segment_file_size, _input_row_num, _input_rowsets_size);

    LOG(INFO) << "start " << compaction_name() << ". tablet=" << _tablet->tablet_id()
              << ", output version is=" << _output_version.first << "-" << _output_version.second
              << ", max rows per segment=" << max_rows_per_segment
              << ", segment iterator num=" << segment_iterator_num
              << ", algorithm=" << CompactionUtils::compaction_algorithm_to_string(algorithm)
              << ", column group size=" << _column_groups.size() << ", columns per group=" << max_columns_per_group;

    // create rowset writer
    RETURN_IF_ERROR(CompactionUtils::construct_output_rowset_writer(_tablet.get(), max_rows_per_segment, algorithm,
                                                                    _output_version, &_output_rs_writer));
    TRACE("prepare finished");

    // 2. write combined rows to output rowset
    // start merging; horizontal and vertical compaction merge differently
    Statistics stats;
    Status st;
    if (algorithm == VERTICAL_COMPACTION) {
        st = _merge_rowsets_vertically(segment_iterator_num, &stats);
    } else {
        st = _merge_rowsets_horizontally(segment_iterator_num, &stats);
    }
    if (!st.ok()) {
        LOG(WARNING) << "fail to do " << compaction_name() << ". res=" << st << ", tablet=" << _tablet->tablet_id()
                     << ", output_version=" << _output_version.first << "-" << _output_version.second;
        return st;
    }
    TRACE("merge rowsets finished");
    TRACE_COUNTER_INCREMENT("merged_rows", stats.merged_rows);
    TRACE_COUNTER_INCREMENT("filtered_rows", stats.filtered_rows);
    TRACE_COUNTER_INCREMENT("output_rows", stats.output_rows);

    auto res = _output_rs_writer->build();
    if (!res.ok()) return res.status();
    _output_rowset = std::move(res).value();
    TRACE_COUNTER_INCREMENT("output_rowset_data_size", _output_rowset->data_disk_size());
    TRACE_COUNTER_INCREMENT("output_row_num", _output_rowset->num_rows());
    TRACE_COUNTER_INCREMENT("output_segments_num", _output_rowset->num_segments());
    TRACE("output rowset built");

    // 3. check correctness, commented for this moment.
    // verify that the compaction succeeded
    RETURN_IF_ERROR(check_correctness(stats));
    TRACE("check correctness finished");

    // update the rowsets' metadata
    // 4. modify rowsets in memory
    RETURN_IF_ERROR(modify_rowsets());
    TRACE("modify rowsets finished");

    // 5. update last success compaction time
    int64_t now = UnixMillis();
    if (compaction_type() == ReaderType::READER_CUMULATIVE_COMPACTION) {
        _tablet->set_last_cumu_compaction_success_time(now);
    } else {
        _tablet->set_last_base_compaction_success_time(now);
    }

    LOG(INFO) << "succeed to do " << compaction_name() << ". tablet=" << _tablet->tablet_id()
              << ", output_version=" << _output_version.first << "-" << _output_version.second
              << ", input infos [segments=" << segments_num << ", rows=" << _input_row_num
              << ", disk size=" << _input_rowsets_size << "]"
              << ", output infos [segments=" << _output_rowset->num_segments()
              << ", rows=" << _output_rowset->num_rows() << ", disk size=" << _output_rowset->data_disk_size() << "]"
              << ". elapsed time=" << watch.get_elapse_second() << "s.";

    // warm-up this rowset
    // load the merged rowset
    st = _output_rowset->load();
    // only log load failure
    LOG_IF(WARNING, !st.ok()) << "ignore load rowset error tablet:" << _tablet->tablet_id()
                              << " rowset:" << _output_rowset->rowset_id() << " " << st;

    return Status::OK();
}
Choosing the compaction algorithm (horizontal or vertical)
CompactionAlgorithm CompactionUtils::choose_compaction_algorithm(size_t num_columns, int64_t max_columns_per_group,
                                                                 size_t source_num) {
    // if the number of columns in the schema is less than or equal to max_columns_per_group, use HORIZONTAL_COMPACTION.
    // max_columns_per_group comes from the vertical_compaction_max_columns_per_group config, 5 by default
    if (num_columns <= max_columns_per_group) {
        return HORIZONTAL_COMPACTION;
    }

    // if source_num is less than or equal to 1, heap merge iterator is not used in compaction,
    // and row source mask is not created.
    // if source_num is more than MAX_SOURCES, mask in RowSourceMask may overflow.
    // rarely relevant: with a single segment, or more than 32767 segments, compaction is also horizontal
    if (source_num <= 1 || source_num > vectorized::RowSourceMask::MAX_SOURCES) {
        return HORIZONTAL_COMPACTION;
    }

    // otherwise vertical
    return VERTICAL_COMPACTION;
}
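A few concrete decisions under the default vertical_compaction_max_columns_per_group = 5; this is a simplified stand-in for the real function, with MAX_SOURCES written out as 32767:

#include <cstddef>
#include <cstdint>

enum Algo { HORIZONTAL, VERTICAL };

Algo choose(size_t num_columns, int64_t max_cols_per_group, size_t source_num) {
    if (num_columns <= static_cast<size_t>(max_cols_per_group)) return HORIZONTAL;
    if (source_num <= 1 || source_num > 32767) return HORIZONTAL;
    return VERTICAL;
}
// choose(3, 5, 10)  -> HORIZONTAL (narrow table)
// choose(50, 5, 10) -> VERTICAL   (wide table, several segments)
// choose(50, 5, 1)  -> HORIZONTAL (single segment, no heap merge needed)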
Computing the maximum rows per segment
uint32_t CompactionUtils::get_segment_max_rows(int64_t max_segment_file_size, int64_t input_row_num,
int64_t input_size) {
// The range of config::max_segments_file_size is between [1, INT64_MAX]
// If the configuration is set wrong, the config::max_segments_file_size will be a negative value.
// Using division instead of multiplication can avoid the overflow
// i.e. 1G / (total input bytes / (total input rows + 1) + 1)
int64_t max_segment_rows = max_segment_file_size / (input_size / (input_row_num + 1) + 1);
if (max_segment_rows > INT32_MAX || max_segment_rows <= 0) {
max_segment_rows = INT32_MAX;
}
return max_segment_rows;
}
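Tracing the formula with made-up input: 100 million rows occupying 20 GB on disk, against the default 1 GB max_segment_file_size:

#include <cstdint>
#include <iostream>

int main() {
    const int64_t GB = 1024LL * 1024 * 1024;
    int64_t max_segment_file_size = 1 * GB;
    int64_t input_row_num = 100000000;     // 100 million rows
    int64_t input_size = 20 * GB;          // ~214 bytes per row on average

    // same division-based formula as get_segment_max_rows
    int64_t max_rows = max_segment_file_size / (input_size / (input_row_num + 1) + 1);
    std::cout << max_rows << "\n"; // ~4.99 million rows per 1 GB segment
    return 0;
}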
Creating a RowsetWriter
Status CompactionUtils::construct_output_rowset_writer(Tablet* tablet, uint32_t max_rows_per_segment,
                                                       CompactionAlgorithm algorithm, Version version,
                                                       std::unique_ptr<RowsetWriter>* output_rowset_writer) {
    RowsetWriterContext context(kDataFormatV2, config::storage_format_version);
    context.rowset_id = StorageEngine::instance()->next_rowset_id();
    context.tablet_uid = tablet->tablet_uid();
    context.tablet_id = tablet->tablet_id();
    context.partition_id = tablet->partition_id();
    context.tablet_schema_hash = tablet->schema_hash();
    context.rowset_type = BETA_ROWSET;
    context.rowset_path_prefix = tablet->schema_hash_path();
    context.tablet_schema = &(tablet->tablet_schema());
    context.rowset_state = VISIBLE;
    context.version = version;
    context.segments_overlap = NONOVERLAPPING;
    context.max_rows_per_segment = max_rows_per_segment;
    context.writer_type = (algorithm == VERTICAL_COMPACTION ? RowsetWriterType::kVertical : RowsetWriterType::kHorizontal);
    // delegate the creation to create_rowset_writer
    Status st = RowsetFactory::create_rowset_writer(context, output_rowset_writer);
    if (!st.ok()) {
        std::stringstream ss;
        ss << "Fail to create rowset writer. tablet_id=" << context.tablet_id << " err=" << st;
        LOG(WARNING) << ss.str();
        return Status::InternalError(ss.str());
    }
    return Status::OK();
}
Creating the writer. With default settings there is not much to it: a different writer is produced depending on whether the compaction is horizontal or vertical (plus an adapter writer when the memory and storage format versions differ).
Status RowsetFactory::create_rowset_writer(const RowsetWriterContext& context, std::unique_ptr<RowsetWriter>* output) {
    if (UNLIKELY(context.rowset_type != BETA_ROWSET)) {
        return Status::NotSupported("unsupported rowset type");
    }
    auto tablet_schema = context.tablet_schema;
    auto memory_format_version = context.memory_format_version;
    auto storage_format_version = context.storage_format_version;

    if (memory_format_version == kDataFormatUnknown) {
        if (tablet_schema->contains_format_v1_column()) {
            memory_format_version = kDataFormatV1;
        } else if (tablet_schema->contains_format_v2_column()) {
            memory_format_version = kDataFormatV2;
        } else {
            memory_format_version = storage_format_version;
        }
    }

    if (storage_format_version != kDataFormatV1 && storage_format_version != kDataFormatV2) {
        LOG(WARNING) << "Invalid storage format version " << storage_format_version;
        return Status::InvalidArgument("invalid storage_format_version");
    }
    if (memory_format_version != kDataFormatV1 && memory_format_version != kDataFormatV2) {
        LOG(WARNING) << "Invalid memory format version " << memory_format_version;
        return Status::InvalidArgument("invalid memory_format_version");
    }

    if (memory_format_version != storage_format_version) {
        auto adapter_context = context;
        adapter_context.memory_format_version = memory_format_version;
        *output = std::make_unique<RowsetWriterAdapter>(adapter_context);
    } else if (context.writer_type == kHorizontal) {
        *output = std::make_unique<HorizontalBetaRowsetWriter>(context);
    } else {
        DCHECK(context.writer_type == kVertical);
        *output = std::make_unique<VerticalBetaRowsetWriter>(context);
    }

    return (*output)->init();
}
Vertical compaction: when there are many columns, it makes better use of CPU resources.
Status Compaction::_merge_rowsets_vertically(size_t segment_iterator_num, Statistics* stats_output) {
    TRACE_COUNTER_SCOPE_LATENCY_US("merge_rowsets_latency_us");
    auto mask_buffer = std::make_unique<RowSourceMaskBuffer>(_tablet->tablet_id(), _tablet->data_dir()->path());
    auto source_masks = std::make_unique<std::vector<RowSourceMask>>();
    // the columns are split into groups and merged group by group
    for (size_t i = 0; i < _column_groups.size(); ++i) {
        bool is_key = (i == 0);
        if (!is_key) {
            // read mask buffer from the beginning
            mask_buffer->flip_to_read();
        }

        Schema schema = ChunkHelper::convert_schema_to_format_v2(_tablet->tablet_schema(), _column_groups[i]);
        TabletReader reader(_tablet, _output_rs_writer->version(), schema, is_key, mask_buffer.get());
        RETURN_IF_ERROR(reader.prepare());
        TabletReaderParams reader_params;
        reader_params.reader_type = compaction_type();
        reader_params.profile = _runtime_profile.create_child("merge_rowsets");

        int64_t total_num_rows = 0;
        int64_t total_mem_footprint = 0;
        // loop over all rowsets
        for (auto& rowset : _input_rowsets) {
            if (rowset->rowset_meta()->rowset_type() != BETA_ROWSET) {
                continue;
            }
            total_num_rows += rowset->num_rows();
            auto* beta_rowset = down_cast<BetaRowset*>(rowset.get());
            // loop over all segments
            for (auto& segment : beta_rowset->segments()) {
                // and over the columns of the current column group
                for (uint32_t column_index : _column_groups[i]) {
                    const auto* column_reader = segment->column(column_index);
                    if (column_reader == nullptr) {
                        continue;
                    }
                    // accumulate the total size of this group of columns
                    total_mem_footprint += column_reader->total_mem_footprint();
                }
            }
        }
        // compute the chunk size; with default config this is essentially 4096
        int32_t chunk_size = CompactionUtils::get_read_chunk_size(config::compaction_memory_limit_per_worker,
                                                                  config::vector_chunk_size, total_num_rows,
                                                                  total_mem_footprint, segment_iterator_num);
        VLOG(1) << "tablet=" << _tablet->tablet_id() << ", column group=" << i
                << ", reader chunk size=" << chunk_size;
        reader_params.chunk_size = chunk_size;
        RETURN_IF_ERROR(reader.open(reader_params));

        int64_t output_rows = 0;
        auto chunk = ChunkHelper::new_chunk(schema, reader_params.chunk_size);
        auto char_field_indexes = ChunkHelper::get_char_field_indexes(schema);

        Status status;
        // as long as the process is alive and there is enough memory for compaction,
        // keep reading, one chunk (4096 rows by default) at a time
        while (!ExecEnv::GetInstance()->storage_engine()->bg_worker_stopped()) {
#ifndef BE_TEST
            status = tls_thread_status.mem_tracker()->check_mem_limit("Compaction");
            if (!status.ok()) {
                LOG(WARNING) << "fail to execute compaction: " << status.message() << std::endl;
                return status;
            }
#endif
            chunk->reset();
            status = reader.get_next(chunk.get(), source_masks.get());
            if (!status.ok()) {
                if (status.is_end_of_file()) {
                    break;
                } else {
                    LOG(WARNING) << "reader get next error. tablet=" << _tablet->tablet_id()
                                 << ", err=" << status.to_string();
                    return Status::InternalError(fmt::format("reader get_next error: {}", status.to_string()));
                }
            }

            // pad char columns with the default value 0, for bitmap or zone map use
            ChunkHelper::padding_char_columns(char_field_indexes, schema, _tablet->tablet_schema(), chunk.get());

            // write out
            if (auto st = _output_rs_writer->add_columns(*chunk, _column_groups[i], is_key); !st.ok()) {
                LOG(WARNING) << "writer add chunk by columns error. tablet=" << _tablet->tablet_id()
                             << ", err=" << st;
                return st;
            }

            if (is_key) {
                output_rows += chunk->num_rows();
                if (!source_masks->empty()) {
                    RETURN_IF_ERROR(mask_buffer->write(*source_masks));
                }
            }
            if (!source_masks->empty()) {
                source_masks->clear();
            }
        }

        if (ExecEnv::GetInstance()->storage_engine()->bg_worker_stopped()) {
            return Status::InternalError("Process is going to quit. The compaction will stop.");
        }

        if (is_key && stats_output != nullptr) {
            stats_output->output_rows = output_rows;
            stats_output->merged_rows = reader.merged_rows();
            stats_output->filtered_rows = reader.stats().rows_del_filtered;
        }

        if (auto st = _output_rs_writer->flush_columns(); !st.ok()) {
            LOG(WARNING) << "failed to flush column group when merging rowsets of tablet " << _tablet->tablet_id()
                         << ", err=" << st;
            return st;
        }

        if (is_key) {
            RETURN_IF_ERROR(mask_buffer->flush());
        }
    }

    if (auto st = _output_rs_writer->final_flush(); !st.ok()) {
        LOG(WARNING) << "failed to final flush rowset when merging rowsets of tablet " << _tablet->tablet_id()
                     << ", err=" << st;
        return st;
    }

    return Status::OK();
}
Horizontal compaction follows much the same flow as vertical compaction, except that the columns are not compacted in groups of five: _merge_rowsets_horizontally reads complete rows through a single reader and appends them to the output writer chunk by chunk, so it needs no row-source mask buffer.
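Since the horizontal path is essentially a k-way merge, here is a toy, self-contained version of the idea; real rows carry all columns, versions and delete marks rather than bare ints:

#include <functional>
#include <iostream>
#include <queue>
#include <utility>
#include <vector>

// k sorted inputs (the input rowsets) merged into one sorted output with
// a min-heap, whole rows at a time.
std::vector<int> heap_merge(const std::vector<std::vector<int>>& inputs) {
    using Item = std::pair<int, std::pair<size_t, size_t>>; // value, (source, index)
    std::priority_queue<Item, std::vector<Item>, std::greater<Item>> heap;
    for (size_t s = 0; s < inputs.size(); ++s)
        if (!inputs[s].empty()) heap.push({inputs[s][0], {s, 0}});

    std::vector<int> out;
    while (!heap.empty()) {
        auto [value, pos] = heap.top();
        heap.pop();
        out.push_back(value);
        auto [s, i] = pos;
        if (i + 1 < inputs[s].size()) heap.push({inputs[s][i + 1], {s, i + 1}});
    }
    return out;
}

int main() {
    auto merged = heap_merge({{1, 4, 9}, {2, 3, 8}, {5, 7}});
    for (int v : merged) std::cout << v << ' '; // 1 2 3 4 5 7 8 9
    std::cout << '\n';
    return 0;
}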