Handling large amounts of data with JavaScript streams

Streams work like piped commands in shell scripts. They allow one piece/line/query at a time to be processed through the stream functions that have been connected together, so large data sets can be handled a piece at a time instead of being loaded into memory all at once. When downloading and processing large data sets I often do initial data cleaning with Linux CLI tools like sed (CLI related howtos) and then use streams in my Node scripts to put the data into MongoDB or some other database.

Streams are connected with pipes

You will notice that when a stream is set up, it looks something like this:

source
  .pipe(transform1)
  .pipe(transform2)
  .pipe(writer)

The source is a Readable stream. The middle two objects are transform streams; once connected to the source they take input, operate on it, and pass the output along to the next object in the chain. The final object (writer) is a writable stream: it takes the input and writes it somewhere.
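
To make this concrete, here is a minimal working chain (a sketch; input.txt is just a placeholder file name): it reads a file, upper-cases each chunk in a transform, and writes the result to stdout.

// Minimal pipe chain: file -> transform -> stdout
const fs = require('fs');
const { Transform } = require('stream');

const source = fs.createReadStream('input.txt')    // Readable
const upperCase = new Transform({
  transform(chunk, encoding, callback){
    this.push(chunk.toString().toUpperCase())       // operate on the chunk, pass it along
    callback();
  }
});

source
  .pipe(upperCase)
  .pipe(process.stdout)                             // Writable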

Stream inputs and outputs can be objects or buffers

Readable streams will pass along Buffer objects most of the time. These must be converted before you can operate on their contents. If you have multiple other functions in the stream you can decide whether to pass along Buffers or objects between them, but you need to know which it will be; Node streams require it to be specified as an option (readableObjectMode, writableObjectMode).
Translating buffers to objects
This is a standard bit of code I use to get JSON from Buffers.

// Functional form, using Ramda's R.pipe with helper fns (F.toString, fps.toJSON)
const bufferToJSON = R.pipe(
  F.toString,
  fps.toJSON,
)
// Plain version without the helpers
let bufferToJSON = input => JSON.parse(input.toString())

And objects back to buffers

let objectToBuffer = input => Buffer.from(JSON.stringify(input))
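
As a sketch of where these helpers fit: the first transform in an object pipeline can take Buffers in and emit objects, and the last one can do the reverse (parseBuffer and serializeObject are just illustrative names; the object-mode options are covered further down).

// Buffer -> object at the start of an object pipeline, object -> Buffer at the end
const { Transform } = require('stream');

const parseBuffer = new Transform({
  writableObjectMode: false,   // written to with Buffers
  readableObjectMode: true,    // read from as objects
  transform(chunk, encoding, callback){
    this.push(bufferToJSON(chunk))
    callback();
  }
});

const serializeObject = new Transform({
  writableObjectMode: true,    // written to with objects
  readableObjectMode: false,   // read from as Buffers
  transform(obj, encoding, callback){
    this.push(objectToBuffer(obj))
    callback();
  }
});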

The main types of stream fcns: Readable, Transform, Writable

Readable - The source of data; it sits at the start of the pipe() chain and can only be read from. The stream fcn reading from it will most likely receive a Buffer.
Transform - Passes data through, performing an operation. It is written to by the previous object in the pipe() chain, and read from by the following object.
Writable - Is written to only, so it is the end of a pipe() chain. It can write to wherever you configure it to.

The stream is set up with a readstream as the source

You can make a custom readstream using Readable, but more likely it will be one returned by a getter function of some sort (file, HTTP request, process). That lets you go through the result of that getter piecemeal.
Getting a readable stream
It depends on the library; some support returning a readable stream to work through the data they are fetching.

// get a readable stream from a file
const fs = require('fs');
let filestream = fs.createReadStream(filePath)
// Get a readable stream from an http request
const request = require('request')
let httpstream = request.get('http://example.com/target')
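
process.stdin is another readable stream you get for free, which makes the shell-pipe analogy literal; as a sketch, a Node script can sit in the middle of a shell pipeline:

// Get a readable stream from the process itself
// e.g. run as: cat data.txt | node script.js
let stdinStream = process.stdin
stdinStream.pipe(process.stdout)   // pass piped-in data straight back out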

Creating your own readable stream

// Making your own Readable stream
const { Readable } = require('stream');
const readStream = new Readable({
  objectMode: true,
  read(){}
});
// Then you push data directly to it.
readStream.push('data')
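
When there is no more data, push null to signal the end of the stream. As an alternative sketch, the read() method can pull from a source instead of being pushed to from outside:

// Pull data from an array inside read() instead of pushing from outside
const rows = [{a: 1}, {b: 2}, {c: 3}]
const arrayStream = new Readable({
  objectMode: true,
  read(){
    // push the next row, or null when the array is exhausted
    this.push(rows.length ? rows.shift() : null)
  }
});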

Transform streams perform operations on input, pass it along

Transform streams take an input written to them, operate on it, and then allow reading the output.
They are useful to separate operations
Transform allows you to break up stream operations into reusable pieces. As long as they correctly match write/read expectations, they can be used in any stream.

// This is a transform stream that takes a function
// to determine its operation. Input is from the previous .pipe(), this.push is
// sent to the next one. It specifies objects as input/output.
const { Transform } = require('stream');
const transformStream = transformFcn => {
  return new Transform({
    readableObjectMode: true,
    writableObjectMode: true,
    transform(input, encoding, callback){
      let data = transformFcn(input)
      this.push(data);
      callback();
    }
  });
}
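
As a usage sketch, the factory can wrap the bufferToJSON helper from earlier so parsing becomes a reusable piece of a chain (jsonParser is just an illustrative name; source and writer stand in for a readable source and a writable sink):

// Wrap a plain function into a reusable transform step
const jsonParser = transformStream(bufferToJSON)

source
  .pipe(jsonParser)
  .pipe(writer)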

Generic transform stream
This form does not expect objects in/out; it works with Buffers.

const transformStream = transformFcn => {
  return new Transform({
    transform(input, encoding, callback){
      let data = transformFcn(input.toString())
      this.push(Buffer.from(data));
      callback();
    }
  });
}
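
For example, this buffer version could wrap a simple string operation (upperCaser is just an illustrative name):

// Buffers in, upper-cased Buffers out
const upperCaser = transformStream(str => str.toUpperCase())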

Write stream is the sink, and the end of the stream

This is the end of the stream; it deposits the stream data in the configured location. If the write operation is asynchronous and you don't wait for it (e.g. with async/await) before calling the callback, the writes keep piling up, usually until something breaks.

// My generic writable version; takes a function that determines the write
// operation. Allows for async write operations, since I expect most
// write operations to be to file/db/http that will not be instant.
const { Writable } = require('stream');
const writeStream = writeFcn => {
  return new Writable({
    objectMode: true,
    async write(row, encoding, callback){
      try{
        await writeFcn(row)
        callback();
      }catch(err){
        console.log("write stream error: ", err)
        callback(err);
      }
    }
  });
}
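
As a usage sketch, the factory can wrap a database insert; here I assume a connected MongoDB collection named items from the mongodb driver, so insertOne() returns a promise the stream can wait on, and jsonParser is the transform from the sketch above:

// Write each row to MongoDB; the stream waits for each insert to finish
const writeToDb = writeStream(row => items.insertOne(row))

source
  .pipe(jsonParser)
  .pipe(writeToDb)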

How stream events work -

These events are hidden by the pipe() methods, but you may want to access them directly.
Pipe and events can be mixed -
If pipe() is used, you can still access events with this syntax:

readStream
  .pipe(someStream)
    .on('eventName', handler)   // specify actions for the event; read/write events listed below
    .on('otherEvent', handler)  // more than one listener allowed

Read events -
data, end, error, close, readable
Read fcns -
pipe(), unpipe(), read(), unshift(), resume(), pause(), isPaused()
Write events -
drain, finish, error, close, pipe, unpipe
Write fcns -
write(), end(), cork(), uncork()
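
For example, the read events can be used to consume a stream directly, without pipe(); this sketch reuses the filestream from earlier:

// Consume a readable stream with events instead of pipe()
filestream
  .on('data', chunk => console.log('got a chunk of length', chunk.length))
  .on('end', () => console.log('no more data'))
  .on('error', err => console.error('read error: ', err))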

Putting an event to work

This specifies clean-up behavior on a writable stream when the finish event is detected.

  .pipe(writeData)
    .on('finish', async () => {
      console.log('Read all of file: ', settings.datapath)
      await settings.dbclose()
      process.exit(0)
    })

Buffer.from() used for testing streams

This is just a way to make sure Buffers are getting passed along when the readstream is a testing stub that pushes along exactly what it is given. The example is from a Jest test where I wrap the streams in a promise and use the finish/error events to make sure everything is pushed through.

  var runStreams = () => {
    return new Promise((resolve, reject) => {
      reader
        .pipe(transform)
        .pipe(counter)
        .pipe(writer)
          .on('finish', () => resolve(true))
          .on('error', (err) => reject(err))
    })
  }

  reader.push(Buffer.from('test string'))
  reader.push(Buffer.from('[1,2,3,4,5]'))
  reader.push(Buffer.from('{"a":1, "b":2, "c":3}'))
  reader.push('test string')
  reader.push([1,2,3,4,5])
  reader.push({a:1, b:2, c:3})
  reader.push(null)  // signal the end so 'finish' fires and the promise resolves