当前位置:   article > 正文

实战 -- Zookeeper实现分布式锁

实战 -- Zookeeper实现分布式锁

场景分析

比如购买商品的操作,首先获取商品的库存,判断库存是否充足,充足的话创建订单减库存,不充足的话不允许创建订单。

有一款新型笔记本电脑,库存只剩下1件的时候,有客户A、B从不同的客户端(比如网站和APP)上看中了这款电脑,两人同时进行下单操作。

A和B同时获取库存,A购买1件,库存为1,判断充足,创建订单,库存减1,B购买1件,库存为1,判断充足,创建订单,库存减1。

结果是仅剩1件的商品,被两次下单,库存变成了-1。显然这种结果是错误的,A和B之间有一人的订单无法发货。

如何解决呢

可以用一个锁,A在下单的时候,给库存加上一个锁,此时除了A以外,任何人都不能对库存进行操作,B在获取库存的时候 ,由于A对库存加上了锁,所以B只好等待A释放锁之后在继续。

A创建完订单,对库存减1,释放锁,B获取锁再继续获取库存,此时库存为0,判断库存不充足,无法创建订单。保证了库存仅剩1的商品只能被下单1次。

这是分布式锁实现的一种方案。

什么是分布式锁

分布式锁,是控制分布式系统或不同系统之间访问共享资源的一种锁的实现,其主要解决的问题就是保证数据一致性。

Zookeeper实现的分布式锁,是利用节点的操作来进行的,加锁,就是创建节点(临时节点),解锁就是删除节点,同一个业务都是在同一父节点下进行加锁和解锁操作,如果该业务父节点下有子节点,则说明该业务已经被锁住了,如果没有子节点,则没被加锁。

临时节点的特点是,当会话失效时,Zookeeper自动清除,避免获取锁的客户端掉线后,没有删除锁节点,而其他客户端都在等这个锁节点删除,产生了死锁

实现分布式锁的两种方式

一、单节点锁

在某一业务节点,只允许创建1个子节点,代表锁,所有客户端争抢创建子节点权限,抢到并创建,则加锁成功,没抢到,则等待机会。如图所示:

图1

  1. 客户端准备加锁的时候,查看该业务节点下有没有子节点,如果没有,则创建节点,此客户端获得锁,执行业务操作。
  2. 如果存在子节点,则代表当前业务被加锁,此时客户端挂起,监听业务节点的子节点变化
  3. 客户端获取锁并执行完业务之后,删除该节点,Zookeeper通知其他客户端,唤醒挂起,继续尝试创建节点。

二、多节点锁

在单节点锁中,所有客户端都操作同一个节点,当只有锁的客户端释放锁时,其他的客户端都从挂起状态中唤醒,来竞争锁。谁先获取锁与客户端的网络状态和Zookeeper的服务器CPU调度等不可控因素有关,和谁先来后到的无关。

如果希望客户端能按照先来后到的顺序来获取锁,就需要用多节点锁来实现,即每个客户端在同一业务节点下创建专属自己的顺序节点,按照顺序节点的序号,来决定谁获取锁。如图:

多节点锁

 

  1. 某个客户端尝试加锁时,先在该业务节点下,创建一个顺序节点
  2. 创建完成后,获取出该业务节点下的所有子节点,并按照按照节点序号排序
  3. 判断第一位的节点是否为自己的节点,是的话,代表获取锁,执行业务操作
  4. 不是的话,对排在自己前一位的节点进行监听,客户端挂起
  5. 当客户端执行业务完毕后,删除自己的节点,并通知监听自己节点的客户端进行业务操作。

多节点代码实现

类设计如下

图三

  1. BusinessTypeEnum枚举,定义了业务的类型,用来区分不同业务,如果要对某个业务加锁,就在BusinessTypeEnum定义的业务类型下创建节点
  2. CuatorExt接口,操作Zookeeper的客户端,定义了一些操作方法
  3. AbstractCuatorExt类,客户端CuatorExt接口方法的实现,规范了客户端基本结构
  4. BaseDistributedLock类,继承了AbstractCuatorExt,分布式锁实现的核心,规范了分布式锁结构,对它的子类公开获取锁的方法。
  5. DistributedLock接口,分布式锁对外公开的接口,提供获取锁和释放锁两种功能。
  6. DistributedLockImpl类是对DistributedLock接口的实现
  7. BuyService类,业务类。

BusinessTypeEnum

  1. public enum BusinessTypeEnum {
  2. items("/items"),
  3. orders("/orders");
  4. private String value;
  5. BusinessTypeEnum(String value){
  6. this.value = value;
  7. }
  8. public String getValue() {
  9. return value;
  10. }
  11. }

CuatorExt

  1. public interface CuatorExt {
  2. /**
  3. * 创建临时序列节点
  4. * @param basePath
  5. * @return
  6. */
  7. public String createEphemeralSequential(String basePath) throws Exception;
  8. /**
  9. * 删除节点
  10. * @param ourPath
  11. */
  12. public void delete(String ourPath) throws Exception;
  13. /**
  14. * 获取子节点
  15. * @param basePath
  16. * @return
  17. */
  18. public List<String> getChildren(String basePath) throws Exception;
  19. }

BaseDistributedLock

  1. public class BaseDistributedLock extends AbstractCuatorExt {
  2. private static final String NAME_SPACE="lock_namespace";
  3. private static final String DISTRIBUTED_LOCK = "/lock-";
  4. BaseDistributedLock(CuratorFramework client) {
  5. super(client);
  6. }
  7. private static final Integer MAX_RETRY_COUNT = 10;//重试次数
  8. public void init(){
  9. this.client = this.client.usingNamespace(NAME_SPACE);
  10. for(BusinessTypeEnum b : BusinessTypeEnum.values()){
  11. try {
  12. if(this.client.checkExists().forPath(b.getValue())==null){
  13. this.client.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE).forPath(b.getValue());
  14. }
  15. } catch (Exception e) {
  16. e.printStackTrace();
  17. }
  18. }
  19. }
  20. /**
  21. * 获取锁,并设置超时时间
  22. * @param time
  23. * @param timeUnit
  24. * @param businessType
  25. * @return
  26. * @throws Exception
  27. */
  28. protected String attemptLock(long time, TimeUnit timeUnit, BusinessTypeEnum businessType) throws Exception {
  29. boolean goDone = false;
  30. String ourPath = null;
  31. String lockName = null;
  32. long startMillis = System.currentTimeMillis();
  33. int count = 0;
  34. while (!goDone) {
  35. goDone = true;
  36. try {
  37. ourPath = createEphemeralSequential(businessType.getValue()+DISTRIBUTED_LOCK);
  38. lockName = waitToLock(startMillis, time, timeUnit, businessType, ourPath);
  39. } catch (Exception e) {
  40. if (count++ < MAX_RETRY_COUNT) {
  41. goDone = false;
  42. } else {
  43. throw e;
  44. }
  45. }
  46. }
  47. return lockName;
  48. }
  49. private String waitToLock(long startMillis, long time, TimeUnit timeUnit, BusinessTypeEnum businessType, String ourPath) throws Exception {
  50. boolean haveLock = false;
  51. String lockName = null;
  52. Long waitMillis = timeUnit == null ? null : timeUnit.toMillis(time);
  53. boolean doDelete = false;
  54. try {
  55. while (!haveLock) {
  56. List<String> children = getChildrenAndSortThem(businessType.getValue());
  57. int index = children.indexOf(ourPath.substring(( businessType.getValue() + "/").length()));
  58. if (index < 0) {
  59. throw new Exception("无此节点:" + ourPath);
  60. }
  61. if (index == 0) {
  62. haveLock = true;
  63. lockName = ourPath;
  64. } else {
  65. String frontPath = children.get(index-1);
  66. CountDownLatch countDownLatch = new CountDownLatch(1);
  67. getClient().getData().usingWatcher(new CuratorWatcher() {
  68. @Override
  69. public void process(WatchedEvent watchedEvent) throws Exception {
  70. countDownLatch.countDown();
  71. lg.info(frontPath + "完成");
  72. }
  73. }).forPath(businessType.getValue()+"/"+frontPath);
  74. if(waitMillis!=null){
  75. waitMillis = System.currentTimeMillis() - startMillis;
  76. if(waitMillis>0){
  77. lg.info(ourPath + "等待" + frontPath + "完成");
  78. countDownLatch.await(waitMillis,timeUnit);
  79. }else{
  80. throw new Exception(ourPath+"等待超时");
  81. }
  82. }else{
  83. lg.info(ourPath + "等待" + frontPath + "完成");
  84. countDownLatch.await();
  85. }
  86. startMillis = System.currentTimeMillis();
  87. }
  88. }
  89. } catch (Exception e) {
  90. doDelete = true;
  91. throw e;
  92. }finally {
  93. if(doDelete){
  94. delete(ourPath);
  95. }
  96. }
  97. return lockName;
  98. }
  99. private List<String> getChildrenAndSortThem(String basePath) {
  100. List<String> children = null;
  101. try {
  102. children = getChildren(basePath);
  103. Collections.sort(children, new Comparator<String>() {
  104. @Override
  105. public int compare(String o1, String o2) {
  106. return getLockNumber(o1, basePath.length()) - getLockNumber(o2, basePath.length());
  107. }
  108. });
  109. } catch (Exception e) {
  110. e.printStackTrace();
  111. }
  112. return children;
  113. }
  114. private int getLockNumber(String node, int suff) {
  115. node = node.substring(suff);
  116. return Integer.parseInt(node);
  117. }
  118. }

AbstractCuatorExt

  1. public class AbstractCuatorExt implements CuatorExt {
  2. final static Logger lg = LoggerFactory.getLogger(AbstractCuatorExt.class);
  3. public CuratorFramework client;
  4. AbstractCuatorExt(CuratorFramework client){
  5. this.client = client;
  6. }
  7. public CuratorFramework getClient() {
  8. return client;
  9. }
  10. @Override
  11. public String createEphemeralSequential(String basePath) throws Exception {
  12. String o = this.client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE).forPath(basePath);
  13. return o;
  14. }
  15. @Override
  16. public void delete(String ourPath) throws Exception {
  17. this.client.delete().forPath(ourPath);
  18. }
  19. @Override
  20. public List<String> getChildren(String basePath) throws Exception {
  21. List<String> children = this.client.getChildren().forPath(basePath);
  22. return children;
  23. }
  24. }

DistributedLock

  1. /**
  2. * 分配锁
  3. */
  4. public interface DistributedLock {
  5. /**获取锁,如果没有得到就等待*/
  6. public String acquire(BusinessTypeEnum businessType) throws Exception;
  7. /**
  8. * 获取锁,直到超时
  9. * @param time 超时时间
  10. * @param unit time参数的单位
  11. * @return是否获取到锁
  12. * @throws Exception
  13. */
  14. public String acquire(BusinessTypeEnum businessType,long time, TimeUnit unit) throws Exception;
  15. /**
  16. * 释放锁
  17. * @throws Exception
  18. */
  19. public void release(String lockName) throws Exception;
  20. }

DistributedLockImpl

  1. public class DistributedLockImpl extends BaseDistributedLock implements DistributedLock {
  2. DistributedLockImpl(CuratorFramework client) {
  3. super(client);
  4. }
  5. @Override
  6. public String acquire(BusinessTypeEnum businessType) throws Exception {
  7. return attemptLock(0,null,businessType);
  8. }
  9. @Override
  10. public String acquire(BusinessTypeEnum businessType, long time, TimeUnit unit) throws Exception {
  11. return attemptLock(time,unit,businessType);
  12. }
  13. @Override
  14. public void release(String lockName) throws Exception {
  15. delete(lockName);
  16. }
  17. }

BuyService

  1. @Service("byService")
  2. public class ByServiceImpl implements BuyService {
  3. static int i = 0;
  4. Logger lg = LoggerFactory.getLogger(ByServiceImpl.class);
  5. @Autowired
  6. OrderService orderService;
  7. @Autowired
  8. ItemService itemService;
  9. @Autowired
  10. DistributedLock distributedLock;
  11. @Override
  12. public String getLock(String name) {
  13. lg.info("开始获取锁");
  14. String lockName = null;
  15. try {
  16. lockName = distributedLock.acquire(BusinessTypeEnum.items);
  17. lg.info(lockName + "进行业务中:");
  18. Thread.sleep(30000);
  19. distributedLock.release(lockName);
  20. } catch (Exception e) {
  21. e.printStackTrace();
  22. return null;
  23. }
  24. lg.info(lockName+"释放完毕");
  25. return lockName;
  26. }
  27. }

Spring配置

  1. <bean id="distributedLock" class="xin.youhuila.shopdobbo.web.lock.impl.DistributedLock" init-method="init">
  2. <constructor-arg ref="client"/>
  3. </bean>
  4. <bean id="retryPolicy" class="org.apache.curator.retry.RetryNTimes">
  5. <constructor-arg index="0" value="10"/>
  6. <constructor-arg index="1" value="5000"/>
  7. </bean>
  8. <bean id="client" class="org.apache.curator.framework.CuratorFrameworkFactory" factory-method="newClient" init-method="start">
  9. <constructor-arg index="0" value="localhost:2181"/>
  10. <constructor-arg index="1" value="10000"/>
  11. <constructor-arg index="2" value="5000"/>
  12. <constructor-arg index="3" ref="retryPolicy"/>
  13. </bean>

效果图

图4

单节点锁代码实现

比较简单,直接看代码吧

  1. public class DistributedLock {
  2. private CuratorFramework client = null;
  3. final static Logger lg =LoggerFactory.getLogger(DistributedLockImooc.class);
  4. private static CountDownLatch zkLocklatch = new CountDownLatch(1);
  5. private static final String ZK_LOCK_PROJECT = "imooc-locks";
  6. private static final String DISTRIBUTED_LOCK = "distributed_lock";
  7. public DistributedLockImooc(CuratorFramework client) {
  8. this.client = client;
  9. }
  10. private void init(){
  11. client = client.usingNamespace("ZKLocks-Namespace");
  12. try {
  13. if(client.checkExists().forPath("/".concat(ZK_LOCK_PROJECT))==null){
  14. client.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT)
  15. .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)
  16. .forPath("/"+ZK_LOCK_PROJECT);
  17. }
  18. addWatcherToLock("/"+ZK_LOCK_PROJECT);
  19. } catch (Exception e) {
  20. e.printStackTrace();
  21. }
  22. }
  23. private void addWatcherToLock(String path) throws Exception {
  24. final PathChildrenCache cache = new PathChildrenCache(client,path,true);
  25. cache.start( PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);
  26. cache.getListenable().addListener(new PathChildrenCacheListener(){
  27. @Override
  28. public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent event) throws Exception {
  29. if(event.getType().equals(PathChildrenCacheEvent.Type.CHILD_REMOVED)){
  30. String path = event.getData().getPath();
  31. lg.info("上一个会话已释放锁或断开"+path);
  32. lg.info("释放计数器,让当前请求来获得分布式锁");
  33. zkLocklatch.countDown();
  34. }
  35. }
  36. });
  37. }
  38. public boolean releaseLock(){
  39. try {
  40. if(client.checkExists().forPath("/"+ZK_LOCK_PROJECT+"/"+DISTRIBUTED_LOCK)!=null){
  41. client.delete().forPath("/"+ZK_LOCK_PROJECT+"/"+DISTRIBUTED_LOCK);
  42. }
  43. } catch (Exception e) {
  44. e.printStackTrace();
  45. return false;
  46. }
  47. lg.info("释放成功");
  48. return true;
  49. }
  50. public void getLock(){
  51. int i = 0;
  52. while(true){
  53. try {
  54. client.create().creatingParentsIfNeeded().withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)
  55. .forPath("/"+ZK_LOCK_PROJECT+"/"+DISTRIBUTED_LOCK);
  56. lg.info("获取分布式锁成功");
  57. return;
  58. } catch (Exception e) {
  59. lg.info("获取分布式锁失败");
  60. try {
  61. if(zkLocklatch.getCount()<=0){
  62. zkLocklatch = new CountDownLatch(1);
  63. }
  64. zkLocklatch.await();
  65. } catch (InterruptedException e1) {
  66. e1.printStackTrace();
  67. }
  68. lg.info("第"+i+"尝试此获取锁");
  69. }
  70. }
  71. }
  72. }

 

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/小蓝xlanll/article/detail/615389
推荐阅读
相关标签
  

闽ICP备14008679号