赞
踩
运行起来前端后端
前端的nginx运行一定注意文件路径不能有中文,否则不行
后端记得修改application.yaml文件就可以
server: port: 8081 spring: application: name: hmdp datasource: driver-class-name: com.mysql.jdbc.Driver url: jdbc:mysql://localhost:3306/hmdp?useSSL=false&serverTimezone=UTC username: root password: 123456 redis: host: localhost port: 6379 lettuce: pool: max-active: 10 max-idle: 10 min-idle: 1 time-between-eviction-runs: 10s jackson: default-property-inclusion: non_null # JSON处理时忽略非空字段 mybatis-plus: type-aliases-package: com.hmdp.entity # 别名扫描包 logging: level: com.hmdp: debug
@Override
public Result code(String phone, HttpSession session) {
if (RegexUtils.isPhoneInvalid(phone)) {
return Result.fail("请输入正确的手机号");
}
Integer code = ValidateCodeUtils.generateValidateCode(6);
SMSUtils.sendMessage("苏一拉", "模板", phone, code.toString());
session.setAttribute("code", code);
log.info("code:" + code);
return Result.ok("短信发送成功");
}
@Override public Result login(LoginFormDTO loginForm, HttpSession session) { String phone = loginForm.getPhone(); String code = loginForm.getCode(); String password = loginForm.getPassword(); //1.校验手机号 if (RegexUtils.isPhoneInvalid(phone)) { return Result.fail("手机格式不正确"); } //2.校验验证码 String beforeCode = session.getAttribute("code").toString(); if (code == null || !beforeCode.equals(code)) { return Result.fail("验证码错误"); } //3.根据手机号查询用户 User user = query().eq(loginForm.getPhone() != null, "phone", phone).one(); if (user == null) { user=saveUserWithPhone(phone); } session.setAttribute("user", user); return Result.ok(); }
阿里云发送短信的两个工具类
package com.hmdp.utils; import com.aliyuncs.DefaultAcsClient; import com.aliyuncs.IAcsClient; import com.aliyuncs.dysmsapi.model.v20170525.SendSmsRequest; import com.aliyuncs.dysmsapi.model.v20170525.SendSmsResponse; import com.aliyuncs.exceptions.ClientException; import com.aliyuncs.profile.DefaultProfile; /** * 短信发送工具类 */ public class SMSUtils { /** * 发送短信 * @param signName 签名 * @param templateCode 模板 * @param phoneNumbers 手机号 * @param param 参数 */ public static void sendMessage(String signName, String templateCode,String phoneNumbers,String param){ DefaultProfile profile = DefaultProfile.getProfile("cn-hangzhou", "", ""); IAcsClient client = new DefaultAcsClient(profile); SendSmsRequest request = new SendSmsRequest(); request.setSysRegionId("cn-hangzhou"); request.setPhoneNumbers(phoneNumbers); request.setSignName(signName); request.setTemplateCode(templateCode); request.setTemplateParam("{\"code\":\""+param+"\"}"); try { SendSmsResponse response = client.getAcsResponse(request); System.out.println("短信发送成功"); }catch (ClientException e) { e.printStackTrace(); } } }
package com.hmdp.utils; import java.util.Random; /** * 随机生成验证码工具类 */ public class ValidateCodeUtils { /** * 随机生成验证码 * @param length 长度为4位或者6位 * @return */ public static Integer generateValidateCode(int length){ Integer code =null; if(length == 4){ code = new Random().nextInt(9999);//生成随机数,最大为9999 if(code < 1000){ code = code + 1000;//保证随机数为4位数字 } }else if(length == 6){ code = new Random().nextInt(999999);//生成随机数,最大为999999 if(code < 100000){ code = code + 100000;//保证随机数为6位数字 } }else{ throw new RuntimeException("只能生成4位或6位数字验证码"); } return code; } /** * 随机生成指定长度字符串验证码 * @param length 长度 * @return */ public static String generateValidateCode4String(int length){ Random rdm = new Random(); String hash1 = Integer.toHexString(rdm.nextInt()); String capstr = hash1.substring(0, length); return capstr; } }
拦截器LoginIntercetpor
package com.hmdp.utils; import com.hmdp.dto.UserDTO; import com.hmdp.entity.User; import org.springframework.web.servlet.HandlerInterceptor; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; import javax.servlet.http.HttpSession; public class LoginIntercetpor implements HandlerInterceptor { @Override public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception { // 1.获取session HttpSession session = request.getSession(); // 2.获取session中的用户 Object user = session.getAttribute("user"); // 3. 判断用户是否存在 if(user==null){ // 4.不存在,拦截,返回401状态码 response.setStatus(401); return false; } // 5.存在,保存用户信息到ThreadLocal UserHolder.saveUser((UserDTO) user); // 6.放行 return true; } @Override public void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler, Exception ex) throws Exception { UserHolder.removeUser(); } }
配置拦截器
package com.hmdp.config; import com.hmdp.utils.LoginIntercetpor; import org.springframework.context.annotation.Configuration; import org.springframework.web.servlet.config.annotation.InterceptorRegistry; import org.springframework.web.servlet.config.annotation.WebMvcConfigurer; @Configuration public class MvcConfig implements WebMvcConfigurer { @Override public void addInterceptors(InterceptorRegistry registry) { registry.addInterceptor(new LoginIntercetpor()) .excludePathPatterns( // 不用拦截的请求 "/shop/**", "/voucher/**", "/shop-type/**", "/upload/**", "/blog/hot", "/user/code", "/user/login" ); } }
修改后
package com.hmdp.service.impl; import cn.hutool.core.bean.BeanUtil; import cn.hutool.core.bean.copier.CopyOptions; import cn.hutool.core.lang.UUID; import cn.hutool.core.util.RandomUtil; import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; import com.hmdp.dto.LoginFormDTO; import com.hmdp.dto.Result; import com.hmdp.dto.UserDTO; import com.hmdp.entity.User; import com.hmdp.mapper.UserMapper; import com.hmdp.service.IUserService; import com.hmdp.utils.RegexUtils; import com.hmdp.utils.SMSUtils; import com.hmdp.utils.ValidateCodeUtils; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.BeanUtils; import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.stereotype.Service; import javax.annotation.Resource; import javax.servlet.http.HttpSession; import java.util.HashMap; import java.util.Map; import java.util.Random; import java.util.concurrent.TimeUnit; import static com.hmdp.utils.RedisConstants.*; import static com.hmdp.utils.SystemConstants.USER_NICK_NAME_PREFIX; /** * <p> * 服务实现类 * </p> * * @author 虎哥 * @since 2021-12-22 */ @Slf4j @Service public class UserServiceImpl extends ServiceImpl<UserMapper, User> implements IUserService { @Resource private StringRedisTemplate stringRedisTemplate; @Override public Result code(String phone,HttpSession session) { if (RegexUtils.isPhoneInvalid(phone)) { return Result.fail("请输入正确的手机号"); } Integer code = ValidateCodeUtils.generateValidateCode(4); SMSUtils.sendMessage("苏一拉", "模板", phone, code.toString()); stringRedisTemplate.opsForValue().set(LOGIN_CODE_KEY+phone,code.toString(),LOGIN_CODE_TTL,TimeUnit.MINUTES); log.info("code:" + code); return Result.ok("短信发送成功"); } @Override public Result login(LoginFormDTO loginForm, HttpSession session) { String phone = loginForm.getPhone(); //1.校验手机号 if (RegexUtils.isPhoneInvalid(phone)) { return Result.fail("手机格式不正确"); } //2.校验验证码 String beforeCode = stringRedisTemplate.opsForValue().get(LOGIN_CODE_KEY+phone); String code = loginForm.getCode(); if (code == null || !beforeCode.equals(code)) { return Result.fail("验证码错误"); } //3.根据手机号查询用户 User user = query().eq(loginForm.getPhone() != null, "phone", phone).one(); if (user == null) { user=saveUserWithPhone(phone); } String token = UUID.randomUUID().toString(true); UserDTO userDTO = BeanUtil.copyProperties(user, UserDTO.class); Map<String, Object> stringObjectMap = BeanUtil.beanToMap(userDTO,new HashMap<>(), CopyOptions.create() .setIgnoreNullValue(true) .setFieldValueEditor((fieldName,fieldValue) -> fieldValue.toString())); stringRedisTemplate.opsForHash().putAll(LOGIN_USER_KEY+token,stringObjectMap); stringRedisTemplate.expire(LOGIN_USER_KEY+token,LOGIN_USER_TTL,TimeUnit.MINUTES); return Result.ok(token); } private User saveUserWithPhone(String phone) { User user = new User(); user.setPhone(phone); user.setNickName(USER_NICK_NAME_PREFIX + RandomUtil.randomString(10)); save(user); return user; } }
第一个拦截器
这个时候我们需要加一个拦截器,也就是双层拦截器,第一个拦截器拦截所有请求,第二个拦截器,拦截登录部分请求,设置两个拦截器的原因就是不管用户有没有登录,我们都会进行刷新token的有效期
package com.hmdp.utils; import cn.hutool.core.bean.BeanUtil; import cn.hutool.core.util.StrUtil; import com.hmdp.dto.UserDTO; import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.web.servlet.HandlerInterceptor; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; import java.util.Map; import java.util.concurrent.TimeUnit; import static com.hmdp.utils.RedisConstants.LOGIN_USER_KEY; import static com.hmdp.utils.RedisConstants.LOGIN_USER_TTL; public class RefreshTokenInterceptor implements HandlerInterceptor { private StringRedisTemplate stringRedisTemplate; public RefreshTokenInterceptor(StringRedisTemplate stringRedisTemplate) { this.stringRedisTemplate = stringRedisTemplate; } @Override public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception { // 1.获取token String token = request.getHeader("authorization"); if(StrUtil.isBlank(token)){ return true; } String key = LOGIN_USER_KEY + token; // 2.获取token中的用户信息 Map<Object, Object> user = stringRedisTemplate.opsForHash().entries(key); // 3. 判断用户是否存在 if(user.isEmpty()){ return true; } UserDTO userDTO = BeanUtil.fillBeanWithMap(user,new UserDTO(),false); // 5.存在,保存用户信息到ThreadLocal UserHolder.saveUser(userDTO); stringRedisTemplate.expire(key,LOGIN_USER_TTL, TimeUnit.MINUTES); // 6.放行 return true; } @Override public void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler, Exception ex) throws Exception { UserHolder.removeUser(); } }
第二个拦截器
package com.hmdp.utils; import cn.hutool.core.bean.BeanUtil; import cn.hutool.core.util.StrUtil; import com.hmdp.dto.UserDTO; import lombok.extern.slf4j.Slf4j; import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.web.servlet.HandlerInterceptor; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; import java.util.Map; import java.util.concurrent.TimeUnit; import static com.hmdp.utils.RedisConstants.LOGIN_USER_KEY; import static com.hmdp.utils.RedisConstants.LOGIN_USER_TTL; public class LoginIntercetpor implements HandlerInterceptor { @Override public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception { if(UserHolder.getUser() == null){ response.setStatus(401); return false; } return true; } @Override public void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler, Exception ex) throws Exception { UserHolder.removeUser(); } }
配置拦截器
package com.hmdp.config; import com.hmdp.utils.LoginIntercetpor; import com.hmdp.utils.RefreshTokenInterceptor; import org.springframework.context.annotation.Configuration; import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.web.servlet.config.annotation.InterceptorRegistry; import org.springframework.web.servlet.config.annotation.WebMvcConfigurer; import javax.annotation.Resource; @Configuration public class MvcConfig implements WebMvcConfigurer { @Resource StringRedisTemplate stringRedisTemplate; @Override public void addInterceptors(InterceptorRegistry registry) { registry.addInterceptor(new RefreshTokenInterceptor(stringRedisTemplate)).order(0); registry.addInterceptor(new LoginIntercetpor()) .excludePathPatterns( "/shop/**", "/voucher/**", "/shop-type/**", "/upload/**", "/blog/hot", "/user/code", "/user/login" ).order(1); } }
什么是缓存
缓存就是数据交换的缓冲区 (称作Cache[kaej]) 是存贮数据的临时地方,一般读写性能较高。
@Service public class ShopServiceImpl extends ServiceImpl<ShopMapper, Shop> implements IShopService { @Resource StringRedisTemplate stringRedisTemplate; @Override public Result queryShowId(Long id) { String key = CACHE_SHOP_KEY + id; String shopJson = stringRedisTemplate.opsForValue().get(key); if(StrUtil.isNotEmpty(shopJson)){ Shop shop = JSONUtil.toBean(shopJson, Shop.class); return Result.ok(shop); } Shop shop = getById(id); if(shop == null){ Result.fail("店铺不存在"); } stringRedisTemplate.opsForValue().set(key,JSONUtil.toJsonStr(shop)); return Result.ok(shop); } }
@Override public Result queryList() { String typeKey= SHOP_TYPE_LIST; //从redis中查询 Long typeListSize = stringRedisTemplate.opsForList().size(typeKey); //redis存在数据 if (typeListSize!=null&&typeListSize!=0){ List<String> typeJsonList = stringRedisTemplate.opsForList().range(typeKey, 0, typeListSize-1); List<ShopType> typeList=new ArrayList<>(); for (String typeJson : typeJsonList) { typeList.add(JSONUtil.toBean(typeJson,ShopType.class)); } return Result.ok(typeList); } //redis不存在数据 查询数据库 List<ShopType> typeList = query().orderByAsc("sort").list(); if (typeList==null){ //数据库不存在数据 return Result.fail("发生错误"); } //转换 List<String> typeJsonList=new ArrayList<>(); for (ShopType shopType : typeList) { typeJsonList.add(JSONUtil.toJsonStr(shopType)); } //数据库存在数据 写入redis stringRedisTemplate.opsForList().rightPushAll(typeKey,typeJsonList); //返回数据 return Result.ok(typeList); }
业务场景:
●低一致性需求:使用内存淘汰机制。例如店铺类型的查询缓存
●高一致性需求:主动更新,并以超时剔除作为兜底方案。例如店铺详情查询的缓存
综上所述,使用策略(Cache Aside Pattern)最优
正常情况这个时候可以保持缓存和数据库一致
特殊情况这个时候不能保持缓存和数据库一致
正常情况这个时候可以保持缓存和数据库一致
特殊情况这个时候不能保持缓存和数据库一致
缓存更新策略的最佳实践方案:
1.低一致性需求:使用Redis自带的内存淘汰机制
2.高一致性需求:主动更新,并以超时剔除作为兜底方案
◆读操作:
缓存命中则直接返回
缓存未命中则查询数据库,并写入缓存,设定超时时间
◆写操作:
先写数据库,然后再删除缓存
要确保数据库与缓存操作的原子性
缓存穿透是指客户端请求的数据在缓存中和数据库中都不存在,这样缓存永远不会生效,这些请求都会打到数据库。
常见的解决方案有两种:
缓存空对象
布隆过滤
缓存穿透产生的原因是什么?
用户请求的数据在缓存中和数据库中都不存在,不断发起这样的请求 ,给数据库带来巨大压力
缓存穿透的解决方案有哪些?
缓存击穿问题也叫热点Key问题,就是一个被高并发访问并且缓存重建业务较复杂的key突然失效了,无数的请求访问会在瞬间给数据库带来巨大的冲击。
互斥锁解决缓存击穿
添加互斥锁
/** * 添加锁 * @param key * @return */ private Boolean tryLock(String key){ Boolean flag = stringRedisTemplate.opsForValue().setIfAbsent(key, "1", 10, TimeUnit.SECONDS); return BooleanUtil.isTrue(flag); } /** * 去除锁 * @param key */ private void unLock(String key){ stringRedisTemplate.delete(key); }
互斥锁解决缓存穿透
/** * 互斥锁解决缓存穿透 * @param id * @return */ private Shop queryWithMutex(Long id){ String key = CACHE_SHOP_KEY + id; String keyLock = "LOCK_SHOP_KEY"+id;; Shop shop = null; try { //1.查找缓存 String shopJson = stringRedisTemplate.opsForValue().get(key); //2.命中缓存 if(StrUtil.isNotBlank(shopJson)){ return JSONUtil.toBean(shopJson, Shop.class); } //只剩下“”和null //判断是否是空值 if(shopJson != null){ return null; } //3.未命中 //3.1获取互斥锁 //3.2没有获取成功 Boolean tryLock = tryLock(keyLock); if(!tryLock){ Thread.sleep(50); return queryWithMutex(id); } //3.3获取锁成功 shop = getById(id); if(shop == null){ stringRedisTemplate.opsForValue().set(key,"",CACHE_NULL_TTL, TimeUnit.MINUTES); return null; } stringRedisTemplate.opsForValue().set(key,JSONUtil.toJsonStr(shop),CACHE_SHOP_TTL, TimeUnit.MINUTES); } catch (InterruptedException e) { throw new RuntimeException(e); } finally { unLock(keyLock); } return shop; }
逻辑过期解决缓存击穿
当逻辑过期,将数据保存在redis
/**
* 将数据保存redis
* @param id
* @param expireSeconds
*/
public void shopSaveRedis(Long id, Long expireSeconds){
//获取店铺数据
Shop shop = getById(id);
//封装逻辑过期时间
RedisData redisData = new RedisData();
redisData.setData(shop);
redisData.setExpireTime(LocalDateTime.now().plusSeconds(expireSeconds));
//逻辑写入redis
stringRedisTemplate.opsForValue().set(CACHE_SHOP_KEY + id,JSONUtil.toJsonStr(redisData));
}
逻辑过期解决缓存击穿的代码
/** * 逻辑过期解决缓存击穿 * @param id * @return */ private Shop queryWithLogicalExpiration(Long id){ String key = CACHE_SHOP_KEY + id; String shopJson = stringRedisTemplate.opsForValue().get(key); //1.未命中 if(StrUtil.isBlank(shopJson)){ return null; } //2.命中 RedisData redisData = JSONUtil.toBean(shopJson, RedisData.class); Shop shop = JSONUtil.toBean((JSONObject) redisData.getData(), Shop.class); LocalDateTime expireTime = redisData.getExpireTime(); //2.1 判断缓存是否过期 if(expireTime.isAfter(LocalDateTime.now())){ //2.2 未过期,返回商铺信息 return shop; } //3.过期 //3.1尝试获取互斥锁 String keyLock = "LOCK_SHOP_KEY"+id; Boolean tryLock = tryLock(keyLock); if(tryLock){ //2.1 判断缓存是否过期 if(expireTime.isAfter(LocalDateTime.now())){ //2.2 未过期,返回商铺信息 return shop; } //3.2 获取锁成功 //3.4开启独立线程 CACHE_REBUILD_EXECUTOP.submit(() ->{ try { //3.5将信息写入缓存 this.shopSaveRedis(id,20L); } catch (Exception e) { throw new RuntimeException(e); }finally { unLock(keyLock); } }); return shop; } //3.3获取锁失败 return shop; }
互斥锁和逻辑过期的优缺点
缓存雪崩是指在同一时段大量的缓存key同时失效或者Redis服务宕机,导致大量请求到达数据库,带来巨大压力。
解决方案:
给不同的Key的TTL添加随机值
利用Redis集群提高服务的可用性
给缓存业务添加降级限流策略
给业务添加多级缓存
基于StringRedisTemplate:封装一个缓存工具类,满足下列需求:
方法1:将任意Java对象序列化为json并存储在string类型的key中,并且可以设置TTL过期时间
方法2:将任意Java对象序列化为json并存储在string类型的key中,并且可以设置逻辑过期时间,用于处理缓存击穿问题
方法3:根据指定的key查询缓存,并反序列化为指定类型,利用缓存空值的方式解决缓存穿透问题
方法4:根据指定的key查询缓存,并反序列化为指定类型,需要利用逻辑过期解决缓存击穿问题
@Component @Slf4j public class CacheClient { @Resource private StringRedisTemplate stringRedisTemplate; public void set(String key, Object value, Long time, TimeUnit unit) { stringRedisTemplate.opsForValue().set(key, JSONUtil.toJsonStr(value), time, unit); } public void setWithLogicalExpire(String key, Object value, Long time, TimeUnit unit) { //设置逻辑过期 RedisData redisData = new RedisData(); redisData.setExpireTime(LocalDateTime.now().plusSeconds(unit.toSeconds(time))); redisData.setData(value); stringRedisTemplate.opsForValue().set(key, JSONUtil.toJsonStr(redisData)); } public <R, ID> R queryWithPassThrough(String prefix, ID id, Class<R> type, Function<ID, R> dbFallBack, Long time, TimeUnit unit) { String key = prefix + id; String Json = stringRedisTemplate.opsForValue().get(key); if (StrUtil.isNotBlank(Json)) { return JSONUtil.toBean(Json, type); } //只剩下“”和null if (Json != null) { return null; } R r = dbFallBack.apply(id); if (r == null) { stringRedisTemplate.opsForValue().set(key, "", CACHE_NULL_TTL, TimeUnit.MINUTES); return null; } this.set(key, r, time, unit); return r; } private static final ExecutorService CACHE_REBUILD_EXECUTOP = Executors.newFixedThreadPool(10); public <R, ID> R queryWithLogicalExpiration(String prefix, ID id, Class<R> type, Function<ID, R> dbFallBack, Long time, TimeUnit unit) { String key = prefix + id; String Json = stringRedisTemplate.opsForValue().get(key); //1.未命中 if (StrUtil.isBlank(Json)) { return null; } //2.命中 RedisData redisData = JSONUtil.toBean(Json, RedisData.class); R r = JSONUtil.toBean((JSONObject) redisData.getData(), type); LocalDateTime expireTime = redisData.getExpireTime(); //2.1 判断缓存是否过期 if (expireTime.isAfter(LocalDateTime.now())) { //2.2 未过期,返回商铺信息 return r; } //3.过期 //3.1尝试获取互斥锁 String keyLock = "LOCK_SHOP_KEY" + id; Boolean tryLock = tryLock(keyLock); if (tryLock) { //2.1 判断缓存是否过期 if (expireTime.isAfter(LocalDateTime.now())) { //2.2 未过期,返回商铺信息 return r; } //3.2 获取锁成功 //3.4开启独立线程 CACHE_REBUILD_EXECUTOP.submit(() -> { try { //3.5将信息写入缓存 R r1 = dbFallBack.apply(id); this.setWithLogicalExpire(key,r1,time,unit); } catch (Exception e) { throw new RuntimeException(e); } finally { unLock(keyLock); } }); return r; } //3.3获取锁失败 return r; } /** * 添加锁 * @param key * @return */ private Boolean tryLock(String key){ Boolean flag = stringRedisTemplate.opsForValue().setIfAbsent(key, "1", 10, TimeUnit.SECONDS); return BooleanUtil.isTrue(flag); } /** * 去除锁 * @param key */ private void unLock(String key){ stringRedisTemplate.delete(key); } }
当用户抢购时,就会生成订单并保存到tb_voucher_order:这张表中,而订单表如果使用数据库自增ID就存在一些问题:
id的规律性太明显
受单表数据量的限制
全局引入ID生成器,是一种在分布式系统下用来生成全局唯一ID的工具,一般要满足下列特性:
为了增加ID的安全性,我们可以不直接使用Redis自增的数值,而是拼接一些其它信息:
ID的组成部分:
◆符号位:1bit,永远为0
◆时间戳:31bit,以秒为单位,可以使用69年
◆序列号:32bit,秒内的计数器,支持每秒产生2^32个不同ID
@Component @Slf4j public class RedisIdWorker { private StringRedisTemplate stringRedisTemplate; public RedisIdWorker(StringRedisTemplate stringRedisTemplate) { this.stringRedisTemplate = stringRedisTemplate; } private static final Long BEGIN_TIMESTAMP= 1672531201L; private static final int COUNT_BITS= 32; /** * 全局唯一ID=符号位+时间戳+序列号 * @param keyProfix * @return */ public long nextId(String keyProfix){ //1.生成时间戳 LocalDateTime now = LocalDateTime.now(); log.info(now.toString()); long second = now.toEpochSecond(ZoneOffset.UTC); long timestamp = second-BEGIN_TIMESTAMP; //2.生成序列号 String format = now.format(DateTimeFormatter.ofPattern("yyyy:MM:dd")); long count = stringRedisTemplate.opsForValue().increment("icr"+keyProfix + format); //3.拼接并返回 return timestamp << COUNT_BITS | count; } }
验证是否成功
@Resource private RedisIdWorker redisIdWorker; private ExecutorService es = Executors.newFixedThreadPool(500); @Test public void testIdWorker() throws InterruptedException { CountDownLatch latch = new CountDownLatch(300); Runnable task = () -> { for (int i = 0; i < 100; i++) { long id = redisIdWorker.nextId("order"); System.out.println("id = " + id); } latch.countDown(); }; long begin = System.currentTimeMillis(); for (int i = 0; i < 300; i++) { es.submit(task); } latch.await(); long end = System.currentTimeMillis(); System.out.println("time = " + (end - begin)); }
使用postman进行添加秒杀卷
{
"shopId": 2,
"tit1e": "100元代金券",
"subTitle": "周一至周日均可使用",
"rules": "全场通用\\n无需预约列\n可无限叠加\\不兑现、不找零\n仅限堂食",
"payValue": 8000,
"actualValue": 10000,
"type": 1,
"stock": 100,
"beginTime": "2023-06-01 00:00:00",
"endTime": "2023-06-03 00:00:00"
}
使用postman发送
下单时需要判断两点:
秒杀是否开始或结束,如果尚未开始或已经结束则无法下单
库存是否充足,不足则无法下单
@Override @Transactional public Result seckillVoucher(Long voucherId) { //1.查询优惠卷的信息 SeckillVoucher serviceById = seckillVoucherService.getById(voucherId); //2.判断秒杀是否开始 LocalDateTime beginTime = serviceById.getBeginTime(); if(beginTime.isAfter(LocalDateTime.now())){ return Result.fail("秒杀尚未开始"); } //3.判断秒杀是否结束 LocalDateTime endTime = serviceById.getEndTime(); if(endTime.isBefore(LocalDateTime.now())){ return Result.fail("秒杀已经结束"); } //4.判断库存是否充足 Integer stock = serviceById.getStock(); //5.库存不足,返回 if(stock < 1){ return Result.fail("库存不足"); } //6.库存充足 //6.1 扣减库存 boolean update = seckillVoucherService.update() .setSql("stock = stock -1") // set stock = stock - 1 .eq("voucher_id", voucherId) // .gt("stock",0)//where id = ? and stock > 0 .update(); if(!update){ return Result.fail("更新失败"); } // Long userId = UserHolder.getUser().getId(); // synchronized (userId.toString().intern()){ // IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy(); // return proxy.createVoucherOrder(voucherId); // } //6.2 创建订单 VoucherOrder voucherOrder = new VoucherOrder(); long orderId = redisIdWorker.nextId("order"); voucherOrder.setId(orderId); Long userId = UserHolder.getUser().getId(); voucherOrder.setVoucherId(voucherId); voucherOrder.setUserId(userId); save(voucherOrder); return Result.ok(orderId); }
提醒:这个过程需要进行多表操作,所以需要事务@Transactional,同时成功同时失败
超卖问题,正常情况,线程一个一个进行,但是计算机不知道,他会发生并行问题,其实就是多线程穿插
这种情况就是多线程并发穿插
超卖问题是典型的多线程安全问题,针对这一问题的常见解决方案就是加锁:
悲观锁
认为线程安全问题一定会发生,因此在操作数据之前先获取锁,确保线程串行执行。例如Synchronized、Lock都属于悲观锁
乐观锁
认为线程安全问题不一定会发生,因此不加锁,只是在更新数据时去判断有没有其它线程对数据做了修改。如果没有修改则认为是安全的,自己才更新数据。如果已经被其它线程修改说明发生了安全问题,此时可以重试或异常。
0
乐观锁:CAS(Compare And Swap,比较与交换)算法
下面我们使用乐观锁
boolean update = seckillVoucherService.update()
.setSql("stock = stock -1") // set stock = stock - 1
.eq("voucher_id", voucherId)
.gt("stock",0)//where id = ? and stock > 0
.update();
只需要库存大于0就可以
超卖这样的线程安全问题,解决方案有哪些?
1.悲观锁:添加同步锁,让线程串行执行
优点:简单粗暴
缺点:性能一般
2.乐观锁:不加锁,在更新时判断是否有其它线程在修改
·优点:性能好
·缺点:存在成功率低的问题
@Override public Result seckillVoucher(Long voucherId) { //1.查询优惠卷的信息 SeckillVoucher serviceById = seckillVoucherService.getById(voucherId); //2.判断秒杀是否开始 LocalDateTime beginTime = serviceById.getBeginTime(); if(beginTime.isAfter(LocalDateTime.now())){ return Result.fail("秒杀尚未开始"); } //3.判断秒杀是否结束 LocalDateTime endTime = serviceById.getEndTime(); if(endTime.isBefore(LocalDateTime.now())){ return Result.fail("秒杀已经结束"); } //4.判断库存是否充足 Integer stock = serviceById.getStock(); //5.库存不足,返回 if(stock < 1){ return Result.fail("库存不足"); } Long userId = UserHolder.getUser().getId(); //其实就是悲观锁 synchronized (userId.toString().intern()){ IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy(); return proxy.createVoucherOrder(voucherId); } } @Transactional public Result createVoucherOrder(Long voucherId){ //一人一单 Long userId = UserHolder.getUser().getId(); //查询订单 int count = query().eq("user_id", userId).eq("voucher_id", voucherId).count(); if(count>0){ return Result.fail("每个用户只能抢一份"); } boolean update = seckillVoucherService.update() .setSql("stock = stock -1") // set stock = stock - 1 .eq("voucher_id", voucherId) .gt("stock",0)//where id = ? and stock > 0 采用了一个乐观锁中的CAS .update(); if(!update){ return Result.fail("更新失败"); } //判断是否存在 //6.2 创建订单 VoucherOrder voucherOrder = new VoucherOrder(); long orderId = redisIdWorker.nextId("order"); voucherOrder.setVoucherId(voucherId); voucherOrder.setUserId(userId); voucherOrder.setId(orderId); save(voucherOrder); return Result.ok(orderId); }
锁释放和事务提交之间依然会有线程并发问题,锁如果在方法里面,当锁释放以后,又来了一个请求,但是事务还未提交,就会有脏读问题,所以将锁加在方法返回之后。
为什么要用代理对象?
注解式事务是通过动态代理实现的,如果直接写return this.createVoucherOrder(voucherId);他们不是同一个对象,所以你也要先获取他的动态代理对象,才能令事务生效,这也是事务失效的常见场景之一。
端口直接在配置文件中修改
接下来需要修改nginx配置文件
worker_processes 1; events { worker_connections 1024; } http { include mime.types; default_type application/json; sendfile on; keepalive_timeout 65; server { listen 8080; server_name localhost; # 指定前端项目所在的位置 location / { root html/hmdp; index index.html index.htm; } error_page 500 502 503 504 /50x.html; location = /50x.html { root html; } location /api { default_type application/json; #internal; keepalive_timeout 30s; keepalive_requests 1000; #支持keep-alive proxy_http_version 1.1; rewrite /api(/.*) $1 break; proxy_pass_request_headers on; #more_clear_input_headers Accept-Encoding; proxy_next_upstream error timeout; # proxy_pass http://127.0.0.1:8081; proxy_pass http://backend; } } upstream backend { server 127.0.0.1:8081 max_fails=5 fail_timeout=10s weight=1; server 127.0.0.1:8082 max_fails=5 fail_timeout=10s weight=1; } }
这样就实现了两个服务器的集群,虽然少也是集群,采取的是默认轮询请求的方式。
这个时候单进程多线程的锁就会失效。
有多个JVM的情况下,我们原先的JVM里面的锁失效,线程一样会并行,不会被锁住。
分布式锁:满足分布式系统或集群模式下多进程可见并且互斥的锁。
分布式锁的核心是实现多进程之间互斥,互斥锁是多线程之间互斥,范围不同,而满足这一点的方式有很多,常见的有三种:
public class SimpleRedisLock implements ILock{ private StringRedisTemplate stringRedisTemplate; private String name; public SimpleRedisLock(StringRedisTemplate stringRedisTemplate, String name) { this.stringRedisTemplate = stringRedisTemplate; this.name = name; } public static final String KEY_PREFIX = "LOCK:"; // public static final String ID_PREFIX = UUID.randomUUID().toString(true) + "-"; @Override public boolean tryLock(long timeoutSec) { UUID.randomUUID().toString(true); //获取线程标识 long id = Thread.currentThread().getId(); Boolean lock = stringRedisTemplate.opsForValue().setIfAbsent(KEY_PREFIX + name, id + "", timeoutSec, TimeUnit.SECONDS); return Boolean.TRUE.equals(lock); } @Override public void unLock() { // //获取线程标识 // String id = ID_PREFIX + Thread.currentThread().getId(); // String key = stringRedisTemplate.opsForValue().get(KEY_PREFIX + name); // if(key.equals(id)){ // stringRedisTemplate.delete(KEY_PREFIX + name); // } stringRedisTemplate.delete(KEY_PREFIX + name); }
@Override public Result seckillVoucher(Long voucherId) { //1.查询优惠卷的信息 SeckillVoucher serviceById = seckillVoucherService.getById(voucherId); //2.判断秒杀是否开始 LocalDateTime beginTime = serviceById.getBeginTime(); if(beginTime.isAfter(LocalDateTime.now())){ return Result.fail("秒杀尚未开始"); } //3.判断秒杀是否结束 LocalDateTime endTime = serviceById.getEndTime(); if(endTime.isBefore(LocalDateTime.now())){ return Result.fail("秒杀已经结束"); } //4.判断库存是否充足 Integer stock = serviceById.getStock(); //5.库存不足,返回 if(stock < 1){ return Result.fail("库存不足"); } Long userId = UserHolder.getUser().getId(); SimpleRedisLock simpleRedisLock = new SimpleRedisLock(stringRedisTemplate, "order:" + userId); //获取锁1200秒 boolean tryLock = simpleRedisLock.tryLock(1200); if(!tryLock){ return Result.fail("同一用户不能重复下单"); } try { IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy(); return proxy.createVoucherOrder(voucherId); }finally { simpleRedisLock.unLock(); } }
如图所示,当线程1业务阻塞导致锁的时间到了,其他线程就可能乘虚而入,获取锁,线程二执行业务时,线程1执行业务完毕将锁释放删除了,就会导致线程三也去获取锁,从而导致分布式锁失败解决办法就是在删除锁时,添加标识去验证
需求:修改之前的分布式锁实现,满足:
1.在获取锁时存入线程标示(可以用UUD表示)
2.在释放锁时先获取锁中的线程标示,判断是否与当前线程标示一致
如果一致则释放锁
如果不一致则不释放锁
package com.hmdp.utils; import cn.hutool.core.lang.UUID; import org.springframework.data.redis.core.StringRedisTemplate; import java.util.concurrent.TimeUnit; public class SimpleRedisLock implements ILock{ private StringRedisTemplate stringRedisTemplate; private String name; public SimpleRedisLock(StringRedisTemplate stringRedisTemplate, String name) { this.stringRedisTemplate = stringRedisTemplate; this.name = name; } public static final String KEY_PREFIX = "LOCK:"; public static final String ID_PREFIX = UUID.randomUUID().toString(true) + "-"; @Override public boolean tryLock(long timeoutSec) { UUID.randomUUID().toString(true); //获取线程标识 String id = ID_PREFIX + Thread.currentThread().getId(); Boolean lock = stringRedisTemplate.opsForValue().setIfAbsent(KEY_PREFIX + name, id, timeoutSec, TimeUnit.SECONDS); return Boolean.TRUE.equals(lock); } @Override public void unLock() { //获取线程标识 String id = ID_PREFIX + Thread.currentThread().getId(); // 获取锁中的标识 String key = stringRedisTemplate.opsForValue().get(KEY_PREFIX + name); // 判断标识是否一致 if(key.equals(id)){ stringRedisTemplate.delete(KEY_PREFIX + name); } } }
即使这样还会有新的问题,因为判断锁和释放锁不是原子性,依旧会造成误删分布式锁,当线程一判断锁标识是否一样时,判断确定可以删除锁,这是正好碰上了jvm回收垃圾gc.full,由于这个过程进行的时间可能过长超过了redis设置的锁释放时间,锁就被释放了,这个时候线程二进来了,执行业务中,线程一阻塞完了,由于已经进行了逻辑判断是自己的 锁,它直接误删线程二的锁,其他线程又可以进来,这样一来,还是存在一些问题的。
解决方案:
Redis:提供了Lua脚本功能,在一个脚本中编写多条Redist命令,确保多条命令执行时的原子性。Lua是一种编程语言,它的基本语法大家可以参考网站:https:/www.runoob.com/lua/lua-tutorial.html
这里重点介绍Redis提供的调用函数,语法如下:
#执行redis命令
redis.call('命令名称',key','其它参数',···)
例如,我们要执行set name jack,则脚本是这样:
#执行set name jack
redis.call('set','name','jack')
例如,我们要先执行set name Rose,再执行get name,则脚本如下:
先执行set name jack
redis.call('set','name','Rose')
再执行get name
local name redis.call('get','name')
#返回
return name
写好脚本以后,需要用Redis命令来调用脚本,调用脚本的常见命令如下:
127.0.0.1:6379>help @scripting
EVAL script numkeys key [key ...] arg [arg ...]
summary:Execute a Lua script server side
since:2.6.0
例如,我们要执行redis.call(‘set’,‘name’,‘jack’)这个脚本,语法如下:
#调用脚本
EVAL "return redis.call('set','name','jack')" 0
“return redis.call(‘set’,‘name’,‘jack’)”:脚本内容
0 :脚本需要的key类型的参数个数
如果脚本中的key、value不想写死,可以作为参数传递。key类型参数会放入KEYS数组,其它参数会放入ARGV数组,在脚本中可以从KEYS和ARGV数组获取这些参数:
调用脚本
EVAL"return redis.call('set',KEYS[1],ARGV[1])" 1 name Rose
1:脚本需要的key类型的参数个数
基于Redis的分布式锁释放锁的业务流程是这样的:
1.获取锁中的线程标示
2.判断是否与指定的标示(当前线程标示)一致
3.如果一致则释放锁(删除)
4.如果不一致则什么都不做
写出lua脚本
--比较线程标识与锁中的标示是否一直
if(redis.call('get',KEYS[1])== ARGV[1]) then
--释放锁 del key
return redis.call('del',KEYS[1])
end
return 0
需求:基于Lua脚本实现分布式锁的释放锁逻辑
提示:RedisTemplate调用Lua脚本的APl如下:
修改unlock代码
public class SimpleRedisLock implements ILock{ private StringRedisTemplate stringRedisTemplate; private String name; public SimpleRedisLock(StringRedisTemplate stringRedisTemplate, String name) { this.stringRedisTemplate = stringRedisTemplate; this.name = name; } public static final String KEY_PREFIX = "LOCK:"; public static final String ID_PREFIX = UUID.randomUUID().toString(true) + "-"; public static final DefaultRedisScript<Long> UNLOCK_SCRIPT; static { UNLOCK_SCRIPT = new DefaultRedisScript(); UNLOCK_SCRIPT.setLocation(new ClassPathResource("unlock.lua")); UNLOCK_SCRIPT.setResultType(Long.class); } @Override public void unLock() { stringRedisTemplate.execute(UNLOCK_SCRIPT, Collections.singletonList(KEY_PREFIX + name), ID_PREFIX + Thread.currentThread().getId()); } @Override public boolean tryLock(long timeoutSec) { UUID.randomUUID().toString(true); //获取线程标识 String id = ID_PREFIX + Thread.currentThread().getId(); Boolean lock = stringRedisTemplate.opsForValue().setIfAbsent(KEY_PREFIX + name, id, timeoutSec, TimeUnit.SECONDS); return Boolean.TRUE.equals(lock); } }
总结
基于Redis的分布式锁实现思路:
利用set nx ex获取锁,并设置过期时间,保存线程标示
释放锁时先判断线程标示是否与自己一致,一致则删除锁
特性:
利用set nx满足互斥性
利用set ex保证故障时锁依然能释放,避免死锁,提高安全性
利用Redis集群保证高可用和高并发特性
分布式锁的不足
Redisson是一个在Redis的基础上实现的Java驻内存数据网格(In-Memory Data Grid)。它不仅提供了一系列的分布式的Java常用对象,还提供了许多分布式服务,其中就包含了各种分布式锁的实现。
8.分布式锁(Lock)和同步器(Synchronizer)
8.1.可重入锁(Reentrant Lock)
8.2.公平锁(Fair Lock)
8.3.联锁(MultiLock)
8.4.红锁(RedLock)
8.5.读写锁(ReadWriteLock)
8.6.信号量(Semaphore)
8.7.可过期性信号量(PermitExpirableSemaphore)
8.8.闭锁(CountDownLatch)
官网地址:https:/redisson…org
GitHub地址:https://github.com/redisson/redisson
1.引入依赖
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.13.6</version>
</dependency>
2.配置Redission客户端:
@Configuration
public class RedissionConfig {
@Bean
public RedissonClient redissonClient() {
Config config = new Config();
config.useSingleServer().setAddress("redis://47.106.182.147:6379").setPassword("525403");
return Redisson.create(config);
}
}
3.使用分布式锁
@Resource private RedissonClient redissonClient; @Override public Result seckillVoucher(Long voucherId) { //1.查询优惠卷的信息 SeckillVoucher serviceById = seckillVoucherService.getById(voucherId); //2.判断秒杀是否开始 LocalDateTime beginTime = serviceById.getBeginTime(); if(beginTime.isAfter(LocalDateTime.now())){ return Result.fail("秒杀尚未开始"); } //3.判断秒杀是否结束 LocalDateTime endTime = serviceById.getEndTime(); if(endTime.isBefore(LocalDateTime.now())){ return Result.fail("秒杀已经结束"); } //4.判断库存是否充足 Integer stock = serviceById.getStock(); //5.库存不足,返回 if(stock < 1){ return Result.fail("库存不足"); } Long userId = UserHolder.getUser().getId(); //其实就是悲观锁 // SimpleRedisLock simpleRedisLock = new SimpleRedisLock(stringRedisTemplate, "order:" + userId); RLock simpleRedisLock = redissonClient.getLock("lock:order:" + userId); //获取锁1200秒 boolean tryLock = simpleRedisLock.tryLock(); if(!tryLock){ return Result.fail("同一用户不能重复下单"); } try { IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy(); return proxy.createVoucherOrder(voucherId); }finally { simpleRedisLock.unlock(); } }
之前分布式锁的使用的是string数据类型,setnx key_name value,这是一种不可重入锁,同一个线程多次获取锁是失败的。
改进:redission使用redis的hash数据类型,同一个线程进入就在value的值上进行加一。
流程图
使用lua脚本,java写的话不能保证原子性,因此,使用lua脚本保证程序执行的原子性,lua脚本如下:添加锁的过程
local key KEYS[1];--锁ey local threadId=ARGV[l];--线程唯一标识 local releaseTime=ARGV[2];--锁的自动释放时间 --判断是否存在 if(redis.call('exists',key)==0)then --不存在,获取锁 redis.call('hset',key,threadId,'1'); --设置有效期 redis.call('expire',key,releaseTime); --返回结果 return1; end; 锁已经存在,判断threadId是否是自己 if(redis.call('hexists',key,threadId)=1)then --不存在,获取锁,重入次数+1 redis.call('hincrby',key,threadId,'1'); --设置有效期 redis.call('expire',key,releaseTime); return1;--返回结果 end; return0;-代码走到这里,说明获取锁的不是自己,获取锁失败
释放锁的过程
local key KEYS[1];--锁的key local threadId=ARGV[1];--线程唯一标识 local releaseTime=ARGV[2];--锁的自动释放时间 --判断当前锁是否还是被自己持有 if (redis.call('HEXISTS',key,threadId)== 0 ) then return nil;--如果已经不是自己,则直接返回 end; --是自己的锁,则重入次数-1 local count = redis.call('HINCRBY',key,threadId,-1); --判断是否重入次数是否已经为0 if(count > 0)then --大于0说明不能释放锁,重置有效期然后返回 redis.call('EXPIRE',key,releaseTime); return nil; e1se --等于0说明可以释放锁,直接删除 redis.call('DEL',key); return nil; end;
@Test public void test() throws InterruptedException { // ctrl+左键点击trylock boolean isLock = lock.tryLock(1, TimeUnit.SECONDS); if(!isLock){ log.error("获取锁失败,1"); return; } try{ log.info("获取锁成功,1"); method2(); }finally { log.info("释放锁,1"); lock.unlock(); } } void method2(){ boolean isLock = lock.tryLock(); if(!isLock){ log.error("获取锁失败,2"); return; } try{ log.info("获取锁成功,2"); }finally { log.info("释放锁,2"); lock.unlock(); } }
查看boolean isLock = lock.tryLock(1, TimeUnit.SECONDS)源码,点进去这是没有传锁释放时间的trylock()
@Override
public boolean tryLock(long waitTime, TimeUnit unit) throws InterruptedException {
// 锁默认释放时间是(-1),继续进去
return tryLock(waitTime, -1, unit);
}
@Override public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException { //将时间全部化成毫秒 long time = unit.toMillis(waitTime); long current = System.currentTimeMillis(); //当前线程ID long threadId = Thread.currentThread().getId(); //尝试去获取锁 Long ttl = tryAcquire(waitTime, leaseTime, unit, threadId); // lock acquired if (ttl == null) { return true; } time -= System.currentTimeMillis() - current; if (time <= 0) { acquireFailed(waitTime, unit, threadId); return false; } ............ }
private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) { //如果释放时间不等于-1 if (leaseTime != -1) { return tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG); } //如果释放时间等于-1 //commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout()其实他就是释放时间,看门狗的施放时间,追进去源码时间是30秒 RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(waitTime, commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG); ttlRemainingFuture.onComplete((ttlRemaining, e) -> { if (e != null) { return; } // lock acquired if (ttlRemaining == null) { scheduleExpirationRenewal(threadId); } }); return ttlRemainingFuture; }
<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) { internalLockLeaseTime = unit.toMillis(leaseTime); return evalWriteAsync(getName(), LongCodec.INSTANCE, command, //当锁不存在时,创建一把锁 "if (redis.call('exists', KEYS[1]) == 0) then " + "redis.call('hincrby', KEYS[1], ARGV[2], 1); " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + //相当于返回null "return nil; " + "end; " + //当锁存在时,value添加1 "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " + "redis.call('hincrby', KEYS[1], ARGV[2], 1); " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return nil; " + "end; " + //返回key剩余有效期,单位毫秒 "return redis.call('pttl', KEYS[1]);", Collections.singletonList(getName()), internalLockLeaseTime, getLockName(threadId)); }
结果返回到最初的(Long ttl = tryAcquire(waitTime, leaseTime, unit, threadId);)然后去判断:
@Override public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException { long time = unit.toMillis(waitTime); long current = System.currentTimeMillis(); long threadId = Thread.currentThread().getId(); // 返回到ttl Long ttl = tryAcquire(waitTime, leaseTime, unit, threadId); // lock acquired //如果是null,说明获取锁成功 if (ttl == null) { return true; } //下面逻辑是返回不是null,而是key剩余时间,锁获取失败情况,重新获取锁 //当前时间减去之前的时间,就是第一次去获取锁消耗的时间,再减去time,就是剩余重新获取锁的时间。 time -= System.currentTimeMillis() - current; //剩余重新获取锁的时间小于0,返回获取失败 if (time <= 0) { acquireFailed(waitTime, unit, threadId); return false; } current = System.currentTimeMillis(); //订阅机制,当之前的获取锁的线程释放锁,就去通知,它你可以去获取锁了。 RFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId); //如果等到自己重新获取锁的时间过了,别人还没释放锁,也是失败 if (!subscribeFuture.await(time, TimeUnit.MILLISECONDS)) { if (!subscribeFuture.cancel(false)) { subscribeFuture.onComplete((res, e) -> { if (e == null) { unsubscribe(subscribeFuture, threadId); } }); } acquireFailed(waitTime, unit, threadId); return false; } try { time -= System.currentTimeMillis() - current; if (time <= 0) { acquireFailed(waitTime, unit, threadId); return false; } //还在时间内,循环去获取锁 while (true) { long currentTime = System.currentTimeMillis(); ttl = tryAcquire(waitTime, leaseTime, unit, threadId); // lock acquired if (ttl == null) { return true; } time -= System.currentTimeMillis() - currentTime; if (time <= 0) { acquireFailed(waitTime, unit, threadId); return false; } // waiting for message currentTime = System.currentTimeMillis(); if (ttl >= 0 && ttl < time) { //信号量机制 subscribeFuture.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS); } else { subscribeFuture.getNow().getLatch().tryAcquire(time, TimeUnit.MILLISECONDS); } time -= System.currentTimeMillis() - currentTime; if (time <= 0) { acquireFailed(waitTime, unit, threadId); return false; } } } finally { unsubscribe(subscribeFuture, threadId); } // return get(tryLockAsync(waitTime, leaseTime, unit)); }
这个问题就是如果你的业务阻塞,锁超时释放了,其他线程就过来获取锁,实际上你的业务还没执行完,这不就造成线程安全问题
private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) { if (leaseTime != -1) { return tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG); } RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(waitTime, commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG); //判断ttlRemainingFuture是否完成 ttlRemainingFuture.onComplete((ttlRemaining, e) -> { //出现异常,直接返回 if (e != null) { return; } // lock acquired if (ttlRemaining == null) { //获取锁成功,重新更新有效期 scheduleExpirationRenewal(threadId); } }); return ttlRemainingFuture; }
private void scheduleExpirationRenewal(long threadId) {
ExpirationEntry entry = new ExpirationEntry();
ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry);
if (oldEntry != null) {
oldEntry.addThreadId(threadId);
} else {
entry.addThreadId(threadId);
//重新续期
renewExpiration();
}
}
private void renewExpiration() { ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName()); if (ee == null) { return; } Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() { @Override public void run(Timeout timeout) throws Exception { ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName()); if (ent == null) { return; } Long threadId = ent.getFirstThreadId(); if (threadId == null) { return; } RFuture<Boolean> future = renewExpirationAsync(threadId); future.onComplete((res, e) -> { if (e != null) { log.error("Can't update lock " + getName() + " expiration", e); return; } if (res) { // reschedule itself //注意这里是递归,也就是一直会续期,internalLockLeaseTime / 3 = 10,10秒后刷新有效期 renewExpiration(); } }); } }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS); ee.setTimeout(task); }
protected RFuture<Boolean> renewExpirationAsync(long threadId) {
return evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return 1; " +
"end; " +
"return 0;",
Collections.singletonList(getName()),
internalLockLeaseTime, getLockName(threadId));
}
不设置释放时间,默认才会开启看门狗机制
主从一致性问题就是,集群中一般有主节点和从节点,当主节点去和从节点去做一致性时,这个时候主节点宕机,导致没有一致性成功,从节点没有获取锁,就会出现锁失效,一致性问题
当主节点宕机
在从节点中自动选择一个节点为主节点,由于是做主从一致性时宕机,所以从节点这个时候锁是失效的
搭建redis连锁,所有节点都要获取锁,这个时候就不会出现主从一致性的问题,你也可以去搭建从节点
1)不可重入Redis分布式锁:
2)可重入的Redis分布式锁:
原理:利用hash结构,记录线程标示和重入次数;利用 watchDog.延续锁时间;利用信号量控制锁重试等待
缺陷:redis.宕机引起锁失效问题
3)Redisson的multiLock:
之前业务逻辑,串行进行并发能力依旧不行
进行以下改进
改进秒杀业务,提高并发性能
需求:
1.新增秒杀优惠券的同时,将优惠券信息保存到Redis中
2.基于Lua脚本,判断秒杀库存、一人一单,决定用户是否抢购成功
3.如果抢购成功,将优惠券d和用户id封装后存入阻塞队列
4.开启线程任务,不断从阻塞队列中获取信息,实现异步下单功能
1.新增秒杀优惠券的同时,将优惠券信息保存到Redis中
@Override
@Transactional
public void addSeckillVoucher(Voucher voucher) {
// 保存优惠券
save(voucher);
// 保存秒杀信息
SeckillVoucher seckillVoucher = new SeckillVoucher();
seckillVoucher.setVoucherId(voucher.getId());
seckillVoucher.setStock(voucher.getStock());
seckillVoucher.setBeginTime(voucher.getBeginTime());
seckillVoucher.setEndTime(voucher.getEndTime());
seckillVoucherService.save(seckillVoucher);
// 保存秒杀库存到redis中
stringRedisTemplate.opsForValue().set(SECKILL_STOCK_KEY + voucher.getId(),voucher.getStock().toString());
}
2.基于Lua脚本,判断秒杀库存、一人一单,决定用户是否抢购成功
local voucherId = ARGV[1]; local userId =ARGV[2]; -- 库存key local stockKey = 'seckill:stock:' .. voucherId -- 订单key local orderKey = 'seckill:order:' .. voucherId -- 判断库存是否充足 get stockKey if(tonumber(redis.call('get', stockKey)) <= 0) then return 1 end -- 判断用户是否下单 sismember orderKey userId if(redis.call('sismember', orderKey, userId)) == 1 then return 2 end --扣减库存incrby stockKey -1 redis.call('incrby',stockKey,-1) --下单(保存用户)sadd orderKey userId redis.call('sadd',orderKey,userId) return 0
3.如果抢购成功,将优惠券d和用户id封装后存入阻塞队列
package com.hmdp.service.impl; import com.hmdp.dto.Result; import com.hmdp.entity.VoucherOrder; import com.hmdp.mapper.VoucherOrderMapper; import com.hmdp.service.ISeckillVoucherService; import com.hmdp.service.IVoucherOrderService; import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; import com.hmdp.utils.RedisIdWorker; import com.hmdp.utils.UserHolder; import lombok.extern.slf4j.Slf4j; import org.redisson.api.RLock; import org.redisson.api.RedissonClient; import org.springframework.aop.framework.AopContext; import org.springframework.core.io.ClassPathResource; import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.data.redis.core.script.DefaultRedisScript; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; import javax.annotation.PostConstruct; import javax.annotation.Resource; import java.util.Collections; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @Service @Slf4j public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService { @Resource private ISeckillVoucherService seckillVoucherService; @Resource private RedisIdWorker redisIdWorker; @Resource private StringRedisTemplate stringRedisTemplate; @Resource private RedissonClient redissonClient; public static final DefaultRedisScript<Long> SECKILL_SCRIPT; static { SECKILL_SCRIPT = new DefaultRedisScript(); SECKILL_SCRIPT.setLocation(new ClassPathResource("seckill.lua")); SECKILL_SCRIPT.setResultType(Long.class); } private static final BlockingQueue<VoucherOrder> orderTasks = new ArrayBlockingQueue<>(1024 * 1024); // 线程池 private static final ExecutorService SECKILL_ORDER_EXECUTOR = Executors.newSingleThreadExecutor(); @PostConstruct protected void init() { SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandler()); } private class VoucherOrderHandler implements Runnable { @Override public void run() { while (true) { try { VoucherOrder voucherOrder = orderTasks.take(); // 创建订单 handleVoucherOrder(voucherOrder); } catch (Exception e) { log.info("处理订单异常", e); } } } } private void handleVoucherOrder(VoucherOrder voucherOrder) { Long userId = voucherOrder.getUserId(); RLock simpleRedisLock = redissonClient.getLock("lock:order:" + userId); //获取锁1200秒 boolean tryLock = simpleRedisLock.tryLock(); if (!tryLock) { log.error("不允许重复下单"); return; } try { proxy.createVoucherOrder(voucherOrder); } finally { simpleRedisLock.unlock(); } } private IVoucherOrderService proxy; @Override public Result seckillVoucher(Long voucherId) { Long userId = UserHolder.getUser().getId(); // 执行lua脚本 Long result = stringRedisTemplate.execute(SECKILL_SCRIPT, Collections.emptyList(), voucherId.toString(), userId.toString()); // 判断结果是否0 int r = result.intValue(); if (r != 0) { // 不是0,没有购买资格 return Result.fail(r == 1 ? "库存不足" : "不能重复下单"); } // 是0,有购买资格,把 下单信息保存到阻塞队列 VoucherOrder voucherOrder = new VoucherOrder(); long orderId = redisIdWorker.nextId("order"); voucherOrder.setVoucherId(voucherId); voucherOrder.setUserId(userId); voucherOrder.setId(orderId); // 创建阻塞队列 orderTasks.add(voucherOrder); // 主线程获取代理对象 proxy = (IVoucherOrderService) AopContext.currentProxy(); // 返回订单ID return Result.ok(orderId); } // @Override // public Result seckillVoucher(Long voucherId) { // //1.查询优惠卷的信息 // SeckillVoucher serviceById = seckillVoucherService.getById(voucherId); // //2.判断秒杀是否开始 // LocalDateTime beginTime = serviceById.getBeginTime(); // if(beginTime.isAfter(LocalDateTime.now())){ // return Result.fail("秒杀尚未开始"); // } // //3.判断秒杀是否结束 // LocalDateTime endTime = serviceById.getEndTime(); // if(endTime.isBefore(LocalDateTime.now())){ // return Result.fail("秒杀已经结束"); // } // //4.判断库存是否充足 // Integer stock = serviceById.getStock(); // //5.库存不足,返回 // if(stock < 1){ // return Result.fail("库存不足"); // } // Long userId = UserHolder.getUser().getId(); // //其实就是悲观锁 SimpleRedisLock simpleRedisLock = new SimpleRedisLock(stringRedisTemplate, "order:" + userId); // RLock simpleRedisLock = redissonClient.getLock("lock:order:" + userId); // //获取锁1200秒 // boolean tryLock = simpleRedisLock.tryLock(); // if(!tryLock){ // return Result.fail("同一用户不能重复下单"); // } // try { // IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy(); // return proxy.createVoucherOrder(voucherId); // }finally { // simpleRedisLock.unlock(); // } // } @Transactional public void createVoucherOrder(VoucherOrder voucherOrder) { //一人一单 Long userId = voucherOrder.getUserId(); //查询订单 int count = query().eq("user_id", userId).eq("voucher_id", voucherOrder.getVoucherId()).count(); if (count > 0) { log.error("每个用户只能抢一份"); return; } boolean update = seckillVoucherService.update() .setSql("stock = stock -1") // set stock = stock - 1 .eq("voucher_id", voucherOrder.getVoucherId()) .gt("stock", 0)//where id = ? and stock > 0 采用了一个乐观锁中的CAS .update(); if (!update) { log.error("库存不足"); return; } save(voucherOrder); } // @Transactional // public Result createVoucherOrder(Long voucherId){ // //一人一单 // Long userId = UserHolder.getUser().getId(); // //查询订单 // int count = query().eq("user_id", userId).eq("voucher_id", voucherId).count(); // if(count>0){ // return Result.fail("每个用户只能抢一份"); // } // boolean update = seckillVoucherService.update() // .setSql("stock = stock -1") // set stock = stock - 1 // .eq("voucher_id", voucherId) // .gt("stock",0)//where id = ? and stock > 0 采用了一个乐观锁中的CAS // .update(); // if(!update){ // return Result.fail("更新失败"); // } // //判断是否存在 // //6.2 创建订单 // VoucherOrder voucherOrder = new VoucherOrder(); // long orderId = redisIdWorker.nextId("order"); // voucherOrder.setVoucherId(voucherId); // voucherOrder.setUserId(userId); // voucherOrder.setId(orderId); // save(voucherOrder); // return Result.ok(orderId); // } }
秒杀业务的优化思路是什么?
基于阻塞队列的异步秒杀存在哪些问题?
Redis消息队列实现异步秒杀
消息队列(Message Queue),字面意思就是存放消息的队列。最简单的消息队列模型包括3个角色:
Redis提供了三种不同的方式来实现消息队列:
消息队列(Message Queue),字面意思就是存放消息的队列。而Redis的list数据结构是一个双向链表,很容易模拟
出队列效果。
队列是入口和出口不在一边,因此我们可以利用:LPUSH结合RPOP、或者RPUSH结合LPOP来实现。
不过要注意的是,当队列中没有消息时RPOP或LPOP操作会返回null,并不像JVM的阻塞队列那样会阻塞并等待消息。
因此这里应该使用BRPOP或者BLPOP来实现阻塞效果。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。