当前位置:   article > 正文

基于线程池的并发编程取消运行中的任务_java提交到线程池的任务可以撤回吗

java提交到线程池的任务可以撤回吗

        我们都知道,java.util.concurrent包给我们提供了并发编程的相关工具类,有线程池,队列,CountDownLatch,Semaphore等等工具类,使我们编写并发程序非常方便。

        我今天要讲的是如何在将任务提交到线程池后取消正在运行的任务?

有线程池编程经验的朋友可能第一反应就是利用Future.cancel(boolean)方法。这个方法确实可以取消任务,但这个方法有一个缺陷。当任务还在队列中没有得到执行的时候,也会被取消掉。这种情况有时候可能不满足用户需求,例如:我的需求是当任务正在运行,并且运行时间超过5秒钟,则取消任务。这种需求Future.cancel(boolean)就不能满足了。接下来我给大家分享一下我的实现方式,如果有不对的地方还请大家指正,谢谢!

        为了验证实践方法,我写了两个类,分别为工作任务类:Task.java实现了Runnable接口,此类模拟了一个耗时任务。另一个:Main.java,此类模拟提交任务,并对超时任务取消操作。具体代码如下(代码中的注释已经很全面):

Task.java:

  1. /**
  2. * <p>
  3. * Create Time: 2018年5月25日
  4. * </p>
  5. * @version 1.0
  6. */
  7. package cn.concurrent1;
  8. import java.lang.Thread.State;
  9. import java.util.UUID;
  10. /**
  11. * 任务类,实现了Runnable接口
  12. * <p>
  13. * Create Time: 2018年5月25日
  14. * </p>
  15. *
  16. * @version 1.0
  17. */
  18. public class Task implements Runnable {
  19. private String taskName;// 任务名称
  20. private volatile long start = 0L; // 任务开始时间
  21. private State state; // 线程状态
  22. private Thread taskInThread; // 当前任务所处的线程
  23. public Task(String taskName) {
  24.     this.taskName = taskName;
  25. }
  26. @Override
  27. public void run() {
  28. // 将当前任务与当前线程相关连,为的是后续设置中断状态做准备
  29. this.setTaskInThread(Thread.currentThread());
  30. // 记录任务执行开始时间
  31. start = System.currentTimeMillis();
  32. long num = 0;
  33. try {
  34. String uuid = "";
  35. // 模拟任务耗时开始 //
  36. uuid = UUID.randomUUID().toString();
  37. while (!Thread.currentThread().isInterrupted()) {
  38. if (uuid.startsWith("0")) {
  39. break;
  40. }
  41. }
  42. long now = System.currentTimeMillis();
  43. num = now - start;
  44. System.out.println(taskName + "=>开始 start==" + start + " 模拟耗时" + num / 1000);
  45. // 模拟任务耗时结束 //
  46. // 任务执行结束,在存储任务容器中删除该任务
  47. if (Main.map.get(this.getTaskName()) != null) {
  48. Main.map.remove(this.getTaskName());
  49. if (!Thread.currentThread().isInterrupted()) {
  50. System.out.println(taskName + "==>正常结束,模拟清除 耗时:" + (num / 1000) + " 实际耗时=="
  51. + ((System.currentTimeMillis() - start) / 1000) + " start=" + start + " end=" + now);
  52. } else {
  53. System.out.println(taskName + "==>中断结束,模拟清除 耗时:" + (num / 1000) + " 实际耗时=="
  54. + ((System.currentTimeMillis() - start) / 1000) + " start=" + start + " end=" + now);
  55. }
  56. }
  57. } catch (Exception e) {
  58. long end = System.currentTimeMillis();
  59. System.out.println(taskName + "===>被中断了 模拟耗时:" + (num / 1000) + " start:" + start + " end:" + end
  60. + " 实际耗时==" + ((System.currentTimeMillis() - start) / 1000));
  61. // 任务中断,在存储任务容器中删除该任务
  62. if (Main.map.get(this.getTaskName()) != null) {
  63. Main.map.remove(this.getTaskName());
  64. }
  65. }
  66. }
  67. // 对外提供设置任务中断的方法
  68. public void setInterrupte(Thread thread) {
  69. try {
  70. thread.interrupt();
  71. } catch (Exception e) {
  72. e.printStackTrace();
  73. }
  74. }
  75. // Getter Setter方法
  76. public String getTaskName() {
  77. return taskName;
  78. }
  79. public void setTaskName(String taskName) {
  80. this.taskName = taskName;
  81. }
  82. public State getThreadStatus() {
  83. this.state = Thread.currentThread().getState();
  84. return this.state;
  85. }
  86. public long getStart() {
  87. return start;
  88. }
  89. public Thread getTaskInThread() {
  90. return taskInThread;
  91. }
  92. public void setTaskInThread(Thread taskInThread) {
  93. this.taskInThread = taskInThread;
  94. }
  95. }

Main.java

  1. /**
  2. * <p>
  3. * Create Time: 2018年5月25日
  4. * </p>
  5. * @version 1.0
  6. */
  7. package cn.concurrent1;
  8. import java.lang.Thread.State;
  9. import java.util.Iterator;
  10. import java.util.Map.Entry;
  11. import java.util.Set;
  12. import java.util.concurrent.ConcurrentHashMap;
  13. import java.util.concurrent.LinkedBlockingQueue;
  14. import java.util.concurrent.ScheduledExecutorService;
  15. import java.util.concurrent.ScheduledThreadPoolExecutor;
  16. import java.util.concurrent.ThreadPoolExecutor;
  17. import java.util.concurrent.TimeUnit;
  18. /**
  19. * <p>
  20. * Create Time: 2018年5月25日
  21. * </p>
  22. *
  23. * @version 1.0
  24. */
  25. public class Main {
  26. // 处理器个数
  27. public final static int PROCESS_NUM = Runtime.getRuntime().availableProcessors();
  28. // 根据处理器个数定义线程个数
  29. public final static int THREAD_NUM = Math.max(PROCESS_NUM, 4) * 5;
  30. // 任务队列大小
  31. public final static int DISPENSE_MAX_WAITTING_THREAD_NUM = Short.MAX_VALUE >> 1;// 16383
  32. // 最大任务执行时间,时间上限
  33. public final static int MAXIMUM_TASK_EXECUTION_TIME = 5;
  34. // 线程池,拒绝策略为丢弃旧的任务
  35. private static ThreadPoolExecutor es = new ThreadPoolExecutor(THREAD_NUM, THREAD_NUM, 0L, TimeUnit.MILLISECONDS,
  36. new LinkedBlockingQueue<Runnable>(DISPENSE_MAX_WAITTING_THREAD_NUM), new NamedThreadFactory("测试任务"),
  37. new ThreadPoolExecutor.DiscardOldestPolicy());
  38. // 定时任务,定时删除任务执行时间大于5秒钟的任务
  39. private static ScheduledExecutorService threadCostTimeEs = new ScheduledThreadPoolExecutor(1,
  40. new NamedThreadFactory("定时任务"), new ThreadPoolExecutor.AbortPolicy());
  41. // 存储任务容器
  42. public static ConcurrentHashMap<String, Task> map = new ConcurrentHashMap<>();
  43. // mian方法
  44. public static void main(String[] args) {
  45. // 模拟提交100个任务
  46. for (int i = 0; i < 100; i++) {
  47. Task task = new Task("Task-" + ((i + 1) < 10 ? "0" + (i + 1) : (i + 1)));
  48. map.put(task.getTaskName(), task);
  49. es.submit(task);
  50. }
  51. // 定时任务定时处理逻辑
  52. threadCostTimeEs.scheduleAtFixedRate(new Runnable() {
  53. @Override
  54. public void run() {
  55. // 监控线程池
  56. System.out.println("获取HTML线程池任务总数:" + es.getQueue().size() + "," + "活动线程数量:" + es.getActiveCount() + ","
  57. + "总任务数量:" + es.getTaskCount());
  58. Set<Entry<String, Task>> set = map.entrySet();
  59. Iterator<Entry<String, Task>> itr = set.iterator();
  60. while (itr.hasNext()) {
  61. try {
  62. Entry<String, Task> entry = itr.next();
  63. Task t = entry.getValue();
  64. long start = t.getStart();
  65. if (start != 0) {
  66. long currentTime = System.currentTimeMillis();
  67. long sub = currentTime - start;
  68. sub = sub / 1000;
  69. // 队列中包含该任务,并且任务执行时间大于最大任务执行时间(目前5秒)
  70. if (sub > MAXIMUM_TASK_EXECUTION_TIME && !es.getQueue().contains(t)) {
  71. System.out.println(t.getTaskName() + "====>开始中断 耗时====" + sub + " start==" + start
  72. + " currentTime=" + currentTime);
  73. State state = t.getThreadStatus();
  74. // 任务为Runnable状态的,设置中断状态
  75. if (state == State.RUNNABLE) {
  76. t.setInterrupte(t.getTaskInThread());
  77. }
  78. }
  79. }
  80. } catch (Exception e) {
  81. e.printStackTrace();
  82. }
  83. }
  84. }
  85. }, 1, 1, TimeUnit.SECONDS);
  86. // while (es.getActiveCount() == 0) {
  87. // es.shutdown();
  88. // threadCostTimeEs.shutdown();
  89. // }
  90. }
  91. }

 

NameThreadFactory 线程命名工具类:

 

  1. package cn.concurrent1;
  2. import java.util.concurrent.ThreadFactory;
  3. import java.util.concurrent.atomic.AtomicInteger;
  4. public class NameThreadFactory implements ThreadFactory {
  5. private final AtomicInteger threadNumber = new AtomicInteger(1);
  6. private final String threadName;
  7. public NamedThreadFactory(String threadName) {
  8. this.threadName = threadName;
  9. }
  10. public Thread newThread(Runnable r) {
  11. Thread t = new Thread(r, threadName + threadNumber.getAndIncrement());
  12. return t;
  13. }
  14. }

 

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/Gausst松鼠会/article/detail/83649
推荐阅读
相关标签
  

闽ICP备14008679号