赞
踩
整体是使用字典来进行存储键值对。
想要知道一个缓存的 key 是否失效,就必须知道这个 key 是啥时候放进去的,所以需要有一个 put_time 记录;
同时需要设置失效时间,所以还需要一个多长时间失效的 expired 记录;
那么现在的字典格式就变成了:
{ key: [value, put_time, expired]}
后面的 list 可以进一步封装为一个 Value 对象
class Value:
def __init__(self, value, put_time, expired):
"""
缓存值对象
:param value: 具体的值
:param put_time: 放入缓存的时间
:param expired: 缓存失效时间
"""
self.value = value
self.put_time = put_time
self.expired = expired
现在字典的样子就变成了:
{ key: Value(value, put_time, expired) }
好了,现在关于数据结构设计好了
放值
放值的时候需要设置一下「放入的时间」和「过期时间」还有「具体的值」这三个字段
def set_value(self, k, v, expired):
"""
将值放入缓存中
:param k: 缓存的 key
:param v: 缓存值
:param expired: 缓存失效时间,单位秒(s)
"""
current_timestamp = int(time.time()) # 获取当前时间戳 10 位 秒级
value = Value(v, current_timestamp, expired)
self.__cache[k] = value
logger.info("已放入缓存, key: {} {}", k, value)
取值
取值的时候需要检查是否过期
def check_key(self, k): """ 检查缓存是否可用 :param k: 缓存 key :return: True or False """ current_timestamp = int(time.time()) value = self.__cache.get(k, None) # 考虑k不存在的情况 if value is None: return False differ = current_timestamp - value.put_time if differ > value.expired: del self.__cache[k] # 证明缓存失效了,删除键值对 logger.info("缓存已失效, key: {}", k) return False return True def get_value(self, k): """ 通过缓存key获取值 :param k: key :return: value """ if self.check_key(k): return self.__cache[k].value return None
python 的模块就是天然的单例模式,因为模块在第一次导入时,会生成 .pyc
文件,当第二次导入时,就会直接加载 .pyc
文件,而不会再次执行模块代码。因此,我们只需把相关的函数和数据定义在一个模块中,就可以获得一个单例对象了。如果我们真的想要一个单例类,可以考虑这样做:
class Singleton(object):
def foo(self):
pass
singleton = Singleton()
使用:
from a import singleton
好了,现在可以全局使用一个缓存对象,并且还拥有 set 和 get 逻辑,还带有过期时间,一个简单的缓存已经实现了!
cache.py
""" 基于内存缓存 使用 memory_cache 实例即可 """ import time from loguru import logger class Value: def __init__(self, value, put_time, expired): """ 缓存值对象 :param value: 具体的值 :param put_time: 放入缓存的时间 :param expired: 缓存失效时间 """ self.value = value self.put_time = put_time self.expired = expired def __str__(self): return f"value: {self.value} put_time: {self.put_time} expired: {self.expired}" class MemoryCache: def __init__(self): self.__cache = {} def set_value(self, k, v, expired): """ 将值放入缓存中 :param k: 缓存的 key :param v: 缓存值 :param expired: 缓存失效时间,单位秒(s) """ current_timestamp = int(time.time()) # 获取当前时间戳 10 位 秒级 value = Value(v, current_timestamp, expired) self.__cache[k] = value logger.info("已放入缓存, key: {} {}", k, value) def check_key(self, k): """ 检查缓存是否可用 :param k: 缓存 key :return: True or False """ current_timestamp = int(time.time()) value = self.__cache.get(k, None) # 考虑k不存在的情况 if value is None: return False differ = current_timestamp - value.put_time if differ > value.expired: del self.__cache[k] # 证明缓存失效了,删除键值对 logger.info("缓存已失效, key: {}", k) return False return True def get_value(self, k): """ 通过缓存key获取值 :param k: key :return: value """ if self.check_key(k): return self.__cache[k].value return None memory_cache = MemoryCache()
测试放入一个 3 秒过期的缓存,再放入一个 6 秒过期的,然后 sleep 5 秒
看看 6 秒过期的缓存是否可以正常取出,3 秒过期的缓存是否失效,进行验证
test.py
import time
from cache import memory_cache
memory_cache.set_value('my_blog', 'sunnyc.icu', 3) # 设置一个 3 秒过期的键值对
memory_cache.set_value('my_github', 'hczs', 6) # 设置一个 6 秒过期的键值对
time.sleep(5)
print('my_blog: ', memory_cache.get_value('my_blog'))
print('my_github: ', memory_cache.get_value('my_github'))
测试结果:
2022-03-19 10:53:34.021 | INFO | cache:set_value:43 - 已放入缓存, key: my_blog value: sunnyc.icu put_time: 1647658414 expired: 3
2022-03-19 10:53:34.021 | INFO | cache:set_value:43 - 已放入缓存, key: my_github value: hczs put_time: 1647658414 expired: 6
my_blog: None
my_github: hczs
2022-03-19 10:53:39.032 | INFO | cache:check_key:60 - 缓存已失效, key: my_blog
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。