Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 28 additions & 1 deletion src/sse_utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,9 @@ export async function* parseSseStream(
let eventType = 'message';
let eventData = '';

for await (const value of response.body.pipeThrough(new TextDecoderStream())) {
const stream = response.body.pipeThrough(new TextDecoderStream());

for await (const value of readFrom(stream)) {
buffer += value;
let lineEndIndex: number;

Expand Down Expand Up @@ -137,3 +139,28 @@ export async function* parseSseStream(
yield { type: eventType, data: eventData };
}
}

/**
* Reads string chunks from a ReadableStream using the reader API.
*
* We use the manual reader approach the native async iterator directly on the stream
* because ReadableStream async iteration is not supported in all environments.
* @see https://developer.mozilla.org/en-US/docs/Web/API/ReadableStream#browser_compatibility
*
* @param stream - The ReadableStream to read from
* @yields String chunks from the stream
*/
async function* readFrom(stream: ReadableStream<string>): AsyncGenerator<string, void, void> {
const reader = stream.getReader();
try {
while (true) {
const { done, value } = await reader.read();
if (done) {
break;
}
yield value;
}
} finally {
reader.releaseLock();
}
}
111 changes: 100 additions & 11 deletions test/sse_utils.spec.ts
Original file line number Diff line number Diff line change
@@ -1,20 +1,14 @@
import { describe, it, expect } from 'vitest';
import { formatSSEEvent, formatSSEErrorEvent, parseSseStream, SseEvent } from '../src/sse_utils.js';

const MOCK_CHUNK_SIZE = 2;

/**
* Creates a mock Response object from SSE-formatted strings.
* Used to test that the parser can understand what the formatter produces.
* Creates a ReadableStream from chunks of Uint8Array data.
*/
function createMockResponse(sseData: string, chunkSize: number = 2): Response {
const encoder = new TextEncoder();
const chunks: Uint8Array[] = [];

for (let i = 0; i < sseData.length; i += chunkSize) {
chunks.push(encoder.encode(sseData.slice(i, i + chunkSize)));
}

function createStream(chunks: Uint8Array[]): ReadableStream<Uint8Array> {
let chunkIndex = 0;
const stream = new ReadableStream({
return new ReadableStream({
pull(controller) {
if (chunkIndex < chunks.length) {
controller.enqueue(chunks[chunkIndex]);
Expand All @@ -24,6 +18,46 @@ function createMockResponse(sseData: string, chunkSize: number = 2): Response {
}
},
});
}

/**
* Encodes a string into chunks of Uint8Array data.
*/
function encodeChunks(data: string): Uint8Array[] {
const encoder = new TextEncoder();
const chunks: Uint8Array[] = [];
for (let i = 0; i < data.length; i += MOCK_CHUNK_SIZE) {
chunks.push(encoder.encode(data.slice(i, i + MOCK_CHUNK_SIZE)));
}
return chunks;
}

/**
* Creates a mock Response object from SSE-formatted strings.
* Used to test that the parser can understand what the formatter produces.
*/
function createMockResponse(sseData: string): Response {
const chunks = encodeChunks(sseData);
return new Response(createStream(chunks), {
headers: { 'Content-Type': 'text/event-stream' },
});
}

/**
* Creates a mock Response where the decoded stream has no native async iterator.
* Simulates environments where ReadableStream async iteration is not supported.
* @see https://developer.mozilla.org/en-US/docs/Web/API/ReadableStream#browser_compatibility
*/
function createMockResponseWithoutAsyncIterator(sseData: string): Response {
const chunks = encodeChunks(sseData);
const stream = createStream(chunks);

const originalPipeThrough = stream.pipeThrough.bind(stream);
stream.pipeThrough = function <T>(transform: ReadableWritablePair<T, Uint8Array>) {
const result = originalPipeThrough(transform);
delete result[Symbol.asyncIterator];
return result;
} as typeof stream.pipeThrough;

return new Response(stream, {
headers: { 'Content-Type': 'text/event-stream' },
Expand Down Expand Up @@ -175,3 +209,58 @@ describe('SSE Utils', () => {
});
});
});

/**
* Tests that parseSseStream works correctly in both environments:
* - With native ReadableStream async iterator support
* - Without native async iterator (simulating older browsers/runtimes)
*/
Comment thread
ishymko marked this conversation as resolved.
Outdated
describe.each([
['with native async iterator', createMockResponse],
['without native async iterator', createMockResponseWithoutAsyncIterator],
])('parseSseStream environment compatibility (%s)', (_, createResponse) => {
it('should parse a single data event', async () => {
const sseData = 'data: {"kind":"message"}\n\n';
const response = createResponse(sseData);

const events: SseEvent[] = [];
for await (const event of parseSseStream(response)) {
events.push(event);
}

expect(events).toHaveLength(1);
expect(events[0].type).toBe('message');
expect(events[0].data).toBe('{"kind":"message"}');
});

it('should parse multiple events', async () => {
const sseData = 'data: {"id":1}\n\ndata: {"id":2}\n\n';
const response = createResponse(sseData);

const events: SseEvent[] = [];
for await (const event of parseSseStream(response)) {
events.push(event);
}

expect(events).toHaveLength(2);
expect(JSON.parse(events[0].data)).toEqual({ id: 1 });
expect(JSON.parse(events[1].data)).toEqual({ id: 2 });
});

it('should handle breaking out of the loop early', async () => {
const events_to_format = [{ kind: 'first' }, { kind: 'second' }, { kind: 'third' }];
const formatted = events_to_format.map(formatSSEEvent).join('');
const response = createResponse(formatted);

const parsedEvents: SseEvent[] = [];
for await (const event of parseSseStream(response)) {
parsedEvents.push(event);
if (parsedEvents.length === 1) {
break;
}
}

expect(parsedEvents).toHaveLength(1);
expect(JSON.parse(parsedEvents[0].data)).toEqual({ kind: 'first' });
});
});