当前位置:   article > 正文

JAVA中的常用集合框架以及线程池使用_java线程池执行集合

java线程池执行集合
常用集合分类 在这里插入图片描述
Vector和ArrayList
  • 1、Vector线程安全,方法都带有synchronized;
  • 2、如果初始化Vector和ArrayList时,没指定动态数组的大小,则使用默认大小10;
  • 3、Vector容量不足时,若容量增加系数 >0,则将容量的值增加“容量增加系数”;否则,将容量大小增加一倍;
  • 4、ArrayList容量不足,再次添加添加元素时,扩容的大小为原来的1.5倍,minCapacity大于容量1.5倍的值,设置minCapacity大小容量。
HashTable和HashMap
  • 1、HashMap是数组+链表组成的,Hashtable 是一个散列表;
  • 2、Hashtable 的方法都是同步的,它是线程安全;HashMap不是线程安全的,HashMap可以接受为null的键(key)和值(value),HashTable的key-value 不可以有null值。
  • 3、HashMap默认的容量大小是16;增加容量时,长度是原数组的2倍;Hashtable默认的容量大小是11;增加容量时,长度是原数组的2倍+1。
ConcurrentHashMap
  • ConcurrentHashMap(1.7版本 )底层数据结构是Segment数组+链表,(1.8版本)Node数组+链表(红黑树);
  • ConcurrentHashMap(1.7版本 )Segment(ReentrantLock) + HashEntry的方式进行实现,每个段其实就是一个小的Hashtable,修改不同的Segment就可以并发执行;(1.8版本)采用Node + CAS + Synchronized来保证并发安全进行实现;
  • ConcurrentHashMap的默认容量大小是16,调用put方法中initTable()初始化,sizeCtl等于-1正在初始化,等于-N需要扩容,等于正整数代表扩容时的阈值;
  • 没有hash冲突就直接CAS插入,有hash冲突,就加锁来保证线程安全;
  • ConcurrentHashMap中链表转红黑树的阈值是8,红黑树退化成链表的阈值是6;
  • ConcurrentHashMap如果元素数量达到阈值调用transfer扩容,大小为原来长度的2倍【new Node<?,?>[n << 1]】,第一次扩容长度为32,占用5个二进制位;数据重新分布,只需要关注第n位是0还是1,n为扩容后长度的2的n次方,如果是1,那么需要移动到当前元素位置加原数组长度即可,0则元素的位置不变;
CopyOnWriteArrayList
  • 1、CopyOnWriteArrayList是线程安全容器(相对于ArrayList);
  • 2、在 add、remove 操作时会进行加锁,然后复制出来一个新数组,使用的是本地方法 System.arraycopy();进行数组的复制,操作的都是新数组,而此时原数组是可以提供查询的。当操作结束之后,会将对象指针指向新数组;
  • 3、CopyOnWriteArrayList 在读多写少的场景下可以提高效率,而 ArrayList 只是普通数组集合,并不适用于并发场景,CopyOnWriteArrayList 仅能保证最终一致性。因为刚写入的数据,是写到的复制的数组中,此时并不能立即查询到;
SynchronizedList

Collections.synchronizedList(new ArrayList<>());
内部实现,增加一个锁对象【Object mutex】,方法的使用对这个对象加锁Synchronized。

Queue
  • offer():往队列添加元素。如果队列已满直接返回false,队列未满则直接插入并返回true;
  • pool():取出并删除队头的元素,当队列为空,返回null;
  • add():对offer()方法的简单封装。如果队列已满,抛出异常new IllegalStateException(“Queue full”);
  • remove():直接删除队头的元素;
  • peek():直接取出队头的元素,并不删除;
  • element():对peek方法进行简单封装,如果队头元素存在则取出并不删除,如果不存在抛出异常NoSuchElementException();
BlockingQueue
  • put():往队列里插入元素,如果队列已经满,则会一直等待直到队列为空插入新元素,或者线程被中断抛出异常;
  • take():取出并删除队头的元素,当队列为空,则会一直等待直到队列有新元素可以取出,或者线程被中断抛出异常;
非阻塞队列
  • ConcurrentLinkedQueue:基于链表实现的无界非阻塞队列,满足先进先出(FIFO)的原则;无锁设计,用循环CAS来保证线程安全性,所以比使用锁的阻塞队列实现具有更高的性能,不允许插入为null的元素。
阻塞队列
  • ArrayBlockingQueue:是一个用数组实现的有界阻塞队列,此队列按照先进先出(FIFO)的原则对元素进行排序。底层是Condition条件等待队列实现。支持公平锁和非公平锁。【注:每一个线程在获取锁的时候可能都会排队等待,如果在等待时间上,先获取锁的线程的请求一定先被满足,那么这个锁就是公平的。反之,这个锁就是不公平的。公平的获取锁,也就是当前等待时间最长的线程先获取锁】
  • LinkedBlockingQueue:是一个用链表实现的有界阻塞队列。底层是Condition条件等待队列实现。此队列的默认和最大长度为 Integer.MAX_VALUE。此队列按照先进先出(FIFO)的原则对元素进行排序。
  • PriorityBlockingQueue: 一个支持线程优先级排序的无界阻塞队列,直到系统资源耗尽,默认自然序进行排序,也可以自定义实现compareTo()方法来指定元素排序规则,不能保证同优先级元素的顺序。底层是Condition条件等待队列实现。
  • DelayQueue: 是一个无界阻塞队列,用于放置实现了Delayed接口的对象,只有在延迟期满时才能从中提取元素。该队列是有序的,头部是延迟期满后保存时间最长的元素。
  • SynchronousQueue: 一个不存储元素的阻塞队列,每一个put操作必须等待take操作,否则不能添加元素。支持公平锁和非公平锁。SynchronousQueue的一个使用场景是在(newCachedThreadPool)线程池。
  • LinkedTransferQueue: 一个由链表结构组成的无界阻塞队列,相对于其它队列多了transfer和tryTransfer方法。
    采用一种预占模式,意思就是消费者线程取元素时,如果队列不为空,则直接取走数据;若队列为空,那就生成一个节点(节点元素为null)入队,然后消费者线程被等待在这个节点上,生产者调用transfer方法就不会将元素存入队列,而是直接传递给消费者,如果调用transfer方法的生产者发现没有在等待的消费者,则会将元素入队,然后阻塞等待,直到有一个消费者来获取该元素。
Executor 类图

在这里插入图片描述

  • 任务,也就是工作单元,包括被执行任务需要实现的接口:Runnable接口或者Callable接口;
  • 执行器,也就是把任务分派给多个线程的执行机制,包括Executor接口的execute方法及继承自Executor接口的ExecutorService接口的submit以及invokeAll方法;
  • 异步计算的结果,Future接口及实现了RunnableFuture接口的FutureTask类;
ExecutorService
  • Executors创建线程池

    • newSingleThreadPool:创建corePoolSize等于1,maximumPoolSize等于1单线程的线程池。这个线程池只有一个线程在工作,也就是相当于单线程串行执行所有任务。如果这个唯一的线程因为异常结束,那么会有一个新的线程来替代它。此线程池保证所有任务的执行顺序按照任务的提交顺序执行(LinkedBlockingQueue队列大小是Integer.MAX_VALUE)。
    • newFixedThreadPool:创建corePoolSize固定大小,maximumPoolSize固定大小的线程池。每次提交一个任务就创建一个线程,直到线程达到线程池的最大线程数量。如果某个线程因为执行异常而结束,那么线程池会补充一个新线程(LinkedBlockingQueue队列大小是Integer.MAX_VALUE)。
    • newCachedThreadPool:创建一个corePoolSize是0,maximumPoolSize是Integer.MAX_VALUE,可缓存(SynchronousQueue)的线程池。此队列不能存储元素,来一个任务必须有一个线程处理,如果线程池的大小超过了处理任务所需要的线程,那么就会回收部分空闲(60秒不执行任务)的线程,当任务数增加时,此线程池又可以添加新线程来处理任务。此线程池不会对线程池大小做限制,线程池大小完全依赖于操作系统(或者说JVM)能够创建的最大线程大小。
    • newScheduledThreadPool:创建一个corePoolSize固定大小,maximumPoolSize是Integer.MAX_VALUE的线程池。此线程池支持定时以及周期性(DelayedWorkQueue)执行任务的需求。
    • newWorkStealingPool:底层使用的是ForkJoinPool。如果使用默认参数创建的话,使用当前计算机中可用的cpu数量。每个线程都有自己的工作队列,如果当前线程工作完了,它会到别的工作队列中“窃取”任务执行,充分地利用了CPU的多核能力。
  • 简单使用

    public class ExecutorTest {
       	public static void main(String[] args) throws InterruptedException, ExecutionException {
       		ExecutorService e=Executors.newFixedThreadPool(3);
    
       		//执行Runnable任务
       		Future<String > f=e.submit(new Runnable() {
       			public void run() {
       				try {
       					Thread.sleep(2000);
       				} catch (InterruptedException e) {
       					e.printStackTrace();
       				}
       			}
       		},"Runnable end");
       		System.out.println(f.get());
       		
       		//执行Callable任务
       		FutureTask<String> task=(FutureTask<String>) e.submit(new Callable<String>() {
       			@Override
       			public String call() throws Exception {
       				Thread.sleep(3000);
       				return "Callable end";
       			}
       		});
       		System.out.println(task.get());
       		
       		//执行FutureTask任务,FutureTask既是任务也是结果
       		FutureTask<String> task1=new FutureTask<>(()->{
       			Thread.sleep(1000);
       			return "FutureTask end";
       		});
       		new Thread(task1).start();
       		System.out.println(task1.get());
       		e.shutdown();
       	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
FutureTask
  • state状态
    /**
     * Possible state transitions:
     * NEW -> COMPLETING -> NORMAL          正常
     * NEW -> COMPLETING -> EXCEPTIONAL     异常
     * NEW -> CANCELLED                     取消
     * NEW -> INTERRUPTING -> INTERRUPTED   中断
     */
    private volatile int state; 线程状态
    private static final int NEW          = 0; 新建
    private static final int COMPLETING   = 1; 执行中
    private static final int NORMAL       = 2; 正常
    private static final int EXCEPTIONAL  = 3; 异常
    private static final int CANCELLED    = 4; 取消
    private static final int INTERRUPTING = 5; 中断
    private static final int INTERRUPTED  = 6; 被中断
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
  • 构造方法
    //设置Callable有返回值的任务
    public FutureTask(Callable<V> callable) {
        if (callable == null)
            throw new NullPointerException();
        this.callable = callable;
        this.state = NEW;       // 新建状态
    }
    //设置Runnable任务,通过Executors构建成Callable有返回值的任务,成功后返回result
    public FutureTask(Runnable runnable, V result) {
        this.callable = Executors.callable(runnable, result);
        this.state = NEW;       // 新建状态
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
  • run方法
    public void run() {
    	// 如果状态不是NEW ,返回
    	//如果状态为new,则尝试把当前线程赋值过去,如果赋值失败,返回
        if (state != NEW ||
            !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                         null, Thread.currentThread()))
            return;
        try {
            Callable<V> c = callable;
            if (c != null && state == NEW) {
                V result;
                boolean ran;
                try {
                    result = c.call();  //调用callable.call(),执行任务
                    ran = true;
                } catch (Throwable ex) {
                    result = null;
                    ran = false;
                    setException(ex);  //抛出异常则设置异常
                }
                if (ran)
                    set(result);  //正常完成则设置对应的完成结果
            }
        } finally {
            runner = null;
            // state must be re-read after nulling runner to prevent
            // leaked interrupts
            int s = state;
            if (s >= INTERRUPTING)  //任务处于中断中的状态,则进行中断操作
                handlePossibleCancellationInterrupt(s);
        }
    }
    
    protected void set(V v) {//正常返回值设置
        if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {//将状态位设置成中间状态COMPLETING
            outcome = v;//设置输出为正常返回结果
            UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // 将状态更为最终状态NORMAL
            finishCompletion();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
  • get方法
    public V get() throws InterruptedException, ExecutionException {
        int s = state;
        if (s <= COMPLETING) //执行中状态
            s = awaitDone(false, 0L);//等待任务结束
        return report(s); //完成返回
    }
    
    //等待完成,可能是是中断、异常、正常完成,timed:true,考虑等待时长,false:不考虑等待时长
    private int awaitDone(boolean timed, long nanos)
        throws InterruptedException {
        final long deadline = timed ? System.nanoTime() + nanos : 0L;
        WaitNode q = null;
        boolean queued = false;
        for (;;) {
            if (Thread.interrupted()) {  //如果线程已中断,则直接将当前节点q从waiters中移出,并抛出异常
                removeWaiter(q);
                throw new InterruptedException();
            }
    
            int s = state;
            if (s > COMPLETING) { //如果state已经是最终状态了,则直接返回state
                if (q != null)
                    q.thread = null;
                return s;
            }
            else if (s == COMPLETING) // 如果state是中间状态(COMPLETING),意味很快将变更过成最终状态,让出cpu时间片即可
                Thread.yield();
            else if (q == null) //如果发现尚未有节点,则创建节点
                q = new WaitNode();
            else if (!queued) //如果当前等待线程还没有进入等待队列,入列
                queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                     q.next = waiters, q);
            else if (timed) { //线程被阻塞指定时间后再唤醒
                nanos = deadline - System.nanoTime();
                if (nanos <= 0L) {
                    removeWaiter(q);
                    return state;
                }
                LockSupport.parkNanos(this, nanos); // 如果不是超时等待,就超时阻塞
            }
            else //线程一直被阻塞直到被其他线程唤醒
                LockSupport.park(this);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
  • cancel取消方法
    public boolean cancel(boolean mayInterruptIfRunning) {
    	//如果正处于NEW状态,mayInterruptIfRunning是true就设置为INTERRUTPTING,否则直接设置CANCELLED
        if (!(state == NEW &&
              UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
                  mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
            return false;
        try {    // in case call to interrupt throws exception
            if (mayInterruptIfRunning) { // 允许运行中进行中断操作
                try {
                    Thread t = runner;
                    if (t != null)
                        t.interrupt(); // 并不是实时取消!
                } finally { // final state
                    UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED); //中断成功,则置为最终状态
                }
            }
        } finally {
            finishCompletion(); //执行唤醒所有等待线程操作
        }
        return true;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
ThreadPoolExecutor 参数含义

ThreadPoolExecutor中有两个集合 一个是HashSet线程集合workers,另一个是BlockingQueue任务集合workQeuue。

  • 参数定义
    在这里插入图片描述

  • corePoolSize:指定了线程池中的线程数量,它的数量决定了添加的任务是开辟新的线程去执行,还是放到workQueue任务队列中去;

  • maximumPoolSize:指定了线程池中的最大线程数量,这个参数会根据你使用的workQueue任务队列的类型,决定线程池会开辟的最大线程数量;

  • keepAliveTime:当线程池中空闲线程数量超过corePoolSize时,多余的线程会在多长时间内被销毁;

  • unit:keepAliveTime的单位

  • workQueue:任务队列,被添加到线程池中,但尚未被执行的任务;它一般分为直接提交队列、有界任务队列、无界任务队列、优先任务队列几种;

  • threadFactory:线程工厂,用于创建线程,一般用默认(DefaultThreadFactory)即可;

  • handler:拒绝策略;当任务太多来不及处理时,如何拒绝任务;

    • ThreadPoolExecutor.AbortPolicy(): 抛出RejectedExecutionException异常,默认策略
    • ThreadPoolExecutor.CallerRunsPolicy(): 会用调用execute函数的上层线程去执行被拒绝的任务
    • ThreadPoolExecutor.DiscardOldestPolicy(): 抛弃最旧的任务,再把这个新任务添加到队列
    • ThreadPoolExecutor.DiscardPolicy(): 抛弃当前的任务
    • 自定义策略实现RejectedExecutionHandler接口,来不及处理的任务保存到MQ
ThreadPoolExecutor线程池状态

Integer 类型是 32 位二进制表示,则其中高 3 位用来表示线程池状态,后面 29 位用来记录线程池线程 个数 。

// runState is stored in the high-order bits
private static final int RUNNING    = -1 << COUNT_BITS;
private static final int SHUTDOWN   =  0 << COUNT_BITS;
private static final int STOP       =  1 << COUNT_BITS;
private static final int TIDYING    =  2 << COUNT_BITS;
private static final int TERMINATED =  3 << COUNT_BITS;
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • RUNNING(111)
    线程池处在RUNNING状态时,能够接收新任务,以及对已添加的任务进行处理
    线程池的初始化状态是RUNNING。线程池中的任务数为 0。
  • SHUTDOWN(000)
    线程池处在SHUTDOWN状态时,不接收新任务,但能处理已添加的任务。
    调用线程池的shutdown()接口时,线程池由RUNNING -> SHUTDOWN。
  • STOP(001)
    线程池处在STOP状态时,不接收新任务,不处理已添加的任务,并且会中断正在处理的任务。
    调用线程池的shutdownNow()接口时,线程池由(RUNNING or SHUTDOWN ) -> STOP。
  • TIDYING(010)
    当所有的任务已终止,ctl记录的”任务数量”为0,线程池会变为TIDYING状态。
    当线程池在SHUTDOWN状态下,阻塞队列为空并且线程池中执行的任务也为空时,就会由 SHUTDOWN -> TIDYING。
    当线程池在STOP状态下,线程池中执行的任务为空时,就会由STOP -> TIDYING。
  • TERMINATED(011)
    线程池彻底终止,就变成TERMINATED状态。
    线程池处在TIDYING状态时,执行完terminated()之后,就会由 TIDYING -> TERMINATED。
ThreadPoolExecutor源码解析
  • 常用变量的解释

    // 1. `ctl`,可以看做一个int类型的数字,高3位表示线程池状态,低29位表示worker数量
    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    
    // 2. `COUNT_BITS`,`Integer.SIZE`为32,所以`COUNT_BITS`为29
    private static final int COUNT_BITS = Integer.SIZE - 3;
    
    // 3. `CAPACITY`,线程池允许的最大线程数。1左移29位,然后减1,即为 2^29 - 1
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1;
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
  • execute 方法
    核心线程执行任务,加入任务(BlockingQueue)队列,非核心线程执行任务

    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        int c = ctl.get();
        //worker数量比核心线程数小,直接创建worker执行任务
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        //worker数量超过核心线程数,直接加入任务队列
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            //线程池状态不是RUNNING,说明执行过shudown,需要对新加入的任务拒绝(reject)操作
            if (! isRunning(recheck) && remove(command))
                reject(command);
             //如果核心线程等于0,添加非核心线程执行
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        //如果线程池不是RUNNING状态,或者进入队列失败,创建worker执行任务,如果返回值时候false,执行拒绝操作
        else if (!addWorker(command, false))
            reject(command);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
  • addWorker方法
    worker线程数量加1,workers容器中添加线程,启动执行任务

    private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
       	//线程数量加1	
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);
    		// 1. 线程池状态大于SHUTDOWN时,直接返回false
        	// 2. 线程池状态等于SHUTDOWN,且firstTask不为null,直接返回false
       		// 3. 线程池状态等于SHUTDOWN,且队列为空,直接返回false
            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;
    		//满足添加线程状态的条件
            for (;;) {
                int wc = workerCountOf(c);
                //worker超过容量直接返回false;
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                 //CAS方式添加worker数量,成功,直接跳出外层循环
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
               	//线程池转态发生变化,外层循环自旋,其他情况内层循环自旋
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }
    
        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
            	//worker线程容器添加信息,必须加锁
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    //检查线程池状态
                    int rs = runStateOf(ctl.get());
    
                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        //worker已经调用了start方法,则不再创建worker
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        //添加worker到workers容器
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                //添加成功,启动线程
                if (workerAdded) {
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
        	// worker线程启动失败,说明线程池状态发生了变化(关闭操作被执行),需要进行shutdown相关操作
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
  • Worker->runWorker执行方法

    final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        // 调用unlock()是为了让外部可以中断
        w.unlock(); // allow interrupts
        // 这个变量用于判断是否进入过自旋(while循环)
        boolean completedAbruptly = true;
        try {
        	// 这儿是自旋
      	 	// 1. 如果firstTask不为null,则执行firstTask;
        	// 2. 如果firstTask为null,则调用getTask()从队列获取任务。
        	// 3. 阻塞队列的特性就是:当队列为空时,当前线程会被阻塞等待
            while (task != null || (task = getTask()) != null) {
            	// 这儿对worker进行加锁,是为了达到下面的目的
            	// 1. 降低锁范围,提升性能
           	    // 2. 保证每个worker执行的任务是串行的
                w.lock();
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted.  This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
    			// 如果线程池正在停止,则对当前线程进行中断操作
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
               		// 执行任务,且在执行前后通过`beforeExecute()`和`afterExecute()`来扩展其功能。
            		// 这两个方法在当前类里面为空实现。
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    // 已完成任务数加一 
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
        	
            processWorkerExit(w, completedAbruptly);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
ForkJoinPool

jvm提供的一个用于并行执行的任务框架。其主旨是将大任务分成若干小任务,之后再并行对这些小任务进行计算,最终汇总这些任务的结果,得到最终的结果。

  • RecursiveTask代表有返回值的任务
    public class RecursiveTaskTest extends RecursiveTask<Integer>{
    	private static final int THRESHOLD = 10;
    
        private  int start;
    
        private  int end;
        
        public RecursiveTaskTest(int start,int end) {
             this.start = start;
             this.end = end;
    	}
    	protected Integer compute() {
    		if(end - start  < THRESHOLD) {
    			Integer result=0;
                for(int i=start;i<=end;i++) {
                	result+=i;
                }
                return result;
            }else {
                int middle = (start + end) / 2;
                RecursiveTaskTest firstTask = new RecursiveTaskTest(start,middle);
                RecursiveTaskTest secondTask = new RecursiveTaskTest(middle+1,end);
                invokeAll(firstTask,secondTask);
                return firstTask.join()+secondTask.join();
            }
    	}
    	public static void main(String[] args) throws InterruptedException, ExecutionException {
    		ForkJoinPool fjp=new ForkJoinPool();
    		ForkJoinTask<Integer> forkJoinTask=fjp.submit(new RecursiveTaskTest(0, 50));
    		Integer result=forkJoinTask.get();
    		System.out.println(result);
    		fjp.awaitTermination(3, TimeUnit.SECONDS);
    		fjp.shutdown();
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
  • RecursiveAction代表没有返回值的任务。
    public class RecursiveActionTest extends RecursiveAction{
    
    	private static final int THRESHOLD = 10;
    
        private  int start;
    
        private  int end;
        
        public RecursiveActionTest(int start,int end) {
             this.start = start;
             this.end = end;
    	}
    	@Override
    	protected void compute() {
    		if(end - start  < THRESHOLD) {
                for(int i=start;i<=end;i++) {
                    System.out.println(Thread.currentThread().getName()+",i="+i);
                }
            }else {
                int middle = (start + end) / 2;
                RecursiveActionTest firstTask = new RecursiveActionTest(start,middle);
                RecursiveActionTest secondTask = new RecursiveActionTest(middle+1,end);
                invokeAll(firstTask,secondTask);
            }
    	}
    	public static void main(String[] args) throws InterruptedException {
    		ForkJoinPool fjp=new ForkJoinPool();
    		fjp.submit(new RecursiveActionTest(0, 50));
    		fjp.awaitTermination(3, TimeUnit.SECONDS);
    		fjp.shutdown();
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
  • 工作窃取算法
    • 工作窃取(work-stealing)算法是指某个线程从其他队列里窃取任务来执行。
      假如我们需要做一个比较大的任务,我们可以把这个任务分割为若干互不依赖的子任务,为了减少线程间的竞争,于是把这些子任务分别放到不同的队列里,并为每个队列创建一个单独的线程来执行队列里的任务,线程和队列一一对应,比如A线程负责处理A队列里的任务。但是有的线程会先把自己队列里的任务干完,而其他线程对应的队列里还有任务等待处理。干完活的线程与其等着,不如去帮其他线程干活,于是它就去其他线程的队列里窃取一个任务来执行。而在这时它们会访问同一个队列,所以为了减少窃取任务线程和被窃取任务线程之间的竞争,通常会使用双端队列,被窃取任务线程永远从双端队列的头部拿任务执行,而窃取任务的线程永远从双端队列的尾部拿任务执行。
    • 工作窃取算法的优点是充分利用线程进行并行计算,并减少了线程间的竞争,其缺点是在某些情况下还是存在竞争,比如双端队列里只有一个任务时。并且消耗了更多的系统资源,比如创建多个线程和多个双端队列。
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/weixin_40725706/article/detail/535158
推荐阅读
相关标签
  

闽ICP备14008679号