赞
踩
❗缓存,消息队列,分库分表是高并发解决方案三剑客。
设计系统缓存的主要目的是提高系统的性能和可伸缩性,同时减轻底层资源(如数据库、网络)的负载。以下是设计系统缓存的几个主要原因:
总的来说,设计系统缓存是为了提高系统的性能、可扩展性和稳定性,提升用户体验,并降低系统成本。通过合理的缓存设计和策略,可以充分利用缓存技术的优势,优化系统架构和资源利用,使系统更具竞争力和可持续性。
解决复杂高并发系统的缓存设计是一个关键的问题,下面给出一些常见的缓存设计策略和注意事项:
总而言之,核心需解决两大问题:
缓存雪崩、缓存击穿和缓存穿透是常见的缓存相关问题,下面分别介绍如何解决这些问题:
通过为缓存设置随机的过期时间,避免多个缓存同时过期导致的数据库压力集中。
对于热点数据,可以设置其缓存永不过期,确保其始终可用。
在系统启动时或低峰期,提前加载缓存数据,避免在高峰期同时访问数据库。
采用多级缓存,将缓存分布在不同层级,避免单一缓存层级出现故障导致缓存雪崩。
在缓存失效时,通过加锁的方式保证只有一个线程可以访问数据库,其他线程等待该线程完成后从缓存中获取数据。
当查询数据库结果为空时,将空值也缓存起来,设置较短的过期时间,避免多次查询数据库。
布隆过滤器可以判断一个元素是否存在于集合中,用于预先过滤不存在的数据,减少对数据库的查询压力。
在业务层面对请求参数进行校验和合法性检查,过滤掉非法请求,避免对不存在的数据进行缓存查询。
以上是一些常见的解决缓存雪崩、缓存击穿和缓存穿透问题的方法,将针对以上方法详细讲解。
使用多级缓存架构可以有效地提高缓存的稳定性和性能。下面是一个基本的多级缓存架构的实现示例:
❗多级缓存联想到[CPU多级缓存](https://zhuanlan.zhihu.com/p/370057417)。现代计算机系统中的CPU通常包含多级缓存,例如一级缓存(L1 Cache)、二级缓存(L2 Cache)和三级缓存(L3 Cache)。这些缓存层级用于存储CPU频繁访问的数据,以提高计算机系统的性能。
以下是一个简单的示例,展示如何在Java中使用多级缓存来处理电子商务网站的商品详情页:
public class ProductDetailCache { private static final int LOCAL_CACHE_EXPIRATION = 10; // 本地缓存过期时间,单位:秒 private static final int DISTRIBUTED_CACHE_EXPIRATION = 60; // 分布式缓存过期时间,单位:秒 private Cache<String, ProductDetail> localCache; // 本地缓存 private Cache<String, ProductDetail> distributedCache; // 分布式缓存 public ProductDetailCache() { // 初始化本地缓存,使用适合的本地缓存实现,如 Caffeine、Guava Cache 等 localCache = Caffeine.newBuilder() .expireAfterWrite(LOCAL_CACHE_EXPIRATION, TimeUnit.SECONDS) .build(); // 初始化分布式缓存,使用适合的分布式缓存实现,如 Redis、Memcached 等 distributedCache = RedisCacheBuilder.newBuilder() .expireAfterWrite(DISTRIBUTED_CACHE_EXPIRATION, TimeUnit.SECONDS) .build(); } public ProductDetail getProductDetail(String productId) { // 尝试从本地缓存获取商品详情 ProductDetail productDetail = localCache.getIfPresent(productId); if (productDetail != null) { return productDetail; } // 尝试从分布式缓存获取商品详情 productDetail = distributedCache.get(productId); if (productDetail != null) { // 将数据存储到本地缓存 localCache.put(productId, productDetail); return productDetail; } // 从数据库中查询商品详情 productDetail = databaseLookup(productId); if (productDetail != null) { // 将数据存储到本地缓存和分布式缓存 localCache.put(productId, productDetail); distributedCache.put(productId, productDetail); } return productDetail; } private ProductDetail databaseLookup(String productId) { // 查询数据库,获取商品详情数据 // ... // 返回商品详情对象 } }
在上述示例中,我们使用了两个缓存对象,一个是本地缓存(localCache
),另一个是分布式缓存(distributedCache
)。在获取商品详情时,首先尝试从本地缓存中获取数据,如果缓存命中则直接返回;如果本地缓存未命中,则尝试从分布式缓存中获取数据;如果分布式缓存也未命中,则从数据库中查询商品详情,并将结果存储到本地缓存和分布式缓存中。
本地缓存是指将数据存储在应用程序所在的进程或服务器的内存中,以便快速访问和提高系统性能。本地缓存通常是一个轻量级的数据存储,用于临时存储频繁访问或计算昂贵的数据,避免重复的计算或查询操作。
在Java中,常见的本地缓存实现包括:
使用本地缓存时,需要考虑缓存的大小、过期策略、并发访问控制等因素,以确保缓存能够正确地满足系统需求,并避免潜在的问题(如缓存过期、内存溢出等)。
常见的本地缓存实现有 ConcurrentHashMap、Caffeine 和 Guava Cache。以下是它们的优缺点:
ConcurrentHashMap 使用举例:
javaCopy code import java.util.Map; import java.util.concurrent.ConcurrentHashMap; public class Example { public static void main(String[] args) { // 创建 ConcurrentHashMap 实例 Map<String, String> cache = new ConcurrentHashMap<>(); // 向缓存中放入数据 cache.put("key1", "value1"); cache.put("key2", "value2"); // 从缓存中获取数据String value1 = cache.get("key1"); String value2 = cache.get("key2"); System.out.println(value1); // 输出: value1 System.out.println(value2); // 输出: value2 } }
javaCopy code import com.github.benmanes.caffeine.cache.Cache; import com.github.benmanes.caffeine.cache.Caffeine; public class Example { public static void main(String[] args) { // 创建 Caffeine 缓存实例 Cache<String, String> cache = Caffeine.newBuilder() .maximumSize(100) .build(); // 向缓存中放入数据 cache.put("key1", "value1"); cache.put("key2", "value2"); // 从缓存中获取数据String value1 = cache.getIfPresent("key1"); String value2 = cache.getIfPresent("key2"); System.out.println(value1); // 输出: value1 System.out.println(value2); // 输出: value2 } }
需要根据具体的需求和场景选择合适的本地缓存实现。如果需要简单的线程安全缓存,可以使用 ConcurrentHashMap;如果对高性能和丰富特性有更高的要求,可以选择 Caffeine;如果希望简单易用且具备一定可配置性,可以考虑使用 Guava Cache。
Guava Cache 使用举例:
javaCopy code import com.google.common.cache.Cache; import com.google.common.cache.CacheBuilder; import java.util.concurrent.TimeUnit; public class Example { public static void main(String[] args) { // 创建 Guava Cache 实例 Cache<String, String> cache = CacheBuilder.newBuilder() .maximumSize(100) .expireAfterWrite(10, TimeUnit.MINUTES) .build(); // 向缓存中放入数据 cache.put("key1", "value1"); cache.put("key2", "value2"); // 从缓存中获取数据String value1 = cache.getIfPresent("key1"); String value2 = cache.getIfPresent("key2"); System.out.println(value1); // 输出: value1 System.out.println(value2); // 输出: value2 } }
springboot整合redis,springboot2.0以后是使用lettuce(低版本)存在问题,堆外内存溢出
解决方法:
切换到jedis
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
<exclusions>
<exclusion>
<groupId>io.lettuce</groupId>
<artifactId>lettuce-core</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
</dependency>
Redis 的 SETNX
在 Redis 命令行中是这样执行的:
set <key> <value> NX
springboot整合redis
导入pom
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
yml中加入
spring:
redis:
database: 0
host: 127.0.0.1
port: 6379
jedis:
pool:
max-active: 100
max-idle: 10
max-wait: 100000
timeout: 5000
不管是RedisTemplate还是StringRedisTemplate,序列化都有问题,会出现乱码的情况:
在config包下面来设置RedisTemplate的序列化
/** * redis配置缓存 */ @Configuration @EnableAutoConfiguration @EnableCaching public class RedisConfig { @Bean public RedisTemplate<String,Object> redisTemplate(RedisConnectionFactory factory){ RedisTemplate<String,Object> template = new RedisTemplate<>(); //关联 template.setConnectionFactory(factory); //设置key的序列化器 template.setKeySerializer(new StringRedisSerializer()); //设置value的序列化器 template.setValueSerializer(new StringRedisSerializer()); return template; } /** * 基于SpringBoot2 对 RedisCacheManager 的自定义配置 * @param redisConnectionFactory * @return */ @Bean public RedisCacheManager cacheManager(RedisConnectionFactory redisConnectionFactory) { //初始化一个RedisCacheWriter RedisCacheWriter redisCacheWriter = RedisCacheWriter.nonLockingRedisCacheWriter(redisConnectionFactory); //设置CacheManager的值序列化方式为json序列化 RedisSerializer<Object> jsonSerializer = new GenericJackson2JsonRedisSerializer(); RedisSerializationContext.SerializationPair<Object> pair = RedisSerializationContext.SerializationPair.fromSerializer(jsonSerializer); RedisCacheConfiguration defaultCacheConfig = RedisCacheConfiguration.defaultCacheConfig().serializeValuesWith(pair); //设置默认超过时期是1天 defaultCacheConfig.entryTtl(Duration.ofDays(1)); Map<String, RedisCacheConfiguration> redisCacheConfigurationMap = new HashMap<>(); // 需要作缓存在这里加上就加一个put即可 redisCacheConfigurationMap.put("graph", this.getRedisCacheConfigurationWithTtl(1)); // 120秒后失效 //初始化RedisCacheManager return new RedisCacheManager(redisCacheWriter, defaultCacheConfig,redisCacheConfigurationMap); } private RedisCacheConfiguration getRedisCacheConfigurationWithTtl(Integer day) { RedisCacheConfiguration redisCacheConfiguration = RedisCacheConfiguration.defaultCacheConfig(); RedisSerializer<Object> jsonSerializer = new GenericJackson2JsonRedisSerializer(); RedisSerializationContext.SerializationPair<Object> pair = RedisSerializationContext.SerializationPair.fromSerializer(jsonSerializer); redisCacheConfiguration = redisCacheConfiguration.serializeValuesWith( RedisSerializationContext.SerializationPair.fromSerializer(jsonSerializer) ).entryTtl(Duration.ofDays(day)); return redisCacheConfiguration; } }
Spring 提供了四个注解来声明缓存规则。@Cacheable,@CachePut,@CacheEvict,@Caching。
注解 | 描述 |
---|---|
@Cacheable | 在方法调用前,首先先去缓存中找方法的返回值,如果能找到,则返回缓存值,否则就执行这个方法,并将返回值放到缓存 |
@CachePut | 方法调用前不会去找缓存,无论如何都会执行方法,在执行后将缓存放入缓存中 |
@CacheEvict | 清除缓存中的一个或者多个记录,默认方法执行后删除缓存 |
@Caching | 能够同时应用多个缓存注解 |
使用缓存
1、pom
//springcahe
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-cache</artifactId>
</dependency>
//redis
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
2、配置文件
spring.chache.type=redis
3、启动类上添加 @EnableCaching
注解。
自定义配置类
# 使用 Redis 作为缓存组件
spring.cache.type=redis
# 缓存过期时间为 3600s
spring.cache.redis.time-to-live=3600000
# 缓存的键的名字前缀
spring.cache.redis.key-prefix=passjava_
# 是否使用缓存前缀
spring.cache.redis.use-key-prefix=true
# 是否缓存控制,防止缓存穿透
spring.cache.redis.cache-null-values=true
❗分布式锁推荐!!!这是讲解最好分布锁的文章!!!以下内容也是引用该篇文章https://my.oschina.net/u/4499317/blog/5039486
我们先用 Redis 的 SETNX 命令来实现最简单的分布式锁
// 1.先抢占锁
Boolean lock = redisTemplate.opsForValue().setIfAbsent("lock", "123");
if(lock) {
// 2.抢占成功,执行业务
List<TypeEntity> typeEntityListFromDb = getDataFromDB();
// 3.解锁
redisTemplate.delete("lock");
return typeEntityListFromDb;
} else {
// 4.休眠一段时间
sleep(100);
// 5.抢占失败,等待锁释放
return getTypeEntityListByRedisDistributedLock();
}
setnx
占锁成功,业务代码出现异常或者服务器宕机,没有执行删除锁的逻辑,就造成了死锁。
设置锁的设置过期时间,过一段时间后,自动删除锁,这样其他线程就能获取到锁了。
// 1.先抢占锁
Boolean lock = redisTemplate.opsForValue().setIfAbsent("lock", "123");
if(lock) {
// 2.在 10s 以后,自动清理 lock
redisTemplate.expire("lock", 10, TimeUnit.SECONDS);
// 3.抢占成功,执行业务
List<TypeEntity> typeEntityListFromDb = getDataFromDB();
// 4.解锁
redisTemplate.delete("lock");
return typeEntityListFromDb;
}
白银方案看似解决了线程异常或服务器宕机造成的锁未释放的问题,但还是存在其他问题:
因为占锁和设置过期时间是分两步执行的,所以如果在这两步之间发生了异常,则锁的过期时间根本就没有设置成功。
所以和青铜方案有一样的问题:锁永远不能过期。
事务的原子性(Atom)
# 设置某个 key 的值并设置多少毫秒或秒 过期。
set <key> <value> PX <多少毫秒> NX
或
set <key> <value> EX <多少秒> NX
//setIfAbsent("lock", "123", 10, TimeUnit.SECONDS);
// 1.先抢占锁与过期时间
Boolean lock = setIfAbsent("lock", "123", 10, TimeUnit.SECONDS);
if(lock) {
// 3.抢占成功,执行业务
List<TypeEntity> typeEntityListFromDb = getDataFromDB();
// 4.解锁
redisTemplate.delete("lock");
return typeEntityListFromDb;
}
因为锁的编号都叫做 “123”
,用户 A 只认锁编号,看见编号为 “123”
的锁就开,结果把用户 B 的锁打开了,此时用户 B 还未执行完任务,当然生气了。
// 1.生成唯一 id String uuid = UUID.randomUUID().toString(); // 2. 抢占锁 Boolean lock = redisTemplate.opsForValue().setIfAbsent("lock", uuid, 10, TimeUnit.SECONDS); if(lock) { System.out.println("抢占成功:" + uuid); // 3.抢占成功,执行业务 List<TypeEntity> typeEntityListFromDb = getDataFromDB(); // 4.获取当前锁的值 String lockValue = redisTemplate.opsForValue().get("lock"); // 5.如果锁的值和设置的值相等,则清理自己的锁 if(uuid.equals(lockValue)) { System.out.println("清理锁:" + lockValue); redisTemplate.delete("lock"); } return typeEntityListFromDb; } else { System.out.println("抢占失败,等待锁释放"); // 4.休眠一段时间 sleep(100); // 5.抢占失败,等待锁释放 return getTypeEntityListByRedisDistributedLock(); }
上面的线程 A 查询锁和删除锁的逻辑不是原子性
的,所以将查询锁和删除锁这两步作为原子指令操作就可以了。
//这段 Redis 专属脚本
if redis.call("get",KEYS[1]) == ARGV[1]
then
return redis.call("del",KEYS[1])
else
return 0
end
//这段脚本和铂金方案的获取key,删除key的方式很像。先获取 KEYS[1] 的 value,判断 KEYS[1] 的 value 是否和 ARGV[1] 的值相等,如果相等,则删除 KEYS[1]。
分两步:先定义脚本;用 redisTemplate.execute 方法执行脚本。
// 脚本解锁
String script = "if redis.call('get',KEYS[1]) == ARGV[1] then return redis.call('del',KEYS[1]) else return 0 end";
redisTemplate.execute(new DefaultRedisScript<Long>(script, Long.class), Arrays.asList("lock"), uuid);
//而这段 Redis 脚本是由 Redis 内嵌的 Lua 环境执行的,所以又称作 Lua 脚本。
布隆过滤器(Bloom Filter)是一种空间效率高、快速判断元素是否存在的概率型数据结构。它基于哈希函数和位数组实现,并可以用于检索一个元素是否存在于一个集合中。
布隆过滤器的主要特点包括:
使用布隆过滤器时需要注意以下几点:
布隆过滤器适用于需要判断元素是否可能存在于集合中,但对于具体存在性要求不严格的场景,例如缓存穿透的过滤、网页黑名单过滤等。
在Java中,可以使用第三方库实现布隆过滤器,例如 Google Guava 库中提供了 com.google.common.hash.BloomFilter
类。另外,Apache Commons Collections 也提供了布隆过滤器的实现。这些库提供了简单易用的 API,可以方便地创建和使用布隆过滤器。
public class UserServiceImpl { @Autowired UserDAO userDAO; @Autowired RedisCache redisCache; public User findUser(Integer id) { Object object = redisCache.get(Integer.toString(id)); // 缓存中存在,直接返回 if(object != null) { // 检验该对象是否为缓存空对象,是则直接返回null if(object instanceof NullValueResultDO) { return null; } return (User)object; } else { // 缓存中不存在,查询数据库 User user = userDAO.getUser(id); // 存入缓存 if(user != null) { redisCache.put(Integer.toString(id),user); } else { // 将空对象存进缓存 redisCache.put(Integer.toString(id), new NullValueResultDO()); } return user; } } }
//哈希函数 public static class MyHash { private int cap; private int seed; // 初始化数据 public MyHash(int cap, int seed) { this.cap = cap; this.seed = seed; } // 哈希函数 public int hash(String value) { int result = 0; int len = value.length(); for (int i = 0; i < len; i++) { result = seed * result + value.charAt(i); } return (cap - 1) & result; } } //布隆过滤器 public class MyBloomFilter { // 布隆过滤器长度 private static final int SIZE = 2 << 10; // 模拟实现不同的哈希函数 private static final int[] num= new int[] {5, 19, 23, 31,47, 71}; // 初始化位数组 private BitSet bits = new BitSet(SIZE); // 用于存储哈希函数 private MyHash[] function = new MyHash[num.length]; // 初始化哈希函数 public MyBloomFilter() { for (int i = 0; i < num.length; i++) { function [i] = new MyHash(SIZE, num[i]); } } // 存值Api public void add(String value) { // 对存入得值进行哈希计算 for (MyHash f: function) { // 将为数组对应的哈希下标得位置得值改为1 bits.set(f.hash(value), true); } } // 判断是否存在该值得Api public boolean contains(String value) { if (value == null) { return false; } boolean result= true; for (MyHash f : func) { result= result&& bits.get(f.hash(value)); } return result; } }
<dependency> <groupId>com.google.guava</groupId> <artifactId>guava</artifactId> <version>27.0.1-jre</version> </dependency> // BloomFilter.create(Funnels.stringFunnel(Charset.forName("utf-8")),size, 误判率); public static void MyBloomFilterSysConfig { @Autowired OrderMapper orderMapper // 1.创建布隆过滤器 第二个参数为预期数据量10000000,第三个参数为错误率0.00001 BloomFilter<CharSequence> bloomFilter = BloomFilter.create(Funnels.stringFunnel(Charset.forName("utf-8")),10000000, 0.00001); // 2.获取所有的订单,并将订单的id放进布隆过滤器里面 List<Order> orderList = orderMapper.findAll() for (Order order;orderList ) { Long id = order.getId(); bloomFilter.put("" + id); } }
BloomFilter<String> bloomFilter = redisson.getBloomFilter("phoneList");
//初始化布隆过滤器:预计元素为100000000L,误差率为3%
bloomFilter.tryInit(100000000L,0.03);
//将号码10086插入到布隆过滤器中
bloomFilter.add("10086");
本章主要是针对与解决复杂高并发系统缓存的缓存三大问题进行讲解,后续将针对如何解决缓存的一致性进行论述。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。