
Node.js streams in 2026: a practical tutorial for real workloads

Node.js streams in 2026: pipeline() over pipe(), backpressure that actually works, async iteration, the four stream types, and a real workload that streams a 4 GB CSV through CSV parsing into S3 in 90 seconds.

The first time I needed Node.js streams in production, I was uploading 4 GB CSV exports from a logistics SaaS into S3 nightly. The naive version — read the file into memory, parse it, post the result — worked on the dev machine and OOM-killed the production worker the first night it ran. Switching to a stream pipeline (file → CSV parser → S3 upload) ran the same job in 90 seconds with peak memory of 30 MB. That gap is the entire reason streams exist.

The article below is what I wish someone had handed me before that incident. Practical, working code on Node 20 LTS and Node 22, focused on the workloads streams actually solve in 2026: large file processing, S3 uploads, HTTP response transformation, and live data feeds. Async iterators and the Web Streams API get equal attention because in modern Node both are first-class.

Quick start: stream a 1 GB file through a transform in 30 lines

Working baseline. The rest of the article hardens this for production.

bash
npm install csv-parse
TypeScript
// quickstart.ts
import { createReadStream, createWriteStream } from 'node:fs';
import { pipeline } from 'node:stream/promises';
import { Transform } from 'node:stream';
import { parse } from 'csv-parse';

await pipeline(
  createReadStream('orders.csv'),
  parse({ columns: true }),                      // CSV row → object
  new Transform({
    objectMode: true,
    transform(row, _enc, cb) {
      // your business logic per row
      const out = { id: row.id, amount: Number(row.amount) * 1.1 };
      cb(null, JSON.stringify(out) + '\n');
    },
  }),
  createWriteStream('orders.jsonl')
);

console.log('done');
bash
npx tsx quickstart.ts

One million rows, peak memory ~50 MB regardless of input size. The trick is pipeline() — it wires the streams together with backpressure handled and errors propagated.

What is wrong with the typical “streams in Node” tutorial

Five things most stream tutorials get wrong:

  1. They show .pipe(), which does not propagate errors down the chain. A failure in any link is emitted on that stream alone; unless you attach an 'error' listener to every stream, you end up with a half-written file and either no useful exception or an unhandled-error crash.
  2. They skip backpressure. The whole point of streams is to slow the producer when the consumer can’t keep up. Tutorials that readable.on('data', ...) without honouring writable.write() return values defeat this.
  3. They don’t explain object mode. Streams default to byte mode; for “row-shaped” data (CSV records, JSON lines, parsed events), you need objectMode: true.
  4. They predate async iterators. Node 10+ supports for await (const chunk of stream) — usually clearer than callback-based event handlers.
  5. They never mention Web Streams. Node 18+ ships the standards-compliant Web Streams API. Bridging between Node streams and Web streams is needed if you call fetch().
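Point 3 is worth seeing concretely. A minimal sketch with synthetic rows and made-up values; without objectMode: true, the Transform below would reject the object chunks with ERR_INVALID_ARG_TYPE:

```typescript
// object-mode sketch: synthetic rows, illustrative values
import { Readable, Transform } from 'node:stream';
import { pipeline } from 'node:stream/promises';

const results: string[] = [];

await pipeline(
  // Readable.from() of objects creates an object-mode readable automatically
  Readable.from([
    { id: 1, amount: '10' },
    { id: 2, amount: '20' },
  ]),
  new Transform({
    objectMode: true,            // without this, object chunks throw ERR_INVALID_ARG_TYPE
    transform(row, _enc, cb) {
      cb(null, `${row.id}:${Number(row.amount) * 2}`);
    },
  }),
  // pipeline() also accepts an async function as the final consumer
  async function (source: AsyncIterable<string>) {
    for await (const chunk of source) results.push(chunk);
  },
);

console.log(results);            // ['1:20', '2:40']
```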

The four stream types in 30 seconds

Type      | Direction                              | Examples
Readable  | Source — produces data                 | createReadStream, process.stdin, HTTP request body
Writable  | Sink — consumes data                   | createWriteStream, process.stdout, HTTP response
Duplex    | Both — read and write are independent  | TCP socket, WebSocket
Transform | Duplex where output depends on input   | gzip, CSV parser, JSON encoder

You will spend 90% of your time on Readable and Transform. Duplex appears when you write low-level network code; Writable usually shows up as the final destination of a pipeline.
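For reference, here is a minimal, contrived instance of each type; the data and names are made up for illustration:

```typescript
// Minimal, contrived instances of all four stream types
import { Readable, Writable, Duplex, Transform } from 'node:stream';
import { pipeline } from 'node:stream/promises';

// Readable: produces data
const source = Readable.from(['a', 'b', 'c']);

// Transform: output is a function of input
const upper = new Transform({
  transform(chunk, _enc, cb) { cb(null, String(chunk).toUpperCase()); },
});

// Writable: consumes data
const received: string[] = [];
const sink = new Writable({
  write(chunk, _enc, cb) { received.push(String(chunk)); cb(); },
});

// Duplex: read and write sides are independent (this one discards writes
// and has nothing to read; it exists purely to show the shape)
const duplex = new Duplex({
  read() { this.push(null); },
  write(_chunk, _enc, cb) { cb(); },
});

await pipeline(source, upper, sink);
console.log(received);           // ['A', 'B', 'C']
```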

pipeline() vs pipe(): use pipeline, every time

The single highest-leverage upgrade in any streams codebase: replace .pipe() with pipeline().

TypeScript
// .pipe() — an error in any step is not propagated down the chain
createReadStream('big.csv')
  .pipe(parse())
  .pipe(transform)
  .pipe(createWriteStream('out.json'));      // if transform throws, what happens?
TypeScript
import { pipeline } from 'node:stream/promises';

// pipeline — errors propagate, returns a promise
try {
  await pipeline(
    createReadStream('big.csv'),
    parse(),
    transform,
    createWriteStream('out.json'),
  );
} catch (err) {
  console.error('Pipeline failed:', err);
  // partial files cleaned up automatically
}

The promise-based pipeline from node:stream/promises is the API you want. It auto-destroys all streams on failure and prevents the leak-the-file-descriptor pattern that .pipe() creates.

Backpressure: what it is and how to honour it

Backpressure is the mechanism that prevents a fast producer from drowning a slow consumer. If you read a 10 GB file faster than you can write it and nothing slows the reader down, the unwritten bytes pile up in memory until you are holding most of the file.

The contract is simple: writable.write(chunk) returns false when the writer’s internal buffer is full. The producer must stop writing until the writer emits 'drain'. pipeline() handles all of this automatically.

Manual demonstration of why it matters:

TypeScript
// WRONG — ignores backpressure, buffers everything
readable.on('data', (chunk) => {
  writable.write(chunk);                       // no check, no wait
});

// RIGHT — honours backpressure
readable.on('data', (chunk) => {
  if (!writable.write(chunk)) {
    readable.pause();                          // tell upstream to stop
    writable.once('drain', () => readable.resume());
  }
});

// BEST — pipeline handles all of this
import { pipeline } from 'node:stream/promises';
await pipeline(readable, writable);

You almost never need to write the second pattern by hand. pipeline() exists exactly so you don’t have to.

Async iterators: the modern way to consume a stream

Any Readable stream is an async iterable in Node 10+. Two consumption patterns side by side:

TypeScript
// Old — event listeners
function readFile(path: string): Promise<string[]> {
  return new Promise((resolve, reject) => {
    const lines: string[] = [];
    const stream = createReadStream(path, { encoding: 'utf8' });
    let buffer = '';
    stream.on('data', (chunk) => {
      buffer += chunk;
      const parts = buffer.split('\n');
      buffer = parts.pop() ?? '';
      lines.push(...parts);
    });
    stream.on('end', () => resolve(lines));
    stream.on('error', reject);
  });
}
TypeScript
// New — async iteration with for-await
import { createReadStream } from 'node:fs';
import { createInterface } from 'node:readline';

async function readFile(path: string): Promise<string[]> {
  const lines: string[] = [];
  const rl = createInterface({ input: createReadStream(path), crlfDelay: Infinity });
  for await (const line of rl) lines.push(line);
  return lines;
}

Same correctness. Half the code. Errors throw out of the for await like any normal exception. The async iterator pattern is what I default to for any read-heavy logic in 2026.
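The same pattern works on raw byte chunks when you do not need line splitting. A small self-contained sketch; it writes its own temp file, so the path is purely illustrative:

```typescript
// Byte-level for-await: count bytes and newlines without any event listeners.
// The file is created here so the example is self-contained.
import { createReadStream, writeFileSync } from 'node:fs';
import { tmpdir } from 'node:os';
import { join } from 'node:path';

const path = join(tmpdir(), 'stream-demo.txt');
writeFileSync(path, 'hello\nworld\n');

let bytes = 0;
let newlines = 0;
for await (const chunk of createReadStream(path)) {
  bytes += chunk.length;                    // chunk is a Buffer in byte mode
  for (const byte of chunk) if (byte === 0x0a) newlines++;
}
console.log({ bytes, newlines });           // { bytes: 12, newlines: 2 }
```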

Real workload: parse a 4 GB CSV and upload to S3

The job from the opening, hardened. Streams the file through CSV parsing, transforms each row, and uploads to S3 — peak memory under 100 MB no matter how big the input.

bash
npm install csv-parse @aws-sdk/client-s3 @aws-sdk/lib-storage
TypeScript
// upload-csv-to-s3.ts
import { createReadStream } from 'node:fs';
import { Transform, PassThrough } from 'node:stream';
import { pipeline } from 'node:stream/promises';
import { parse } from 'csv-parse';
import { S3Client } from '@aws-sdk/client-s3';
import { Upload } from '@aws-sdk/lib-storage';

const s3 = new S3Client({ region: 'us-east-1' });
const passthrough = new PassThrough();

const rowToJson = new Transform({
  objectMode: true,
  transform(row, _enc, cb) {
    const out = { id: row.id, total: Number(row.amount) * 1.1 };
    cb(null, JSON.stringify(out) + '\n');
  },
});

// Start the upload in parallel — Upload reads from passthrough as it gets data
const upload = new Upload({
  client: s3,
  params: {
    Bucket: 'orders-export',
    Key: `${new Date().toISOString().slice(0, 10)}.jsonl`,
    Body: passthrough,
  },
});
const uploadPromise = upload.done();

await pipeline(
  createReadStream('orders.csv'),
  parse({ columns: true }),
  rowToJson,
  passthrough,
);

await uploadPromise;
console.log('uploaded');

Three details that matter at scale:

  • @aws-sdk/lib-storage’s Upload handles multipart upload automatically — required for files larger than 5 GB on S3.
  • PassThrough as the bridge between pipeline and the upload. pipeline() owns one writable; Upload's Body param wants a readable. PassThrough is both.
  • Start the upload before pipeline finishes. The two run concurrently — bytes flow through pipeline → passthrough → upload while the file is still being read.
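The PassThrough trick is easier to see without AWS in the picture. A local stand-in sketch, where an independent consumer plays the role of Upload.done() and reads from the PassThrough while pipeline() writes into it:

```typescript
// Local stand-in for the S3 pattern: pipeline() writes into a PassThrough
// while an independent consumer (playing the role of Upload) reads from it.
import { PassThrough, Readable } from 'node:stream';
import { pipeline } from 'node:stream/promises';

const bridge = new PassThrough();

// "upload": starts consuming immediately, like Upload.done()
const consumed: string[] = [];
const consumer = (async () => {
  for await (const chunk of bridge) consumed.push(String(chunk));
})();

// "pipeline": the file → parse → transform chain, reduced to two rows
await pipeline(Readable.from(['row1\n', 'row2\n']), bridge);

await consumer;                              // both sides are now done
console.log(consumed.join(''));
```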

HTTP body streaming: don’t load the request into memory

Express, Fastify, and the native http module all expose request and response as streams. Use them.

TypeScript
// Stream a large request body to disk without ever buffering in memory
import { createWriteStream } from 'node:fs';
import { pipeline } from 'node:stream/promises';

app.post('/upload', async (req, res) => {
  await pipeline(req, createWriteStream(`/tmp/${Date.now()}`));
  res.json({ ok: true });
});
TypeScript
// Stream a large response from a downstream API back to the client
app.get('/proxy', async (req, res) => {
  const upstream = await fetch('https://example.com/big-file');
  res.setHeader('Content-Type', upstream.headers.get('content-type') ?? 'application/octet-stream');
  // upstream.body is a Web ReadableStream; convert to Node stream:
  const { Readable } = await import('node:stream');
  await pipeline(Readable.fromWeb(upstream.body!), res);
});

The Readable.fromWeb bridge is the line most people miss. fetch() returns Web Streams; Express expects Node streams. The conversion is one method call.

Web Streams: when you need them

Node 18+ ships the standards-compliant Web Streams API. You will encounter them when you call fetch(), when you ship code that runs on Cloudflare Workers or Deno, or when you use libraries built on Web standards.

Concept   | Node Streams                       | Web Streams
Readable  | Readable                           | ReadableStream
Writable  | Writable                           | WritableStream
Transform | Transform                          | TransformStream
Pipe      | pipeline()                         | readable.pipeTo(writable)
Iterate   | for await (const chunk of stream)  | for await (const chunk of stream)

Bridge functions: Readable.fromWeb(), Readable.toWeb(), Writable.fromWeb(), Writable.toWeb(). Node’s Web Streams docs cover the surface.
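A quick round trip shows the bridge helpers in action. This is a sketch with made-up data; Buffer.from keeps the chunk handling unambiguous on both sides of the bridge:

```typescript
// Round trip: Node Readable → Web ReadableStream → back to a Node Readable
import { Readable } from 'node:stream';

const nodeStream = Readable.from([Buffer.from('web '), Buffer.from('streams')]);

const webStream = Readable.toWeb(nodeStream);    // standards ReadableStream
const backToNode = Readable.fromWeb(webStream);  // wrap it back into a Node stream

let out = '';
for await (const chunk of backToNode) out += Buffer.from(chunk).toString('utf8');
console.log(out);                                // 'web streams'
```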

Production checklist

  • pipeline() from node:stream/promises for any multi-step stream chain. Never .pipe().
  • Use async iteration for read-only consumers; clearer code, real exceptions.
  • objectMode: true when chunks are JS objects (parsed CSV rows, JSON lines, events).
  • Set highWaterMark intentionally. The default is 16 KiB for byte streams (fs.createReadStream uses 64 KiB) and 16 objects in object mode. Tune for your workload — too small and you context-switch constantly; too large and you defeat backpressure.
  • Always destroy streams on error. pipeline() does this automatically; manual code must.
  • Set timeouts on stream operations. A hung upstream can leave a stream waiting forever; wrap in AbortSignal.timeout() for HTTP-driven streams.
  • Profile peak memory under realistic load. A “stream pipeline” with a hidden buffer-everything step looks like streams and costs like the naive version.
  • Bridge Web Streams ↔ Node Streams with the helper methods. Mixing the two without conversion silently breaks.
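Two of those bullets, highWaterMark and timeouts, fit in one small sketch; the buffer size and the 5-second budget are arbitrary choices for illustration:

```typescript
// highWaterMark tuning plus an AbortSignal timeout on a pipeline
import { Readable, Writable } from 'node:stream';
import { pipeline } from 'node:stream/promises';

const seen: number[] = [];
const sink = new Writable({
  objectMode: true,
  highWaterMark: 4,                 // buffer at most 4 objects before write() returns false
  write(obj, _enc, cb) {
    seen.push(obj);
    setImmediate(cb);               // simulate asynchronous per-item work
  },
});

// pipeline() accepts an AbortSignal; a hung upstream fails the chain
// with an AbortError instead of waiting forever
await pipeline(
  Readable.from([1, 2, 3, 4, 5]),
  sink,
  { signal: AbortSignal.timeout(5_000) },
);

console.log(seen);                  // [1, 2, 3, 4, 5]
```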

When not to use streams

  • The data fits in memory comfortably. Reading a 100 KB JSON config file and parsing it is one line: JSON.parse(readFileSync(path)). Streams are overkill.
  • You need random access to the data. Streams are sequential by design. If you need to seek, slice, or re-read, load it into memory or use a database.
  • The transformation depends on the whole dataset. Sorting, deduplicating, computing averages — streams can do these, but it is awkward and slower than loading into memory if memory allows. Use streams for filtering and per-row transforms; switch to in-memory or DB for aggregations.

Troubleshooting FAQ

Why is my stream slow even though it should be fast?

Three usual causes: missing objectMode when you push objects (non-object-mode streams throw ERR_INVALID_ARG_TYPE on non-buffer chunks), highWaterMark too small (constant context switches), or a synchronous operation in your transform blocking the event loop.

What happens to memory when a stream errors mid-flow?

With pipeline(): every stream is destroyed, file descriptors close, partial files remain on disk (clean up explicitly). With .pipe(): the broken stream stops, the others may keep buffering forever — file descriptor leak.

Should I use Highland.js or most.js?

No, not in 2026. The native streams API plus async iterators now covers what those libraries used to add, and the API ergonomics gap has closed; the smaller dependency footprint wins.

Can I use streams with TypeScript ergonomically?

Yes. Readable<T>-style generic typing is patchy in @types/node; use a thin wrapper or community type packages for typed object-mode streams.

I get “Cannot find module 'node:stream/promises'” — what version of Node do I need?

Node 16.0+. The error usually means an outdated Node binary on the server; the resolution diagnostic tree covers the broader pattern.

How do I stream from a database query?

Most Postgres clients support a streaming mode. pg.Cursor gives you a row-at-a-time iterator. Prisma 5+ supports findMany with cursor-based pagination — not technically a stream, but functionally equivalent for chunked exports. My Postgres + Prisma setup covers cursor pagination.

What is the difference between readable and data events?

data is push-based — fires whenever a chunk is available. readable is pull-based — you get notified that data is available, then you call .read(). For 95% of code, async iteration is clearer than either.
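A minimal pull-based sketch for comparison, with made-up data:

```typescript
// Pull-based consumption via 'readable' + read()
import { Readable } from 'node:stream';

const stream = Readable.from([Buffer.from('ab'), Buffer.from('cd')]);

let out = '';
await new Promise<void>((resolve) => {
  stream.on('readable', () => {
    let chunk;
    // drain the internal buffer; read() returns null when it is empty
    while ((chunk = stream.read()) !== null) out += chunk.toString('utf8');
  });
  stream.on('end', resolve);
});
console.log(out);                   // 'abcd'
```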

Are Node streams compatible with Bun and Deno?

Bun supports them natively. Deno supports them through its Node compatibility layer (node: specifier imports). For maximum portability across all three runtimes, write Web Streams and bridge to Node streams only at the edges.

How do I test stream code?

Convert the readable to an array of chunks and assert on the array. Array.fromAsync(stream) in Node 22+ does this in one call.
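That pattern in full, using a toy object-mode transform; the doubling logic is made up for illustration:

```typescript
// Testing a transform: run fixed input through it, collect, assert on the array.
import { Readable, Transform } from 'node:stream';
import { pipeline } from 'node:stream/promises';

const double = new Transform({
  objectMode: true,
  transform(n, _enc, cb) { cb(null, (n as number) * 2); },
});

const out: number[] = [];
await pipeline(
  Readable.from([1, 2, 3]),
  double,
  // on Node 22+, Array.fromAsync(stream) replaces this collector function
  async function (source: AsyncIterable<number>) {
    for await (const n of source) out.push(n);
  },
);

console.log(out);                   // [2, 4, 6]
```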

What ships next

This article covers the core Node streams API. The natural next steps: a deep dive on backpressure tuning under real load, and a comparison of Web Streams vs Node Streams when you need to ship the same code on Cloudflare Workers and Node. If your stream code leaks memory, the diagnosis pattern is the same as any other Node memory leak — heap snapshots, retainer tree, fix the unbounded buffer.