赞
踩
创建Java线程需要给线程分配堆栈内存以及初始化内存,还需要进行系统调用,频繁地创建和销毁线程会大大降低系统的运行效率,采用线程池来管理线程有以下好处:
1. Executor
Executor提供了execute()接口来执行已提交的Runnable执行目标实例,它只有1个方法:
void execute(Runnable command)
2. ExecutorService
继承于Executor,Java异步目标任务的“执行者服务接”口,对外提供异步任务的接收服务
* @param task the task to submit
* @param <T> the type of the task's result
* @return a Future representing pending completion of the task
* @throws RejectedExecutionException if the task cannot be
* scheduled for execution
* @throws NullPointerException if the task is null
*/
<T> Future<T> submit(Callable<T> task);//向线程池提交单个异步任务
//想线程池提交批量异步任务
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException;
3. AbstractExecutorService
抽象类,实现了ExecutorService
4. ThreadPoolExecutor
线程池实现类,继承于AbstractExecutorService,JUC线程池的核心实现类
5. ScheduledExecutorService
继承于ExecutorService。它是一个可以完成“延时”和“周期性”任务的调度线程池接口
6. ScheduledThreadPoolExecutor
继承于ThreadPoolExecutor,实现了ExecutorService中延时执行和周期执行等抽象方法
7. Executors
静态工厂类,它通过静态工厂方法返回ExecutorService、ScheduledExecutorService等线程池示例对象
1. newSingleThreadExecutor创建“单线程化线程池”
package threadpool;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
public class CreateThreadPollDemo {
public static final int SLEEP_GAP=1000;
static class TargetTask implements Runnable{
static AtomicInteger taskNo=new AtomicInteger(1);
private String taskName;
public TargetTask()
{
taskName="task-"+taskNo;
taskNo.incrementAndGet();
}
public void run()
{
System.out.println("task:"+taskName+" is doing...");
try {
Thread.sleep(SLEEP_GAP);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("task:"+taskName+" end...");
}
}
public static void main(String[] args) {
ExecutorService pool=Executors.newSingleThreadExecutor();
for(int i=0;i<3;i++)
{
pool.execute(new TargetTask());
pool.submit(new TargetTask());
}
pool.shutdown();
}
}
特点:
2. newFixedThreadPool创建“固定数量的线程池
package threadpool;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
public class CreateThreadPollDemo {
public static final int SLEEP_GAP=1000;
static class TargetTask implements Runnable{
static AtomicInteger taskNo=new AtomicInteger(1);
private String taskName;
public TargetTask()
{
taskName="task-"+taskNo;
taskNo.incrementAndGet();
}
public void run()
{
System.out.println(taskName+" is doing...");
try {
Thread.sleep(SLEEP_GAP);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(taskName+" end...");
}
}
public static void main(String[] args) {
ExecutorService pool=Executors.newFixedThreadPool(3);//创建含有3个线程的线程池
for(int i=0;i<5;i++)
{
pool.execute(new TargetTask());
pool.submit(new TargetTask());
}
pool.shutdown();
}
}
特点:
适用场景:
缺点:
3. newCachedThreadPool创建“可缓存线程池”
package threadpool;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
public class CreateThreadPollDemo {
public static final int SLEEP_GAP=1000;
static class TargetTask implements Runnable{
static AtomicInteger taskNo=new AtomicInteger(1);
private String taskName;
public TargetTask()
{
taskName="task-"+taskNo;
taskNo.incrementAndGet();
}
public void run()
{
System.out.println(taskName+" is doing...");
try {
Thread.sleep(SLEEP_GAP);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(taskName+" end...");
}
}
public static void main(String[] args) {
ExecutorService pool=Executors.newCachedThreadPool();
for(int i=0;i<5;i++)
{
pool.execute(new TargetTask());
pool.submit(new TargetTask());
}
pool.shutdown();
}
}
特点:
适用场景:
缺点:
4. newScheduledThreadPool创建“可调度线程池”
package threadpool;
import java.security.Policy;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
public class CreateThreadPollDemo {
public static final int SLEEP_GAP=1000;
static class TargetTask implements Runnable{
static AtomicInteger taskNo=new AtomicInteger(1);
private String taskName;
public TargetTask()
{
taskName="task-"+taskNo;
taskNo.incrementAndGet();
}
public void run()
{
System.out.println(taskName+" is doing...");
try {
Thread.sleep(SLEEP_GAP);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(taskName+" end...");
}
}
public static void main(String[] args) throws InterruptedException {
ScheduledExecutorService pool=Executors.newScheduledThreadPool(2);
for(int i=0;i<2;i++)
{
pool.scheduleAtFixedRate(new TargetTask(), 0, 500, TimeUnit.MILLISECONDS);
//参数1: task任务
//参数2: 首次执行任务的延迟时间
//参数3: 周期性执行的时间
//参数4: 时间单位
}
Thread.sleep(3000);//主线程睡眠时间越长 周期次数越多
pool.shutdown();
}
}
总结:Executors创建线程池的4种方法十分方便,但是构造器创建普通线程池、可调度线程池比较复杂,这些构造器会涉及大量的复杂参数,已经较少使用。
Executors创建线程池存在的问题:
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
阻塞队列无界,队列很大,很有可能导致JVM出现OOM(Out Of Memory)异常,即内存资源耗尽
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
问题和固定数量线程池一样,阻塞队列无界
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
问题存在于其最大线程数量不设限上。由于其maximumPoolSize的值为Integer.MAX_VALUE(非常大),可以认为可以无限创建线程,如果任务提交较多,就会造成大量的线程被启动,很有可能造成OOM异常,甚至导致CPU线程资源耗尽
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}
主要问题在于线程数不设上限
总结:
public class ThreadPoolExecutor extends AbstractExecutorService {
/**
* Core pool size is the minimum number of workers to keep alive
* (and not allow to time out etc) unless allowCoreThreadTimeOut
* is set, in which case the minimum is zero.
*/
private volatile int corePoolSize;//核心线程数,即使线程空闲也不会被收回
/**
* Maximum pool size. Note that the actual maximum is internally
* bounded by CAPACITY.
*/
private volatile int maximumPoolSize;//线程的上限
/**
* Timeout in nanoseconds for idle threads waiting for work.
* Threads use this timeout when there are more than corePoolSize
* present or if allowCoreThreadTimeOut. Otherwise they wait
* forever for new work.
*/
private volatile long keepAliveTime;//线程的最大空闲时长
/**
* The queue used for holding tasks and handing off to worker
* threads. We do not require that workQueue.poll() returning
* null necessarily means that workQueue.isEmpty(), so rely
* solely on isEmpty to see if the queue is empty (which we must
* do for example when deciding whether to transition from
* SHUTDOWN to TIDYING). This accommodates special-purpose
* queues such as DelayQueues for which poll() is allowed to
* return null even if it may later return non-null when delays
* expire.
*/
private final BlockingQueue<Runnable> workQueue;//任务的排队队列
private volatile ThreadFactory threadFactory;//新线程的产生方式
/**
* Handler called when saturated or shutdown in execute.
*/
private volatile RejectedExecutionHandler handler;//拒绝策略
}
1. 核心线程和最大线程数量
2. BlockingQueue
3. keepAliveTime
注意:若调用了allowCoreThreadTimeOut(boolean)方法,并且传入了参数true,则keepAliveTime参数所设置的Idle超时策略也将被应用于核心线程
void execute(Runnable command)
: Executor接口中的方法 <T> Future<T> submit(Callable<T> task);
<T> Future<T> submit(Runnable task, T result);
Future<?> submit(Runnable task);
两种方法的区别:
1. 通过submit()返回的Future对象获取结果
package threadpool;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
public class CreateThreadPollDemo {
public static void main(String[] args) throws InterruptedException {
ScheduledExecutorService pool=Executors.newScheduledThreadPool(2);
Future<Integer> future=pool.submit(new Callable<Integer>() {
@Override
public Integer call() throws Exception {
return 123;
}
});
try {
Integer result=future.get();
System.out.println("result:"+result);//123
} catch (ExecutionException e) {
e.printStackTrace();
}
Thread.sleep(1000);
pool.shutdown();
}
}
2. 通过submit()返回的Future对象捕获异常
package threadpool;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
import javax.management.RuntimeErrorException;
public class CreateThreadPollDemo {
public static final int SLEEP_GAP=1000;
static class TargetTask implements Runnable{
static AtomicInteger taskNo=new AtomicInteger(1);
String taskName;
public TargetTask()
{
taskName="task-"+taskNo;
taskNo.incrementAndGet();
}
public void run()
{
System.out.println(taskName+" is doing...");
try {
Thread.sleep(SLEEP_GAP);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(taskName+" end...");
}
}
static class TargetTaskWithError extends TargetTask{
public void run()
{
super.run();//执行父类的run方法
throw new RuntimeException("Error from "+taskName);
}
}
public static void main(String[] args) throws InterruptedException {
ScheduledExecutorService pool=Executors.newScheduledThreadPool(2);
pool.execute(new TargetTaskWithError());
Future future=pool.submit(new TargetTaskWithError());
try {
if(future.get()==null)
{
System.out.println("No Exception");
}
} catch (ExecutionException e) {
e.printStackTrace();
}
Thread.sleep(1000);
pool.shutdown();
}
}
execute()方法在启动任务执行后,任务执行过程中可能发生的异常调用者并不关心。而通过submit()方法返回的Future对象(异步执行实例),可以进行异步执行过程中的异常捕获
注意点:
example: 设置核心线程数量为1,阻塞队列为100,有5个任务待执行(假设极端情况下任务一直执行不接受),则只有1个任务可以被执行,其他4个任务在阻塞队列中,而不是创建新线程进行处理(阻塞队列未满)
public interface ThreadFactory {
/**
* Constructs a new {@code Thread}. Implementations may also initialize
* priority, name, daemon status, {@code ThreadGroup}, etc.
*
* @param r a runnable to be executed by new thread instance
* @return constructed thread, or {@code null} if the request to
* create a thread is rejected
*/
Thread newThread(Runnable r);
}
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory);
public static ExecutorService newFixedThreadPool(int nThreads)
package threadpool;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
public class CreateThreadPollDemo {
public static final int SLEEP_GAP=1000;
static class TargetTask implements Runnable{
static AtomicInteger taskNo=new AtomicInteger(1);
String taskName;
public TargetTask()
{
taskName="task-"+taskNo;
taskNo.incrementAndGet();
}
public void run()
{
System.out.println(Thread.currentThread().getName()+": "+taskName+" is doing...");
try {
Thread.sleep(SLEEP_GAP);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(taskName+" end...");
}
}
static class SimpleThreadFactory implements ThreadFactory{
static AtomicInteger threadNo=new AtomicInteger(1);
public Thread newThread(Runnable task) {
String threadName="simpleThread-"+threadNo;
System.out.println("创建一条线程,名字是:"+threadName);
threadNo.incrementAndGet();
Thread thread=new Thread(task,threadName);
thread.setDaemon(true);
return thread;
}
public static void main(String[] args) throws InterruptedException {
ExecutorService pool=Executors.newFixedThreadPool(2,new SimpleThreadFactory());
// ExecutorService pool=Executors.newFixedThreadPool(2);
for(int i=0;i<5;i++)
{
pool.submit(new TargetTask());
}
Thread.sleep(5000);
pool.shutdown();
}
}
}
使用默认线程工厂的情况如下:
public static void main(String[] args) throws InterruptedException {
//ExecutorService pool=Executors.newFixedThreadPool(2,new SimpleThreadFactory());
ExecutorService pool=Executors.newFixedThreadPool(2);
for(int i=0;i<5;i++)
{
pool.submit(new TargetTask());
}
Thread.sleep(5000);
pool.shutdown();
线程工厂和线程池工厂:
Executors为线程池工厂类,用于快捷创建线程池(Thread Pool);ThreadFactory为线程工厂类,用于创建线程(Thread)
特点:在一个线程从一个空的阻塞队列中获取元素时线程会被阻塞,直到阻塞队列中有了元素;当队列中有元素后,被阻塞的线程会自动被唤醒
常见的几种阻塞队列的实现:
三个钩子方法存在于ThreadPoolExecutor类,这3个方法都是空方法,一般会在子类中重写
protected void beforeExecute(Thread t, Runnable r) { }
: 任务执行之前的钩子方法
protected void afterExecute(Runnable r, Throwable t) { }
: 任务执行之后的钩子方法
protected void terminated() { }
: 线程池终止时的钩子方法
package threadpool;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
public class CreateThreadPollDemo {
public static final int SLEEP_GAP=1000;
static class TargetTask implements Runnable{
static AtomicInteger taskNo=new AtomicInteger(1);
String taskName;
public TargetTask()
{
taskName="task-"+taskNo;
taskNo.incrementAndGet();
}
public void run()
{
System.out.println(Thread.currentThread().getName()+": "+taskName+" is doing...");
try {
Thread.sleep(SLEEP_GAP);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(taskName+" end...");
}
}
static class SimpleThreadFactory implements ThreadFactory{
static AtomicInteger threadNo=new AtomicInteger(1);
public Thread newThread(Runnable task) {
String threadName="simpleThread-"+threadNo;
System.out.println("创建一条线程,名字是:"+threadName);
threadNo.incrementAndGet();
Thread thread=new Thread(task,threadName);
thread.setDaemon(true);
return thread;
}
public static void main(String[] args) throws InterruptedException {
ExecutorService pool=new ThreadPoolExecutor(2, 4, 60,TimeUnit.SECONDS, new LinkedBlockingQueue<>(2)){
@Override
protected void terminated()
{
System.out.println("调度器已停止...");
}
@Override
protected void beforeExecute(Thread t,Runnable target)
{
System.out.println("前钩执行...");
super.beforeExecute(t, target);
}
@Override
protected void afterExecute(Runnable target,Throwable t)
{
System.out.println("后钩执行...");
super.afterExecute(target, t);
}
};
for(int i=0;i<5;i++)
pool.execute(new TargetTask());
Thread.sleep(5000);
pool.shutdown();
}
}
}
拒绝情况:
几种常见的拒绝策略:
新任务就会被拒绝,并且抛出RejectedExecutionException异常。该策略是线程池默认的拒绝策略
新任务就会直接被丢掉,并且不会有任何异常抛出
将最早进入队列的任务抛弃,从队列中腾出空间,再尝试加入队列(一般队头元素最老)
新任务被添加到线程池时,如果添加失败,那么提交任务线程会自己去执行该任务,不会使用线程池中的线程去执行新任务
package threadpool;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
public class CreateThreadPollDemo {
public static final int SLEEP_GAP=1000;
static class TargetTask implements Runnable{
static AtomicInteger taskNo=new AtomicInteger(1);
String taskName;
public TargetTask()
{
taskName="task-"+taskNo;
taskNo.incrementAndGet();
}
public void run()
{
System.out.println(Thread.currentThread().getName()+": "+taskName+" is doing...");
try {
Thread.sleep(SLEEP_GAP);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(taskName+" end...");
}
}
static class SimpleThreadFactory implements ThreadFactory{
static AtomicInteger threadNo=new AtomicInteger(1);
public Thread newThread(Runnable task) {
String threadName="simpleThread-"+threadNo;
System.out.println("创建一条线程,名字是:"+threadName);
threadNo.incrementAndGet();
Thread thread=new Thread(task,threadName);
thread.setDaemon(true);
return thread;
}
static class CustomerIgnorePolicy implements RejectedExecutionHandler{
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
System.out.println(Thread.currentThread().getName()+"-rejected; taskCount-"+executor.getTaskCount());
}
}
public static void main(String[] args) throws InterruptedException {
int corePoolSize=2;//核心线程数
int maximumPoolSize=4;//最大线程数
long keepAlive=10;//空闲时间
TimeUnit unit=TimeUnit.SECONDS;//时间单位
BlockingQueue<Runnable> workQueue=new LinkedBlockingQueue<>(2);//阻塞队列
ThreadFactory factory=new SimpleThreadFactory();//自定义线程工厂
RejectedExecutionHandler policy=new CustomerIgnorePolicy();//自定义拒绝策略
ThreadPoolExecutor pool=new ThreadPoolExecutor(corePoolSize,maximumPoolSize,keepAlive,unit,workQueue,factory,policy);
pool.prestartAllCoreThreads();
for(int i=0;i<11;i++)
pool.execute(new TargetTask());
Thread.sleep(5000);
pool.shutdown();
}
}
}
线程池的5种状态:
// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
几种状态的转换:
几种关闭线程池的方法:
等待当前工作队列中的剩余任务全部执行完成之后,才会执行关闭,但是此方法被调用之后线程池的状态转为SHUTDOWN,线程池不会再接收新的任务
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();//检测权限
advanceRunState(SHUTDOWN);//设置线程池状态
interruptIdleWorkers();//中断空闲线程
onShutdown(); // 钩子函数,用于清理一些资源
} finally {
mainLock.unlock();
}
tryTerminate();
}
立即关闭线程池的方法,此方法会打断正在执行的工作线程,并且会清空当前工作队列中的剩余任务,返回的是尚未执行的任务
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();//检测权限
advanceRunState(STOP);//设置线程池状态
interruptWorkers();//中断所有线程(工作线程以及空闲线程)
tasks = drainQueue();//丢弃工作队列中的剩余任务
} finally {
mainLock.unlock();
}
tryTerminate();
return tasks;
}
等待线程池完成关闭, shutdown()与shutdownNow()方法之后,用户程序都不会主动等待线程池关闭完成
public boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException {
long nanos = unit.toNanos(timeout);
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (;;) {
if (runStateAtLeast(ctl.get(), TERMINATED))
return true;
if (nanos <= 0)
return false;
nanos = termination.awaitNanos(nanos);
}
} finally {
mainLock.unlock();
}
}
在设置的时间timeout内如果线程池完成关闭,返回true, 否则返回false
参考书籍:《Java高并发编程卷2》 尼恩
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。