当前位置:   article > 正文

spring boot自定义线程池进行异步调用获取接口数据_线程池汇总不同微服务模块的接口数据

线程池汇总不同微服务模块的接口数据

spring boot自定义线程池进行异步调用

  1. 我们需要异步调用的时候,很容易就想到多线程的方式,先创建线程池,然后实现 Runnable 或者 Callable
  2. 接口来创建对象,然后将对象放在线程池中去执行。除了这个,spring 提供了更简单粗暴的方式,这就是本章的主角: @Async
  3. 如果直接使用 @Async,那么默认就是使用 SimpleAsyncTaskExecutor 线程池,由于 SimpleAsyncTaskExecutor 不限制并发线程而且不重用线程,那么直接使用是有风险的,所以需要通过自定义线程池来使用@Async 达到异步调用的目的。

1. 创建自定义线程池

@Configuration
@EnableAsync    //@EnableAsync 注解主要是为了扫描范围包下的所有 @Async 注解
public class ThreadPoolConfig {
    @Bean("taskExecutor")
    public ThreadPoolTaskExecutor taskThreadPool() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();

        executor.setCorePoolSize(16);   //核心线程数目

        executor.setMaxPoolSize(900);   //指定最大线程数

        executor.setQueueCapacity(5);   //队列中最大的数目

        executor.setThreadNamePrefix("defaultThreadPool_"); //线程名称前缀
        //rejection-policy:当pool已经达到max size的时候,如何处理新任务
        //CALLER_RUNS:不在新线程中执行任务,而是由调用者所在的线程来执行

        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());    //对拒绝task的处理策略

        executor.setKeepAliveSeconds(60);  //线程空闲后的最大存活时间

        executor.initialize();  //加载
        return executor;
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25

2.创建异步任务类(service 层)


@Service
public class ThreadPoolServiceImpl {
    // task 1
    @Async("taskExecutor")
    public void Asynctask1() {
        System.out.println("task1 开始执行");
        long startTime = System.currentTimeMillis();
        try {
            Thread.sleep(500L);
            Asynctask2();
            System.out.println("运行成功,task1的线程是"+Thread.currentThread().getName());
        } catch (InterruptedException e) {
            e.printStackTrace();
            System.out.println("出错了!task1的线程是"+Thread.currentThread().getName());
        }
        System.out.println("test 执行结束,耗时:" + (System.currentTimeMillis() - startTime));
    }
    // task 2
    @Async("taskExecutor")
    public void Asynctask2() {
        System.out.println("task2 开始执行");
        long startTime = System.currentTimeMillis();
        try {
            Thread.sleep(500L);
            System.out.println("运行成功,task2的线程是"+Thread.currentThread().getName());
        } catch (InterruptedException e) {
            e.printStackTrace();
            System.out.println("出错了!task2的线程是"+Thread.currentThread().getName());
        }
        System.out.println("test 执行结束,耗时:" + (System.currentTimeMillis() - startTime));
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33

3.接口调用类(controller 层)

@RestController
public class ThreadPoolController {
    @Autowired
    private ThreadPoolServiceImpl threadPoolService;

    @Async("taskExecutor")
    @ResponseBody
    @RequestMapping(value = "/test1", method = RequestMethod.GET)
    public String threadPooltest1() {
        System.out.println("test 开始执行");
        threadPoolService.Asynctask1();
        return "运行成功threadPooltest1";
    }

    @Async("taskExecutor")
    @ResponseBody
    @RequestMapping(value = "/test2", method = RequestMethod.GET)
    public String threadPooltest2() {
        System.out.println("test 开始执行");
        threadPoolService.Asynctask1();
        threadPoolService.Asynctask2();
        return "运行成功threadPooltest2";
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24

3.1 运行结果:

在这里插入图片描述

4.有返回值的异步任务

以上的 task1 和 task2 任务是没有返回值,如果碰到需要获取任务结果就要用到 Future 了。

4.1 service 层:

@Service
public class ThreadPoolServiceImpl {
    // task 3
    @Async("taskExecutor")
    public Future<String>  Asynctask3() throws Exception{
        System.out.println("task3线程名称:" + Thread.currentThread().getName());
        long startTime = System.currentTimeMillis();
//        int i=8/0;
        Thread.sleep(1000);
        System.out.println("test3 执行结束,耗时:" + (System.currentTimeMillis() - startTime));
        return new AsyncResult<>("成功");
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13

4.2 controller 层:


@RestController
public class ThreadPoolController {
    @Autowired
    private ThreadPoolServiceImpl threadPoolService;

    @Async("taskExecutor")
    @ResponseBody
    @RequestMapping(value = "/test", method = RequestMethod.GET)
    public String threadPooltest3() throws Exception {
        System.out.println("test 开始执行");
        System.out.println("threadPooltest3此时正在运行的线程名称:" + Thread.currentThread().getName());
        try {
            Future<String> stringFuture = threadPoolService.Asynctask3();
            System.out.println("threadPooltest3此时正在运行的线程名称:" + Thread.currentThread().getName());
            System.out.println(stringFuture.get());
            return "成功了"+stringFuture.get();
        } catch (Exception e) {
            e.printStackTrace();
            System.out.println("出错了");
            return "出错了";
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24

4.3 调试接口:

在这里插入图片描述

可以明显看出,task1 和 task2 不是异步执行的。原因就是调用 task1 和 task2 方法的是 AsyncTask 对象本身,而不是 spring 启动的时候为其创建的代理对象,没有经过 spring 容器。如果要解决这个问题,就按照这个思路,创建一个代理对象即可。

5.真正的异步调用

在 service 中先通过自动装配获取 applicationContext 对象,然后通过 applicationContext 获取 AsyncTask 的代理对象,通过代理对象来调用 task1 和 task2 :

@Component
public class ThreadPoolServiceImpl {

    @Autowired
    private ApplicationContext applicationContext;


    public void test(){
        System.out.println("test 开始执行");
        long startTime = System.currentTimeMillis();
        applicationContext.getBean(ThreadPoolServiceImpl.class).task1();
        applicationContext.getBean(ThreadPoolServiceImpl.class).task2();
        System.out.println("test 执行结束,耗时:" + (System.currentTimeMillis() - startTime));
    }

    /**
     * task 1
     */
    @Async("taskExecutor")
    public Future<String> task1() {
        System.out.println("task1 开始执行");
        long startTime = System.currentTimeMillis();
        try {
            Thread.sleep(1000L);
        } catch (InterruptedException e) {
        }
        System.out.println("task1 执行结束,耗时:" + (System.currentTimeMillis() - startTime));
        return new AsyncResult<>("task1 success");
    }

    /**
     * task 2
     */
    @Async("taskExecutor")
    public Future<String> task2() {
        System.out.println("task2 开始执行");
        long startTime = System.currentTimeMillis();
        try {
            Thread.sleep(1000L);
        } catch (InterruptedException e) {
        }
        System.out.println("task2 执行结束,耗时:" + (System.currentTimeMillis() - startTime));
        return new AsyncResult<>("task2 success");
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46

调用方法:

@GetMapping("/async")
public void test() throws ExecutionException, InterruptedException {
    System.out.println("test 开始执行");
    asyncTask.test();
    long startTime = System.currentTimeMillis();
    System.out.println("test 执行结束,耗时:" + (System.currentTimeMillis() - startTime));
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

推荐阅读
相关标签