当前位置:   article > 正文

分布式-多级缓存_jetcache cacheinvalidate

jetcache cacheinvalidate

1. 介绍

1.1 什么是多级缓存

多级缓存,即在整个系统架构的不同系统层级进行数据缓存,以提升访问效率。在SpringBoot项目中多级缓存基本分为本地缓存和远程缓存,本地缓存可以用Caffeine、Guava、HashMap,远程缓存可以用Redis。

1.2 框架

J2Cache包含的功能大于JetCache,但是JetCache属于阿里开源框架,后续维护的可能性高,其它框架就更不敢随意在项目上使用了。不过JetCache需要扩展一些功能,不然本地缓存和远程缓存都要允许一定的延迟性才能使用。本地缓存可以允许,刷新频率比较高,但是远程缓存如果刷新频率也很高的话,就跟本地缓存无异了,有点得不偿失。Spring AOP原生的缓存框架,功能有所欠缺。

1.2.1 JetCache(阿里)

(1)简介

JetCache是一个基于Java的缓存系统封装,提供统一的API和注解来简化缓存的使用。 JetCache提供了比SpringCache更加强大的注解,可以原生的支持TTL、两级缓存、分布式自动刷新,还提供了Cache接口用于手工缓存操作。

(2)特性

  • 支持注解
  • 支持自动刷新(除非设置一直刷新,不然的话一段时间没访问后就不刷新了,然后突然又有大流量那怎么应对呢:Jetcache设置了分布式锁,当缓存过期后,避免全部进到数据库层)
  • 支持本地缓存Caffeine(本地缓存支持Caffeine、Guava,如果仅支持HashMap或者Memcache的话,就需要斟酌了)
  • 不支持模糊删除,只支持KV键存储,不支持HashMap方式存储(我所知的基本上所有人用缓存都是KV键存储,很少用HashMap存储,但是KV键存储的话基本上避免不了模糊删除,而HashMap存书的话主要删除一个键即可,模糊删除如果使用keys的话,会全量扫阻塞其它线程,而且数据量巨大的话,返回的数组能直接裂开。如果使用游标方式,虽然不会太阻塞其它现成,但是数组还是能直接裂开,数组裂开的可能性应该比较小)
  • 不支持Redis集群访问(代码上获取Redis连接方式为单机方式,不是集群方式)

1.2.2 AutoLoadCache

(1)特性

  • 支持注解
  • 支持自动刷新
  • 支持本地缓存HashMap,不支持Caffeine、Guava方式
  • 支持HashMap方式存储,不支持模糊删除(不推荐模糊删除,以HashMap的方式存储是为了方便删除)
  • 支持Redis集群访问

1.2.3 LayeringCache

(1)简介

layering-cache是一个支持分布式环境的多级缓存框架,使用方式和spring-cache类似。使用Caffeine作为一级本地缓存,使用redis作为二级集中式缓存。一级缓存和二级缓存的数据一致性是通过推和拉两种模式相结合的方式来实现的。推主要是基于redis的pub/sub机制,拉主要是基于消息队列和记录消费消息的偏移量来实现的。

(2)特性

  • 支持注解
  • 支持自动刷新,不支持自定义刷新频率
  • 支持本地缓存Caffeine
  • 支持模糊删除(游标方式删除)
  • 支持Redis集群访问

1.2.4 J2Cache

(1)简介

J2Cache 是 OSChina 目前正在使用的两级缓存框架(要求至少 Java 8)。第一级缓存使用内存(同时支持 Ehcache 2.x、Ehcache 3.x 和 Caffeine),第二级缓存使用 Redis(推荐)/Memcached 。 由于大量的缓存读取会导致 L2 的网络成为整个系统的瓶颈,因此 L1 的目标是降低对 L2 的读取次数。 该缓存框架主要用于集群环境中。单机也可使用,用于避免应用重启导致的缓存冷启动后对后端业务的冲击。

(2)特性

  • 支持注解(Spring Aop原生注解)
  • 支持自动刷新,不能自定义刷新频率(Redis键过期时触发回调,需要Redis版本和手动配置,默认是关闭的;Redis、RabbitMq、RocketMq等消息方式通知本地缓存的更新)
  • 支持本地缓存Caffeine、Ehcache
  • 支持HashMap方式存储
  • 支持模糊删除(游标方式)
  • 支持Redis集群访问

1.2.5 Spring Aop注解

(1)特性

  • 支持注解

2. 源码

2.1 以AOP方式织入方法内进行拦截

只有@Cached上注册过的name名称,才能做其它更新、删除注解的操作,不然都会报异常。

  1. // com.alicp.jetcache.anno.aop.JetCacheInterceptor#invoke
  2. public Object invoke(final MethodInvocation invocation) throws Throwable {
  3. // ...
  4. CacheInvokeContext context = configProvider.getCacheContext().createCacheInvokeContext(cacheConfigMap);
  5. context.setTargetObject(invocation.getThis());
  6. context.setInvoker(invocation::proceed);
  7. context.setMethod(method);
  8. context.setArgs(invocation.getArguments());
  9. context.setCacheInvokeConfig(cac);
  10. context.setHiddenPackages(globalCacheConfig.getHiddenPackages());
  11. return CacheHandler.invoke(context);
  12. }

2.2 根据注解方式执行不同的策略

  1. // com.alicp.jetcache.anno.method.CacheHandler#doInvoke
  2. private static Object doInvoke(CacheInvokeContext context) throws Throwable {
  3. CacheInvokeConfig cic = context.getCacheInvokeConfig();
  4. CachedAnnoConfig cachedConfig = cic.getCachedAnnoConfig();
  5. if (cachedConfig != null && (cachedConfig.isEnabled() || CacheContextSupport._isEnabled())) {
  6. // 执行查询操作
  7. return invokeWithCached(context);
  8. } else if (cic.getInvalidateAnnoConfigs() != null || cic.getUpdateAnnoConfig() != null) {
  9. // 执行删除或者更新操作
  10. return invokeWithInvalidateOrUpdate(context);
  11. } else {
  12. // 执行原生方法
  13. return invokeOrigin(context);
  14. }
  15. }

 2.3 执行查询操作

  1. // com.alicp.jetcache.anno.method.CacheHandler#invokeWithCached
  2. private static Object invokeWithCached(CacheInvokeContext context)
  3. throws Throwable {
  4. CacheInvokeConfig cic = context.getCacheInvokeConfig();
  5. CachedAnnoConfig cac = cic.getCachedAnnoConfig();
  6. Cache cache = context.getCacheFunction().apply(context, cac);
  7. if (cache == null) {
  8. logger.error("no cache with name: " + context.getMethod());
  9. return invokeOrigin(context);
  10. }
  11. // 如果key解析出来是null值,会直接执行原生方法,应避免,key = "'' + #userId",这种方式解析出来会产生“null”字符串,不再直接进数据库查询。若果是key = "#userId",解析出来会是null,不会命中缓存
  12. Object key = ExpressionUtil.evalKey(context, cic.getCachedAnnoConfig());
  13. if (key == null) {
  14. return loadAndCount(context, cache, key);
  15. }
  16. // 解析Condition表达式,如果出现一些异常则直接进数据库查询
  17. if (!ExpressionUtil.evalCondition(context, cic.getCachedAnnoConfig())) {
  18. return loadAndCount(context, cache, key);
  19. }
  20. try {
  21. CacheLoader loader = new CacheLoader() {
  22. @Override
  23. public Object load(Object k) throws Throwable {
  24. Object result = invokeOrigin(context);
  25. context.setResult(result);
  26. return result;
  27. }
  28. @Override
  29. public boolean vetoCacheUpdate() {
  30. return !ExpressionUtil.evalPostCondition(context, cic.getCachedAnnoConfig());
  31. }
  32. };
  33. Object result = cache.computeIfAbsent(key, loader);
  34. return result;
  35. } catch (CacheInvokeException e) {
  36. throw e.getCause();
  37. }
  38. }

 2.4 执行删除操作

  1. // com.alicp.jetcache.anno.method.CacheHandler#doInvalidate(com.alicp.jetcache.anno.method.CacheInvokeContext, com.alicp.jetcache.anno.support.CacheInvalidateAnnoConfig)
  2. private static void doInvalidate(CacheInvokeContext context, CacheInvalidateAnnoConfig annoConfig) {
  3. Cache cache = context.getCacheFunction().apply(context, annoConfig);
  4. if (cache == null) {
  5. return;
  6. }
  7. boolean condition = ExpressionUtil.evalCondition(context, annoConfig);
  8. if (!condition) {
  9. return;
  10. }
  11. Object key = ExpressionUtil.evalKey(context, annoConfig);
  12. if (key == null) {
  13. return;
  14. }
  15. if (annoConfig.isMulti()) {
  16. // 如果注解配置这个属性,则转换key为数组形式,进行缓存删除,但是不是模糊删除
  17. Iterable it = toIterable(key);
  18. if (it == null) {
  19. logger.error("jetcache @CacheInvalidate key is not instance of Iterable or array: " + annoConfig.getDefineMethod());
  20. return;
  21. }
  22. Set keys = new HashSet();
  23. it.forEach(k -> keys.add(k));
  24. cache.removeAll(keys);
  25. } else {
  26. cache.remove(key);
  27. }
  28. }

2.5 查询缓存

  1. // com.alicp.jetcache.AbstractCache#computeIfAbsentImpl
  2. static <K, V> V computeIfAbsentImpl(K key, Function<K, V> loader, boolean cacheNullWhenLoaderReturnNull,
  3. long expireAfterWrite, TimeUnit timeUnit, Cache<K, V> cache) {
  4. AbstractCache<K, V> abstractCache = CacheUtil.getAbstractCache(cache);
  5. CacheLoader<K, V> newLoader = CacheUtil.createProxyLoader(cache, loader, abstractCache::notify);
  6. CacheGetResult<V> r;
  7. if (cache instanceof RefreshCache) {
  8. RefreshCache<K, V> refreshCache = ((RefreshCache<K, V>) cache);
  9. // 查询缓存后会检查缓存内的对象,对象内有过期时间信息,会判断是否过期
  10. r = refreshCache.GET(key);
  11. // 线程池执行更新部分信息,如过期时间,用户的访问并不会增加缓存的过期时间
  12. refreshCache.addOrUpdateRefreshTask(key, newLoader);
  13. } else {
  14. r = cache.GET(key);
  15. }
  16. if (r.isSuccess()) {
  17. return r.getValue();
  18. } else {
  19. // 判定缓存过期,进数据库查询数据进行缓存
  20. Consumer<V> cacheUpdater = (loadedValue) -> {
  21. if(needUpdate(loadedValue, cacheNullWhenLoaderReturnNull, newLoader)) {
  22. // 缓存过期后更新缓存信息
  23. if (timeUnit != null) {
  24. cache.PUT(key, loadedValue, expireAfterWrite, timeUnit).waitForResult();
  25. } else {
  26. cache.PUT(key, loadedValue).waitForResult();
  27. }
  28. }
  29. };
  30. V loadedValue;
  31. if (cache.config().isCachePenetrationProtect()) {
  32. // 如果开启缓存穿透保护,则进行同步加载,避免同一时间大量请求进入数据库查询,另外允许配置穿透保护时间(com.alicp.jetcache.CacheConfig), 用的CountDownLatch,进行单机限制,没必要用Redis分布式锁
  33. loadedValue = synchronizedLoad(cache.config(), abstractCache, key, newLoader, cacheUpdater);
  34. } else {
  35. loadedValue = newLoader.apply(key);
  36. cacheUpdater.accept(loadedValue);
  37. }
  38. return loadedValue;
  39. }
  40. }

2.6 为了支持HashMap存储,和模糊删除, 自定义实现RedisSpringDataCache,可以通过配置type按需设置

本来想用装饰模式,但是好像不太好用。

  1. import com.alicp.jetcache.*;
  2. import com.alicp.jetcache.external.ExternalKeyUtil;
  3. import com.alicp.jetcache.redis.springdata.RedisSpringDataCache;
  4. import com.alicp.jetcache.redis.springdata.RedisSpringDataCacheConfig;
  5. import org.slf4j.Logger;
  6. import org.slf4j.LoggerFactory;
  7. import org.springframework.data.redis.connection.RedisConnection;
  8. import org.springframework.data.redis.connection.RedisConnectionFactory;
  9. import org.springframework.data.redis.connection.RedisStringCommands;
  10. import org.springframework.data.redis.core.Cursor;
  11. import org.springframework.data.redis.core.ScanOptions;
  12. import org.springframework.data.redis.core.types.Expiration;
  13. import priv.whh.std.common.constant.Constants;
  14. import java.io.IOException;
  15. import java.nio.charset.StandardCharsets;
  16. import java.util.*;
  17. import java.util.concurrent.TimeUnit;
  18. import java.util.function.Function;
  19. import java.util.function.Predicate;
  20. @SuppressWarnings("unchecked")
  21. public class RedisSpringDataExtCache<K, V> extends RedisSpringDataCache<K, V> {
  22. private Logger logger = LoggerFactory.getLogger(RedisSpringDataExtCache.class);
  23. private RedisConnectionFactory connectionFactory;
  24. private RedisSpringDataCacheConfig<K, V> config;
  25. private Function<Object, byte[]> valueEncoder;
  26. private Function<byte[], Object> valueDecoder;
  27. public RedisSpringDataExtCache(RedisSpringDataCacheConfig<K, V> config) {
  28. super(config);
  29. this.connectionFactory = config.getConnectionFactory();
  30. if (connectionFactory == null) {
  31. throw new CacheConfigException("connectionFactory is required");
  32. }
  33. this.config = config;
  34. this.valueEncoder = config.getValueEncoder();
  35. this.valueDecoder = config.getValueDecoder();
  36. }
  37. private void closeConnection(RedisConnection connection) {
  38. try {
  39. if (connection != null) {
  40. connection.close();
  41. }
  42. } catch (Exception ex) {
  43. logger.error("RedisConnection close fail: {}, {}", ex.getMessage(), ex.getClass().getName());
  44. }
  45. }
  46. @Override
  47. protected CacheGetResult<V> do_GET(K key) {
  48. RedisConnection con = null;
  49. try {
  50. con = connectionFactory.getConnection();
  51. byte[] newKey = buildKey(key);
  52. byte[] resultBytes = con.hGet(config.getKeyPrefix().getBytes(StandardCharsets.UTF_8), newKey);
  53. if (resultBytes != null) {
  54. CacheValueHolder<V> holder = (CacheValueHolder<V>) valueDecoder.apply(resultBytes);
  55. if (System.currentTimeMillis() >= holder.getExpireTime()) {
  56. return CacheGetResult.EXPIRED_WITHOUT_MSG;
  57. }
  58. return new CacheGetResult(CacheResultCode.SUCCESS, null, holder);
  59. } else {
  60. return CacheGetResult.NOT_EXISTS_WITHOUT_MSG;
  61. }
  62. } catch (Exception ex) {
  63. logError("GET", key, ex);
  64. return new CacheGetResult(ex);
  65. } finally {
  66. closeConnection(con);
  67. }
  68. }
  69. @Override
  70. protected MultiGetResult<K, V> do_GET_ALL(Set<? extends K> keys) {
  71. RedisConnection con = null;
  72. try {
  73. con = connectionFactory.getConnection();
  74. ArrayList<K> keyList = new ArrayList<>(keys);
  75. byte[][] newKeys = keyList.stream().map((k) -> buildKey(k)).toArray(byte[][]::new);
  76. Map<K, CacheGetResult<V>> resultMap = new HashMap<>(16);
  77. if (newKeys.length > 0) {
  78. List mgetResults = con.hMGet(config.getKeyPrefix().getBytes(StandardCharsets.UTF_8), newKeys);
  79. for (int i = 0; i < mgetResults.size(); i++) {
  80. Object value = mgetResults.get(i);
  81. K key = keyList.get(i);
  82. if (value != null) {
  83. CacheValueHolder<V> holder = (CacheValueHolder<V>) valueDecoder.apply((byte[]) value);
  84. if (System.currentTimeMillis() >= holder.getExpireTime()) {
  85. resultMap.put(key, CacheGetResult.EXPIRED_WITHOUT_MSG);
  86. } else {
  87. CacheGetResult<V> r = new CacheGetResult<>(CacheResultCode.SUCCESS, null, holder);
  88. resultMap.put(key, r);
  89. }
  90. } else {
  91. resultMap.put(key, CacheGetResult.NOT_EXISTS_WITHOUT_MSG);
  92. }
  93. }
  94. }
  95. return new MultiGetResult<>(CacheResultCode.SUCCESS, null, resultMap);
  96. } catch (Exception ex) {
  97. logError("GET_ALL", "keys(" + keys.size() + ")", ex);
  98. return new MultiGetResult<>(ex);
  99. } finally {
  100. closeConnection(con);
  101. }
  102. }
  103. @Override
  104. protected CacheResult do_REMOVE(K key) {
  105. RedisConnection con = null;
  106. try {
  107. con = connectionFactory.getConnection();
  108. if (key instanceof String && ((String) key).endsWith(Constants.STAR)) {
  109. // 如果匹配到"*",则认定模糊删除
  110. // TODO 集群暂时不考虑
  111. Set<String> keys = new HashSet<>(16);
  112. Cursor<byte[]> cursor = con.scan(ScanOptions.scanOptions().count(10000).match(config.getKeyPrefix()
  113. + key).build());
  114. while (cursor.hasNext()) {
  115. keys.add(new String(cursor.next(), StandardCharsets.UTF_8));
  116. }
  117. byte[][] newKeys = keys.stream().map(s -> s.getBytes(StandardCharsets.UTF_8)).toArray((len) ->
  118. new byte[keys.size()][]);
  119. Long result = con.del(newKeys);
  120. if (result != null) {
  121. return CacheResult.SUCCESS_WITHOUT_MSG;
  122. } else {
  123. return new CacheResult(CacheResultCode.FAIL, "result:" + result);
  124. }
  125. } else {
  126. byte[] keyBytes = buildKey(key);
  127. Long result = con.hDel(config.getKeyPrefix().getBytes(StandardCharsets.UTF_8), keyBytes);
  128. if (result == null) {
  129. return new CacheResult(CacheResultCode.FAIL, "result:" + result);
  130. } else if (result == 1) {
  131. return CacheResult.SUCCESS_WITHOUT_MSG;
  132. } else if (result == 0) {
  133. return new CacheResult(CacheResultCode.NOT_EXISTS, null);
  134. } else {
  135. return CacheResult.FAIL_WITHOUT_MSG;
  136. }
  137. }
  138. } catch (Exception ex) {
  139. logError("REMOVE", key, ex);
  140. return new CacheResult(ex);
  141. } finally {
  142. closeConnection(con);
  143. }
  144. }
  145. @Override
  146. protected CacheResult do_REMOVE_ALL(Set<? extends K> keys) {
  147. RedisConnection con = null;
  148. try {
  149. con = connectionFactory.getConnection();
  150. boolean judge = keys.stream().allMatch((Predicate<K>) k -> k instanceof String && ((String) k).endsWith(Constants.STAR));
  151. if (judge) {
  152. // 如果匹配到"*",则认定模糊删除
  153. // TODO 集群暂时不考虑
  154. Set<String> keySet = new HashSet<>(16);
  155. for (K k : keys) {
  156. Cursor<byte[]> cursor = con.scan(ScanOptions.scanOptions().count(10000).match(config.getKeyPrefix() + k).build());
  157. while (cursor.hasNext()) {
  158. keySet.add(new String(cursor.next(), StandardCharsets.UTF_8));
  159. }
  160. }
  161. byte[][] newKeys = keySet.stream().map(s -> s.getBytes(StandardCharsets.UTF_8)).toArray((len) ->
  162. new byte[keys.size()][]);
  163. Long result = con.del(newKeys);
  164. if (result != null) {
  165. return CacheResult.SUCCESS_WITHOUT_MSG;
  166. } else {
  167. return new CacheResult(CacheResultCode.FAIL, "result:" + result);
  168. }
  169. } else {
  170. byte[][] newKeys = keys.stream().map((k) -> buildKey(k)).toArray((len) -> new byte[keys.size()][]);
  171. Long result = con.hDel(config.getKeyPrefix().getBytes(StandardCharsets.UTF_8), newKeys);
  172. if (result != null) {
  173. return CacheResult.SUCCESS_WITHOUT_MSG;
  174. } else {
  175. return new CacheResult(CacheResultCode.FAIL, "result:" + result);
  176. }
  177. }
  178. } catch (Exception ex) {
  179. logError("REMOVE_ALL", "keys(" + keys.size() + ")", ex);
  180. return new CacheResult(ex);
  181. } finally {
  182. closeConnection(con);
  183. }
  184. }
  185. @Override
  186. protected CacheResult do_PUT(K key, V value, long expireAfterWrite, TimeUnit timeUnit) {
  187. RedisConnection con = null;
  188. try {
  189. con = connectionFactory.getConnection();
  190. CacheValueHolder<V> holder = new CacheValueHolder(value, timeUnit.toMillis(expireAfterWrite));
  191. byte[] keyBytes = buildKey(key);
  192. byte[] valueBytes = valueEncoder.apply(holder);
  193. boolean result = hSet(con, config.getKeyPrefix().getBytes(StandardCharsets.UTF_8), keyBytes,
  194. valueBytes, timeUnit.toMillis(expireAfterWrite));
  195. if (Boolean.TRUE.equals(result)) {
  196. return CacheResult.SUCCESS_WITHOUT_MSG;
  197. } else {
  198. return new CacheResult(CacheResultCode.FAIL, "result:" + result);
  199. }
  200. } catch (Exception ex) {
  201. logError("PUT", key, ex);
  202. return new CacheResult(ex);
  203. } finally {
  204. closeConnection(con);
  205. }
  206. }
  207. @Override
  208. protected CacheResult do_PUT_ALL(Map<? extends K, ? extends V> map, long expireAfterWrite, TimeUnit timeUnit) {
  209. RedisConnection con = null;
  210. try {
  211. con = connectionFactory.getConnection();
  212. int failCount = 0;
  213. for (Map.Entry<? extends K, ? extends V> en : map.entrySet()) {
  214. CacheValueHolder<V> holder = new CacheValueHolder(en.getValue(), timeUnit.toMillis(expireAfterWrite));
  215. boolean result = hSet(con, config.getKeyPrefix().getBytes(StandardCharsets.UTF_8),
  216. buildKey(en.getKey()), valueEncoder.apply(holder), timeUnit.toMillis(expireAfterWrite));
  217. if (!Boolean.TRUE.equals(result)) {
  218. failCount++;
  219. }
  220. }
  221. return failCount == 0 ? CacheResult.SUCCESS_WITHOUT_MSG :
  222. failCount == map.size() ? CacheResult.FAIL_WITHOUT_MSG : CacheResult.PART_SUCCESS_WITHOUT_MSG;
  223. } catch (Exception ex) {
  224. logError("PUT_ALL", "map(" + map.size() + ")", ex);
  225. return new CacheResult(ex);
  226. } finally {
  227. closeConnection(con);
  228. }
  229. }
  230. @Override
  231. protected CacheResult do_PUT_IF_ABSENT(K key, V value, long expireAfterWrite, TimeUnit timeUnit) {
  232. RedisConnection con = null;
  233. try {
  234. con = connectionFactory.getConnection();
  235. CacheValueHolder<V> holder = new CacheValueHolder(value, timeUnit.toMillis(expireAfterWrite));
  236. byte[] newKey = buildKey(key);
  237. Boolean result = con.set(newKey, valueEncoder.apply(holder),
  238. Expiration.from(expireAfterWrite, timeUnit), RedisStringCommands.SetOption.ifAbsent());
  239. if (Boolean.TRUE.equals(result)) {
  240. return CacheResult.SUCCESS_WITHOUT_MSG;
  241. }/* else if (result == null) {
  242. return CacheResult.EXISTS_WITHOUT_MSG;
  243. } */ else {
  244. return CacheResult.EXISTS_WITHOUT_MSG;
  245. }
  246. } catch (Exception ex) {
  247. logError("PUT_IF_ABSENT", key, ex);
  248. return new CacheResult(ex);
  249. } finally {
  250. closeConnection(con);
  251. }
  252. }
  253. @Override
  254. public byte[] buildKey(K key) {
  255. try {
  256. Object newKey = key;
  257. if (key instanceof byte[]) {
  258. newKey = key;
  259. } else if (key instanceof String) {
  260. newKey = key;
  261. } else {
  262. if (config.getKeyConvertor() != null) {
  263. newKey = config.getKeyConvertor().apply(key);
  264. }
  265. }
  266. return ExternalKeyUtil.buildKeyAfterConvert(newKey, "");
  267. } catch (IOException e) {
  268. throw new CacheException(e);
  269. }
  270. }
  271. private boolean hSet(RedisConnection con, byte[] key, byte[] field, byte[] value, long millis) {
  272. try {
  273. con.openPipeline();
  274. con.hashCommands().hSet(key, field, value);
  275. con.keyCommands().pExpire(key, millis);
  276. return true;
  277. } catch (Exception e) {
  278. logger.error("Failed to hset, cause: {}", e);
  279. return false;
  280. } finally {
  281. con.closePipeline();
  282. }
  283. }
  284. }

2.7 value序列化方式采用redis配置的序列化方式 

考虑到fastjson兼容性可能会有坑,官方也不推荐fastjson给使用方,怕有坑,而其它方式的序列化后redis客户端看不了具体的值,没什么具体的性能要求的话就使用jackson了,redis客户端看的方便。但是使用jackson进行配置后,需要配置om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL)。 Jetcache缓存结果会封装一个对象进行缓存(com.alicp.jetcache.CacheValueHolder),但是这个对象final且不是public,因此使用jackson后没有类型信息,反序列化的时候会直接报错,因此只能改源码,仅此一次为了兼容jackson方式一定要修改,fastjson没有次问题。(按道理jackson配置不该有这种限制,但是基本上试了enableDefaultTyping的所有方式,但是没有效果)

  1. import com.alicp.jetcache.anno.support.SpringConfigProvider;
  2. import org.springframework.data.redis.serializer.RedisSerializer;
  3. import priv.whh.std.boot.cache.jetcache.convertor.Md5CacheKeyConvertor;
  4. import priv.whh.std.boot.cache.jetcache.decoder.DefaultValueDecoder;
  5. import priv.whh.std.boot.cache.jetcache.encoder.DefaultValueEncoder;
  6. import java.util.Objects;
  7. import java.util.function.Function;
  8. public class SpringConfigExtProvider extends SpringConfigProvider {
  9. private static final String DEFAULT = "default";
  10. private static final String MD5 = "MD5";
  11. private final RedisSerializer<Object> redisSerializer;
  12. public SpringConfigExtProvider(RedisSerializer<Object> redisSerializer) {
  13. this.redisSerializer = redisSerializer;
  14. }
  15. @Override
  16. public Function<Object, byte[]> parseValueEncoder(String valueEncoder) {
  17. if (Objects.equals(valueEncoder, DEFAULT)) {
  18. return new DefaultValueEncoder(redisSerializer);
  19. }
  20. return super.parseValueEncoder(valueEncoder);
  21. }
  22. @Override
  23. public Function<byte[], Object> parseValueDecoder(String valueDecoder) {
  24. if (Objects.equals(valueDecoder, DEFAULT)) {
  25. return new DefaultValueDecoder(redisSerializer);
  26. }
  27. return super.parseValueDecoder(valueDecoder);
  28. }
  29. @Override
  30. public Function<Object, Object> parseKeyConvertor(String convertor) {
  31. if (Objects.equals(convertor, MD5)) {
  32. return new Md5CacheKeyConvertor();
  33. }
  34. return super.parseKeyConvertor(convertor);
  35. }
  36. }

3. 实战

4. FAQ

4.1 什么时候适合使用远程缓存,什么时候适合使用本地缓存

本身Redis的访问速度是不慢的,但是如果访问的数量上去后,Redis的连接数会打满,所以增加本地缓存我认为主要还是为了防止突发的流量。本地缓存过期时间设置大概率是要短的,过期时间一到自动刷新去Redis加载最新的数据。Redis缓存过期时如果没加分布式锁的话会直接访问数据库,本地缓存(Caffeine、Guava)一旦时间过期了,并且此时有访问的话,Caffeine、Guava会按照过期的数据返回(还没验证)。

4.2 是否需要保证强一致性

多级缓存基本上用的应该是查询和删除操作,基本应该不会使用更新操作,因为改数据库再改缓存存在的风险大于改数据库删除缓存。本地缓存因为是在服务器内部,删除缓存的时候也只是删除当前服务器的本地缓存。本地缓存过期时间设置的比较短,一定延迟性基本上是可以接受的,我觉得不一定非要强一致性。多级缓存如果没有特别的手段都会有一定风险导致不一致性(短时间的),有过期时间的限制可以保证最终一致性。Redis可以作为消息中心,可以通知更新到各服务器的本地缓存,但是有点大材小用。除非本地缓存设置的过期时间比较长,如果设置的过期时间比较短的话,感觉没有必要。

5. 参考资料

JetCache官网

AutoLoadCache官网

LayeringCache官网

J2Cache官网

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/在线问答5/article/detail/991778
推荐阅读
相关标签
  

闽ICP备14008679号