赞
踩
我们都知道,java.util.concurrent包给我们提供了并发编程的相关工具类,有线程池,队列,CountDownLatch,Semaphore等等工具类,使我们编写并发程序非常方便。
我今天要讲的是如何在将任务提交到线程池后取消正在运行的任务?
有线程池编程经验的朋友可能第一反应就是利用Future.cancel(boolean)方法。这个方法确实可以取消任务,但这个方法有一个缺陷。当任务还在队列中没有得到执行的时候,也会被取消掉。这种情况有时候可能不满足用户需求,例如:我的需求是当任务正在运行,并且运行时间超过5秒钟,则取消任务。这种需求Future.cancel(boolean)就不能满足了。接下来我给大家分享一下我的实现方式,如果有不对的地方还请大家指正,谢谢!
为了验证实践方法,我写了两个类,分别为工作任务类:Task.java实现了Runnable接口,此类模拟了一个耗时任务。另一个:Main.java,此类模拟提交任务,并对超时任务取消操作。具体代码如下(代码中的注释已经很全面):
Task.java:
- /**
- * <p>
- * Create Time: 2018年5月25日
- * </p>
- * @version 1.0
- */
- package cn.concurrent1;
-
- import java.lang.Thread.State;
- import java.util.UUID;
-
- /**
- * 任务类,实现了Runnable接口
- * <p>
- * Create Time: 2018年5月25日
- * </p>
- *
- * @version 1.0
- */
- public class Task implements Runnable {
-
- private String taskName;// 任务名称
- private volatile long start = 0L; // 任务开始时间
- private State state; // 线程状态
-
- private Thread taskInThread; // 当前任务所处的线程
-
- public Task(String taskName) {
- this.taskName = taskName;
- }
-
- @Override
- public void run() {
- // 将当前任务与当前线程相关连,为的是后续设置中断状态做准备
- this.setTaskInThread(Thread.currentThread());
- // 记录任务执行开始时间
- start = System.currentTimeMillis();
- long num = 0;
- try {
- String uuid = "";
- // 模拟任务耗时开始 //
- uuid = UUID.randomUUID().toString();
- while (!Thread.currentThread().isInterrupted()) {
- if (uuid.startsWith("0")) {
- break;
- }
- }
- long now = System.currentTimeMillis();
- num = now - start;
- System.out.println(taskName + "=>开始 start==" + start + " 模拟耗时" + num / 1000);
-
- // 模拟任务耗时结束 //
-
- // 任务执行结束,在存储任务容器中删除该任务
- if (Main.map.get(this.getTaskName()) != null) {
- Main.map.remove(this.getTaskName());
- if (!Thread.currentThread().isInterrupted()) {
- System.out.println(taskName + "==>正常结束,模拟清除 耗时:" + (num / 1000) + " 实际耗时=="
- + ((System.currentTimeMillis() - start) / 1000) + " start=" + start + " end=" + now);
- } else {
- System.out.println(taskName + "==>中断结束,模拟清除 耗时:" + (num / 1000) + " 实际耗时=="
- + ((System.currentTimeMillis() - start) / 1000) + " start=" + start + " end=" + now);
- }
- }
- } catch (Exception e) {
- long end = System.currentTimeMillis();
- System.out.println(taskName + "===>被中断了 模拟耗时:" + (num / 1000) + " start:" + start + " end:" + end
- + " 实际耗时==" + ((System.currentTimeMillis() - start) / 1000));
-
- // 任务中断,在存储任务容器中删除该任务
- if (Main.map.get(this.getTaskName()) != null) {
- Main.map.remove(this.getTaskName());
- }
- }
-
- }
-
- // 对外提供设置任务中断的方法
- public void setInterrupte(Thread thread) {
- try {
- thread.interrupt();
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
-
- // Getter Setter方法
- public String getTaskName() {
- return taskName;
- }
-
- public void setTaskName(String taskName) {
- this.taskName = taskName;
- }
-
- public State getThreadStatus() {
- this.state = Thread.currentThread().getState();
- return this.state;
- }
-
- public long getStart() {
- return start;
- }
-
- public Thread getTaskInThread() {
- return taskInThread;
- }
-
- public void setTaskInThread(Thread taskInThread) {
- this.taskInThread = taskInThread;
- }
- }
Main.java
- /**
- * <p>
- * Create Time: 2018年5月25日
- * </p>
- * @version 1.0
- */
- package cn.concurrent1;
-
- import java.lang.Thread.State;
- import java.util.Iterator;
- import java.util.Map.Entry;
- import java.util.Set;
- import java.util.concurrent.ConcurrentHashMap;
- import java.util.concurrent.LinkedBlockingQueue;
- import java.util.concurrent.ScheduledExecutorService;
- import java.util.concurrent.ScheduledThreadPoolExecutor;
- import java.util.concurrent.ThreadPoolExecutor;
- import java.util.concurrent.TimeUnit;
-
-
- /**
- * <p>
- * Create Time: 2018年5月25日
- * </p>
- *
- * @version 1.0
- */
- public class Main {
-
- // 处理器个数
- public final static int PROCESS_NUM = Runtime.getRuntime().availableProcessors();
-
- // 根据处理器个数定义线程个数
- public final static int THREAD_NUM = Math.max(PROCESS_NUM, 4) * 5;
-
- // 任务队列大小
- public final static int DISPENSE_MAX_WAITTING_THREAD_NUM = Short.MAX_VALUE >> 1;// 16383
-
- // 最大任务执行时间,时间上限
- public final static int MAXIMUM_TASK_EXECUTION_TIME = 5;
-
- // 线程池,拒绝策略为丢弃旧的任务
- private static ThreadPoolExecutor es = new ThreadPoolExecutor(THREAD_NUM, THREAD_NUM, 0L, TimeUnit.MILLISECONDS,
- new LinkedBlockingQueue<Runnable>(DISPENSE_MAX_WAITTING_THREAD_NUM), new NamedThreadFactory("测试任务"),
- new ThreadPoolExecutor.DiscardOldestPolicy());
-
- // 定时任务,定时删除任务执行时间大于5秒钟的任务
- private static ScheduledExecutorService threadCostTimeEs = new ScheduledThreadPoolExecutor(1,
- new NamedThreadFactory("定时任务"), new ThreadPoolExecutor.AbortPolicy());
-
- // 存储任务容器
- public static ConcurrentHashMap<String, Task> map = new ConcurrentHashMap<>();
-
- // mian方法
- public static void main(String[] args) {
-
- // 模拟提交100个任务
- for (int i = 0; i < 100; i++) {
- Task task = new Task("Task-" + ((i + 1) < 10 ? "0" + (i + 1) : (i + 1)));
- map.put(task.getTaskName(), task);
- es.submit(task);
- }
-
- // 定时任务定时处理逻辑
- threadCostTimeEs.scheduleAtFixedRate(new Runnable() {
- @Override
- public void run() {
- // 监控线程池
- System.out.println("获取HTML线程池任务总数:" + es.getQueue().size() + "," + "活动线程数量:" + es.getActiveCount() + ","
- + "总任务数量:" + es.getTaskCount());
-
- Set<Entry<String, Task>> set = map.entrySet();
- Iterator<Entry<String, Task>> itr = set.iterator();
- while (itr.hasNext()) {
- try {
- Entry<String, Task> entry = itr.next();
- Task t = entry.getValue();
- long start = t.getStart();
- if (start != 0) {
- long currentTime = System.currentTimeMillis();
- long sub = currentTime - start;
- sub = sub / 1000;
- // 队列中包含该任务,并且任务执行时间大于最大任务执行时间(目前5秒)
- if (sub > MAXIMUM_TASK_EXECUTION_TIME && !es.getQueue().contains(t)) {
- System.out.println(t.getTaskName() + "====>开始中断 耗时====" + sub + " start==" + start
- + " currentTime=" + currentTime);
- State state = t.getThreadStatus();
- // 任务为Runnable状态的,设置中断状态
- if (state == State.RUNNABLE) {
- t.setInterrupte(t.getTaskInThread());
- }
- }
- }
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- }
- }, 1, 1, TimeUnit.SECONDS);
- // while (es.getActiveCount() == 0) {
- // es.shutdown();
- // threadCostTimeEs.shutdown();
- // }
- }
-
- }
NameThreadFactory 线程命名工具类:
- package cn.concurrent1;
-
- import java.util.concurrent.ThreadFactory;
- import java.util.concurrent.atomic.AtomicInteger;
-
- public class NameThreadFactory implements ThreadFactory {
- private final AtomicInteger threadNumber = new AtomicInteger(1);
- private final String threadName;
-
- public NamedThreadFactory(String threadName) {
- this.threadName = threadName;
- }
-
- public Thread newThread(Runnable r) {
- Thread t = new Thread(r, threadName + threadNumber.getAndIncrement());
- return t;
- }
- }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。