当前位置:   article > 正文

分布式锁学习笔记-猫_client.unsubscribedatachanges方法

client.unsubscribedatachanges方法

一、背景

最近网易云课堂看到一个视频,学习后整理了一下,作为笔记与备忘录。

二、单体Web服务并发问题

1.并发产生的问题

一台Tomcat 部署了一个web服务,当用户访问量非常大的时候,创建订单服务生成的订单编号由于并发问题会存在重复的情况,单体应用在应对多并发的情景使用的是Java锁的机制,代码示例:

OrderService 订单服务类

  1. package com.demo.order;
  2. public interface OrderService {
  3. void createOrder();
  4. }

OrderCodeGenerator 订单编号生成类

  1. package com.demo.order;
  2. import java.text.SimpleDateFormat;
  3. import java.util.Date;
  4. public class OrderCodeGenerator {
  5. private int i = 0;
  6. public String getOrderCode() {
  7. Date now = new Date();
  8. SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMddHHmmss-");
  9. return sdf.format(now) + ++i;
  10. }
  11. }

OrderServiceImpl 订单实现类

  1. package com.demo.order.impl;
  2. import com.demo.order.OrderCodeGenerator;
  3. import com.demo.order.OrderService;
  4. public class OrderServiceImpl implements OrderService {
  5. private OrderCodeGenerator orderCodeGenerator = new OrderCodeGenerator();
  6. @Override
  7. public void createOrder() {
  8. String orderCode = orderCodeGenerator.getOrderCode();
  9. System.out.println(Thread.currentThread().getName() + "*******************" + orderCode);
  10. }
  11. }

ConcurrentTestDemo 并发测试类

  1. package com.demo;
  2. import com.demo.order.OrderService;
  3. import com.demo.order.impl.OrderServiceImpl;
  4. import java.util.concurrent.CyclicBarrier;
  5. public class ConcurrentTestDemo {
  6. public static void main(String[] args) {
  7. int currency = 20; //并发数
  8. CyclicBarrier cb = new CyclicBarrier(currency);//并发栅栏,通过此类模拟并发情况
  9. OrderService orderService = new OrderServiceImpl();
  10. for (int i = 0; i < currency; i++) {
  11. new Thread(() -> {
  12. System.out.println(Thread.currentThread().getName() + "---------------我准备好-----------");
  13. try {
  14. cb.await();//达到并发数后一起请求
  15. } catch (Exception e) {
  16. e.printStackTrace();
  17. }
  18. //调用创建订单服务
  19. orderService.createOrder();
  20. }).start();
  21. }
  22. }
  23. }

通过以上几个类可以模拟单体应用下,多并发生成订单的情况,运行几次会出现订单编号重复的情况

2.单体应用并发问题的解决

通过Java锁机制可以防止此问题,另外实现OrderService服务代码改进,如下

OrderServiceImplWithLock 带同步锁机制的创建订单类

  1. package com.demo.order.impl;
  2. import com.demo.order.OrderCodeGenerator;
  3. import com.demo.order.OrderService;
  4. import java.util.concurrent.locks.Lock;
  5. import java.util.concurrent.locks.ReentrantLock;
  6. public class OrderServiceImplWithLock implements OrderService {
  7. private static OrderCodeGenerator ocg = new OrderCodeGenerator();
  8. private Lock lock = new ReentrantLock();
  9. @Override
  10. public void createOrder() {
  11. String orderCode = null;
  12. try{
  13. lock.lock();
  14. orderCode = ocg.getOrderCode();
  15. }finally {
  16. lock.unlock();
  17. }
  18. System.out.println(Thread.currentThread().getName() + "***********************>>" + orderCode);
  19. }
  20. }

替换 OrderService 实现类重新运行 ConcurrentTestDemo main 访问解决解决编号重复问题

三、分布式锁运用场景

1.分布式服务并发问题

单体应用无法支撑请求时,会引入分布式服务器集群的形式,此时创建订单的服务在每个Tomcat单独运行时就会存在问题,通过

ConcurrentTestDistributeDemo 分布式多并发测试类

  1. package com.demo;
  2. import com.demo.order.OrderService;
  3. import com.demo.order.impl.OrderServiceImpl;
  4. import java.util.concurrent.CyclicBarrier;
  5. public class ConcurrentTestDistributeDemo {
  6. public static void main(String[] args) {
  7. //并发数
  8. int currency = 50;
  9. CyclicBarrier cb = new CyclicBarrier(currency);
  10. for (int i = 0; i < currency; i++) {
  11. new Thread(new Runnable() {
  12. @Override
  13. public void run() {
  14. //模拟分布式集群的场景
  15. OrderService orderService = new OrderServiceImpl();
  16. System.out.println(Thread.currentThread().getName() + "---------------我准备好-----------");
  17. //等待一起除发
  18. try {
  19. cb.await();
  20. } catch (Exception e) {
  21. e.printStackTrace();
  22. }
  23. //调用创建订单服务
  24. orderService.createOrder();
  25. }
  26. }).start();
  27. }
  28. }
  29. }

通过调整 OrderService orderService = new OrderServiceImpl(); 类的创建位置来模拟多Tomcat 服务创建订单的情况,运行结果如下:

通过运行结果发现编号尾数都变成了1 ,因为对于每个Tomcat 来说,计数器都是从1开始,所以会出现多个相同的编号,这是每个Tomcat 部署的单体应用只能限制当前Tomcat下的单体应用单号不重复,不能保证所有Tomcat 下的编号不重复,这是就需要引入分布式锁机制

2.分布式锁应用

为了保证分布式下应用订单编号的唯一性,就需要在分布式系统下对生成订单的编号引入分布式锁机制,根据不同情况可以选择不同的架构,可以采用数据库行锁、缓存redis SETNX方式或Zookeeper方式

此处采用Zookeeper 的方式来实现,Zookeeper 具有节点唯一的特性,并且通过监听节点的特性可以实现分布式锁的机制,需要配置Zookeeper 服务为了模拟采用单机形式 ,下载链接: https://pan.baidu.com/s/1VRWQzy2tWQoLnp9WgVpGMg 提取码: rgwd 

zookeeper-3.4.14\conf 下 zoo_sample.cfg 复制一份重命名为 zoo.cfg 修改两个属性

dataDir=D:/Soft/zkdata/data

dataLogDir=D:/Soft/zkdata/log

修改后启动 zookeeper-3.4.14\bin 中 zkServer.cmd 文件

重新实现 OrderService 订单服务接口 

OrderServiceImpWithDisLock 分布式订单生成类

  1. package com.demo.order.impl;
  2. import com.demo.order.OrderCodeGenerator;
  3. import com.demo.order.OrderService;
  4. import com.demo.zk.ZKDistributeImproveLock;
  5. import java.util.concurrent.locks.Lock;
  6. public class OrderServiceImpWithDisLock implements OrderService {
  7. private static OrderCodeGenerator ocg = new OrderCodeGenerator();
  8. @Override
  9. public void createOrder() {
  10. String orderCode = null;
  11. //分布式锁
  12. Lock lock = new ZKDistributeImproveLock("/allen666");
  13. try {
  14. lock.lock();
  15. orderCode = ocg.getOrderCode();
  16. } finally {
  17. lock.unlock();
  18. }
  19. System.out.println(Thread.currentThread().getName() + "****************" + orderCode);
  20. }
  21. }

ZKDistributeImproveLock 分布式锁实现类

  1. package com.demo.zk;
  2. import org.I0Itec.zkclient.IZkDataListener;
  3. import org.I0Itec.zkclient.ZkClient;
  4. import org.I0Itec.zkclient.exception.ZkNodeExistsException;
  5. import java.util.Collections;
  6. import java.util.List;
  7. import java.util.concurrent.CountDownLatch;
  8. import java.util.concurrent.TimeUnit;
  9. import java.util.concurrent.locks.Condition;
  10. import java.util.concurrent.locks.Lock;
  11. /***
  12. * 此类是 ZKDistributeLock 类的改进,根据zookeeper的有序节点的创建,当前抢到锁的节点寻找
  13. */
  14. public class ZKDistributeImproveLock implements Lock {
  15. public static final String ZK_SERVICE = "localhost:2181";
  16. private String lockPath;
  17. private ZkClient client;
  18. private String currentPath;
  19. private String beforePath;
  20. public ZKDistributeImproveLock(String lockPath) {
  21. super();
  22. this.lockPath = lockPath;
  23. client = new ZkClient(ZK_SERVICE);
  24. client.setZkSerializer(new MyZkSerializer());
  25. if (!this.client.exists(lockPath)) {
  26. try {
  27. this.client.createPersistent(lockPath);//创建节点
  28. } catch (ZkNodeExistsException e) {
  29. }
  30. }
  31. }
  32. /***
  33. * 没有抢到锁,自己阻塞,
  34. * 直到订单服务中调用unlock方法释放掉锁,
  35. * 会通过handleDataDeleted 监听唤醒阻塞,
  36. * 继续执行下面递归
  37. */
  38. @Override
  39. public void lock() {
  40. if (!tryLock()) {//尝试获取锁
  41. waitForLock();
  42. lock();
  43. }
  44. }
  45. private void waitForLock() {
  46. CountDownLatch cdl = new CountDownLatch(1);
  47. IZkDataListener listener = new IZkDataListener() {
  48. @Override
  49. public void handleDataDeleted(String dataPaht) throws Exception {
  50. System.out.println("-----------------收到节点被删除----------------");
  51. cdl.countDown();//释放阻塞
  52. }
  53. @Override
  54. public void handleDataChange(String dataPaht, Object data) {
  55. }
  56. };
  57. client.subscribeDataChanges(beforePath, listener);//监听节点删除状态
  58. if (this.client.exists(beforePath)) {
  59. try {
  60. cdl.await();
  61. } catch (InterruptedException e) {
  62. e.printStackTrace();
  63. }
  64. }
  65. //取消注册
  66. client.unsubscribeDataChanges(beforePath, listener);
  67. }
  68. @Override
  69. public void lockInterruptibly() throws InterruptedException {
  70. }
  71. /***
  72. * 同一时刻尝试获取锁的方法,只会有currentPath当前节点与所有节点中做小的一个匹配
  73. * 不匹配的当前节点后面一个结点则会被监听并且会阻塞,直到当前节点被释放后其他结点继续强锁
  74. * @return
  75. */
  76. @Override
  77. public boolean tryLock() {
  78. if (this.currentPath == null) {//当前节点为空,则创建临时有序节点
  79. currentPath = this.client.createEphemeralSequential(lockPath + "/", "aaa");
  80. }
  81. //获取所有的子节点
  82. List<String> children = this.client.getChildren(lockPath);
  83. Collections.sort(children);//排序节点
  84. if (currentPath.equals(lockPath + "/" + children.get(0))) {//判断是否第一个节点
  85. return true;
  86. } else {
  87. //渠道前一个
  88. //得到字节的索引号
  89. int curIndex = children.indexOf(currentPath.substring(lockPath.length() + 1));
  90. beforePath = lockPath + "/" + children.get(curIndex - 1);
  91. }
  92. return false;
  93. }
  94. @Override
  95. public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
  96. return false;
  97. }
  98. @Override
  99. public void unlock() {
  100. client.delete(currentPath);
  101. }
  102. @Override
  103. public Condition newCondition() {
  104. return null;
  105. }
  106. }

MyZkSerializer 序列化类

  1. package com.demo.zk;
  2. import org.I0Itec.zkclient.exception.ZkMarshallingError;
  3. import org.I0Itec.zkclient.serialize.ZkSerializer;
  4. import java.io.UnsupportedEncodingException;
  5. public class MyZkSerializer implements ZkSerializer {
  6. String charset = "UTF-8";
  7. @Override
  8. public byte[] serialize(Object obj) throws ZkMarshallingError {
  9. try {
  10. return String.valueOf(obj).getBytes(charset);
  11. } catch (UnsupportedEncodingException e) {
  12. throw new ZkMarshallingError(e);
  13. }
  14. }
  15. @Override
  16. public Object deserialize(byte[] bytes) throws ZkMarshallingError {
  17. try {
  18. return new String(bytes, charset);
  19. } catch (UnsupportedEncodingException e) {
  20. throw new ZkMarshallingError(e);
  21. }
  22. }
  23. }

调整 ConcurrentTestDistributeDemo 类中 OrderService  的实现类 OrderService orderService = new OrderServiceImpWithDisLock() 重新运行 ConcurrentTestDistributeDemo main  方法运行结果

可以看使用分布式锁之后保证了订单编号的同步不会产生重复,单体应用下同步锁升级到分布式架构之后锁机制都会产生问题

最后附上源码地址  https://github.com/zhijun0808/ConcurrentDemo.git

 

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/Li_阴宅/article/detail/954354
推荐阅读
相关标签
  

闽ICP备14008679号