当前位置:   article > 正文

redis scan 命令底层原理(为什么会重复扫描?)_redis scan原理

redis scan原理


前言

本文参考源码版本为 redis 6.2

迭代器——可在容器(容器可为字典、链表等数据结构)上遍访的接口,设计人员无须关心容器的内容,调用迭代器固定的接口就可遍历数据,在很多高级语言中都有实现。

字典迭代器主要用于迭代字典这个数据结构中的数据,既然是迭代字典中的数据,必然会出现一个问题,迭代过程中,如果发生了数据增删,则可能导致字典触发 rehash 操作,或迭代开始时字典正在进行 rehash 操作,从而导致一条数据可能多次遍历到。

那 redis 如何解决这个问题呢?带着这个疑问,我们接下来一起看下迭代器的实现。


一、迭代器

我们先来看看 redis 中字典迭代器的数据结构

typedef struct dictIterator {
    dict *d;  // 迭代的字典
    long index;  // 当前迭代的hash表索引值(hash槽)
    int table, safe; // table 表示当前迭代的hash表,ht[0] or ht[1];table 表示当前迭代器是否为安全迭代器
    dictEntry *entry, *nextEntry; // 当前节点、下个节点
    /* unsafe iterator fingerprint for misuse detection. */
    long long fingerprint; // 指纹,当字典没有改变时,指纹不变;当字典值发生改变,则指纹也随之改变
} dictIterator;
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

整个数据结构占用了 48 字节,其中:

  • d 字段指向需要迭代的字典;
  • index 字段代表当前读取到 hash 表中哪个索引值;
  • table 字段表示当前正在迭代的 hash 表(即 ht[0] 与 ht[1] 中的 0 和 1);
  • safe 字段表示当前创建的迭代器是否为安全模式;
  • entry 字段表示正在读取的节点数据;
  • nextEntry 字段表示 entry 节点中的 next 字段所指向的数据。

fingerprint 字段是一个 64 位的整数,表示在给定时间内字典的状态。在这里称其为字典的指纹,因为该字段的值为字典(dict结构体)中所有字段值组合在一起生成的 hash 值,所以当字典中数据发生任何变化时,其值都会不同,生成算法不做过多解读,读者可参见源码 dict.c文件中的 dictFingerprint 函数。

为了让迭代过程变得简单,redis 也提供了迭代相关的 API 函数,主要为:

dictIterator *dictGetIterator(dict *d);
dictIterator *dictGetSafeIterator(dict *d);
dictEntry *dictNext(dictIterator *iter);
void dictReleaseIterator(dictIterator *iter);
  • 1
  • 2
  • 3
  • 4

我们把迭代器遍历数据分为两类:

  • 普通迭代器,只遍历数据;
  • 安全迭代器,遍历的同时删除数据。

1)普通迭代器

普通迭代器迭代字典中数据时,会对迭代器中 fingerprint 字段的值作严格的校验,来保证迭代过程中字典结构不发生任何变化,确保读取出的数据不出现重复

当 redis 执行部分命令时会使用普通迭代器迭代字典数据,例如 sort 命令。sort 命令主要作用是对给定列表、集合、有序集合的元素进行排序,如果给定的是有序集合,其成员名存储用的是字典,分值存储用的是跳跃表,则执行 sort 命令读取数据的时候会用到迭代器来遍历整个字典。

普通迭代器迭代数据的过程比较简单,主要分为如下几个步骤。

  • 调用 dictGetIterator 函数初始化一个普通迭代器,此时会把 iter->safe 值置为 0,表示初始化的迭代器为普通迭代器。
  • 循环调用 dictNext 函数依次遍历字典中 hash 表的节点,首次遍历时会通过 dictFingerprint 函数拿到当前字典的指纹值。
  • 当调用 dictNext 函数遍历完字典 hash 表中节点数据后,释放迭代器时会继续调用dictFingerprint 函数计算字典的指纹值,并与首次拿到的指纹值比较,不相等则输出异常"=== ASSERTION FAILED ===",且退出程序执行。

普通迭代器通过步骤1、步骤3的指纹值对比,来限制整个迭代过程中只能进行迭代操作,即迭代过程中字典数据的修改、添加、删除、查找等操作都不能进行,只能调用dictNext函数迭代整个字典,否则就报异常,由此来保证迭代器取出数据的准确性

值得注意的是

指纹的计算,通过字典的几个属性进行一定规则运算,当字典有增删改等操作时,这些属性就会有变动,从而导致前后指纹不一致,直接抛出异常。

2)安全迭代器

安全迭代器和普通迭代器迭代数据原理类似,也是通过循环调用 dictNext 函数依次遍历字典中 hash 表的节点。

安全迭代器确保读取数据的准确性,不是通过限制字典的部分操作来实现的,而是通过限制rehash 的进行来确保数据的准确性,因此迭代过程中可以对字典进行增删改查等操作

在字典进行增删查改的时候,都会调用 _dictRehashStep 来尝试进行 rehash,源码为:

static void _dictRehashStep(dict *d) {
    if (d->iterators == 0) dictRehash(d,1);
}
  • 1
  • 2
  • 3

原理是,如果当前字典有安全迭代器运行,则不进行渐进式 rehash 操作,rehash 操作暂停,字典中数据就不会被重复遍历,由此确保了读取数据的准确性。

当 redis 执行部分命令时会使用安全迭代器迭代字典数据,例如 keys 命令。keys 命令主要作用是通过模式匹配,返回给定模式的所有 key 列表,遇到过期的键则会进行删除操作。redis 数据键值对都存储在字典中,因此 keys 命令会通过安全迭代器来遍历整个字典。安全迭代器整个迭代过程也较为简单,主要分如下几个步骤:

  • 第1步:调用 dictGetSafeIterator 函数初始化一个安全迭代器,此时会把 iter->safe 值置为1,表示初始化的迭代器为安全迭代器
  • 第2步:循环调用 dictNext 函数依次遍历字典中 hash 表的节点,首次遍历时会把字典中iterators 字段进行加 1 操作,确保迭代过程中渐进式 rehash 操作会被中断执行。
  • 第3步:当调用 dictNext 函数遍历完字典 hash 表中节点数据后,释放迭代器时会把字典中iterators 字段进行减 1 操作,确保迭代后渐进式 rehash 操作能正常进行。

安全迭代器是通过步骤1、步骤3中对字典的 iterators 字段进行修改,使得迭代过程中渐进式 rehash 操作被中断,由此来保证迭代器读取数据的准确性。

1. 全遍历

一次命令执行就遍历完整个数据库。比如执行 keys 命令进行一次数据库全遍历。

以上介绍的普通迭代器和安全迭代器都是一次性获取所有数据,由于 redis 单线程的原因,会造成一定的阻塞。

我们接下来看看,间断遍历如何在安全迭代器的基础上,解决这个阻塞问题。

2. 间断遍历

前文讲解了“全遍历”字典的实现,但有一个问题凸显出来,当数据库中有海量数据时,执行keys命令进行一次数据库全遍历,耗时肯定不短,会造成短暂的 redis 不可用,所以在 redis 在 2.8.0 版本后新增了 scan 操作,也就是“间断遍历”。

而 dictScan 是“间断遍历”中的一种实现,主要在迭代字典中数据时使用,例如 hscan 命令迭代整个数据库中的 key,以及 zscan 命令迭代有序集合所有成员与值时,都是通过 dictScan 函数来实现的字典遍历。dictScan 遍历字典过程中是可以进行 rehash 操作的,通过算法来保证所有的数据能被遍历到。

dictScan 函数间断遍历字典过程中会遇到如下 3 种情况。

  • 从迭代开始到结束,散列表没有进行 rehash 操作。
  • 从迭代开始到结束,散列表进行了扩容或缩容操作,且恰好为两次迭代间隔期间完成了 rehash 操作。
  • 从迭代开始到结束,某次或某几次迭代时散列表正在进行 rehash 操作。

总之,间断遍历的思想是每次命令执行只取部分数据,分多次遍历

二、scan 扫描原理

扫描算法的核心思想是:利用同模效应,减少扩容或者缩容过程中的 hash 槽重复扫描。

比如现在有,容量为8的 hash table,其遍历顺序为:
在这里插入图片描述
即 hash 槽下标遍历顺序:
在这里插入图片描述
从二进制变化来看,其实就是从左往右依次累加1,进位向右累计

而落实到具体实现上来看就是,将二进制反转后再进行常规+1,示例如下:

1 1 0 0  =>  
反转
0 0 1 1  => 
+1 操作
0 1 0 0  => 
再反转
0 0 1 0
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

也就是说,12 (1 1 0 0) 之后的下一个遍历槽位是 2 (0 0 1 0)。

1. 扫描算法:

1)反转、+1、再反转


        /* Set unmasked bits so incrementing the reversed cursor
         * operates on the masked bits */
        v |= ~m0;

        /* Increment the reverse cursor */
        v = rev(v);
        v++;
        v = rev(v);
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

2)反转算法

static unsigned long rev(unsigned long v) {
    unsigned long s = CHAR_BIT * sizeof(v); // bit size; must be power of 2
    unsigned long mask = ~0UL;
    while ((s >>= 1) > 0) {
        mask ^= (mask << s);
        v = ((v >> s) & mask) | ((v << s) & ~mask);
    }
    return v;
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

此算法是直接引用 位反转算法,具体可以点击查看,我们只需了解原理即可,不必深究。

2. 减少重复扫描?

上文提到,dictScan 函数间断遍历字典过程中会遇到如下 3 种情况。

  • 从迭代开始到结束,散列表没有进行 rehash 操作。
  • 从迭代开始到结束,散列表进行了扩容或缩容操作,且恰好为两次迭代间隔期间完成了rehash 操作。
  • 从迭代开始到结束,某次或某几次迭代时散列表正在进行 rehash 操作。

第一种情况最容易理解,常规扫描完即可。后面两种情况较为复杂。

下面主要以第二种情况主要说明:

比如我现在已经取槽位 12 (1 1 0 0) 数据,准备下一个槽位 2 (0 0 1 0) 数;碰巧,在这两次操作之间,完成了一次 rehash 扩容。
  • 1

2.1 扩容

在这里插入图片描述
如上,我们即将遍历槽位6,此时,已经完成了一次扩容,容量从 8 扩容到 16;我们剩下待遍历的数据槽位有 6、1、5、3、7,原槽位上的数据在扩容后可能分散到两个槽位,比如槽位 6 的数据可能分散到新表的槽位 6、14。

在这里插入图片描述
对比两张图,可以发现,扩容一倍后,新表待遍历槽位都是我们目前还没有遍历的数据;换句话说,扩容操作不会重复遍历数据,同时也不会遗漏数据

2.2 缩容

我们以 hash table 容量由 16 缩容为 8 为例,如下图:
在这里插入图片描述
目前,我们刚遍历完槽位 6,即将遍历槽位 14,此时,完成了一次缩容 rehash;

由于槽位 14 已经超过目前缩容后的哈希表容量,所以会进行一次取模操作,即 14 % 8 = 6,因此,将读取槽位 6 上的数据;

值得注意的是,缩容后槽位 6 上会映射了原表槽位6、14的数据,因此,这里可能存在重复取原表槽位 6 的数据。

不过,庆幸的是,之后待扫描的数据都是没有处理过的,也就是不再有重复数据处理。

你可能已经注意到了,缩容操作可能引起合并槽位数据;但是,在原表处理的时候,两个可能合并的槽位都是连续处理的,这就意味着,在扫描的时候即使存在缩容操作,也最多重复扫描原表一个槽位的数据。将重复扫描的数据量降到了最低

简单总结下,缩容操作会重复扫描数据(最多重复扫描一个槽位),但不会遗漏数据

3. 迭代过程中正在进行rehash

从迭代开始到结束,某次或某几次迭代时散列表正在进行 rehash 操作,rehash 操作中会同时并存两个 hash 表:一张为扩容或缩容后的表 ht[1],一张为老表 ht[0], ht[0] 的数据通过渐进式 rehash 会逐步迁移到ht[1]中,最终完成整个迁移过程。

因为大小两表并存,所以需要从 ht[0] 和 ht[1] 中都取出数据,整个遍历过程为:先找到两个散列表中更小的表,先对小的 hash 表遍历,然后对大的 hash 表遍历,迭代的代码如下:

        t0 = &d->ht[0];
        t1 = &d->ht[1];
        //  确保始终先处理小表
        if (t0->size > t1->size) {
            t0 = &d->ht[1];
            t1 = &d->ht[0];
        }
        m0 = t0->sizemask;
        m1 = t1->sizemask;
        
        if (bucketfn) bucketfn(privdata, &t0->table[v & m0]);
        de = t0->table[v & m0];
        // 迭代第一张表
        while (de) { 
            next = de->next;
            fn(privdata, de);
            de = next;
        }

        do {
            if (bucketfn) bucketfn(privdata, &t1->table[v & m1]);
            de = t1->table[v & m1];
            // 迭代第二张表
            while (de) {
                next = de->next;
                fn(privdata, de);
                de = next;
            }

            v |= ~m1;
            v = rev(v);
            v++;
            v = rev(v);

        } while (v & (m0 ^ m1));
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35

结合 rehash 中游标变更算法,为了让大家更好地理解该算法及整个迭代过程,举个例子简单讲解这种情况下迭代的顺序:假设 hash 表大小为8,扩容到 16,迭代从始至终每次迭代都在进行 rehash 操作,接下来两张表数据遍历次序如下图所示:
在这里插入图片描述
如果是缩容操作,其实和扩容操作遍历顺序是一样的,因为在处理的时候总是先处理小表,然后再处理大表。

值得注意的是,因为后台线程也会定期执行 rehash 操作,此过程会导致部分数据重复访问。

为啥会重复?

你想,一个用户线程和一个后台线程同时处理,比如,刚扫描完一个小表的 hash 槽,准备继续扫描对应大表的 hash 槽,此时,后台线程将一部分刚才扫描过的 hash 槽部分数据 rehash 到对应大表上,这种情况下,便会出现部分重复数据。

当然,这种同模的小-大表连续处理,间隙基本很短,因此,重复概率较小,影响不大。



来看看后台线程什么时候执行:

1)server.c#databasesCron

定时任务定期处理

   /* Rehash */
   if (server.activerehashing) {
       for (j = 0; j < dbs_per_call; j++) {
           int work_done = incrementallyRehash(rehash_db);
           if (work_done) {
               /* If the function did some work, stop here, we'll do
                * more at the next cron loop. */
               break;
           } else {
               /* If this db didn't need rehash, we'll try the next one. */
               rehash_db++;
               rehash_db %= server.dbnum;
           }
       }
   }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15

2)处理指定的 dbid

int incrementallyRehash(int dbid) {
    /* Keys dictionary */
    if (dictIsRehashing(server.db[dbid].dict)) {
        dictRehashMilliseconds(server.db[dbid].dict,1);
        return 1; /* already used our millisecond for this loop... */
    }
    /* Expires */
    if (dictIsRehashing(server.db[dbid].expires)) {
        dictRehashMilliseconds(server.db[dbid].expires,1);
        return 1; /* already used our millisecond for this loop... */
    }
    return 0;
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13

3)最终,通过调用 dictRehashMilliseconds 执行 rehash 操作

可以指定 rehash 的时间,后台线程指定的是 1ms

int dictRehashMilliseconds(dict *d, int ms) {
    if (d->iterators > 0) return 0;

    long long start = timeInMilliseconds();
    int rehashes = 0;

    while(dictRehash(d,100)) {
        rehashes += 100;
        if (timeInMilliseconds()-start > ms) break;
    }
    return rehashes;
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12

4. 完整的 scan 逻辑

1) 一次 dictScan 调用会扫描一个 hash 槽数据

一般情况下确实如此,但是,当正在 rehash 时,小表扫描一个 hash 槽数据,而大表则扫描小表对应的多个 hash 槽数据。

unsigned long dictScan(dict *d,
                       unsigned long v,
                       dictScanFunction *fn,
                       dictScanBucketFunction* bucketfn,
                       void *privdata)
{
    dictht *t0, *t1;
    const dictEntry *de, *next;
    unsigned long m0, m1;
    if (dictSize(d) == 0) return 0;

    d->iterators++; // 安全迭代器,当次遍历结束后会-1
    
    if (!dictIsRehashing(d)) { // 如果 rehash 已经完成,也就是只有 ht[0] 有数据
        t0 = &(d->ht[0]);
        m0 = t0->sizemask;
        if (bucketfn) bucketfn(privdata, &t0->table[v & m0]);
        de = t0->table[v & m0]; // v & m0 是为了避免缩容后超出hash表最大值
        while (de) { // 扫描此 hash 槽上的所有数据
            next = de->next;
            fn(privdata, de);
            de = next;
        }

        v |= ~m0;

        // 二进制反转并+1
        v = rev(v);
        v++;
        v = rev(v);

    } else { // 如果正在 rehash,则需要处理两张表 ht[0]、ht[1]的数据
        t0 = &d->ht[0];
        t1 = &d->ht[1];
        // 确保 t0 是小表
        if (t0->size > t1->size) {
            t0 = &d->ht[1];
            t1 = &d->ht[0];
        }

        m0 = t0->sizemask;
        m1 = t1->sizemask;

        if (bucketfn) bucketfn(privdata, &t0->table[v & m0]);
        de = t0->table[v & m0];
        while (de) { // 处理小表
            next = de->next;
            fn(privdata, de);
            de = next;
        }
        
        // 循环处理,处理大表,比如,size = 4 的 hash 表,当前槽位 1
        // 需要处理对应扩容一倍后,槽位 1、5
        do {
            if (bucketfn) bucketfn(privdata, &t1->table[v & m1]);
            de = t1->table[v & m1];
            while (de) {
                next = de->next;
                fn(privdata, de);
                de = next;
            }
            
            v |= ~m1;
            v = rev(v);
            v++;
            v = rev(v);
        } while (v & (m0 ^ m1));
    }
    
    d->iterators--; // 对应前面 ++ 操作

    return v;
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 变量 d 是当前迭代的字典;
  • 变量 v 标识迭代开始的游标(即 hash 表中数组索引,也就是我们常说的槽位),每次遍历后会返回新的游标值,整个遍历过程都是围绕这个游标值的改动进行,来保证所有的数据能被遍历到;
  • fn 是函数指针,每遍历一个节点则调用该函数处理;
  • bucketfn 函数在整理碎片时调用;
  • privdata 是回调函数 fn 所需参数

2)db.c#scanGenericCommand

客户端命令入口解析:

redis 127.0.0.1:6379> scan 0 COUNT 5
  • 1

即,扫描游标从 0 开始的 5 条数据;不过,这里并不是严格意义上只返回 5 条数据,因为每一次扫描都要处理整个槽位的数据,因此是有可能多的。

具体是通过下面代码片段调用 dictScan:

    if (ht) {
        void *privdata[2];
        // count 为 scan 命令传入的值,当hash表处于病态时(例如大部分节点为空),最大迭代次数为 10 * count
        long maxiterations = count*10;

        // list *keys = listCreate(); 创建新的 list 用来返回 keys
        privdata[0] = keys;
        // o != null 则为 Hash, Set or Zset object,反之为 NULL 则表示处理当前 db
        privdata[1] = o;
        do {
            cursor = dictScan(ht, cursor, scanCallback, NULL, privdata);
        } while (cursor && maxiterations-- && listLength(keys) < (unsigned long)count);
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13

可见,do {} while() 语句中,通过循环调用 dictScan 来满足需要的 count 数量;也正因为如此,如果客户端传入的 count 过大,将会长时间阻塞;

由于 redis 是单线程处理,这种阻塞会影响其他命令处理,因此,需尽量避免 count 过大。


总结

本文先后介绍了 redis 的普通和安全两种迭代器:

  • 普通迭代器在迭代过程中不能进行增删改操作
  • 安全迭代器在迭代过程中可以进行增删改操作,但最初只能一次性迭代完整个字典,可能造成阻塞
  • 在安全迭代器基础上,演变出了间断遍历(scan 命令),解决了这种长时间阻塞问题。

关于间断遍历原理,文中已经详细描述,此处不在赘述。




参考文献:

Finally Redis collections are iterable(Antirez)
Add SCAN command #579
Fix dictScan(): It can’t scan all buckets when dict is shrinking. #4907
《Redis 设计与源码分析》【陈雷】

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/我家小花儿/article/detail/658609
推荐阅读
相关标签
  

闽ICP备14008679号