当前位置:   article > 正文

线程池(三)ThreadPoolTaskExecutor类(1)介绍与集成

threadpooltaskexecutor

一、ThreadPoolTaskExecutor介绍:

1、介绍:

对ThreadPoolExecutor的进一步封装,实际应用中一般使用ThreadPoolTaskExecutor而不是ThreadPoolExecutor。

  1. public class ThreadPoolTaskExecutor extends ExecutorConfigurationSupport implements SchedulingTaskExecutor {
  2. private final Object poolSizeMonitor = new Object();
  3. private int corePoolSize = 1;
  4. private int maxPoolSize = 2147483647;
  5. private int keepAliveSeconds = 60;
  6. private boolean allowCoreThreadTimeOut = false;
  7. private int queueCapacity = 2147483647;
  8. private ThreadPoolExecutor threadPoolExecutor; //这里就用到了ThreadPoolExecutor
2、与ThreadPoolExecutor的比较:

(1)包类关系:

ThreadPoolTaskExecutor是spring core包中的,而ThreadPoolExecutor是JDK中的JUC

org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor
import java.util.concurrent.ThreadPoolExecutor

(2)源码逻辑:

ThreadPoolTaskExecutor是对ThreadPoolExecutor进行了封装处理。

ThreadPoolExecutor结构,祖类都是调用Executor接口:

ThreadPoolTaskExecutor结构,祖类都是调用Executor接口:

3、ThreadPoolTaskExecutor中的队列

使用ThreadPoolTaskExecutor无需设置阻塞队列类型,只需要按需设置队列大小即可。ThreadPoolTaskExecutor底层使用的队列为:

队列容量大于0则使用 LinkedBlockingQueue队列;否则使用 SynchronousQueue队列;

二、ThreadPoolTaskExecutor构造函数

不同于ThreadPoolExecutor的构造函数中含有很多参数(这也是使用ThreadPoolExecutor不够便捷的方式之一),ThreadPoolTaskExecutor只有一个构造函数,且是无参构造:

使用无参构造实例化后,可以使用set方法设置一些属性的自定义配置,其含义和ThreadPoolExecutor是一样的。常见的有:

1、setCorePoolSize

指定线程池中的核心线程数量

2、setMaxPoolSize

线程池中的最大线程数量

3、setQueueCapacity

队列大小。

4、setKeepAliveSeconds

线程池中线程数量超过corePoolSize 的时候,多余的空闲线程会在这个设定的时间段内被销毁。

三、集成方式:

同ThreadPoolExecutor的集成方式。

demo:假设有三个耗时任务,一个返回结果,一个不返回结果,一个不返回结果且批量执行

1、线程池配置:

将ThreadPoolTaskExecutor做为一个bean,通过spring的注入,保证只会初始化一次。

  1. package exceldemo.config;
  2. import org.springframework.context.annotation.Bean;
  3. import org.springframework.context.annotation.Configuration;
  4. import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
  5. @Configuration
  6. public class ThreadPoolTaskExecutorConfig {
  7. private static int CORE_POOL_SIZE = 5;
  8. private static int MAX_POOL_SIZE = 1000;
  9. @Bean(name="threadPoolTaskExecutor")
  10. public ThreadPoolTaskExecutor serviceJobTaskExecutor(){
  11. ThreadPoolTaskExecutor poolTaskExecutor = new ThreadPoolTaskExecutor();
  12. //线程池维护线程的最少数量
  13. poolTaskExecutor.setCorePoolSize(CORE_POOL_SIZE);
  14. //线程池维护线程的最大数量
  15. poolTaskExecutor.setMaxPoolSize(MAX_POOL_SIZE);
  16. //线程池所使用的缓冲队列
  17. poolTaskExecutor.setQueueCapacity(200);
  18. //线程池维护线程所允许的空闲时间
  19. poolTaskExecutor.setKeepAliveSeconds(30000);
  20. poolTaskExecutor.setWaitForTasksToCompleteOnShutdown(true);
  21. System.out.println(poolTaskExecutor);
  22. return poolTaskExecutor;
  23. }
  24. }
2、业务:
  1. package exceldemo.service.impl;
  2. import exceldemo.dto.User;
  3. import exceldemo.service.UserService;
  4. import org.springframework.stereotype.Service;
  5. import java.util.ArrayList;
  6. import java.util.List;
  7. @Service("userService")
  8. public class UserServiceImpl implements UserService {
  9. @Override
  10. public List<User> getByIds(List<Integer> ids) {
  11. List<User> users = new ArrayList<>();
  12. for(Integer id : ids){
  13. User user = new User();
  14. user.setAge(id);
  15. user.setUserName("用户"+id);
  16. //耗时操作
  17. try {
  18. Thread.sleep(10);
  19. } catch (InterruptedException e) {
  20. e.printStackTrace();
  21. }
  22. users.add(user);
  23. }
  24. return users;
  25. }
  26. @Override
  27. public void addUserAction() {
  28. //耗时操作
  29. try {
  30. Thread.sleep(20);
  31. } catch (InterruptedException e) {
  32. e.printStackTrace();
  33. }
  34. System.out.println("行为记录");
  35. }
  36. }
 3、需要提交的task:
  1. package exceldemo.task;
  2. import exceldemo.dto.User;
  3. import exceldemo.service.UserService;
  4. import java.util.List;
  5. import java.util.concurrent.Callable;
  6. public class UserTask implements Callable<List<User>> {
  7. private List<Integer> userIds;
  8. private UserService userService;
  9. public UserTask(List<Integer> queryIds, UserService userService) {
  10. this.userService = userService;
  11. this.userIds = queryIds;
  12. //System.out.println("ids "+queryIds.size());
  13. }
  14. @Override
  15. public List<User> call() throws Exception {
  16. //System.out.println("*******************");
  17. List<User> users = userService.getByIds(userIds);
  18. return users;
  19. }
  20. }

异步任务:

  1. package exceldemo.task;
  2. import java.util.List;
  3. public class DemoTask implements Runnable {
  4. private List<Integer> ids ;
  5. public DemoTask(List<Integer> ids){
  6. this.ids = ids;
  7. }
  8. @Override
  9. public void run() {
  10. try {
  11. Thread.sleep(500);
  12. } catch (InterruptedException e) {
  13. e.printStackTrace();
  14. }
  15. //System.out.println("run "+ids.size());
  16. }
  17. }

4、controller:
  1. package exceldemo.rest;
  2. import exceldemo.dto.User;
  3. import exceldemo.service.UserService;
  4. import exceldemo.task.UserTask;
  5. import org.springframework.beans.factory.annotation.Autowired;
  6. import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
  7. import org.springframework.web.bind.annotation.RequestMapping;
  8. import org.springframework.web.bind.annotation.RestController;
  9. import java.util.ArrayList;
  10. import java.util.Date;
  11. import java.util.List;
  12. import java.util.concurrent.Future;
  13. import java.util.concurrent.ThreadPoolExecutor;
  14. @RestController
  15. @RequestMapping("/user1")
  16. public class UserController {
  17. @Autowired
  18. private UserService userService;
  19. @Autowired
  20. private ThreadPoolTaskExecutor threadPoolTaskExecutor;
  21. @RequestMapping("/getAllSc")
  22. public List<User> getAllSc(){
  23. List<Integer> ids = new ArrayList<>();
  24. for(int i = 0;i<=500;i++){
  25. ids.add(i);
  26. }
  27. //1、异步通知
  28. threadPoolTaskExecutor.execute(new Runnable() {
  29. @Override
  30. public void run() {
  31. userService.addUserAction();
  32. }
  33. });
  34. //2、批量异步通知
  35. List<Integer> childIds = new ArrayList<>();
  36. for (int i = 0; i < ids.size(); i += 100) {
  37. int startIndex = i;
  38. int endIndex = startIndex + 100 > ids.size() ? ids.size() : startIndex + 100;
  39. DemoTask task = new DemoTask(ids.subList(startIndex, endIndex));
  40. threadPoolTaskExecutor.execute(task);
  41. }
  42. //3、异步获取所有用户
  43. long startTime = new Date().getTime();
  44. List<User> users = new ArrayList<>();
  45. List<Future> futures = new ArrayList<>();
  46. for (int i = 0; i < ids.size(); i += 100) {
  47. int startIndex = i;
  48. int endIndex = startIndex + 100 > ids.size() ? ids.size() : startIndex + 100;
  49. UserTask task = new UserTask(ids.subList(startIndex, endIndex),userService);
  50. Future<List<User>> future = threadPoolTaskExecutor.submit(task);
  51. futures.add(future);
  52. }
  53. //取数据
  54. try{
  55. for(Future future : futures){
  56. users.addAll((List<User>) future.get());
  57. }
  58. }catch (Exception e){
  59. }
  60. long endTime = new Date().getTime();
  61. System.out.println("耗时"+(endTime-startTime));
  62. return users;
  63. }
  64. }

控制台打印:

  1. 行为记录
  2. 耗时1046

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/酷酷是懒虫/article/detail/1008853
推荐阅读
相关标签
  

闽ICP备14008679号