当前位置:   article > 正文

线程池池使用进阶篇(任务调度优化)_pool.map put(task)

pool.map put(task)

目录

一、优化方案流程

二、方案的关键点

三、方案的优缺点


线程池原生任务调度执行策略:任务队列先进先出的模式,也就是先放到线程池中的任务先被执行。这种模式存在一个弊端就是无法对任务的优先级进行调度。

一、优化方案流程

针对这个问题,我这边自行实现了一个调度方案,调度方案包括:

1.线程池线程任务调度交给其中一个核心线程,核心线程根据线程任务优先级进行任务调度;

2.低优先级任务调度是不能占满线程池,确保高优先级任务来的时候可以及时响应处理。

方案的大致流程示意图:

线程池优先级调用方案

因方案主要对线程任务调度的优化,所以流程图对线程任务调度到线程池之前的流程进行详细绘制并说明(对线程池内部线程调度流程及流程暂时忽略)。

这里提两点:

1.任务调度方案实现了任务拒绝策略,当线程池任务队列满,任务拒绝策略是将任务重新放回二级任务队列。

2.任务调度根据线程池任务队列是否满为其中一个调度调节,可以有效避免因为线程池任务满导致的任务被拒绝的情况。

二、方案的关键点

1.外部任务进来是,先放到二级任务队列,该队列根据任务优先级及任务名称进行区分,保证不同优先级的任务分别排在不同的任务队列中;

  1. /**
  2. * 普通线程池任务队列-二级队列
  3. */
  4. private HashMap<Integer, HashMap<String, LinkedBlockingQueue<Runnable>>> mTaskMapQueue =
  5. new HashMap<Integer, HashMap<String, LinkedBlockingQueue<Runnable>>>();

同优先级任务根据任务名称,进行队列存储,这个设计主要是为了扩展线程池根据线程任务名进行同等级任务进行调度。

2.调度线程作为核心线程,可以根据二级任务队列中任务优先级以及线程池任务队列情况进行任务调度;

调度线程是一个死循环线程,复杂循环调度任务,调度过程如下:

当线程池队列满,暂时不进行任务调度,直到线程池任务队列有排队空间;当线程池任务队列未满,遍历二级任务队列,取高优先级任务,如果有高优先级任务,取一个任务到线程池中,结束本轮循环调度;如无高优先级任务,遍历低优先级任务,如果此时线程池空闲线程小于一定数量N,暂时不调度低优先级任务,结束本地循环调度;如果线程池空闲线程大于一定数量N,取一个任务到线程池中,结束本轮循环调度.完成一轮循环调度立即进行下一轮循环调度。

三、代码实现

代码实现:

  1. package com.yx.demo;
  2. import java.lang.ref.WeakReference;
  3. import java.util.HashMap;
  4. import java.util.Iterator;
  5. import java.util.LinkedList;
  6. import java.util.List;
  7. import java.util.Map;
  8. import java.util.Set;
  9. import java.util.WeakHashMap;
  10. import java.util.concurrent.Future;
  11. import java.util.concurrent.LinkedBlockingQueue;
  12. import java.util.concurrent.RejectedExecutionHandler;
  13. import java.util.concurrent.ThreadFactory;
  14. import java.util.concurrent.ThreadPoolExecutor;
  15. import java.util.concurrent.TimeUnit;
  16. import java.util.concurrent.atomic.AtomicInteger;
  17. /**
  18. * ThreadPoolManager
  19. *
  20. * @author yx
  21. * @date 2019/5/17 13:57
  22. */
  23. public class ThreadPoolManager {
  24. private static final String THREAD_NAME_COMMON = "threadPool-common";
  25. /**
  26. * 普通线程池
  27. */
  28. private ThreadPoolExecutor mPoolExecutor;
  29. /**
  30. * 普通线程池任务队列-二级队列
  31. */
  32. private HashMap<Integer, HashMap<String, LinkedBlockingQueue<Runnable>>> mTaskMapQueue =
  33. new HashMap<Integer, HashMap<String, LinkedBlockingQueue<Runnable>>>();
  34. /**
  35. * 任务提交到线程池的任务集合
  36. */
  37. private Map<String, List<WeakReference<Future<?>>>> mTaskMap;
  38. /**
  39. * 单例
  40. */
  41. private static ThreadPoolManager instance = null;
  42. /**
  43. * 普通线程池核心线程池的数量,同时能够执行的线程数量
  44. */
  45. private int mCommonCorePoolSize = Runtime.getRuntime().availableProcessors() * 2 + 1;
  46. /**
  47. * 非核心线程数
  48. */
  49. private int mNonCorePoolSize = 10;
  50. /**
  51. * 最大线程池数量
  52. */
  53. private int mMaximumPoolSize = mCommonCorePoolSize + mNonCorePoolSize;
  54. /**
  55. * 低优先级任务不占用线程数
  56. * mMaximumPoolSize > mCommonCorePoolSize
  57. */
  58. private int mSpareThreadSize = 5;
  59. /**
  60. * 存活时间,根据各种线程任务执行超时时间评估 (网络重连任务 超时)
  61. */
  62. private long mKeepAliveTime = 30;
  63. /**
  64. * 线程池任务队列
  65. */
  66. private final int MAX_QUEUE_LENGTH = 200;
  67. private long mRejectCount = 0;
  68. private LinkedBlockingQueue<Runnable> mQueue =
  69. new LinkedBlockingQueue<Runnable>(MAX_QUEUE_LENGTH);
  70. /**
  71. * 创建一个新的实例 ThreadPoolManager
  72. */
  73. private ThreadPoolManager() {
  74. mPoolExecutor = new ThreadPoolExecutor(
  75. mCommonCorePoolSize,
  76. mMaximumPoolSize,
  77. mKeepAliveTime,
  78. TimeUnit.SECONDS,
  79. //缓冲队列,用于存放等待任务,Linked的先进先出
  80. mQueue,
  81. //创建线程的工厂
  82. new ThreadFactory() {
  83. private final AtomicInteger mCount = new AtomicInteger(1);
  84. @Override
  85. public Thread newThread(Runnable r) {
  86. return new Thread(r, THREAD_NAME_COMMON + "#" + mCount.getAndIncrement());
  87. }
  88. }
  89. );
  90. // 用来对超出maximumPoolSize的任务的处理策略
  91. mPoolExecutor.setRejectedExecutionHandler(new RejectedExecutionHandler() {
  92. @Override
  93. public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
  94. mRejectCount++;
  95. System.out.println("ThreadPoolManager rejectedExecution happaned mRejectCount=" +
  96. mRejectCount);
  97. ThreadPoolManager.getInstance().addCommonTask("", r, Thread.NORM_PRIORITY);
  98. }
  99. });
  100. mTaskMap = new WeakHashMap<String, List<WeakReference<Future<?>>>>();
  101. mPoolExecutor.execute(new Runnable() {
  102. @Override
  103. public void run() {
  104. while (true) {
  105. try {
  106. int available = mMaximumPoolSize - mPoolExecutor.getActiveCount();
  107. // 线程池队列满了不调度
  108. if (mQueue.size() >= MAX_QUEUE_LENGTH) {
  109. continue;
  110. }
  111. // 调度策略 根据任务的优先级进行调度,高优先级任务优先调度
  112. for (Integer p = Thread.MAX_PRIORITY; p > Thread.MIN_PRIORITY; p--) {
  113. // 如果优先级小于5的线程(低优先级)来了,空闲线程太少,不调度
  114. if (p < Thread.NORM_PRIORITY &&
  115. available < mSpareThreadSize) {
  116. System.out
  117. .println("available = " + available + " mMaximumPoolSize=" +
  118. mMaximumPoolSize);
  119. break;
  120. }
  121. HashMap<String, LinkedBlockingQueue<Runnable>> mapQueue =
  122. mTaskMapQueue.get(p);
  123. if (mapQueue == null) {
  124. continue;
  125. }
  126. if (!mapQueue.isEmpty()) {
  127. Set<String> taskKeySet = mapQueue.keySet();
  128. Iterator<String> subIterator = taskKeySet.iterator();
  129. if (subIterator.hasNext()) {
  130. String taskName = subIterator.next();
  131. LinkedBlockingQueue<Runnable> queue = mapQueue.get(taskName);
  132. if (!queue.isEmpty()) {
  133. try {
  134. Runnable runnable = queue.take();
  135. ThreadPoolManager.getInstance()
  136. .submitTask(runnable, taskName);
  137. // 正常调度了一个任务,重新遍历寻找高优先级任务
  138. break;
  139. } catch (InterruptedException e) {
  140. e.printStackTrace();
  141. break;
  142. }
  143. }
  144. }
  145. }
  146. // 没有找到任务,继续找任务
  147. }
  148. } catch (Exception e) {
  149. e.printStackTrace();
  150. }
  151. }
  152. }
  153. });
  154. }
  155. /**
  156. * 根据优先级加入二级队列
  157. *
  158. * @param taskName 任务名称
  159. * @param task 任务
  160. * @param priority 任务优先级
  161. * @throws NullPointerException
  162. */
  163. public synchronized void addCommonTask(String taskName, Runnable task, int priority) throws
  164. NullPointerException {
  165. if (priority > Thread.MAX_PRIORITY || priority < Thread.MIN_PRIORITY) {
  166. throw new IllegalArgumentException(
  167. "priority must be between " + Thread.MIN_PRIORITY + " and " +
  168. Thread.MAX_PRIORITY);
  169. }
  170. if (taskName == null) {
  171. throw new NullPointerException("taskName can not be null");
  172. }
  173. HashMap<String, LinkedBlockingQueue<Runnable>> mapQueue = mTaskMapQueue.get(priority);
  174. if (mapQueue == null) {
  175. mapQueue = new HashMap<String, LinkedBlockingQueue<Runnable>>(16);
  176. LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>();
  177. mapQueue.put(taskName, queue);
  178. }
  179. LinkedBlockingQueue<Runnable> queue = mapQueue.get(taskName);
  180. queue.add(task);
  181. mTaskMapQueue.put(priority, mapQueue);
  182. }
  183. /**
  184. * 获取ThreadPoolManager单例对象
  185. */
  186. public static ThreadPoolManager getInstance() {
  187. if (instance == null) {
  188. synchronized (ThreadPoolManager.class) {
  189. if (instance == null) {
  190. instance = new ThreadPoolManager();
  191. }
  192. }
  193. }
  194. return instance;
  195. }
  196. /**
  197. * 释放MinaThreadPoolManager中线程资源
  198. * 主要用于退出应用进程前,销毁线程池
  199. */
  200. public void release() {
  201. synchronized (ThreadPoolManager.class) {
  202. if (instance != null) {
  203. instance.cancelAllTaskThreads();
  204. }
  205. mPoolExecutor.shutdownNow();
  206. instance = null;
  207. }
  208. }
  209. /**
  210. * 将线程任务放到二级任务队列
  211. *
  212. * @param task 任务线程
  213. * @param name 任务名字
  214. */
  215. public void startTaskThread(Runnable task, String name) {
  216. if (task != null && name != null) {
  217. addCommonTask(name, task, Thread.NORM_PRIORITY);
  218. }
  219. }
  220. /**
  221. * 将线程任务放到二级任务队列
  222. *
  223. * @param task 任务线程
  224. * @param name 任务名字
  225. */
  226. public void startTaskThread(Runnable task, String name, int priority) {
  227. if (task != null && name != null) {
  228. addCommonTask(name, task, priority);
  229. }
  230. }
  231. /**
  232. * 开启线程
  233. *
  234. * @param task 任务线程
  235. * @param name 任务名字
  236. */
  237. public void submitTask(Runnable task, String name) {
  238. if (task != null && name != null) {
  239. Future<?> request = mPoolExecutor.submit(task);
  240. addTask(request, name);
  241. printPoolExecutorInfo();
  242. }
  243. }
  244. /**
  245. * 开启线程
  246. *
  247. * @param task 任务线程
  248. */
  249. @Deprecated
  250. public void executeTaskThread(Thread task) {
  251. String taskName = task.getName();
  252. System.out.println("ThreadPoolManager executeTaskThread task name = " + taskName);
  253. printPoolExecutorInfo();
  254. mPoolExecutor.execute(task);
  255. }
  256. /**
  257. * 结束线程
  258. *
  259. * @param task 任务线程
  260. */
  261. public void stopTaskThread(Thread task) {
  262. stopTaskThread(task.getName());
  263. }
  264. /**
  265. * 结束线程
  266. *
  267. * @param taskTag 任务线程
  268. */
  269. public void stopTaskThread(String taskTag) {
  270. cancelTaskThreads(taskTag);
  271. }
  272. /**
  273. * 添加执行任务到队列中
  274. *
  275. * @param request
  276. */
  277. private void addTask(Future<?> request, String taskTag) {
  278. synchronized (ThreadPoolManager.class) {
  279. if (taskTag != null) {
  280. List<WeakReference<Future<?>>> requestList = mTaskMap.get(taskTag);
  281. if (requestList == null) {
  282. requestList = new LinkedList<WeakReference<Future<?>>>();
  283. mTaskMap.put(taskTag, requestList);
  284. }
  285. requestList.add(new WeakReference<Future<?>>(request));
  286. }
  287. }
  288. }
  289. /**
  290. * 取消所有的任务
  291. */
  292. public void cancelAllTaskThreads() {
  293. for (String clsName : mTaskMap.keySet()) {
  294. List<WeakReference<Future<?>>> requestList = mTaskMap.get(clsName);
  295. if (requestList != null) {
  296. Iterator<WeakReference<Future<?>>> iterator = requestList.iterator();
  297. while (iterator.hasNext()) {
  298. Future<?> request = iterator.next().get();
  299. if (request != null) {
  300. request.cancel(true);
  301. }
  302. }
  303. }
  304. }
  305. mTaskMap.clear();
  306. }
  307. /**
  308. * 根据特定任务名称取消任务
  309. */
  310. private void cancelTaskThreads(String taskName) {
  311. System.out.println("ThreadPoolManager cancelTaskThreads task name = " + taskName);
  312. // 二级队列
  313. for (Integer p = Thread.MAX_PRIORITY; p > Thread.MIN_PRIORITY; p--) {
  314. HashMap<String, LinkedBlockingQueue<Runnable>> mapQueue =
  315. mTaskMapQueue.get(p);
  316. if (mapQueue != null) {
  317. LinkedBlockingQueue<Runnable> queue = mapQueue.get(taskName);
  318. if (queue != null) {
  319. queue.clear();
  320. }
  321. }
  322. }
  323. // 一级队列
  324. List<WeakReference<Future<?>>> requestList = mTaskMap.get(taskName);
  325. if (requestList != null) {
  326. Iterator<WeakReference<Future<?>>> iterator = requestList.iterator();
  327. while (iterator.hasNext()) {
  328. Future<?> request = iterator.next().get();
  329. if (request != null) {
  330. request.cancel(true);
  331. }
  332. }
  333. mTaskMap.remove(taskName);
  334. }
  335. printPoolExecutorInfo();
  336. }
  337. public ThreadPoolExecutor getPoolExecutor() {
  338. return mPoolExecutor;
  339. }
  340. private void printPoolExecutorInfo() {
  341. if (mPoolExecutor != null) {
  342. System.out.println(
  343. "ThreadPoolManager mPoolExecutor info:[poolSize:" + mPoolExecutor.getPoolSize()
  344. + ",activeCount:" + mPoolExecutor.getActiveCount()
  345. + ",taskQueueCount:" + mPoolExecutor.getQueue().size()
  346. + ",completeTaskCount:" + mPoolExecutor.getCompletedTaskCount() + "]");
  347. }
  348. }
  349. /**
  350. * 任务队列中是否有
  351. *
  352. * @param task 任务
  353. * @return 是否有标签任务 true-有
  354. */
  355. public boolean hasTask(Thread task) {
  356. if (task == null) {
  357. return false;
  358. }
  359. return hasTask(task.getName());
  360. }
  361. /**
  362. * 任务队列中是否有
  363. *
  364. * @param taskTag 任务标签
  365. * @return 是否有标签任务 true-有
  366. */
  367. public boolean hasTask(String taskTag) {
  368. if (taskTag == null) {
  369. return false;
  370. }
  371. List<WeakReference<Future<?>>> requestList = mTaskMap.get(taskTag);
  372. if (requestList != null) {
  373. Iterator<WeakReference<Future<?>>> iterator = requestList.iterator();
  374. while (iterator.hasNext()) {
  375. Future<?> request = iterator.next().get();
  376. if (request != null) {
  377. if (request.isCancelled()) {
  378. System.out.println(
  379. "ThreadPoolManager taskTag: " + taskTag + " has canceled ");
  380. continue;
  381. }
  382. if (!request.isDone()) {
  383. System.out.println("ThreadPoolManager hasTask " + taskTag);
  384. return true;
  385. }
  386. }
  387. }
  388. }
  389. return false;
  390. }
  391. }

三、方案的优缺点

优点:.实现了任务调度根据优先级进行调度(暂未实现根据任务名调度)。

缺点:线程任务来到时,先放入二级任务队列,所以这个方案对于任务大量积压(比如任务大量添加,但线程池处理不及时)会对内存是一个挑战,所以使用时需要对低优先级耗时任务的添加进行限制,避免出现任务积压导致内存不足问题。

 

最后,关于方案还在持续优化中,如有问题,可以一起讨论交流。

微信交流群

 

微信扫描关注更多精彩
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/小小林熬夜学编程/article/detail/189731
推荐阅读
相关标签
  

闽ICP备14008679号