赞
踩
要讲分布式锁,我们先来了解下普通锁是什么。在计算机科学中,锁(lock)是一种同步机制,用于在许多线程执行时对资源的限制。
通过使用锁,确保多个线程在访问同一个共享变量时,都能保持正确的访问顺序,不至于因为线程的争抢执行导致脏数据等问题的产生。
具体来说,锁依据实现又可以分成:悲观锁、乐观锁、自选锁等多种,这里我就不赘述了。
现在咱们已经有了基本的锁概念了,那么为什么还需要设计分布式锁呢?
以上图为例子,设计分布式锁的主要原因在于,普通锁只是针对单个系统下的多个线程的。而多个系统下的线程,普通锁是无法约束的。因此,需要针对不同系统下的线程都做一个相应的同步机制,从而保证对同一变量的约束。
talk is cheap, show me the code.
为了说明分布式的使用场景,我尽可能搭建了一个类似的场景,主要模拟的是秒杀场景下,按照sku纬度扣减库存。整体的逻辑思路图如下:
首先,根据对应的场景设计一个简单的数据库。这里我主要设计了一个主键id、skuId以及对应的数量number用于扣减。
create table product_stock
(
id int auto_increment,
sku_id int not null,
number int not null,
constraint product_stock_id_uindex
unique (id),
constraint product_stock_sku_id_uindex
unique (sku_id)
);
alter table product_stock
add primary key (id);
然后往数据库中添加几个简单的数据,初始数据我这里都设置成了100。依照关系数据库设计对应PO对象。(限于篇幅原因这里略去)。
并编写扣减库存的方法。这里我采用了异步请求的方式。
@PostMapping("/testDeductProduct") public ResultDTO<Boolean> testInsertData(@RequestBody ProductPO productPO){ return new ResultDTO<>().success(distributeLockService.deductProduct(productPO)); } @SneakyThrows public Boolean deductProduct(ProductPO productPO){ CompletableFuture<Exception> subThread = CompletableFuture.supplyAsync(()->{ try{ LOGGER.info("查找商品的数据为 :"+ JSON.toJSONString(productPO)); Example example = new Example(ProductPO.class); Example.Criteria criteria = example.createCriteria(); criteria.andEqualTo("skuId", productPO.getSkuId()); List<ProductPO> productPOS = productMapper.selectByExample(example); if (CollectionUtils.isEmpty(productPOS)){ throw new RuntimeException("当前商品不存在"); } for (ProductPO selectProduct: productPOS){ //对对应的sku进行数量扣减 Integer number = selectProduct.getNumber(); LOGGER.info("当前商品的数量为:"+number); if (number<=0){ //小于等于0时,不进行扣减 continue; } selectProduct.setNumber(number-1); productMapper.updateByPrimaryKey(selectProduct); } //最后删除表中数据进行解锁 return null; }catch (Exception e){ return e; } }); Exception exception = subThread.get(); if (exception !=null){ throw exception; } return true; }
因为要采用异步请求的方法,因此写了一个线程类继承自runnable接口,通过httpClient的方式组装相应的参数信息,实现请求。
@Data @AllArgsConstructor @NoArgsConstructor @Slf4j public class ProductRunnable implements Runnable{ /** * 对应的请求的url */ String url; /** * 对应的参数map,如何保证对应的资源不受影响? 考虑ThreadLocal */ Map<String, String> paramMap; public ProductRunnable(String urlTemplate, String port, Map<String, String> paramMap) { this.url = String.format(urlTemplate,port); this.paramMap = paramMap; } @SneakyThrows @Override public void run() { CloseableHttpClient httpClient = null; try { JSONObject json = new JSONObject(); json.putAll(paramMap); RequestConfig requestConfig = RequestConfig.custom() .setConnectTimeout(5000)//一、连接超时:connectionTimeout-->指的是连接一个url的连接等待时间 .setSocketTimeout(5000)// 二、读取数据超时:SocketTimeout-->指的是连接上一个url,获取response的返回等待时间 .setConnectionRequestTimeout(5000) .build(); httpClient = HttpClients.createDefault(); HttpPost post = new HttpPost(this.url); post.setHeader("Content-type", "application/json; charset=utf-8"); post.setConfig(requestConfig); post.setEntity(new StringEntity(json.toString(),"UTF-8")); LOGGER.info("请求post为:" + JSON.toJSONString(post)); //发送请求 HttpResponse response = httpClient.execute(post); //获取响应结果 if (200 == response.getStatusLine().getStatusCode()) { HttpEntity resEntity = response.getEntity(); String message = EntityUtils.toString(resEntity, "utf-8"); LOGGER.info("返回json为:"+ JSON.toJSONString(message)); } else { LOGGER.error(String.format("请求失败,错误码为 %d ,返回json为:%s",response.getStatusLine().getStatusCode(),JSON.toJSONString(response))); } } finally { if (httpClient != null) { httpClient.close(); } } } }
同时,写一个相应的请求main方法,用于请求对应接口。
@SneakyThrows
public static void main(String[] args) {
//请求参数
Map<String, String> map = new HashMap<>();
map.put("skuId", "10001");
//创建10个线程通过HttpClient进行发送请求,测试
for (int i = 0; i < 10; i++) {
//8080、8081交替请求,每个服务器处理5个请求
String port = "8080";
ProductRunnable productRunnable = new ProductRunnable("http://localhost:%s/testDeductProduct", port, map);
Thread studentThread = new Thread(productRunnable);
studentThread.start();
Thread.sleep(100);
}
}
首先,我们运行一下,看看结果如何。
很清晰的看到,数据库的数据并没有如我们所料变成90,而是变成了96。细心的同学可能已经注意到了,deductProduct并没有做相应的同步机制,导致了数据出现了错误。因此需要对异步线程进行同步,这里我使用比较简单的方式,采用synchronized进行同步,方法修改如下:
@SneakyThrows
public synchronized Boolean deductProduct(ProductPO productPO){
....
}
紧接着我们再测试一遍,看看效果如何
这次的结果,就符合我们心里的预期了~
然而,这样就没有问题了么?并不是的,咱们来看看下面这个场景。我以交替请求的方式重新设置对应的端口,从而模拟出分布式请求的情况。
@SneakyThrows
public static void main(String[] args) {
//请求参数
Map<String, String> map = new HashMap<>();
map.put("skuId", "10001");
//创建10个线程通过HttpClient进行发送请求,测试
for (int i = 0; i < 10; i++) {
//8080、8081交替请求,每个服务器处理5个请求
String port = "808" + (i % 2);
ProductRunnable productRunnable = new ProductRunnable("http://localhost:%s/testDeductProduct", port, map);
Thread studentThread = new Thread(productRunnable);
studentThread.start();
Thread.sleep(100);
}
}
结果如下:
可以看到,数据依然遭到了争抢,出现了【超卖】的情况。由此可见,在分布式情况下,光采用java的内置普通锁,是没有保证数据的最终一致性的。因此需要添加相应的分布式锁。
@Service @Slf4j public class DistributeLockUtil { @Resource LockMapper lockMapper; // 自旋间隔:单位为毫秒 private final int interval = 100; /** * SQL实现分布式锁,往表中插入对应的数据 * @param lockId * @return */ @SneakyThrows public void tryLockWithMySql(int lockId){ int insertRows = 0; while (true) { // 首先查询是否已经上锁,不存在则进行上锁的操作 LockInfo lockInfo = new LockInfo(); lockInfo.setId(lockId); lockInfo.setThreadId(Thread.currentThread().getId()); try{ insertRows = lockMapper.insert(lockInfo); }catch (DuplicateKeyException ignore){ LOGGER.warn("加锁失败!错误信息为:",ignore); } if (insertRows>0) { break; } Thread.sleep(interval); } LOGGER.info("加锁成功,线程id为"+ Thread.currentThread().getId()); } /** * 解锁 * @return */ @SneakyThrows public void tryUnlockWithMySql(int lockId){ LockInfo lockInfo = new LockInfo(); lockInfo.setId(lockId); lockInfo.setThreadId(Thread.currentThread().getId()); // 首先查询是否存在这个锁,不存在直接返回 List<LockInfo> lockInfos = lockMapper.select(lockInfo); if (CollectionUtils.isEmpty(lockInfos)){ return ; } int deleteRows = lockMapper.delete(lockInfo); if (deleteRows <= 0){ throw new RuntimeException("解锁失败!"); } } }
这里,我首先以数据库为例子,做相应的分布式锁进行优化。这里先简要说一下加锁逻辑,建立一个锁表,通过往锁表中插入数据的方式,来进行相应的同步操作。
需要特别注意的是,需要同时catch住DuplicateKeyException的异常,当出现这类异常的时候,让线程重试,直到最终线程加锁成功。这样才能保证最终数据的一致性。
具体代码如下:
@SneakyThrows public synchronized Boolean deductProduct(ProductPO productPO){ CompletableFuture<Exception> subThread = CompletableFuture.supplyAsync(()->{ try{ //首先分布式加锁,这里用了个枚举类,实际上就是1 distributeLockUtil.tryLockWithMySql(PRODUCT_LOCK.getLockId()); LOGGER.info("查找商品的数据为 :"+ JSON.toJSONString(productPO)); Example example = new Example(ProductPO.class); Example.Criteria criteria = example.createCriteria(); criteria.andEqualTo("skuId", productPO.getSkuId()); List<ProductPO> productPOS = productMapper.selectByExample(example); if (CollectionUtils.isEmpty(productPOS)){ throw new RuntimeException("当前商品不存在"); } for (ProductPO selectProductPO: productPOS){ //对对应的sku进行数量扣减 Integer number = selectProductPO.getNumber(); LOGGER.info("当前商品的数量为:"+number); if (number<=0){ //小于等于0时,不进行扣减 continue; } selectProductPO.setNumber(number-1); productMapper.updateByPrimaryKey(selectProductPO); } //最后删除表中数据进行解锁 return null; }catch (Exception e){ return e; }finally { distributeLockUtil.tryUnlockWithMySql(PRODUCT_LOCK.getLockId()); } }); Exception exception = subThread.get(); if (exception !=null){ throw exception; } return true; }
然后对外层,只需要在进入业务逻辑前进行上锁,在finally退出时再进行解锁即可。代码运行的具体结果如下:
看看结果,应该是没有问题了~
缺陷:
1、因为基于数据库实现的,数据库的可用性和性能将直接影响分布式锁的可用性及性能,所以数据库需要高可用性,主从备份确保锁可用;
2、不可重入的特性,因为同一个线程在释放锁之前,行数据一直存在,无法再次成功插入数据,可以在表中新增一列,用于记录当前获取到锁的机器和线程信息。再次获取锁的时候,先查询表中机器和线程信息是否和当前机器和线程相同,若相同则直接获取锁;
3、没有锁失效机制,因为有可能出现成功插入数据后,服务器宕机了,对应的数据没有被删除,当服务恢复后一直获取不到锁,所以,需要在表中新增一列,用于记录失效时间,并且需要有定时任务清除这些失效的数据;
4、不具备阻塞锁特性,获取不到锁直接返回失败,所以需要优化获取逻辑,循环多次去获取。
5、在实施的过程中会遇到各种不同的问题,为了解决这些问题,实现方式将会越来越复杂;依赖数据库需要一定的资源开销,性能问题需要考虑。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。