Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 27 additions & 10 deletions benchmark/webstreams/readable-async-iterator.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,23 +6,40 @@ const {

const bench = common.createBenchmark(main, {
n: [1e5],
type: ['normal', 'bytes'],
});


async function main({ n }) {
const rs = new ReadableStream({
pull: function(controller) {
controller.enqueue(1);
},
});
async function main({ n, type }) {
const rs = type === 'bytes' ?
new ReadableStream({
type: 'bytes',
pull: function(controller) {
controller.enqueue(new Uint8Array(1));
},
}) :
new ReadableStream({
pull: function(controller) {
controller.enqueue(1);
},
});

let x = 0;

bench.start();
for await (const chunk of rs) {
x += chunk;
if (x > n) {
break;
if (type === 'bytes') {
for await (const chunk of rs) {
x += chunk.byteLength;
if (x > n) {
break;
}
}
} else {
for await (const chunk of rs) {
x += chunk;
if (x > n) {
break;
}
}
}
// Use x to ensure V8 does not optimize away the loop as a noop.
Expand Down
137 changes: 93 additions & 44 deletions lib/internal/webstreams/readablestream.js
Original file line number Diff line number Diff line change
Expand Up @@ -501,6 +501,12 @@ class ReadableStream {
current: undefined,
};
let started = false;
// A single reusable read request: at most one read is ever in flight
// (next() chains through state.current), and the request is consumed
// before the next read starts, so only its promise record changes
// per read.
// eslint-disable-next-line no-use-before-define
const readRequest = new ReadableStreamAsyncIteratorReadRequest(reader, state, undefined);

// The nextSteps function is not an async function in order
// to make it more efficient. Because nextSteps explicitly
Expand All @@ -519,8 +525,8 @@ class ReadableStream {
}
const promise = PromiseWithResolvers();

// eslint-disable-next-line no-use-before-define
readableStreamDefaultReaderRead(reader, new ReadableStreamAsyncIteratorReadRequest(reader, state, promise));
readRequest.promise = promise;
readableStreamDefaultReaderRead(reader, readRequest);
return promise.promise;
}

Expand Down Expand Up @@ -574,28 +580,38 @@ class ReadableStream {
}
// No read is in flight. Mirror the buffered fast path of
// ReadableStreamDefaultReader.read(): when data is already queued
// in a default controller, resolve immediately without allocating
// a read request. The result settles synchronously, so leaving
// in the controller, resolve immediately without allocating a
// read request. The result settles synchronously, so leaving
// state.current undefined matches the state the slow path reaches
// once its read request callbacks have settled.
const stream = reader[kState].stream;
if (!state.done && stream !== undefined) {
if (!state.done && stream !== undefined &&
stream[kState].state === 'readable') {
const controller = stream[kState].controller;
if (stream[kState].state === 'readable' &&
isReadableStreamDefaultController(controller) &&
controller[kState].queue.length > 0) {
stream[kState].disturbed = true;
const chunk = dequeueValue(controller);

if (controller[kState].closeRequested &&
!controller[kState].queue.length) {
readableStreamDefaultControllerClearAlgorithms(controller);
readableStreamClose(stream);
} else {
readableStreamDefaultControllerCallPullIfNeeded(controller);
if (isReadableStreamDefaultController(controller)) {
if (controller[kState].queue.length > 0) {
stream[kState].disturbed = true;
const chunk = dequeueValue(controller);

if (controller[kState].closeRequested &&
!controller[kState].queue.length) {
readableStreamDefaultControllerClearAlgorithms(controller);
readableStreamClose(stream);
} else {
readableStreamDefaultControllerCallPullIfNeeded(controller);
}

return PromiseResolve({ done: false, value: chunk });
}
} else if (controller[kState].queueTotalSize > 0) {
// Byte controller with buffered data: same shape as above via
// the queue-filled arm of the byte controller's pull steps.
stream[kState].disturbed = true;
return PromiseResolve({
done: false,

return PromiseResolve({ done: false, value: chunk });
value: readableByteStreamControllerDequeueChunk(controller),
});
}
}
state.current = nextSteps();
Expand Down Expand Up @@ -918,24 +934,36 @@ class ReadableStreamDefaultReader {
const stream = this[kState].stream;
const controller = stream[kState].controller;

// Fast path: if data is already buffered in a default controller,
// Fast path: if data is already buffered in the controller's queue,
// return a resolved promise immediately without creating a read request.
// This is spec-compliant because read() returns a Promise, and
// Promise.resolve() callbacks still run in the microtask queue.
if (stream[kState].state === 'readable' &&
isReadableStreamDefaultController(controller) &&
controller[kState].queue.length > 0) {
stream[kState].disturbed = true;
const chunk = dequeueValue(controller);
if (stream[kState].state === 'readable') {
if (isReadableStreamDefaultController(controller)) {
if (controller[kState].queue.length > 0) {
stream[kState].disturbed = true;
const chunk = dequeueValue(controller);

if (controller[kState].closeRequested && !controller[kState].queue.length) {
readableStreamDefaultControllerClearAlgorithms(controller);
readableStreamClose(stream);
} else {
readableStreamDefaultControllerCallPullIfNeeded(controller);
}

if (controller[kState].closeRequested && !controller[kState].queue.length) {
readableStreamDefaultControllerClearAlgorithms(controller);
readableStreamClose(stream);
} else {
readableStreamDefaultControllerCallPullIfNeeded(controller);
return PromiseResolve({ done: false, value: chunk });
}
} else if (controller[kState].queueTotalSize > 0) {
// Byte controller with buffered data: mirror the queue-filled arm
// of its pull steps (which never consults pendingPullIntos) minus
// the read request.
stream[kState].disturbed = true;
return PromiseResolve({
done: false,

value: readableByteStreamControllerDequeueChunk(controller),
});
}

return PromiseResolve({ done: false, value: chunk });
}

// Slow path: create request and go through normal flow
Expand Down Expand Up @@ -3044,9 +3072,23 @@ function readableByteStreamControllerEnqueue(controller, chunk) {
}
}

if (readableStreamHasDefaultReader(stream)) {
readableByteStreamControllerProcessReadRequestsUsingQueue(controller);
if (!readableStreamGetNumReadRequests(stream)) {
// Single consolidated pass over the reader state. The spec routes this
// through HasDefaultReader / ProcessReadRequestsUsingQueue /
// GetNumReadRequests / FulfillReadRequest, which would re-run the same
// reader brand check and re-load the read request list four times on
// this per-chunk path.
const { reader } = stream[kState];
if (reader !== undefined &&
reader[kState] !== undefined &&
reader[kType] === 'ReadableStreamDefaultReader') {
const { readRequests } = reader[kState];
if (readRequests.length && controller[kState].queueTotalSize > 0) {
// Only possible when data was enqueued while the stream was not
// being read; read requests otherwise never coexist with a
// non-empty queue.
readableByteStreamControllerProcessReadRequestsUsingQueue(controller);
}
if (!readRequests.length) {
readableByteStreamControllerEnqueueChunkToQueue(
controller,
transferredBuffer,
Expand All @@ -3060,7 +3102,8 @@ function readableByteStreamControllerEnqueue(controller, chunk) {
}
const transferredView =
new Uint8Array(transferredBuffer, byteOffset, byteLength);
readableStreamFulfillReadRequest(stream, transferredView, false);
const readRequest = ArrayPrototypeShift(readRequests);
readRequest[kChunk](transferredView);
}
} else if (readableStreamHasBYOBReader(stream)) {
readableByteStreamControllerEnqueueChunkToQueue(
Expand Down Expand Up @@ -3395,22 +3438,28 @@ function readableByteStreamControllerCancelSteps(controller, reason) {
return result;
}

function readableByteStreamControllerFillReadRequestFromQueue(controller, readRequest) {
const {
queue,
queueTotalSize,
} = controller[kState];
assert(queueTotalSize > 0);
// Dequeues the first chunk of the byte queue as a Uint8Array view,
// handling queue drain (close-on-empty or pull) before the view is
// created. This is the [[queueTotalSize]] > 0 arm of the byte
// controller's pull steps; it is also called directly from the
// buffered fast paths in ReadableStreamDefaultReader.read() and the
// async iterator, which resolve with the view without allocating a
// read request.
function readableByteStreamControllerDequeueChunk(controller) {
assert(controller[kState].queueTotalSize > 0);
const {
buffer,
byteOffset,
byteLength,
} = ArrayPrototypeShift(queue);
} = ArrayPrototypeShift(controller[kState].queue);

controller[kState].queueTotalSize -= byteLength;
readableByteStreamControllerHandleQueueDrain(controller);
const view = new Uint8Array(buffer, byteOffset, byteLength);
readRequest[kChunk](view);
return new Uint8Array(buffer, byteOffset, byteLength);
}

function readableByteStreamControllerFillReadRequestFromQueue(controller, readRequest) {
readRequest[kChunk](readableByteStreamControllerDequeueChunk(controller));
}

function readableByteStreamControllerProcessReadRequestsUsingQueue(controller) {
Expand Down
32 changes: 25 additions & 7 deletions lib/internal/webstreams/util.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,19 @@ const {
ArrayPrototypePush,
ArrayPrototypeShift,
AsyncIteratorPrototype,
DataViewPrototypeGetBuffer,
DataViewPrototypeGetByteLength,
DataViewPrototypeGetByteOffset,
FunctionPrototypeCall,
MathMax,
NumberIsNaN,
PromisePrototypeThen,
PromiseReject,
PromiseResolve,
ReflectGet,
Symbol,
TypedArrayPrototypeGetBuffer,
TypedArrayPrototypeGetByteLength,
TypedArrayPrototypeGetByteOffset,
Uint8Array,
} = primordials;

Expand All @@ -41,6 +46,10 @@ const {

const assert = require('internal/assert');

const {
isDataView,
} = require('internal/util/types');

const {
validateFunction,
} = require('internal/validators');
Expand Down Expand Up @@ -93,20 +102,29 @@ function customInspect(depth, options, name, data) {
return `${name} ${inspect(data, opts)}`;
}

// These are defensive to work around the possibility that
// the buffer, byteLength, and byteOffset properties on
// ArrayBuffer and ArrayBufferView's may have been tampered with.
// These use the original prototype getters so that user tampering with
// the buffer, byteLength, and byteOffset properties on ArrayBuffer and
// ArrayBufferView's is not observed. They run once or more per chunk on
// every byte-stream path, so they must not go through a reflective get
// (the previous view.constructor.prototype lookup was both slower and
// spoofable via a user-defined .constructor).

function ArrayBufferViewGetBuffer(view) {
return ReflectGet(view.constructor.prototype, 'buffer', view);
return isDataView(view) ?
DataViewPrototypeGetBuffer(view) :
TypedArrayPrototypeGetBuffer(view);
}

function ArrayBufferViewGetByteLength(view) {
return ReflectGet(view.constructor.prototype, 'byteLength', view);
return isDataView(view) ?
DataViewPrototypeGetByteLength(view) :
TypedArrayPrototypeGetByteLength(view);
}

function ArrayBufferViewGetByteOffset(view) {
return ReflectGet(view.constructor.prototype, 'byteOffset', view);
return isDataView(view) ?
DataViewPrototypeGetByteOffset(view) :
TypedArrayPrototypeGetByteOffset(view);
}

function cloneAsUint8Array(view) {
Expand Down
Loading