Logo Vincent
返回文章列表

一篇文章了解JS并行任务

Web
一篇文章了解JS并行任务

【前言】

在开发过程中大部分场景是顺序执行代码,

也有场景要求并行执行多个任务,

本文研究下如何高效的并行执行任务。

【并行场景】

常见并行执行任务的场景

1.并行执行多个请求

2.并行下载文件

【并行任务】

上述场景中任务的共同点是

1.多个任务没有依赖关系

2.多个任务完成耗时不确定

3.要求最快完成所有任务

所以可以用setTimeout模拟并行任务,

timeout可以传入,模拟不同时长完成任务,

/**
 * handler
 * @param {*} timeout
 * @returns
 */
module.exports = function (timeout) {
  return new Promise(function (resolve) {
    setTimeout(() => {
      return resolve(timeout);
    }, timeout);
  });
};

【并行任务池】

这里的任务池,就是一些timeout的数值集合,

/**
 * values
 */
module.exports = [100, 300, 200, 400];

【并行回调和完成】

并行任务执行,一般希望

1.每个任务完成有回调callback

2.所有任务完成有回调complete

callback代码如下

// q
const q = require('qiao-console');

/**
 * callback
 * @param {*} index
 * @param {*} res
 */
module.exports = function (index, res) {
  q.writeLine(index, `${index} ${res}`);
};

complete代码如下

// q
const q = require('qiao-console');

/**
 * complete
 * @param {*} l
 */
module.exports = function (l) {
  q.writeLine(l, `complete`);
};

【串行执行】

串行执行比较简单,for+await就可以实现

// q
const q = require('qiao-console');

// vars
const values = require('./_values.js');
const handler = require('./_handler.js');
const callback = require('./_callback.js');
const complete = require('./_complete.js');

// normal
async function normal() {
  console.time('qiao-parallel');

  for (let i = 0; i < values.length; i++) {
    const value = values[i];

    const res = await handler(value);
    callback(i, res);
  }
  complete(values.length);

  console.timeEnd('qiao-parallel');
}

// test
(function () {
  q.clear();

  normal();
})();

其中

values:并行任务池数组

handler:并行任务

callback:单个任务执行完毕回调

complete:所有任务执行完毕回调

运行效果如下,总耗时1s(100ms+200ms+300ms+400ms)

【并行执行-IIFE】

上述串行执行的核心代码是

for (let i = 0; i < values.length; i++) {
  const value = values[i];

  const res = await handler(value);
  callback(i, res);
}

这里只需要使用立即执行函数,

即可让几个任务并行执行,

立即执行函数的介绍可以看这里: https://developer.mozilla.org/zh-CN/docs/Glossary/IIFE

修改后的核心代码如下

console.time('qiao-parallel');

for (let i = 0; i < values.length; i++) {
  (async function (i, value) {
    const res = await handler(value);
    callback(i, res);
    if (i + 1 == values.length) {
      complete(values.length);
      console.timeEnd('qiao-parallel');
    }
  })(i, values[i]);
}

运行效果如下,总耗时400ms(max(100,200,300,400))

【并行执行-多进程】

在nodejs下,还可以使用多进程并行执行任务,

多进程执行任务要使用nodejs的child_process模块,

使用方法:

1.准备执行任务js文件

2.fork执行

3.通过process通信

child_process的fork使用,详见:

https://nodejs.org/dist/latest-v16.x/docs/api/child_process.html#child_processforkmodulepath-args-options

fork部分的代码大致如下

const cp = child_process.fork(jsPath, args || []);
cp.on('message', function (msg) {
  if (onMsg) onMsg(msg);
});
cp.on('exit', function (code) {
  if (onExit) onExit(code);
});

之前的handler.js文件对应的要修改为fork的模式

// handler
const handler = require('./_handler.js');

// fork handler
async function forkHandler() {
  // check
  if (!process || !process.argv) return;

  // value
  const value = parseInt(process.argv[2]);

  // msg
  const msg = await handler(value);
  process.send(msg);
}

forkHandler();

运行效果如下,总耗时447ms(max(100,200,300,400)+47ms)

运行时间小于串行的1s,大于IIFE的400ms,

其中47ms可以理解为进程创建和销毁的耗时,

将上述100ms,200ms,300ms,400ms修改为1-4s,查看效果,

IIFE执行效果如下, 总耗时还是4s(max(1,2,3,4))

fork执行效果如下,总耗时是4.045s(max(1,2,3,4)+45ms)

以上证明多进程并行执行任务中,

创建和销毁进程大概耗时是40-50ms

【并行执行-多线程】

nodejs从10.5.0开始引入了Worker threads模块,

详见:https://nodejs.org/dist/latest-v18.x/docs/api/worker_threads.html#worker-threads

worker代码如下:

const worker = new Worker(jsPath, {
  workerData: value,
});
worker.on('message', function (res) {
  onCallback(callback, index, res);
});
worker.on('error', function (error) {
  console.log(error);
});
worker.on('exit', function () {
  onComplete(complete, valuesLength);
});

对应的handler代码修改为:

// worker
const { parentPort, workerData } = require('node:worker_threads');

// handler
const handler = require('./_handler.js');

// fork handler
async function forkHandler() {
  // msg
  var msg = await handler(workerData);
  parentPort.postMessage(msg);
}

forkHandler();

运行效果如下,总耗时447ms(max(100,200,300,400)+34ms)

修改为1-4后效果如下,总耗时是4.032s(max(1,2,3,4)+32ms)

可以看到多线程模式额外的开销是30ms左右,

比多进程模式的40-50ms要快一些。

【qiao-parallel】

封装了一个npm包,欢迎使用: https://code.insistime.com/#/qiao-parallel

|— 封装了IIFE并行执行任务

// q
const q = require('qiao-console');

// vars
const values = require('./_values.js');
const handler = require('./_handler.js');
const callback = require('./_callback.js');
const complete = require('./_complete.js');

// parallel
const parallel = require('qiao-parallel');

// test
(function () {
  q.clear();

  parallel.parallelByIIFE(handler, values, callback, complete);
})();

|— 封装了多进程并行执行任务

// q
const q = require('qiao-console');

// vars
const values = require('./_values.js');
const callback = require('./_callback.js');
const complete = require('./_complete.js');

// parallel
const parallel = require('qiao-parallel');

// test
(function () {
  q.clear();

  const jsPath = require('path').resolve(__dirname, './fork-handler.js');
  parallel.parallelByFork(jsPath, values, callback, complete);
})();

|—封装了多线程并行执行任务

// q
const q = require('qiao-console');

// vars
const values = require('./_values.js');
const callback = require('./_callback.js');
const complete = require('./_complete.js');

// parallel
const parallel = require('qiao-parallel');

// test
(function () {
  q.clear();

  const jsPath = require('path').resolve(__dirname, './worker-handler.js');
  parallel.parallelByWorker(jsPath, values, callback, complete);
})();

【总结】

1.并行任务-场景介绍

2.并行任务-任务特点

3.并行任务-任务池

4.并行任务-回调

5.并行任务-IIFE模式

6.并行任务-多进程模式

7.并行任务-多线程模式

8.并行任务-https://code.insistime.com/#/qiao-parallel

© 2026 vincentqiao.com . 保留所有权利。