赞
踩
定义:
用处
BlockingQueue
都一手给你包办好了concurrent包
发布以前,在多线程环境下,我们每个程序员都必须自己去控制这些细节,尤其还要兼顾效率和线程安全,而这会给我们的程序带来不小的复杂度.架构介绍
①. ArrayBlockingQueue:
由数组结构组成的有界阻塞队列
②.LinkedBlockingQueue:
由链表结构组成的有界(但大小默认值 Integer>MAX_VAL UE)
阻塞队列.
③. SynchronousQueue:
不存储元素的阻塞队列, 也就是单个元素的队列.
public class SynchronousQueueDemo { public static void main(String[] args) { BlockingQueue<String> blockingQueue = new SynchronousQueue<>(); new Thread(() -> { try { System.out.println(Thread.currentThread().getName() + "\t put 1"); blockingQueue.put("1"); System.out.println(Thread.currentThread().getName() + "\t put 2"); blockingQueue.put("2"); System.out.println(Thread.currentThread().getName() + "\t put 3"); blockingQueue.put("3"); } catch (InterruptedException e) { e.printStackTrace(); } }, "AAA").start(); new Thread(() -> { try { try { TimeUnit.SECONDS.sleep(5); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName() + "\t" + blockingQueue.take()); try { TimeUnit.SECONDS.sleep(5); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName() + "\t" + blockingQueue.take()); try { TimeUnit.SECONDS.sleep(5); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName() + "\t" + blockingQueue.take()); } catch (InterruptedException e) { e.printStackTrace(); } }, "BBB").start(); } }
④. PriorityBlockingQueue:
支持优先级排序的无界阻塞队列.
⑤. LinkedTransferQueue:
由链表结构组成的无界阻塞队列.
⑥. LinkedBlockingDeque:
由了解结构组成的双向阻塞队列.
例子: 演示阻塞队列满了后的情况
public class BlockingQueueExceptionDemo { public static void main(String[] args) { //List list=new ArrayList(); //注意:这里要给一个初始值 BlockingQueue<String>blockingQueue=new ArrayBlockingQueue<>(3); //add() 方法是给ArrayBlockingQueue添加元素,如果超过会抛出异常! System.out.println(blockingQueue.add("a"));//true System.out.println(blockingQueue.add("b"));//true System.out.println(blockingQueue.add("c"));//true //element是检查元素有没有? 检查的是出栈的元素 blockingQueue.element(); //remove() System.out.println(blockingQueue.remove());//a System.out.println(blockingQueue.remove());//b System.out.println(blockingQueue.remove());//c } } @Test public void offerAndPoll()throws Exception{ BlockingQueue blockingQueue=new ArrayBlockingQueue(3); //前三个直接成功 System.out.println(blockingQueue.offer("a", 2L, TimeUnit.SECONDS)); System.out.println(blockingQueue.offer("a", 2L, TimeUnit.SECONDS)); System.out.println(blockingQueue.offer("a", 2L, TimeUnit.SECONDS)); //下面这个会阻塞2s System.out.println(blockingQueue.offer("a", 2L, TimeUnit.SECONDS)); }
结果:
前三个直接输出,最后一个会阻塞两秒,再输出。
线程池做的工作主要是控制运行的线程的数量,处理过程中将任务加入队列,然后在线程创建后启动这些任务,
Java中的线程池是通过Executor框架实现的,该框架中用到了Executor,Executors,ExecutorService,ThreadPoolExecutor这几个类,如下图:
线程池的创建
Executor 的接口方法中存在三种创建线程池的方法如下:
Executors.newFixedThreadPool(int)
: 创建定长线程池(一池定线程)
Executors.newSingleThreadExecutor( )
: 一池一线程,保证任务的顺序执行
Executors.newCachedThreadPool( ) :
创建可灵活改变大小的线程,一池N线程
例子如下:
/* //看cpu的核数 //System.out.println(Runtime.getRuntime().availableProcessors()); * 第四种获取/使用java多线程的方式,线程池 * */ public class ExecutorTest { public static void main(String[] args) { //ExecutorService threadPool= Executors.newFixedThreadPool(5);//一池5个处理线程 //ExecutorService threadPool=Executors.newSingleThreadExecutor();//一池一线程 ExecutorService threadPool=Executors.newCachedThreadPool();//一池N线程 try { for (int i = 1; i <= 10; i++) { //使用 threadPool.execute(() -> { //模拟10个用户来办理业务,每个用户就是一个来自外部的请求线程 System.out.println(Thread.currentThread().getName() + "\t 办理业务~!"); }); //try { TimeUnit.SECONDS.sleep(3); } catch (InterruptedException e) {e.printStackTrace();} } }catch (Exception e){ }finally { //关闭 threadPool.shutdown(); } } }
一池N线程结果:
pool-1-thread-2 办理业务~!
pool-1-thread-1 办理业务~!
pool-1-thread-1 办理业务~!
pool-1-thread-3 办理业务~!
pool-1-thread-4 办理业务~!
pool-1-thread-5 办理业务~!
pool-1-thread-6 办理业务~!
pool-1-thread-7 办理业务~!
pool-1-thread-9 办理业务~!
pool-1-thread-8 办理业务~!
一池 5线程结果
pool-1-thread-1 办理业务~!
pool-1-thread-2 办理业务~!
pool-1-thread-2 办理业务~!
pool-1-thread-2 办理业务~!
pool-1-thread-2 办理业务~!
pool-1-thread-2 办理业务~!
pool-1-thread-3 办理业务~!
pool-1-thread-1 办理业务~!
pool-1-thread-4 办理业务~!
pool-1-thread-5 办理业务~!
一池一线程结果:
pool-1-thread-1 办理业务~!
pool-1-thread-1 办理业务~!
pool-1-thread-1 办理业务~!
pool-1-thread-1 办理业务~!
pool-1-thread-1 办理业务~!
pool-1-thread-1 办理业务~!
pool-1-thread-1 办理业务~!
pool-1-thread-1 办理业务~!
pool-1-thread-1 办理业务~!
pool-1-thread-1 办理业务~!
①. corePoolSize:
线程池中的常驻核心线程数。
②. maximumPoolSize:
线程池能够容纳同时执行的最大线程数,此值大于等于1
③. keepAliveTime:
多余的空闲线程存活时间
unit:
keepAliveTime的单位⑤. workQueue:
任务队列,被提交但尚未被执行的任务(候客区)
⑥.threadFactory:
表示生成线程池中工作线程的线程工厂,
⑦. handler:
拒绝策略
形象化的理解:
等待队列也已经排满了,再也塞不下新的任务了。同时,线程池的maximumPoolSize也到达了,无法接续为新任务服务,这时我们需要拒绝策略机制合理的处理这个问题。
RejectedException异常
阻止系统正常运行以上内置策略均实现了RejectExecutionHandler接口
答案 :一个都不用,我们生产上只能使用自定义的。
参考阿里巴巴java开发手册
【强制】线程资源必须通过线程池提供,不允许在应用中自行显式创建线程。 说明:使用线程池的好处是减少在创建和销毁线程上所消耗的时间以及系统资源的开销,解决资源不足的问题。如果不使用线程池,有可能造成系统创建大量同类线程而导致消耗完内存或者“过度切换”的问题。
【强制】==线程池不允许使用Executors去创建,而是通过ThreadPoolExecutor的方式,==这样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险。
说明:Executors返回的线程池对象的弊端如下:
Integer.MAX_VALUE
,可能会堆积大量的请求,从而导致OOM。Integer.MAX_VALUE
,可能会创建大量的线程,从而导致OOM。利用 常见线程池,
public ThreadPoolExecutor(int corePoolSize,//允许正在服务线程数
int maximumPoolSize,//最大线程数
long keepAliveTime,//最大空闲时间
TimeUnit unit,//时间单位
BlockingQueue<Runnable> workQueue,//阻塞队列,等待任务队列
RejectedExecutionHandler handler) {// 拒绝策略
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), handler);
}
例子:
public class MyThreadPoolDemo { public static void main(String[] args) { ExecutorService threadPool = new ThreadPoolExecutor( 2, 5, 1L, TimeUnit.SECONDS, new LinkedBlockingDeque<Runnable>(3), Executors.defaultThreadFactory(), //默认抛出异常 //new ThreadPoolExecutor.AbortPolicy() //回退调用者 //new ThreadPoolExecutor.CallerRunsPolicy() //处理不来的不处理,丢弃时间最长的 //new ThreadPoolExecutor.DiscardOldestPolicy() //直接丢弃任务,不予任何处理也不抛出异常 new ThreadPoolExecutor.DiscardPolicy() ); //模拟10个用户来办理业务 没有用户就是来自外部的请求线程. try { for (int i = 1; i <= 10; i++) { threadPool.execute(() -> { System.out.println(Thread.currentThread().getName() + "\t 办理业务"); }); } } catch (Exception e) { e.printStackTrace(); } finally { threadPool.shutdown(); } //threadPoolInit(); } }
上面代码由于拒绝策略不同,得到的结果也是不同的:
最大不会抛出异常的值= maximumPoolSize + new LinkedBlockin gDeque<Runnable>(3) =8个
。如果超过8个,默认的拒绝策略会抛出异常①. CPU密集型
②. IO密集型
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。