当前位置:   article > 正文

并发编程(入门) 多线程学习 手写ReentrantLock_手写多线程实现并发和加锁

手写多线程实现并发和加锁

本文学习资料和灵感来自网络,感谢享学课堂13号技师开课吧小师妹、汪文君老师、《Java并发编程实战》[Brian Goetz]

一:概述

随着java技术的成熟和工作年限的增长,现在出去面试言必谈“多线程、高并发”,诸如:多线程和高并发是什么关系?什么是线程安全?锁是什么?举个例子,你在工作中是怎么使用多线程的?你们系统的秒并发数秒TPS数达到了多少?每天不知道有多少人跪倒在了面试官的休闲裤下。

坊间流传一句话,面试造航母,工作拧螺丝。然而现实中,拿着造航母的简历却干不好拧螺丝的工作的人比比皆是,其中也包括我自己。学技术,还是希望自己能够多一点脚踏实地,不仅能拧好螺丝,也能在心中勾画出航母,勾画出明天的自己,CODE A BETTER LIFE。

1.1:名词解释

线程:计算机操作(cpu)的基本调度执行单位

进程:类比现实中的一个任务,操作系统管理的基本单位

异步:同步就是串行,异步就是“同时”执行多个任务,线程可以说是一种异步的实现

性能:某一组操作所需要的消耗:执行时间越短性能越好,消耗资源越少性能越好...

原子性:同一时刻某一组操作,只能由同一个线程完成

可见性:线程对变量的修改会马上同步到主内存,其他线程get()到的都是该线程修改后的值

安全:指某个接口、某个类能正确的实现功能

线程安全:多线程下访问某个类,这个类始终能正确的实现功能。

锁:线程安全问题本质上就是对共享变量读写操作使用同步,锁就是实现这一同步方式的统称

内置锁:synchronize,通过对class头的monitor的持有来锁定对象

死锁:由于加锁时序的原因,多个线程相互持有等待对方释放的锁。死循环、死锁、递归栈溢出,都是程序不可自动修复的严重问题。

重入:将获取锁的粒度细化到“线程”而不是“调用”,所有线程都可以抢锁,类及子类可以获取到被持有的锁(递归调用不会死锁,实现:内置一个锁持有者+计数器,获取锁前先比对是不是类及子类,成功获取一次锁+1,释放一次锁-1,计数器为0则释放该锁)

竞态条件:正确的执行结果取决于运气(多线程交替执行时序),多线程安全说到底就是->操作(读+改+写),改和写依赖于读,在改和写的过程中读的结果失效了(被另外线程写入了其他结果)。并发编程书本中的说明是:基于一种可能失效的观察结果来做出判断或者执行某个计算

状态:状态就是指实例变量,无状态就是说:线程间对象不包含其他类中域的引用,计算过程使用的局部变量且只能由正在执行的线程访问,也就是所谓的两个线程没有共享状态,所以线程安全。大部分servlet都是无状态的作为容器存在的,所以servlet本身是线程安全的。

CAS:compare and swap,比较和交换,绝大数锁的实现机制。读+改+写,在写的时候通过内存的方式(原子)获取当前读的值并和之前读的值比较,如果符合期望(没被修改)则成功写入,如果值被改变,则放弃写入。

AQS:AbstractQueuedSynchronizer,用于构建锁和同步器的框架,常用方法有acquire(),tryAcquire(),compareAndSet()

ReentrantLock:jdk提供的一种cas机制的可重入锁

ThreadLocal:线程封闭方式之一,通过内置的get和set方法为每个线程创建一份只属于自己的独立副本,计算操作不受其他线程影响

FutureTask:获取线程的计算结果或者抛出异常,常用作Callable接口的返回类型

CountDownLatch:阻塞线程,当计数器为零时解除所有等待线程的阻塞。常被用于模拟高并发,和FutureTask结合使用有奇效

1.2:现代计算机模型

说到多线程就不得不讲讲JMM(Java Memory Model)以及现代计算机模型。

1946年的情人节(2月14日),世界上第一台计算机(ENIAC)诞生,美国人为了发展弹道导弹技术制造它用来计算导弹轨迹。原始的计算机是通过纸条打洞来实现计算的,从头到尾只执行一个程序。后来产生了能存储程序和程序控制原理的电脑,有了操作系统,我们可以使用操作系统帮助我们每次运行多个程序(任务/进程),通过异步的方式提高了计算机资源的利用效率。软件的发展往往是附属于硬件的,随着cpu的频率提高到达天花板,多处理器系统的出现和普及,进一步提高了异步处理方式需求,的也促进了相较于进程,更细粒度的线程的发展。

撇开进程线程这些绕口的话题,谈回硬件,社会和科学技术发展的基石就是材料学,我们不知道外星人造出来的电脑是什么样子的,但现在的电脑就是目前人类便于利用的地球上材料量产电脑的最优方案。以前课本上计算机内存和磁盘分别叫RAM/ROM,RAM比ROM快但比较小且断电数据就会消失,ROM比较慢但能相对永久的保存信息(磁化),之所以要把电脑分这么多部件也是人们根据地球上已有材料的特性按照现实的需求摸索来的。

我们都知道,cpu是用于计算,磁盘用于存储我们编写的程序,但cpu远远快于磁盘的,如果他们直接交互相,恐怕cpu不得急死。内存存在的意义就是作为cpu和磁盘之间的桥梁,但cpu仍然远远快于内存,因此现代计算机产生了高速缓存(一级缓存,二级缓存,三级缓存),如下图

有两个线程执行某一段代码,从主内存中读取值+1然后更新主内存,理论上两个线程执行两次就应该是2,可是因为cpu执行时序的问题,导致第一个线程读+改+写的完整操作被打断了,最终呈现了错误的结果1

我们可以理解为,我们读操作是在主内存执行的,cpu运行计算的值是存放在高速缓存中的(可以理解为ThreadLocal),volatile所谓的可见性就是强制每次set到高速缓存的值也通过缓存一致性协议通知给其他高速缓存(同步set到主内存中),最终的写操作是在主内存进行的。而所谓的JMM,虽然是在JVM中执行的,但本质上还是cpu和内存配合运行了我们的指令,虽然有JIT指令重排等特殊情况,但基本原理是一样的。

二:Synchronized简单使用和实现原理,原子类简单使用,volatile使用原则

2.1.实现原理

原子类:如AtomicInteger,也是使用的CAS机制,主要使用了Unsafe类里面的native方法,AQS的实现也主要使用Unsafe类

volatile:volatile的特点是保证可见性和禁止指令重排序,不保证原子性。当出现多线程写入时,会有线程安全问题,因此适合一写多读的业务场景,比如全局的flag字段。

synchronized是jdk的内置锁,也是一种可重入锁,使用非常优雅,直接在对应的方法上使用关键字或者synchronized(this){}代码块就可以。他的主要实现原理是通过jvm监视器的方式,获取对象的头部文件,通过monitorenter获取对象的访问权限,通过monitorexit释放对象的访问权限。

从本质上讲,synchronized是一种悲观锁的实现,早期的synchronized性能较低,在jdk1.6以后,内置锁得到了大量的优化,比如轻量级锁、偏向锁、自旋锁、锁消除等,总体上来说就是减少线程的挂起 和恢复开销

轻量级锁:通过对象头的“Mark Word”的分析,在代码进入同步块的时候,虚拟机建一个Lock Record(一开始是空的,加锁成功则存储被加锁的对象头信息Mark Word)的空间来存储Mark Word的拷贝。然后使用CAS操作让要加锁对象的Mark Work指向Lock Record,如果成功则加锁。失败则判断是不是自己,不是自己则锁被其他线程抢占了。实现思想是,在整个同步期间,绝大部分锁是不存在竞争的,用CAS操作去消除同步的互斥量

偏向锁:在锁无竞争的情况下把整个同步消除掉,锁会偏向于第一个获得到他的线程,在接下来的执行过程中,锁如果没有被其他线程获取,则持有偏向锁的线程不会进行同步。当有另外线程尝试获取这个锁时,偏向模式结束,进入轻量级锁模式。

自旋锁:让后面请求锁的线程进入忙循环(默认自旋10次,“等一下”,而不是挂起),这个时间段内很大可能共享数据的锁定状态已经解除。

锁消除:在编译运行时,通过逃逸分析技术,判断代码堆上的数据不会被其他线程访问到,则去除锁

2.2.synchronize简单使用代码示例如下:

  1. package generator;
  2. import java.util.concurrent.ExecutorService;
  3. import java.util.concurrent.Executors;
  4. import java.util.concurrent.atomic.AtomicInteger;
  5. public class ThreadDemo {
  6. //公共变量
  7. int no = 0;
  8. static AtomicInteger num = new AtomicInteger(0);
  9. //公共方法
  10. public synchronized void add(){
  11. // synchronized (this){
  12. // no++;
  13. // }
  14. no++;
  15. }
  16. public static void increment(){
  17. num.getAndIncrement();
  18. }
  19. //线程类,继承Thread
  20. private static class CountThread extends Thread{
  21. private ThreadDemo threadDemo;
  22. public CountThread(ThreadDemo threadDemo) {
  23. this.threadDemo = threadDemo;
  24. }
  25. @Override
  26. public void run() {
  27. for (int i = 0; i <10000 ; i++) {
  28. threadDemo.add();
  29. increment();
  30. }
  31. }
  32. }
  33. //线程类,实现runnable
  34. private static class CountRunnable implements Runnable{
  35. private ThreadDemo threadDemo;
  36. public CountRunnable(ThreadDemo threadDemo) {
  37. this.threadDemo = threadDemo;
  38. }
  39. @Override
  40. public void run() {
  41. for (int i = 0; i <10000 ; i++) {
  42. threadDemo.add();
  43. increment();
  44. }
  45. }
  46. }
  47. //测试结果
  48. /**
  49. *Thread将线程执行体和线程绑定在了一起,执行多次需要手工创建多个线程对象。
  50. *Runnable将执行体与线程分开,用线程池负责线程的创建与回收
  51. * @throws InterruptedException
  52. */
  53. public static void main(String[] args) throws InterruptedException {
  54. //创建简单线程池
  55. ExecutorService threads = Executors.newFixedThreadPool(10);
  56. //实例对象,传输公共属性和方法
  57. ThreadDemo td1 = new ThreadDemo();
  58. ThreadDemo td2 = new ThreadDemo();
  59. //线程类
  60. CountThread ct1 = new CountThread(td1);
  61. CountThread ct2 = new CountThread(td1);
  62. CountRunnable cr1 = new CountRunnable(td2);
  63. CountRunnable cr2 = new CountRunnable(td2);
  64. //启动线程
  65. ct1.start();
  66. ct2.start();
  67. threads.submit(cr1);
  68. threads.submit(cr2);
  69. //睡眠当前线程main,等待任务线程执行结束
  70. Thread.sleep(4000);
  71. System.out.println(td1.no+","+td2.no);
  72. System.out.println(ThreadDemo.num);
  73. }
  74. }

执行结果如下:

  1. 20000,20000
  2. 40000

num输出40000是因为添加了static。简单说,却别是:num的内存地址是class单独开辟出的,生命周期同class;而no属于对象,生命周期同创建的对象。

三:ReentrantLock简单使用和手写实现

3.1.ReentrantLock、Condition使用示例

  1. package generator;
  2. import java.util.concurrent.ExecutorService;
  3. import java.util.concurrent.Executors;
  4. import java.util.concurrent.locks.Condition;
  5. import java.util.concurrent.locks.ReentrantLock;
  6. public class ReetrantLockDemo {
  7. private boolean flag = false;
  8. private final int MAX_NO = 5;
  9. ReentrantLock lock = new ReentrantLock();
  10. Condition condition = lock.newCondition();
  11. public void getA(){
  12. try {
  13. lock.lock();
  14. while(flag == true){
  15. condition.await();
  16. }
  17. System.out.print("a");
  18. flag = true;
  19. condition.signal();
  20. } catch (InterruptedException e) {
  21. e.printStackTrace();
  22. } finally {
  23. lock.unlock();
  24. }
  25. }
  26. public void getB(){
  27. try {
  28. lock.lock();
  29. while(flag == false){
  30. condition.await();
  31. }
  32. System.out.print("b");
  33. flag = false;
  34. condition.signal();
  35. } catch (InterruptedException e) {
  36. e.printStackTrace();
  37. } finally {
  38. lock.unlock();
  39. }
  40. }
  41. private static class Arunnable implements Runnable{
  42. private ReetrantLockDemo reetrantLockDemo;
  43. public Arunnable(ReetrantLockDemo reetrantLockDemo) {
  44. this.reetrantLockDemo = reetrantLockDemo;
  45. }
  46. @Override
  47. public void run() {
  48. for (int i = 0; i <reetrantLockDemo.MAX_NO ; i++) {
  49. reetrantLockDemo.getA();
  50. }
  51. }
  52. }
  53. private static class Brunnable implements Runnable{
  54. private ReetrantLockDemo reetrantLockDemo;
  55. public Brunnable(ReetrantLockDemo reetrantLockDemo) {
  56. this.reetrantLockDemo = reetrantLockDemo;
  57. }
  58. @Override
  59. public void run() {
  60. for (int i = 0; i <reetrantLockDemo.MAX_NO ; i++) {
  61. reetrantLockDemo.getB();
  62. }
  63. }
  64. }
  65. public static void main(String[] args) {
  66. ReetrantLockDemo reetrantLockDemo = new ReetrantLockDemo();
  67. ExecutorService threads = Executors.newFixedThreadPool(10);
  68. Arunnable arunnable = new Arunnable(reetrantLockDemo);
  69. Brunnable brunnable = new Brunnable(reetrantLockDemo);
  70. threads.submit(arunnable);
  71. threads.submit(brunnable);
  72. }
  73. }

执行结果为:ababababab

解释:lock()锁定线程、unlock()释放锁,两个必须成对出现。使用condition需要先通过lock.lock()获得线程(对象监视器),condition.await和signal的是当前线程

3.2.手写实现简单ReentrantLock

我们知道ReentrantLock是一个可重入锁,可重入锁的特点是所有线程都可以竞争锁,只有第一个线程类及其子类能够获取到锁。需要我们内置一个计数器,需要我们内置一个列表来存放等待线程

百度实现1,待测试优化

  1. public class MyLock implements Lock{
  2. Thread lockBy = null;
  3. int lockCount = 0;
  4. //锁标志
  5. private boolean isLocked = false;
  6. /*
  7. * 加锁
  8. * (non-Javadoc)
  9. * @see java.util.concurrent.locks.Lock#lock()
  10. */
  11. @Override
  12. public synchronized void lock() {
  13. Thread currentThread = Thread.currentThread();
  14. //如果不是第一个进来,则在这儿等待
  15. while (isLocked && currentThread != lockBy) {
  16. // ...阻塞到这儿,都等待,但是第一个进来不让让等待
  17. try {
  18. wait();
  19. } catch (InterruptedException e) {
  20. // TODO Auto-generated catch block
  21. e.printStackTrace();
  22. }
  23. }
  24. //如果是第一个进来,将其变为false
  25. isLocked = true;
  26. //将lockBy指向当前线程
  27. lockBy = currentThread;
  28. //计数器自增
  29. lockCount ++;
  30. }
  31. /*
  32. * 释放锁
  33. * (non-Javadoc)
  34. * @see java.util.concurrent.locks.Lock#lockInterruptibly()
  35. */
  36. @Override
  37. public synchronized void unlock() {
  38. //如果当前线程等于lockBy
  39. if (lockBy == Thread.currentThread()) {
  40. lockCount --;
  41. if (lockCount == 0) {
  42. notify();
  43. //释放锁
  44. isLocked =false;
  45. }
  46. }
  47. }
  48. @Override
  49. public void lockInterruptibly() throws InterruptedException {
  50. }
  51. @Override
  52. public boolean tryLock() {
  53. return false;
  54. }
  55. @Override
  56. public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
  57. return false;
  58. }
  59. @Override
  60. public Condition newCondition() {
  61. return null;
  62. }
  63. }

ReentrantLock手写实现2

  1. package generator;
  2. import java.util.concurrent.LinkedBlockingDeque;
  3. import java.util.concurrent.TimeUnit;
  4. import java.util.concurrent.atomic.AtomicReference;
  5. import java.util.concurrent.locks.Condition;
  6. import java.util.concurrent.locks.Lock;
  7. import java.util.concurrent.locks.LockSupport;
  8. public class CassellReentrantLock implements Lock {
  9. //原子特殊线程
  10. AtomicReference<Thread> owner = new AtomicReference<>();
  11. //等待列表,存放没有抢到锁的线程
  12. LinkedBlockingDeque<Thread> waiter = new LinkedBlockingDeque<>();
  13. @Override
  14. public void lock() {
  15. //比较旧值null,如一致则替换为新值Thread.currentThread()
  16. while(!owner.compareAndSet(null,Thread.currentThread())){
  17. //没有抢到,进入等待列表
  18. waiter.add(Thread.currentThread());
  19. //等待,类似wait
  20. LockSupport.park();
  21. //线程被唤醒,从等待列表中移除该线程
  22. waiter.remove(Thread.currentThread());
  23. }
  24. }
  25. @Override
  26. public void unlock() {
  27. //比较旧值Thread.currentThread(),如一致则替换为新值null
  28. if(owner.compareAndSet(Thread.currentThread(),null)){
  29. //遍历等待线程列表,唤醒
  30. Object[] objects = waiter.toArray();
  31. for (Object object : objects) {
  32. Thread next = (Thread)object;
  33. LockSupport.unpark(next);
  34. }
  35. }
  36. }
  37. @Override
  38. public void lockInterruptibly() {
  39. }
  40. @Override
  41. public boolean tryLock() {
  42. return false;
  43. }
  44. @Override
  45. public boolean tryLock(long time, TimeUnit unit) {
  46. return false;
  47. }
  48. @Override
  49. public Condition newCondition() {
  50. return null;
  51. }
  52. }

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/我家自动化/article/detail/271694?site
推荐阅读
相关标签
  

闽ICP备14008679号