当前位置:   article > 正文

分布式锁通用解决方案_threadlock.trylock(acquiretimeout, timeunit);

threadlock.trylock(acquiretimeout, timeunit);

转载自:https://cloud.tencent.com/developer/article/1116306

分布式锁的解决方式:

基于数据库:

基于数据库表做乐观锁,用于分布式锁。(version) 基于数据库表做悲观锁(InnoDB,for update) 基于数据库表数据记录做唯一约束(表中记录方法名称)

基于缓存:

常用方案:使用redis的setnx()用于分布式锁。(setNx,直接设置值为当前时间+超时时间,保持操作原子性) 使用memcached的add()方法,用于分布式锁。 使用Tair的put()方法,用于分布式锁。

基于Zookeeper:

每个客户端对某个方法加锁时,在zookeeper上的与该方法对应的指定节点的目录下,生成一个唯一的瞬时有序节点。 判断是否获取锁只需要判断有序节点中序号最小的一个。 当释放锁的时候,只需将这个瞬时节点删除即可。

首先明确一点,有人可能会问是否可以考虑采用ReentrantLock来实现,但是实际上去实现的时候是有问题的,ReentrantLock的lock和unlock要求必须是在同一线程进行,而分布式应用中,lock和unlock是两次不相关的请求,因此肯定不是同一线程,因此导致无法使用ReentrantLock。

基于数据库实现分布式锁

基于数据库表数据记录做唯一约束(表中记录方法名称)

要实现分布式锁,最简单的方式可能就是直接创建一张锁表,然后通过操作该表中的数据来实现了。

当我们要锁住某个方法或资源时,我们就在该表中增加一条记录,想要释放锁的时候就删除这条记录。创建这样一张数据库表:

当我们想要锁住某个方法时,执行以下SQL:

因为我们对method_name做了唯一性约束,这里如果有多个请求同时提交到数据库的话,数据库会保证只有一个操作可以成功,那么我们就可以认为操作成功的那个线程获得了该方法的锁,可以执行方法体内容。

当方法执行完毕之后,想要释放锁的话,需要执行以下Sql:

上面这种简单的实现有以下几个问题:

1、这把锁强依赖数据库的可用性,数据库是一个单点,一旦数据库挂掉,会导致业务系统不可用。

2、这把锁没有失效时间,一旦解锁操作失败,就会导致锁记录一直在数据库中,其他线程无法再获得到锁。

3、这把锁只能是非阻塞的,因为数据的insert操作,一旦插入失败就会直接报错。没有获得锁的线程并不会进入排队队列,要想再次获得锁就要再次触发获得锁操作。

4、这把锁是非重入的,同一个线程在没有释放锁之前无法再次获得该锁。因为数据中数据已经存在了。

当然,我们也可以有其他方式解决上面的问题。

  1. • 数据库是单点?搞两个数据库,数据之前双向同步。一旦挂掉快速切换到备库上。
  2. • 没有失效时间?只要做一个定时任务,每隔一定时间把数据库中的超时数据清理一遍。
  3. • 非阻塞的?搞一个while循环,直到insert成功再返回成功。
  4. • 非重入的?在数据库表中加个字段,记录当前获得锁的机器的主机信息和线程信息,那么下次再获取锁的时候先查询数据库,如果当前机器的主机信息和线程信息在数据库可以查到的话,直接把锁分配给他就可以了。

基于数据库表做悲观锁(InnoDB引擎,for update语句)

除了可以通过增删操作数据表中的记录以外,其实还可以借助数据中自带的锁来实现分布式的锁。我们还用刚刚创建的那张数据库表。可以通过数据库的排他锁来实现分布式锁。 基于MySql的InnoDB引擎,可以使用以下方法来实现加锁操作:

在查询语句后面增加for update,数据库会在查询过程中给数据库表增加排他锁(这里再多提一句,InnoDB引擎在加锁的时候,只有通过索引进行检索的时候才会使用行级锁,否则会使用表级锁。这里我们希望使用行级锁,就要给method_name添加索引,值得注意的是,这个索引一定要创建成唯一索引,否则会出现多个重载方法之间无法同时被访问的问题。重载方法的话建议把参数类型也加上。)。

当某条记录被加上排他锁之后,其他线程无法再在该行记录上增加排他锁。我们可以认为获得排它锁的线程即可获得分布式锁,当获取到锁之后,可以执行方法的业务逻辑,执行完方法之后,再通过以下方法解锁:

  1. public void lock(){
  2. connection.setAutoCommit(false)
  3. int count = 0;
  4. while(count < 4){
  5. try{
  6. select * from lock where lock_name=xxx for update;
  7. if(结果不为空){
  8. //代表获取到锁
  9. return;
  10. }
  11. }catch(Exception e){
  12. }
  13. //为空或者抛异常的话都表示没有获取到锁
  14. sleep(1000);
  15. count++;
  16. }
  17. throw new LockException();
  18. }

通过connection.commit()操作来释放锁。

这种方法可以有效的解决上面提到的无法释放锁和阻塞锁的问题。 • 阻塞锁? for update语句会在执行成功后立即返回,在执行失败时一直处于阻塞状态,直到成功。 • 锁定之后服务宕机,无法释放?使用这种方式,服务宕机之后数据库会自己把锁释放掉。但是还是无法直接解决数据库单点和可重入问题。

这里还可能存在另外一个问题,虽然我们对method_name 使用了唯一索引,并且显示使用for update来使用行级锁。但是,MySql会对查询进行优化,即便在条件中使用了索引字段,但是否使用索引来检索数据是由 MySQL 通过判断不同执行计划的代价来决定的,如果 MySQL 认为全表扫效率更高,比如对一些很小的表,它就不会使用索引,这种情况下 InnoDB 将使用表锁,而不是行锁。如果发生这种情况就悲剧了。。。

还有一个问题,就是我们要使用排他锁来进行分布式锁的lock,那么一个排他锁长时间不提交,就会占用数据库连接。一旦类似的连接变得多了,就可能把数据库连接池撑爆

基于数据库资源表做乐观锁,用于分布式锁:

1. 首先说明乐观锁的含义:

大多数是基于数据版本(VERSION)的记录机制实现的。何谓数据版本号?即为数据增加一个版本标识,在基于数据库表的版本解决方案中,一般是通过为数据库表添加一个 “VERSION”字段来实现读取出数据时,将此版本号一同读出,之后更新时,对此版本号加1。

在更新过程中,会对版本号进行比较,如果是一致的,没有发生改变,则会成功执行本次操作;如果版本号不一致,则会更新失败。

2. 对乐观锁的含义有了一定的了解后,结合具体的例子,我们来推演下我们应该怎么处理:

(1). 假设我们有一张资源表,如下图所示: T_RESOURCE , 其中有6个字段ID, RESOOURCE, STATE, ADD_TIME, UPDATE_TIME, VERSION,分别表示表主键、资源、分配状态(1未分配 2已分配)、资源创建时间、资源更新时间、资源数据版本号。

(4). 假设我们现在我们对ID=5780这条数据进行分配,那么非分布式场景的情况下,我们一般先查询出来STATE=1(未分配)的数据,然后从其中选取一条数据可以通过以下语句进行,如果可以更新成功,那么就说明已经占用了这个资源 UPDATE T_RESOURCE SET STATE=2 WHERE STATE=1 AND ID=5780。

(5). 如果在分布式场景中,由于数据库的UPDATE操作是原子是原子的,其实上边这条语句理论上也没有问题,但是这条语句如果在典型的“ABA”情况下,我们是无法感知的。有人可能会问什么是“ABA”问题呢?大家可以网上搜索一下,这里我说简单一点就是,如果在你第一次SELECT和第二次UPDATE过程中,由于两次操作是非原子的,所以这过程中,如果有一个线程,先是占用了资源(STATE=2),然后又释放了资源(STATE=1),实际上最后你执行UPDATE操作的时候,是无法知道这个资源发生过变化的。也许你会说这个在你说的场景中应该也还好吧,但是在实际的使用过程中,比如银行账户存款或者扣款的过程中,这种情况是比较恐怖的。

(6). 那么如果使用乐观锁我们如何解决上边的问题呢?

  • A. 先执行SELECT操作查询当前数据的数据版本号,比如当前数据版本号是26:
  • SELECT ID, RESOURCE, STATE,VERSION FROM T_RESOURCE WHERE STATE=1 AND ID=5780;
  • B. 执行更新操作:
  • UPDATE T_RESOURE SET STATE=2, VERSION=27, UPDATE_TIME=NOW() WHERE RESOURCE=XXXXXX AND STATE=1 AND VERSION=26
  • C. 如果上述UPDATE语句真正更新影响到了一行数据,那就说明占位成功。如果没有更新影响到一行数据,则说明这个资源已经被别人占位了。
3. 基于数据库表做乐观锁的一些缺点:

(1). 这种操作方式,使原本一次的UPDATE操作,必须变为2次操作: SELECT版本号一次;UPDATE一次。增加了数据库操作的次数。

(2). 如果业务场景中的一次业务流程中,多个资源都需要用保证数据一致性,那么如果全部使用基于数据库资源表的乐观锁,就要让每个资源都有一张资源表,这个在实际使用场景中肯定是无法满足的。而且这些都基于数据库操作,在高并发的要求下,对数据库连接的开销一定是无法忍受的。

(3). 乐观锁机制往往基于系统中的数据存储逻辑,因此可能会造成脏数据被更新到数据库中。在系统设计阶段,我们应该充分考虑到这些情况出现的可能性,并进行相应调整,如将乐观锁策略在数据库存储过程中实现,对外只开放基于此存储过程的数据更新途径,而不是将数据库表直接对外公开。

  1. 讲了乐观锁的实现方式和缺点,是不是会觉得不敢使用乐观锁了呢???当然不是,在文章开头我自己的业务场景中,场景1和场景2的一部分都使用了基于数据库资源表的乐观锁,已经很好的解决了线上问题。所以大家要根据的具体业务场景选择技术方案,并不是随便找一个足够复杂、足够新潮的技术方案来解决业务问题就是好方案?!比如,如果在我的场景一中,我使用zookeeper做锁,可以这么做,但是真的有必要吗???答案觉得是没有必要的!!!

总结一下使用数据库来实现分布式锁的方式,这两种方式都是依赖数据库的一张表,一种是通过表中的记录的存在情况确定当前是否有锁存在,另外一种是通过数据库的排他锁来实现分布式锁。数据库实现分布式锁的优点直接借助数据库,容易理解。数据库实现分布式锁的缺点会有各种各样的问题,在解决问题的过程中会使整个方案变得越来越复杂。操作数据库需要一定的开销,性能问题需要考虑。使用数据库的行级锁并不一定靠谱,尤其是当我们的锁表并不大的时候。

基于缓存实现分布式锁

Redis

使用redis的setnx()用于分布式锁。(setNx,直接设置值为当前时间+超时时间,保持操作原子性)

使用redis的SETNX实现分布式锁,多个进程执行以下Redis命令: SETNX lock.id

SETNX是将 key 的值设为 value,当且仅当 key 不存在。若给定的 key 已经存在,则 SETNX 不做任何动作。

  1. • 返回1,说明该进程获得锁,SETNX将键 lock.id 的值设置为锁的超时时间,当前时间 +加上锁的有效时间。
  2. • 返回0,说明其他进程已经获得了锁,进程不能进入临界区。进程可以在一个循环中不断地尝试 SETNX 操作,以获得锁。
存在死锁的问题

SETNX实现分布式锁,可能会存在死锁的情况。与单机模式下的锁相比,分布式环境下不仅需要保证进程可见,还需要考虑进程与锁之间的网络问题。某个线程获取了锁之后,断开了与Redis 的连接,锁没有及时释放,竞争该锁的其他线程都会hung,产生死锁的情况。在使用 SETNX 获得锁时,我们将键 lock.id 的值设置为锁的有效时间,线程获得锁后,其他线程还会不断的检测锁是否已超时,如果超时,等待的线程也将有机会获得锁。然而,锁超时,我们不能简单地使用 DEL 命令删除键 lock.id 以释放锁。考虑以下情况:

  1. 1. A已经首先获得了锁 lock.id,然后线A断线。B,C都在等待竞争该锁;
  2. 2. B,C读取lock.id的值,比较当前时间和键 lock.id 的值来判断是否超时,发现超时;
  3. 3. B执行 DEL lock.id命令,并执行 SETNX lock.id 命令,并返回1,B获得锁;
  4. 4. C由于各刚刚检测到锁已超时,执行 DEL lock.id命令,将B刚刚设置的键 lock.id 删除,执行 SETNX lock.id命令,并返回1,即C获得锁。
  5. 5.

上面的步骤很明显出现了问题,导致B,C同时获取了锁。在检测到锁超时后,线程不能直接简单地执行 DEL 删除键的操作以获得锁。

对于上面的步骤进行改进,问题是出在删除键的操作上面,那么获取锁之后应该怎么改进呢?首先看一下redis的GETSET这个操作,GETSET key value,将给定 key 的值设为 value ,并返回 key 的旧值(old value)。利用这个操作指令,我们改进一下上述的步骤。

  1. 1. A已经首先获得了锁 lock.id,然后线A断线。B,C都在等待竞争该锁;
  2. 2. B,C读取lock.id的值,比较当前时间和键 lock.id 的值来判断是否超时,发现超时;
  3. 3. B检测到锁已超时,即当前的时间大于键 lock.id 的值,B会执行

GETSET lock.id 设置时间戳,通过比较键 lock.id 的旧值是否小于当前时间,判断进程是否已获得锁;

  1. 4. B发现GETSET返回的值小于当前时间,则执行 DEL lock.id命令,并执行 SETNX lock.id 命令,并返回1,B获得锁;
  2. 5. C执行GETSET得到的时间大于当前时间,则继续等待。
  3. 6.

在线程释放锁,即执行 DEL lock.id 操作前,需要先判断锁是否已超时。如果锁已超时,那么锁可能已由其他线程获得,这时直接执行 DEL lock.id 操作会导致把其他线程已获得的锁释放掉。

一种实现方式

######获取锁

  1. public boolean lock(long acquireTimeout, TimeUnit timeUnit) throws InterruptedException {
  2. acquireTimeout = timeUnit.toMillis(acquireTimeout);
  3. long acquireTime = acquireTimeout + System.currentTimeMillis();
  4. //使用J.U.C的ReentrantLock
  5. threadLock.tryLock(acquireTimeout, timeUnit);
  6. try {
  7. //循环尝试
  8. while (true) {
  9. //调用tryLock
  10. boolean hasLock = tryLock();
  11. if (hasLock) {
  12. //获取锁成功
  13. return true;
  14. } else if (acquireTime < System.currentTimeMillis()) {
  15. break;
  16. }
  17. Thread.sleep(sleepTime);
  18. }
  19. } finally {
  20. if (threadLock.isHeldByCurrentThread()) {
  21. threadLock.unlock();
  22. }
  23. }
  24. return false;
  25. }
  26. public boolean tryLock() {
  27. long currentTime = System.currentTimeMillis();
  28. String expires = String.valueOf(timeout + currentTime);
  29. //设置互斥量
  30. if (redisHelper.setNx(mutex, expires) > 0) {
  31. //获取锁,设置超时时间
  32. setLockStatus(expires);
  33. return true;
  34. } else {
  35. String currentLockTime = redisUtil.get(mutex);
  36. //检查锁是否超时
  37. if (Objects.nonNull(currentLockTime) && Long.parseLong(currentLockTime) < currentTime) {
  38. //获取旧的锁时间并设置互斥量
  39. String oldLockTime = redisHelper.getSet(mutex, expires);
  40. //旧值与当前时间比较
  41. if (Objects.nonNull(oldLockTime) && Objects.equals(oldLockTime, currentLockTime)) {
  42. //获取锁,设置超时时间
  43. setLockStatus(expires);
  44. return true;
  45. }
  46. }
  47. return false;
  48. }
  49. }

lock调用tryLock方法,参数为获取的超时时间与单位,线程在超时时间内,获取锁操作将自旋在那里,直到该自旋锁的保持者释放了锁。tryLock方法中,主要逻辑如下:

  1. setnx(lockkey, 当前时间+过期超时时间) ,如果返回1,则获取锁成功;如果返回0则没有获取到锁
  2. get(lockkey)获取值oldExpireTime ,并将这个value值与当前的系统时间进行比较,如果小于当前系统时间,则认为这个锁已经超时,可以允许别的请求重新获取
  3. • 计算newExpireTime=当前时间+过期超时时间,然后getset(lockkey, newExpireTime) 会返回当前lockkey的值currentExpireTime
  4. • 判断currentExpireTime与oldExpireTime 是否相等,如果相等,说明当前getset设置成功,获取到了锁。如果不相等,说明这个锁又被别的请求获取走了,那么当前请求可以直接返回失败,或者继续重试
释放锁
  1. public boolean unlock() {
  2. //只有锁的持有线程才能解锁
  3. if (lockHolder == Thread.currentThread()) {
  4. //判断锁是否超时,没有超时才将互斥量删除
  5. if (lockExpiresTime System.currentTimeMillis()) {
  6. redisHelper.del(mutex);
  7. logger.info("删除互斥量[{}]", mutex);
  8. }
  9. lockHolder = null;
  10. logger.info("释放[{}]锁成功", mutex);
  11. return true;
  12. } else {
  13. throw new IllegalMonitorStateException("没有获取到锁的线程无法执行解锁操作");
  14. }
  15. }

相比较于基于数据库实现分布式锁的方案来说,基于缓存来实现在性能方面会表现的更好一点。而且很多缓存是可以集群部署的,可以解决单点问题。目前有很多成熟的缓存产品,包括Redis,memcached以及我们公司内部的Tair。

基于Tair实现分布式锁

这里以Tair为例来分析下使用缓存实现分布式锁的方案。关于Redis和memcached在网络上有很多相关的文章,并且也有一些成熟的框架及算法可以直接使用。

基于Tair的实现分布式锁其实和Redis类似,其中主要的实现方式是使用TairManager.put方法来实现。

以上实现方式同样存在几个问题:

  • 1、这把锁没有失效时间,一旦解锁操作失败,就会导致锁记录一直在tair中,其他线程无法再获得到锁。
  • 2、这把锁只能是非阻塞的,无论成功还是失败都直接返回。
  • 3、这把锁是非重入的,一个线程获得锁之后,在释放锁之前,无法再次获得该锁,因为使用到的key在tair中已经存在。无法再执行put操作。

当然,同样有方式可以解决。

  1. • 没有失效时间?tair的put方法支持传入失效时间,到达时间之后数据会自动删除。
  2. • 非阻塞?while重复执行。
  3. • 非可重入?在一个线程获取到锁之后,把当前主机信息和线程信息保存起来,下次再获取之前先检查自己是不是当前锁的拥有者。

但是,失效时间我设置多长时间为好?如何设置的失效时间太短,方法没等执行完,锁就自动释放了,那么就会产生并发问题。如果设置的时间太长,其他获取锁的线程就可能要平白的多等一段时间。这个问题使用数据库实现分布式锁同样存在

使用memcached的add()方法,用于分布式锁:

对于使用memcached的add()方法做分布式锁,这个在互联网公司是一种比较常见的方式,而且基本上可以解决自己手头上的大部分应用场景。在使用这个方法之前,只要能搞明白memcached的add()和set()的区别,并且知道为什么能用add()方法做分布式锁就好。如果还不知道add()和set()方法,请直接百度吧,这个需要自己了解一下。

我在这里想说明的是另外一个问题,人们在关注分布式锁设计的好坏时,还会重点关注这样一个问题,那就是是否可以避免死锁问题???!!!

如果使用memcached的add()命令对资源占位成功了,那么是不是就完事儿了呢?当然不是!我们需要在add()的使用指定当前添加的这个key的有效时间,如果不指定有效时间,正常情况下,你可以在执行完自己的业务后,使用delete方法将这个key删除掉,也就是释放了占用的资源。

但是,如果在占位成功后,memecached或者自己的业务服务器发生宕机了,那么这个资源将无法得到释放。所以通过对key设置超时时间,即便发生了宕机的情况,也不会将资源一直占用,可以避免死锁的问题。

基于缓存的方案总结

可以使用缓存来代替数据库来实现分布式锁,这个可以提供更好的性能,同时,很多缓存服务都是集群部署的,可以避免单点问题。并且很多缓存服务都提供了可以用来实现分布式锁的方法,比如Tair的put方法,redis的setnx方法等。并且,这些缓存服务也都提供了对数据的过期自动删除的支持,可以直接设置超时时间来控制锁的释放。使用缓存实现分布式锁的优点性能好,实现起来较为方便。使用缓存实现分布式锁的缺点通过超时时间来控制锁的失效时间并不是十分的靠谱。

基于Zookeeper实现分布式锁

基于zookeeper临时有序节点可以实现的分布式锁。

大致思想即为:每个客户端对某个方法加锁时,在zookeeper上的与该方法对应的指定节点的目录下,生成一个唯一的瞬时有序节点。 判断是否获取锁的方式很简单,只需要判断有序节点中序号最小的一个。 当释放锁的时候,只需将这个瞬时节点删除即可。同时,其可以避免服务宕机导致的锁无法释放,而产生的死锁问题。

来看下Zookeeper能不能解决前面提到的问题。
  • • 锁无法释放?使用Zookeeper可以有效的解决锁无法释放的问题,因为在创建锁的时候,客户端会在ZK中创建一个临时节点,一旦客户端获取到锁之后突然挂掉(Session连接断开),那么这个临时节点就会自动删除掉。其他客户端就可以再次获得锁。
  • • 非阻塞锁?使用Zookeeper可以实现阻塞的锁,客户端可以通过在ZK中创建顺序节点,并且在节点上绑定监听器,一旦节点有变化,Zookeeper会通知客户端,客户端可以检查自己创建的节点是不是当前所有节点中序号最小的,如果是,那么自己就获取到锁,便可以执行业务逻辑了。
  • • 不可重入?使用Zookeeper也可以有效的解决不可重入的问题,客户端在创建节点的时候,把当前客户端的主机信息和线程信息直接写入到节点中,下次想要获取锁的时候和当前最小的节点中的数据比对一下就可以了。如果和自己的信息一样,那么自己直接获取到锁,如果不一样就再创建一个临时的顺序节点,参与排队。
  • • 单点问题?使用Zookeeper可以有效的解决单点问题,ZK是集群部署的,只要集群中有半数以上的机器存活,就可以对外提供服务。

可以直接使用zookeeper第三方库Curator客户端,这个客户端中封装了一个可重入的锁服务。

Curator提供的InterProcessMutex是分布式锁的实现。acquire方法用户获取锁,release方法用于释放锁。

使用ZK实现的分布式锁好像完全符合了本文开头我们对一个分布式锁的所有期望。但是,其实并不是,Zookeeper实现的分布式锁其实存在一个缺点,那就是性能上可能并没有缓存服务那么高。因为每次在创建锁和释放锁的过程中,都要动态创建、销毁瞬时节点来实现锁功能。ZK中创建和删除节点只能通过Leader服务器来执行,然后将数据同不到所有的Follower机器上。

其实,使用Zookeeper也有可能带来并发问题,只是并不常见而已。考虑这样的情况,由于网络抖动,客户端可ZK集群的session连接断了,那么zk以为客户端挂了,就会删除临时节点,这时候其他客户端就可以获取到分布式锁了。就可能产生并发问题。这个问题不常见是因为zk有重试机制,一旦zk集群检测不到客户端的心跳,就会重试,Curator客户端支持多种重试策略。多次重试之后还不行的话才会删除临时节点。(所以,选择一个合适的重试策略也比较重要,要在锁的粒度和并发之间找一个平衡。)

基于Zk的方案的总结
  • 使用Zookeeper实现分布式锁的优点
  • 有效的解决单点问题,不可重入问题,非阻塞问题以及锁无法释放的问题。实现起来较为简单。
  • 使用Zookeeper实现分布式锁的缺点
  • 性能上不如使用缓存实现分布式锁。 需要对ZK的原理有所了解。

三种方案的比较上面几种方式,哪种方式都无法做到完美。就像CAP一样,在复杂性、可靠性、性能等方面无法同时满足,所以,根据不同的应用场景选择最适合自己的才是王道。

  • 从理解的难易程度角度(从低到高)
  • 数据库 > 缓存 > Zookeeper
  • 从实现的复杂性角度(从低到高)
  • Zookeeper >= 缓存 > 数据库
  • 从性能角度(从高到低)
  • 缓存 > Zookeeper >= 数据库
  • 从可靠性角度(从高到低)
  • Zookeeper > 缓存 > 数据库

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/凡人多烦事01/article/detail/696815
推荐阅读
相关标签
  

闽ICP备14008679号