赞
踩
JUC
指的就是 java.util.concurrent
工具包的简称,里面的工具类可以很方便的实现多线程的开发。
java.util
工具包、包、分类。
业务:普通的线程代码 Thread。
Runnable
没有返回值、效率相比 Callable
相对较低!
进程是操作系统中的应用程序、是资源分配的基本单位。
线程是用来执行具体的任务和功能,是CPU调度和分派的最小单位。
一个进程往往可以包含多个线程,至少包含一个。
一个进程往往可以包含多个线程,至少包含一个。
Java默认有两个线程:一个 main
, 一个GC
。
对于Java而言:Thread
、Runable
、Callable
是进行开启线程的。
Java 真的可以开启线程吗?
答案:开不了。
Java是没有权限去开启线程的、操作硬件的,这是一个 native
的一个本地方法,它调用的底层是c++代码。
public synchronized void start() { /** * This method is not invoked for the main method thread or "system" * group threads created/set up by the VM. Any new functionality added * to this method in the future may have to also be added to the VM. * * A zero status value corresponds to state "NEW". */ if (threadStatus != 0) throw new IllegalThreadStateException(); /* Notify the group that this thread is about to be started * so that it can be added to the group's list of threads * and the group's unstarted count can be decremented. */ group.add(this); boolean started = false; try { start0(); started = true; } finally { try { if (!started) { group.threadStartFailed(this); } } catch (Throwable ignore) { /* do nothing. If start0 threw a Throwable then it will be passed up the call stack */ } } } //这是一个C++底层,Java是没有权限操作底层硬件的 private native void start0();
多线程操作同一个资源。
获取 CPU 的核数:
public class Test1 {
public static void main(String[] args) {
//获取cpu的核数
System.out.println(Runtime.getRuntime().availableProcessors());
}
}
线程的状态:
public enum State { // 线程新生 NEW, //运行状态 RUNNABLE, //阻塞 BLOCKED, //等待,死死地等 WAITING, //超时等待 TIMED_WAITING, //终止 TERMINATED; }
来自不同的类
wait -> Object
sleep -> Thread
锁的释放
wait
会释放锁。
sleep
处于睡眠状态,不会释放锁。
使用的范围不同
wait
必须在同步代码块中。
sleep
可以在任何地方处于睡眠。
是否需要捕获异常
wait
不需要捕获异常。
sleep
必须要捕获异常。
public class SaleTicketDemo1 { public static void main(String[] args) { // 并发:多线程操作同一个类,把资源丢进线程 Ticket ticket = new Ticket(); //@FunctionalInterface 函数式接口,jdk1.8 lambda表达式 (参数)-> {代码} new Thread( ()-> { for (int i = 0; i < 40; i++) { ticket.sale(); } }, "A").start(); new Thread( ()->{ for (int i = 0; i < 40; i++) { ticket.sale(); } }, "B").start(); new Thread( ()-> { ticket.sale(); }, "C").start(); } } class Ticket { private int number = 50; // synchronized: 本质就是队列、锁 public synchronized void sale() { if (number > 0) { System.out.println(Thread.currentThread().getName() + "卖出了第" + (number --) + "票,剩余" + number); } } }
运行结果如下:
A窗口卖出了第50票,剩余49 A窗口卖出了第49票,剩余48 A窗口卖出了第48票,剩余47 A窗口卖出了第47票,剩余46 A窗口卖出了第46票,剩余45 B窗口卖出了第45票,剩余44 B窗口卖出了第44票,剩余43 B窗口卖出了第43票,剩余42 B窗口卖出了第42票,剩余41 B窗口卖出了第41票,剩余40 B窗口卖出了第40票,剩余39 B窗口卖出了第39票,剩余38 B窗口卖出了第38票,剩余37 B窗口卖出了第37票,剩余36 B窗口卖出了第36票,剩余35 B窗口卖出了第35票,剩余34 B窗口卖出了第34票,剩余33 B窗口卖出了第33票,剩余32 B窗口卖出了第32票,剩余31 B窗口卖出了第31票,剩余30 B窗口卖出了第30票,剩余29 B窗口卖出了第29票,剩余28 B窗口卖出了第28票,剩余27 B窗口卖出了第27票,剩余26 B窗口卖出了第26票,剩余25 B窗口卖出了第25票,剩余24 B窗口卖出了第24票,剩余23 B窗口卖出了第23票,剩余22 B窗口卖出了第22票,剩余21 B窗口卖出了第21票,剩余20 B窗口卖出了第20票,剩余19 B窗口卖出了第19票,剩余18 B窗口卖出了第18票,剩余17 B窗口卖出了第17票,剩余16 B窗口卖出了第16票,剩余15 B窗口卖出了第15票,剩余14 B窗口卖出了第14票,剩余13 B窗口卖出了第13票,剩余12 B窗口卖出了第12票,剩余11 B窗口卖出了第11票,剩余10 B窗口卖出了第10票,剩余9 B窗口卖出了第9票,剩余8 B窗口卖出了第8票,剩余7 B窗口卖出了第7票,剩余6 B窗口卖出了第6票,剩余5 A窗口卖出了第5票,剩余4 A窗口卖出了第4票,剩余3 A窗口卖出了第3票,剩余2 A窗口卖出了第2票,剩余1 A窗口卖出了第1票,剩余0
代码如下:
public class SaleTicketDemo2 { public static void main(String[] args) { // 并发:多线程操作同一个类,把资源丢进线程 Ticket2 ticket = new Ticket2(); //@FunctionalInterface 函数式接口,jdk1.8 lambda表达式 (参数)-> {代码} new Thread( ()-> { for (int i = 0; i < 40; i++) { ticket.sale(); } }, "A").start(); new Thread( ()->{ for (int i = 0; i < 40; i++) { ticket.sale(); } }, "B").start(); new Thread( ()-> { ticket.sale(); }, "C").start(); } } /** * lock三部曲 * 1. new ReentrantLock() * 2. lock.lock() 加锁 * 3. finally --> lock.unlock() 解锁 */ class Ticket2 { private int number = 50; Lock lock = new ReentrantLock(); public void sale() { lock.lock(); // 加锁 try { // 业务代码 if(number > 0){ System.out.println(Thread.currentThread().getName() + "卖出了" + (number--) + "票,剩余" + number); } } catch (Exception e) { e.printStackTrace(); } finally { lock.unlock(); // 解锁 } } }
运行结果如下:
A卖出了50票,剩余49 A卖出了49票,剩余48 A卖出了48票,剩余47 C卖出了47票,剩余46 A卖出了46票,剩余45 A卖出了45票,剩余44 A卖出了44票,剩余43 A卖出了43票,剩余42 A卖出了42票,剩余41 A卖出了41票,剩余40 A卖出了40票,剩余39 A卖出了39票,剩余38 A卖出了38票,剩余37 A卖出了37票,剩余36 A卖出了36票,剩余35 A卖出了35票,剩余34 A卖出了34票,剩余33 A卖出了33票,剩余32 A卖出了32票,剩余31 A卖出了31票,剩余30 A卖出了30票,剩余29 A卖出了29票,剩余28 A卖出了28票,剩余27 A卖出了27票,剩余26 A卖出了26票,剩余25 A卖出了25票,剩余24 A卖出了24票,剩余23 A卖出了23票,剩余22 A卖出了22票,剩余21 A卖出了21票,剩余20 A卖出了20票,剩余19 A卖出了19票,剩余18 A卖出了18票,剩余17 A卖出了17票,剩余16 A卖出了16票,剩余15 A卖出了15票,剩余14 A卖出了14票,剩余13 A卖出了13票,剩余12 A卖出了12票,剩余11 A卖出了11票,剩余10 A卖出了10票,剩余9 B卖出了9票,剩余8 B卖出了8票,剩余7 B卖出了7票,剩余6 B卖出了6票,剩余5 B卖出了5票,剩余4 B卖出了4票,剩余3 B卖出了3票,剩余2 B卖出了2票,剩余1 B卖出了1票,剩余0
Synchronized
是内置的Java关键字; Lock
是一个Java类。Synchronized
无法判断获取锁的状态; Lock
可以判断是否获取了锁。Synchronized
会自动释放锁; Lock
必须手动释放锁,如果不释放锁,会出现死锁。Synchronized
线程1(获取锁,阻塞)、线程2(一直等待);Lock
锁不一定会等待下去。Synchronized
可重入锁,不可以中断的、非公平;Lock
可重入锁,可以判断锁,非公平(可以自己设置)。Synchronized
适合锁少量的代码同步问题;Lock
适合锁大量的同步代码。/** * 线程之间的通信问题:生产者和消费者问题 等待唤醒,通知唤醒 * 线程交替执行 A B 操作同一个变量 num = 0 * A num + 1 * B num - 1 */ public class A { public static void main(String[] args) { Data data = new Data(); new Thread(()->{ for (int i = 0; i < 10; i++) { try { data.increment(); } catch (InterruptedException e) { e.printStackTrace(); } } },"A").start(); new Thread(()->{ for (int i = 0; i < 10; i++) { try { data.decrement(); } catch (InterruptedException e) { e.printStackTrace(); } } },"B").start(); } } //判断等待,业务,通知 class Data { private int number = 0; // + 1 public synchronized void increment() throws InterruptedException { if(number != 0){ //等待 this.wait(); } number ++; System.out.println(Thread.currentThread().getName() + "-->" + number); //通知其他线程,我 +1 完毕了 this.notifyAll(); } // - 1 public synchronized void decrement() throws InterruptedException { if(number == 0){ //等待 this.wait(); } number --; System.out.println(Thread.currentThread().getName() + "-->" + number); //通知其他线程,我 -1 完毕了 this.notifyAll(); } }
运行结果如下:
A-->1 B-->0 A-->1 B-->0 A-->1 B-->0 A-->1 B-->0 A-->1 B-->0 A-->1 B-->0 A-->1 B-->0 A-->1 B-->0 A-->1 B-->0 A-->1 B-->0
问题存在:如果是A, B, C, D 四个线程
常规的代码如下:
/** * 线程之间的通信问题:生产者和消费者问题 等待唤醒,通知唤醒 * 线程交替执行 A B 操作同一个变量 num = 0 * A num + 1 * B num - 1 */ public class A { public static void main(String[] args) { Data data = new Data(); new Thread(()->{ for (int i = 0; i < 10; i++) { try { data.increment(); } catch (InterruptedException e) { e.printStackTrace(); } } },"A").start(); new Thread(()->{ for (int i = 0; i < 10; i++) { try { data.decrement(); } catch (InterruptedException e) { e.printStackTrace(); } } },"B").start(); new Thread(()->{ for (int i = 0; i < 10; i++) { try { data.increment(); } catch (InterruptedException e) { e.printStackTrace(); } } },"C").start(); new Thread(()->{ for (int i = 0; i < 10; i++) { try { data.decrement(); } catch (InterruptedException e) { e.printStackTrace(); } } },"D").start(); } } //判断等待,业务,通知 class Data { private int number = 0; // + 1 public synchronized void increment() throws InterruptedException { if(number != 0){ //等待 this.wait(); } number ++; System.out.println(Thread.currentThread().getName() + "-->" + number); //通知其他线程,我 +1 完毕了 this.notifyAll(); } // - 1 public synchronized void decrement() throws InterruptedException { if(number == 0){ //等待 this.wait(); } number --; System.out.println(Thread.currentThread().getName() + "-->" + number); //通知其他线程,我 -1 完毕了 this.notifyAll(); } }
运行结果会出现 number
的值为2或3的情况,这样显然是错误的。
A-->1 B-->0 A-->1 B-->0 A-->1 B-->0 A-->1 B-->0 A-->1 B-->0 A-->1 B-->0 A-->1 B-->0 A-->1 B-->0 A-->1 B-->0 C-->1 A-->2 C-->3 B-->2 C-->3 D-->2 D-->1 D-->0 C-->1 D-->0 C-->1 D-->0 C-->1 D-->0 C-->1 D-->0 C-->1 D-->0 C-->1 D-->0 C-->1 D-->0
解决方案:
将 if 改为 while 进行判断即可:
以下是解释为什么 if 改为 while 就可以解决问题了:
拿两个加法线程A、B来说,比如A先执行,执行时调用了wait方法,那它会等待,此时会释放锁,那么线程B获得锁并且也会执行wait方法,两个加线程一起等待被唤醒。此时减线程中的某一个线程执行完毕并且唤醒了这俩加线程,那么这俩加线程不会一起执行,其中A获取了锁并且加1,执行完毕之后B再执行。如果是if的话,那么A修改完num后,B不会再去判断num的值,直接会给num+1。如果是while的话,A执行完之后,B还会去判断num的值,因此就不会执行。
/** * 线程之间的通信问题:生产者和消费者问题 等待唤醒,通知唤醒 * 线程交替执行 A B 操作同一个变量 num = 0 * A num + 1 * B num - 1 */ public class A { public static void main(String[] args) { Data data = new Data(); new Thread(()->{ for (int i = 0; i < 10; i++) { try { data.increment(); } catch (InterruptedException e) { e.printStackTrace(); } } },"A").start(); new Thread(()->{ for (int i = 0; i < 10; i++) { try { data.decrement(); } catch (InterruptedException e) { e.printStackTrace(); } } },"B").start(); new Thread(()->{ for (int i = 0; i < 10; i++) { try { data.increment(); } catch (InterruptedException e) { e.printStackTrace(); } } },"C").start(); new Thread(()->{ for (int i = 0; i < 10; i++) { try { data.decrement(); } catch (InterruptedException e) { e.printStackTrace(); } } },"D").start(); } } //判断等待,业务,通知 class Data { private int number = 0; // + 1 public synchronized void increment() throws InterruptedException { while (number != 0){ //等待 this.wait(); } number ++; System.out.println(Thread.currentThread().getName() + "-->" + number); //通知其他线程,我 +1 完毕了 this.notifyAll(); } // - 1 public synchronized void decrement() throws InterruptedException { while (number == 0){ //等待 this.wait(); } number --; System.out.println(Thread.currentThread().getName() + "-->" + number); //通知其他线程,我 -1 完毕了 this.notifyAll(); } }
运行结果如下:
A-->1 B-->0 A-->1 B-->0 A-->1 B-->0 A-->1 B-->0 C-->1 B-->0 A-->1 B-->0 C-->1 B-->0 A-->1 D-->0 C-->1 B-->0 A-->1 D-->0 C-->1 B-->0 A-->1 D-->0 C-->1 B-->0 A-->1 D-->0 C-->1 D-->0 A-->1 D-->0 C-->1 D-->0 C-->1 D-->0 C-->1 D-->0 C-->1 D-->0
代码实现:
/** * 线程之间的通信问题:生产者和消费者问题 等待唤醒,通知唤醒 * 线程交替执行 A B 操作同一个变量 num = 0 * A num + 1 * B num - 1 */ public class B { public static void main(String[] args) { Data2 data = new Data2(); new Thread(()->{ for (int i = 0; i < 10; i++) { try { data.increment(); } catch (InterruptedException e) { e.printStackTrace(); } } },"A").start(); new Thread(()->{ for (int i = 0; i < 10; i++) { try { data.decrement(); } catch (InterruptedException e) { e.printStackTrace(); } } },"B").start(); new Thread(()->{ for (int i = 0; i < 10; i++) { try { data.increment(); } catch (InterruptedException e) { e.printStackTrace(); } } },"C").start(); new Thread(()->{ for (int i = 0; i < 10; i++) { try { data.decrement(); } catch (InterruptedException e) { e.printStackTrace(); } } },"D").start(); } } //判断等待,业务,通知 class Data2 { // 数字 资源类 private int number = 0; Lock lock = new ReentrantLock(); Condition condition = lock.newCondition(); // + 1 public void increment() throws InterruptedException { lock.lock(); try { //业务代码 while (number != 0) { //等待 condition.await(); } number ++; System.out.println(Thread.currentThread().getName() + "-->" + number); //通知其他线程,我+1完毕了 condition.signalAll(); } catch (Exception e) { e.printStackTrace(); } finally { lock.unlock(); } } // - 1 public synchronized void decrement() throws InterruptedException { lock.lock(); try { //业务代码 while (number == 0) { //等待 condition.await(); } number --; System.out.println(Thread.currentThread().getName() + "-->" + number); //通知其他线程,我-1完毕了 condition.signalAll(); } catch (Exception e) { e.printStackTrace(); } finally { lock.unlock(); //解锁 } } }
运行结果如下:
A-->1 B-->0 C-->1 B-->0 A-->1 B-->0 A-->1 B-->0 A-->1 B-->0 A-->1 B-->0 A-->1 B-->0 A-->1 B-->0 A-->1 B-->0 A-->1 B-->0 A-->1 D-->0 C-->1 D-->0 C-->1 D-->0 C-->1 D-->0 C-->1 D-->0 C-->1 D-->0 C-->1 D-->0 C-->1 D-->0 C-->1 D-->0 C-->1 D-->0
出现的问题:运行结果是随机的状态出现,我们想要的是有序的执行 A->B->C->D。
代码如下:
/** * A执行完调用B,B执行完调用C,C执行完调用A */ public class C { public static void main(String[] args) { Data3 data = new Data3(); new Thread(()-> { for (int i = 0; i < 10; i++) { data.printA(); } }, "A").start(); new Thread(()-> { for (int i = 0; i < 10; i++) { data.printB(); } }, "B").start(); new Thread(()-> { for (int i = 0; i < 10; i++) { data.printC(); } }, "C").start(); } } class Data3 { private Lock lock = new ReentrantLock(); private Condition condition1 = lock.newCondition(); private Condition condition2 = lock.newCondition(); private Condition condition3 = lock.newCondition(); private int number = 1; public void printA() { lock.lock(); try { // 业务代码,判断 -> 执行 -> 通知 while (number != 1) { // 等待 condition1.await(); } System.out.println(Thread.currentThread().getName() + "->执行了A"); // 唤醒指定的人,B condition2.signal(); number = 2; } catch (Exception e) { e.printStackTrace(); } finally { lock.unlock(); } } public void printB() { lock.lock(); try { // 业务代码,判断 -> 执行 -> 通知 while (number != 2) { // 等待 condition2.await(); } System.out.println(Thread.currentThread().getName() + "->执行了B"); // 唤醒指定的人,B condition3.signal(); number = 3; } catch (Exception e) { e.printStackTrace(); } finally { lock.unlock(); } } public void printC() { lock.lock(); try { // 业务代码,判断 -> 执行 -> 通知 while (number != 3) { // 等待 condition3.await(); } System.out.println(Thread.currentThread().getName() + "->执行了C"); // 唤醒指定的人,B condition1.signal(); number = 1; } catch (Exception e) { e.printStackTrace(); } finally { lock.unlock(); } } }
运行结果如下:
A->执行了A B->执行了B C->执行了C A->执行了A B->执行了B C->执行了C A->执行了A B->执行了B C->执行了C A->执行了A B->执行了B C->执行了C A->执行了A B->执行了B C->执行了C A->执行了A B->执行了B C->执行了C A->执行了A B->执行了B C->执行了C A->执行了A B->执行了B C->执行了C A->执行了A B->执行了B C->执行了C A->执行了A B->执行了B C->执行了C
测试1代码如下:
/** * 八锁,就是关于锁的八个问题 * 1. 标准情况下,两个线程先打印发短信还是打电话? 发短信 -> 打电话 * 2. send 延迟4秒,两个线程先打印发短信还是打电话? 发短信 -> 打电话 */ public class Test1 { public static void main(String[] args) { Phone phone = new Phone(); // 锁的问题 new Thread(()->{ phone.send(); }, "A").start(); try { TimeUnit.SECONDS.sleep(4); } catch (InterruptedException e) { e.printStackTrace(); } new Thread(()->{ phone.call(); }, "B").start(); } } class Phone { // synchronized 锁的对象是方法的调用者 // 两个方法用的是同一把锁,谁先拿到谁执行! public synchronized void send() { try { TimeUnit.SECONDS.sleep(4); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("发短信"); } public synchronized void call() { System.out.println("打电话"); } }
测试1 运行结果如下:
发短信
打电话
测试2代码如下:
/** * 3. 增加一个普通方法 先执行发短息还是hello? 普通方法 * 4. 两个对象都是同步方法,先发短信还是打电话? 打电话 */ public class Test2 { public static void main(String[] args) { // 两个对象,两个调用者,两把锁 Phone2 phone1 = new Phone2(); Phone2 phone2 = new Phone2(); // 锁的问题 new Thread(()->{ phone1.send(); }, "A").start(); try { TimeUnit.SECONDS.sleep(4); } catch (InterruptedException e) { e.printStackTrace(); } new Thread(()->{ phone2.call(); }, "B").start(); } } class Phone2 { // synchronized 锁的对象是方法的调用者 public synchronized void send() { try { TimeUnit.SECONDS.sleep(4); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("发短信"); } public synchronized void call() { System.out.println("打电话"); } // 这里没有锁!不是同步方法,不受锁的影响 public void hello() { System.out.println("hello"); } }
测试2运行结果如下:
打电话
发短信
测试3代码如下:
/** * 5.增加两个静态的同步方法,只有一个对象 先打印 发短信 还是 打电话? 发短信 * 6.两个对象! 增加两个静态的同步方法,先打印 发短信 还是 打电话? 发短信 * */ public class Test3 { public static void main(String[] args) { //两个对象的class类模板只有一个 static 锁的是class Phone3 phone1 = new Phone3(); Phone3 phone2 = new Phone3(); //锁的问题 new Thread(() -> { phone1.send(); }, "A").start(); try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } new Thread(() -> { phone2.call(); }, "A").start(); } } class Phone3 { //synchronized 锁的对象是方法的调用者 //static静态方法 //类一加载就有了!class模板 public static synchronized void send() { try { TimeUnit.SECONDS.sleep(4); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("发短信"); } //这里没有锁!不是同步方法,不受锁的影响 public void hello() { System.out.println("hello"); } public static synchronized void call() { System.out.println("打电话"); } }
测试3运行结果如下:
发短信
打电话
测试4代码如下:
/** * 7. 1个静态的同步方法,1个普通的同步方法,一个对象,先打印 发短信 还是 打电话? 打电话 * 8. 1个静态的同步方法,1个普通的同步方法,二个对象,先打印 发短信 还是 打电话? 打电话 */ public class Test4 { public static void main(String[] args) { //两个对象的class类模板只有一个 static 锁的是class Phone4 phone1 = new Phone4(); Phone4 phone2 = new Phone4(); //锁的问题 new Thread(() -> { phone1.send(); }, "A").start(); try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } new Thread(() -> { phone2.call(); }, "A").start(); } } class Phone4 { //静态同步方法 锁的是class类模板 public static synchronized void send() { try { TimeUnit.SECONDS.sleep(4); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("发短信"); } //普通的同步方法 锁的调用者 public synchronized void call() { System.out.println("打电话"); } }
测试4运行结果如下:
打电话
发短信
代码如下:
//java.util.ConcurrentModificationException 并发修改异常 public class ListTest { public static void main(String[] args) { // 并发下ArrayList不安全 /** * 解决方案: * 1. List<String> list = new Vector<>(); * 2. List<String> list = Collections.synchronizedList(new ArrayList<>()); * 3. List<System> list = new CopyOnWriteArrayList<>(); */ //CopyOnWrite写入时复制 COW 计算机程序设计领域的一种优化策略 //多个线程调用的时候,list读取的时候,固定的,写入(覆盖) //在写入的时候避免覆盖造成数据问题 //读写分离 List<String> list = new CopyOnWriteArrayList<>(); for (int i = 1; i <= 10; i ++ ) { new Thread(()-> { list.add(UUID.randomUUID().toString().substring(0, 5)); System.out.println(list); },String.valueOf(i)).start(); } } }
运行结果如下:
public class SetTest {
public static void main(String[] args) {
// Set<String> set = new HashSet<>();
// Set<String> set = Collections.synchronizedSet(new HashSet<>());
Set<String> set = new CopyOnWriteArraySet<>();
for (int i = 1; i <= 30; i++) {
new Thread(() -> {
set.add(UUID.randomUUID().toString().substring(0, 5));
System.out.println(set);
}, String.valueOf(i)).start();
}
}
}
HashSet的底层实现:
public HashSet(){
map = new HashMap<>();
}
//add Set本质就是 map Key是无法重复的
public boolean add(E e){
return map.put(e,PRESENT) == null;
}
private static final Object PRESENT = new Object(); //不变得值!
public class MapTest { public static void main(String[] args) { //map是这样用的吗 不是,工作中不用HashMap // 默认等价于什么 new HashMap<>(16,0.75); //Map<String, String> map = new HashMap<>(); Map<String, String> map = new ConcurrentHashMap<>(); //加载因子 、初始化容量 for (int i = 1; i <= 30; i++) { new Thread(() -> { map.put(Thread.currentThread().getName(), UUID.randomUUID().toString().substring(0, 5)); System.out.println(map); }, String.valueOf(i)).start(); } } }
public class CallableTest { public static void main(String[] args) throws ExecutionException, InterruptedException { // new Thread(new Thread()).start(); // new Thread(new FutureTask<V>).start(); // new Thread().start(); // 怎么启动Callable MyThread thread = new MyThread(); FutureTask futureTask = new FutureTask(thread); // 适配类 new Thread(futureTask, "A").start(); String o = (String) futureTask.get(); // 获取Callable 的返回结果。 这个get方法可能会产生阻塞,把它放到最后或者用异步通信来处理 System.out.println(o); } } class MyThread implements Callable<String> { @Override public String call() { System.out.println("call()"); return "123"; } }
运行结果如下:
call()
123
细节:
测试代码如下:
// 计数器 public class CountDownLatchDemo { public static void main(String[] args) throws InterruptedException { // 总数是6,必须要执行任务的时候,再使用 CountDownLatch countDownLatch = new CountDownLatch(6); for (int i = 1; i <= 6; i ++ ) { new Thread(()->{ System.out.println(Thread.currentThread().getName() + " Go out"); countDownLatch.countDown(); // 数量-1 }, String.valueOf(i)).start(); } countDownLatch.await(); // 等待计数器归0,然后再向下执行 System.out.println("close door"); } }
运行结果如下:
原理:
countDownLatch.countDown();
// 数量 -1
countDownLatch.await()
// 等待计数器归0,然后再向下执行
每次有线程调用 countDown()
数量-1,假设计数器变为0, countDownLatch.await()
就会被唤醒,继续执行。
CyclicBarrier
即 加法计数器,测试代码如下:
public class CyclicBarrierDemo { public static void main(String[] args) { /** * 集齐7颗龙珠召唤神龙 */ CyclicBarrier cyclicBarrier = new CyclicBarrier(7, () -> { System.out.println("召唤神龙成功!"); }); for (int i = 1; i <= 7; i ++ ) { final int temp = i; // lambda 不能直接取到i,必须借助于temp new Thread(()-> { System.out.println(Thread.currentThread().getName() + "收集" + temp + "个龙珠"); try { cyclicBarrier.await(); // 等待计数器变成7 } catch (InterruptedException e) { e.printStackTrace(); } catch (BrokenBarrierException e) { e.printStackTrace(); } }).start(); } } }
运行结果如下:
Semaphore
即信号量。
测试代码如下:
public class SemaphoreDemo { // 6辆车、3个车位 public static void main(String[] args) { // 线程数量;停车位!限流! Semaphore semaphore = new Semaphore(3); for (int i = 1; i <= 6; i ++ ) { new Thread(()-> { // acquire() 得到 try { semaphore.acquire(); System.out.println(Thread.currentThread().getName() + "抢到车位"); TimeUnit.SECONDS.sleep(2); System.out.println(Thread.currentThread().getName() + "离开车位"); } catch (InterruptedException e) { e.printStackTrace(); } finally { semaphore.release(); } // release() 释放 }, String.valueOf(i)).start(); } } }
运行结果如下:
原理:
semaphore.acquire();
获得,假设如果已经满了,等待被释放为止。
semaphore.release();
释放,会将当前的信号量释放+1,然后唤醒等待的线程。
作用:多个共享资源互斥的使用!并发限流,控制最大的线程数。
旧版:即自定义缓存,测试代码如下:
public class ReadWriteLock { public static void main(String[] args) { MyCache myCache = new MyCache(); // 只写入 for (int i = 1; i <= 5; i ++ ) { final int temp = i; new Thread(()-> { myCache.put(temp + "", temp + ""); }, String.valueOf(i)).start(); } // 读取 for (int i = 1; i <= 5; i ++ ) { final int temp = i; new Thread(()-> { myCache.get(temp + ""); }, String.valueOf(i)).start(); } } } /** * 自定义缓存 */ class MyCache { private volatile Map<String, Object> map = new HashMap<>(); // 存,写 public void put(String key, Object value) { System.out.println(Thread.currentThread().getName() + "写入" + key); map.put(key, value); System.out.println(Thread.currentThread().getName() + "写入完毕"); } // 取,读 public void get(String key) { System.out.println(Thread.currentThread().getName() + "读取" + key); Object o = map.get(key); System.out.println(Thread.currentThread().getName() + "读取完毕"); } }
运行结果如下:
出现的问题:在一个线程写入的过程中会有其他线程的插入,可以用加锁的缓存来解决该问题,相关代码如下:
/** * 独占锁(写锁) 一次只能被一个线程占有 * 共享锁(读锁) 多个线程可以同时占有 * 读——读 可以共存 * 读——写 不能共存 * 写——写 不能共存 */ public class ReadWriteLock { public static void main(String[] args) { //MyCache myCache = new MyCache(); MyCacheLock myCache = new MyCacheLock(); // 只写入 for (int i = 1; i <= 5; i ++ ) { final int temp = i; new Thread(()-> { myCache.put(temp + "", temp + ""); }, String.valueOf(i)).start(); } // 读取 for (int i = 1; i <= 5; i ++ ) { final int temp = i; new Thread(()-> { myCache.get(temp + ""); }, String.valueOf(i)).start(); } } } /** * 加锁的 */ class MyCacheLock { private volatile Map<String, Object> map = new HashMap<>(); // 读写锁:更加细粒度的控制 private ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock(); //存,写的时候,只希望同时只有一个线程写 public void put(String key, Object value) { readWriteLock.writeLock().lock(); try { System.out.println(Thread.currentThread().getName() + "写入" + key); map.put(key, value); System.out.println(Thread.currentThread().getName() + "写入完毕"); } catch (Exception e) { e.printStackTrace(); } finally { readWriteLock.writeLock().unlock(); } } // 取,读的时候,所有人都可以读取 public void get(String key) { readWriteLock.readLock().lock(); try { System.out.println(Thread.currentThread().getName() + "读取" + key); Object o = map.get(key); System.out.println(Thread.currentThread().getName() + "读取完毕"); } catch (Exception e) { e.printStackTrace(); } finally { readWriteLock.readLock().unlock(); } } } /** * 自定义缓存 */ class MyCache { private volatile Map<String, Object> map = new HashMap<>(); // 存,写 public void put(String key, Object value) { System.out.println(Thread.currentThread().getName() + "写入" + key); map.put(key, value); System.out.println(Thread.currentThread().getName() + "写入完毕"); } // 取,读 public void get(String key) { System.out.println(Thread.currentThread().getName() + "读取" + key); Object o = map.get(key); System.out.println(Thread.currentThread().getName() + "读取完毕"); } }
运行结果如下:
什么情况下会使用阻塞队列:多线程并发处理,线程池!
使用队列: 添加、移除
方式 | 抛出异常 | 有返回值,不抛出异常 | 阻塞等待 | 超时等待 |
---|---|---|---|---|
添加 | add | offer() | put() | offer(, ,) |
删除 | remove | poll() | take() | poll(,) |
检测队首元素 | element | peek() |
public class Test { public static void main(String[] args) { test1(); } /** * 抛出异常 */ public static void test1() { // 队列的大小 ArrayBlockingQueue blockingQueue = new ArrayBlockingQueue<>(3); System.out.println(blockingQueue.add("a")); System.out.println(blockingQueue.add("b")); System.out.println(blockingQueue.add("c")); System.out.println("---------------"); //IllegalStateException: Queue full 抛出异常! //System.out.println(blockingQueue.add("d")); System.out.println(blockingQueue.remove()); System.out.println(blockingQueue.remove()); System.out.println(blockingQueue.remove()); //java.util.NoSuchElementException 抛出异常! //System.out.println(blockingQueue.remove()); } }
运行结果如下:
2. 有返回值,没有异常的测试代码如下:
public class Test { public static void main(String[] args) { test2(); } /** * 有返回值,没有异常 */ public static void test2() { // 队列的大小 ArrayBlockingQueue blockingQueue = new ArrayBlockingQueue<>(3); System.out.println(blockingQueue.offer("a")); System.out.println(blockingQueue.offer("b")); System.out.println(blockingQueue.offer("c")); System.out.println(blockingQueue.offer("d")); // 不抛出异常,false System.out.println(blockingQueue.peek()); // 弹出队首元素 System.out.println("--------"); System.out.println(blockingQueue.poll()); System.out.println(blockingQueue.poll()); System.out.println(blockingQueue.poll()); System.out.println(blockingQueue.poll()); // 不抛出异常,null } }
运行结果如下:
3. 等待,阻塞(一直阻塞) 的测试代码如下:
public class Test { public static void main(String[] args) throws InterruptedException { test3(); } /** * 等待,阻塞(一直阻塞) */ public static void test3() throws InterruptedException { // 队列的大小 ArrayBlockingQueue blockingQueue = new ArrayBlockingQueue<>(3); // 一直阻塞 blockingQueue.put("a"); blockingQueue.put("b"); blockingQueue.put("c"); //blockingQueue.put("d"); // 队列没有位置了,一直阻塞 System.out.println(blockingQueue.take()); System.out.println(blockingQueue.take()); System.out.println(blockingQueue.take()); //System.out.println(blockingQueue.take()); // 没有这个元素,一直阻塞 } }
运行结果如下:
4. 等待,阻塞(等待超时) 测试代码如下:
public class Test { public static void main(String[] args) throws InterruptedException { test4(); } /** * 等待,阻塞(等待超时) */ public static void test4() throws InterruptedException { // 队列的大小 ArrayBlockingQueue blockingQueue = new ArrayBlockingQueue<>(3); blockingQueue.offer("a"); blockingQueue.offer("b"); blockingQueue.offer("c"); //blockingQueue.offer("d", 2, TimeUnit.SECONDS); // 等待超过2秒就退出 System.out.println("-------"); System.out.println(blockingQueue.poll()); System.out.println(blockingQueue.poll()); System.out.println(blockingQueue.poll()); //System.out.println(blockingQueue.poll(2, TimeUnit.SECONDS)); // 等待超过2秒就退出 } }
运行结果如下:
SynchronousQueue
是没有容量的,进去一个元素,必须等待取出来之后,才能再往里面放入一个元素。
添加操作 put
、删除操作take
。
测试代码如下:
/** * 同步队列 * 和其他的BlockQueue不一样,SynchronousQueue不存储元素 * put 了一个元素,必须从里面take取出来,否则不能put进去值 */ public class SynchronousQueueDemo { public static void main(String[] args) { BlockingQueue<String> blockingQueue = new SynchronousQueue<>(); // 同步队列 new Thread(()->{ try { System.out.println(Thread.currentThread().getName() + " put 1"); blockingQueue.put("1"); System.out.println(Thread.currentThread().getName() + " put 2"); blockingQueue.put("2"); System.out.println(Thread.currentThread().getName() + " put 3"); blockingQueue.put("3"); } catch (InterruptedException e) { e.printStackTrace(); } }, "T1").start(); new Thread(()->{ try { TimeUnit.SECONDS.sleep(3); System.out.println(Thread.currentThread().getName() + " take " + blockingQueue.take()); TimeUnit.SECONDS.sleep(3); System.out.println(Thread.currentThread().getName() + " take " + blockingQueue.take()); TimeUnit.SECONDS.sleep(3); System.out.println(Thread.currentThread().getName() + " take " + blockingQueue.take()); } catch (InterruptedException e) { e.printStackTrace(); } }, "T2").start(); } }
运行结果如下:
线程池:3大方法、7大参数、4种拒绝策略。
线程池的好处:
线程复用、可以控制最大并发数、管理线程。
线程池:三大方法
3大方法的测试如下:
Executors.newSingleThreadExecutor()
方法的测试如下:// Executors 工具类、3大方法 public class Demo01 { public static void main(String[] args) { ExecutorService threadPool = Executors.newSingleThreadExecutor(); // 单个线程 //ExecutorService threadPool = Executors.newFixedThreadPool(5); // 创建一个固定的线程池的大小 //ExecutorService threadPool = Executors.newCachedThreadPool(); // 可伸缩的,遇强则强,遇弱则弱 try { for (int i = 0; i < 10; i++) { // 使用线程池之后,使用线程池来创建线程 threadPool.execute(()->{ System.out.println(Thread.currentThread().getName() + " 创建成功"); }); } } catch (Exception e) { e.printStackTrace(); } finally { // 线程池用完,程序结束,关闭线程池 threadPool.shutdown(); } } }
Executors.newFixedThreadPool(5)
方法的测试如下:// Executors 工具类、3大方法 public class Demo01 { public static void main(String[] args) { //ExecutorService threadPool = Executors.newSingleThreadExecutor(); // 单个线程 ExecutorService threadPool = Executors.newFixedThreadPool(5); // 创建一个固定的线程池的大小 //ExecutorService threadPool = Executors.newCachedThreadPool(); // 可伸缩的,遇强则强,遇弱则弱 try { for (int i = 0; i < 10; i++) { // 使用线程池之后,使用线程池来创建线程 threadPool.execute(()->{ System.out.println(Thread.currentThread().getName() + " 创建成功"); }); } } catch (Exception e) { e.printStackTrace(); } finally { // 线程池用完,程序结束,关闭线程池 threadPool.shutdown(); } } }
Executors.newCachedThreadPool()
方法的测试如下:// Executors 工具类、3大方法 public class Demo01 { public static void main(String[] args) { //ExecutorService threadPool = Executors.newSingleThreadExecutor(); // 单个线程 //ExecutorService threadPool = Executors.newFixedThreadPool(5); // 创建一个固定的线程池的大小 ExecutorService threadPool = Executors.newCachedThreadPool(); // 可伸缩的,遇强则强,遇弱则弱 try { for (int i = 0; i < 10; i++) { // 使用线程池之后,使用线程池来创建线程 threadPool.execute(()->{ System.out.println(Thread.currentThread().getName() + " 创建成功"); }); } } catch (Exception e) { e.printStackTrace(); } finally { // 线程池用完,程序结束,关闭线程池 threadPool.shutdown(); } } }
线程池:七大参数
源码分析:
public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); } public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); } public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, //21亿 OOM 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); } //本质: ThreadPoolExecutor() public ThreadPoolExecutor(int corePoolSize, //核心线程池大小 int maximumPoolSize, //最大核心线程池大小 long keepAliveTime, //超时了没有人调用就会释放 TimeUnit unit, //超时单位 BlockingQueue<Runnable> workQueue, //阻塞队列 ThreadFactory threadFactory,//线程工厂:创建线程的,一般不用动 RejectedExecutionHandler handler//拒绝策略) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; }
手动创建一个线程池,相关代码如下:
//Executors 工具类 3大方法 /** * 1. new ThreadPoolExecutor.AbortPolicy()); //银行满了 还有人进来,不处理这个人的,并且抛出异常 * 2. new ThreadPoolExecutor.CallerRunsPolicy()); //哪来的去哪里! * 3. new ThreadPoolExecutor.DiscardPolicy()); //队列满了,丢掉任务,不会抛出异常 * 4. new ThreadPoolExecutor.DiscardOldestPolicy()); //队列满了,尝试和最早的竞争,也不会抛出异常 */ public class Demo02 { public static void main(String[] args) { //自定义线程池!工作ThreadPoolExecutor ExecutorService threadPool = new ThreadPoolExecutor( 2, 5, 3, TimeUnit.SECONDS, new LinkedBlockingQueue<>(3), Executors.defaultThreadFactory(), new ThreadPoolExecutor.DiscardOldestPolicy()); //队列满了,尝试和最早的竞争,也不会抛出异常 try { //最大承载:Deque + max // 超过 RejectedExecutionException for (int i = 1; i <= 9; i++) { //使用了线程池之后,使用线程池来创建线程 threadPool.execute(() -> { System.out.println(Thread.currentThread().getName() + " ok"); }); } } catch (Exception e) { e.printStackTrace(); } finally { //线程池用完,程序结束,关闭线程池 threadPool.shutdown(); } } }
运行结果如下:
四种拒绝策略
4种拒绝策略:
/**
* 1. new ThreadPoolExecutor.AbortPolicy()); //银行满了 还有人进来,不处理这个人的,并且抛出异常
* 2. new ThreadPoolExecutor.CallerRunsPolicy()); //哪来的去哪里!
* 3. new ThreadPoolExecutor.DiscardPolicy()); //队列满了,丢掉任务,不会抛出异常
* 4. new ThreadPoolExecutor.DiscardOldestPolicy()); //队列满了,尝试和最早的竞争,也不会抛出异常
*/
小结和扩展
了解:IO密集型、CPU密集型:(调优)
public class Demo03 { public static void main(String[] args) { //自定义线程池!工作ThreadPoolExecutor //最大线程池该如何定义 //1、CPU密集型,几核CPU就定义为几 可以保证cpu的效率最高 //2、IO密集型 > 判断你程序中十分耗IO线程 // 程序 15个大型任务 io十分占用资源! //获取CPU核数 System.out.println(Runtime.getRuntime().availableProcessors()); ExecutorService threadPool = new ThreadPoolExecutor( 2, Runtime.getRuntime().availableProcessors(), 3, TimeUnit.SECONDS, new LinkedBlockingQueue<>(3), Executors.defaultThreadFactory(), new ThreadPoolExecutor.DiscardOldestPolicy()); //队列满了,尝试和最早的竞争,也不会抛出异常 try { //最大承载:Deque + max // 超过 RejectedExecutionException for (int i = 1; i <= 9; i++) { //使用了线程池之后,使用线程池来创建线程 threadPool.execute(() -> { System.out.println(Thread.currentThread().getName() + " ok"); }); } } catch (Exception e) { e.printStackTrace(); } finally { //线程池用完,程序结束,关闭线程池 threadPool.shutdown(); } } }
lambda表达式、链式编程、函数式接口、Stream流式计算。
函数式接口:只有一个方法的接口
@FunctionalInterface
public interface Runnable {
public abstract void run();
}
//超级多FunctionalInterface
//简化编程模型,在新版本的框架底层大量应用!
//foreach(消费者类型的函数式接口)
测试代码如下:
/** * Function 函数型接口 * 有一个输入参数,有一个输出 * 只要是函数式接口,就可以用lambda表达式简化 */ public class Demo01 { public static void main(String[] args) { /* Function function = new Function<String, String>() { @Override public String apply(String str) { return str; } }; */ Function<String, String> function = (str) -> { return str; }; System.out.println(function.apply("abc")); } }
断定型接口: 有一个输入参数,返回值只能是 布尔值。
测试代码如下:
/** * 断定型接口:有一个输入参数,返回值只能是布尔值! */ public class Demo02 { public static void main(String[] args) { // 判断字符串是否为空 /* Predicate<String> predicate = new Predicate<String>() { @Override public boolean test(String str) { return str.isEmpty(); } };*/ Predicate<String> predicate = (str) -> { return str.isEmpty(); }; System.out.println(predicate.test("")); } }
Consumer 消费型接口: 只有输入,没有返回值
测试代码如下:
/** * Consumer 消费型接口: 只有输入,没有返回值 */ public class Demo03 { public static void main(String[] args) { /*Consumer<String> consumer = new Consumer<String>() { @Override public void accept(String str) { System.out.println(str); } };*/ Consumer<String> consumer = (str) -> { System.out.println(str); }; consumer.accept("abcd"); } }
Supplier 供给型接口: 没有参数,只有返回值
测试代码如下:
/** * Supplier 供给型接口没有参数,只有返回值 */ public class Demo04 { public static void main(String[] args) { /* Supplier supplier = new Supplier<Integer>() { @Override public Integer get() { System.out.println("get()"); return 1024; } }; */ Supplier supplier = () -> { System.out.println("get()"); return 1024; }; System.out.println(supplier.get()); } }
什么是Stream流式计算
大数据:存储 + 计算
集合、MySQL本质就是存储东西的
计算都应该交给流操作
/** * 题目要求:一分钟内完成此题,只能用一行代码实现! * 现在有5个用户!筛选: * 1、ID必须是偶数 * 2、年龄必须大于23岁 * 3、用户名转为大写字母 * 4、用户名字母倒着排序 * 5、只输出一个用户! */ public class Test { public static void main(String[] args) { User u1 = new User(1, "a", 21); User u2 = new User(2, "b", 22); User u3 = new User(3, "c", 23); User u4 = new User(4, "d", 24); User u5 = new User(6, "e", 25); //集合就是存储 List<User> list = Arrays.asList(u1, u2, u3, u4, u5); //计算交给stream流 list.stream() .filter(u -> {return u.getId() % 2 == 0;}) .filter(u -> {return u.getAge() > 23;}) .map(u -> {return u.getName().toUpperCase();}) .sorted((o1,o2)->{return o2.compareTo(o1);}) .limit(1) .forEach(System.out::println); } }
什么是ForkJoin?
在JDK1.7中引入了一种新的Fork/Join线程池,它可以将一个大的任务拆分成多个小的任务并行执行并汇总执行结果。
Fork/Join采用的是分而治之的基本思想,分而治之就是将一个复杂的任务,按照规定的阈值划分成多个简单的小任务,然后将这些小任务的结果再进行汇总返回,得到最终的任务。
ForkJoin特点:工作窃取
这个里面维护都是双端队列。
ForkJoin
/** * 求和计算的任务 * 如何使用forkJoin * 1、ForkJoinPool通过它来执行 * 2、计算任务 ForkJoinPool.execute(ForkJoinTask task) * 3、计算类要继承ForkJoinTask */ public class ForkJoinDemo extends RecursiveTask<Long> { private Long start; private Long end; //临界值 private Long temp = 10000L; public ForkJoinDemo(Long start, Long end) { this.start = start; // 1 this.end = end; // 1990900000 } //计算方法 @Override protected Long compute() { if ((end - start) < temp) { Long sum = 0L; for (Long i = start; i < end; i++) { //分支合并计算 sum += i; } return sum; } else { //forkJoin 递归 long mid = (start + end) / 2; //中间值 ForkJoinDemo task1 = new ForkJoinDemo(start, mid); task1.fork(); //拆分任务,把任务压入线程队列 ForkJoinDemo task2 = new ForkJoinDemo(mid + 1, end); task2.fork(); //拆分任务,把任务压入线程队列 return task1.join() + task2.join(); } } }
测试代码如下:
// 同一个任务效率高几十倍 public class Test { public static void main(String[] args) throws ExecutionException, InterruptedException { //test1(); //test2(); test3(); } // 普通 12224 public static void test1() { long start = System.currentTimeMillis(); Long sum = 0L; for (Long i = 1L; i <= 10_000_0000; i ++ ) { sum += i; } long end = System.currentTimeMillis(); System.out.println("sum = " + sum + " 时间:" + (end - start)); } //会使用ForkJoin的 10038 public static void test2() throws ExecutionException, InterruptedException { long start = System.currentTimeMillis(); ForkJoinPool forkJoinPool = new ForkJoinPool(); ForkJoinTask<Long> task = new ForkJoinDemo(0L, 10_0000_0000L); ForkJoinTask<Long> submit = forkJoinPool.submit(task);//提交任务 Long sum = submit.get(); long end = System.currentTimeMillis(); System.out.println("sum=" + sum + "时间:" + (end - start)); } //Stream并行流() 153 public static void test3() { long start = System.currentTimeMillis(); LongStream.rangeClosed(0L, 10_0000_0000L).parallel().reduce(0, Long::sum); long end = System.currentTimeMillis(); System.out.println("sum = " + "时间:" + (end - start)); } }
Future设计的初衷:可以对将来的某个事件的结果进行建模
代码如下:
public class Demo01 { public static void main(String[] args) throws ExecutionException, InterruptedException { //没有返回值的runAsync异步回调 CompletableFuture<Void> completableFuture = CompletableFuture.runAsync(() -> { try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName() + "runAsync -> Void"); }); System.out.println("111"); completableFuture.get(); // 获取执行结果 } }
public class Demo02 { public static void main(String[] args) throws ExecutionException, InterruptedException { // 有返回值的异步回调supplyAsync CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> { System.out.println("CompletableFuture"); // int i = 10 / 0; return 1024; }); System.out.println(completableFuture.whenComplete((t, u) -> { System.out.println("t==>" + t); //正常的返回结果 System.out.println("u==>" + u); //错误的信息,java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zero }).exceptionally((e) -> { System.out.println(e.getMessage()); return 233; //可以获取到错误的返回结果 }).get()); } }
whenComplete: 有两个参数,一个是 t, 一个是 u。
T
: 代表的是 正常返回的结果;
U
: 代表的是 抛出异常的错误信息;
如果发生了异常, get
可以获取到 exceptionally
返回的值。
对Volatile的理解
Volatile
是 Java
虚拟机提供 轻量级的同步机制。
1、保证可见性
2、不保证原子性
3、禁止指令重排
什么是JMM
JMM: Java 内存模型,不存在的东西、概念!约定!
关于 JMM 的一些同步的约定:
1、线程解锁前,必须把共享变量立刻刷回主存。
2、线程加锁前,必须读取主存中的最新值到工作内存中。
3、加锁和解锁是同一把锁。
8种操作:
内存交互操作有8种,虚拟机实现必须保证每一个操作都是原子的,不可在分的(对于double和long类型的变量来说,load、store、read和write操作在某些平台上允许例外)
JMM对这八种指令的使用,制定了如下规则:
public class JMMDemo { private static int num = 0; public static void main(String[] args) { new Thread(() -> { while (num == 0) { } }).start(); try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); } num = 1; System.out.println(num); } }
问题:程序不知道主内存的值已经被修改过了。
- 保证可见性
public class JMMDemo { // 不加 volatile 程序就会死循环 // 加了 volatile 可以保证可见性 private volatile static int num = 0; public static void main(String[] args) { // main 线程 new Thread(() -> { // 线程1 对主内存的变化是不知道的 while (num == 0) { } }).start(); try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); } num = 1; System.out.println(num); } }
- 不保证原子性
原子性:不可分割
线程A在执行任务的时候,不能被打扰的,也不能被分割。要么同时成功,要么同时失败。
代码如下:
// 不保证原子性 public class VDemo { // volatile 不保证原子性 private volatile static int num = 0; public static void add() { num ++; // 不是原子性操作 } public static void main(String[] args) { // 理论上num结果为20000 for (int i = 0; i < 20; i ++ ) { new Thread(() -> { for (int j = 0; j < 1000; j ++ ) { add(); } }).start(); } while (Thread.activeCount() > 2) { Thread.yield(); } System.out.println(Thread.currentThread().getName() + " " + num); } }
如果不加lock和synchronized,怎么样保证原子性?
原子类为什么这么高级?
public class VDemo02 { // volatile 不保证原子性 // 原子类的 private volatile static AtomicInteger num = new AtomicInteger(); public static void add() { //num ++; // 不是原子性操作 num.getAndIncrement(); // Atomicnteger + 1 方法 } public static void main(String[] args) { // 理论上num结果为20000 for (int i = 0; i < 20; i ++ ) { new Thread(() -> { for (int j = 0; j < 1000; j ++ ) { add(); } }).start(); } while (Thread.activeCount() > 2) { Thread.yield(); } System.out.println(Thread.currentThread().getName() + " " + num); } }
这些类的底层都直接和操作系统挂钩!在内存中修改值 ! Unsafe
类是一个很特殊的存在!
- 指令重排
什么是指令重排: 你写的程序,计算机并不会按照你写的那样去执行的。
源代码 -> 编译器优化的重排 -> 指令并行也可能会重排 -> 内存系统也会重排 -> 执行。
处理器在进行指令重排的时候会考虑一个问题,考虑:数据之间的依赖性!
int x = 1; //1
int y = 2; //2
x = x + 5; //3
y = x * x; //4
我们所期望的:1234 但是可能执行的时候会变成 2134 1324
可不可能是 4123!
可能造成影响的结果: a b x y 这四个值默认都是0;
线程A | 线程B |
---|---|
x = a | y = b |
b = 1 | a = 2 |
正常的结果:x = 0; y = 0.
线程A | 线程B |
---|---|
b = 1 | a = 2 |
x = a | y = b |
指令重排导致的结果: x = 2; y = 1.
volatile
可以避免指令重排:
内存屏障,CPU指令,作用:
1、保证特定的操作的执行顺序。
2、可以保证某些变量的内存可见性(利用这些特性volatile实现了可见性)。
Volatile是可以保证可见性。不能保证原子性,由于内存屏障,可以保证避免指令重排的现象产生!
volatile
与 synchronized
的区别:
volatile
是轻量级的 synchronized
,因为它不会引起上下文的切换和调亮,所以 volatile
性能更好。volatile
只能修饰变量, synchronized
可以修饰方法,静态方法,代码块。volatile
对任意单个变量的读 / 写具有原子性,但是类似于 i++
这种复合操作不具有原子性。而锁的互斥执行的特性可以确保整个临界区代码执行具有原子性。volatile
不会发生阻塞,而 synchronized
会发生阻塞。volatile
是变量在多线程之间的可见性,synchronized
是多线程之间访问资源的同步性。
饿汉式
//饿汉式单例 public class Hungry { //可能会浪费空间 private byte[] data1 = new byte[1024 * 1024]; private byte[] data2 = new byte[1024 * 1024]; private byte[] data3 = new byte[1024 * 1024]; private byte[] data4 = new byte[1024 * 1024]; private Hungry() { } private final static Hungry HUNGRY = new Hungry(); public static Hungry getInstance() { return HUNGRY; } }
DCL懒汉式
//懒汉式单例模式 public class LazyMan { private static boolean sign = false; private LazyMan() { synchronized (LazyMan.class) { if (sign == false) { sign = true; } else { throw new RuntimeException("不要试图使用反射破坏异常"); } } System.out.println(Thread.currentThread().getName() + "OK"); } private static LazyMan lazyMan; //双重检测锁模式的懒汉式单例 简称DCL懒汉式 public static LazyMan getInstance() { if (lazyMan == null) { synchronized (LazyMan.class) { if (lazyMan == null) { lazyMan = new LazyMan(); //不是一个原子性操作 } } } return lazyMan; } } /** * 1.分配内存空间 * 2.执行构造方法,初始化对象 * 3.把这个对象指向这个空间 * <p> * 期望 123 * 执行 132 A * B //此时lazyMan还没有完成构造 */
静态内部类
//静态内部类
public class Holder {
private Holder() {
}
public static Holder getInstance() {
return InnerClass.HOLDER;
}
public static class InnerClass {
private static final Holder HOLDER = new Holder();
}
}
单例不安全,因为有反射
枚举
//enum 是一个什么?本身也是一个Class类 public enum EnumSingle { INSTANCE; public EnumSingle getInstance() { return INSTANCE; } } class Test { public static void main(String[] args) throws NoSuchMethodException, InvocationTargetException, InstantiationException, IllegalAccessException { EnumSingle instance1 = EnumSingle.INSTANCE; Constructor<EnumSingle> declaredConstructor = EnumSingle.class.getDeclaredConstructor(String.class, int.class); declaredConstructor.setAccessible(true); EnumSingle instance2 = declaredConstructor.newInstance(); // NoSuchMethodException: com.kuang.single.EnumSingle.<init>() System.out.println(instance1); System.out.println(instance2); } }
枚举类型的最终反编译源:
public final class EnumSingle extends Enum { public static EnumSingle[] values() { return (EnumSingle[])$VALUES.clone(); } public static EnumSingle valueOf(String name) { return (EnumSingle)Enum.valueOf(com/kuang/single/EnumSingle, name); } private EnumSingle(String s, int i) { super(s, i); } public EnumSingle getInstance() { return INSTANCE; } public static final EnumSingle INSTANCE; private static final EnumSingle $VALUES[]; static { INSTANCE = new EnumSingle("INSTANCE", 0); $VALUES = (new EnumSingle[] { INSTANCE }); } }
什么是CAS?
CAS的全称为Compare-And-Swap
,直译就是对比交换。是一条CPU的原子指令,其作用是让CPU先进行比较两个值是否相等,然后原子地更新某个位置的值,经过调查发现,其实现方式是基于硬件平台的汇编指令,就是说CAS是靠硬件实现的,JVM只是封装了汇编调用,那些AtomicInteger类便是使用了这些封装后的接口。
简单解释:CAS操作需要输入两个数值,一个旧值(期望操作前的值)和一个新值,在操作期间先比较下在旧值有没有发生变化,如果没有发生变化,才交换成新值,发生了变化则不交换。
CAS: 比较当前工作内存中的值和主内存中的值,如果这个值是期望的,那么则执行操作!如果不是就一直循环!
缺点:
1、循环会耗时
2、一次性只能保证一个共享变量的原子性
3、ABA问题
CAS:ABA问题(狸猫换太子)
测试代码 如下:
public class CASDemo { // CAS compareAndSet: 比较并交换 public static void main(String[] args) { AtomicInteger atomicInteger = new AtomicInteger(2020); // 期望,更新 //public final boolean compareAndSet(int expectedValue, int newValue) // 如果我期望的值达到了,那么就更新,否则不更新 // ==============捣乱的线程====== System.out.println(atomicInteger.compareAndSet(2020, 2021)); System.out.println(atomicInteger.get()); System.out.println(atomicInteger.compareAndSet(2021, 2020)); System.out.println(atomicInteger.get()); // ==============期望的线程====== System.out.println(atomicInteger.compareAndSet(2020, 6666)); System.out.println(atomicInteger.get()); } }
解决ABA问题,引入原子引用!对应的思想是乐观锁。
带版本号的操作!
注意:
Integer使用了对象缓存机制,默认范围是-128~-127,推荐使用静态工厂方法valueOf获取对象实例,而不是new,因为valueOf使用缓存,而new一定会创建新的对象分配新的内存空间。
测试代码如下:
public class CASDemo { //CAS compareAndSet:比较并交换 public static void main(String[] args) { // AtomicStampedReference 注意:如果泛型是一个包装类,注意对象的引用问题 // 正常在业务操作,这里比较的都是一个对象 AtomicStampedReference<Integer> atomicStampedReference = new AtomicStampedReference<>(1, 1); new Thread(() -> { int stamp = atomicStampedReference.getStamp(); //获得版本号 System.out.println("a1 =>" + stamp); try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(atomicStampedReference.compareAndSet(1, 2, atomicStampedReference.getStamp(), atomicStampedReference.getStamp() + 1)); System.out.println("a2 =>" + atomicStampedReference.getStamp()); System.out.println(atomicStampedReference.compareAndSet(2, 1, atomicStampedReference.getStamp(), atomicStampedReference.getStamp() + 1)); System.out.println("a3 =>" + atomicStampedReference.getStamp()); }, "a").start(); //乐观锁的原理相同 new Thread(() -> { int stamp = atomicStampedReference.getStamp(); //获得版本号 System.out.println("b1 =>" + stamp); try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(atomicStampedReference.compareAndSet(1, 6, stamp, stamp + 1)); System.out.println("b2 =>" + atomicStampedReference.getStamp()); }, "b").start(); } }
运行结果如下:
公平锁:非常公平,不能插队,必须先来后到!
非公平锁:非常不公平,可以插队(默认都是非公平的)
public ReentrantLock() {
sync = new NonfairSync();
}
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
可重入锁(递归锁)
synchronized 版
代码如下:
//Synchronized public class Demo01 { public static void main(String[] args) { Phone phone = new Phone(); new Thread(() -> { phone.sms(); }, "A").start(); new Thread(() -> { phone.sms(); }, "B").start(); } } class Phone { public synchronized void sms() { System.out.println(Thread.currentThread().getName() + " sms"); call(); //这里也有锁 } public synchronized void call() { System.out.println(Thread.currentThread().getName() + " Call"); } }
运行结果如下:
lock版
代码如下:
// lock public class Demo02 { public static void main(String[] args) { Phone2 phone = new Phone2(); new Thread(() -> { phone.sms(); }, "A").start(); new Thread(() -> { phone.sms(); }, "B").start(); } } class Phone2 { Lock lock = new ReentrantLock(); public void sms() { try { lock.lock(); //细节问题:lock.lock(); lock.unlock(); // lock 锁必须配对 ,否则就会死在里面 System.out.println(Thread.currentThread().getName() + " sms"); call(); //这里也有锁 } catch (Exception e) { e.printStackTrace(); } finally { lock.unlock(); } } public void call() { try { lock.lock(); System.out.println(Thread.currentThread().getName() + " Call"); } catch (Exception e) { e.printStackTrace(); } finally { lock.unlock(); } } }
运行结果如下:
spinlock
:
自定义一个锁测试:
//自旋锁 public class SpinlockDemo { //int 0 //Thread null AtomicReference<Thread> atomicReference = new AtomicReference<>(); //加锁 public void myLock() { Thread thread = Thread.currentThread(); System.out.println(Thread.currentThread().getName() + "=> mylock"); //自旋锁 while (!atomicReference.compareAndSet(null, thread)) { } } //解锁 public void myUnLock() { Thread thread = Thread.currentThread(); System.out.println(Thread.currentThread().getName() + "=> myUnlock"); atomicReference.compareAndSet(thread, null); } }
测试代码如下:
public class TestSpinLock { public static void main(String[] args) throws InterruptedException { /* ReentrantLock reentrantLock = new ReentrantLock(); reentrantLock.lock(); reentrantLock.unlock();*/ //底层使用的自旋锁CAS SpinlockDemo lock = new SpinlockDemo(); new Thread(() -> { lock.myLock(); try { TimeUnit.SECONDS.sleep(5); } catch (Exception e) { e.printStackTrace(); } finally { lock.myUnLock(); } }, "T1").start(); TimeUnit.SECONDS.sleep(1); new Thread(() -> { lock.myLock(); try { TimeUnit.SECONDS.sleep(1); } catch (Exception e) { e.printStackTrace(); } finally { lock.myUnLock(); } }, "T2").start(); } }
运行结果如下:
什么是死锁?
死锁测试:
public class DeadLockDemo { public static void main(String[] args) { String lockA = "lockA"; String lockB = "lockB"; new Thread(new MyThread(lockA, lockB),"T1").start(); new Thread(new MyThread(lockB, lockA),"T2").start(); } } class MyThread implements Runnable { private String lockA; private String lockB; public MyThread(String lockA, String lockB) { this.lockA = lockA; this.lockB = lockB; } @Override public void run() { synchronized (lockA) { System.out.println(Thread.currentThread().getName() + "lock:" + lockA + "get " + lockB); try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); } synchronized (lockB) { System.out.println(Thread.currentThread().getName() + "lock:" + lockB + "get " + lockA); } } } }
解决问题
1、使用 jps -l
定位进程号
2、使用 jstack 进程号
找到死锁问题
排查问题:
1、日志
2、堆栈信息
创作不易,如果有帮助到你,请给题解点个赞和收藏,让更多的人看到!!!
关注博主不迷路,内容持续更新中。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。