CONCEPT 16 · EFFECTS & IO

Stream Processing

"Process data as it flows — never load it all at once."

streamingiocomposability
Plain English

A stream processor is a state machine that transforms data as it flows through — emit output, await more input, or halt. Processors compose by piping: each stage's output feeds the next's input. The entire pipeline runs in constant memory regardless of data volume, and resource cleanup (file handles, connections) is guaranteed even when errors occur mid-stream.

Analogy

A water treatment plant. Raw water flows in, through filter stages, UV treatment, pH adjustment — each stage processes water as it arrives, passing results to the next stage. The plant doesn't wait for all the world's water before filtering. Each stage is independent and composable. Pipe them in any order. When a stage shuts down, upstream stages are notified and drain properly.

You already know this
Unix pipes: cat log | grep ERROR | awk '{print $5}' | sort | uniq -c — each | is a pipeNode.js Readable/Writable streams: fs.createReadStream().pipe(transform).pipe(output)RxJS Observables: observable.pipe(filter(...), map(...), take(10))Kafka consumers: process one message at a time, commit offset on successReact Streaming SSR: HTML streams to the browser before the full page is ready
Code Example
JavaScript (Node.js)
// WRONG: loads the entire 50GB file into memory
import fs from 'fs';
const data = fs.readFileSync('huge.log', 'utf8'); // needs 50GB RAM!
const errors = data.split('\n').filter(l => l.includes('ERROR'));

// RIGHT: stream processing — constant memory regardless of file size
import { createReadStream } from 'fs';
import { createInterface } from 'readline';
import { createWriteStream } from 'fs';

async function extractErrors(input: string, output: string) {
  const reader = createInterface({
    input: createReadStream(input),   // reads in ~64KB chunks
  });
  const writer = createWriteStream(output);

  for await (const line of reader) {  // process ONE line at a time
    if (line.includes('ERROR')) {
      writer.write(line + '\n');      // emit matching line
    }
  }
  writer.close(); // halt: clean up resources
}
// Works identically on 1KB or 1TB files. Memory usage: ~64KB always.

// Composable pipe (RxJS-style concept):
// source → filter → transform → take(100) → sink
// Each stage: Await input → process → Emit output → Await more
// Three states: Emit | Await | Halt

// Unix pipes are the original:
// cat access.log | grep "ERROR" | awk '{print $5}' | sort | uniq -c | head -10
// Each | connects a stream producer to a stream consumer
Apply when
Large file processing (CSV, logs, JSON lines) — read and process one line at a time
Real-time data: sensor feeds, financial ticks, user event streams — process as data arrives
ETL pipelines: extract from source, transform, load to destination — each stage is a stream processor
HTTP streaming responses: send results to the client as they're computed
Any situation where loading all data into memory is not feasible or not necessary
Check Your Understanding
You need to process a 100GB log file to count ERROR occurrences by service name. Which approach works correctly?
◀ PREVTrampolining & Stack Safety