赞
踩
顺序写:level db不需要在各个level中去修改位置,而是只要放到最新的memTable中插入,所以在读取数据时如果在上层找到了数据或者数据flag是已删除就不需要继续寻找了。所以其能够提供比读更好的性能。
LevelDB 中的 sequence number 是一个全局的编号,每次写入(注意批量写入时 sequence number 是相同的)时,该编号会递增。
hash索引,需要将所有数据都读入到内存,对于大规模数据读不到内存里就不能使用hash索引。
memTable是基于跳表实现的结构。node格式,数据在memtable上是有序的,担当memtable到达一定大小后,就会转化为immutable之后放入到level 0,每一层level里面又有多个文件,但是在level 0中单个文件内是有序的,文件间可能是无序的有重叠,但是在其他level 层中文件间和文件内都是有序的。同时每个文件中也包含了多个block。(问题1:如果在immutable minor compact到sstable中的时候memtable马上又满了leveldb会停止写入数据吗?答案是会停止写入),
使用protobuf varint编码,将internal_key_size和value_size由定长存储改为变长存储。
sequence:为该操作的编号数,编号越大表示该操作越新。
type:由于没有删除操作,所以通过type类型来判断是写入操作还是删除操作
MemTable
相关的有多种 key,本质上就是上图里三种:
sequence
type
,用于支持同一 key 的多次操作,以及 snapshot,即内部key.explicit MemTable(const InternalKeyComparator& comparator); //初始化接收一个比较函数 //提供两种接口 void Add(SequenceNumber seq, ValueType type, const Slice& key, const Slice& value); bool Get(const LookupKey& key, std::string* value, Status* s); //通过迭代器形式 暴露底层skiplist Iterator* MemTable::NewIterator() { return new MemTableIterator(&table_); } struct KeyComparator { const InternalKeyComparator comparator; explicit KeyComparator(const InternalKeyComparator& c) : comparator(c) { } int operator()(const char* a, const char* b) const; }; friend class MemTableIterator; friend class MemTableBackwardIterator; typedef SkipList<const char*, KeyComparator> Table; KeyComparator comparator_; int refs_; Arena arena_; Table table_; //底层skiplist
add:通过比较来插入,组装memtable_key的过程
get:userkey不存在的两种情况,1:没有userkey 2:存在userkey,但是最大的seq对应的type是删除
每个block的长度最大为0x8000 = 32k,根据图中日志结构可知日志是变长的,只能从前往后遍历block,type的作用就是当 当前block的空间写不下那么多data时来标识该data跨Block了。
日志写入类的关系,因为日志只有添加,所以用不到其他Close等函数。PosixW是WriteableFile的子类,PosixW和Log::Writer是聚合关系(代表整体与部分的关系,比如Writer的功能知识PosixW的一部分)
Object * p = new (address) ClassConstruct(...);
//先分配一对内存
int* buff = new int;
memset(buff,0,sizeof(int));
//此处new的placement new,在buff的内存上构造int对象,不需要分配额外的内存
int *p = new (buff)int(3);
std::cout << *p << std::endl; //3
第一层:n 第二层:n/2 第三层:n/4元素 但在实际实现环节要想一直保持如此高的性能,需要经常调整结构,所以在实现时使用抛硬币的方法,从height=1开始每抛一次正面增加一层,同时一般也会设置最大层数。因为memtable中不需要删除元素,所以在leveldb代码中只提供了插入和查找两个接口。
template<typename Key, class Comparator> class SkipList //类模板 void Insert(const Key& key); //插入函数接口 bool Contains(const Key& key) const; //查询函数接口 //成员变量 Comparator const compare_; //比较函数 Arena* const arena_; //leveldb的内存池 在该处用于分配node Node* const head_; //跳表中的节点 对应一列 port::AtomicPointer max_height_; // Random rnd_; //随机数产生器 //构造函数 template<typename Key, class Comparator> SkipList<Key,Comparator>::SkipList(Comparator cmp, Arena* arena) : compare_(cmp), arena_(arena), head_(NewNode(0 /* any key will do */, kMaxHeight)), max_height_(reinterpret_cast<void*>(1)), //当前最大高度为1 rnd_(0xdeadbeef) { for (int i = 0; i < kMaxHeight; i++) { head_->SetNext(i, nullptr); //初始化head_高度为最大高度 后面节点在高度大于最大高度时能够使用head_作为prev节点 } }
template<typename Key, class Comparator> struct SkipList<Key,Comparator>::Node { explicit Node(const Key& k) : key(k) { } Key const key;//数据本身 //获取该节点在第n层的后继节点 Node* Next(int n) { assert(n >= 0); return reinterpret_cast<Node*>(next_[n].Acquire_Load()); } //设置该节点在第n层的后继节点 void SetNext(int n, Node* x) { assert(n >= 0); next_[n].Release_Store(x); } //不能保证线程安全 Node* NoBarrier_Next(int n) { assert(n >= 0); return reinterpret_cast<Node*>(next_[n].NoBarrier_Load()); } void NoBarrier_SetNext(int n, Node* x) { assert(n >= 0); next_[n].NoBarrier_Store(x); } private: // 作为Node的最后一个成员变量 // 由于Node通过placement new的方式构造,因此next_实际上是一个不定长的数组 // 数组长度即该节点的高度 // next_记录了该节点在所有层的后继节点,0是最底层链表。 port::AtomicPointer next_[1]; }; //所有的 Node 对象都通过NewNode构造出来:先通过arena_分配内存,然后通过 placement new 的方式调用 Node 的构造函数。 template<typename Key, class Comparator> typename SkipList<Key,Comparator>::Node* SkipList<Key,Comparator>::NewNode(const Key& key, int height) { //额外存储(height - 1)个port::AtomicPointer char* mem = arena_->AllocateAligned( sizeof(Node) + sizeof(port::AtomicPointer) * (height - 1)); return new (mem) Node(key); }
template<typename Key, class Comparator> void SkipList<Key,Comparator>::Insert(const Key& key) { Node* prev[kMaxHeight]; //prev记录每一层最后一个 < key的节点,也就是待插入节点的前驱节点 Node* x = FindGreaterOrEqual(key, prev); // Our data structure does not allow duplicate insertion assert(x == nullptr || !Equal(key, x->key)); //随机决定节点高度height int height = RandomHeight(); //如果新的高度比当前所有节点高度都大,那么填充prev更高层为head_,同时更新max_height_ if (height > GetMaxHeight()) { for (int i = GetMaxHeight(); i < height; i++) { prev[i] = head_; //prev 保存的是node节点 也就是跳表的一整列 } max_height_.NoBarrier_Store(reinterpret_cast<void*>(height)); } //构造Node,高度为height x = NewNode(key, height); //插入节点x到prev及prev->next中间 for (int i = 0; i < height; i++) { x->NoBarrier_SetNext(i, prev[i]->NoBarrier_Next(i)); // 先修改x节点,再修改prev节点 prev[i]->SetNext(i, x); } }
template<typename Key, class Comparator> //有点楞 前面不是说1/2概率往上增加嘛 下面总结有解答
int SkipList<Key,Comparator>::RandomHeight() {
// Increase height with probability 1 in kBranching
static const unsigned int kBranching = 4;
int height = 1;
// 1/4概率继续增加height
while (height < kMaxHeight && ((rnd_.Next() % kBranching) == 0)) {
height++;
}
assert(height > 0);
assert(height <= kMaxHeight);
return height;
}
template<typename Key, class Comparator> typename SkipList<Key,Comparator>::Node* SkipList<Key,Comparator>::FindGreaterOrEqual(const Key& key, Node** prev) const { Node* x = head_; int level = GetMaxHeight() - 1; while (true) { Node* next = x->Next(level); if (KeyIsAfterNode(key, next)) {//如果next->key < key // Keep searching in this list x = next; } else {//如果next->key >= key //notes:如果单纯为了判断是否相等,这里可以加一个判断直接返回了,没必>要level--到0再返回,不过复杂度没有变化 if (prev != nullptr) prev[level] = x;//prev记录该level最后一个<key的节点 if (level == 0) {//到达最底层则返回next (next是第一个>=key的节点) return next; } else { // Switch to next list level--; } } } }
template<typename Key, class Comparator>
bool SkipList<Key,Comparator>::Contains(const Key& key) const {
//x记录第一个>= key的Node
//注意FindGreaterOrEqual是查找>=key的Node,因此会迭代直到level = 0才返回
//实际上可以实现一个接口直接查找==key的Node,这样会在level >=0 时就能返回,查找的时间复杂度不变,不过可以预期减少比较次数。
Node* x = FindGreaterOrEqual(key, nullptr);
//判断x->key == key
if (x != nullptr && Equal(key, x->key)) {
return true;
} else {
return false;
}
}
kBranching = 4
和kMaxHeight = 12
,不影响复杂度的情况下,可以最多支持4**11 = 4194304
个节点,因此在百万节点左右,这么设置参数效果最优。memory order
,以及Insert
里执行语句的前后顺序;多个写操作之间存在竞争关系,需要锁控制。Insert
里设置max_height_
前的那段注释,读线程可能读到旧的或者新的值,无论是哪种值,写线程都可能在更新SkipList
,因为后面更新是从低往高更新,而读是从高往低读,所以当读到新的节点的时候,继续往下层,一定是能读到正确值的。sstable则是由一系列block组成,有data block(键值对,同时为了增加读取速度增加了index 索引),filter block等等,index block就是在个data block后面保存一个大于该data block所有key的值(结构类似下面这种把entry改为data block,restart pointer改为索引行,这样就能直接使用二分快速查找了。
index block,meta index block 都采用相同的数据格式,由类 BlockBuilder 负责生成。
BlockBuilder
用于格式化传入的 key:value
数据,采用了 share key 的手段来优化存储的数据大小。
注意BlockBuilder
本身并不与存储打交道,所有数据都格式化到了内存,通过Finish
接口返回数据,由更上层对象写入到文件。
其采用合并相同前缀来节省空间(和其前一个key相同的部分),但为了使读取更高效,每N条entry则不再应用该规则。 restart pointer则记录了每个没有使用合并前缀的entry地址,同时又因为entry是有序的,所以可以在restart pointer之间进行二分查找。
为了加快SST中数据查询的效率,在直接查询DataBlock中的内容之前,会先根据FilterBlock中的过滤数据判定DataBlock中是否有需要查询的数据,若判断不存在,则无需对整个DataBlock进行数据查找。
FilterBlock存储的是DataBlock数据的一些过滤信息,这些过滤数据一般指代布隆过滤器的数据,用于加快查询的速度。每个DataBlock在FilterBlock中对应一个FilterData。
2.3 数据写入操作
2.4 数据读取操作
block和filter block都是 sstable 的一个组件,负责构造部分数据格式。
sstable被设计用于存储大量的 {key:value} 数据,当我们在 leveldb 查找某个 key 时,可能需要逐层查找多个 sstable 文件。
因此,sstable 在文件格式设计上,主要考虑:
data block用于存储原始数据,同时为了方便磁盘查找,每个data block被设定为固定大小默认值为4K。
同时每一个data block对应一行信息,记录3要素:
>= block 内所有的 key
被称为index block,其存储格式也是key value
key = data_block_key
value = (offset + size)
filter block,目前使用的是布隆过滤器,在查找key时,先通过filter block判断是否存在,如果不存在直接跳过对应的data block。同时在设计level db时预计会包含其他很多索引block,但目前就只有filter block,所以meta block等价于filter block,其里面只包含一组{key:value}数据就是找到filter block
footer中包含了以下信息:
index of data block'index //index block位置
index of mata block'index //meta index block位置
footer 需要首先读取、解析出来,然后才能“按图索骥”找到其他 block,因此 footer 是定长的,而且位置固定在文件尾部。之后得到index block和meta index block位置,之后就可以从index block读到data block位置,meta index block中读到meta block位置。
<beginning_of_file>
[data block 1]
[data block 2]
...
[data block N]
[meta block 1]
...
[meta block K]
[metaindex block]
[index block]
[Footer] (fixed size; starts at file_size - sizeof(Footer))
<end_of_file>
bloom filter是一种数据结构,作用类似于 hash table,相对于后者,空间利用率更高。
不过这种高利用率是有代价的,当我们在 bloom filter 查找 key 时,有返回两种情况:
也就是说 bloom filter 具有一定的误判率。
先介绍下 bloom filter 的几个组成:
h1, h2, ..., hk
,hash 结果为{1, 2, ..., m} or {0, 1, ..., m-1})
具体的,对于 key=a
,经过 k 个 hash 函数后结果为
h1(a), h2(a), ..., hk(a)
那么就将 v 对应的 bit 置为 1.
假定 k 为 4,对应的 bloom filter 为:
注:这里有一个js写的一篇博客,支持互动的查看 bloom filter,更形象一些.
当 key 越来越多,v 里置为 1 的 bits 越来越多。对于某个不存在的 key’,k 个 hash 函数对应的 bit 可能正好为1,此时就概率发生误判,更专业的术语称为 false positive,或者 false drop.
因此,我们称 bloom filter 是一种概率型的数据结构,当返回某个 key’ 存在时,只是说明可能存在。
m 越大,k 越大, n 越小,那么 false positive越小。
更进一步,bloom filter 是关于空间和 false positive 的 tradeoff,bloom filter 的算法其实并不复杂,其真正的艺术在于这种平衡。
我们先看下 tradeoff 的结论:
hash 函数 k 的最优个数为 ln2 * (m/n).
minor compaction就是将memtable中的数据转化为immetable并写入到sstable中,同时其不总是直接写入到level 0,如果imm中的数据和level 1层的数据没有交集也会插入到level 1层。同时由于将imm数据写入到level磁盘中需要时间,如果太慢则会导致mem又满了而阻塞写入。后台有线程会执行以下代码来检测是否需要compaction并且minor的优先级比major高:
//实际Compact
void DBImpl::BackgroundCompaction() {
mutex_.AssertHeld();
//如果immutable memtable存在,则本次先compact,即Minor Compaction
if (imm_ != nullptr) {
CompactMemTable();
return;
}
...
// major compaction
CompactMemTable
(即minor compaction)主要流程分为三部分:
WriteLevel0Table(imm_, &edit, base)
:imm_
落盘成为新的 sst 文件,文件信息记录到 edit
versions_->LogAndApply(&edit, &mutex_)
:因为compaction会生成新文件,同时旧文件可能还有人使用所以不能删,所以会保存多个版本信息,在compaction后将本次文件更新信息versions_
,当前的文件(包含新的 sst 文件)作为数据库的一个最新状态,后续读写都会基于该状态,(具体作用请看版本管理)DeleteObsoleeteFiles
:删除一些无用文件imm_
持久化为 sstable 文件后,文件的相关信息通过meta
返回
{ mutex_.Unlock(); //更新memtable中全部数据到xxx.ldb文件 //meta记录key range, file_size等sst信息 s = BuildTable(dbname_, env_, options_, table_cache_, iter, &meta); mutex_.Lock(); } struct FileMetaData { int refs; int allowed_seeks; // Seeks allowed until compaction uint64_t number; uint64_t file_size; // File size in bytes InternalKey smallest; // Smallest internal key served by table InternalKey largest; // Largest internal key served by table FileMetaData() : refs(0), allowed_seeks(1 << 30), file_size(0) { } };
meta
包括文件的 key range,大小,文件的引用数(当引用数为0时会从磁盘删除)等。
查找合适的 level 将新文件记录到edit
:
//为新生成sstable选择合适的level(不一定总是0)
if (base != nullptr) {
level = base->PickLevelForMemTableOutput(min_user_key, max_user_key);
}
//level及file meta记录到edit
edit->AddFile(level, meta.number, meta.file_size,
meta.smallest, meta.largest);
minor compaction 比较简单,因为只新增了一个 sstable 文件,加入后调用LogAndApply
生效到新版本。
// Replace immutable memtable with the generated Table
if (s.ok()) {
edit.SetPrevLogNumber(0);
edit.SetLogNumber(logfile_number_); // Earlier logs no longer needed
//应用edit
s = versions_->LogAndApply(&edit, &mutex_);
}
因为通过compaction会增加文件,存在新文件和老文件同时存在的情况,如果老文件还在执行操作此时不能删除。只有当其彻底没人使用时才能将老文件删除。
VersionEdit
即 delta,最重要的两个成员变量就是新增与删除文件:
DeletedFileSet deleted_files_;//待删除文件
//新增文件,例如immutable memtable dump后就会添加到new_files_
std::vector< std::pair<int, FileMetaData> > new_files_;
VersionEdit在每次compaction后都会调用该接口将新文件放入。同时在老文件没人使用时也会放入。
Version
用于表示某次 compaction 后的数据库状态,管理当前的文件集合,因此最重要一个成员变量files_
表示每一层的全部 sstable 文件。
// List of files per level
std::vector<FileMetaData*> files_[config::kNumLevels];
就是为刚从 memtable 持久化的 sstable,选择一个合适的 level.
Builder是一个辅助类,实现Version + VersionEdit = Version‘,其中+ =分别对应Apply和SaveTo两个接口。
成员变量也是记录所有的 delta,levels_
存储了每一层的added_files
及deleted_files
:
typedef std::set<FileMetaData*, BySmallestKey> FileSet;
struct LevelState {
std::set<uint64_t> deleted_files;
FileSet* added_files;
};
VersionSet* vset_;
Version* base_;
LevelState levels_[config::kNumLevels];//每一层的新增及删除文件
**Apply:**将VersionEdit中记录的文件更新到levels_中
**SaveTo:**将levels_中的文件更新和base_合并生成一个新的Version v
,同时需要保证v中每一层文件之间的顺序。
随着Builder
不断执行,新的version
被构造出来。VersionSet
就负责管理多个版本,对应的变量全局唯一,在DBImpl
构造函数里初始化:
versions_(new VersionSet(dbname_, &options_, table_cache_,
&internal_comparator_)) {
管理一个双向链表
Version dummy_versions_; // 循环链表的头部.
Version* current_; // 最新版本
current_
指向最新的版本。
因此class Version
实际上还有三个重要的链表相关成员变量:
VersionSet* vset_; // Version中指向VersionSet
Version* next_; // Next version in linked list
Version* prev_; // Previous version in linked list
Status VersionSet::LogAndApply(VersionEdit* edit, port::Mutex* mu)
的主要做了几件事,这个函数会在compaction中使用:
edit
应用于current_
生成一个新的Version
Version
下,下次 major compact 的文件 Finalize(v);
由这个函数计算Version
添加到双向链表,current_ = 新Version
首先是生成新Version
:
Version* v = new Version(this);
{
Builder builder(this, current_);
builder.Apply(edit);
builder.SaveTo(v);
}
对于major compaction有两种,一种是文件多次seek但是没有查找到数据(可能该key在这个文件里被删除了,或者在其他文件里,比如level 0是允许文件之间有重叠key范围的)。另外一种是当该level中文件过大时执行。
// 1. 1次seek花费10ms
// 2. 1M读写花费10ms
// 3. 1M文件的compact需要25M IO(读写10-12MB的下一层文件),为什么10-12M?经验值?
// 因此1M的compact时间 = 25次seek时间 = 250ms
// 也就是40K的compact时间 = 1次seek时间,保守点取16KB,即t = 16K的compact时间 = 1次seek时间
// compact这个文件的时间: file_size / 16K
// 如果文件seek很多次但是没有找到key,时间和已经比compact时间要大,就应该compact了
// 这个次数记录到f->allowed_seeks
f->allowed_seeks = (f->file_size / 16384);//16KB
if (f->allowed_seeks < 100) f->allowed_seeks = 100;
compaction 另外一个直观的想法就是,当某一层文件变得很大,往往意味着冗余数据过多,应该 compact 以避免占用磁盘以及读取过慢的问题。
level 越大,我们可以认为数据越“冷”,读取的几率越小,因此大的 level,能“容忍”的程度就越高,给的文件大小阈值越大。
具体的,当产生新版本时,遍历所有的层,比较该层文件总大小与基准大小,得到一个最应当 compact 的层。
这个步骤,在VersionSet::Finalize完成。
//计算compact的level和score,更新到compaction_level_&&compaction_score_ void VersionSet::Finalize(Version* v) { int best_level = -1; double best_score = -1; //level 0看文件个数,降低seek的次数,提高读性能,个数/4 //level >0看文件大小,减少磁盘占用,大小/(10M**level) //例如: //level 0 有4个文件,score = 1.0 //level 1 文件大小为9M,score = 0.9 //那么compact的level就是0,score = 1.0 for (int level = 0; level < config::kNumLevels-1; level++) { double score; if (level == 0) { score = v->files_[level].size() / static_cast<double>(config::kL0_CompactionTrigger); } else { // Compute the ratio of current size to size limit. const uint64_t level_bytes = TotalFileSize(v->files_[level]); score = static_cast<double>(level_bytes) / MaxBytesForLevel(options_, level); } if (score > best_score) { best_level = level; best_score = score; } } v->compaction_level_ = best_level; v->compaction_score_ = best_score; }
可以看到 level 0 与其他层不同,看的是文件个数,因为 level 0 的文件是重叠的,每次读取都需要遍历所有文件,所以文件个数更加影响性能。
每层的基准大小为10M << ${level - 1}
,level = 1 则MaxBytes = 10M
, level = 2 则MaxBytes=100M
,依次类推.
逐层比较后,得到最大的得分以及对应层数:compaction_score_ compaction_level_
major compact 选择文件时就会用到上述两个条件。
// We prefer compactions triggered by too much data in a level over
// the compactions triggered by seeks.
const bool size_compaction = (current_->compaction_score_ >= 1);//文件数过多
const bool seek_compaction = (current_->file_to_compact_ != nullptr);//seek了多次文件但是没有查到,记录到的file_to_compact_
因此,版本管理的作用之三,就是在新增或者遍历文件的过程中,为 major compact 筛选文件。
作用
随着数据合并到更大的 level,一个明显的好处就是清理冗余数据。
如果不同 level 的 sst 文件里,存在相同的 key,那么更底层的数据就可以删除不再保留(不考虑 snapshot的情况下)。为了充分利用磁盘高性能的顺序写,删除数据也是顺序写入删除标记,而真正删除数据,是在 major compact 的过程中。
所以,一个作用是能够节省磁盘空间。
level 0 的数据文件之间是无序的,每次查找都需要遍历所有可能重叠的文件,而归并到 level 1 之后,数据变得有序,待查找的文件变少。
所以,另外一个作用是能够提高读效率。
leveldb 最为复杂的在 compaction,compaction 最为复杂的在 major compaction.面对磁盘上的众多 sstable 文件,应该怎么开始?
千里之行始于足下,首先需要找到最应该 compact 的一个文件。
“最应该”的判断条件,前面笔记已有介绍,有seek_compaction && size_compaction
,分别从读取和文件大小两个维度来判断。
筛选出这个文件后,还需要考虑一系列问题:
这些问题,都需要在PickCompaction
这个函数里解决。
leveldb::Compaction
用来记录筛选文件的结果,其中inputs[2]
记录了参与 compact 的两层文件,是最重要的两个变量
// Each compaction reads inputs from "level_" and "level_+1"
std::vector<FileMetaData*> inputs_[2]; // The two sets of inputs
Compaction* VersionSet::PickCompaction()
简言之,就是选取一层需要compact的文件列表,及相关的下层文件列表,记录在Compaction*
返回。
其主要过程如下:
inputs_[0]
确定inputs_[1]
inputs_[1]
反过来看下能否扩大inputs_[0]
inptus_[0]
扩大的话,记录到expanded0
expanded[0]
看下是否会导致inputs_[1]
增大inputs[1]
没有增大,那就扩大 compact 的 level 层的文件范围也就是:
在不增加 level + 1 层文件,同时不会导致 compact 的文件过大的前提下,尽量增加 level 层的文件数
or
seek_compaction首先是根据size_compaction seek_compaction
计算应当 compact 的文件。
只有compaction_score_ >= 1
时,触发 size compaction.
// We prefer compactions triggered by too much data in a level over
// the compactions triggered by seeks.
const bool size_compaction = (current_->compaction_score_ >= 1);//文件数过多
const bool seek_compaction = (current_->file_to_compact_ != nullptr);//seek了多次文件但是没有查到,记录到的file_to_compact_
如果size_compaction = true
,则找到该层一个满足条件的文件:
if (size_compaction) { //该层第一个>compact_pointer_的文件,或者第一个文件 level = current_->compaction_level_; assert(level >= 0); assert(level+1 < config::kNumLevels); c = new Compaction(options_, level); // Pick the first file that comes after compact_pointer_[level] for (size_t i = 0; i < current_->files_[level].size(); i++) { FileMetaData* f = current_->files_[level][i]; if (compact_pointer_[level].empty() || icmp_.Compare(f->largest.Encode(), compact_pointer_[level]) > 0) { c->inputs_[0].push_back(f); break; } } if (c->inputs_[0].empty()) { // Wrap-around to the beginning of the key space c->inputs_[0].push_back(current_->files_[level][0]); }
compact_pointer_
是 string 类型,记录了该层上次 compact 时文件的 largest key,初始值为空,也就是选择该层第一个文件。
// Per-level key at which the next compaction at that level should start.
// Either an empty string, or a valid InternalKey.
std::string compact_pointer_[config::kNumLevels];
如果seek_compaction = true
,则直接使用满足条件的文件。
到了这一步,inputs_[0]
里有且仅有一个文件。
level 0 的文件之间是无序的,假设当前有 4 个文件,key range 分别是
[a, n]
[c, k]
[b, e]
[l, n]
本次选择了第3个文件,如果只是把[b, e]
更新到 level 1,那么就会导致读取时数据错误。因为多个文件之间数据是有重叠的,数据之间的先后无法判断,而更新到 level 1 就意味着认为数据更早。(将[b,e]更新到level 1,因为level 0是无序的,可能其他level 0中的文件有比这个文件更老的数据,直接更新到level 1后就导致level 1中有些文件比level 0更新了,后续读取会出错,所以需要将level 0层重叠文件都找出来)
对应的做法就是当选出文件后,判断还有哪些文件有重叠,把这些文件都加入进来,这个例子对应的就是把文件1 2都加进来。
代码上,先通过GetRange
获取输入文件的 key range,然后根据 key range 得到一个最全的文件列表。
// Files in level 0 may overlap each other, so pick up all overlapping ones
// 对level 0,获取当前已选择文件的key range: [smallest, largest]
// 然后选择level 0的其他与该key range有overlap的文件,组成新的key range
// 然后重新从头在level 0查找,直到key range固定下来
if (level == 0) {
InternalKey smallest, largest;
GetRange(c->inputs_[0], &smallest, &largest);
// Note that the next call will discard the file we placed in
// c->inputs_[0] earlier and replace it with an overlapping set
// which will include the picked file.
current_->GetOverlappingInputs(0, &smallest, &largest, &c->inputs_[0]);
assert(!c->inputs_[0].empty());
}
到了这一步,本质上是是的待 compact 的文件在各层都满足统一条件:inputs_[0]
的文件跟本层其他文件之间,没有 key 重叠
inputs_[0]
填充了第一层要参与 compact 的文件,接下来就是要计算下一层参与 compact 的文件,记录到inputs_[1]
。
基本的思想是:所有有重叠的 level + 1 层文件都要参与 compact,得到这些文件后,反过来看下,如果在不增加 level + 1 层文件的前提下,能否增加 level 层的文件?
也就是尽量增加 level 层的文件,贪心算法。
首先是计算下一层与inputs_[0]
key range 有重叠的所有 sstable files,记录到inputs_[1]
void VersionSet::SetupOtherInputs(Compaction* c) {
const int level = c->level();
InternalKey smallest, largest;
//inputs_[0]所有文件的key range -> [smallest, largest]
GetRange(c->inputs_[0], &smallest, &largest);
//inputs_[1]记录level + 1层所有与inputs_[0]有overlap的文件
current_->GetOverlappingInputs(level+1, &smallest, &largest, &c->inputs_[1]);
// Get entire range covered by compaction
InternalKey all_start, all_limit;
//inputs_[0, 1]两层所有文件的key range -> [all_start, all_limit]
GetRange2(c->inputs_[0], c->inputs_[1], &all_start, &all_limit);
根据inputs_[1]
反推下 level 层有多少 key range 有重叠的文件,记录到expanded0
:
// See if we can grow the number of inputs in "level" without
// changing the number of "level+1" files we pick up.
// 如果再不增加level + 1层文件的情况下,尽可能的增加level层的文件
if (!c->inputs_[1].empty()) {
std::vector<FileMetaData*> expanded0;
//level层与[all_start, all_limit]有overlap的所有文件,记录到expanded0
//expanded0 >= inputs_[0]
current_->GetOverlappingInputs(level, &all_start, &all_limit, &expanded0);
如果文件确实又增加,同时又不会增加太多文件(太多会导致 compact 压力过大)
const int64_t inputs0_size = TotalFileSize(c->inputs_[0]);
const int64_t inputs1_size = TotalFileSize(c->inputs_[1]);
const int64_t expanded0_size = TotalFileSize(expanded0);
//1. level 层参与compact文件数有增加
//2. 但合并的文件总量在ExpandedCompactionByteSizeLimit之内(防止compact过多)
if (expanded0.size() > c->inputs_[0].size() &&
inputs1_size + expanded0_size <
ExpandedCompactionByteSizeLimit(options_)) {
那么就增加参与 compact 的文件,更新到inputs_
InternalKey new_start, new_limit; //[new_start, new_limit]记录expand0的key range GetRange(expanded0, &new_start, &new_limit); std::vector<FileMetaData*> expanded1; //如果level层文件从inputs_[0]扩展到expand0,key的范围变成[new_start, new_limit] //看下level + 1层overlap的文件范围,记录到expand1 current_->GetOverlappingInputs(level+1, &new_start, &new_limit, &expanded1); //确保level + 1层文件没有增加,那么使用心得expand0, expand1 if (expanded1.size() == c->inputs_[1].size()) { Log(options_->info_log, "Expanding@%d %d+%d (%ld+%ld bytes) to %d+%d (%ld+%ld bytes)\n", level, int(c->inputs_[0].size()), int(c->inputs_[1].size()), long(inputs0_size), long(inputs1_size), int(expanded0.size()), int(expanded1.size()), long(expanded0_size), long(inputs1_size)); smallest = new_start; largest = new_limit; c->inputs_[0] = expanded0; c->inputs_[1] = expanded1; GetRange2(c->inputs_[0], c->inputs_[1], &all_start, &all_limit); }
到此,参与 compact 的文件集合就已经确定了,为了避免这些文件合并到 level + 1 层后,跟 level + 2 层有重叠的文件太多,届时合并 level + 1 和 level + 2 层压力太大,因此我们还需要记录下 level + 2 层的文件,后续 compact 时用于提前结束的判断:
// Compute the set of grandparent files that overlap this compaction
// (parent == level+1; grandparent == level+2)
// level + 2层有overlap的文件,记录到c->grandparents_
if (level + 2 < config::kNumLevels) {
//level + 2层overlap的文件记录到c->grandparents_
current_->GetOverlappingInputs(level + 2, &all_start, &all_limit,
&c->grandparents_);
}
接着记录compact_pointer_
到c->edit_
,在后续PickCompaction
入口时使用。
// Update the place where we will do the next compaction for this level.
// We update this immediately instead of waiting for the VersionEdit
// to be applied so that if the compaction fails, we will try a different
// key range next time.
// 记录该层本次compact的最大key
compact_pointer_[level] = largest.Encode().ToString();
c->edit_.SetCompactPointer(level, largest);
最后一步,就是返回筛选的结果c
:
//选取一层需要compact的文件列表,及相关的下层文件列表,记录在Compaction*
Compaction* VersionSet::PickCompaction() {
Compaction* c;
...
return c;
}
//实际Compact
void DBImpl::BackgroundCompaction() {
mutex_.AssertHeld();
//如果immutable memtable存在,则本次先compact,即Minor Compaction
if (imm_ != nullptr) {
CompactMemTable();
return;
}
接着就是调用PickCompaction筛选合适的 level 及 文件。注意也可以手动指定 range,原理是类似的,不再赘述。
//如果immutable memtable不存在,则合并各层level的文件,称为Major Compaction Compaction* c; bool is_manual = (manual_compaction_ != nullptr); InternalKey manual_end; //手动指定compact if (is_manual) { ManualCompaction* m = manual_compaction_; c = versions_->CompactRange(m->level, m->begin, m->end); m->done = (c == nullptr); if (c != nullptr) { manual_end = c->input(0, c->num_input_files(0) - 1)->largest; } Log(options_.info_log, "Manual compaction at level-%d from %s .. %s; will stop at %s\n", m->level, (m->begin ? m->begin->DebugString().c_str() : "(begin)"), (m->end ? m->end->DebugString().c_str() : "(end)"), (m->done ? "(end)" : manual_end.DebugString().c_str())); } else { //自动compact,c记录了待参与compact的所有文件 c = versions_->PickCompaction(); } //再接收到需要compaction的文件后 为了节省重新生成文件过程 有些情况可以直接使用源文件 // 1. level层只有一个文件 // 2. level + 1层没有文件 // 3. 跟level + 2层overlap的文件没有超过25M IsTrivialMove()在这个函数 Status status; if (c == nullptr) { // Nothing to do } else if (!is_manual && c->IsTrivialMove()) { // Move file to next level // level + 1没有overlap的文件,不需要compact,直接从level层标记到level + 1层即可 assert(c->num_input_files(0) == 1); FileMetaData* f = c->input(0, 0); //直接把这个文件从level移动level + 1层 c->edit()->DeleteFile(c->level(), f->number); c->edit()->AddFile(c->level() + 1, f->number, f->file_size, f->smallest, f->largest); status = versions_->LogAndApply(c->edit(), &mutex_); if (!status.ok()) { RecordBackgroundError(status); } VersionSet::LevelSummaryStorage tmp; Log(options_.info_log, "Moved #%lld to level-%d %lld bytes %s: %s\n", static_cast<unsigned long long>(f->number), c->level() + 1, static_cast<unsigned long long>(f->file_size), status.ToString().c_str(), versions_->LevelSummary(&tmp)); } else { CompactionState* compact = new CompactionState(c); status = DoCompactionWork(compact); if (!status.ok()) { RecordBackgroundError(status); } CleanupCompaction(compact); c->ReleaseInputs(); DeleteObsoleteFiles();//删除旧文件,回收内存和磁盘空间 }
正常情况下,通过DoCompactionWork
完成文件的归并操作。
实现上,主要就是通过遍历所有的文件,实现多路归并,生成新的文件。
第一步,获取遍历所有文件用到的 Iterator*
. 因为level 0 是无序的所以level 0需要level 0.size个迭代器,其他层只需要1个迭代器。
iterator 返回的 key 全部有序,遍历过程可以清理掉一些 key。
由于多次Put/Delete,有些key会出现多次,在compact时丢弃。策略如下:
同时跟上一节的思想类似,如果目前 compact 生成的文件,会导致接下来 level + 1 && level + 2 层 compact 压力过大,那么结束本次 compact.因此,每次都会调用ShouldStopBefore
来判断是否满足上述条件:
bool Compaction::ShouldStopBefore(const Slice& internal_key) { const VersionSet* vset = input_version_->vset_; // Scan to find earliest grandparent file that contains key. const InternalKeyComparator* icmp = &vset->icmp_; while (grandparent_index_ < grandparents_.size() && icmp->Compare(internal_key, grandparents_[grandparent_index_]->largest.Encode()) > 0) { if (seen_key_) { overlapped_bytes_ += grandparents_[grandparent_index_]->file_size; } grandparent_index_++; } seen_key_ = true; if (overlapped_bytes_ > MaxGrandParentOverlapBytes(vset->options_)) { // Too much overlap for current output; start new output overlapped_bytes_ = 0; return true; } else { return false; } }
先用一张图片介绍下:
写入的key value
首先被封装到WriteBatch
// Default implementations of convenience methods that subclasses of DB
// can call if they wish
Status DB::Put(const WriteOptions& opt, const Slice& key, const Slice& value) {
WriteBatch batch;
//key,value数据更新到batch里
batch.Put(key, value);
return Write(opt, &batch);
}
WriterBatch
封装了数据,DBImpl::Writer
则继续封装了 mutex cond 等同步原语
// Information kept for every waiting writer
struct DBImpl::Writer {
Status status;
WriteBatch* batch;
bool sync;
bool done;
port::CondVar cv;
explicit Writer(port::Mutex* mu) : cv(mu) { }
};
写入流程实际上调用的是DBImpl::Write
//调用流程: DBImpl::Put -> DB::Put -> DBImpl::Write
Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) {
//一次Write写入内容会首先封装到Writer里,Writer同时记录是否完成写入、触发Writer写入的条件变量等
Writer w(&mutex_);
w.batch = my_batch;
w.sync = options.sync;
w.done = false;
数据被写入到writers_
,直到满足两个条件:
w
的写入writers_
首部 MutexLock l(&mutex_);//多个线程调用的写入操作通过mutex_串行化
writers_.push_back(&w);
//数据先放到queue里,如果不在queue顶部则等待
//这里是对数据流的一个优化,wirters_里Writer写入时,可能会把queue里其他Writer也完成写入
while (!w.done && &w != writers_.front()) {
w.cv.Wait(); //这边会释放锁 直到被唤醒
}
//如果醒来并且抢到了mutex_,检查是否已经完成了写入(by其他Writer),则直接返回写入status
if (w.done) {
return w.status;
}
接着查看是否有足够空间写入,例如mem_
是否写满,是否必须触发 minor compaction 等
// May temporarily unlock and wait.
Status status = MakeRoomForWrite(my_batch == nullptr);
取出writers_
的数据,统一记录到updates
uint64_t last_sequence = versions_->LastSequence();//本次写入的SequenceNumber
Writer* last_writer = &w;
if (status.ok() && my_batch != nullptr) { // nullptr batch is for compactions
//updates存储合并后的所有WriteBatch
WriteBatch* updates = BuildBatchGroup(&last_writer); //一次性把writers_里面的数据都保存在updates 一次性写入 此时last_writer等于writers_最后一个
WriteBatchInternal::SetSequence(updates, last_sequence + 1);
last_sequence += WriteBatchInternal::Count(updates);
//WriterBatch写入log文件,包括:sequence,操作count,每次操作的类型(Put/Delete),key/value及其长度
status = log_->AddRecord(WriteBatchInternal::Contents(updates));
bool sync_error = false;
if (status.ok() && options.sync) {
//log_底层使用logfile_与文件系统交互,调用Sync完成写入
status = logfile_->Sync();
if (!status.ok()) {
sync_error = true;
}
}
//写入文件系统后不用担心数据丢失,继续插入MemTable
if (status.ok()) {
status = WriteBatchInternal::InsertInto(updates, mem_);
}
写入完成后,逐个唤醒等待的线程:
//last_writer记录了writers_里合并的最后一个Writer //逐个遍历弹出writers_里的元素,并环形等待write的线程,直到遇到last_writer while (true) { Writer* ready = writers_.front(); writers_.pop_front(); if (ready != &w) { ready->status = status; ready->done = true; ready->cv.Signal(); } if (ready == last_writer) break; } // Notify new head of write queue // 唤醒队列未写入的第一个Writer if (!writers_.empty()) { writers_.front()->cv.Signal(); }
批量写入接口DB::Write(const WriteOptions& options, WriteBatch* updates)
调用也是DBImpl::Write
。
批量写入一个典型问题就是一致性,例如这么调用:
leveldb::WriteBatch batch;
batch.Put("company", "Google");
batch.Put(...);
batch.Delete("company");
db->Write(write_option, &batch);
我们肯定不希望读到company -> Google
这个中间结果,而效果的产生就在于sequence
:versions_
记录了单调递增的sequence
,对于相同 key,判断先后顺序依赖该数值。
写入时,sequence
递增的更新到 memtable,但是一次性的记录到versions_
:
uint64_t last_sequence = versions_->LastSequence();//本次写入的SequenceNumber
...
WriteBatchInternal::SetSequence(updates, last_sequence + 1);
last_sequence += WriteBatchInternal::Count(updates);
...
versions_->SetLastSequence(last_sequence);
对于Get
操作(参考本文 Get 一节),看到的 sequence 只有两种可能: 批量提交get要不再batch.Put(“company”, “Google”);之前或者batch.Delete(“company”);之后执行。同时一个writebatch里的操作应该是一次全部写入的。
<= last_sequence
>= last_sequence + Count(updates)
因此读取时不会观察到中间状态。
第一节介绍,写入的key/value
数据,都记录到了WriteBatch
,更具体的,记录到了:
//rep_存储了所有Put/Delete接口传入的数据
//按照一定格式记录了:sequence, count, 操作类型(Put or Delete),key/value的长度及key/value本身
std::string rep_; // See comment in write_batch.cc for the format of rep_
rep_
数据组织如下:一个writebatch可以包含多个操作
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。