diff --git a/.changeset/metal-snakes-remember.md b/.changeset/metal-snakes-remember.md new file mode 100644 index 00000000000..76818257b56 --- /dev/null +++ b/.changeset/metal-snakes-remember.md @@ -0,0 +1,5 @@ +--- +"@smithy/util-stream": minor +--- + +add splitStream and headStream utilities diff --git a/packages/util-stream/package.json b/packages/util-stream/package.json index ed622c4686f..18f0df7f814 100644 --- a/packages/util-stream/package.json +++ b/packages/util-stream/package.json @@ -56,13 +56,19 @@ ], "browser": { "./dist-es/getAwsChunkedEncodingStream": "./dist-es/getAwsChunkedEncodingStream.browser", - "./dist-es/sdk-stream-mixin": "./dist-es/sdk-stream-mixin.browser" + "./dist-es/headStream": "./dist-es/headStream.browser", + "./dist-es/sdk-stream-mixin": "./dist-es/sdk-stream-mixin.browser", + "./dist-es/splitStream": "./dist-es/splitStream.browser" }, "react-native": { "./dist-es/getAwsChunkedEncodingStream": "./dist-es/getAwsChunkedEncodingStream.browser", "./dist-es/sdk-stream-mixin": "./dist-es/sdk-stream-mixin.browser", + "./dist-es/headStream": "./dist-es/headStream.browser", + "./dist-es/splitStream": "./dist-es/splitStream.browser", "./dist-cjs/getAwsChunkedEncodingStream": "./dist-cjs/getAwsChunkedEncodingStream.browser", - "./dist-cjs/sdk-stream-mixin": "./dist-cjs/sdk-stream-mixin.browser" + "./dist-cjs/sdk-stream-mixin": "./dist-cjs/sdk-stream-mixin.browser", + "./dist-cjs/headStream": "./dist-cjs/headStream.browser", + "./dist-cjs/splitStream": "./dist-cjs/splitStream.browser" }, "homepage": "https://github.com/awslabs/smithy-typescript/tree/main/packages/util-stream", "repository": { diff --git a/packages/util-stream/src/headStream.browser.ts b/packages/util-stream/src/headStream.browser.ts new file mode 100644 index 00000000000..afdc0995f00 --- /dev/null +++ b/packages/util-stream/src/headStream.browser.ts @@ -0,0 +1,39 @@ +/** + * @internal + * @param stream + * @param bytes - read head bytes from the stream and discard the rest of it. + * + * Caution: the input stream must be destroyed separately, this function does not do so. + */ +export async function headStream(stream: ReadableStream, bytes: number): Promise { + let byteLengthCounter = 0; + const chunks = []; + const reader = stream.getReader(); + let isDone = false; + + while (!isDone) { + const { done, value } = await reader.read(); + if (value) { + chunks.push(value); + byteLengthCounter += value?.byteLength ?? 0; + } + if (byteLengthCounter >= bytes) { + break; + } + isDone = done; + } + reader.releaseLock(); + + const collected = new Uint8Array(Math.min(bytes, byteLengthCounter)); + let offset = 0; + for (const chunk of chunks) { + if (chunk.byteLength > collected.byteLength - offset) { + collected.set(chunk.subarray(0, collected.byteLength - offset), offset); + break; + } else { + collected.set(chunk, offset); + } + offset += chunk.length; + } + return collected; +} diff --git a/packages/util-stream/src/headStream.spec.ts b/packages/util-stream/src/headStream.spec.ts new file mode 100644 index 00000000000..18294f9bf7a --- /dev/null +++ b/packages/util-stream/src/headStream.spec.ts @@ -0,0 +1,108 @@ +import { Readable } from "stream"; + +import { headStream } from "./headStream"; +import { headStream as headWebStream } from "./headStream.browser"; +import { splitStream } from "./splitStream"; +import { splitStream as splitWebStream } from "./splitStream.browser"; + +const CHUNK_SIZE = 4; +const a32 = "abcd".repeat(32_000 / CHUNK_SIZE); +const a16 = "abcd".repeat(16_000 / CHUNK_SIZE); +const a8 = "abcd".repeat(8); +const a4 = "abcd".repeat(4); +const a2 = "abcd".repeat(2); +const a1 = "abcd".repeat(1); + +describe(headStream.name, () => { + it("should collect the head of a Node.js stream", async () => { + const data = Buffer.from(a32); + const myStream = Readable.from(data); + + const head = await headStream(myStream, 16_000); + + expect(Buffer.from(head).toString()).toEqual(a16); + }); + + it("should collect the head of a web stream", async () => { + if (typeof ReadableStream !== "undefined") { + const buffer = Buffer.from(a32); + const data = Array.from(new Uint8Array(buffer.buffer, buffer.byteOffset, buffer.byteLength)); + + const myStream = new ReadableStream({ + start(controller) { + for (const inputChunk of data) { + controller.enqueue(new Uint8Array([inputChunk])); + } + controller.close(); + }, + }); + + const head = await headWebStream(myStream, 16_000); + expect(Buffer.from(head).toString()).toEqual(a16); + } + }); +}); + +describe("splitStream and headStream integration", () => { + it("should split and head streams for Node.js", async () => { + const data = Buffer.from(a32); + const myStream = Readable.from(data); + + const [a, _1] = await splitStream(myStream); + const [b, _2] = await splitStream(_1); + const [c, _3] = await splitStream(_2); + const [d, _4] = await splitStream(_3); + const [e, f] = await splitStream(_4); + + const byteArr1 = await headStream(a, Infinity); + const byteArr2 = await headStream(b, 16_000); + const byteArr3 = await headStream(c, 8 * CHUNK_SIZE); + const byteArr4 = await headStream(d, 4 * CHUNK_SIZE); + const byteArr5 = await headStream(e, 2 * CHUNK_SIZE); + const byteArr6 = await headStream(f, CHUNK_SIZE); + + await Promise.all([a, b, c, d, e, f].map((stream) => stream.destroy())); + + expect(Buffer.from(byteArr1).toString()).toEqual(a32); + expect(Buffer.from(byteArr2).toString()).toEqual(a16); + expect(Buffer.from(byteArr3).toString()).toEqual(a8); + expect(Buffer.from(byteArr4).toString()).toEqual(a4); + expect(Buffer.from(byteArr5).toString()).toEqual(a2); + expect(Buffer.from(byteArr6).toString()).toEqual(a1); + }); + + it("should split and head streams for web streams API", async () => { + if (typeof ReadableStream !== "undefined") { + const buffer = Buffer.from(a8); + const data = Array.from(new Uint8Array(buffer.buffer, buffer.byteOffset, buffer.byteLength)); + + const myStream = new ReadableStream({ + start(controller) { + for (let i = 0; i < data.length; i += CHUNK_SIZE) { + controller.enqueue(new Uint8Array(data.slice(i, i + CHUNK_SIZE))); + } + controller.close(); + }, + }); + + const [a, _1] = await splitWebStream(myStream); + const [b, _2] = await splitWebStream(_1); + const [c, _3] = await splitWebStream(_2); + const [d, e] = await splitWebStream(_3); + + const byteArr1 = await headWebStream(a, Infinity); + const byteArr2 = await headWebStream(b, 8 * CHUNK_SIZE); + const byteArr3 = await headWebStream(c, 4 * CHUNK_SIZE); + const byteArr4 = await headWebStream(d, 2 * CHUNK_SIZE); + const byteArr5 = await headWebStream(e, CHUNK_SIZE); + + await Promise.all([a, b, c, d, e].map((stream) => stream.cancel())); + + expect(Buffer.from(byteArr1).toString()).toEqual(a8); + expect(Buffer.from(byteArr2).toString()).toEqual(a8); + expect(Buffer.from(byteArr3).toString()).toEqual(a4); + expect(Buffer.from(byteArr4).toString()).toEqual(a2); + expect(Buffer.from(byteArr5).toString()).toEqual(a1); + } + }); +}); diff --git a/packages/util-stream/src/headStream.ts b/packages/util-stream/src/headStream.ts new file mode 100644 index 00000000000..0fe80f1e602 --- /dev/null +++ b/packages/util-stream/src/headStream.ts @@ -0,0 +1,49 @@ +import { Readable, Writable } from "stream"; + +import { headStream as headWebStream } from "./headStream.browser"; +import { isReadableStream } from "./stream-type-check"; + +/** + * @internal + * @param stream + * @param bytes - read head bytes from the stream and discard the rest of it. + * + * Caution: the input stream must be destroyed separately, this function does not do so. + */ +export const headStream = (stream: Readable | ReadableStream, bytes: number): Promise => { + if (isReadableStream(stream)) { + return headWebStream(stream, bytes); + } + return new Promise((resolve, reject) => { + const collector = new Collector(); + collector.limit = bytes; + stream.pipe(collector); + stream.on("error", (err) => { + collector.end(); + reject(err); + }); + collector.on("error", reject); + collector.on("finish", function (this: Collector) { + const bytes = new Uint8Array(Buffer.concat(this.buffers)); + resolve(bytes); + }); + }); +}; + +class Collector extends Writable { + public readonly buffers: Buffer[] = []; + public limit = Infinity; + private bytesBuffered = 0; + + _write(chunk: Buffer, encoding: string, callback: (err?: Error) => void) { + this.buffers.push(chunk); + this.bytesBuffered += chunk.byteLength ?? 0; + if (this.bytesBuffered >= this.limit) { + const excess = this.bytesBuffered - this.limit; + const tailBuffer = this.buffers[this.buffers.length - 1]; + this.buffers[this.buffers.length - 1] = tailBuffer.subarray(0, tailBuffer.byteLength - excess); + this.emit("finish"); + } + callback(); + } +} diff --git a/packages/util-stream/src/index.ts b/packages/util-stream/src/index.ts index 47bcb14b718..305896d2fa0 100644 --- a/packages/util-stream/src/index.ts +++ b/packages/util-stream/src/index.ts @@ -1,3 +1,6 @@ export * from "./blob/Uint8ArrayBlobAdapter"; export * from "./getAwsChunkedEncodingStream"; export * from "./sdk-stream-mixin"; +export * from "./splitStream"; +export * from "./headStream"; +export * from "./stream-type-check"; diff --git a/packages/util-stream/src/sdk-stream-mixin.browser.ts b/packages/util-stream/src/sdk-stream-mixin.browser.ts index 5fd86aabc05..f91252527c0 100644 --- a/packages/util-stream/src/sdk-stream-mixin.browser.ts +++ b/packages/util-stream/src/sdk-stream-mixin.browser.ts @@ -4,6 +4,8 @@ import { toBase64 } from "@smithy/util-base64"; import { toHex } from "@smithy/util-hex-encoding"; import { toUtf8 } from "@smithy/util-utf8"; +import { isReadableStream } from "./stream-type-check"; + const ERR_MSG_STREAM_HAS_BEEN_TRANSFORMED = "The stream has already been transformed."; /** @@ -12,7 +14,7 @@ const ERR_MSG_STREAM_HAS_BEEN_TRANSFORMED = "The stream has already been transfo * @internal */ export const sdkStreamMixin = (stream: unknown): SdkStream => { - if (!isBlobInstance(stream) && !isReadableStreamInstance(stream)) { + if (!isBlobInstance(stream) && !isReadableStream(stream)) { //@ts-ignore const name = stream?.__proto__?.constructor?.name || stream; throw new Error(`Unexpected stream implementation, expect Blob or ReadableStream, got ${name}`); @@ -64,7 +66,7 @@ export const sdkStreamMixin = (stream: unknown): SdkStream typeof Blob === "function" && stream instanceof Blob; - -const isReadableStreamInstance = (stream: unknown): stream is ReadableStream => - typeof ReadableStream === "function" && stream instanceof ReadableStream; diff --git a/packages/util-stream/src/splitStream.browser.ts b/packages/util-stream/src/splitStream.browser.ts new file mode 100644 index 00000000000..16314858512 --- /dev/null +++ b/packages/util-stream/src/splitStream.browser.ts @@ -0,0 +1,11 @@ +/** + * @param stream + * @returns stream split into two identical streams. + */ +export async function splitStream(stream: ReadableStream | Blob): Promise<[ReadableStream, ReadableStream]> { + if (typeof (stream as Blob).stream === "function") { + stream = (stream as Blob).stream(); + } + const readableStream = stream as ReadableStream; + return readableStream.tee(); +} diff --git a/packages/util-stream/src/splitStream.spec.ts b/packages/util-stream/src/splitStream.spec.ts new file mode 100644 index 00000000000..2c1ecf90d0f --- /dev/null +++ b/packages/util-stream/src/splitStream.spec.ts @@ -0,0 +1,43 @@ +import { streamCollector as webStreamCollector } from "@smithy/fetch-http-handler"; +import { streamCollector } from "@smithy/node-http-handler"; +import { Readable } from "stream"; + +import { splitStream } from "./splitStream"; +import { splitStream as splitWebStream } from "./splitStream.browser"; + +describe(splitStream.name, () => { + it("should split a node:Readable stream", async () => { + const data = Buffer.from("abcd"); + + const myStream = Readable.from(data); + const [a, b] = await splitStream(myStream); + + const buffer1 = await streamCollector(a); + const buffer2 = await streamCollector(b); + + expect(buffer1).toEqual(new Uint8Array([97, 98, 99, 100])); + expect(buffer1).toEqual(buffer2); + }); + it("should split a web:ReadableStream stream", async () => { + if (typeof ReadableStream !== "undefined") { + const inputChunks = [97, 98, 99, 100]; + + const myStream = new ReadableStream({ + start(controller) { + for (const inputChunk of inputChunks) { + controller.enqueue(new Uint8Array([inputChunk])); + } + controller.close(); + }, + }); + + const [a, b] = await splitWebStream(myStream); + + const bytes1 = await webStreamCollector(a); + const bytes2 = await webStreamCollector(b); + + expect(bytes1).toEqual(new Uint8Array([97, 98, 99, 100])); + expect(bytes1).toEqual(bytes2); + } + }); +}); diff --git a/packages/util-stream/src/splitStream.ts b/packages/util-stream/src/splitStream.ts new file mode 100644 index 00000000000..c9ff165181e --- /dev/null +++ b/packages/util-stream/src/splitStream.ts @@ -0,0 +1,24 @@ +import type { Readable } from "stream"; +import { PassThrough } from "stream"; + +import { splitStream as splitWebStream } from "./splitStream.browser"; +import { isReadableStream } from "./stream-type-check"; + +/** + * @param stream + * @returns stream split into two identical streams. + */ +export async function splitStream(stream: Readable): Promise<[Readable, Readable]>; +export async function splitStream(stream: ReadableStream): Promise<[ReadableStream, ReadableStream]>; +export async function splitStream( + stream: Readable | ReadableStream +): Promise<[Readable | ReadableStream, Readable | ReadableStream]> { + if (isReadableStream(stream)) { + return splitWebStream(stream); + } + const stream1 = new PassThrough(); + const stream2 = new PassThrough(); + stream.pipe(stream1); + stream.pipe(stream2); + return [stream1, stream2]; +} diff --git a/packages/util-stream/src/stream-type-check.ts b/packages/util-stream/src/stream-type-check.ts new file mode 100644 index 00000000000..21b4ce1909e --- /dev/null +++ b/packages/util-stream/src/stream-type-check.ts @@ -0,0 +1,6 @@ +/** + * @internal + */ +export const isReadableStream = (stream: unknown): stream is ReadableStream => + typeof ReadableStream === "function" && + (stream?.constructor?.name === ReadableStream.name || stream instanceof ReadableStream);