赞
踩
排行榜是业务开发中常见的一个场景,如何设计一个好的数据结构能够满足高效实时的查询,下面我们结合一个实际例子来讨论一下。
大概需求就是: 排行榜上显示前n个积分最高的用户. 并且相同积分先完成的排在前面. 并且还要能看到自己当前的积分.
看到这个需求的时候就想到可以用redis zset来实现
对于整个排行榜, 我们用 zset 保存排行榜数据, key 为排行榜信息, member 为用户id, score 存储用户积分.
用户信息(排行榜需要的头像昵称) 再用string或hash结构存储. 这个没啥说的, 就是查用户信息的时候别一个一个查就好了.
zset对于score相同的排序是按照key的字典序排的.
所以我们需要在score里面加入时间信息.
比较简单的一种方式是积分乘以10的n次方, 后面n位用于存储时间信息.
由于时间小的排在前面, 所以可以取一个最大时间减去当前时间.
score = 积分 * 1E10 + 最大时间 - 当前时间
(其实也考虑过把时间信息放在小数位, 但是发现会丢失精度就放弃了)
需要注意两点
然后我这里根据实际业务, 使用的固定的最大时间2050年, 时间精确到秒, 所以n取的10, 此时支持的最大积分为90w, 满足实际业务场景.
这里贴一下相关代码
private static final double STEP = 1E10; private static final long SECOND_20500101 = 2524579200L; public static Double createScoreWithTimeAsc(Integer value, long timeSecond) { if (value == null) { return 0D; } return value * STEP + SECOND_20500101 - timeSecond; } /** * 返回的是incrScore, 用于 redisTemplate.opsForZSet().incrementScore */ public static Double incrScoreWithTimeAsc(Integer increment, Double originScore) { if (increment == null) { return 0D; } if (originScore == null || originScore < 1.0) { return createScoreWithTimeAsc(delta); } double last = originScore % STEP; //上次的时间 long now = System.currentTimeMillis() / 1000; return increment * STEP - last + SECOND_20500101 - now; } public static long getValueFromTimeScore(Double score) { return (long) (score / STEP); }
然后在加减积分的代码就不贴了, 但是需要注意并发的情况, 对于这种排行榜类的数据也是比较容易出现并发的.
虽然我们将排行榜数据存入zset中了, 但这个只是提高了我们的访问效率, 并不能完全保证数据的准确性. 可能会因为各种原因(并发, 网络异常)导致缓存数据不准确, 因此需要定时刷新缓存数据.
然后缓存刷新策略大概有一下几种:
大概就是使用一个定时任务, 查询数据库最近一段时间内积分变化的用户, 对这些用户的积分进行重算, 刷新缓存.
还有就是到了缓存过期的时候, 就别再刷了. 除非打算这个排行榜一直存在缓存中.
缓存肯定是要设置过期时间的, 过期时间肯定是在缓存数据不经常访问的时候. 那如果缓存过期后用户访问排行榜, 这个时候就需要从数据库中查询相关数据, 重新计算排行榜前n位(没必要全部重算), 显然重算排行榜是一个比较费事费力的操作.
但是假如这个时候是大量用户并发访问, 然后查询排行榜缓存, 发现没有数据, 于是都去查询数据库重算. 这个时候数据库压力就会很大, 很容易挂掉. 即 缓存击穿.
然后解决方式大概有几种
然后结合具体场景, 这里加载缓存时不一定时缓存过期了, 可能还没构建缓存, 就有大量用户访问该排行榜. 并且该排行榜缓存没有必要一直存在, 浪费空间.
然后记得用双重锁检测
// 伪代码 // redis 里查排行榜 Set<ZSetOperations.TypedTuple<String>> typedTuples = stringRedisTemplate.opsForZSet().reverseRangeWithScores(zSetKey, 0, endIndex); // 判断是否为空 if (CollectionUtils.isEmpty(typedTuples)) { // 如果缓存为空, 则加锁加载缓存, 防止缓存击穿 lock(); try { // 再查一次 typedTuples = stringRedisTemplate.opsForZSet().reverseRangeWithScores(zSetKey, 0, endIndex); if (CollectionUtils.isEmpty(typedTuples)) { // 还是空, 重新加载缓存 result = computeDataFromDB(); // empty return if (CollectionUtils.isEmpty(result)) { return Collections.emptyList(); } typedTuples = loadDataToCache(result); } } finally { unlock(); } }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。