赞
踩
对于spring boot的多线程/异步执行的介绍全网多如牛毛。对于线程池的监测寥寥无几,且“又臭又长”还搬来搬去的。耗费半天时间终于在Google找到一篇国内的文章:在spring boot应用监控线程池的状态,多亏这篇文章给我指点了迷津。
项目众多,且忘性比记性好,遂写下来为自己作为字典,也希望能帮到寻找答案的你。
@EnableAsync
//配置注解 @Configuration //多线程配置注解 @EnableAsync public class AsyncConfiguration { @Bean("asyncTest") public Executor asyncTest() { //获取jvm核心数量 int availableProcessors = Runtime.getRuntime().availableProcessors(); ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); // 核心线程数:线程池创建时候初始化的线程数 executor.setCorePoolSize(availableProcessors*2); // 最大线程数:线程池最大的线程数,只有在缓冲队列满了之后才会申请超过核心线程数的线程 executor.setMaxPoolSize(availableProcessors*6); // 缓冲队列:用来缓冲执行任务的队列 executor.setQueueCapacity(800); // 允许线程的空闲时间60秒:当超过了核心线程之外的线程在空闲时间到达之后会被销毁 executor.setKeepAliveSeconds(30); // 线程池名的前缀:设置好了之后可以方便我们定位处理任务所在的线程池 executor.setThreadNamePrefix("async-test-"); // 缓冲队列满了之后的拒绝策略:由调用线程处理(一般是主线程) executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy()); executor.initialize(); return executor; } }
@Async()
多线程注解,放在方法上代表这个方法是多线程/异步的;如果放在类上代表这个类都是多线程/异步的。
多线程执行的返回值是Future类型或void。Future是非序列化的,微服务架构中有可能传递失败。
spring boot推荐使用的CompletableFuture在此次工作中没有经过测试。
调用此方法的方法不能与当前方法在一个类中。
class AsyncClassOne{
//只有这个方法对外是多线程/异步的
@Async("asyncTest")
public Future<String> asyncTestMethod(String date) {
System.out.println(date);
return new AsyncResult<>("执行完毕");
}
//这个方法不是多线程/异步的
public void asyncTestTwo(String date){
System.out.println(date);
return;
}
}
//这个类下面所有的方法都是多线程/异步的 @Async("asyncTest") class AsyncClassTwo{ //这个方法对外是多线程/异步的 public Future<String> asyncTestMethod(String date) { //调用同类中其它多线程/异步方法时多线程/异步不起作用。 //执行仍然是串行的 //会在asyncTestTwo(date)语句中等待一秒钟再返回。 asyncTestTwo(date); return new AsyncResult<>("执行完毕"); } //这个方法对外是多线程/异步的 public void asyncTestTwo(String date){ System.out.println(date); Thread.sleep(1000); return; } }
所有的@Async("asyncTest")
共用同一个线程池:asyncTest
在不接收返回值时调用线程/异步方法与调用普通方法基本一致,不同的是调用线程/异步方法时不会等待方法的执行时间,无论多长的代码都是调用即过。被调用方法在线程池中自己奔跑。
class LaunchClass{ @Autowired private AsyncClassOne asyncClassOne; @Autowired private AsyncClassTwo asyncClassTwo; //不接收返回值 public void launchMethod() { Map<String, ThreadPoolTaskExecutor> beansOfType = context.getBeansOfType(ThreadPoolTaskExecutor.class); ThreadPoolTaskExecutor solution = beansOfType.get("asyncUpdateTechnologySolution"); ThreadPoolExecutor threadPoolExecutor = solution.getThreadPoolExecutor(); asyncClassOne.asyncTestMethod("你好"); asyncClassTwo.asyncTestTwo("你好"); //执行完上面两句直接return,不会等待asyncTestTwo中的Thread.sleep(1000); return null; } //接收多线程的返回值 public void launchMethodTwo() { Future<String> funture = asyncClassOne.asyncTestMethod("你好"); boolean result = false; //等待线程执行 while (!result) { //睡眠等待线程执行 Thread.sleep(100); result = true; if (!future.isDone()) { //返回线程是否执行完毕(正常执行完毕、出错退出、终止执行都算执行完毕) result = future.isDone(); } else { //拿到方法返回值:"执行完毕" String str = future.get(); } } return null; } }
有时我们会通过多线程多次重复调用同一个方法,但是线程池终究有限,缓存队列长度也不是无限长,超过限度就会将方法丢弃导致方法没有正确执行。此时我们需要动态监测线程池中的缓存队列长度,在队列达到饱和时暂缓加入队列,在队列中有空缺时再进行补充。监测的方法是直接在spring boot的ApplicationContext
中直接拿到已经注册的线程池,通过线程池的方法进行监测。
class LaunchClassTwo{ @Autowired private AsyncClassOne asyncClassOne; @Autowired private AsyncClassTwo asyncClassTwo; @Autowired private ApplicationContext context; public void monitorMethod() { int count = 0; //获取已经注册的线程池map,key是bean名称; Map<String, ThreadPoolTaskExecutor> beansOfType = context.getBeansOfType(ThreadPoolTaskExecutor.class); //通过asyncTest名称拿到线程池。 ThreadPoolTaskExecutor solution = beansOfType.get("asyncTest"); //solution 中没有获取缓存队列的方法,而ThreadPoolTaskExecutor中关于线程池是通过ThreadPoolExecutor来实现的,所以需要拿到ThreadPoolExecutor; ThreadPoolExecutor threadPoolExecutor = solution.getThreadPoolExecutor(); List<String> listStr = new ArrayList<>(); listStr.add("Hello World"); //快速填充大量数据 for (int i = 0; i < 11; i++) { listStr.addAll(listStr); } for (String str : listStr){ if (listStr.size() > 800 && ++count > 800) { //为了防止出现休眠后仍然满队列,采用递归调用休眠,将线程池传入。 ThreadSleep(threadPoolExecutor); } asyncClassOne.asyncTestMethod(str); } return null; } private void ThreadSleep(ThreadPoolExecutor threadPoolExecutor){ //每次都要在此重新获取队列长度,通过getQueue()获取队列,通过size()获取队列长度。 int size =threadPoolExecutor.getQueue().size(); //要与线程池队列长度匹配。 if (size<800){ return; }else { try { //队列满了,休息100ms Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } //再次判断队列长度 ThreadSleep(threadPoolExecutor); } } }
ThreadPoolExecutor中还提供了三种钩子函数,
线程池获取数据方法介绍引用自:https://blog.csdn.net/weixin_38592881/article/details/123467050
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。