当前位置:   article > 正文

Redis实现分布式锁的几种方式_setnxex

setnxex

一、分布式锁有何意义

1、synchronized为什么不能作为分布式锁

众所周知,项目开发中难免会遇到高并发场景,而保证数据安全则成为其中的重中之重.。提到保证线程安全,很多人都会认为上锁,比如使用synchronized关键字来保证线程安全,如果实在单服务环境下,这样无疑是正确的,因为使用锁后只会有一个进程来访问我们的资源,不会出现数据不安全的情况,可是如果部署到多个服务器上,面对大量的请求,集群负载均衡到各个不同的服务器上,各个服务不在共享一个上下文,不属于一个进程,此时synchronized将毫无意义,无法保证数据安全,以下图为例:

编辑三个请求同时访问,最终会出现超卖的现象,此时我们使用synchronized是无法解决分布式服务数据的统一性的。

2、redis为什么可以作为分布式锁

  1. 原子性操作:Redis的许多命令,如SETNX、INCR等,都是原子性的,这意味着这些操作在执行过程中不会被其他客户端的操作打断。这种原子性保证了在并发环境下,对共享资源的访问和修改能够保持一致性。
  2. 内存存储:Redis将数据存储在内存中,因此读写速度非常快,这使得它成为实现分布式锁的高效工具。与将锁信息存储在磁盘或其他存储系统中相比,Redis的内存存储方式能够显著减少锁操作的延迟。
  3. 过期机制:Redis提供了键的过期时间设置功能,可以通过EXPIRE命令为某个键设置一个过期时间。当这个键到达过期时间后,它会自动被删除。这种过期机制可以用于实现锁的自动释放,从而防止了死锁的发生。
  4. 分布式特性:Redis本身就是一个分布式的键值存储系统,它可以在多个节点之间进行数据同步和复制。这使得Redis能够在分布式环境中提供一致的锁服务,无论客户端连接到哪个Redis节点,都能获得相同的锁状态。

基于以上特性,Redis的SETNX命令通常被用来实现分布式锁。SETNX命令会在指定的key不存在时,将其值设为value。如果设置成功,则返回1,表示客户端获得了锁;如果设置失败(即key已经存在),则返回0,表示客户端未能获得锁。同时,结合EXPIRE命令设置锁的过期时间,可以确保在发生异常情况时锁能够被自动释放,防止死锁的发生。

二、如何使用Redis实现分布式锁

1、Redis常用的基本数据类型

  1. 字符串(String):
    1. 这是Redis最基本的数据类型,能存储任何形式的字符串,包括二进制数据。
    2. 它经常用于缓存、计数、共享会话、分布式锁等场景。
    3. Redis的字符串实际上是字节序列,因此它可以包含任何数据,包括图片、序列化后的对象等。
  2. 哈希(Hash):
    1. 哈希是键值对的集合,类似于Java中的Map类型。
    2. Redis的哈希是字符串字段和字符串值之间的映射,因此它们特别适合表示对象。
    3. 例如,你可以使用哈希来存储用户的信息,其中每个字段(如用户名、密码、邮箱等)都是一个键值对。
  3. 列表(List):
    1. 列表是简单的字符串列表,按照插入顺序排序。你可以从列表的两端推入或者弹出元素。
    2. 它经常用于实现消息队列或堆栈等功能。
    3. 列表类型的操作是原子性的,这意味着在一个命令执行期间,其他客户端不能进行干预。
  4. 集合(Set):
    1. 集合是字符串的无序集合,元素是唯一的,不重复。
    2. 集合类型提供了交集、并集和差集等操作,非常适合用于处理一些集合间的操作,如共同好友、共同关注等。
  5. 有序集合(Sorted Set):
    1. 有序集合和集合类似,但每个元素都会关联一个double类型的分数,Redis正是通过分数来为集合中的元素进行从小到大的排序。
    2. 有序集合的成员是唯一的,但分数(score)可以重复。
    3. 它经常用于排行榜、带权重的消息队列等场景。

而其中的字符串是以key,value形式存储到redis中的,此外redis还可以设置过期时间,而且大部分操作还是原子性的的,Redis 的 SETNX 命令用于将值 value 关联到键 key,如果 key 已经存在,则 SETNX 不做任何动作。这是一个原子操作。SETNX 命令的返回值是一个整数:

  • 如果键 key 不存在,那么命令执行成功,返回 1。
  • 如果键 key 已经存在,那么命令不执行任何操作,返回 0。

而这就和synchronized意义几乎意义,如果有两个服务来分别使用SETNX 设置同一个key,只会

​有一个设置成功,这就相当于手动去获取锁,而且还能设置过期时间,到期自动删除该key,下一个服务继续去获取该key,从而达到同一时间只会有一个服务去操作数据,大大保证了安全性。

2、Java实现Redis分布式锁

首先引入redis的maven依赖

  1. <!-- redis依赖-->
  2. <dependency>
  3. <groupId>org.springframework.boot</groupId>
  4. <artifactId>spring-boot-starter-data-redis</artifactId>
  5. </dependency>

在yaml文件中配置Redis

  1. server:
  2. port: 8088
  3. spring:
  4. redis:
  5. host: 192.168.60.139
  6. port: 6379

配置redis获取创建键值对和删除键值对的公共方法

  1. package com.wwy.redisLock.redis;
  2. import org.springframework.beans.factory.annotation.Autowired;
  3. import org.springframework.data.redis.core.StringRedisTemplate;
  4. import org.springframework.stereotype.Component;
  5. import java.util.concurrent.TimeUnit;
  6. /**
  7. * @author 王伟羽
  8. * @date 2024/3/26 15:26
  9. */
  10. /**
  11. * redis获取锁
  12. */
  13. @Component
  14. public class RedisLockUtil {
  15. @Autowired
  16. private StringRedisTemplate redisTemplate;
  17. /**
  18. * 利用键值对实现上锁
  19. *
  20. * @param key 健
  21. * @param value 值
  22. * @param second 过期时间
  23. * @return 是否设置成功
  24. */
  25. public boolean lock(String key, String value, int second) {
  26. return redisTemplate.opsForValue().setIfAbsent(key, value, second, TimeUnit.SECONDS);
  27. }
  28. /**
  29. * 删除键值对从而达到释放锁的结果
  30. *
  31. * @param key
  32. */
  33. public void unlock(String key) {
  34. redisTemplate.delete(key);
  35. }
  36. }

下面是一个简单的例子,出于方便,这里我直接把数据设置在redis中,方便测试

在这里我们可以使用模拟ab工具来模拟高并发

将ab压缩包解压到文件夹中,并进入\Apache24\bin目录中打开命令行窗口

ab -n 请求数 -c 并发数 访问的路径

ab -n 100 -c 5 http://localhost:8080/rediskill?item=%E7%89%99%E5%88%B7

一、普通环境下高并发订单
  1. package com.wwy.redisLock.controller;
  2. import io.netty.util.internal.StringUtil;
  3. import lombok.extern.slf4j.Slf4j;
  4. import org.springframework.beans.factory.annotation.Autowired;
  5. import org.springframework.data.redis.core.StringRedisTemplate;
  6. import org.springframework.util.StringUtils;
  7. import org.springframework.web.bind.annotation.*;
  8. /**
  9. * @author 王伟羽
  10. * @date 2024/3/26 15:29
  11. */
  12. @RestController
  13. @RequestMapping(value = "/goods")
  14. public class GoodsController {
  15. @Autowired
  16. private StringRedisTemplate stringRedisTemplate;
  17. /**
  18. * 初始化库存信息
  19. *
  20. * @param num 库存数量
  21. * @param name 商品名称
  22. * @return
  23. */
  24. @GetMapping(value = "/initGoods")
  25. public String initGoods(@RequestParam("num") Integer num, @RequestParam("name") String name) {
  26. //先在redis中初始化库存
  27. stringRedisTemplate.opsForValue().set("goods" + name, num.toString());
  28. stringRedisTemplate.opsForValue().set("order" + name, "0");
  29. return "初始化库存成功!";
  30. }
  31. @GetMapping(value = "/killGoods")
  32. public String killGoods(@RequestParam("name") String name) {
  33. String num = stringRedisTemplate.opsForValue().get("goods" + name);
  34. String orderNum = stringRedisTemplate.opsForValue().get("order" + name);
  35. if (StringUtils.isEmpty(num)) {
  36. return "库存为空";
  37. }
  38. if (StringUtils.isEmpty(orderNum)) {
  39. return "订单为空";
  40. }
  41. Integer nums = Integer.valueOf(num);
  42. Integer orderNums = Integer.valueOf(orderNum);
  43. if (nums == 0) {
  44. return "商品已被秒杀完";
  45. }
  46. nums = nums - 1;
  47. orderNums = orderNums + 1;
  48. stringRedisTemplate.opsForValue().set("goods" + name, nums.toString());
  49. stringRedisTemplate.opsForValue().set("order" + name, orderNums.toString());
  50. return "秒杀成功!此时库存剩余" + nums + " 生成订单数:" + orderNums;
  51. }
  52. @GetMapping(value = "/getGoods")
  53. public String getGoods(@RequestParam("name") String name) {
  54. String num = stringRedisTemplate.opsForValue().get("goods" + name);
  55. String orderNum = stringRedisTemplate.opsForValue().get("order" + name);
  56. return "秒杀成功!此时库存剩余" + num + " 生成订单数:" + orderNum;
  57. }
  58. }
二、使用redis来实现
  1. /**
  2. * 使用redis 的 setnxex 完成分布式锁
  3. * @param goods
  4. * @return
  5. * @throws Exception
  6. */
  7. @RequestMapping("/redisKillGoods")
  8. public synchronized String redisKillGoods(String goods) throws Exception {
  9. /**
  10. * 通过 setnxex 设置key 成功之后 最多key 持有3s 钟
  11. */
  12. if (redisLockUtil.lock("lock_"+goods,"1",3)){// 获取锁成功
  13. // 1.判断库存是否充足
  14. int stack = Integer.valueOf(stringRedisTemplate.opsForValue().get("goods"+goods));
  15. if (stack<=0){// 库存不足
  16. // 释放锁
  17. redisLockUtil.unlock("lock_"+goods);
  18. return goods + "该商品已售罄";
  19. }
  20. Thread.sleep(10);
  21. // 2.削减库存
  22. // (stack-1)+"" 转化为字符串
  23. stringRedisTemplate.opsForValue().set("goods"+goods,(stack-1)+"");
  24. // 3.生成订单 商品对应订单 +1
  25. stringRedisTemplate.opsForValue().increment("order"+goods);
  26. // 释放锁
  27. redisLockUtil.unlock("lock_"+goods);
  28. return "商品秒杀成功";
  29. }else { // 没有得到锁
  30. return "抱歉,手速太慢了,下次重试";
  31. }
  32. }

存在的问题:

setnxex:本质分为两步 1.setnx 2.ex

1.setnxex 操作不是原子性的

有可能操作第一步,第二部设置时间失败(同时应用宕机),改key 一直存在无法释放-----》死锁

2.其他的线程 有可能误删改key

3.setnxex 在获取锁时,不能阻塞等待,不支持可重入

4.setnxex 设置了时间,但是由于任务执行时间过长,超过key 生存的时间,造成还在执行业务,但是锁已经释放了----》造成其他线程抢占到锁---》线程不安全

三、使用Redisson

Redisson是一个基于NIO的Netty框架的企业级的开源Redis Client,也提供了分布式锁的支持,Redisson 是架设在 Redis 基础上的一个 Java 驻内存数据网格框架, 充分利用 Redis 键值数据库提供的一系列优势, 基于 Java 实用工具包中常用接口, 为使用者提供了 一系列具有分布式特性的常用工具类.

Redisson的本质就是原子的执行 setnxex-----》使用lua 脚本执行 原子执行setnxex

Redis、Redis lua脚本和Redission加锁对比
方案实现原理优点缺点
基于Redis命令1. 加锁:执行setnx,若成功再执行expire添加过期时间2. 解锁:执行delete命令实现简单,相比数据库和分布式系统的实现,该方案最轻,性能最好1.setnx和expire分2步执行,非原子操作;若setnx执行成功,但expire执行失败,就可能出现死锁 2.delete命令存在误删除非当前线程持有的锁的可能 3.不支持阻塞等待、不可重入
基于Redis Lua脚本1. 加锁:执行SET lock_name random_value EX seconds NX 命令2. 解锁:执行Lua脚本,释放锁时验证random_value -- ARGV[1]为random_value, KEYS[1]为lock_name if redis.call("get", KEYS[1]) == ARGV[1] then return redis.call("del",KEYS[1]) else return 0 end实现逻辑上也更严谨,除了单点问题,生产环境采用用这种方案,问题也不大不支持锁重入,不支持阻塞等待
基于Redission结合redis和lua脚本实现支持锁重入、支持阻塞等待、Lua脚本原子操作Redisson 的宗旨是促进使用者对 Redis 的关注分离,从而让使用者能够将精力更集中地放在处理业务逻辑上。

实战

1.引入redisson依赖

  1. <!-- redisson -->
  2. <dependency>
  3.   <groupId>org.redisson</groupId>
  4.   <artifactId>redisson-spring-boot-starter</artifactId>
  5.   <version>3.16.7</version>
  6. </dependency>

2.在容器中添加RedissonClient Bean

  1. import org.redisson.Redisson;
  2. import org.redisson.api.RedissonClient;
  3. import org.redisson.config.Config;
  4. import org.springframework.context.annotation.Bean;
  5. import org.springframework.context.annotation.Configuration;
  6. /**
  7. * redisson 配置类
  8. */
  9. @Configuration
  10. public class RedissonConfig {
  11.   @Bean// 在容器中加入RedissonClient
  12.   public RedissonClient redissonClient(){
  13.       Config config = new Config();
  14.       config.useSingleServer().setAddress("redis://192.168.23.133:6379");
  15.       RedissonClient redissonClient = Redisson.create(config);
  16.       return redissonClient;
  17.   }
  18. }

3.测试

  1.   @Autowired
  2.   private RedissonClient redissonClient;
  3.   /**
  4.     * 使用 redisson 完成分布式锁
  5.     * @param goods
  6.     * @return
  7.     * @throws Exception
  8.     */
  9.   @RequestMapping("/redissonKillGoods")
  10.   public synchronized String redissonKillGoods(String goods) throws Exception {
  11.       /**
  12.         * 通过 setnxex 设置key 成功之后 最多key 持有3s 钟
  13.         */
  14.       RLock lock = redissonClient.getLock("lock_" + goods);
  15.       // 尝试者获取锁 ,如果没有得到锁,等待3s钟
  16.       if ( lock.tryLock(3,TimeUnit.SECONDS)){// 获取锁成功
  17.           // 1.判断库存是否充足
  18.           int stack = Integer.valueOf(stringRedisTemplate.opsForValue().get("goods"+goods));
  19.           if (stack<=0){// 库存不足
  20.               // 释放锁
  21.               lock.unlock();
  22.               return goods + "该商品已售罄";
  23.           }
  24.           Thread.sleep(10);
  25.           // 2.削减库存
  26.           // (stack-1)+"" 转化为字符串
  27.           stringRedisTemplate.opsForValue().set("goods"+goods,(stack-1)+"");
  28.           // 3.生成订单 商品对应订单 +1
  29.           stringRedisTemplate.opsForValue().increment("order"+goods);
  30.           // 释放锁
  31.           lock.unlock();
  32.           return "商品秒杀成功";
  33.       }else { // 没有得到锁
  34.           return "抱歉,手速太慢了,下次重试";
  35.       }
  36.   }

同时使用ab 像两个应用发起秒杀

http://localhost:8080/redissonKillGoods?goods=mate50
​
http://localhost:8088/redissonKillGoods?goods=mate50
原理

Redisson源码分析

redisson所有指令都通过lua脚本执行,redis支持lua脚本原子性执行

redisson设置一个key的默认过期时间为30s,如果某个客户端持有一个锁超过了30s怎么办?

redisson中有一个watchdog的概念,翻译过来就是看门狗,它会在你获取锁之后,每隔10秒帮你把key的超时时间设为30s,这样的话,就算一直持有锁也不会出现key过期了,其他线程获取到锁的问题了。

redisson的“看门狗”逻辑保证了没有死锁发生,如果机器宕机了,看门狗也就没了。此时就不会延长key的过期时间,到了30s之后就会自动过期了,其他线程可以获取到锁

参考

核心源码,基于Lua脚本语言(具有原子性)

Redission加锁Lua脚本解读

参数示例含义
KEY个数1KEY个数
KEYS[1]lock_name锁名
ARGV[1]60000持有锁的有效时间:毫秒
ARGV[2]58c62432-bb74-4d14-8a00-9908cc8b828f:1唯一标识:获取锁时set的唯一值,实现上为redisson客户端ID(UUID)+线程ID

Redission请求流程图

Redisson的缺陷:就是单点故障

如果Redisson存储key的redis节点发生故障,就会造成锁丢失问题。

RedissonLock琐最大的缺点就是它加锁时只作用在一个Redis节点上,即使Redis通过sentinel保证高可用,如果这个master节点由于某些原因发生了主从切换,那么就会出现锁丢失的情况:

在Redis的master节点上拿到了锁;但是这个加锁的key还没有同步到slave节点;master故障,发生故障转移,slave节点升级为master节点;导致锁丢失。

解决方案:使用RedLock解决

原理:获取锁不是使用一个key决定,而是使用多个key (key存储到不同的redis节点) 决定,只有多个key都是设置成功才代表得到锁,多个key的删除,才代表释放锁

lock1    存储到redis01
lock2     存储到redis02
lock3     存储到redis03
​
只有三个lock1 lock2 lock3 中的大多数都lock 成功,才代表当前线程获得到锁
                            超过一般 unlock 成功,才代表释放锁 
​
 /**
     * 使用 RedLock 解决 RLock 单点故障问题
     * @param goods
     * @return
     * @throws Exception
     */
    @RequestMapping("/redLockKillGoods")
    public synchronized String  redLockKillGoods(String goods) throws Exception {
​
​
        /**
         * 通过 setnxex 设置key 成功之后 最多key 持有3s 钟
         */
​
        // 假装三个lock 存储出到不同的节点中
        RLock lock1 = redissonClient.getLock("lock_" + goods + "-1");
        // 实际上应该时不同的redissonClient
        RLock lock2 = redissonClient.getLock("lock_" + goods + "-2");
​
        RLock lock3= redissonClient.getLock("lock_" + goods + "-3");
​
        //redLock 获取锁 有存储到不同redis节点的 lock 决定
        RedissonRedLock redLock = new RedissonRedLock(lock1,lock2,lock3);
​
        // 尝试者获取锁 ,如果没有得到锁,等待3s钟
        if ( redLock.tryLock(3,TimeUnit.SECONDS)){// 获取锁成功
            // 1.判断库存是否充足
​
            int stack = Integer.valueOf(stringRedisTemplate.opsForValue().get("goods"+goods));
​
​
            if (stack<=0){// 库存不足
​
                // 释放锁
                redLock.unlock();
​
                return goods + "该商品已售罄";
            }
​
            Thread.sleep(10);
​
            // 2.削减库存
            //  (stack-1)+""  转化为字符串
            stringRedisTemplate.opsForValue().set("goods"+goods,(stack-1)+"");
​
            // 3.生成订单  商品对应订单 +1
            stringRedisTemplate.opsForValue().increment("order"+goods);
​
            // 释放锁
            redLock.unlock();
​
            return  "商品秒杀成功";
​
        }else { // 没有得到锁
​
            return  "抱歉,手速太慢了,下次重试";
        }
​
​
    }
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/繁依Fanyi0/article/detail/696763
推荐阅读
相关标签
  

闽ICP备14008679号