node:stream — Streams

Node.js streams — Readable, Writable, Duplex, and Transform, the modern pipeline() API, backpressure, async iterators, Web Streams interop, and patterns for piping files, HTTP bodies, and gzip compression.


What it is#

A Node.js stream is an abstraction for processing data in chunks rather than loading it all at once. Streams let you read a 10 GB file, gzip it, and upload it to object storage using a bounded amount of memory, because data flows through the pipeline one buffer at a time. Streams predate native JavaScript Promises by several years and historically came with sharp edges, but the modern API (stream/promises.pipeline(), async iterators, and Web Streams interop) makes them genuinely pleasant.

Install#

node:stream is built into Node.js. The promises sub-module (node:stream/promises) shipped stable in Node 15.

node --version

Output:

v22.14.0

Stream types#

Four base classes; each solves a different problem. Almost every concrete stream (a file reader, an HTTP response, a zlib compressor) extends one of these.

Class | Direction | Example
--- | --- | ---
Readable | Source → your code | fs.createReadStream, process.stdin, http.IncomingMessage
Writable | Your code → destination | fs.createWriteStream, process.stdout, http.ServerResponse
Duplex | Both, independent | TCP socket (net.Socket), tls.TLSSocket
Transform | Duplex whose output is a function of its input | zlib.createGzip, crypto.createHash, crypto.createCipheriv

import { Readable, Writable, Transform } from 'node:stream';

const r = new Readable({ read() {} });   // Readable
const w = new Writable({ write(chunk, enc, cb) { cb(); } });  // Writable
const t = new Transform({ transform(chunk, enc, cb) { cb(null, chunk); } });  // Transform

console.log(r instanceof Readable, w instanceof Writable, t instanceof Transform);

Output:

true true true
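The snippet above skips Duplex. Here is a minimal sketch of one, assuming a hypothetical in-memory "peer": the readable side serves a fixed reply while the writable side records what it receives, and neither side feeds the other. That independence is what separates a plain Duplex (like a socket) from a Transform.

```javascript
import { Duplex } from 'node:stream';

// Hypothetical data: what the readable side will emit, and what the writable side saw
const outbox = ['pong'];
const received = [];

const duplex = new Duplex({
  read() {
    // Push the next queued reply, or null to end the readable side
    this.push(outbox.length ? outbox.shift() : null);
  },
  write(chunk, enc, cb) {
    // Writable side: record the chunk; nothing here touches the readable side
    received.push(chunk.toString('utf8'));
    cb();
  },
});

duplex.end('ping'); // one write on the writable side, then end it

let reply = '';
for await (const chunk of duplex) reply += chunk.toString('utf8'); // drain the readable side

console.log(received, reply);
```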

Reading: the modern way#

For a Readable, async iteration is the simplest, most modern way to consume chunks. The for await loop handles backpressure and end-of-stream automatically — no event handlers required.

import { createReadStream } from 'node:fs';

const stream = createReadStream('large.txt', { encoding: 'utf8', highWaterMark: 64 * 1024 });
let lineCount = 0;
let remainder = '';

for await (const chunk of stream) {
  const lines = (remainder + chunk).split('\n');
  remainder = lines.pop();
  lineCount += lines.length;
}
if (remainder.length) lineCount++;

console.log(`${lineCount} lines`);

Output:

1048576 lines

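The highWaterMark option sets how much data a stream buffers internally before it stops reading ahead (Readable) or before write() starts returning false (Writable). fs.createReadStream defaults to 64 KiB; the generic default for byte streams is also 64 KiB as of Node 22 (16 KiB before that), and object-mode streams default to 16 objects. Raising it trades memory for fewer round-trips through the event loop.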
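To check which defaults your Node version uses, read them back from the stream properties. This sketch relies on stream.getDefaultHighWaterMark(), which exists in recent releases (including the v22 shown above) but not in very old ones.

```javascript
import { Readable, Writable, getDefaultHighWaterMark } from 'node:stream';

const bytes = new Readable({ read() {} });                    // byte stream, default threshold
const objects = new Readable({ objectMode: true, read() {} }); // object-mode stream
const custom = new Writable({ highWaterMark: 1024, write(chunk, enc, cb) { cb(); } });

console.log(bytes.readableHighWaterMark);   // the byte default for this Node version
console.log(objects.readableHighWaterMark); // 16 (counted in objects, not bytes)
console.log(custom.writableHighWaterMark);  // 1024
console.log(bytes.readableHighWaterMark === getDefaultHighWaterMark(false)); // true
```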

Writing chunks#

A Writable accepts data via write(), and end() signals that no more data is coming. write() returns false once the buffered data reaches highWaterMark; that’s the backpressure signal. Either respect it manually, as in the loop below, or let pipeline() handle it for you.

import { createWriteStream } from 'node:fs';

const out = createWriteStream('out.txt');

for (let i = 0; i < 5; i++) {
  const ok = out.write(`line ${i}\n`);
  if (!ok) {
    // Internal buffer is full — wait for 'drain' before writing more
    await new Promise((resolve) => out.once('drain', resolve));
  }
}

out.end();
await new Promise((resolve) => out.on('finish', resolve));
console.log('written');

Output:

written

The pattern above is verbose because it’s manual. The next section covers the much shorter pipeline form.

pipeline() — the right way to compose streams#

pipeline() from node:stream/promises wires multiple streams together end-to-end. It propagates errors, handles backpressure, and resolves a Promise when everything has finished — replacing the historical pipe() + manual error handler dance.

import { pipeline } from 'node:stream/promises';
import { createReadStream, createWriteStream } from 'node:fs';
import { createGzip } from 'node:zlib';

await pipeline(
  createReadStream('book.txt'),
  createGzip(),
  createWriteStream('book.txt.gz')
);

console.log('gzipped');

Output:

gzipped

Why this beats .pipe() chains:

Concern | pipeline() | .pipe()
--- | --- | ---
Errors propagate | Yes, to a single await / try block | No, you must attach an 'error' listener to every stream
Resource cleanup on error | Automatic | Manual (destroy() each stream)
Returns a Promise | Yes | No
Backpressure | Yes | Yes

The rule: prefer pipeline() over .pipe() in new code. When one stream in a .pipe() chain fails, the others are not destroyed, so file descriptors and listeners can leak silently.
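To see the first two rows of that table in action, here is a small sketch with a hypothetical middle stage that fails on its second chunk. The error reaches the single await, and pipeline() tears down every stage, including the source and the sink that never failed themselves.

```javascript
import { Readable, Transform, Writable } from 'node:stream';
import { pipeline } from 'node:stream/promises';

const source = Readable.from(['a', 'b', 'c']);

// Hypothetical failing stage: rejects the chunk 'b'
const failing = new Transform({
  objectMode: true,
  transform(chunk, enc, cb) {
    if (chunk === 'b') cb(new Error('bad chunk'));
    else cb(null, chunk);
  },
});

const sink = new Writable({
  objectMode: true,
  write(chunk, enc, cb) { cb(); },
});

let caught = null;
try {
  await pipeline(source, failing, sink);
} catch (err) {
  caught = err; // one error, surfaced at one await
}

console.log(caught?.message);                                 // bad chunk
console.log([source, failing, sink].map((s) => s.destroyed)); // every stage torn down
```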

Backpressure — why this matters#

Backpressure is the mechanism that prevents a fast producer from overwhelming a slow consumer. If a Readable produces 100 MB/s and a Writable accepts 10 MB/s, something has to throttle — otherwise memory grows unbounded.

Streams handle backpressure automatically when you use pipeline() or pipe(). When the Writable’s buffer fills, it returns false from write(); the Readable pauses; the Writable emits drain when it has room; the Readable resumes. Manually plumbing this is error-prone, which is exactly why pipeline() exists.

import { pipeline } from 'node:stream/promises';
import { createReadStream, createWriteStream } from 'node:fs';

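// Source produces faster than dest can write; backpressure keeps memory bounded.
// /dev/urandom never ends on its own, so cap the read with `end` (an inclusive byte offset): 100 MiB here.
await pipeline(
  createReadStream('/dev/urandom', { end: 100 * 1024 * 1024 - 1, highWaterMark: 1024 * 1024 }),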
  createWriteStream('random.bin', { highWaterMark: 64 * 1024 })
);

Output: (none — exits 0 on success)
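For comparison, this is roughly the handshake that pipeline() and .pipe() perform for you, written out by hand. It is a simplified sketch, not Node’s actual implementation: an in-memory source feeds a deliberately slow sink whose tiny 16-byte highWaterMark is chosen only so that backpressure triggers quickly.

```javascript
import { Readable, Writable } from 'node:stream';
import { finished } from 'node:stream/promises';

// 50 small string chunks from an in-memory source
const source = Readable.from(Array.from({ length: 50 }, (_, i) => `chunk ${i}\n`));

// A slow sink: each write "takes" about 1 ms
const received = [];
const slowSink = new Writable({
  highWaterMark: 16, // bytes; unrealistically small on purpose
  write(chunk, enc, cb) {
    received.push(chunk.toString('utf8'));
    setTimeout(cb, 1);
  },
});

let pauses = 0;
source.on('data', (chunk) => {
  // false means the sink's buffer has reached its highWaterMark
  if (!slowSink.write(chunk)) {
    pauses++;
    source.pause();                                // stop the producer...
    slowSink.once('drain', () => source.resume()); // ...until the sink catches up
  }
});
source.on('end', () => slowSink.end());
source.on('error', (err) => slowSink.destroy(err));

await finished(slowSink);
console.log(`${received.length} chunks, paused ${pauses} times`);
```

Every line of this is boilerplate that pipeline() replaces, and it still lacks the cleanup pipeline() does when the sink fails.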

Readable.from() — turn anything into a stream#

Readable.from() adapts any iterable, async iterable, or generator into a Readable. It’s the bridge between “I have an array of data” and “I want to push it through a streaming pipeline”.

import { Readable } from 'node:stream';
import { pipeline } from 'node:stream/promises';
import { createWriteStream } from 'node:fs';

async function* lines() {
  for (let i = 0; i < 1000; i++) {
    yield `record ${i}\n`;
  }
}

await pipeline(Readable.from(lines()), createWriteStream('records.txt'));
console.log('done');

Output:

done

Works with synchronous iterables too:

import { Readable } from 'node:stream';

const stream = Readable.from(['hello\n', 'world\n']);
for await (const chunk of stream) process.stdout.write(chunk);

Output:

hello
world

Writing a custom Transform#

A Transform consumes input chunks and produces output chunks. Implement _transform(chunk, encoding, callback) in a subclass (or pass a transform option to the constructor) to process each chunk, calling callback(error, output) to emit downstream. Use it for streaming line splitting, encryption, parsing, or filtering.

import { Transform } from 'node:stream';
import { pipeline } from 'node:stream/promises';
import { createReadStream, createWriteStream } from 'node:fs';

class Uppercase extends Transform {
  _transform(chunk, encoding, callback) {
    callback(null, chunk.toString('utf8').toUpperCase());
  }
}

await pipeline(
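  // Decode in the reader so a multi-byte UTF-8 character is never split across chunks
  createReadStream('input.txt', { encoding: 'utf8' }),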
  new Uppercase(),
  createWriteStream('output.txt')
);

console.log('uppercased');

Output:

uppercased
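callback(null, output) is shorthand for one this.push(output) followed by callback(). When one input should produce zero or several outputs, push them yourself and then call callback() with no value. A minimal sketch, using a hypothetical comma splitter in object mode:

```javascript
import { Readable, Transform, Writable } from 'node:stream';
import { pipeline } from 'node:stream/promises';

// Hypothetical splitter: one input string can yield any number of output chunks
const splitWords = new Transform({
  objectMode: true,
  transform(chunk, enc, cb) {
    for (const word of String(chunk).split(',')) {
      if (word) this.push(word); // skip empty fragments
    }
    cb(); // done with this input; nothing extra to emit
  },
});

const words = [];
await pipeline(
  Readable.from(['a,b', '', 'c,,d']),
  splitWords,
  new Writable({ objectMode: true, write(w, enc, cb) { words.push(w); cb(); } })
);

console.log(words); // [ 'a', 'b', 'c', 'd' ]
```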

For Transforms that consume entire lines instead of arbitrary byte chunks, buffer a remainder between calls:

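import { Transform } from 'node:stream';
import { StringDecoder } from 'node:string_decoder';

class LineFilter extends Transform {
  constructor(predicate) {
    super();
    this.predicate = predicate;
    this.remainder = '';
    // StringDecoder holds back incomplete multi-byte characters between chunks
    this.decoder = new StringDecoder('utf8');
  }
  _transform(chunk, encoding, callback) {
    const lines = (this.remainder + this.decoder.write(chunk)).split('\n');
    this.remainder = lines.pop(); // the last piece may be an incomplete line
    const kept = lines.filter(this.predicate);
    callback(null, kept.length ? kept.join('\n') + '\n' : undefined);
  }
  _flush(callback) {
    const last = this.remainder + this.decoder.end();
    if (last && this.predicate(last)) callback(null, last);
    else callback();
  }
}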

Output: (none — exits 0 on success)

Object mode#

By default, streams carry Buffer or string chunks. Setting objectMode: true lets a stream emit arbitrary JavaScript objects — useful for pipelines that parse records (CSV rows, JSON lines) and pass them downstream as structured data.

import { Readable, Transform, Writable } from 'node:stream';
import { pipeline } from 'node:stream/promises';

const source = Readable.from([{ id: 1 }, { id: 2 }, { id: 3 }]);

const doubler = new Transform({
  objectMode: true,
  transform(obj, enc, cb) {
    cb(null, { id: obj.id * 2 });
  },
});

// The last stage only consumes, so a Writable is the natural fit
const sink = new Writable({
  objectMode: true,
  write(obj, enc, cb) {
    console.log(obj);
    cb();
  },
});

await pipeline(source, doubler, sink);

Output:

{ id: 2 }
{ id: 4 }
{ id: 6 }

Readable.from() creates an object-mode stream by default, whatever the iterable yields (even Buffers); pass { objectMode: false } to opt out.
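A quick way to confirm that default is to inspect readableObjectMode on a few streams built from different iterables:

```javascript
import { Readable } from 'node:stream';

const strings = Readable.from(['a', 'b']);
const buffers = Readable.from([Buffer.from('a'), Buffer.from('b')]);
const bytes = Readable.from([Buffer.from('a'), Buffer.from('b')], { objectMode: false });

console.log(strings.readableObjectMode); // true
console.log(buffers.readableObjectMode); // true: Buffer values do not change the default
console.log(bytes.readableObjectMode);   // false: explicitly opted out
```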

HTTP request and response bodies#

In Node’s http/https modules, IncomingMessage is a Readable and ServerResponse implements the Writable interface, so streaming a file response, or piping a request body to disk, is just pipeline():

import { createServer } from 'node:http';
import { createReadStream } from 'node:fs';
import { pipeline } from 'node:stream/promises';

const server = createServer(async (req, res) => {
  res.setHeader('Content-Type', 'application/octet-stream');
  try {
    await pipeline(createReadStream('./big.bin'), res);
  } catch (err) {
    console.error('stream failed', err.message);
  }
});

server.listen(3000, () => console.log('listening on 3000'));

Output:

listening on 3000

Uploading a request body to disk is the mirror image:

import { createServer } from 'node:http';
import { createWriteStream } from 'node:fs';
import { pipeline } from 'node:stream/promises';

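createServer(async (req, res) => {
  if (req.method !== 'POST') {
    res.statusCode = 405;
    return res.end();
  }
  try {
    // Assumes ./uploads already exists
    await pipeline(req, createWriteStream(`./uploads/${Date.now()}.bin`));
    res.end('saved');
  } catch (err) {
    // Without this catch, a failed upload becomes an unhandled rejection and crashes the process.
    // pipeline() may already have torn down the connection, so only set a status if we still can.
    if (!res.headersSent) res.statusCode = 500;
    res.end('upload failed');
  }
}).listen(3000);

Output: (none; the server runs until you stop it)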

Web Streams interop#

Modern browsers and fetch use the Web Streams standard (ReadableStream, WritableStream, TransformStream), which is similar to but not identical with Node streams. Node provides converters in both directions.

import { Readable } from 'node:stream';

// Node Readable → Web ReadableStream
const nodeReadable = Readable.from(['hello ', 'world']);
const webReadable = Readable.toWeb(nodeReadable); // a standard ReadableStream (pipeThrough(), tee(), ...)

// Web ReadableStream → Node Readable
const webStream = new ReadableStream({
  start(controller) {
    controller.enqueue('a');
    controller.enqueue('b');
    controller.close();
  },
});
// objectMode keeps each enqueued string as its own chunk; without it, strings are converted to Buffers
const nodeStream = Readable.fromWeb(webStream, { objectMode: true });
for await (const chunk of nodeStream) console.log(chunk);

Output:

a
b

A common reason to convert: fetch().body is a Web ReadableStream, but most Node sinks expect a Node Readable. Wrap it once with Readable.fromWeb() and pipe it as usual:

import { Readable } from 'node:stream';
import { createWriteStream } from 'node:fs';
import { pipeline } from 'node:stream/promises';

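const response = await fetch('https://example.com/large.bin');
// fetch() does not reject on HTTP errors, and some responses have no body at all
if (!response.ok || !response.body) throw new Error(`download failed: HTTP ${response.status}`);
await pipeline(Readable.fromWeb(response.body), createWriteStream('downloaded.bin'));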
console.log('downloaded');

Output:

downloaded
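Web streams have their own composition method, pipeThrough(). To run a Web TransformStream over Node data, convert at both boundaries. A sketch, assuming a hypothetical upper-casing transform (TransformStream and ReadableStream are globals in current Node):

```javascript
import { Readable } from 'node:stream';

// Hypothetical Web TransformStream that upper-cases string chunks
const upper = new TransformStream({
  transform(chunk, controller) {
    controller.enqueue(String(chunk).toUpperCase());
  },
});

// Node Readable → Web ReadableStream → pipeThrough() → back to a Node Readable
const web = Readable.toWeb(Readable.from(['hello ', 'world'])).pipeThrough(upper);
const nodeAgain = Readable.fromWeb(web, { objectMode: true });

let text = '';
for await (const chunk of nodeAgain) text += chunk;
console.log(text); // HELLO WORLD
```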

Comparison: classic events vs modern API#

The old way, with events and .pipe(), still works but is error-prone; these days you’ll mostly meet it in legacy code.

import { createReadStream } from 'node:fs';

const stream = createReadStream('file.txt', 'utf8');

// Old style — manual event handlers
stream.on('data', (chunk) => process.stdout.write(chunk));
stream.on('end', () => console.log('\n--EOF--'));
stream.on('error', (err) => console.error(err));

Output:

hello
world
--EOF--

Equivalent modern code:

import { createReadStream } from 'node:fs';

try {
  for await (const chunk of createReadStream('file.txt', 'utf8')) {
    process.stdout.write(chunk);
  }
  console.log('\n--EOF--');
} catch (err) {
  console.error(err);
}

Output:

hello
world
--EOF--

Common pitfalls#

  1. Using .pipe() instead of pipeline(): when a stream in a .pipe() chain errors, the other streams are not destroyed, which leaks file descriptors and event listeners. Prefer pipeline() from node:stream/promises.
  2. Forgetting to await pipeline(): without the await, the function returns immediately, the caller’s finally cleanup may run before the streams have actually finished, and a failure surfaces as an unhandled rejection.
  3. Ignoring backpressure in manual write() loops: a loop that calls out.write(...) thousands of times without checking the return value buffers everything in RAM. Wait for 'drain' or use pipeline().
  4. Reading large files with readFile: readFile loads the whole file into memory. For anything over a few MB, switch to createReadStream.
  5. Mixing async iteration with event listeners: once you start iterating with for await, do not also attach 'data' handlers. Both consumers pull from the same internal buffer and interfere with each other’s flow control, which leads to confusing, order-dependent behaviour.
  6. Mismatched object modes: a byte-mode Writable downstream of an object-mode Readable throws ERR_INVALID_ARG_TYPE on the first chunk that isn’t a string, Buffer, or Uint8Array. Make sure every stage agrees on what a chunk is.
  7. Confusing Web and Node streams: pipeThrough() and pipeTo() are Web stream methods; .pipe() is a Node stream method. Convert at the boundary with Readable.fromWeb() and Readable.toWeb().
  8. Not destroying on error: a Readable still attached to a network socket leaks if you abandon it without calling .destroy(). pipeline() handles this automatically.
  9. Encoding surprises: createReadStream without an encoding yields Buffer chunks, and comparing a Buffer to a string with === is always false. Either set encoding: 'utf8', which also copes with multi-byte characters that straddle chunk boundaries, or call .toString('utf8') per chunk.
  10. Using sync zlib on big files: zlib.gzipSync() blocks the event loop for the whole compression. Stream large inputs through createGzip() in a pipeline instead.
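Pitfalls 6 and 9 are easy to reproduce in a few lines, which helps when you suspect one of them in a larger pipeline:

```javascript
import { Readable, Writable } from 'node:stream';

// Pitfall 6: a byte-mode Writable rejects a plain object synchronously
const byteSink = new Writable({ write(chunk, enc, cb) { cb(); } });
let code;
try {
  byteSink.write({ id: 1 });
} catch (err) {
  code = err.code;
}
console.log(code); // ERR_INVALID_ARG_TYPE

// Pitfall 9: a Buffer chunk never === a string; decode it first
const chunks = [];
for await (const c of Readable.from([Buffer.from('ok')])) chunks.push(c);
const [chunk] = chunks;
console.log(chunk === 'ok', chunk.toString('utf8') === 'ok'); // false true
```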

Real-world recipes#

Read, gzip, and upload to S3#

Compose a three-stage pipeline: read from disk → gzip → upload. Memory stays bounded regardless of file size, because backpressure limits each stage to its own buffer plus whatever parts the uploader has in flight.

import { createReadStream } from 'node:fs';
import { createGzip } from 'node:zlib';
import { pipeline } from 'node:stream/promises';
import { S3Client } from '@aws-sdk/client-s3';
import { Upload } from '@aws-sdk/lib-storage';

const s3 = new S3Client({ region: 'us-east-1' });

const gzip = createGzip();
const source = createReadStream('./big-export.csv');

// The Upload reads the gzip stream's output as the object body
const upload = new Upload({
  client: s3,
  params: {
    Bucket: 'my-backups',
    Key: `exports/${Date.now()}.csv.gz`,
    Body: gzip,
    ContentType: 'application/gzip',
  },
});

// Run both concurrently: awaiting pipeline() on its own first can stall once
// gzip's output buffer fills, if the upload has not started draining it yet
await Promise.all([pipeline(source, gzip), upload.done()]);
console.log('uploaded');

Output:

uploaded

Line-by-line processing of a huge file#

Let readline.createInterface split a file into lines and iterate over them with for await. Memory use is bounded by the longest line, not the file size.

import { createReadStream } from 'node:fs';
import { createInterface } from 'node:readline';

const rl = createInterface({
  input: createReadStream('access.log'),
  crlfDelay: Infinity,
});

let errors = 0;
for await (const line of rl) {
  if (line.includes(' 500 ') || line.includes(' 502 ')) errors++;
}
console.log(`${errors} server errors`);

Output:

247 server errors

readline.createInterface wraps any line-oriented Readable; it’s the standard pattern for log scanners.
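The useful property is that readline reassembles lines that arrive split across chunks. A small sketch with hypothetical in-memory log data, where the second line is cut in half between the two chunks:

```javascript
import { Readable } from 'node:stream';
import { createInterface } from 'node:readline';

// Hypothetical log data; "GET /a 500 err" spans both chunks
const input = Readable.from(['GET / 200 ok\nGET /a 5', '00 err\nGET /b 502 err\n']);

const rl = createInterface({ input, crlfDelay: Infinity });
const lines = [];
for await (const line of rl) lines.push(line);

const errors = lines.filter((l) => l.includes(' 500 ') || l.includes(' 502 ')).length;
console.log(lines.length, errors); // 3 2
```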

Compute a file’s SHA-256 without loading it#

crypto.createHash is a Transform — feed bytes in, read the digest at the end. Pipe a file through it and grab the result.

import { createReadStream } from 'node:fs';
import { createHash } from 'node:crypto';
import { pipeline } from 'node:stream/promises';

const hash = createHash('sha256');
await pipeline(createReadStream('./image.iso'), hash);
console.log(hash.digest('hex'));

Output:

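e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855

That particular value happens to be the SHA-256 digest of empty input; a non-empty image.iso prints a different 64-character hex string.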
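To check the streaming pattern without a large file, hash an in-memory source and compare against the one-shot update()/digest() API. This sketch reads the digest from the Hash stream’s readable side (it emits the digest as its output when the input ends); the helper name sha256Hex is hypothetical.

```javascript
import { Readable, Writable } from 'node:stream';
import { createHash } from 'node:crypto';
import { pipeline } from 'node:stream/promises';

// Hypothetical helper: pipe a source through SHA-256 and collect the emitted digest
async function sha256Hex(source) {
  let hex = '';
  await pipeline(
    source,
    createHash('sha256'),
    new Writable({
      write(chunk, enc, cb) {
        hex += chunk.toString('hex'); // the digest bytes arrive once the input has ended
        cb();
      },
    })
  );
  return hex;
}

const streamed = await sha256Hex(Readable.from([Buffer.from('hello '), Buffer.from('world')]));
const oneShot = createHash('sha256').update('hello world').digest('hex');
console.log(streamed === oneShot); // true

const empty = await sha256Hex(Readable.from([]));
console.log(empty); // the empty-input digest, e3b0c442...7852b855
```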

Concurrent transform with bounded parallelism#

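The default Transform processes one chunk at a time: it won’t receive the next chunk until the callback for the current one runs. For async work such as network calls, or jobs offloaded to a thread pool, you can raise throughput by invoking the callback early while a bounded number of operations are in flight. (Synchronous CPU-bound JavaScript gains nothing this way, because it still runs on the single main thread.) The catch is backpressure: once the limit is reached, hold the callback back until a slot frees up. Results are emitted in completion order, not input order. Recent Node versions also ship an experimental readable.map(fn, { concurrency }) helper for the same job.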

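import { Readable, Transform, Writable } from 'node:stream';
import { pipeline } from 'node:stream/promises';

function concurrentMap(concurrency, fn) {
  let active = 0;
  let heldCallback = null;  // transform() callback withheld while at the limit
  let flushCallback = null; // flush() callback waiting for in-flight work
  let failed = false;

  return new Transform({
    objectMode: true,
    transform(item, enc, cb) {
      active++;
      Promise.resolve()
        .then(() => fn(item))
        .then(
          (result) => {
            active--;
            if (failed) return;
            this.push(result); // emitted in completion order
            if (heldCallback) {
              // A slot is free again: let the next item in
              const next = heldCallback;
              heldCallback = null;
              next();
            }
            if (flushCallback && active === 0) {
              const done = flushCallback;
              flushCallback = null;
              done();
            }
          },
          (err) => {
            active--;
            failed = true;
            this.destroy(err); // makes the surrounding pipeline() reject
          }
        );
      // Ask for more input only while below the limit; this preserves backpressure
      if (active < concurrency) cb();
      else heldCallback = cb;
    },
    flush(cb) {
      // Input has ended; finish once every in-flight call has settled
      if (active === 0) cb();
      else flushCallback = cb;
    },
  });
}

const items = Array.from({ length: 6 }, (_, i) => ({ id: i }));
const sink = new Writable({
  objectMode: true,
  write(item, enc, cb) {
    console.log('done', item.id);
    cb();
  },
});

await pipeline(
  Readable.from(items),
  concurrentMap(3, async (item) => {
    await new Promise((r) => setTimeout(r, 100));
    return item;
  }),
  sink
);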

Output:

done 0
done 1
done 2
done 3
done 4
done 5

Forward an upload to another service#

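Take an incoming HTTP body (a Readable) and proxy it to another service via fetch, which accepts a streaming request body when you also pass duplex: 'half'. No temp file involved.

import { createServer } from 'node:http';
import { Readable } from 'node:stream';
import { pipeline } from 'node:stream/promises';

createServer(async (req, res) => {
  try {
    const upstream = await fetch('https://storage.example.com/upload', {
      method: 'POST',
      body: Readable.toWeb(req),
      duplex: 'half', // required by fetch for streaming request bodies
    });
    res.writeHead(upstream.status, { 'Content-Type': 'application/json' });
    if (!upstream.body) return res.end(); // e.g. a 204 response has no body
    // pipeline() rather than .pipe(), so upstream failures reject and both sides are cleaned up
    await pipeline(Readable.fromWeb(upstream.body), res);
  } catch (err) {
    if (!res.headersSent) {
      res.writeHead(502);
      res.end();
    }
  }
}).listen(3000);

Output: (none; the server runs until you stop it)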

Split a CSV into per-key files#

Stream-parse a CSV; route each row to a different Writable based on a column value. Useful for sharding a giant export by region, tenant, or month.

import { createReadStream, createWriteStream } from 'node:fs';
import { pipeline, finished } from 'node:stream/promises';
import { Writable } from 'node:stream';
import { parse } from 'csv-parse';

const sinks = new Map();

// Routes each parsed row to a per-region file (assumes ./out already exists)
const router = new Writable({
  objectMode: true,
  write(row, enc, cb) {
    const key = row.region;
    let sink = sinks.get(key);
    if (!sink) {
      sink = createWriteStream(`./out/${key}.csv`);
      sinks.set(key, sink);
    }
    // Respect the file's backpressure before accepting the next row
    if (sink.write(`${row.id},${row.amount}\n`)) cb();
    else sink.once('drain', () => cb());
  },
  final(cb) {
    // End every per-key file and wait until each one has been flushed
    const closing = [...sinks.values()].map((sink) => {
      sink.end();
      return finished(sink);
    });
    Promise.all(closing).then(() => cb(), cb);
  },
});

await pipeline(
  createReadStream('./sales.csv'),
  parse({ columns: true }),
  router
);

console.log(`split into ${sinks.size} files`);

Output:

split into 12 files

Tee — duplicate a stream to two destinations#

There’s no single built-in tee for Node streams, but two PassThroughs can pipe the same source to two sinks. Useful for writing to disk and uploading at the same time.

import { createReadStream, createWriteStream } from 'node:fs';
import { PassThrough } from 'node:stream';
import { pipeline } from 'node:stream/promises';

const source = createReadStream('./report.csv');
const a = new PassThrough();
const b = new PassThrough();

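// .pipe() does not forward errors, so tear down both branches if the source fails;
// otherwise the two pipelines below would wait forever. The source also advances
// at the pace of the slower branch.
source.pipe(a);
source.pipe(b);
source.on('error', (err) => {
  a.destroy(err);
  b.destroy(err);
});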

await Promise.all([
  pipeline(a, createWriteStream('./report.local.csv')),
  pipeline(b, createWriteStream('./report.backup.csv')),
]);

console.log('tee complete');

Output:

tee complete
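Web streams do have a built-in tee(): ReadableStream.prototype.tee() returns two branches that each see every chunk. A sketch of using it from Node via Readable.toWeb(); one caveat is that if one branch is read more slowly, chunks queue up for it, so a stalled branch can grow memory.

```javascript
import { Readable } from 'node:stream';

// Convert a Node Readable to a Web ReadableStream, then split it in two
const web = Readable.toWeb(Readable.from(['x', 'y', 'z']));
const [left, right] = web.tee();

// Web ReadableStreams are async iterable in Node
async function collect(stream) {
  let text = '';
  for await (const chunk of stream) text += chunk;
  return text;
}

const [a, b] = await Promise.all([collect(left), collect(right)]);
console.log(a, b); // xyz xyz
```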