Understanding Node.js Readable Streams: Internals, API, and Implementation
This article explains the fundamentals and internal mechanics of Node.js readable streams: their history, the BufferList data structure, state management, key APIs such as push, read, and pipe, and how inheriting from EventEmitter enables efficient, low-memory data processing.
1. Basic Concepts
1.1 History of Streams
Streams are not unique to Node.js; they originated decades ago in Unix operating systems, where programs can interact via the pipe operator (|).
Both macOS and Linux, which are Unix‑based, support the pipe operator, allowing the output of the left‑hand process to become the input of the right‑hand process.
In Node, using the traditional readFile reads the entire file into memory before any processing can occur.
This approach has two drawbacks:
Memory consumption: a large amount of memory is needed to hold the entire file at once.
Time: processing starts only after the whole payload has been loaded.
To solve these problems, Node.js adopted the stream concept. Node.js provides four stream types, all of which are instances of EventEmitter:
Readable Stream
Writable Stream
Duplex Stream (readable & writable)
Transform Stream
To study this part gradually, I will start with readable streams.
1.2 What Is a Stream?
A stream is an abstract data structure that represents a collection of data. When objectMode === false, the only allowed data types are string and Buffer.
We can think of a stream as a container of these data pieces, like a liquid stored in a buffer (BufferList). When the appropriate event fires, the liquid is poured into a pipe and the other side can retrieve it for processing.
1.3 What Is a Readable Stream?
A readable stream is a type of stream that has two reading modes and three states.
Two reading modes:
Flowing mode: data is read from the underlying system and emitted to registered event handlers as quickly as possible.
Paused mode: no data is read automatically; stream.read() must be called explicitly to pull data.
Three states (readableFlowing):
readableFlowing === null – no consumption mechanism exists yet; calling stream.pipe() or stream.resume() changes it to true, starting data production and event emission.
readableFlowing === false – data flow is paused, but data generation does not stop, so chunks accumulate in the internal buffer (back-pressure).
readableFlowing === true – data is produced and consumed normally.
2. Basic Principles
2.1 Internal State Definition (ReadableState)
ReadableState
<code>_readableState: ReadableState {
objectMode: false, // set true to handle non‑string/Buffer/null types
highWaterMark: 16384, // default 16 KB, stops calling _read() when exceeded
buffer: BufferList { head: null, tail: null, length: 0 }, // internal buffer list
length: 0, // total size of data in the readable stream (objectMode ? buffer.length : byte length)
pipes: [], // list of piped destinations
flowing: null, // null / false / true
ended: false, // true once push(null) has signaled that no more data will arrive
endEmitted: false,
reading: false,
constructed: true,
sync: true,
needReadable: false,
emittedReadable: false,
readableListening: false,
resumeScheduled: false,
errorEmitted: false,
emitClose: true,
autoDestroy: true,
destroyed: false,
errored: null,
closed: false,
closeEmitted: false,
defaultEncoding: 'utf8',
awaitDrainWriters: null,
multiAwaitDrain: false,
readingMore: false,
dataEmitted: false,
decoder: null,
encoding: null,
[Symbol(kPaused)]: null
}</code>
2.2 Internal Data Storage Implementation (BufferList)
BufferList is the container used by streams to store internal data. It is implemented as a linked list with three properties: head, tail, and length.
Each node in BufferList is a BufferNode; the type of its data depends on objectMode.
This structure provides faster head access than Array.prototype.shift().
2.2.1 Data Storage Types
If objectMode === true:
Data can be of any type; whatever is pushed is stored as-is.
objectMode = true
<code>const Stream = require('stream');
const readableStream = new Stream.Readable({
objectMode: true,
read() {}
});
readableStream.push({ name: 'lisa' });
console.log(readableStream._readableState.buffer.tail);
readableStream.push(true);
console.log(readableStream._readableState.buffer.tail);
readableStream.push('lisa');
console.log(readableStream._readableState.buffer.tail);
readableStream.push(666);
console.log(readableStream._readableState.buffer.tail);
readableStream.push(() => {});
console.log(readableStream._readableState.buffer.tail);
readableStream.push(Symbol(1));
console.log(readableStream._readableState.buffer.tail);
readableStream.push(BigInt(123));
console.log(readableStream._readableState.buffer.tail);
</code>
In every case the tail BufferNode stores the pushed value unchanged: the object, boolean, string, number, function, Symbol, and BigInt all appear as-is in its data property.
If objectMode === false:
Data can only be string, Buffer, or Uint8Array.
objectMode = false
<code>const Stream = require('stream');
const readableStream = new Stream.Readable({
objectMode: false,
read() {}
});
readableStream.push({ name: 'lisa' });
</code>
Because an object is not a valid chunk type when objectMode is false, push() rejects it with an ERR_INVALID_ARG_TYPE error.
2.2.2 Data Storage Structure
When we create a readable stream in the Node REPL and push data, we first need to implement the _read method or provide a read function in the constructor.
<code>const Stream = require('stream');
const readableStream = new Stream.Readable();
readableStream._read = function(size) {};
</code>or
<code>const Stream = require('stream');
const readableStream = new Stream.Readable({
read(size) {}
});
</code>
After calling readableStream.push('abc') twice, the buffer contains two nodes whose data are Buffers holding the ASCII codes of the string 'abc'. The BufferList's length property reflects the number of stored chunks, not the byte size.
2.2.3 Related API
BufferList exposes a handful of methods. Apart from join (which serialises the list to a string), they all perform data-access operations. Below are the most relevant ones.
2.2.3.1 consume
Source: BufferList.consume
<code>// Consumes a specified amount of bytes or characters from the buffered data.
consume(n, hasStrings) {
const data = this.head.data;
if (n < data.length) {
// slice works for both buffers and strings.
const slice = data.slice(0, n);
this.head.data = data.slice(n);
return slice;
}
if (n === data.length) {
// First chunk is a perfect match.
return this.shift();
}
// Result spans more than one buffer.
return hasStrings ? this._getString(n) : this._getBuffer(n);
}
</code>
The function has three branches:
If the requested length is smaller than the head node, it returns the first n bytes and updates the head.
If the requested length equals the head node's length, it returns the whole head node.
If the requested length is larger, it delegates to _getString or _getBuffer depending on the underlying storage type.
2.2.3.2 _getBuffer
Source: BufferList._getBuffer
<code>// Consumes a specified amount of bytes from the buffered data.
_getBuffer(n) {
const ret = Buffer.allocUnsafe(n);
const retLen = n;
let p = this.head;
let c = 0;
do {
const buf = p.data;
if (n > buf.length) {
TypedArrayPrototypeSet(ret, buf, retLen - n);
n -= buf.length;
} else {
if (n === buf.length) {
TypedArrayPrototypeSet(ret, buf, retLen - n);
++c;
if (p.next)
this.head = p.next;
else
this.head = this.tail = null;
} else {
TypedArrayPrototypeSet(ret,
new Uint8Array(buf.buffer, buf.byteOffset, n),
retLen - n);
this.head = p;
p.data = buf.slice(n);
}
break;
}
++c;
} while ((p = p.next) !== null);
this.length -= c;
return ret;
}
</code>It iterates over the linked list, copying data into a newly allocated Buffer until the requested length is satisfied.
2.2.3.3 _getString
Source: BufferList._getString
<code>// Consumes a specified amount of characters from the buffered data.
_getString(n) {
let ret = '';
let p = this.head;
let c = 0;
do {
const str = p.data;
if (n > str.length) {
ret += str;
n -= str.length;
} else {
if (n === str.length) {
ret += str;
++c;
if (p.next)
this.head = p.next;
else
this.head = this.tail = null;
} else {
ret += StringPrototypeSlice(str, 0, n);
this.head = p;
p.data = StringPrototypeSlice(str, n);
}
break;
}
++c;
} while ((p = p.next) !== null);
this.length -= c;
return ret;
}
</code>
The logic mirrors _getBuffer but works with strings.
2.3 Why Is a Readable Stream an Instance of EventEmitter?
Understanding the publish‑subscribe pattern is essential: callbacks are stored in a queue and invoked later, separating producers from consumers. Node.js streams follow this pattern, which is why they inherit from EventEmitter.
The inheritance chain is built by first making the Stream constructor inherit from EventEmitter, then making Readable inherit from Stream:
<code>function Stream(opts) {
EE.call(this, opts);
}
ObjectSetPrototypeOf(Stream.prototype, EE.prototype);
ObjectSetPrototypeOf(Stream, EE);
</code>Later, Readable’s prototype is set to Stream’s prototype:
<code>ObjectSetPrototypeOf(Readable.prototype, Stream.prototype);
ObjectSetPrototypeOf(Readable, Stream);
</code>Thus:
Readable.prototype.__proto__ === Stream.prototype
Stream.prototype.__proto__ === EE.prototype
Readable.prototype.__proto__.__proto__ === EE.prototype
Traversing this chain shows that Readable streams inherit all EventEmitter methods.
2.4 Implementation of Core APIs
Below are the most important methods of a readable stream, shown in the order they appear in the source.
2.4.1 push
push adds a chunk to the internal buffer or emits it directly when the stream is flowing.
<code>Readable.prototype.push = function(chunk, encoding) {
return readableAddChunk(this, chunk, encoding, false);
};
</code>
The pseudo-code for readableAddChunk highlights the main flow:
<code>function readableAddChunk(stream, chunk, encoding, addToFront) {
const state = stream._readableState;
let err;
if (chunk === null) { // end of stream
state.reading = false;
onEofChunk(stream, state);
} else if (!state.objectMode) {
if (typeof chunk === 'string') {
chunk = Buffer.from(chunk);
} else if (chunk instanceof Buffer) {
// handle encoding …
} else if (Stream._isUint8Array(chunk)) {
chunk = Stream._uint8ArrayToBuffer(chunk);
} else if (chunk != null) {
err = new ERR_INVALID_ARG_TYPE('chunk', ['string','Buffer','Uint8Array'], chunk);
}
}
if (err) {
errorOrDestroy(stream, err);
} else if (state.objectMode || (chunk && chunk.length > 0)) {
// insert chunk into buffer
addChunk(stream, state, chunk, true);
}
}
</code>Key points:
When objectMode is false, the chunk is converted to a Buffer.
When objectMode is true, the chunk is passed through unchanged.
If the stream is flowing and there are 'data' listeners, the chunk is emitted immediately; otherwise it is stored in the buffer.
2.4.2 read
read first calls the user-implemented _read method (which may return a Promise) and then extracts data from the internal buffer.
<code>// Adjust highWaterMark if needed
if (n > state.highWaterMark) {
state.highWaterMark = computeNewHighWaterMark(n);
}
try {
const result = this._read(state.highWaterMark);
if (result != null) {
const then = result.then;
if (typeof then === 'function') {
then.call(result, nop, function(err) {
errorOrDestroy(this, err);
});
}
}
} catch (err) {
errorOrDestroy(this, err);
}
</code>
The core extraction logic lives in fromList:
<code>function fromList(n, state) {
if (state.length === 0) return null;
let ret;
if (state.objectMode)
ret = state.buffer.shift();
else if (!n || n >= state.length) {
// read everything
if (state.decoder)
ret = state.buffer.join('');
else if (state.buffer.length === 1)
ret = state.buffer.first();
else
ret = state.buffer.concat(state.length);
state.buffer.clear();
} else {
// read n bytes
ret = state.buffer.consume(n, state.decoder);
}
return ret;
}
</code>
2.4.3 _read
The user must provide _read. Inside it, they typically call push repeatedly and finally push null to signal the end of the stream.
<code>const Stream = require('stream');
const readableStream = new Stream.Readable({
read(hwm) {
this.push(String.fromCharCode(this.currentCharCode++));
if (this.currentCharCode > 122) {
this.push(null);
}
},
});
readableStream.currentCharCode = 97;
readableStream.pipe(process.stdout);
// outputs: abcdefghijklmnopqrstuvwxyz
</code>
2.4.4 pipe (important)
pipe attaches one or more writable streams to the readable stream and switches the readable into flowing mode.
<code>Readable.prototype.pipe = function(dest, pipeOpts) {
const src = this;
const state = this._readableState;
state.pipes.push(dest);
src.on('data', ondata);
function ondata(chunk) {
const ret = dest.write(chunk);
if (ret === false) {
pause(); // internal helper (not shown): pauses src until dest drains
}
}
dest.emit('pipe', src);
if (dest.writableNeedDrain === true) {
if (state.flowing) {
pause();
}
} else if (!state.flowing) {
src.resume();
}
return dest;
};
</code>
It works similarly to the Unix pipe operator, moving data from the left-hand side to the right-hand side.
2.4.5 resume
resume switches a stream from paused to flowing mode unless a 'readable' listener is present.
<code>Readable.prototype.resume = function() {
const state = this._readableState;
if (!state.flowing) {
state.flowing = !state.readableListening;
resume(this, state);
}
};
function resume(stream, state) {
if (!state.resumeScheduled) {
state.resumeScheduled = true;
process.nextTick(resume_, stream, state);
}
}
function resume_(stream, state) {
if (!state.reading) {
stream.read(0);
}
state.resumeScheduled = false;
stream.emit('resume');
flow(stream);
}
function flow(stream) {
const state = stream._readableState;
while (state.flowing && stream.read() !== null);
}
</code>
2.4.6 pause
pause stops the 'data' events and keeps incoming data in the internal buffer.
<code>Readable.prototype.pause = function() {
if (this._readableState.flowing !== false) {
this._readableState.flowing = false;
this.emit('pause');
}
return this;
};
</code>
2.5 Usage and Working Mechanism
2.5.1 Working Mechanism
The following rules summarise the overall flow and the conditions that trigger mode transitions for a readable stream:
needReadable === true: set in paused mode when the buffer size is at or below the highWaterMark, when a 'readable' listener is attached, or when a read request finds no data.
push: in flowing mode emits 'data' immediately; otherwise stores data and may emit 'readable' when needed.
read: reading with length 0 triggers 'readable' when the buffer reaches or exceeds the highWaterMark; otherwise consumes data and emits 'data'.
resume: has no effect if a 'readable' listener exists; otherwise switches to flowing mode and drains the buffer.
'readable' is emitted when a listener is attached and data is available, when push adds data while needReadable is true, or when a zero-length read finds sufficient buffered data.
3. Summary
Node.js implements its own stream system to avoid memory and latency problems by processing data in small chunks.
Streams are not exclusive to Node.js; they originated in Unix decades ago.
There are four stream types—readable, writable, duplex, and transform—all inheriting from EventEmitter.
The underlying container is a BufferList, a custom linked‑list structure with head and tail pointers.
Readable streams have two modes (flowing and paused) and three internal states governing data production.
Using streams enables chainable data processing and flexible composition of transformation steps.
Tencent IMWeb Frontend Team