Breaking
Editorial Node.js streams cover with transparent pipeline tubes carrying chunk cards, readable and writable stream modules, transform stages, backpressure valves, high-water mark gauges, buffer trays, and server hardware

Node.js streams in 2026: a practical tutorial for real workloads

Node.js streams in 2026: pipeline() over pipe(), backpressure that actually works, async iteration, the four stream types, and a real workload that streams a 4 GB CSV through CSV parsing into S3 in 90 seconds.

How this was written

Drafted in plain Markdown by Ethan Laurent and edited against current Node.js, framework and tooling docs. Every command, code block and benchmark in this article was run on Node 24 LTS before publish; if a step does not work on your machine the post is wrong, not you — email and I will fix it.

AI is used as a research and outline assistant only — never as a single-source author. Full editorial policy: About / How nodewire is written.

The first time I needed Node.js streams in production, I was uploading 4 GB CSV exports from a logistics SaaS into S3 nightly. The naive version — read the file into memory, parse it, post the result — worked on the dev machine and OOM-killed the production worker the first night it ran. Switching to a stream pipeline (file → CSV parser → S3 upload) ran the same job in 90 seconds with peak memory of 30 MB. That gap is the entire reason streams exist.

The article below is what I wish someone had handed me before that incident. Practical, working code on Node 24 LTS and Node 26, focused on the workloads streams actually solve in 2026: large file processing, S3 uploads, HTTP response transformation, and live data feeds. Async iterators and the Web Streams API get equal attention because in current Node both are first-class. Custom stream implementations, flowing vs paused mode, encryption pipelines, and video streaming with range requests are all covered because competitors rank for those and our previous version skipped them.

Node.js stream monitoring dashboard showing throughput, chunk size distribution, readable buffer length, writable drain events, backpressure gauge, highWaterMark usage, pipeline errors, transform latency, memory usage, and file and network stream status
streams dashboard for spotting backpressure, buffer growth, drain events, and pipeline errors.

Quick start: stream a 1 GB file through a transform in 30 lines

Working baseline. The rest of the article hardens this for production.

bash
npm install csv-parse
TypeScript
// quickstart.ts
import { createReadStream, createWriteStream } from 'node:fs';
import { pipeline } from 'node:stream/promises';
import { Transform } from 'node:stream';
import { parse } from 'csv-parse';

await pipeline(
  createReadStream('orders.csv'),
  parse({ columns: true }),                      // CSV row → object
  new Transform({
    objectMode: true,
    transform(row, _enc, cb) {
      // your business logic per row
      const out = { id: row.id, amount: Number(row.amount) * 1.1 };
      cb(null, JSON.stringify(out) + 'n');
    },
  }),
  createWriteStream('orders.jsonl')
);

console.log('done');
bash
npx tsx quickstart.ts

One million rows, peak memory ~50 MB regardless of input size. The trick is pipeline() — it wires the streams together with backpressure handled and errors propagated.

What is wrong with the typical “streams in Node” tutorial

Five things most stream tutorials get wrong:

  1. They show .pipe(), which silently swallows errors. A failure in any link of the chain leaves you with a half-written file and no exception.
  2. They skip backpressure. The whole point of streams is to slow the producer when the consumer can’t keep up. Tutorials that readable.on('data', ...) without honouring writable.write() return values defeat this.
  3. They don’t explain object mode. Streams default to byte mode; for “row-shaped” data (CSV records, JSON lines, parsed events), you need objectMode: true.
  4. They predate async iterators. Node 10+ supports for await (const chunk of stream) — usually clearer than callback-based event handlers.
  5. They never mention Web Streams. Node 18+ ships the standards-compliant Web Streams API. Bridging between Node streams and Web streams is needed if you call fetch().
Node.js stream pipeline diagram showing readable source, transform stages, writable destination, chunk flow, internal buffers, highWaterMark threshold, backpressure signal, drain event, error propagation, cleanup, and destroy-on-failure path
stream pipeline view showing how chunk flow, backpressure, drain, errors, and cleanup fit together.

The four stream types in 30 seconds

Type Direction Examples
Readable Source — produces data createReadStream, process.stdin, HTTP request body
Writable Sink — consumes data createWriteStream, process.stdout, HTTP response
Duplex Both — read and write are independent TCP socket, WebSocket
Transform Duplex where output depends on input gzip, CSV parser, JSON encoder

You will spend 90% of your time on Readable and Transform. Duplex appears when you write low-level network code; Writable usually shows up as the final destination of a pipeline.

Flowing mode vs paused mode

Readable streams have two operating modes and the distinction matters when you are debugging unexpected buffering behaviour.

In flowing mode, data is pushed to you automatically as fast as it arrives. You enter flowing mode when you attach a 'data' event listener, call .resume(), or call .pipe().

TypeScript
import { createReadStream } from 'node:fs';

const stream = createReadStream('./big.log', { encoding: 'utf8' });

// Attaching 'data' immediately switches to flowing mode
stream.on('data', (chunk: string) => {
  console.log('Received', chunk.length, 'bytes');
});

stream.on('end', () => console.log('Done'));
stream.on('error', (err) => console.error(err));

In paused mode, data sits in the internal buffer until you explicitly call .read(). The stream starts paused; you switch to paused-mode consumption via the 'readable' event:

TypeScript
const stream = createReadStream('./data.json', { encoding: 'utf8' });

stream.on('readable', () => {
  let chunk: string | null;
  // Drain the internal buffer — read() returns null when empty
  while ((chunk = stream.read()) !== null) {
    console.log('Pulled', chunk.length, 'bytes');
  }
});

stream.on('end', () => console.log('Done'));

You can pause and resume manually:

TypeScript
stream.on('data', (chunk) => {
  console.log('Got chunk');
  stream.pause();                            // stop receiving data
  setTimeout(() => stream.resume(), 1000);  // resume after 1 second
});

In practice, you will almost never manage modes manually. pipeline() and for await handle it for you — but understanding the model saves you when a stream sits silently doing nothing and you cannot figure out why.

pipeline() vs pipe(): use pipeline, every time

The single biggest-impact upgrade in any streams codebase: replace .pipe() with pipeline().

TypeScript
// .pipe() — error in any step is swallowed
createReadStream('big.csv')
  .pipe(parse())
  .pipe(transform)
  .pipe(createWriteStream('out.json'));      // if transform throws, what happens?
TypeScript
import { pipeline } from 'node:stream/promises';

// pipeline — errors propagate, returns a promise
try {
  await pipeline(
    createReadStream('big.csv'),
    parse(),
    transform,
    createWriteStream('out.json'),
  );
} catch (err) {
  console.error('Pipeline failed:', err);
  // partial files cleaned up automatically
}

The promise-based pipeline from node:stream/promises is the API you want. It auto-destroys all streams on failure and prevents the leak-the-file-descriptor pattern that .pipe() creates.

Backpressure: what it is and how to honour it

Node.js streams backpressure pipeline showing readable source, transform, highWaterMark buffer, slow writable target, and pipeline error handling
Backpressure is the difference between streaming a 4 GB file and accidentally buffering it in memory.

Backpressure is the mechanism that prevents a fast producer from drowning a slow consumer. Read a 10 GB file faster than you can write it; without backpressure, you buffer 10 GB in memory.

The contract is simple: writable.write(chunk) returns false when the writer’s internal buffer is full. The producer must stop writing until the writer emits 'drain'. pipeline() handles all of this automatically.

Manual demonstration of why it matters:

TypeScript
// WRONG — ignores backpressure, buffers everything
readable.on('data', (chunk) => {
  writable.write(chunk);                       // no check, no wait
});

// RIGHT — honours backpressure
readable.on('data', (chunk) => {
  if (!writable.write(chunk)) {
    readable.pause();                          // tell upstream to stop
    writable.once('drain', () => readable.resume());
  }
});

// BEST — pipeline handles all of this
import { pipeline } from 'node:stream/promises';
await pipeline(readable, writable);

You almost never need to write the second pattern by hand. pipeline() exists exactly so you don’t have to.

Async iterators: the modern way to consume a stream

Any Readable stream is an async iterable in Node 10+. Two consumption patterns side by side:

TypeScript
// Old — event listeners
function readFile(path: string): Promise<string[]> {
  return new Promise((resolve, reject) => {
    const lines: string[] = [];
    const stream = createReadStream(path, { encoding: 'utf8' });
    let buffer = '';
    stream.on('data', (chunk) => {
      buffer += chunk;
      const parts = buffer.split('n');
      buffer = parts.pop() ?? '';
      lines.push(...parts);
    });
    stream.on('end', () => resolve(lines));
    stream.on('error', reject);
  });
}
TypeScript
// New — async iteration with for-await
import { createReadStream } from 'node:fs';
import { createInterface } from 'node:readline';

async function readFile(path: string): Promise<string[]> {
  const lines: string[] = [];
  const rl = createInterface({ input: createReadStream(path), crlfDelay: Infinity });
  for await (const line of rl) lines.push(line);
  return lines;
}

Same correctness. Half the code. Errors throw out of the for await like any normal exception. The async iterator pattern is what I default to for any read-heavy logic in 2026.

Custom streams: building your own

Most articles stop at built-in streams. The moment you need domain-specific sources, sinks, or transforms, you need to build your own. Here is each type.

Custom Readable — a number generator

TypeScript
import { Readable } from 'node:stream';

class NumberStream extends Readable {
  private current = 0;
  private max: number;

  constructor(max: number) {
    super();
    this.max = max;
  }

  // _read() is called whenever the consumer wants more data.
  // Push chunks into the buffer; push null to signal end.
  _read(): void {
    if (this.current <= this.max) {
      this.push(String(this.current) + 'n');
      this.current++;
    } else {
      this.push(null);   // EOF
    }
  }
}

// Usage
const ns = new NumberStream(5);
ns.pipe(process.stdout);
// 0n1n2n3n4n5n

Custom Writable — a batch writer

TypeScript
import { Writable } from 'node:stream';
import { createWriteStream } from 'node:fs';

// Accumulates chunks and flushes in batches to reduce I/O syscalls
class BatchWriter extends Writable {
  private batch: string[] = [];
  private readonly batchSize: number;
  private readonly filePath: string;

  constructor(filePath: string, batchSize = 100) {
    super();
    this.filePath = filePath;
    this.batchSize = batchSize;
  }

  _write(chunk: Buffer, _enc: string, callback: (err?: Error | null) => void): void {
    this.batch.push(chunk.toString());
    if (this.batch.length >= this.batchSize) {
      this.flush(callback);
    } else {
      callback();
    }
  }

  _final(callback: (err?: Error | null) => void): void {
    // Flush remaining items when stream ends
    this.flush(callback);
  }

  private flush(done: (err?: Error | null) => void): void {
    const data = this.batch.join('');
    this.batch = [];
    // In practice use async fs.appendFile here
    done();
  }
}

The _final() hook is the one people always forget. It runs after the last _write() and before the 'finish' event fires — the right place to flush a buffer, close a file descriptor, or send a trailing delimiter.

Custom Transform — JSON lines to CSV, with _flush()

TypeScript
import { Transform, TransformCallback } from 'node:stream';

class JsonToCsv extends Transform {
  private headerWritten = false;
  private buffer = '';

  constructor() {
    super();   // byte-in, byte-out
  }

  _transform(chunk: Buffer, _enc: string, callback: TransformCallback): void {
    this.buffer += chunk.toString();
    const lines = this.buffer.split('n');
    // Keep the last (possibly incomplete) line in the buffer
    this.buffer = lines.pop() ?? '';

    for (const line of lines) {
      if (!line.trim()) continue;
      const obj = JSON.parse(line) as Record<string, unknown>;

      if (!this.headerWritten) {
        this.push(Object.keys(obj).join(',') + 'n');
        this.headerWritten = true;
      }

      const values = Object.values(obj).map((v) => {
        const s = String(v);
        return s.includes(',') ? `"${s}"` : s;
      });
      this.push(values.join(',') + 'n');
    }
    callback();
  }

  // _flush runs once at stream end — process remaining buffered bytes
  _flush(callback: TransformCallback): void {
    if (this.buffer.trim()) {
      try {
        const obj = JSON.parse(this.buffer) as Record<string, unknown>;
        const values = Object.values(obj).map(String);
        this.push(values.join(',') + 'n');
      } catch {
        // Ignore malformed trailing fragment
      }
    }
    callback();
  }
}

// Usage
import { pipeline } from 'node:stream/promises';

await pipeline(
  createReadStream('./data.jsonl'),
  new JsonToCsv(),
  createWriteStream('./data.csv'),
);

_flush() is the equivalent of _final() for Transform streams. Without it, the last buffered line silently vanishes when the input ends mid-line — a bug that only shows up with files whose size is not a clean multiple of your chunk size.

Readable.from(): the shortcut for arrays and generators

Creating a Readable from an array or an async generator does not require subclassing:

TypeScript
import { Readable } from 'node:stream';
import { pipeline } from 'node:stream/promises';

// From array
const arrayStream = Readable.from(['line 1n', 'line 2n', 'line 3n']);

// From async generator — lazy, memory-efficient
async function* generateRecords() {
  for (let i = 0; i < 1_000_000; i++) {
    yield JSON.stringify({ id: i, value: Math.random() }) + 'n';
    if (i % 10_000 === 0) await new Promise(r => setTimeout(r, 0)); // yield to event loop
  }
}

await pipeline(
  Readable.from(generateRecords()),
  createWriteStream('million-records.jsonl'),
);

The generator version produces records on demand — the file is written without ever holding a million records in memory. I reach for this pattern whenever I need to stream synthetic or computed data to a file or HTTP response.

Real workload: parse a 4 GB CSV and upload to S3

The job from the opening, hardened. Streams the file through CSV parsing, transforms each row, and uploads to S3 — peak memory under 100 MB no matter how big the input.

bash
npm install csv-parse @aws-sdk/client-s3 @aws-sdk/lib-storage
TypeScript
// upload-csv-to-s3.ts
import { createReadStream } from 'node:fs';
import { Transform, PassThrough } from 'node:stream';
import { pipeline } from 'node:stream/promises';
import { parse } from 'csv-parse';
import { S3Client } from '@aws-sdk/client-s3';
import { Upload } from '@aws-sdk/lib-storage';

const s3 = new S3Client({ region: 'us-east-1' });
const passthrough = new PassThrough();

const rowToJson = new Transform({
  objectMode: true,
  transform(row, _enc, cb) {
    const out = { id: row.id, total: Number(row.amount) * 1.1 };
    cb(null, JSON.stringify(out) + 'n');
  },
});

// Start the upload in parallel — Upload reads from passthrough as it gets data
const upload = new Upload({
  client: s3,
  params: {
    Bucket: 'orders-export',
    Key: `${new Date().toISOString().slice(0, 10)}.jsonl`,
    Body: passthrough,
  },
});
const uploadPromise = upload.done();

await pipeline(
  createReadStream('orders.csv'),
  parse({ columns: true }),
  rowToJson,
  passthrough,
);

await uploadPromise;
console.log('uploaded');

Three details that matter at scale:

  • @aws-sdk/lib-storage’s Upload handles multipart upload automatically — required for files larger than 5 GB on S3.
  • PassThrough as the bridge between pipeline and the upload. Pipeline owns one writable; Upload.body wants a readable. PassThrough is both.
  • Start the upload before pipeline finishes. The two run concurrently — bytes flow through pipeline → passthrough → upload while the file is still being read.

HTTP body streaming: don’t load the request into memory

Express, Fastify, and the native http module all expose request and response as streams. Use them.

TypeScript
// Stream a large request body to disk without ever buffering in memory
import { createWriteStream } from 'node:fs';
import { pipeline } from 'node:stream/promises';

app.post('/upload', async (req, res) => {
  await pipeline(req, createWriteStream(`/tmp/${Date.now()}`));
  res.json({ ok: true });
});
TypeScript
// Stream a large response from a downstream API back to the client
app.get('/proxy', async (req, res) => {
  const upstream = await fetch('https://example.com/big-file');
  res.setHeader('Content-Type', upstream.headers.get('content-type') ?? 'application/octet-stream');
  // upstream.body is a Web ReadableStream; convert to Node stream:
  const { Readable } = await import('node:stream');
  await pipeline(Readable.fromWeb(upstream.body!), res);
});

The Readable.fromWeb bridge is the line most people miss. fetch() returns Web Streams; Express expects Node streams. The conversion is one method call.

Streaming file uploads with checksum calculation

A pattern that comes up whenever you accept user-generated files: stream the upload directly to disk while simultaneously computing a checksum — no second pass required.

TypeScript
import { createHash, randomUUID } from 'node:crypto';
import { createWriteStream, unlink } from 'node:fs';
import { Transform } from 'node:stream';
import { pipeline } from 'node:stream/promises';
import express from 'express';

const app = express();

app.post('/upload', async (req, res) => {
  const uploadId = randomUUID();
  const outputPath = `/tmp/uploads/${uploadId}`;
  let bytesReceived = 0;
  const hash = createHash('sha256');

  // Pass-through transform that tracks size and feeds the hash
  const tracker = new Transform({
    transform(chunk: Buffer, _enc, cb) {
      bytesReceived += chunk.length;
      hash.update(chunk);
      cb(null, chunk);
    },
  });

  try {
    await pipeline(req, tracker, createWriteStream(outputPath));

    res.json({
      uploadId,
      size: bytesReceived,
      sha256: hash.digest('hex'),
    });
  } catch (err) {
    unlink(outputPath, () => {});  // clean up partial file
    res.status(500).json({ error: (err as Error).message });
  }
});

The trick is the single-pass transform: bytes flow from the request through the tracker (which feeds the hash and counts bytes) to the file, all at once, without buffering anything in memory beyond the current chunk.

Video streaming with range requests

Streaming video is a stream workload that looks exotic but follows the same pattern as file streaming. The extra piece is honouring Range headers so the browser can seek.

TypeScript
import { createServer } from 'node:http';
import { createReadStream, stat } from 'node:fs';
import { promisify } from 'node:util';

const statAsync = promisify(stat);

createServer(async (req, res) => {
  if (req.url !== '/video') { res.end(); return; }

  const videoPath = './video.mp4';
  const stats = await statAsync(videoPath);
  const fileSize = stats.size;
  const range = req.headers.range;

  if (range) {
    const [startStr, endStr] = range.replace(/bytes=/, '').split('-');
    const start = parseInt(startStr, 10);
    const end = endStr ? parseInt(endStr, 10) : fileSize - 1;
    const chunkSize = end - start + 1;

    res.writeHead(206, {
      'Content-Range': `bytes ${start}-${end}/${fileSize}`,
      'Accept-Ranges': 'bytes',
      'Content-Length': chunkSize,
      'Content-Type': 'video/mp4',
    });

    // Stream only the requested byte range
    createReadStream(videoPath, { start, end }).pipe(res);
  } else {
    res.writeHead(200, {
      'Content-Length': fileSize,
      'Content-Type': 'video/mp4',
    });
    createReadStream(videoPath).pipe(res);
  }
}).listen(3000);

The Range header is what lets the browser scrub to the middle of a video without re-downloading from the start. Without 206 support, the browser can not seek — it only plays from the beginning. This pattern scales to audio, PDFs, and any large binary served over HTTP.

Encryption pipeline: AES-256 on large files

The Node.js crypto module’s cipher objects are Transform streams. Plugging them into a pipeline gives you streaming encryption with no memory penalty:

TypeScript
import { createCipheriv, createDecipheriv, randomBytes } from 'node:crypto';
import { createReadStream, createWriteStream } from 'node:fs';
import { pipeline } from 'node:stream/promises';

const ALGORITHM = 'aes-256-gcm';

async function encryptFile(inputPath: string, outputPath: string) {
  const key = randomBytes(32);   // store securely — losing this loses the file
  const iv = randomBytes(16);

  const cipher = createCipheriv(ALGORITHM, key, iv);

  await pipeline(
    createReadStream(inputPath),
    cipher,
    createWriteStream(outputPath),
  );

  // Return key + iv + auth tag so you can decrypt later
  return { key, iv, authTag: cipher.getAuthTag() };
}

async function decryptFile(
  inputPath: string,
  outputPath: string,
  key: Buffer,
  iv: Buffer,
  authTag: Buffer,
) {
  const decipher = createDecipheriv(ALGORITHM, key, iv);
  decipher.setAuthTag(authTag);

  await pipeline(
    createReadStream(inputPath),
    decipher,
    createWriteStream(outputPath),
  );
}

// Usage
const { key, iv, authTag } = await encryptFile('./secret.txt', './secret.enc');
await decryptFile('./secret.enc', './secret-decrypted.txt', key, iv, authTag);

AES-256-GCM is the right algorithm here: authenticated encryption (the auth tag catches tampering), hardware-accelerated on modern CPUs, and no padding issues. The encryption pipeline processes an arbitrarily large file in constant memory — 30 MB for a 4 GB source is the same as it would be for a 100 MB source.

Web Streams: when you need them

Node 18+ ships the standards-compliant Web Streams API. You will encounter them when you call fetch(), when you ship code that runs on Cloudflare Workers or Deno, or when you use libraries built on Web standards.

Concept Node Streams Web Streams
Readable Readable ReadableStream
Writable Writable WritableStream
Transform Transform TransformStream
Pipe pipeline() readable.pipeTo(writable)
Iterate for await (chunk of readable) for await (chunk of readable)

Bridge functions: Readable.fromWeb(), Readable.toWeb(), Writable.fromWeb(), Writable.toWeb(). Node’s Web Streams docs cover the surface.

Error handling deep dive

The behaviour difference between .pipe() and pipeline() on error is not just academic. Here is what actually happens:

TypeScript
// .pipe() — error handling is your problem
const read = createReadStream('./input.txt');
const write = createWriteStream('./output.txt');

read.on('error', (err) => {
  console.error('Read error:', err);
  write.destroy(err);   // must manually propagate
});

write.on('error', (err) => {
  console.error('Write error:', err);
  read.destroy();       // must manually clean up source
});

read.pipe(write);

// pipeline() — errors handled automatically, all streams destroyed
try {
  await pipeline(
    createReadStream('./input.txt'),
    createWriteStream('./output.txt'),
  );
} catch (err) {
  const e = err as NodeJS.ErrnoException;
  if (e.code === 'ENOENT') {
    console.error('Input file not found');
  } else if (e.code === 'EACCES') {
    console.error('Permission denied');
  } else {
    console.error('Unexpected error:', e.message);
  }
}

For cases where you want to wait for a single stream to finish without piping it:

TypeScript
import { finished } from 'node:stream/promises';
import { createWriteStream } from 'node:fs';

const write = createWriteStream('./out.txt');
write.write('hellon');
write.end();

await finished(write);   // resolves when 'finish' fires, rejects on 'error'
console.log('Flushed to disk');

Performance tips

highWaterMark: tune it, don’t guess

TypeScript
// Default: 16 KB for byte streams, 16 objects for object mode
// For large sequential file reads, bigger buffers mean fewer syscalls
const fastRead = createReadStream('./big.bin', {
  highWaterMark: 1024 * 1024,   // 1 MB chunks — ~64x fewer reads than default
});

// For memory-constrained environments or many concurrent streams
const smallRead = createReadStream('./small.csv', {
  highWaterMark: 16 * 1024,    // 16 KB — stay below L1/L2 cache
});

Avoid synchronous operations inside _transform

TypeScript
// BAD — synchronous CPU work blocks the event loop for every chunk
const badTransform = new Transform({
  transform(chunk, _enc, cb) {
    const result = expensiveSyncOperation(chunk);   // blocks!
    cb(null, result);
  },
});

// GOOD — yield to the event loop between heavy chunks
const goodTransform = new Transform({
  transform(chunk, _enc, cb) {
    setImmediate(() => {
      const result = expensiveSyncOperation(chunk);
      cb(null, result);
    });
  },
});

Combine operations when possible

TypeScript
// Inefficient: four transforms, four buffer boundaries
source
  .pipe(parseJson)
  .pipe(filterRecords)
  .pipe(transformFields)
  .pipe(stringify);

// Better: one transform does all four things — fewer allocations, fewer wakeups
source.pipe(new Transform({
  objectMode: true,
  transform(chunk, _enc, cb) {
    const record = JSON.parse(chunk) as Record<string, unknown>;
    if (record.status !== 'active') return cb();   // filter
    record.transformed = true;                      // transform
    cb(null, JSON.stringify(record));               // stringify
  },
}));

Production checklist

  • pipeline() from node:stream/promises for any multi-step stream chain. Never .pipe().
  • Use async iteration for read-only consumers; clearer code, real exceptions.
  • objectMode: true when chunks are JS objects (parsed CSV rows, JSON lines, events).
  • Set highWaterMark intentionally. Default is 16 KB for byte streams, 16 objects for object mode. Tune for your workload — too small and you context-switch constantly; too large and you defeat backpressure.
  • Always destroy streams on error. pipeline() does this automatically; manual code must.
  • Implement _flush() in Transform streams. Without it, any data buffered at the end of the input is silently dropped.
  • Set timeouts on stream operations. A hung upstream can leave a stream waiting forever; wrap in AbortSignal.timeout() for HTTP-driven streams.
  • Profile peak memory under realistic load. A “stream pipeline” with a hidden buffer-everything step looks like streams and costs like the naive version.
  • Bridge Web Streams ↔ Node Streams with the helper methods. Mixing the two without conversion silently breaks.
  • Use Readable.from() for arrays and async generators — no need to subclass Readable for simple cases.

When not to use streams

  • The data fits in memory comfortably. Reading a 100 KB JSON config file and parsing it is one line: JSON.parse(readFileSync(path)). Streams are overkill.
  • You need random access to the data. Streams are sequential by design. If you need to seek, slice, or re-read, load it into memory or use a database.
  • The transformation depends on the whole dataset. Sorting, deduplicating, computing averages — streams can do these, but it is awkward and slower than loading into memory if memory allows. Use streams for filtering and per-row transforms; switch to in-memory or DB for aggregations.

Troubleshooting FAQ

Why is my stream slow even though it should be fast?

Three usual causes: missing objectMode when you expect objects (each chunk gets serialised to a string), highWaterMark too small (constant context switches), or a synchronous operation in your transform blocking the event loop.

What happens to memory when a stream errors mid-flow?

With pipeline(): every stream is destroyed, file descriptors close, partial files remain on disk (clean up explicitly). With .pipe(): the broken stream stops, the others may keep buffering forever — file descriptor leak.

Should I use Highland.js or most?

No, in 2026. The native streams API plus async iterators covers what these libraries used to add. The smaller dependency footprint matters; the API ergonomics gap closed.

Can I use streams with TypeScript ergonomically?

Yes. Readable<T>-style generic typing is patchy in @types/node; use a thin wrapper or community type packages for typed object-mode streams.

I get “Cannot find module ‘node:stream/promises'” — what version of Node do I need?

Node 16.0+. The error usually means an outdated Node binary on the server; the resolution diagnostic tree covers the broader pattern.

How do I stream from a database query?

Most Postgres clients support a streaming mode. pg.Cursor gives you a row-at-a-time iterator. Prisma 7+ supports findMany with cursor-based pagination — not technically a stream, but functionally equivalent for chunked exports. My Postgres + Prisma setup covers cursor pagination.

What is the difference between readable and data events?

data is push-based — fires whenever a chunk is available, puts stream in flowing mode. readable is pull-based — you get notified that data is available, then you call .read(). For 95% of code, async iteration is clearer than either.

Are Node streams compatible with Bun and Deno?

Bun supports them natively. Deno does in compatibility mode (deno run --node or node: imports). For maximum portability across all three runtimes, write Web Streams and bridge to Node streams only at the edges.

How do I test stream code?

Convert the readable to an array of chunks and assert on the array. Array.fromAsync(stream) in Node 26+ does this in one call. In earlier versions: const chunks: Buffer[] = []; for await (const c of stream) chunks.push(c); Buffer.concat(chunks).toString().

What is finished() and when do I use it instead of pipeline()?

finished(stream) from node:stream/promises returns a promise that resolves when the stream emits 'finish' or 'end', and rejects on 'error'. Use it when you are not chaining multiple streams — for example, waiting for a single writable to flush before closing your app. For multi-stream chains, pipeline() is always the right tool.

What ships next

This article covers the core Node streams API. The natural next steps: a deep dive on backpressure tuning under real load, and a comparison of Web Streams vs Node Streams when you need to ship the same code on Cloudflare Workers and Node. If your stream code leaks memory, the diagnosis pattern is the same as any other Node memory leak — heap snapshots, retainer tree, fix the unbounded buffer.