当前位置:   article > 正文

大白话解析LevelDB:SkipList(跳表)

大白话解析LevelDB:SkipList(跳表)

SkipList

什么是SkipList(跳表)

跳表是有序集合的一种实现, 本质是一个有序链表。

在c++的标准库中, 我们经常会用到std::setstd::map。这两个容器都是有序集合/有序字典, 但是它们的底层实现是RBTree(红黑树).
RBTree, SkipList, B+Tree都是有序集合的实现.

SkipList的增删改查的时间复杂度都是O(logn), 与RBTree相同, 但是SkipList的实现比RBTree简单得多.

为什么使用skiplist

SkipList(跳表)、Red-Black Tree(红黑树)和 B+ Tree 都是常用的数据结构,各自具有不同的特点和优势。以下是 SkipList 与 Red-Black Tree 和 B+ Tree 相比的一些优势:

SkipList 与 Red-Black Tree(红黑树)相比的优势:

  1. 实现简单

    • SkipList 的算法和数据结构比 Red-Black Tree 简单得多,容易理解和实现。对于一些应用场景来说,这使得 SkipList 更易于维护和调试。
  2. 并发程度更高

    • 在并发环境中,Skip List 通常更容易实现有效的锁定策略。由于其层级结构,可以更灵活地控制锁的粒度,从而在并发操作中表现更好。
  3. 顺序访问友好

    • SkipList 对顺序访问(例如范围查询)非常友好,因为它基于链表实现。

SkipList 与 B+ Tree 相比的优势:

  1. 实现简单

    • SkipList 在实现上通常比 B+ Tree 简单。B+ Tree 的节点分裂和合并操作相对复杂,而 SkipList 的插入和删除操作更直接。
  2. 内存使用少

    • 对于内存中的数据结构,Skip List 可能比 B+ Tree 更高效,因为它不需要预先分配固定大小的节点。Skip List 的节点大小可以根据实际需求动态变化。
  3. 写操作友好

    • SkipList 更适合动态数据集,其中数据项频繁插入和删除,因为 SkipList 不需要复杂的重平衡操作。

综合考虑:

当考虑使用 Skip List、Red-Black Tree 或 B+ Tree 时,应根据具体应用场景的需求来选择。

  • 适合使用SkipList: 写多读少的情况。
  • 适合使用B+Tree: 读多写少的情况。
  • 适合使用使用RBTree: 需要严格平衡的场景,以保证操最坏情况性能

SkipList的原理

以论文中的skiplist示意图为例, 我们举例说明skiplist的原理.

SkipList

从图中我们可以看到, skiplist的本质是个多层的有序链表, 每一层都是一条单链表, 并且上层是下层的子集.

SkipList查找操作

查找过程

首先我们来看一下查找操作, 假设我们现在要查找一个值为12的节点.

  1. 一共有4层链表, 从DummyHead的最高层DummyHead[3]开始, DummyHead[3]->nextNode6, 比12小, 所以将当前指针指向Node6[3].
  2. 继续再看Node6[3]->next, 为Null, Null视为无限大节点, 比12大, 所以我们不能将当前指针移向Node6[3]->next. 此时需要降一层, 来看下一层的Node6[2]->next.
  3. Node6[2]->nextNode25, 比12大, 继续降一层.
  4. Node6[1]->nextNode9, 比12小了, 可以往后移了, 将当前指针移向Node9[1].
  5. Node9[1]->nextNode25, 比12大, 不能往后移, 降一层.
  6. Node9[0]->nextNode12, 找到了, 返回Node12.

查找开销

我们可以看到, 在SkipList中查找一个元素的过层, 是一个近似二分查找的过程. 那具体的时间复杂度是多少呢?

决定SkipList查找性能的因子有两个:

  • 最大层高h. h代表了SkipList的最大层高, h越大, 性能越好, 空间开销越大.
  • 概率因子p. p表示上层节点数与下层节点数的比例, p越接近0.5, 性能越好. 当p=0.5时, 就是二分查找.

SkipList的平均查找时间复杂度是 O ( log ⁡ 1 / p n ) O(\log_{1/p} n) O(log1/pn)。其中 p p p表示上层节点数与下层节点数的比例, 取值区间[0,1], n n n为节点总数。

h h h(最大层高)的影响
  • 性能: h h h越大,意味着SkipList的层数越多,能够加快查找速度。
  • 空间: 增加 h h h会导致SkipList的内存开销增加,因为需要更多的指针来维持更多的层。
p p p(概率因子)的影响
  • 性能: p p p决定了节点在上层出现的概率。 p p p值越大,每一层的节点数越多, p p p越接近0.5,SkipList的性能越好。
  • 空间: p p p值越小,SkipList内存开销越小。

选择合适的 h h h p p p取决于特定应用的性能要求和可用空间。通常, h h h会选择为 log ⁡ 1 / p n \log_{1/p} n log1/pn或稍微高一点,而 p p p的常见值为0.5,但可以根据具体的使用场景进行调整。

时间复杂度推导过程

要推导跳表(Skip List)查找操作的时间复杂度,我们首先需要理解跳表的基本属性和结构。给定跳表的高度为 h h h,随机因子为 p p p(表示节点在更高层出现的概率,即高层节点数量是低层节点数量的 1 p \frac{1}{p} p1),和节点总数为 n n n,我们可以按照以下步骤进行推导:

1. 层级与节点分布
由于每个节点在每一层出现的概率是 p p p,因此第 i i i 层大约有 n ⋅ p i n \cdot p^i npi 个节点(假设底层为第 0 层,有 n n n 个节点)。

2. 跨越节点的步数
在每一层,我们期望在转到下一层之前进行大约 1 p \frac{1}{p} p1 步跳跃,因为每个节点都有 p p p 的概率出现在下一层。

3. 层级高度的确定
高度 h h h 可以根据 n n n 来确定,以保证跳表的效率。一般来说,高度 h h h 大约是 log ⁡ 1 p n \log_{\frac{1}{p}} n logp1n

4. 查找操作的步骤分析
查找操作从最顶层开始,然后在每一层大约进行 1 p \frac{1}{p} p1 步跳跃,然后下降到下一层继续搜索。

5. 时间复杂度推导
由于在每一层我们都会进行大约 1 p \frac{1}{p} p1 步操作,且总共有 h h h 层,所以总的步骤数是 h ⋅ 1 p h \cdot \frac{1}{p} hp1。将 h h h 替换为 log ⁡ 1 p n \log_{\frac{1}{p}} n logp1n,我们得到总步骤数为 1 p ⋅ log ⁡ 1 p n \frac{1}{p} \cdot \log_{\frac{1}{p}} n p1logp1n

6. 最终时间复杂度
因此,跳表的查找操作的时间复杂度是 O ( 1 p ⋅ log ⁡ 1 p n ) O\left(\frac{1}{p} \cdot \log_{\frac{1}{p}} n\right) O(p1logp1n)。考虑到 log ⁡ 1 p n \log_{\frac{1}{p}} n logp1n log ⁡ n \log n logn 的常数倍,我们通常将时间复杂度简化为 O ( log ⁡ n ) O(\log n) O(logn)

总结
跳表的查找操作时间复杂度的推导基于层级结构和随机层级分配的概率性。每层的跳跃步数乘以层的总数给出了整个查找操作的总步骤数,从而得到了查找操作的时间复杂度。

SkipList插入操作

插入过程

理解SkipList的查找节点操作后, 添加一个节点就很简单了.

找到节点的插入位置, 具体的说, 找到新增节点的前驱节点, 即最大的小于等于新增节点的节点.

比如对于图中的SkipList说, 新增节点17的前驱节点就是12.
SkipList
找到前驱节点后, 将构造好的新增节点插入到前驱节点后面即可, 这和单层有序链表的插入操作是一样的, 只不过有很多个next指针而已.

重点在于构造新节点的时候, 如何确定新节点的层数? 这里就是SkipList的精髓所在了.

SkipList中确定新节点层数的过程通常是随机化的,基于一定的概率因子 p(通常取值为0.5或其他小于1的数).

确定新节点层数的过程:

  1. 开始于最低层:新节点至少会出现在第一层, 也就是从第一层开始抛硬币.
  2. 抛硬币:对于每一层,你可以想象自己在做一个“抛硬币”的实验,如果硬币正面朝上(这个事件发生的概率为 p),新节点的层数就会+1.
  3. 重复抛硬币直到失败:重复这个随机过程直到硬币背面朝上为止(即事件发生的概率为 1 - p), 此时抛硬币过程结束, 新节点层数计算完成.
  4. 限制最大层数SkipList通常有一个预设的最大层数 MaxLevel,节点的层数不会超过这个值.若抛硬币一直抛到第 MaxLevel 层,还是正面朝上,也不会再继续增加层数了.

插入开销

插入节点的开销 = 查找节点的开销 ( O ( log ⁡ 1 / p n ) ) + 抛硬币的开销 ( O ( log ⁡ 1 / p n ) ) + 将节点插入链表的开销 ( O ( log ⁡ 1 / p n ) ) = O ( log ⁡ 1 / p n ) 插入节点的开销 = 查找节点的开销(O(\log_{1/p} n)) + 抛硬币的开销(O(\log_{1/p} n)) + 将节点插入链表的开销(O(\log_{1/p} n)) = O(\log_{1/p} n) 插入节点的开销=查找节点的开销(O(log1/pn))+抛硬币的开销(O(log1/pn))+将节点插入链表的开销(O(log1/pn))=O(log1/pn)

SkipList删除操作

删除过程

找到前驱节点, 将前驱节点的next指针指向待删除节点的next指针即可, 思路和单链表的删除操作是一样的.

删除开销

于插入开销一样, O ( log ⁡ 1 / p n ) ) O(\log_{1/p} n)) O(log1/pn))

LevelDB中SkipList的实现

老规矩, 先看一下LevelDBSkipList的定义:

template <typename Key, class Comparator>
class SkipList {
   private:
    // 先声明一下Node, 把Node的定义放在后面
    // 以保证代码的简洁
    struct Node; 

   public:
    // cmp用于比较两个key的大小
    // skiplist所需的内存都是从arena中分配的
    explicit SkipList(Comparator cmp, Arena* arena);

    // 由于SkipList的内存都是从arena中分配的, 
    // 所以与MemTable一样, 禁止拷贝
    SkipList(const SkipList&) = delete;
    SkipList& operator=(const SkipList&) = delete;

    // 插入节点
    void Insert(const Key& key);

    // 判断节点是否存在
    bool Contains(const Key& key) const;

   private:
    // 经验值
    enum { kMaxHeight = 12 };
};
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27

LevelDB中的SkipList非常简单, 只InsertContains两个接口.

为什么没有Delete接口呢? 因为LevelDB中的SkipList只用于MemTable的实现, 而Delete操作就是Insert一个软删除记录.

至于SkipList为什么将kMaxHeight设置为12, 与性能和空间的权衡有关.

SkipList的最大高度为什么定义为12

SkipList的最大高度为h, 概率因子为p,

则在不影响性能的情况下, 最大可容的节点数n的计算公式为:

( 1 / p ) h − 1 (1/p)^{h-1} (1/p)h1

LevelDB中, p的值为1/4, h的值为12, 则最大可容的节点数n ( 1 / ( 1 / 4 ) ) 12 − 1 = 4 11 = 4 , 194 , 304 (1/(1/4))^{12-1} = 4^{11} = 4,194,304 (1/(1/4))121=411=4,194,304.

也就是说, 高度为12SkipList最大可容纳400万条记录, 只有超过400万规模, 才需要增加高度以保证性能不下降.
SkipList的数据规模在400万以内时, 增加高度只会增加空间开销, 而不会提升性能.

而对于大多数的业务场景, 400万的规模已经足够大了, 徒增高度h只会带来没必要的空间开销.

当然, 这是基于节点分布按概率分布均匀的情况, 实际情况中, 节点分布并不是均匀的, 这就需要增加高度h以保证性能.
在实际的测试中, 当h的值小于等于12时, 增加h会提升性能, 但当大于12时, 性能的提升就不明显了.

SkipList::Insert的实现

SkipList的实现挺有意思的, LevelDB是一个key-value DB,但是SkipList
类中只定义了key, 而没有定义value。这是为什么?

因为LevelDB直接将User KeyUser Value打包成了一个更大的Key, 再塞到
SkipList中。

template <typename Key, class Comparator>
void SkipList<Key, Comparator>::Insert(const Key& key) {
    // prev是待插入节点的前驱节点
    // 将prev声明为kMaxHeight层, 多出来的不用
    Node* prev[kMaxHeight];

    // 找到前驱节点
    Node* x = FindGreaterOrEqual(key, prev);

    // 如果发现key已经存在于SkipList中了, 那是有问题的.
    // 因为key = sequence + key + value.
    // 就算key相同, sequence是全局递增的, 不会重复
    // 使用assert是为了在debug模式下与ut中测试, 
    // 但是在release模式中, 会被编译器优化掉, 不生效,
    // 同时也增加了可读性.
    assert(x == nullptr || !Equal(key, x->key));

    // 给新节点按概率随机生成一个层高
    int height = RandomHeight();

    // 如果新节点的层高比SkipList的当前层高还要大, 那么就需要做些更新
    if (height > GetMaxHeight()) {
        // 假设SkipList的当前层高是4, 新节点的层高是6, 
        // 那么第5层和第6层的前驱节点都是head(DummyHead)
        for (int i = GetMaxHeight(); i < height; i++) {
            prev[i] = head_;
        }

        // 原子更新SkipList的当前层高
        max_height_.store(height, std::memory_order_relaxed);
    }

    // 创建新节点
    x = NewNode(key, height);

    // 借助前驱节点prev将新节点插入到SkipList中
    for (int i = 0; i < height; i++) {
        // NoBarrier_SetNext()使用的是std::memory_order_relaxed.
        // SetNext使用的是std::memory_order_release.
        // 之所以使用NoBarrier_SetNext是因为后面还有个std::memory_order_release,
        // 保证x->NoBarrier_SetNext不会重排到prev[i]->SetNext之后.
        // 后面会详细讲解内存屏障与指令重排的关系.
        x->NoBarrier_SetNext(i, prev[i]->NoBarrier_Next(i));
        prev[i]->SetNext(i, x);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46

FindGreaterOrEqual(key, prev)的实现我们也来看一下, 就是前面所说的查找节点的过程

/* 
 * 很好的一个设计,在查找的过程中记录一些其他接口所需的信息,最大可能地进行代码复用。
 * 接口设计的很好, 当传入的prev不为null时, 会将每一层的前驱节点都记录下来,
 * 便于代码复用.
 */
template <typename Key, class Comparator>
typename SkipList<Key, Comparator>::Node*
SkipList<Key, Comparator>::FindGreaterOrEqual(const Key& key,
                                              Node** prev) const {
    // x为查找目标节点
    Node* x = head_;

    // index是从0开始的,所以需要减去1
    int level = GetMaxHeight() - 1;
    while (true) {

        // 获取当前level层的下一个节点
        Node* next = x->Next(level);

        // KeyIsAfterNode实际上就是使用 Compactor 比较 Key 和 next->key
        //      key > next->key:  return true
        //      key <= next->key: return false
        if (KeyIsAfterNode(key, next)) {
            // 待查找节点比next->key
            // 还要大的,那么就继续在同一层向后查找
            x = next;
        } else {
            // 当前待查找节点比next-key小,
            // 需要往下一层查找.

            // prev 数组主要记录的就是每一层的 prev
            // 节点,主要用于插入和删除时使用
            if (prev != nullptr) prev[level] = x;

            // 如果当前层已经是最底层了,没法再往下查找了,
            // 则返回当前节点
            if (level == 0) {
                return next;
            } else {
                // 还没到最底层, 继续往下一层查找
                level--;
            }
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45

SkipList::Insert里最重要的是RandomHeight()的实现, 用于随机生成新节点的层高.
如果RandomHeight()的实现不符合预期的概率分布, SkipList的性能就会大受影响.

template <typename Key, class Comparator>
int SkipList<Key, Comparator>::RandomHeight() {
    // 概率因子p = 1/kBranching = 1/4.
    static const unsigned int kBranching = 4;

    // 从1开始抛硬币
    int height = 1;
    // (rnd_.Next() % kBranching) == 0
    // 这个条件限制了上层的节点数量为下层节点数量的 1/4
    while (height < kMaxHeight && ((rnd_.Next() % kBranching) == 0)) {
        // rnd_.Next()生成一个随机数,
        // rnd_.Next() % 4的意思是, 生成一个0~3的随机数,
        // 0,1,2,3的概率都是1/4.
        // 所以(rnd_.Next() % 4) == 0成立的概率是1/4.
        // 也就是说每次抛硬币都有1/4的概率层高+1.
        // 所以LevelDB的SkipList里, 概率因子是1/4.
        height++;
    }
    
    // 生成的height必须在[1, kMaxHeight]之间
    assert(height > 0);
    assert(height <= kMaxHeight);

    return height;
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25

RandomHeight()完美的模拟了抛硬币的过程, 我为了验证RandomHeight()的概率分布是否符合预期,
写了个UT测试了一下, 误差在1%以内.

typedef uint64_t Key;

struct Comparator {
    int operator()(const Key& a, const Key& b) const {
        if (a < b) {
            return -1;
        } else if (a > b) {
            return +1;
        } else {
            return 0;
        }
    }
};

TEST(SkipTest, RandomHeightProbabilityDistribution) {
    Arena arena;
    Comparator cmp;
    SkipList<Key, Comparator> list(cmp, &arena);

    std::unordered_map<int, int> height_counts;
    const int num_samples = 1000000; // Number of samples for the test

    for (int i = 0; i < num_samples; ++i) {
        int height = list.RandomHeight();
        height_counts[height]++;
    }

    // 误差限制在1%. 
    const double tolerance = 0.01; // 1% tolerance

    // 层高为1的概率为0.75.
    // 计算过程: 
    //      概率因子为0.25, 也就是抛硬币正面朝上的概率为0.25, 反面朝上的概率为0.75;
    //      当抛硬币结果是正面朝上时, 可以将层高加1, 再继续抛硬币.
    //      从第1层开始抛硬币, 若需要层高为1,
    //      则第一次抛硬币的结果就应该是方面朝上, 概率为0.75;
    double expected_probability = 0.75; 
    for (int i = 1; i <= 12; ++i) {
        // 计算层高为i的概率
        double actual_probability = static_cast<double>(height_counts[i]) / num_samples;
        // 比较实际概率和理论概率, 误差不能超过1%
        EXPECT_NEAR(expected_probability, actual_probability, tolerance);
        // 更新+1层的预期概率, 在当前层概率的基础上乘以概率因子(0.25)
        expected_probability *= 0.25;
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46

有兴趣测试的同学可以将这段代码拷贝到LevelDBskiplist_test.cc中, 但是编译会错误, 因为SkipList::RandomHeight()private属性, 在CMakeLists.txt里找到if(LEVELDB_BUILD_TESTS)所在处, 添加set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -fno-access-control"), 就可以访问privateprotected属性的成员了, 如下所示:

if(LEVELDB_BUILD_TESTS)
  enable_testing()
  # ...

  set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -fno-access-control")

  # ...
endif(LEVELDB_BUILD_TESTS)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

SkipList::Contains的实现

看过SkipList::Insert之后, SkipList::Contains的实现就很简单了.
通过FindGreaterOrEqual第一个大于等于key的节点, 再判断一下key是否相等.

template <typename Key, class Comparator>
bool SkipList<Key, Comparator>::Contains(const Key& key) const {
    Node* x = FindGreaterOrEqual(key, nullptr);
    if (x != nullptr && Equal(key, x->key)) {
        return true;
    } else {
        return false;
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

SkipList::Node的实现

至此, SkipList的插入和查找实现就分析完了, 现在我们来看下SkipListNode的实现。

我们先通过示意图看一下SkipList::Node的样子:

+-------------------+
| Key: key          |
+-------------------+
| Node*: next_[0]   |
+-------------------+
| Node*: next_[1]   |
+-------------------+
| Node*: next_[2]   |
+-------------------+
| ...               |
+-------------------+
| Node*: next_[h-1] |
+-------------------+
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13

其中h为该节点的层高.

对比一下我们平时熟悉的单链表节点:

+-------------------+
| Key: key          |
+-------------------+
| Node*: next_[0]   |
+-------------------+
  • 1
  • 2
  • 3
  • 4
  • 5

其实就是在单链表节点的基础上, 增加了next_[1]next_[h-1]这些指针而已.

那我们现在就来看下SkipList::Node的代码定义吧

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/Li_阴宅/article/detail/876980
推荐阅读
相关标签