Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 28 additions & 1 deletion src/sse_utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,9 @@ export async function* parseSseStream(
let eventType = 'message';
let eventData = '';

for await (const value of response.body.pipeThrough(new TextDecoderStream())) {
const stream = response.body.pipeThrough(new TextDecoderStream());

for await (const value of readFrom(stream)) {
buffer += value;
let lineEndIndex: number;

Expand Down Expand Up @@ -137,3 +139,28 @@ export async function* parseSseStream(
yield { type: eventType, data: eventData };
}
}

/**
* Reads string chunks from a ReadableStream using the reader API.
*
* We use the manual reader approach the native async iterator directly on the stream
* because ReadableStream async iteration is not supported in all environments.
* @see https://developer.mozilla.org/en-US/docs/Web/API/ReadableStream#browser_compatibility
*
* @param stream - The ReadableStream to read from
* @yields String chunks from the stream
*/
async function* readFrom(stream: ReadableStream<string>): AsyncGenerator<string, void, void> {
const reader = stream.getReader();
try {
while (true) {
const { done, value } = await reader.read();
if (done) {
break;
}
yield value;
}
} finally {
reader.releaseLock();
}
}
67 changes: 52 additions & 15 deletions test/sse_utils.spec.ts
Original file line number Diff line number Diff line change
@@ -1,20 +1,14 @@
import { describe, it, expect } from 'vitest';
import { formatSSEEvent, formatSSEErrorEvent, parseSseStream, SseEvent } from '../src/sse_utils.js';

const MOCK_CHUNK_SIZE = 2;

/**
* Creates a mock Response object from SSE-formatted strings.
* Used to test that the parser can understand what the formatter produces.
* Creates a ReadableStream from chunks of Uint8Array data.
*/
function createMockResponse(sseData: string, chunkSize: number = 2): Response {
const encoder = new TextEncoder();
const chunks: Uint8Array[] = [];

for (let i = 0; i < sseData.length; i += chunkSize) {
chunks.push(encoder.encode(sseData.slice(i, i + chunkSize)));
}

function createStream(chunks: Uint8Array[]): ReadableStream<Uint8Array> {
let chunkIndex = 0;
const stream = new ReadableStream({
return new ReadableStream({
pull(controller) {
if (chunkIndex < chunks.length) {
controller.enqueue(chunks[chunkIndex]);
Expand All @@ -24,6 +18,46 @@ function createMockResponse(sseData: string, chunkSize: number = 2): Response {
}
},
});
}

/**
* Encodes a string into chunks of Uint8Array data.
*/
function encodeChunks(data: string): Uint8Array[] {
const encoder = new TextEncoder();
const chunks: Uint8Array[] = [];
for (let i = 0; i < data.length; i += MOCK_CHUNK_SIZE) {
chunks.push(encoder.encode(data.slice(i, i + MOCK_CHUNK_SIZE)));
}
return chunks;
}

/**
* Creates a mock Response object from SSE-formatted strings.
* Used to test that the parser can understand what the formatter produces.
*/
function createMockResponse(sseData: string): Response {
const chunks = encodeChunks(sseData);
return new Response(createStream(chunks), {
headers: { 'Content-Type': 'text/event-stream' },
});
}

/**
* Creates a mock Response where the decoded stream has no native async iterator.
* Simulates environments where ReadableStream async iteration is not supported.
* @see https://developer.mozilla.org/en-US/docs/Web/API/ReadableStream#browser_compatibility
*/
function createMockResponseWithoutAsyncIterator(sseData: string): Response {
const chunks = encodeChunks(sseData);
const stream = createStream(chunks);

const originalPipeThrough = stream.pipeThrough.bind(stream);
stream.pipeThrough = function <T>(transform: ReadableWritablePair<T, Uint8Array>) {
const result = originalPipeThrough(transform);
delete result[Symbol.asyncIterator];
return result;
} as typeof stream.pipeThrough;

return new Response(stream, {
headers: { 'Content-Type': 'text/event-stream' },
Expand Down Expand Up @@ -59,10 +93,13 @@ describe('SSE Utils', () => {
});
});

describe('parseSseStream', () => {
describe.each([
['with native async iterator', createMockResponse],
['without native async iterator', createMockResponseWithoutAsyncIterator],
])('parseSseStream (%s)', (_, createResponse) => {
it('should parse a single data event', async () => {
const sseData = 'data: {"kind":"message"}\n\n';
const response = createMockResponse(sseData);
const response = createResponse(sseData);

const events: SseEvent[] = [];
for await (const event of parseSseStream(response)) {
Expand All @@ -76,7 +113,7 @@ describe('SSE Utils', () => {

it('should parse an error event', async () => {
const sseData = 'event: error\ndata: {"code":-32001}\n\n';
const response = createMockResponse(sseData);
const response = createResponse(sseData);

const events: SseEvent[] = [];
for await (const event of parseSseStream(response)) {
Expand All @@ -90,7 +127,7 @@ describe('SSE Utils', () => {

it('should parse multiple events', async () => {
const sseData = 'data: {"id":1}\n\ndata: {"id":2}\n\n';
const response = createMockResponse(sseData);
const response = createResponse(sseData);

const events: SseEvent[] = [];
for await (const event of parseSseStream(response)) {
Expand Down