当前位置:   article > 正文

JS 中如何实现并发控制?

JS 中如何实现并发控制?

一、并发控制简介

在日常开发过程中,你可能会遇到并发控制的场景,比如控制请求并发数。那么在 JavaScript 中如何实现并发控制呢?在回答这个问题之前,我们来简单介绍一下并发控制。

假设有 6 个待办任务要执行,而我们希望限制同时执行的任务个数,即最多只有 2 个任务能同时执行。当 正在执行任务列表 中的任何 1 个任务完成后,程序会自动从 待办任务列表 中获取新的待办任务并把该任务添加到 正在执行任务列表 中。为了让大家能够更直观地理解上述的过程,小码哥特意画了以下 3 张图:

1.1 阶段一

1.2 阶段二

1.2 阶段三

好的,介绍完并发控制之后。接下来带大家剖析异步任务并发控制具体实现方法。

二、并发控制的实现

async-pool 这个库提供了 ES7 和 ES6 两种不同版本的实现,在分析其具体实现之前,我们来看一下它如何使用

2.1 asyncPool 的使用

  1. const timeout = i => new Promise(resolve => setTimeout(() => resolve(i), i));
  2. await asyncPool(2, [1000, 5000, 3000, 2000], timeout);

在以上代码中,我们使用 async-pool 这个库提供的 asyncPool 函数来实现异步任务的并发控制。asyncPool 函数的签名如下所示:

function asyncPool(poolLimit, array, iteratorFn){ ... }

该函数接收 3 个参数:

  • poolLimit(数字类型):表示限制的并发数;

  • array(数组类型):表示任务数组;

  • iteratorFn(函数类型):表示迭代函数,用于实现对每个任务项进行处理,该函数会返回一个 Promise 对象或异步函数。

对于以上示例来说,在使用了 asyncPool 函数之后,对应的执行过程如下所示:

  1. const timeout = i => new Promise(resolve => setTimeout(() => resolve(i), i));
  2. await asyncPool(2, [1000, 5000, 3000, 2000], timeout);
  3. // Call iterator (i = 1000)
  4. // Call iterator (i = 5000)
  5. // Pool limit of 2 reached, wait for the quicker one to complete...
  6. // 1000 finishes
  7. // Call iterator (i = 3000)
  8. // Pool limit of 2 reached, wait for the quicker one to complete...
  9. // 3000 finishes
  10. // Call iterator (i = 2000)
  11. // Itaration is complete, wait until running ones complete...
  12. // 5000 finishes
  13. // 2000 finishes
  14. // Resolves, results are passed in given array order `[1000, 5000, 3000, 2000]`.

通过观察以上的注释信息,我们可以大致地了解 asyncPool 函数内部的控制流程。下面我们先来分析 asyncPool 函数的 ES7 实现。

2.2 asyncPool ES7 实现

  1. async function asyncPool(poolLimit, array, iteratorFn) {
  2. const ret = []; // 存储所有的异步任务
  3. const executing = []; // 存储正在执行的异步任务
  4. for (const item of array) {
  5. // 调用iteratorFn函数创建异步任务
  6. const p = Promise.resolve().then(() => iteratorFn(item, array));
  7. ret.push(p); // 保存新的异步任务
  8. // 当poolLimit值小于或等于总任务个数时,进行并发控制
  9. if (poolLimit <= array.length) {
  10. // 当任务完成后,从正在执行的任务数组中移除已完成的任务
  11. const e = p.then(() => executing.splice(executing.indexOf(e), 1));
  12. executing.push(e); // 保存正在执行的异步任务
  13. if (executing.length >= poolLimit) {
  14. await Promise.race(executing); // 等待较快的任务执行完成
  15. }
  16. }
  17. }
  18. return Promise.all(ret);
  19. }

在以上代码中,充分利用了 Promise.all 和 Promise.race 函数特点,再结合 ES7 中提供的 async await 特性,最终实现了并发控制的功能。利用 await Promise.race(executing); 这行语句,我们会等待 正在执行任务列表 中较快的任务执行完成之后,才会继续执行下一次循环。

asyncPool ES7 实现相对比较简单,接下来我们来看一下不使用 async await 特性要如何实现同样的功能。

2.3 asyncPool ES6 实现

  1. function asyncPool(poolLimit, array, iteratorFn) {
  2. let i = 0;
  3. const ret = []; // 存储所有的异步任务
  4. const executing = []; // 存储正在执行的异步任务
  5. const enqueue = function () {
  6. if (i === array.length) {
  7. return Promise.resolve();
  8. }
  9. const item = array[i++]; // 获取新的任务项
  10. const p = Promise.resolve().then(() => iteratorFn(item, array));
  11. ret.push(p);
  12. let r = Promise.resolve();
  13. // 当poolLimit值小于或等于总任务个数时,进行并发控制
  14. if (poolLimit <= array.length) {
  15. // 当任务完成后,从正在执行的任务数组中移除已完成的任务
  16. const e = p.then(() => executing.splice(executing.indexOf(e), 1));
  17. executing.push(e);
  18. if (executing.length >= poolLimit) {
  19. r = Promise.race(executing);
  20. }
  21. }
  22. // 正在执行任务列表 中较快的任务执行完成之后,才会从array数组中获取新的待办任务
  23. return r.then(() => enqueue());
  24. };
  25. return enqueue().then(() => Promise.all(ret));
  26. }

在 ES6 的实现版本中,通过内部封装的 enqueue 函数来实现核心的控制逻辑。当 Promise.race(executing) 返回的 Promise 对象变成已完成状态时,才会调用 enqueue 函数,从 array 数组中获取新的待办任务。

总结:对于异步任务并发控制的具体实现核心方法es7中 async await 还是es6中的 enqueue 方法最核心用的方法都是是 Promise 中的Promise.all和Promise.race两个方法。

对于这个两个方法也是经常出现在面试题上的。对于如何手写Promise.all和Promise.race这来个方法的具体实现过程可点击关注收藏作者,在后面会更新对于这两个方法的实现原理的相关技术文档。

链接:https://pan.baidu.com/s/1_4PIUb-Yl68aTW9Bw95iJA 
提取码:tnav 

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/菜鸟追梦旅行/article/detail/330220
推荐阅读
相关标签
  

闽ICP备14008679号