赞
踩
最初开发目的是做一个axios请求并发的控制功能。后来发现其他场景可能也有类似需要控制并发的需求,于是将此功能其抽离成公共的类,在需要的场景使用。
实现步骤
思考1:如何让一个事件处于等待执行的状态?立马想到的就是 Promise、async、await
思考2:需要记录哪些状态来实现?
①最大并发数量、②当前进行中的任务数量、③存储等待中的任务
思考3:每个事件执行前都要过安检 (自定义的interceptors方法),放行还是拦截,这是个问题!
思考4:放行下一个被拦截的任务。
结合1234:利用async、await和new Promise的resolve方法进行事件的执行控制。
如果 最大并发数量>当前进行中的任务数量,则调用resolve方法,结束await等待。
否则 将resolve方法存储到数组中,等待有任务完成后再从数组中提取并执行第一个resolve。
直至 存储任务的数组为空。
并发拦截的类诞生
- // 并发拦截
- class ConcurrentInterceptors {
- constructor(maxProgressCount) {
- // 最大并发数量
- this.maxProgressCount = maxProgressCount;
- // 进行中的任务数量
- this.inProgressCount = 0;
- // 等待中的任务
- this.waitingArr = [];
- }
-
- /**
- * 拦截器
- * @return Promise.resolve()
- */
- interceptors = () => {
- return new Promise(resolve => {
- const { maxProgressCount, inProgressCount } = this
- // 非数字、isNaN、不大于0 不拦截
- if (typeof maxProgressCount !== 'number' || isNaN(maxProgressCount) || maxProgressCount <= 0) {
- resolve()
- return
- }
- // 当前进行的任务数量是否大于最大并发数量
- if (inProgressCount < maxProgressCount) {
- // 未超出数量
- this.inProgressCount += 1
- resolve()
- }else {
- // 超出数量
- this.waitingArr.push(resolve);
- }
- });
- }
-
- /**
- * 启动下一个被拦截的任务
- */
- next = () => {
- // 当前进行中的数量减一
- this.inProgressCount -= 1
-
- // 等待发送的请求的移除一个
- const nextResolve = this.waitingArr.shift();
- if(nextResolve){
- // 要进行下一个任务了 当前进行中的数量加一
- this.inProgressCount += 1
- nextResolve();
- }
- }
-
- /**
- * 更新最大并发数量
- * @param newMaxProgressCount
- */
- upMaxProgressCount = (newMaxProgressCount)=>{
- this.maxProgressCount = newMaxProgressCount;
- }
- }
-
- export default ConcurrentInterceptors
如何使用?示例①:在axios中使用
- import axios from 'axios';
- // 引入并发拦截器
- import ConcurrentInterceptors from './concurrentInterceptors';
- // 创建并发实例对象,并设置最大并发数量:2
- const concurrent = new ConcurrentInterceptors(2)
-
- // 创建axios实例对象
- const instance = axios.create({});
-
- // axios自身的请求拦截器
- instance.interceptors.request.use(
- async config => {
- /**
- * 关键代码
- * @description 并发拦截器启动 进行数量检测
- */
- await concurrent.interceptors()
-
- return config;
- },
- error => Promise.reject(error)
- );
-
- // axios自身的响应拦截器
- instance.interceptors.response.use(
- response => {
- /**
- * 关键代码
- * @description 通知并发拦截器有任务完成了 请放行下一个任务!
- */
- concurrent.next()
-
- return response;
- },
- error => {
- /**
- * 关键代码
- * @description 通知并发拦截器有任务完成了 请放行下一个任务!
- */
- concurrent.next()
-
- return Promise.reject(error);
- }
- );
以上并发的限制功能已经实现,可根据自己需求进行修改或直接使用。或者继续往下阅读。
结束?
还没!有新的问题:进行中的任务数量和等待中的任务数据仅供内部方法使用,不能在类的外部修改。需要将类的属性私有化处理。于是有了优化版,修改为使用 WeakMap 的Class私有属性版本。使用方法同 示例① axios示例。
私有属性版本 (优化版)
- const privateWeakMap = new WeakMap();
- function getConcurrent(_this) {
- return privateWeakMap.get(_this)
- }
- class ConcurrentInterceptors {
- /**
- * 最大并发数量
- * @param maxProgressCount
- * @typeof number
- */
- constructor(maxProgressCount) {
- privateWeakMap.set(this,
- {
- // 最大并发数量
- _maxProgressCount: maxProgressCount,
- // 进行中的任务数量
- _inProgressCount: 0,
- // 等待中的任务
- _waitingArr: [],
- }
- );
- }
-
- /**
- * 拦截器
- * @return Promise.resolve()
- */
- interceptors = () => {
- const { _inProgressCount, _maxProgressCount } = getConcurrent(this)
- return new Promise(resolve => {
- // 非数字、isNaN、不大于0 不拦截
- if (typeof _maxProgressCount !== 'number' || isNaN(_maxProgressCount) || _maxProgressCount <= 0) {
- resolve()
- return
- }
- // 当前进行的任务数量是否大于最大并发数量
- if (_inProgressCount < _maxProgressCount) {
- // 未超出最大并发数量
- getConcurrent(this)._inProgressCount += 1
- resolve()
- } else {
- // 超出最大并发数量
- getConcurrent(this)._waitingArr.push(resolve);
- }
- });
- }
-
- /**
- * 启动下一个被拦截的任务
- */
- next = () => {
- // 当前进行中的数量减一
- getConcurrent(this)._inProgressCount -= 1
-
- // 等待发送的请求的移除一个
- const nextResolve = getConcurrent(this)._waitingArr.shift();
- if (nextResolve) {
- // 要进行下一个任务了 当前进行中的数量加一
- getConcurrent(this)._inProgressCount += 1
- nextResolve();
- }
- }
-
- /**
- * 更新最大并发数量
- * @param newMaxProgressCount
- */
- updateMaxProgressCount = (newMaxProgressCount) => {
- getConcurrent(this)._maxProgressCount = newMaxProgressCount;
- }
-
- /**
- * 获取_maxProgressCount属性值
- */
- get maxProgressCount() {
- return getConcurrent(this)._maxProgressCount
- }
- set maxProgressCount(newVal) {
- this.updateMaxProgressCount(newVal)
- }
-
- /**
- * 获取_sendAxiosIngCount属性值
- */
- get inProgressCount() {
- return getConcurrent(this)._inProgressCount
- }
- set inProgressCount(newVal) {
- throw new Error(`inProgressCount为只读属性`);
- }
-
- /**
- * 获取_awaitSendAxiosArr属性值
- */
- get waitingArr() {
- return getConcurrent(this)._waitingArr
- }
- set waitingArr(newVal) {
- throw new Error(`waitingArr为只读属性`);
- }
- }
-
- export default ConcurrentInterceptors
最后
1.还是不清楚如用使用?
将 ConcurrentInterceptors 类(第一个或者私有属性的都行)复制到 js文件,在需要的文件引入,并初始化实例对象
① 引入并发拦截器
import ConcurrentInterceptors from './concurrentInterceptors.js';
② 创建并发实例对象,并设置最大并发数量: num
const concurrent = new ConcurrentInterceptors(num)
如果不是axios则需要一个类似于axios公共拦截器一样的方法,并在此方法里调用 (见示例②)
③ 并发拦截器启动 进行数量检测
await concurrent.interceptors()
在每个事件执行完的方法 (见示例②)
④ 通知并发拦截器有任务完成了 执行下一个任务
concurrent.next()
示例②
- import ConcurrentInterceptors from './concurrentInterceptors.js';
-
- // 最大并发数量设置为2
- const concurrent = new ConcurrentInterceptors(2)
-
- function fn1() {
- // Do something
- console.log('函数fn1');
-
- // fn1执行完了,可以进行下一个任务了
- concurrent.next()
- }
-
- function fn2() {
- // Do something
- console.log('函数fn2');
-
- // fn2执行完了,可以进行下一个任务了
- concurrent.next()
- }
-
- async function publicFn(type) {
- // 并发拦截器启动 进行数量检测
- await concurrent.interceptors();
-
- // 等待执行
- switch (type) {
- case 'fn1':
- fn1()
- break;
- case 'fn2':
- fn2()
- break;
- }
- }
2.只能axios使用吗?
不是。任何需要控制并发的场景都可使用
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。