共计 3679 个字符,预计需要花费 10 分钟才能阅读完成。
前言
本文写一下 js 中 es5 和 es6 针对异步函数,串行执行和并行执行的方案。以及串行和并行结合使用的例子。
es5 方式
在 es6 出来之前,社区 nodejs 中针对回调地狱,已经有了 promise 方案。假如多个异步函数,执行循序怎么安排,如何才能更快的执行完所有异步函数,再执行下一步呢?这里就出现了 js 的串行执行和并行执行问题。
异步函数串行执行
var items = [1, 2, 3, 4, 5, 6];
var results = [];
function async(arg, callback) {console.log('参数为' + arg +', 1 秒后返回结果');
setTimeout(function () {callback(arg * 2); }, 1000);
}
function final(value) {console.log('完成:', value);
}
function series(item) {if(item) {async( item, function(result) {results.push(result);
return series(items.shift());// 递归执行完所有的数据
});
} else {return final(results[results.length - 1]);
}
}
series(items.shift());
异步函数并行执行
上面函数是一个一个执行的,上一个执行结束再执行下一个,类似 es6(es5 之后统称 es6) 中 async 和 await,那有没有类似 promise.all 这种,所有的并行执行的呢?
可以如下写:
var items = [1, 2, 3, 4, 5, 6];
var results = [];
function async(arg, callback) {console.log('参数为' + arg +', 1 秒后返回结果');
setTimeout(function () {callback(arg * 2); }, 1000);
}
function final(value) {console.log('完成:', value);
}
items.forEach(function(item) {// 循环完成
async(item, function(result){results.push(result);
if(results.length === items.length) {// 判断执行完毕的个数是否等于要执行函数的个数
final(results[results.length - 1]);
}
})
});
异步函数串行执行和并行执行结合
假如并行执行很多条异步(几百条)数据,每个异步数据中有很多的(https)请求数据,势必造成 tcp 连接数不足,或者堆积了无数调用栈导致内存溢出。所以并行执行不易太多数据,因此,出现了并行和串行结合的方式。
代码可以如下书写:
var items = [1, 2, 3, 4, 5, 6];
var results = [];
var running = 0;
var limit = 2;
function async(arg, callback) {console.log('参数为' + arg +', 1 秒后返回结果');
setTimeout(function () {callback(arg * 2); }, 1000);
}
function final(value) {console.log('完成:', value);
}
function launcher() {while(running 0) {var item = items.shift();
async(item, function(result) {results.push(result);
running--;
if(items.length> 0) {launcher();
} else if(running == 0) {final(results);
}
});
running++;
}
}
launcher();
es6 方式
es6 天然自带串行和并行的执行方式,例如串行可以用 async 和 await(前文已经讲解),并行可以用 promise.all 等等。那么针对串行和并行结合,限制 promise all 并发数量,社区也有一些方案,例如
tiny-async-pool、es6-promise-pool、p-limit
简单封装一个 promise all 并发数限制解决方案函数
function PromiseLimit(funcArray, limit = 5) { // 并发执行 5 条数据
let i = 0;
const result = [];
const executing = [];
const queue = function() {if (i === funcArray.length) return Promise.all(executing);
const p = funcArray[i++]();
result.push(p);
const e = p.then(() => executing.splice(executing.indexOf(e), 1));
executing.push(e);
if (executing.length>= limit) {return Promise.race(executing).then(() => queue(),
e => Promise.reject(e)
);
}
return Promise.resolve().then(() => queue());
};
return queue().then(() => Promise.all(result));
}
使用:
// 测试代码
const result = [];
for (let index = 0; index {console.log("开始" + index, new Date().toLocaleString());
setTimeout(() => {resolve(index);
console.log("结束" + index, new Date().toLocaleString());
}, parseInt(Math.random() * 10000));
});
});
}
PromiseLimit(result).then(data => {console.log(data);
});
修改测试代码,新增随机失败逻辑
// 修改测试代码 随机失败或者成功
const result = [];
for (let index = 0; index {console.log("开始" + index, new Date().toLocaleString());
setTimeout(() => {if (Math.random() > 0.5) {resolve(index);
} else {reject(index);
}
console.log("结束" + index, new Date().toLocaleString());
}, parseInt(Math.random() * 1000));
});
});
}
PromiseLimit(result).then(data => {console.log("成功", data);
},
data => {console.log("失败", data);
}
);
async 和 await 结合 promise all
async function PromiseAll(promises,batchSize=10) {const result = [];
while(promises.length> 0) {const data = await Promise.all(promises.splice(0,batchSize));
result.push(...data);
}
return result;
}
这么写有 2 个问题
1、在调用 Promise.all 前就已经创建好了 promises,实际上 promise 已经执行了
2、你这个实现必须等前面 batchSize 个 promise resolve,才能跑下一批的 batchSize 个,也就是 promise all 全部成功才可以。
改进如下:
async function asyncPool(array,poolLimit,iteratorFn) {const ret = [];
const executing = [];
for (const item of array) {const p = Promise.resolve().then(() => iteratorFn(item, array));
ret.push(p);
if (poolLimit executing.splice(executing.indexOf(e), 1));
executing.push(e);
if (executing.length>= poolLimit) {await Promise.race(executing);
}
}
}
return Promise.all(ret);
}
使用:
const timeout = i => new Promise(resolve => setTimeout(() => resolve(i), i));
return asyncPool([1000, 5000, 3000, 2000], 2,timeout).then(results => {...});
正文完