The first time I needed Node.js streams in production, I was uploading 4 GB CSV exports from a logistics SaaS into S3 nightly. The naive version — read the file into memory, parse it, post the result — worked on the dev machine and OOM-killed the production worker the first night it ran. Switching to a stream pipeline (file → CSV parser → S3 upload) ran the same job in 90 seconds with peak memory of 30 MB. That gap is the entire reason streams exist.
The article below is what I wish someone had handed me before that incident. Practical, working code on Node 20 LTS and Node 22, focused on the workloads streams actually solve in 2026: large file processing, S3 uploads, HTTP response transformation, and live data feeds. Async iterators and the Web Streams API get equal attention because in modern Node both are first-class.
Quick start: stream a 1 GB file through a transform in 30 lines
Working baseline. The rest of the article hardens this for production.
```sh
npm install csv-parse
```

```ts
// quickstart.ts
import { createReadStream, createWriteStream } from 'node:fs';
import { pipeline } from 'node:stream/promises';
import { Transform } from 'node:stream';
import { parse } from 'csv-parse';

await pipeline(
  createReadStream('orders.csv'),
  parse({ columns: true }), // CSV row → object
  new Transform({
    objectMode: true,
    transform(row, _enc, cb) {
      // your business logic per row
      const out = { id: row.id, amount: Number(row.amount) * 1.1 };
      cb(null, JSON.stringify(out) + '\n');
    },
  }),
  createWriteStream('orders.jsonl')
);
console.log('done');
```

```sh
npx tsx quickstart.ts
```

One million rows, peak memory ~50 MB regardless of input size. The trick is `pipeline()` — it wires the streams together with backpressure handled and errors propagated.
What is wrong with the typical “streams in Node” tutorial
Five things most stream tutorials get wrong:
- They show `.pipe()`, which silently swallows errors. A failure in any link of the chain leaves you with a half-written file and no exception.
- They skip backpressure. The whole point of streams is to slow the producer when the consumer can’t keep up. Tutorials that `readable.on('data', ...)` without honouring `writable.write()` return values defeat this.
- They don’t explain object mode. Streams default to byte mode; for “row-shaped” data (CSV records, JSON lines, parsed events), you need `objectMode: true`.
- They predate async iterators. Node 10+ supports `for await (const chunk of stream)` — usually clearer than callback-based event handlers.
- They never mention Web Streams. Node 18+ ships the standards-compliant Web Streams API. Bridging between Node streams and Web streams is needed if you call `fetch()`.
The four stream types in 30 seconds
| Type | Direction | Examples |
|---|---|---|
| Readable | Source — produces data | createReadStream, process.stdin, HTTP request body |
| Writable | Sink — consumes data | createWriteStream, process.stdout, HTTP response |
| Duplex | Both — read and write are independent | TCP socket, WebSocket |
| Transform | Duplex where output depends on input | gzip, CSV parser, JSON encoder |
You will spend 90% of your time on Readable and Transform. Duplex appears when you write low-level network code; Writable usually shows up as the final destination of a pipeline.
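A minimal sketch of three of the four types wired together with `pipeline()`; the names (`source`, `upper`, `sink`) are illustrative:

```typescript
import { Readable, Writable, Transform } from 'node:stream';
import { pipeline } from 'node:stream/promises';

const collected: string[] = [];

// Readable: a source that produces data (here, from an in-memory array)
const source = Readable.from(['a', 'b', 'c']);

// Transform: output derived from input
const upper = new Transform({
  objectMode: true,
  transform(chunk, _enc, cb) { cb(null, String(chunk).toUpperCase()); },
});

// Writable: a sink that consumes data
const sink = new Writable({
  objectMode: true,
  write(chunk, _enc, cb) { collected.push(chunk); cb(); },
});

// Duplex (independent read and write sides) is what sockets are;
// you rarely construct one by hand, so it is omitted here.
await pipeline(source, upper, sink);
console.log(collected); // ['A', 'B', 'C']
```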
pipeline() vs pipe(): use pipeline, every time
The single highest-leverage upgrade in any streams codebase: replace .pipe() with pipeline().
```ts
// .pipe() — an error in any step is swallowed
createReadStream('big.csv')
  .pipe(parse())
  .pipe(transform)
  .pipe(createWriteStream('out.json')); // if transform throws, what happens?
```

```ts
import { pipeline } from 'node:stream/promises';

// pipeline — errors propagate, returns a promise
try {
  await pipeline(
    createReadStream('big.csv'),
    parse(),
    transform,
    createWriteStream('out.json'),
  );
} catch (err) {
  console.error('Pipeline failed:', err);
  // every stream is destroyed automatically; delete the partial output file yourself
}
```

The promise-based pipeline from `node:stream/promises` is the API you want. It auto-destroys all streams on failure and prevents the leak-the-file-descriptor pattern that `.pipe()` creates.
Backpressure: what it is and how to honour it
Backpressure is the mechanism that prevents a fast producer from drowning a slow consumer. If you read a 10 GB file faster than you can write it and nothing pushes back, you buffer 10 GB in memory.
The contract is simple: writable.write(chunk) returns false when the writer’s internal buffer is full. The producer must stop writing until the writer emits 'drain'. pipeline() handles all of this automatically.
Manual demonstration of why it matters:
```ts
// WRONG — ignores backpressure, buffers everything
readable.on('data', (chunk) => {
  writable.write(chunk); // no check, no wait
});

// RIGHT — honours backpressure
readable.on('data', (chunk) => {
  if (!writable.write(chunk)) {
    readable.pause(); // tell upstream to stop
    writable.once('drain', () => readable.resume());
  }
});

// BEST — pipeline handles all of this
import { pipeline } from 'node:stream/promises';
await pipeline(readable, writable);
```

You almost never need to write the second pattern by hand. `pipeline()` exists exactly so you don’t have to.
Async iterators: the modern way to consume a stream
Any Readable stream is an async iterable in Node 10+. Two consumption patterns side by side:
```ts
// Old — event listeners
function readFile(path: string): Promise<string[]> {
  return new Promise((resolve, reject) => {
    const lines: string[] = [];
    const stream = createReadStream(path, { encoding: 'utf8' });
    let buffer = '';
    stream.on('data', (chunk) => {
      buffer += chunk;
      const parts = buffer.split('\n');
      buffer = parts.pop() ?? '';
      lines.push(...parts);
    });
    stream.on('end', () => {
      if (buffer) lines.push(buffer); // flush the last line if the file lacks a trailing newline
      resolve(lines);
    });
    stream.on('error', reject);
  });
}
```

```ts
// New — async iteration with for-await
import { createReadStream } from 'node:fs';
import { createInterface } from 'node:readline';

async function readFile(path: string): Promise<string[]> {
  const lines: string[] = [];
  const rl = createInterface({ input: createReadStream(path), crlfDelay: Infinity });
  for await (const line of rl) lines.push(line);
  return lines;
}
```

Same correctness. Half the code. Errors throw out of the `for await` like any normal exception. The async iterator pattern is what I default to for any read-heavy logic in 2026.
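One more pattern worth knowing here: in the Node versions this article targets (20 LTS and 22), `pipeline()` accepts a plain async generator function as a transform step, which pairs naturally with `for await`. A minimal sketch with illustrative in-memory data:

```typescript
import { Readable, Writable } from 'node:stream';
import { pipeline } from 'node:stream/promises';

const lines: string[] = [];

await pipeline(
  // in-memory rows standing in for a real source
  Readable.from([{ amount: 10 }, { amount: 20 }]),
  // an async generator function acts as a transform step; no Transform subclass needed
  async function* (source: AsyncIterable<{ amount: number }>) {
    for await (const row of source) yield `${row.amount * 2}\n`;
  },
  new Writable({
    objectMode: true,
    write(chunk, _enc, cb) { lines.push(String(chunk)); cb(); },
  }),
);
console.log(lines); // ['20\n', '40\n']
```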
Real workload: parse a 4 GB CSV and upload to S3
The job from the opening, hardened. Streams the file through CSV parsing, transforms each row, and uploads to S3 — peak memory under 100 MB no matter how big the input.
```sh
npm install csv-parse @aws-sdk/client-s3 @aws-sdk/lib-storage
```

```ts
// upload-csv-to-s3.ts
import { createReadStream } from 'node:fs';
import { Transform, PassThrough } from 'node:stream';
import { pipeline } from 'node:stream/promises';
import { parse } from 'csv-parse';
import { S3Client } from '@aws-sdk/client-s3';
import { Upload } from '@aws-sdk/lib-storage';

const s3 = new S3Client({ region: 'us-east-1' });
const passthrough = new PassThrough();

const rowToJson = new Transform({
  objectMode: true,
  transform(row, _enc, cb) {
    const out = { id: row.id, total: Number(row.amount) * 1.1 };
    cb(null, JSON.stringify(out) + '\n');
  },
});

// Start the upload in parallel — Upload reads from passthrough as it gets data
const upload = new Upload({
  client: s3,
  params: {
    Bucket: 'orders-export',
    Key: `${new Date().toISOString().slice(0, 10)}.jsonl`,
    Body: passthrough,
  },
});

// Await both concurrently: if either side fails, this rejects
// instead of leaving an unhandled rejection behind
await Promise.all([
  pipeline(
    createReadStream('orders.csv'),
    parse({ columns: true }),
    rowToJson,
    passthrough,
  ),
  upload.done(),
]);
console.log('uploaded');
```

Three details that matter at scale:
- `@aws-sdk/lib-storage`’s `Upload` handles multipart upload automatically — required for objects larger than 5 GB on S3.
- `PassThrough` is the bridge between `pipeline` and the upload. Pipeline needs a final writable; the `Body` param wants a readable. PassThrough is both.
- Start the upload before the pipeline finishes. The two run concurrently — bytes flow through pipeline → passthrough → upload while the file is still being read.
HTTP body streaming: don’t load the request into memory
Express, Fastify, and the native http module all expose request and response as streams. Use them.
```ts
// Stream a large request body to disk without ever buffering in memory
import { createWriteStream } from 'node:fs';
import { pipeline } from 'node:stream/promises';

app.post('/upload', async (req, res) => {
  await pipeline(req, createWriteStream(`/tmp/${Date.now()}`));
  res.json({ ok: true });
});
```

```ts
// Stream a large response from a downstream API back to the client
app.get('/proxy', async (req, res) => {
  const upstream = await fetch('https://example.com/big-file');
  res.setHeader('Content-Type', upstream.headers.get('content-type') ?? 'application/octet-stream');
  // upstream.body is a Web ReadableStream; convert to a Node stream:
  const { Readable } = await import('node:stream');
  await pipeline(Readable.fromWeb(upstream.body!), res);
});
```

The `Readable.fromWeb` bridge is the line most people miss. `fetch()` returns Web Streams; Express expects Node streams. The conversion is one method call.
Web Streams: when you need them
Node 18+ ships the standards-compliant Web Streams API. You will encounter them when you call fetch(), when you ship code that runs on Cloudflare Workers or Deno, or when you use libraries built on Web standards.
| Concept | Node Streams | Web Streams |
|---|---|---|
| Readable | `Readable` | `ReadableStream` |
| Writable | `Writable` | `WritableStream` |
| Transform | `Transform` | `TransformStream` |
| Pipe | `pipeline()` | `readable.pipeTo(writable)` |
| Iterate | `for await (chunk of readable)` | `for await (chunk of readable)` |
Bridge functions: Readable.fromWeb(), Readable.toWeb(), Writable.fromWeb(), Writable.toWeb(). Node’s Web Streams docs cover the surface.
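A round-trip sketch of the bridge functions, using an in-memory stream rather than a real `fetch()` body:

```typescript
import { Readable } from 'node:stream';

// Node → Web: hand a Node readable to an API that expects a web ReadableStream
const nodeStream = Readable.from([Buffer.from('hello, '), Buffer.from('web')]);
const webStream = Readable.toWeb(nodeStream);

// Web → Node: the reverse move, e.g. wrapping a fetch() body for pipeline()
const roundTripped = Readable.fromWeb(webStream);

const parts: Buffer[] = [];
for await (const chunk of roundTripped) parts.push(Buffer.from(chunk));
const text = Buffer.concat(parts).toString();
console.log(text); // 'hello, web'
```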
Production checklist
- Use `pipeline()` from `node:stream/promises` for any multi-step stream chain. Never `.pipe()`.
- Use async iteration for read-only consumers; clearer code, real exceptions.
- Set `objectMode: true` when chunks are JS objects (parsed CSV rows, JSON lines, events).
- Set `highWaterMark` intentionally. The default is 16 KB for byte streams (64 KB for `fs` streams) and 16 objects in object mode. Tune it for your workload — too small and you context-switch constantly; too large and you defeat backpressure.
- Always destroy streams on error. `pipeline()` does this automatically; manual code must call `destroy()` itself.
- Set timeouts on stream operations. A hung upstream can leave a stream waiting forever; pass `AbortSignal.timeout()` as the `signal` option to `pipeline()` for HTTP-driven streams.
- Profile peak memory under realistic load. A “stream pipeline” with a hidden buffer-everything step looks like streams and costs like the naive version.
- Bridge Web Streams ↔ Node streams with the helper methods. Mixing the two without conversion fails in confusing ways.
When not to use streams
- The data fits in memory comfortably. Reading a 100 KB JSON config file and parsing it is one line: `JSON.parse(readFileSync(path))`. Streams are overkill.
- You need random access to the data. Streams are sequential by design. If you need to seek, slice, or re-read, load it into memory or use a database.
- The transformation depends on the whole dataset. Sorting, deduplicating, computing averages — streams can do these, but it is awkward and slower than loading into memory if memory allows. Use streams for filtering and per-row transforms; switch to in-memory or DB for aggregations.
Troubleshooting FAQ
Why is my stream slow even though it should be fast?
Three usual causes: missing objectMode when you expect objects (each chunk gets serialised to a string), highWaterMark too small (constant context switches), or a synchronous operation in your transform blocking the event loop.
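To see `highWaterMark` at work, one option is a scratch-file experiment; the path and sizes below are arbitrary:

```typescript
import { createReadStream, writeFileSync } from 'node:fs';
import { tmpdir } from 'node:os';
import { join } from 'node:path';

const path = join(tmpdir(), 'hwm-demo.bin'); // illustrative scratch file
writeFileSync(path, Buffer.alloc(4 * 1024 * 1024)); // 4 MiB of zeroes

let chunks = 0;
for await (const _chunk of createReadStream(path, { highWaterMark: 1024 * 1024 })) chunks++;
console.log(chunks); // 4: one read per MiB, versus 64 reads at the fs default of 64 KiB
```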
What happens to memory when a stream errors mid-flow?
With pipeline(): every stream is destroyed, file descriptors close, partial files remain on disk (clean up explicitly). With .pipe(): the broken stream stops, the others may keep buffering forever — file descriptor leak.
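A sketch of the explicit cleanup, with an illustrative destination path and a source that fails on purpose:

```typescript
import { pipeline } from 'node:stream/promises';
import { createWriteStream, existsSync } from 'node:fs';
import { rm } from 'node:fs/promises';
import { Readable } from 'node:stream';
import { tmpdir } from 'node:os';
import { join } from 'node:path';

const dest = join(tmpdir(), 'partial-demo.jsonl'); // illustrative destination

// A source that dies mid-flow
async function* failingSource() {
  yield 'first chunk\n';
  throw new Error('source died');
}

try {
  await pipeline(Readable.from(failingSource()), createWriteStream(dest));
} catch {
  // pipeline() already destroyed every stream and closed the fd;
  // the partially written file is ours to delete
  await rm(dest, { force: true });
}
const cleanedUp = !existsSync(dest);
console.log(cleanedUp); // true: nothing left behind
```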
Should I use Highland.js or Most.js?
No, in 2026. The native streams API plus async iterators covers what these libraries used to add. The smaller dependency footprint matters; the API ergonomics gap closed.
Can I use streams with TypeScript ergonomically?
Yes. Readable<T>-style generic typing is patchy in @types/node; use a thin wrapper or community type packages for typed object-mode streams.
I get “Cannot find module ‘node:stream/promises'” — what version of Node do I need?
Node 16.0+. The error usually means an outdated Node binary on the server; the resolution diagnostic tree covers the broader pattern.
How do I stream from a database query?
Most Postgres clients support a streaming mode. pg.Cursor gives you a row-at-a-time iterator. Prisma 5+ supports findMany with cursor-based pagination — not technically a stream, but functionally equivalent for chunked exports. My Postgres + Prisma setup covers cursor pagination.
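The cursor-pagination idea can be sketched client-agnostically by wrapping the paged query in an async generator. `fetchPage` here is a hypothetical stand-in for whatever your client exposes (e.g. a `findMany` call with `cursor`), backed by an in-memory array for the demo:

```typescript
type Row = { id: number };

// Wrap any cursor-paginated query as an async generator
async function* paginate(fetchPage: (cursor: number | null) => Promise<Row[]>) {
  let cursor: number | null = null;
  while (true) {
    const page = await fetchPage(cursor);
    if (page.length === 0) return; // no more rows
    yield* page;
    cursor = page[page.length - 1].id; // resume after the last row seen
  }
}

// Fake two-rows-per-page data source standing in for the database
const data: Row[] = [{ id: 1 }, { id: 2 }, { id: 3 }];
const fetchPage = async (cursor: number | null) =>
  data.filter((r) => cursor === null || r.id > cursor).slice(0, 2);

const seen: number[] = [];
for await (const row of paginate(fetchPage)) seen.push(row.id);
console.log(seen); // [1, 2, 3]
```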
What is the difference between readable and data events?
data is push-based — fires whenever a chunk is available. readable is pull-based — you get notified that data is available, then you call .read(). For 95% of code, async iteration is clearer than either.
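For completeness, the pull-based pattern looks like this, with an in-memory source for illustration:

```typescript
import { Readable } from 'node:stream';

// Pull-based consumption: wait for 'readable', then call .read() until it returns null
const r = Readable.from(['x', 'y']); // illustrative in-memory source
const pulled: string[] = [];

r.on('readable', () => {
  let chunk;
  while ((chunk = r.read()) !== null) pulled.push(chunk);
});
await new Promise<void>((resolve) => r.once('end', resolve));
console.log(pulled); // ['x', 'y']
```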
Are Node streams compatible with Bun and Deno?
Bun supports them natively. Deno supports `node:` imports and most of the Node streams API through its Node compatibility layer. For maximum portability across all three runtimes, write Web Streams and bridge to Node streams only at the edges.
How do I test stream code?
Convert the readable to an array of chunks and assert on the array. Array.fromAsync(stream) in Node 22+ does this in one call.
What ships next
This article covers the core Node streams API. The natural next steps: a deep dive on backpressure tuning under real load, and a comparison of Web Streams vs Node Streams when you need to ship the same code on Cloudflare Workers and Node. If your stream code leaks memory, the diagnosis pattern is the same as any other Node memory leak — heap snapshots, retainer tree, fix the unbounded buffer.