Web Streams API | Node.js v18.20.8 Documentation

API#

Class: ReadableStream#

new ReadableStream([underlyingSource [, strategy]])#

Added in: v16.5.0

readableStream.locked#

Added in: v16.5.0

The readableStream.locked property is false by default, and is switched to true while there is an active reader consuming the stream's data.
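A minimal sketch of the property toggling as a reader is acquired and released:

```mjs
import { ReadableStream } from 'node:stream/web';

const stream = new ReadableStream();
console.log(stream.locked);  // false (no active reader)

const reader = stream.getReader();
console.log(stream.locked);  // true (the reader holds the lock)

reader.releaseLock();
console.log(stream.locked);  // false again
```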

readableStream.cancel([reason])#

Added in: v16.5.0

readableStream.getReader([options])#

Added in: v16.5.0

```mjs
import { ReadableStream } from 'node:stream/web';

const stream = new ReadableStream();

const reader = stream.getReader();

console.log(await reader.read());
```

```cjs
const { ReadableStream } = require('node:stream/web');

const stream = new ReadableStream();

const reader = stream.getReader();

reader.read().then(console.log);
```

Causes the readableStream.locked to be true.

readableStream.pipeThrough(transform[, options])#

Added in: v16.5.0

Connects this ReadableStream to the pair of ReadableStream and WritableStream provided in the transform argument such that the data from this ReadableStream is written into transform.writable, possibly transformed, then pushed to transform.readable. Once the pipeline is configured, transform.readable is returned.

Causes the readableStream.locked to be true while the pipe operation is active.

```mjs
import {
  ReadableStream,
  TransformStream,
} from 'node:stream/web';

const stream = new ReadableStream({
  start(controller) {
    controller.enqueue('a');
  },
});

const transform = new TransformStream({
  transform(chunk, controller) {
    controller.enqueue(chunk.toUpperCase());
  },
});

const transformedStream = stream.pipeThrough(transform);

for await (const chunk of transformedStream)
  console.log(chunk);
```

```cjs
const {
  ReadableStream,
  TransformStream,
} = require('node:stream/web');

const stream = new ReadableStream({
  start(controller) {
    controller.enqueue('a');
  },
});

const transform = new TransformStream({
  transform(chunk, controller) {
    controller.enqueue(chunk.toUpperCase());
  },
});

const transformedStream = stream.pipeThrough(transform);

(async () => {
  for await (const chunk of transformedStream)
    console.log(chunk);
})();
```

readableStream.pipeTo(destination[, options])#

Added in: v16.5.0

Causes the readableStream.locked to be true while the pipe operation is active.
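A minimal sketch, assuming a trivial one-chunk source and a console-logging sink; pipeTo() returns a promise that settles once the pipe operation completes:

```mjs
import { ReadableStream, WritableStream } from 'node:stream/web';

const readable = new ReadableStream({
  start(controller) {
    controller.enqueue('hello');
    controller.close();
  },
});

const writable = new WritableStream({
  write(chunk) {
    console.log(chunk);
  },
});

// Resolves once all data has been written and the streams close.
await readable.pipeTo(writable);
```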

readableStream.tee()#

Returns a pair of new ReadableStream instances to which this ReadableStream's data will be forwarded. Each will receive the same data.

Causes the readableStream.locked to be true.
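For example (a minimal sketch), both branches returned by tee() observe the same chunks:

```mjs
import { ReadableStream } from 'node:stream/web';

const stream = new ReadableStream({
  start(controller) {
    controller.enqueue('chunk');
    controller.close();
  },
});

// Each branch is an independent ReadableStream carrying the same data.
const [branch1, branch2] = stream.tee();

console.log(await branch1.getReader().read());  // { value: 'chunk', done: false }
console.log(await branch2.getReader().read());  // { value: 'chunk', done: false }
```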

readableStream.values([options])#

Added in: v16.5.0

Creates and returns an async iterator usable for consuming this ReadableStream's data.

Causes the readableStream.locked to be true while the async iterator is active.

```mjs
import { Buffer } from 'node:buffer';

const stream = new ReadableStream(getSomeSource());

for await (const chunk of stream.values({ preventCancel: true }))
  console.log(Buffer.from(chunk).toString());
```

Async Iteration#

The ReadableStream object supports the async iterator protocol using for await syntax.

```mjs
import { Buffer } from 'node:buffer';

const stream = new ReadableStream(getSomeSource());

for await (const chunk of stream)
  console.log(Buffer.from(chunk).toString());
```

The async iterator will consume the ReadableStream until it terminates.

By default, if the async iterator exits early (via either a break, return, or a throw), the ReadableStream will be closed. To prevent automatic closing of the ReadableStream, use the readableStream.values() method to acquire the async iterator and set the preventCancel option to true.

The ReadableStream must not be locked (that is, it must not have an existing active reader). During the async iteration, the ReadableStream will be locked.
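A minimal sketch of the early-exit behavior, using the preventCancel option described above:

```mjs
import { ReadableStream } from 'node:stream/web';

const stream = new ReadableStream({
  start(controller) {
    controller.enqueue('a');
    controller.enqueue('b');
  },
});

// Exiting the loop early with preventCancel: true releases the lock
// but leaves the ReadableStream open for further reads.
for await (const chunk of stream.values({ preventCancel: true })) {
  console.log(chunk);  // 'a'
  break;
}

console.log(stream.locked);  // false: the iterator released its lock
```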

Transferring with postMessage()#

A ReadableStream instance can be transferred using a MessagePort.

```js
const stream = new ReadableStream(getReadableSourceSomehow());

const { port1, port2 } = new MessageChannel();

port1.onmessage = ({ data }) => {
  data.getReader().read().then((chunk) => {
    console.log(chunk);
  });
};

port2.postMessage(stream, [stream]);
```

Class: ReadableStreamDefaultReader#

By default, calling readableStream.getReader() with no arguments will return an instance of ReadableStreamDefaultReader. The default reader treats the chunks of data passed through the stream as opaque values, which allows the ReadableStream to work with generally any JavaScript value.

new ReadableStreamDefaultReader(stream)#

Added in: v16.5.0

Creates a new ReadableStreamDefaultReader that is locked to the given ReadableStream.

readableStreamDefaultReader.cancel([reason])#

Added in: v16.5.0

Cancels the ReadableStream and returns a promise that is fulfilled when the underlying stream has been canceled.

readableStreamDefaultReader.closed#

Added in: v16.5.0

readableStreamDefaultReader.read()#

Added in: v16.5.0

Requests the next chunk of data from the underlying ReadableStream and returns a promise that is fulfilled with the data once it is available.
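The fulfilled value is a { value, done } result object following the iteration protocol; a minimal read-loop sketch:

```mjs
import { ReadableStream } from 'node:stream/web';

const stream = new ReadableStream({
  start(controller) {
    controller.enqueue('a');
    controller.enqueue('b');
    controller.close();
  },
});

const reader = stream.getReader();

// done becomes true once the stream is closed and drained.
let result = await reader.read();
while (!result.done) {
  console.log(result.value);
  result = await reader.read();
}
```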

readableStreamDefaultReader.releaseLock()#

Added in: v16.5.0

Releases this reader's lock on the underlying ReadableStream.

Class: ReadableStreamBYOBReader#

The ReadableStreamBYOBReader is an alternative consumer for byte-oriented ReadableStreams (those that are created with underlyingSource.type set equal to 'bytes' when the ReadableStream was created).

BYOB is short for "bring your own buffer". This is a pattern that allows for more efficient reading of byte-oriented data by avoiding extraneous copying.

```mjs
import {
  open,
} from 'node:fs/promises';

import {
  ReadableStream,
} from 'node:stream/web';

import { Buffer } from 'node:buffer';

class Source {
  type = 'bytes';
  autoAllocateChunkSize = 1024;

  async start(controller) {
    this.file = await open(new URL(import.meta.url));
    this.controller = controller;
  }

  async pull(controller) {
    const view = controller.byobRequest?.view;
    const {
      bytesRead,
    } = await this.file.read({
      buffer: view,
      offset: view.byteOffset,
      length: view.byteLength,
    });

    if (bytesRead === 0) {
      await this.file.close();
      this.controller.close();
    }
    controller.byobRequest.respond(bytesRead);
  }
}

const stream = new ReadableStream(new Source());

async function read(stream) {
  const reader = stream.getReader({ mode: 'byob' });

  const chunks = [];
  let result;
  do {
    result = await reader.read(Buffer.alloc(100));
    if (result.value !== undefined)
      chunks.push(Buffer.from(result.value));
  } while (!result.done);

  return Buffer.concat(chunks);
}

const data = await read(stream);
console.log(Buffer.from(data).toString());
```

new ReadableStreamBYOBReader(stream)#

Added in: v16.5.0

Creates a new ReadableStreamBYOBReader that is locked to the given ReadableStream.

readableStreamBYOBReader.cancel([reason])#

Added in: v16.5.0

Cancels the ReadableStream and returns a promise that is fulfilled when the underlying stream has been canceled.

readableStreamBYOBReader.closed#

Added in: v16.5.0

readableStreamBYOBReader.read(view)#

Added in: v16.5.0

Requests the next chunk of data from the underlying ReadableStream and returns a promise that is fulfilled with the data once it is available.

Do not pass a pooled Buffer object instance in to this method. Pooled Buffer objects are created using Buffer.allocUnsafe(), or Buffer.from(), or are often returned by various node:fs module callbacks. These types of Buffers use a shared underlying ArrayBuffer object that contains all of the data from all of the pooled Buffer instances. When a Buffer, TypedArray, or DataView is passed in to readableStreamBYOBReader.read(), the view's underlying ArrayBuffer is detached, invalidating all existing views that may exist on that ArrayBuffer. This can have disastrous consequences for your application.
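As a sketch of the safe pattern, pass a freshly allocated, non-pooled view (for example from Buffer.alloc(), as the earlier BYOB example does) so that detaching its ArrayBuffer cannot invalidate unrelated data:

```mjs
import { Buffer } from 'node:buffer';
import { ReadableStream } from 'node:stream/web';

// A trivial byte source, for illustration only.
const stream = new ReadableStream({
  type: 'bytes',
  start(controller) {
    controller.enqueue(new Uint8Array([1, 2, 3]));
    controller.close();
  },
});

const reader = stream.getReader({ mode: 'byob' });

// Safe: Buffer.alloc() returns a non-pooled Buffer, so its ArrayBuffer
// is not shared with any other Buffer instance.
const result = await reader.read(Buffer.alloc(16));
console.log(result.value);  // the bytes that were read
```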

readableStreamBYOBReader.releaseLock()#

Added in: v16.5.0

Releases this reader's lock on the underlying ReadableStream.

Class: ReadableStreamDefaultController#

Added in: v16.5.0

Every ReadableStream has a controller that is responsible for the internal state and management of the stream's queue. The ReadableStreamDefaultController is the default controller implementation for ReadableStreams that are not byte-oriented.

readableStreamDefaultController.close()#

Added in: v16.5.0

Closes the ReadableStream to which this controller is associated.

readableStreamDefaultController.desiredSize#

Added in: v16.5.0

Returns the amount of data remaining to fill the ReadableStream's queue.

readableStreamDefaultController.enqueue([chunk])#

Added in: v16.5.0

Appends a new chunk of data to the ReadableStream's queue.

readableStreamDefaultController.error([error])#

Added in: v16.5.0

Signals an error that causes the ReadableStream to error and close.
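Taken together, an underlying source drives these controller methods roughly as in this minimal sketch:

```mjs
import { ReadableStream } from 'node:stream/web';

const stream = new ReadableStream({
  start(controller) {
    // desiredSize reports how much more data the queue can accept.
    console.log(controller.desiredSize);  // 1 (the default high water mark)
    controller.enqueue('some data');
    controller.close();
    // On failure, controller.error(err) would be called instead.
  },
});

for await (const chunk of stream)
  console.log(chunk);  // 'some data'
```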

Class: ReadableByteStreamController#

Every ReadableStream has a controller that is responsible for the internal state and management of the stream's queue. The ReadableByteStreamController is for byte-oriented ReadableStreams.

readableByteStreamController.byobRequest#

Added in: v16.5.0

readableByteStreamController.close()#

Added in: v16.5.0

Closes the ReadableStream to which this controller is associated.

readableByteStreamController.desiredSize#

Added in: v16.5.0

Returns the amount of data remaining to fill the ReadableStream's queue.

readableByteStreamController.enqueue(chunk)#

Added in: v16.5.0

Appends a new chunk of data to the ReadableStream's queue.

readableByteStreamController.error([error])#

Added in: v16.5.0

Signals an error that causes the ReadableStream to error and close.

Class: ReadableStreamBYOBRequest#

When using ReadableByteStreamController in byte-oriented streams, and when using the ReadableStreamBYOBReader, the readableByteStreamController.byobRequest property provides access to a ReadableStreamBYOBRequest instance that represents the current read request. The object is used to gain access to the ArrayBuffer/TypedArray that has been provided for the read request to fill, and provides methods for signaling that the data has been provided.

readableStreamBYOBRequest.respond(bytesWritten)#

Added in: v16.5.0

Signals that a bytesWritten number of bytes have been written to readableStreamBYOBRequest.view.

readableStreamBYOBRequest.respondWithNewView(view)#

Added in: v16.5.0

Signals that the request has been fulfilled with bytes written to a new Buffer, TypedArray, or DataView.

readableStreamBYOBRequest.view#

Added in: v16.5.0

Class: WritableStream#

The WritableStream is a destination to which stream data is sent.

```mjs
import {
  WritableStream,
} from 'node:stream/web';

const stream = new WritableStream({
  write(chunk) {
    console.log(chunk);
  },
});

await stream.getWriter().write('Hello World');
```

new WritableStream([underlyingSink[, strategy]])#

Added in: v16.5.0

writableStream.abort([reason])#

Added in: v16.5.0

Abruptly terminates the WritableStream. All queued writes will be canceled with their associated promises rejected.

writableStream.close()#

Added in: v16.5.0

Closes the WritableStream when no additional writes are expected.

writableStream.getWriter()#

Added in: v16.5.0

Creates and returns a new writer instance that can be used to write data into the WritableStream.

writableStream.locked#

Added in: v16.5.0

The writableStream.locked property is false by default, and is switched to true while there is an active writer attached to this WritableStream.
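A minimal sketch of the property toggling as a writer is acquired and released:

```mjs
import { WritableStream } from 'node:stream/web';

const stream = new WritableStream({
  write(chunk) {
    console.log(chunk);
  },
});

const writer = stream.getWriter();
console.log(stream.locked);  // true (the writer holds the lock)

await writer.write('hello');
writer.releaseLock();
console.log(stream.locked);  // false
```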

Transferring with postMessage()#

A WritableStream instance can be transferred using a MessagePort.

```js
const stream = new WritableStream(getWritableSinkSomehow());

const { port1, port2 } = new MessageChannel();

port1.onmessage = ({ data }) => {
  data.getWriter().write('hello');
};

port2.postMessage(stream, [stream]);
```

Class: WritableStreamDefaultWriter#

new WritableStreamDefaultWriter(stream)#

Added in: v16.5.0

Creates a new WritableStreamDefaultWriter that is locked to the given WritableStream.

writableStreamDefaultWriter.abort([reason])#

Added in: v16.5.0

Abruptly terminates the WritableStream. All queued writes will be canceled with their associated promises rejected.

writableStreamDefaultWriter.close()#

Added in: v16.5.0

Closes the WritableStream when no additional writes are expected.

writableStreamDefaultWriter.closed#

Added in: v16.5.0

writableStreamDefaultWriter.desiredSize#

Added in: v16.5.0

The amount of data required to fill the WritableStream's queue.

writableStreamDefaultWriter.ready#

Added in: v16.5.0
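Per the WHATWG Streams spec, ready is a promise that is fulfilled when it is safe to write again, that is, once any backpressure has cleared. A minimal sketch of a producer that respects backpressure, assuming a deliberately slow sink:

```mjs
import { WritableStream } from 'node:stream/web';

const stream = new WritableStream({
  async write(chunk) {
    // Simulate a slow sink so that the queue fills up.
    await new Promise((resolve) => setTimeout(resolve, 10));
    console.log(chunk);
  },
}, { highWaterMark: 2 });

const writer = stream.getWriter();

for (const chunk of ['a', 'b', 'c', 'd']) {
  // Wait until the queue has drained below the high water mark.
  await writer.ready;
  console.log(`desiredSize: ${writer.desiredSize}`);
  writer.write(chunk);
}

await writer.close();
```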

writableStreamDefaultWriter.releaseLock()#

Added in: v16.5.0

Releases this writer's lock on the underlying WritableStream.

writableStreamDefaultWriter.write([chunk])#

Added in: v16.5.0

Appends a new chunk of data to the WritableStream's queue.

Class: WritableStreamDefaultController#

The WritableStreamDefaultController manages the WritableStream's internal state.

writableStreamDefaultController.error([error])#

Added in: v16.5.0

Called by user-code to signal that an error has occurred while processing the WritableStream data. When called, the WritableStream will be aborted, with currently pending writes canceled.

writableStreamDefaultController.signal#

Class: TransformStream#

A TransformStream consists of a ReadableStream and a WritableStream that are connected such that the data written to the WritableStream is received, and potentially transformed, before being pushed into the ReadableStream's queue.

```mjs
import {
  TransformStream,
} from 'node:stream/web';

const transform = new TransformStream({
  transform(chunk, controller) {
    controller.enqueue(chunk.toUpperCase());
  },
});

await Promise.all([
  transform.writable.getWriter().write('A'),
  transform.readable.getReader().read(),
]);
```

new TransformStream([transformer[, writableStrategy[, readableStrategy]]])#

Added in: v16.5.0

transformStream.readable#

Added in: v16.5.0

transformStream.writable#

Added in: v16.5.0

Transferring with postMessage()#

A TransformStream instance can be transferred using a MessagePort.

```js
const stream = new TransformStream();

const { port1, port2 } = new MessageChannel();

port1.onmessage = ({ data }) => {
  const { writable, readable } = data;
  // ...
};

port2.postMessage(stream, [stream]);
```

Class: TransformStreamDefaultController#

The TransformStreamDefaultController manages the internal state of the TransformStream.

transformStreamDefaultController.desiredSize#

Added in: v16.5.0

The amount of data required to fill the readable side's queue.

transformStreamDefaultController.enqueue([chunk])#

Added in: v16.5.0

Appends a chunk of data to the readable side's queue.

transformStreamDefaultController.error([reason])#

Added in: v16.5.0

Signals to both the readable and writable side that an error has occurred while processing the transform data, causing both sides to be abruptly closed.

transformStreamDefaultController.terminate()#

Added in: v16.5.0

Closes the readable side of the transform and causes the writable side to be abruptly closed with an error.
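Putting the controller methods together, a minimal sketch of a transformer that enqueues, terminates, or errors depending on its input:

```mjs
import { TransformStream } from 'node:stream/web';

const transform = new TransformStream({
  transform(chunk, controller) {
    if (typeof chunk !== 'string') {
      // error() aborts both the readable and writable sides.
      controller.error(new TypeError('expected a string'));
      return;
    }
    if (chunk === 'stop') {
      // terminate() closes the readable side and errors the writable side.
      controller.terminate();
      return;
    }
    controller.enqueue(chunk.toUpperCase());
  },
});

transform.writable.getWriter().write('hello');
console.log(await transform.readable.getReader().read());  // { value: 'HELLO', done: false }
```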

Class: ByteLengthQueuingStrategy#

new ByteLengthQueuingStrategy(init)#

Added in: v16.5.0

byteLengthQueuingStrategy.highWaterMark#

Added in: v16.5.0

byteLengthQueuingStrategy.size#

Added in: v16.5.0

Class: CountQueuingStrategy#

new CountQueuingStrategy(init)#

Added in: v16.5.0

countQueuingStrategy.highWaterMark#

Added in: v16.5.0

countQueuingStrategy.size#

Added in: v16.5.0
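Both strategies are constructed with an init object carrying highWaterMark and are passed as the strategy argument of a stream constructor; a minimal sketch:

```mjs
import {
  ReadableStream,
  ByteLengthQueuingStrategy,
  CountQueuingStrategy,
} from 'node:stream/web';

// Buffer up to 1024 bytes; each chunk is measured by its byteLength.
const byteStream = new ReadableStream(
  {},
  new ByteLengthQueuingStrategy({ highWaterMark: 1024 }),
);

// Buffer up to 16 chunks; each chunk counts as 1.
const countStream = new ReadableStream(
  {},
  new CountQueuingStrategy({ highWaterMark: 16 }),
);
```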

Class: TextEncoderStream#

new TextEncoderStream()#

Added in: v16.6.0

Creates a new TextEncoderStream instance.

textEncoderStream.encoding#

Added in: v16.6.0

The encoding supported by the TextEncoderStream instance.

textEncoderStream.readable#

Added in: v16.6.0

textEncoderStream.writable#

Added in: v16.6.0
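A minimal sketch piping strings through a TextEncoderStream to obtain UTF-8 bytes:

```mjs
import { ReadableStream, TextEncoderStream } from 'node:stream/web';

const encoded = new ReadableStream({
  start(controller) {
    controller.enqueue('hello');
    controller.close();
  },
}).pipeThrough(new TextEncoderStream());

for await (const chunk of encoded)
  console.log(chunk);  // Uint8Array of UTF-8 bytes
```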

Class: TextDecoderStream#

new TextDecoderStream([encoding[, options]])#

Added in: v16.6.0

Creates a new TextDecoderStream instance.

textDecoderStream.encoding#

Added in: v16.6.0

The encoding supported by the TextDecoderStream instance.

textDecoderStream.fatal#

Added in: v16.6.0

The value will be true if decoding errors result in a TypeError being thrown.

textDecoderStream.ignoreBOM#

Added in: v16.6.0

The value will be true if the decoding result will include the byte order mark.

textDecoderStream.readable#

Added in: v16.6.0

textDecoderStream.writable#

Added in: v16.6.0
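The inverse sketch, piping UTF-8 bytes through a TextDecoderStream to recover strings:

```mjs
import { ReadableStream, TextDecoderStream } from 'node:stream/web';

const decoded = new ReadableStream({
  start(controller) {
    controller.enqueue(new TextEncoder().encode('hello'));
    controller.close();
  },
}).pipeThrough(new TextDecoderStream());

for await (const chunk of decoded)
  console.log(chunk);  // 'hello'
```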

Class: CompressionStream#

new CompressionStream(format)#

Added in: v17.0.0

compressionStream.readable#

Added in: v17.0.0

compressionStream.writable#

Added in: v17.0.0

Class: DecompressionStream#

new DecompressionStream(format)#

Added in: v17.0.0

decompressionStream.readable#

Added in: v17.0.0

decompressionStream.writable#

Added in: v17.0.0
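A minimal round-trip sketch, assuming the 'gzip' format (also supported: 'deflate'), that compresses and then immediately decompresses the same data:

```mjs
import {
  ReadableStream,
  CompressionStream,
  DecompressionStream,
} from 'node:stream/web';
import { text } from 'node:stream/consumers';

const source = new ReadableStream({
  start(controller) {
    controller.enqueue(new TextEncoder().encode('hello world'));
    controller.close();
  },
});

const roundTrip = source
  .pipeThrough(new CompressionStream('gzip'))
  .pipeThrough(new DecompressionStream('gzip'));

console.log(await text(roundTrip));  // 'hello world'
```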

Utility Consumers#

Added in: v16.7.0

The utility consumer functions provide common options for consuming streams.

They are accessed using:

```mjs
import {
  arrayBuffer,
  blob,
  buffer,
  json,
  text,
} from 'node:stream/consumers';
```

```cjs
const {
  arrayBuffer,
  blob,
  buffer,
  json,
  text,
} = require('node:stream/consumers');
```

streamConsumers.arrayBuffer(stream)#

Added in: v16.7.0

```mjs
import { arrayBuffer } from 'node:stream/consumers';
import { Readable } from 'node:stream';
import { TextEncoder } from 'node:util';

const encoder = new TextEncoder();
const dataArray = encoder.encode('hello world from consumers!');

const readable = Readable.from(dataArray);
const data = await arrayBuffer(readable);
console.log(`from readable: ${data.byteLength}`);
```

```cjs
const { arrayBuffer } = require('node:stream/consumers');
const { Readable } = require('node:stream');
const { TextEncoder } = require('node:util');

const encoder = new TextEncoder();
const dataArray = encoder.encode('hello world from consumers!');
const readable = Readable.from(dataArray);
arrayBuffer(readable).then((data) => {
  console.log(`from readable: ${data.byteLength}`);
});
```

streamConsumers.blob(stream)#

Added in: v16.7.0

```mjs
import { blob } from 'node:stream/consumers';

const dataBlob = new Blob(['hello world from consumers!']);

const readable = dataBlob.stream();
const data = await blob(readable);
console.log(`from readable: ${data.size}`);
```

```cjs
const { blob } = require('node:stream/consumers');

const dataBlob = new Blob(['hello world from consumers!']);

const readable = dataBlob.stream();
blob(readable).then((data) => {
  console.log(`from readable: ${data.size}`);
});
```

streamConsumers.buffer(stream)#

Added in: v16.7.0

```mjs
import { buffer } from 'node:stream/consumers';
import { Readable } from 'node:stream';
import { Buffer } from 'node:buffer';

const dataBuffer = Buffer.from('hello world from consumers!');

const readable = Readable.from(dataBuffer);
const data = await buffer(readable);
console.log(`from readable: ${data.length}`);
```

```cjs
const { buffer } = require('node:stream/consumers');
const { Readable } = require('node:stream');
const { Buffer } = require('node:buffer');

const dataBuffer = Buffer.from('hello world from consumers!');

const readable = Readable.from(dataBuffer);
buffer(readable).then((data) => {
  console.log(`from readable: ${data.length}`);
});
```

streamConsumers.json(stream)#

Added in: v16.7.0

```mjs
import { json } from 'node:stream/consumers';
import { Readable } from 'node:stream';

const items = Array.from(
  {
    length: 100,
  },
  () => ({
    message: 'hello world from consumers!',
  }),
);

const readable = Readable.from(JSON.stringify(items));
const data = await json(readable);
console.log(`from readable: ${data.length}`);
```

```cjs
const { json } = require('node:stream/consumers');
const { Readable } = require('node:stream');

const items = Array.from(
  {
    length: 100,
  },
  () => ({
    message: 'hello world from consumers!',
  }),
);

const readable = Readable.from(JSON.stringify(items));
json(readable).then((data) => {
  console.log(`from readable: ${data.length}`);
});
```

streamConsumers.text(stream)#

Added in: v16.7.0

```mjs
import { text } from 'node:stream/consumers';
import { Readable } from 'node:stream';

const readable = Readable.from('Hello world from consumers!');
const data = await text(readable);
console.log(`from readable: ${data.length}`);
```

```cjs
const { text } = require('node:stream/consumers');
const { Readable } = require('node:stream');

const readable = Readable.from('Hello world from consumers!');
text(readable).then((data) => {
  console.log(`from readable: ${data.length}`);
});
```