赞
踩
众所周知,项目开发中难免会遇到高并发场景,而保证数据安全则成为其中的重中之重.。提到保证线程安全,很多人都会认为上锁,比如使用synchronized关键字来保证线程安全,如果实在单服务环境下,这样无疑是正确的,因为使用锁后只会有一个进程来访问我们的资源,不会出现数据不安全的情况,可是如果部署到多个服务器上,面对大量的请求,集群负载均衡到各个不同的服务器上,各个服务不在共享一个上下文,不属于一个进程,此时synchronized将毫无意义,无法保证数据安全,以下图为例:
编辑三个请求同时访问,最终会出现超卖的现象,此时我们使用synchronized是无法解决分布式服务数据的统一性的。
基于以上特性,Redis的SETNX命令通常被用来实现分布式锁。SETNX命令会在指定的key不存在时,将其值设为value。如果设置成功,则返回1,表示客户端获得了锁;如果设置失败(即key已经存在),则返回0,表示客户端未能获得锁。同时,结合EXPIRE命令设置锁的过期时间,可以确保在发生异常情况时锁能够被自动释放,防止死锁的发生。
而其中的字符串是以key,value形式存储到redis中的,此外redis还可以设置过期时间,而且大部分操作还是原子性的的,Redis 的 SETNX 命令用于将值 value 关联到键 key,如果 key 已经存在,则 SETNX 不做任何动作。这是一个原子操作。SETNX 命令的返回值是一个整数:
而这就和synchronized意义几乎意义,如果有两个服务来分别使用SETNX 设置同一个key,只会
有一个设置成功,这就相当于手动去获取锁,而且还能设置过期时间,到期自动删除该key,下一个服务继续去获取该key,从而达到同一时间只会有一个服务去操作数据,大大保证了安全性。
首先引入redis的maven依赖
- <!-- redis依赖-->
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-data-redis</artifactId>
- </dependency>
在yaml文件中配置Redis
- server:
- port: 8088
- spring:
- redis:
- host: 192.168.60.139
- port: 6379
-
配置redis获取创建键值对和删除键值对的公共方法
- package com.wwy.redisLock.redis;
-
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.data.redis.core.StringRedisTemplate;
- import org.springframework.stereotype.Component;
-
- import java.util.concurrent.TimeUnit;
-
- /**
- * @author 王伟羽
- * @date 2024/3/26 15:26
- */
-
- /**
- * redis获取锁
- */
- @Component
- public class RedisLockUtil {
-
- @Autowired
- private StringRedisTemplate redisTemplate;
-
-
- /**
- * 利用键值对实现上锁
- *
- * @param key 健
- * @param value 值
- * @param second 过期时间
- * @return 是否设置成功
- */
- public boolean lock(String key, String value, int second) {
- return redisTemplate.opsForValue().setIfAbsent(key, value, second, TimeUnit.SECONDS);
- }
-
-
- /**
- * 删除键值对从而达到释放锁的结果
- *
- * @param key
- */
- public void unlock(String key) {
- redisTemplate.delete(key);
- }
-
-
- }
下面是一个简单的例子,出于方便,这里我直接把数据设置在redis中,方便测试
在这里我们可以使用模拟ab工具来模拟高并发
将ab压缩包解压到文件夹中,并进入\Apache24\bin目录中打开命令行窗口
ab -n 请求数 -c 并发数 访问的路径
ab -n 100 -c 5 http://localhost:8080/rediskill?item=%E7%89%99%E5%88%B7
- package com.wwy.redisLock.controller;
-
- import io.netty.util.internal.StringUtil;
- import lombok.extern.slf4j.Slf4j;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.data.redis.core.StringRedisTemplate;
- import org.springframework.util.StringUtils;
- import org.springframework.web.bind.annotation.*;
-
- /**
- * @author 王伟羽
- * @date 2024/3/26 15:29
- */
-
- @RestController
- @RequestMapping(value = "/goods")
- public class GoodsController {
-
- @Autowired
- private StringRedisTemplate stringRedisTemplate;
-
- /**
- * 初始化库存信息
- *
- * @param num 库存数量
- * @param name 商品名称
- * @return
- */
- @GetMapping(value = "/initGoods")
- public String initGoods(@RequestParam("num") Integer num, @RequestParam("name") String name) {
- //先在redis中初始化库存
- stringRedisTemplate.opsForValue().set("goods" + name, num.toString());
- stringRedisTemplate.opsForValue().set("order" + name, "0");
- return "初始化库存成功!";
- }
-
- @GetMapping(value = "/killGoods")
- public String killGoods(@RequestParam("name") String name) {
- String num = stringRedisTemplate.opsForValue().get("goods" + name);
- String orderNum = stringRedisTemplate.opsForValue().get("order" + name);
- if (StringUtils.isEmpty(num)) {
- return "库存为空";
- }
- if (StringUtils.isEmpty(orderNum)) {
- return "订单为空";
- }
- Integer nums = Integer.valueOf(num);
- Integer orderNums = Integer.valueOf(orderNum);
- if (nums == 0) {
- return "商品已被秒杀完";
- }
- nums = nums - 1;
- orderNums = orderNums + 1;
- stringRedisTemplate.opsForValue().set("goods" + name, nums.toString());
- stringRedisTemplate.opsForValue().set("order" + name, orderNums.toString());
- return "秒杀成功!此时库存剩余" + nums + " 生成订单数:" + orderNums;
- }
-
- @GetMapping(value = "/getGoods")
- public String getGoods(@RequestParam("name") String name) {
- String num = stringRedisTemplate.opsForValue().get("goods" + name);
- String orderNum = stringRedisTemplate.opsForValue().get("order" + name);
- return "秒杀成功!此时库存剩余" + num + " 生成订单数:" + orderNum;
- }
- }
- /**
- * 使用redis 的 setnxex 完成分布式锁
- * @param goods
- * @return
- * @throws Exception
- */
- @RequestMapping("/redisKillGoods")
- public synchronized String redisKillGoods(String goods) throws Exception {
-
-
- /**
- * 通过 setnxex 设置key 成功之后 最多key 持有3s 钟
- */
- if (redisLockUtil.lock("lock_"+goods,"1",3)){// 获取锁成功
- // 1.判断库存是否充足
-
- int stack = Integer.valueOf(stringRedisTemplate.opsForValue().get("goods"+goods));
- if (stack<=0){// 库存不足
-
- // 释放锁
- redisLockUtil.unlock("lock_"+goods);
-
- return goods + "该商品已售罄";
- }
-
- Thread.sleep(10);
-
- // 2.削减库存
- // (stack-1)+"" 转化为字符串
- stringRedisTemplate.opsForValue().set("goods"+goods,(stack-1)+"");
-
- // 3.生成订单 商品对应订单 +1
- stringRedisTemplate.opsForValue().increment("order"+goods);
-
- // 释放锁
- redisLockUtil.unlock("lock_"+goods);
-
- return "商品秒杀成功";
-
- }else { // 没有得到锁
-
- return "抱歉,手速太慢了,下次重试";
- }
-
-
- }
存在的问题:
setnxex:本质分为两步 1.setnx 2.ex
1.setnxex 操作不是原子性的
有可能操作第一步,第二部设置时间失败(同时应用宕机),改key 一直存在无法释放-----》死锁
2.其他的线程 有可能误删改key
3.setnxex 在获取锁时,不能阻塞等待,不支持可重入
4.setnxex 设置了时间,但是由于任务执行时间过长,超过key 生存的时间,造成还在执行业务,但是锁已经释放了----》造成其他线程抢占到锁---》线程不安全
Redisson是一个基于NIO的Netty框架的企业级的开源Redis Client,也提供了分布式锁的支持,Redisson 是架设在 Redis 基础上的一个 Java 驻内存数据网格框架, 充分利用 Redis 键值数据库提供的一系列优势, 基于 Java 实用工具包中常用接口, 为使用者提供了 一系列具有分布式特性的常用工具类.
Redisson的本质就是原子的执行 setnxex-----》使用lua 脚本执行 原子执行setnxex
方案 | 实现原理 | 优点 | 缺点 |
---|---|---|---|
基于Redis命令 | 1. 加锁:执行setnx,若成功再执行expire添加过期时间2. 解锁:执行delete命令 | 实现简单,相比数据库和分布式系统的实现,该方案最轻,性能最好 | 1.setnx和expire分2步执行,非原子操作;若setnx执行成功,但expire执行失败,就可能出现死锁 2.delete命令存在误删除非当前线程持有的锁的可能 3.不支持阻塞等待、不可重入 |
基于Redis Lua脚本 | 1. 加锁:执行SET lock_name random_value EX seconds NX 命令2. 解锁:执行Lua脚本,释放锁时验证random_value -- ARGV[1]为random_value, KEYS[1]为lock_name if redis.call("get", KEYS[1]) == ARGV[1] then return redis.call("del",KEYS[1]) else return 0 end | 实现逻辑上也更严谨,除了单点问题,生产环境采用用这种方案,问题也不大 | 不支持锁重入,不支持阻塞等待 |
基于Redission | 结合redis和lua脚本实现 | 支持锁重入、支持阻塞等待、Lua脚本原子操作 | Redisson 的宗旨是促进使用者对 Redis 的关注分离,从而让使用者能够将精力更集中地放在处理业务逻辑上。 |
1.引入redisson依赖
- <!-- redisson -->
- <dependency>
- <groupId>org.redisson</groupId>
- <artifactId>redisson-spring-boot-starter</artifactId>
- <version>3.16.7</version>
- </dependency>
2.在容器中添加RedissonClient Bean
- import org.redisson.Redisson;
- import org.redisson.api.RedissonClient;
- import org.redisson.config.Config;
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
-
- /**
- * redisson 配置类
- */
- @Configuration
- public class RedissonConfig {
-
-
-
- @Bean// 在容器中加入RedissonClient
- public RedissonClient redissonClient(){
-
- Config config = new Config();
- config.useSingleServer().setAddress("redis://192.168.23.133:6379");
-
- RedissonClient redissonClient = Redisson.create(config);
-
- return redissonClient;
- }
-
- }
-
3.测试
- @Autowired
- private RedissonClient redissonClient;
-
- /**
- * 使用 redisson 完成分布式锁
- * @param goods
- * @return
- * @throws Exception
- */
- @RequestMapping("/redissonKillGoods")
- public synchronized String redissonKillGoods(String goods) throws Exception {
-
-
- /**
- * 通过 setnxex 设置key 成功之后 最多key 持有3s 钟
- */
- RLock lock = redissonClient.getLock("lock_" + goods);
-
- // 尝试者获取锁 ,如果没有得到锁,等待3s钟
- if ( lock.tryLock(3,TimeUnit.SECONDS)){// 获取锁成功
- // 1.判断库存是否充足
-
- int stack = Integer.valueOf(stringRedisTemplate.opsForValue().get("goods"+goods));
-
-
- if (stack<=0){// 库存不足
-
- // 释放锁
- lock.unlock();
-
- return goods + "该商品已售罄";
- }
-
- Thread.sleep(10);
-
- // 2.削减库存
- // (stack-1)+"" 转化为字符串
- stringRedisTemplate.opsForValue().set("goods"+goods,(stack-1)+"");
-
- // 3.生成订单 商品对应订单 +1
- stringRedisTemplate.opsForValue().increment("order"+goods);
-
- // 释放锁
- lock.unlock();
-
- return "商品秒杀成功";
-
- }else { // 没有得到锁
-
- return "抱歉,手速太慢了,下次重试";
- }
-
-
- }
同时使用ab 像两个应用发起秒杀
http://localhost:8080/redissonKillGoods?goods=mate50 http://localhost:8088/redissonKillGoods?goods=mate50
Redisson源码分析
redisson所有指令都通过lua脚本执行,redis支持lua脚本原子性执行
redisson设置一个key的默认过期时间为30s,如果某个客户端持有一个锁超过了30s怎么办?
redisson中有一个
watchdog
的概念,翻译过来就是看门狗,它会在你获取锁之后,每隔10秒帮你把key的超时时间设为30s,这样的话,就算一直持有锁也不会出现key过期了,其他线程获取到锁的问题了。redisson的“看门狗”逻辑保证了没有死锁发生,如果机器宕机了,看门狗也就没了。此时就不会延长key的过期时间,到了30s之后就会自动过期了,其他线程可以获取到锁
参考
核心源码,基于Lua脚本语言(具有原子性)
Redission加锁Lua脚本解读
参数 | 示例 | 含义 |
---|---|---|
KEY个数 | 1 | KEY个数 |
KEYS[1] | lock_name | 锁名 |
ARGV[1] | 60000 | 持有锁的有效时间:毫秒 |
ARGV[2] | 58c62432-bb74-4d14-8a00-9908cc8b828f:1 | 唯一标识:获取锁时set的唯一值,实现上为redisson客户端ID(UUID)+线程ID |
Redission请求流程图
如果Redisson存储key的redis节点发生故障,就会造成锁丢失问题。
RedissonLock琐最大的缺点就是它加锁时只作用在一个Redis节点上,即使Redis通过sentinel保证高可用,如果这个master节点由于某些原因发生了主从切换,那么就会出现锁丢失的情况:
在Redis的master节点上拿到了锁;但是这个加锁的key还没有同步到slave节点;master故障,发生故障转移,slave节点升级为master节点;导致锁丢失。
原理:获取锁不是使用一个key决定,而是使用多个key (key存储到不同的redis节点) 决定,只有多个key都是设置成功才代表得到锁,多个key的删除,才代表释放锁
lock1 存储到redis01 lock2 存储到redis02 lock3 存储到redis03 只有三个lock1 lock2 lock3 中的大多数都lock 成功,才代表当前线程获得到锁 超过一般 unlock 成功,才代表释放锁
/** * 使用 RedLock 解决 RLock 单点故障问题 * @param goods * @return * @throws Exception */ @RequestMapping("/redLockKillGoods") public synchronized String redLockKillGoods(String goods) throws Exception { /** * 通过 setnxex 设置key 成功之后 最多key 持有3s 钟 */ // 假装三个lock 存储出到不同的节点中 RLock lock1 = redissonClient.getLock("lock_" + goods + "-1"); // 实际上应该时不同的redissonClient RLock lock2 = redissonClient.getLock("lock_" + goods + "-2"); RLock lock3= redissonClient.getLock("lock_" + goods + "-3"); //redLock 获取锁 有存储到不同redis节点的 lock 决定 RedissonRedLock redLock = new RedissonRedLock(lock1,lock2,lock3); // 尝试者获取锁 ,如果没有得到锁,等待3s钟 if ( redLock.tryLock(3,TimeUnit.SECONDS)){// 获取锁成功 // 1.判断库存是否充足 int stack = Integer.valueOf(stringRedisTemplate.opsForValue().get("goods"+goods)); if (stack<=0){// 库存不足 // 释放锁 redLock.unlock(); return goods + "该商品已售罄"; } Thread.sleep(10); // 2.削减库存 // (stack-1)+"" 转化为字符串 stringRedisTemplate.opsForValue().set("goods"+goods,(stack-1)+""); // 3.生成订单 商品对应订单 +1 stringRedisTemplate.opsForValue().increment("order"+goods); // 释放锁 redLock.unlock(); return "商品秒杀成功"; }else { // 没有得到锁 return "抱歉,手速太慢了,下次重试"; } }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。