赞
踩
比如购买商品的操作,首先获取商品的库存,判断库存是否充足,充足的话创建订单减库存,不充足的话不允许创建订单。
有一款新型笔记本电脑,库存只剩下1件的时候,有客户A、B从不同的客户端(比如网站和APP)上看中了这款电脑,两人同时进行下单操作。
A和B同时获取库存,A购买1件,库存为1,判断充足,创建订单,库存减1,B购买1件,库存为1,判断充足,创建订单,库存减1。
结果是仅剩1件的商品,被两次下单,库存变成了-1。显然这种结果是错误的,A和B之间有一人的订单无法发货。
如何解决呢
可以用一个锁,A在下单的时候,给库存加上一个锁,此时除了A以外,任何人都不能对库存进行操作,B在获取库存的时候 ,由于A对库存加上了锁,所以B只好等待A释放锁之后在继续。
A创建完订单,对库存减1,释放锁,B获取锁再继续获取库存,此时库存为0,判断库存不充足,无法创建订单。保证了库存仅剩1的商品只能被下单1次。
这是分布式锁实现的一种方案。
分布式锁,是控制分布式系统或不同系统之间访问共享资源的一种锁的实现,其主要解决的问题就是保证数据一致性。
Zookeeper实现的分布式锁,是利用节点的操作来进行的,加锁,就是创建节点(临时节点),解锁就是删除节点,同一个业务都是在同一父节点下进行加锁和解锁操作,如果该业务父节点下有子节点,则说明该业务已经被锁住了,如果没有子节点,则没被加锁。
临时节点的特点是,当会话失效时,Zookeeper自动清除,避免获取锁的客户端掉线后,没有删除锁节点,而其他客户端都在等这个锁节点删除,产生了死锁。
一、单节点锁
在某一业务节点,只允许创建1个子节点,代表锁,所有客户端争抢创建子节点权限,抢到并创建,则加锁成功,没抢到,则等待机会。如图所示:
二、多节点锁
在单节点锁中,所有客户端都操作同一个节点,当只有锁的客户端释放锁时,其他的客户端都从挂起状态中唤醒,来竞争锁。谁先获取锁与客户端的网络状态和Zookeeper的服务器CPU调度等不可控因素有关,和谁先来后到的无关。
如果希望客户端能按照先来后到的顺序来获取锁,就需要用多节点锁来实现,即每个客户端在同一业务节点下创建专属自己的顺序节点,按照顺序节点的序号,来决定谁获取锁。如图:
类设计如下
- public enum BusinessTypeEnum {
- items("/items"),
- orders("/orders");
- private String value;
- BusinessTypeEnum(String value){
- this.value = value;
- }
-
- public String getValue() {
- return value;
- }
- }
- public interface CuatorExt {
- /**
- * 创建临时序列节点
- * @param basePath
- * @return
- */
- public String createEphemeralSequential(String basePath) throws Exception;
-
- /**
- * 删除节点
- * @param ourPath
- */
- public void delete(String ourPath) throws Exception;
-
- /**
- * 获取子节点
- * @param basePath
- * @return
- */
- public List<String> getChildren(String basePath) throws Exception;
-
- }

- public class BaseDistributedLock extends AbstractCuatorExt {
- private static final String NAME_SPACE="lock_namespace";
- private static final String DISTRIBUTED_LOCK = "/lock-";
- BaseDistributedLock(CuratorFramework client) {
- super(client);
- }
- private static final Integer MAX_RETRY_COUNT = 10;//重试次数
- public void init(){
- this.client = this.client.usingNamespace(NAME_SPACE);
- for(BusinessTypeEnum b : BusinessTypeEnum.values()){
- try {
- if(this.client.checkExists().forPath(b.getValue())==null){
- this.client.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE).forPath(b.getValue());
- }
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- }
-
- /**
- * 获取锁,并设置超时时间
- * @param time
- * @param timeUnit
- * @param businessType
- * @return
- * @throws Exception
- */
- protected String attemptLock(long time, TimeUnit timeUnit, BusinessTypeEnum businessType) throws Exception {
- boolean goDone = false;
- String ourPath = null;
- String lockName = null;
- long startMillis = System.currentTimeMillis();
- int count = 0;
- while (!goDone) {
- goDone = true;
- try {
- ourPath = createEphemeralSequential(businessType.getValue()+DISTRIBUTED_LOCK);
- lockName = waitToLock(startMillis, time, timeUnit, businessType, ourPath);
- } catch (Exception e) {
- if (count++ < MAX_RETRY_COUNT) {
- goDone = false;
- } else {
- throw e;
- }
- }
- }
- return lockName;
- }
-
- private String waitToLock(long startMillis, long time, TimeUnit timeUnit, BusinessTypeEnum businessType, String ourPath) throws Exception {
- boolean haveLock = false;
- String lockName = null;
- Long waitMillis = timeUnit == null ? null : timeUnit.toMillis(time);
- boolean doDelete = false;
- try {
- while (!haveLock) {
- List<String> children = getChildrenAndSortThem(businessType.getValue());
- int index = children.indexOf(ourPath.substring(( businessType.getValue() + "/").length()));
- if (index < 0) {
- throw new Exception("无此节点:" + ourPath);
- }
- if (index == 0) {
- haveLock = true;
- lockName = ourPath;
- } else {
- String frontPath = children.get(index-1);
- CountDownLatch countDownLatch = new CountDownLatch(1);
- getClient().getData().usingWatcher(new CuratorWatcher() {
- @Override
- public void process(WatchedEvent watchedEvent) throws Exception {
- countDownLatch.countDown();
- lg.info(frontPath + "完成");
- }
- }).forPath(businessType.getValue()+"/"+frontPath);
- if(waitMillis!=null){
- waitMillis = System.currentTimeMillis() - startMillis;
- if(waitMillis>0){
- lg.info(ourPath + "等待" + frontPath + "完成");
- countDownLatch.await(waitMillis,timeUnit);
- }else{
- throw new Exception(ourPath+"等待超时");
- }
- }else{
- lg.info(ourPath + "等待" + frontPath + "完成");
- countDownLatch.await();
- }
- startMillis = System.currentTimeMillis();
- }
- }
- } catch (Exception e) {
- doDelete = true;
- throw e;
- }finally {
- if(doDelete){
- delete(ourPath);
- }
- }
- return lockName;
- }
-
- private List<String> getChildrenAndSortThem(String basePath) {
- List<String> children = null;
- try {
- children = getChildren(basePath);
- Collections.sort(children, new Comparator<String>() {
- @Override
- public int compare(String o1, String o2) {
- return getLockNumber(o1, basePath.length()) - getLockNumber(o2, basePath.length());
- }
-
- });
- } catch (Exception e) {
- e.printStackTrace();
- }
- return children;
- }
-
- private int getLockNumber(String node, int suff) {
- node = node.substring(suff);
- return Integer.parseInt(node);
- }
- }

- public class AbstractCuatorExt implements CuatorExt {
- final static Logger lg = LoggerFactory.getLogger(AbstractCuatorExt.class);
- public CuratorFramework client;
- AbstractCuatorExt(CuratorFramework client){
- this.client = client;
- }
-
- public CuratorFramework getClient() {
- return client;
- }
-
- @Override
- public String createEphemeralSequential(String basePath) throws Exception {
- String o = this.client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE).forPath(basePath);
- return o;
- }
-
- @Override
- public void delete(String ourPath) throws Exception {
- this.client.delete().forPath(ourPath);
- }
-
- @Override
- public List<String> getChildren(String basePath) throws Exception {
- List<String> children = this.client.getChildren().forPath(basePath);
- return children;
- }
- }

- /**
- * 分配锁
- */
- public interface DistributedLock {
-
- /**获取锁,如果没有得到就等待*/
- public String acquire(BusinessTypeEnum businessType) throws Exception;
-
- /**
- * 获取锁,直到超时
- * @param time 超时时间
- * @param unit time参数的单位
- * @return是否获取到锁
- * @throws Exception
- */
- public String acquire(BusinessTypeEnum businessType,long time, TimeUnit unit) throws Exception;
-
- /**
- * 释放锁
- * @throws Exception
- */
- public void release(String lockName) throws Exception;
-
-
- }

- public class DistributedLockImpl extends BaseDistributedLock implements DistributedLock {
- DistributedLockImpl(CuratorFramework client) {
- super(client);
- }
-
- @Override
- public String acquire(BusinessTypeEnum businessType) throws Exception {
- return attemptLock(0,null,businessType);
- }
-
- @Override
- public String acquire(BusinessTypeEnum businessType, long time, TimeUnit unit) throws Exception {
- return attemptLock(time,unit,businessType);
- }
-
- @Override
- public void release(String lockName) throws Exception {
- delete(lockName);
- }
- }

- @Service("byService")
- public class ByServiceImpl implements BuyService {
- static int i = 0;
- Logger lg = LoggerFactory.getLogger(ByServiceImpl.class);
- @Autowired
- OrderService orderService;
- @Autowired
- ItemService itemService;
- @Autowired
- DistributedLock distributedLock;
- @Override
- public String getLock(String name) {
- lg.info("开始获取锁");
- String lockName = null;
- try {
- lockName = distributedLock.acquire(BusinessTypeEnum.items);
- lg.info(lockName + "进行业务中:");
- Thread.sleep(30000);
- distributedLock.release(lockName);
- } catch (Exception e) {
- e.printStackTrace();
- return null;
- }
-
- lg.info(lockName+"释放完毕");
- return lockName;
- }
- }

- <bean id="distributedLock" class="xin.youhuila.shopdobbo.web.lock.impl.DistributedLock" init-method="init">
- <constructor-arg ref="client"/>
- </bean>
- <bean id="retryPolicy" class="org.apache.curator.retry.RetryNTimes">
- <constructor-arg index="0" value="10"/>
- <constructor-arg index="1" value="5000"/>
- </bean>
- <bean id="client" class="org.apache.curator.framework.CuratorFrameworkFactory" factory-method="newClient" init-method="start">
- <constructor-arg index="0" value="localhost:2181"/>
- <constructor-arg index="1" value="10000"/>
- <constructor-arg index="2" value="5000"/>
- <constructor-arg index="3" ref="retryPolicy"/>
- </bean>
比较简单,直接看代码吧
- public class DistributedLock {
- private CuratorFramework client = null;
- final static Logger lg =LoggerFactory.getLogger(DistributedLockImooc.class);
- private static CountDownLatch zkLocklatch = new CountDownLatch(1);
- private static final String ZK_LOCK_PROJECT = "imooc-locks";
- private static final String DISTRIBUTED_LOCK = "distributed_lock";
-
- public DistributedLockImooc(CuratorFramework client) {
- this.client = client;
- }
- private void init(){
- client = client.usingNamespace("ZKLocks-Namespace");
- try {
- if(client.checkExists().forPath("/".concat(ZK_LOCK_PROJECT))==null){
- client.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT)
- .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)
- .forPath("/"+ZK_LOCK_PROJECT);
- }
- addWatcherToLock("/"+ZK_LOCK_PROJECT);
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
-
- private void addWatcherToLock(String path) throws Exception {
- final PathChildrenCache cache = new PathChildrenCache(client,path,true);
- cache.start( PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);
- cache.getListenable().addListener(new PathChildrenCacheListener(){
-
- @Override
- public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent event) throws Exception {
- if(event.getType().equals(PathChildrenCacheEvent.Type.CHILD_REMOVED)){
- String path = event.getData().getPath();
- lg.info("上一个会话已释放锁或断开"+path);
- lg.info("释放计数器,让当前请求来获得分布式锁");
- zkLocklatch.countDown();
-
- }
-
- }
- });
- }
-
- public boolean releaseLock(){
- try {
- if(client.checkExists().forPath("/"+ZK_LOCK_PROJECT+"/"+DISTRIBUTED_LOCK)!=null){
- client.delete().forPath("/"+ZK_LOCK_PROJECT+"/"+DISTRIBUTED_LOCK);
- }
- } catch (Exception e) {
- e.printStackTrace();
- return false;
- }
- lg.info("释放成功");
- return true;
- }
-
- public void getLock(){
- int i = 0;
- while(true){
- try {
- client.create().creatingParentsIfNeeded().withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)
- .forPath("/"+ZK_LOCK_PROJECT+"/"+DISTRIBUTED_LOCK);
- lg.info("获取分布式锁成功");
- return;
- } catch (Exception e) {
- lg.info("获取分布式锁失败");
- try {
- if(zkLocklatch.getCount()<=0){
- zkLocklatch = new CountDownLatch(1);
- }
- zkLocklatch.await();
- } catch (InterruptedException e1) {
- e1.printStackTrace();
- }
- lg.info("第"+i+"尝试此获取锁");
- }
- }
- }
- }

Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。