Stream Processing
"Process data as it flows — never load it all at once."
A stream processor is a state machine that transforms data as it flows through — emit output, await more input, or halt. Processors compose by piping: each stage's output feeds the next's input. The entire pipeline runs in constant memory regardless of data volume, and resource cleanup (file handles, connections) is guaranteed even when errors occur mid-stream.
A water treatment plant. Raw water flows in, through filter stages, UV treatment, pH adjustment — each stage processes water as it arrives, passing results to the next stage. The plant doesn't wait for all the world's water before filtering. Each stage is independent and composable. Pipe them in any order. When a stage shuts down, upstream stages are notified and drain properly.
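The three states from the definition can be written down directly. A minimal TypeScript sketch (the type and function names are illustrative, not from any library):
type Step<O> =
  | { kind: 'emit'; value: O }  // produce output, keep going
  | { kind: 'await' }           // need more input before emitting
  | { kind: 'halt' };           // done: release resources

type Processor<I, O> = (input: I | null) => Step<O>; // null = end of input

// A grep-like processor: emit matching lines, halt at end of input.
const grepErrors: Processor<string, string> = (line) =>
  line === null            ? { kind: 'halt' }
  : line.includes('ERROR') ? { kind: 'emit', value: line }
  :                          { kind: 'await' };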
// WRONG: loads the entire 50GB file into memory
import fs from 'fs';
const data = fs.readFileSync('huge.log', 'utf8'); // buffers all 50GB in RAM (V8 can't even hold a string that large)
const errors = data.split('\n').filter(l => l.includes('ERROR')); // and split() copies everything a second time
// RIGHT: stream processing — constant memory regardless of file size
import { createReadStream, createWriteStream } from 'fs';
import { createInterface } from 'readline';

async function extractErrors(input: string, output: string) {
  const reader = createInterface({
    input: createReadStream(input), // reads in ~64KB chunks
    crlfDelay: Infinity,            // treat \r\n as a single line break
  });
  const writer = createWriteStream(output);
  try {
    for await (const line of reader) { // process ONE line at a time
      if (line.includes('ERROR')) {
        // emit the matching line; if the sink's buffer is full, await 'drain'
        if (!writer.write(line + '\n')) {
          await new Promise(res => writer.once('drain', res));
        }
      }
    }
  } finally {
    writer.end(); // halt: flush buffers and release the file handle, even if we threw
  }
}
// Works identically on 1KB or 1TB files. Memory stays bounded by the streams'
// small fixed-size buffers (~64KB read chunks), not by the file size.
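Usage is a single awaited call, whatever the file size (file names here are placeholders):
await extractErrors('huge.log', 'errors.log'); // same code, 1KB or 1TB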
// Composable pipe (RxJS-style concept):
// source → filter → transform → take(100) → sink
// Each stage: Await input → process → Emit output → Await more
// Three states: Emit | Await | Halt
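Plain async generators give you this composition without any library. A sketch under that assumption (filter/map/take and the file name are illustrative, not a real API):
import { createReadStream } from 'fs';
import { createInterface } from 'readline';

// Each stage: await input → process → emit output. Halting propagates upstream.
async function* filter<T>(src: AsyncIterable<T>, pred: (t: T) => boolean) {
  for await (const item of src) if (pred(item)) yield item;
}
async function* map<T, U>(src: AsyncIterable<T>, fn: (t: T) => U) {
  for await (const item of src) yield fn(item);
}
async function* take<T>(src: AsyncIterable<T>, n: number) {
  for await (const item of src) {
    if (n-- <= 0) return; // halt: exiting the loop calls .return() up the chain
    yield item;
  }
}

// source → filter → transform → take(100) → sink
const source = createInterface({ input: createReadStream('access.log') });
const stages = take(map(filter(source, l => l.includes('ERROR')),
                        l => l.trim()), 100);
for await (const line of stages) console.log(line); // sink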
// Unix pipes are the original:
// cat access.log | grep "ERROR" | awk '{print $5}' | sort | uniq -c | head -10
// Each | connects a stream producer to a stream consumer
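Node's built-in pipeline() composes the same way and makes the cleanup guarantee concrete: if any stage fails, every stage is destroyed. A sketch of the grep stage (the line-buffering Transform is our own illustration, not a library API):
import { pipeline } from 'stream/promises';
import { createReadStream, createWriteStream } from 'fs';
import { Transform } from 'stream';

// Pass through only ERROR lines, buffering partial lines across chunk boundaries.
let tail = '';
const grepErrors = new Transform({
  transform(chunk, _enc, cb) {
    const lines = (tail + chunk.toString()).split('\n');
    tail = lines.pop() ?? ''; // last piece may be a partial line; hold it
    const kept = lines.filter(l => l.includes('ERROR'));
    cb(null, kept.length ? kept.join('\n') + '\n' : '');
  },
  flush(cb) { // end of input: emit the held tail if it matches
    cb(null, tail.includes('ERROR') ? tail + '\n' : '');
  },
});

await pipeline(
  createReadStream('access.log'),
  grepErrors,
  createWriteStream('errors.log'),
); // like `cat access.log | grep "ERROR" > errors.log`, with backpressure built in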