当前位置:   article > 正文

初识分布式锁(一):分布式锁原理浅析及Mysql实现案例_mysql实现分布式锁原理

mysql实现分布式锁原理

什么是分布式锁

普通锁

要讲分布式锁,我们先来了解下普通锁是什么。在计算机科学中,锁(lock)是一种同步机制,用于在许多线程执行时对资源的限制。

通过使用,确保多个线程在访问同一个共享变量时,都能保持正确的访问顺序,不至于因为线程的争抢执行导致脏数据等问题的产生。

在这里插入图片描述

具体来说,锁依据实现又可以分成:悲观锁、乐观锁、自选锁等多种,这里我就不赘述了。

分布式锁

现在咱们已经有了基本的锁概念了,那么为什么还需要设计分布式锁呢?

在这里插入图片描述

以上图为例子,设计分布式锁的主要原因在于,普通锁只是针对单个系统下的多个线程的。而多个系统下的线程,普通锁是无法约束的。因此,需要针对不同系统下的线程都做一个相应的同步机制,从而保证对同一变量的约束。

案例实践

普通式锁

talk is cheap, show me the code.

为了说明分布式的使用场景,我尽可能搭建了一个类似的场景,主要模拟的是秒杀场景下,按照sku纬度扣减库存。整体的逻辑思路图如下:

在这里插入图片描述

首先,根据对应的场景设计一个简单的数据库。这里我主要设计了一个主键id、skuId以及对应的数量number用于扣减。

create table product_stock
(
    id     int auto_increment,
    sku_id int not null,
    number int not null,
    constraint product_stock_id_uindex
        unique (id),
    constraint product_stock_sku_id_uindex
        unique (sku_id)
);
alter table product_stock
    add primary key (id);
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12

然后往数据库中添加几个简单的数据,初始数据我这里都设置成了100。依照关系数据库设计对应PO对象。(限于篇幅原因这里略去)。

并编写扣减库存的方法。这里我采用了异步请求的方式。

@PostMapping("/testDeductProduct")
public ResultDTO<Boolean> testInsertData(@RequestBody ProductPO productPO){
  return new ResultDTO<>().success(distributeLockService.deductProduct(productPO));
}

@SneakyThrows
public Boolean deductProduct(ProductPO productPO){
  CompletableFuture<Exception> subThread = CompletableFuture.supplyAsync(()->{
    try{
      LOGGER.info("查找商品的数据为 :"+ JSON.toJSONString(productPO));
      Example example = new Example(ProductPO.class);
      Example.Criteria criteria = example.createCriteria();
      criteria.andEqualTo("skuId", productPO.getSkuId());
      List<ProductPO> productPOS = productMapper.selectByExample(example);
      if (CollectionUtils.isEmpty(productPOS)){
        throw new RuntimeException("当前商品不存在");
      }
      for (ProductPO selectProduct: productPOS){
        //对对应的sku进行数量扣减
        Integer number = selectProduct.getNumber();
        LOGGER.info("当前商品的数量为:"+number);
        if (number<=0){
          //小于等于0时,不进行扣减
          continue;
        }
        selectProduct.setNumber(number-1);
        productMapper.updateByPrimaryKey(selectProduct);
      }
      //最后删除表中数据进行解锁
      return null;
    }catch (Exception e){
      return e;
    }
  });
  Exception exception = subThread.get();
  if (exception !=null){
    throw exception;
  }
  return true;
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40

因为要采用异步请求的方法,因此写了一个线程类继承自runnable接口,通过httpClient的方式组装相应的参数信息,实现请求。

@Data
@AllArgsConstructor
@NoArgsConstructor
@Slf4j
public class ProductRunnable implements Runnable{

    /**
     * 对应的请求的url
     */
    String url;

    /**
     * 对应的参数map,如何保证对应的资源不受影响? 考虑ThreadLocal
     */
    Map<String, String> paramMap;

    public ProductRunnable(String urlTemplate, String port, Map<String, String> paramMap) {
        this.url = String.format(urlTemplate,port);
        this.paramMap = paramMap;
    }

    @SneakyThrows
    @Override
    public void run() {
        CloseableHttpClient httpClient = null;
        try {
            JSONObject json = new JSONObject();
            json.putAll(paramMap);
            RequestConfig requestConfig = RequestConfig.custom()
                    .setConnectTimeout(5000)//一、连接超时:connectionTimeout-->指的是连接一个url的连接等待时间
                    .setSocketTimeout(5000)// 二、读取数据超时:SocketTimeout-->指的是连接上一个url,获取response的返回等待时间
                    .setConnectionRequestTimeout(5000)
                    .build();
            httpClient = HttpClients.createDefault();

            HttpPost post = new HttpPost(this.url);
            post.setHeader("Content-type", "application/json; charset=utf-8");
            post.setConfig(requestConfig);
            post.setEntity(new StringEntity(json.toString(),"UTF-8"));
            LOGGER.info("请求post为:" + JSON.toJSONString(post));

            //发送请求
            HttpResponse response = httpClient.execute(post);
            //获取响应结果
            if (200 == response.getStatusLine().getStatusCode()) {
                HttpEntity resEntity = response.getEntity();
                String message = EntityUtils.toString(resEntity, "utf-8");
                LOGGER.info("返回json为:"+ JSON.toJSONString(message));
            } else {
                LOGGER.error(String.format("请求失败,错误码为 %d ,返回json为:%s",response.getStatusLine().getStatusCode(),JSON.toJSONString(response)));
            }
        } finally {
            if (httpClient != null) {
                httpClient.close();
            }
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58

同时,写一个相应的请求main方法,用于请求对应接口。

@SneakyThrows
public static void main(String[] args) {
  //请求参数
  Map<String, String> map = new HashMap<>();
  map.put("skuId", "10001");
  //创建10个线程通过HttpClient进行发送请求,测试
  for (int i = 0; i < 10; i++) {
    //8080、8081交替请求,每个服务器处理5个请求
    String port = "8080";
    ProductRunnable productRunnable = new ProductRunnable("http://localhost:%s/testDeductProduct", port, map);
    Thread studentThread = new Thread(productRunnable);
    studentThread.start();
    Thread.sleep(100);
  }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15

首先,我们运行一下,看看结果如何。在这里插入图片描述

在这里插入图片描述
很清晰的看到,数据库的数据并没有如我们所料变成90,而是变成了96。细心的同学可能已经注意到了,deductProduct并没有做相应的同步机制,导致了数据出现了错误。因此需要对异步线程进行同步,这里我使用比较简单的方式,采用synchronized进行同步,方法修改如下:

@SneakyThrows
public synchronized Boolean deductProduct(ProductPO productPO){
	....
}
  • 1
  • 2
  • 3
  • 4

紧接着我们再测试一遍,看看效果如何

在这里插入图片描述

在这里插入图片描述

这次的结果,就符合我们心里的预期了~

数据库实现分布式锁

然而,这样就没有问题了么?并不是的,咱们来看看下面这个场景。我以交替请求的方式重新设置对应的端口,从而模拟出分布式请求的情况。

@SneakyThrows
public static void main(String[] args) {
  //请求参数
  Map<String, String> map = new HashMap<>();
  map.put("skuId", "10001");
  //创建10个线程通过HttpClient进行发送请求,测试
  for (int i = 0; i < 10; i++) {
    //8080、8081交替请求,每个服务器处理5个请求
    String port = "808" + (i % 2);
    ProductRunnable productRunnable = new ProductRunnable("http://localhost:%s/testDeductProduct", port, map);
    Thread studentThread = new Thread(productRunnable);
    studentThread.start();
    Thread.sleep(100);
  }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15

结果如下:
在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

可以看到,数据依然遭到了争抢,出现了【超卖】的情况。由此可见,在分布式情况下,光采用java的内置普通锁,是没有保证数据的最终一致性的。因此需要添加相应的分布式锁。

@Service
@Slf4j
public class DistributeLockUtil {
    @Resource
    LockMapper lockMapper;

    // 自旋间隔:单位为毫秒
    private final int interval = 100;

    /**
     * SQL实现分布式锁,往表中插入对应的数据
     * @param lockId
     * @return
     */
    @SneakyThrows
    public void tryLockWithMySql(int lockId){
        int insertRows = 0;
        while (true) {
            // 首先查询是否已经上锁,不存在则进行上锁的操作
            LockInfo lockInfo = new LockInfo();
            lockInfo.setId(lockId);
            lockInfo.setThreadId(Thread.currentThread().getId());
            try{
                insertRows = lockMapper.insert(lockInfo);
            }catch (DuplicateKeyException ignore){
                LOGGER.warn("加锁失败!错误信息为:",ignore);
            }
            if (insertRows>0) {
                break;
            }
            Thread.sleep(interval);
        }
        LOGGER.info("加锁成功,线程id为"+ Thread.currentThread().getId());
    }

    /**
     * 解锁
     * @return
     */
    @SneakyThrows
    public void tryUnlockWithMySql(int lockId){
        LockInfo lockInfo = new LockInfo();
        lockInfo.setId(lockId);
        lockInfo.setThreadId(Thread.currentThread().getId());
        // 首先查询是否存在这个锁,不存在直接返回
        List<LockInfo> lockInfos = lockMapper.select(lockInfo);
        if (CollectionUtils.isEmpty(lockInfos)){
            return ;
        }
        int deleteRows = lockMapper.delete(lockInfo);
        if (deleteRows <= 0){
            throw new RuntimeException("解锁失败!");
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55

这里,我首先以数据库为例子,做相应的分布式锁进行优化。这里先简要说一下加锁逻辑,建立一个锁表,通过往锁表中插入数据的方式,来进行相应的同步操作。

需要特别注意的是,需要同时catch住DuplicateKeyException的异常,当出现这类异常的时候,让线程重试,直到最终线程加锁成功。这样才能保证最终数据的一致性。

具体代码如下:

    @SneakyThrows
    public synchronized Boolean deductProduct(ProductPO productPO){
        CompletableFuture<Exception> subThread = CompletableFuture.supplyAsync(()->{
            try{
                //首先分布式加锁,这里用了个枚举类,实际上就是1
                distributeLockUtil.tryLockWithMySql(PRODUCT_LOCK.getLockId());
                LOGGER.info("查找商品的数据为 :"+ JSON.toJSONString(productPO));
                Example example = new Example(ProductPO.class);
                Example.Criteria criteria = example.createCriteria();
                criteria.andEqualTo("skuId", productPO.getSkuId());
                List<ProductPO> productPOS = productMapper.selectByExample(example);
                if (CollectionUtils.isEmpty(productPOS)){
                    throw new RuntimeException("当前商品不存在");
                }
                for (ProductPO selectProductPO: productPOS){
                    //对对应的sku进行数量扣减
                    Integer number = selectProductPO.getNumber();
                    LOGGER.info("当前商品的数量为:"+number);
                    if (number<=0){
                        //小于等于0时,不进行扣减
                        continue;
                    }
                    selectProductPO.setNumber(number-1);
                    productMapper.updateByPrimaryKey(selectProductPO);
                }
                //最后删除表中数据进行解锁
                return null;
            }catch (Exception e){
                return e;
            }finally {
                distributeLockUtil.tryUnlockWithMySql(PRODUCT_LOCK.getLockId());
            }
        });
        Exception exception = subThread.get();
        if (exception !=null){
            throw exception;
        }
        return true;
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39

然后对外层,只需要在进入业务逻辑前进行上锁,在finally退出时再进行解锁即可。代码运行的具体结果如下:
在这里插入图片描述
在这里插入图片描述

看看结果,应该是没有问题了~

缺陷:

1、因为基于数据库实现的,数据库的可用性和性能将直接影响分布式锁的可用性及性能,所以数据库需要高可用性,主从备份确保锁可用;

2、不可重入的特性,因为同一个线程在释放锁之前,行数据一直存在,无法再次成功插入数据,可以在表中新增一列,用于记录当前获取到锁的机器和线程信息。再次获取锁的时候,先查询表中机器和线程信息是否和当前机器和线程相同,若相同则直接获取锁;

3、没有锁失效机制,因为有可能出现成功插入数据后,服务器宕机了,对应的数据没有被删除,当服务恢复后一直获取不到锁,所以,需要在表中新增一列,用于记录失效时间,并且需要有定时任务清除这些失效的数据;

4、不具备阻塞锁特性,获取不到锁直接返回失败,所以需要优化获取逻辑,循环多次去获取。

5、在实施的过程中会遇到各种不同的问题,为了解决这些问题,实现方式将会越来越复杂;依赖数据库需要一定的资源开销,性能问题需要考虑。

参考文献

详细讲解!从秒杀聊到ZooKeeper分布式锁

什么是分布式锁?实现分布式锁的三种方式

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/盐析白兔/article/detail/745125
推荐阅读
相关标签
  

闽ICP备14008679号