NodeJS Efficient Multithread Process Management

NodeJS

One of NodeJS’s major platform caveats is the fact that it runs on a single thread. Furthermore this thread can only consume approximately 1.5gb of RAM (a V8 feature). This means that if your server has more than 1 CPU core or more than 1.5gb of RAM you may be using less than 50% of the computing power available to you.

It is important to first understand the different cases where you would approach multithreading differently. The 2 biggest reasons to need this in a NodeJS application are either high concurrency requests or heavy processing needs. High concurrency is probably the most common case. High concurrency would occur if you have many server or process requests required in a short time, for instance a blog or application with thousands of requests per second. In this case I would highly recommend using PM2 to manage your request concurrency with clustering. I will not be exploring that case here.

Heavy Processing

In a situation where you application needs to do a large amount of data processing or calculating it is probably an easier approach to roll your own cluster script using NodeJS built in cluster support. Node allows us to fork new processes and communicate between them using the familiar .on('message',() => {}) syntax.

Now that we’ve identified a possible use case for this lets create a ridiculous scenario where we can showcase this tech. A friend of mine, let’s call him Jason P. (yes he loves to go by JSONP) posed an interesting question to me recently which immediately made me want to come up with a proof he could use for his solution.

You have a “Calculator” with the following buttons:

array( 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, ‘x’, ‘/’, ‘+’, ‘-‘ )

You have a function called monkey_press that chooses a random button from the calculator. If it’s a number, it’s input, and if it’s an operative then it performs that function (plus, minus, divide, or multiply)

The money presses 10,000 calculator keys at random.

At the end, EQUALS is pressed.

Let’s say this is repeated one million times.

Is there a way to know, without actually running this series of events one million times and destroying poor monkey’s fingers, what the average output at the end of each key presses would be?

Or is the output going to be TRULY random based on the nature of the operatives (plus/multiply/etc)

You can probably answer this without writing any code, I just had this literal Shower Thought and figured it would be interesting to share!

This gives us a fun little scenario to… monkey around with? 🙂 In response I came up with this fun little script that will give us a way to proof whatever formula he can come up with to predict the range of possibilities our monkey can produce.

var monkeyResults = [],
    monkeyPressing = true,
    monkeyPressTill = new Date().getTime()+(parseInt(process.argv[2])*1000),
    keys = [ 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, '*', '/', '+', '-'];

function monkeyStopped() {
  var Sum = 0;

  Sum = monkeyResults.reduce(function(a, b) {
    return a + b;
  })
  
  var average = Sum / monkeyResults.length;

  console.log('monkey calculated '+monkeyResults.length+' results');
  console.log('monkey result average: ',average);
}

while(monkeyPressing) {
  if( new Date().getTime() > monkeyPressTill ) {
    monkeyStopped();
    monkeyPressing = false;
  } else {
    var MonkeyAlgo = '';
  
    for(var MonkKeyPresses=1; MonkKeyPresses<10000; ) {
      // monkey space dem ops
      var monkKey;
      if( MonkKeyPresses == 1 || MonkKeyPresses == 9999 || ['*', '/', '+', '-'].indexOf(MonkeyAlgo[MonkeyAlgo.length -1]) != -1 ) {
        monkKey = keys.slice(1,10)[Math.floor(Math.random()*keys.slice(1,10).length)];
      } else if( ['0'].indexOf(MonkeyAlgo[MonkeyAlgo.length -1]) == 0 ) {
        monkKey = keys.slice(1)[Math.floor(Math.random()*keys.slice(1).length)];
      } else {
        monkKey = keys[Math.floor(Math.random()*keys.length)];
      }
      MonkKeyPresses++;
      MonkeyAlgo += monkKey;
    }
    var result = eval(MonkeyAlgo);
    monkeyResults.push(result);
    console.log('monkey result: ', result);
  }
}

As you might have figured out I cheated a little bit here, this monkey keeps producing results until a preset time. This is because I don't know how long it would take the monkey to produce a million results and I don't want the poor monkey dying of potassium deficiency in the process. Along the way I had to work out a couple of additional rules for the monkey to follow such as operators cannot be next to each-other, and they can't be at the beginning or end of the equation. JavaScript also doesn't appreciate when we start the equation with a 0.

monkey results

Anyway, this gave me a way to at least benchmark how long it's going to take my monkey to produce a million results. After having run this several times at 10 seconds each I averaged out to about 1330 results. That means my monkey is going to take approximately 2.09 hours to produce a million results. That is just no good!

Multithreading Monkeys

Enter; node process forking. A couple of requirements before we dive into the how. I want all results to be immediately available to the parent process, in case I want to save state and resume later, or in case I want to scale this process into a multi machine API. I want the master thread to to count all of the results and average them as well. This means I need my threads to communicate to each-other.

Let's skip the basics implementation, that can be found right in the docs, and jump directly into an implementation where the threads are communicating.


const cluster = require('cluster');

if (cluster.isMaster) {
  const numCPUs = require('os').cpus().length;
  var monkeyCount = 0;
  
  // fork some monkeys
  for (var i = 0; i < numCPUs; i++) {
    cluster.fork();
  }
  
  // once the monkeys are online, send them a message.
  cluster.on('online', (worker) => {
        
    worker.send({
      message: 'I would like to hire you, monkey!'
    });
    
    // handle a message when a monkey responds
    worker.on('message', (message) => {
      console.log(`monkey ID: ${message.monkeyid} said ${message.message}`);
      
      // as soon as I have received a response from all available monkeys I need to exit
      monkeyCount++;
      if( monkeyCount == numCPUs ) {
        process.exit();
      }
    });
    
    
  });
  
  cluster.on('exit', (worker, code, signal) => {
    console.log(`monkey ID: ${worker.process.pid} died`);
  });
  
} else {
  
  // the child monkey responds to being poked.
  process.on('message', (message) => {
    
    console.log(`Monkey: ${process.pid} is online and received a message received a message.`);
    
    if( message.message == 'I would like to hire you, monkey!' ) {
      process.send({
        monkeyid: process.pid,
        message: 'I\'ll take the job!'
      });
    }
    
  });
  
}

Running this script I now receive what I expect, my development machine has 8 cores, so I fork 8 processes. I send each process a message and receive a message in response.

monkey results

Now I am ready to merge the two pieces of code together so when I run this script it will fork processes for each core and each core will communicate back their results to the master process.

 
const cluster = require('cluster');

var monkeyResults = [],
    monkeyPressing = true,
    monkeysCompleted = 0,
    keys = [ 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, '*', '/', '+', '-'];

if(cluster.isMaster) {
  const numWorkers = require('os').cpus().length;
  var   monkeyPressTill = new Date().getTime()+(parseInt(process.argv[2])*1000);
  
  console.log('Master monkey hiring ' + numWorkers + ' monkeys...');
  
  for(var i = 0; i < numWorkers; i++) {
    cluster.fork();
  }

  cluster.on('online', (worker) => {
    console.log('Monkey ID ' + worker.process.pid + ' is hired');
        
    worker.send({
      type: 'keypress',
      from: 'master',
      data: {
        'pressTill': monkeyPressTill
      }
    });
    
    worker.on('message', (message) => {
      switch(message.type) {
        case'result':
          monkeyResults.push(message.data.result);
          console.log('Monkey ID '+message.monkeyid+' result: '+message.data.result);
          break;
        case'end':
          console.log('Monkey ID '+message.monkeyid+' completed '+message.data.totalresults+' result.');
          monkeysCompleted++;
          if( monkeysCompleted == numWorkers ) {
            monkeyStopped();
          }
          break;
      }
    });
        
  });

  cluster.on('exit', (worker, code, signal) => {
    console.log('Monkey ' + worker.process.pid + ' died with code: ' + code + ', and signal: ' + signal);
    console.log('Hiring a new Monkey');
    cluster.fork();
  });
  
} else {
    
  process.on('message', (message) => {
    if(message.type === 'keypress') {
      monkeyPress(message.data.pressTill);
    }
  });
  
  
}


function monkeyPress(monkeyPressTill) {
  var resultCount = 0;
  while(monkeyPressing) {
    if( new Date().getTime() > monkeyPressTill ) {
      process.send({
        'type':'end',
        'monkeyid': process.pid,
        'data':{
          'totalresults':resultCount
        }
      });
      monkeyPressing = false;
    } else {
      var MonkeyAlgo = '';
      
      for(var MonkKeyPresses=1; MonkKeyPresses<10000; ) {
        // monkey space dem ops
        var monkKey;
        if( MonkKeyPresses == 1 || MonkKeyPresses == 9999 || ['*', '/', '+', '-'].indexOf(MonkeyAlgo[MonkeyAlgo.length -1]) != -1 ) {
          monkKey = keys.slice(1,10)[Math.floor(Math.random()*keys.slice(1,10).length)];
        } else if( ['0'].indexOf(MonkeyAlgo[MonkeyAlgo.length -1]) == 0 ) {
          monkKey = keys.slice(1)[Math.floor(Math.random()*keys.slice(1).length)];
        } else {
          monkKey = keys[Math.floor(Math.random()*keys.length)];
        }
        MonkKeyPresses++;
        MonkeyAlgo += monkKey;
      }
      var result = eval(MonkeyAlgo);
      process.send({
        'type':'result',
        'monkeyid': process.pid,
        'data':{
          'result':result
        }
      });
      resultCount++;
    }
  }
}

function monkeyStopped() {
  var Sum = 0;

  Sum = monkeyResults.reduce(function(a, b) {
    return a + b;
  })
  
  var average = Sum / monkeyResults.length;

  console.log('Together the monkeys calculated '+monkeyResults.length+' results');
  console.log('All monkeys averaged: ',average);
  process.exit();
}

I run this script and bam, 2166 results in 10 seconds! That means we've reduced the million round overhead to 1.28 hours, that's quite a bit!

monkey results

So why did each thread drop down to an average of 270 results when we were getting an average of 1330 running a single thread? Pretty simple, that comes into the requirements I set out. There is quite a bit of overhead that was added to allow the processes to communicate each result. If we really want to go crazy with speed, I can tweak the script a little to reduce the communication overhead. Note that also there is a delay between when a process starts and when it is online and crunching results. So now that we have a baseline, let's see if we can run in speed mode to get more juice out of these monkeys!


const cluster = require('cluster');

var monkeyResults = [],
    monkeyPressing = true,
    monkeysCompleted = 0,
    keys = [ 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, '*', '/', '+', '-'];

if(cluster.isMaster) {
  const numWorkers = require('os').cpus().length;
  var   monkeyPressTill = new Date().getTime()+(parseInt(process.argv[2])*1000);
  
  console.log('Master monkey hiring ' + numWorkers + ' monkeys...');
  
  for(var i = 0; i < numWorkers; i++) {
    cluster.fork();
  }

  cluster.on('online', (worker) => {
    console.log('Monkey ID ' + worker.process.pid + ' is hired');
        
    worker.send({
      type: 'keypress',
      from: 'master',
      groupResults: Boolean(process.argv[3]),
      data: {
        'pressTill': monkeyPressTill
      }
    });
    
    worker.on('message', (message) => {
      switch(message.type) {
        case'result':
          monkeyResults.push(message.data.result);
          console.log('Monkey ID '+message.monkeyid+' result: '+message.data.result);
          break;
        case'groupedResult':
          monkeyResults = monkeyResults.concat(message.data.result);
          monkeysCompleted++;
          if( monkeysCompleted == numWorkers ) {
            monkeyStopped();
          }
          break;
        case'end':
          console.log('Monkey ID '+message.monkeyid+' completed '+message.data.totalresults+' result.');
          if( !Boolean(process.argv[3]) ) {
            monkeysCompleted++;
          }
          if( monkeysCompleted == numWorkers ) {
            monkeyStopped();
          }
          break;
      }
    });
        
  });

  cluster.on('exit', (worker, code, signal) => {
    console.log('Monkey ' + worker.process.pid + ' died with code: ' + code + ', and signal: ' + signal);
    console.log('Hiring a new Monkey');
    cluster.fork();
  });
  
} else {
    
  process.on('message', (message) => {
    if(message.type === 'keypress') {
      monkeyPress(message.data.pressTill,message.groupResults);
    }
  });
  
  
}


function monkeyPress(monkeyPressTill,groupResults) {
  var resultCount = 0;
  var resultGroup = [];
  while(monkeyPressing) {
    if( new Date().getTime() > monkeyPressTill ) {
      if(groupResults) {
        process.send({
          'type':'groupedResult',
          'monkeyid': process.pid,
          'data':{
            'result':resultGroup
          }
        });
      }
      process.send({
        'type':'end',
        'monkeyid': process.pid,
        'data':{
          'totalresults':resultCount
        }
      });
      monkeyPressing = false;
    } else {
      var MonkeyAlgo = '';
      
      for(var MonkKeyPresses=1; MonkKeyPresses<10000; ) {
        // monkey space dem ops
        var monkKey;
        if( MonkKeyPresses == 1 || MonkKeyPresses == 9999 || ['*', '/', '+', '-'].indexOf(MonkeyAlgo[MonkeyAlgo.length -1]) != -1 ) {
          monkKey = keys.slice(1,10)[Math.floor(Math.random()*keys.slice(1,10).length)];
        } else if( ['0'].indexOf(MonkeyAlgo[MonkeyAlgo.length -1]) == 0 ) {
          monkKey = keys.slice(1)[Math.floor(Math.random()*keys.slice(1).length)];
        } else {
          monkKey = keys[Math.floor(Math.random()*keys.length)];
        }
        MonkKeyPresses++;
        MonkeyAlgo += monkKey;
      }
      var result = eval(MonkeyAlgo);
      if( groupResults ) {
        resultGroup.push(result);
      } else {
        process.send({
          'type':'result',
          'monkeyid': process.pid,
          'data':{
            'result':result
          }
        });
      }
      resultCount++;
    }
  }
}

function monkeyStopped() {
  var Sum = 0;

  Sum = monkeyResults.reduce(function(a, b) {
    return a + b;
  })
  
  var average = Sum / monkeyResults.length;

  console.log('Together the monkeys calculated '+monkeyResults.length+' results');
  console.log('All monkeys averaged: ',average);
  process.exit();
}

Now if I supply a 2nd parameter of 'true' when I run this script the monkey processes will only report back after the allotted run time has completed. We won't be able to save up to the second results from our master process, but we should be able to squeeze just a little more speed out of these processes!

monkey results

2510 results in 10 seconds so that gets us to 1.11 hours! We've now been able to just about double to testing capacity of the same machine! I'm sure there's a lot more optimization that could be done, but I believe this stands as a strong case for clustering your heavy processing tasks!