赞
踩
线程是CPU分配的基本单位 也是Java进程运行的最小单位
多个线程访问少量的CUP,CUP进行上下文切换运行
多个线程同时在不同的CUP同时执行
继承Thread
start()执行线程
实现Runnable
RunableTask runableTask = new RunableTask(); new Thread(runableTask).start();执行线程
实现callable
FutureTask<> futureTask = new FutureTask<>(new 实现了callable的类); new Thread(futureTask).start();执行线程
start 运行线程
重复调用会抛异常
功能
能够取消异步执行中的任务
判断异步任务是否执行完成
获取异步执行完后的结果
继承了Runnable接口和Future接口
可以作为Thread线程实例的target实例
可以异步获取执行结果
继承了RunableFuture接口,实现接口的实例
join()
实例方法,需要thread.join()调用,阻塞当前线程,循环等待,直到thread类执行完毕或超时,isAlive()方法判断
java.util.concurrent
工作线程<核心线程
优先创建任务线程,而不是从线程队列获取一个空闲队列
工作线程>核心线程 阻塞队列没满
新接收的线程任务会加到阻塞队列
当完成一个任务时,优先从阻塞队列取任务执行
工作线程>核心线程 阻塞队列也满 工作线程<最大线程数maxinumPoolSize
新接收的任务会创建一个新的任务(非核心线程)立即执行
工作线程>核心线程 阻塞队列也满 工作线程>最大线程数maxinumPoolSize
执行拒绝策略
线程被拒的情况
工作队列已满,且maxinumPoolSize已满
线程池已关闭
拒绝后会调用RejectedExceptionHandler实例的rejectedException方法
AbortPolicy 拒绝策略
线程池默认策略
拒绝后抛出RejectedException异常
DiscardPolicy 抛弃策略
直接抛弃任务,不会抛出异常
DiscardOldestPolicy 抛弃最老任务策略
移除队头最老的元素
CallerRunsPolicy 调用者执行策略
拒绝后直接执行该任务,不会使用线程池的任务
自定义策略
实现RejectedExceptionHandler,重写rejectedException方法 将实例类传入ThreadPoolExecutor的构造方法
提供execute()接口执行已提交的Runnable实例
ExecutorService
继承Executor
提供异步任务接收
submit(CallableTask<T> task) 提交异步任务
InvokeAll(Collection<? Extends CallableTask<T>> tasks) 批量提交
execute和submit的区别
接收参数不同
execute只能接收Runnable接口的实例
submit可以接收Runnable和Callable两种实例
Callable是JDK1.5加入,作为Runnable的一种补充,允许有返回值,抛出异常
submit提交任务后有返回值,execute没有
submit方便异常处理
AbstractExecutorService
是一个抽象类,实现了ExecutorService 为接口提供默认的实现
ThreadPoolExecutor
线程池实现类
继承AbstractExecutorService
JUC核心类,提供指定可用的线程数量、线程统计、管理、监控
ScheduledExecutorService
是一个接口,继承ExecutorService
提供延时、周期性任务调度
ScheduledExecutorExecutor
继承ThreadPoolExecutor
提供了ScheduledExecutorService延时、周期任务的具体实现方法
静态工厂类
提供静态方法返回ExecutorService、ScheduledExecutorService等线程池接口方法的实例
newSingleThreadExecutor
单线程池,只有一个工作线程
核心线程和最大线程数为 1
保证线程按照FIFO
潜在的问题
超过后会进入无界阻塞队列,如果提交速度远大于处理速度,无限添加容易耗尽服务器资源OOM
newFixedThreadPool
固定数量线程池
newFixedThreadPool(int nThreads) 核心线程和最大线程数为nThreads
在未达到最大线程数量时,每接收一个线程,就会创建一个实例
潜在的问题
超过后会进入无界阻塞队列,如果提交速度远大于处理速度,无限添加容易耗尽服务器资源OOM
newCachedThreadPool
核心线程数:0 最大线程数:Integer.MAX_VAVLUE 线程最大空闲时长:60L 时间单位:TimeUnit.MILLISECONDS 任务队列:SynchronousQueue<Runnable>()
可缓存线程池,可灵活回收空闲的线程
大小没限制,大量添加也可能耗尽服务器资源
潜在的问题
可以无限添加工作线程,容易耗尽服务器资源OOM
newScheduledThreadPool
可调度线程池,功能和ScheduledExecutorService类似
空方法,提供给调用者介入线程任务执行时机重写业务逻辑
ExecutorService pool = new ThreadPoolExecutor(coreSize, maxinumPoolSize, periods, TimeUnit.SECONDS, new LinkedBlockingQueue(2)){ @Override protected void beforeExecute(){ } };
beforeExecute
任务执行前
afterExecute
任务执行后
terminated
线程池终止时
Java内置锁对象
wait()
当一个线程调用一个共享变量的wait()方法时,该调用线程会被阻塞挂起
会让出锁资源
wait(timeout)
同样是阻塞,超时后自动唤起
notify
唤起线程
notifyAll
唤起所有线程
只会唤起之前调用了wait的线程
Condition
java.until.concurrent.locks包的线程通信类,功能类似于Java内置对象的通知与等待
await()
signal()
signalAll()
当前线程必须等待插队线程执行完才能往下执行
sleep(timeout)
阻塞线程,timeout后唤起
不会让出锁资源
让出当前线程的使用权
使线程处理就绪状态,等待下一轮CUP调度
interrupt 中断
中断线程的休眠,抛出InterruptedException
应用-多个并行线程执行,其中一个提前执行完,其他线程没必要再执行,此时可以中断其他线程
正常运行的线程需要抛出异常,否则会继续执行完 throw new InterruptedException();
isInterrupted 是否中断(中断标志)
Thread.currenThread().isInterrupted();
interrupted
获取当前调用线程的中断标志而不是调用interrupted()方法实例对象的中断标志
如果发现当前线程被中断,则会清除中断标志
stop
强制中断线程
不会跑InterruptException的catch 会跑ThreadDeath的catch
过时方法,不建议使用。类似直接切断电源,会造成,不释放线程持有的锁等不可预料的情况
多个线程争夺锁资源相互无限等待
产生死锁的条件
互斥条件
请求并持有条件
不可剥夺条件
环路等待条件
区别
守护线程会随着JVM的结束而关闭
JVM的结束不影响用户线程
守护线程需要在调用start()方法前设置 运行后再设置会抛异常
使用守护线程创建出来的线程默认也是守护线程,除非手动设置为用户线程
Thread内部有threadLocals成员变量(ThreadLocalMap)
主线程定义全局变量对象ThreadLocal,在子线程中get/set 底层是调用子线程的ThreadLocalMap
get
获取threadLocal对象
懒加载,第一次获取时,调用iniitValue初始化 可以重写该方法,返回默认值
变量存储在主线程中,而且是弱引用,GC回收时会删除引用,但是不会删除变量,因此不使用时尽量调用remove移除
应用
线程隔离
为每个线程绑定用户会话信息、数据库连接Session、HTTP请求等
解决参数的线程隔离,不需要使用sychronized等锁机制,影响程序效率
跨函数传递参数
同线程内共享参数,不需要频繁传递,降低方法之间的耦合度
ThreadLocalMap是ThreadLocal的内部实现,应该使用ThreadLocal设置本地变量,而不是直接使用ThreadLocalMap
继承ThreadLocal类
inheritableThreadLocals替代threadLocals 通过重写getMap、createMap
应用
使子线程可以继承父线程的变量
存用户登录信息
中间件需要把统一的id追踪的整个调用链路记录下来
原理
创建子线程的时候,子线程引用父线程的inheritableThreadLocals的值创建一个新的对象
如果是不可变类型,比如String,父线程更新值,子线程获取不到
如果是引用类型,则可以获取到对象值的变化
多个线程同时修改同一变量,线程内部L1级缓存不共享,可能会导致变量值不准确
synchronized
内置锁/排它锁,同一时间只能有一个线程访问
加锁时清空本地内存,解锁时将本地内存刷新到共享内存
其他线程未获取到锁会进入阻塞状态
解锁条件
同步代码块执行完
获得锁的线程抛异常
获得锁的线程调用wait()
volatile
线程修改值时,会将值刷新回主内存,并且其他线程会去读取该值
线程读取值时,会直接从主内存中读取
可以替代synchronized,解决线程上下文的切换带来的开销
注意:多线程自增,volatile不能保证原子性
原子类
AtomicLong
AtomicIntegerArray
.....
AtomicRefrence
保障引用类型更新的原子性
AtomicRefrence<User> reference = new AtomicRefrence();
不能保障对象里的字段值更新的原子性
AtomicLongFieldUpdater
保障引用类型里的Long类型更新的原子性
AtomicLongFieldUpdater<User> updater = AtomicLongFieldUpdater.newUpdater(User.class, “字段名”);
AtomicStampedReference
带印戳或标记的引用原子类,用于解决ABA问题
Unsafe
位于sun.misc包的一个类,提供一些用于执行低级别、不安全的底层操作,如直接访问内存、自主管理内存资源等
大量的方法都是native方法,基于C++实现
final修饰的类,且构造方法为私有,只能通过反射的方式获取
Java指令重排序
Java内存模型允许编译器和处理器对指令重排序以提高运行性能
只会对不存在数据依赖性的指令重排序
指令的先后顺序可能对其他线程的结果有影响
伪共享/伪共存
当一个线程修改了CPU内核高速缓存的1级缓存,其他线程CPU内核的1级缓存就会失效,需要重新从2级缓存或者内存中读取,这就导致了CUP内核的缓存失效了,这就是伪共享
CUP的高速缓存
解决CUP和内存的运行速度差
一级或多级,以行存储,每行的大小为2的幂次数字节
CUP访问变量时,如果缓存有直接从缓存读取,否则就会从内存读取
缓存行以块的形势保存,可能会把多个变量存放到一个Cache行中
缓存行中的变量地址是连续的
当缓存满了时会根据一定淘汰算法替换缓存行
锁
乐观锁
使用业务字段实现控制锁,类似CAS
update xxx ,version = version+1 where version = #{version}
只有在提交修改的时候控制数据修改成功与否
悲观锁
使用数据库提供的锁机制
对整个过程加锁
公平锁
线程获取锁的顺序是按照线程请求锁的时间早晚来决定的
ReentrantLock pairLock = new ReentrantLock(true)
非公平锁
在运行时闯入,也就是先来不一定先得
ReentrantLock pairLock = new ReentrantLock(false)
在没有公平性需求的前提下尽量使用非公平锁,因为公平锁会带来性能开销
独占锁
只能有一个线程获取
ReentrantLock 可重入锁
可重入:锁能够支持对一个线程对资源的重复加锁 一个线程可多次进入同一个锁所同步的临界区代码块
synchronized
共享锁
可以多个线程持有
ReadWriteLock读写锁
可重入锁
当一个线程再次获取它自己已经获取的锁时,如果不被阻塞,那么我们说该锁是可重入的
synchronized
可重入锁的原理是在锁内部维护一个线程标示,用来标示该锁目前被哪个线程占用,然后关联一个计数器。一开始计数器值为0,说明该锁没有被任何线程占用。当一个线程获取了该锁时,计数器的值会变成1,这时其他线程再来获取该锁时会发现锁的所有者不是自己而被阻塞挂起
自旋锁
当线程获取不到锁资源时会被挂起,切换到用户状态 当线程获取到锁资源时又会唤起,进入内核状态
自旋锁可以在获取不到锁资源的时候,不放弃CUP使用权,尝试获取10次,最后失败才会被挂起
锁的使用
lock
阻塞锁,获取不到锁会一直等待
tryLock
非阻塞锁,获取不到锁会立即返回
tryLock(long time, TimeUnit unit)
非阻塞锁,尝试一定时间后返回
注意事项
释放锁需要在finally模块 否则报错释放不了锁
抢占锁需要在try之前 Lock没有声明抛出异常 没有抢占到锁,finally释放锁可能会抛异常
在Lock和try之间尽量不要插入代码 否则抛异常无法走finally释放锁
替代Random,解决多线程的随机数可能存在生成重复,Random底层 nextInt方法依赖上一个随机数的种子,这个种子在多线程下可能获取到一样的值
ThreadLocalRandom random = ThreadLocalRandom.current(); random.nextInt(5);
将种子存在当前线程的threadLocalRandomSeed,使用该值生成随机数
解决问题
相比使用synchronized关键字等,但是这些都是阻塞算法,对性能有一定损耗
原理
使用CAS非阻塞算法,性能更好
解决问题
AtomicLong无限循环不断进行自旋尝试CAS的操作,会消耗CPU资源
速度比AtomicLong快,在CAS竞争大的情况下 例如上亿次的循环累加
原理
将一个变量递增分解成多个cell,多个线程争夺多个cell递增,最后再将结果合并。减少了并发争夺资源,失败不是CASA自旋,会尝试获取其他的cell
底层是cell数组,数组使用unsafe的CAS
数组元素Cell使用@sun.misc.Contended注解进行修饰,这避免了cells数组内多个原子变量被放入同一个缓存行,避免了伪共享,提高了性能
static LongAdder longAdder = new LongAdder(); longAdder.increment();
内部成员
Cell[] cells
存放Cell的哈希表
大小为2的幂
在有竞争的情况下,新增会累加到cells[i]对应的位置
long base
基础值
在没有竞争时会更新这个值
在cells初始化时,cells不可用,也会尝试通过CAS操作累加到base
Int cellsBusy
状态标记、是否正在处于创建、扩容阶段 0,1
当为1的时候,不能进行新cells元素设置的操作
方法
increment
自增
decrement
自减
add(long x)
累加
casBase(long cmp, long Val)
设置成员变量base为base+x
longAccumulate()
实现不同线程更新各自Cell中的值
提供了更加强大的功能,可以让用户自定义累加规则
//指定初始值为10 static LongAccumulator longAccumulator = new LongAccumulator(new LongBinaryOperator() { @Override public long applyAsLong(long left, long right) { // return left + right; //自定义值操作 return left - right; } }, 10); longAccumulator.accumulate(1L);//加1
使用ReentrantLock保证不被其他线程同时修改
CopyOnWriteArrayList(Collection<? extends E> c)
添加元素有原子性,遍历元素没有
线程未持有许可证,线程会阻塞挂起,并在nanos秒后返回
线程持有许可证,会马上返回
原理
unpark/park用于唤起和挂起线程
LockSupport类与每个使用它的线程都会关联一个许可证,在默认情况下调用LockSupport类的方法的线程是不持有许可证的
park() park(Object blocker)
线程未持有许可证,线程会被阻塞
线程持有许可证,会马上返回
阻塞后可通过unpark或者interrupt唤起线程
使用诊断工具可以观察线程被阻塞的原因,诊断工具是通过调用getBlocker(Thread)方法来获取blocker对象的
park和Thread.sleep()的区别
sleep不能从外部唤醒,只能自己醒来,park可以
sleep需要捕获InterruptedException,park不需要
sleep中断时会抛出InterruptedException异常,park只会设置中断标志
park和Object.wait()的区别
wait需要在sychronized块中执行,park可以在任意地方执行
wait等待被中断时,会抛出中断异常,park不会
unpark()
线程未持有许可证,调用会让线程持有许可
线程持有许可证,调用会唤起线程
parkUntil(Object blocker, long deadline)
线程未持有许可证,线程会阻塞挂起,并在deadline这个时间点返回
1970年-deadline的时间戳
线程持有许可证,会马上返回
抽象同步队列
AQS作用
减少CAS锁自旋带来的性能消耗
避免CAS在SMP架构的CPU上的总线风暴
实现同步器的基础组件,并发包中锁的底层
FIFO的双向队列
state 记录线程状态信息
通过getState、setState、compareAndSetState函数修改其值,表示是否获取到锁
独占锁
acquire(int arg) 获取
tryAcquire(int arg)
成功,修改state,返回
失败,将当前线程封装为类型为Node.EXCLUSIVE的Node节点后插入到AQS阻塞队列的尾部-enq(final Node node) 并调用LockSupport.park(this)方法挂起当前线程
acquireInterruptibly(int arg)获取
功能acquire一样,但会抛出中断异常 acquire不会
release(int arg)释放
tryRelease
成功,修改state LockSupport.unpark(thread)
失败,放入AQS队列并挂起
共享锁
acquireShared 获取
acquireSharedInterruptibly 获取
releaseShared 释放
AQS没有实现acquire、release,需要子类实现,state的含义也需要子类自定义
ConditionObject 条件变量
和sychronized的锁对象作用类似
await
挂起一个线程
signal
唤起一个线程
signalAll
唤起所有
ReentrantLock lock = new ReentrantLock(); Condition condition = lock.newCondition(); lock.lock(); condition.await(); lock.unlock();
ReentrantLock 重入锁
有公平和非公平的实现方式,默认是非公平 public ReentrantLock(boolean fair) { sync = fair ? new FairSync() : new NonfairSync(); }
独占锁
state
0空闲
大于1 被占用
Sync 继承AbstractQueuedSynchronizer
lock
sync.acquire(1)
ReentrantLock重写tryAcquire(int acquires)
unlock
如果当前线程没有持有锁会抛出IllegalMonitorStateException异常
lockInterruptibly
作用和lock一致
如果当前线程被中断,会抛出异常
tryLock
tryLock(long timeout, TimeUnit unit)
超时获取锁
内部有NonfairSync和FairSync都会实现上面的锁方法
ReentrantReadWriteLock 读写锁
可以多个线程同时读,但是不能同时写 有一个线程在写时,不能读 同一个线程可以同时拥有读写锁,但要先获取写锁 unlock时,读写锁同时拥有需要一起释放
有公平和非公平的实现方式,默认是非公平
实现ReadWriteLock
ReadLock
WriteLock
Sync 继承AbstractQueuedSynchronizer
lock
sync.acquire(1)
ReentrantReadWriteLock重写tryAcquire(int acquires)
state
高16位表示读状态,也就是获取到读锁的次数
使用低16位表示获取到写锁的线程的可重入次数
StampedLock
JDK8版本新增的一个锁,该锁提供了三种模式的读写控制
获取锁/解锁成功会返回一个stamp变量
锁模式 long stamp代表锁的状态
写锁writeLock
排它锁
不可重入锁
获取锁后释放锁前不能再调用会获取锁,会造成调用线程被阻塞
悲观读锁readLock
共享锁,在没有线程获取独占写锁的情况下,多个线程可以同时获取该锁
如果已经有线程持有写锁,则其他线程请求获取该读锁会被阻塞
不可重入锁
乐观读锁tryOptimisticRead
不使用CAS设置锁的状态
如果线程没有获取读锁,返回一个stamp版本信息 使用tryOptimisticRead返回stamp修改值
tryConvertToWriteLock(long stamp) 锁类型升级转换
当前锁已经是写锁模式了
当前锁处于读锁模式,并且没有其他线程是读锁模式
当前处于乐观读模式,并且当前写锁可用
并发队列
ConcurrentLinkedQueue 线程安全的无界非阻塞队列
底层数据结构使用单向链表
入队和出队操作使用CAS来实现线程安全
add/offer 向队列尾部插入元素
poll 移除头部元素
remove 移除指定的元素
peek 取出头部元素
contains 是否包含
size 作用不大,CAS无锁算法多线程下数据可能不准确
LinkedBlockingQueue 用独占锁实现的阻塞队列
使用ReentrantLock锁和Node保存数据
private final ReentrantLock takeLock = new ReentrantLock();
执行take、 poll等操作
private final ReentrantLock putLock = new ReentrantLock();
执行put、 offer等操作
Condition记录状态
private final Condition notEmpty = takeLock.newCondition();
当执行取操作,列表为空时,挂起 notEmpty.await()
private final Condition notFull = putLock.newCondition();
当执行存操作,队列满时,挂起 notFull.await()
capacity
最大容量
count
当前队列数量
offer 向队列尾部插入元素
put 向队尾插入元素
poll 移除头部元素
peek 取出头部元素但不移除
take取出头部元素移除
remove 移除指定的元素
size 获取当前队列数量
ArrayBlockingQueue 有界数组方式实现的阻塞队列
ReetrantLock 独占锁
items 数组存储
offer 尾部插入元素
put 向尾部插入元素
poll 移除头部第一个元素
take 移除头部元素
peek 获取头部元数但不移除
size 计算个数
长度有限,默认是Integer最大值 超长再添加的话会报错
PriorityBlockingQueue 带优先级的无界阻塞队列
底层原理
每次出队返回优先级最高或最低的元素
平衡二叉树实现,遍历元素不保证有序
使用对象的compareTo方法提供比较规则
queue存放队列元素
size存放元素个数
allocationSplinLock 状态值,保证只有一个现场可以扩容队列,是个自旋锁
offer 插入一个元素,无界,总是返回true
tryGrow 扩容
先释放锁,让其他线程可以出队入队
使用CAS进行扩容
allocationSplinLock 保证只有一个线程进行扩容
扩容失败的线程会yield让出线程
poll 获取队列内部根节点元素(根据优先级出队)
take 获取队列内部根节点元素
put 调用的是offer
size 队列元数个数
DelayQueue 无界阻塞延迟队列
底层原理
队列中的每个元素都有过期时间
获取元素时,只有过期元素才会出队
队列头元素时最快要出队的元素
内部使用PriorityQueue存放数据
offer 插入元素,元素需要实现Delayed接口
take 移除队列过期元素,没有则等待
poll 移除队头过期元素,没有则返回null
size 计算队列元素,包含过期和没有过期的
解决问题
减少线程重复创建和销毁的开销
限制创建个数,动态创建
控制最大线程并发数
提供定时执行、定频执行
单线程
ctl 线程池状态
高3位表示线程池状态 低29位记录线程数量
默认是RUNNING状态,线程个数为0
状态
RUNNING 接收新任务并且处理阻塞队列里的任务
SHUTDOWN 拒接新任务,但是处理阻塞队列里的任务
STOP 拒接新任务并且抛弃阻塞队列里的任务 同时会中断正在处理的任务
TIDYING 所有任务都执行完后当前线程池活动线程数为0,将调用terminated方法
TERMINATED 终止状态,调用terminated后的状态
状态转换
RUNNING -> SHUTDOWN
显示调用shutdown()
隐式调用finalize()里的shutdown()
RUNNING/SHUTDOWN -> STOP
显示调用shutdownNow()
SHUTDOWN -> TIDYING
线程池和任务队列都为空时
STOP -> TIDYING
当线程池为空时
TIDYING -> TERMINATED
当terminated() hook方法执行完时
线程池参数
corePoolSize 线程池核心线程数
workQueue 用于保存等待执行的任务的阻塞队列
ArrayBlockingQueue
LinkedBlockingQueue
SynchronousQueue
PriorityBlockingQueue
maximunPoolSize 线程池最大线程数量
ThreadFactory 创建线程工厂
RejectedExecutionHandler 饱和策略
当队列安满并且线程个数达到maxinumPoolSize后的策略
AbortPolicy 抛出异常
CallerRunsPolicy 使用调用者所在线程来运行任务
DiscardOldestPolicy 调用poll丢弃一个任务,执行当前任务
DiscardPolicy 默认丢弃,不抛异常
keepAliveTime 存活时间
如果当前线程池中的线程数量比核心线程数量多,并且是闲置状态,则这些闲置的线程能存活的最大时间
TimeUint 存活时间的单位
线程池类型
newFixedThreadPool
定长线程池 Executors.newFixedThreadPool(nThreads); new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new inkedBlockingQueue<Runnable>());
nThreads最长不能超过Integer.MAX_VALUE
keeyAliveTime=0说明只要线程个数比核心线程个数多并且当前空闲则回收
newSingleThreadExecutor
核心线程个数和最大线程个数都为1的线程池
阻塞队列长度为Integer.MAX_VALUE
keeyAliveTime=0
newCachedThreadPool
初始线程个数为0,最多的线程数为Integer.MAX_VALUE
阻塞队列为同步队列
keeyAliveTime=60,线程在空闲60秒后回收
线程加入同步队列的任务会被马上执行,同步队列里面最多只有一个任务
ReentrantLock mainLock
保证新增Worker线程操作的原子性
Worker 具体承载任务的对象
继承AQS和Runnable接口
state
0 锁未获取
1 锁已被获取
-1 创建时的默认状态
防止worker线程创建被中断 当其他线程调用了线程池的shutdownNow时,如果Worker状态>=0则会中断该线程
DefaultThreadFactory 线程工厂
newThread 对线程的修饰
poolNumber 统计线程工厂的个数
threadNumber 记录每个线程创建了多少个线程
execute 提交任务command到线程池 生产消费模型
shutDown 关闭线程池
调用后线程就不会再接收新任务
工作队列的任务会继续执行
该方法会立刻return,不等待队列执行任务
shutDownNow 立即关闭线程池
调用后线程就不会再接收新任务
丢弃工作队列的任务
正在执行的任务会被中断
该方法会立刻返回,不等待队列执行任务
返回值是队列里被丢弃的任务列表List<Runnable>
awaitTermination
等待线程池完成关闭
优雅的关闭线程池
执行shutDown方法拒绝接收新的线程,等待所有线程完成
执行awaitTermination(long timeout, TimeUnit unit)指定等待超时时间,判断是否已关闭所有任务,线程池关闭完成。
如果awaitTermination返回false,或者被中断,调用shutDownNow立即关闭
如果awaitTermination超时,进入循环关闭,循环一定的次数,不断关闭线程池,直到线程池关闭或循环结束
指定一定的延迟时间 或定时进行任务调度执行的线程池
继承了ThreadPoolExecutor,并实现了ScheduledExecutorService接口
线程池队列是DelayedWorkQueue
ScheduledFutureTask
继承FutureTask,实现RunnableScheduledFuture
state 状态
NEW 初始状态
COMPLETING 执行中状态
NORMAL 正常运行结束状态
EXCEPTIONAL 运行中异常
CENCELLED 任务被取消
INTERRUPTING 任务正在被中断
INTERRUPTED 任务已被中断
period 任务的类型
0
任务是一次性的,执行完就退出
负数
fixed-delay任务 固定延迟的定时可重复执行任务
正数
fixed-rate任务 固定频率的定时重复执行任务
isPeriodic() 是否周期性的,使用 period != 0判断
schedule(Runnable command, long delay, TimeUnit unit)
提交一个延迟执行的任务command 只执行一次
delay 延迟时长(执行周期)
TimeUnit 时间单位
scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit)
任务执行完后,在延迟固定时间再次执行(固定时间间隔执行)
initiaDelay 提交任务后,多久开始执行
delay 延迟时长(执行周期)
中断循环执行条件
抛出异常
被取消了
关闭线程池
scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit)
和scheduleWithFixedDelay功能类似
执行规则为,时间为initDelday +n*period时启动任务(定时间点执行)
如果当前任务还没有执行完,下一次要执行任务的时间到了,不会并发执行
下次要执行的任务会延迟执行,要等到当前任务执行完毕后再执行
compareTo
线程加入延时队列后,在内部建立或者调整堆时会使用该线程的compareTo方法,将快要过期的线程放到队首
CountDownLatch
应用场景
主线程开启子线程,主线程需要执行完毕再进行汇总
比join ()的优势
不必等待子线程完全执行完,就可以唤起主线程
在使用线程池的方式执行线程就没法使用join
用法
static volatile CountDownLatch countDownLatch = new CountDownLatch(count); 主线程调用countDownLatch.await(); 子线程调用countDownLatch.countDown(); 直到count减为0,唤起主线程
原理
内部有一个计数器 构造器传参存储在AQS的volatile int state
子线程调用countDown较少state
主线程调用await(),线程被挂起
当state减到0时唤醒主线程
await()
线程阻塞,无返回值
其他线程调用interrupt(),当前线程抛出异常
放入AQS阻塞队列
await(long timeout, TimeUint unit)
线程阻塞,超时时候返回false
state为0返回true
其他线程调用interrupt(),当前线程抛出异常
countDown
计数器state减1
当state为0时,唤醒所有被await阻塞的线程 调用AQS的doReleaseShared
CyclicBarrier 循环阻塞
解决问题
CountDownLatch是一次性的,当state为0时,再调用await和countDown都会立即返回,无法再使用
为了可以重置计数器
子线程和主线程可以交替执行
用法
定义CyclicBarrier cyclicBarrier = new CyclicBarrier(int parties, Runnable barrierAction);
线程池的子线程调用cyclicBarrier.await()进入阻塞,计数器递减
当调用await的次数=parties,继续执行定义CyclicBarrier的主线程
执行完主线程后的内容后,继续执行子线程await()后的程序
参数
parties 记录线程总数,值不会变
count 计数器,有子线程await会-1 当count为0时,parties会赋值给count
Runnanle barrierAction
当count为0时,执行该线程的任务 然后再接着执行子线程的await后的任务
方法
await()
阻塞当前线程
当parties个线程都调用了await() 当前线程会唤起
其他线程调用了该线程的interrupt方法
与当前线程关联的Generation对象的broken标志被设置为true,会抛出BrokenBarrierException
调用dowait(false, 0L)
await(long timeout, TimeUnit unit)
阻塞当前线程
当parties个线程都调用了await() 当前线程会唤起,并返回true
其他线程调用了该线程的interrupt方法
与当前线程关联的Generation对象的broken标志被设置为true,会抛出BrokenBarrierException
调用dowait(true, unit.toNanos(timeout))
dowait(boolean timed, long nanos) 核心功能
使用ReetrantLock加锁
当count递减为0时,调用command主任务运行 然后调用nextGeneration()唤起所有子线程 并重置count
当count递减不为0时 当前线程放入阻塞队列Condition trip 如果有超时,线程自动激活
Semaphore 信号同步器
计数器递增
不需要知道同步线程的个数 在需要同步的地方调用acquire方法指定个数
在需要同步的地方调用acquire方法指定个数
支持循环使用,需要手动重新调用acquire
使用
Semaphore semaphore = new Semaphore(0); ExecutorService executorService = Executors.newFixedThreadPool(2); executorService.submit(() -> { //dothing semaphore.release();//计数器+1 }); //主线程等待permits个线程完成 semaphore.acquire(permits);
方法
Semaphore(int permits); Semaphore(int permits, boolean fair) 提供公平和非公平的构造方法
FairSync 继承Sync
hasQueuedPredecessors() 判断除当前线程是否有先入队的线程,保证公平性
NonfairSync 默认策略
acquire()
不传参默认获取一个信号源 sync.acquireSharedInterruptibly(1);
获取到信号量大于1,会减1 如果信号量为0,会进入AQS阻塞队列
acquire(int permits)
获取permits个信号量
acquireUniterruptibly() acquireUniterruptibly(int permits)
获取信号量 其他线程调用interrupt不能中断线程
release() release(int permits)
把当前Semaphore信号量+1 / +permits
如果有线程因调用acquire进入阻塞队列,会根据公平策略选择一个信号个数能被满足的线程唤起
使用CAS保证信号量的原子性
谷歌提供的java扩展包,其中com.google.common.util.concurrent包中的ListenableFuture,提供了异步回调的功能
相比FutureTask的优势,可以异步回调,不需要join()主动回调等待线程完成
FutureCallBack<V>
异步回调接口,实现该接口,重写该回调的方法,处理任务
void onSuccess(@Nullable V var1)
void onFailure(Throwable var1)
线程实例创建
// Guava线程池  ListeningExecutorService gPool = MoreExecutors.listeningDecorator(jPool);
// 相当于Java线程池的创建 ExecutorService jPool = Executors.newFixedThreadPool(10);
//submit()方法用来提交任务,返回异步任务实例 ListenableFuture<Boolean> hFuture = gPool.submit(callableThread);
//绑定回调实例 Futures.addCallback(hFuture, new FutureCallback<Boolean>() { //回调方法onSuccess & onFailure });
继承java的Future接口
public interface Future<V> extends java.util.concurrent.Future<V> { boolean isSuccess(); // 判断异步执行是否成功 boolean isCancellable(); // 判断异步执行是否取消 Throwable cause(); // 获取异步任务异常的原因 
//增加异步任务执行完成Listener监听器 Future<V> addListener(GenericFutureListener<? extends Future<? super V>> listener);   //移除异步任务执行完成Listener监听器 Future<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener);  ... }
- Netty的Future一般不会直接使用,会使用它的子接口,如ChannelFuture,异步I/O操作
添加GenericFutureListener监听器,加入到Future任务中,实现对异步回调的监听
package io.netty.util.concurrent;  import java.util.EventListener;  public interface GenericFutureListener<F extends Future<?>> extends EventListener { //监听器的回调方法 void operationComplete(F var1) throws Exception;  }
GenericFutureListener接口在Netty中是一个基础类型接口。在网络编程的异步回调中,一般使用Netty中提供的某个子接口,ChannelFutureListener接口
JDK1.8引入,提供异步任务回调的接口
实现Future和CompletionStage
CompletionStage子任务实例
Function
有输入、输出
产生的结果输出到下一步
Runnable
没有输入、输出
Consumer
有输入、无输出
多个CompletionStage构成一条任务流水线,一个环节执行完了会将结果交给下一个环节
oneStage.thenApply(x -> square(x)).thenAccept(y->System.out.print(y)).thenRun(()->//doThing)
runAsync
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> { sleepSeconds(1);//模拟执行1秒 Print.tcfo("run end ..."); });  //等待异步任务执行完成,限时等待2秒 future.get(2, TimeUnit.SECONDS);
supplyAsync
CompletableFuture<Long> future = CompletableFuture.supplyAsync(() -> { long start = System.currentTimeMillis(); sleepSeconds(1);//模拟执行1秒 Print.tcfo("run end ..."); return System.currentTimeMillis() - start; });
设置子任务回调钩子
//设置异步任务执行完成后的回调钩子 future.whenComplete(new BiConsumer<Void, Throwable>() { @Override public void accept(Void t, Throwable action){ Print.tco("执行完成!"); }  });
//设置异步任务发生异常后的回调钩子 future.exceptionally(new Function<Throwable, Void>(){ @Override public Void apply(Throwable t){ Print.tco("执行失败!" + t.getMessage()); return null; } }); //获取异步任务的结果 future.get(); }
调用 cancel方法时,也会执行异常回调钩子
如果没有设置异常回调钩子
调用get()/get(long ,TimeUnit)时, 如遇到内部异常,会抛出ExecutionException
调用join()/getNow(T)启动任务时,如遇到内部异常,会抛出CompletionException
hanle()
除了调用异常处理钩子外,还可以调用该方法实现异常处理
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> { sleepSeconds(1);//模拟执行1秒 Print.tco("抛出异常!"); throw new RuntimeException("发生异常"); //Print.tco("run end ..."); }); //统一处理异常和结果 future.handle(new BiFunction<Void, Throwable, Void>() {  @Override public Void apply(Void input, Throwable throwable) {  if (throwable == null) { Print.tcfo("没有发生异常!");  } else { Print.tcfo("sorry,发生了异常!");  } return null; } });   future.get(); }
线程池的调用
在没有指定线程池的情况下,默认使用ForkJoinPool线程池
//子任务包装一个Runnable实例,并调用ForkJoinPool.commonPool()线程池来执行 public static CompletableFuture<Void>runAsync(Runnable runnable)  //子任务包装一个Runnable实例,并调用指定的executor线程池来执行 public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)  //子任务包装一个Supplier实例,并调用 ForkJoinPool.commonPool()线程池来执行 public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)  //子任务包装一个Supplier实例,并使用指定的executor线程池来执行 public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)
异步任务的串行执行
thenApply
有输入输出
//后一个任务与前一个任务在同一个线程中执行 public <U> CompletableFuture<U> thenApply( Function<? super T,? extends U> fn)  //后一个任务与前一个任务不在同一个线程中执行 public <U> CompletableFuture<U> thenApplyAsync( Function<? super T,? extends U> fn)  //后一个任务在指定的executor线程池中执行  public <U> CompletableFuture<U> thenApplyAsync( Function<? super T,? extends U> fn, Executor executor)
thenRun
没有输入输出
//后一个任务与前一个任务在同一个线程中执行 public CompletionStage<Void> thenRun(Runnable action);  //后一个任务与前一个任务不在同一个线程中执行 public CompletionStage<Void> thenRunAsync(Runnable action);  //后一个任务在executor线程池中执行 public CompletionStage<Void> thenRunAsync(Runnable action,Executor executor);
thenAccept
有输入,没输出
//后一个任务与前一个任务在同一个线程中执行 public CompletionStage<Void> thenAccept(Consumer<? super T> action);  //后一个任务与前一个任务不在同一个线程中执行 public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action);  //后一个任务在指定的executor线程池中执行 public CompletionStage<Void> thenAcceptAsync( Consumer<? super T> action,Executor executor);
thenCompose
有输入输出
第二个任务的返回值是一个CompletionStage实例
异步任务的合并执行
thenCombine
runAfterBoth
thenAcceptBoth
ompletableFuture<Integer> future3 = future1.thenCombine(future2, new BiFunction<Integer, Integer, Integer>() { @Override public Integer apply( Integer step1OutCome, Integer step2OutCome) { return step1OutCome * step2OutCome; } }); Integer result = future3.get(); Print.tco(" outcome is " + result); }
AllOf
等待多个任务结束,合并所有任务
CompletableFuture<Void> all = CompletableFuture.allOf(future1, future2, future3, future4); all.join();
applyToEither
谁返回结果快,就用谁执行下一步
runAfterEither
前两个CompletionStage实例,任何一个执行完,都会执行第三步回调操作,三个都是Runnable类型
acceptEither
两个CompletionStage实例谁返回快,就会执行第三步回调操作,返回值作为入参
logback异步日志打印
pom依赖
<!--logback日志依赖--> <dependency> <groupId>ch.qos.logback</groupId> <artifactId>logback-classic</artifactId> <version>1.2.3</version> </dependency>
logback.xml配置
AsyncAppender
继承AsyncAppenderBase
ArrayBlockingQueue 阻塞队列
queueSize 队列大小,默认256
Worker 日志消费单个工作线程
守护线程
aai
appender的装饰器
存放同步日志的appender
appenderCount
记录aai里面附加的同步appender的个数
neverBlock
指示当日志队列满时是否阻塞打印日志的线程
discardingThreshold 阈值
当日志队列里面的空闲元素个数小于该值时,新来的某些级别的日志会被直接丢弃
Tomcat的NioEndPoint
生产-消费模式 Acceptor-Poller
tomcat内部有多个Acceptor while循环等待接收请求
Acceptor将接收到的请求封装成channel对象放入poller对象队列
无界队列,tomcat需要设置最大连接数
注意事项
非线程安全的工具类使用
避免在多线程使用SimpleDateFormat的单个实例
Timer和ScheduledThreadPoolExecutor
Timer多生产,单消费
下个周期开始时,线程还没跑完会抛出异常
ScheduledThreadPoolExecutor可以配置 即可以是多生产,多消费,也可以单消费
浅拷贝问题
多线程需要修改同一个引用类型参数时 需要进行深复制,不能只是new 否则还是指向同一个对象地址引用
创建线程和线程池时要指定与业务相关的名称
不指定线程名称不好定位问题 线程创建会指定常量+数字的名称 init(null, target, "Thread-" + nextThreadNum(), 0);
线程池名称在ThreadFactory中的namePrefix属性指定 namePrefix = "pool-" + poolNumber.getAndIncrement() + "-thread-";
使用完线程池后,最好调用shutdown方法关闭 否则会导致线程资源一致不被释放
FutureTask使用注意事项
拒绝策略设置为DiscardPolicy和DiscardOldestPolicy,并且被拒绝的线程任务(队列已满)调用了无参get方法,那么调用线程就会一直被阻塞
FutureTask的submit方法,在线程数达到核心线程数数量,且任务队列满了,会执行拒绝策略方法
解决方法
在日常开发中尽量使用带超时参数的get方法
拒绝策略设置为默认的AbortPolicy则会正常返回
ThreadLocal使用注意事项
key,value属于弱引用,线程一直没有调用remove,且其他地方还有对ThreadLocal的引用,value不会被释放
解决方法
使用完后及时调用remove方法
在Tomcat中多次访问servlet创建出来的ThreadLocal不会被释放,创建LocalVariable使用的是webappClassLoader,所以也释放不了,该问题在tomcat7.0后修复
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。