赞
踩
ReadWriteLock的实现
维护一对关联锁,一个只用于读操作,一个只用于写操作;读锁可以由多个读线程同时持有,写锁是排他的。同一时间两把锁不能被不同线程持有。
ReadWriteLock的多个重要参数:readcount读计数器,writecount写计数器(重入次数记录),owner(记录写线程),waiters(等待队列)
示意图:
假设:t1线程为写线程,还有t2读、t3读等线程;
写锁:t1需要抢锁先去判断,readcount是否为0(读写锁不能同时存在);readcount=0那么判断writecount是否为0,如果writecount=0,则通过CAS操作将writecount改为1,并将owner改为当前线程;如果writecount>0则,判断owner是否为当前线程,如果owner=当前线程,则writecount+1,否则线程挂起进入等待队列。如果readcount>0则线程挂起进入等待队列。解锁时:判断当前线程是否等于owner,等于则writecount-1,当writecount减为0时,修改owner为空,并唤醒等待队列头部的线程;不等于则挂起线程。
(线程等待队列:t2读、t3读)
(线程等待队列:t2读、t3读)
读锁:t2线程被唤醒需要抢锁,首先判断writecount是否为0,如果writecount>0则t2挂起,writecount=0则readcount+1,并且唤醒等待队列头部的线程t3,t3重复t2的抢锁步骤,那么当t3唤醒等待队列中的t4时,t4是写锁,会先获取readcount的值,然后重复写锁的抢锁过程,最后t4会挂起,直到读锁释放,进入下一轮抢锁。
代码如下(示例):
volatile AtomicInteger readCount = new AtomicInteger(0); AtomicInteger writeCount = new AtomicInteger(0); //独占锁 拥有者 AtomicReference<Thread> owner = new AtomicReference<>(); //等待队列 public volatile LinkedBlockingQueue<WaitNode> waiters = new LinkedBlockingQueue<WaitNode>(); class WaitNode{ int type = 0; //0 为想获取独占锁的线程, 1为想获取共享锁的线程 Thread thread = null; int arg = 0; public WaitNode(Thread thread, int type, int arg){ this.thread = thread; this.type = type; this.arg = arg; } } //获取独占锁 public void lock() { int arg = 1; //尝试获取独占锁,若成功,退出方法, 若失败... if (!tryLock(arg)){ //标记为独占锁 WaitNode waitNode = new WaitNode(Thread.currentThread(), 0, arg); waiters.offer(waitNode); //进入等待队列 //循环尝试拿锁 for(;;){ //若队列头部是当前线程 WaitNode head = waiters.peek(); if (head!=null && head.thread == Thread.currentThread()){ if (!tryLock(arg)){ //再次尝试获取 独占锁 LockSupport.park(); //若失败,挂起线程 } else{ //若成功获取 waiters.poll(); // 将当前线程从队列头部移除 return; //并退出方法 } }else{ //若不是队列头部元素 LockSupport.park(); //将当前线程挂起 } } } } //释放独占锁 public boolean unlock() { int arg = 1; //尝试释放独占锁 若失败返回true,若失败... if(tryUnlock(arg)){ WaitNode next = waiters.peek(); //取出队列头部的元素 if (next !=null){ Thread th = next.thread; LockSupport.unpark(th); //唤醒队列头部的线程 } return true; //返回true } return false; } //尝试获取独占锁 public boolean tryLock(int acquires) { //如果read count !=0 返回false if (readCount.get() !=0) return false; int wct = writeCount.get(); //拿到 独占锁 当前状态 if (wct==0){ if (writeCount.compareAndSet(wct, wct + acquires)){ //通过修改state来抢锁 owner.set(Thread.currentThread()); // 抢到锁后,直接修改owner为当前线程 return true; } }else if (owner.get() == Thread.currentThread()){ writeCount.set(wct + acquires); //修改count值 return true; } return false; } //尝试释放独占锁 public boolean tryUnlock(int releases) { //若当前线程没有 持有独占锁 if(owner.get()!= Thread.currentThread()){ throw new IllegalMonitorStateException(); //抛IllegalMonitorStateException } int wc= writeCount.get(); int nextc = wc - releases; //计算 独占锁剩余占用 writeCount.set(nextc); //不管是否完全释放,都更新count值 if (nextc==0){ //是否完全释放 owner.compareAndSet(Thread.currentThread(), null); return true; }else{ return false; } } //获取共享锁 public void lockShared() { int arg = 1; if (tryLockShared(arg) < 0){ //如果tryAcquireShare失败 //将当前进程放入队列 WaitNode node = new WaitNode(Thread.currentThread(), 1, arg); waiters.offer(node); //加入队列 for (;;){ //若队列头部的元素是当前线程 WaitNode head = waiters.peek(); if (head!=null && head.thread == Thread.currentThread()){ if (tryLockShared(arg) >=0){ //尝试获取共享锁, 若成功 waiters.poll(); //将当前线程从队列中移除 WaitNode next = waiters.peek(); if (next!=null && next.type==1){ //如果下一个线程也是等待共享锁 LockSupport.unpark(next.thread); //将其唤醒 } return; //退出方法 }else{ //若尝试失败 LockSupport.park(); //挂起线程 } }else{ //若不是头部元素 LockSupport.park(); } } } } //解锁共享锁 public boolean unLockShared() { int arg = 1; if (tryUnLockShared(arg)){ //当read count变为0,才叫release share成功 WaitNode next = waiters.peek(); if (next!=null){ LockSupport.unpark(next.thread); } return true; } return false; } //尝试获取共享锁 public int tryLockShared(int acquires) { for (;;){ if (writeCount.get()!=0 && owner.get() != Thread.currentThread()) return -1; int rct = readCount.get(); if (readCount.compareAndSet(rct, rct + acquires)){ return 1; } } } //尝试解锁共享锁 public boolean tryUnLockShared(int releases) { for(;;){ int rc = readCount.get(); int nextc = rc - releases; if (readCount.compareAndSet(rc, nextc)){ return nextc==0; } } }
锁降级指的是写锁降级为读锁,持有写锁的同时,再获取读锁,随后释放写锁的过程。写锁是线程独占,读锁是共享,所以写锁->读是降级(读->写是不能实现的)。
锁降级分析:(线程t1写,t1读,t1写,t2读,t3读)
t1写线程首先判断readcount是否为0,不为0则线程挂起,readcount=0则判断writecount是否为0,不为0判断owner是否是当前线程,是当前线程writecount+1,writecount=0则修改owner为当前线程;接着t1读线程,判断writecount是否为 0,不为0,则判断owner是否为当前线程,是当前线程则,readcount+1,获取读锁成功,然后t1写线程释放锁。这个过程就是锁降级。
适合读取操作多余写入操作的场景吗,改进互斥锁的性能。比如:集合的并发线程安全性改造、缓存组件。
集合的并发线程安全性改造,代码示例:
private HashMap<Object, Object> map = new HashMap<>(); ReadWriteLock rwLock = new ReentrantReadWriteLock(); public Object get(Object key){ rwLock.readLock().lock(); try { return map.get(key); }finally { rwLock.readLock().unlock(); } } public void put(Object key,Object value){ rwLock.writeLock().lock(); try { map.put(key, value); }finally { rwLock.writeLock().unlock(); } }
缓存组件,代码示例:
//实现缓存 //使用数据 class TeacherInfoCache { //数据是否可用 static volatile boolean cacheValid; //锁 static final ReadWriteLock rwl = new ReentrantReadWriteLock(); //查询并使用数据, static void processCacheData(String dataKey){ Object data =null; rwl.readLock().lock(); try { //判断数据是否可用 if (cacheValid){ data = Redis.data.get(dataKey); }else{ rwl.readLock().unlock(); rwl.writeLock().lock(); try { if (!cacheValid){ //从数据库读取 data = DataBase.queryUserInfo(); Redis.data.put(dataKey, data); cacheValid = true; } }finally { rwl.readLock().lock(); //锁降级 rwl.writeLock().unlock(); } } //使用数据, like 写数据库,落地到文件,做某种计算 }finally { rwl.readLock().unlock(); } } } class DataBase{ static String queryUserInfo(){ System.out.println("查询数据库。。。"); return "name:Kody,age:40,gender:true,"; } } class Redis{ static Map<String, Object> data = new HashMap<>(); }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。