当前位置:   article > 正文

Redis锁_redis乐观锁

redis乐观锁

使用Redis乐观锁

        RedisOperations中包含了RedisTemplate和StringRedisTemplate

        先开启监听watch(key值,可以是多个key值)

        然后开启事务mulit()

        执行事务,exec(),返回的是一个集合,若集合是空则证明事务执行失败,执行失败的时候需要让他休眠一会,过于频繁重试会导致栈内存溢出,不推荐使用

  1. //redis乐观锁
  2. public void redisMulPay(){
  3. this.redisTemplate.execute(new SessionCallback<Object>() {
  4. @Override
  5. public Object execute(RedisOperations operations) throws DataAccessException {
  6. //开启监听
  7. operations.watch("stock");
  8. //查询库存数
  9. String stock=operations.opsForValue().get("stock").toString();
  10. //判断库存
  11. if(stock !=null && stock.length() != 0){
  12. Integer st = Integer.valueOf(stock);
  13. if (st > 0){
  14. //开启事务
  15. operations.multi();
  16. //减少库存
  17. operations.opsForValue().set("stock",String.valueOf(--st));
  18. //执行事务,返回的是集合
  19. List exec =operations.exec();
  20. //判断事物是否执行成功
  21. if (exec ==null || exec.size() == 0) {
  22. //返回的是空,则事务执行失败,再次执行
  23. //可以让它睡眠一会,一直重复会导致栈内存溢出
  24. try {
  25. System.out.println("事物执行失败");
  26. Thread.sleep(40);
  27. redisMulPay();
  28. } catch (InterruptedException e) {
  29. e.printStackTrace();
  30. }
  31. }
  32. return exec;
  33. }
  34. }
  35. return null;
  36. }
  37. });
  38. }

由下图可见,使用JMeter测试乐观锁,程序性能明显低且重试次数多(注:并发量3000,同一时时刻请求) 

 redis分布式锁

        jvm本地锁只能解决一个服务内部的情况,和面对集群部署可能jvm本地锁就有点乏力

        分布式锁:可以跨进程、服务、服务器

        常见的使用场景:超卖现象、缓存击穿

锁在缓存击穿的作用:

        Mysql是存在硬盘中以文件格式存储数据的,redis是以内存方式存储数据(内存型数据库),但是存储的数据量少,通常我们会给缓存添加过期时间,不添加过期时间,随着服务的时间越来越长,redis的数据会越来越多导致内存会给占满,导致服务器内存资源不够而宕机,所以就要给缓存设置时间,当某天一个热点key过期时,大量数据访问,发现redis里面没有,这时大量数据就会直接访问mysql,这时可能会导致mysql宕机,而锁的作用就是,当大量的访问访问mysql时,锁会将他们拦在外面,谁拿到锁谁就访问mysql数据,范文mysql数据后,可以重新把这个数据放入缓存,下次访问的时候缓存中就有数据了,锁就可以防止mysql给大量数据冲垮

  使用redis锁

        使用setIfAbsent("key","value")加锁,解锁使用delete("key")

  1. //redis锁1
  2. public void redisPay1() {
  3. //若使用递归调用时会导致栈内存溢出的现象
  4. //使用setnx加锁(setIfAbsent:不存在则设置,setIfPresent:存在则设置)
  5. while (!this.redisTemplate.opsForValue().setIfAbsent("lock", "727")) {
  6. //重试:循环(CAS方式去获取锁,加入睡眠锁竞争就不会那么激烈,进程压力表兄啊,性能提高)
  7. try {
  8. //睡眠一会重新获取锁,同样担心栈内存溢出
  9. Thread.sleep(50);
  10. } catch (InterruptedException e) {
  11. e.printStackTrace();
  12. }
  13. }
  14. try {
  15. //查询库存数
  16. String stock = this.redisTemplate.opsForValue().get("stock");
  17. //判断库存
  18. if (stock != null && stock.length() != 0) {
  19. Integer st = Integer.valueOf(stock);
  20. if (st > 0) {
  21. //减少库存
  22. this.redisTemplate.opsForValue().set("stock", String.valueOf(--st));
  23. }
  24. }
  25. } finally {
  26. //解锁
  27. this.redisTemplate.delete("lock");
  28. }
  29. }

由此可看,这样锁性能和jvm本地锁性能差不多

 但是上面代码并不是最优解,很可能会发生死锁的现象

防误删问题

        了解这个问题的关键是为什么设置过期时间且解决什么问题

        为什么设置过期时间:当线程获取到锁的时候,突然宕机了,这时锁并没有得到释放,其他线程一直处于获取锁的状态,大量请求给拦截到外面,所以要给锁添加过期时间,当线程获取到锁后宕机了,锁的时间到了就会自己释放锁让其他线程获取到锁

        误删问题是当你设置锁的过期时间,当你给锁添加过期时间,当线程1获取到锁并设置了过期时间3秒,这时线程1开始业务,当到3秒的时候线程1 还没完成业务,这时锁的时间过期了,当其他线程获取到锁,并执行业务,这时线程1到5秒的时候,线程1释放了锁,这时线程2的业务还没完成,这时线程3又获取到锁,线程2业务完成后就释放了锁一直这样循环下去,这时业务就会暴露,导致锁失效的问题,想要解决这个问题也很简单,给锁的value添加一个唯一标识,可以是线程名、或则是随机数UUID、也可以是时间戳都可以,在释放锁的时候去判断是不是当前线程自己加的锁 

  1. public void redisPay1() {
  2. //获取时间戳
  3. // long nowTime=System.currentTimeMillis();
  4. //生成随机数
  5. String uuid = UUID.randomUUID().toString();
  6. //若使用递归调用时会导致栈内存溢出的现象
  7. //使用setnx加锁(setIfAbsent:不存在则设置,setIfPresent:存在则设置)
  8. //setIfAbsent(key,value,过期时间,分/秒/毫秒)
  9. while (!this.redisTemplate.opsForValue().setIfAbsent("lock", uuid,3, TimeUnit.SECONDS)) {
  10. //重试:循环(CAS方式去获取锁,加入睡眠锁竞争就不会那么激烈,进程压力表兄啊,性能提高)
  11. try {
  12. //睡眠一会重新获取锁,同样担心栈内存溢出
  13. Thread.sleep(50);
  14. } catch (InterruptedException e) {
  15. e.printStackTrace();
  16. }
  17. }
  18. try {
  19. //查询库存数
  20. String stock = this.redisTemplate.opsForValue().get("stock");
  21. //判断库存
  22. if (stock != null && stock.length() != 0) {
  23. Integer st = Integer.valueOf(stock);
  24. if (st > 0) {
  25. //减少库存
  26. this.redisTemplate.opsForValue().set("stock", String.valueOf(--st));
  27. }
  28. }
  29. } finally {
  30. //先判断是否是自己加的锁,是则解锁
  31. if(StringUtils.equals(this.redisTemplate.opsForValue().get("lock"),uuid)){
  32. this.redisTemplate.delete("lock");
  33. }
  34. }
  35. }

在上面代码中,释放锁的逻辑没有原子性,当线程1判断完是自己的锁之后,这时锁时间过期,其他线程立马获取到锁,这时线程1将线程2 的锁给释放掉,线程2的代码就暴露,这时也会面临锁失效的问题,但是也没有指令实现,当搜索解决方案的时候出现的大部分是使用Lua脚本解决这一问题

什么是Lua脚本?

        需要了解的可以去:https://www.runoob.com/lua/lua-tutorial.html
        

 它是由C语言编写的,轻量级是它的特点之一,可拓展性也很高

 Lua应用场景也挺广,大部分的脚本都是由Lua来编写的,同时他也能防止脚入侵,redis同时也对它做了支持

Lua为什么可以解决原子性问题,这是因为它能一次性发送多条指令给redis,一次性打包多条指令给redis,redis是单线程的,执行规则是一个一个的解决,当Lua给他一次性发送多条指令,其他的线程就不会在这时横插一脚,下面是Lua实现,这样原子性问题解决了

  1. public void redisPay1() {
  2. //获取时间戳
  3. // long nowTime=System.currentTimeMillis();
  4. //生成随机数
  5. String uuid = UUID.randomUUID().toString();
  6. //若使用递归调用时会导致栈内存溢出的现象
  7. //使用setnx加锁(setIfAbsent:不存在则设置,setIfPresent:存在则设置)
  8. //setIfAbsent(key,value,过期时间,分/秒/毫秒)
  9. while (!this.redisTemplate.opsForValue().setIfAbsent("lock", uuid,3, TimeUnit.SECONDS)) {
  10. //重试:循环(CAS方式去获取锁,加入睡眠锁竞争就不会那么激烈,进程压力表兄啊,性能提高)
  11. try {
  12. //睡眠一会重新获取锁,同样担心栈内存溢出
  13. Thread.sleep(50);
  14. } catch (InterruptedException e) {
  15. e.printStackTrace();
  16. }
  17. }
  18. try {
  19. //查询库存数
  20. String stock = this.redisTemplate.opsForValue().get("stock");
  21. //判断库存
  22. if (stock != null && stock.length() != 0) {
  23. Integer st = Integer.valueOf(stock);
  24. if (st > 0) {
  25. //减少库存
  26. this.redisTemplate.opsForValue().set("stock", String.valueOf(--st));
  27. }
  28. }
  29. } finally {
  30. //先判断是否是自己加的锁,是则解锁(Lua实现)
  31. String script="if redis.call('get',KEYS[1]) == ARGV[1] then return redis.call('del',KEYS[1]) else return 0 end";
  32. this.redisTemplate.execute(new DefaultRedisScript<>(script,Boolean.class), Arrays.asList("lock"), uuid);
  33. }
  34. }

可重入性

        ReentrantLock支持可重入性,参考原理使用Lua去写可重入锁,Lua在这里的作用是解决原子性问题

利用工厂实现

  1. @Component
  2. public class DistributedLockClient {
  3. @Autowired
  4. private StringRedisTemplate redisTemplate;
  5. private String uuid;
  6. //添加uuid,每个服务一个
  7. public DistributedLockClient() {
  8. this.uuid = UUID.randomUUID().toString();
  9. }
  10. public DistributedRedisLock getLock(String lockName){
  11. return new DistributedRedisLock(lockName,redisTemplate,uuid);
  12. }
  13. }
  1. public class DistributedRedisLock implements Lock {
  2. private String lockName;
  3. private String uuid;
  4. private long expire=30;
  5. public DistributedRedisLock(String lockName ,StringRedisTemplate redisTemplate, String uuid) {
  6. this.lockName = lockName;
  7. this.uuid = uuid;
  8. this.redisTemplate = redisTemplate;
  9. }
  10. //给线程添加唯一标识
  11. String getid(){
  12. return uuid+":"+Thread.currentThread().getId();
  13. }
  14. private StringRedisTemplate redisTemplate;
  15. @Override
  16. public void lock() {
  17. this.tryLock();
  18. }
  19. @Override
  20. public void lockInterruptibly() throws InterruptedException {
  21. }
  22. @Override
  23. public boolean tryLock() {
  24. try {
  25. return this.tryLock(-1L, TimeUnit.SECONDS);
  26. } catch (InterruptedException e) {
  27. e.printStackTrace();
  28. }
  29. return false;
  30. }
  31. // 加锁方法
  32. @Override
  33. public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
  34. if (time != -1){
  35. this.expire=unit.toSeconds(time);
  36. }
  37. String script="if redis.call('exists',KEYS[1]) == 0 or redis.call('hexists',KEYS[1],ARGV[1]) == 1 then redis.call('hincrby',KEYS[1],ARGV[1],1) return 1 else return 0 end";
  38. while (!this.redisTemplate.execute(new DefaultRedisScript<>(script,Boolean.class), Arrays.asList(lockName), getid(),String.valueOf(expire)))
  39. {
  40. Thread.sleep(50);
  41. }
  42. return true;
  43. }
  44. //解锁方法
  45. @Override
  46. public void unlock() {
  47. String script="if redis.call('hexists',KEYS[1],ARGV[1]) == 0 then return nil elseif redis.call('hincrby',KEYS[1],ARGV[1], -1) == 0 then return redis.call('del',KEYS[1]) else return 0 end";
  48. Long flag = this.redisTemplate.execute(new DefaultRedisScript<>(script, long.class), Arrays.asList(lockName), getid());
  49. if(flag==null){
  50. throw new IllegalMonitorStateException("this lock doesn`t belong to you");
  51. }
  52. }
  53. @Override
  54. public Condition newCondition() {
  55. return null;
  56. }
  57. }
  1. //redis可重入锁
  2. public void redisPay2() {
  3. //调用工厂对象
  4. DistributedRedisLock redisLock = this.distributedLockClient.getLock("lock");
  5. //调用加锁方法
  6. redisLock.lock();
  7. try {
  8. //查询库存数
  9. String stock = this.redisTemplate.opsForValue().get("stock");
  10. //判断库存
  11. if (stock != null && stock.length() != 0) {
  12. Integer st = Integer.valueOf(stock);
  13. if (st > 0) {
  14. //减少库存
  15. this.redisTemplate.opsForValue().set("stock", String.valueOf(--st));
  16. }
  17. }
  18. this.ceshi();
  19. }finally {
  20. //解锁
  21. redisLock.unlock();
  22. }
  23. }
  24. //测试可重入锁
  25. public void ceshi(){
  26. //调用工厂对象
  27. DistributedRedisLock redisLock = this.distributedLockClient.getLock("lock");
  28. redisLock.lock();
  29. System.out.println("测试可重入锁中.....");
  30. redisLock.unlock();
  31. }

使用Timer实现自动续期

  1. //自动续期
  2. public void renewExpire(){
  3. String script="if redis.call('hexists',KEYS[1],ARGV[1]) == 1 then return redis.call('expire',KEYS[1],ARGV[2]) else return 0 end";
  4. //自动续期
  5. new Timer().schedule(new TimerTask() {
  6. @Override
  7. public void run() {
  8. while (redisTemplate.execute(new DefaultRedisScript<>(script,Boolean.class),Arrays.asList(lockName),uuid,String.valueOf(expire))){
  9. //有锁的情况下就会一直自动续期,直到业务完成
  10. renewExpire();
  11. }
  12. }
  13. },this.expire*1000 / 3);
  14. }

小结:

        分布式锁的特征:

                1、独占排他性:setnx 命令

                2、防死锁:redis客户端获取到锁后立马宕机,给锁添加过期时间

                3、防误删:给锁添加唯一标识

                4、原子性: 发生在加锁和过期时间之间、判断和释放锁的时候服务器突然宕机和其他原因

                5、可重入性:hash(key ,唯一标识,value) +lua脚本解决

                6、自动续期:Timer定时器+lua脚本

Redisson实现加锁解锁

下面是Redisson加锁解锁的底层代码,可见redisson实现加锁解锁也是使用了Lua脚本去实现代码加锁解锁

  1. //加锁底层
  2. <T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
  3. return this.evalWriteAsync(this.getRawName(), LongCodec.INSTANCE, command, "if (redis.call('exists', KEYS[1]) == 0) then redis.call('hincrby', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then redis.call('hincrby', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; return redis.call('pttl', KEYS[1]);", Collections.singletonList(this.getRawName()), new Object[]{unit.toMillis(leaseTime), this.getLockName(threadId)});
  4. }
  5. //解锁底层
  6. protected RFuture<Boolean> unlockInnerAsync(long threadId) {
  7. return this.evalWriteAsync(this.getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then return nil;end; local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); if (counter > 0) then redis.call('pexpire', KEYS[1], ARGV[2]); return 0; else redis.call('del', KEYS[1]); redis.call('publish', KEYS[2], ARGV[1]); return 1; end; return nil;", Arrays.asList(this.getRawName(), this.getChannelName()), new Object[]{LockPubSub.UNLOCK_MESSAGE, this.internalLockLeaseTime, this.getLockName(threadId)});
  8. }

redisson也实现了自动续期,默认是30秒,在官网文档中描述的是监控对象的看门狗,主要实现也是实现了一个定时器,和上面Lua自动续期一样,实现一个定时器,使用Lua脚本去给锁续期,但是唯一不同的是,Redisson在实现解锁的时候也会自动续期

Redisson的公平锁

        redisson实现了公平锁,拼的是手速和网速,只要你手速和网速快就可以抢到锁,没抢到锁的就要在外面排对,新的请求也会加入等待队列中,也支持自动续期机制

  1. //公平锁
  2. RLock lock1 = this.redissonClient.getFairLock("lock");
  3. lock1.lock();

红锁/联锁

        联锁:new RedissonMultiLock();同红锁创建相同

        红锁和联锁不同,大部分节点加锁成功则为加锁成功,而联锁只要一个不成功就加锁失败

  1. //获取锁
  2. RLock lock = this.redissonClient.getLock("lock");
  3. //创建红锁算法实例
  4. RedissonRedLock redissonRedLock = new RedissonRedLock(lock);//多个Redis实例
  5. //加锁
  6. redissonRedLock.lock();
  7. //解锁
  8. redissonRedLock.unlock();

读写锁

        读写锁不适用场景,写和写不支持并发,会出现超卖现象,读和写也不支持,在写的时候去读数据会导致读到脏数据(错误数据),读和读支持并发

        读写的时候,只有等读完或则写完释放锁,才能继续下一步,写写也是一样,只有读读不需要等待

  1. //获取读写锁
  2. RReadWriteLock lock2 = this.redissonClient.getReadWriteLock("lock");
  3. //读锁
  4. lock2.readLock().lock(10,TimeUnit.SECONDS);
  5. //写锁
  6. lock2.writeLock().lock(10,TimeUnit.SECONDS);
  7. //释放读锁
  8. lock2.readLock().unlock();
  9. //释放写锁
  10. lock2.writeLock().unlock();

信号量Semaphore

单机模式下模拟抢车位,车位为5,车辆为10,当把Semaphore注释掉后代码乱套

  1. //单机模式下信号量Semaphore
  2. public static void main(String[] args) {
  3. //new Semaphore
  4. Semaphore semaphore = new Semaphore(5);
  5. for (int i = 0; i < 10; i++) {
  6. //创建线程
  7. new Thread(()-> {
  8. try {
  9. semaphore.acquire();
  10. System.out.println(Thread.currentThread().getName() + "抢到车位");
  11. //业务时间
  12. TimeUnit.SECONDS.sleep(10);
  13. System.out.println(Thread.currentThread().getName() + "开走了");
  14. //资源释放
  15. semaphore.release();
  16. }catch (InterruptedException e){
  17. e.printStackTrace();
  18. }
  19. },i+"号车");
  20. }
  21. }

在分布式下资源使用信号量

  1. public void testSemaphore() {
  2. //创建分布式信号量
  3. RSemaphore semaphore = this.redissonClient.getSemaphore("semaphore");
  4. //设置限定资源
  5. semaphore.trySetPermits(3);
  6. try {
  7. semaphore.acquire();
  8. System.out.println("开始处理业务" + Thread.currentThread().getName());
  9. TimeUnit.SECONDS.sleep(10);
  10. System.out.println("处理完,释放资源" + Thread.currentThread().getName());
  11. //释放资源
  12. semaphore.release();
  13. } catch (InterruptedException e) {
  14. e.printStackTrace();
  15. }
  16. }

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/不正经/article/detail/571015
推荐阅读
相关标签
  

闽ICP备14008679号