赞
踩
基于Java8的CompletableFuture实现的异步执行工具类
package com.jareny.jave.design.thread;

import lombok.extern.slf4j.Slf4j;

import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.*;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;

/**
 * Static helpers that run work on a shared thread pool through {@link CompletableFuture}.
 *
 * <p>Three families of methods are offered:
 * <ul>
 *   <li>{@code supplyAsync} / {@code supplierAsync} submit work and then <em>block the calling
 *       thread</em> until the result is available; failures are logged and reported as an empty
 *       {@link Optional} or as a missing list element instead of being thrown;</li>
 *   <li>{@code runAsync} submits work and returns immediately; failures are logged;</li>
 *   <li>{@code completableFuture} / {@code completableFutureList} submit work and hand the raw
 *       futures back to the caller.</li>
 * </ul>
 *
 * <p>NOTE(review): the blocking variants wait on the same pool they submit to, so calling them
 * from a task that is itself running on {@link #executor} can presumably exhaust the pool —
 * confirm against callers.
 */
@Slf4j
public class AsyncUtil {

    /** Log text used when a task without a name fails; the Throwable is passed as the last argument. */
    private static final String UNNAMED_FAILURE = "异步执行,发生异常:";

    /** Log text used when a named task fails; {@code {}} receives the task name. */
    private static final String NAMED_FAILURE = "异步执行:{} 任务,发生异常:";

    /** {@link String#format} template for a missing input argument; {@code %s} receives the task name. */
    private static final String INPUT_REQUIRED = "执行异步%s任务的入参不能为空";

    /** {@link String#format} template for a missing callback; {@code %s} receives the task name. */
    private static final String BODY_REQUIRED = "执行异步%s任务的方法体不能为空";

    /**
     * Shared pool used by every method of this class.
     *
     * <p>Core size is the number of available processors, maximum size is twice that, threads
     * above the core size are retired after 10 idle seconds, and at most 100 tasks may wait in
     * the queue. With {@link ThreadPoolExecutor.AbortPolicy} a submission made while the pool
     * and queue are both full is rejected with {@link RejectedExecutionException}.
     *
     * <p>The field stays public and non-final for backward compatibility; this class never
     * shuts the pool down.
     */
    public static ExecutorService executor = new ThreadPoolExecutor(
            // core threads: one per CPU core
            Runtime.getRuntime().availableProcessors(),
            // maximum threads: twice the CPU core count
            Runtime.getRuntime().availableProcessors() * 2,
            10L,
            TimeUnit.SECONDS,
            new LinkedBlockingDeque<>(100),
            Executors.defaultThreadFactory(),
            new ThreadPoolExecutor.AbortPolicy()
    );

    /**
     * Applies {@code function} to every non-null element of {@code list} on the pool and waits
     * for all results.
     *
     * @param list     inputs; {@code null} elements are skipped
     * @param function mapping applied to each element
     * @param <T>      input element type
     * @param <R>      result type
     * @return a mutable list with the results of the tasks that succeeded, in submission order;
     *         failed tasks are logged and omitted, so positions may not line up with
     *         {@code list}. Empty when {@code list} is null/empty or {@code function} is null.
     */
    public static <T, R> List<R> supplyAsync(List<T> list, Function<T, R> function) {
        List<CompletableFuture<R>> futures = completableFutureList(list, function);
        return collectResults(futures, null);
    }

    /**
     * Same as {@link #supplyAsync(List, Function)}, but failures are logged with a task name.
     *
     * @param list     inputs; {@code null} elements are skipped
     * @param function mapping applied to each element
     * @param taskName name used in failure log messages
     * @param <T>      input element type
     * @param <R>      result type
     * @return a mutable list with the results of the tasks that succeeded, in submission order
     */
    public static <T, R> List<R> supplyAsync(List<T> list, Function<T, R> function, String taskName) {
        List<CompletableFuture<R>> futures = completableFutureList(list, function);
        return collectResults(futures, taskName);
    }

    /**
     * Applies {@code function} to {@code t} on the pool and waits for the result.
     *
     * @param t        input value
     * @param function mapping applied to {@code t}
     * @param <T>      input type
     * @param <R>      result type
     * @return the result, or empty when an argument is null, the task fails (logged), or the
     *         task returns {@code null}
     */
    public static <T, R> Optional<R> supplyAsync(T t, Function<T, R> function) {
        if (Objects.isNull(t) || Objects.isNull(function)) {
            return Optional.empty();
        }
        CompletableFuture<R> future = CompletableFuture.supplyAsync(() -> function.apply(t), executor);
        return awaitResult(future, null);
    }

    /**
     * Same as {@link #supplyAsync(Object, Function)}, but failures are logged with a task name.
     *
     * @param t        input value
     * @param function mapping applied to {@code t}
     * @param taskName name used in failure log messages
     * @param <T>      input type
     * @param <R>      result type
     * @return the result, or empty when an argument is null, the task fails, or it returns null
     */
    public static <T, R> Optional<R> supplyAsync(T t, Function<T, R> function, String taskName) {
        if (Objects.isNull(t) || Objects.isNull(function)) {
            return Optional.empty();
        }
        CompletableFuture<R> future = CompletableFuture.supplyAsync(() -> function.apply(t), executor);
        return awaitResult(future, taskName);
    }

    /**
     * Submits {@code consumer.accept(t)} to the pool and returns without waiting.
     * Does nothing when either argument is null. A failure of the task is logged.
     *
     * @param t        value handed to the consumer
     * @param consumer action to run
     * @param <T>      input type
     */
    public static <T> void runAsync(T t, Consumer<T> consumer) {
        if (Objects.isNull(t) || Objects.isNull(consumer)) {
            return;
        }
        fireAndLogFailure(t, consumer, null);
    }

    /**
     * Submits {@code consumer.accept(t)} to the pool and returns without waiting.
     * Does nothing when {@code t} or {@code consumer} is null.
     *
     * <p>The "finished" message is written at INFO level once the task has actually completed;
     * a failure is logged at ERROR level instead.
     *
     * @param t        value handed to the consumer
     * @param consumer action to run
     * @param taskName name used in log messages
     * @param <T>      input type
     */
    public static <T> void runAsync(T t, Consumer<T> consumer, String taskName) {
        if (Objects.isNull(t) || Objects.isNull(consumer)) {
            return;
        }
        CompletableFuture.runAsync(() -> consumer.accept(t), executor)
                .whenComplete((ignored, failure) -> {
                    if (failure == null) {
                        log.info("异步执行:{} 任务,完成!", taskName);
                    } else {
                        logFailure(taskName, failure);
                    }
                });
    }

    /**
     * Submits {@code consumer.accept(item)} for every non-null element and returns without
     * waiting. Does nothing when {@code list} is null/empty or {@code consumer} is null.
     *
     * <p>Each failing task is logged individually; the batch success message is written only
     * after every submitted task has completed without an exception.
     *
     * @param list     inputs; {@code null} elements are skipped
     * @param consumer action to run for each element
     * @param taskName name used in log messages
     * @param <T>      element type
     */
    public static <T> void runAsync(List<T> list, Consumer<T> consumer, String taskName) {
        if (Objects.isNull(list) || list.isEmpty() || Objects.isNull(consumer)) {
            return;
        }
        CompletableFuture<?>[] submitted = list.stream()
                .filter(Objects::nonNull)
                .map(item -> fireAndLogFailure(item, consumer, taskName))
                .toArray(CompletableFuture[]::new);
        CompletableFuture.allOf(submitted).whenComplete((ignored, failure) -> {
            if (failure == null) {
                log.info("批量执行{}异步任务成功!", taskName);
            }
        });
    }

    /**
     * Submits {@code consumer.accept(item)} for every non-null element and returns without
     * waiting. Does nothing when {@code list} is null/empty or {@code consumer} is null.
     * Each failing task is logged.
     *
     * @param list     inputs; {@code null} elements are skipped
     * @param consumer action to run for each element
     * @param <T>      element type
     */
    public static <T> void runAsync(List<T> list, Consumer<T> consumer) {
        if (Objects.isNull(list) || list.isEmpty() || Objects.isNull(consumer)) {
            return;
        }
        list.stream()
                .filter(Objects::nonNull)
                .forEach(item -> fireAndLogFailure(item, consumer, null));
    }

    /**
     * Runs {@code supplier} once per non-null element of {@code list} on the pool and waits for
     * all results. The elements themselves are not passed to the supplier; the list only
     * determines how many tasks are submitted.
     *
     * @param list     drives the number of invocations; {@code null} elements are skipped
     * @param supplier work to run
     * @param <T>      result type
     * @return a mutable list with the results of the invocations that succeeded; failures are
     *         logged and omitted. Empty when {@code list} is null/empty or {@code supplier} is
     *         null.
     */
    public static <T> List<T> supplyAsync(List<T> list, Supplier<T> supplier) {
        List<CompletableFuture<T>> futures = completableFutureList(list, supplier);
        return collectResults(futures, null);
    }

    /**
     * Same as {@link #supplyAsync(List, Supplier)}, but failures are logged with a task name.
     *
     * @param list     drives the number of invocations; {@code null} elements are skipped
     * @param supplier work to run
     * @param taskName name used in failure log messages
     * @param <T>      result type
     * @return a mutable list with the results of the invocations that succeeded
     */
    public static <T> List<T> supplyAsync(List<T> list, Supplier<T> supplier, String taskName) {
        List<CompletableFuture<T>> futures = completableFutureList(list, supplier);
        return collectResults(futures, taskName);
    }

    /**
     * Runs {@code supplier} on the pool and waits for the result.
     *
     * @param supplier work to run
     * @param <T>      result type
     * @return the result, or empty when {@code supplier} is null, the task fails (logged), or
     *         the task returns {@code null}
     */
    public static <T> Optional<T> supplierAsync(Supplier<T> supplier) {
        if (Objects.isNull(supplier)) {
            return Optional.empty();
        }
        CompletableFuture<T> future = CompletableFuture.supplyAsync(supplier, executor);
        return awaitResult(future, null);
    }

    /**
     * Same as {@link #supplierAsync(Supplier)}, but failures are logged with a task name.
     *
     * <p>Unlike the other blocking helpers, this overload also reports a failed
     * <em>submission</em> (for example a rejection by the pool) as an empty result rather than
     * letting the exception escape; that long-standing behavior is preserved.
     *
     * @param supplier work to run
     * @param taskName name used in failure log messages
     * @param <T>      result type
     * @return the result, or empty when {@code supplier} is null, submission or execution fails,
     *         or the task returns {@code null}
     */
    public static <T> Optional<T> supplierAsync(Supplier<T> supplier, String taskName) {
        if (Objects.isNull(supplier)) {
            return Optional.empty();
        }
        final CompletableFuture<T> future;
        try {
            future = CompletableFuture.supplyAsync(supplier, executor);
        } catch (RuntimeException e) {
            logFailure(taskName, e);
            return Optional.empty();
        }
        return awaitResult(future, taskName);
    }

    /**
     * Submits {@code function.apply(t)} to the pool and returns the future without waiting.
     *
     * @param t        input value
     * @param function mapping applied to {@code t}
     * @param <T>      input type
     * @param <R>      result type
     * @return the future of the submitted task
     * @throws IllegalArgumentException if {@code t} or {@code function} is null
     */
    public static <T, R> CompletableFuture<R> completableFuture(T t, Function<T, R> function) {
        requirePresent(t, INPUT_REQUIRED, "");
        requirePresent(function, BODY_REQUIRED, "");
        return CompletableFuture.supplyAsync(() -> function.apply(t), executor);
    }

    /**
     * Submits {@code function.apply(t)} to the pool and returns the future without waiting.
     *
     * @param t        input value
     * @param function mapping applied to {@code t}
     * @param taskName name included in validation error messages
     * @param <T>      input type
     * @param <R>      result type
     * @return the future of the submitted task
     * @throws IllegalArgumentException if {@code t} or {@code function} is null
     */
    public static <T, R> CompletableFuture<R> completableFuture(T t, Function<T, R> function, String taskName) {
        requirePresent(t, INPUT_REQUIRED, taskName);
        requirePresent(function, BODY_REQUIRED, taskName);
        return CompletableFuture.supplyAsync(() -> function.apply(t), executor);
    }

    /**
     * Submits {@code consumer.accept(t)} to the pool and returns the future without waiting.
     *
     * @param t        value handed to the consumer
     * @param consumer action to run
     * @param <T>      input type
     * @return the future of the submitted task
     * @throws IllegalArgumentException if {@code t} or {@code consumer} is null
     */
    public static <T> CompletableFuture<Void> completableFuture(T t, Consumer<T> consumer) {
        requirePresent(t, INPUT_REQUIRED, "");
        requirePresent(consumer, BODY_REQUIRED, "");
        return CompletableFuture.runAsync(() -> consumer.accept(t), executor);
    }

    /**
     * Submits {@code consumer.accept(t)} to the pool and returns the future without waiting.
     *
     * @param t        value handed to the consumer
     * @param consumer action to run
     * @param taskName name included in validation error messages
     * @param <T>      input type
     * @return the future of the submitted task
     * @throws IllegalArgumentException if {@code t} or {@code consumer} is null
     */
    public static <T> CompletableFuture<Void> completableFuture(T t, Consumer<T> consumer, String taskName) {
        requirePresent(t, INPUT_REQUIRED, taskName);
        requirePresent(consumer, BODY_REQUIRED, taskName);
        return CompletableFuture.runAsync(() -> consumer.accept(t), executor);
    }

    /**
     * Submits {@code supplier} to the pool and returns the future without waiting.
     *
     * @param supplier work to run
     * @param <T>      result type
     * @return the future of the submitted task
     * @throws IllegalArgumentException if {@code supplier} is null
     */
    public static <T> CompletableFuture<T> completableFuture(Supplier<T> supplier) {
        requirePresent(supplier, BODY_REQUIRED, "");
        return CompletableFuture.supplyAsync(supplier, executor);
    }

    /**
     * Submits {@code supplier} to the pool and returns the future without waiting.
     *
     * @param supplier work to run
     * @param taskName name included in validation error messages
     * @param <T>      result type
     * @return the future of the submitted task
     * @throws IllegalArgumentException if {@code supplier} is null
     */
    public static <T> CompletableFuture<T> completableFuture(Supplier<T> supplier, String taskName) {
        requirePresent(supplier, BODY_REQUIRED, taskName);
        return CompletableFuture.supplyAsync(supplier, executor);
    }

    /**
     * Submits {@code function.apply(item)} for every non-null element and returns the futures
     * without waiting.
     *
     * @param list     inputs; {@code null} elements are skipped
     * @param function mapping applied to each element
     * @param <T>      input element type
     * @param <R>      result type
     * @return one future per non-null element, in list order; an empty mutable list when
     *         {@code list} is null/empty or {@code function} is null
     */
    public static <T, R> List<CompletableFuture<R>> completableFutureList(List<T> list, Function<T, R> function) {
        if (Objects.isNull(list) || list.isEmpty() || Objects.isNull(function)) {
            return new ArrayList<>();
        }
        return list.stream()
                .filter(Objects::nonNull)
                .map(item -> CompletableFuture.supplyAsync(() -> function.apply(item), executor))
                .collect(Collectors.toList());
    }

    /**
     * Submits {@code consumer.accept(item)} for every non-null element and returns the futures
     * without waiting.
     *
     * @param list     inputs; {@code null} elements are skipped
     * @param consumer action to run for each element
     * @param <T>      element type
     * @return one future per non-null element, in list order; an empty mutable list when
     *         {@code list} is null/empty or {@code consumer} is null
     */
    public static <T> List<CompletableFuture<Void>> completableFutureList(List<T> list, Consumer<T> consumer) {
        if (Objects.isNull(list) || list.isEmpty() || Objects.isNull(consumer)) {
            return new ArrayList<>();
        }
        return list.stream()
                .filter(Objects::nonNull)
                .map(item -> CompletableFuture.runAsync(() -> consumer.accept(item), executor))
                .collect(Collectors.toList());
    }

    /**
     * Submits {@code supplier} once per non-null element and returns the futures without
     * waiting. The elements are not passed to the supplier.
     *
     * @param list     drives the number of invocations; {@code null} elements are skipped
     * @param supplier work to run
     * @param <T>      result type
     * @return one future per non-null element; an empty mutable list when {@code list} is
     *         null/empty or {@code supplier} is null
     */
    public static <T> List<CompletableFuture<T>> completableFutureList(List<T> list, Supplier<T> supplier) {
        if (Objects.isNull(list) || list.isEmpty() || Objects.isNull(supplier)) {
            return new ArrayList<>();
        }
        return list.stream()
                .filter(Objects::nonNull)
                .map(item -> CompletableFuture.supplyAsync(supplier, executor))
                .collect(Collectors.toList());
    }

    /** Throws {@link IllegalArgumentException} with the formatted message when {@code value} is null. */
    private static void requirePresent(Object value, String messageTemplate, String taskName) {
        if (value == null) {
            throw new IllegalArgumentException(String.format(messageTemplate, taskName));
        }
    }

    /** Logs a task failure at ERROR level, with the task name when one was supplied. */
    private static void logFailure(String taskName, Throwable cause) {
        if (taskName == null) {
            log.error(UNNAMED_FAILURE, cause);
        } else {
            // The Throwable is the trailing argument, so SLF4J prints its stack trace.
            log.error(NAMED_FAILURE, taskName, cause);
        }
    }

    /** Submits one fire-and-forget task whose failure, if any, is logged; returns its future. */
    private static <T> CompletableFuture<Void> fireAndLogFailure(T item, Consumer<T> consumer, String taskName) {
        CompletableFuture<Void> future = CompletableFuture.runAsync(() -> consumer.accept(item), executor);
        future.whenComplete((ignored, failure) -> {
            if (failure != null) {
                logFailure(taskName, failure);
            }
        });
        return future;
    }

    /** Blocks for one future; a failure is logged and reported as an empty Optional. */
    private static <R> Optional<R> awaitResult(CompletableFuture<R> future, String taskName) {
        try {
            // ofNullable: a task that legitimately returns null is "no value", not an error.
            return Optional.ofNullable(future.get());
        } catch (InterruptedException e) {
            // Restore the flag so code further up the stack can still observe the interrupt.
            Thread.currentThread().interrupt();
            logFailure(taskName, e);
        } catch (ExecutionException | CancellationException e) {
            logFailure(taskName, e);
        }
        return Optional.empty();
    }

    /** Blocks for every future in order and gathers the successful results; failures are logged and skipped. */
    private static <R> List<R> collectResults(List<CompletableFuture<R>> futures, String taskName) {
        List<R> results = new ArrayList<>(futures.size());
        for (CompletableFuture<R> future : futures) {
            try {
                results.add(future.get());
            } catch (InterruptedException e) {
                // Once interrupted, further waits would fail immediately; stop and return
                // what has been gathered so far, leaving the interrupt flag set.
                Thread.currentThread().interrupt();
                logFailure(taskName, e);
                break;
            } catch (ExecutionException | CancellationException e) {
                logFailure(taskName, e);
            }
        }
        return results;
    }
}
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。