赞
踩
这里为了便于叙述,毕竟不是本次的重点,我直接上源码,没基础的可以去找些其他资料来补一补
public class ThreadpoolApplication {
public static void main(String[] args) throws ExecutionException, InterruptedException {
// 1. 继承Thread类实现
Thread impl =new ThreadImpl();
impl.start();
// 2. 实现Runnable接口
// 对于有 @FunctionalInterface 的类 or 接口,我们可以使用lambda表达式来简化,当然
// 没有这个注解也可以,但是一定要符合函数式编程的规范
// 1. 只能有1个待实现的方法
// 2. 允许有默认方法
// 3. 只能是接口
// @FunctionalInterface 可有可无,但是为了规范建议写上,起一个标记作用,告诉编译器这是一个函数式接口
// 可以让IDE帮你检测你的函数式接口是否符合规范
Thread runnableImpl = new Thread(new Runnable() {
// 这里可以用函数式接口lambda表达式来简写,具体的内容这里不做过多解释,
// 你可以理解lambda实现为是对接口的简单实现,因为你用lambda返回的也是个Runnable实现对象
// 后面我就直接用Lambda表达式来简写了。
@Override
public void run() {
}
});
runnableImpl.start();
// 实现Callable接口,带返回值
FutureTask<Integer> futureTask= new FutureTask<>(()->{
return 1;
});
new Thread(futureTask).start();
System.out.println(futureTask.get());
Integer result=0;
// 实现Runnable接口,带返回值
futureTask=new FutureTask<>(()->{
},result);
new Thread(futureTask).start();
futureTask.get(); // 其实这个值就是你设置的值,可以去看一下源码,这里只是为了方便任务管理
return ;
}
}
大家可能对函数式编程有点懵,其实就是符合上面所说的规范
对于有 @FunctionalInterface 的类 or 接口,我们可以使用lambda表达式来简化,当然没有这个注解也可以,但是一定要符合函数式编程的规范
@FunctionalInterface 可有可无,但是为了规范建议写上,起一个标记作用,告诉编译器这是一个函数式接口可以让IDE帮你检测你的函数式接口是否符合规范
比如这样子:
@FunctionalInterface
public interface MethodInterface {
void print();
default MethodInterface andThen(MethodInterface methodInterface){
// 想一下有什么区别
// print();
// methodInterface.print();
// return methodInterface;
return ()->{
print();
methodInterface.print();;
};
// 不用lambda表达式,那么就是在andThen里面进行执行的,每次执行andThen的时候都会自动执行print方法,而且总体上每次
// methodInterface.print()会执行两次,那么methodInterface.print()不写的话最后需要再执行一次applay
// 如果你是使用的lambda表达式返回,那么返回的是一个全新的接口,如果我们需要链式调用完,那么在最后还要执行一下
// print方法,当然你也可以选择实现一个end()方法来表示结束,相当于另外起一个名
}
}
其实上面的函数式接口和Cusumer一样,不过Cusumer多了一个判空的过程,除此之外还有另外几个常用的函数式接口(统一规范),如下:
@FunctionalInterface
public interface Consumer<T> {
void accept(T t);
default Consumer<T> andThen(Consumer<? super T> after) {
Objects.requireNonNull(after);
return (T t) -> { accept(t); after.accept(t); };
}
}
@FunctionalInterface
public interface Function<T, R> {
R apply(T t);
// 执行一个before操作,类似于AOP思想
default <V> Function<V, R> compose(Function<? super V, ? extends T> before) {
Objects.requireNonNull(before);
// 将执行完成before的结果再带入当前的Fuction进行运算
// nnd, 这不是指针啊!!!,lambda表达是只有一行的时候可以不用写大括号
return (V v) -> apply(before.apply(v));
// 相当于这样
// return (V v) -> {
// apply(before.apply(v));
// }
}
default <V> Function<T, V> andThen(Function<? super R, ? extends V> after) {
Objects.requireNonNull(after);
// 在本次执行完之后再执行after
return (T t) -> after.apply(apply(t));
}
// 这又是个什么玩意儿,别急,看我代码补全,构造一个传入和返回同类型的接口
// 这有啥用,呃呃呃,简写而已,这个是对apply的实现就是给啥还啥,但是只能过用在Function接口上
// 其他接口不适用,因为接口返回类型,但是你都可以用 t -> t 来表示
static <T> Function<T, T> identity() {
// return t -> return t;
return t -> t;
}
}
@FunctionalInterface
public interface BiFunction<T, U, R> {
// 对t和u进行运算,返回R类型,nnd,不就是sort函数的第三个cmp参数吗,不说了
R apply(T t, U u);
// 这一块就不说了
default <V> BiFunction<T, U, V> andThen(Function<? super R, ? extends V> after) {
Objects.requireNonNull(after);
return (T t, U u) -> after.apply(apply(t, u));
}
工厂模式,要什么可以现在这类面配置好,最后再来一个beanFactory.get()就行
@FunctionalInterface
public interface Supplier<T> {
/**
* Gets a result.
*
* @return a result
*/
T get();
}
@FunctionalInterface
public interface Predicate<T> {
// 实现一个原子性的判断,也就是说我们可以把这个放在if里面来玩儿
boolean test(T t);
// 与运算
default Predicate<T> and(Predicate<? super T> other) {
Objects.requireNonNull(other);
return (t) -> test(t) && other.test(t);
}
// 非
default Predicate<T> negate() {
return (t) -> !test(t);
}
// 或
default Predicate<T> or(Predicate<? super T> other) {
Objects.requireNonNull(other);
return (t) -> test(t) || other.test(t);
}
// 判断相等
static <T> Predicate<T> isEqual(Object targetRef) {
return (null == targetRef)
? Objects::isNull
: object -> targetRef.equals(object);
}
// 类似于上面Function的identity()方法
@SuppressWarnings("unchecked")
static <T> Predicate<T> not(Predicate<? super T> target) {
Objects.requireNonNull(target);
return (Predicate<T>)target.negate();
}
}
也许你有一个疑问,为什么最后又会返回一个函数式接口?而不是使用直接执行,拿这个方法来说
default MethodInterface andThen(MethodInterface methodInterface){
// 想一下有什么区别
// print();
// return methodInterface;
return ()->{
print();
methodInterface.print();;
};
// 不用lambda表达式,那么就是在andThen里面进行执行的,每次执行andThen的时候都会自动执行print方法,而且总体上每次
// methodInterface.print()会执行两次,那么methodInterface.print()不写的话最后需要再执行一次applay
// 如果你是使用的lambda表达式返回,那么返回的是一个全新的接口,如果我们需要链式调用完,那么在最后还要执行一下
// print方法,当然你也可以选择实现一个end()方法来表示结束,相当于另外起一个名
}
对于直接返回method,最后再来执行一次,那么就像排队一样,一步一步执行,那么你调试的时候,就会频繁的在上下文跳来跳去。
如果你使用接口封装,那么调用print()相当于是一次组合,什么还不懂?
那么好,如果说我要用这个结合来做很多次(比如放入异步任务,当然这里不可能,我是说如果),你不用接口组合,而是用面向过程来实现
那么我们每次放进去都要放
a.andThen()
.andThen()
.andThen()
.andThen()
...
.andThen().print();
// 重新运行只能这样做
a.andThen()
.andThen()
.andThen()
.andThen()
...
.andThen().print();
那么如果我们是用组合
我们只需要这样做
MethodInterface mi=a.andThen()
.andThen()
.andThen()
.andThen()
...
.andThen();
// 每次要重复执行的时候只需要运行一下,一行代码解决
mi.print();
关于函数式编程在stream中的运用,我给大家推荐一篇文章,这里我就不做过多的讲解了,毕竟就是写调用的问题,上面的能看懂,那么我相信你看源代码的时候都能够迎刃而解!
一个线程一个Thread对象,那么Thread对象的创建和销毁就太频繁了,那么有没有优化的方案呢?
之前我们不是将FutureTask给放入了Thread进行运行吗?也就是说FutureTask能够管理我们的任务方法
没错,我们可以用FutureTask来对线程进行封装,再放入Thread,每次任务结束的时候控制FutrueTask就可以了,完全不用管Thread对象的创建
那么FutureTask的任务创建是啥?
来看看源码吧,先来看看是如何做到Runnable和Callable之间的适配的
public FutureTask(Callable<V> callable) {
if (callable == null)
throw new NullPointerException();
this.callable = callable; // 如果是callable,那么直接静态代理,能够理解
this.state = NEW; // ensure visibility of callable
}
public FutureTask(Runnable runnable, V result) {
this.callable = Executors.callable(runnable, result); // 如果是runnable,那么采用适配器模式转换一下
/**
public static <T> Callable<T> callable(Runnable task, T result) {
if (task == null)
throw new NullPointerException();
return new RunnableAdapter<T>(task, result);
}
**/
this.state = NEW; // ensure visibility of callable
}
// runnable适配器
private static final class RunnableAdapter<T> implements Callable<T> {
private final Runnable task;
private final T result;
RunnableAdapter(Runnable task, T result) {
this.task = task;
this.result = result;
}
public T call() {
task.run();
return result; // 这里可以看到只是使用适配器模式对task进行一个代理,返回的result的值是固定的,当然有异常的话就没有返回
}
public String toString() {
return super.toString() + "[Wrapped task = " + task + "]";
}
}
public void run() {
if (state != NEW ||
!RUNNER.compareAndSet(this, null, Thread.currentThread()))
return;
try {
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
result = c.call(); // 不管是哪个类型,都适配成call来被代理
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
setException(ex);
}
if (ran)
set(result);
}
} finally {
// runner must be non-null until state is settled to
// prevent concurrent calls to run()
runner = null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
int s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}
来看看FutureTask的继承
但是找了一圈儿也没有找到一个可以设置callable的方法,都是初始化,那么就自己实现吧,参考上面的实现一个
package fyi.wzl.threadpool.threadimpl;
import org.springframework.util.ObjectUtils;
import java.util.concurrent.Callable;
public class CallableImpl<T> implements Callable {
private Callable<T> callable;
@Override
public T call() throws Exception {
if (ObjectUtils.isEmpty(callable)) return null;
return callable.call();
}
public CallableImpl(Callable<T> callable) {
this.callable = callable;
}
public CallableImpl(Runnable runnable,T res) {
this.callable = new RunnableAdpter(runnable,res);
}
public CallableImpl<T> setCallable(Callable<T> callable) {
this.callable = callable;
return this;
}
public CallableImpl<T> setCallable(Runnable runnable,T res) {
this.callable = new RunnableAdpter(runnable,res);
return this;
}
private class RunnableAdpter implements Callable<T> {
Runnable runnable;
T res;
public RunnableAdpter(Runnable runnable, T res) {
this.runnable = runnable;
this.res = res;
}
@Override
public T call() throws Exception {
this.runnable.run();
return res;
}
}
}
当然,你也可以写一个CallableAdpter来适配Callable为Runnable
因为Java引用传递,那么我们可以通过修改callble来达到修改任务的目的,那么貌似不用FutureTask也可以了?当然你也可以实现一个类似FutrueTask的实现,这样就不用FutrueTask,直接交给Thread进行管理也可以。
那么FutrueTask的作用是啥?
FutureTask一个可取消的异步计算,FutureTask 实现了Future的基本方法,提供 start cancel 操作,可以查询计算是否已经完成,并且可以获取计算的结果。结果只可以在计算完成之后获取,get方法会阻塞当计算没有完成的时候,一旦计算已经完成,那么计算就不能再次启动或是取消。
再看看FutrueTask的源码,又有AQS这些东东,这些东西我们在以后的文章里面细说。
好了,刚才讲到,我们可以用FutrueTask来封装Runnable和Callable(那么我封装自己也可以吧),那么我们也可以手写一个实现来实现可变换任务的封装(其实这里不太严谨,但是为了照顾新手,就讲的简单点,我们直接使用)
CallableImpl<Integer> callable = new CallableImpl<>(() -> {
System.out.println("没改");
return 1;
});
FutureTask futureTask = new FutureTask<Integer>(callable);
Thread thread = new Thread(futureTask);
thread.start();
callable.setCallable(()->{
System.out.println("改了");
return 2;
});
System.out.println(futureTask.get());
但是注意的是Thread中start只能执行一次,那么怎么做?
重写Thread来判断任务状态,然后没有任务的时候进行阻塞,有任务的时候进行唤醒,下面来简单实现一下(其实这里也可以实现一个任务队列,然后异步监控来判断实现notifyall)
public class ThreadpoolApplication {
public static void main(String[] args) throws ExecutionException, InterruptedException {
CallableImpl<Integer> callable = new CallableImpl<>(() -> {
System.out.println("没改,线程ID为:"+Thread.currentThread().getId());
return 1;
});
ThreadImpl thread=new ThreadImpl(callable);
// 线程开始后是死循环,没执行一次后进行阻塞,等到新任务进入后进再执行
thread.start();
AtomicInteger i= new AtomicInteger(0);
while(true){
// 这里是每次设定不同的任务
callable.setCallable(thread.getId(),()->{
System.out.println("改了"+i.getAndIncrement()+"线程ID为:"+Thread.currentThread().getId());
return 2;
});
}
}
}
来看一看对于Thread的重新实现吧
public class ThreadImpl extends Thread{
// 自己实现的一个Callble接口,模仿FutrueTask实现
CallableImpl callable=null;
public ThreadImpl(CallableImpl callable) {
this.callable = callable;
}
// 其实主要是对run方法进行的修改,这里有一点AOP那个味道了,好好好
@Override
public void run() {
while (true){
// 多线程代码
String intern = String.valueOf(this.getId()).intern();
synchronized (intern){
try {
this.callable.call();
} catch (Exception e) {
throw new RuntimeException(e);
} finally {
try {
intern.wait();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
}
}
}
// 这个方法可能有点偏离原始实现Thread的思想,现在感觉应该是以实现RUnnable接口为主,难得改了,有兴趣的朋友可以写一下
public class CallableImpl<T> implements Callable {
private Callable<T> callable;
@Override
public T call() throws Exception {
if (ObjectUtils.isEmpty(callable)) return null;
return callable.call();
}
public CallableImpl(Callable<T> callable) {
this.callable = callable;
}
public CallableImpl(Runnable runnable,T res) {
this.callable = new RunnableAdpter(runnable,res);
}
public CallableImpl<T> setCallable(long threadId,Callable<T> callable) throws InterruptedException {
String id=String.valueOf(threadId).intern();
synchronized (id) {
this.callable = callable;
id.intern().notify();
// notify需要等待锁释放完成后才会对其它锁进行唤醒操作,不信的话可以把注释删掉跑一边
// while (true){
// if (id==null)break;
// }
}
return this;
}
public CallableImpl<T> setCallable(Runnable runnable,T res) {
this.callable = new RunnableAdpter(runnable,res);
return this;
}
private class RunnableAdpter implements Callable<T> {
Runnable runnable;
T res;
public RunnableAdpter(Runnable runnable, T res) {
this.runnable = runnable;
this.res = res;
}
@Override
public T call() throws Exception {
this.runnable.run();
return res;
}
}
}
看看执行结果
上面只是提到了对于Thread执行任务的一种动态实现方法,肯定还有其他的。
那么动态实现有什么好处呢?
当我们有很多个任务的时候,我们如果一直使用new,再让gc的话,那么对于系统资源的消耗无疑是巨大的。
那么这个时候,如果我们固定一下,专门拿几个线程来处理并发任务呢?但是当并发任务很多又该怎么办?
这个时候就引入了池化思想 —— Pool
什么是池?
在学JDBC的时候我们知道了连接池,在学Spring的时候,我们又接触到了对象池。
其实按理来说线程池应该是大家在初学JavaSE的时候应该就遇到的,这里我们再来讲一下。
线程池,就是用一个容器来管理线程,这个容器叫做池(Pool)。
pool里面的线程数量是固定的,我们拿着固定的线程数量区执行不同的任务,下面来看一个思维图。
这里要引入几概念:
最大线程数
线程池允许创建的最大线程数量。
核心线程数
线程池维护的最小线程数量,核心线程创建后不会被回收(注意:设置allowCoreThreadTimeout=true后,空闲的核心线程超过存活时间也会被回收)。
阻塞队列
一个用于存放任务的队列
当最开始有任务到达的时候,会抢先占用核心线程,当核心线程占用满了以后进入任务队列,任务队列满了以后还有新的线程,那么启用临时线程来进行处理,注意的是临时线程不会处理阻塞队列中的任务,并且临时线程拥有一个存活时间,当长时间没有任务的时候,就会进行自动销毁。
如果临时线程也满了,就是说超过了最大线程数,这也代表着阻塞队列满了,那么采用拒绝策略。
下面我们堆四大拒绝策略来展开说说
终止策略,这是ThreadPoolExecutor
线程池默认的拒绝策略,程序将会抛出RejectedExecutionException
异常。
public static class AbortPolicy implements RejectedExecutionHandler {
public AbortPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException("Task " + r.toString() +
" rejected from " +
e.toString());
}
}
调用者运行策略,线程池中没办法运行,那么就由提交任务的这个线程运行(嗯,也就相当于变成了原来的单线程串行执行)。
public static class CallerRunsPolicy implements RejectedExecutionHandler {
/**
* Creates a {@code CallerRunsPolicy}.
*/
public CallerRunsPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
r.run(); // 直接就在这里运行了,并且不会爬出异常
}
}
}
丢弃最早未处理请求策略,丢弃最先进入阻塞队列的任务以腾出空间让新的任务入队列。
有一点像滑动窗口那个感觉了
public static class DiscardOldestPolicy implements RejectedExecutionHandler {
public DiscardOldestPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
e.getQueue().poll();
e.execute(r); // 在这里面执行了入队操作
}
}
}
丢弃策略,什么都不做,即丢弃新提交的任务。
public static class DiscardPolicy implements RejectedExecutionHandler {
public DiscardPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
}
}
总结一下,上面其实可以看出一个优先级:
任务总是按照如下优先级进入:
核心线程>阻塞队列>临时线程>拒绝策略
说了这么多,那么我们如何创建线程池呢?
我们只需要new一个对象就可以了
这里我们来玩儿个好玩儿的,玩儿玩儿阻塞,哈哈,组省赛策略我们选择CallerRunsPlicy,也就是哪来的回哪里去
public class ThreadpoolApplication {
public static void main(String[] args) throws ExecutionException, InterruptedException {
int corePoolSize=3;
int maximumPoolSize=6;
long keepAliveTime=7000L;
TimeUnit unit=TimeUnit.MILLISECONDS;
BlockingQueue<Runnable> workQueue=new ArrayBlockingQueue<>(10);
ExecutorService executorService = new ThreadPoolExecutor(corePoolSize,
maximumPoolSize,
keepAliveTime,
unit,
workQueue,new ThreadPoolExecutor.CallerRunsPolicy());
int i=0;
Thread local=Thread.currentThread();
while (i++<3){
int j=0;
while(j++<6){
executorService.execute(()->{
Thread thread = Thread.currentThread();
System.out.println(thread +"完成,时间:"+new Date());
try {
if (!thread.equals(local)){
Thread.sleep(5000);
}
else {
Thread.sleep(2000);
}
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
});
}
}
}
}
我们可以看一下运行结果
Thread[pool-1-thread-3,5,main]完成,时间:Fri Aug 11 05:55:03 CST 2023
Thread[pool-1-thread-6,5,main]完成,时间:Fri Aug 11 05:55:03 CST 2023
Thread[pool-1-thread-2,5,main]完成,时间:Fri Aug 11 05:55:03 CST 2023
Thread[pool-1-thread-5,5,main]完成,时间:Fri Aug 11 05:55:03 CST 2023
Thread[main,5,main]完成,时间:Fri Aug 11 05:55:03 CST 2023
Thread[pool-1-thread-4,5,main]完成,时间:Fri Aug 11 05:55:03 CST 2023
Thread[pool-1-thread-1,5,main]完成,时间:Fri Aug 11 05:55:03 CST 2023
Thread[main,5,main]完成,时间:Fri Aug 11 05:55:05 CST 2023
Thread[pool-1-thread-6,5,main]完成,时间:Fri Aug 11 05:55:08 CST 2023
Thread[pool-1-thread-1,5,main]完成,时间:Fri Aug 11 05:55:08 CST 2023
Thread[pool-1-thread-5,5,main]完成,时间:Fri Aug 11 05:55:08 CST 2023
Thread[pool-1-thread-3,5,main]完成,时间:Fri Aug 11 05:55:08 CST 2023
Thread[pool-1-thread-4,5,main]完成,时间:Fri Aug 11 05:55:08 CST 2023
Thread[pool-1-thread-2,5,main]完成,时间:Fri Aug 11 05:55:08 CST 2023
Thread[pool-1-thread-4,5,main]完成,时间:Fri Aug 11 05:55:13 CST 2023
Thread[pool-1-thread-5,5,main]完成,时间:Fri Aug 11 05:55:13 CST 2023
Thread[pool-1-thread-2,5,main]完成,时间:Fri Aug 11 05:55:13 CST 2023
Thread[pool-1-thread-6,5,main]完成,时间:Fri Aug 11 05:55:13 CST 2023
可以看到我们在主线程也产生了阻塞,那么我们把阻塞时间给换一下,看看会发生啥(qwq,就是玩儿)
这个时候的输出如下
Thread[pool-1-thread-1,5,main]完成,时间:Fri Aug 11 05:56:43 CST 2023
Thread[main,5,main]完成,时间:Fri Aug 11 05:56:43 CST 2023
Thread[pool-1-thread-4,5,main]完成,时间:Fri Aug 11 05:56:43 CST 2023
Thread[pool-1-thread-6,5,main]完成,时间:Fri Aug 11 05:56:43 CST 2023
Thread[pool-1-thread-3,5,main]完成,时间:Fri Aug 11 05:56:43 CST 2023
Thread[pool-1-thread-5,5,main]完成,时间:Fri Aug 11 05:56:43 CST 2023
Thread[pool-1-thread-2,5,main]完成,时间:Fri Aug 11 05:56:43 CST 2023
Thread[pool-1-thread-3,5,main]完成,时间:Fri Aug 11 05:56:45 CST 2023
Thread[pool-1-thread-4,5,main]完成,时间:Fri Aug 11 05:56:45 CST 2023
Thread[pool-1-thread-2,5,main]完成,时间:Fri Aug 11 05:56:45 CST 2023
Thread[pool-1-thread-5,5,main]完成,时间:Fri Aug 11 05:56:45 CST 2023
Thread[pool-1-thread-6,5,main]完成,时间:Fri Aug 11 05:56:45 CST 2023
Thread[pool-1-thread-1,5,main]完成,时间:Fri Aug 11 05:56:45 CST 2023
Thread[pool-1-thread-2,5,main]完成,时间:Fri Aug 11 05:56:47 CST 2023
Thread[pool-1-thread-4,5,main]完成,时间:Fri Aug 11 05:56:47 CST 2023
Thread[pool-1-thread-3,5,main]完成,时间:Fri Aug 11 05:56:47 CST 2023
Thread[pool-1-thread-1,5,main]完成,时间:Fri Aug 11 05:56:47 CST 2023
Thread[pool-1-thread-5,5,main]完成,时间:Fri Aug 11 05:56:48 CST 2023
其实这里也可以看到,lambda表达式其实就是将函数封装成接口,但是让我们用函数的方法来书写,方便看,然后直接传入,其实任务的提交很快,先提交,再消费,所以说主线程就算阻塞了也不会影响其他线程的运行
其实出了execute以外还有一个方法。
executorService.submit
这里有一个submit和一个execute,有什么区别呢?
submit会有一个Futrue<?>的返回值,我们可以通过Futrue<T>.get()来获取T类型的返回值,来看看演示吧
我们稍微改一下循环
while (i++<3){
int j=0;
while(j++<6){
int finalJ = j;
Future<?> submit = executorService.submit(() -> {
Thread thread = Thread.currentThread();
System.out.println(thread + "完成,时间:" + new Date());
try {
if (!thread.equals(local)) {
Thread.sleep(5000);
} else {
Thread.sleep(2000);
}
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return finalJ;
});
System.out.println(String.valueOf(i)+","+submit.get()+",时间:"+new Date());
}
运行结果
Thread[pool-1-thread-1,5,main]完成,时间:Fri Aug 11 06:18:10 CST 2023
1,1,时间:Fri Aug 11 06:18:15 CST 2023
Thread[pool-1-thread-2,5,main]完成,时间:Fri Aug 11 06:18:15 CST 2023
1,2,时间:Fri Aug 11 06:18:20 CST 2023
Thread[pool-1-thread-3,5,main]完成,时间:Fri Aug 11 06:18:20 CST 2023
1,3,时间:Fri Aug 11 06:18:25 CST 2023
Thread[pool-1-thread-1,5,main]完成,时间:Fri Aug 11 06:18:25 CST 2023
1,4,时间:Fri Aug 11 06:18:30 CST 2023
Thread[pool-1-thread-2,5,main]完成,时间:Fri Aug 11 06:18:30 CST 2023
1,5,时间:Fri Aug 11 06:18:35 CST 2023
Thread[pool-1-thread-3,5,main]完成,时间:Fri Aug 11 06:18:35 CST 2023
1,6,时间:Fri Aug 11 06:18:40 CST 2023
Thread[pool-1-thread-1,5,main]完成,时间:Fri Aug 11 06:18:40 CST 2023
2,1,时间:Fri Aug 11 06:18:45 CST 2023
Thread[pool-1-thread-2,5,main]完成,时间:Fri Aug 11 06:18:45 CST 2023
2,2,时间:Fri Aug 11 06:18:50 CST 2023
Thread[pool-1-thread-3,5,main]完成,时间:Fri Aug 11 06:18:50 CST 2023
2,3,时间:Fri Aug 11 06:18:55 CST 2023
Thread[pool-1-thread-1,5,main]完成,时间:Fri Aug 11 06:18:55 CST 2023
2,4,时间:Fri Aug 11 06:19:00 CST 2023
Thread[pool-1-thread-2,5,main]完成,时间:Fri Aug 11 06:19:00 CST 2023
2,5,时间:Fri Aug 11 06:19:05 CST 2023
Thread[pool-1-thread-3,5,main]完成,时间:Fri Aug 11 06:19:05 CST 2023
2,6,时间:Fri Aug 11 06:19:10 CST 2023
Thread[pool-1-thread-1,5,main]完成,时间:Fri Aug 11 06:19:10 CST 2023
3,1,时间:Fri Aug 11 06:19:15 CST 2023
Thread[pool-1-thread-2,5,main]完成,时间:Fri Aug 11 06:19:15 CST 2023
3,2,时间:Fri Aug 11 06:19:20 CST 2023
Thread[pool-1-thread-3,5,main]完成,时间:Fri Aug 11 06:19:20 CST 2023
...
通过调试,发现get方法是阻塞的,没错和上面FutrueTask一样,因为它们都实现于Futrue接口,当然各自也有实现。
那么这里我们如何才能异步获取结果而不印象主线程的运行呢,把Futrue放在一个数组里面就可以了,哈哈。
这里其实也有一些小细节,看代码都能理解,但还是得根据实际来解释
public class ThreadpoolApplication {
public static void main(String[] args) throws ExecutionException, InterruptedException {
int corePoolSize=3;
int maximumPoolSize=6;
long keepAliveTime=7000L;
TimeUnit unit=TimeUnit.MILLISECONDS;
BlockingQueue<Runnable> workQueue=new ArrayBlockingQueue<>(10);
ExecutorService executorService = new ThreadPoolExecutor(corePoolSize,
maximumPoolSize,
keepAliveTime,
unit,
workQueue,new ThreadPoolExecutor.CallerRunsPolicy());
int i=0;
Thread local=Thread.currentThread();
List<List<Future>> list=new ArrayList<>();
list.add(new ArrayList<>());
System.out.println("任务提交开始,时间:"+new Date().getTime());
while (i++<3){
list.add(new ArrayList<>());
int j=0;
while(j++<6){
int finalJ = j;
int finalI1 = i;
System.out.println(finalI1 +","+finalJ+","+Thread.currentThread()+"提交,时间:" + new Date().getTime());
Future<?> submit = executorService.submit(() -> {
Thread thread = Thread.currentThread();
try {
System.out.println(finalI1 +","+finalJ+","+thread +"开始处理,时间:" + new Date().getTime());
if (!thread.equals(local)) {
Thread.sleep(5000);
} else {
Thread.sleep(2000);
}
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
System.out.println(finalI1 +","+finalJ+","+thread + "完成,时间:" + new Date().getTime());
}
return finalJ;
});
list.get(i).add(submit);
}
}
System.out.println("任务提交完成,时间:"+new Date().getTime());
for (i=1;i<list.size();i++){
int finalI = i;
list.get(i).forEach(v->{
try {
System.out.println(finalI+","+v.get()+",获取结果,时间:"+new Date().getTime());
} catch (InterruptedException e) {
throw new RuntimeException(e);
} catch (ExecutionException e) {
throw new RuntimeException(e);
}
});
}
}
}
来看看结果(时间戳前面的几位1691710我就删了,都一样,懒得看)
任务提交开始,时间:1691713108082
1,1,Thread[main,5,main]提交,时间:1691713108082
1,2,Thread[main,5,main]提交,时间:1691713108161
1,3,Thread[main,5,main]提交,时间:1691713108161
1,1,Thread[pool-1-thread-1,5,main]开始处理,时间:1691713108161
1,2,Thread[pool-1-thread-2,5,main]开始处理,时间:1691713108161
1,4,Thread[main,5,main]提交,时间:1691713108161
1,5,Thread[main,5,main]提交,时间:1691713108161
1,6,Thread[main,5,main]提交,时间:1691713108162
1,3,Thread[pool-1-thread-3,5,main]开始处理,时间:1691713108162
2,1,Thread[main,5,main]提交,时间:1691713108162
2,2,Thread[main,5,main]提交,时间:1691713108162
2,3,Thread[main,5,main]提交,时间:1691713108162
2,4,Thread[main,5,main]提交,时间:1691713108162
2,5,Thread[main,5,main]提交,时间:1691713108162
2,6,Thread[main,5,main]提交,时间:1691713108162
3,1,Thread[main,5,main]提交,时间:1691713108162
3,2,Thread[main,5,main]提交,时间:1691713108162
3,3,Thread[main,5,main]提交,时间:1691713108163
3,4,Thread[main,5,main]提交,时间:1691713108163
3,2,Thread[pool-1-thread-4,5,main]开始处理,时间:1691713108163
3,5,Thread[main,5,main]提交,时间:1691713108163
3,5,Thread[main,5,main]开始处理,时间:1691713108163
3,3,Thread[pool-1-thread-5,5,main]开始处理,时间:1691713108163
3,4,Thread[pool-1-thread-6,5,main]开始处理,时间:1691713108163
3,5,Thread[main,5,main]完成,时间:1691713110163
3,6,Thread[main,5,main]提交,时间:1691713110163
3,6,Thread[main,5,main]开始处理,时间:1691713110163
3,6,Thread[main,5,main]完成,时间:1691713112164
任务提交完成,时间:1691713112164
1,1,Thread[pool-1-thread-1,5,main]完成,时间:1691713113162
1,2,Thread[pool-1-thread-2,5,main]完成,时间:1691713113162
1,1,获取结果,时间:1691713113162
1,4,Thread[pool-1-thread-1,5,main]开始处理,时间:1691713113162
1,5,Thread[pool-1-thread-2,5,main]开始处理,时间:1691713113162
1,2,获取结果,时间:1691713113162
1,3,Thread[pool-1-thread-3,5,main]完成,时间:1691713113163
1,6,Thread[pool-1-thread-3,5,main]开始处理,时间:1691713113163
1,3,获取结果,时间:1691713113163
3,2,Thread[pool-1-thread-4,5,main]完成,时间:1691713113164
3,4,Thread[pool-1-thread-6,5,main]完成,时间:1691713113164
3,3,Thread[pool-1-thread-5,5,main]完成,时间:1691713113164
2,2,Thread[pool-1-thread-6,5,main]开始处理,时间:1691713113164
2,1,Thread[pool-1-thread-4,5,main]开始处理,时间:1691713113164
2,3,Thread[pool-1-thread-5,5,main]开始处理,时间:1691713113164
1,4,Thread[pool-1-thread-1,5,main]完成,时间:1691713118163
1,5,Thread[pool-1-thread-2,5,main]完成,时间:1691713118163
2,4,Thread[pool-1-thread-1,5,main]开始处理,时间:1691713118163
1,4,获取结果,时间:1691713118163
2,5,Thread[pool-1-thread-2,5,main]开始处理,时间:1691713118163
1,5,获取结果,时间:1691713118163
1,6,Thread[pool-1-thread-3,5,main]完成,时间:1691713118164
2,6,Thread[pool-1-thread-3,5,main]开始处理,时间:1691713118164
1,6,获取结果,时间:1691713118164
2,1,Thread[pool-1-thread-4,5,main]完成,时间:1691713118165
2,3,Thread[pool-1-thread-5,5,main]完成,时间:1691713118165
2,2,Thread[pool-1-thread-6,5,main]完成,时间:1691713118165
2,1,获取结果,时间:1691713118165
3,1,Thread[pool-1-thread-4,5,main]开始处理,时间:1691713118165
2,2,获取结果,时间:1691713118165
2,3,获取结果,时间:1691713118165
2,4,Thread[pool-1-thread-1,5,main]完成,时间:1691713123164
2,5,Thread[pool-1-thread-2,5,main]完成,时间:1691713123164
2,4,获取结果,时间:1691713123164
2,5,获取结果,时间:1691713123164
2,6,Thread[pool-1-thread-3,5,main]完成,时间:1691713123165
2,6,获取结果,时间:1691713123165
3,1,Thread[pool-1-thread-4,5,main]完成,时间:1691713123166
3,1,获取结果,时间:1691713123166
3,2,获取结果,时间:1691713123166
3,3,获取结果,时间:1691713123166
3,4,获取结果,时间:1691713123166
3,5,获取结果,时间:1691713123166
3,6,获取结果,时间:1691713123166
从这个结果我们可以看出我们把全部把任务提交后任务,接下来我们跟着结果来分析一下
从上面可以看出
核心线程如下
临时线程
pool-1-thread-4
pool-1-thread-5
pool-1-thread-6
这里为了方便看结果,我简单处理了下文本
任务提交开始,时间:08082
1,1,从【主线程】提交,时间:08082
1,2,从【主线程】提交,时间:08161
1,3,从【主线程】提交,时间:08161
1,1,在 【核心线程1】(线程1)开始处理,时间:08161
1,2,在 【核心线程2】(线程2)开始处理,时间:08161
# 后面的10个进程进入阻塞队列
1,4,从【主线程】提交,时间:08161
1,5,从【主线程】提交,时间:08161
1,6,从【主线程】提交,时间:08162
1,3,在 【核心线程3】(线程3)开始处理,时间:08162
2,1,从【主线程】提交,时间:08162
2,2,从【主线程】提交,时间:08162
2,3,从【主线程】提交,时间:08162
2,4,从【主线程】提交,时间:08162
2,5,从【主线程】提交,时间:08162
2,6,从【主线程】提交,时间:08162
3,1,从【主线程】提交,时间:08162
# 阻塞队列满了
3,2,从【主线程】提交,时间:08162
3,3,从【主线程】提交,时间:08163
3,4,从【主线程】提交,时间:08163
3,2,在 【临时线程1】(线程4)开始处理,时间:08163
3,5,从【主线程】提交,时间:08163 # 核心线程、阻塞队列、临时线程都满了,执行拒绝策略
3,5,从【主线程】开始处理,时间:08163 # 在 主线程 处理
3,3,在 【临时线程2】(线程5)开始处理,时间:08163
3,4,在 【临时线程3】(线程6)开始处理,时间:08163
3,5,从【主线程】完成,时间:10163
3,6,从【主线程】提交,时间:10163
3,6,从【主线程】开始处理,时间:10163
3,6,从【主线程】完成,时间:12164 # 注意,这里开始主线程的任务完了
任务提交完成,时间:12164
1,1,在 【核心线程1】(线程1)完成,时间:13162
1,2,在 【核心线程2】(线程2)完成,时间:13162
1,1,获取结果,时间:13162
1,4,在 【核心线程1】(线程1)开始处理,时间:13162
1,5,在 【核心线程2】(线程2)开始处理,时间:13162
1,2,获取结果,时间:13162
1,3,在 【核心线程3】(线程3)完成,时间:13163
1,6,在 【核心线程3】(线程3)开始处理,时间:13163
1,3,获取结果,时间:13163
3,2,在 【临时线程1】(线程4)完成,时间:13164 # 临时线程没有销毁又进入临时线程了
3,4,在 【临时线程3】(线程6)完成,时间:13164
3,3,在 【临时线程2】(线程5)完成,时间:13164
2,2,在 【临时线程3】(线程6)开始处理,时间:13164
2,1,在 【临时线程1】(线程4)开始处理,时间:13164
2,3,在 【临时线程2】(线程5)开始处理,时间:13164
1,4,在 【核心线程1】(线程1)完成,时间:18163
1,5,在 【核心线程2】(线程2)完成,时间:18163
2,4,在 【核心线程1】(线程1)开始处理,时间:18163
1,4,获取结果,时间:18163
2,5,在 【核心线程2】(线程2)开始处理,时间:18163
1,5,获取结果,时间:18163
1,6,在 【核心线程3】(线程3)完成,时间:18164
2,6,在 【核心线程3】(线程3)开始处理,时间:18164
1,6,获取结果,时间:18164
2,1,在 【临时线程1】(线程4)完成,时间:18165
2,3,在 【临时线程2】(线程5)完成,时间:18165
2,2,在 【临时线程3】(线程6)完成,时间:18165
2,1,获取结果,时间:18165
3,1,在 【临时线程1】(线程4)开始处理,时间:18165
2,2,获取结果,时间:18165
2,3,获取结果,时间:18165
2,4,在 【核心线程1】(线程1)完成,时间:23164
2,5,在 【核心线程2】(线程2)完成,时间:23164
2,4,获取结果,时间:23164
2,5,获取结果,时间:23164
2,6,在 【核心线程3】(线程3)完成,时间:23165
2,6,获取结果,时间:23165
3,1,在 【临时线程1】(线程4)完成,时间:23166
3,1,获取结果,时间:23166
3,2,获取结果,时间:23166
3,3,获取结果,时间:23166
3,4,获取结果,时间:23166
3,5,获取结果,时间:23166
3,6,获取结果,时间:23166
从上面可以看出当核心线程和临时线程同时存在,如果这个时候开始消费阻塞队列里的任务是,临时线程可以和核心线程一起处理,神奇!
那么我们再来想想,这里我们如果没有任务进入的话,那么核心线程会不会销毁?
试一试。
我们来写一个方法,来获取线程池里面的信息
public static void printThreadPloolInfo(ThreadPoolExecutor threadPoolExecutor){
System.out.println("当前线程池大小:"+threadPoolExecutor.getPoolSize()+
" 核心线程数:"+threadPoolExecutor.getCorePoolSize()+
" 活跃线程数:"+threadPoolExecutor.getActiveCount());
}
同样,修改一下线程池创建的代码
ThreadPoolExecutor executorService = new ThreadPoolExecutor(corePoolSize,
maximumPoolSize,
keepAliveTime,
unit,
workQueue,new ThreadPoolExecutor.CallerRunsPolicy());
最后的代码:
public class ThreadpoolApplication {
public static void printThreadPloolInfo(ThreadPoolExecutor threadPoolExecutor){
System.out.println("当前线程池大小:"+threadPoolExecutor.getPoolSize()+
" 核心线程数:"+threadPoolExecutor.getCorePoolSize()+
" 活跃线程数:"+threadPoolExecutor.getActiveCount());
}
public static void main(String[] args) throws ExecutionException, InterruptedException {
int corePoolSize=3;
int maximumPoolSize=6;
long keepAliveTime=7000L;
TimeUnit unit=TimeUnit.MILLISECONDS;
BlockingQueue<Runnable> workQueue=new ArrayBlockingQueue<>(10);
ThreadPoolExecutor executorService = new ThreadPoolExecutor(corePoolSize,
maximumPoolSize,
keepAliveTime,
unit,
workQueue,new ThreadPoolExecutor.CallerRunsPolicy());
int i=0;
Thread local=Thread.currentThread();
List<List<Future>> list=new ArrayList<>();
list.add(new ArrayList<>());
System.out.println("任务提交开始,时间:"+new Date().getTime());
while (i++<3){
list.add(new ArrayList<>());
int j=0;
while(j++<6){
int finalJ = j;
int finalI1 = i;
System.out.println(finalI1 +","+finalJ+","+Thread.currentThread()+"提交,时间:" + new Date().getTime());
Future<?> submit = executorService.submit(() -> {
Thread thread = Thread.currentThread();
try {
System.out.println(finalI1 +","+finalJ+","+thread +"开始处理,时间:" + new Date().getTime());
if (!thread.equals(local)) {
Thread.sleep(5000);
} else {
Thread.sleep(2000);
}
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
System.out.println(finalI1 +","+finalJ+","+thread + "完成,时间:" + new Date().getTime());
}
return finalJ;
});
list.get(i).add(submit);
}
printThreadPloolInfo(executorService);
}
System.out.println("任务提交完成,时间:"+new Date().getTime());
for (i=1;i<list.size();i++){
int finalI = i;
list.get(i).forEach(v->{
try {
System.out.println(finalI+","+v.get()+",获取结果,时间:"+new Date().getTime());
} catch (InterruptedException e) {
throw new RuntimeException(e);
} catch (ExecutionException e) {
throw new RuntimeException(e);
}
});
}
long current = new Date().getTime();
while(true){
printThreadPloolInfo(executorService);
if(new Date().getTime()-current>10*1000)break;
Thread.sleep(500);
}
return;
}
}
结果:
任务提交开始,时间:1691722972284
1,1,Thread[main,5,main]提交,时间:1691722972284
1,2,Thread[main,5,main]提交,时间:1691722972339
1,1,Thread[pool-1-thread-1,5,main]开始处理,时间:1691722972339
1,3,Thread[main,5,main]提交,时间:1691722972339
1,2,Thread[pool-1-thread-2,5,main]开始处理,时间:1691722972339
1,4,Thread[main,5,main]提交,时间:1691722972339
1,5,Thread[main,5,main]提交,时间:1691722972339
1,6,Thread[main,5,main]提交,时间:1691722972339
1,3,Thread[pool-1-thread-3,5,main]开始处理,时间:1691722972339
当前线程池大小:3 核心线程数:3 活跃线程数:3
2,1,Thread[main,5,main]提交,时间:1691722972340
2,2,Thread[main,5,main]提交,时间:1691722972340
2,3,Thread[main,5,main]提交,时间:1691722972340
2,4,Thread[main,5,main]提交,时间:1691722972340
2,5,Thread[main,5,main]提交,时间:1691722972340
2,6,Thread[main,5,main]提交,时间:1691722972340
当前线程池大小:3 核心线程数:3 活跃线程数:3
3,1,Thread[main,5,main]提交,时间:1691722972340
3,2,Thread[main,5,main]提交,时间:1691722972340
3,3,Thread[main,5,main]提交,时间:1691722972340
3,2,Thread[pool-1-thread-4,5,main]开始处理,时间:1691722972340
3,4,Thread[main,5,main]提交,时间:1691722972340
3,3,Thread[pool-1-thread-5,5,main]开始处理,时间:1691722972341
3,5,Thread[main,5,main]提交,时间:1691722972341
3,5,Thread[main,5,main]开始处理,时间:1691722972341
3,4,Thread[pool-1-thread-6,5,main]开始处理,时间:1691722972341
3,5,Thread[main,5,main]完成,时间:1691722974341
3,6,Thread[main,5,main]提交,时间:1691722974341
3,6,Thread[main,5,main]开始处理,时间:1691722974341
3,6,Thread[main,5,main]完成,时间:1691722976342
当前线程池大小:6 核心线程数:3 活跃线程数:6
任务提交完成,时间:1691722976342
1,2,Thread[pool-1-thread-2,5,main]完成,时间:1691722977340
1,3,Thread[pool-1-thread-3,5,main]完成,时间:1691722977340
1,1,Thread[pool-1-thread-1,5,main]完成,时间:1691722977340
1,4,Thread[pool-1-thread-3,5,main]开始处理,时间:1691722977340
1,5,Thread[pool-1-thread-2,5,main]开始处理,时间:1691722977340
1,1,获取结果,时间:1691722977340
1,6,Thread[pool-1-thread-1,5,main]开始处理,时间:1691722977340
1,2,获取结果,时间:1691722977340
1,3,获取结果,时间:1691722977340
3,4,Thread[pool-1-thread-6,5,main]完成,时间:1691722977342
3,2,Thread[pool-1-thread-4,5,main]完成,时间:1691722977342
2,2,Thread[pool-1-thread-4,5,main]开始处理,时间:1691722977342
3,3,Thread[pool-1-thread-5,5,main]完成,时间:1691722977342
2,1,Thread[pool-1-thread-6,5,main]开始处理,时间:1691722977342
2,3,Thread[pool-1-thread-5,5,main]开始处理,时间:1691722977342
1,5,Thread[pool-1-thread-2,5,main]完成,时间:1691722982340
1,6,Thread[pool-1-thread-1,5,main]完成,时间:1691722982340
1,4,Thread[pool-1-thread-3,5,main]完成,时间:1691722982340
2,5,Thread[pool-1-thread-1,5,main]开始处理,时间:1691722982340
2,4,Thread[pool-1-thread-2,5,main]开始处理,时间:1691722982340
1,4,获取结果,时间:1691722982340
2,6,Thread[pool-1-thread-3,5,main]开始处理,时间:1691722982340
1,5,获取结果,时间:1691722982340
1,6,获取结果,时间:1691722982340
2,2,Thread[pool-1-thread-4,5,main]完成,时间:1691722982342
2,1,Thread[pool-1-thread-6,5,main]完成,时间:1691722982342
3,1,Thread[pool-1-thread-4,5,main]开始处理,时间:1691722982342
2,1,获取结果,时间:1691722982342
2,3,Thread[pool-1-thread-5,5,main]完成,时间:1691722982342
2,2,获取结果,时间:1691722982342
2,3,获取结果,时间:1691722982342
2,6,Thread[pool-1-thread-3,5,main]完成,时间:1691722987341
2,4,Thread[pool-1-thread-2,5,main]完成,时间:1691722987341
2,4,获取结果,时间:1691722987341
2,5,Thread[pool-1-thread-1,5,main]完成,时间:1691722987341
2,5,获取结果,时间:1691722987341
2,6,获取结果,时间:1691722987341
3,1,Thread[pool-1-thread-4,5,main]完成,时间:1691722987343
3,1,获取结果,时间:1691722987343
3,2,获取结果,时间:1691722987343
3,3,获取结果,时间:1691722987343
3,4,获取结果,时间:1691722987343
3,5,获取结果,时间:1691722987343
3,6,获取结果,时间:1691722987343
当前线程池大小:6 核心线程数:3 活跃线程数:0
当前线程池大小:6 核心线程数:3 活跃线程数:0
当前线程池大小:6 核心线程数:3 活跃线程数:0
当前线程池大小:6 核心线程数:3 活跃线程数:0
当前线程池大小:4 核心线程数:3 活跃线程数:0
当前线程池大小:4 核心线程数:3 活跃线程数:0
当前线程池大小:4 核心线程数:3 活跃线程数:0
当前线程池大小:4 核心线程数:3 活跃线程数:0
当前线程池大小:3 核心线程数:3 活跃线程数:0
当前线程池大小:3 核心线程数:3 活跃线程数:0
当前线程池大小:3 核心线程数:3 活跃线程数:0
我们处理一下
当前线程池大小:3 核心线程数:3 活跃线程数:3
当前线程池大小:6 核心线程数:3 活跃线程数:6
当前线程池大小:6 核心线程数:3 活跃线程数:0
当前线程池大小:6 核心线程数:3 活跃线程数:0
当前线程池大小:6 核心线程数:3 活跃线程数:0
当前线程池大小:6 核心线程数:3 活跃线程数:0
当前线程池大小:4 核心线程数:3 活跃线程数:0
当前线程池大小:4 核心线程数:3 活跃线程数:0
当前线程池大小:4 核心线程数:3 活跃线程数:0
当前线程池大小:4 核心线程数:3 活跃线程数:0
当前线程池大小:3 核心线程数:3 活跃线程数:0
当前线程池大小:3 核心线程数:3 活跃线程数:0
当前线程池大小:3 核心线程数:3 活跃线程数:0
可以看到,核心线程并不会被销毁,所谓核心线程,只是保证了线程池的最小线程数而已。
那么核心线程能否被回收呢?
我们可以看到构造器中有一个方法
executorService.allowCoreThreadTimeOut(true);
那我们开启试一试
我们会看到最后又有新的东西出现
当前线程池大小:4 核心线程数:3 活跃线程数:0
当前线程池大小:4 核心线程数:3 活跃线程数:0
当前线程池大小:4 核心线程数:3 活跃线程数:0
当前线程池大小:0 核心线程数:3 活跃线程数:0
当前线程池大小:0 核心线程数:3 活跃线程数:0
当前线程池大小:0 核心线程数:3 活跃线程数:0
当前线程池大小:0 核心线程数:3 活跃线程数:0
这里也就是说corePool只是一个阙值,用户可以自定义是否销毁还是不销毁。
核心线程只是按照开发者的想法起的一个名称而已,和临时线程没有什么区别。
那么线程存放在哪里的呢?
这个地方我们跟随源代码来看一看。
我们在ThreadPoolExecutor的源码里面发现了一个东西
/**
* Set containing all worker threads in pool. Accessed only when
* holding mainLock.
*/
private final HashSet<Worker> workers = new HashSet<Worker>();
看到没有,线程其实是存放在一个HashSet里面去的
我们来看看submit和execute
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
return new FutureTask<T>(callable);
}
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask); // submit也会交给execute进行处理
return ftask;
}
class Integer{
/**
* The number of bits used to represent an {@code int} value in two's
* complement binary form.
*
* @since 1.5
*/
@Native public static final int SIZE = 32;
}
/**
主要的池控制状态ctl是一个原子整数,包含两个概念字段:
workerCount,表示有效线程数。
runState,表示运行状态,关闭状态等。
**/
// 运行状态存储在高位比特中
/***
RUNNING:运行中的状态,所有线程都可接受新任务。
SHUTDOWN:关闭状态,不再接受新任务,但会处理已排队的任务。
STOP:停止状态,不再接受新任务,不处理已排队的任务,且会中断正在执行的任务。
TIDYING:整理状态,所有任务已终止,线程池正在清理资源。
TERMINATED:终止状态,线程池已经终止,所有任务已完成。
***/
private static final int COUNT_BITS = Integer.SIZE - 3;
// 这个地方参考HashMap如何通过哈希寻找位置,COUNT_BITS是哈希掩码,https://www.wzl1.top/2023/07/21/996/
// 看到这里你以为是这样?放屁!还记得计算机网络如何通过网络掩码来计算网络号和主机号吗?这个就是相似的原理
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
private static int workerCountOf(int c) { return c & CAPACITY; }
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) { // 当前任务数量小于核心线程数
if (addWorker(command, true)) // 将任务加入到核心线程池里面
return;
c = ctl.get();
/**
private static boolean isRunning(int c) {
return c < SHUTDOWN;
}
**/
// 判断运行状态,并且加入阻塞队列
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false); // 那我不执行,没问题吧,具体的addworker代码看后文
}
else if (!addWorker(command, false))//阻塞队列满了,启动非核心
reject(command);
}
private volatile RejectedExecutionHandler handler;
final void reject(Runnable command) {
handler.rejectedExecution(command, this); // 拒绝策略
}
// 拒绝策略的实现就看上面了
public interface RejectedExecutionHandler {
void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}
我们再来看看addWork
/*
runState提供了主要的生命周期控制,可以取以下值:
RUNNING:接受新任务并处理排队的任务。
SHUTDOWN:不接受新任务,但处理排队的任务。
STOP:不接受新任务,不处理排队的任务,并中断正在进行的任务。
TIDYING:所有任务已终止,workerCount为零,转换到TIDYING状态的线程将运行terminated()钩子方法。
TERMINATED:terminated()已完成。
这些值之间的数值顺序很重要,以允许有序比较。runState随时间单调递增,但不必达到每个状态。转换如下:
RUNNING -> SHUTDOWN:调用shutdown()时,可能隐式地在finalize()中调用。
(RUNNING或SHUTDOWN) -> STOP:调用shutdownNow()时。
SHUTDOWN -> TIDYING:队列和池都为空时。
STOP -> TIDYING:池为空时。
TIDYING -> TERMINATED:完成了terminated()钩子方法。
*/
private static final int RUNNING = -1 << COUNT_BITS; // 运行
private static final int SHUTDOWN = 0 << COUNT_BITS; // 关闭
private static final int STOP = 1 << COUNT_BITS; // 暂停
private static final int TIDYING = 2 << COUNT_BITS; // 整理
private static final int TERMINATED = 3 << COUNT_BITS; // 终止
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// Packing and unpacking ctl
// CAPACITY = 00000000011111111111111111(前面自动补零,有30个左右的零,后面的1同理)
private static int runStateOf(int c) { return c & ~CAPACITY; } // 取高位
private static int workerCountOf(int c) { return c & CAPACITY; } // 取低位
private static int ctlOf(int rs, int wc) { return rs | wc; } // 合并
private boolean compareAndIncrementWorkerCount(int expect) {
return ctl.compareAndSet(expect, expect + 1);
}
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c); // 获取运行状态
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
// 如果不再接受任务的状态,并且满足下面条件之一直接return false
// 1. 当前状态不是shutdown (shutdown都不是了,那么我也不能接受新的任务)
// 2. firstTask不为空 (在这个条件满足之前,一定会满足为非运行态并且是shutdown状态,虽然你不会空,但是我也无法接受任务了啊)
// 3. workQueue为空(好好好,shutdown状态,firstTask为空,这个时候没有workQueue了,那么我也不用处理了对不对)
// rs != shutdown || firstTask != null || workQueue.isEmpty()
// 唯一能够进行下去的就是, rs是shutdown的状态,fitstTask为空,并且有任务可以处理
return false;
for (;;) {
int wc = workerCountOf(c); // 获取任务数量
if (wc >= CAPACITY || // OOM溢出,CAPACITY虽然是掩码,每一位为1,那么是不是可以看成任务数量的最大值
wc >= (core ? corePoolSize : maximumPoolSize)) // 加入核心线程还是临时线程
return false;
if (compareAndIncrementWorkerCount(c)) // 如果这个时候c的值没有变化,那么自增(CAS乐观锁)
break retry; // 语法糖,跳出retry,的所有循环,直接进入try那里,和goto类似,你在这里可以看作是对外层循环的控制
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs) // 运行状态改变,重试
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask); // 新建一个work
/**
这里是构造函数,内部使用的工厂模式来创建的线程
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
**/
final Thread t = w.thread; // 获取线程
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get()); // 再次看运行状态
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w); // 放入work集合
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s; // 修改最大线程数
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start(); // 执行线程
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
对于Worker,来说,有一个现成制造工厂,其实我们可以在线程池创建的时候去设定,这里我们看看默认工厂是如何实现的
static class DefaultThreadFactory implements ThreadFactory {
private static final AtomicInteger poolNumber = new AtomicInteger(1);
private final ThreadGroup group;
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final String namePrefix;
DefaultThreadFactory() {
SecurityManager s = System.getSecurityManager();
group = (s != null) ? s.getThreadGroup() :
Thread.currentThread().getThreadGroup();
namePrefix = "pool-" +
poolNumber.getAndIncrement() +
"-thread-";
}
public Thread newThread(Runnable r) {
// 在这里,我们把runnable传进去的
Thread t = new Thread(group, r,
namePrefix + threadNumber.getAndIncrement(),
0);
if (t.isDaemon())
t.setDaemon(false); // 默认模式不开启守护线程
if (t.getPriority() != Thread.NORM_PRIORITY)
t.setPriority(Thread.NORM_PRIORITY);
return t;
}
}
而这里的r,我们从worker里面传入的是this
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
我这里直接把Worker这个类给放出来
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
/**
* This class will never be serialized, but we provide a
* serialVersionUID to suppress a javac warning.
*/
private static final long serialVersionUID = 6138294804551838833L;
/** Thread this worker is running in. Null if factory fails. */
final Thread thread;
/** Initial task to run. Possibly null. */
Runnable firstTask;
/** Per-thread task counter */
volatile long completedTasks;
/**
* Creates with given first task and thread from ThreadFactory.
* @param firstTask the first task (null if none)
*/
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
/** Delegates main run loop to outer runWorker */
public void run() {
runWorker(this);
}
// Lock methods
//
// The value 0 represents the unlocked state.
// The value 1 represents the locked state.
protected boolean isHeldExclusively() {
return getState() != 0;
}
protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}
public void lock() { acquire(1); }
public boolean tryLock() { return tryAcquire(1); }
public void unlock() { release(1); }
public boolean isLocked() { return isHeldExclusively(); }
void interruptIfStarted() {
Thread t;
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
}
既然是这样,那么t.start()一定会调用run()方法,我们再来看看runWoker(this)
final void runWorker(Worker w) {
Thread wt = Thread.currentThread(); // 获取当前线程信息
Runnable task = w.firstTask; // 获取当前Worker任务
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
// 从当前的worker里面获取,或者从workQueue里面获取
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run(); // 执行这个任务
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
好了,现在我们知道大概的调用流程了
submit()->execute()->addWorker()->new Worker()->t=worker.thread=ThreadFactory.newThread(this)->t.start()执行worker.run()->runwark(this)->this.task.run()
总得来说就是把当前任务交给了worker来进行代理,那么阻塞队列的消费呢?上面的逻辑是在addWorker成功的情况下,如果失败了,就放在workQueue里面,那个offer()方法(祝大家拿到心仪的offer),然后我们在addWorker(null)。
那么我们找找workQueue.take()在哪里。
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c); // 获取线程数
// Are workers subject to culling?
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; // 判断可用线程是否有时间限制
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
/**
public E poll(long timeout, TimeUnit unit) throws
参数:此方法采用两个强制性参数:
timeout–等待的时间,以单位为单位。
unit–超时参数的TimeUnit。
返回值:此方法从此LinkedBlockingQueue的头部检索并删除元素,如果在元素可用之前经过了指定的等待时间,则为null。
异常:如果在等待元素可用时方法被中断,则此方法将引发InterruptedException。
**/
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : //如果我们等待任务的时间都已经超过了线程存活时间,那么也没有必要 处理了,直接处理null就行
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
那么在那里又用到了getTask()呢?
哦,是runWorker(),也就是是说上面我们的流程又要变一下了。
submit()->execute()->addWorker()(如果添加失败的话放入队列,addWorker(null))->new Worker()->t=worker.thread=ThreadFactory.newThread(this)->t.start()执行worker.run()->runwark(this) (如果task是null的话从阻塞队列里面拿队首的来执行)->this.task.run()
那么问题又来了,我们是如何保证一定是被指定的线程执行的呢?
这就要从我们的线程是从哪里来的说起,我们创建线程的时候是在addWorker()方法,然后通过线程工厂,但是默认线程工厂的代码及其简单,给我感觉很强的应该是里面的ThreadGroup,我们看看什么时候用到了。
原来是构造函数
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
RejectedExecutionHandler handler) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), handler);
}
再来回顾下DefaultThreadFactor
static class DefaultThreadFactory implements ThreadFactory {
private static final AtomicInteger poolNumber = new AtomicInteger(1);
private final ThreadGroup group;
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final String namePrefix;
DefaultThreadFactory() {
SecurityManager s = System.getSecurityManager();
group = (s != null) ? s.getThreadGroup() :
Thread.currentThread().getThreadGroup();
namePrefix = "pool-" +
poolNumber.getAndIncrement() +
"-thread-";
}
public Thread newThread(Runnable r) {
// 在这里,我们把runnable传进去的
Thread t = new Thread(group, r,
namePrefix + threadNumber.getAndIncrement(),
0);
if (t.isDaemon())
t.setDaemon(false); // 默认模式不开启守护线程
if (t.getPriority() != Thread.NORM_PRIORITY)
t.setPriority(Thread.NORM_PRIORITY);
return t;
}
}
那么好吧,我们继续看看ThreadGroup
这一块就是Java SE的内容了,不做过多的阐述
public class ThreadGroup implements Thread.UncaughtExceptionHandler {
private final ThreadGroup parent;
String name;
int maxPriority;
boolean destroyed;
boolean daemon;
boolean vmAllowSuspension;
int nUnstartedThreads = 0;
int nthreads;
Thread threads[];
int ngroups;
ThreadGroup groups[];
... // 忽略其他方法
}
原来这里秒你就有一个线程数组,那么ThreadGroup哪里有用?
Thread的一个构造方法
public Thread(ThreadGroup group, Runnable target, String name,
long stackSize) {
init(group, target, name, stackSize);
}
我们再来看看init
private void init(ThreadGroup g, Runnable target, String name,
long stackSize) {
init(g, target, name, stackSize, null, true);
}
private void init(ThreadGroup g, Runnable target, String name,
long stackSize, AccessControlContext acc,
boolean inheritThreadLocals) {
if (name == null) {
throw new NullPointerException("name cannot be null");
}
this.name = name;
Thread parent = currentThread();
SecurityManager security = System.getSecurityManager();
if (g == null) {
/* Determine if it's an applet or not */
/* If there is a security manager, ask the security manager
what to do. */
if (security != null) {
g = security.getThreadGroup();
}
/* If the security doesn't have a strong opinion of the matter
use the parent thread group. */
if (g == null) {
g = parent.getThreadGroup();
}
}
/* checkAccess regardless of whether or not threadgroup is
explicitly passed in. */
g.checkAccess();
/*
* Do we have the required permissions?
*/
if (security != null) {
if (isCCLOverridden(getClass())) {
security.checkPermission(SUBCLASS_IMPLEMENTATION_PERMISSION);
}
}
g.addUnstarted();
// 对新的这个线程初始化
this.group = g;
this.daemon = parent.isDaemon();
this.priority = parent.getPriority();
if (security == null || isCCLOverridden(parent.getClass()))
this.contextClassLoader = parent.getContextClassLoader();
else
this.contextClassLoader = parent.contextClassLoader;
this.inheritedAccessControlContext =
acc != null ? acc : AccessController.getContext();
this.target = target;
setPriority(priority);
if (inheritThreadLocals && parent.inheritableThreadLocals != null)
this.inheritableThreadLocals =
ThreadLocal.createInheritedMap(parent.inheritableThreadLocals);
/* Stash the specified stack size in case the VM cares */
this.stackSize = stackSize;
/* Set thread ID */
tid = nextThreadID();
}
但是,你这只是说明了线程组啊,线程数量是在哪里限制的呢?addWork()
那么如何将任务退出的呢?
我们碰一碰processWorkerExit()吧
private void processWorkerExit(Worker w, boolean completedAbruptly) {
if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
decrementWorkerCount();
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
completedTaskCount += w.completedTasks;
workers.remove(w);
} finally {
mainLock.unlock();
}
tryTerminate();
int c = ctl.get();
if (runStateLessThan(c, STOP)) {
if (!completedAbruptly) {
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if (min == 0 && ! workQueue.isEmpty())
min = 1;
if (workerCountOf(c) >= min)
return; // replacement not needed
}
addWorker(null, false);
}
}
从上面可以看出,线程池的本质其实是一直控制new的线程的数量,只不过线程的ID、名称一直没有变过,所以是给人一种只是切换任务的错觉。
另外JUC为我们提供了一个构造器Executors,之里面有几种线程池的实现,底层还是靠的new ThreadPoolExecutor(),但是《阿里巴巴开发手册》里面不建议这样做,这里还是简单的介绍一下。
Executors类,提供了一系列工厂方法用于创建线程池,返回的线程池都实现了ExecutorService接口。
创建固定数目线程的线程池。
newFixedThreadPool与cacheThreadPool差不多,也是能reuse就用,但不能随时建新的线程。
其独特之处:任意时间点,最多只能有固定数目的活动线程存在,此时如果有新的线程要建立,只能放在另外的队列中等待,直到当前的线程中某个线程终止直接被移出池子
和cacheThreadPool不同,FixedThreadPool没有IDLE机制(可能也有,但既然文档没提,肯定非常长,类似依赖上层的TCP或UDP IDLE机制之类的),所以FixedThreadPool多数针对一些很稳定很固定的正规并发线程,多用于服务器
从方法的源代码看,cache池和fixed 池调用的是同一个底层池,只不过参数不同:
fixed池线程数固定,并且是0秒IDLE(无IDLE)
cache池线程数支持0-Integer.MAX_VALUE(显然完全没考虑主机的资源承受能力),60秒IDLE
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory);
}
创建一个可缓存的线程池,调用execute 将重用以前构造的线程(如果线程可用)。如果没有可用的线程,则创建一个新线程并添加到池中。终止并从缓存中移除那些已有 60 秒钟未被使用的线程。
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
threadFactory);
}
创建一个单线程化的Executor。
单例线程,任意时间池中只能有一个线程
用的是和cache池和fixed池相同的底层池,但线程数目是1-1,0秒IDLE(无IDLE)
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory));
}
创建一个支持定时及周期性的任务执行的线程池,多数情况下可用来替代Timer类。
调度型线程池
这个池子里的线程可以按schedule依次delay执行,或周期执行
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
public static ScheduledExecutorService newScheduledThreadPool(
int corePoolSize, ThreadFactory threadFactory) {
return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
}
线程池,总算是搞懂了,所谓的核心线程数,只是一个定界指针,而线程存活时间也是靠按照时间来获取任务来实现的。
而任务的执行,并不是靠更换线程的仍无,仍然是靠new线程, 当执行完了以后进行processWorkerExit()进行释放
同时大家也可以看到,这里面的阻塞队列,有多种实现,具体的各种实现我们下期再谈。
参考文章:
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。