赞
踩
多级缓存,即在整个系统架构的不同系统层级进行数据缓存,以提升访问效率。在SpringBoot项目中多级缓存基本分为本地缓存和远程缓存,本地缓存可以用Caffeine、Guava、HashMap,远程缓存可以用Redis。
J2Cache包含的功能大于JetCache,但是JetCache属于阿里开源框架,后续维护的可能性高,其它框架就更不敢随意在项目上使用了。不过JetCache需要扩展一些功能,不然本地缓存和远程缓存都要允许一定的延迟性才能使用。本地缓存可以允许,刷新频率比较高,但是远程缓存如果刷新频率也很高的话,就跟本地缓存无异了,有点得不偿失。Spring AOP原生的缓存框架,功能有所欠缺。
(1)简介
JetCache是一个基于Java的缓存系统封装,提供统一的API和注解来简化缓存的使用。 JetCache提供了比SpringCache更加强大的注解,可以原生的支持TTL、两级缓存、分布式自动刷新,还提供了Cache
接口用于手工缓存操作。
(2)特性
(1)特性
(1)简介
layering-cache是一个支持分布式环境的多级缓存框架,使用方式和spring-cache类似。使用Caffeine作为一级本地缓存,使用redis作为二级集中式缓存。一级缓存和二级缓存的数据一致性是通过推和拉两种模式相结合的方式来实现的。推主要是基于redis的pub/sub机制,拉主要是基于消息队列和记录消费消息的偏移量来实现的。
(2)特性
(1)简介
J2Cache 是 OSChina 目前正在使用的两级缓存框架(要求至少 Java 8)。第一级缓存使用内存(同时支持 Ehcache 2.x、Ehcache 3.x 和 Caffeine),第二级缓存使用 Redis(推荐)/Memcached 。 由于大量的缓存读取会导致 L2 的网络成为整个系统的瓶颈,因此 L1 的目标是降低对 L2 的读取次数。 该缓存框架主要用于集群环境中。单机也可使用,用于避免应用重启导致的缓存冷启动后对后端业务的冲击。
(2)特性
(1)特性
只有@Cached上注册过的name名称,才能做其它更新、删除注解的操作,不然都会报异常。
- // com.alicp.jetcache.anno.aop.JetCacheInterceptor#invoke
- public Object invoke(final MethodInvocation invocation) throws Throwable {
- // ...
- CacheInvokeContext context = configProvider.getCacheContext().createCacheInvokeContext(cacheConfigMap);
- context.setTargetObject(invocation.getThis());
- context.setInvoker(invocation::proceed);
- context.setMethod(method);
- context.setArgs(invocation.getArguments());
- context.setCacheInvokeConfig(cac);
- context.setHiddenPackages(globalCacheConfig.getHiddenPackages());
- return CacheHandler.invoke(context);
- }
- // com.alicp.jetcache.anno.method.CacheHandler#doInvoke
- private static Object doInvoke(CacheInvokeContext context) throws Throwable {
- CacheInvokeConfig cic = context.getCacheInvokeConfig();
- CachedAnnoConfig cachedConfig = cic.getCachedAnnoConfig();
- if (cachedConfig != null && (cachedConfig.isEnabled() || CacheContextSupport._isEnabled())) {
- // 执行查询操作
- return invokeWithCached(context);
- } else if (cic.getInvalidateAnnoConfigs() != null || cic.getUpdateAnnoConfig() != null) {
- // 执行删除或者更新操作
- return invokeWithInvalidateOrUpdate(context);
- } else {
- // 执行原生方法
- return invokeOrigin(context);
- }
- }
- // com.alicp.jetcache.anno.method.CacheHandler#invokeWithCached
- private static Object invokeWithCached(CacheInvokeContext context)
- throws Throwable {
- CacheInvokeConfig cic = context.getCacheInvokeConfig();
- CachedAnnoConfig cac = cic.getCachedAnnoConfig();
- Cache cache = context.getCacheFunction().apply(context, cac);
- if (cache == null) {
- logger.error("no cache with name: " + context.getMethod());
- return invokeOrigin(context);
- }
-
- // 如果key解析出来是null值,会直接执行原生方法,应避免,key = "'' + #userId",这种方式解析出来会产生“null”字符串,不再直接进数据库查询。若果是key = "#userId",解析出来会是null,不会命中缓存
- Object key = ExpressionUtil.evalKey(context, cic.getCachedAnnoConfig());
- if (key == null) {
- return loadAndCount(context, cache, key);
- }
-
- // 解析Condition表达式,如果出现一些异常则直接进数据库查询
- if (!ExpressionUtil.evalCondition(context, cic.getCachedAnnoConfig())) {
- return loadAndCount(context, cache, key);
- }
-
- try {
- CacheLoader loader = new CacheLoader() {
- @Override
- public Object load(Object k) throws Throwable {
- Object result = invokeOrigin(context);
- context.setResult(result);
- return result;
- }
-
- @Override
- public boolean vetoCacheUpdate() {
- return !ExpressionUtil.evalPostCondition(context, cic.getCachedAnnoConfig());
- }
- };
- Object result = cache.computeIfAbsent(key, loader);
- return result;
- } catch (CacheInvokeException e) {
- throw e.getCause();
- }
- }
- // com.alicp.jetcache.anno.method.CacheHandler#doInvalidate(com.alicp.jetcache.anno.method.CacheInvokeContext, com.alicp.jetcache.anno.support.CacheInvalidateAnnoConfig)
- private static void doInvalidate(CacheInvokeContext context, CacheInvalidateAnnoConfig annoConfig) {
- Cache cache = context.getCacheFunction().apply(context, annoConfig);
- if (cache == null) {
- return;
- }
- boolean condition = ExpressionUtil.evalCondition(context, annoConfig);
- if (!condition) {
- return;
- }
- Object key = ExpressionUtil.evalKey(context, annoConfig);
- if (key == null) {
- return;
- }
- if (annoConfig.isMulti()) {
- // 如果注解配置这个属性,则转换key为数组形式,进行缓存删除,但是不是模糊删除
- Iterable it = toIterable(key);
- if (it == null) {
- logger.error("jetcache @CacheInvalidate key is not instance of Iterable or array: " + annoConfig.getDefineMethod());
- return;
- }
- Set keys = new HashSet();
- it.forEach(k -> keys.add(k));
- cache.removeAll(keys);
- } else {
- cache.remove(key);
- }
- }
- // com.alicp.jetcache.AbstractCache#computeIfAbsentImpl
- static <K, V> V computeIfAbsentImpl(K key, Function<K, V> loader, boolean cacheNullWhenLoaderReturnNull,
- long expireAfterWrite, TimeUnit timeUnit, Cache<K, V> cache) {
- AbstractCache<K, V> abstractCache = CacheUtil.getAbstractCache(cache);
- CacheLoader<K, V> newLoader = CacheUtil.createProxyLoader(cache, loader, abstractCache::notify);
- CacheGetResult<V> r;
- if (cache instanceof RefreshCache) {
- RefreshCache<K, V> refreshCache = ((RefreshCache<K, V>) cache);
- // 查询缓存后会检查缓存内的对象,对象内有过期时间信息,会判断是否过期
- r = refreshCache.GET(key);
- // 线程池执行更新部分信息,如过期时间,用户的访问并不会增加缓存的过期时间
- refreshCache.addOrUpdateRefreshTask(key, newLoader);
- } else {
- r = cache.GET(key);
- }
- if (r.isSuccess()) {
- return r.getValue();
- } else {
- // 判定缓存过期,进数据库查询数据进行缓存
- Consumer<V> cacheUpdater = (loadedValue) -> {
- if(needUpdate(loadedValue, cacheNullWhenLoaderReturnNull, newLoader)) {
- // 缓存过期后更新缓存信息
- if (timeUnit != null) {
- cache.PUT(key, loadedValue, expireAfterWrite, timeUnit).waitForResult();
- } else {
- cache.PUT(key, loadedValue).waitForResult();
- }
- }
- };
-
- V loadedValue;
- if (cache.config().isCachePenetrationProtect()) {
- // 如果开启缓存穿透保护,则进行同步加载,避免同一时间大量请求进入数据库查询,另外允许配置穿透保护时间(com.alicp.jetcache.CacheConfig), 用的CountDownLatch,进行单机限制,没必要用Redis分布式锁
- loadedValue = synchronizedLoad(cache.config(), abstractCache, key, newLoader, cacheUpdater);
- } else {
- loadedValue = newLoader.apply(key);
- cacheUpdater.accept(loadedValue);
- }
-
- return loadedValue;
- }
- }
2.6 为了支持HashMap存储,和模糊删除, 自定义实现RedisSpringDataCache,可以通过配置type按需设置
本来想用装饰模式,但是好像不太好用。
- import com.alicp.jetcache.*;
- import com.alicp.jetcache.external.ExternalKeyUtil;
- import com.alicp.jetcache.redis.springdata.RedisSpringDataCache;
- import com.alicp.jetcache.redis.springdata.RedisSpringDataCacheConfig;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import org.springframework.data.redis.connection.RedisConnection;
- import org.springframework.data.redis.connection.RedisConnectionFactory;
- import org.springframework.data.redis.connection.RedisStringCommands;
- import org.springframework.data.redis.core.Cursor;
- import org.springframework.data.redis.core.ScanOptions;
- import org.springframework.data.redis.core.types.Expiration;
- import priv.whh.std.common.constant.Constants;
-
- import java.io.IOException;
- import java.nio.charset.StandardCharsets;
- import java.util.*;
- import java.util.concurrent.TimeUnit;
- import java.util.function.Function;
- import java.util.function.Predicate;
-
-
- @SuppressWarnings("unchecked")
- public class RedisSpringDataExtCache<K, V> extends RedisSpringDataCache<K, V> {
- private Logger logger = LoggerFactory.getLogger(RedisSpringDataExtCache.class);
-
- private RedisConnectionFactory connectionFactory;
- private RedisSpringDataCacheConfig<K, V> config;
-
- private Function<Object, byte[]> valueEncoder;
- private Function<byte[], Object> valueDecoder;
-
- public RedisSpringDataExtCache(RedisSpringDataCacheConfig<K, V> config) {
- super(config);
- this.connectionFactory = config.getConnectionFactory();
- if (connectionFactory == null) {
- throw new CacheConfigException("connectionFactory is required");
- }
- this.config = config;
- this.valueEncoder = config.getValueEncoder();
- this.valueDecoder = config.getValueDecoder();
- }
-
- private void closeConnection(RedisConnection connection) {
- try {
- if (connection != null) {
- connection.close();
- }
- } catch (Exception ex) {
- logger.error("RedisConnection close fail: {}, {}", ex.getMessage(), ex.getClass().getName());
- }
- }
-
-
- @Override
- protected CacheGetResult<V> do_GET(K key) {
- RedisConnection con = null;
- try {
- con = connectionFactory.getConnection();
- byte[] newKey = buildKey(key);
- byte[] resultBytes = con.hGet(config.getKeyPrefix().getBytes(StandardCharsets.UTF_8), newKey);
- if (resultBytes != null) {
- CacheValueHolder<V> holder = (CacheValueHolder<V>) valueDecoder.apply(resultBytes);
- if (System.currentTimeMillis() >= holder.getExpireTime()) {
- return CacheGetResult.EXPIRED_WITHOUT_MSG;
- }
- return new CacheGetResult(CacheResultCode.SUCCESS, null, holder);
- } else {
- return CacheGetResult.NOT_EXISTS_WITHOUT_MSG;
- }
- } catch (Exception ex) {
- logError("GET", key, ex);
- return new CacheGetResult(ex);
- } finally {
- closeConnection(con);
- }
- }
-
- @Override
- protected MultiGetResult<K, V> do_GET_ALL(Set<? extends K> keys) {
- RedisConnection con = null;
- try {
- con = connectionFactory.getConnection();
- ArrayList<K> keyList = new ArrayList<>(keys);
- byte[][] newKeys = keyList.stream().map((k) -> buildKey(k)).toArray(byte[][]::new);
-
- Map<K, CacheGetResult<V>> resultMap = new HashMap<>(16);
- if (newKeys.length > 0) {
- List mgetResults = con.hMGet(config.getKeyPrefix().getBytes(StandardCharsets.UTF_8), newKeys);
- for (int i = 0; i < mgetResults.size(); i++) {
- Object value = mgetResults.get(i);
- K key = keyList.get(i);
- if (value != null) {
- CacheValueHolder<V> holder = (CacheValueHolder<V>) valueDecoder.apply((byte[]) value);
- if (System.currentTimeMillis() >= holder.getExpireTime()) {
- resultMap.put(key, CacheGetResult.EXPIRED_WITHOUT_MSG);
- } else {
- CacheGetResult<V> r = new CacheGetResult<>(CacheResultCode.SUCCESS, null, holder);
- resultMap.put(key, r);
- }
- } else {
- resultMap.put(key, CacheGetResult.NOT_EXISTS_WITHOUT_MSG);
- }
- }
- }
- return new MultiGetResult<>(CacheResultCode.SUCCESS, null, resultMap);
- } catch (Exception ex) {
- logError("GET_ALL", "keys(" + keys.size() + ")", ex);
- return new MultiGetResult<>(ex);
- } finally {
- closeConnection(con);
- }
- }
-
- @Override
- protected CacheResult do_REMOVE(K key) {
-
-
- RedisConnection con = null;
- try {
- con = connectionFactory.getConnection();
- if (key instanceof String && ((String) key).endsWith(Constants.STAR)) {
- // 如果匹配到"*",则认定模糊删除
- // TODO 集群暂时不考虑
- Set<String> keys = new HashSet<>(16);
- Cursor<byte[]> cursor = con.scan(ScanOptions.scanOptions().count(10000).match(config.getKeyPrefix()
- + key).build());
- while (cursor.hasNext()) {
- keys.add(new String(cursor.next(), StandardCharsets.UTF_8));
- }
- byte[][] newKeys = keys.stream().map(s -> s.getBytes(StandardCharsets.UTF_8)).toArray((len) ->
- new byte[keys.size()][]);
- Long result = con.del(newKeys);
- if (result != null) {
- return CacheResult.SUCCESS_WITHOUT_MSG;
- } else {
- return new CacheResult(CacheResultCode.FAIL, "result:" + result);
- }
- } else {
- byte[] keyBytes = buildKey(key);
- Long result = con.hDel(config.getKeyPrefix().getBytes(StandardCharsets.UTF_8), keyBytes);
- if (result == null) {
- return new CacheResult(CacheResultCode.FAIL, "result:" + result);
- } else if (result == 1) {
- return CacheResult.SUCCESS_WITHOUT_MSG;
- } else if (result == 0) {
- return new CacheResult(CacheResultCode.NOT_EXISTS, null);
- } else {
- return CacheResult.FAIL_WITHOUT_MSG;
- }
- }
- } catch (Exception ex) {
- logError("REMOVE", key, ex);
- return new CacheResult(ex);
- } finally {
- closeConnection(con);
- }
- }
-
- @Override
- protected CacheResult do_REMOVE_ALL(Set<? extends K> keys) {
- RedisConnection con = null;
- try {
- con = connectionFactory.getConnection();
- boolean judge = keys.stream().allMatch((Predicate<K>) k -> k instanceof String && ((String) k).endsWith(Constants.STAR));
- if (judge) {
- // 如果匹配到"*",则认定模糊删除
- // TODO 集群暂时不考虑
- Set<String> keySet = new HashSet<>(16);
- for (K k : keys) {
- Cursor<byte[]> cursor = con.scan(ScanOptions.scanOptions().count(10000).match(config.getKeyPrefix() + k).build());
- while (cursor.hasNext()) {
- keySet.add(new String(cursor.next(), StandardCharsets.UTF_8));
- }
- }
- byte[][] newKeys = keySet.stream().map(s -> s.getBytes(StandardCharsets.UTF_8)).toArray((len) ->
- new byte[keys.size()][]);
- Long result = con.del(newKeys);
- if (result != null) {
- return CacheResult.SUCCESS_WITHOUT_MSG;
- } else {
- return new CacheResult(CacheResultCode.FAIL, "result:" + result);
- }
- } else {
- byte[][] newKeys = keys.stream().map((k) -> buildKey(k)).toArray((len) -> new byte[keys.size()][]);
- Long result = con.hDel(config.getKeyPrefix().getBytes(StandardCharsets.UTF_8), newKeys);
- if (result != null) {
- return CacheResult.SUCCESS_WITHOUT_MSG;
- } else {
- return new CacheResult(CacheResultCode.FAIL, "result:" + result);
- }
- }
- } catch (Exception ex) {
- logError("REMOVE_ALL", "keys(" + keys.size() + ")", ex);
- return new CacheResult(ex);
- } finally {
- closeConnection(con);
- }
- }
-
- @Override
- protected CacheResult do_PUT(K key, V value, long expireAfterWrite, TimeUnit timeUnit) {
- RedisConnection con = null;
- try {
- con = connectionFactory.getConnection();
- CacheValueHolder<V> holder = new CacheValueHolder(value, timeUnit.toMillis(expireAfterWrite));
- byte[] keyBytes = buildKey(key);
- byte[] valueBytes = valueEncoder.apply(holder);
- boolean result = hSet(con, config.getKeyPrefix().getBytes(StandardCharsets.UTF_8), keyBytes,
- valueBytes, timeUnit.toMillis(expireAfterWrite));
- if (Boolean.TRUE.equals(result)) {
- return CacheResult.SUCCESS_WITHOUT_MSG;
- } else {
- return new CacheResult(CacheResultCode.FAIL, "result:" + result);
- }
- } catch (Exception ex) {
- logError("PUT", key, ex);
- return new CacheResult(ex);
- } finally {
- closeConnection(con);
- }
- }
-
- @Override
- protected CacheResult do_PUT_ALL(Map<? extends K, ? extends V> map, long expireAfterWrite, TimeUnit timeUnit) {
- RedisConnection con = null;
- try {
- con = connectionFactory.getConnection();
- int failCount = 0;
- for (Map.Entry<? extends K, ? extends V> en : map.entrySet()) {
- CacheValueHolder<V> holder = new CacheValueHolder(en.getValue(), timeUnit.toMillis(expireAfterWrite));
- boolean result = hSet(con, config.getKeyPrefix().getBytes(StandardCharsets.UTF_8),
- buildKey(en.getKey()), valueEncoder.apply(holder), timeUnit.toMillis(expireAfterWrite));
- if (!Boolean.TRUE.equals(result)) {
- failCount++;
- }
- }
- return failCount == 0 ? CacheResult.SUCCESS_WITHOUT_MSG :
- failCount == map.size() ? CacheResult.FAIL_WITHOUT_MSG : CacheResult.PART_SUCCESS_WITHOUT_MSG;
- } catch (Exception ex) {
- logError("PUT_ALL", "map(" + map.size() + ")", ex);
- return new CacheResult(ex);
- } finally {
- closeConnection(con);
- }
- }
-
- @Override
- protected CacheResult do_PUT_IF_ABSENT(K key, V value, long expireAfterWrite, TimeUnit timeUnit) {
- RedisConnection con = null;
- try {
- con = connectionFactory.getConnection();
- CacheValueHolder<V> holder = new CacheValueHolder(value, timeUnit.toMillis(expireAfterWrite));
- byte[] newKey = buildKey(key);
- Boolean result = con.set(newKey, valueEncoder.apply(holder),
- Expiration.from(expireAfterWrite, timeUnit), RedisStringCommands.SetOption.ifAbsent());
- if (Boolean.TRUE.equals(result)) {
- return CacheResult.SUCCESS_WITHOUT_MSG;
- }/* else if (result == null) {
- return CacheResult.EXISTS_WITHOUT_MSG;
- } */ else {
- return CacheResult.EXISTS_WITHOUT_MSG;
- }
- } catch (Exception ex) {
- logError("PUT_IF_ABSENT", key, ex);
- return new CacheResult(ex);
- } finally {
- closeConnection(con);
- }
- }
-
- @Override
- public byte[] buildKey(K key) {
- try {
- Object newKey = key;
- if (key instanceof byte[]) {
- newKey = key;
- } else if (key instanceof String) {
- newKey = key;
- } else {
- if (config.getKeyConvertor() != null) {
- newKey = config.getKeyConvertor().apply(key);
- }
- }
- return ExternalKeyUtil.buildKeyAfterConvert(newKey, "");
- } catch (IOException e) {
- throw new CacheException(e);
- }
- }
-
- private boolean hSet(RedisConnection con, byte[] key, byte[] field, byte[] value, long millis) {
- try {
- con.openPipeline();
- con.hashCommands().hSet(key, field, value);
- con.keyCommands().pExpire(key, millis);
- return true;
- } catch (Exception e) {
- logger.error("Failed to hset, cause: {}", e);
- return false;
- } finally {
- con.closePipeline();
- }
- }
- }
2.7 value序列化方式采用redis配置的序列化方式
考虑到fastjson兼容性可能会有坑,官方也不推荐fastjson给使用方,怕有坑,而其它方式的序列化后redis客户端看不了具体的值,没什么具体的性能要求的话就使用jackson了,redis客户端看的方便。但是使用jackson进行配置后,需要配置om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL)。 Jetcache缓存结果会封装一个对象进行缓存(com.alicp.jetcache.CacheValueHolder),但是这个对象final且不是public,因此使用jackson后没有类型信息,反序列化的时候会直接报错,因此只能改源码,仅此一次为了兼容jackson方式一定要修改,fastjson没有次问题。(按道理jackson配置不该有这种限制,但是基本上试了enableDefaultTyping的所有方式,但是没有效果)
- import com.alicp.jetcache.anno.support.SpringConfigProvider;
- import org.springframework.data.redis.serializer.RedisSerializer;
- import priv.whh.std.boot.cache.jetcache.convertor.Md5CacheKeyConvertor;
- import priv.whh.std.boot.cache.jetcache.decoder.DefaultValueDecoder;
- import priv.whh.std.boot.cache.jetcache.encoder.DefaultValueEncoder;
-
- import java.util.Objects;
- import java.util.function.Function;
-
-
- public class SpringConfigExtProvider extends SpringConfigProvider {
- private static final String DEFAULT = "default";
- private static final String MD5 = "MD5";
- private final RedisSerializer<Object> redisSerializer;
-
- public SpringConfigExtProvider(RedisSerializer<Object> redisSerializer) {
- this.redisSerializer = redisSerializer;
- }
-
- @Override
- public Function<Object, byte[]> parseValueEncoder(String valueEncoder) {
- if (Objects.equals(valueEncoder, DEFAULT)) {
- return new DefaultValueEncoder(redisSerializer);
- }
- return super.parseValueEncoder(valueEncoder);
- }
-
- @Override
- public Function<byte[], Object> parseValueDecoder(String valueDecoder) {
- if (Objects.equals(valueDecoder, DEFAULT)) {
- return new DefaultValueDecoder(redisSerializer);
- }
- return super.parseValueDecoder(valueDecoder);
- }
-
- @Override
- public Function<Object, Object> parseKeyConvertor(String convertor) {
- if (Objects.equals(convertor, MD5)) {
- return new Md5CacheKeyConvertor();
- }
- return super.parseKeyConvertor(convertor);
- }
- }
4.1 什么时候适合使用远程缓存,什么时候适合使用本地缓存
本身Redis的访问速度是不慢的,但是如果访问的数量上去后,Redis的连接数会打满,所以增加本地缓存我认为主要还是为了防止突发的流量。本地缓存过期时间设置大概率是要短的,过期时间一到自动刷新去Redis加载最新的数据。Redis缓存过期时如果没加分布式锁的话会直接访问数据库,本地缓存(Caffeine、Guava)一旦时间过期了,并且此时有访问的话,Caffeine、Guava会按照过期的数据返回(还没验证)。
4.2 是否需要保证强一致性
多级缓存基本上用的应该是查询和删除操作,基本应该不会使用更新操作,因为改数据库再改缓存存在的风险大于改数据库删除缓存。本地缓存因为是在服务器内部,删除缓存的时候也只是删除当前服务器的本地缓存。本地缓存过期时间设置的比较短,一定延迟性基本上是可以接受的,我觉得不一定非要强一致性。多级缓存如果没有特别的手段都会有一定风险导致不一致性(短时间的),有过期时间的限制可以保证最终一致性。Redis可以作为消息中心,可以通知更新到各服务器的本地缓存,但是有点大材小用。除非本地缓存设置的过期时间比较长,如果设置的过期时间比较短的话,感觉没有必要。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。