一篇文章了解JS并行任务
【前言】
在开发过程中大部分场景是顺序执行代码,
也有场景要求并行执行多个任务,
本文研究下如何高效的并行执行任务。
【并行场景】
常见并行执行任务的场景
1.并行执行多个请求
2.并行下载文件
【并行任务】
上述场景中任务的共同点是
1.多个任务没有依赖关系
2.多个任务完成耗时不确定
3.要求最快完成所有任务
所以可以用setTimeout模拟并行任务,
timeout可以传入,模拟不同时长完成任务,
/**
* handler
* @param {*} timeout
* @returns
*/
module.exports = function (timeout) {
return new Promise(function (resolve) {
setTimeout(() => {
return resolve(timeout);
}, timeout);
});
};
【并行任务池】
这里的任务池,就是一些timeout的数值集合,
/**
* values
*/
module.exports = [100, 300, 200, 400];
【并行回调和完成】
并行任务执行,一般希望
1.每个任务完成有回调callback
2.所有任务完成有回调complete
callback代码如下
// q
const q = require('qiao-console');
/**
* callback
* @param {*} index
* @param {*} res
*/
module.exports = function (index, res) {
q.writeLine(index, `${index} ${res}`);
};
complete代码如下
// q
const q = require('qiao-console');
/**
* complete
* @param {*} l
*/
module.exports = function (l) {
q.writeLine(l, `complete`);
};
【串行执行】
串行执行比较简单,for+await就可以实现
// q
const q = require('qiao-console');
// vars
const values = require('./_values.js');
const handler = require('./_handler.js');
const callback = require('./_callback.js');
const complete = require('./_complete.js');
// normal
async function normal() {
console.time('qiao-parallel');
for (let i = 0; i < values.length; i++) {
const value = values[i];
const res = await handler(value);
callback(i, res);
}
complete(values.length);
console.timeEnd('qiao-parallel');
}
// test
(function () {
q.clear();
normal();
})();
其中
values:并行任务池数组
handler:并行任务
callback:单个任务执行完毕回调
complete:所有任务执行完毕回调
运行效果如下,总耗时1s(100ms+200ms+300ms+400ms)
【并行执行-IIFE】
上述串行执行的核心代码是
for (let i = 0; i < values.length; i++) {
const value = values[i];
const res = await handler(value);
callback(i, res);
}
这里只需要使用立即执行函数,
即可让几个任务并行执行,
立即执行函数的介绍可以看这里: https://developer.mozilla.org/zh-CN/docs/Glossary/IIFE
修改后的核心代码如下
console.time('qiao-parallel');
for (let i = 0; i < values.length; i++) {
(async function (i, value) {
const res = await handler(value);
callback(i, res);
if (i + 1 == values.length) {
complete(values.length);
console.timeEnd('qiao-parallel');
}
})(i, values[i]);
}
运行效果如下,总耗时400ms(max(100,200,300,400))

【并行执行-多进程】
在nodejs下,还可以使用多进程并行执行任务,
多进程执行任务要使用nodejs的child_process模块,
使用方法:
1.准备执行任务js文件
2.fork执行
3.通过process通信
child_process的fork使用,详见:
fork部分的代码大致如下
const cp = child_process.fork(jsPath, args || []);
cp.on('message', function (msg) {
if (onMsg) onMsg(msg);
});
cp.on('exit', function (code) {
if (onExit) onExit(code);
});
之前的handler.js文件对应的要修改为fork的模式
// handler
const handler = require('./_handler.js');
// fork handler
async function forkHandler() {
// check
if (!process || !process.argv) return;
// value
const value = parseInt(process.argv[2]);
// msg
const msg = await handler(value);
process.send(msg);
}
forkHandler();
运行效果如下,总耗时447ms(max(100,200,300,400)+47ms)

运行时间小于串行的1s,大于IIFE的400ms,
其中47ms可以理解为进程创建和销毁的耗时,
将上述100ms,200ms,300ms,400ms修改为1-4s,查看效果,
IIFE执行效果如下, 总耗时还是4s(max(1,2,3,4))

fork执行效果如下,总耗时是4.045s(max(1,2,3,4)+45ms)

以上证明多进程并行执行任务中,
创建和销毁进程大概耗时是40-50ms
【并行执行-多线程】
nodejs从10.5.0开始引入了Worker threads模块,
详见:https://nodejs.org/dist/latest-v18.x/docs/api/worker_threads.html#worker-threads
worker代码如下:
const worker = new Worker(jsPath, {
workerData: value,
});
worker.on('message', function (res) {
onCallback(callback, index, res);
});
worker.on('error', function (error) {
console.log(error);
});
worker.on('exit', function () {
onComplete(complete, valuesLength);
});
对应的handler代码修改为:
// worker
const { parentPort, workerData } = require('node:worker_threads');
// handler
const handler = require('./_handler.js');
// fork handler
async function forkHandler() {
// msg
var msg = await handler(workerData);
parentPort.postMessage(msg);
}
forkHandler();
运行效果如下,总耗时447ms(max(100,200,300,400)+34ms)

修改为1-4后效果如下,总耗时是4.032s(max(1,2,3,4)+32ms)

可以看到多线程模式额外的开销是30ms左右,
比多进程模式的40-50ms要快一些。
【qiao-parallel】
封装了一个npm包,欢迎使用: https://code.insistime.com/#/qiao-parallel
|— 封装了IIFE并行执行任务
// q
const q = require('qiao-console');
// vars
const values = require('./_values.js');
const handler = require('./_handler.js');
const callback = require('./_callback.js');
const complete = require('./_complete.js');
// parallel
const parallel = require('qiao-parallel');
// test
(function () {
q.clear();
parallel.parallelByIIFE(handler, values, callback, complete);
})();
|— 封装了多进程并行执行任务
// q
const q = require('qiao-console');
// vars
const values = require('./_values.js');
const callback = require('./_callback.js');
const complete = require('./_complete.js');
// parallel
const parallel = require('qiao-parallel');
// test
(function () {
q.clear();
const jsPath = require('path').resolve(__dirname, './fork-handler.js');
parallel.parallelByFork(jsPath, values, callback, complete);
})();
|—封装了多线程并行执行任务
// q
const q = require('qiao-console');
// vars
const values = require('./_values.js');
const callback = require('./_callback.js');
const complete = require('./_complete.js');
// parallel
const parallel = require('qiao-parallel');
// test
(function () {
q.clear();
const jsPath = require('path').resolve(__dirname, './worker-handler.js');
parallel.parallelByWorker(jsPath, values, callback, complete);
})();
【总结】
1.并行任务-场景介绍
2.并行任务-任务特点
3.并行任务-任务池
4.并行任务-回调
5.并行任务-IIFE模式
6.并行任务-多进程模式
7.并行任务-多线程模式