赞
踩
线程池的使用
使用线程池管理线程可以最大程度的利用线程,节省资源消耗,它通过利用已有的线程多次循环执行多个任务从而提高系统的处理能力。
我们可以通过java.util.concurrent.ThreadPoolExecutor类来创建线程池,一个任务通过 execute(Runnable)方法被添加到线程池,任务就是一个 Runnable类型的对象,任务的执行方法就是Runnable类型对象的run()方法。
下面介绍一下里面的一些参数。
1、创建一个线程池需要输入几个参数:
· corePoolSize(线程池的基本大小):当提交一个任务到线程池时,线程池会创建一个线程来执行任务,即使其他空闲的基本线程能够执行新任务也会创建线程,等到需要执行的任务数大于线程池基本大小时就不再创建。如果调用了线程池的prestartAllCoreThreads方法,线程池会提前创建并启动所有基本线程。
· runnableTaskQueue(任务队列):用于保存等待执行的任务的阻塞队列。 可以选择以下几个阻塞队列。
o ArrayBlockingQueue:是一个基于数组结构的有界阻塞队列,此队列按 FIFO(先进先出)原则对元素进行排序。
o LinkedBlockingQueue:一个基于链表结构的阻塞队列,此队列按FIFO (先进先出) 排序元素,吞吐量通常要高于ArrayBlockingQueue。静态工厂方法Executors.newFixedThreadPool()使用了这个队列。
o SynchronousQueue:一个不存储元素的阻塞队列。每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量通常要高于LinkedBlockingQueue,静态工厂方法Executors.newCachedThreadPool使用了这个队列。
o PriorityBlockingQueue:一个具有优先级的无限阻塞队列。
· maximumPoolSize(线程池最大大小):线程池允许创建的最大线程数。如果队列满了,并且已创建的线程数小于最大线程数,则线程池会再创建新的线程执行任务。值得注意的是如果使用了无界的任务队列这个参数就没什么效果。
· ThreadFactory:用于设置创建线程的工厂,可以通过线程工厂给每个创建出来的线程设置更有意义的名字。
· RejectedExecutionHandler(饱和策略):当队列和线程池都满了,说明线程池处于饱和状态,那么必须采取一种策略处理提交的新任务。这个策略默认情况下是AbortPolicy,表示无法处理新任务时抛出异常。以下是JDK1.5提供的四种策略。
o AbortPolicy:直接抛出异常。
o CallerRunsPolicy:只用调用者所在线程来运行任务。
o DiscardOldestPolicy:丢弃队列里最近的一个任务,并执行当前任务。
o DiscardPolicy:不处理,丢弃掉。
o 当然也可以根据应用场景需要来实现RejectedExecutionHandler接口自定义策略。如记录日志或持久化不能处理的任务。
· keepAliveTime(线程活动保持时间):线程池的工作线程空闲后,保持存活的时间。所以如果任务很多,并且每个任务执行的时间比较短,可以调大这个时间,提高线程的利用率。
· TimeUnit(线程活动保持时间的单位):可选的单位有天(DAYS),小时(HOURS),分钟(MINUTES),秒(SECONDS),毫秒(MILLISECONDS),微秒(MICROSECONDS, 千分之一毫秒)和毫微秒(NANOSECONDS, 千分之一微秒)。
2、当一个任务通过execute(Runnable)方法欲添加到线程池时,会有如下几种情况:
· 如果此时线程池中的数量小于corePoolSize,即使线程池中的线程都处于空闲状态,也要创建新的线程来处理被添加的任务。
· 如果此时线程池中的数量等于 corePoolSize,但是缓冲队列 workQueue未满,那么任务被放入缓冲队列。
· 如果此时线程池中的数量大于corePoolSize,缓冲队列workQueue满,并且线程池中的数量小于maximumPoolSize,建新的线程来处理被添加的任务。
· 如果此时线程池中的数量大于corePoolSize,缓冲队列workQueue满,并且线程池中的数量等于maximumPoolSize,那么通过 handler所指定的策略来处理此任务。也就是:处理任务的优先级为:核心线程corePoolSize、任务队列workQueue、最大线程maximumPoolSize,如果三者都满了,使用handler处理被拒绝的任务。
· 当线程池中的线程数量大于 corePoolSize时,如果某线程空闲时间超过keepAliveTime,线程将被终止。这样,线程池可以动态的调整池中的线程数。
3、举一个简单的例子:
创建 TestThreadPool 类:
1. importjava.util.concurrent.ArrayBlockingQueue;
2. importjava.util.concurrent.ThreadPoolExecutor;
3. import java.util.concurrent.TimeUnit;
4.
5. public class TestThreadPool {
6.
7. private static int produceTaskSleepTime =2;
8.
9. private static int produceTaskMaxNumber = 2;
10.
11. public static void main(String[] args){
12.
13. // 构造一个线程池
14. ThreadPoolExecutor threadPool = newThreadPoolExecutor(2, 4, 3,
15. TimeUnit.SECONDS, newArrayBlockingQueue<Runnable>(3),
16. newThreadPoolExecutor.DiscardOldestPolicy());
17.
18. for (int i = 1; i <=produceTaskMaxNumber; i++) {
19. try {
20. String task = "task@" + i;
21. System.out.println("创建任务并提交到线程池中:" +task);
22. threadPool.execute(newThreadPoolTask(task));
23.
24. Thread.sleep(produceTaskSleepTime);
25. } catch (Exception e) {
26. e.printStackTrace();
27. }
28. }
29. }
30. }
创建 ThreadPoolTask类:
31. public class ThreadPoolTask implementsRunnable, Serializable {
32.
33. private Object attachData;
34.
35. ThreadPoolTask(Object tasks) {
36. this.attachData = tasks;
37. }
38.
39. public void run() {
40.
41. System.out.println("开始执行任务:" +attachData);
42.
43. attachData = null;
44. }
45.
46. public Object getTask() {
47. return this.attachData;
48. }
49. }
执行结果:
创建任务并提交到线程池中:task@ 1
开始执行任务:task@ 1
创建任务并提交到线程池中:task@ 2
开始执行任务:task@ 2
4、再讲一个实际的例子
这里用的是spring的线程池:org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor。
直接上代码:
50. //是否中断导入数据程序
51. publicstatic AtomicBoolean terminateImport = newAtomicBoolean(false);
52. //成功条数
53. public static AtomicLong success_line = newAtomicLong();
54. //失败条数
55. publicstatic AtomicLong error_line = new AtomicLong();
56. //是否发现有线程rejected,如果有,说明线程队列满了,外部执行代码暂时中断队列填充,等待5s后,重新填充.
57. publicstatic AtomicBoolean found_rejected = new AtomicBoolean(false);
58. //开始执行
59. publicvoid startSwitch(int itemsPerPage, String updateMode,String applySeqno, Date startDate,Date endDate) {
60. int nowPage = 1;
61. int exec_sum = 0 ;
62. boolean hasNext = true ;
63. //重置静态值
64. success_line = new AtomicLong(0L);
65. error_line = new AtomicLong(0L);
66. terminateImport = new AtomicBoolean(false);
67.
68. ThreadPoolTaskExecutor threadPoolTaskExecutor= new ThreadPoolTaskExecutor();
69. threadPoolTaskExecutor.setCorePoolSize(100);
70. threadPoolTaskExecutor.setMaxPoolSize(200);
71. threadPoolTaskExecutor.setQueueCapacity(100);
72. threadPoolTaskExecutor.setRejectedExecutionHandler(newRejectedExecutionHandler() {
73. @Override
74. public voidrejectedExecution(Runnable runnable, ThreadPoolExecutor threadPoolExecutor) {
75. logger.warn("findRejectedExecutionException error !!,will set found_rejected boolean is false");
76. found_rejected.set(true);
77. threadPoolExecutor.execute(runnable);
78. }
79. });
80. //设成true后当调用shutdown()关闭线程池时会去检查任务是否都执行完毕
81. threadPoolTaskExecutor.setWaitForTasksToCompleteOnShutdown(true);
82. threadPoolTaskExecutor.initialize();
83.
84. while (hasNext) {
85. …
86. // applyDTOs是调接口进行分页查询得到的List集合,就是上面”…”省略的部分
87. //如果当次取出的总数比分页数还要小,那么下次进来直接中断掉这个循环
88. if(itemsPerPage>applyDTOs.size()){
89. hasNext = false ;
90. }
91. if(ListUtil.isNotBlank(applyDTOs)){
92. if(found_rejected.get()==false){
93. if(ApplyInfoSync.terminateImport.get()==false){
94. ApplySyncDataThread applySyncDataThread= new ApplySyncDataThread(
95. convert2LoanApplyInfoDOList(applyDTOs),updateMode, loanApplyComponent);
96. threadPoolTaskExecutor.execute(applySyncDataThread);
97. }else{
98. hasNext= false;
99. LogUtil.warn(logger, "主线程发现中断,开始尝试关闭线程池新进入数据");
100. }
101. }else{
102. LogUtil.warn(logger,"执行被拒绝,队列已满了,需要等待5秒");
103. try {
104. TimeUnit.MILLISECONDS.sleep(5000L);
105. found_rejected.set(false);
106. } catch(InterruptedException e) {
107. LogUtil.warn(e,logger,"foundrejected thread,Thread Sleep is error !!");
108. }
109. }
110. }
111. exec_sum= exec_sum+applyDTOs.size();
112. if(hasNext)
113. nowPage++;
114. }
115.
116. while (threadPoolTaskExecutor.getActiveCount()>0){
117. try {
118. logger.warn("threadPool ishave active count ,now sleep...");
119. TimeUnit.MILLISECONDS.sleep(60000L);
120. } catch (InterruptedException e) {
121. logger.warn("Thread Sleepis error !!", e);
122. }
123. }
124. threadPoolTaskExecutor.shutdown();
125.
126. //检查线程池是否完全关闭,否则先等一等再走到日志打印阶段
127. while (threadPoolTaskExecutor.getThreadPoolExecutor().isTerminated()== false) {
128. try {
129. TimeUnit.MILLISECONDS.sleep(6000L);
130. } catch (InterruptedException e) {
131. LogUtil.error(e,logger,"ThreadSleep is error !!");
132. }
133. }
134. …//一些打印日志的操作
135. }
这里其他的不多说了,就说几点使容易看得懂。我这里有两个属性:terminateImport和found_rejected,他们都是AtomicBoolean类型(传入的boolean变量会被转换为volatile的int值,true为1,false为0)的都被static修饰,所以都只有一份且是同步安全的。terminateImport在一开始调用startSwitch时被重置为false,表明没有被中断,当在执行的过程中,有其他的入口调用terminateImport.set(true)后,程序再执行到if(ApplyInfoSync.terminateImport.get()==false)就会是false的,从而达到中断的作用;found_rejected用来标记任务是否被拒绝,经过之前的对线程池的介绍,应该知道了当达到最大线程数,任务队列也满了的时候,再execute的任务会被拒绝,所以我在threadPoolTaskExecutor.setRejectedExecutionHandler(…)设置饱和策略时自己定义实现了一个RejectedExecutionHandler接口来自定义策略,在里面把found_rejected标记改为了true,这样当程序再次执行到if(found_rejected.get()==false)就会是false的,从而可以监视线程和队列是否已经满了,当满了的时候,我就等待5秒,然后把found_rejected标记改为了false就可以继续填充队列了,至于里面为什么还要执行一次threadPoolExecutor.execute(runnable);是因为如果不执行被拒绝的任务,那任务就被抛弃了,当然你也可以做其他操作,例如持久化任务(就是存入数据库)等。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。