当前位置:   article > 正文

java中的线程池_线程池达到最大线程数会怎样

线程池达到最大线程数会怎样

Java线程池的一些重要参数包括:

corePoolSize(核心线程数):

描述:核心线程池的大小,即线程池中始终保持存活的线程数量。
作用:当有新的任务提交时,线程池会优先创建核心线程来处理任务。

maximumPoolSize(最大线程数):

描述:线程池允许创建的最大线程数。
作用:当任务数量超过核心线程数,并且工作队列已满时,线程池会创建新的线程,但数量不会超过最大线程数。

keepAliveTime(线程空闲时间):

描述:当线程池中的线程数量超过核心线程数时,多余的空闲线程在被回收之前等待新任务的最长时间。
作用:控制非核心线程的生命周期,超过这个时间,多余的非核心线程会被终止。

unit(时间单位):

描述:用于指定keepAliveTime的时间单位,例如,秒、毫秒、微秒等。

workQueue(工作队列):

描述:用于存储等待执行的任务的队列。
作用:当线程池中的线程都在执行任务时,新的任务会被放入工作队列,等待执行。

threadFactory(线程工厂):

描述:用于创建新线程的工厂。
作用:通过定制线程工厂,可以自定义线程的名称、优先级等属性。

handler(拒绝策略):

描述:在线程池已经饱和且工作队列已满时,新任务的处理策略。
作用:当无法接受新任务时,通过拒绝策略来处理。

当一个任务提交到线程池,线程池的处理流程:

  1. 如果当前线程数少于核心线程(corePoolSize),则创建新线程(需要获取全局锁)
  2. 如果当前线程大于等于核心线程,则将任务加入等待队列
  3. 如果阻塞队列已满,并且创建新线程不会使运行线程超出线程池的最大线程数(maximumPoolSize),则创建新线程来执行(需要获取全局锁)
  4. 如果超出最大线程数(maximumPoolSize),则任务被拒绝,并且调用RejectedExecutionHandler.rejectedExecution()方法。

当一个线程完成任务后会到阻塞队列中获取任务

线程池的处理流程方案的好处就是尽可能地避免获取全局锁,完成corePoolSize的创建后,就很少需要获取全局锁了

向线程池提交任务:submit和execute

execute()提交没有返回值的实现Runnable类实例,无法判断任务是否执行完毕。
submit()提交需要返回值的任务,线程池会返回一个future实例。

配置线程池参数的原理

cpu密集的任务应该配置尽可能小的线程数(cpu数量+1)。
io密集的任务尽可能多的线程数(2*cpu数量)。
通过Runtime.getRuntime().availableProcesssors()方法获得当前设备的cpu数量。
执行时间不同的任务应该交给不同的线程池来执行。
依赖数据库连接的或者远程调用接口的线程,因为线程要等待返回结果,等待的时间越长,设置的线程数量应该越大。

Executor框架

任务交给executor框架,executor将任务分配给线程池,线程池中的线程通过操作系统映射到操作系统的线程去调用cpu,从而执行任务。

Executor框架包括:任务,任务的执行,异步返回的结果

任务:Runnable接口和Callable接口

任务的执行:Executor是核心接口,有两个关键类实现了executor接口(ThreadPoolExecutor:线程池的核心实现类,ScheduledThreadPoolExecutor:定时任务的线程池的核心实现类)

  • ThreadPoolExecutor有三种类型:FixedThreadPool,SingleThreadExecutor,CachedThreadPool。
  • ScheduledThreadPoolExecutor。

异步返回的结果:Future接口和实现Future接口的FutureTask类,代表执行的结果

public class FutureTask<V> implements RunnableFuture<V> {
    private volatile int state; // 任务状态
    private static final int NEW = 0;
    private static final int COMPLETING = 1;
    private static final int NORMAL = 2;
    private static final int EXCEPTIONAL = 3;

    private Callable<V> callable;
    private Object outcome; // 存储任务的执行结果或异常
    private volatile Thread runner; // 执行任务的线程

    public FutureTask(Callable<V> callable) {
        if (callable == null) {
            throw new NullPointerException();
        }
        this.callable = callable;
        this.state = NEW;
    }

    public void run() {
        if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) {
            return;
        }
        try {
            Callable<V> callable = this.callable;
            if (callable != null && state == NEW) {
                outcome = callable.call(); // 执行任务
                state = NORMAL; // 设置任务状态为成功完成
            }
        } catch (Throwable ex) {
            outcome = ex; // 记录异常
            state = EXCEPTIONAL; // 设置任务状态为异常
        } finally {
            runner = null; // 清空执行线程
            if (state == COMPLETING) {
                // Possibly replace value with interrupted exception.
                state = INTERRUPTED;
            }
            finishCompletion();
        }
    }

    public V get() throws InterruptedException, ExecutionException {
        int s = state;
        if (s <= COMPLETING) {
            s = awaitDone(false, 0L);
        }
        return report(s);
    }

    private int awaitDone(boolean timed, long nanos) throws InterruptedException {
        final long deadline = timed ? System.nanoTime() + nanos : 0L;
        Thread w = Thread.currentThread();
        int s = state;
        if (s > COMPLETING) {
            return s;
        }
        if (s == COMPLETING) {
            Thread.yield(); // 启发式地让出CPU
        }
        if (w.isInterrupted() || (s == COMPLETING && state == COMPLETING)) {
            return INTERRUPTED;
        }
        if (timed) {
            nanos = deadline - System.nanoTime();
            if (nanos <= 0L) {
                return state;
            }
        }
        // 使用等待/通知机制
        synchronized (this) {
            while (state <= COMPLETING) {
                if (timed) {
                    if (nanos <= 0L) {
                        return state;
                    }
                    // 在等待时,当前线程可能被中断,需要重新检查中断状态
                    wait(nanos / 1000000, (int) (nanos % 1000000));
                    if ((s = state) > COMPLETING) {
                        return s;
                    }
                    if (w.isInterrupted() || (s == COMPLETING && state == COMPLETING)) {
                        return INTERRUPTED;
                    }
                    nanos = deadline - System.nanoTime();
                } else {
                    wait(0);
                }
            }
            return state;
        }
    }

    private V report(int s) throws ExecutionException {
        Object x = outcome;
        if (s == NORMAL) {
            return (V) x;
        }
        if (s >= CANCELLED) {
            throw new CancellationException();
        }
        throw new ExecutionException((Throwable) x);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104

get 方法是一个阻塞方法,它会一直等待直到任务完成才返回结果。在 FutureTask 的实现中,get 方法内部通过调用 awaitDone 方法来实现等待。awaitDone 方法中包含一个 while 循环,不断检查任务的状态,如果任务尚未完成,它会一直等待。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/码创造者/article/detail/822738
推荐阅读
相关标签
  

闽ICP备14008679号