当前位置:   article > 正文

JUC(六) 四大函数式接口和ForkJoin_function

function

目录

四大函数式接口

函数型接口

 Function 函数型接口

Predicate 断定型接口

Supplier 供给型接口

Consumer 消费型接口

Stream 流式计算

 ForkJoin


四大函数式接口

lambda表达式,链式编程,函数式接口,Steram流式计算

函数型接口

 Function 函数型接口

函数式接口:只有一个方法的接口

传入参数T,返回类型R

只要是函数式接口,就可以用lambda表达式简化

  1. public class FunctionDemo {
  2. public static void main(String[] args) {
  3. /*Function<String,String> function = new Function<String,String>() {
  4. //工具类:输出输入的值
  5. @Override
  6. public String apply(String str) {
  7. return str;
  8. };
  9. };*/
  10. Function<String,String> function = (str)->{return str;};
  11. System.out.println(function.apply("aaa"));
  12. }
  13. }

Predicate 断定型接口

 有一个输入参数,返回值只能是布尔值

  1. public class PredicateDemo {
  2. public static void main(String[] args) {
  3. //判断字符串是否为空
  4. /*Predicate<String> predicate = new Predicate<String>() {
  5. @Override
  6. public boolean test(String str) {
  7. return str.isEmpty();
  8. }
  9. };*/
  10. Predicate<String> predicate = (str)->{return str.isEmpty();};
  11. System.out.println(predicate.test(""));
  12. }
  13. }

Supplier 供给型接口

  1. public class SuppierDemo {
  2. public static void main(String[] args) {
  3. //打印字符串
  4. /*Supplier<String> supplier = new Supplier<String>() {
  5. @Override
  6. public String get() {
  7. return "aaa";
  8. }
  9. };*/
  10. Supplier<String> supplier = ()->{return "aaa";};
  11. System.out.println(supplier.get());
  12. }
  13. }

Consumer 消费型接口

  1. public class ConsummerDemo {
  2. public static void main(String[] args) {
  3. //打印字符串
  4. /* Consumer<String> consumer = new Consumer<String>() {
  5. @Override
  6. public void accept(String s) {
  7. System.out.println(s);
  8. }
  9. };*/
  10. Consumer<String> consumer = (s)->{System.out.println(s);};
  11. consumer.accept("sss");
  12. }
  13. }

Stream 流式计算

什么是Stream流式计算

  • 大数据:存储+计算
  • 集合、Mysql本来就是存储数据的,计算应该交给流来操作。

  1. /**
  2. * 题目要求: 用一行代码实现
  3. * 1. Id 必须是偶数
  4. * 2.年龄必须大于23
  5. * 3. 用户名转为大写
  6. * 4. 用户名倒序
  7. * 5. 只能输出一个用户
  8. **/
  9. public class StreamDemo {
  10. public static void main(String[] args) {
  11. User u1 = new User(1, "a", 23);
  12. User u2 = new User(2, "b", 23);
  13. User u3 = new User(3, "c", 23);
  14. User u4 = new User(6, "d", 24);
  15. User u5 = new User(4, "e", 25);
  16. List<User> list = Arrays.asList(u1, u2, u3, u4, u5);
  17. // lambda、链式编程、函数式接口、流式计算
  18. list.stream()
  19. .filter(user -> {return user.getId()%2 == 0;})//判断偶数
  20. .filter(user -> {return user.getAge() > 23;})//判断大于23
  21. .map(user -> {return user.getName().toUpperCase();})//转换大写
  22. .sorted((user1, user2) -> {return user2.compareTo(user1);})//排序(比较user1和user2)
  23. .limit(1)//打印一个
  24. .forEach(System.out::println);
  25. }
  26. }

ForkJoin

ForkJoin 在JDK1.7,并行执行任务!提高效率~。在大数据量速率会更快!

一个线程并发成多个去操作

大数据中:Map Reduce 核心思想->把大任务拆分为小任务

 ForkJoin 特点: 工作窃取!(B执行完后会把A没执行的任务执行)
这里面维护的是双端队列(两端都可操作)
实现原理是:双端队列!从上面和下面都可以去拿到任务进行执行!

  • 如何使用ForkJoin?
    • 通过ForkJoinPool来执行
    • 计算任务 execute(ForkJoinTask<?> task)
    • 计算类要去继承ForkJoinTask;

  1. public class ForkJoinDemo extends RecursiveTask<Long> {
  2. private long star;
  3. private long end;
  4. /** 临界值 */
  5. private long temp = 1000000L;
  6. public ForkJoinDemo(long star, long end) {
  7. this.star = star;
  8. this.end = end;
  9. }
  10. /**
  11. * 计算方法
  12. * @return
  13. */
  14. @Override
  15. protected Long compute() {
  16. if ((end - star) < temp) {
  17. Long sum = 0L;
  18. for (Long i = star; i < end; i++) {
  19. sum += i;
  20. }
  21. return sum;
  22. }else {
  23. // 使用ForkJoin 分而治之 计算
  24. //1 . 计算平均值
  25. long middle = (star + end) / 2;
  26. ForkJoinDemo forkJoinDemo1 = new ForkJoinDemo(star, middle);
  27. // 拆分任务,把线程压入线程队列
  28. forkJoinDemo1.fork();
  29. ForkJoinDemo forkJoinDemo2 = new ForkJoinDemo(middle, end);
  30. forkJoinDemo2.fork();
  31. long taskSum = forkJoinDemo1.join() + forkJoinDemo2.join();
  32. return taskSum;
  33. }
  34. }
  35. }

测试

  1. public class ForkJoinTest {
  2. private static final long SUM = 20_0000_0000;
  3. public static void main(String[] args) throws ExecutionException, InterruptedException {
  4. test1();
  5. test2();
  6. test3();
  7. }
  8. /**
  9. * 使用普通方法
  10. */
  11. public static void test1() {
  12. long star = System.currentTimeMillis();
  13. long sum = 0L;
  14. for (long i = 1; i < SUM ; i++) {
  15. sum += i;
  16. }
  17. long end = System.currentTimeMillis();
  18. System.out.println(sum);
  19. System.out.println("时间:" + (end - star));
  20. System.out.println("----------------------");
  21. }
  22. /**
  23. * 使用ForkJoin 方法
  24. */
  25. public static void test2() throws ExecutionException, InterruptedException {
  26. long star = System.currentTimeMillis();
  27. ForkJoinPool forkJoinPool = new ForkJoinPool();
  28. ForkJoinTask<Long> task = new ForkJoinDemo(0L, SUM);
  29. ForkJoinTask<Long> submit = forkJoinPool.submit(task); //提交任务
  30. Long along = submit.get();
  31. System.out.println(along);
  32. long end = System.currentTimeMillis();
  33. System.out.println("时间:" + (end - star));
  34. System.out.println("-----------");
  35. }
  36. /**
  37. * 使用 Stream并行流计算
  38. */
  39. public static void test3() {
  40. long star = System.currentTimeMillis();
  41. long sum = LongStream.rangeClosed(0L, SUM).parallel().reduce(0, Long::sum);
  42. System.out.println(sum);
  43. long end = System.currentTimeMillis();
  44. System.out.println("时间:" + (end - star));
  45. System.out.println("-----------");
  46. }
  47. }

声明:本文内容由网友自发贡献,转载请注明出处:【wpsshop】
推荐阅读
相关标签
  

闽ICP备14008679号