Node.js Streams in 2026 — Web Streams API vs Node.js Streams

Introduction

Node.js 18 and later expose the WHATWG Web Streams API as globals, alongside the original node:stream module. That leaves two ways to stream data: the event-driven Node.js Streams API and the promise-based ReadableStream, WritableStream, and TransformStream classes. This post covers which to use and when.

Web Streams API Built Into Node.js 18+

The WHATWG Web Streams classes are available as globals in Node.js 18+:

// Modern Web Streams (no imports needed, built-in)
const readable = new ReadableStream({
  start(controller) {
    controller.enqueue('Hello');
    controller.enqueue(' ');
    controller.enqueue('World');
    controller.close();
  },
});

// Consumer
const reader = readable.getReader();

while (true) {
  const { done, value } = await reader.read();
  if (done) break;
  console.log(value);
}

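Web Streams are promise-based, while Node.js Streams are built on events (a Node.js Readable can also be consumed with for await...of). Both work in Node.js 22. Web Streams are the more portable choice, because the same API runs in browsers and other JavaScript runtimes.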

// Comparison: Web Stream vs Node Stream

// Web Stream (promise-based, modern)
async function consumeWebStream(stream: ReadableStream) {
  const reader = stream.getReader();
  try {
    while (true) {
      const { done, value } = await reader.read();
      if (done) break;
      console.log(value);
    }
  } finally {
    reader.releaseLock();
  }
}

// Node Stream (event-based, legacy)
function consumeNodeStream(stream: NodeJS.ReadableStream) {
  stream.on('data', (chunk) => {
    console.log(chunk);
  });
  stream.on('end', () => {
    console.log('Done');
  });
}

ReadableStream, WritableStream, and TransformStream

Core Web Stream types:

ReadableStream

const readable = new ReadableStream<string>({
  start(controller) {
    // Called when stream is created
    controller.enqueue('data');
  },
  async pull(controller) {
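    // Called when the consumer wants more data.
    // Node's fetch() requires an absolute URL; example.com is a placeholder.
    const data = await fetch('https://example.com/api/data').then(r => r.text());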
    controller.enqueue(data);
    controller.close();
  },
  cancel(reason) {
    // Called when consumer stops reading
    console.log('Stream cancelled:', reason);
  },
});

// Consume with async iteration
for await (const chunk of readable) {
  console.log(chunk);
}
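
The pull() callback described above maps naturally onto an async generator: each pull asks the generator for one more value. The following is a minimal sketch under that assumption; numberSource, streamFromGenerator, and readAll are hypothetical names used only for illustration.

```typescript
import { ReadableStream } from 'node:stream/web';

// Hypothetical data source standing in for a paginated API or database cursor
async function* numberSource(limit: number): AsyncGenerator<number> {
  for (let i = 1; i <= limit; i++) yield i;
}

// Wrap any async generator in a ReadableStream that produces values on demand
function streamFromGenerator<T>(source: AsyncGenerator<T>): ReadableStream<T> {
  return new ReadableStream<T>({
    async pull(controller) {
      // Ask the generator for exactly one value per pull
      const { done, value } = await source.next();
      if (done) controller.close();
      else controller.enqueue(value);
    },
    async cancel() {
      // Let the generator run its cleanup when the consumer stops early
      await source.return(undefined);
    },
  });
}

// Drain a stream into an array using async iteration
async function readAll<T>(stream: ReadableStream<T>): Promise<T[]> {
  const out: T[] = [];
  for await (const value of stream) out.push(value);
  return out;
}
```

Because values are only produced inside pull(), the generator does no work until a consumer is actually reading.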

WritableStream

const writable = new WritableStream<string>({
  write(chunk) {
    console.log('Writing:', chunk);
  },
  close() {
    console.log('Stream closed');
  },
});

const writer = writable.getWriter();

try {
  await writer.write('Hello');
  await writer.write('World');
  await writer.close();
} catch (error) {
  console.error('Write failed:', error);
}
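
Awaiting every write() call, as above, keeps the producer in step with the sink. The Streams standard also exposes writer.ready, which resolves once the internal queue has room. The sketch below shows that pattern; writeAll is a hypothetical helper, not part of any API.

```typescript
import { WritableStream } from 'node:stream/web';

// Write every chunk, waiting on writer.ready before each write so the
// producer never gets ahead of the sink's internal queue.
async function writeAll(chunks: string[], sink: WritableStream<string>): Promise<void> {
  const writer = sink.getWriter();
  try {
    for (const chunk of chunks) {
      await writer.ready;
      // Not awaited: a failed write also rejects writer.ready and writer.close()
      writer.write(chunk).catch(() => {});
    }
    // close() resolves only after every queued chunk has been written
    await writer.close();
  } finally {
    writer.releaseLock();
  }
}
```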

TransformStream

const transformStream = new TransformStream<string, string>({
  transform(chunk, controller) {
    // Transform each chunk
    controller.enqueue(chunk.toUpperCase());
  },
});

// Pipe readable → transform → writable
const readable = new ReadableStream({
  start(controller) {
    controller.enqueue('hello');
    controller.enqueue('world');
    controller.close();
  },
});

const writable = new WritableStream({
  write(chunk) {
    console.log('Got:', chunk);
  },
});

await readable.pipeThrough(transformStream).pipeTo(writable);
// Output:
// Got: HELLO
// Got: WORLD
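
A TransformStream can also hold state between chunks and emit whatever is left over in flush(). As an illustration, here is a sketch of a batching transform; batch and batchNumbers are hypothetical helpers introduced for this example.

```typescript
import { ReadableStream, TransformStream } from 'node:stream/web';

// Group incoming chunks into arrays of at most `size` items
function batch<T>(size: number): TransformStream<T, T[]> {
  let pending: T[] = [];
  return new TransformStream<T, T[]>({
    transform(chunk, controller) {
      pending.push(chunk);
      if (pending.length >= size) {
        controller.enqueue(pending);
        pending = [];
      }
    },
    flush(controller) {
      // Emit the final, possibly smaller, batch when the input ends
      if (pending.length > 0) controller.enqueue(pending);
    },
  });
}

// Usage: stream the values through batch() and collect the groups
async function batchNumbers(values: number[], size: number): Promise<number[][]> {
  const source = new ReadableStream<number>({
    start(controller) {
      for (const value of values) controller.enqueue(value);
      controller.close();
    },
  });
  const batches: number[][] = [];
  for await (const group of source.pipeThrough(batch<number>(size))) {
    batches.push(group);
  }
  return batches;
}
```

Calling batchNumbers([1, 2, 3, 4, 5], 2) yields [[1, 2], [3, 4], [5]]; the last group comes from flush().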

Converting Between Node Streams and Web Streams

Interoperate between legacy Node.js Streams and Web Streams:

import fs from 'node:fs';
import { Readable, Writable } from 'node:stream';
import { ReadableStream, WritableStream } from 'node:stream/web';

// Node.js Readable → Web ReadableStream
const nodeReadable = fs.createReadStream('file.txt');
const webReadable = Readable.toWeb(nodeReadable);

// Web ReadableStream → Node.js Readable
const webStream = new ReadableStream({
  start(controller) {
    controller.enqueue('data');
    controller.close();
  },
});
const nodeReadableFromWeb = Readable.fromWeb(webStream);

// Node.js Writable → Web WritableStream
const nodeWritable = fs.createWriteStream('output.txt');
const webWritable = Writable.toWeb(nodeWritable);

// Web WritableStream → Node.js Writable
const webWritableStream = new WritableStream({
  write(chunk) { /* ... */ },
});
const nodeWritableFromWeb = Writable.fromWeb(webWritableStream);

Practical example: open files with Node Streams, then compose the pipeline with Web Streams:

import { createReadStream, createWriteStream } from 'node:fs';
import { Readable, Writable } from 'node:stream';

// Node streams for file I/O
const input = createReadStream('input.txt');
const output = createWriteStream('input.txt.gz');

// Convert, transform, gzip, and pipe
await Readable.toWeb(input)
  .pipeThrough(new TransformStream({
    transform(chunk, controller) {
      // Transform logic goes here; this version passes chunks through unchanged
      controller.enqueue(chunk);
    },
  }))
  // CompressionStream is a global in Node.js 18+ and does the actual gzip step
  .pipeThrough(new CompressionStream('gzip'))
  .pipeTo(Writable.toWeb(output));

Streaming LLM Responses with Web Streams

Stream OpenAI completions as they're generated:

import express from 'express';
import { Readable } from 'node:stream';

const app = express();

app.get('/stream-ai', async (req, res) => {
  // fetch() exposes the streamed response body as a Web ReadableStream
  const response = await fetch('https://api.openai.com/v1/chat/completions', {
    method: 'POST',
    headers: {
      'Authorization': `Bearer ${process.env.OPENAI_API_KEY}`,
      'Content-Type': 'application/json',
    },
    body: JSON.stringify({
      model: 'gpt-4',
      messages: [{ role: 'user', content: 'Explain quantum computing' }],
      stream: true,
    }),
  });

  // response.body can be null, so check it before converting
  if (!response.ok || !response.body) {
    res.status(500).json({ error: 'OpenAI error' });
    return;
  }

  // Convert the Web ReadableStream to a Node stream for Express
  const nodeStream = Readable.fromWeb(response.body);

  // Set headers for streaming
  res.setHeader('Content-Type', 'text/event-stream');
  res.setHeader('Cache-Control', 'no-cache');
  res.setHeader('Connection', 'keep-alive');

  // Attach the error handler before piping so early failures are caught
  nodeStream.on('error', (error) => {
    console.error('Stream error:', error);
    res.end();
  });

  // Stop reading from the upstream API if the client disconnects
  res.on('close', () => nodeStream.destroy());

  // Pipe the stream to response
  nodeStream.pipe(res);
});

app.listen(3000, () => {
  console.log('Streaming LLM server on port 3000');
});

Client-side consuming the stream:

async function streamLLMResponse() {
  const response = await fetch('/stream-ai');
  const reader = response.body!.getReader();
  const decoder = new TextDecoder();

  while (true) {
    const { done, value } = await reader.read();
    if (done) break;

    const chunk = decoder.decode(value, { stream: true });
    console.log(chunk);
    // Display chunk in UI immediately
  }
}

Because fetch() exposes the response body as a Web ReadableStream, each chunk can be forwarded as soon as it arrives instead of waiting for the full completion.
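
The chunks logged by the client are raw server-sent events, not plain text. The sketch below assumes the OpenAI chat completions streaming format, where each event is a line of the form `data: {json}`, the text lives in choices[0].delta.content, and the stream ends with `data: [DONE]`. parseSSE is a hypothetical helper; call it with the leftover from the previous call plus the newly decoded chunk.

```typescript
interface ParsedEvents {
  text: string;   // concatenated delta content from complete events
  rest: string;   // trailing partial line to prepend to the next chunk
  done: boolean;  // true once the [DONE] sentinel has been seen
}

function parseSSE(buffer: string): ParsedEvents {
  const lines = buffer.split('\n');
  // The last element may be an incomplete line; keep it for the next call
  const rest = lines.pop() ?? '';
  let text = '';
  let done = false;

  for (const raw of lines) {
    const line = raw.trim();
    if (!line.startsWith('data:')) continue; // skip blank lines and comments
    const payload = line.slice('data:'.length).trim();
    if (payload === '[DONE]') {
      done = true;
      break;
    }
    try {
      const json = JSON.parse(payload);
      text += json.choices?.[0]?.delta?.content ?? '';
    } catch {
      // Ignore malformed events rather than aborting the whole stream
    }
  }
  return { text, rest, done };
}
```

Keeping the unparsed tail in rest matters because network chunks do not line up with event boundaries.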

Response with ReadableStream for HTTP Streaming

Return streamed data from HTTP handlers:

import express from 'express';
import { Readable } from 'node:stream';

const app = express();

app.get('/data-stream', (req, res) => {
  // Create Web Stream
  const stream = new ReadableStream({
    async start(controller) {
      try {
        // Simulate data fetching with delays
        for (let i = 0; i < 5; i++) {
          const data = { id: i, timestamp: new Date().toISOString() };
          controller.enqueue(JSON.stringify(data) + '\n');

          // Delay between chunks
          await new Promise(resolve => setTimeout(resolve, 100));
        }
        controller.close();
      } catch (error) {
        controller.error(error);
      }
    },
  });

  // Convert to Node stream for Express
  const nodeStream = Readable.fromWeb(stream);

  // Send streaming response
  res.setHeader('Content-Type', 'application/x-ndjson');
  nodeStream.pipe(res);
});

app.listen(3000);

The client receives each JSON object as it is produced; the server never holds the full response in memory.
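
On the receiving side, byte chunks do not necessarily end on line boundaries, so a consumer has to reassemble lines before parsing. Here is a minimal sketch of that for a Node.js client; parseNDJSON is a hypothetical helper that accepts any async iterable of bytes, which includes response.body in Node.js 18+.

```typescript
// Yield one parsed object per NDJSON line, regardless of how bytes are chunked
async function* parseNDJSON(body: AsyncIterable<Uint8Array>): AsyncGenerator<unknown> {
  const decoder = new TextDecoder();
  let buffered = '';

  for await (const bytes of body) {
    // stream: true keeps multi-byte characters intact across chunk boundaries
    buffered += decoder.decode(bytes, { stream: true });
    const lines = buffered.split('\n');
    buffered = lines.pop() ?? ''; // keep the trailing partial line
    for (const line of lines) {
      if (line.trim()) yield JSON.parse(line);
    }
  }

  // Flush the decoder and handle a final line without a trailing newline
  buffered += decoder.decode();
  if (buffered.trim()) yield JSON.parse(buffered);
}
```

A caller would write something like `for await (const item of parseNDJSON(response.body)) { ... }` after fetching /data-stream.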

pipeline() for Node Streams

The pipeline() function safely chains Node.js Streams with error handling:

import { pipeline } from 'node:stream/promises';
import { createReadStream, createWriteStream } from 'fs';
import { createGzip } from 'zlib';

async function compressFile() {
  try {
    await pipeline(
      createReadStream('input.txt'),
      createGzip(),
      createWriteStream('input.txt.gz')
    );
    console.log('Compression complete');
  } catch (error) {
    console.error('Pipeline failed:', error);
    // All streams are automatically cleaned up
  }
}

compressFile();

pipeline() handles backpressure and cleanup automatically.
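
pipeline() also accepts async generator functions as intermediate steps, which is often shorter than writing a Transform class. The following sketch uses that feature; upperCaseAll is a hypothetical helper, and the in-memory source and sink stand in for real files or sockets.

```typescript
import { pipeline } from 'node:stream/promises';
import { Readable, Writable } from 'node:stream';

async function upperCaseAll(input: string[]): Promise<string[]> {
  const out: string[] = [];

  await pipeline(
    Readable.from(input),
    // An async generator function acts as the transform step
    async function* (source: AsyncIterable<unknown>) {
      for await (const chunk of source) {
        yield String(chunk).toUpperCase();
      }
    },
    // Object-mode sink that collects the transformed chunks
    new Writable({
      objectMode: true,
      write(chunk, _encoding, callback) {
        out.push(String(chunk));
        callback();
      },
    }),
  );

  return out;
}
```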

Backpressure Handling

Backpressure prevents memory overflow when a slow consumer can't keep up with a fast producer:

import { Readable, Writable } from 'node:stream';

// Fast producer
const readable = new Readable({
  read() {
    for (let i = 0; i < 100; i++) {
      this.push(`Data chunk ${i}\n`);
    }
    this.push(null); // End stream
  },
});

// Slow consumer
const writable = new Writable({
  write(chunk, encoding, callback) {
    // Simulate slow processing
    setTimeout(() => {
      console.log(`Processed: ${chunk.toString().trim()}`);
      callback();
    }, 10);
  },
});

// Option 1: pipe() applies backpressure for you
// readable.pipe(writable);

// Option 2: manual backpressure.
// Use one option, not both, or every chunk is written twice.
readable.on('data', (chunk) => {
  const canContinue = writable.write(chunk);

  if (!canContinue) {
    console.log('Backpressure: pausing readable');
    readable.pause();
  }
});

writable.on('drain', () => {
  console.log('Backpressure resolved: resuming readable');
  readable.resume();
});

// Close the writable once the producer is finished
readable.on('end', () => writable.end());

Using pipeline() handles backpressure automatically:

import { pipeline } from 'node:stream/promises';

// No manual backpressure management needed.
// readable and writable are fresh instances of the producer and consumer above.
await pipeline(readable, writable);
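
Web Streams apply the same idea through a queue with a highWaterMark: pull() is only called while the queue has room. The sketch below makes that visible by counting pull() calls; countingStream and demo are hypothetical names used for illustration.

```typescript
import { ReadableStream } from 'node:stream/web';

// A stream that records how many times the runtime asked it for data
function countingStream(highWaterMark: number) {
  let pulls = 0;
  const stream = new ReadableStream<number>(
    {
      pull(controller) {
        pulls += 1;
        controller.enqueue(pulls);
      },
    },
    { highWaterMark },
  );
  return { stream, pullCount: () => pulls };
}

async function demo(): Promise<number[]> {
  const { stream, pullCount } = countingStream(2);
  const tick = () => new Promise<void>((resolve) => setTimeout(resolve, 10));

  // With no reader attached, the queue fills to the highWaterMark and stops
  await tick();
  const beforeRead = pullCount();

  // Reading one chunk frees one slot, which triggers exactly one more pull
  const reader = stream.getReader();
  await reader.read();
  await tick();
  const afterRead = pullCount();

  await reader.cancel();
  return [beforeRead, afterRead];
}
```

Here demo() resolves to [2, 3]: two pulls fill the queue, and the producer stays idle until the single read makes room for a third.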

Streaming File Uploads

Handle file uploads without buffering entire files in memory:

import express, { Request, Response } from 'express';
import { createWriteStream } from 'node:fs';
import { pipeline } from 'node:stream/promises';
import path from 'node:path';

const app = express();
const uploadDir = '/tmp/uploads'; // must already exist

app.post('/upload', async (req: Request, res: Response) => {
  // Never trust a client-supplied name: keep only the base name
  // so a value like "../../etc/passwd" cannot escape uploadDir
  const rawName = String(req.headers['x-filename'] ?? 'upload.bin');
  const fileName = `${Date.now()}-${path.basename(rawName)}`;
  const filePath = path.join(uploadDir, fileName);

  try {
    // Stream the request body directly to disk; pipeline() applies
    // backpressure and destroys both streams on error or client disconnect
    await pipeline(req, createWriteStream(filePath));
    res.json({
      success: true,
      file: fileName,
      path: filePath,
    });
  } catch (error) {
    console.error('Upload error:', error);
    if (!res.headersSent) {
      res.status(500).json({ error: 'Upload failed' });
    }
  }
});

app.listen(3000);

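No multer or in-memory buffering is needed when the client sends the file as a raw request body (for example, Content-Type: application/octet-stream). Uploads sent as multipart/form-data still need a multipart parser.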
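
Streaming to disk removes the memory limit but not the disk limit, so it is worth capping the upload size. One way to do that is a small Transform placed between the request and the file. The sketch below is one possible approach; byteLimit and fitsWithin are hypothetical helpers.

```typescript
import { Readable, Transform, Writable } from 'node:stream';
import { pipeline } from 'node:stream/promises';

// Pass chunks through unchanged until the total exceeds maxBytes, then fail
function byteLimit(maxBytes: number): Transform {
  let seen = 0;
  return new Transform({
    transform(chunk: Buffer, _encoding, callback) {
      seen += chunk.length;
      if (seen > maxBytes) {
        callback(new Error(`Upload exceeds ${maxBytes} bytes`));
      } else {
        callback(null, chunk);
      }
    },
  });
}

// Usage with in-memory streams; resolves to false when the limit is exceeded
async function fitsWithin(chunks: Buffer[], maxBytes: number): Promise<boolean> {
  const sink = new Writable({
    write(_chunk, _encoding, callback) {
      callback();
    },
  });
  try {
    await pipeline(Readable.from(chunks), byteLimit(maxBytes), sink);
    return true;
  } catch {
    return false;
  }
}
```

In an upload handler the same transform would sit in the middle of the chain, for example `pipeline(req, byteLimit(10 * 1024 * 1024), createWriteStream(filePath))`.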

Streaming CSV/JSON Processing

Process large CSV/JSON files with minimal memory:

import { pipeline } from 'node:stream/promises';
import { createReadStream, createWriteStream } from 'node:fs';
import { Transform } from 'node:stream';
import { parse } from 'csv-parse';
import { stringify } from 'csv-stringify';

async function processCSV() {
  // Input: large CSV file (GBs)
  // Output: filtered CSV with only valid records

  await pipeline(
    createReadStream('input.csv'),
    parse({
      columns: true,
      skip_empty_lines: true,
    }),
    // csv-parse emits one object per row, so the filter runs in object mode
    new Transform({
      objectMode: true,
      transform(record, _encoding, callback) {
        // Pass valid records through; drop the rest
        if (record.email && record.name) {
          callback(null, record);
        } else {
          callback();
        }
      },
    }),
    stringify({
      header: true,
      columns: ['id', 'name', 'email'],
    }),
    createWriteStream('output.csv')
  );

  console.log('Processing complete');
}

processCSV().catch(console.error);

For JSON Lines (NDJSON):

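import { pipeline } from 'node:stream/promises';
import { createReadStream, createWriteStream } from 'node:fs';
import { Transform } from 'node:stream';

async function processNDJSON() {
  let buffered = ''; // holds a partial line that spans two chunks

  // Parse one line; return the re-serialized object if valid, else ''
  const filterLine = (line: string): string => {
    if (!line.trim()) return '';
    try {
      const obj = JSON.parse(line);
      return obj.valid ? JSON.stringify(obj) + '\n' : '';
    } catch {
      return ''; // skip malformed lines
    }
  };

  await pipeline(
    createReadStream('input.ndjson', { encoding: 'utf8' }),
    new Transform({
      decodeStrings: false, // keep incoming chunks as strings
      transform(chunk, _encoding, callback) {
        // File chunks do not align with line boundaries, so split manually
        const lines = (buffered + chunk).split('\n');
        buffered = lines.pop() ?? '';
        callback(null, lines.map(filterLine).join(''));
      },
      flush(callback) {
        // Handle a final line that has no trailing newline
        callback(null, filterLine(buffered));
      },
    }),
    createWriteStream('output.ndjson')
  );
}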

When to Use Each

Use Case | Best Tool
New code, no legacy | Web Streams
Streaming responses | Web Streams
File operations | Node Streams (fs.createReadStream)
LLM/API responses | Web Streams (response.body)
Complex pipelines | Node Streams + pipeline()
Interop required | Use converters (toWeb/fromWeb)

Checklist

  • Use Web Streams for new async code
  • Use pipeline() for Node Stream chains
  • Convert between APIs only when necessary
  • Handle backpressure (pipeline() does this automatically)
  • Test streaming with large files
  • Monitor memory usage during streaming
  • Implement error handlers on streams
  • Use Readable.from() for generator functions
  • Prefer pipeThrough() and pipeTo() for Web Streams
  • Benchmark streaming vs buffering for your workload
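
The checklist mentions Readable.from() for generator functions. As a brief illustration, the sketch below turns an async generator into a Node.js Readable and consumes it; generateRows and rowsToString are hypothetical names.

```typescript
import { Readable } from 'node:stream';

// Hypothetical row source; a real one might page through a database
async function* generateRows(count: number): AsyncGenerator<string> {
  for (let i = 1; i <= count; i++) {
    yield `row ${i}\n`;
  }
}

async function rowsToString(count: number): Promise<string> {
  // Readable.from() wraps the generator in a Node.js Readable,
  // pulling the next value only when the consumer is ready for it
  const stream = Readable.from(generateRows(count));
  let out = '';
  for await (const chunk of stream) {
    out += chunk;
  }
  return out;
}
```

The resulting stream can be passed to pipeline() or Readable.toWeb() like any other Readable.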

Conclusion

Web Streams represent the modern, standard approach to streaming in Node.js. For new code, prefer ReadableStream/WritableStream with pipeThrough() and pipeTo(). For file operations and legacy compatibility, Node.js Streams remain practical. Both coexist in Node.js 22; use the right tool for the context.