赞
踩
在日常处理业务中,在某些定时任务处理数据时,因待处理数据量较大,如上千上万数据处理.虽然可以使用线程池异步处理,但是线程池处理速度和队列存放能力有限,为保护线程池稳定,需要控制数据处理频率,常见如分批次处理数据, 在多线程处理中分批次,一般可使用CountDownLatch,Future等.
常见两种创建线程的方式:
1 直接继承Thread类.
2 实现Runnable接口.
上述方法有一个问题,就是执行完任务之后不能获取任务结果.而Future,可以在执行完任务之后获取任务的结果.
Future 类是异步思想的典型运用. 当我们执行某一耗时的任务时,可以将这个耗时任务交给一个子线程去异步执行,同时我们可以干点其他事情,不用傻傻等待耗时任务执行完成。等我们的事情干完后,我们再通过 Future 类获取到耗时任务的执行结果.
Future类:
public interface Future<V> { // 取消任务 boolean cancel(boolean mayInterruptIfRunning); // 判断任务是否被取消 boolean isCancelled(); // 判断任务是否已经执行完成 boolean isDone(); // 获取任务执行结果 没有结果会一直等待 V get() throws InterruptedException, ExecutionException; // 指定时间内没有返回任务结果, 就抛出超时异常 V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException; }
1 Future方法解决
场景如前言中所述, 在该应用场景下, 使用Future来批量处理数据
public void completeCronJob(){ // 获取数据 模拟数据库查询 List<Student> dbList = new ArrayList<>(); // 拆分 每次处理规定数量500 List<List<Student>> stuList = Lists.partition(dbList, 500); List<Future<Student>> futureList = new ArrayList<>(); // 遍历集合 for (List<Student> studentList : stuList) { // 每次最多500条 studentList.forEach(student -> { Future<Student> studentFuture = dealStudent.dealStudent(student); futureList.add(studentFuture); }); for (Future<Student> studentFuture : futureList) { try { // 等待子线程完成 studentFuture.get(); // 设置指定时间后未返回 就抛出超时异常 // studentFuture.get(5, TimeUnit.SECONDS); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } catch (ExecutionException e) { log.error("批量处理学生数据失败"); } } } }
逻辑处理类
public class DealStudent {
public Future<Student> dealStudent(Student stu){
// 处理逻辑
// xxx
return new AsyncResult<>(stu);
}
}
2 CountDownLatch方法解决
场景同上, 在该应用场景下, 使用CountDownLatch来批量处理数据. 入侵传参列表,需要传入与业务无关参数,影响可读性.
public void completeCronJob2(){ // 获取数据 模拟数据库查询 List<Student> dbList = new ArrayList<>(); // 拆分 每次处理规定数量500 List<List<Student>> stuList = Lists.partition(dbList, 500); // 遍历集合 for (List<Student> studentList : stuList) { CountDownLatch count = new CountDownLatch(studentList.size()); // 每次最多500条 studentList.forEach(student -> { dealStudent.dealStudent2(student, count); }); try { count.await(); } catch (InterruptedException e) { log.error("处理学生数据失败"); } } }
逻辑处理类
public class DealStudent {
public void dealStudent2(Student stu, CountDownLatch count){
// 处理逻辑
// xxx
count.countDown();
}
}
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。