赞
踩
14、ForkJoin
ForkJoin 在 JDK 1.7 , 并行执行任务!提高效率。大数据量!
大数据:Map Reduce (把大任务拆分为小任务)
ForkJoin 特点:工作窃取
这个里面维护的都是双端队列
** * 求和计算的任务! * 3000 6000(ForkJoin) 9000(Stream并行流) * // 如何使用 forkjoin * // 1、forkjoinPool 通过它来执行 * // 2、计算任务 forkjoinPool.execute(ForkJoinTask task) * // 3. 计算类要继承 ForkJoinTask */ public class ForkJoinDemo extends RecursiveTask<Long> { private Long start; // 1 private Long end; // 1990900000 // 临界值 private Long temp = 10000L; public ForkJoinDemo(Long start, Long end) { this.start = start; this.end = end; } // 计算方法 @Override protected Long compute() { if ((end-start)<temp){ Long sum = 0L; for (Long i = start; i <= end; i++) { sum += i; } return sum; }else { // forkjoin 递归 long middle = (start + end) / 2; // 中间值 ForkJoinDemo task1 = new ForkJoinDemo(start, middle); task1.fork(); // 拆分任务,把任务压入线程队列 ForkJoinDemo task2 = new ForkJoinDemo(middle+1, end); task2.fork(); // 拆分任务,把任务压入线程队列 return task1.join() + task2.join(); } } }
/** * 同一个任务,别人效率高你几十倍! */ public class Test { public static void main(String[] args) throws ExecutionException, InterruptedException { // test1(); // 12224 // test2(); // 10038 // test3(); // 153 } // 普通程序员 public static void test1(){ Long sum = 0L; long start = System.currentTimeMillis(); for (Long i = 1L; i <= 10_0000_0000; i++) { sum += i; } long end = System.currentTimeMillis(); System.out.println("sum="+sum+" 时间:"+(end-start)); } // 会使用ForkJoin public static void test2() throws ExecutionException, InterruptedException { long start = System.currentTimeMillis(); ForkJoinPool forkJoinPool = new ForkJoinPool(); ForkJoinTask<Long> task = new ForkJoinDemo(0L, 10_0000_0000L); ForkJoinTask<Long> submit = forkJoinPool.submit(task);// 提交任务 Long sum = submit.get(); long end = System.currentTimeMillis(); System.out.println("sum="+sum+" 时间:"+(end-start)); } public static void test3(){ long start = System.currentTimeMillis(); // Stream并行流 () (] long sum = LongStream.rangeClosed(0L, 10_0000_0000L).parallel().reduce(0, Long::sum); long end = System.currentTimeMillis(); System.out.println("sum="+"时间:"+(end-start)); } }
15、异步回调
Future 设计的初衷: 对将来的某个事件的结果进行建模
/** * 异步调用: CompletableFuture * // 异步执行 * // 成功回调 * // 失败回调 */ public class Demo01 { public static void main(String[] args) throws ExecutionException, InterruptedException { // 没有返回值的 runAsync 异步回调 // CompletableFuture<Void> completableFuture = CompletableFuture.runAsync(()->{ // try { // TimeUnit.SECONDS.sleep(2); // } catch (InterruptedException e) { // e.printStackTrace(); // } // System.out.println(Thread.currentThread().getName()+"runAsync=>Void"); // }); // // System.out.println("1111"); // // completableFuture.get(); // 获取阻塞执行结果 // 有返回值的 supplyAsync 异步回调 // ajax,成功和失败的回调 // 返回的是错误信息; CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(()->{ System.out.println(Thread.currentThread().getName()+"supplyAsync=>Integer"); int i = 10/0; return 1024; }); System.out.println(completableFuture.whenComplete((t, u) -> { System.out.println("t=>" + t); // 正常的返回结果 System.out.println("u=>" + u); // 错误信息:java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zero }).exceptionally((e) -> { System.out.println(e.getMessage()); return 233; // 可以获取到错误的返回结果 }).get()); /** * succee Code 200 * error Code 404 500 */ } }
16、JMM
Volatile 是 Java 虚拟机提供轻量级的同步机制
1、保证可见性
2、不保证原子性
3、禁止指令重排
什么是JMM
JMM : Java内存模型,不存在的东西,概念!约定!
关于JMM的一些同步的约定:
1、线程解锁前,必须把共享变量立刻刷回主存。
2、线程加锁前,必须读取主存中的最新值到工作内存中!
3、加锁和解锁是同一把锁
线程 工作内存 、主内存
内存交互操作有8种,虚拟机实现必须保证每一个操作都是原子的,不可在分的(对于double和long类
型的变量来说,load、store、read和write操作在某些平台上允许例外)
lock (锁定):作用于主内存的变量,把一个变量标识为线程独占状态
unlock (解锁):作用于主内存的变量,它把一个处于锁定状态的变量释放出来,释放后的变量
才可以被其他线程锁定
read (读取):作用于主内存变量,它把一个变量的值从主内存传输到线程的工作内存中,以便
随后的load动作使用
load (载入):作用于工作内存的变量,它把read操作从主存中变量放入工作内存中
use (使用):作用于工作内存中的变量,它把工作内存中的变量传输给执行引擎,每当虚拟机
遇到一个需要使用到变量的值,就会使用到这个指令
assign (赋值):作用于工作内存中的变量,它把一个从执行引擎中接受到的值放入工作内存的变
量副本中
store (存储):作用于主内存中的变量,它把一个从工作内存中一个变量的值传送到主内存中,
以便后续的write使用write (写入):作用于主内存中的变量,它把store操作从工作内存中得到的变量的值放入主内
存的变量中
JMM对这八种指令的使用,制定了如下规则:
不允许read和load、store和write操作之一单独出现。即使用了read必须load,使用了store必须
write
不允许线程丢弃他最近的assign操作,即工作变量的数据改变了之后,必须告知主存
不允许一个线程将没有assign的数据从工作内存同步回主内存
一个新的变量必须在主内存中诞生,不允许工作内存直接使用一个未被初始化的变量。就是怼变量
实施use、store操作之前,必须经过assign和load操作
一个变量同一时间只有一个线程能对其进行lock。多次lock后,必须执行相同次数的unlock才能解
锁
如果对一个变量进行lock操作,会清空所有工作内存中此变量的值,在执行引擎使用这个变量前,
必须重新load或assign操作初始化变量的值
如果一个变量没有被lock,就不能对其进行unlock操作。也不能unlock一个被其他线程锁住的变量
对一个变量进行unlock操作之前,必须把此变量同步回主内存
问题: 程序不知道主内存的值已经被修改过了
17、Volatile
public class JMMDemo { // 不加 volatile 程序就会死循环! // 加 volatile 可以保证可见性 private volatile static int num = 0; public static void main(String[] args) { // main new Thread(()->{ // 线程 1 对主内存的变化不知道的 while (num==0){ } }).start(); try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } num = 1; System.out.println(num); } }
2、不保证原子性
原子性 : 不可分割
线程A在执行任务的时候,不能被打扰的,也不能被分割。要么同时成功,要么同时失败。
// volatile 不保证原子性 public class VDemo02 { // volatile 不保证原子性 // 原子类的 Integer private volatile static AtomicInteger num = new AtomicInteger(); public static void add(){ // num++; // 不是一个原子性操作 num.getAndIncrement(); // AtomicInteger + 1 方法, CAS } public static void main(String[] args) { //理论上num结果应该为 2 万 for (int i = 1; i <= 20; i++) { new Thread(()->{ for (int j = 0; j < 1000 ; j++) { add(); } }).start(); } while (Thread.activeCount()>2){ // main gc Thread.yield(); } System.out.println(Thread.currentThread().getName() + " " + num); } }
这些类的底层都直接和操作系统挂钩!在内存中修改值!Unsafe类是一个很特殊的存在!
指令重排
什么是 指令重排:你写的程序,计算机并不是按照你写的那样去执行的。
源代码–>编译器优化的重排–> 指令并行也可能会重排–> 内存系统也会重排—> 执行
处理器在进行指令重排的时候,考虑:数据之间的依赖性!
volatile可以避免指令重排:
内存屏障。CPU指令。作用:
1、保证特定的操作的执行顺序!
2、可以保证某些变量的内存可见性 (利用这些特性volatile实现了可见性)
Volatile 是可以保持 可见性。不能保证原子性,由于内存屏障,可以保证避免指令重排的现象产生!
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。