当前位置:   article > 正文

Java并发编程(知识点梳理)_java 并发编程

java 并发编程

前言:Java并发编程的一些知识点梳理

一、并发编程基础

1.什么是线程

  • 线程是CPU分配的基本单位 也是Java进程运行的最小单位

2.什么是并发

  • 多个线程访问少量的CUP,CUP进行上下文切换运行

3.什么是并行

  • 多个线程同时在不同的CUP同时执行

4.线程创建与运行

  • 继承Thread

    • start()执行线程

  • 实现Runnable

    • RunableTask runableTask = new RunableTask(); new Thread(runableTask).start();执行线程

  • 实现callable

    • FutureTask<> futureTask = new FutureTask<>(new 实现了callable的类); new Thread(futureTask).start();执行线程

  • start 运行线程

    • 重复调用会抛异常

5.Future接口

  • 功能

    • 能够取消异步执行中的任务

    • 判断异步任务是否执行完成

    • 获取异步执行完后的结果

6.RunableFuture接口

  • 继承了Runnable接口和Future接口

  • 可以作为Thread线程实例的target实例

  • 可以异步获取执行结果

7.FutureTask

  • 继承了RunableFuture接口,实现接口的实例

  • join()

    • 实例方法,需要thread.join()调用,阻塞当前线程,循环等待,直到thread类执行完毕或超时,isAlive()方法判断

8.JUC线程池

  • java.util.concurrent

8.1线程池任务调度流程
  • 工作线程<核心线程

    • 优先创建任务线程,而不是从线程队列获取一个空闲队列

  • 工作线程>核心线程 阻塞队列没满

    • 新接收的线程任务会加到阻塞队列

  • 当完成一个任务时,优先从阻塞队列取任务执行

  • 工作线程>核心线程 阻塞队列也满 工作线程<最大线程数maxinumPoolSize

    • 新接收的任务会创建一个新的任务(非核心线程)立即执行

  • 工作线程>核心线程 阻塞队列也满 工作线程>最大线程数maxinumPoolSize

    • 执行拒绝策略

8.2线程池拒绝策略
  • 线程被拒的情况

    • 工作队列已满,且maxinumPoolSize已满

    • 线程池已关闭

  • 拒绝后会调用RejectedExceptionHandler实例的rejectedException方法

  • AbortPolicy 拒绝策略

    • 线程池默认策略

    • 拒绝后抛出RejectedException异常

  • DiscardPolicy 抛弃策略

    • 直接抛弃任务,不会抛出异常

  • DiscardOldestPolicy 抛弃最老任务策略

    • 移除队头最老的元素

  • CallerRunsPolicy 调用者执行策略

    • 拒绝后直接执行该任务,不会使用线程池的任务

  • 自定义策略

    • 实现RejectedExceptionHandler,重写rejectedException方法 将实例类传入ThreadPoolExecutor的构造方法

8.3Executor
  • 提供execute()接口执行已提交的Runnable实例

  • ExecutorService

    • 继承Executor

    • 提供异步任务接收

      • submit(CallableTask<T> task) 提交异步任务

      • InvokeAll(Collection<? Extends CallableTask<T>> tasks) 批量提交

    • execute和submit的区别

      • 接收参数不同

        • execute只能接收Runnable接口的实例

        • submit可以接收Runnable和Callable两种实例

        • Callable是JDK1.5加入,作为Runnable的一种补充,允许有返回值,抛出异常

      • submit提交任务后有返回值,execute没有

      • submit方便异常处理

  • AbstractExecutorService

    • 是一个抽象类,实现了ExecutorService 为接口提供默认的实现

  • ThreadPoolExecutor

    • 线程池实现类

    • 继承AbstractExecutorService

    • JUC核心类,提供指定可用的线程数量、线程统计、管理、监控

  • ScheduledExecutorService

    • 是一个接口,继承ExecutorService

    • 提供延时、周期性任务调度

  • ScheduledExecutorExecutor

    • 继承ThreadPoolExecutor

    • 提供了ScheduledExecutorService延时、周期任务的具体实现方法

    8.4Executors
    • 静态工厂类

    • 提供静态方法返回ExecutorService、ScheduledExecutorService等线程池接口方法的实例

    • newSingleThreadExecutor

      • 单线程池,只有一个工作线程

        • 核心线程和最大线程数为 1

      • 保证线程按照FIFO

      • 潜在的问题

        • 超过后会进入无界阻塞队列,如果提交速度远大于处理速度,无限添加容易耗尽服务器资源OOM

    • newFixedThreadPool

      • 固定数量线程池

        • newFixedThreadPool(int nThreads) 核心线程和最大线程数为nThreads

      • 在未达到最大线程数量时,每接收一个线程,就会创建一个实例

      • 潜在的问题

        • 超过后会进入无界阻塞队列,如果提交速度远大于处理速度,无限添加容易耗尽服务器资源OOM

    • newCachedThreadPool

      • 核心线程数:0 最大线程数:Integer.MAX_VAVLUE 线程最大空闲时长:60L 时间单位:TimeUnit.MILLISECONDS 任务队列:SynchronousQueue<Runnable>()

      • 可缓存线程池,可灵活回收空闲的线程

      • 大小没限制,大量添加也可能耗尽服务器资源

      • 潜在的问题

        • 可以无限添加工作线程,容易耗尽服务器资源OOM

    • newScheduledThreadPool

      • 可调度线程池,功能和ScheduledExecutorService类似

    8.5调度器的钩子方法
    • 空方法,提供给调用者介入线程任务执行时机重写业务逻辑

      • ExecutorService pool = new ThreadPoolExecutor(coreSize, maxinumPoolSize, periods, TimeUnit.SECONDS, new LinkedBlockingQueue(2)){ @Override protected void beforeExecute(){ } };

    • beforeExecute

      • 任务执行前

    • afterExecute

      • 任务执行后

    • terminated

      • 线程池终止时

9.线程通知与等待

  • Java内置锁对象

    • wait()

      • 当一个线程调用一个共享变量的wait()方法时,该调用线程会被阻塞挂起

      • 会让出锁资源

    • wait(timeout)

      • 同样是阻塞,超时后自动唤起

    • notify

      • 唤起线程

    • notifyAll

      • 唤起所有线程

      • 只会唤起之前调用了wait的线程

  • Condition

    • java.until.concurrent.locks包的线程通信类,功能类似于Java内置对象的通知与等待

    • await()

    • signal()

    • signalAll()

10.中止等待线程的join方法

  • 当前线程必须等待插队线程执行完才能往下执行

11.让线程睡眠的sleep方法

  • sleep(timeout)

    • 阻塞线程,timeout后唤起

    • 不会让出锁资源

12.让出CPU执行权的yield方法

  • 让出当前线程的使用权

  • 使线程处理就绪状态,等待下一轮CUP调度

13.线程中断

  • interrupt 中断

    • 中断线程的休眠,抛出InterruptedException

    • 应用-多个并行线程执行,其中一个提前执行完,其他线程没必要再执行,此时可以中断其他线程

    • 正常运行的线程需要抛出异常,否则会继续执行完 throw new InterruptedException();

  • isInterrupted 是否中断(中断标志)

    • Thread.currenThread().isInterrupted();

  • interrupted

    • 获取当前调用线程的中断标志而不是调用interrupted()方法实例对象的中断标志

    • 如果发现当前线程被中断,则会清除中断标志

  • stop

    • 强制中断线程

    • 不会跑InterruptException的catch 会跑ThreadDeath的catch

    • 过时方法,不建议使用。类似直接切断电源,会造成,不释放线程持有的锁等不可预料的情况

14.线程死锁

  • 多个线程争夺锁资源相互无限等待

  • 产生死锁的条件

    • 互斥条件

    • 请求并持有条件

    • 不可剥夺条件

    • 环路等待条件

15.守护线程与用户线程

  • 区别

    • 守护线程会随着JVM的结束而关闭

    • JVM的结束不影响用户线程

  • 守护线程需要在调用start()方法前设置 运行后再设置会抛异常

  • 使用守护线程创建出来的线程默认也是守护线程,除非手动设置为用户线程

16.ThreadLocal 本地线程参数存储

  • Thread内部有threadLocals成员变量(ThreadLocalMap)

  • 主线程定义全局变量对象ThreadLocal,在子线程中get/set 底层是调用子线程的ThreadLocalMap

    • get

      • 获取threadLocal对象

      • 懒加载,第一次获取时,调用iniitValue初始化 可以重写该方法,返回默认值

  • 变量存储在主线程中,而且是弱引用,GC回收时会删除引用,但是不会删除变量,因此不使用时尽量调用remove移除

  • 应用

    • 线程隔离

      • 为每个线程绑定用户会话信息、数据库连接Session、HTTP请求等

      • 解决参数的线程隔离,不需要使用sychronized等锁机制,影响程序效率

    • 跨函数传递参数

      • 同线程内共享参数,不需要频繁传递,降低方法之间的耦合度

  • ThreadLocalMap是ThreadLocal的内部实现,应该使用ThreadLocal设置本地变量,而不是直接使用ThreadLocalMap

17.InheritableThreadLocal

  • 继承ThreadLocal类

  • inheritableThreadLocals替代threadLocals 通过重写getMap、createMap

  • 应用

    • 使子线程可以继承父线程的变量

    • 存用户登录信息

    • 中间件需要把统一的id追踪的整个调用链路记录下来

  • 原理

    • 创建子线程的时候,子线程引用父线程的inheritableThreadLocals的值创建一个新的对象

      • 如果是不可变类型,比如String,父线程更新值,子线程获取不到

      • 如果是引用类型,则可以获取到对象值的变化

18.线程安全问题

  • 多个线程同时修改同一变量,线程内部L1级缓存不共享,可能会导致变量值不准确

  • synchronized

    • 内置锁/排它锁,同一时间只能有一个线程访问

    • 加锁时清空本地内存,解锁时将本地内存刷新到共享内存

    • 其他线程未获取到锁会进入阻塞状态

    • 解锁条件

      • 同步代码块执行完

      • 获得锁的线程抛异常

      • 获得锁的线程调用wait()

  • volatile

    • 线程修改值时,会将值刷新回主内存,并且其他线程会去读取该值

    • 线程读取值时,会直接从主内存中读取

    • 可以替代synchronized,解决线程上下文的切换带来的开销

    • 注意:多线程自增,volatile不能保证原子性

  • 原子类

    • AtomicLong

    • AtomicIntegerArray

    • .....

    • AtomicRefrence

      • 保障引用类型更新的原子性

      • AtomicRefrence<User> reference = new AtomicRefrence();

      • 不能保障对象里的字段值更新的原子性

    • AtomicLongFieldUpdater

      • 保障引用类型里的Long类型更新的原子性

      • AtomicLongFieldUpdater<User> updater = AtomicLongFieldUpdater.newUpdater(User.class, “字段名”);

    • AtomicStampedReference

      • 带印戳或标记的引用原子类,用于解决ABA问题

  • Unsafe

    • 位于sun.misc包的一个类,提供一些用于执行低级别、不安全的底层操作,如直接访问内存、自主管理内存资源等

    • 大量的方法都是native方法,基于C++实现

    • final修饰的类,且构造方法为私有,只能通过反射的方式获取

  • Java指令重排序

    • Java内存模型允许编译器和处理器对指令重排序以提高运行性能

    • 只会对不存在数据依赖性的指令重排序

    • 指令的先后顺序可能对其他线程的结果有影响

  • 伪共享/伪共存

    • 当一个线程修改了CPU内核高速缓存的1级缓存,其他线程CPU内核的1级缓存就会失效,需要重新从2级缓存或者内存中读取,这就导致了CUP内核的缓存失效了,这就是伪共享

    • CUP的高速缓存

      • 解决CUP和内存的运行速度差

      • 一级或多级,以行存储,每行的大小为2的幂次数字节

      • CUP访问变量时,如果缓存有直接从缓存读取,否则就会从内存读取

      • 缓存行以块的形势保存,可能会把多个变量存放到一个Cache行中

      • 缓存行中的变量地址是连续的

      • 当缓存满了时会根据一定淘汰算法替换缓存行

    • 乐观锁

      • 使用业务字段实现控制锁,类似CAS

        • update xxx ,version = version+1 where version = #{version}

      • 只有在提交修改的时候控制数据修改成功与否

    • 悲观锁

      • 使用数据库提供的锁机制

      • 对整个过程加锁

    • 公平锁

      • 线程获取锁的顺序是按照线程请求锁的时间早晚来决定的

        • ReentrantLock pairLock = new ReentrantLock(true)

    • 非公平锁

      • 在运行时闯入,也就是先来不一定先得

        • ReentrantLock pairLock = new ReentrantLock(false)

      • 在没有公平性需求的前提下尽量使用非公平锁,因为公平锁会带来性能开销

    • 独占锁

      • 只能有一个线程获取

      • ReentrantLock 可重入锁

        • 可重入:锁能够支持对一个线程对资源的重复加锁 一个线程可多次进入同一个锁所同步的临界区代码块

      • synchronized

    • 共享锁

      • 可以多个线程持有

        • ReadWriteLock读写锁

    • 可重入锁

      • 当一个线程再次获取它自己已经获取的锁时,如果不被阻塞,那么我们说该锁是可重入的

        • synchronized

      • 可重入锁的原理是在锁内部维护一个线程标示,用来标示该锁目前被哪个线程占用,然后关联一个计数器。一开始计数器值为0,说明该锁没有被任何线程占用。当一个线程获取了该锁时,计数器的值会变成1,这时其他线程再来获取该锁时会发现锁的所有者不是自己而被阻塞挂起

    • 自旋锁

      • 当线程获取不到锁资源时会被挂起,切换到用户状态 当线程获取到锁资源时又会唤起,进入内核状态

      • 自旋锁可以在获取不到锁资源的时候,不放弃CUP使用权,尝试获取10次,最后失败才会被挂起

    • 锁的使用

      • lock

        • 阻塞锁,获取不到锁会一直等待

      • tryLock

        • 非阻塞锁,获取不到锁会立即返回

      • tryLock(long time, TimeUnit unit)

        • 非阻塞锁,尝试一定时间后返回

      • 注意事项

        • 释放锁需要在finally模块 否则报错释放不了锁

        • 抢占锁需要在try之前 Lock没有声明抛出异常 没有抢占到锁,finally释放锁可能会抛异常

        • 在Lock和try之间尽量不要插入代码 否则抛异常无法走finally释放锁

二、Java并发编程高级

1.ThreadLocalRandom 随机数生成器

  • 替代Random,解决多线程的随机数可能存在生成重复,Random底层 nextInt方法依赖上一个随机数的种子,这个种子在多线程下可能获取到一样的值

  • ThreadLocalRandom random = ThreadLocalRandom.current(); random.nextInt(5);

  • 将种子存在当前线程的threadLocalRandomSeed,使用该值生成随机数

2.AtomicLong

  • 解决问题

    • 相比使用synchronized关键字等,但是这些都是阻塞算法,对性能有一定损耗

  • 原理

    • 使用CAS非阻塞算法,性能更好

3.LongAdder

  • 解决问题

    • AtomicLong无限循环不断进行自旋尝试CAS的操作,会消耗CPU资源

    • 速度比AtomicLong快,在CAS竞争大的情况下 例如上亿次的循环累加

  • 原理

    • 将一个变量递增分解成多个cell,多个线程争夺多个cell递增,最后再将结果合并。减少了并发争夺资源,失败不是CASA自旋,会尝试获取其他的cell

    • 底层是cell数组,数组使用unsafe的CAS

    • 数组元素Cell使用@sun.misc.Contended注解进行修饰,这避免了cells数组内多个原子变量被放入同一个缓存行,避免了伪共享,提高了性能

  • static LongAdder longAdder = new LongAdder(); longAdder.increment();

  • 内部成员

    • Cell[] cells

      • 存放Cell的哈希表

      • 大小为2的幂

      • 在有竞争的情况下,新增会累加到cells[i]对应的位置

    • long base

      • 基础值

      • 在没有竞争时会更新这个值

      • 在cells初始化时,cells不可用,也会尝试通过CAS操作累加到base

    • Int cellsBusy

      • 状态标记、是否正在处于创建、扩容阶段 0,1

      • 当为1的时候,不能进行新cells元素设置的操作

  • 方法

    • increment

      • 自增

    • decrement

      • 自减

    • add(long x)

      • 累加

    • casBase(long cmp, long Val)

      • 设置成员变量base为base+x

    • longAccumulate()

      • 实现不同线程更新各自Cell中的值

4.LongAccumulator

  • 提供了更加强大的功能,可以让用户自定义累加规则

  • //指定初始值为10 static LongAccumulator longAccumulator = new LongAccumulator(new LongBinaryOperator() { @Override public long applyAsLong(long left, long right) { // return left + right; //自定义值操作 return left - right; } }, 10); longAccumulator.accumulate(1L);//加1

5.CopyOnWriteArrayList

  • 使用ReentrantLock保证不被其他线程同时修改

  • CopyOnWriteArrayList(Collection<? extends E> c)

  • 添加元素有原子性,遍历元素没有

6.parkNanos(long nanos)

  • 线程未持有许可证,线程会阻塞挂起,并在nanos秒后返回

  • 线程持有许可证,会马上返回

7.LockSupport

  • 原理

    • unpark/park用于唤起和挂起线程

    • LockSupport类与每个使用它的线程都会关联一个许可证,在默认情况下调用LockSupport类的方法的线程是不持有许可证的

  • park() park(Object blocker)

    • 线程未持有许可证,线程会被阻塞

    • 线程持有许可证,会马上返回

    • 阻塞后可通过unpark或者interrupt唤起线程

    • 使用诊断工具可以观察线程被阻塞的原因,诊断工具是通过调用getBlocker(Thread)方法来获取blocker对象的

    • park和Thread.sleep()的区别

      • sleep不能从外部唤醒,只能自己醒来,park可以

      • sleep需要捕获InterruptedException,park不需要

      • sleep中断时会抛出InterruptedException异常,park只会设置中断标志

    • park和Object.wait()的区别

      • wait需要在sychronized块中执行,park可以在任意地方执行

      • wait等待被中断时,会抛出中断异常,park不会

  • unpark()

    • 线程未持有许可证,调用会让线程持有许可

    • 线程持有许可证,调用会唤起线程

  • parkUntil(Object blocker, long deadline)

    • 线程未持有许可证,线程会阻塞挂起,并在deadline这个时间点返回

    • 1970年-deadline的时间戳

    • 线程持有许可证,会马上返回

8.AQS AbstractQueuedSynchronizer

抽象同步队列

  • AQS作用

    • 减少CAS锁自旋带来的性能消耗

    • 避免CAS在SMP架构的CPU上的总线风暴

  • 实现同步器的基础组件,并发包中锁的底层

  • FIFO的双向队列

  • state 记录线程状态信息

    • 通过getState、setState、compareAndSetState函数修改其值,表示是否获取到锁

    • 独占锁

      • acquire(int arg) 获取

        • tryAcquire(int arg)

          • 成功,修改state,返回

          • 失败,将当前线程封装为类型为Node.EXCLUSIVE的Node节点后插入到AQS阻塞队列的尾部-enq(final Node node) 并调用LockSupport.park(this)方法挂起当前线程

      • acquireInterruptibly(int arg)获取

        • 功能acquire一样,但会抛出中断异常 acquire不会

      • release(int arg)释放

        • tryRelease

          • 成功,修改state LockSupport.unpark(thread)

          • 失败,放入AQS队列并挂起

    • 共享锁

      • acquireShared 获取

      • acquireSharedInterruptibly 获取

      • releaseShared 释放

  • AQS没有实现acquire、release,需要子类实现,state的含义也需要子类自定义

  • ConditionObject 条件变量

    • 和sychronized的锁对象作用类似

    • await

      • 挂起一个线程

    • signal

      • 唤起一个线程

    • signalAll

      • 唤起所有

    • ReentrantLock lock = new ReentrantLock(); Condition condition = lock.newCondition(); lock.lock(); condition.await(); lock.unlock();

  • ReentrantLock 重入锁

    • 有公平和非公平的实现方式,默认是非公平 public ReentrantLock(boolean fair) { sync = fair ? new FairSync() : new NonfairSync(); }

    • 独占锁

    • state

      • 0空闲

      • 大于1 被占用

    • Sync 继承AbstractQueuedSynchronizer

      • lock

        • sync.acquire(1)

        • ReentrantLock重写tryAcquire(int acquires)

      • unlock

        • 如果当前线程没有持有锁会抛出IllegalMonitorStateException异常

      • lockInterruptibly

        • 作用和lock一致

        • 如果当前线程被中断,会抛出异常

      • tryLock

      • tryLock(long timeout, TimeUnit unit)

        • 超时获取锁

      • 内部有NonfairSync和FairSync都会实现上面的锁方法

  • ReentrantReadWriteLock 读写锁

    • 可以多个线程同时读,但是不能同时写 有一个线程在写时,不能读 同一个线程可以同时拥有读写锁,但要先获取写锁 unlock时,读写锁同时拥有需要一起释放

    • 有公平和非公平的实现方式,默认是非公平

    • 实现ReadWriteLock

      • ReadLock

      • WriteLock

    • Sync 继承AbstractQueuedSynchronizer

      • lock

        • sync.acquire(1)

        • ReentrantReadWriteLock重写tryAcquire(int acquires)

    • state

      • 高16位表示读状态,也就是获取到读锁的次数

      • 使用低16位表示获取到写锁的线程的可重入次数

  • StampedLock

    • JDK8版本新增的一个锁,该锁提供了三种模式的读写控制

    • 获取锁/解锁成功会返回一个stamp变量

    • 锁模式 long stamp代表锁的状态

      • 写锁writeLock

        • 排它锁

        • 不可重入锁

          • 获取锁后释放锁前不能再调用会获取锁,会造成调用线程被阻塞

      • 悲观读锁readLock

        • 共享锁,在没有线程获取独占写锁的情况下,多个线程可以同时获取该锁

        • 如果已经有线程持有写锁,则其他线程请求获取该读锁会被阻塞

        • 不可重入锁

      • 乐观读锁tryOptimisticRead

        • 不使用CAS设置锁的状态

        • 如果线程没有获取读锁,返回一个stamp版本信息 使用tryOptimisticRead返回stamp修改值

      • tryConvertToWriteLock(long stamp) 锁类型升级转换

        • 当前锁已经是写锁模式了

        • 当前锁处于读锁模式,并且没有其他线程是读锁模式

        • 当前处于乐观读模式,并且当前写锁可用

9.BlockingQueue

并发队列

  • ConcurrentLinkedQueue 线程安全的无界非阻塞队列

    • 底层数据结构使用单向链表

    • 入队和出队操作使用CAS来实现线程安全

    • add/offer 向队列尾部插入元素

    • poll 移除头部元素

    • remove 移除指定的元素

    • peek 取出头部元素

    • contains 是否包含

    • size 作用不大,CAS无锁算法多线程下数据可能不准确

  • LinkedBlockingQueue 用独占锁实现的阻塞队列

    • 使用ReentrantLock锁和Node保存数据

      • private final ReentrantLock takeLock = new ReentrantLock();

        • 执行take、 poll等操作

      • private final ReentrantLock putLock = new ReentrantLock();

        • 执行put、 offer等操作

    • Condition记录状态

      • private final Condition notEmpty = takeLock.newCondition();

        • 当执行取操作,列表为空时,挂起 notEmpty.await()

      • private final Condition notFull = putLock.newCondition();

        • 当执行存操作,队列满时,挂起 notFull.await()

    • capacity

      • 最大容量

    • count

      • 当前队列数量

    • offer 向队列尾部插入元素

    • put 向队尾插入元素

    • poll 移除头部元素

    • peek 取出头部元素但不移除

    • take取出头部元素移除

    • remove 移除指定的元素

    • size 获取当前队列数量

  • ArrayBlockingQueue 有界数组方式实现的阻塞队列

    • ReetrantLock 独占锁

    • items 数组存储

    • offer 尾部插入元素

    • put 向尾部插入元素

    • poll 移除头部第一个元素

    • take 移除头部元素

    • peek 获取头部元数但不移除

    • size 计算个数

    • 长度有限,默认是Integer最大值 超长再添加的话会报错

  • PriorityBlockingQueue 带优先级的无界阻塞队列

    • 底层原理

      • 每次出队返回优先级最高或最低的元素

      • 平衡二叉树实现,遍历元素不保证有序

      • 使用对象的compareTo方法提供比较规则

      • queue存放队列元素

      • size存放元素个数

      • allocationSplinLock 状态值,保证只有一个现场可以扩容队列,是个自旋锁

    • offer 插入一个元素,无界,总是返回true

    • tryGrow 扩容

      • 先释放锁,让其他线程可以出队入队

      • 使用CAS进行扩容

      • allocationSplinLock 保证只有一个线程进行扩容

      • 扩容失败的线程会yield让出线程

    • poll 获取队列内部根节点元素(根据优先级出队)

    • take 获取队列内部根节点元素

    • put 调用的是offer

    • size 队列元数个数

  • DelayQueue 无界阻塞延迟队列

    • 底层原理

      • 队列中的每个元素都有过期时间

      • 获取元素时,只有过期元素才会出队

      • 队列头元素时最快要出队的元素

      • 内部使用PriorityQueue存放数据

    • offer 插入元素,元素需要实现Delayed接口

    • take 移除队列过期元素,没有则等待

    • poll 移除队头过期元素,没有则返回null

    • size 计算队列元素,包含过期和没有过期的

10.ThreadPoolExecutor 线程池

  • 解决问题

    • 减少线程重复创建和销毁的开销

    • 限制创建个数,动态创建

    • 控制最大线程并发数

    • 提供定时执行、定频执行

    • 单线程

  • ctl 线程池状态

    • 高3位表示线程池状态 低29位记录线程数量

    • 默认是RUNNING状态,线程个数为0

    • 状态

      • RUNNING 接收新任务并且处理阻塞队列里的任务

      • SHUTDOWN 拒接新任务,但是处理阻塞队列里的任务

      • STOP 拒接新任务并且抛弃阻塞队列里的任务 同时会中断正在处理的任务

      • TIDYING 所有任务都执行完后当前线程池活动线程数为0,将调用terminated方法

      • TERMINATED 终止状态,调用terminated后的状态

    • 状态转换

      • RUNNING -> SHUTDOWN

        • 显示调用shutdown()

        • 隐式调用finalize()里的shutdown()

      • RUNNING/SHUTDOWN -> STOP

        • 显示调用shutdownNow()

      • SHUTDOWN -> TIDYING

        • 线程池和任务队列都为空时

      • STOP -> TIDYING

        • 当线程池为空时

      • TIDYING -> TERMINATED

        • 当terminated() hook方法执行完时

  • 线程池参数

    • corePoolSize 线程池核心线程数

    • workQueue 用于保存等待执行的任务的阻塞队列

      • ArrayBlockingQueue

      • LinkedBlockingQueue

      • SynchronousQueue

      • PriorityBlockingQueue

    • maximunPoolSize 线程池最大线程数量

    • ThreadFactory 创建线程工厂

    • RejectedExecutionHandler 饱和策略

      • 当队列安满并且线程个数达到maxinumPoolSize后的策略

      • AbortPolicy 抛出异常

      • CallerRunsPolicy 使用调用者所在线程来运行任务

      • DiscardOldestPolicy 调用poll丢弃一个任务,执行当前任务

      • DiscardPolicy 默认丢弃,不抛异常

    • keepAliveTime 存活时间

      • 如果当前线程池中的线程数量比核心线程数量多,并且是闲置状态,则这些闲置的线程能存活的最大时间

    • TimeUint 存活时间的单位

  • 线程池类型

    • newFixedThreadPool

      • 定长线程池 Executors.newFixedThreadPool(nThreads); new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new inkedBlockingQueue<Runnable>());

      • nThreads最长不能超过Integer.MAX_VALUE

      • keeyAliveTime=0说明只要线程个数比核心线程个数多并且当前空闲则回收

    • newSingleThreadExecutor

      • 核心线程个数和最大线程个数都为1的线程池

      • 阻塞队列长度为Integer.MAX_VALUE

      • keeyAliveTime=0

    • newCachedThreadPool

      • 初始线程个数为0,最多的线程数为Integer.MAX_VALUE

      • 阻塞队列为同步队列

      • keeyAliveTime=60,线程在空闲60秒后回收

      • 线程加入同步队列的任务会被马上执行,同步队列里面最多只有一个任务

  • ReentrantLock mainLock

    • 保证新增Worker线程操作的原子性

  • Worker 具体承载任务的对象

    • 继承AQS和Runnable接口

    • state

      • 0 锁未获取

      • 1 锁已被获取

      • -1 创建时的默认状态

        • 防止worker线程创建被中断 当其他线程调用了线程池的shutdownNow时,如果Worker状态>=0则会中断该线程

  • DefaultThreadFactory 线程工厂

    • newThread 对线程的修饰

    • poolNumber 统计线程工厂的个数

    • threadNumber 记录每个线程创建了多少个线程

  • execute 提交任务command到线程池 生产消费模型

  • shutDown 关闭线程池

    • 调用后线程就不会再接收新任务

    • 工作队列的任务会继续执行

    • 该方法会立刻return,不等待队列执行任务

  • shutDownNow 立即关闭线程池

    • 调用后线程就不会再接收新任务

    • 丢弃工作队列的任务

    • 正在执行的任务会被中断

    • 该方法会立刻返回,不等待队列执行任务

    • 返回值是队列里被丢弃的任务列表List<Runnable>

  • awaitTermination

    • 等待线程池完成关闭

  • 优雅的关闭线程池

    • 执行shutDown方法拒绝接收新的线程,等待所有线程完成

    • 执行awaitTermination(long timeout, TimeUnit unit)指定等待超时时间,判断是否已关闭所有任务,线程池关闭完成。

    • 如果awaitTermination返回false,或者被中断,调用shutDownNow立即关闭

    • 如果awaitTermination超时,进入循环关闭,循环一定的次数,不断关闭线程池,直到线程池关闭或循环结束

11.ScheduledThreadPoolExecutor

  • 指定一定的延迟时间 或定时进行任务调度执行的线程池

  • 继承了ThreadPoolExecutor,并实现了ScheduledExecutorService接口

  • 线程池队列是DelayedWorkQueue

  • ScheduledFutureTask

    • 继承FutureTask,实现RunnableScheduledFuture

    • state 状态

      • NEW 初始状态

      • COMPLETING 执行中状态

      • NORMAL 正常运行结束状态

      • EXCEPTIONAL 运行中异常

      • CENCELLED 任务被取消

      • INTERRUPTING 任务正在被中断

      • INTERRUPTED 任务已被中断

    • period 任务的类型

      • 0

        • 任务是一次性的,执行完就退出

      • 负数

        • fixed-delay任务 固定延迟的定时可重复执行任务

      • 正数

        • fixed-rate任务 固定频率的定时重复执行任务

    • isPeriodic() 是否周期性的,使用 period != 0判断

    • schedule(Runnable command, long delay, TimeUnit unit)

      • 提交一个延迟执行的任务command 只执行一次

      • delay 延迟时长(执行周期)

      • TimeUnit 时间单位

    • scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit)

      • 任务执行完后,在延迟固定时间再次执行(固定时间间隔执行)

      • initiaDelay 提交任务后,多久开始执行

      • delay 延迟时长(执行周期)

      • 中断循环执行条件

        • 抛出异常

        • 被取消了

        • 关闭线程池

    • scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit)

      • 和scheduleWithFixedDelay功能类似

      • 执行规则为,时间为initDelday +n*period时启动任务(定时间点执行)

      • 如果当前任务还没有执行完,下一次要执行任务的时间到了,不会并发执行

      • 下次要执行的任务会延迟执行,要等到当前任务执行完毕后再执行

    • compareTo

      • 线程加入延时队列后,在内部建立或者调整堆时会使用该线程的compareTo方法,将快要过期的线程放到队首

12.线程同步原理

  • CountDownLatch

    • 应用场景

      • 主线程开启子线程,主线程需要执行完毕再进行汇总

      • 比join ()的优势

        • 不必等待子线程完全执行完,就可以唤起主线程

        • 在使用线程池的方式执行线程就没法使用join

    • 用法

      • static volatile CountDownLatch countDownLatch = new CountDownLatch(count); 主线程调用countDownLatch.await(); 子线程调用countDownLatch.countDown(); 直到count减为0,唤起主线程

    • 原理

      • 内部有一个计数器 构造器传参存储在AQS的volatile int state

      • 子线程调用countDown较少state

      • 主线程调用await(),线程被挂起

      • 当state减到0时唤醒主线程

    • await()

      • 线程阻塞,无返回值

      • 其他线程调用interrupt(),当前线程抛出异常

      • 放入AQS阻塞队列

    • await(long timeout, TimeUint unit)

      • 线程阻塞,超时时候返回false

      • state为0返回true

      • 其他线程调用interrupt(),当前线程抛出异常

    • countDown

      • 计数器state减1

      • 当state为0时,唤醒所有被await阻塞的线程 调用AQS的doReleaseShared

  • CyclicBarrier 循环阻塞

    • 解决问题

      • CountDownLatch是一次性的,当state为0时,再调用await和countDown都会立即返回,无法再使用

      • 为了可以重置计数器

      • 子线程和主线程可以交替执行

    • 用法

      • 定义CyclicBarrier cyclicBarrier = new CyclicBarrier(int parties, Runnable barrierAction);

      • 线程池的子线程调用cyclicBarrier.await()进入阻塞,计数器递减

      • 当调用await的次数=parties,继续执行定义CyclicBarrier的主线程

      • 执行完主线程后的内容后,继续执行子线程await()后的程序

    • 参数

      • parties 记录线程总数,值不会变

      • count 计数器,有子线程await会-1 当count为0时,parties会赋值给count

      • Runnanle barrierAction

        • 当count为0时,执行该线程的任务 然后再接着执行子线程的await后的任务

    • 方法

      • await()

        • 阻塞当前线程

        • 当parties个线程都调用了await() 当前线程会唤起

        • 其他线程调用了该线程的interrupt方法

        • 与当前线程关联的Generation对象的broken标志被设置为true,会抛出BrokenBarrierException

        • 调用dowait(false, 0L)

      • await(long timeout, TimeUnit unit)

        • 阻塞当前线程

        • 当parties个线程都调用了await() 当前线程会唤起,并返回true

        • 其他线程调用了该线程的interrupt方法

        • 与当前线程关联的Generation对象的broken标志被设置为true,会抛出BrokenBarrierException

        • 调用dowait(true, unit.toNanos(timeout))

      • dowait(boolean timed, long nanos) 核心功能

        • 使用ReetrantLock加锁

        • 当count递减为0时,调用command主任务运行 然后调用nextGeneration()唤起所有子线程 并重置count

        • 当count递减不为0时 当前线程放入阻塞队列Condition trip 如果有超时,线程自动激活

  • Semaphore 信号同步器

    • 计数器递增

    • 不需要知道同步线程的个数 在需要同步的地方调用acquire方法指定个数

    • 在需要同步的地方调用acquire方法指定个数

    • 支持循环使用,需要手动重新调用acquire

    • 使用

      • Semaphore semaphore = new Semaphore(0); ExecutorService executorService = Executors.newFixedThreadPool(2); executorService.submit(() -> { //dothing semaphore.release();//计数器+1 }); //主线程等待permits个线程完成 semaphore.acquire(permits);

    • 方法

      • Semaphore(int permits); Semaphore(int permits, boolean fair) 提供公平和非公平的构造方法

        • FairSync 继承Sync

          • hasQueuedPredecessors() 判断除当前线程是否有先入队的线程,保证公平性

        • NonfairSync 默认策略

      • acquire()

        • 不传参默认获取一个信号源 sync.acquireSharedInterruptibly(1);

        • 获取到信号量大于1,会减1 如果信号量为0,会进入AQS阻塞队列

      • acquire(int permits)

        • 获取permits个信号量

      • acquireUniterruptibly() acquireUniterruptibly(int permits)

        • 获取信号量 其他线程调用interrupt不能中断线程

      • release() release(int permits)

        • 把当前Semaphore信号量+1 / +permits

        • 如果有线程因调用acquire进入阻塞队列,会根据公平策略选择一个信号个数能被满足的线程唤起

        • 使用CAS保证信号量的原子性

13.Guava

  • 谷歌提供的java扩展包,其中com.google.common.util.concurrent包中的ListenableFuture,提供了异步回调的功能

  • 相比FutureTask的优势,可以异步回调,不需要join()主动回调等待线程完成

  • FutureCallBack<V>

    • 异步回调接口,实现该接口,重写该回调的方法,处理任务

    • void onSuccess(@Nullable V var1)

    • void onFailure(Throwable var1)

  • 线程实例创建

    • ​​​ // Guava线程池 
​​​​​ListeningExecutorService gPool = MoreExecutors.listeningDecorator(jPool);​​

      • // 相当于Java线程池的创建 ExecutorService jPool = Executors.newFixedThreadPool(10);

    • //submit()方法用来提交任务,返回异步任务实例
​​​​​ListenableFuture<Boolean> hFuture = gPool.submit(callableThread);

    • //绑定回调实例
​​​​​Futures.addCallback(hFuture, new FutureCallback<Boolean>()
​​​​​{ //回调方法onSuccess & onFailure 
});

14.Netty的异步回调模式

  • 继承java的Future接口

    • public interface Future<V> extends java.util.concurrent.Future<V> {
​​​​ boolean isSuccess(); // 判断异步执行是否成功
​​​​​ boolean isCancellable(); // 判断异步执行是否取消
​​​​​ Throwable cause(); // 获取异步任务异常的原因
​​​​​
​​​​​

//增加异步任务执行完成Listener监听器
​​ ​​​ Future<V> addListener(GenericFutureListener<? extends Future<? super V>> listener); 
​​​​​
​​​​​ //移除异步任务执行完成Listener监听器
​​​​​Future<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener); 
​​​​​ ...
​​​​​}​​

- Netty的Future一般不会直接使用,会使用它的子接口,如ChannelFuture,异步I/O操作
  • 添加GenericFutureListener监听器,加入到Future任务中,实现对异步回调的监听

    • package io.netty.util.concurrent; 
​​​​​import java.util.EventListener; 
​​​​​public interface GenericFutureListener<F extends Future<?>> extends EventListener { 
​​​​​ //监听器的回调方法 
​​​​​ void operationComplete(F var1) throws Exception; 
​​​​​}​​

    • GenericFutureListener接口在Netty中是一个基础类型接口。在网络编程的异步回调中,一般使用Netty中提供的某个子接口,ChannelFutureListener接口

15.CompletableFuture

  • JDK1.8引入,提供异步任务回调的接口

实现Future和CompletionStage

  • CompletionStage子任务实例

    • Function

      • 有输入、输出

      • 产生的结果输出到下一步

    • Runnable

      • 没有输入、输出

    • Consumer

      • 有输入、无输出

    • 多个CompletionStage构成一条任务流水线,一个环节执行完了会将结果交给下一个环节

      • oneStage.thenApply(x -> square(x)).thenAccept(y->System.out.print(y)).thenRun(()->//doThing)

    • runAsync

      • CompletableFuture<Void> future = CompletableFuture.runAsync(() ->
​​​​​ { sleepSeconds(1);//模拟执行1秒 
​​​​​ Print.tcfo("run end ...");
​​​​​ });
​​​​​
​​​​ ​ //等待异步任务执行完成,限时等待2秒
​​​​​ future.get(2, TimeUnit.SECONDS);

    • supplyAsync

      • CompletableFuture<Long> future = CompletableFuture.supplyAsync(() ->
​​​​​ {
​​​​​ long start = System.currentTimeMillis();
​​​​​ sleepSeconds(1);//模拟执行1秒
​​ ​​​ Print.tcfo("run end ...");
​​​​​ return System.currentTimeMillis() - start;
​​​​​ });

  • 设置子任务回调钩子

    • //设置异步任务执行完成后的回调钩子 ​​​​​future.whenComplete(new BiConsumer<Void, Throwable>()
​​{
​​​​​ @Override
​​​​​ public void accept(Void t, Throwable action){ Print.tco("执行完成!"); } 
​​​​});

    • //设置异步任务发生异常后的回调钩子
​​​​​ future.exceptionally(new Function<Throwable, Void>(){
​​​​​ @Override
​​​ public Void apply(Throwable t){
​ Print.tco("执行失败!" + t.getMessage());
​​​​​ return null;
​​​​​ }
​​​​​ });
​​​
​​​ //获取异步任务的结果
 future.get();
​​​​​ }

    • 调用 cancel方法时,也会执行异常回调钩子

    • 如果没有设置异常回调钩子

      • 调用get()/get(long ,TimeUnit)时, 如遇到内部异常,会抛出ExecutionException

      • 调用join()/getNow(T)启动任务时,如遇到内部异常,会抛出CompletionException

  • hanle()

    • 除了调用异常处理钩子外,还可以调用该方法实现异常处理

    • CompletableFuture<Void> future = CompletableFuture.runAsync(() ->
​​​​​ {
​​​​​ sleepSeconds(1);//模拟执行1秒
​​​​​ Print.tco("抛出异常!");
​​​​​ throw new RuntimeException("发生异常");
​​​​​ //Print.tco("run end ...");
​​​​​ });

​​​​​ //统一处理异常和结果
 ​​​​​ future.handle(new BiFunction<Void, Throwable, Void>() { ​​​​​
​​​​​ @Override
 ​​​​​ public Void apply(Void input, Throwable throwable)
​​​​​ { 
​​​​​ if (throwable == null)
​​​​​ {
​​​​​ Print.tcfo("没有发生异常!");
​​​​​
​​​​​ } else
​​​​​ {
​​​​​ Print.tcfo("sorry,发生了异常!");
​​​​​
​​​​​ }
​​​​​ return null;
​​​​​ }
​​​​​ }); 
​​​​​
​​​​​ future.get();
​​​​​ }

  • 线程池的调用

    • 在没有指定线程池的情况下,默认使用ForkJoinPool线程池

    • ​​​//子任务包装一个Runnable实例,并调用ForkJoinPool.commonPool()线程池来执行
 ​​​​​public static CompletableFuture<Void>runAsync(Runnable runnable)
​​​​​
​​​​​ //子任务包装一个Runnable实例,并调用指定的executor线程池来执行
​​​​​ public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)
​​​​​ 
​​​​​ //子任务包装一个Supplier实例,并调用 ForkJoinPool.commonPool()线程池来执行
​​​​​public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
​​​​​ 
​​​​​ //子任务包装一个Supplier实例,并使用指定的executor线程池来执行
​​​​​ public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)​​

  • 异步任务的串行执行

    • thenApply

      • 有输入输出

      • //后一个任务与前一个任务在同一个线程中执行
​​​​​public <U> CompletableFuture<U> thenApply(
​​​​​ Function<? super T,? extends U> fn)
​​​​​
​​​​​//后一个任务与前一个任务不在同一个线程中执行
​​​​​public <U> CompletableFuture<U> thenApplyAsync(
​​​​​ Function<? super T,? extends U> fn)
​​​​​
​​​​​//后一个任务在指定的executor线程池中执行 
​​​​​public <U> CompletableFuture<U> thenApplyAsync(
​​​​​ Function<? super T,? extends U> fn, Executor executor)​​

    • thenRun

      • 没有输入输出

      • //后一个任务与前一个任务在同一个线程中执行
​​​​​public CompletionStage<Void> thenRun(Runnable action);
​​​​​
​​​​​//后一个任务与前一个任务不在同一个线程中执行
​​​​​public CompletionStage<Void> thenRunAsync(Runnable action);
​​​​​
​​​​​//后一个任务在executor线程池中执行
​​​​​public CompletionStage<Void> thenRunAsync(Runnable action,Executor executor);​​

    • thenAccept

      • 有输入,没输出

      • //后一个任务与前一个任务在同一个线程中执行
​​​​​public CompletionStage<Void> thenAccept(Consumer<? super T> action);
​​​​​
​​​​​//后一个任务与前一个任务不在同一个线程中执行
​​​​​public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action);
​​​​​
​​​​​//后一个任务在指定的executor线程池中执行
​​​​​public CompletionStage<Void> thenAcceptAsync(
​​​​​ Consumer<? super T> action,Executor executor);​​

    • thenCompose

      • 有输入输出

      • 第二个任务的返回值是一个CompletionStage实例

  • 异步任务的合并执行

    • thenCombine

    • runAfterBoth

    • thenAcceptBoth

    • ompletableFuture<Integer> future3 = future1.thenCombine(future2, new BiFunction<Integer, Integer, Integer>() {
​​​​​ @Override
​​​​​ public Integer apply(
​​​​​ Integer step1OutCome, Integer step2OutCome)
​​​​​ {
​​​​​ return step1OutCome * step2OutCome;
​​​​​ }
​​​​​ });
​​​​​ Integer result = future3.get();
​​​​​ Print.tco(" outcome is " + result);
​​​​​ }

    • AllOf

      • 等待多个任务结束,合并所有任务

      • CompletableFuture<Void> all = CompletableFuture.allOf(future1, future2, future3, future4);
​​​​​ all.join();

  • applyToEither

    • 谁返回结果快,就用谁执行下一步

  • runAfterEither

    • 前两个CompletionStage实例,任何一个执行完,都会执行第三步回调操作,三个都是Runnable类型

  • acceptEither

    • 两个CompletionStage实例谁返回快,就会执行第三步回调操作,返回值作为入参

三、Java并发编程实践

1.ArrayBlockingQueue的使用

  • logback异步日志打印

    • pom依赖

      • <!--logback日志依赖--> <dependency> <groupId>ch.qos.logback</groupId> <artifactId>logback-classic</artifactId> <version>1.2.3</version> </dependency>

    • logback.xml配置

    • AsyncAppender

      • 继承AsyncAppenderBase

        • ArrayBlockingQueue 阻塞队列

        • queueSize 队列大小,默认256

        • Worker 日志消费单个工作线程

          • 守护线程

        • aai

          • appender的装饰器

          • 存放同步日志的appender

        • appenderCount

          • 记录aai里面附加的同步appender的个数

        • neverBlock

          • 指示当日志队列满时是否阻塞打印日志的线程

        • discardingThreshold 阈值

          • 当日志队列里面的空闲元素个数小于该值时,新来的某些级别的日志会被直接丢弃

2.ConcurrentLinkedQueue的使用

  • Tomcat的NioEndPoint

    • 生产-消费模式 Acceptor-Poller

    • tomcat内部有多个Acceptor while循环等待接收请求

    • Acceptor将接收到的请求封装成channel对象放入poller对象队列

    • 无界队列,tomcat需要设置最大连接数

  • 注意事项

    • 非线程安全的工具类使用

      • 避免在多线程使用SimpleDateFormat的单个实例

    • Timer和ScheduledThreadPoolExecutor

      • Timer多生产,单消费

        • 下个周期开始时,线程还没跑完会抛出异常

      • ScheduledThreadPoolExecutor可以配置 即可以是多生产,多消费,也可以单消费

    • 浅拷贝问题

      • 多线程需要修改同一个引用类型参数时 需要进行深复制,不能只是new 否则还是指向同一个对象地址引用

    • 创建线程和线程池时要指定与业务相关的名称

      • 不指定线程名称不好定位问题 线程创建会指定常量+数字的名称 init(null, target, "Thread-" + nextThreadNum(), 0);

      • 线程池名称在ThreadFactory中的namePrefix属性指定 namePrefix = "pool-" + poolNumber.getAndIncrement() + "-thread-";

    • 使用完线程池后,最好调用shutdown方法关闭 否则会导致线程资源一致不被释放

    • FutureTask使用注意事项

      • 拒绝策略设置为DiscardPolicy和DiscardOldestPolicy,并且被拒绝的线程任务(队列已满)调用了无参get方法,那么调用线程就会一直被阻塞

      • FutureTask的submit方法,在线程数达到核心线程数数量,且任务队列满了,会执行拒绝策略方法

      • 解决方法

        • 在日常开发中尽量使用带超时参数的get方法

        • 拒绝策略设置为默认的AbortPolicy则会正常返回

    • ThreadLocal使用注意事项

      • key,value属于弱引用,线程一直没有调用remove,且其他地方还有对ThreadLocal的引用,value不会被释放

      • 解决方法

        • 使用完后及时调用remove方法

      • 在Tomcat中多次访问servlet创建出来的ThreadLocal不会被释放,创建LocalVariable使用的是webappClassLoader,所以也释放不了,该问题在tomcat7.0后修复

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/我家小花儿/article/detail/926073
推荐阅读
相关标签
  

闽ICP备14008679号