赞
踩
一、背景
最近网易云课堂看到一个视频,学习后整理了一下,作为笔记与备忘录。
二、单体Web服务并发问题
1.并发产生的问题
一台Tomcat 部署了一个web服务,当用户访问量非常大的时候,创建订单服务生成的订单编号由于并发问题会存在重复的情况,单体应用在应对多并发的情景使用的是Java锁的机制,代码示例:
OrderService 订单服务类
- package com.demo.order;
-
- public interface OrderService {
-
- void createOrder();
-
- }
OrderCodeGenerator 订单编号生成类
- package com.demo.order;
-
- import java.text.SimpleDateFormat;
- import java.util.Date;
-
- public class OrderCodeGenerator {
-
- private int i = 0;
-
- public String getOrderCode() {
- Date now = new Date();
- SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMddHHmmss-");
- return sdf.format(now) + ++i;
- }
-
- }
OrderServiceImpl 订单实现类
- package com.demo.order.impl;
-
- import com.demo.order.OrderCodeGenerator;
- import com.demo.order.OrderService;
-
- public class OrderServiceImpl implements OrderService {
-
- private OrderCodeGenerator orderCodeGenerator = new OrderCodeGenerator();
-
- @Override
- public void createOrder() {
-
- String orderCode = orderCodeGenerator.getOrderCode();
- System.out.println(Thread.currentThread().getName() + "*******************" + orderCode);
-
- }
- }
ConcurrentTestDemo 并发测试类
- package com.demo;
-
- import com.demo.order.OrderService;
- import com.demo.order.impl.OrderServiceImpl;
-
- import java.util.concurrent.CyclicBarrier;
-
- public class ConcurrentTestDemo {
-
- public static void main(String[] args) {
-
- int currency = 20; //并发数
-
- CyclicBarrier cb = new CyclicBarrier(currency);//并发栅栏,通过此类模拟并发情况
-
- OrderService orderService = new OrderServiceImpl();
-
- for (int i = 0; i < currency; i++) {
- new Thread(() -> {
- System.out.println(Thread.currentThread().getName() + "---------------我准备好-----------");
-
- try {
- cb.await();//达到并发数后一起请求
- } catch (Exception e) {
- e.printStackTrace();
- }
- //调用创建订单服务
- orderService.createOrder();
- }).start();
- }
-
-
- }
-
- }
通过以上几个类可以模拟单体应用下,多并发生成订单的情况,运行几次会出现订单编号重复的情况
2.单体应用并发问题的解决
通过Java锁机制可以防止此问题,另外实现OrderService服务代码改进,如下
OrderServiceImplWithLock 带同步锁机制的创建订单类
- package com.demo.order.impl;
-
- import com.demo.order.OrderCodeGenerator;
- import com.demo.order.OrderService;
-
- import java.util.concurrent.locks.Lock;
- import java.util.concurrent.locks.ReentrantLock;
-
- public class OrderServiceImplWithLock implements OrderService {
-
- private static OrderCodeGenerator ocg = new OrderCodeGenerator();
-
- private Lock lock = new ReentrantLock();
-
-
- @Override
- public void createOrder() {
- String orderCode = null;
- try{
- lock.lock();
- orderCode = ocg.getOrderCode();
- }finally {
- lock.unlock();
- }
- System.out.println(Thread.currentThread().getName() + "***********************>>" + orderCode);
-
- }
- }
替换 OrderService 实现类重新运行 ConcurrentTestDemo main 访问解决解决编号重复问题
三、分布式锁运用场景
1.分布式服务并发问题
单体应用无法支撑请求时,会引入分布式服务器集群的形式,此时创建订单的服务在每个Tomcat单独运行时就会存在问题,通过
ConcurrentTestDistributeDemo 分布式多并发测试类
- package com.demo;
-
- import com.demo.order.OrderService;
- import com.demo.order.impl.OrderServiceImpl;
-
- import java.util.concurrent.CyclicBarrier;
-
- public class ConcurrentTestDistributeDemo {
-
-
- public static void main(String[] args) {
- //并发数
- int currency = 50;
-
- CyclicBarrier cb = new CyclicBarrier(currency);
-
-
- for (int i = 0; i < currency; i++) {
- new Thread(new Runnable() {
- @Override
- public void run() {
- //模拟分布式集群的场景
- OrderService orderService = new OrderServiceImpl();
-
- System.out.println(Thread.currentThread().getName() + "---------------我准备好-----------");
- //等待一起除发
- try {
- cb.await();
- } catch (Exception e) {
- e.printStackTrace();
- }
- //调用创建订单服务
- orderService.createOrder();
- }
- }).start();
- }
-
-
- }
-
- }
通过调整 OrderService orderService = new OrderServiceImpl(); 类的创建位置来模拟多Tomcat 服务创建订单的情况,运行结果如下:
通过运行结果发现编号尾数都变成了1 ,因为对于每个Tomcat 来说,计数器都是从1开始,所以会出现多个相同的编号,这是每个Tomcat 部署的单体应用只能限制当前Tomcat下的单体应用单号不重复,不能保证所有Tomcat 下的编号不重复,这是就需要引入分布式锁机制
2.分布式锁应用
为了保证分布式下应用订单编号的唯一性,就需要在分布式系统下对生成订单的编号引入分布式锁机制,根据不同情况可以选择不同的架构,可以采用数据库行锁、缓存redis SETNX方式或Zookeeper方式
此处采用Zookeeper 的方式来实现,Zookeeper 具有节点唯一的特性,并且通过监听节点的特性可以实现分布式锁的机制,需要配置Zookeeper 服务为了模拟采用单机形式 ,下载链接: https://pan.baidu.com/s/1VRWQzy2tWQoLnp9WgVpGMg 提取码: rgwd
zookeeper-3.4.14\conf 下 zoo_sample.cfg 复制一份重命名为 zoo.cfg 修改两个属性
dataDir=D:/Soft/zkdata/data
dataLogDir=D:/Soft/zkdata/log
修改后启动 zookeeper-3.4.14\bin 中 zkServer.cmd 文件
重新实现 OrderService 订单服务接口
OrderServiceImpWithDisLock 分布式订单生成类
- package com.demo.order.impl;
-
- import com.demo.order.OrderCodeGenerator;
- import com.demo.order.OrderService;
- import com.demo.zk.ZKDistributeImproveLock;
-
- import java.util.concurrent.locks.Lock;
-
- public class OrderServiceImpWithDisLock implements OrderService {
-
- private static OrderCodeGenerator ocg = new OrderCodeGenerator();
-
- @Override
- public void createOrder() {
- String orderCode = null;
- //分布式锁
- Lock lock = new ZKDistributeImproveLock("/allen666");
- try {
- lock.lock();
- orderCode = ocg.getOrderCode();
- } finally {
- lock.unlock();
- }
-
-
- System.out.println(Thread.currentThread().getName() + "****************" + orderCode);
- }
- }
ZKDistributeImproveLock 分布式锁实现类
- package com.demo.zk;
-
- import org.I0Itec.zkclient.IZkDataListener;
- import org.I0Itec.zkclient.ZkClient;
- import org.I0Itec.zkclient.exception.ZkNodeExistsException;
-
- import java.util.Collections;
- import java.util.List;
- import java.util.concurrent.CountDownLatch;
- import java.util.concurrent.TimeUnit;
- import java.util.concurrent.locks.Condition;
- import java.util.concurrent.locks.Lock;
-
- /***
- * 此类是 ZKDistributeLock 类的改进,根据zookeeper的有序节点的创建,当前抢到锁的节点寻找
- */
- public class ZKDistributeImproveLock implements Lock {
-
- public static final String ZK_SERVICE = "localhost:2181";
-
-
- private String lockPath;
-
- private ZkClient client;
-
- private String currentPath;
-
- private String beforePath;
-
-
- public ZKDistributeImproveLock(String lockPath) {
- super();
- this.lockPath = lockPath;
- client = new ZkClient(ZK_SERVICE);
- client.setZkSerializer(new MyZkSerializer());
- if (!this.client.exists(lockPath)) {
- try {
- this.client.createPersistent(lockPath);//创建节点
- } catch (ZkNodeExistsException e) {
- }
- }
- }
-
-
- /***
- * 没有抢到锁,自己阻塞,
- * 直到订单服务中调用unlock方法释放掉锁,
- * 会通过handleDataDeleted 监听唤醒阻塞,
- * 继续执行下面递归
- */
- @Override
- public void lock() {
- if (!tryLock()) {//尝试获取锁
- waitForLock();
- lock();
- }
- }
-
- private void waitForLock() {
- CountDownLatch cdl = new CountDownLatch(1);
- IZkDataListener listener = new IZkDataListener() {
- @Override
- public void handleDataDeleted(String dataPaht) throws Exception {
- System.out.println("-----------------收到节点被删除----------------");
- cdl.countDown();//释放阻塞
- }
-
- @Override
- public void handleDataChange(String dataPaht, Object data) {
-
- }
- };
-
- client.subscribeDataChanges(beforePath, listener);//监听节点删除状态
- if (this.client.exists(beforePath)) {
- try {
- cdl.await();
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- //取消注册
- client.unsubscribeDataChanges(beforePath, listener);
-
- }
-
- @Override
- public void lockInterruptibly() throws InterruptedException {
-
- }
-
- /***
- * 同一时刻尝试获取锁的方法,只会有currentPath当前节点与所有节点中做小的一个匹配
- * 不匹配的当前节点后面一个结点则会被监听并且会阻塞,直到当前节点被释放后其他结点继续强锁
- * @return
- */
- @Override
- public boolean tryLock() {
- if (this.currentPath == null) {//当前节点为空,则创建临时有序节点
- currentPath = this.client.createEphemeralSequential(lockPath + "/", "aaa");
- }
- //获取所有的子节点
- List<String> children = this.client.getChildren(lockPath);
- Collections.sort(children);//排序节点
- if (currentPath.equals(lockPath + "/" + children.get(0))) {//判断是否第一个节点
- return true;
- } else {
- //渠道前一个
- //得到字节的索引号
- int curIndex = children.indexOf(currentPath.substring(lockPath.length() + 1));
- beforePath = lockPath + "/" + children.get(curIndex - 1);
- }
- return false;
- }
-
- @Override
- public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
- return false;
- }
-
- @Override
- public void unlock() {
- client.delete(currentPath);
- }
-
- @Override
- public Condition newCondition() {
- return null;
- }
- }
MyZkSerializer 序列化类
- package com.demo.zk;
-
- import org.I0Itec.zkclient.exception.ZkMarshallingError;
- import org.I0Itec.zkclient.serialize.ZkSerializer;
-
- import java.io.UnsupportedEncodingException;
-
- public class MyZkSerializer implements ZkSerializer {
-
- String charset = "UTF-8";
-
- @Override
- public byte[] serialize(Object obj) throws ZkMarshallingError {
- try {
- return String.valueOf(obj).getBytes(charset);
- } catch (UnsupportedEncodingException e) {
- throw new ZkMarshallingError(e);
- }
-
- }
-
- @Override
- public Object deserialize(byte[] bytes) throws ZkMarshallingError {
- try {
- return new String(bytes, charset);
- } catch (UnsupportedEncodingException e) {
- throw new ZkMarshallingError(e);
- }
- }
- }
调整 ConcurrentTestDistributeDemo 类中 OrderService 的实现类 OrderService orderService = new OrderServiceImpWithDisLock() 重新运行 ConcurrentTestDistributeDemo main 方法运行结果
可以看使用分布式锁之后保证了订单编号的同步不会产生重复,单体应用下同步锁升级到分布式架构之后锁机制都会产生问题
最后附上源码地址 https://github.com/zhijun0808/ConcurrentDemo.git
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。