赞
踩
分布式锁在是一种用来安全访问分式式机器上变量的安全方案,一般用在全局id生成,秒杀系统,全局变量共享、分布式事务等。一般会有两种实现方案,一种是悲观锁的实现,一种是乐观锁的实现。悲观锁的并发性能差,但是能保证不会发生脏数据的可能性小一点。
悲观锁:每次去拿数据的时候都认为别人会修改,所以每次在拿数据的时候都会上锁,这样别人想拿这个数据就会阻塞直到它拿到锁。传统的关系型数据库里边就用到了很多这种锁机制,比如行锁,表锁等,读锁,写锁等,都是在做操作之前先上锁。
乐观锁:每次去拿数据的时候都认为别人不会修改,所以不会上锁,但是在更新的时候会判断一下在此期间别人有没有去更新这个数据,可以使用版本号等机制。乐观锁适用于多读的应用类型,这样可以提高吞吐量,像数据库如果提供类似于write_condition机制的其实都是提供的乐观锁。
简单来说:悲观锁只要是访问数据,都会上锁。而乐观锁,读操作不会上锁,只有进行提交更新操作的时候,才会对数据的冲突与否进行检测,如果发现冲突了,则让返回用户错误的信息,让用户决定如何去做。
使用Redis实现分布式锁,有两个重要函数需要介绍
SETNX命令(SET if Not eXists)
语法:
SETNX key value
功能:
当且仅当 key 不存在,将 key 的值设为 value ,并返回1;若给定的 key 已经存在,则 SETNX 不做任何动作,并返回0。
GETSET命令(这是一个原子命令!)
语法:
GETSET key value
功能:
将给定 key 的值设为 value ,并返回 key 的旧值 (old value),当 key 存在但不是字符串类型时,返回一个错误,当key不存在时,返回nil。
GET命令和DELETE命令很容易,不解释了。
SETNX lock.foo <current Unix time + lock timeout>
• 如返回1,则该客户端获得锁,把lock.foo的键值设置为时间值表示该键已被锁定,该客户端最后可以通过DEL lock.foo来释放该锁。
• 如返回0,表明该锁已被其他客户端取得,这时我们可以先返回或进行重试等对方完成或等待锁超时。
上面的锁定逻辑有一个问题:如果一个持有锁的客户端失败或崩溃了不能释放锁,该怎么解决?
我们可以通过锁的键对应的时间戳来判断这种情况是否发生了,如果当前的时间已经大于lock.foo的值,说明该锁已失效,可以被重新获得使用。
当多个客户端检测到锁超时后都会尝试去释放它,这里就可能出现一个竞态条件,让我们模拟一下这个场景:
C0操作超时了,但它还持有着锁,C1和C2读取lock.foo检查时间戳,先后发现超时了。
C1 发送DEL lock.foo
C1 发送SETNX lock.foo 并且成功了。
C2 发送DEL lock.foo
C2 发送SETNX lock.foo 并且成功了。
这样一来,C1,C2都拿到了锁!问题大了!
可以用下面的方式避免发生这种问题:
C3发送SETNX lock.foo 想要获得锁,由于C0还持有锁,所以Redis返回给C3一个0
C3发送GET lock.foo 以检查锁是否超时了,如果没超时,则等待或重试。
反之,如果已超时,C3通过下面的操作来尝试获得锁:
GETSET lock.foo <current Unix time + lock timeout>
通过GETSET,C3拿到的时间戳(lock.foo的旧值)如果仍然是超时的,那就说明,C3如愿以偿拿到锁了。
如果在C3之前,有个叫C4的客户端比C3快一步执行了上面的操作,那么C3拿到的时间戳是个未超时的值,这时,C3没有如期获得锁,需要再次等待或重试。留意一下,尽管C3没拿到锁,但它改写了C4设置的锁的超时值,不过这一点非常微小的误差带来的影响可以忽略不计。
因此,获得锁的逻辑如下所示:
* 1. 通过setnx尝试设置某个key的值,成功(当前没有这个锁)则返回1,成功获得锁
* 2. 锁已经存在,则获取锁的到期时间,和当前时间比较,超时的话,则设置新的值
* 3. 如果旧值依旧超时,则成功获得锁
* 4. 否则,未获得锁,可以重试或者等一段时间再做操作,由用户决定
释放锁的操作就是DEL lock.foo。
以下是伪代码:
1. 用SETNX命令设置锁的超时时间戳 expire = time(NULL) + lock_timeout
SETNX lock expire
如果返回1,则获得锁。
如果返回0,表明锁正在被其他客户端占用。
2. 用Get命令获取超时时间戳
与time(NULL)比较,如果小于,表示锁还有效,被其他客户端占着,获取锁失败。
如果大于表示锁过期,进行步骤3.
3. 用GetSet命令重新设置超时时间
GetSet返回旧的超时时间戳old_expire,并把新的超时时间戳new_expire设置进去。
如果old_expire依然大于time(NULL),则表示锁过期,获得锁。
4. 释放锁:delete key即可。
//锁的名字 string lock_key = "lock.foo"; //锁的超时时间(秒) uint64_t lock_timeout = 3; //redis实例 RedisString* redis = new RedisString(); //获得锁的函数 bool lock() { uint64_t expire_time = time(NULL) + lock_timeout; stringstream ss; ss << expire_time; string expire_time_str = ss.str(); int ret = redis->SetNx(lock_key, expire_time_str); if (ret == 1) { //获得锁 return true; } //lock_key已存在,返回0, //表明锁还被其他客户端占用着,但是那个客户端是否崩溃未知,如果崩溃,其他客户端将永远无法获取锁 //所以,要查询锁的超时时间,如果超时,则获取锁 string current_value; ret = redis->Get(lock_key, current_value); if (ret != 0) { printf("get lock_key fail,err_code:%d,err_msg:%s\n", redis->err(),redis->errstr()); return false; } if (is_expire(current_value))//锁超时,用getset命令获取旧值,并设置新的超时时间 { string old_value; ret = redis->GetSet(lock_key, expire_time_str, old_value); if (ret < 0) { printf("getset lock_key fail,err_code:%d,err_msg:%s\n", redis->err(),redis->errstr()); return false; } if (ret == 0)//lock_key存在,返回旧值 { if(is_expire(old_value))//旧值仍然超时,获得锁 { return true; } } } //锁未超时,锁还被其他客户端占用着 return false; } //释放锁函数 void unlock() { int ret = redis->DeleteKey(lock_key); if(ret != 0) { printf("delete lock_key fail,err_code:%d,err_msg:%s\n", redis->err(),redis->errstr()); } }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。