当前位置:   article > 正文

零基础小白都能看懂Java 并发编程详细知识点(synchronized、JUC、线程池)_线程池 synchronized

线程池 synchronized

Java 并发编程

为什么很重要?

并发编程可以充分利用计算机的资源,把计算机的性能发挥到最大,可以最大程度节约公司的成本,提高效率。

什么是高并发

并发和并行的区别

并发 concurrency:多线程“同时”操作同一个资源,并不是真正的同时操作,而是交替操作,单核 CPU 的情况下,资源按照时间段分配给多个线程。张三李四王五使用一口锅炒菜,交替

并行 parallelism:是真正的多个线程同时执行,多喝 CPU,每个线程使用一个 CPU 资源来运行。张三李四王五使用三口锅炒菜,同时进行

并发编程描述的是一种使系统允许多个任务可以在重叠的时间段内执行的设计结构,不是指多个任务在同一时间段内执行,而是指系统具备处理多个任务在同一时间段内同时执行的能力。

高并发是指我们设计的程序,可以支持海量任务的执行在时间段上重叠的情况。

高并发的标准:

  • QPS:每秒钟响应的 HTTP 请求数量,QPS 不是并发数。

  • 吞吐量:单位时间内处理的请求数,由 QPS 和并发数来决定。

  • 平均响应时间:系统对一个请求作出响应的平均时间。

QPS = 并发数 / 平均响应时间

  • 并发用户数:同时承载正常使用系统的用户人数

互联网分布式架构设计,提高系统并发能力的方式:

  • 垂直扩展

  • 水平扩展

垂直扩展

提升单机处理能力

1、提升单机的硬件设备,增加 CPU 核数,升级网卡,硬盘扩容,升级内存

2、提升单机的架构性能,使用缓存 Cache 提高效率,使用异步请求来增加单服务吞吐量,NoSQL 提升数据库访问能力

水平扩展

集群:一个厨师搞不定,多雇几个厨师一起炒菜,多个人干同一件事情。

分布式:给厨师雇两个助手,一个负责洗菜,一个负责切菜,厨师只负责炒菜,一件事情拆分成多个步骤,由不同的人去完成。

站点层扩展:Nginx 反向代理,一个 Tomcat 跑不动,那就 10 个 Tomcat 去跑。

服务层扩展:RPC 框架实现远程调用,Spring Boot/Spring Cloud,Dubbo,分布式架构,将业务逻辑拆分到不同的 RPC Client,各自完成对应的业务,如果某项业务并发量很大,增加新的 RPC Client,就能扩展服务层的性能,做到理论上的无限高并发。

数据层扩展:在数据量很大的情况下,将原来的一台数据库服务器,拆分成多台,以达到扩充系统性能的目的,主从复制,读写分离,分表分库。

JUC

JDK 提供的一个工具包,专门用来帮助开发者完成 Java 并发编程。

进程和线程

Java 默认的线程数 2 个

  • main 主线程

  • GC 垃圾回收机制

Java 本身是无法开启线程的,Java 无法操作硬件,只能通过调用本地方法,C++ 编写的动态函数库。

private native void start0();

Java 中实现多线程有几种方式?

  1. 继承 Thread 类

  2. 实现 Runnable 接口

  3. 实现 Callable 接口

Callable 和 Runnable 的区别在于 Runnable 的 run 方法没有返回值,Callable 的 call 方法有返回值。

  1. package com.bsj.Demo14;
  2. import java.util.concurrent.Callable;
  3. import java.util.concurrent.ExecutionException;
  4. import java.util.concurrent.Future;
  5. import java.util.concurrent.FutureTask;
  6. public class Test {
  7.    public static void main(String[] args) {
  8.        MyCallable myCallable = new MyCallable();
  9.        FutureTask futureTask = new FutureTask(myCallable);
  10.        Thread thread = new Thread(futureTask);
  11.        thread.start();
  12.        try {
  13.            String value = (String) futureTask.get();
  14.            System.out.println(value);
  15.       } catch (InterruptedException e) {
  16.            throw new RuntimeException(e);
  17.       } catch (ExecutionException e) {
  18.            throw new RuntimeException(e);
  19.       }
  20.   }
  21. }
  22. class MyCallable implements Callable<String>{
  23.    @Override
  24.    public String call() throws Exception {
  25.        System.out.println("callable");
  26.        return "hello";
  27.   }
  28. }

sleep 和 wait

sleep 是让当前线程休眠,wait 是让访问当前对象的线程休眠。

sleep 不会释放锁,wait 会释放锁

synchronized 锁定的是什么

  1. synchronized 修饰非静态方法,锁定方法的调用者

  2. synchronized 修饰静态方法,锁定的是类

  3. synchronized 静态方法和实例方法同时存在,静态方法锁定的是类,实例方法锁定的是对象。

Lock

JUC 提供的一种锁机制,功能和 synchronized 类似,是对 synchronized 的升级,它是一个接口。

它的常用实现类是 ReetrantLock。

synchronized 是通过 JVM 实现锁机制,ReentrantLock 是通过 JDK 实现锁机制

synchronized 是一个关键字,ReentrantLock 是一个类。

重入锁:可以给同一个资源添加多把锁

synchronized 是线程执行完毕之后自动释放锁,ReetrantLock 需要手动解锁。

  1. package com.bsj.Demo17;
  2. import java.util.concurrent.TimeUnit;
  3. import java.util.concurrent.locks.Lock;
  4. import java.util.concurrent.locks.ReentrantLock;
  5. public class Test {
  6.    public static void main(String[] args) {
  7.        Ticket ticket = new Ticket();
  8.        new Thread(()->{
  9.            for (int i = 0;i<40;i++){
  10.                ticket.sale();
  11.           }
  12.       },"A").start();
  13.        new Thread(()->{
  14.            for (int i = 0;i<40;i++){
  15.                ticket.sale();
  16.           }
  17.       },"B").start();
  18.   }
  19. }
  20. class Ticket{
  21.    private Integer saleNum = 0;
  22.    private Integer lastNum = 30;
  23.    private Lock lock= new ReentrantLock();
  24.    public void sale(){
  25.        lock.lock();
  26.        lock.lock();
  27.        if(lastNum > 0){
  28.            saleNum++;
  29.            lastNum--;
  30.            try {
  31.                TimeUnit.MILLISECONDS.sleep(500);
  32.           } catch (InterruptedException e) {
  33.                throw new RuntimeException(e);
  34.           }
  35.            lock.unlock();
  36.            lock.unlock();
  37.            System.out.println(Thread.currentThread().getName()+"卖出了第"+saleNum+"张票,剩余"+lastNum+"张票");
  38.       }
  39.   }
  40. }

synchronized 和 Lock 的区别

1、synchronized 自动上锁,自动释放锁,Lock 手动上锁,手动释放锁。

2、synchronized 无法判断是否获取到了锁,Lock 可以判断是否拿到了锁。

3、synchronized 拿不到锁就会一直等待,Lock 不一定会一直等待。

4、synchronized 是 Java 关键字,Lock 是接口。

5、synchronized 是非公平锁,Lock 可以设置是否为公平锁。

公平锁:很公平,排队,当锁没有被占用时,当前线程需要判断队列中是否有其他等待线程。

非公平锁:不公平,插队,当锁没有被占用时,当前线程可以直接占用,而不需要判断当前队列中是否有等待线程。

实际开发中推荐使用 Lock 的方式。

ReetrantLock 具备限时性的特点,可以判断某个线程在一定的时间段内能否获取到锁。使用 tryLock 方法,返回值时 boolean 类型,true 表示可以获取到锁,false 表示无法获取到锁。

  1. package com.bsj.Demo18;
  2. import java.util.concurrent.TimeUnit;
  3. import java.util.concurrent.locks.Lock;
  4. import java.util.concurrent.locks.ReentrantLock;
  5. public class Test {
  6.    public static void main(String[] args) {
  7.        TimeLock timeLock = new TimeLock();
  8.        new Thread(()->{
  9.            timeLock.getLock();
  10.       },"A").start();
  11.        new Thread(()->{
  12.            timeLock.getLock();
  13.       },"B").start();
  14.   }
  15. }
  16. class TimeLock{
  17.    private ReentrantLock lock = new ReentrantLock();
  18.    public void getLock(){
  19.        try {
  20.            if (lock.tryLock(3, TimeUnit.SECONDS)){
  21.                System.out.println(Thread.currentThread().getName()+"拿到了锁");
  22.                TimeUnit.SECONDS.sleep(5);
  23.           }else{
  24.                System.out.println(Thread.currentThread().getName()+"拿不到锁");
  25.           }
  26.       } catch (InterruptedException e) {
  27.            throw new RuntimeException(e);
  28.       }finally {
  29.            if (lock.isHeldByCurrentThread()){
  30.                lock.unlock();
  31.           }
  32.       }
  33.   }
  34. }

ConcurrentModificationException

并发访问异常

  1. package com.bsj.Demo20;
  2. import java.util.ArrayList;
  3. import java.util.List;
  4. import java.util.concurrent.TimeUnit;
  5. public class Test {
  6.    public static void main(String[] args) {
  7.        List<String> list = new ArrayList<>();
  8.        for (int i = 0; i < 10; i++) {
  9.            new Thread(()->{
  10.                try {
  11.                    TimeUnit.MILLISECONDS.sleep(1);
  12.               } catch (InterruptedException e) {
  13.                    throw new RuntimeException(e);
  14.               }
  15.                //写
  16.                list.add("a");
  17.                //读
  18.                System.out.println(list);
  19.           },String.valueOf(i)).start();
  20.       }
  21.   }
  22. }

如何解决?

1、Vector

2、Collections.synchronized

List<String> list = Collections.synchronizedList(new ArrayList<>());

3、JUC:CopyOnWriteArrayList

  1. package com.bsj.Demo20;
  2. import java.util.List;
  3. import java.util.concurrent.CopyOnWriteArrayList;
  4. import java.util.concurrent.TimeUnit;
  5. public class Test2 {
  6.    public static void main(String[] args) {
  7.        List<String> list = new CopyOnWriteArrayList<>();
  8.        for (int i = 0; i < 10; i++) {
  9.            new Thread(()->{
  10.                try {
  11.                    TimeUnit.MILLISECONDS.sleep(10);
  12.               } catch (InterruptedException e) {
  13.                    throw new RuntimeException(e);
  14.               }
  15.                //写
  16.                list.add("a");
  17.                //读
  18.                System.out.println(list);
  19.           }).start();
  20.       }
  21.   }
  22. }

CopyOnWrite 写时复制,当我们往一个容器添加元素的时候,不是直接给容器添加,而是先将当前容器复制一份,向新的容器中添加数据,添加完成之后,再将原容器的引用指向新的容器。

Set

  1. package com.bsj.Demo20;
  2. import java.util.List;
  3. import java.util.Set;
  4. import java.util.concurrent.CopyOnWriteArrayList;
  5. import java.util.concurrent.CopyOnWriteArraySet;
  6. import java.util.concurrent.TimeUnit;
  7. public class Test2 {
  8.    public static void main(String[] args) {
  9.        Set<String> set = new CopyOnWriteArraySet<>();
  10.        for (int i = 0; i < 10; i++) {
  11.            final int temp = i;
  12.            new Thread(()->{
  13.                try {
  14.                    TimeUnit.MILLISECONDS.sleep(10);
  15.               } catch (InterruptedException e) {
  16.                    throw new RuntimeException(e);
  17.               }
  18.                //写
  19.                set.add(String.valueOf(temp)+"a");
  20.                //读
  21.                System.out.println(set);
  22.           }).start();
  23.       }
  24.   }
  25. }

Map

  1. package com.bsj.Demo20;
  2. import java.util.List;
  3. import java.util.Map;
  4. import java.util.Set;
  5. import java.util.UUID;
  6. import java.util.concurrent.ConcurrentHashMap;
  7. import java.util.concurrent.CopyOnWriteArrayList;
  8. import java.util.concurrent.CopyOnWriteArraySet;
  9. import java.util.concurrent.TimeUnit;
  10. public class Test2 {
  11.    public static void main(String[] args) {
  12.        Map<String,String> map = new ConcurrentHashMap<>();
  13.        for (int i = 0; i < 10; i++) {
  14.            final int temp = i;
  15.            new Thread(()->{
  16.                try {
  17.                    TimeUnit.MILLISECONDS.sleep(10);
  18.               } catch (InterruptedException e) {
  19.                    throw new RuntimeException(e);
  20.               }
  21.                map.put(UUID.randomUUID().toString().substring(0,3),UUID.randomUUID().toString().substring(0,2));
  22.                System.out.println(map);
  23.           }).start();
  24.       }
  25.   }
  26. }

JUC 工具类

CountDownLatch:减法计数器

可以用来倒计时,当两个线程同时执行时,如果要确保一个线程优先执行,可以使用计数器,当计数器清零的时候,再让另一个线程执行。

  1. package com.bsj.Demo21;
  2. import java.util.concurrent.CountDownLatch;
  3. public class Test {
  4.    public static void main(String[] args) {
  5.        //创建一个 CountDownLatch
  6.        CountDownLatch countDownLatch = new CountDownLatch(100);
  7.        new Thread(()->{
  8.            for (int i = 0; i < 100; i++) {
  9.                System.out.println("++++++++++Thread");
  10.                countDownLatch.countDown();
  11.           }
  12.       }).start();
  13.        try {
  14.            countDownLatch.await();
  15.       } catch (InterruptedException e) {
  16.            throw new RuntimeException(e);
  17.       }
  18.        for (int i = 0; i < 100; i++) {
  19.            System.out.println("main-------------");
  20.       }
  21.   }
  22. }

countDown():计数器减一

await():计数器停止,唤醒其他线程

new CountDownLatch(100)、countDown()、await() 必须配合起来使用,创建对象的时候赋的值时多少,countDown() 就必须执行多少次,否则计数器是没有清零的,计数器就不会停止,其他线程也无法唤醒,所以必须保证计数器清零,countDown() 的调用次数必须大于构造函数的参数值。

CyclicBarrier:加法计数器

  1. package com.bsj.Demo21;
  2. import java.util.concurrent.BrokenBarrierException;
  3. import java.util.concurrent.CyclicBarrier;
  4. public class CyclicBarrierTest {
  5.    public static void main(String[] args) {
  6.        CyclicBarrier cyclicBarrier = new CyclicBarrier(100,()->{
  7.            System.out.println("放行");
  8.       });
  9.        for (int i = 0; i < 100; i++) {
  10.            final int temp = i;
  11.            new Thread(()->{
  12.                System.out.println("---->"+temp);
  13.                try {
  14.                    cyclicBarrier.await();
  15.               } catch (InterruptedException e) {
  16.                    throw new RuntimeException(e);
  17.               } catch (BrokenBarrierException e) {
  18.                    throw new RuntimeException(e);
  19.               }
  20.           }).start();
  21.       }
  22.   }
  23. }

await():在其他线程中试图唤醒计数器线程,当其他线程的执行次数达到计数器的临界值时,则唤醒计数器线程,并且计数器是可以重复使用的,当计数器线程执行完成一次之后,计数器自动清零,等待下一次执行。

new CyclicBarrier(30),for 执行90次,则计数器的任务会执行 3 次。

Semaphore:计数信号量

实际开发中主要使用它来完成限流操作,限制可以访问某些资源的线程数量。

Semaphore 只有 3 个操作:

  • 初始化

  • 获取许可

  • 释放

  1. package com.bsj.Demo21;
  2. import java.util.concurrent.Semaphore;
  3. import java.util.concurrent.TimeUnit;
  4. public class SemaphoreTest {
  5.    public static void main(String[] args) {
  6.        //初始化
  7.        Semaphore semaphore = new Semaphore(5);
  8.        for (int i = 0; i < 15; i++) {
  9.            new Thread(()->{
  10.                //获得许可
  11.                try {
  12.                    semaphore.acquire();
  13.                    System.out.println(Thread.currentThread().getName()+"进店购物");
  14.                    TimeUnit.SECONDS.sleep(5);
  15.                    System.out.println(Thread.currentThread().getName()+"出店");
  16.               } catch (InterruptedException e) {
  17.                    throw new RuntimeException(e);
  18.               }finally {
  19.                    //释放
  20.                    semaphore.release();
  21.               }
  22.           },String.valueOf(i)).start();
  23.       }
  24.   }
  25. }

每个线程在执行的时候,首先需要去获取信号量,只有获取到资源才可以执行,执行完毕之后需要释放资源,留给下一个线程。

读写锁

接口 ReadWriteLock,实现类是 ReetrantReadWriteLock,可以多线程同时读,但是同一时间内只能有一个线程进行写入操作。

读写锁也是为了实现线程同步,只不过粒度更细,可以分别给读和写的操作设置不同的锁。

  1. package com.bsj.Demo22;
  2. import java.util.HashMap;
  3. import java.util.Map;
  4. import java.util.concurrent.locks.ReadWriteLock;
  5. import java.util.concurrent.locks.ReentrantLock;
  6. import java.util.concurrent.locks.ReentrantReadWriteLock;
  7. public class ReadWriteLockTest {
  8.    public static void main(String[] args) {
  9.        Cache cache = new Cache();
  10.        for (int i = 0; i < 5; i++) {
  11.            final int temp = i;
  12.            new Thread(()->{
  13.                cache.write(temp,String.valueOf(temp));
  14.           }).start();
  15.       }
  16.        for (int i = 0; i < 5; i++) {
  17.            final int temp = i;
  18.            new Thread(()->{
  19.                cache.read(temp);
  20.           }).start();
  21.       }
  22.   }
  23. }
  24. class Cache{
  25.    private ReadWriteLock readWriteLock= new ReentrantReadWriteLock();
  26.    private Map<Integer,String> map = new HashMap<>();
  27.    /**
  28.     * 写操作
  29.     */
  30.    public void write(Integer key,String value){
  31.        readWriteLock.writeLock().lock();
  32.        System.out.println(key+"开始写入");
  33.        map.put(key,value);
  34.        System.out.println(key+"写入完毕");
  35.        readWriteLock.writeLock().unlock();
  36.   }
  37.    /**
  38.     * 读操作
  39.     */
  40.    public void read(Integer key){
  41.        readWriteLock.readLock().lock();
  42.        System.out.println(key+"开始读取");
  43.        map.get(key);
  44.        System.out.println(key+"读取完毕");
  45.        readWriteLock.readLock().unlock();
  46.   }
  47. }

写入锁也叫独占锁,只能被一个线程占用,读取锁也叫共享锁,多个线程可以同时占用。

线程池

池化技术 池化思想

预先创建好一定数量的线程对象,存入缓冲池中,需要用的时候直接从缓冲池中取出,用完之后不要销毁,还回到缓冲池中,为了提高资源的利用率。

优势:

  • 提高线程的利用率

  • 提高响应速度

  • 便于统一管理线程对象

  • 可以控制最大的并发数

线程池的具体设计思想:

  • 核心池的大小

  • 线程池的最大容量

  • 等待队列

  • 拒绝策略

1、线程池初始化的时候创建一定数量的线程对象。

2、如果缓冲池中没有空闲的线程对象,则新来的任务进入等待队列。

3、如果缓冲池中没有空闲的线程对象,等待队列也已经填满,可以申请再创建一定数量的新线程对象,直到达到线程池的最大值,这时候如果还有新的任务进来,只能选择拒绝。

无论哪种线程池,都是工具类 Executors 封装的,底层代码都一样,都是通过创建 ThreadPoolExecutor 对象来完成线程池的构建

核心参数有 7 个

  1. public ThreadPoolExecutor(int corePoolSize,
  2.                          int maximumPoolSize,
  3.                          long keepAliveTime,
  4.                          TimeUnit unit,
  5.                          BlockingQueue<Runnable> workQueue,
  6.                          ThreadFactory threadFactory,
  7.                          RejectedExecutionHandler handler) {
  8.    if (corePoolSize < 0 ||
  9.        maximumPoolSize <= 0 ||
  10.        maximumPoolSize < corePoolSize ||
  11.        keepAliveTime < 0)
  12.        throw new IllegalArgumentException();
  13.    if (workQueue == null || threadFactory == null || handler == null)
  14.        throw new NullPointerException();
  15.    this.corePoolSize = corePoolSize;
  16.    this.maximumPoolSize = maximumPoolSize;
  17.    this.workQueue = workQueue;
  18.    this.keepAliveTime = unit.toNanos(keepAliveTime);
  19.    this.threadFactory = threadFactory;
  20.    this.handler = handler;
  21.    String name = Objects.toIdentityString(this);
  22.    this.container = SharedThreadContainer.create(name);
  23. }
  • corePoolSize:核心池大小,初始化的线程数量

  • maximumPoolSize:线程池的最大线程数,它决定了线程池容量的上限

corePoolSize 就是线程池的大小,maximumPoolSize 是一种补救措施,任务量突然增大的时候的一种补救措施。

  • keepAliveTime:线程对象的存活时间

  • unit:线程对象存活时间单位

  • workQueue:等待队列,存储等待执行的任务

  • threadFactory:线程工厂,用来创建线程对象

  • handler:拒绝策略

拒绝策略:

1、AbortPolicy:直接抛出异常

2、DiscardPolicy:放弃任务,不抛出异常

3、DiscardoldestPolicy:尝试与等待队列中最前面的任务去争夺,不抛出异常

4、CallerRunsPolicy:谁调用谁处理

线程池运行流程:

线程池启动的时候会按照核心池的数量来创建初始化的线程对象 2 个。

开始分配任务,如果同时来了多个任务,2 个线程对象都被占用了,第 3 个以及之后的任务进入等待队列,当有线程完成任务恢复空闲状态的时候,等待队列中的任务获取线程对象。

如果等待队列也占满了,又有新的任务进来,需要去协调,让线程池再创建新的线程对象,但是线程池不可能无限创建线程对象,一定会有一个最大上限,就是线程池的最大容量。

如果线程池已经达到了最大上限,并且等待队列也占满了,此时如果有新的任务进来,只能选择拒绝,并且需要根据拒绝策略来选择对应的方案。

单例 (只有 1 个线程对象)

 固定

 缓存

  1. package com.bsj.Demo24;
  2. import javax.swing.*;
  3. import java.util.concurrent.*;
  4. public class Test {
  5.    public static void main(String[] args) {
  6.        ExecutorService executorService = null;
  7.        try {
  8.            /**
  9.             * 自己写7大参数
  10.             */
  11.            executorService = new ThreadPoolExecutor(
  12.                    2,
  13.                    3,
  14.                    1L,
  15.                    TimeUnit.SECONDS,
  16.                    new ArrayBlockingQueue<>(2),
  17.                    Executors.defaultThreadFactory(),
  18.                    new ThreadPoolExecutor.AbortPolicy()
  19.           );
  20.            for (int i = 0; i < 6; i++) {
  21.                executorService.execute(()->{
  22.                    try {
  23.                        TimeUnit.MILLISECONDS.sleep(100);
  24.                   } catch (InterruptedException e) {
  25.                        throw new RuntimeException(e);
  26.                   }
  27.                    System.out.println(Thread.currentThread().getName()+"====>办理业务");
  28.               });
  29.           }
  30.       }catch (Exception e){
  31.            e.printStackTrace();
  32.       }finally {
  33.            executorService.shutdown();
  34.       }
  35.   }
  36. }

 new ThreadPoolExecutor.DiscardPolicy()

new ThreadPoolExecutor.DiscardOldestPolicy()

不会抛出异常

线程池 3 大考点:

1、Executors 工具类的 3 种实现

  1. ExecutorService executorService = Executors.newSingleThreadExecutor();
  2. ExecutorService executorService = Executors.newFixedThreadPool(5);
  3. ExecutorService executorService = Executors.newCachedThreadPool();

2、7 个参数

  1. corePoolSize:核心池大小,初始化的线程数量
  2. maximumPoolSize:线程池的最大线程数,它决定了线程池容量的上限
  3. corePoolSize 就是线程池的大小,maximumPoolSize 是一种补救措施,任务量突然增大的时候的一种补救措施。
  4. keepAliveTime:线程对象的存活时间
  5. unit:线程对象存活时间单位
  6. workQueue:等待队列,存储等待执行的任务
  7. threadFactory:线程工厂,用来创建线程对象
  8. handler:拒绝策略

3、4 种拒绝策略

  1. 1、AbortPolicy:直接抛出异常
  2. 2、DiscardPolicy:放弃任务,不抛出异常
  3. 3、DiscardoldestPolicy:尝试与等待队列中最前面的任务去争夺,不抛出异常
  4. 4、CallerRunsPolicy:谁调用谁处理

ForkJoin 框架

ForkJoin 是 JDK 1.7 后发布的多线程并发处理框架,功能上和 JUC 类似,JUC 更多时候是使用单个类完成操作,ForkJoin 使用多个类同时完成某项工作,处理上比 JUC 更加丰富,实际开发中使用的场景并不是很多,互联网公司真正有高并发需求的情况才会使用,面试时候会加分

本质上是对线程池的一种的补充,对线程池功能的一种扩展,基于线程池,它的核心思想就是将一个大型的任务拆分成很多个小任务,分别执行,最终将小任务的结果进行汇总,生成最终的结果。

ForkJoin 框架,核心是两个类

  • ForkJoinTask(描述任务)

  • ForkJoinPool(线程池)提供多线程并发工作窃取。

使用 ForkJoinTask 最重要的就是要搞清楚如何拆分任务,这里用的是递归思想。

1、需要创建一个 ForkJoinTask 任务,ForkJoinTask 是一个抽象类,不能直接创建 ForkJoinTask 的实例化对象,开发者需要自定义一个类,继承 FrokJoinTask 的子类 RecursiveTask,RecursiveTask 就是递归的意思,该类就提供了实现递归的功能。

  1. package com.bsj.Demo25;
  2. import java.util.concurrent.RecursiveTask;
  3. /**
  4. * 10亿求和
  5. */
  6. public class ForkJoinDemo extends RecursiveTask<Long> {
  7.    private Long start;
  8.    private Long end;
  9.    private Long temp = 100_0000L;
  10.    public ForkJoinDemo(Long start, Long end) {
  11.        this.start = start;
  12.        this.end = end;
  13.   }
  14.    @Override
  15.    protected Long compute() {
  16.        if ((end - start) < temp){
  17.            Long sum = 0L;
  18.            for (Long i = start; i <= end; i++) {
  19.                sum += i;
  20.           }
  21.            return sum;
  22.       }else{
  23.            Long avg = (start + end)/2;
  24.            ForkJoinDemo task1 = new ForkJoinDemo(start,avg);
  25.            task1.fork();
  26.            ForkJoinDemo task2 = new ForkJoinDemo(avg,end);
  27.            task2.fork();
  28.            return task1.join()+task2.join();
  29.       }
  30.   }
  31. }
  1. package com.bsj.Demo25;
  2. import java.util.concurrent.ExecutionException;
  3. import java.util.concurrent.ForkJoinPool;
  4. import java.util.concurrent.ForkJoinTask;
  5. public class Test {
  6.    public static void main(String[] args) {
  7.        Long startTime = System.currentTimeMillis();
  8.        ForkJoinPool forkJoinPool = new ForkJoinPool();
  9.        ForkJoinTask<Long> task = new ForkJoinDemo(0L,10_0000_0000L);
  10.        forkJoinPool.execute(task);
  11.        Long sum = 0L;
  12.        try {
  13.            sum = task.get();
  14.       } catch (InterruptedException e) {
  15.            throw new RuntimeException(e);
  16.       } catch (ExecutionException e) {
  17.            throw new RuntimeException(e);
  18.       }
  19.        Long endTime = System.currentTimeMillis();
  20.        System.out.println(sum+",供耗时"+(endTime-startTime));
  21.   }
  22. }

Volatile 关键字

Volatile 是 JVM 提供的轻量级的同步机制,可见性,主内存对线程可见

可见性:指当多个线程访问同一个变量时,一个线程修改了这个变量的值,其他线程能够立即看得到修改的值。

一个线程执行完任务之后,会把变量存回到主内存中,并且从主内存中读取当前最新的值,如果是一个空的任务,则不会重新读取主内存中的值。

  1. package com.bsj.Demo26;
  2. import java.sql.Time;
  3. import java.util.concurrent.TimeUnit;
  4. public class Test {
  5.    private static volatile int num = 0;
  6.    public static void main(String[] args) {
  7.        /**
  8.         * 循环
  9.         */
  10.        new Thread(()->{
  11.            while(num == 0){
  12.                System.out.println("------Thread-----");
  13.           }
  14.       }).start();
  15.        try {
  16.            TimeUnit.SECONDS.sleep(1);
  17.       } catch (InterruptedException e) {
  18.            throw new RuntimeException(e);
  19.       }
  20.        num = 1;
  21.        System.out.println(num);
  22.   }
  23. }
  1. package com.bsj.Demo26;
  2. import java.sql.Time;
  3. import java.util.concurrent.TimeUnit;
  4. public class Test {
  5.    private static int num = 0;
  6.    public static void main(String[] args) {
  7.        /**
  8.         * 循环
  9.         */
  10.        new Thread(()->{
  11.            while(num == 0){
  12.                System.out.println("------Thread-----");
  13.           }
  14.       }).start();
  15.        try {
  16.            TimeUnit.SECONDS.sleep(1);
  17.       } catch (InterruptedException e) {
  18.            throw new RuntimeException(e);
  19.       }
  20.        num = 1;
  21.        System.out.println(num);
  22.   }
  23. }

线程池 workQueue

一个阻塞队列,用来存储等待执行的任务,常用的阻塞队列有几下几种:

  • ArrayBlockingQueue:基于数组的先进先出队列,创建时必须指定大小。

  • LinkedBlockingQueue:基于链表的先进先出队列,创建时可以不指定大小,默认值是Integer.MAX_VALUE。

  • SynchronousQueue:它不会保存提交的任务,而是直接新建一个线程来执行新来的任务。

  • PriorityBlockQueue:具有优先级的阻塞队列。

递归

二叉树遍历,深度优先搜索等

什么是递归?

常规的定义:编程语言中,函数 func 直接或间接调用函数本身,则该函数称为递归函数。

问前排人是第几排 -> 函数

所有的递归问题都可以用递推公式来表示,所以要用递归解决问题,关键就是先找到递推公式。

  1. f(n) = f(n-1)+1
  2. f(1) = 1

f(n) 表示你当前是第几排,f(n-1) 前面一排所在的排数,f(1) = 1 表示第一排的人知道自己是第一排。

  1. int f(int n){
  2. if(n==1) return 1;
  3. return f(n-1)+1;
  4. }

递归需要满足 3 要素:

1、一个父问题可以拆分若干个子问题,并且若干个子问题的结果汇总起来就是父问题的答案。

2、父问题和子问题,解题思路必须完全一致,只是数据规模不同。

3、存在终止条件。

问题再不断拆分的同时,一定要在某个节点终止拆分,得到一个明确的答案。

问题:假设有 n 个台阶,每次可以跨 1 个台阶或者 2 个台阶,请问走完这 n 个台阶一共有多少种走法?

1、假设有 1 个台阶,一共有(1)种走法

2、假设有 2 个台阶,一共有(2)种走法【1,1】【2】

3、假设有 3 个台阶,一共有(3)种走法【1,1,1】【1,2】【2,1】

......

可以根据第一步的走法进行分类

第一类是一步走了 1 个台阶

第二类是一步走了 2 个台阶

所以 n 个台阶的走法就等于先走 1 个台阶后,n-1 个台阶的走法 + 先走 2 个台阶后,n-2 个台阶的走法。

f(n) = f(n-1) + f(n-2)

f(1) = 1,能否作为终止条件?

n = 2,f(2) = f(1) + f(0),如果说终止条件只有一个 f(1) = 1,f(2) 就无法求解,因为f(0) 的值无法确定

把 f(2) = 2 作为一个终止条件

终止条件有两个:

  1. f(1) = 1;
  2. f(2) = 2;
  3. n = 3,f(3) = f(2) + f(1) = 3;
  4. n = 4,f(4) = f(3) + f(2) = 3 + 2 =5

递推公式

  1. f(1) = 1;
  2. f(2) = 2;
  3. f(n) = f(n-1) + f(n-2);

推导出递归代码

  1. int f(int n){
  2. if(n == 1) return 1;
  3. if(n == 2) return 2;
  4. return f(n-1)+f(n-2);
  5. }
  1. package com.bsj.Demo27;
  2. public class Test {
  3.    public static void main(String[] args) {
  4.        for (int i = 1; i <= 10; i++) {
  5.            System.out.println(i+"个台阶共有"+f(i)+"种走法");
  6.       }
  7.   }
  8.    public static int f(int n){
  9.        if (n == 1) return 1;
  10.        if (n == 2) return 2;
  11.        return f(n-1)+f(n-2);
  12.   }
  13. }
  1. package com.bsj.Demo27;
  2. public class Test {
  3.    public static void main(String[] args) {
  4.        for (int i = 1; i <= 10; i++) {
  5.            System.out.println(i+"个台阶共有"+f(i)+"种走法");
  6.       }
  7.   }
  8.    public static int f(int n){
  9.        if (n == 1) return 1;
  10.        if (n == 2) return 2;
  11.        return f(n-1)+f(n-2);
  12.   }
  13. }

总结

Java并发编程是指在多线程环境下编写程序的能力,这是Java程序员必备的技能之一。下面是Java并发编程的总结:

  1. 理解并发编程的概念和概述,掌握Java中线程、锁、并发包、线程池等基础知识。

  2. 掌握线程安全(Thread Safety)的概念和实现方式,避免多线程竞争引起的安全问题。

  3. 理解synchronized关键字和Lock接口的使用,避免多线程访问共享变量时的数据竞争问题。

  4. 掌握Java Concurrency API中的基本工具类,如CountDownLatch、CyclicBarrier、Semaphore、Exchanger等,应对多线程编程中常见的问题。

  5. 了解Java中的线程池,掌握线程池的概念、工作原理、线程池组成、线程池的参数配置等,实现高效的多线程任务管理和任务处理。

  6. 掌握Java中的并发集合类,如ConcurrentHashMap、CopyOnWriteArrayList等,有效解决多线程并发访问集合对象时出现的问题,提高了程序的性能。

  7. 了解Java中的原子类型、volatile关键字的使用,可以保证数据的可见性和原子性,避免竞争条件引起的问题。

  8. 熟悉Java中的Callable、Future、CompletableFuture等API,实现多线程任务的异步处理,提高程序的性能。

  9. 掌握Java中的线程调度机制和线程优先级,合理地安排线程的优先级,保证程序的稳定性和高效性。

  10. 了解Java中的线程状态、线程状态改变的原因,可以及时地对线程进行调试和问题处理。

总之,Java并发编程是Java程序员必须掌握的技能之一,只有深入理解并掌握Java中的并发编程,才能更好地提升程序的性能和可靠性。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/人工智能uu/article/detail/926113
推荐阅读
相关标签
  

闽ICP备14008679号