当前位置:   article > 正文

【Java并发编程】ThreadPoolTaskExecutor线程池的简单应用

threadpooltaskexecutor

Java JDK中提供了线程池类ThreadPoolExecutor,但在实际开发中多使用SpringBoot来开发,Spring默认也是自带了一个线程池方便我们开发,它就是ThreadPoolTaskExecutor;翻看了好多讲ThreadPoolTaskExecutor的文章,这些文章多从原理和配置来进行介绍,但是实际写代码的时候还要考虑怎么设计使用的问题,这对于老手来说可能没什么,但是对于刚开始使用的新手来说就有可能一头雾水。
下面我就从我实际开发过程中使用的方式来介绍。

1.设计思路

ThreadPoolTaskExecutor类
Spring异步线程池的接口类是TaskExecutor,本质还是java.util.concurrent.Executor,没有配置的情况下,默认使用的是simpleAsyncTaskExecutor。但是Spring更加推荐我们开发者使用ThreadPoolTaskExecutor类来创建线程池,其本质是对java.util.concurrent.ThreadPoolExecutor的包装。
实际编程过程中我们可能遇到许多计算密集型或IO密集型的操作,很多时候是要对一个Collection中的数据都进行操作,这个时候我们就想到如果能用线程池操作就好了。然后就有了这篇文章。
结果获取
多线程操作结果的获取使用Future类来获取。Java 通过 ThreadPoolExecutor 提供的 3 个 submit() 方法和 1 个 FutureTask 工具类来支持获得任务执行结果的需求。ThreadPoolTaskExecutor作为对ThreadPoolExecutor的包装,自然也会提供这三个submit方法。

// 提交Runnable任务
Future<?> 
  submit(Runnable task);
// 提交Callable任务
<T> Future<T> 
  submit(Callable<T> task);
// 提交Runnable任务及结果引用  
<T> Future<T> 
  submit(Runnable task, T result);
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

因为Runable接口的Run方法是没有返回值的,所以其future方法获取的只能用来断言任务是否已经结束。Callable具有返回值,我们可以使用这个接口来获取运行结果。第三个接口稍有些复杂,不符合我们简单应用的理念。至于FutureTask后续再说。

2.具体实现

  1. 配置线程池
package com.lordum.threadpoolpractice.config;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

@Configuration
@EnableAsync
public class TaskPoolConfig {

    @Bean("taskExecutor")
    public ThreadPoolTaskExecutor taskExecutor(){
        ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
        taskExecutor.setCorePoolSize(10);
        taskExecutor.setMaxPoolSize(20);
        taskExecutor.setQueueCapacity(200);
        taskExecutor.setKeepAliveSeconds(60);
        taskExecutor.setThreadNamePrefix("taskExecutor--");
        taskExecutor.setWaitForTasksToCompleteOnShutdown(true);
        taskExecutor.setAwaitTerminationSeconds(60);
        return taskExecutor;
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  1. 创建任务实现Callable接口
package com.lordum.threadpoolpractice.tasks;

import java.util.concurrent.Callable;

public abstract class AbstractTask implements Callable<Result> {

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
package com.lordum.threadpoolpractice.tasks;

import lombok.AllArgsConstructor;
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;

import java.time.LocalDateTime;
import java.util.Date;
import java.util.concurrent.TimeUnit;

@AllArgsConstructor
@NoArgsConstructor
@Slf4j
public class DemoTask extends AbstractTask{
    private String name;

    @Override
    public Result call() throws Exception {
        Result result = new Result();
        log.info(Thread.currentThread().getName() + "接收到任务:" + name + ". 当前时间:" + LocalDateTime.now().toString());
        result.setData("Call " + name);
        TimeUnit.SECONDS.sleep(2);
        return result;
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25

结果接收类

package com.lordum.threadpoolpractice.tasks;

import lombok.Data;

@Data
public class Result {
    private Integer code = 1;//1:成功  0:失败
    private String desc;//描述
    private Object data;
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  1. 调用服务
package com.lordum.threadpoolpractice.service;

import com.lordum.threadpoolpractice.tasks.DemoTask;
import com.lordum.threadpoolpractice.tasks.Result;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Service;

import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

@Slf4j
@Service
public class DemoService {

    @Autowired
    @Qualifier("taskExecutor")
    private ThreadPoolTaskExecutor taskExecutor;


    public List<String> useTaskPoolDemo() throws ExecutionException, InterruptedException {
        List<String> strings = Arrays.asList("Spider man", "Haoke", "Mary", "Lilei", "Han Meimei");
        long start = System.currentTimeMillis();
        List<String> resultList = new ArrayList<>();
        List<Future<Result>> futureList = new ArrayList<>();
        for (String string : strings) {
            DemoTask demoTask = new DemoTask(string);
            Future<Result> result = this.taskExecutor.submit(demoTask);
            futureList.add(result);
            log.info("##" + string +"## 添加到结果: " + string + "结果时间: " + LocalDateTime.now());
        }
        // future.get() 会阻塞调用线程(主线程),如果在上面的循环中获取,整个服务就会变成并行,失去使用线程池的意义
        for (Future<Result> resultFuture : futureList) {
            Result data = resultFuture.get();
            log.info("## 获取到future结果: " + data.getData().toString() + "结果时间: " + LocalDateTime.now());
            resultList.add(data.getData().toString());
        }
        long end = System.currentTimeMillis();
        log.info("程序执行时间:" + String.valueOf(end - start));
        return resultList;
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48

3.总结

优点

  1. 线程池可以做到共用,不需要每个服务都去创建线程池
  2. 扩展方便,如果有其他需要使用线程池的服务可以继承AbstractTask进行扩展。

注意
future.get() 会阻塞调用线程(主线程),如果在调用线程池的循环中获取,整个服务就会变成并行,失去使用线程池的意义

结果展示
在这里插入图片描述
这里我们的并行服务每个用时2S,如果没有用线程池需要执行10S多,这里使用了线程池之后值使用了2s11ms,可见使用线程池并行处理确实提高了执行速度。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/知新_RL/article/detail/104682?site
推荐阅读
相关标签
  

闽ICP备14008679号