赞
踩
目录
根据阿姆达尔(Amdal)定律可知[并发编程基础 - 安全性、活跃性、性能问题#性能问题],若串行度为5%那么无论使用任何技术,性能最高提升20%,所以减小串行度(减小锁粒度、算法优化等)是前提。但是除此之外使用多线程并行任务也是比较常用的手段,效果也非常明显,只是使用不好坑会非常多,所以最好深入理解其运行原理,这当然也是面试的高频。
上面的多线程并行是最理想的状态,真正执行的时候与CPU的核数(并行度)以及上下文的切换,GC垃圾回收线程(特别是Stop The World阶段)的运行等综合的结果。理论上串行那么就是所有耗时的总和,并行使用合理则耗时为最长一个的时间;结合具体的线程池工作原理,为了方便理解执行相同耗时的任务比如上图,真实耗时介于1到三倍中间,即最差的耗时也就相当于串行。这种情况下我们可以使用下面线程池封装的executeAll方法,底层是调用了ThreadPoolExecutor的invokeAll接口。类似这样的方法,我们还可以使用Java提供了CompletionService从队列中获取;或者CompletableFuture#allOf方法;当然也可以使用execute方法结合CountDownLatch,后面专门分析和项目使用。针对更复杂的情况,CompletableFuture提供了强大的API,自己仅仅是在项目上使用部分。
比如项目上,自己将获取一个分段的字符串,每段使用线程池任务获取,只是多线程处理的结果在合并到主线程(当前主线程一般是比如Tomcat线程池中的一个线程)时,需要注意数据的安全性,我下面使用了ConcurrentHahsMap。最终放入结合下面的线程池工具类如下:
- private void aaa(StringBuilder orderCode) {
- final ConcurrentHashMap<StateConfigEnum, String> resultMap = new ConcurrentHashMap<>(16);
- List<Callable<String>> taskList = Lists.<Callable<String>>newArrayList(
- () -> {
- SendPayDTO sendPayDTO = getInstance().get().sendPayDTO;
- DefSaleOrderDTO defSaleOrderDTO = defSaleOrderService.sendPayDTOBySoCode(sendPayDTO.getSoTypeCode());
- return resultMap.put(SALE_DEFINITION, defSaleOrderDTO.getCursorCode());
- },
- () -> resultMap.put(SALE_AUDIT, confSaleOrderAuditService.sendPayStage()),
- () -> resultMap.put(SHIPPING_CONDITION, confShippingConditionService.sendPayStage())
- );
- // 阻塞获取结果
- ExecutorService executor = TtlExecutors.getTtlExecutorService(SimpleThreadPool.THREAD_POOL_EXECUTOR_MAP.get(SEND_PAY.name()));
- SimpleThreadPool.executeAll(executor, taskList).forEach(this::getFuture);
- orderCode.append(BEFORE_SALE_DEFINITION_DEFAULT)
- .append(resultMap.get(SALE_DEFINITION))
- .append(AFTER_SALE_DEFINITION_DEFAULT)
- .append(resultMap.get(SALE_AUDIT))
- .append(resultMap.get(SHIPPING_CONDITION));
- }
使用CountDownLatch实现相同的效果,上面的方法可以修改为:
- private void byCountDownLatch(StringBuilder orderCode) throws InterruptedException {
- final ConcurrentHashMap<StateConfigEnum, String> resultMap = new ConcurrentHashMap<>(16);
- CountDownLatch countDownLatch = new CountDownLatch(3);
- ExecutorService executor = TtlExecutors.getTtlExecutorService(SimpleThreadPool.THREAD_POOL_EXECUTOR_MAP.get(SEND_PAY.name()));
- executor.execute(() -> {
- SendPayDTO sendPayDTO = getInstance().get().sendPayDTO;
- DefSaleOrderDTO defSaleOrderDTO = defSaleOrderService.sendPayDTOBySoCode(sendPayDTO.getSoTypeCode());
- resultMap.put(SALE_DEFINITION, defSaleOrderDTO.getCursorCode());
- countDownLatch.countDown();
- });
- executor.submit(() -> {
- resultMap.put(SALE_AUDIT, confSaleOrderAuditService.sendPayStage());
- countDownLatch.countDown();
- });
- executor.submit(() -> {
- resultMap.put(SHIPPING_CONDITION, confShippingConditionService.sendPayStage());
- countDownLatch.countDown();
- });
- // 阻塞获取结果
- countDownLatch.await();
- orderCode.append(BEFORE_SALE_DEFINITION_DEFAULT)
- .append(resultMap.get(SALE_DEFINITION))
- .append(AFTER_SALE_DEFINITION_DEFAULT)
- .append(resultMap.get(SALE_AUDIT))
- .append(resultMap.get(SHIPPING_CONDITION));
- }
CPU密集型: 一般设置为CPU核(或者核线程,如果我们现在电脑是4核8线程)+ 1;增加1是为了防止线程偶发性的缺页中断或者其他原因导致的任务暂停,导致资源使用不充分。
I/O密集型:一般设置为CPU核 * 2
上面是极端的情况,很多时候我们的任务是综合型的,那么可以根据下面公式大概计算(我们可以使用VisualVM等来查询WT/ST):
线程数=N(CPU核数)*(1+WT(线程等待时间)/ST(线程时间运行时间))
但是不管怎么计算,上面的只是一个基础参数值,具体的还需要根据压测来判断。理论上,随着参数的增加性能会提升,替身到一定的值后会出现拐点,再比较机具的向下走,所以压测就是为了找到拐点是的参数值。
因为上面也提到了,还跟GC情况,上下文切换,CPU和内存动态等有关,所以压测是不仅要看Tps值,还要随时注意CPU和内存的使用情况,GC的回收耗时和情况,必要也可以查看线程上下文的切换[并发编程基础 - 多线程的上下文切换问题]。
使用线程池时,一般情况下只有等到问题出现时再去dump等,当然ThreadPoolExecutor提供了调用的API,比如查看处理的任务数,当前的线程数等,那么我们可以基于定时任务打印到日志中,前面还提到了每个线程执行Work#runWorker时,在调用我们的写的Runnable的run方法前后都允许定义回调任务。并且上一篇看到,Tomcat的ThreadPoolExecutor类也重写了后置回调方法。
1、定时任务打印线程池情况到日志
这种方式在下面工具的ThreadPoolInit中也有体现,具体使用了juc的单线程的ScheduledThreadPoolExecutor线程池,如下:
- public static final ScheduledExecutorService SINGLE_POOL = Executors.newSingleThreadScheduledExecutor();
-
- private static void printScheduledThreadStats() {
- SINGLE_POOL.scheduleAtFixedRate(() -> {
- THREAD_POOL_EXECUTOR_MAP.forEach((name, threadPool) -> {
- log.info("{} Pool Size: {}", name, threadPool.getPoolSize());
- log.info("{} Active Threads: {}", name, threadPool.getActiveCount());
- log.info("{} Number of Tasks Completed: {}", name, threadPool.getCompletedTaskCount());
- log.info("{} Number of Tasks in Queue: {}", name, threadPool.getQueue().size());
- });
- }, 0L, 5L, TimeUnit.SECONDS);
- }
2、每个我们的Runnable#run方法前后打印
这样需要像Tomcat一下继承自juc的ThreadPoolExecutor,并且按需要重新前置和后置方法(如果我们的任务执行非常快,并且量非常大则肯定不能使用该方式,否则打印太多了):
- @Slf4j
- public class MyThreadPoolExecutor extends ThreadPoolExecutor {
- @Override
- protected void beforeExecute(Thread thread, Runnable runnable) {}
-
- @Override
- protected void afterExecute(Runnable runnable, Throwable throwable) {
- log.info("{} Pool Size: {}", super.getPoolSize());
- log.info("{} Active Threads: {}", super.getActiveCount());
- log.info("{} Number of Tasks Completed: {}", super.getCompletedTaskCount());
- log.info("{} Number of Tasks in Queue: {}", super.getQueue().size());
- }
- // 省略重写父类的构造。。。
- }
1、自己封装的线程池管理工具,仅仅是为了统一管理,以及多个微服务项目需要使用时候,提高代码的复用性。
2、使用时发现一个项目启动了5个及以上的线程池,那么波峰是无法估计的,也比较危险,压测也只压一个线程池或者方法。之前一直不理解怎么去控制,后面理解了如果一个项目要启动那么多线程池,其实项目本身也早就该拆分了(拆分到不同的项目JVM 中)。
3、而每一个应用的线程池性质本身;服务器的配置情况只有开发的人自己清楚,所以使用了Java原生的Spi机制实现
- public class ThreadPoolImpl implements ThreadPool {
- @Override
- public List<ThreadPoolEntity> appendThreadPool() {
- return Lists.newArrayList(
- new ThreadPoolEntity(ThreadPoolEnum.TRANSFER_ORDER.name(), SimpleThreadPool.PoolModel.FAST_IO, Boolean.FALSE, Boolean.TRUE,
- "****线程池【Per PO Per Thread】", 5, 8, 30, TimeUnit.SECONDS,
- new TaskQueue(50), new ThreadPoolExecutor.AbortPolicy())
- );
- }
- }
4、定义的线程池类型枚举,CPU型、IO型、FAST_IO型【就是原生的Tomcat类型,先创建最大线程数,再放入队列,详细见编发编程工具 - Tomcat对juc线程池的扩展】
代码如下(欢迎讨论):
- @Slf4j
- @Component
- @SuppressWarnings("ALL")
- public class SimpleThreadPool extends ThreadPoolInit implements EnvironmentAware {
-
- /**
- * 默认最大的超时时间
- */
- private static int DEFAULT_TIMEOUT = 50000;
-
- @Override
- public void setEnvironment(Environment environment) {
- // 执行父类中的环境设置
- super.setEnvironment(environment);
-
- String[] activeProfiles = environment.getActiveProfiles();
- if (activeProfiles == null) {
- return;
- }
- String pro = "prod";
- for (String activeProfile : activeProfiles) {
- if (pro.equals(activeProfile)) {
- if (log.isInfoEnabled()) {
- log.info("active spring profile prod");
- }
- DEFAULT_TIMEOUT = 2000;
- }
- }
- if (log.isInfoEnabled()) {
- log.info("SimpleThreadPool DEFAULT_TIMEOUT = {} ms", DEFAULT_TIMEOUT);
- }
- }
-
- /**
- * 执行没有返回值的任务
- * @param key 线程池枚举
- * @param runnable 任务
- */
- public static void execute(String key, Runnable runnable) {
- if (!THREAD_POOL_EXECUTOR_MAP.containsKey(key)) {
- throw new IllegalArgumentException("未找到线程池:" + key);
- }
- // 执行任务
- THREAD_POOL_EXECUTOR_MAP.get(key).execute(runnable);
- }
-
- /**
- * 执行有返回值任务
- *
- * @param key 线程池名称
- * @param callable 需要执行的任务可变数组
- * @param <T> 任务类型
- * @return 任务结果
- */
- public static <T> List<Future<T>> execute(String key, Callable<T>... callable) {
- if (callable == null || callable.length == 0) {
- throw new IllegalArgumentException("任务不能为空!");
- }
- List<Callable<T>> taskList = Arrays.stream(callable).collect(Collectors.toList());
- return executeAll(key, taskList);
- }
-
- /**
- * 批量执行并行任务, 使用默认的最大超时时间,单位毫秒
- * @param key 线程池名称
- * @param callableList 任务列表
- * @return 结果
- */
- public static <T> List<Future<T>> executeAll(String key, List<Callable<T>> callableList) {
- return executeAll(key, DEFAULT_TIMEOUT, callableList);
- }
-
- /**
- * 批量执行并行任务, 使用默认的最大超时时间,单位毫秒
- * @param key 线程池名称
- * @param callableList 任务列表
- * @return 结果
- */
- public static <T> List<Future<T>> executeAll(ExecutorService service, List<Callable<T>> callableList) {
- return executeAll(service, DEFAULT_TIMEOUT, callableList);
- }
-
- /**
- * 批量执行并行任务
- * @param key 线程池名称
- * @param maxTimeout 最大超时时间,没有取默认值,单位2000毫秒
- * @param callableList 任务列表
- * @return 结果
- */
- public static <T> List<Future<T>> executeAll(ExecutorService service, int maxTimeout, List<Callable<T>> callableList) {
- try {
- return service.invokeAll(callableList, maxTimeout, TimeUnit.MILLISECONDS);
- } catch (InterruptedException e) {
- log.error("线程池批量执行任务异常失败", e);
- } catch (Exception e) {
- log.error("线程池批量执行任务异常失败", e);
- }
- return new ArrayList<>();
- }
-
- /**
- * 批量执行并行任务
- * @param key 线程池名称
- * @param callableList 任务列表
- * @return 结果
- */
- public static <T> List<Future<T>> executeAllUntil(String key, List<Callable<T>> callableList) {
- if (!THREAD_POOL_EXECUTOR_MAP.containsKey(key)) {
- throw new IllegalArgumentException("未配置线程池" + key);
- }
- try {
- return THREAD_POOL_EXECUTOR_MAP.get(key).invokeAll(callableList);
- } catch (InterruptedException e) {
- log.error("线程池批量执行任务异常失败", e);
- } catch (Exception e) {
- log.error("线程池批量执行任务异常失败", e);
- }
- return new ArrayList<>();
- }
-
- /**
- * 批量执行并行任务
- * @param key 线程池名称
- * @param maxTimeout 最大超时时间,没有取默认值,单位2000毫秒
- * @param callableList 任务列表
- * @return 结果
- */
- public static <T> List<Future<T>> executeAll(String key, int maxTimeout, List<Callable<T>> callableList) {
- if (!THREAD_POOL_EXECUTOR_MAP.containsKey(key)) {
- throw new IllegalArgumentException("未配置线程池" + key);
- }
- try {
- return THREAD_POOL_EXECUTOR_MAP.get(key).invokeAll(callableList, maxTimeout, TimeUnit.MILLISECONDS);
- } catch (InterruptedException e) {
- log.error("线程池批量执行任务异常失败", e);
- } catch (Exception e) {
- log.error("线程池批量执行任务异常失败", e);
- }
- return new ArrayList<>();
- }
-
- /**
- * @param key 线程池名称
- * @param callableList 任务列表
- * @return 结果
- */
- public static void executeRunnable(String key, Runnable... r) {
- if (!THREAD_POOL_EXECUTOR_MAP.containsKey(key)) {
- throw new IllegalArgumentException("未配置线程池" + key);
- }
- ThreadPoolExecutor executor = THREAD_POOL_EXECUTOR_MAP.get(key);
- for (int i = 0; i < r.length; i++) {
- executor.submit(r[i]);
- }
- }
-
- }
- @Slf4j
- public class ThreadPoolInit implements EnvironmentAware {
-
- /**
- * 定时执行任务的单线程, 所有任务公用
- * 当前使用的有: 打印线程信息,装运点等定时刷新任务
- */
- public static final ScheduledExecutorService SINGLE_POOL = Executors.newSingleThreadScheduledExecutor();
-
- /**
- * 线程池容器
- */
- public static final Map<String, ThreadPoolExecutor> THREAD_POOL_EXECUTOR_MAP = new ConcurrentHashMap<>(16);
-
- /**
- * 是否打印
- */
- private static Boolean printThreadPoolInfoInterval = Boolean.TRUE;
-
- static {
- // 初始化线程池
- ServiceLoader<ThreadPool> load = ServiceLoader.load(ThreadPool.class);
- load.forEach(threadPool -> threadPool.appendThreadPool().forEach(SimpleThreadPool::putThreadPool));
-
- }
-
- @Override
- public void setEnvironment(Environment environment) {
- String[] activeProfiles = environment.getActiveProfiles();
- String pro = "dev";
- for (String activeProfile : activeProfiles) {
- if (pro.equals(activeProfile)) {
- return;
- }
- }
- // 初始化线程池状态信息订单打印
- if (printThreadPoolInfoInterval) {
- printScheduledThreadStats();
- }
- }
-
- /**
- * 线程池类型, 只是作为标识当前任务是CPU型还是IO型为主
- */
- @SuppressWarnings("unused")
- public enum PoolModel {
- /** io型 */
- IO,
- /** CPU型 */
- CPU,
- /** io型,Tomcat扩展的 juc线程池 */
- FAST_IO
-
- }
-
- /**
- * 线程工厂
- */
- static class DefaultThreadFactory implements ThreadFactory {
-
- /**
- * 定义线程组
- */
- static ThreadGroup threadGroup;
-
- /**
- * 定义每个线程池中每个线程的名称后缀数字
- */
- static final AtomicInteger THREAD_NUMBER = new AtomicInteger(1);
-
- /**
- * 定义每个线程词的名称前缀
- */
- static String namePrefix;
-
- public DefaultThreadFactory(String name) {
- final SecurityManager securityManager = System.getSecurityManager();
- threadGroup = (securityManager != null) ? securityManager.getThreadGroup() : Thread.currentThread().getThreadGroup();
- namePrefix = name + "-thread-";
- }
-
- @Override
- public Thread newThread(Runnable runnable) {
- Thread thread = new Thread(threadGroup, runnable, namePrefix + THREAD_NUMBER.getAndIncrement(), 0);
- if (thread.isDaemon()) {
- thread.setDaemon(false);
- }
- if(thread.getPriority() != Thread.NORM_PRIORITY){
- thread.setPriority(Thread.NORM_PRIORITY);
- }
- return thread;
- }
- }
-
- /**
- * 启动打印线程
- */
- private static void printScheduledThreadStats() {
- SINGLE_POOL.scheduleAtFixedRate(() -> THREAD_POOL_EXECUTOR_MAP.forEach((name, threadPool) -> {
- log.info("{} Pool Size: {}", name, threadPool.getPoolSize());
- log.info("{} Active Threads: {}", name, threadPool.getActiveCount());
- log.info("{} Number of Tasks Completed: {}", name, threadPool.getCompletedTaskCount());
- log.info("{} Number of Tasks in Queue: {}", name, threadPool.getQueue().size());
- }), 0, 5, TimeUnit.SECONDS);
- }
-
- /**
- * 往线程池容器中放入线程池池
- * @param threadPoolEntity 线程池定义对象
- */
- public static void putThreadPool(ThreadPoolEntity threadPoolEntity) {
- ThreadPoolExecutor threadPoolExecutor = getThreadPoolExecutor(threadPoolEntity);
- log.info("name: {}, threadPoolExecutor = {}", threadPoolEntity.taskName, threadPoolExecutor);
- THREAD_POOL_EXECUTOR_MAP.put(threadPoolEntity.taskName, threadPoolExecutor);
- }
-
- /**
- * 根据枚举获取线程池
- * @param entity 线程池枚举
- */
- private static ThreadPoolExecutor getThreadPoolExecutor(ThreadPoolEntity entity) {
- ThreadPoolExecutor executor;
- if (entity.poolModel == PoolModel.FAST_IO) {
- if (!(entity.blockingQueue instanceof TaskQueue)) {
- throw new RuntimeException("PoolModel.FAST_IO 类型的线程池,只能创建 " + TaskQueue.class.getName() + " 类型的队列!");
- }
- TaskQueue taskQueue = (TaskQueue)entity.blockingQueue;
- ThreadPoolExecutorImpl threadPoolExecutor = new ThreadPoolExecutorImpl(entity.corePoolNum, entity.maxPoolNum, entity.deleteThreadNum,
- entity.deleteTreadUnit, taskQueue, new TaskThreadFactory(entity.taskName), entity.rejectedExecutionHandler);
- // 设置父类,用于判断线程的对列表是否真的满了
- taskQueue.setParent(threadPoolExecutor);
- executor = threadPoolExecutor;
-
- log.info("init ThreadPoolExecutorImpl and TaskQueue!");
- } else {
- executor = new ThreadPoolExecutor(entity.corePoolNum, entity.maxPoolNum, entity.deleteThreadNum, entity.deleteTreadUnit,
- entity.blockingQueue, new DefaultThreadFactory(entity.taskName), entity.rejectedExecutionHandler);
- // 是否预热核心线程
- if (entity.preStartAllCoreThreads) {
- executor.prestartAllCoreThreads();
- }
-
- log.info("init ThreadPoolExecutor and {}}!", entity.blockingQueue.getClass().getSimpleName());
- }
-
- try {
- // 是否允许核心线程超时
- if (entity.allowsCoreThreadTimeOut) {
- executor.allowsCoreThreadTimeOut();
- }
- } catch (NullPointerException e) {
- log.error("初始化线程池错误:" + e);
- }
- return executor;
- }
-
- }
SPI机制使用的接口和定义如下:
- @FunctionalInterface
- public interface ThreadPool {
- /**
- * 添加线程池
- * @return 线程池定义对象
- */
- List<ThreadPoolEntity> appendThreadPool();
- }
- @AllArgsConstructor
- public final class ThreadPoolEntity {
-
- /** 先池名称 */
- public final String taskName;
-
- /** 线程池任务类型 */
- public final SimpleThreadPool.PoolModel poolModel;
-
- /** 是否允许核心线程超时 */
- public final Boolean allowsCoreThreadTimeOut;
-
- /** 是否预热核心线程池 */
- public final Boolean preStartAllCoreThreads;
-
- /** 线程池说明 */
- public final String detail;
-
- /** 核心线程数 */
- public final int corePoolNum;
-
- /** 最大线程数 */
- public final int maxPoolNum;
-
- /** 超时时间 */
- public final int deleteThreadNum;
-
- /** 超时单位 */
- public final TimeUnit deleteTreadUnit;
-
- /** 任务队列,没有特殊理由不能使用无界队列 */
- public final BlockingQueue<Runnable> blockingQueue;
-
- /** 拒绝策略 */
- public final RejectedExecutionHandler rejectedExecutionHandler;
- }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。