Node.js Streams — Deep Dive
What Are Streams?
Streams process data piece by piece (chunks) instead of loading everything into memory. Essential for:
- Large file processing
- HTTP request/response bodies
- Video/audio streaming
- ETL pipelines
- Real-time data transformation
```
Without streams:          With streams:
┌─────────────┐           ┌──────┐    ┌──────┐    ┌──────┐
│ Read entire │           │Chunk1│───►│Trans-│───►│Chunk1│
│ file to     │           │Chunk2│    │ form │    │Chunk2│
│ memory      │           │Chunk3│    │      │    │Chunk3│
│ Transform   │           └──────┘    └──────┘    └──────┘
│ all at      │
│ once        │
└─────────────┘
Memory = file size        Memory = chunk size (constant!)
```

Stream Types
| Type | Description | Example |
|---|---|---|
| Readable | Source of data | fs.createReadStream, http.IncomingMessage |
| Writable | Destination for data | fs.createWriteStream, http.ServerResponse |
| Transform | Duplex whose output is computed from its input | zlib.createGzip, crypto.createCipheriv |
| Duplex | Independent readable and writable sides | net.Socket, tls.TLSSocket |
| PassThrough | Transform that passes input through unchanged | Tapping a pipeline for logging, metrics |
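The rows of the table are not independent classes. Transform and PassThrough are specialised Duplex streams, and a Duplex counts as both Readable and Writable. A quick `instanceof` check illustrates the hierarchy:

```javascript
const { Readable, Writable, Duplex, Transform, PassThrough } = require('node:stream');

// PassThrough extends Transform, which extends Duplex.
const pt = new PassThrough();

const hierarchy = {
  isTransform: pt instanceof Transform,
  isDuplex: pt instanceof Duplex,
  isReadable: pt instanceof Readable,
  // Duplex inherits from Readable; Writable's Symbol.hasInstance also
  // recognises Duplex streams, so this is true as well.
  isWritable: pt instanceof Writable,
};

console.log(hierarchy);
```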
Readable Streams
Two Modes
Paused (pull): Consumer controls when data arrives.
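```js
import fs from 'fs';

const readable = fs.createReadStream('large.csv');
readable.on('readable', () => {
  let chunk;
  // read(size) returns null when fewer than `size` bytes are buffered
  // (at end of stream it returns whatever remains)
  while ((chunk = readable.read(64 * 1024)) !== null) {
    // Process 64KB at a time. handleChunk is a placeholder for your logic;
    // don't call it `process`, which would shadow Node's global object
    handleChunk(chunk);
  }
});
```

Flowing (push): Data is pushed to the consumer as fast as the source produces it.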
```js
readable.on('data', (chunk) => {
  handleChunk(chunk); // placeholder for your per-chunk logic
});
readable.on('end', () => console.log('Done'));
readable.on('error', (err) => console.error(err));
```

Mode transitions
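`readable.readableFlowing` reports which mode a stream is in. It is `null` before any consumer is attached, `true` while flowing, and `false` once explicitly paused. The sketch below uses a readable whose no-op `read()` never produces data, which is enough to observe the mode changes:

```javascript
const { Readable } = require('node:stream');

// A readable that never pushes data; enough to observe mode changes.
const r = new Readable({ read() {} });

const modes = [r.readableFlowing]; // null: no consumer attached yet
const onData = () => {};
r.on('data', onData);              // attaching 'data' switches to flowing
modes.push(r.readableFlowing);     // true
r.pause();                         // explicit pause
modes.push(r.readableFlowing);     // false

// Removing the 'data' listener does NOT pause a flowing stream.
const r2 = new Readable({ read() {} });
r2.on('data', onData);
r2.removeListener('data', onData);
const stillFlowing = r2.readableFlowing; // still true

console.log(modes, stillFlowing);
```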
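```
Paused (default) ──► Flowing   via on('data'), pipe(), resume()
Flowing ──► Paused             via pause() (if there are no pipe destinations)
                               or unpipe() (once all pipe destinations are removed)
```

Removing a 'data' listener does not, on its own, pause a flowing stream.

Reading from iterables (Node 12+)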
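`Readable.from()` turns any sync or async iterable into a readable stream. It defaults to `objectMode: true`, so each yielded value arrives as its own chunk, and strings and Buffers are not concatenated. A minimal sketch (the `collect` and `numbers` helpers are illustrative):

```javascript
const { Readable } = require('node:stream');

// Collect every chunk from a Readable built with Readable.from().
async function collect(iterable) {
  const chunks = [];
  for await (const chunk of Readable.from(iterable)) chunks.push(chunk);
  return chunks;
}

async function* numbers() {
  yield 1;
  yield 2;
  yield 3;
}

const objectModeByDefault = Readable.from(['a']).readableObjectMode;
collect(numbers()).then((chunks) => console.log(chunks)); // one chunk per yielded value
```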
```js
import fs from 'fs';
import { Readable } from 'stream';

// Create readable from async generator
async function* generateData() {
  for (let i = 0; i < 1000000; i++) {
    yield Buffer.from(`line ${i}\n`);
    // Periodically yield to the event loop so timers and I/O callbacks can run
    if (i % 1000 === 0) await new Promise(r => setImmediate(r));
  }
}

const readable = Readable.from(generateData());
readable.pipe(fs.createWriteStream('output.txt'));
```

Writable Streams
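The batching approach used in DatabaseWritable below can also be written with Writable's simplified constructor (`write` and `final` options) instead of a subclass. This is a sketch. The caller-supplied async `insertBatch` function is a hypothetical stand-in for the database call:

```javascript
const { Writable } = require('node:stream');

// Buffers records and hands them to insertBatch() in groups of batchSize.
// insertBatch is a hypothetical async function (e.g. a DB bulk insert).
function batchingSink(batchSize, insertBatch) {
  let batch = [];
  return new Writable({
    objectMode: true,
    write(record, _encoding, callback) {
      batch.push(record);
      if (batch.length < batchSize) return callback(); // keep buffering
      const full = batch;
      batch = [];
      // Calling back only after the insert settles applies backpressure
      insertBatch(full).then(() => callback(), callback);
    },
    final(callback) {
      // Flush the partial last batch before 'finish' is emitted
      if (batch.length === 0) return callback();
      insertBatch(batch).then(() => callback(), callback);
    },
  });
}
```

With a real client this might be `batchingSink(500, (rows) => db.batchInsert(rows))`, where `db.batchInsert` is the same assumed method used in the class below.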
```js
import { Writable } from 'stream';

class DatabaseWritable extends Writable {
  constructor(db, options = {}) {
    super({ ...options, objectMode: true }); // accept objects, not just buffers
    this.db = db;
    this.batch = [];
    this.batchSize = options.batchSize ?? 100;
  }

  async _write(chunk, encoding, callback) {
    this.batch.push(chunk);
    if (this.batch.length >= this.batchSize) {
      try {
        await this.db.batchInsert(this.batch);
        this.batch = [];
        callback(); // signal ready for next chunk
      } catch (err) {
        callback(err); // signal error
      }
    } else {
      callback(); // ready immediately
    }
  }

  async _final(callback) {
    // Called when all writes are done, before the 'finish' event
    try {
      if (this.batch.length > 0) {
        await this.db.batchInsert(this.batch);
      }
      callback();
    } catch (err) {
      callback(err);
    }
  }
}

// db (with a batchInsert method) and csvStream (an object-mode source) are placeholders
const writer = new DatabaseWritable(db, { batchSize: 500 });
csvStream.pipe(writer);
writer.on('finish', () => console.log('All data written'));
```

_writev — batch writes
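`_writev` only receives more than one chunk when several writes are buffered at the same time, for example while an earlier write is still in flight or while the stream is corked. This sketch forces that situation with `cork()` and `uncork()`:

```javascript
const { Writable } = require('node:stream');

const received = []; // one entry per writev call

const sink = new Writable({
  objectMode: true,
  // Only writev is supplied: Node routes single writes through it too.
  writev(chunks, callback) {
    received.push(chunks.map(({ chunk }) => chunk));
    callback();
  },
});

sink.cork(); // buffer writes instead of dispatching them
sink.write('a');
sink.write('b');
sink.write('c');
process.nextTick(() => {
  sink.uncork(); // the three buffered chunks reach writev in a single call
  sink.end();
});
sink.on('finish', () => console.log(received));
```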
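```js
class BatchWritable extends Writable {
  constructor(db, options = {}) {
    super({ ...options, objectMode: true });
    this.db = db;
  }

  // _writev receives every chunk that was buffered while a previous write was
  // in progress (or while the stream was corked), as [{ chunk, encoding }, ...].
  // Without a _write(), Node routes single writes through _writev as well.
  async _writev(chunks, callback) {
    const data = chunks.map(({ chunk }) => chunk);
    try {
      await this.db.batchInsert(data);
      callback();
    } catch (err) {
      callback(err);
    }
  }
}
```

Transform Streams — The Core Pattern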
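A Transform is a Duplex stream whose output is computed from its input: whatever is written to its writable side comes out, transformed, on its readable side.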
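Simple cases don't need a subclass, because a Transform can be built from the `transform` option. The minimal sketch below upper-cases text. It assumes ASCII input, since decoding each chunk separately can split multi-byte UTF-8 characters (`upperCase` and `shout` are illustrative names):

```javascript
const { Readable, Transform } = require('node:stream');

// Upper-cases whatever is written to it. ASCII-only sketch.
function upperCase() {
  return new Transform({
    transform(chunk, _encoding, callback) {
      // callback(err, data) pushes data to the readable side
      callback(null, chunk.toString().toUpperCase());
    },
  });
}

// Usage: run some strings through the transform and collect the output.
async function shout(parts) {
  let out = '';
  for await (const chunk of Readable.from(parts).pipe(upperCase())) out += chunk;
  return out;
}

shout(['stre', 'ams']).then(console.log);
```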
```js
import fs from 'fs';
import { Transform } from 'stream';
import { StringDecoder } from 'string_decoder';

// Naive CSV parser: splits on commas, so quoted fields are not supported
class CSVParser extends Transform {
  constructor(options = {}) {
    super({ ...options, objectMode: true }); // output objects
    this.decoder = new StringDecoder('utf8'); // keeps multi-byte chars intact across chunks
    this.buffer = '';
    this.headers = null;
  }

  _transform(chunk, encoding, callback) {
    this.buffer += this.decoder.write(chunk);
    const lines = this.buffer.split('\n');
    // Keep incomplete last line in buffer
    this.buffer = lines.pop();
    for (const line of lines) {
      if (!line.trim()) continue;
      if (!this.headers) {
        this.headers = line.split(',').map(h => h.trim());
      } else {
        const values = line.split(',');
        const obj = Object.fromEntries(
          this.headers.map((h, i) => [h, values[i]?.trim()])
        );
        this.push(obj); // push to readable side
      }
    }
    callback();
  }

  _flush(callback) {
    // Called when input ends: process whatever remains in the buffer
    this.buffer += this.decoder.end();
    if (this.buffer.trim() && this.headers) {
      const values = this.buffer.split(',');
      const obj = Object.fromEntries(
        this.headers.map((h, i) => [h, values[i]?.trim()])
      );
      this.push(obj);
    }
    callback();
  }
}

// Usage (db and DatabaseWritable as in the Writable section)
fs.createReadStream('users.csv')
  .pipe(new CSVParser())
  .pipe(new Transform({
    objectMode: true,
    transform(user, enc, cb) {
      // Transform object
      cb(null, { ...user, age: parseInt(user.age, 10) });
    }
  }))
  .pipe(new DatabaseWritable(db));
```

Transform with async operations
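Before rolling your own, note that Node 18 and later ship experimental `readable.map(fn, { concurrency })` and `readable.toArray()` operators. `map()` runs an async function over each chunk with a bounded number of calls in flight and emits results in input order. In the sketch below, `slowDouble` is a hypothetical stand-in for an HTTP call:

```javascript
const { Readable } = require('node:stream');

let inFlight = 0;
let maxInFlight = 0;

// Hypothetical stand-in for an async call such as fetch().
async function slowDouble(n) {
  inFlight++;
  maxInFlight = Math.max(maxInFlight, inFlight);
  await new Promise((resolve) => setTimeout(resolve, 10));
  inFlight--;
  return n * 2;
}

async function run() {
  // At most 2 slowDouble() calls run at once; output keeps input order.
  return Readable.from([1, 2, 3, 4, 5, 6])
    .map(slowDouble, { concurrency: 2 })
    .toArray();
}

const result = run();
result.then((doubled) => console.log(doubled, 'max in flight:', maxInFlight));
```

Keeping input order means one slow early call holds back the results behind it. A hand-written Transform that pushes in completion order avoids that, at the cost of reordering.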
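```js
import { Transform } from 'stream';

// Runs an async fn on each chunk with at most `concurrency` calls in flight.
// Results are pushed in completion order, not input order, and fn must not
// resolve to null (pushing null would end the stream).
class AsyncTransform extends Transform {
  constructor(fn, { concurrency = 4, ...options } = {}) {
    super({ ...options, objectMode: true });
    this.fn = fn;
    this.concurrency = concurrency;
    this.pending = 0;
    this.waiting = null;       // upstream callback held back at the concurrency limit
    this.flushCallback = null; // set by _flush while calls are still in flight
  }

  _transform(chunk, enc, callback) {
    this.pending++;
    this.fn(chunk).then(result => {
      this.push(result);
      this.pending--;
      // A slot is free again: let the held-back upstream write proceed
      if (this.waiting) {
        const next = this.waiting;
        this.waiting = null;
        next();
      }
      if (this.pending === 0 && this.flushCallback) {
        this.flushCallback();
      }
    }).catch(err => this.destroy(err));
    // Accept the next chunk right away while below the limit; otherwise hold
    // the callback, which applies backpressure upstream
    if (this.pending < this.concurrency) callback();
    else this.waiting = callback;
  }

  _flush(callback) {
    if (this.pending === 0) callback();
    else this.flushCallback = callback; // wait for in-flight calls
  }
}

// Usage: parallel HTTP requests in a stream pipeline
const API_BASE = 'https://api.example.com'; // hypothetical; Node's fetch needs an absolute URL
const enrichStream = new AsyncTransform(async (user) => {
  const extra = await fetch(`${API_BASE}/api/enrich/${user.id}`).then(r => r.json());
  return { ...user, ...extra };
}, { concurrency: 16 }); // at most 16 requests in flight
```

Backpressure — The Critical Concept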
Backpressure = signal from consumer to producer to slow down.
Without backpressure:
```
Producer: ████████████████████████ (fast)
Consumer: ████░░░░░░░░░░░░░░░░░░░░ (slow)
Buffer:   ▲▲▲▲▲▲▲▲▲▲▲▲▲▲▲▲▲▲▲▲▲▲▲ (grows → OOM!)
```
With backpressure:
```
Producer: ████░░░████░░░████░░░ (throttled by consumer)
Consumer: ████   ████   ████
Buffer:   ▲▲▲    ▲▲▲    ▲▲▲    (stays bounded)
```

How it works with pipe()
```js
readable.pipe(writable);
// pipe handles backpressure automatically:
// - writable.write() returns false when buffer is full (highWaterMark reached)
// - pipe pauses readable
// - writable emits 'drain' when buffer empties
// - pipe resumes readable
```

Manual backpressure (without pipe)
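With `for await` and `events.once()`, the same pause/resume logic as the `copy()` function below turns into a plain loop: stop pulling whenever `write()` returns `false`, and continue after `'drain'`. A sketch (`copyAsync` is an illustrative name):

```javascript
const { once } = require('node:events');
const { finished } = require('node:stream/promises');

// Copies readable into writable while honouring backpressure.
async function copyAsync(readable, writable) {
  for await (const chunk of readable) {
    // write() returns false once the writable's buffer reaches highWaterMark
    if (!writable.write(chunk)) {
      await once(writable, 'drain'); // rejects if 'error' is emitted first
    }
  }
  writable.end();
  await finished(writable); // resolves after 'finish', rejects on error
}
```

Usage would look like `await copyAsync(fs.createReadStream('in.bin'), fs.createWriteStream('out.bin'))`. An error from the readable surfaces as a thrown exception from the loop.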
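```js
function copy(readable, writable) {
  readable.on('data', (chunk) => {
    const canContinue = writable.write(chunk);
    if (!canContinue) {
      // Writable buffer full: pause reading
      readable.pause();
      writable.once('drain', () => {
        // Writable drained: resume reading
        readable.resume();
      });
    }
  });
  readable.on('end', () => writable.end());
  // Nothing forwards errors here, so tear down both sides explicitly
  readable.on('error', (err) => writable.destroy(err));
  writable.on('error', (err) => readable.destroy(err));
}
```

highWaterMark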
Sets the buffering threshold: in bytes for binary streams, in objects for objectMode streams. It is the point at which backpressure kicks in, not a hard memory cap.
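You can see the threshold in action with a byte stream that has `highWaterMark: 3` and a deliberately slow `write` callback. `write()` keeps returning `true` until 3 bytes are buffered, then returns `false`, and the stream later emits `'drain'`. The slow sink is a hypothetical test double:

```javascript
const { Writable } = require('node:stream');

// A byte-mode writable that takes 5 ms per chunk.
const slowSink = new Writable({
  highWaterMark: 3, // bytes, since objectMode is off
  write(_chunk, _encoding, callback) {
    setTimeout(callback, 5);
  },
});

const returns = [];
returns.push(slowSink.write('a')); // 1 byte buffered
returns.push(slowSink.write('b')); // 2 bytes buffered
returns.push(slowSink.write('c')); // 3 bytes: highWaterMark reached
let drained = false;
slowSink.once('drain', () => {
  drained = true;
  slowSink.end();
});
console.log(returns); // [ true, true, false ]
```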
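```js
import fs from 'fs';
import { Transform } from 'stream';

// Binary streams: highWaterMark in bytes
// (generic default 16KB, raised to 64KB in Node 22; fs.createReadStream defaults to 64KB)
const readable = fs.createReadStream('file.txt', {
  highWaterMark: 256 * 1024 // read in 256KB chunks
});

// Object mode: highWaterMark in objects (default: 16)
const transform = new Transform({
  objectMode: true,
  highWaterMark: 100, // buffer up to 100 objects
  transform(obj, enc, cb) {
    cb(null, obj); // a transform function is required; this one passes objects through
  }
});
```

stream.pipeline() — Error Handling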
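pipe() does not forward errors. If any stream in the chain errors, the other streams are neither notified nor destroyed, so file descriptors and sockets can leak unless every stream has its own 'error' handler.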
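The next sketch shows the pipeline() side of that difference. A Transform that fails on its first chunk makes `pipeline()` reject, and by the time it does, every stream in the chain has been destroyed. The failing transform is a hypothetical test double:

```javascript
const { Readable, Transform, Writable } = require('node:stream');
const { pipeline } = require('node:stream/promises');

const source = Readable.from(['x', 'y']);
const failing = new Transform({
  transform(_chunk, _encoding, callback) {
    callback(new Error('boom')); // simulate a parse failure
  },
});
const sink = new Writable({
  write(_chunk, _encoding, callback) {
    callback();
  },
});

const outcome = pipeline(source, failing, sink).then(
  () => ({ error: null }),
  (err) => ({
    error: err.message,
    // pipeline() destroys every stream in the chain on failure
    destroyed: [source.destroyed, failing.destroyed, sink.destroyed],
  })
);
outcome.then(console.log);
```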
```js
import fs from 'fs';
import zlib from 'zlib';
import { Transform } from 'stream';
import { pipeline } from 'stream/promises';

// BAD: resource leak on error
readable.pipe(transform).pipe(writable);

// GOOD: pipeline destroys all streams on error
async function processFile(inputPath, outputPath) {
  await pipeline(
    fs.createReadStream(inputPath),
    new CSVParser(),
    new Transform({
      objectMode: true,
      transform(row, enc, cb) {
        cb(null, JSON.stringify(row) + '\n');
      }
    }),
    zlib.createGzip(),
    fs.createWriteStream(outputPath + '.gz')
  );
}

// With abort signal (source, transform, destination are placeholders)
const controller = new AbortController();
const timer = setTimeout(() => controller.abort(), 30000); // 30s timeout
try {
  await pipeline(
    source,
    transform,
    destination,
    { signal: controller.signal }
  );
} finally {
  clearTimeout(timer); // don't leave the timer pending after success
}
```

Streams as Async Iterables (Node 12+)
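One detail that affects every `for await` loop: leaving the loop early (with `break`, `return`, or a thrown error) destroys the stream, so no separate cleanup is needed. A sketch with an illustrative `firstN` helper:

```javascript
const { Readable } = require('node:stream');

// Reads at most n chunks, then stops consuming.
async function firstN(readable, n) {
  const out = [];
  for await (const chunk of readable) {
    out.push(chunk);
    if (out.length >= n) break; // breaking out destroys `readable`
  }
  return out;
}

const numbers = Readable.from([1, 2, 3, 4, 5]);
const firstTwo = firstN(numbers, 2);
firstTwo.then((values) => console.log(values, numbers.destroyed));
```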
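```js
import fs from 'fs';
import readline from 'readline';
import { pipeline } from 'stream/promises';

// Readable streams implement the async iterable protocol
async function processLines(filepath) {
  const stream = fs.createReadStream(filepath);
  // readline interface as async iterable; crlfDelay: Infinity treats \r\n as one line break
  const rl = readline.createInterface({ input: stream, crlfDelay: Infinity });
  let lineCount = 0;
  for await (const line of rl) {
    processLine(line); // placeholder for per-line logic
    lineCount++;
  }
  return lineCount;
}

// Collecting a binary stream into one Buffer
async function streamToBuffer(readable) {
  const chunks = [];
  for await (const chunk of readable) {
    chunks.push(chunk);
  }
  return Buffer.concat(chunks);
}

// Streaming an HTTP response (Express-style `app`; db.streamUsers() is
// assumed to return an async iterable of rows)
app.get('/stream', async (req, res) => {
  res.setHeader('Content-Type', 'application/json');
  async function* toJsonArray(users) {
    yield '[';
    let first = true;
    for await (const user of users) {
      yield (first ? '' : ',') + JSON.stringify(user);
      first = false;
    }
    yield ']';
  }
  try {
    // pipeline() respects backpressure on res and cleans up if the client disconnects
    await pipeline(toJsonArray(db.streamUsers()), res);
  } catch (err) {
    console.error('Streaming response failed:', err); // res has already been destroyed
  }
});
```

Real-World Pipeline: ETL with Streams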
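Line splitting, as in LineTransform below, has to turn bytes into text. Calling `chunk.toString()` on each chunk separately corrupts any multi-byte UTF-8 character that straddles a chunk boundary. `StringDecoder` from `node:string_decoder` avoids this by holding back the incomplete bytes until the rest arrive. The sketch below splits `'é'` (2 bytes) across two chunks:

```javascript
const { StringDecoder } = require('node:string_decoder');

const bytes = Buffer.from('h\u00e9llo', 'utf8'); // 'héllo'; 'é' is 0xC3 0xA9
const first = bytes.subarray(0, 2);              // 'h' + first byte of 'é'
const second = bytes.subarray(2);                // second byte of 'é' + 'llo'

// Naive: each half decoded on its own; both halves of 'é' become U+FFFD.
const naive = first.toString('utf8') + second.toString('utf8');

// StringDecoder buffers the dangling 0xC3 until 0xA9 arrives.
const decoder = new StringDecoder('utf8');
const safe = decoder.write(first) + decoder.write(second) + decoder.end();

console.log(JSON.stringify(naive), JSON.stringify(safe));
```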
```js
import { pipeline } from 'stream/promises';
import { Transform, PassThrough } from 'stream';
import { createReadStream, createWriteStream } from 'fs';
import { StringDecoder } from 'string_decoder';

// Splits a byte stream into lines, emitted as strings
class LineTransform extends Transform {
  constructor() {
    super({ readableObjectMode: true });
    this.decoder = new StringDecoder('utf8'); // keeps multi-byte chars intact across chunks
    this.buffer = '';
  }
  _transform(chunk, enc, cb) {
    this.buffer += this.decoder.write(chunk);
    const lines = this.buffer.split('\n');
    this.buffer = lines.pop(); // keep the incomplete last line
    lines.forEach(line => this.push(line));
    cb();
  }
  _flush(cb) {
    this.buffer += this.decoder.end();
    if (this.buffer) this.push(this.buffer);
    cb();
  }
}

// Parses one JSON document per line
class JSONParser extends Transform {
  constructor() {
    super({ writableObjectMode: true, readableObjectMode: true });
  }
  _transform(line, enc, cb) {
    try {
      cb(null, JSON.parse(line));
    } catch {
      cb(); // skip invalid lines
    }
  }
}

// Passes through only objects for which fn returns true
class Filter extends Transform {
  constructor(fn) {
    super({ objectMode: true });
    this.fn = fn;
  }
  _transform(obj, enc, cb) {
    if (this.fn(obj)) this.push(obj);
    cb();
  }
}

// Counts objects as they pass through unchanged
class Counter extends PassThrough {
  constructor() {
    super({ objectMode: true });
    this.count = 0;
  }
  _transform(obj, enc, cb) {
    this.count++;
    cb(null, obj);
  }
}

const counter = new Counter();
await pipeline(
  createReadStream('events.log'),
  new LineTransform(),
  new JSONParser(),
  new Filter(event => event.type === 'purchase' && event.amount > 100),
  counter,
  new Transform({
    objectMode: true,
    transform(obj, enc, cb) {
      cb(null, JSON.stringify(obj) + '\n');
    }
  }),
  createWriteStream('filtered.jsonl')
);
console.log(`Processed ${counter.count} matching events`);
```

Memory Profile Comparison
```js
// Without streams: the whole file must fit in memory
const data = fs.readFileSync('10gb.csv');  // 10GB Buffer (if Node even allows one that large)
const parsed = parse(data.toString());     // throws: exceeds V8's maximum string length
await db.insertMany(parsed);

// With streams: memory is bounded by the stream buffers plus one batch
await pipeline(
  fs.createReadStream('10gb.csv', { highWaterMark: 16384 }),
  new CSVParser(),
  new DatabaseWritable(db, { batchSize: 1000 }),
);
// Peak memory depends on highWaterMark, the object-mode buffers and batchSize, not on file size
```

Interview Questions
Q: What is backpressure and how does Node.js handle it?
Backpressure is how a slow consumer tells a fast producer to slow down. In Node.js streams, writable.write() returns false once the internal buffer reaches highWaterMark; the producer should stop writing and wait for the 'drain' event. pipe() and pipeline() handle this automatically. Hand-written write loops must implement it themselves or risk unbounded memory growth.
Q: What's the difference between pipe() and pipeline()?
pipe() returns the destination stream and handles backpressure, but does NOT propagate errors: an error in one stream leaves the others open (resource leak). pipeline() (Node 10+) destroys all streams on error and reports it through its callback. The promise version from stream/promises rejects instead and also accepts an AbortSignal. Prefer pipeline() in production code.
Q: When would you use a Transform stream vs. processing in memory?
Transform streams when: data is large (wouldn't fit in memory), data arrives incrementally (HTTP, files), or you want to compose processing stages. In-memory when: data is small and already buffered, or the processing logic is complex and doesn't benefit from streaming.
Q: What is objectMode in streams?
By default, streams carry Buffers and strings. objectMode: true allows any JavaScript value except null (push(null) signals end of stream), such as objects and arrays. It is required when building data processing pipelines that pass parsed records. Note: in objectMode, highWaterMark counts objects, not bytes.
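A quick illustration: an objectMode PassThrough accepts a plain object and measures its buffer in objects. A byte-mode stream rejects the same write with `ERR_INVALID_ARG_TYPE`:

```javascript
const { PassThrough } = require('node:stream');

const objects = new PassThrough({ objectMode: true });
objects.write({ id: 1 });
const firstObject = objects.read();               // { id: 1 }
const objectHwm = objects.readableHighWaterMark;  // 16 (objects, not bytes)

const bytes = new PassThrough();
let rejectedCode = null;
try {
  bytes.write({ id: 1 }); // byte streams accept only strings, Buffers and Uint8Arrays
} catch (err) {
  rejectedCode = err.code;
}
console.log(firstObject, objectHwm, rejectedCode);
```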
Q: How do you handle errors in a stream pipeline?
Use stream.pipeline(), which destroys all streams on the first error and passes that error to its callback (or rejects its promise). For individual streams, listen for the 'error' event: an 'error' emitted with no listener is thrown and crashes the process. Always either use pipeline() or attach error handlers to every stream.