From 4aa61090b65608897b123687cafd804e74e855f1 Mon Sep 17 00:00:00 2001 From: George Fu Date: Fri, 12 Jul 2024 17:41:15 +0000 Subject: [PATCH 1/6] feat(util-stream): add stream splitting function --- .changeset/metal-snakes-remember.md | 5 + packages/util-stream/package.json | 10 +- .../util-stream/src/headStream.browser.ts | 39 +++++++ packages/util-stream/src/headStream.spec.ts | 108 ++++++++++++++++++ packages/util-stream/src/headStream.ts | 52 +++++++++ packages/util-stream/src/index.ts | 2 + .../util-stream/src/splitStream.browser.ts | 11 ++ packages/util-stream/src/splitStream.spec.ts | 43 +++++++ packages/util-stream/src/splitStream.ts | 14 +++ 9 files changed, 282 insertions(+), 2 deletions(-) create mode 100644 .changeset/metal-snakes-remember.md create mode 100644 packages/util-stream/src/headStream.browser.ts create mode 100644 packages/util-stream/src/headStream.spec.ts create mode 100644 packages/util-stream/src/headStream.ts create mode 100644 packages/util-stream/src/splitStream.browser.ts create mode 100644 packages/util-stream/src/splitStream.spec.ts create mode 100644 packages/util-stream/src/splitStream.ts diff --git a/.changeset/metal-snakes-remember.md b/.changeset/metal-snakes-remember.md new file mode 100644 index 00000000000..9117d2c1f87 --- /dev/null +++ b/.changeset/metal-snakes-remember.md @@ -0,0 +1,5 @@ +--- +"@smithy/util-stream": minor +--- + +add stream splitting to sdkStreamMixin diff --git a/packages/util-stream/package.json b/packages/util-stream/package.json index ed622c4686f..18f0df7f814 100644 --- a/packages/util-stream/package.json +++ b/packages/util-stream/package.json @@ -56,13 +56,19 @@ ], "browser": { "./dist-es/getAwsChunkedEncodingStream": "./dist-es/getAwsChunkedEncodingStream.browser", - "./dist-es/sdk-stream-mixin": "./dist-es/sdk-stream-mixin.browser" + "./dist-es/headStream": "./dist-es/headStream.browser", + "./dist-es/sdk-stream-mixin": "./dist-es/sdk-stream-mixin.browser", + "./dist-es/splitStream": "./dist-es/splitStream.browser" }, "react-native": { "./dist-es/getAwsChunkedEncodingStream": "./dist-es/getAwsChunkedEncodingStream.browser", "./dist-es/sdk-stream-mixin": "./dist-es/sdk-stream-mixin.browser", + "./dist-es/headStream": "./dist-es/headStream.browser", + "./dist-es/splitStream": "./dist-es/splitStream.browser", "./dist-cjs/getAwsChunkedEncodingStream": "./dist-cjs/getAwsChunkedEncodingStream.browser", - "./dist-cjs/sdk-stream-mixin": "./dist-cjs/sdk-stream-mixin.browser" + "./dist-cjs/sdk-stream-mixin": "./dist-cjs/sdk-stream-mixin.browser", + "./dist-cjs/headStream": "./dist-cjs/headStream.browser", + "./dist-cjs/splitStream": "./dist-cjs/splitStream.browser" }, "homepage": "https://github.com/awslabs/smithy-typescript/tree/main/packages/util-stream", "repository": { diff --git a/packages/util-stream/src/headStream.browser.ts b/packages/util-stream/src/headStream.browser.ts new file mode 100644 index 00000000000..afdc0995f00 --- /dev/null +++ b/packages/util-stream/src/headStream.browser.ts @@ -0,0 +1,39 @@ +/** + * @internal + * @param stream + * @param bytes - read head bytes from the stream and discard the rest of it. + * + * Caution: the input stream must be destroyed separately, this function does not do so. + */ +export async function headStream(stream: ReadableStream, bytes: number): Promise { + let byteLengthCounter = 0; + const chunks = []; + const reader = stream.getReader(); + let isDone = false; + + while (!isDone) { + const { done, value } = await reader.read(); + if (value) { + chunks.push(value); + byteLengthCounter += value?.byteLength ?? 0; + } + if (byteLengthCounter >= bytes) { + break; + } + isDone = done; + } + reader.releaseLock(); + + const collected = new Uint8Array(Math.min(bytes, byteLengthCounter)); + let offset = 0; + for (const chunk of chunks) { + if (chunk.byteLength > collected.byteLength - offset) { + collected.set(chunk.subarray(0, collected.byteLength - offset), offset); + break; + } else { + collected.set(chunk, offset); + } + offset += chunk.length; + } + return collected; +} diff --git a/packages/util-stream/src/headStream.spec.ts b/packages/util-stream/src/headStream.spec.ts new file mode 100644 index 00000000000..f8d6c93e2b2 --- /dev/null +++ b/packages/util-stream/src/headStream.spec.ts @@ -0,0 +1,108 @@ +import { Readable } from "stream"; + +import { headStream } from "./headStream"; +import { headStream as headWebStream } from "./headStream.browser"; +import { splitStream } from "./splitStream"; +import { splitStream as splitWebStream } from "./splitStream.browser"; + +const CHUNK_SIZE = 4; +const a32 = "abcd".repeat(32_000 / CHUNK_SIZE); +const a16 = "abcd".repeat(16_000 / CHUNK_SIZE); +const a8 = "abcd".repeat(8); +const a4 = "abcd".repeat(4); +const a2 = "abcd".repeat(2); +const a1 = "abcd".repeat(1); + +describe(headStream.name, () => { + it("should collect the head of a Node.js stream", async () => { + const data = Buffer.from(a32); + const myStream = Readable.from(data); + + const head = await headStream(myStream, 16_000); + + expect(Buffer.from(head).toString()).toEqual(a16); + }); + + it("should collect the head of a web stream", async () => { + if (typeof ReadableStream !== "undefined") { + const buffer = Buffer.from(a32); + const data = Array.from(new Uint8Array(buffer.buffer, buffer.byteOffset, buffer.byteLength)); + + const myStream = new ReadableStream({ + start(controller) { + for (const inputChunk of data) { + controller.enqueue(new Uint8Array([inputChunk])); + } + controller.close(); + }, + }); + + const head = await headWebStream(myStream, 16_000); + expect(Buffer.from(head).toString()).toEqual(a16); + } + }); +}); + +describe("splitStream and headStream integration", () => { + it("should split and head streams for Node.js", async () => { + const data = Buffer.from(a32); + const myStream = Readable.from(data); + + const [a, _1] = await splitStream(myStream); + const [b, _2] = await splitStream(_1); + const [c, _3] = await splitStream(_2); + const [d, _4] = await splitStream(_3); + const [e, f] = await splitStream(_4); + + const buffer1 = await headStream(a, Infinity); + const buffer2 = await headStream(b, 16_000); + const buffer3 = await headStream(c, 8 * CHUNK_SIZE); + const buffer4 = await headStream(d, 4 * CHUNK_SIZE); + const buffer5 = await headStream(e, 2 * CHUNK_SIZE); + const buffer6 = await headStream(f, CHUNK_SIZE); + + await Promise.all([a, b, c, d, e, f].map((stream) => stream.destroy())); + + expect(Buffer.from(buffer1).toString()).toEqual(a32); + expect(Buffer.from(buffer2).toString()).toEqual(a16); + expect(Buffer.from(buffer3).toString()).toEqual(a8); + expect(Buffer.from(buffer4).toString()).toEqual(a4); + expect(Buffer.from(buffer5).toString()).toEqual(a2); + expect(Buffer.from(buffer6).toString()).toEqual(a1); + }); + + it("should split and head streams for web streams API", async () => { + if (typeof ReadableStream !== "undefined") { + const buffer = Buffer.from(a8); + const data = Array.from(new Uint8Array(buffer.buffer, buffer.byteOffset, buffer.byteLength)); + + const myStream = new ReadableStream({ + start(controller) { + for (let i = 0; i < data.length; i += CHUNK_SIZE) { + controller.enqueue(new Uint8Array(data.slice(i, i + CHUNK_SIZE))); + } + controller.close(); + }, + }); + + const [a, _1] = await splitWebStream(myStream); + const [b, _2] = await splitWebStream(_1); + const [c, _3] = await splitWebStream(_2); + const [d, e] = await splitWebStream(_3); + + const buffer1 = await headWebStream(a, Infinity); + const buffer2 = await headWebStream(b, 8 * CHUNK_SIZE); + const buffer3 = await headWebStream(c, 4 * CHUNK_SIZE); + const buffer4 = await headWebStream(d, 2 * CHUNK_SIZE); + const buffer5 = await headWebStream(e, CHUNK_SIZE); + + await Promise.all([a, b, c, d, e].map((stream) => stream.cancel())); + + expect(Buffer.from(buffer1).toString()).toEqual(a8); + expect(Buffer.from(buffer2).toString()).toEqual(a8); + expect(Buffer.from(buffer3).toString()).toEqual(a4); + expect(Buffer.from(buffer4).toString()).toEqual(a2); + expect(Buffer.from(buffer5).toString()).toEqual(a1); + } + }); +}); diff --git a/packages/util-stream/src/headStream.ts b/packages/util-stream/src/headStream.ts new file mode 100644 index 00000000000..2a29434e6a1 --- /dev/null +++ b/packages/util-stream/src/headStream.ts @@ -0,0 +1,52 @@ +import { Readable, Writable } from "stream"; + +import { headStream as headWebStream } from "./headStream.browser"; + +/** + * @internal + * @param stream + * @param bytes - read head bytes from the stream and discard the rest of it. + * + * Caution: the input stream must be destroyed separately, this function does not do so. + */ +export const headStream = (stream: Readable | ReadableStream, bytes: number): Promise => { + if (isReadableStreamInstance(stream)) { + // Web stream API in Node.js + return headWebStream(stream, bytes); + } + return new Promise((resolve, reject) => { + const collector = new Collector(); + collector.limit = bytes; + stream.pipe(collector); + stream.on("error", (err) => { + collector.end(); + reject(err); + }); + collector.on("error", reject); + collector.on("finish", function (this: Collector) { + const bytes = new Uint8Array(Buffer.concat(this.buffers)); + resolve(bytes); + }); + }); +}; + +class Collector extends Writable { + public readonly buffers: Buffer[] = []; + public limit = Infinity; + private bytesBuffered = 0; + + _write(chunk: Buffer, encoding: string, callback: (err?: Error) => void) { + this.buffers.push(chunk); + this.bytesBuffered += chunk.byteLength ?? 0; + if (this.bytesBuffered >= this.limit) { + const excess = this.bytesBuffered - this.limit; + const tailBuffer = this.buffers[this.buffers.length - 1]; + this.buffers[this.buffers.length - 1] = tailBuffer.subarray(0, tailBuffer.byteLength - excess); + this.emit("finish"); + } + callback(); + } +} + +const isReadableStreamInstance = (stream: unknown): stream is ReadableStream => + typeof ReadableStream === "function" && stream instanceof ReadableStream; diff --git a/packages/util-stream/src/index.ts b/packages/util-stream/src/index.ts index 47bcb14b718..75cac0beeb7 100644 --- a/packages/util-stream/src/index.ts +++ b/packages/util-stream/src/index.ts @@ -1,3 +1,5 @@ export * from "./blob/Uint8ArrayBlobAdapter"; export * from "./getAwsChunkedEncodingStream"; export * from "./sdk-stream-mixin"; +export * from "./splitStream"; +export * from "./headStream"; diff --git a/packages/util-stream/src/splitStream.browser.ts b/packages/util-stream/src/splitStream.browser.ts new file mode 100644 index 00000000000..16314858512 --- /dev/null +++ b/packages/util-stream/src/splitStream.browser.ts @@ -0,0 +1,11 @@ +/** + * @param stream + * @returns stream split into two identical streams. + */ +export async function splitStream(stream: ReadableStream | Blob): Promise<[ReadableStream, ReadableStream]> { + if (typeof (stream as Blob).stream === "function") { + stream = (stream as Blob).stream(); + } + const readableStream = stream as ReadableStream; + return readableStream.tee(); +} diff --git a/packages/util-stream/src/splitStream.spec.ts b/packages/util-stream/src/splitStream.spec.ts new file mode 100644 index 00000000000..2c1ecf90d0f --- /dev/null +++ b/packages/util-stream/src/splitStream.spec.ts @@ -0,0 +1,43 @@ +import { streamCollector as webStreamCollector } from "@smithy/fetch-http-handler"; +import { streamCollector } from "@smithy/node-http-handler"; +import { Readable } from "stream"; + +import { splitStream } from "./splitStream"; +import { splitStream as splitWebStream } from "./splitStream.browser"; + +describe(splitStream.name, () => { + it("should split a node:Readable stream", async () => { + const data = Buffer.from("abcd"); + + const myStream = Readable.from(data); + const [a, b] = await splitStream(myStream); + + const buffer1 = await streamCollector(a); + const buffer2 = await streamCollector(b); + + expect(buffer1).toEqual(new Uint8Array([97, 98, 99, 100])); + expect(buffer1).toEqual(buffer2); + }); + it("should split a web:ReadableStream stream", async () => { + if (typeof ReadableStream !== "undefined") { + const inputChunks = [97, 98, 99, 100]; + + const myStream = new ReadableStream({ + start(controller) { + for (const inputChunk of inputChunks) { + controller.enqueue(new Uint8Array([inputChunk])); + } + controller.close(); + }, + }); + + const [a, b] = await splitWebStream(myStream); + + const bytes1 = await webStreamCollector(a); + const bytes2 = await webStreamCollector(b); + + expect(bytes1).toEqual(new Uint8Array([97, 98, 99, 100])); + expect(bytes1).toEqual(bytes2); + } + }); +}); diff --git a/packages/util-stream/src/splitStream.ts b/packages/util-stream/src/splitStream.ts new file mode 100644 index 00000000000..61d6cbcbb12 --- /dev/null +++ b/packages/util-stream/src/splitStream.ts @@ -0,0 +1,14 @@ +import type { Readable as IReadable } from "stream"; +import { PassThrough } from "stream"; + +/** + * @param stream + * @returns stream split into two identical streams. + */ +export async function splitStream(stream: IReadable): Promise<[IReadable, IReadable]> { + const stream1 = new PassThrough(); + const stream2 = new PassThrough(); + stream.pipe(stream1); + stream.pipe(stream2); + return [stream1, stream2]; +} From 6010af3dc77b8b33745940fc8c1cddd9f5dd5ad5 Mon Sep 17 00:00:00 2001 From: George Fu Date: Fri, 12 Jul 2024 20:32:52 +0000 Subject: [PATCH 2/6] variable naming --- packages/util-stream/src/headStream.spec.ts | 44 ++++++++++----------- 1 file changed, 22 insertions(+), 22 deletions(-) diff --git a/packages/util-stream/src/headStream.spec.ts b/packages/util-stream/src/headStream.spec.ts index f8d6c93e2b2..18294f9bf7a 100644 --- a/packages/util-stream/src/headStream.spec.ts +++ b/packages/util-stream/src/headStream.spec.ts @@ -54,21 +54,21 @@ describe("splitStream and headStream integration", () => { const [d, _4] = await splitStream(_3); const [e, f] = await splitStream(_4); - const buffer1 = await headStream(a, Infinity); - const buffer2 = await headStream(b, 16_000); - const buffer3 = await headStream(c, 8 * CHUNK_SIZE); - const buffer4 = await headStream(d, 4 * CHUNK_SIZE); - const buffer5 = await headStream(e, 2 * CHUNK_SIZE); - const buffer6 = await headStream(f, CHUNK_SIZE); + const byteArr1 = await headStream(a, Infinity); + const byteArr2 = await headStream(b, 16_000); + const byteArr3 = await headStream(c, 8 * CHUNK_SIZE); + const byteArr4 = await headStream(d, 4 * CHUNK_SIZE); + const byteArr5 = await headStream(e, 2 * CHUNK_SIZE); + const byteArr6 = await headStream(f, CHUNK_SIZE); await Promise.all([a, b, c, d, e, f].map((stream) => stream.destroy())); - expect(Buffer.from(buffer1).toString()).toEqual(a32); - expect(Buffer.from(buffer2).toString()).toEqual(a16); - expect(Buffer.from(buffer3).toString()).toEqual(a8); - expect(Buffer.from(buffer4).toString()).toEqual(a4); - expect(Buffer.from(buffer5).toString()).toEqual(a2); - expect(Buffer.from(buffer6).toString()).toEqual(a1); + expect(Buffer.from(byteArr1).toString()).toEqual(a32); + expect(Buffer.from(byteArr2).toString()).toEqual(a16); + expect(Buffer.from(byteArr3).toString()).toEqual(a8); + expect(Buffer.from(byteArr4).toString()).toEqual(a4); + expect(Buffer.from(byteArr5).toString()).toEqual(a2); + expect(Buffer.from(byteArr6).toString()).toEqual(a1); }); it("should split and head streams for web streams API", async () => { @@ -90,19 +90,19 @@ describe("splitStream and headStream integration", () => { const [c, _3] = await splitWebStream(_2); const [d, e] = await splitWebStream(_3); - const buffer1 = await headWebStream(a, Infinity); - const buffer2 = await headWebStream(b, 8 * CHUNK_SIZE); - const buffer3 = await headWebStream(c, 4 * CHUNK_SIZE); - const buffer4 = await headWebStream(d, 2 * CHUNK_SIZE); - const buffer5 = await headWebStream(e, CHUNK_SIZE); + const byteArr1 = await headWebStream(a, Infinity); + const byteArr2 = await headWebStream(b, 8 * CHUNK_SIZE); + const byteArr3 = await headWebStream(c, 4 * CHUNK_SIZE); + const byteArr4 = await headWebStream(d, 2 * CHUNK_SIZE); + const byteArr5 = await headWebStream(e, CHUNK_SIZE); await Promise.all([a, b, c, d, e].map((stream) => stream.cancel())); - expect(Buffer.from(buffer1).toString()).toEqual(a8); - expect(Buffer.from(buffer2).toString()).toEqual(a8); - expect(Buffer.from(buffer3).toString()).toEqual(a4); - expect(Buffer.from(buffer4).toString()).toEqual(a2); - expect(Buffer.from(buffer5).toString()).toEqual(a1); + expect(Buffer.from(byteArr1).toString()).toEqual(a8); + expect(Buffer.from(byteArr2).toString()).toEqual(a8); + expect(Buffer.from(byteArr3).toString()).toEqual(a4); + expect(Buffer.from(byteArr4).toString()).toEqual(a2); + expect(Buffer.from(byteArr5).toString()).toEqual(a1); } }); }); From 2073c8cac7826e71e4c3b9b6a4cda7ab287ab425 Mon Sep 17 00:00:00 2001 From: George Fu Date: Fri, 12 Jul 2024 20:40:45 +0000 Subject: [PATCH 3/6] DRY isReadableStream typeguard --- .changeset/metal-snakes-remember.md | 2 +- packages/util-stream/src/headStream.ts | 5 +---- packages/util-stream/src/isReadableStream.ts | 5 +++++ .../util-stream/src/sdk-stream-mixin.browser.ts | 5 ++--- packages/util-stream/src/splitStream.ts | 14 ++++++++++++-- 5 files changed, 21 insertions(+), 10 deletions(-) create mode 100644 packages/util-stream/src/isReadableStream.ts diff --git a/.changeset/metal-snakes-remember.md b/.changeset/metal-snakes-remember.md index 9117d2c1f87..76818257b56 100644 --- a/.changeset/metal-snakes-remember.md +++ b/.changeset/metal-snakes-remember.md @@ -2,4 +2,4 @@ "@smithy/util-stream": minor --- -add stream splitting to sdkStreamMixin +add splitStream and headStream utilities diff --git a/packages/util-stream/src/headStream.ts b/packages/util-stream/src/headStream.ts index 2a29434e6a1..7ca1c1efdca 100644 --- a/packages/util-stream/src/headStream.ts +++ b/packages/util-stream/src/headStream.ts @@ -1,6 +1,7 @@ import { Readable, Writable } from "stream"; import { headStream as headWebStream } from "./headStream.browser"; +import { isReadableStreamInstance } from "./isReadableStream"; /** * @internal @@ -11,7 +12,6 @@ import { headStream as headWebStream } from "./headStream.browser"; */ export const headStream = (stream: Readable | ReadableStream, bytes: number): Promise => { if (isReadableStreamInstance(stream)) { - // Web stream API in Node.js return headWebStream(stream, bytes); } return new Promise((resolve, reject) => { @@ -47,6 +47,3 @@ class Collector extends Writable { callback(); } } - -const isReadableStreamInstance = (stream: unknown): stream is ReadableStream => - typeof ReadableStream === "function" && stream instanceof ReadableStream; diff --git a/packages/util-stream/src/isReadableStream.ts b/packages/util-stream/src/isReadableStream.ts new file mode 100644 index 00000000000..f15624c6e6d --- /dev/null +++ b/packages/util-stream/src/isReadableStream.ts @@ -0,0 +1,5 @@ +/** + * @internal + */ +export const isReadableStreamInstance = (stream: unknown): stream is ReadableStream => + typeof ReadableStream === "function" && stream?.constructor?.name === ReadableStream.name; diff --git a/packages/util-stream/src/sdk-stream-mixin.browser.ts b/packages/util-stream/src/sdk-stream-mixin.browser.ts index 5fd86aabc05..58c7e7ceee5 100644 --- a/packages/util-stream/src/sdk-stream-mixin.browser.ts +++ b/packages/util-stream/src/sdk-stream-mixin.browser.ts @@ -4,6 +4,8 @@ import { toBase64 } from "@smithy/util-base64"; import { toHex } from "@smithy/util-hex-encoding"; import { toUtf8 } from "@smithy/util-utf8"; +import { isReadableStreamInstance } from "./isReadableStream"; + const ERR_MSG_STREAM_HAS_BEEN_TRANSFORMED = "The stream has already been transformed."; /** @@ -74,6 +76,3 @@ export const sdkStreamMixin = (stream: unknown): SdkStream typeof Blob === "function" && stream instanceof Blob; - -const isReadableStreamInstance = (stream: unknown): stream is ReadableStream => - typeof ReadableStream === "function" && stream instanceof ReadableStream; diff --git a/packages/util-stream/src/splitStream.ts b/packages/util-stream/src/splitStream.ts index 61d6cbcbb12..a1d7a4d34dc 100644 --- a/packages/util-stream/src/splitStream.ts +++ b/packages/util-stream/src/splitStream.ts @@ -1,11 +1,21 @@ -import type { Readable as IReadable } from "stream"; +import type { Readable } from "stream"; import { PassThrough } from "stream"; +import { isReadableStreamInstance } from "./isReadableStream"; +import { splitStream as splitWebStream } from "./splitStream.browser"; + /** * @param stream * @returns stream split into two identical streams. */ -export async function splitStream(stream: IReadable): Promise<[IReadable, IReadable]> { +export async function splitStream(stream: Readable): Promise<[Readable, Readable]>; +export async function splitStream(stream: ReadableStream): Promise<[ReadableStream, ReadableStream]>; +export async function splitStream( + stream: Readable | ReadableStream +): Promise<[Readable | ReadableStream, Readable | ReadableStream]> { + if (isReadableStreamInstance(stream)) { + return splitWebStream(stream); + } const stream1 = new PassThrough(); const stream2 = new PassThrough(); stream.pipe(stream1); From f0ee304bb2e88e10ca043bc57891093dff0edd80 Mon Sep 17 00:00:00 2001 From: George Fu Date: Fri, 12 Jul 2024 20:44:27 +0000 Subject: [PATCH 4/6] update type guard --- packages/util-stream/src/isReadableStream.ts | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/packages/util-stream/src/isReadableStream.ts b/packages/util-stream/src/isReadableStream.ts index f15624c6e6d..85862ae2446 100644 --- a/packages/util-stream/src/isReadableStream.ts +++ b/packages/util-stream/src/isReadableStream.ts @@ -2,4 +2,5 @@ * @internal */ export const isReadableStreamInstance = (stream: unknown): stream is ReadableStream => - typeof ReadableStream === "function" && stream?.constructor?.name === ReadableStream.name; + typeof ReadableStream === "function" && + (stream?.constructor?.name === ReadableStream.name || stream instanceof ReadableStream); From cd89803cb709841289ff2f1ee3dfe1fd948cc445 Mon Sep 17 00:00:00 2001 From: George Fu Date: Mon, 15 Jul 2024 15:56:34 +0000 Subject: [PATCH 5/6] rename type guard file --- packages/util-stream/src/headStream.ts | 4 ++-- packages/util-stream/src/index.ts | 1 + packages/util-stream/src/sdk-stream-mixin.browser.ts | 6 +++--- packages/util-stream/src/splitStream.ts | 4 ++-- .../src/{isReadableStream.ts => stream-type-check.ts} | 2 +- 5 files changed, 9 insertions(+), 8 deletions(-) rename packages/util-stream/src/{isReadableStream.ts => stream-type-check.ts} (63%) diff --git a/packages/util-stream/src/headStream.ts b/packages/util-stream/src/headStream.ts index 7ca1c1efdca..0fe80f1e602 100644 --- a/packages/util-stream/src/headStream.ts +++ b/packages/util-stream/src/headStream.ts @@ -1,7 +1,7 @@ import { Readable, Writable } from "stream"; import { headStream as headWebStream } from "./headStream.browser"; -import { isReadableStreamInstance } from "./isReadableStream"; +import { isReadableStream } from "./stream-type-check"; /** * @internal @@ -11,7 +11,7 @@ import { isReadableStreamInstance } from "./isReadableStream"; * Caution: the input stream must be destroyed separately, this function does not do so. */ export const headStream = (stream: Readable | ReadableStream, bytes: number): Promise => { - if (isReadableStreamInstance(stream)) { + if (isReadableStream(stream)) { return headWebStream(stream, bytes); } return new Promise((resolve, reject) => { diff --git a/packages/util-stream/src/index.ts b/packages/util-stream/src/index.ts index 75cac0beeb7..305896d2fa0 100644 --- a/packages/util-stream/src/index.ts +++ b/packages/util-stream/src/index.ts @@ -3,3 +3,4 @@ export * from "./getAwsChunkedEncodingStream"; export * from "./sdk-stream-mixin"; export * from "./splitStream"; export * from "./headStream"; +export * from "./stream-type-check"; diff --git a/packages/util-stream/src/sdk-stream-mixin.browser.ts b/packages/util-stream/src/sdk-stream-mixin.browser.ts index 58c7e7ceee5..f91252527c0 100644 --- a/packages/util-stream/src/sdk-stream-mixin.browser.ts +++ b/packages/util-stream/src/sdk-stream-mixin.browser.ts @@ -4,7 +4,7 @@ import { toBase64 } from "@smithy/util-base64"; import { toHex } from "@smithy/util-hex-encoding"; import { toUtf8 } from "@smithy/util-utf8"; -import { isReadableStreamInstance } from "./isReadableStream"; +import { isReadableStream } from "./stream-type-check"; const ERR_MSG_STREAM_HAS_BEEN_TRANSFORMED = "The stream has already been transformed."; @@ -14,7 +14,7 @@ const ERR_MSG_STREAM_HAS_BEEN_TRANSFORMED = "The stream has already been transfo * @internal */ export const sdkStreamMixin = (stream: unknown): SdkStream => { - if (!isBlobInstance(stream) && !isReadableStreamInstance(stream)) { + if (!isBlobInstance(stream) && !isReadableStream(stream)) { //@ts-ignore const name = stream?.__proto__?.constructor?.name || stream; throw new Error(`Unexpected stream implementation, expect Blob or ReadableStream, got ${name}`); @@ -66,7 +66,7 @@ export const sdkStreamMixin = (stream: unknown): SdkStream { - if (isReadableStreamInstance(stream)) { + if (isReadableStream(stream)) { return splitWebStream(stream); } const stream1 = new PassThrough(); diff --git a/packages/util-stream/src/isReadableStream.ts b/packages/util-stream/src/stream-type-check.ts similarity index 63% rename from packages/util-stream/src/isReadableStream.ts rename to packages/util-stream/src/stream-type-check.ts index 85862ae2446..21b4ce1909e 100644 --- a/packages/util-stream/src/isReadableStream.ts +++ b/packages/util-stream/src/stream-type-check.ts @@ -1,6 +1,6 @@ /** * @internal */ -export const isReadableStreamInstance = (stream: unknown): stream is ReadableStream => +export const isReadableStream = (stream: unknown): stream is ReadableStream => typeof ReadableStream === "function" && (stream?.constructor?.name === ReadableStream.name || stream instanceof ReadableStream); From 51a0b3ef6613a98d49bd2fd3607faa3c5af2fa71 Mon Sep 17 00:00:00 2001 From: George Fu Date: Mon, 15 Jul 2024 16:29:18 +0000 Subject: [PATCH 6/6] lint --- packages/util-stream/src/splitStream.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/util-stream/src/splitStream.ts b/packages/util-stream/src/splitStream.ts index 170ee50e20b..c9ff165181e 100644 --- a/packages/util-stream/src/splitStream.ts +++ b/packages/util-stream/src/splitStream.ts @@ -1,8 +1,8 @@ import type { Readable } from "stream"; import { PassThrough } from "stream"; -import { isReadableStream } from "./stream-type-check"; import { splitStream as splitWebStream } from "./splitStream.browser"; +import { isReadableStream } from "./stream-type-check"; /** * @param stream