赞
踩
本文参考源码版本为 redis 6.2
迭代器
——可在容器(容器可为字典、链表等数据结构)上遍访的接口,设计人员无须关心容器的内容,调用迭代器固定的接口就可遍历数据,在很多高级语言中都有实现。
字典迭代器主要用于迭代字典这个数据结构中的数据,既然是迭代字典中的数据,必然会出现一个问题,迭代过程中,如果发生了数据增删
,则可能导致字典触发 rehash
操作,或迭代开始时字典正在进行 rehash 操作,从而导致一条数据可能多次遍历
到。
那 redis 如何解决这个问题呢?带着这个疑问,我们接下来一起看下迭代器的实现。
我们先来看看 redis 中字典迭代器的数据结构
typedef struct dictIterator {
dict *d; // 迭代的字典
long index; // 当前迭代的hash表索引值(hash槽)
int table, safe; // table 表示当前迭代的hash表,ht[0] or ht[1];table 表示当前迭代器是否为安全迭代器
dictEntry *entry, *nextEntry; // 当前节点、下个节点
/* unsafe iterator fingerprint for misuse detection. */
long long fingerprint; // 指纹,当字典没有改变时,指纹不变;当字典值发生改变,则指纹也随之改变
} dictIterator;
整个数据结构占用了 48 字节
,其中:
fingerprint 字段是一个 64 位的整数,表示在给定时间内字典的状态
。在这里称其为字典的指纹,因为该字段的值为字典(dict结构体)中所有字段值组合在一起生成的 hash 值,所以当字典中数据发生任何变化时,其值都会不同,生成算法不做过多解读,读者可参见源码 dict.c
文件中的 dictFingerprint
函数。
为了让迭代过程变得简单,redis 也提供了迭代相关的 API 函数,主要为:
dictIterator *dictGetIterator(dict *d);
dictIterator *dictGetSafeIterator(dict *d);
dictEntry *dictNext(dictIterator *iter);
void dictReleaseIterator(dictIterator *iter);
我们把迭代器遍历数据分为两类:
1)普通迭代器
普通迭代器迭代字典中数据时,会对迭代器中 fingerprint 字段的值作严格的校验,来保证迭代过程中字典结构不发生任何变化,确保读取出的数据不出现重复
。
当 redis 执行部分命令时会使用普通迭代器迭代字典数据,例如 sort 命令。sort 命令主要作用是对给定列表、集合、有序集合的元素进行排序,如果给定的是有序集合,其成员名存储用的是字典,分值存储用的是跳跃表,则执行 sort 命令读取数据的时候会用到迭代器来遍历整个字典。
普通迭代器迭代数据的过程比较简单,主要分为如下几个步骤。
普通迭代器通过步骤1、步骤3的指纹值对比,来限制整个迭代过程中只能进行迭代操作,即迭代过程中字典数据的修改、添加、删除、查找等操作都不能进行,只能调用dictNext函数迭代整个字典,否则就报异常,由此来保证迭代器取出数据的准确性。
值得注意的是
:
指纹的计算,通过字典的几个属性进行一定规则运算,当字典有增删改等操作时,这些属性就会有变动,从而导致前后指纹不一致,直接抛出异常。
2)安全迭代器
安全迭代器和普通迭代器迭代数据原理类似,也是通过循环调用 dictNext 函数依次遍历字典中 hash 表的节点。
安全迭代器确保读取数据的准确性,不是通过限制字典的部分操作来实现的,而是通过限制rehash 的进行来确保数据的准确性,因此迭代过程中可以对字典进行增删改查等操作
。
在字典进行增删查改的时候,都会调用 _dictRehashStep
来尝试进行 rehash,源码为:
static void _dictRehashStep(dict *d) {
if (d->iterators == 0) dictRehash(d,1);
}
原理是,如果当前字典有安全迭代器运行,则不进行渐进式 rehash 操作,rehash 操作暂停,字典中数据就不会被重复遍历,由此确保了读取数据的准确性。
当 redis 执行部分命令时会使用安全迭代器迭代字典数据,例如 keys 命令。keys 命令主要作用是通过模式匹配,返回给定模式的所有 key 列表,遇到过期的键则会进行删除操作。redis 数据键值对都存储在字典中,因此 keys 命令会通过安全迭代器来遍历整个字典。安全迭代器整个迭代过程也较为简单,主要分如下几个步骤:
首次遍历时会把字典中iterators 字段进行加 1 操作
,确保迭代过程中渐进式 rehash 操作会被中断执行。释放迭代器时会把字典中iterators 字段进行减 1 操作
,确保迭代后渐进式 rehash 操作能正常进行。安全迭代器是通过步骤1、步骤3中对字典的 iterators 字段进行修改,使得迭代过程中渐进式 rehash 操作被中断,由此来保证迭代器读取数据的准确性。
一次命令执行就遍历完整个数据库。比如执行 keys 命令进行一次数据库全遍历。
以上介绍的普通迭代器和安全迭代器都是一次性获取所有数据,由于 redis 单线程的原因,会造成一定的阻塞。
我们接下来看看,间断遍历如何在安全迭代器的基础上,解决这个阻塞问题。
前文讲解了“全遍历”字典的实现,但有一个问题凸显出来,当数据库中有海量数据时,执行keys命令进行一次数据库全遍历,耗时肯定不短,会造成短暂的 redis 不可用
,所以在 redis 在 2.8.0 版本后新增了 scan
操作,也就是“间断遍历”。
而 dictScan 是“间断遍历”中的一种实现,主要在迭代字典中数据时使用,例如 hscan 命令迭代整个数据库中的 key,以及 zscan 命令迭代有序集合所有成员与值时,都是通过 dictScan 函数来实现的字典遍历。dictScan 遍历字典过程中是可以进行 rehash 操作的,通过算法来保证所有的数据能被遍历到。
dictScan 函数间断遍历字典过程中会遇到如下 3 种情况。
总之,间断遍历的思想是每次命令执行只取部分数据,分多次遍历
。
扫描算法的核心思想是:利用同模效应,减少扩容或者缩容过程中的 hash 槽重复扫描。
比如现在有,容量为8的 hash table,其遍历顺序为:
即 hash 槽下标遍历顺序:
从二进制变化来看,其实就是从左往右依次累加1,进位向右累计
。
而落实到具体实现上来看就是,将二进制反转后再进行常规+1,示例如下:
1 1 0 0 =>
反转
0 0 1 1 =>
+1 操作
0 1 0 0 =>
再反转
0 0 1 0
也就是说,12 (1 1 0 0) 之后的下一个遍历槽位是 2 (0 0 1 0)。
1)反转、+1、再反转
/* Set unmasked bits so incrementing the reversed cursor
* operates on the masked bits */
v |= ~m0;
/* Increment the reverse cursor */
v = rev(v);
v++;
v = rev(v);
2)反转算法
static unsigned long rev(unsigned long v) {
unsigned long s = CHAR_BIT * sizeof(v); // bit size; must be power of 2
unsigned long mask = ~0UL;
while ((s >>= 1) > 0) {
mask ^= (mask << s);
v = ((v >> s) & mask) | ((v << s) & ~mask);
}
return v;
}
此算法是直接引用 位反转算法
,具体可以点击查看,我们只需了解原理即可,不必深究。
上文提到,dictScan 函数间断遍历字典过程中会遇到如下 3 种情况。
第一种情况最容易理解,常规扫描完即可。后面两种情况较为复杂。
下面主要以第二种情况主要说明:
比如我现在已经取槽位 12 (1 1 0 0) 数据,准备下一个槽位 2 (0 0 1 0) 数;碰巧,在这两次操作之间,完成了一次 rehash 扩容。
如上,我们即将遍历槽位6,此时,已经完成了一次扩容,容量从 8 扩容到 16;我们剩下待遍历的数据槽位有 6、1、5、3、7,原槽位上的数据在扩容后可能分散到两个槽位,比如槽位 6 的数据可能分散到新表的槽位 6、14。
对比两张图,可以发现,扩容一倍后,新表待遍历槽位都是我们目前还没有遍历的数据;换句话说,扩容操作不会重复遍历数据,同时也不会遗漏数据
。
我们以 hash table 容量由 16 缩容为 8 为例,如下图:
目前,我们刚遍历完槽位 6,即将遍历槽位 14,此时,完成了一次缩容 rehash;
由于槽位 14 已经超过目前缩容后的哈希表容量,所以会进行一次取模操作,即 14 % 8 = 6,因此,将读取槽位 6 上的数据;
值得注意的是,缩容后槽位 6 上会映射了原表槽位6、14的数据,因此,这里可能存在重复取原表槽位 6 的数据。
不过,庆幸的是,之后待扫描的数据都是没有处理过的,也就是不再有重复数据处理。
你可能已经注意到了,缩容操作可能引起合并槽位数据;但是,在原表处理的时候,两个可能合并的槽位都是连续处理的,这就意味着,在扫描的时候即使存在缩容操作,也最多重复扫描原表一个槽位的数据。将重复扫描的数据量降到了最低。
简单总结下,缩容操作会重复扫描数据(最多重复扫描一个槽位),但不会遗漏数据
。
从迭代开始到结束,某次或某几次迭代时散列表正在进行 rehash 操作,rehash 操作中会同时并存两个 hash 表:一张为扩容或缩容后的表 ht[1],一张为老表 ht[0], ht[0] 的数据通过渐进式 rehash 会逐步迁移到ht[1]中,最终完成整个迁移过程。
因为大小两表并存,所以需要从 ht[0] 和 ht[1] 中都取出数据,整个遍历过程为:先找到两个散列表中更小的表,先对小的 hash 表遍历,然后对大的 hash 表遍历
,迭代的代码如下:
t0 = &d->ht[0]; t1 = &d->ht[1]; // 确保始终先处理小表 if (t0->size > t1->size) { t0 = &d->ht[1]; t1 = &d->ht[0]; } m0 = t0->sizemask; m1 = t1->sizemask; if (bucketfn) bucketfn(privdata, &t0->table[v & m0]); de = t0->table[v & m0]; // 迭代第一张表 while (de) { next = de->next; fn(privdata, de); de = next; } do { if (bucketfn) bucketfn(privdata, &t1->table[v & m1]); de = t1->table[v & m1]; // 迭代第二张表 while (de) { next = de->next; fn(privdata, de); de = next; } v |= ~m1; v = rev(v); v++; v = rev(v); } while (v & (m0 ^ m1));
结合 rehash 中游标变更算法,为了让大家更好地理解该算法及整个迭代过程,举个例子简单讲解这种情况下迭代的顺序:假设 hash 表大小为8,扩容到 16,迭代从始至终每次迭代都在进行 rehash 操作,接下来两张表数据遍历次序如下图所示:
如果是缩容操作,其实和扩容操作遍历顺序是一样的,因为在处理的时候总是先处理小表,然后再处理大表。
值得注意的是,因为后台线程也会定期执行 rehash 操作,此过程会导致部分数据重复访问。
为啥会重复?
你想,一个用户线程和一个后台线程同时处理,比如,刚扫描完一个小表的 hash 槽,准备继续扫描对应大表的 hash 槽,此时,后台线程将一部分刚才扫描过的 hash 槽部分数据 rehash 到对应大表上,这种情况下,便会出现部分重复数据。
当然,这种同模的小-大表连续处理,间隙基本很短,因此,重复概率较小,影响不大。
来看看后台线程什么时候执行:
1)server.c#databasesCron
定时任务定期处理
/* Rehash */
if (server.activerehashing) {
for (j = 0; j < dbs_per_call; j++) {
int work_done = incrementallyRehash(rehash_db);
if (work_done) {
/* If the function did some work, stop here, we'll do
* more at the next cron loop. */
break;
} else {
/* If this db didn't need rehash, we'll try the next one. */
rehash_db++;
rehash_db %= server.dbnum;
}
}
}
2)处理指定的 dbid
int incrementallyRehash(int dbid) {
/* Keys dictionary */
if (dictIsRehashing(server.db[dbid].dict)) {
dictRehashMilliseconds(server.db[dbid].dict,1);
return 1; /* already used our millisecond for this loop... */
}
/* Expires */
if (dictIsRehashing(server.db[dbid].expires)) {
dictRehashMilliseconds(server.db[dbid].expires,1);
return 1; /* already used our millisecond for this loop... */
}
return 0;
}
3)最终,通过调用 dictRehashMilliseconds 执行 rehash 操作
可以指定 rehash 的时间,后台线程指定的是 1ms
int dictRehashMilliseconds(dict *d, int ms) {
if (d->iterators > 0) return 0;
long long start = timeInMilliseconds();
int rehashes = 0;
while(dictRehash(d,100)) {
rehashes += 100;
if (timeInMilliseconds()-start > ms) break;
}
return rehashes;
}
1) 一次 dictScan 调用会扫描一个 hash 槽数据
一般情况下确实如此,但是,当正在 rehash 时,小表扫描一个 hash 槽数据,而大表则扫描小表对应的多个 hash 槽数据。
unsigned long dictScan(dict *d, unsigned long v, dictScanFunction *fn, dictScanBucketFunction* bucketfn, void *privdata) { dictht *t0, *t1; const dictEntry *de, *next; unsigned long m0, m1; if (dictSize(d) == 0) return 0; d->iterators++; // 安全迭代器,当次遍历结束后会-1 if (!dictIsRehashing(d)) { // 如果 rehash 已经完成,也就是只有 ht[0] 有数据 t0 = &(d->ht[0]); m0 = t0->sizemask; if (bucketfn) bucketfn(privdata, &t0->table[v & m0]); de = t0->table[v & m0]; // v & m0 是为了避免缩容后超出hash表最大值 while (de) { // 扫描此 hash 槽上的所有数据 next = de->next; fn(privdata, de); de = next; } v |= ~m0; // 二进制反转并+1 v = rev(v); v++; v = rev(v); } else { // 如果正在 rehash,则需要处理两张表 ht[0]、ht[1]的数据 t0 = &d->ht[0]; t1 = &d->ht[1]; // 确保 t0 是小表 if (t0->size > t1->size) { t0 = &d->ht[1]; t1 = &d->ht[0]; } m0 = t0->sizemask; m1 = t1->sizemask; if (bucketfn) bucketfn(privdata, &t0->table[v & m0]); de = t0->table[v & m0]; while (de) { // 处理小表 next = de->next; fn(privdata, de); de = next; } // 循环处理,处理大表,比如,size = 4 的 hash 表,当前槽位 1 // 需要处理对应扩容一倍后,槽位 1、5 do { if (bucketfn) bucketfn(privdata, &t1->table[v & m1]); de = t1->table[v & m1]; while (de) { next = de->next; fn(privdata, de); de = next; } v |= ~m1; v = rev(v); v++; v = rev(v); } while (v & (m0 ^ m1)); } d->iterators--; // 对应前面 ++ 操作 return v; }
2)db.c#scanGenericCommand
客户端命令入口解析:
redis 127.0.0.1:6379> scan 0 COUNT 5
即,扫描游标从 0 开始的 5 条数据;不过,这里并不是严格意义上只返回 5 条数据,因为每一次扫描都要处理整个槽位的数据,因此是有可能多的。
具体是通过下面代码片段调用 dictScan:
if (ht) {
void *privdata[2];
// count 为 scan 命令传入的值,当hash表处于病态时(例如大部分节点为空),最大迭代次数为 10 * count
long maxiterations = count*10;
// list *keys = listCreate(); 创建新的 list 用来返回 keys
privdata[0] = keys;
// o != null 则为 Hash, Set or Zset object,反之为 NULL 则表示处理当前 db
privdata[1] = o;
do {
cursor = dictScan(ht, cursor, scanCallback, NULL, privdata);
} while (cursor && maxiterations-- && listLength(keys) < (unsigned long)count);
}
可见,do {} while() 语句中,通过循环调用 dictScan 来满足需要的 count 数量;也正因为如此,如果客户端传入的 count 过大,将会长时间阻塞;
由于 redis 是单线程处理,这种阻塞会影响其他命令处理,因此,需尽量避免 count 过大。
本文先后介绍了 redis 的普通和安全两种迭代器:
关于间断遍历原理,文中已经详细描述,此处不在赘述。
Finally Redis collections are iterable(Antirez)
Add SCAN command #579
Fix dictScan(): It can’t scan all buckets when dict is shrinking. #4907
《Redis 设计与源码分析》【陈雷】
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。