Handling Large Files With NodeJS and Elastic Search

So I consider myself to be an equal opportunity coder. I like investigating different tools built in different languages to solve different types of problems. Recently at work I had the opportunity to play around with Elasticsearch. Elasticsearch brings the whole distributed / non-relational data craze to the world of search. It is actually a rather impressive piece of tech. If you are at all interested in search as a problem, I recommend you give it a look. In any event, I needed to get some data into the search engine to play around with. Where could I get a good deal of data without doing the heavy lifting myself? Twitter! Sure, the Twitter activity stream. I can rig that up with NodeJS in a jiffy. And it goes a little something like this.

var elastical = require("elastical");  
var colors = require('colors');  
var http = require('http');  
var fs = require('fs');  
var events = require('events');  
var util = require('util');  
var client = new elastical.Client();  
var twitter_client =  http.createClient("80", "api.twitter.com");  
var tweet_emitter = new events.EventEmitter();  
process.on( 'uncaughtException', function( err ){  
    util.puts("uncaught exception".cyan, err);
    util.puts('');
});
function index_tweets ( tweets ){  
    var data = tweets.map( function( tweet ){
        return {
            index:{
                index:"twitter_example",
                type:"tweet",
                data: tweet
            }
        }
    });
    client.bulk(data, function( err, response  ){
        if( err ){
            util.puts("Failed Bulk operation".red, err)
        } else {
            console.log("Successful Twitter update: %s tweets".green, data.length);
        }
    });
};
function getTweets(){  
    // http sockets can just get errors.
    // so wrap it in a try/catch and echo it out.
    try{
        util.puts("Checking...".cyan)
        var request = twitter_client.request(
                "GET", 
                "/1/statuses/public_timeline.json?count=200",
                {
                    "host":"api.twitter.com"
                }
            );
        request.addListener('response', function( resp ){
               var body = "";
               resp.addListener('data', function( data ){
                    body += data;
               });
               resp.addListener('end', function(){
               var tweets = JSON.parse( body );
               if( tweets.length ){
                    tweet_emitter.emit( 'tweets', tweets );
                }
            });
        });
        request.end();
    }catch ( e ){
        util.puts( "error fetching tweets".red )
        util.puts("")
        util.puts( e )
        util.puts("")
    }
};
tweet_emitter.on('tweets', index_tweets );  
setInterval(getTweets, (1000 * 60) )  

Now this works out pretty well. Every so often Node will ping Twitter, fetch the latest data, and index it into Elasticsearch. I let this run for a couple of days and collected 40,000 or so tweets. OK, that is a good start to play around with, but how fast can it keep up with a continuous stream of data? For that I'm going to need a fairly large data set to pump in, so I ended up exporting the activity table out of one of the databases at work. It was about 50MB of data. However, it's in CSV format, and Elasticsearch deals exclusively with JSON data.

Python To The Rescue

Luckily I had some experience converting CSV to JSON with python. There is a CSV module in the Python standard library that makes this a snap. And it looks a little like this:

#csv_to_json.py
import os  
import json  
import csv  
import sys  

if len( sys.argv ) < 2:  
    print "You must provide a file path"
    sys.exit(0)

file_path = os.path.abspath( sys.argv[1] )
reader = csv.reader( open(file_path), delimiter=",", quotechar='"')  
header = reader.next()  
output = open('out.json', "wb")
data = []

def hasher( lst ):  
    ret = {}
    for item in lst:
        ret[item[0]] = item[1]
    return ret

print "Converting..."
for row in reader:
    data.append( hasher( zip( header, row ) ) )

print "Writing..."
json.dump( {"data":data}, output )  
output.close()  
print "Complete..."  

Like a charm, and in less than 30 lines of code I can convert any CSV file to a JSON file. It couldn't get much simpler. All this does is pull the first row of the CSV off as the headers and stash it in a list. Then, for every remaining row, zip it with the header row, which gives you a structure like this:

[ ["HeaderA", "RowA ColA"] , ["HeaderB", "RowA ColB"], ... ]

Which is then converted into a dictionary, ready to be serialized as a JSON string. I can convert any CSV to JSON in a pinch. From here, indexing with Elasticsearch was a breeze and worked without a hitch.
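To make the zip-and-dict step concrete, here's a tiny standalone illustration (the header and row values are made up for the example, not taken from any real data set):

```python
# Pair a CSV header row with a data row, then build a dict from the pairs.
header = ["id", "user", "message"]
row = ["1", "alice", "hello world"]

pairs = list(zip(header, row))
# pairs is [("id", "1"), ("user", "alice"), ("message", "hello world")]

record = dict(pairs)
# record is {"id": "1", "user": "alice", "message": "hello world"}
```

The hasher function in the script above does the same thing as `dict()` here, just spelled out with an explicit loop.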

Stress Test

So let's take this one step further. I need some real data. At work, the production database has saved every email message the system has generated since the beginning of time. I exported that table out to a CSV file as well, and weighing in at a whopping 4.0GB, I think we have a candidate. I pointed my Python script at the CSV to convert it. After about 30 seconds, I got this message:

Traceback (most recent call last):  
   File "csv_to_json.py", line 24, in &lt;module&gt;
      for row in reader:
_csv.Error: field larger than field limit (131072)  

In some cases you'll get a MemoryError instead. In short, the file is too big: the Python process reads it all in and runs out of memory. Essentially, what I would need to do here is open the file at the OS level rather than reading it into memory, and parse out rows myself by scanning for newline terminators. I ran this by a number of people at work to see if there was an obvious solution I was missing... Unfortunately, everyone came up with the same horrible solution.
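For the curious, a minimal sketch of that manual approach might look like this (the chunk size is an arbitrary assumption, and it conveniently ignores quoted CSV fields that contain embedded newlines, which is part of what makes it horrible):

```python
def iter_lines(path, chunk_size=64 * 1024):
    """Yield complete lines from a file without loading it all into memory.

    Reads fixed-size chunks and splits on newline terminators by hand.
    Note: this breaks on quoted CSV fields containing embedded newlines,
    one of several reasons this approach is painful in practice.
    """
    leftover = ""
    with open(path) as f:
        while True:
            chunk = f.read(chunk_size)
            if not chunk:
                break
            leftover += chunk
            lines = leftover.split("\n")
            leftover = lines.pop()  # last piece may be a partial line
            for line in lines:
                yield line
    if leftover:
        yield leftover
```

Each yielded line would still need to be run through a CSV parser, which is exactly the bookkeeping nobody wants to re-implement.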

NodeJS To The Rescue

From here I turned back to NodeJS. Node's event-driven IO model was perfect for this. With Node you can use the fs module to create read and write streams, which read in chunks of data and notify you via events. Now, an often overlooked fact when dealing with IO is that, typically, computers can read files much faster than they can write them. So over time Node will have a lot of data backed up in memory waiting to be flushed to disk, and you run into the same memory problems.

The key is to pause the reader until the write stream has finished writing out its current buffer, and you're golden. It looks a little something like this:

var csv         = require('ya-csv')  
    ,optimist   = require('optimist') 
    ,colors     = require('colors') 
    ,util       = require('util') 
    ,fs         = require('fs') 
    ,path       = require('path') 
    ,headers    = [] 
    ,file_count = 1 
    ,data       = []
    ,argv
    ,filepath
    ,reader
    ,streamer; 

argv = optimist.usage(  
        "\nConverts large csv sets to json for indexing\n".bold + 
        "Usage:\n".magenta+ 
        "node csv_to_json.js -f /path/to/file.csv" 
        ) 
        .demand(['f']) 
        .options('f', { 
            alias:"file" 
            ,describe:"the path to a csv file" 
        }) 
        .argv; 
if( argv.help ){  
    util.puts(optimist.help() ); 
    process.exit(0); 
} 
filepath = path.resolve( argv.file );  
reader = csv.createCsvFileReader(filepath,{  
    separator:',', 
    quote:'"', 
}); 
streamer = fs.createWriteStream('converted.json', {  
    flags:'w', 
    encoding:"utf-8" 
});
// convert an array of [key, value] pairs into an object
function hasher( arr ){  
    var ret = {};    
    arr.forEach(function( item ){ 
        ret[item[0]] = item[1]; 
    }); 
    return ret; 
}
// emulate python's zip function
function zip( arr1, arr2 ){  
    var ret = [] 
        ,len = arr1.length;
    // makes a 2 element array and pushes it into the return array    
    for( x = 0; x < len; x++){ 
        ret.push( [ arr1[x], arr2[x] ] ); 
    } 
    return ret; 
}
// we need to throttle the reader
// as it can read faster than the writer can write.
reader.once('data', function( str ){  
    util.puts('Reading...'.green) 
    headers = str; 
    reader.addListener('data', function( record ){ 
        var flushed;
        flushed = streamer.write( 
            JSON.stringify( 
                hasher( zip( headers, record ) ) 
            ) + "\n" 
        )
        if( !flushed ){ 
            reader.pause(); 
        } 
    })    
    // when the streamer is empty, let the reader continue
    streamer.on('drain', function(){ 
        reader.resume(); 
    }) 
});
// when the reader is done close the 
// streamer and exit
reader.addListener('end', function( ){  
    streamer.on('drain', function(){ 
          streamer.end(); 
          console.log("Done".red); 
    }); 
})

In this example, I'm using three third-party modules: ya-csv to read in rows of the file, optimist to provide basic command line options, and colors to spice up the output. The meat of the program starts in the reader's first 'data' handler. It does almost exactly the same thing as the Python script - zip the headers with each row and JSON-encode the end result. Done! In NodeJS it takes about 10MB of memory to run and about 5 minutes to churn through and convert 4GB of data. Not bad, Node, not bad at all.

As for Elasticsearch, after about 10 minutes of pumping data in, it started to sputter quite a bit. I was running bulk inserts of about 3,000 documents every 5 seconds or so, and after about 10 minutes the requests started timing out. I had to kill and restart the search engine and resume the indexing where it left off a couple of times. For reference, I was running on a quad-core 2.6GHz machine with only 8GB of RAM and had Elasticsearch running pretty much on the default setup, so indexing 4GB of data was bound to push it over the edge. So no discredit to Elasticsearch - it's a pretty amazing piece of tech. But there you have it: Node's event-driven model makes it a perfect fit for dealing with large files where other systems will strain and eventually die under the pressure.
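The batching itself is the easy part. A generic sketch of slicing a document list into bulk-sized batches might look like this (the batch size of 3,000 just mirrors the number above; tune it for your cluster):

```python
def batches(items, size=3000):
    """Split a list of documents into fixed-size batches for bulk indexing."""
    for start in range(0, len(items), size):
        yield items[start:start + size]
```

Each batch would then be handed to a bulk insert call, with a pause between requests so the search engine has a chance to catch up.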

csv python elasticsearch io json node