赞
踩
- // 使用定时器
- public class UseTimer {
- public static void main(String[] args) {
- // 一个定时器 + n 个定时器要去执行的任务
- Timer timer = new Timer(); // 闹钟
-
- // 继承 TimerTask(抽象类)
- // 执行的是 task.run() 方法
- TimerTask task = new TimerTask() {
- @Override
- public void run() {
- System.out.println("执行任务");
- }
- };
- TimerTask t2 = new TimerTask() {
- @Override
- public void run() {
- System.out.println("另一个任务");
- }
- };
-
- // 指定什么去执行
- // delay: 延时,单位是 ms
- // 一次性的
- //timer.schedule(task, 5000);
-
- // fixed: 固定的
- // rate: 频率
- // period: 周期
- timer.scheduleAtFixedRate(task, 5000, 1000);
-
- timer.schedule(t2, 3000);
-
- while(true); // 这个死循环的目的是延时,随后的任务不是在主线程中执行
- }
- }
如何实现Timer:
1.每个任务创建一个线程 + 休眠方式 缺点:每一个任务都需要关联一个线程
- import java.util.concurrent.TimeUnit;
-
- public class ImplTimer {
- static class MyThread extends Thread{
- @Override
- public void run() {
- try {
- TimeUnit.SECONDS.sleep(5);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- while (true){
- System.out.println("执行任务");
- try {
- TimeUnit.SECONDS.sleep(1);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
-
- }
- }
-
- public static void main(String[] args) {
- MyThread thread = new MyThread();
- thread.start();
-
- while (true);
- }
- }
2.官方做法:一个线程,完成所有任务的执行
生产者(schedule 的线程) - 消费者(Worker)
放入队列 t1(delay = 5)、t2(delay = 3)、t3(delay = 1)
取任务,可能要等待一段时间、然后执行任务,t1、t2、t3 应该先取出 delay 最小的,这是一个优先级阻塞队列
但这个 delay 不断的在变化,因为前面 t3 等了 1s 之后,那么 t2 再等 2s 就可以执行了,这样等待的时间一直是变化的,因此我们 delay = runAt(任务运行时间(不会变)) - now(当前时间(一直在变))。我们取出 runAt 最小的。
流程:
1.准备好义格优先级队列(要在P/C两个线程中共享)
2.准备好一个等待/通知 对象(要在P/C两个线程中共享)
3.创建好一个 Worker 线程(传入 队列 + 通知对象),并启动
P线程:
C线程:
sleep | wait(timeout) |
休眠 | 带超时的等待 |
结束条件唯一 | 结束条件两个: 1.timeout已到 2.被唤醒 |
和锁没关系(不会释放) | 释放锁(只会释放当前对象的锁) |
Thread static | Object |
主线程读取输入 有客人点菜
创建线程计算 雇一个厨师 做饭
创建线程 + 计算 + 销毁线程 解雇厨师
生产者——消费者模型(多消费者)
1.Executor接口 一种执行任务的东西/人
void execute(Runnable command);
2.ExecutorService接口
3.ThreadPoolExecutor实现类
重点:构造方法的参数理解、创建线程的流程
线程池:公司
正式工:一种工作线程、永远不会被“解雇”
临时工:一种工作线程、临时工空闲太久会被解雇
int corePoolSize 正式员工的数量上限(编制)
int maximumPoolSize 所有员工的数量上限(编制) 正式 + 临时
long keepAliveTime + TimeUnit unit :允许临时工摸鱼多久
BlockingQueue workQueue:传递任务的阻塞队列
ThreadFactory factory:让我们可以介入线程的创建流程
Reject(拒绝) ExecutionHandler:拒绝策略
4.线程池的流程如下(按需创建)
5.怎么让摸鱼的正式员工也被解雇?
shutdown() VS shutdownNow()
6.自己实现线程池
- import java.util.concurrent.*;
-
- // 线程池类
- public class MyThreadPoolExecutor implements Executor {
- // 创建线程的工厂对象
- private final ThreadFactory threadFactory;
-
- // 临时工摸鱼的时间上限
- private final long keepAliveTime;
- private final TimeUnit unit;
-
- // 当前正式员工的数量
- private int currentCoreSize;
-
- // 正式员工的数量上限
- private final int corePoolSize;
-
- // 当前临时员工的数量
- private int currentTemporarySize;
-
- // 临时员工的数量上限
- private final int temporaryPoolSize;
-
- // 传递任务的阻塞队列
- private final BlockingQueue<Runnable> workQueue;
-
- public MyThreadPoolExecutor(int corePoolSize,
- int maximumPoolSize,
- long keepAliveTime,
- TimeUnit unit,
- BlockingQueue<Runnable> workQueue,
- ThreadFactory threadFactory,
- RejectedExecutionHandler handler) {
- this.corePoolSize = corePoolSize;
- this.temporaryPoolSize = maximumPoolSize - corePoolSize;
- this.workQueue = workQueue;
- this.threadFactory = threadFactory;
- this.keepAliveTime = keepAliveTime;
- this.unit = unit;
- }
-
- // 向线程池中提交任务
- // 目前这个方法不是线程安全版本的
- @Override
- public void execute(Runnable command) {
- // 1. 如果正式员工的数量还低于正式员工的数量上限,则优先创建正式员工处理任务
- // 1.1 需要管理,当前正式员工有多少,正式员工的数量上限有多少?
- if (currentCoreSize < corePoolSize) {
- // 优先创建正式员工进行处理
- // 创建一个线程,这个线程中的任务就是不断地取任务-做任务,但是不需要考虑退出的问题
- CoreJob job = new CoreJob(workQueue, command);
- // Thread thread = new Thread(job); // 不使用工厂创建的线程
- Thread thread = threadFactory.newThread(job); // thread 代表的就是正式员工
- String name = String.format("正式员工-%d", currentCoreSize);
- thread.setName(name);
-
- thread.start();
-
- // 只是两种不同的策略,没有谁是正确的说法
- // 1. 把 command 放到队列中;command 的执行次序是在队列已有的任务之后
- // 2. 创建正式员工的时候,就把 command 提交给正式员工,让 command 优先执行
- // 我们这里采用第二种方案,主要原因就是 java 官方的就是使用的第二种策略
-
- currentCoreSize++;
- return;
- }
-
- // 走到这里,说明正式员工的数量 == 正式员工的上限了
- // 2. 优先把任务放入队列中,如果放入成功,execute 执行结束,否则还需要继续
- // 2.1 需要一个阻塞队列
- // workQueue.put(command); // 带阻塞的放入,是否满足这里的需求?
- // 我们这里希望的是立即得到结果
- boolean success = workQueue.offer(command);
- if (success == true) {
- // 说明放入队列成功
- return;
- }
-
- // 队列也已经放满了
- // 3. 继续判断,临时工的数量有没有到上限,如果没有到达,创建新的临时工来处理
- if (currentTemporarySize < temporaryPoolSize) {
- // 创建临时工进行处理
- TemporaryJob job = new TemporaryJob(keepAliveTime, unit, workQueue, command);
- // Thread thread = new Thread(job); // 不使用工厂创建的线程
- Thread thread = threadFactory.newThread(job); // thread 代表的就是临时员工
- String name = String.format("临时员工-%d", currentTemporarySize);
- thread.setName(name);
-
- thread.start();
-
- currentTemporarySize++;
- return;
- }
-
- // 4. 执行拒绝策略
- // 为了实现方便,暂时不考虑其他策略
- throw new RejectedExecutionException();
- }
- }
- import java.util.concurrent.BlockingQueue;
-
- // 一个正式员工线程要完成的工作
- public class CoreJob implements Runnable {
- // 需要阻塞队列
- private final BlockingQueue<Runnable> workQueue;
- private Runnable firstCommand;
-
- CoreJob(BlockingQueue<Runnable> workQueue, Runnable firstCommand) {
- this.workQueue = workQueue;
- this.firstCommand = firstCommand;
- }
-
- @Override
- public void run() {
- try {
- firstCommand.run(); // 优先先把刚提交的任务先做掉了
- firstCommand = null; // 这里设置 null 的意思是,不影响 firstCommand 对象被 GC 时的回收
-
- while (!Thread.interrupted()) {
- Runnable command = workQueue.take();
- command.run();
- }
- } catch (InterruptedException ignored) {}
- }
- }
- import java.util.concurrent.BlockingQueue;
- import java.util.concurrent.TimeUnit;
-
- // 一个临时员工线程要完成的工作
- public class TemporaryJob implements Runnable {
- // 需要阻塞队列
- private final BlockingQueue<Runnable> workQueue;
- private final long keepAliveTime;
- private final TimeUnit unit;
- private Runnable firstCommand;
-
- TemporaryJob(long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, Runnable firstCommand) {
- this.keepAliveTime = keepAliveTime;
- this.unit = unit;
- this.workQueue = workQueue;
- this.firstCommand = firstCommand;
- }
-
- @Override
- public void run() {
- try {
- firstCommand.run(); // 优先先把刚提交的任务先做掉了
- firstCommand = null; // 这里设置 null 的意思是,不影响 firstCommand 对象被 GC 时的回收
-
- // 一旦超过一定时间没有任务,临时工是需要退出的
- // 1. keepAliveTime + unit 记录起来
- // 2. 怎么就知道超过多久没有任务了?如果一定时间内都无法从队列中取出来任务,则认为摸鱼时间够了
- while (!Thread.interrupted()) {
- // Runnable command = workQueue.take();
- Runnable command = workQueue.poll(keepAliveTime, unit);
- if (command == null) {
- // 说明,没有取到任务
- // 说明超时时间已到
- // 说明该线程已经 keepAliveTime + unit 时间没有工作了
- // 所以,可以退出了
- break;
- }
- command.run();
- }
- } catch (InterruptedException ignored) {}
- }
- }
- import java.util.concurrent.*;
-
- public class Main {
- static class Task implements Runnable {
- @Override
- public void run() {
- try {
- TimeUnit.SECONDS.sleep(15);
- } catch (InterruptedException ignored) {}
- }
- }
-
- static class MyThreadFactory implements ThreadFactory {
- @Override
- public Thread newThread(Runnable r) {
- return new Thread(r);
- }
- }
-
- public static void main(String[] args) {
- BlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<>(5);
- // 同时最多有 15 个任务
- // 3 个正式的
- // 5 个队列中
- // 7 个临时的
- // 提交第 16 个任务时就会出现拒绝服务
- MyThreadPoolExecutor executor = new MyThreadPoolExecutor(
- 3, 10, 10, TimeUnit.SECONDS,
- workQueue,
- new MyThreadFactory(),
- new ThreadPoolExecutor.AbortPolicy()
- );
-
- // [0, 1, 2] 交给正式员工在处理
- // [3, 4, 5, 6, 7] 暂时放在队列中
- // [8, 9, 10, 11, 12, 13, 14] 交给临时工处理
- // 过了 15s 之后,第一批任务执行结束
- // [0, 1, 2]、[8, 9, 10, 11, 12, 13, 14] 执行结束
- // 剩下的 [3, 4, 5, 6, 7] 任务具体怎么分配不确定,大概率是交给正式员工执行
- // 就算极端情况下,5 个全部给了临时工
- // 也至少还有 2 个临时工没有工作
- // 再过 10s,至少 2 个,最多 5 个临时工要被解雇
- for (int i = 0; i < 15; i++) {
- System.out.println("提交任务: " + i);
- Task task = new Task();
- executor.execute(task);
- }
- }
- }
线程池总结:(⭐)
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。