package com.zs.thread; public class TestVolatile { public static void main(String[] args) { ExecutorService cachedThreadPool = Executors.newCachedThreadPool(); for (int i = 0; i < 5; i++) { final int index = i; cachedThreadPool.execute(new Runnable() { @Override public void run() { try { SimpleDateFormat sdf = new SimpleDateFormat( "HH:mm:ss"); System.out.println("运行时间: " + sdf.format(new Date()) + " " + index); Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } } }); } cachedThreadPool.shutdown(); } }
package com.thread.excutor; import java.util.concurrent.Executors; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.stream.IntStream; public class FourThreadPoolTest { public static void main(String[] args) { newCachedThreadPool(); } /** * public static ExecutorService newCachedThreadPool() { * return new ThreadPoolExecutor(0, Integer.MAX_VALUE, * 60L, TimeUnit.SECONDS, * new SynchronousQueue<Runnable>()); * } * * 这些池通常会提高执行许多短期异步任务的程序的性能。 * * SynchronousQueue<Runnable>(): * 1、SynchronousQueue是BlockingQueue的一种,所以SynchronousQueue是线程安全的。 * 2、SynchronousQueue 是一个没有数据缓冲的BlockingQueue,容量为0,它不会为队列中元素维护存储空间,它只是多个线程之间数据交换的媒介。 * 生产者线程对其的插入操作put必须等待消费者的移除操作take。 * 3、SynchronousQueue非常适合传递性场景做交换工作,生产者的线程和消费者的线程同步传递某些信息、事件或者任务。 * SynchronousQueue的一个使用场景是在线程池里。如果我们不确定来自生产者请求数量,但是这些请求需要很快的处理掉, * 那么配合SynchronousQueue为每个生产者请求分配一个消费线程是处理效率最高的办法。 * Executors.newCachedThreadPool()就使用了SynchronousQueue,这个线程池根据需要(新任务到来时)创建新的线程, * 如果有空闲线程则会重复使用,线程空闲了60秒后会被回收。 * 4、SynchronousQueue 最大的特点在于,它的容量为0,没有一个地方来暂存元素,导致每次取数据都要先阻塞,直到有数据被放入; * 同理,每次放数据的时候也会阻塞,直到有消费者来取。 * 5、SynchronousQueue 的容量不是 1 而是 0,因为 SynchronousQueue 不需要去持有元素,它所做的就是直接传递(direct handoff)。 * 由于每当需要传递的时候,SynchronousQueue 会把元素直接从生产者传给消费者,在此期间并不需要做存储,所以如果运用得当,它的效率是很高的。 * 使用的数据结构是链表。使用CAS+自旋(无锁),自旋了一定次数后调用 LockSupport.park()进行阻塞。 * * */ private static void newCachedThreadPool() { ThreadPoolExecutor executorService = (ThreadPoolExecutor) Executors.newCachedThreadPool(); System.out.println(executorService.getActiveCount()); // sout -> 0 IntStream.range(0, 5).boxed().forEach(item -> executorService.execute(() -> { try { TimeUnit.SECONDS.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName() + " [" + item + "]"); /** * pool-1-thread-3 [2] * pool-1-thread-4 [3] * pool-1-thread-1 [0] * pool-1-thread-5 [4] * pool-1-thread-2 [1] */ })); try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(executorService.getActiveCount()); // sout -> 5 } /** * 60s之后线程池停止 * 0 * 5 * pool-1-thread-3 [2] * pool-1-thread-4 [3] * pool-1-thread-1 [0] * pool-1-thread-5 [4] * pool-1-thread-2 [1] * * Process finished with exit code 0 */ }
package com.zs.thread; public class TestVolatile { public static void main(String[] args) { ExecutorService fixedThreadPool = Executors.newFixedThreadPool(3); for (int i = 0; i < 5; i++) { final int index = i; fixedThreadPool.execute(new Runnable() { @Override public void run() { try { SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss"); System.out.println("运行时间: " + sdf.format(new Date()) + " " + index); Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } } }); } fixedThreadPool.shutdown(); } }
/** * public static ExecutorService newFixedThreadPool(int nThreads) { * return new ThreadPoolExecutor(nThreads, nThreads, * 0L, TimeUnit.MILLISECONDS, * // Creates a LinkedBlockingQueue with a capacity of Integer.MAX_VALUE. * new LinkedBlockingQueue<Runnable>()); * } */ private static void newFixedThreadPool() { ThreadPoolExecutor executorService = (ThreadPoolExecutor) Executors.newFixedThreadPool(10); IntStream.range(0, 20).boxed().forEach(item -> executorService.execute(() -> { try { TimeUnit.SECONDS.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName() + " [" + item + "]"); })); try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(executorService.getActiveCount()); }
10 pool-1-thread-10 [9] pool-1-thread-1 [0] pool-1-thread-4 [3] pool-1-thread-5 [4] pool-1-thread-9 [8] pool-1-thread-6 [5] pool-1-thread-3 [2] pool-1-thread-8 [7] pool-1-thread-2 [1] pool-1-thread-7 [6] 10s之后。。。 pool-1-thread-9 [12] pool-1-thread-4 [11] pool-1-thread-6 [13] pool-1-thread-7 [17] pool-1-thread-5 [18] pool-1-thread-2 [16] pool-1-thread-8 [15] pool-1-thread-1 [10] pool-1-thread-3 [14] pool-1-thread-10 [19] 程序不会停止。。。
从构造方法可以看出,它创建了一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。如果这个线程异常结束,会有另一个取代它,保证顺序执行。单工作线程最大的特点是可保证顺序地执行各个任务,并且在任意给定的时间不会有多个线程是活动的。
package com.zs.thread; public class TestVolatile { public static void main(String[] args) { ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor(); for (int i = 0; i < 5; i++) { final int index = i; singleThreadExecutor.execute(new Runnable() { @Override public void run() { try { SimpleDateFormat sdf = new SimpleDateFormat( "HH:mm:ss"); System.out.println("运行时间: " + sdf.format(new Date()) + " " + index); Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } } }); } singleThreadExecutor.shutdown(); } }
/** * public static ExecutorService newSingleThreadExecutor() { * return new FinalizableDelegatedExecutorService * (new ThreadPoolExecutor(1, 1, * 0L, TimeUnit.MILLISECONDS, * new LinkedBlockingQueue<Runnable>())); * } * * */ private static void newSingleThreadExecutor() { ExecutorService executorService = Executors.newSingleThreadExecutor(); IntStream.range(0, 8).boxed().forEach(item -> executorService.execute(() -> { try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName() + " [" + item + "]" + System.currentTimeMillis()); })); try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } }
pool-1-thread-1 [0]1669445965474
pool-1-thread-1 [1]1669445967480
pool-1-thread-1 [2]1669445969481
pool-1-thread-1 [3]1669445971487
pool-1-thread-1 [4]1669445973492
pool-1-thread-1 [5]1669445975497
pool-1-thread-1 [6]1669445977500
pool-1-thread-1 [7]1669445979505
Process finished with exit code 130 (interrupted by signal 2: SIGINT)
package com.zs.thread; public class TestVolatile { public static void main(String[] args) { final SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss"); ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(3); System.out.println("提交时间: " + sdf.format(new Date())); scheduledThreadPool.schedule(new Runnable() { @Override public void run() { System.out.println("运行时间: " + sdf.format(new Date())); } }, 3, TimeUnit.SECONDS); scheduledThreadPool.shutdown(); } }
package com.zs.thread; public class TestVolatile { public static void main(String[] args) throws InterruptedException { final SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss"); ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(3); System.out.println("提交时间: " + sdf.format(new Date())); scheduledThreadPool.scheduleAtFixedRate(new Runnable() { @Override public void run() { System.out.println("运行时间: " + sdf.format(new Date())); } }, 1, 3, TimeUnit.SECONDS); Thread.sleep(10000); scheduledThreadPool.shutdown(); } }
/** * public static ExecutorService newWorkStealingPool() { * return new ForkJoinPool * (Runtime.getRuntime().availableProcessors(), * ForkJoinPool.defaultForkJoinWorkerThreadFactory, * null, true); * } * * 可以传入线程的数量,不传入则默认使用当前计算机中可用的cpu数量,能够合理的使用CPU进行对任务操作(并行操作)。 * 适合使用在很耗时的任务中,底层用的ForkJoinPool 来实现的: * ForkJoinPool的优势在于,可以充分利用多cpu,多核cpu的优势,把一个任务拆分成多个“小任务”分发到不同的cpu核心上执行,执行完后再把结果收集到一起返回。 */ private static void newWorkStealingPool() { /*Optional.of(Runtime.getRuntime().availableProcessors()) .ifPresent(System.out::println);*/ // 8核CPU SimpleDateFormat sdf = new SimpleDateFormat( "HH:mm:ss"); ExecutorService executorService = Executors.newWorkStealingPool(); List<Callable<String>> callableList = IntStream.range(0, 20).boxed().map(item -> (Callable<String>) () -> { System.out.println("Thread - " + Thread.currentThread().getName() + " - " + sdf.format(new Date())); sleep(2L); return "task-" + item; }).collect(Collectors.toList()); try { // invokeAll的作用是:等待所有的任务执行完成后统一返回。 // 这里与大家分享的是:如果executorService是公共线程池慎用,如果这时候有另外一个请求也不断地往线程池里扔任务,这时候这个请求是不是就一直不停的阻塞了。 executorService.invokeAll(callableList).stream().map(item -> { try { return item.get(); } catch (Exception e) { throw new RuntimeException(e); } }).forEach(System.out::println); } catch (InterruptedException e) { e.printStackTrace(); } }
Thread - ForkJoinPool-1-worker-3 - 15:29:15 Thread - ForkJoinPool-1-worker-1 - 15:29:15 Thread - ForkJoinPool-1-worker-6 - 15:29:15 Thread - ForkJoinPool-1-worker-2 - 15:29:15 Thread - ForkJoinPool-1-worker-4 - 15:29:15 Thread - ForkJoinPool-1-worker-5 - 15:29:15 Thread - ForkJoinPool-1-worker-0 - 15:29:15 Thread - ForkJoinPool-1-worker-7 - 15:29:15 Thread - ForkJoinPool-1-worker-3 - 15:29:17 Thread - ForkJoinPool-1-worker-4 - 15:29:17 Thread - ForkJoinPool-1-worker-0 - 15:29:17 Thread - ForkJoinPool-1-worker-5 - 15:29:17 Thread - ForkJoinPool-1-worker-6 - 15:29:17 Thread - ForkJoinPool-1-worker-7 - 15:29:17 Thread - ForkJoinPool-1-worker-2 - 15:29:17 Thread - ForkJoinPool-1-worker-1 - 15:29:17 Thread - ForkJoinPool-1-worker-0 - 15:29:19 Thread - ForkJoinPool-1-worker-4 - 15:29:19 Thread - ForkJoinPool-1-worker-5 - 15:29:19 Thread - ForkJoinPool-1-worker-1 - 15:29:19 task-0 task-1 task-2 task-3 task-4 task-5 task-6 task-7 task-8 task-9 task-10 task-11 task-12 task-13 task-14 task-15 task-16 task-17 task-18 task-19 Process finished with exit code 0
