赞
踩
某一天,李四同学去某一家互联网公司面试。
面试官:李同学,你知道Java有哪一个是使用java写的锁并且和sychronized性能差不多?
李同学:…
面试官:那你知道什么是CAS吗
李同学:…
面试官:出门右转是电梯自己走吧!
其实,在我们Java中也有一个使用了Java语言开发的一把锁,名字叫ReentrantLock。它位于我们的java.uilt.concurrent包下的一个线程安全的工具类。
我们大名鼎鼎的JUC包就是由这位图中的老爷爷写的,ReentrantLock是JUC包的其中一个。那么它是怎么来的呢?
其实在JDK1.6之前我们的Synchronized是一个重量锁,为什么这么说呢?因为在它还没有被优化之前它实际上是通过底层操作系统保证我们的线程安全的。我在讲volatile的时候有提到过内核态和用户态的切换。也就是说在并发并不是特别高的情况下频繁的进行用户态内核态切换是很耗费我们系统资源的,所以doug lea就为我们Java编写了ReentrantLock。我们先来了解一下ReentrantLock的特性吧。
简单的说一下上面4个分别是个什么东西
1.互斥锁:所谓互斥锁就是指当我线程1拿到这把锁之后,其他的线程都没有办法进行操作。只能等待线程1操作完毕释放锁后才可以进行下一步的操作
2 公平锁:公平锁用通俗一点的话来解释就是,比如你现在要去食堂吃饭,人很多。但是一窗口一次只能有一个人打饭。那么后面来的人不管第一个人有没有打完饭都得去后面乖乖排队,等待第二个、第三个、第四个…一直按顺序进行下去。也就是所谓的先进先出。
3.非公平锁:这个也很好理解,我们把公平锁的例子拿过来。非公平锁的意思就是当我第一个人打完饭了之后后面来的人可以插队,不管后面有没有人在排队,我都可以直接跟食堂阿姨打饭。
4.可重入锁:这个就比较抽象了,我用一段代码表示一下
ReentrantLock lock = new ReentrantLock(); public void a(){ lock.lock(); try { b(); }finally { lock.unlock(); } } public void b(){ lock.lock(); try { }finally { lock.unlock(); } }
假设说我现在有一个线程要执行A方法,首先他会通过lock.lock()方法拿到这把锁,紧接着执行b方法。但是b方法里也有一个相同的lock.lock。那么既然我是同一个线程拿这个锁,那么我就允许你进行一次重入操作,这个就是所谓的可重入性。
其实在刚刚我的代码里就已经展示出来了ReentrantLock的用法。首先我们要先new一个ReentrantLock对象。他的构造方法里传入一个boolean类型的值。当你为true的时候就是创建的一个公平锁,如果为false就是一把非公平锁,当然你不填写他默认的就是公平锁。根据阿里巴巴的代码规范,我们再使用ReentrantLock的时候需要使用try finally代码块来配合。因为ReentrantLock和Synchronized不一样,他需要我们自己手动去释放锁,所以为了避免锁无法被释放,我们需要在finally代码块里写lock.unlock方法保证我们的锁一定会被释放。
首先,我们的ReentrantLock实现了AbstractQueueSynchronize这个抽象类的抽象方法。AbstractQueueSynchronize也就是我们常说的AQS,我们很多JUC的工具类都是基于AQS去实现的。
什么是AQS?AQS其实就是一个并发编程的框架,它里面实现了许多的方法并定义了一些需要我们自己实现的一些抽象方法供我们自己编写。AQS整体使用的就是我们的CAS+自旋来保证的线程安全。
首先AQS内部实现了一个叫CLH队列,这个队列是一个双向队列,CLH名称取决于三个人名字的缩写。因为CLH队列是这三个人发明的。我们先来看一下CLH队列的模样
数据结构的基础我就不在这里多说了,其实知道双向链表的同学一看这个图就懂了。我们先来说说第一个核心 CAS
什么是CAS?CAS就是Compare and swap或者Compare and Exchange。翻译成中文就是比较并交换
什么意思呢?假设说我现在有一个int i=0这样一个变量,当我有一个线程想要给i = 0重新赋值比如给他一个1,那么再并发场景下是不是会有可能这个i = 0已经被另一个线程修改成了 i = 2。所以我们要比对一下先将我线程内部读取到的0 和 i 比较,如果判断int i和我读取到的值是一样的,那么就进行值的修改,把i = 0修改成i = 1,反之则不做修改。我们简单的看一下Java中的CAS操作是如何实现的:
首先我们先找到Unsafe类,这是一个魔法类,顾名思义就是不安全的类,里面定义了很多不安全的操作。我们找到这个
public final native boolean compareAndSwapInt(Object var1, long var2, int var4, int var5);
发现他是一个用native修饰的一个类,所以我们必须通过C++代码来查看他是如何实现的了。
UNSAFE_ENTRY(jboolean, Unsafe_CompareAndSwapInt(JNIEnv *env, jobject unsafe, jobject obj, jlong offset, jint e, jint x))
UnsafeWrapper("Unsafe_CompareAndSwapInt");
oop p = JNIHandles::resolve(obj);
jint* addr = (jint *) index_oop_from_field_offset_long(p, offset);
return (jint)(Atomic::cmpxchg(x, addr, e)) == e;
UNSAFE_END
这个函数位于Unsafe.cpp文件下,我们可以看到它最终调用了Atomic下的cmpxchg方法。这个方法实际上最终就会使用我们的汇编指令cmpxchg来实现CAS操作。
我们再回到JAVA中,我们看
public final native boolean compareAndSwapInt(Object var1, long var2, int var4, int var5);
首先他返回的是一个boolean类型,所以当存在并发问题的时候,我们再CAS操作失败的时候就会返回一个false,但是我们这一次的操作不可能放弃对吧,如果放弃了那这个数据就丢失了,我们来进行线程安全的操作意义也就不是很大了,所以我们要上另一个法宝来配合CAS:自旋
所谓自旋就是通过while(true)或者for(;;)这样的方式来循环进行cas操作,直到返回为true,则表示交换成功,则跳出循环,否则会一只在里面进行循环cas操作。
在刚刚提到AQS的时候我有稍微介绍了CLH队列,那么CLH队列实际上就是存放我们线程节点的,因为我们后面排队的线程不可能丢弃,必须要在队列里面进行排队,等待第一个线程释放锁之后后面的线程再获取锁。同样CLH队列为了保证它的线程安全,再AQS中同样是使用了CAS+自旋的方式实现的。简单的给大家看一下源码
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
这个是他的入队操作,我们可以看到compareAndSetHead和compareAndSetTail方法,并且用for(;;)包裹在外面。
我刚刚已经说了ReentrantLock是一个可重入锁,并且也是一个互斥锁。那么他们两种特性我该如何去实现呢?这里就需要用到了state来进行一个计数操作,当我一个线程拿到锁了之后state+1,如果释放则-1。直到state为0的时候我们就知道锁已经完全被释放,那么我们就可以通知其他线程拿到这把锁进行操作了。当然在线程进行锁竞争的时候也会有安全问题,所以state我们童谣需要使用cas操作来保证并发安全,但是这里我们就不一定需要自旋了,因为我后面操作失败的线程可以加入到我们的CLH队列当中,一直等着就好了。我们也可以自旋一定的次数,如果没有拿到这把锁我们再加入CLH阻塞队列当中,这两种都是可行的,当然后一种效率在某种程度上来讲效率会稍微高一些。
LockSupport这个类可以实现我们线程的唤醒和阻塞操作,同样它位于JUC包下,是
一个工具类。
当我们想要阻塞一个线程的时候我们可以调用它的park方法。例如
LockSupport.park(Thread.currentThread());
而我们想要唤醒它则调用
LockSupport.unpark(Thread.currentThread());
既可。
在经过上面4个核心思想介绍了之后,我们就按照逻辑来手写实现一个ReentrantLock。
首先我们创建一个类MyReentrantLock类,定义好我们需要的属性:
/**
* 使用ConcurrentLinkedQueue来模拟CLH双向队列
*/
public final ConcurrentLinkedQueue threadQueue = new ConcurrentLinkedQueue();
/**
* 计数器
*/
public volatile int state;
/**
* 用于记录拿到锁的线程
*/
private static Thread executorThread;
同样因为Unsafe类我们不可以直接调用,所以要用反射才能获取到unsafe类对象。
我们直接通过静态代码块的形式来获取
private static Unsafe unsafe;
static {
try {
Field theUnsafeInstance = Unsafe.class.getDeclaredField("theUnsafe");
theUnsafeInstance.setAccessible(true);
unsafe = (Unsafe) theUnsafeInstance.get(null);
} catch (IllegalAccessException e) {
e.printStackTrace();
} catch (NoSuchFieldException e) {
e.printStackTrace();
}
}
有了上面这些之后我们就可以编写第一个方法了,首先我说了我们state操作的时候会存在线程竞争问题,所以我们的state要使用CAS操作。
public final boolean compareAndSwapSetState(int expect, int value) {
return unsafe.compareAndSwapInt(this, stateOffset, expect, value);
}
我们调用了unsafe.compareAndSwapInt方法,里面需要传入四个值。首先是当前对象,第二个需要进行cas操作的当前对象里的属性的内存偏移量,第三个是期望值,第四个是交换后的值。可能大家现在还不清楚前两个是啥,我们先这样写。在后面的文章中我会对Unsafe类做一个详细介绍。那么我们怎么获取到偏移量呢?同样我们再静态代码块中加一句话和一个属性
stateOffset = unsafe.objectFieldOffset(MyReentrantLock.class.getDeclaredField("state"));
/**
* state偏移量
*/
private static long stateOffset;
有了这些,就相当于有了灵魂,但是灵魂需要一个躯体去承载它,我们开始编写我们的方法。首先我们需要编写的主要两个方法就是 lock和unlock。那么我们先定义好
public void lock(){
}
public void unlock(){
}
我们先来编写第一个方法lock。首先我们思考一下,我们拿到锁的条件是不是当state = 0的时候,就代表了我们的锁已经被释放了,但是这里又存在线程并发问题,所以这里我们要使用cas操作去判断,所以第一句代码应该是判断当前是否可以获取锁,由于我们要记录当前线程是哪个,所以同样我们也要先获取当前线程
//获取当前线程
final Thread thread = Thread.currentThread();
//判断CAS操作是否成功如果成功则执行拿锁并记录当前线程
if (compareAndSwapSetState(0, 1)) {
setExecutorThread(thread);
}
当cas返回false的时候,就说明现在的锁已经被其他线程拿到了,那么就要执行else语句。但是因为我们要保证锁的可重入性,所以我们先判断进入else语句的是不是拿到锁的线程,如果是则state +1 。
else if (executorThread == thread){
state++;
}
如果这两个条件都失败,那就说明是其他线程现在在抢这把锁,所以我们需要把其他线程丢入到阻塞队列当中,等待当前线程释放锁了之后再执行操作。所以这里要这样写
else { //把需要等待的线程加入阻塞队列当中
threadQueue.add(thread);
//阻塞该线程,等待锁释放后唤醒
LockSupport.park(thread);
}
这样我们的整个lock方法写完了。但是大家想一下这样会不会有什么问题。是不是当我第一个线程执行完毕后,将state改为了0.后面的线程都不会再加锁,而是直接执行。所以这里我们要加一个自旋的操作,让被唤醒的线程再走一遍拿锁的流程。这样保证我们每一个线程都会有一个拿锁解锁的动作。
public void lock() { for (;;){ //获取当前线程 final Thread thread = Thread.currentThread(); //判断CAS操作是否成功如果成功则执行拿锁并记录当前线程 if (compareAndSwapSetState(0, 1)) { setExecutorThread(thread); break; } else if (executorThread == thread) { //因为只有一个线程会进入这里,所以不存在并发问题,直接state++操作就可以 state++; } else { //把需要等待的线程加入阻塞队列当中 threadQueue.add(thread); //阻塞该线程,等待锁释放后唤醒 LockSupport.park(thread); } } }
这样,我们的lock操作就编写完了。我们接下来来编写unlock方法。
我刚刚说了,我们所有的判断锁的操作其实都是基于state这个计数器,所以我们在释放锁的时候要通过判断计数器来判断锁是否已经全部释放完毕。我们直接在if里写逻辑
public void unlock() {
//判断锁是否已经释放完毕
if ((state = state - 1) == 0) {
setExecutorThread(null);
Thread thread = threadQueue.poll();
LockSupport.unpark(thread);
}
}
当判断state == 0 了之后,我们就执行下面的逻辑,先从队列里面出队一个线程,然后我们调用LockSupport.unpark()方法唤醒该线程,那么该 线程就会又回到刚刚的for循环里再执行一遍拿锁的流程。
/** * @author quziwei * @date 2020/10/05 * @description **/ public class MyReentrantLockDemo { public static MyReentrantLock lock = new MyReentrantLock(); public static int a = 0; public static void main(String[] args) throws InterruptedException { for (int i = 0; i < 10; i++) { new Thread(() -> { lock.lock(); try { for (int j = 0; j < 1000; j++){ a++; } } finally { lock.unlock(); } }).start(); } //因为是异步计算所以先让主线程等待5秒等待所有计算结果执行完毕后在输出 Thread.sleep(5000); System.out.println(a); } }
输出结果:
至此我们今天的文章到这里就结束了,喜欢的话记得点赞收藏关注一波哟~
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。