Skip to content

Commit 346abdd

Browse files
authored
stream: improve readable webstream pipeTo
PR-URL: #49690 Reviewed-By: Matteo Collina <[email protected]> Reviewed-By: Yagiz Nizipli <[email protected]> Reviewed-By: Moshe Atlow <[email protected]>
1 parent 1149d6b commit 346abdd

File tree

1 file changed

+34
-17
lines changed

1 file changed

+34
-17
lines changed

lib/internal/webstreams/readablestream.js

Lines changed: 34 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@ const {
1414
ObjectCreate,
1515
ObjectDefineProperties,
1616
ObjectSetPrototypeOf,
17-
Promise,
1817
PromisePrototypeThen,
1918
PromiseResolve,
2019
PromiseReject,
@@ -1351,7 +1350,9 @@ function readableStreamPipeTo(
13511350

13521351
const promise = createDeferredPromise();
13531352

1354-
let currentWrite = PromiseResolve();
1353+
const state = {
1354+
currentWrite: PromiseResolve(),
1355+
};
13551356

13561357
// The error here can be undefined. The rejected arg
13571358
// tells us that the promise must be rejected even
@@ -1368,9 +1369,9 @@ function readableStreamPipeTo(
13681369
}
13691370

13701371
async function waitForCurrentWrite() {
1371-
const write = currentWrite;
1372+
const write = state.currentWrite;
13721373
await write;
1373-
if (write !== currentWrite)
1374+
if (write !== state.currentWrite)
13741375
await waitForCurrentWrite();
13751376
}
13761377

@@ -1461,20 +1462,14 @@ function readableStreamPipeTo(
14611462
async function step() {
14621463
if (shuttingDown)
14631464
return true;
1465+
14641466
await writer[kState].ready.promise;
1465-
return new Promise((resolve, reject) => {
1466-
readableStreamDefaultReaderRead(
1467-
reader,
1468-
{
1469-
[kChunk](chunk) {
1470-
currentWrite = writableStreamDefaultWriterWrite(writer, chunk);
1471-
setPromiseHandled(currentWrite);
1472-
resolve(false);
1473-
},
1474-
[kClose]: () => resolve(true),
1475-
[kError]: reject,
1476-
});
1477-
});
1467+
1468+
const promise = createDeferredPromise();
1469+
// eslint-disable-next-line no-use-before-define
1470+
readableStreamDefaultReaderRead(reader, new PipeToReadableStreamReadRequest(writer, state, promise));
1471+
1472+
return promise.promise;
14781473
}
14791474

14801475
async function run() {
@@ -1536,6 +1531,28 @@ function readableStreamPipeTo(
15361531
return promise.promise;
15371532
}
15381533

1534+
class PipeToReadableStreamReadRequest {
1535+
constructor(writer, state, promise) {
1536+
this.writer = writer;
1537+
this.state = state;
1538+
this.promise = promise;
1539+
}
1540+
1541+
[kChunk](chunk) {
1542+
this.state.currentWrite = writableStreamDefaultWriterWrite(this.writer, chunk);
1543+
setPromiseHandled(this.state.currentWrite);
1544+
this.promise.resolve(false);
1545+
}
1546+
1547+
[kClose]() {
1548+
this.promise.resolve(true);
1549+
}
1550+
1551+
[kError](error) {
1552+
this.promise.reject(error);
1553+
}
1554+
}
1555+
15391556
function readableStreamTee(stream, cloneForBranch2) {
15401557
if (isReadableByteStreamController(stream[kState].controller)) {
15411558
return readableByteStreamTee(stream);

0 commit comments

Comments
 (0)