Veera / Blog

Using Node fork() to speed up processing time

On my home system, I run a simple stock market analysis every day using a Node app. I download the end-of-day market data, load it into a local Postgres database, and then calculate stats and indicators for every stock symbol. I track only the NYSE and NASDAQ exchanges, which together come to about 6100 symbols.

The problem is the processing time.

While 6100 may seem like a small number, it takes around 30 to 45 minutes to calculate the indicators (sma20, sma50, sma200, stochastic, Bollinger Bands and MACD) for all the symbols and write the results to the database. It takes that long because the work is heavy on both CPU and I/O, and my laptop can only handle so much at a given time. :)

Earlier, I used a simple for loop to go through all the symbols sequentially.

// sequential processing of symbols
const numOfSymbols = symbols.length; // 6100
for (let i=0; i < numOfSymbols; i++) {
    const sma20 = calculateSMA(symbols[i], 20);
    // other indicators sma50, sma200, stochastic, macd and bollinger band
    uploadToDatabase(sma20);
}
// takes around 45 minutes to finish  🐢 😢

But it can be better. Each symbol's indicators can be calculated independently of the others, so I could use some form of parallelism to cut the processing time. Enter the child_process.fork() method.

fork()

fork() takes a module path and spawns a new Node.js process that runs the given module, in parallel with the parent and any other forked processes. The parent and the child can exchange messages through send() and the "message" event.
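To make the message round trip concrete, here is a minimal sketch, separate from the stock app. The names doubleInChild and childSource are made up for this illustration, and the child module is written to a temp file only so the whole example fits in a single file; normally it would live in its own .js file.

```javascript
const { fork } = require("child_process");
const fs = require("fs");
const os = require("os");
const path = require("path");

// Source of the child module (hypothetical, for illustration only).
const childSource = `
  process.on("message", msg => {
    // this runs in the child process
    process.send({ pid: process.pid, result: msg.value * 2 });
  });
`;

// Forks a child, sends it one number, and resolves with the child's reply.
function doubleInChild(value) {
  const dir = fs.mkdtempSync(path.join(os.tmpdir(), "fork-demo-"));
  const childPath = path.join(dir, "child.js");
  fs.writeFileSync(childPath, childSource);

  return new Promise((resolve, reject) => {
    const child = fork(childPath);
    child.on("error", reject);
    child.on("message", reply => {
      // the open IPC channel keeps the child alive, so stop it explicitly
      child.kill();
      fs.rmSync(dir, { recursive: true, force: true });
      resolve(reply);
    });
    child.send({ value });
  });
}
```

Calling doubleInChild(21) resolves with an object whose result was computed in a separate process, which is the same send / "message" pattern the parent and daily-indicators.js use.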

So I extracted the indicator calculation code into a daily-indicators.js file and forked this module once per chunk of symbols. I just had to implement some messaging logic to determine when all the symbols had been processed, and then clean up the forked processes.

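// parent.js
const { fork } = require("child_process");
const path = require("path");
// `db` is the app's own database helper (not shown here)

const runIndicators = async () => {
  const allSymbols = await db.getAllSymbols();
  // split the symbols into chunks of 1000; each chunk gets its own process
  const chunks = [];
  while (allSymbols.length) {
    chunks.push(allSymbols.splice(0, 1000));
  }
  if (chunks.length === 0) return; // nothing to process
  await new Promise((resolve, reject) => {
    let chunksDone = 0;
    for (let i = 0; i < chunks.length; i++) {
      const chunkProcessor = fork(path.join(__dirname, "daily-indicators.js"));
      chunkProcessor.on("error", reject);
      chunkProcessor.on("message", msg => {
        if (msg.status === "done") {
          chunksDone += 1;
          chunkProcessor.kill(); // clean up the finished child
          if (chunksDone >= chunks.length) {
            resolve();
          }
        }
      });
      chunkProcessor.send({ symbols: chunks[i] });
    }
  });
};

// called from inside an async function
await runIndicators();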

// --------------------------- //

// daily-indicators.js
process.on("message", async msg => {
  const { symbols } = msg;
  for (let i = 0; i < symbols.length; i++) {
    await calculateIndicators(symbols[i]);
  }
  process.send({ status: "done" });
});

With this improved implementation, the whole process takes less than 10 minutes to finish! 🚀
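The fixed chunk size of 1000 gives seven child processes for roughly 6100 symbols. One possible variation, which I have not benchmarked, is to derive the number of chunks from the number of CPU cores instead. The helper below is a sketch of that idea; chunkForWorkers is a hypothetical name, and defaulting to os.cpus().length is an assumption rather than a tuned value.

```javascript
const os = require("os");

// Hypothetical helper: split `items` into at most `workerCount` chunks of
// near-equal size, without mutating the input array.
function chunkForWorkers(items, workerCount = os.cpus().length) {
  const count = Math.max(1, Math.min(workerCount, items.length));
  const size = Math.ceil(items.length / count);
  const chunks = [];
  for (let i = 0; i < items.length; i += size) {
    chunks.push(items.slice(i, i + size));
  }
  return chunks;
}
```

Its result could replace the while/splice loop in parent.js, with one fork() per returned chunk. Whether matching the core count actually beats seven processes depends on how much of the time is spent waiting on the database, so it is worth measuring both.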
