赞
踩
Semaphore(int permits)
Semaphore(int permits, boolean fair)
//一次获取一个信号量 //释放一个信号量
acquire() release()
//一次获取permits个信号量 //释放permits个信号量
acquire(int permits) release(int permits)
//尝试获取permits个信号量,等待单位为unit的timeout,如果能拿到就走拿到的逻辑,否则就可以走降级的逻辑或者相隔一定的时间后再去获取
tryAcquire(int permits, long timeout, TimeUnit unit)
//不支持中断特性,线程执行过程中发现执行错误,不想让其继续执行,就可以发送中断信号,如果支持中断特性,就会抛出一个异常
acquireUninterruptibly(int permits)
Semaphore:信号量 类似于操作系统中的信号量;用来做限流操作 两个作用:多个共享资源的互斥使用;并发线程数的控制 原理: Semaphore semaphore = new Semaphore(4);//初始化信号量,当数量为1时可用作互斥锁. //AQS中的State设为4,所允许的最大的并行度为4 semaphore.acquire(); //得到一个信号量 p操作 否则阻塞 //state-1 semaphore.release(); //释放一个信号量 v操作 否则阻塞 //state+1 作用:多个共享资源的互斥使用;并发限流,控制最大的线程数 Demo: public class SemaphoreTest01 { public static void main(String[] args) throws InterruptedException { Semaphore mutex = new Semaphore(1); //互斥锁 Semaphore notFull = new Semaphore(4);//不满的数量 Semaphore notEmpty = new Semaphore(0);//不空的数量 Consumen consumen = new Consumen(); Produce produce = new Produce(); //生产者 while (true){ new Thread( () ->{ try { notFull.acquire(); mutex.acquire(); consumen.cons(); System.out.println("已经生产了一个产品"); System.out.println("-----------------------------------"); } catch (InterruptedException e) { e.printStackTrace(); }finally { System.out.println(notEmpty.availablePermits()); System.out.println(notFull.availablePermits()); mutex.release(); notEmpty.release(); } }).start(); new Thread( () ->{ try { notEmpty.acquire(); mutex.acquire(); produce.prod(); System.out.println("已经消耗了一个产品"); System.out.println("-----------------------------------"); } catch (InterruptedException e) { e.printStackTrace(); }finally { mutex.release(); notFull.release(); } }).start(); } } } class Consumen{ public void cons(){ System.out.println("生产者生产了一个产品"); } } class Produce{ public void prod(){ System.out.println("消费者消费了一个产品"); } }
public final void acquireSharedInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); //Semaphore内部定义了两个类Sync和NonfairSync,tryAcquireShared在两个类内部都有实现 if (tryAcquireShared(arg) < 0) doAcquireSharedInterruptibly(arg); } protected int tryAcquireShared(int acquires) { for (;;) { //公平锁:如果有人排队的话,返回-1,需要进行排队,不会直接获取 if (hasQueuedPredecessors()) return -1; //获得当前的State int available = getState(); //减去自己要获得的信号量的个数 int remaining = available - acquires; //remarting<0也就是初始化的信号量已经用完,需要排队 if (remaining < 0 || //通过CAS写回最新的State compareAndSetState(available, remaining)) return remaining; } } AbstractQueuedSynchronizer private void doAcquireSharedInterruptibly(int arg) throws InterruptedException { //构建一个节点,插入到CLH队列中 final Node node = addWaiter(Node.SHARED); try { for (;;) { //获得插入节点的前一个节点,判断是不是头部节点 final Node p = node.predecessor(); if (p == head) { //再次获取信号量 int r = tryAcquireShared(arg); if (r >= 0) { //获取到一个信号量,会进行广播,因为是一种共享的模式,可能在当前节点tryAcquireShared的时候,已经有其他的节点添加到了这个节点后面,所以需要广播 setHeadAndPropagate(node, r); p.next = null; // help GC return; } } //当线程唤醒后就会走这个方法,因为之前的线程在这里被阻塞 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException(); } } catch (Throwable t) { cancelAcquire(node); throw t; } } private void setHeadAndPropagate(Node node, int propagate) { //h指向之前的head节点 Node h = head; //设置头节点为当前head节点 setHead(node); //propagate大于0 if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) { //得到node的下一个节点 Node s = node.next; //s为shard if (s == null || s.isShared()) doReleaseShared(); } } private void doReleaseShared() { */ for (;;) { //h指向最新的head Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; //waitStatuw为-1 if (ws == Node.SIGNAL) { //改为0 if (!h.compareAndSetWaitStatus(Node.SIGNAL, 0)) continue; // 唤醒后续节点 unparkSuccessor(h); } //将最新头节点的waitStatue设为-3 else if (ws == 0 && !h.compareAndSetWaitStatus(0, Node.PROPAGATE)) continue; // loop on failed CAS } //如果头节点没有发生变化,则退出循环 //此时可能有线程t1进来,t2线程将head修改,在这里判断 if (h == head) break; } }
public final boolean releaseShared(int arg) { //返回true则已经写成功 if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; } protected final boolean tryReleaseShared(int releases) { for (;;) { int current = getState(); int next = current + releases; if (next < current) // overflow throw new Error("Maximum permit count exceeded"); //将当前的信号量加回去 if (compareAndSetState(current, next)) return true; } } private void doReleaseShared() { */ for (;;) { //h指向最新的head Node h = head; //判断队列里面有没有正在排队的 if (h != null && h != tail) { int ws = h.waitStatus; //头节点的waitStatus为-1 if (ws == Node.SIGNAL) { //将waitStatue设为0 if (!h.compareAndSetWaitStatus(Node.SIGNAL, 0)) continue; // loop to recheck cases //唤醒后面的节点 unparkSuccessor(h); } //将最新头节点的waitStatue设为-3 else if (ws == 0 && !h.compareAndSetWaitStatus(0, Node.PROPAGATE)) continue; // loop on failed CAS } //如果头节点没有发生变化,则退出循环 //此时可能有线程t1进来,t2线程将head修改,在这里判断 if (h == head) break; } }
允许一个线程或者多个线程等待直到在其他线程中执行的一组操作完成的同步操作.里面Await方法等待计数器归零然后再向下执行(先让必须要完成的任务先执行)
LockSupport是用来创建锁和其他同步类的基本线程阻塞原语.LockSupport中的park(()和unpark()的作用分别是阻塞线程和解除阻塞线程
//调用LockSupport()时,permit默认是0,所以一开始调用park()方法,当前线程就会阻塞,直到别的线程将当前线程的permit设为1,park方法会被唤醒
public static void park() {
U.park(false, 0L);
}
//调用unpark(thread)方法后,将thread线程的许可permit设置为1(注意多次调用unpark方法,不会累加,permit值还是1)会自动唤醒thread线程,即之前阻塞中的LockSupport.park()方法会立即返回
public static void unpark(Thread thread) {
if (thread != null)
U.unpark(thread);
}
纯粹的是只想阻塞线程,并不想加锁
支持先唤醒后等待
变成阻塞(因为permit为零了会阻塞在这里,一直到permit变为1),这时调用unpark会把permit置为1。每个线程都有一个相关的permit, permit最多只有一个,重复调用unpark也不会积累凭证。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。