当前位置:   article > 正文

spring boot 多线程/异步执行_springboot 多线程 异步

springboot 多线程 异步

spring boot 多线程/异步执行

对于spring boot的多线程/异步执行的介绍全网多如牛毛。对于线程池的监测寥寥无几,且“又臭又长”还搬来搬去的。耗费半天时间终于在Google找到一篇国内的文章:在spring boot应用监控线程池的状态,多亏这篇文章给我指点了迷津。
项目众多,且忘性比记性好,遂写下来为自己作为字典,也希望能帮到寻找答案的你。

多线程配置类

@EnableAsync

//配置注解
@Configuration
//多线程配置注解
@EnableAsync
public class AsyncConfiguration {
    @Bean("asyncTest")
    public Executor asyncTest() {
        //获取jvm核心数量
        int availableProcessors = Runtime.getRuntime().availableProcessors();
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        // 核心线程数:线程池创建时候初始化的线程数
        executor.setCorePoolSize(availableProcessors*2);
        // 最大线程数:线程池最大的线程数,只有在缓冲队列满了之后才会申请超过核心线程数的线程
        executor.setMaxPoolSize(availableProcessors*6);
        // 缓冲队列:用来缓冲执行任务的队列
        executor.setQueueCapacity(800);
        // 允许线程的空闲时间60秒:当超过了核心线程之外的线程在空闲时间到达之后会被销毁
        executor.setKeepAliveSeconds(30);
        // 线程池名的前缀:设置好了之后可以方便我们定位处理任务所在的线程池
        executor.setThreadNamePrefix("async-test-");
        // 缓冲队列满了之后的拒绝策略:由调用线程处理(一般是主线程)
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());
        executor.initialize();
        return executor;
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26

多线程/异步方法

@Async()多线程注解,放在方法上代表这个方法是多线程/异步的;如果放在类上代表这个类都是多线程/异步的。
多线程执行的返回值是Future类型或void。Future是非序列化的,微服务架构中有可能传递失败。
spring boot推荐使用的CompletableFuture在此次工作中没有经过测试。
调用此方法方法不能与当前方法在一个类中。

class AsyncClassOne{
	//只有这个方法对外是多线程/异步的
    @Async("asyncTest")
    public Future<String> asyncTestMethod(String date) {
        System.out.println(date);
        return new AsyncResult<>("执行完毕");
    }
    //这个方法不是多线程/异步的
    public void asyncTestTwo(String date){
    	System.out.println(date);
    	return;
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
//这个类下面所有的方法都是多线程/异步的
@Async("asyncTest")
class AsyncClassTwo{
	//这个方法对外是多线程/异步的
    public Future<String> asyncTestMethod(String date) {
    	//调用同类中其它多线程/异步方法时多线程/异步不起作用。
    	//执行仍然是串行的
    	//会在asyncTestTwo(date)语句中等待一秒钟再返回。
        asyncTestTwo(date);
        return new AsyncResult<>("执行完毕");
    }
    //这个方法对外是多线程/异步的
    public void asyncTestTwo(String date){
    	System.out.println(date);
    	Thread.sleep(1000);
    	return;
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18

所有的@Async("asyncTest")共用同一个线程池:asyncTest

调用多线程/异步方法

在不接收返回值时调用线程/异步方法与调用普通方法基本一致,不同的是调用线程/异步方法时不会等待方法的执行时间,无论多长的代码都是调用即过。被调用方法在线程池中自己奔跑。

class LaunchClass{
	@Autowired
	private AsyncClassOne asyncClassOne;
	@Autowired
	private AsyncClassTwo asyncClassTwo;
    //不接收返回值
    public void launchMethod() {
        Map<String, ThreadPoolTaskExecutor> beansOfType = context.getBeansOfType(ThreadPoolTaskExecutor.class);
        ThreadPoolTaskExecutor solution = beansOfType.get("asyncUpdateTechnologySolution");
        ThreadPoolExecutor threadPoolExecutor = solution.getThreadPoolExecutor();
		asyncClassOne.asyncTestMethod("你好");
		asyncClassTwo.asyncTestTwo("你好");
		//执行完上面两句直接return,不会等待asyncTestTwo中的Thread.sleep(1000);
		return null;
    }
	//接收多线程的返回值
    public void launchMethodTwo() {
		Future<String> funture = asyncClassOne.asyncTestMethod("你好");
		boolean result = false;
		//等待线程执行
    	while (!result) {
    		//睡眠等待线程执行
        	Thread.sleep(100);
            result = true;
            if (!future.isDone()) {
            	//返回线程是否执行完毕(正常执行完毕、出错退出、终止执行都算执行完毕)
                result = future.isDone();
            } else {
            	//拿到方法返回值:"执行完毕"
                String str = future.get();
            }
        }
		return null;
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36

线程池动态监测

有时我们会通过多线程多次重复调用同一个方法,但是线程池终究有限,缓存队列长度也不是无限长,超过限度就会将方法丢弃导致方法没有正确执行。此时我们需要动态监测线程池中的缓存队列长度,在队列达到饱和时暂缓加入队列,在队列中有空缺时再进行补充。监测的方法是直接在spring boot的ApplicationContext 中直接拿到已经注册的线程池,通过线程池的方法进行监测。

class LaunchClassTwo{
	@Autowired
	private AsyncClassOne asyncClassOne;
	@Autowired
	private AsyncClassTwo asyncClassTwo;
    @Autowired
    private ApplicationContext context;
    
    public void monitorMethod() {
    	int count = 0;
    	//获取已经注册的线程池map,key是bean名称;
	    Map<String, ThreadPoolTaskExecutor> beansOfType = context.getBeansOfType(ThreadPoolTaskExecutor.class);
	    //通过asyncTest名称拿到线程池。
	    ThreadPoolTaskExecutor solution = beansOfType.get("asyncTest");
	    //solution 中没有获取缓存队列的方法,而ThreadPoolTaskExecutor中关于线程池是通过ThreadPoolExecutor来实现的,所以需要拿到ThreadPoolExecutor;
	    ThreadPoolExecutor threadPoolExecutor = solution.getThreadPoolExecutor();
	    List<String> listStr = new ArrayList<>();
	    listStr.add("Hello World");
	    //快速填充大量数据
	    for (int i = 0; i < 11; i++) {
            listStr.addAll(listStr);
        }
        for (String str : listStr){
        	if (listStr.size() > 800 && ++count > 800) {
        		//为了防止出现休眠后仍然满队列,采用递归调用休眠,将线程池传入。
                ThreadSleep(threadPoolExecutor);
            }
            asyncClassOne.asyncTestMethod(str);
        }
		return null;
	}
    private void ThreadSleep(ThreadPoolExecutor threadPoolExecutor){
    	//每次都要在此重新获取队列长度,通过getQueue()获取队列,通过size()获取队列长度。
       int size =threadPoolExecutor.getQueue().size();
       //要与线程池队列长度匹配。
       if (size<800){
           return;
       }else {
           try {
           		//队列满了,休息100ms
               Thread.sleep(100);
           } catch (InterruptedException e) {
               e.printStackTrace();
           }
           //再次判断队列长度
           ThreadSleep(threadPoolExecutor);
        }
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50

线程池获取数据的七个方法

  1. getCorePoolSize():获取核心线程数;
  2. getMaximumPoolSize:获取最大线程数;
  3. getQueue():获取线程池中的阻塞队列,并通过阻塞队列中的方法获取队列长度、元素个数等;
  4. getPoolSize():获取线程池中的工作线程数(包括核心线程和非核心线程);
  5. getActiveCount():获取活跃线程数,也就是正在执行任务的线程;
  6. getLargestPoolSize():获取线程池曾经到过的最大工作线程数;
  7. getTaskCount():获取历史已完成以及正在执行的总的任务数量;

ThreadPoolExecutor中还提供了三种钩子函数

  1. beforeExecute():Worker线程执行任务之前会调用的方法;
  2. afterExecute():在Worker线程执行任务之后会调用的方法;
  3. terminated():当线程池从运行状态变更到TERMINATED状态之前调用的方法;

线程池获取数据方法介绍引用自:https://blog.csdn.net/weixin_38592881/article/details/123467050

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/码创造者/article/detail/750983
推荐阅读
相关标签
  

闽ICP备14008679号