当前位置:   article > 正文

LevelDB总结_hadoop leveldb

hadoop leveldb

leveldb架构

博客
在这里插入图片描述

顺序写:level db不需要在各个level中去修改位置,而是只要放到最新的memTable中插入,所以在读取数据时如果在上层找到了数据或者数据flag是已删除就不需要继续寻找了。所以其能够提供比读更好的性能。
LevelDB 中的 sequence number 是一个全局的编号,每次写入(注意批量写入时 sequence number 是相同的)时,该编号会递增。

一、memTable

1.1 LSM-Tree VS B Tree

hash索引,需要将所有数据都读入到内存,对于大规模数据读不到内存里就不能使用hash索引。

  • 从使用的角度上来说,B-tree等索引存储结构多用于OLTP型的数据库,因为这类数据库主要以事务,或是行级别的读取和存储为主的(比如Mysql)。换句话说,这种类型的数据库更多的操作是小批量或单行级别的更新或读取,并且可能还有事务方面的需求,这种类型正是B-tree结构所擅长的。
  • 而 LSM-tree则多用于大规模数据情况下的检索分析和快速写入的情况。在写入的性能上,因为上直接写入内存再定期刷入到磁盘中,所以写入操作对用户的感知而言上非常迅速的。而检索速度也因为key顺序存储,可以快速定位到key对应的位置,因而具有较好的检索性能。
  • 但是LSM-tree比较显著的应用方向还是在大规模分析这方面,在大规模分析(OLAP)场景下,数据通常都是列式存储,并且需要全表扫描。其中磁盘数据可以使用二进制进行压缩,读取的时候可以有效减少磁盘IO的处理时间(与之相比,B-tree等存储结构就无法充分压缩,因为每次都只处理小部分数据)。同时在存储文件中还能再进一步切分,比如将列式数据按照水平切分成不同的Page,同时存储一些简单的索引,用来指定不同Page大概范围,Hadoop的存储数据格式Parquet就是类似的设计。

memTable是基于跳表实现的结构。node格式,数据在memtable上是有序的,担当memtable到达一定大小后,就会转化为immutable之后放入到level 0,每一层level里面又有多个文件,但是在level 0中单个文件内是有序的,文件间可能是无序的有重叠,但是在其他level 层中文件间和文件内都是有序的。同时每个文件中也包含了多个block。(问题1:如果在immutable minor compact到sstable中的时候memtable马上又满了leveldb会停止写入数据吗?答案是会停止写入),

1.2 kv数据在memtable中的结构

在这里插入图片描述

使用protobuf varint编码,将internal_key_size和value_size由定长存储改为变长存储。

  1. 整数由定长改为变长存储
  2. 小整数仅占用1个字节,随着数值变大占用的字节数也变大,最多占用5个字节。

sequence:为该操作的编号数,编号越大表示该操作越新。

type:由于没有删除操作,所以通过type类型来判断是写入操作还是删除操作

MemTable相关的有多种 key,本质上就是上图里三种:

  1. userkey: 用户传入的 key,即用户key.
  2. internal key: 增加了sequence type,用于支持同一 key 的多次操作,以及 snapshot,即内部key.
  3. memtable key: 增加了 varint32 encode 后的 internal key长度

1.3 memtable

explicit MemTable(const InternalKeyComparator& comparator);   //初始化接收一个比较函数

//提供两种接口
void Add(SequenceNumber seq, ValueType type,
        const Slice& key,
        const Slice& value);

bool Get(const LookupKey& key, std::string* value, Status* s);

//通过迭代器形式  暴露底层skiplist
Iterator* MemTable::NewIterator() {
  return new MemTableIterator(&table_);
}

struct KeyComparator {
    const InternalKeyComparator comparator;
    explicit KeyComparator(const InternalKeyComparator& c) : comparator(c) { }
    int operator()(const char* a, const char* b) const;
};
friend class MemTableIterator;
friend class MemTableBackwardIterator;

typedef SkipList<const char*, KeyComparator> Table;

KeyComparator comparator_;
int refs_;
Arena arena_;
Table table_;  //底层skiplist
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28

1.4 数据比较

  1. userkey 按照指定的排序方式,默认字节大小排序,akey-userkey < bkey-userkey 则返回-1.
  2. 如果 userkey 相等,那么解析出 sequence,按照 sequence 大小逆序排序,即 akey-sequence > bkey-sequence 则返回-1.sequence越大则代表数据越新,这样的好处是越新的数据越排在前面。

add:通过比较来插入,组装memtable_key的过程

get:userkey不存在的两种情况,1:没有userkey 2:存在userkey,但是最大的seq对应的type是删除

二、日志

在这里插入图片描述

每个block的长度最大为0x8000 = 32k,根据图中日志结构可知日志是变长的,只能从前往后遍历block,type的作用就是当 当前block的空间写不下那么多data时来标识该data跨Block了。

在这里插入图片描述

日志写入类的关系,因为日志只有添加,所以用不到其他Close等函数。PosixW是WriteableFile的子类,PosixW和Log::Writer是聚合关系(代表整体与部分的关系,比如Writer的功能知识PosixW的一部分)

三、SkipList实现

3.1 placement new:在用户指定的内存位置上构建新的对象

Object * p = new (address) ClassConstruct(...);

//先分配一对内存
int* buff = new int;
memset(buff,0,sizeof(int));
 
//此处new的placement new,在buff的内存上构造int对象,不需要分配额外的内存
int *p = new (buff)int(3);
 
std::cout << *p << std::endl; //3

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 优点:
    • 在已分配好的内存上进行对象的构建,构建速度快
    • 已分配好的内存可以反复利用,有效的避免内存碎片问题

3.2 跳表简介

skiplist,即跳表是由William Pugh在1989年发明的,允许快速查询一个有序连续元素的数据链表,搜索、插入、删除的平均时间复杂度均为O(lgn)。根据推导,在理想情况下当跳表的高度为logn时其具有最佳性能。

在这里插入图片描述

第一层:n 第二层:n/2 第三层:n/4元素 但在实际实现环节要想一直保持如此高的性能,需要经常调整结构,所以在实现时使用抛硬币的方法,从height=1开始每抛一次正面增加一层,同时一般也会设置最大层数。因为memtable中不需要删除元素,所以在leveldb代码中只提供了插入和查找两个接口。

3.3 SkipList类

template<typename Key, class Comparator>
class SkipList       //类模板


void Insert(const Key& key);     //插入函数接口
 
bool Contains(const Key& key) const;   //查询函数接口

//成员变量
Comparator const compare_;   //比较函数
Arena* const arena_;         //leveldb的内存池  在该处用于分配node
Node* const head_;           //跳表中的节点  对应一列
port::AtomicPointer max_height_;   //
Random rnd_;  //随机数产生器

//构造函数
template<typename Key, class Comparator>
SkipList<Key,Comparator>::SkipList(Comparator cmp, Arena* arena)
    : compare_(cmp),
      arena_(arena),
      head_(NewNode(0 /* any key will do */, kMaxHeight)),
      max_height_(reinterpret_cast<void*>(1)),  //当前最大高度为1
      rnd_(0xdeadbeef) {
  for (int i = 0; i < kMaxHeight; i++) {
    head_->SetNext(i, nullptr);       //初始化head_高度为最大高度  后面节点在高度大于最大高度时能够使用head_作为prev节点
  }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27

3.4 Node && NewNode

template<typename Key, class Comparator>
struct SkipList<Key,Comparator>::Node {
  explicit Node(const Key& k) : key(k) { }
  Key const key;//数据本身
  //获取该节点在第n层的后继节点
  Node* Next(int n) {
    assert(n >= 0);
    return reinterpret_cast<Node*>(next_[n].Acquire_Load());
  }
  //设置该节点在第n层的后继节点
  void SetNext(int n, Node* x) {
    assert(n >= 0);
    next_[n].Release_Store(x);
  }

  //不能保证线程安全
  Node* NoBarrier_Next(int n) {
    assert(n >= 0);
    return reinterpret_cast<Node*>(next_[n].NoBarrier_Load());
  }
  void NoBarrier_SetNext(int n, Node* x) {
    assert(n >= 0);
    next_[n].NoBarrier_Store(x);
  }

 private:
  // 作为Node的最后一个成员变量
  // 由于Node通过placement new的方式构造,因此next_实际上是一个不定长的数组
  // 数组长度即该节点的高度
  // next_记录了该节点在所有层的后继节点,0是最底层链表。
  port::AtomicPointer next_[1];
};

//所有的 Node 对象都通过NewNode构造出来:先通过arena_分配内存,然后通过 placement new 的方式调用 Node 的构造函数。
template<typename Key, class Comparator>
typename SkipList<Key,Comparator>::Node*
SkipList<Key,Comparator>::NewNode(const Key& key, int height) {
    //额外存储(height - 1)个port::AtomicPointer
  char* mem = arena_->AllocateAligned(
      sizeof(Node) + sizeof(port::AtomicPointer) * (height - 1));
  return new (mem) Node(key);
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42

3.5 Insert

template<typename Key, class Comparator>
void SkipList<Key,Comparator>::Insert(const Key& key) {
  Node* prev[kMaxHeight];  
  //prev记录每一层最后一个 < key的节点,也就是待插入节点的前驱节点
  Node* x = FindGreaterOrEqual(key, prev);

  // Our data structure does not allow duplicate insertion
  assert(x == nullptr || !Equal(key, x->key));

  //随机决定节点高度height
  int height = RandomHeight();
  //如果新的高度比当前所有节点高度都大,那么填充prev更高层为head_,同时更新max_height_
  if (height > GetMaxHeight()) {
    for (int i = GetMaxHeight(); i < height; i++) {
      prev[i] = head_;   //prev 保存的是node节点  也就是跳表的一整列
    }
    max_height_.NoBarrier_Store(reinterpret_cast<void*>(height));
  }

  //构造Node,高度为height
  x = NewNode(key, height);
  //插入节点x到prev及prev->next中间
  for (int i = 0; i < height; i++) {
    x->NoBarrier_SetNext(i, prev[i]->NoBarrier_Next(i));
    // 先修改x节点,再修改prev节点
    prev[i]->SetNext(i, x);
  }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
template<typename Key, class Comparator>  //有点楞  前面不是说1/2概率往上增加嘛  下面总结有解答
int SkipList<Key,Comparator>::RandomHeight() {
  // Increase height with probability 1 in kBranching
  static const unsigned int kBranching = 4;
  int height = 1;
  // 1/4概率继续增加height
  while (height < kMaxHeight && ((rnd_.Next() % kBranching) == 0)) {
    height++;
  }
  assert(height > 0);
  assert(height <= kMaxHeight);
  return height;
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
template<typename Key, class Comparator>
typename SkipList<Key,Comparator>::Node* SkipList<Key,Comparator>::FindGreaterOrEqual(const Key& key, Node** prev)
    const {
  Node* x = head_;
  int level = GetMaxHeight() - 1;
  while (true) {
    Node* next = x->Next(level);
    if (KeyIsAfterNode(key, next)) {//如果next->key < key
      // Keep searching in this list
      x = next;
    } else {//如果next->key >= key
      //notes:如果单纯为了判断是否相等,这里可以加一个判断直接返回了,没必>要level--到0再返回,不过复杂度没有变化
      if (prev != nullptr) prev[level] = x;//prev记录该level最后一个<key的节点
      if (level == 0) {//到达最底层则返回next (next是第一个>=key的节点)
        return next;
      } else {
        // Switch to next list
        level--;
      }
    }
  }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22

3.6 Contains

template<typename Key, class Comparator>
bool SkipList<Key,Comparator>::Contains(const Key& key) const {
  //x记录第一个>= key的Node
  //注意FindGreaterOrEqual是查找>=key的Node,因此会迭代直到level = 0才返回
  //实际上可以实现一个接口直接查找==key的Node,这样会在level >=0 时就能返回,查找的时间复杂度不变,不过可以预期减少比较次数。
  Node* x = FindGreaterOrEqual(key, nullptr);
  //判断x->key == key
  if (x != nullptr && Equal(key, x->key)) {
    return true;
  } else {
    return false;
  }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13

3.7 总结

  1. 除了insert和contains还提供了FindLast和FindLessThan函数实现起来都差不多就不介绍了
  2. MemTable在读取时使用的时SkipList::Iterator
  3. 之前介绍抛硬币的方法概率为1/2,这里使用的是1/4,结合kBranching = 4kMaxHeight = 12,不影响复杂度的情况下,可以最多支持4**11 = 4194304个节点,因此在百万节点左右,这么设置参数效果最优。
  4. 读写并发:读操作不会修改内部数据,因此多个reader不存在竞争,并发没有问题;多个读单个写操作也没有问题,因为采用了原子变量以及memory order,以及Insert里执行语句的前后顺序;多个写操作之间存在竞争关系,需要锁控制。
  5. 重点说明下Insert里设置max_height_前的那段注释,读线程可能读到旧的或者新的值,无论是哪种值,写线程都可能在更新SkipList,因为后面更新是从低往高更新,而读是从高往低读,所以当读到新的节点的时候,继续往下层,一定是能读到正确值的。

四、block

sstable则是由一系列block组成,有data block(键值对,同时为了增加读取速度增加了index 索引),filter block等等,index block就是在个data block后面保存一个大于该data block所有key的值(结构类似下面这种把entry改为data block,restart pointer改为索引行,这样就能直接使用二分快速查找了。

在这里插入图片描述

index block,meta index block 都采用相同的数据格式,由类 BlockBuilder 负责生成。

4.1 leveldb::BlockBuilder

在这里插入图片描述

BlockBuilder 用于格式化传入的 key:value 数据,采用了 share key 的手段来优化存储的数据大小。

注意BlockBuilder本身并不与存储打交道,所有数据都格式化到了内存,通过Finish接口返回数据,由更上层对象写入到文件。

在这里插入图片描述

其采用合并相同前缀来节省空间(和其前一个key相同的部分),但为了使读取更高效,每N条entry则不再应用该规则。 restart pointer则记录了每个没有使用合并前缀的entry地址,同时又因为entry是有序的,所以可以在restart pointer之间进行二分查找。

4.2 Filter Block

为了加快SST中数据查询的效率,在直接查询DataBlock中的内容之前,会先根据FilterBlock中的过滤数据判定DataBlock中是否有需要查询的数据,若判断不存在,则无需对整个DataBlock进行数据查找。

FilterBlock存储的是DataBlock数据的一些过滤信息,这些过滤数据一般指代布隆过滤器的数据,用于加快查询的速度。每个DataBlock在FilterBlock中对应一个FilterData。

2.3 数据写入操作

  1. 向SST中追加文件,会先将数据写入内存中的DataBlock,如果DataBlock数据量大于指定大小,会将内存中的DataBlock写入磁盘。在将DataBlock写入前,会写入RestartPoint信息,数据压缩,CRC校验。最后将DataBlock的索引信息写入IndexBlock
  2. 当SST写入完成,写入内存中的FilterBlock、MetaIndexBlock、 IndexBlock、Footer信息

2.4 数据读取操作

  1. 读取Footer字段,获取IndexBlockIndex、MetaIndexBlockIndex,根据offset和length读取IndexBlock、MeatIndexBlock数据
  2. 根据IndexBlock索引数据获取查询数据的DataBlock,然后根据FilterBlock数据判定查询数据是否在定位的DataBlock中,加速数据查询
  3. 读取定位到的DataBlock,根据RestartPoint数据定位查询的数据块中的Entry

五、sstable

blockfilter block都是 sstable 的一个组件,负责构造部分数据格式。

5.1 sstable

sstable被设计用于存储大量的 {key:value} 数据,当我们在 leveldb 查找某个 key 时,可能需要逐层查找多个 sstable 文件。

因此,sstable 在文件格式设计上,主要考虑:

  1. 查找速度,通过建索引解决
  2. 文件大小,通过压缩解决

5.2 文件格式

data block用于存储原始数据,同时为了方便磁盘查找,每个data block被设定为固定大小默认值为4K。

同时每一个data block对应一行信息,记录3要素:

  1. offset:即 data block 的偏移量
  2. size:即 data block 的大小
  3. data_block_key:满足条件>= block 内所有的 key

被称为index block,其存储格式也是key value

key = data_block_key
value = (offset + size)
  • 1
  • 2

filter block,目前使用的是布隆过滤器,在查找key时,先通过filter block判断是否存在,如果不存在直接跳过对应的data block。同时在设计level db时预计会包含其他很多索引block,但目前就只有filter block,所以meta block等价于filter block,其里面只包含一组{key:value}数据就是找到filter block

5.3 footer

footer中包含了以下信息:

index of data block'index   //index block位置
index of mata block'index   //meta index block位置
  • 1
  • 2

footer 需要首先读取、解析出来,然后才能“按图索骥”找到其他 block,因此 footer 是定长的,而且位置固定在文件尾部。之后得到index block和meta index block位置,之后就可以从index block读到data block位置,meta index block中读到meta block位置。

 	<beginning_of_file>
    [data block 1]
    [data block 2]
    ...
    [data block N]
    [meta block 1]
    ...
    [meta block K]
    [metaindex block]
    [index block]
    [Footer]        (fixed size; starts at file_size - sizeof(Footer))
    <end_of_file>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12

5.4 源码解析

5.5 sstable读取

六、bloom filter

1. 简介

bloom filter是一种数据结构,作用类似于 hash table,相对于后者,空间利用率更高。

不过这种高利用率是有代价的,当我们在 bloom filter 查找 key 时,有返回两种情况:

  1. key 不存在,那么 key 一定不存在。
  2. key 存在,那么 key 可能存在。

也就是说 bloom filter 具有一定的误判率。

2. 理论知识

先介绍下 bloom filter 的几个组成:

  1. n 个 key
  2. m bits 的空间 v,全部初始化为0
  3. k 个无关的 hash 函数:h1, h2, ..., hk,hash 结果为{1, 2, ..., m} or {0, 1, ..., m-1})

具体的,对于 key=a,经过 k 个 hash 函数后结果为

h1(a), h2(a), ..., hk(a)
  • 1

那么就将 v 对应的 bit 置为 1.

假定 k 为 4,对应的 bloom filter 为:

bloom_filter_with_4_hash_functions

注:这里有一个js写的一篇博客,支持互动的查看 bloom filter,更形象一些.

当 key 越来越多,v 里置为 1 的 bits 越来越多。对于某个不存在的 key’,k 个 hash 函数对应的 bit 可能正好为1,此时就概率发生误判,更专业的术语称为 false positive,或者 false drop.

因此,我们称 bloom filter 是一种概率型的数据结构,当返回某个 key’ 存在时,只是说明可能存在。

m 越大,k 越大, n 越小,那么 false positive越小。

更进一步,bloom filter 是关于空间和 false positive 的 tradeoff,bloom filter 的算法其实并不复杂,其真正的艺术在于这种平衡。

我们先看下 tradeoff 的结论:

hash 函数 k 的最优个数为 ln2 * (m/n).

七、minor compaction

minor compaction就是将memtable中的数据转化为immetable并写入到sstable中,同时其不总是直接写入到level 0,如果imm中的数据和level 1层的数据没有交集也会插入到level 1层。同时由于将imm数据写入到level磁盘中需要时间,如果太慢则会导致mem又满了而阻塞写入。后台有线程会执行以下代码来检测是否需要compaction并且minor的优先级比major高:

//实际Compact
void DBImpl::BackgroundCompaction() {
  mutex_.AssertHeld();

  //如果immutable memtable存在,则本次先compact,即Minor Compaction
  if (imm_ != nullptr) {
    CompactMemTable();
    return;
  }

  ...
  // major compaction
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12

CompactMemTable(即minor compaction)主要流程分为三部分:

  1. WriteLevel0Table(imm_, &edit, base)imm_落盘成为新的 sst 文件,文件信息记录到 edit
  2. versions_->LogAndApply(&edit, &mutex_):因为compaction会生成新文件,同时旧文件可能还有人使用所以不能删,所以会保存多个版本信息,在compaction后将本次文件更新信息versions_,当前的文件(包含新的 sst 文件)作为数据库的一个最新状态,后续读写都会基于该状态,(具体作用请看版本管理)
  3. DeleteObsoleeteFiles:删除一些无用文件

imm_持久化为 sstable 文件后,文件的相关信息通过meta返回

  {
    mutex_.Unlock();
    //更新memtable中全部数据到xxx.ldb文件
    //meta记录key range, file_size等sst信息
    s = BuildTable(dbname_, env_, options_, table_cache_, iter, &meta);
    mutex_.Lock();
  }
struct FileMetaData {
  int refs;
  int allowed_seeks;          // Seeks allowed until compaction
  uint64_t number;
  uint64_t file_size;         // File size in bytes
  InternalKey smallest;       // Smallest internal key served by table
  InternalKey largest;        // Largest internal key served by table

  FileMetaData() : refs(0), allowed_seeks(1 << 30), file_size(0) { }
};
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17

meta包括文件的 key range,大小,文件的引用数(当引用数为0时会从磁盘删除)等。

查找合适的 level 将新文件记录到edit

    //为新生成sstable选择合适的level(不一定总是0)
    if (base != nullptr) {
      level = base->PickLevelForMemTableOutput(min_user_key, max_user_key);
    }
    //level及file meta记录到edit
    edit->AddFile(level, meta.number, meta.file_size,
                  meta.smallest, meta.largest);
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

minor compaction 比较简单,因为只新增了一个 sstable 文件,加入后调用LogAndApply生效到新版本。

  // Replace immutable memtable with the generated Table
  if (s.ok()) {
    edit.SetPrevLogNumber(0);
    edit.SetLogNumber(logfile_number_);  // Earlier logs no longer needed
    //应用edit
    s = versions_->LogAndApply(&edit, &mutex_);
  }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

八、版本管理

因为通过compaction会增加文件,存在新文件和老文件同时存在的情况,如果老文件还在执行操作此时不能删除。只有当其彻底没人使用时才能将老文件删除。

VersionEdit即 delta,最重要的两个成员变量就是新增与删除文件:

  DeletedFileSet deleted_files_;//待删除文件
  //新增文件,例如immutable memtable dump后就会添加到new_files_
  std::vector< std::pair<int, FileMetaData> > new_files_;
  • 1
  • 2
  • 3

VersionEdit在每次compaction后都会调用该接口将新文件放入。同时在老文件没人使用时也会放入。

Version用于表示某次 compaction 后的数据库状态,管理当前的文件集合,因此最重要一个成员变量files_表示每一层的全部 sstable 文件。

  // List of files per level
  std::vector<FileMetaData*> files_[config::kNumLevels];
  • 1
  • 2

8.1 PickLevelForMemTableOutput

就是为刚从 memtable 持久化的 sstable,选择一个合适的 level.

  1. level 0的 sstable 数量有严格的限制,因此尽可能尝试放到一个更大的 level.
  2. 大于 level 0的各层文件间是有序的,如果放到对应的层数会导致文件间不严格有序,会影响读取,则不再尝试。
  3. 如果放到 level + 1层,与 level + 2层的文件重叠很大,就会导致 compact 到该文件时,压力过大,则不再尝试。这算是一个预测,放到 level 层能够缓冲这一点。
  4. 最大返回 level 2,这大概是个经验值。

8.2 Builder

Builder是一个辅助类,实现Version + VersionEdit = Version‘,其中+ =分别对应Apply和SaveTo两个接口。

成员变量也是记录所有的 delta,levels_存储了每一层的added_filesdeleted_files:

  typedef std::set<FileMetaData*, BySmallestKey> FileSet;
  struct LevelState {
    std::set<uint64_t> deleted_files;
    FileSet* added_files;
  };

  VersionSet* vset_;
  Version* base_;
  LevelState levels_[config::kNumLevels];//每一层的新增及删除文件
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

**Apply:**将VersionEdit中记录的文件更新到levels_中

**SaveTo:**将levels_中的文件更新和base_合并生成一个新的Version v,同时需要保证v中每一层文件之间的顺序。

8.3 VersionSet

随着Builder不断执行,新的version被构造出来。VersionSet就负责管理多个版本,对应的变量全局唯一,在DBImpl构造函数里初始化:

      versions_(new VersionSet(dbname_, &options_, table_cache_,
                               &internal_comparator_)) {
  • 1
  • 2

管理一个双向链表

  Version dummy_versions_;  // 循环链表的头部.
  Version* current_;        // 最新版本
  • 1
  • 2

VersionSet

current_指向最新的版本。

因此class Version实际上还有三个重要的链表相关成员变量:

  VersionSet* vset_;            // Version中指向VersionSet
  Version* next_;               // Next version in linked list
  Version* prev_;               // Previous version in linked list
  • 1
  • 2
  • 3
8.3.1 LogAndApply

Status VersionSet::LogAndApply(VersionEdit* edit, port::Mutex* mu)的主要做了几件事,这个函数会在compaction中使用:

  1. edit应用于current_生成一个新的Version
  2. 计算新Version下,下次 major compact 的文件 Finalize(v);由这个函数计算
  3. 更新一些元信息管理文件,将current_和edit信息写入manifest文件,接着在CURRENT文件里明文写入manifest文件名。(应该是用来故障恢复的,比如重启,断电等)
  4. 将新Version添加到双向链表,current_ = 新Version

首先是生成新Version:

  Version* v = new Version(this);
  {
    Builder builder(this, current_);
    builder.Apply(edit);
    builder.SaveTo(v);
  }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

九、seek_compaction && size_compaction

对于major compaction有两种,一种是文件多次seek但是没有查找到数据(可能该key在这个文件里被删除了,或者在其他文件里,比如level 0是允许文件之间有重叠key范围的)。另外一种是当该level中文件过大时执行。

9.1 seek_compaction

// 1. 1次seek花费10ms
// 2. 1M读写花费10ms
// 3. 1M文件的compact需要25M IO(读写10-12MB的下一层文件),为什么10-12M?经验值?
// 因此1M的compact时间 = 25次seek时间 = 250ms
// 也就是40K的compact时间 = 1次seek时间,保守点取16KB,即t = 16K的compact时间 = 1次seek时间
// compact这个文件的时间: file_size / 16K
// 如果文件seek很多次但是没有找到key,时间和已经比compact时间要大,就应该compact了
// 这个次数记录到f->allowed_seeks
f->allowed_seeks = (f->file_size / 16384);//16KB
if (f->allowed_seeks < 100) f->allowed_seeks = 100;
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

9.2 size_compaction

compaction 另外一个直观的想法就是,当某一层文件变得很大,往往意味着冗余数据过多,应该 compact 以避免占用磁盘以及读取过慢的问题。

level 越大,我们可以认为数据越“冷”,读取的几率越小,因此大的 level,能“容忍”的程度就越高,给的文件大小阈值越大。

具体的,当产生新版本时,遍历所有的层,比较该层文件总大小与基准大小,得到一个最应当 compact 的层。

这个步骤,在VersionSet::Finalize完成。

//计算compact的level和score,更新到compaction_level_&&compaction_score_
void VersionSet::Finalize(Version* v) {
  int best_level = -1;
  double best_score = -1;

  //level 0看文件个数,降低seek的次数,提高读性能,个数/4
  //level >0看文件大小,减少磁盘占用,大小/(10M**level)
  //例如:
  //level 0 有4个文件,score = 1.0
  //level 1 文件大小为9M,score = 0.9
  //那么compact的level就是0,score = 1.0
  for (int level = 0; level < config::kNumLevels-1; level++) {
    double score;
    if (level == 0) {
      score = v->files_[level].size() /
          static_cast<double>(config::kL0_CompactionTrigger);
    } else {
      // Compute the ratio of current size to size limit.
      const uint64_t level_bytes = TotalFileSize(v->files_[level]);
      score =
          static_cast<double>(level_bytes) / MaxBytesForLevel(options_, level);
    }

    if (score > best_score) {
      best_level = level;
      best_score = score;
    }
  }

  v->compaction_level_ = best_level;
  v->compaction_score_ = best_score;
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32

可以看到 level 0 与其他层不同,看的是文件个数,因为 level 0 的文件是重叠的,每次读取都需要遍历所有文件,所以文件个数更加影响性能。

每层的基准大小为10M << ${level - 1},level = 1 则MaxBytes = 10M, level = 2 则MaxBytes=100M,依次类推.

逐层比较后,得到最大的得分以及对应层数:compaction_score_ compaction_level_

major compact 选择文件时就会用到上述两个条件。

  // We prefer compactions triggered by too much data in a level over
  // the compactions triggered by seeks.
  const bool size_compaction = (current_->compaction_score_ >= 1);//文件数过多
  const bool seek_compaction = (current_->file_to_compact_ != nullptr);//seek了多次文件但是没有查到,记录到的file_to_compact_
  • 1
  • 2
  • 3
  • 4

因此,版本管理的作用之三,就是在新增或者遍历文件的过程中,为 major compact 筛选文件。

十、major compaction

作用

随着数据合并到更大的 level,一个明显的好处就是清理冗余数据。

如果不同 level 的 sst 文件里,存在相同的 key,那么更底层的数据就可以删除不再保留(不考虑 snapshot的情况下)。为了充分利用磁盘高性能的顺序写,删除数据也是顺序写入删除标记,而真正删除数据,是在 major compact 的过程中。

所以,一个作用是能够节省磁盘空间。

level 0 的数据文件之间是无序的,每次查找都需要遍历所有可能重叠的文件,而归并到 level 1 之后,数据变得有序,待查找的文件变少。

所以,另外一个作用是能够提高读效率。

10.1 当我们谈论筛选时,在谈论什么

leveldb 最为复杂的在 compaction,compaction 最为复杂的在 major compaction.面对磁盘上的众多 sstable 文件,应该怎么开始?

千里之行始于足下,首先需要找到最应该 compact 的一个文件

“最应该”的判断条件,前面笔记已有介绍,有seek_compaction && size_compaction,分别从读取和文件大小两个维度来判断。

筛选出这个文件后,还需要考虑一系列问题:

  1. 如果在 level 0,由于该层文件之间是无序的,如果只把这一个文件 compact 到 level 1 是否会导致读取错误?
  2. compact 到 level + 1后,会不会导致 level + 1 与 level + 2 的 compact 过于复杂?
  3. 这个文件应该与哪些文件 compact?

这些问题,都需要在PickCompaction这个函数里解决。

10.2 Compaction

leveldb::Compaction用来记录筛选文件的结果,其中inputs[2]记录了参与 compact 的两层文件,是最重要的两个变量

  // Each compaction reads inputs from "level_" and "level_+1"
  std::vector<FileMetaData*> inputs_[2];      // The two sets of inputs
  • 1
  • 2

10.3 VersionSet::PickCompaction

Compaction* VersionSet::PickCompaction()简言之,就是选取一层需要compact的文件列表,及相关的下层文件列表,记录在Compaction*返回。

其主要过程如下:

  1. 根据inputs_[0]确定inputs_[1]
  2. 根据inputs_[1]反过来看下能否扩大inputs_[0]
  3. inptus_[0]扩大的话,记录到expanded0
  4. 根据expanded[0]看下是否会导致inputs_[1]增大
  5. 如果inputs[1]没有增大,那就扩大 compact 的 level 层的文件范围

也就是:

在不增加 level + 1 层文件,同时不会导致 compact 的文件过大的前提下,尽量增加 level 层的文件数

10.3.1 size_compactionorseek_compaction

首先是根据size_compaction seek_compaction计算应当 compact 的文件。

只有compaction_score_ >= 1时,触发 size compaction.

// We prefer compactions triggered by too much data in a level over
// the compactions triggered by seeks.
const bool size_compaction = (current_->compaction_score_ >= 1);//文件数过多
const bool seek_compaction = (current_->file_to_compact_ != nullptr);//seek了多次文件但是没有查到,记录到的file_to_compact_
  • 1
  • 2
  • 3
  • 4

如果size_compaction = true,则找到该层一个满足条件的文件:

  if (size_compaction) {
    //该层第一个>compact_pointer_的文件,或者第一个文件
    level = current_->compaction_level_;
    assert(level >= 0);
    assert(level+1 < config::kNumLevels);
    c = new Compaction(options_, level);

    // Pick the first file that comes after compact_pointer_[level]
    for (size_t i = 0; i < current_->files_[level].size(); i++) {
      FileMetaData* f = current_->files_[level][i];
      if (compact_pointer_[level].empty() ||
          icmp_.Compare(f->largest.Encode(), compact_pointer_[level]) > 0) {
        c->inputs_[0].push_back(f);
        break;
      }
    }
    if (c->inputs_[0].empty()) {
      // Wrap-around to the beginning of the key space
      c->inputs_[0].push_back(current_->files_[level][0]);
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20

compact_pointer_是 string 类型,记录了该层上次 compact 时文件的 largest key,初始值为空,也就是选择该层第一个文件。

  // Per-level key at which the next compaction at that level should start.
  // Either an empty string, or a valid InternalKey.
  std::string compact_pointer_[config::kNumLevels];
  • 1
  • 2
  • 3

如果seek_compaction = true,则直接使用满足条件的文件。

到了这一步,inputs_[0]里有且仅有一个文件。

10.3.2 level 0 输入文件的特殊处理

level 0 的文件之间是无序的,假设当前有 4 个文件,key range 分别是

  1. [a, n]
  2. [c, k]
  3. [b, e]
  4. [l, n]

本次选择了第3个文件,如果只是把[b, e]更新到 level 1,那么就会导致读取时数据错误。因为多个文件之间数据是有重叠的,数据之间的先后无法判断,而更新到 level 1 就意味着认为数据更早。(将[b,e]更新到level 1,因为level 0是无序的,可能其他level 0中的文件有比这个文件更老的数据,直接更新到level 1后就导致level 1中有些文件比level 0更新了,后续读取会出错,所以需要将level 0层重叠文件都找出来)

对应的做法就是当选出文件后,判断还有哪些文件有重叠,把这些文件都加入进来,这个例子对应的就是把文件1 2都加进来。

代码上,先通过GetRange获取输入文件的 key range,然后根据 key range 得到一个最全的文件列表。

  // Files in level 0 may overlap each other, so pick up all overlapping ones
  // 对level 0,获取当前已选择文件的key range: [smallest, largest]
  // 然后选择level 0的其他与该key range有overlap的文件,组成新的key range
  // 然后重新从头在level 0查找,直到key range固定下来
  if (level == 0) {
    InternalKey smallest, largest;
    GetRange(c->inputs_[0], &smallest, &largest);
    // Note that the next call will discard the file we placed in
    // c->inputs_[0] earlier and replace it with an overlapping set
    // which will include the picked file.
    current_->GetOverlappingInputs(0, &smallest, &largest, &c->inputs_[0]);
    assert(!c->inputs_[0].empty());
  }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13

到了这一步,本质上是是的待 compact 的文件在各层都满足统一条件:inputs_[0]的文件跟本层其他文件之间,没有 key 重叠

10.3.3 SetupOtherInputs

inputs_[0]填充了第一层要参与 compact 的文件,接下来就是要计算下一层参与 compact 的文件,记录到inputs_[1]

基本的思想是:所有有重叠的 level + 1 层文件都要参与 compact,得到这些文件后,反过来看下,如果在不增加 level + 1 层文件的前提下,能否增加 level 层的文件?

也就是尽量增加 level 层的文件,贪心算法。

首先是计算下一层与inputs_[0] key range 有重叠的所有 sstable files,记录到inputs_[1]

void VersionSet::SetupOtherInputs(Compaction* c) {
  const int level = c->level();
  InternalKey smallest, largest;
  //inputs_[0]所有文件的key range -> [smallest, largest]
  GetRange(c->inputs_[0], &smallest, &largest);

  //inputs_[1]记录level + 1层所有与inputs_[0]有overlap的文件
  current_->GetOverlappingInputs(level+1, &smallest, &largest, &c->inputs_[1]);

  // Get entire range covered by compaction
  InternalKey all_start, all_limit;
  //inputs_[0, 1]两层所有文件的key range -> [all_start, all_limit]
  GetRange2(c->inputs_[0], c->inputs_[1], &all_start, &all_limit);
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13

根据inputs_[1]反推下 level 层有多少 key range 有重叠的文件,记录到expanded0

  // See if we can grow the number of inputs in "level" without
  // changing the number of "level+1" files we pick up.
  // 如果再不增加level + 1层文件的情况下,尽可能的增加level层的文件
  if (!c->inputs_[1].empty()) {
    std::vector<FileMetaData*> expanded0;
    //level层与[all_start, all_limit]有overlap的所有文件,记录到expanded0
    //expanded0 >= inputs_[0]
    current_->GetOverlappingInputs(level, &all_start, &all_limit, &expanded0);
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

如果文件确实又增加,同时又不会增加太多文件(太多会导致 compact 压力过大)

    const int64_t inputs0_size = TotalFileSize(c->inputs_[0]);
    const int64_t inputs1_size = TotalFileSize(c->inputs_[1]);
    const int64_t expanded0_size = TotalFileSize(expanded0);
    //1. level 层参与compact文件数有增加
    //2. 但合并的文件总量在ExpandedCompactionByteSizeLimit之内(防止compact过多)
    if (expanded0.size() > c->inputs_[0].size() &&
        inputs1_size + expanded0_size <
            ExpandedCompactionByteSizeLimit(options_)) {
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

那么就增加参与 compact 的文件,更新到inputs_

      InternalKey new_start, new_limit;
      //[new_start, new_limit]记录expand0的key range
      GetRange(expanded0, &new_start, &new_limit);
      std::vector<FileMetaData*> expanded1;
      //如果level层文件从inputs_[0]扩展到expand0,key的范围变成[new_start, new_limit]
      //看下level + 1层overlap的文件范围,记录到expand1
      current_->GetOverlappingInputs(level+1, &new_start, &new_limit,
                                     &expanded1);
      //确保level + 1层文件没有增加,那么使用心得expand0, expand1
      if (expanded1.size() == c->inputs_[1].size()) {
        Log(options_->info_log,
            "Expanding@%d %d+%d (%ld+%ld bytes) to %d+%d (%ld+%ld bytes)\n",
            level,
            int(c->inputs_[0].size()),
            int(c->inputs_[1].size()),
            long(inputs0_size), long(inputs1_size),
            int(expanded0.size()),
            int(expanded1.size()),
            long(expanded0_size), long(inputs1_size));
        smallest = new_start;
        largest = new_limit;
        c->inputs_[0] = expanded0;
        c->inputs_[1] = expanded1;
        GetRange2(c->inputs_[0], c->inputs_[1], &all_start, &all_limit);
      }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25

到此,参与 compact 的文件集合就已经确定了,为了避免这些文件合并到 level + 1 层后,跟 level + 2 层有重叠的文件太多,届时合并 level + 1 和 level + 2 层压力太大,因此我们还需要记录下 level + 2 层的文件,后续 compact 时用于提前结束的判断:

// Compute the set of grandparent files that overlap this compaction
// (parent == level+1; grandparent == level+2)
// level + 2层有overlap的文件,记录到c->grandparents_
if (level + 2 < config::kNumLevels) {
    //level + 2层overlap的文件记录到c->grandparents_
    current_->GetOverlappingInputs(level + 2, &all_start, &all_limit,
    &c->grandparents_);
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

接着记录compact_pointer_c->edit_,在后续PickCompaction入口时使用。

// Update the place where we will do the next compaction for this level.
// We update this immediately instead of waiting for the VersionEdit
// to be applied so that if the compaction fails, we will try a different
// key range next time.
// 记录该层本次compact的最大key
compact_pointer_[level] = largest.Encode().ToString();
c->edit_.SetCompactPointer(level, largest);
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

最后一步,就是返回筛选的结果c:

//选取一层需要compact的文件列表,及相关的下层文件列表,记录在Compaction*
Compaction* VersionSet::PickCompaction() {
  Compaction* c;
  ...
  return c;
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
10.4 BackgroundCompaction
//实际Compact
void DBImpl::BackgroundCompaction() {
  mutex_.AssertHeld();

  //如果immutable memtable存在,则本次先compact,即Minor Compaction
  if (imm_ != nullptr) {
    CompactMemTable();
    return;
  }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

接着就是调用PickCompaction筛选合适的 level 及 文件。注意也可以手动指定 range,原理是类似的,不再赘述。

  //如果immutable memtable不存在,则合并各层level的文件,称为Major Compaction
  Compaction* c;
  bool is_manual = (manual_compaction_ != nullptr);
  InternalKey manual_end;
  //手动指定compact
  if (is_manual) {
    ManualCompaction* m = manual_compaction_;
    c = versions_->CompactRange(m->level, m->begin, m->end);
    m->done = (c == nullptr);
    if (c != nullptr) {
      manual_end = c->input(0, c->num_input_files(0) - 1)->largest;
    }
    Log(options_.info_log,
        "Manual compaction at level-%d from %s .. %s; will stop at %s\n",
        m->level,
        (m->begin ? m->begin->DebugString().c_str() : "(begin)"),
        (m->end ? m->end->DebugString().c_str() : "(end)"),
        (m->done ? "(end)" : manual_end.DebugString().c_str()));
  } else {
  //自动compact,c记录了待参与compact的所有文件
    c = versions_->PickCompaction();
  }
  
  
  //再接收到需要compaction的文件后 为了节省重新生成文件过程  有些情况可以直接使用源文件
  // 1. level层只有一个文件
  // 2. level + 1层没有文件
  // 3. 跟level + 2层overlap的文件没有超过25M  IsTrivialMove()在这个函数
  Status status;
  if (c == nullptr) {
    // Nothing to do
  } else if (!is_manual && c->IsTrivialMove()) {
    // Move file to next level
    // level + 1没有overlap的文件,不需要compact,直接从level层标记到level + 1层即可
    assert(c->num_input_files(0) == 1);
    FileMetaData* f = c->input(0, 0);
    //直接把这个文件从level移动level + 1层
    c->edit()->DeleteFile(c->level(), f->number);
    c->edit()->AddFile(c->level() + 1, f->number, f->file_size,
                       f->smallest, f->largest);
    status = versions_->LogAndApply(c->edit(), &mutex_);
    if (!status.ok()) {
      RecordBackgroundError(status);
    }
    VersionSet::LevelSummaryStorage tmp;
    Log(options_.info_log, "Moved #%lld to level-%d %lld bytes %s: %s\n",
        static_cast<unsigned long long>(f->number),
        c->level() + 1,
        static_cast<unsigned long long>(f->file_size),
        status.ToString().c_str(),
        versions_->LevelSummary(&tmp));
   } else {
    CompactionState* compact = new CompactionState(c);
    status = DoCompactionWork(compact);
    if (!status.ok()) {
      RecordBackgroundError(status);
    }
    CleanupCompaction(compact);
    c->ReleaseInputs();
    DeleteObsoleteFiles();//删除旧文件,回收内存和磁盘空间
  }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61

正常情况下,通过DoCompactionWork完成文件的归并操作。

实现上,主要就是通过遍历所有的文件,实现多路归并,生成新的文件。

第一步,获取遍历所有文件用到的 Iterator*. 因为level 0 是无序的所以level 0需要level 0.size个迭代器,其他层只需要1个迭代器。

iterator 返回的 key 全部有序,遍历过程可以清理掉一些 key。

由于多次Put/Delete,有些key会出现多次,在compact时丢弃。策略如下:

  1. 对于多次出现的user key,我们只关心最后写入的值 or >snapshot的值通过设置last_sequence_for_key = kMaxSequenceNumber以及跟compact->smallest_snapshot比较,可以分别保证这两点
  2. 如果是删除key && <= snapshot && 更高层没有该key,那么也可以忽略

同时跟上一节的思想类似,如果目前 compact 生成的文件,会导致接下来 level + 1 && level + 2 层 compact 压力过大,那么结束本次 compact.因此,每次都会调用ShouldStopBefore来判断是否满足上述条件:

bool Compaction::ShouldStopBefore(const Slice& internal_key) {
  const VersionSet* vset = input_version_->vset_;
  // Scan to find earliest grandparent file that contains key.
  const InternalKeyComparator* icmp = &vset->icmp_;
  while (grandparent_index_ < grandparents_.size() &&
         icmp->Compare(internal_key,
                       grandparents_[grandparent_index_]->largest.Encode()) >
             0) {
    if (seen_key_) {
      overlapped_bytes_ += grandparents_[grandparent_index_]->file_size;
    }
    grandparent_index_++;
  }
  seen_key_ = true;

  if (overlapped_bytes_ > MaxGrandParentOverlapBytes(vset->options_)) {
    // Too much overlap for current output; start new output
    overlapped_bytes_ = 0;
    return true;
  } else {
    return false;
  }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23

十一、写入和读取

1. Put

先用一张图片介绍下:

Write

写入的key value首先被封装到WriteBatch

// Default implementations of convenience methods that subclasses of DB
// can call if they wish
Status DB::Put(const WriteOptions& opt, const Slice& key, const Slice& value) {
  WriteBatch batch;
  //key,value数据更新到batch里
  batch.Put(key, value);
  return Write(opt, &batch);
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

WriterBatch封装了数据,DBImpl::Writer则继续封装了 mutex cond 等同步原语

// Information kept for every waiting writer
struct DBImpl::Writer {
  Status status;
  WriteBatch* batch;
  bool sync;
  bool done;
  port::CondVar cv;

  explicit Writer(port::Mutex* mu) : cv(mu) { }
};
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

写入流程实际上调用的是DBImpl::Write

//调用流程: DBImpl::Put -> DB::Put -> DBImpl::Write
Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) {
  //一次Write写入内容会首先封装到Writer里,Writer同时记录是否完成写入、触发Writer写入的条件变量等
  Writer w(&mutex_);
  w.batch = my_batch;
  w.sync = options.sync;
  w.done = false;
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

数据被写入到writers_,直到满足两个条件:

  1. 其他线程已经帮忙完成了w的写入
  2. 抢到锁并且位于writers_首部
  MutexLock l(&mutex_);//多个线程调用的写入操作通过mutex_串行化
  writers_.push_back(&w);
  //数据先放到queue里,如果不在queue顶部则等待
  //这里是对数据流的一个优化,wirters_里Writer写入时,可能会把queue里其他Writer也完成写入
  while (!w.done && &w != writers_.front()) {
    w.cv.Wait();   //这边会释放锁  直到被唤醒
  }
  //如果醒来并且抢到了mutex_,检查是否已经完成了写入(by其他Writer),则直接返回写入status
  if (w.done) {
    return w.status;
  }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11

接着查看是否有足够空间写入,例如mem_是否写满,是否必须触发 minor compaction 等

  // May temporarily unlock and wait.
  Status status = MakeRoomForWrite(my_batch == nullptr);
  • 1
  • 2

取出writers_的数据,统一记录到updates

  uint64_t last_sequence = versions_->LastSequence();//本次写入的SequenceNumber
  Writer* last_writer = &w;
  if (status.ok() && my_batch != nullptr) {  // nullptr batch is for compactions
    //updates存储合并后的所有WriteBatch
    WriteBatch* updates = BuildBatchGroup(&last_writer);   //一次性把writers_里面的数据都保存在updates  一次性写入  此时last_writer等于writers_最后一个
    WriteBatchInternal::SetSequence(updates, last_sequence + 1);
    last_sequence += WriteBatchInternal::Count(updates);
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

然后写入日志写入内存:

      //WriterBatch写入log文件,包括:sequence,操作count,每次操作的类型(Put/Delete),key/value及其长度
      status = log_->AddRecord(WriteBatchInternal::Contents(updates));
      bool sync_error = false;
      if (status.ok() && options.sync) {
        //log_底层使用logfile_与文件系统交互,调用Sync完成写入
        status = logfile_->Sync();
        if (!status.ok()) {
          sync_error = true;
        }
      }
      //写入文件系统后不用担心数据丢失,继续插入MemTable
      if (status.ok()) {
        status = WriteBatchInternal::InsertInto(updates, mem_);
      }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14

写入完成后,逐个唤醒等待的线程:

  //last_writer记录了writers_里合并的最后一个Writer
  //逐个遍历弹出writers_里的元素,并环形等待write的线程,直到遇到last_writer
  while (true) {
    Writer* ready = writers_.front();
    writers_.pop_front();
    if (ready != &w) {
      ready->status = status;
      ready->done = true;
      ready->cv.Signal();
    }
    if (ready == last_writer) break;
  }

  // Notify new head of write queue
  // 唤醒队列未写入的第一个Writer
  if (!writers_.empty()) {
    writers_.front()->cv.Signal();
  }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18

2. Sequence

批量写入接口DB::Write(const WriteOptions& options, WriteBatch* updates)调用也是DBImpl::Write

批量写入一个典型问题就是一致性,例如这么调用:

leveldb::WriteBatch batch;
batch.Put("company", "Google");
batch.Put(...);
batch.Delete("company");

db->Write(write_option, &batch);
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

我们肯定不希望读到company -> Google这个中间结果,而效果的产生就在于sequenceversions_记录了单调递增的sequence,对于相同 key,判断先后顺序依赖该数值。

写入时,sequence递增的更新到 memtable,但是一次性的记录到versions_:

  uint64_t last_sequence = versions_->LastSequence();//本次写入的SequenceNumber
  ...
    WriteBatchInternal::SetSequence(updates, last_sequence + 1);
    last_sequence += WriteBatchInternal::Count(updates);
    ...
    versions_->SetLastSequence(last_sequence);
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

对于Get操作(参考本文 Get 一节),看到的 sequence 只有两种可能: 批量提交get要不再batch.Put(“company”, “Google”);之前或者batch.Delete(“company”);之后执行。同时一个writebatch里的操作应该是一次全部写入的。

  1. <= last_sequence
  2. >= last_sequence + Count(updates)

因此读取时不会观察到中间状态。

3. WriteBatch

第一节介绍,写入的key/value数据,都记录到了WriteBatch,更具体的,记录到了:

  //rep_存储了所有Put/Delete接口传入的数据
  //按照一定格式记录了:sequence, count, 操作类型(Put or Delete),key/value的长度及key/value本身
  std::string rep_;  // See comment in write_batch.cc for the format of rep_
  • 1
  • 2
  • 3

rep_数据组织如下:一个writebatch可以包含多个操作

Write

4 Get

在这里插入图片描述

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/IT小白/article/detail/919264
推荐阅读
相关标签
  

闽ICP备14008679号