当前位置:   article > 正文

[Java Web]定时器(Timer) 线程池_timer线程池

timer线程池

使用 Timer

  1. // 使用定时器
  2. public class UseTimer {
  3. public static void main(String[] args) {
  4. // 一个定时器 + n 个定时器要去执行的任务
  5. Timer timer = new Timer(); // 闹钟
  6. // 继承 TimerTask(抽象类)
  7. // 执行的是 task.run() 方法
  8. TimerTask task = new TimerTask() {
  9. @Override
  10. public void run() {
  11. System.out.println("执行任务");
  12. }
  13. };
  14. TimerTask t2 = new TimerTask() {
  15. @Override
  16. public void run() {
  17. System.out.println("另一个任务");
  18. }
  19. };
  20. // 指定什么去执行
  21. // delay: 延时,单位是 ms
  22. // 一次性的
  23. //timer.schedule(task, 5000);
  24. // fixed: 固定的
  25. // rate: 频率
  26. // period: 周期
  27. timer.scheduleAtFixedRate(task, 5000, 1000);
  28. timer.schedule(t2, 3000);
  29. while(true); // 这个死循环的目的是延时,随后的任务不是在主线程中执行
  30. }
  31. }

实现 Timer

如何实现Timer:

1.每个任务创建一个线程 + 休眠方式         缺点:每一个任务都需要关联一个线程

  1. import java.util.concurrent.TimeUnit;
  2. public class ImplTimer {
  3. static class MyThread extends Thread{
  4. @Override
  5. public void run() {
  6. try {
  7. TimeUnit.SECONDS.sleep(5);
  8. } catch (InterruptedException e) {
  9. e.printStackTrace();
  10. }
  11. while (true){
  12. System.out.println("执行任务");
  13. try {
  14. TimeUnit.SECONDS.sleep(1);
  15. } catch (InterruptedException e) {
  16. e.printStackTrace();
  17. }
  18. }
  19. }
  20. }
  21. public static void main(String[] args) {
  22. MyThread thread = new MyThread();
  23. thread.start();
  24. while (true);
  25. }
  26. }

2.官方做法:一个线程,完成所有任务的执行

生产者(schedule 的线程) - 消费者(Worker)

放入队列 t1(delay = 5)、t2(delay = 3)、t3(delay = 1)

取任务,可能要等待一段时间、然后执行任务,t1、t2、t3 应该先取出 delay 最小的,这是一个优先级阻塞队列

但这个 delay 不断的在变化,因为前面 t3 等了 1s 之后,那么  t2 再等 2s 就可以执行了,这样等待的时间一直是变化的,因此我们 delay = runAt(任务运行时间(不会变)) - now(当前时间(一直在变))。我们取出 runAt 最小的。

流程:

1.准备好义格优先级队列(要在P/C两个线程中共享)

2.准备好一个等待/通知 对象(要在P/C两个线程中共享)

3.创建好一个 Worker 线程(传入 队列 + 通知对象),并启动

P线程:

  1. 根据 delay + now 计算 runAt
  2. 将 任务(TimerTask)放入队列
  3. 利用通知对象通知(通知的是 C 中的 3)

C线程:

  1. 获取最近的任务(TimerTask)
  2. 得到最近任务的延迟 delay' = runAt - now'
  3. 带条件的休眠
  4. 醒来之后,如果任务时间已到,执行任务;否则,先把任务放回去
  5. 循环回 1 继续

sleep VS wait(timeout)

sleepwait(timeout)
休眠带超时的等待
结束条件唯一

结束条件两个:

1.timeout已到      2.被唤醒

和锁没关系(不会释放)

释放锁(只会释放当前对象的锁)

Thread staticObject

线程池(Thread Pool)

主线程读取输入                                                有客人点菜

创建线程计算                                                    雇一个厨师 做饭

        创建线程 + 计算 + 销毁线程                      解雇厨师

生产者——消费者模型(多消费者)

1.Executor接口        一种执行任务的东西/人

        void execute(Runnable command);

2.ExecutorService接口

3.ThreadPoolExecutor实现类

        重点:构造方法的参数理解、创建线程的流程

线程池:公司       

正式工:一种工作线程、永远不会被“解雇”

临时工:一种工作线程、临时工空闲太久会被解雇

int corePoolSize        正式员工的数量上限(编制)

int maximumPoolSize        所有员工的数量上限(编制) 正式 + 临时

long keepAliveTime + TimeUnit unit :允许临时工摸鱼多久

BlockingQueue workQueue:传递任务的阻塞队列

ThreadFactory factory:让我们可以介入线程的创建流程

Reject(拒绝) ExecutionHandler:拒绝策略

 4.线程池的流程如下(按需创建)

  1. 一开始公司一个员工都没有
  2. 当有任务来了之后,当前正式工的数量 < 正式工的上限 一律招一个新的正式员工处理业务(即使有其他人还在空闲中)、把任务放入阻塞队列中、如果阻塞队列满了,雇佣临时工执行任务(临时工的数量 < 临时工的上限)
  3. 处理不过来执行拒绝策略

5.怎么让摸鱼的正式员工也被解雇?

shutdown()    VS    shutdownNow()

6.自己实现线程池

  1. import java.util.concurrent.*;
  2. // 线程池类
  3. public class MyThreadPoolExecutor implements Executor {
  4. // 创建线程的工厂对象
  5. private final ThreadFactory threadFactory;
  6. // 临时工摸鱼的时间上限
  7. private final long keepAliveTime;
  8. private final TimeUnit unit;
  9. // 当前正式员工的数量
  10. private int currentCoreSize;
  11. // 正式员工的数量上限
  12. private final int corePoolSize;
  13. // 当前临时员工的数量
  14. private int currentTemporarySize;
  15. // 临时员工的数量上限
  16. private final int temporaryPoolSize;
  17. // 传递任务的阻塞队列
  18. private final BlockingQueue<Runnable> workQueue;
  19. public MyThreadPoolExecutor(int corePoolSize,
  20. int maximumPoolSize,
  21. long keepAliveTime,
  22. TimeUnit unit,
  23. BlockingQueue<Runnable> workQueue,
  24. ThreadFactory threadFactory,
  25. RejectedExecutionHandler handler) {
  26. this.corePoolSize = corePoolSize;
  27. this.temporaryPoolSize = maximumPoolSize - corePoolSize;
  28. this.workQueue = workQueue;
  29. this.threadFactory = threadFactory;
  30. this.keepAliveTime = keepAliveTime;
  31. this.unit = unit;
  32. }
  33. // 向线程池中提交任务
  34. // 目前这个方法不是线程安全版本的
  35. @Override
  36. public void execute(Runnable command) {
  37. // 1. 如果正式员工的数量还低于正式员工的数量上限,则优先创建正式员工处理任务
  38. // 1.1 需要管理,当前正式员工有多少,正式员工的数量上限有多少?
  39. if (currentCoreSize < corePoolSize) {
  40. // 优先创建正式员工进行处理
  41. // 创建一个线程,这个线程中的任务就是不断地取任务-做任务,但是不需要考虑退出的问题
  42. CoreJob job = new CoreJob(workQueue, command);
  43. // Thread thread = new Thread(job); // 不使用工厂创建的线程
  44. Thread thread = threadFactory.newThread(job); // thread 代表的就是正式员工
  45. String name = String.format("正式员工-%d", currentCoreSize);
  46. thread.setName(name);
  47. thread.start();
  48. // 只是两种不同的策略,没有谁是正确的说法
  49. // 1. 把 command 放到队列中;command 的执行次序是在队列已有的任务之后
  50. // 2. 创建正式员工的时候,就把 command 提交给正式员工,让 command 优先执行
  51. // 我们这里采用第二种方案,主要原因就是 java 官方的就是使用的第二种策略
  52. currentCoreSize++;
  53. return;
  54. }
  55. // 走到这里,说明正式员工的数量 == 正式员工的上限了
  56. // 2. 优先把任务放入队列中,如果放入成功,execute 执行结束,否则还需要继续
  57. // 2.1 需要一个阻塞队列
  58. // workQueue.put(command); // 带阻塞的放入,是否满足这里的需求?
  59. // 我们这里希望的是立即得到结果
  60. boolean success = workQueue.offer(command);
  61. if (success == true) {
  62. // 说明放入队列成功
  63. return;
  64. }
  65. // 队列也已经放满了
  66. // 3. 继续判断,临时工的数量有没有到上限,如果没有到达,创建新的临时工来处理
  67. if (currentTemporarySize < temporaryPoolSize) {
  68. // 创建临时工进行处理
  69. TemporaryJob job = new TemporaryJob(keepAliveTime, unit, workQueue, command);
  70. // Thread thread = new Thread(job); // 不使用工厂创建的线程
  71. Thread thread = threadFactory.newThread(job); // thread 代表的就是临时员工
  72. String name = String.format("临时员工-%d", currentTemporarySize);
  73. thread.setName(name);
  74. thread.start();
  75. currentTemporarySize++;
  76. return;
  77. }
  78. // 4. 执行拒绝策略
  79. // 为了实现方便,暂时不考虑其他策略
  80. throw new RejectedExecutionException();
  81. }
  82. }
  1. import java.util.concurrent.BlockingQueue;
  2. // 一个正式员工线程要完成的工作
  3. public class CoreJob implements Runnable {
  4. // 需要阻塞队列
  5. private final BlockingQueue<Runnable> workQueue;
  6. private Runnable firstCommand;
  7. CoreJob(BlockingQueue<Runnable> workQueue, Runnable firstCommand) {
  8. this.workQueue = workQueue;
  9. this.firstCommand = firstCommand;
  10. }
  11. @Override
  12. public void run() {
  13. try {
  14. firstCommand.run(); // 优先先把刚提交的任务先做掉了
  15. firstCommand = null; // 这里设置 null 的意思是,不影响 firstCommand 对象被 GC 时的回收
  16. while (!Thread.interrupted()) {
  17. Runnable command = workQueue.take();
  18. command.run();
  19. }
  20. } catch (InterruptedException ignored) {}
  21. }
  22. }
  1. import java.util.concurrent.BlockingQueue;
  2. import java.util.concurrent.TimeUnit;
  3. // 一个临时员工线程要完成的工作
  4. public class TemporaryJob implements Runnable {
  5. // 需要阻塞队列
  6. private final BlockingQueue<Runnable> workQueue;
  7. private final long keepAliveTime;
  8. private final TimeUnit unit;
  9. private Runnable firstCommand;
  10. TemporaryJob(long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, Runnable firstCommand) {
  11. this.keepAliveTime = keepAliveTime;
  12. this.unit = unit;
  13. this.workQueue = workQueue;
  14. this.firstCommand = firstCommand;
  15. }
  16. @Override
  17. public void run() {
  18. try {
  19. firstCommand.run(); // 优先先把刚提交的任务先做掉了
  20. firstCommand = null; // 这里设置 null 的意思是,不影响 firstCommand 对象被 GC 时的回收
  21. // 一旦超过一定时间没有任务,临时工是需要退出的
  22. // 1. keepAliveTime + unit 记录起来
  23. // 2. 怎么就知道超过多久没有任务了?如果一定时间内都无法从队列中取出来任务,则认为摸鱼时间够了
  24. while (!Thread.interrupted()) {
  25. // Runnable command = workQueue.take();
  26. Runnable command = workQueue.poll(keepAliveTime, unit);
  27. if (command == null) {
  28. // 说明,没有取到任务
  29. // 说明超时时间已到
  30. // 说明该线程已经 keepAliveTime + unit 时间没有工作了
  31. // 所以,可以退出了
  32. break;
  33. }
  34. command.run();
  35. }
  36. } catch (InterruptedException ignored) {}
  37. }
  38. }
  1. import java.util.concurrent.*;
  2. public class Main {
  3. static class Task implements Runnable {
  4. @Override
  5. public void run() {
  6. try {
  7. TimeUnit.SECONDS.sleep(15);
  8. } catch (InterruptedException ignored) {}
  9. }
  10. }
  11. static class MyThreadFactory implements ThreadFactory {
  12. @Override
  13. public Thread newThread(Runnable r) {
  14. return new Thread(r);
  15. }
  16. }
  17. public static void main(String[] args) {
  18. BlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<>(5);
  19. // 同时最多有 15 个任务
  20. // 3 个正式的
  21. // 5 个队列中
  22. // 7 个临时的
  23. // 提交第 16 个任务时就会出现拒绝服务
  24. MyThreadPoolExecutor executor = new MyThreadPoolExecutor(
  25. 3, 10, 10, TimeUnit.SECONDS,
  26. workQueue,
  27. new MyThreadFactory(),
  28. new ThreadPoolExecutor.AbortPolicy()
  29. );
  30. // [0, 1, 2] 交给正式员工在处理
  31. // [3, 4, 5, 6, 7] 暂时放在队列中
  32. // [8, 9, 10, 11, 12, 13, 14] 交给临时工处理
  33. // 过了 15s 之后,第一批任务执行结束
  34. // [0, 1, 2]、[8, 9, 10, 11, 12, 13, 14] 执行结束
  35. // 剩下的 [3, 4, 5, 6, 7] 任务具体怎么分配不确定,大概率是交给正式员工执行
  36. // 就算极端情况下,5 个全部给了临时工
  37. // 也至少还有 2 个临时工没有工作
  38. // 再过 10s,至少 2 个,最多 5 个临时工要被解雇
  39. for (int i = 0; i < 15; i++) {
  40. System.out.println("提交任务: " + i);
  41. Task task = new Task();
  42. executor.execute(task);
  43. }
  44. }
  45. }

线程池总结:(⭐)

  1. 为什么
  2. 方法 + 机制
    • 构造方法的含义
    • execute的使用
    • 其他的使用暂时没有介绍
  3. 自己实现(难点)
声明:本文内容由网友自发贡献,转载请注明出处:【wpsshop博客】
推荐阅读
相关标签
  

闽ICP备14008679号