赞
踩
为了保证一个方法或属性在高并发情况下的同一时间只能被同一个线程执行,在传统单体应用单机部署的情况下,可以使用Java并发处理相关的API(如ReentrantLock或Synchronized)进行互斥控制。在单机环境中,Java中提供了很多并发处理相关的API。但是,随着业务发展的需要,原单体单机部署的系统被演化成分布式集群系统后,由于分布式系统多线程、多进程并且分布在不同机器上,这将使原单机部署情况下的并发控制锁策略失效,单纯的Java API并不能提供分布式锁的能力。为了解决这个问题就需要一种跨JVM的互斥机制来控制共享资源的访问,这就是分布式锁要解决的问题!
目前几乎很多大型网站及应用都是分布式部署的,分布式场景中的数据一致性问题一直是一个比较重要的话题。分布式的CAP理论告诉我们“任何一个分布式系统都无法同时满足一致性(Consistency)、可用性(Availability)和分区容错性(Partition tolerance),最多只能同时满足两项。”所以,很多系统在设计之初就要对这三者做出取舍。在互联网领域的绝大多数的场景中,都需要牺牲强一致性来换取系统的高可用性,系统往往只需要保证“最终一致性”,只要这个最终时间是在用户可以接受的范围内即可。
利用Memcached的add命令。此命令是原子性操作,只有在key不存在的情况下,才能add成功,也就意味着线程得到了锁。
和Memcached的方式类似,利用Redis的setnx命令。此命令同样是原子性操作,只有在key不存在的情况下,才能set成功。(setnx命令并不完善,指令本身是不支持传入超时时间的,幸好Redis 2.6.12以上版本为set指令增加了可选参数)
可参考:https://juejin.cn/post/6844903616667451399
Google公司实现的粗粒度分布式锁服务,底层利用了Paxos一致性算法。
可参考:https://www.w3cschool.cn/architectroad/architectroad-distributed-lock-2.html
利用Zookeeper的顺序临时节点,来实现分布式锁和等待队列。Zookeeper设计的初衷,就是为了实现分布式锁服务的。我们这篇文章主要将的是这种方式。
Items.java:
package com.huiq.springboot.domain; import javax.persistence.Entity; import javax.persistence.Id; import javax.persistence.Table; @Entity @Table public class Items { @Id private String id; private String name; private Integer counts; // get和set方法省略。。。。。。 }
Orders.java:
package com.huiq.springboot.domain; import javax.persistence.Entity; import javax.persistence.Id; import javax.persistence.Table; @Entity @Table public class Orders { @Id private String id; private String itemId; // get和set方法省略。。。。。。 }
ItemsDao.java:
package com.huiq.springboot.dao;
import com.huiq.springboot.domain.Items;
import org.springframework.data.jpa.repository.JpaRepository;
public interface ItemsDao extends JpaRepository<Items, String > {
}
OrdersDao.java:
package com.huiq.springboot.dao;
import com.huiq.springboot.domain.Orders;
import org.springframework.data.jpa.repository.JpaRepository;
public interface OrdersDao extends JpaRepository<Orders, String> {
}
ItemsService.java:
package com.huiq.springboot.service; import com.huiq.springboot.dao.ItemsDao; import com.huiq.springboot.domain.Items; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import java.util.UUID; @Service public class ItemsService { @Autowired private ItemsDao itemsDao; public Items getItem(String itemId) { return itemsDao.findById(itemId).orElse(null); } public void save(Items items) { items.setId(UUID.randomUUID().toString()); itemsDao.save(items); } // 根据itemId获取库存量 public int getItemCounts(String itemId) { return itemsDao.findById(itemId).orElse(null).getCounts(); } // 调整库存 public void reduceCount(String itemId, int count) { Items items = getItem(itemId); items.setCounts(items.getCounts() - count); itemsDao.save(items); } }
OrdersService.java:
package com.huiq.springboot.service; import com.huiq.springboot.dao.OrdersDao; import com.huiq.springboot.domain.Orders; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import java.util.UUID; @Service @Slf4j public class OrdersService { @Autowired private OrdersDao ordersDao; public boolean save(String itemId) { try { Orders orders = new Orders(); orders.setId(UUID.randomUUID().toString()); orders.setItemId(itemId); ordersDao.save(orders); log.info("订单创建成功。。。。。。"); return true; } catch (Exception e) { e.printStackTrace(); return false; } } }
PayService.java:
package com.huiq.springboot.service; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; @Service @Slf4j public class PayService { @Autowired private OrdersService ordersService; @Autowired private ItemsService itemsService; public boolean buy(String itemId) { // 假设每次购买9个 int buyCount = 9; // 第一步:是否有库存 int count = itemsService.getItemCounts(itemId); if (count < buyCount) { log.error("库存不足,下单失败。购买数{}件,库存只有{}件", buyCount, count); return false; } // 第二步:创建订单 boolean flag = ordersService.save(itemId); // 模拟高并发的场景 try { Thread.sleep(5000); } catch (InterruptedException e) { e.printStackTrace(); } // 第三步:减库存 if (flag) { itemsService.reduceCount(itemId, buyCount); } else { log.error("订单创建失败。。。。。。"); return false; } return true; } }
BuyController.java:
package com.huiq.springboot.controller; import com.huiq.springboot.service.PayService; import org.apache.commons.lang.StringUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.ResponseBody; import org.springframework.web.bind.annotation.RestController; @RestController public class BuyController { @Autowired private PayService payService; @GetMapping("/buy1") @ResponseBody public String buy1(String itemId) { if (StringUtils.isNotBlank(itemId)) { if (payService.buy(itemId)) { return "订单创建成功。。。。。。"; } else { return "订单创建失败。。。。。。"; } } else { return "条目id不能为空。。。。。。"; } } @GetMapping("/buy2") @ResponseBody public String buy2(String itemId) { if (StringUtils.isNotBlank(itemId)) { if (payService.buy(itemId)) { return "订单创建成功。。。。。。"; } else { return "订单创建失败。。。。。。"; } } else { return "条目id不能为空。。。。。。"; } } }
pom.xml:
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.4.3</version> <relativePath/> <!-- lookup parent from repository --> </parent> <groupId>com.huiq</groupId> <artifactId>spring-boot-data-pay</artifactId> <version>0.0.1-SNAPSHOT</version> <name>spring-boot-data-jdbc</name> <description>Demo project for Spring Boot</description> <properties> <java.version>1.8</java.version> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-jdbc</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-jpa</artifactId> </dependency> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <scope>runtime</scope> </dependency> <!-- 引入@Sl4j日志依赖 --> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <!-- StringUtils依赖 --> <dependency> <groupId>commons-lang</groupId> <artifactId>commons-lang</artifactId> <version>2.6</version> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> </plugins> </build> </project>
启动程序后插入相应的测试数据:
insert into items (id,counts,name) values('1',10,'三国演义');
insert into items (id,counts,name) values('2',6,'红楼梦');
insert into items (id,counts,name) values('3',2,'西游记');
模拟高并发访问产生的问题:
首先安装好Zookeeper:
在pom.xml中添加:
<dependency> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> <version>3.4.5</version> </dependency> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-framework</artifactId> <version>4.0.0</version> </dependency> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-recipes</artifactId> <version>4.0.0</version> </dependency>
增加分布式锁代码DistributedLock.java:
package com.huiq.springboot.utils; import lombok.extern.slf4j.Slf4j; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.recipes.cache.PathChildrenCache; import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent; import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener; import org.apache.curator.retry.ExponentialBackoffRetry; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.ZooDefs; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import java.util.concurrent.CountDownLatch; @Configuration @Slf4j public class DistributedLock { private CuratorFramework client = null; private static final String ZK_LOCK = "huiq-zk-locks"; private static final String DISTRIBUTED_LOCK = "huiq-distributed-lock"; private static CountDownLatch countDownLatch = new CountDownLatch(1); public DistributedLock() { client = CuratorFrameworkFactory.builder() .connectString("192.168.42.132:2181") .sessionTimeoutMs(10000) .retryPolicy(new ExponentialBackoffRetry(1000, 5)) .namespace("zk-namespace") .build(); client.start(); } @Bean public CuratorFramework getClient() { client = client.usingNamespace("zk-namespace"); try { if (client.checkExists().forPath("/" + ZK_LOCK) == null) { client.create() .creatingParentsIfNeeded() .withMode(CreateMode.PERSISTENT) .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE); } addWatch("/" + ZK_LOCK); // 监听 } catch (Exception e) { e.printStackTrace(); } return client; } private void addWatch(String path) throws Exception { PathChildrenCache cache = new PathChildrenCache(client, path, true); cache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT); cache.getListenable().addListener(new PathChildrenCacheListener() { @Override public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception { if (event.getType().equals(PathChildrenCacheEvent.Type.CHILD_REMOVED)) { String path = event.getData().getPath(); if (path.contains(DISTRIBUTED_LOCK)) { countDownLatch.countDown(); } } } }); } // 获得分布式锁 public void getLock() { while (true) { try { client.create() .creatingParentsIfNeeded() .withMode(CreateMode.EPHEMERAL) .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE) .forPath("/" + ZK_LOCK + "/" + DISTRIBUTED_LOCK); log.info("分布式锁获得成功。。。。。。"); } catch (Exception e) { // e.printStackTrace(); if (countDownLatch.getCount() <= 0) { countDownLatch = new CountDownLatch(1); } try { countDownLatch.await(); } catch (InterruptedException ex) { ex.printStackTrace(); } } return; } } // 释放分布式锁:订单创建成功或者异常的时候释放锁 public boolean releaseLock() { try { if (client.checkExists().forPath("/" + ZK_LOCK + "/" + DISTRIBUTED_LOCK) != null) { client.delete().forPath("/" + ZK_LOCK + "/" + DISTRIBUTED_LOCK); } } catch (Exception e) { e.printStackTrace(); return false; } log.info("分布式锁释放成功。。。。。。"); return true; } }
修改PayService.java为:
package com.huiq.springboot.service; import com.huiq.springboot.utils.DistributedLock; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; @Service @Slf4j public class PayService { @Autowired private OrdersService ordersService; @Autowired private ItemsService itemsService; @Autowired private DistributedLock distributedLock; public boolean buy(String itemId) { distributedLock.getLock(); // 获取锁 // 假设每次购买9个 int buyCount = 9; // 第一步:是否有库存 int count = itemsService.getItemCounts(itemId); if (count < buyCount) { log.error("库存不足,下单失败。购买数{}件,库存只有{}件", buyCount, count); distributedLock.releaseLock(); // 释放锁 return false; } // 第二步:创建订单 boolean flag = ordersService.save(itemId); // 模拟高并发的场景 try { Thread.sleep(5000); } catch (InterruptedException e) { distributedLock.releaseLock(); // 异常的话释放锁 e.printStackTrace(); } // 第三步:减库存 if (flag) { itemsService.reduceCount(itemId, buyCount); distributedLock.releaseLock(); // 成功也释放掉锁 } else { log.error("订单创建失败。。。。。。"); distributedLock.releaseLock(); // 失败了也释放掉锁 return false; } return true; } }
启动程序查看zookeeper创建成功:
测试:
注(Curator简介):
Curator是Netflix公司开源的一套Zookeeper客户端框架。了解过Zookeeper原生API都会清楚其复杂度。Curator帮助我们在其基础上进行封装、实现一些开发细节,包括接连重连、反复注册Watcher和NodeExistsException等。目前已经作为Apache的顶级项目出现,是最流行的Zookeeper客户端之一。Patrixck Hunt(Zookeeper)以一句“Guava is to Java that Curator to Zookeeper”给Curator予高度评价。从编码风格上来讲,它提供了基于Fluent的编程风格支持。
除此之外,Curator还提供了Zookeeper的各种应用场景:Recipe、共享锁服务、Master选举机制和分布式计数器等。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。