赞
踩
RedisOperations中包含了RedisTemplate和StringRedisTemplate
先开启监听watch(key值,可以是多个key值)
然后开启事务mulit()
执行事务,exec(),返回的是一个集合,若集合是空则证明事务执行失败,执行失败的时候需要让他休眠一会,过于频繁重试会导致栈内存溢出,不推荐使用
- //redis乐观锁
- public void redisMulPay(){
- this.redisTemplate.execute(new SessionCallback<Object>() {
- @Override
- public Object execute(RedisOperations operations) throws DataAccessException {
- //开启监听
- operations.watch("stock");
- //查询库存数
- String stock=operations.opsForValue().get("stock").toString();
- //判断库存
- if(stock !=null && stock.length() != 0){
- Integer st = Integer.valueOf(stock);
- if (st > 0){
- //开启事务
- operations.multi();
- //减少库存
- operations.opsForValue().set("stock",String.valueOf(--st));
- //执行事务,返回的是集合
- List exec =operations.exec();
- //判断事物是否执行成功
- if (exec ==null || exec.size() == 0) {
- //返回的是空,则事务执行失败,再次执行
- //可以让它睡眠一会,一直重复会导致栈内存溢出
- try {
- System.out.println("事物执行失败");
- Thread.sleep(40);
- redisMulPay();
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
-
- }
- return exec;
- }
- }
- return null;
- }
- });
- }
由下图可见,使用JMeter测试乐观锁,程序性能明显低且重试次数多(注:并发量3000,同一时时刻请求)
jvm本地锁只能解决一个服务内部的情况,和面对集群部署可能jvm本地锁就有点乏力
分布式锁:可以跨进程、服务、服务器
常见的使用场景:超卖现象、缓存击穿
锁在缓存击穿的作用:
Mysql是存在硬盘中以文件格式存储数据的,redis是以内存方式存储数据(内存型数据库),但是存储的数据量少,通常我们会给缓存添加过期时间,不添加过期时间,随着服务的时间越来越长,redis的数据会越来越多导致内存会给占满,导致服务器内存资源不够而宕机,所以就要给缓存设置时间,当某天一个热点key过期时,大量数据访问,发现redis里面没有,这时大量数据就会直接访问mysql,这时可能会导致mysql宕机,而锁的作用就是,当大量的访问访问mysql时,锁会将他们拦在外面,谁拿到锁谁就访问mysql数据,范文mysql数据后,可以重新把这个数据放入缓存,下次访问的时候缓存中就有数据了,锁就可以防止mysql给大量数据冲垮
使用setIfAbsent("key","value")加锁,解锁使用delete("key")
- //redis锁1
- public void redisPay1() {
- //若使用递归调用时会导致栈内存溢出的现象
- //使用setnx加锁(setIfAbsent:不存在则设置,setIfPresent:存在则设置)
- while (!this.redisTemplate.opsForValue().setIfAbsent("lock", "727")) {
- //重试:循环(CAS方式去获取锁,加入睡眠锁竞争就不会那么激烈,进程压力表兄啊,性能提高)
- try {
- //睡眠一会重新获取锁,同样担心栈内存溢出
- Thread.sleep(50);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- try {
- //查询库存数
- String stock = this.redisTemplate.opsForValue().get("stock");
- //判断库存
- if (stock != null && stock.length() != 0) {
- Integer st = Integer.valueOf(stock);
- if (st > 0) {
- //减少库存
- this.redisTemplate.opsForValue().set("stock", String.valueOf(--st));
- }
- }
- } finally {
- //解锁
- this.redisTemplate.delete("lock");
- }
- }
由此可看,这样锁性能和jvm本地锁性能差不多
但是上面代码并不是最优解,很可能会发生死锁的现象
了解这个问题的关键是为什么设置过期时间且解决什么问题
为什么设置过期时间:当线程获取到锁的时候,突然宕机了,这时锁并没有得到释放,其他线程一直处于获取锁的状态,大量请求给拦截到外面,所以要给锁添加过期时间,当线程获取到锁后宕机了,锁的时间到了就会自己释放锁让其他线程获取到锁
误删问题是当你设置锁的过期时间,当你给锁添加过期时间,当线程1获取到锁并设置了过期时间3秒,这时线程1开始业务,当到3秒的时候线程1 还没完成业务,这时锁的时间过期了,当其他线程获取到锁,并执行业务,这时线程1到5秒的时候,线程1释放了锁,这时线程2的业务还没完成,这时线程3又获取到锁,线程2业务完成后就释放了锁一直这样循环下去,这时业务就会暴露,导致锁失效的问题,想要解决这个问题也很简单,给锁的value添加一个唯一标识,可以是线程名、或则是随机数UUID、也可以是时间戳都可以,在释放锁的时候去判断是不是当前线程自己加的锁
- public void redisPay1() {
- //获取时间戳
- // long nowTime=System.currentTimeMillis();
- //生成随机数
- String uuid = UUID.randomUUID().toString();
- //若使用递归调用时会导致栈内存溢出的现象
- //使用setnx加锁(setIfAbsent:不存在则设置,setIfPresent:存在则设置)
- //setIfAbsent(key,value,过期时间,分/秒/毫秒)
- while (!this.redisTemplate.opsForValue().setIfAbsent("lock", uuid,3, TimeUnit.SECONDS)) {
- //重试:循环(CAS方式去获取锁,加入睡眠锁竞争就不会那么激烈,进程压力表兄啊,性能提高)
- try {
- //睡眠一会重新获取锁,同样担心栈内存溢出
- Thread.sleep(50);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- try {
- //查询库存数
- String stock = this.redisTemplate.opsForValue().get("stock");
- //判断库存
- if (stock != null && stock.length() != 0) {
- Integer st = Integer.valueOf(stock);
- if (st > 0) {
- //减少库存
- this.redisTemplate.opsForValue().set("stock", String.valueOf(--st));
- }
- }
- } finally {
- //先判断是否是自己加的锁,是则解锁
- if(StringUtils.equals(this.redisTemplate.opsForValue().get("lock"),uuid)){
- this.redisTemplate.delete("lock");
- }
- }
- }
在上面代码中,释放锁的逻辑没有原子性,当线程1判断完是自己的锁之后,这时锁时间过期,其他线程立马获取到锁,这时线程1将线程2 的锁给释放掉,线程2的代码就暴露,这时也会面临锁失效的问题,但是也没有指令实现,当搜索解决方案的时候出现的大部分是使用Lua脚本解决这一问题
什么是Lua脚本?
需要了解的可以去:https://www.runoob.com/lua/lua-tutorial.html
它是由C语言编写的,轻量级是它的特点之一,可拓展性也很高
Lua应用场景也挺广,大部分的脚本都是由Lua来编写的,同时他也能防止脚入侵,redis同时也对它做了支持
Lua为什么可以解决原子性问题,这是因为它能一次性发送多条指令给redis,一次性打包多条指令给redis,redis是单线程的,执行规则是一个一个的解决,当Lua给他一次性发送多条指令,其他的线程就不会在这时横插一脚,下面是Lua实现,这样原子性问题解决了
- public void redisPay1() {
- //获取时间戳
- // long nowTime=System.currentTimeMillis();
- //生成随机数
- String uuid = UUID.randomUUID().toString();
- //若使用递归调用时会导致栈内存溢出的现象
- //使用setnx加锁(setIfAbsent:不存在则设置,setIfPresent:存在则设置)
- //setIfAbsent(key,value,过期时间,分/秒/毫秒)
- while (!this.redisTemplate.opsForValue().setIfAbsent("lock", uuid,3, TimeUnit.SECONDS)) {
- //重试:循环(CAS方式去获取锁,加入睡眠锁竞争就不会那么激烈,进程压力表兄啊,性能提高)
- try {
- //睡眠一会重新获取锁,同样担心栈内存溢出
- Thread.sleep(50);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- try {
- //查询库存数
- String stock = this.redisTemplate.opsForValue().get("stock");
- //判断库存
- if (stock != null && stock.length() != 0) {
- Integer st = Integer.valueOf(stock);
- if (st > 0) {
- //减少库存
- this.redisTemplate.opsForValue().set("stock", String.valueOf(--st));
- }
- }
- } finally {
- //先判断是否是自己加的锁,是则解锁(Lua实现)
- String script="if redis.call('get',KEYS[1]) == ARGV[1] then return redis.call('del',KEYS[1]) else return 0 end";
- this.redisTemplate.execute(new DefaultRedisScript<>(script,Boolean.class), Arrays.asList("lock"), uuid);
-
- }
- }
ReentrantLock支持可重入性,参考原理使用Lua去写可重入锁,Lua在这里的作用是解决原子性问题
利用工厂实现
- @Component
- public class DistributedLockClient {
-
- @Autowired
- private StringRedisTemplate redisTemplate;
- private String uuid;
-
- //添加uuid,每个服务一个
- public DistributedLockClient() {
- this.uuid = UUID.randomUUID().toString();
- }
-
- public DistributedRedisLock getLock(String lockName){
- return new DistributedRedisLock(lockName,redisTemplate,uuid);
- }
- }
- public class DistributedRedisLock implements Lock {
-
- private String lockName;
- private String uuid;
- private long expire=30;
-
- public DistributedRedisLock(String lockName ,StringRedisTemplate redisTemplate, String uuid) {
- this.lockName = lockName;
- this.uuid = uuid;
- this.redisTemplate = redisTemplate;
- }
-
- //给线程添加唯一标识
- String getid(){
- return uuid+":"+Thread.currentThread().getId();
- }
-
- private StringRedisTemplate redisTemplate;
- @Override
- public void lock() {
- this.tryLock();
- }
-
- @Override
- public void lockInterruptibly() throws InterruptedException {
-
- }
-
- @Override
- public boolean tryLock() {
- try {
- return this.tryLock(-1L, TimeUnit.SECONDS);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- return false;
- }
-
- // 加锁方法
- @Override
- public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
- if (time != -1){
- this.expire=unit.toSeconds(time);
- }
-
- String script="if redis.call('exists',KEYS[1]) == 0 or redis.call('hexists',KEYS[1],ARGV[1]) == 1 then redis.call('hincrby',KEYS[1],ARGV[1],1) return 1 else return 0 end";
- while (!this.redisTemplate.execute(new DefaultRedisScript<>(script,Boolean.class), Arrays.asList(lockName), getid(),String.valueOf(expire)))
- {
- Thread.sleep(50);
- }
- return true;
- }
-
- //解锁方法
- @Override
- public void unlock() {
- String script="if redis.call('hexists',KEYS[1],ARGV[1]) == 0 then return nil elseif redis.call('hincrby',KEYS[1],ARGV[1], -1) == 0 then return redis.call('del',KEYS[1]) else return 0 end";
- Long flag = this.redisTemplate.execute(new DefaultRedisScript<>(script, long.class), Arrays.asList(lockName), getid());
- if(flag==null){
- throw new IllegalMonitorStateException("this lock doesn`t belong to you");
- }
-
- }
-
- @Override
- public Condition newCondition() {
- return null;
- }
- }
- //redis可重入锁
- public void redisPay2() {
- //调用工厂对象
- DistributedRedisLock redisLock = this.distributedLockClient.getLock("lock");
- //调用加锁方法
- redisLock.lock();
- try {
- //查询库存数
- String stock = this.redisTemplate.opsForValue().get("stock");
- //判断库存
- if (stock != null && stock.length() != 0) {
- Integer st = Integer.valueOf(stock);
- if (st > 0) {
- //减少库存
- this.redisTemplate.opsForValue().set("stock", String.valueOf(--st));
- }
- }
- this.ceshi();
- }finally {
- //解锁
- redisLock.unlock();
- }
- }
-
- //测试可重入锁
- public void ceshi(){
- //调用工厂对象
- DistributedRedisLock redisLock = this.distributedLockClient.getLock("lock");
- redisLock.lock();
- System.out.println("测试可重入锁中.....");
- redisLock.unlock();
- }
- //自动续期
- public void renewExpire(){
- String script="if redis.call('hexists',KEYS[1],ARGV[1]) == 1 then return redis.call('expire',KEYS[1],ARGV[2]) else return 0 end";
- //自动续期
- new Timer().schedule(new TimerTask() {
- @Override
- public void run() {
- while (redisTemplate.execute(new DefaultRedisScript<>(script,Boolean.class),Arrays.asList(lockName),uuid,String.valueOf(expire))){
- //有锁的情况下就会一直自动续期,直到业务完成
- renewExpire();
- }
- }
- },this.expire*1000 / 3);
- }
小结:
分布式锁的特征:
1、独占排他性:setnx 命令
2、防死锁:redis客户端获取到锁后立马宕机,给锁添加过期时间
3、防误删:给锁添加唯一标识
4、原子性: 发生在加锁和过期时间之间、判断和释放锁的时候服务器突然宕机和其他原因
5、可重入性:hash(key ,唯一标识,value) +lua脚本解决
6、自动续期:Timer定时器+lua脚本
下面是Redisson加锁解锁的底层代码,可见redisson实现加锁解锁也是使用了Lua脚本去实现代码加锁解锁
- //加锁底层
- <T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
- return this.evalWriteAsync(this.getRawName(), LongCodec.INSTANCE, command, "if (redis.call('exists', KEYS[1]) == 0) then redis.call('hincrby', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then redis.call('hincrby', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; return redis.call('pttl', KEYS[1]);", Collections.singletonList(this.getRawName()), new Object[]{unit.toMillis(leaseTime), this.getLockName(threadId)});
- }
-
-
-
- //解锁底层
- protected RFuture<Boolean> unlockInnerAsync(long threadId) {
- return this.evalWriteAsync(this.getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then return nil;end; local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); if (counter > 0) then redis.call('pexpire', KEYS[1], ARGV[2]); return 0; else redis.call('del', KEYS[1]); redis.call('publish', KEYS[2], ARGV[1]); return 1; end; return nil;", Arrays.asList(this.getRawName(), this.getChannelName()), new Object[]{LockPubSub.UNLOCK_MESSAGE, this.internalLockLeaseTime, this.getLockName(threadId)});
- }
redisson也实现了自动续期,默认是30秒,在官网文档中描述的是监控对象的看门狗,主要实现也是实现了一个定时器,和上面Lua自动续期一样,实现一个定时器,使用Lua脚本去给锁续期,但是唯一不同的是,Redisson在实现解锁的时候也会自动续期
redisson实现了公平锁,拼的是手速和网速,只要你手速和网速快就可以抢到锁,没抢到锁的就要在外面排对,新的请求也会加入等待队列中,也支持自动续期机制
- //公平锁
- RLock lock1 = this.redissonClient.getFairLock("lock");
- lock1.lock();
联锁:new RedissonMultiLock();同红锁创建相同
红锁和联锁不同,大部分节点加锁成功则为加锁成功,而联锁只要一个不成功就加锁失败
- //获取锁
- RLock lock = this.redissonClient.getLock("lock");
- //创建红锁算法实例
- RedissonRedLock redissonRedLock = new RedissonRedLock(lock);//多个Redis实例
- //加锁
- redissonRedLock.lock();
- //解锁
- redissonRedLock.unlock();
读写锁不适用场景,写和写不支持并发,会出现超卖现象,读和写也不支持,在写的时候去读数据会导致读到脏数据(错误数据),读和读支持并发
读写的时候,只有等读完或则写完释放锁,才能继续下一步,写写也是一样,只有读读不需要等待
- //获取读写锁
- RReadWriteLock lock2 = this.redissonClient.getReadWriteLock("lock");
- //读锁
- lock2.readLock().lock(10,TimeUnit.SECONDS);
- //写锁
- lock2.writeLock().lock(10,TimeUnit.SECONDS);
- //释放读锁
- lock2.readLock().unlock();
- //释放写锁
- lock2.writeLock().unlock();
单机模式下模拟抢车位,车位为5,车辆为10,当把Semaphore注释掉后代码乱套
- //单机模式下信号量Semaphore
- public static void main(String[] args) {
- //new Semaphore
- Semaphore semaphore = new Semaphore(5);
- for (int i = 0; i < 10; i++) {
- //创建线程
- new Thread(()-> {
- try {
- semaphore.acquire();
- System.out.println(Thread.currentThread().getName() + "抢到车位");
- //业务时间
- TimeUnit.SECONDS.sleep(10);
- System.out.println(Thread.currentThread().getName() + "开走了");
- //资源释放
- semaphore.release();
- }catch (InterruptedException e){
- e.printStackTrace();
- }
- },i+"号车");
- }
- }
在分布式下资源使用信号量
- public void testSemaphore() {
- //创建分布式信号量
- RSemaphore semaphore = this.redissonClient.getSemaphore("semaphore");
- //设置限定资源
- semaphore.trySetPermits(3);
- try {
- semaphore.acquire();
- System.out.println("开始处理业务" + Thread.currentThread().getName());
- TimeUnit.SECONDS.sleep(10);
- System.out.println("处理完,释放资源" + Thread.currentThread().getName());
- //释放资源
- semaphore.release();
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。