赞
踩
本篇博客的主要内容:
最不常使用 (LFU Least Frequently Used) 算法是一种常用的缓存置换算法,旨在移除 访问频次最低 的缓存项。与常用的 LRU 算法不同,LFU 算法更加重视元素的访问频率,而非最近一次访问时间。
实现 LFU 淘汰算法的缓存需要满足如下规则:
capacity
。解释下高亮部分:
1
。本节介绍如何设计 get、put 函数以 O(1) 时间复杂度运行 的 LFU 缓存。
缓存中的键值对通过一个全局双向循环链表存储,链表节点 Node 的定义如下:
struct Node{
int key;
int val;
Node* prev;
Node* next;
int count;
Node(int k, int v): key(k), val(v), prev(nullptr), next(nullptr), count(1) {}
Node(int k, int v, Node *p, Node *n): key(k), val(v), prev(p), next(n), count(1) {}
};
缓存中使用字段 head
存储双向循环链表的头节点,头节点是 链表中频数最高的节点:
Node *head = nullptr;
LFU 缓存是一个由多个 链表分段 拼装而成的双向循环链表,相同链表段中的节点频数相等。示意图如下:
当一个频数 count 的节点被访问,频数将递增为 count + 1,我们需要在 O(1) 的时间将该节点从频数 count 的链表分段移动到频数 count + 1 的链表分段头部。
因此,需要一个哈希表 count2SegHead
维护频数count 到链 表分段头节点 的映射:
unordered_map<int, Node*> count2SegHead;
示意图如下:
为了在 O(1) 时间复杂度通过 key 访问节点,需要使用哈希表 key2Node
维护 键key 到 节点 的映射:
unordered_map<int, Node*> key2Node;
示意图如下:
查询操作步骤如下:
node.count = count + 1
,将 node 节点插入 seg_head 前,同时设置 count2SegHead[count + 1] = node
,即设置 node 节点为频数 count + 1 的链表段头节点。下面这副示意图展示了查询 key=8 节点的流程:
count2SegHead[4] = node
;查询函数 getNode
代码实现如下:
#include "bits/stdc++.h" using namespace std; class LFUCache { struct Node{ int key; int val; Node* prev; Node* next; int count; Node(int k, int v): key(k), val(v), prev(nullptr), next(nullptr), count(1) {} Node(int k, int v, Node *p, Node *n): key(k), val(v), prev(p), next(n), count(1) {} }; unordered_map<int, Node*> key2Node; unordered_map<int, Node*> count2SegHead; // 频数到链表分段的映射 int capacity; int size = 0; Node *head = nullptr; // 双向循环链表头节点 // 将链表中的 node 节点移动到 newCount 链表段 void moveNode(Node* node, int newCount) { // ------(1) auto nextNode = remove(node); node->count = newCount; if(count2SegHead.find(newCount) == count2SegHead.end()) insert(node, nextNode); else insert(node, count2SegHead[newCount]); } // 将节点插入到链表段头节点 segHead 前, 同时维护 head 指针和 count2LinkedList void insert(Node* node, Node* segHead) { count2SegHead[node->count] = node; if(segHead == nullptr) { node->next = node->prev = node; head = node; return; } node->next = segHead, node->prev = segHead->prev; node->prev->next = node, node->next->prev = node; if(head->count <= node->count) head = node; } // remove 函数 Node* remove(Node *node){ // 循环链表中仅有node节点 if(node->next == node){ count2SegHead.erase(node->count); head = nullptr; return nullptr; } Node* next; // node 节点为 node->count 链表段的头节点 if(count2SegHead[node->count] == node) { if(node->next->count == node->count) count2SegHead[node->count] = node->next; else count2SegHead.erase(node->count); next = node->next; } // node 不是链表段头节点 else next = count2SegHead[node->count]; node->prev->next = node->next, node->next->prev = node->prev; node->prev = node->next = nullptr; if(head == node) head = next; return next; } public: LFUCache(int cap) { this->capacity = cap; } Node* getNode(int key){ if(key2Node.find(key) == key2Node.end()) return nullptr; auto node = key2Node[key]; moveNode(node, node->count + 1); // ------(1) return node; } };
moveHead(Node* node, int newCount)
:将循环链表中的节点 node 移动到频数为 newCount 的链表段头部。
moveHead
执行 remove(Node *node)
函数从双向循环链表中移除 node 节点:
count2SegHead
哈希表移除频数为 node->count 的键值对;node->count
链表段的头节点,考察 node->next
的频数:
node->next
设置为链表段新的头节点;node->count
键值对。node->count
链表段移除 node 后的头节点;如果链表段已移除,则返回该链表段的后继链表段头节点。Node* remove(Node *node){ // 循环链表中仅有node节点 if(node->next == node){ count2SegHead.erase(node->count); head = nullptr; return nullptr; } Node* next; // node 节点为 node->count 链表段的头节点 if(count2SegHead[node->count] == node) { if(node->next->count == node->count) count2SegHead[node->count] = node->next; else count2SegHead.erase(node->count); next = node->next; } // node 不是链表段头节点 else next = count2SegHead[node->count]; node->prev->next = node->next, node->next->prev = node->prev; node->prev = node->next = nullptr; if(head == node) head = next; return next; }
移除 node 节点后,使用 insert(Node* node, Node* segHead)
将 node 插入到链表段 segHead 的头部:
node->count
链表段新头节点:count2SegHead[node->count] = node
; // 将节点插入到链表段头节点 segHead 前, 同时维护 head 指针和 count2LinkedList
void insert(Node* node, Node* segHead) {
count2SegHead[node->count] = node; // 设置为新的链表段头节点
if(segHead == nullptr) {
// segHead 为 null, 说明全局链表为空, 执行初始化操作
node->next = node->prev = node;
head = node;
return;
}
// 双向链表插入操作
node->next = segHead, node->prev = segHead->prev;
node->prev->next = node, node->next->prev = node;
// 如果 node 操作计数大于等于当前头节点的计数, 更新 head
if(head->count <= node->count) head = node;
}
插入/更新操作的步骤如下:
getNode
查询键为 key 的节点;getNode
函数完成。void put(int key, int value) { if(capacity <= 0) return; Node *node = getNode(key); if(node == nullptr) { // 键等于 key 的节点不存在, 插入新的节点 if(size == capacity){ // 容量已经达到上限, 淘汰频数最低的节点, 即 head.prev auto evict = head->prev; remove(evict); key2Node.erase(evict->key); delete evict; } else size++; // 新的节点, node = new Node(key, value, nullptr, nullptr); key2Node[key] = node; if(count2SegHead.find(1) == count2SegHead.end()) // 如果不存在频数为 1 的链表段, 将新节点插入到head的前驱; insert(node, head); else // 如果存在频数等于 1 的链表段, 新节点插入到该链表段的头节点前 insert(node, count2SegHead[1]); } else node->val = value; // 更新键等于 key 的节点值 }
下面,我用两个场景,带大家理解 LFU 缓存的插入操作:
场景1:插入节点后缓存大小(size=7) 未超出 容量capacity(7),新节点(key=4) 插入到频数等于 1 的链表段前,成为该链表段的头节点。示意图如下:
场景2:插入节点后缓存大小(size=7) 超出 容量capacity(6),先 淘汰操作频数最小的节点 (key=7),此时频数为 1 的链表段将被移除,新节点(key=4) 插入到头节点(key=3) 之前。示意图如下:
Redis 中实现了基于 LFU 的缓存淘汰策略:volatile-lfu 和 allkeys-lfu。在介绍该策略之前,我们先了解下 缓存污染 的概念。
缓存污染 是指 访问很少的数据 在服务完访问请求后还继续 留存在缓存中,造成缓存空间的浪费。
缓存污染一旦变得严重后,有大量不再访问的数据滞留在缓存中,往缓存中写入新数据时需要先把数据逐步淘汰出缓存,引入额外的操作时间开销。
如何解决缓存污染问题?
LRU 方案的缺陷:只考虑了数据的访问时间,没有考虑数据的访问频数,在处理 扫描式单次查询操作 时,无法解决缓存问题!
幸运的是,Redis 从 4.0 版本开始增加了 LFU 淘汰策略:从 数据访问的时效性 、数据访问次数 两个角度筛选出需要淘汰的数据。
在实现 LRU 算法时,Redis 使用 RedisObject
结构来保存数据的;该结构的 lru
字段记录数据最近一次访问的时间戳。实现 LFU 算法时,将原来 24bit 的 lru 字段,进一步拆分成两个部分:
Redis使用LFU策略淘汰数据时,选择 lru 字段最小的数据进行淘汰。等价于 优先淘汰访问次数少的数据,访问次数相等则淘汰时间戳最小 的数据。
注意:count 为计数器的值,初始默认为 5 而不是 1。如果初始值为 1,刚被写入缓存的数据可能会因为使用次数太少而立即被淘汰。
counter 值 8bits,最大值 255,如果采用线性增加的方式 counter 很快就会达到上限,Redis 就不能很好筛选访问 1000次 和 10000 次的数据。因此,Redis 采用了非线性的频数更新策略:
p = 1 c o u n t × l f u _ l o g _ f a c t o r + 1 p=\frac{1}{count \times {lfu\_log\_factor} + 1} p=count×lfu_log_factor+11
每次缓存的数据被访问, counter 加 1 的概率等于 p p p,源码如下:
double r = (double)rand()/RAND_MAX;
...
double p = 1.0/(baseval*server.lfu_log_factor + 1);
if (r < p) counter++;
通过设置 lfu_log_factor
配置项,可以控制计数器的增加速度,避免 counter
值过快到达255。下图所示,当 lfu_log_factor
等于100时,小于10M 的数据能被区分出来。
一般可以将 lfu_log_factor
取值为 10,已经足够区分1k、1w、10w的数据访问量。
counter的衰减机制:
LFU 策略使用衰减因子配置项 lfu_decay_time
来控制访问次数的衰减。
lfu_decay_time
值,所得的结果就是数据 counter 要衰减的值。lfu_decay_time
取值为 1,如果数据在 N 分钟内没有被访问,那么它的访问次数就要减 N。lfu_decay_time
取值更大,那么相应的衰减值会变小,衰减效果也会减弱。如果业务应用中有短时高频访问的数据的话,建议把 lfu_decay_time 值设置为 1,在数据不被访问后,可以 迅速衰减访问次数,从缓存中淘汰出去,避免缓存污染。
总结:LFU 淘汰策略对于扫描式单次数据读取操作时,虽然仍然会频繁写缓存,但可以 避免访问次数高的热点数据因为最近一次访问时间较早而被淘汰,提升了缓存的命中率。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。