Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 41 additions & 1 deletion src/sse_utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ export async function* parseSseStream(
let eventType = 'message';
let eventData = '';

for await (const value of response.body.pipeThrough(new TextDecoderStream())) {
for await (const value of pipeThrough(response.body)) {
buffer += value;
let lineEndIndex: number;

Expand Down Expand Up @@ -137,3 +137,43 @@ export async function* parseSseStream(
yield { type: eventType, data: eventData };
}
}

function createAsyncIterator(
stream: ReadableStream<string>
): () => AsyncGenerator<Awaited<string>, void, unknown> {
Comment thread
ilbertt marked this conversation as resolved.
Outdated
return async function* () {
const reader = stream.getReader();
try {
while (true) {
const { done, value } = await reader.read();
if (done) {
break;
}
yield value;
}
} finally {
reader.releaseLock();
}
};
}

/**
* Makes sure the async iterator is available on the stream.
* @param body - The body of the response
* @returns The stream with the async iterator
*
* @example
* ```ts
* for await (const value of pipeThrough(response.body)) {
* console.log(value);
* }
* ```
*/
function pipeThrough(body: ReadableStream<Uint8Array<ArrayBuffer>>): ReadableStream<string> {
Comment thread
ilbertt marked this conversation as resolved.
Outdated
const stream = body.pipeThrough(new TextDecoderStream());
if (!stream[Symbol.asyncIterator]) {
stream[Symbol.asyncIterator] = createAsyncIterator(stream);
}

return stream;
}
97 changes: 97 additions & 0 deletions test/sse_utils.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,34 @@ function createMockResponse(sseData: string, chunkSize: number = 2): Response {
});
}

function createMockResponseWithoutAsyncIterator(sseData: string): Response {
const encoder = new TextEncoder();
const chunks = [encoder.encode(sseData)];

let chunkIndex = 0;
const stream = new ReadableStream({
pull(controller) {
if (chunkIndex < chunks.length) {
controller.enqueue(chunks[chunkIndex]);
chunkIndex++;
} else {
controller.close();
}
},
});
Comment thread
ishymko marked this conversation as resolved.
Outdated

const originalPipeThrough = stream.pipeThrough.bind(stream);
stream.pipeThrough = function <T>(transform: ReadableWritablePair<T, Uint8Array>) {
const result = originalPipeThrough(transform);
delete result[Symbol.asyncIterator];
return result;
} as typeof stream.pipeThrough;

return new Response(stream, {
headers: { 'Content-Type': 'text/event-stream' },
});
}

describe('SSE Utils', () => {
describe('formatSSEEvent', () => {
it('should format a data event', () => {
Expand Down Expand Up @@ -174,4 +202,73 @@ describe('SSE Utils', () => {
expect(JSON.parse(parsedEvents[1].data)).toEqual(errorEvent);
});
});

describe('Async iterator fallback', () => {
Comment thread
ilbertt marked this conversation as resolved.
Outdated
it('should work when stream has no native async iterator', async () => {
const originalData = { kind: 'task', id: '456' };
const formatted = formatSSEEvent(originalData);
const response = createMockResponseWithoutAsyncIterator(formatted);

const events: SseEvent[] = [];
for await (const event of parseSseStream(response)) {
events.push(event);
}

expect(events).toHaveLength(1);
expect(events[0].type).toBe('message');
expect(JSON.parse(events[0].data)).toEqual(originalData);
});

it('should parse multiple events without async iterator', async () => {
const events_to_format = [
{ kind: 'status', value: 'started' },
{ kind: 'status', value: 'completed' },
];
const formatted = events_to_format.map(formatSSEEvent).join('');
const response = createMockResponseWithoutAsyncIterator(formatted);

const parsedEvents: SseEvent[] = [];
for await (const event of parseSseStream(response)) {
parsedEvents.push(event);
}

expect(parsedEvents).toHaveLength(2);
for (let i = 0; i < events_to_format.length; i++) {
expect(JSON.parse(parsedEvents[i].data)).toEqual(events_to_format[i]);
}
});
});

describe('Early termination', () => {
it('should handle breaking out of the loop early', async () => {
const events_to_format = [{ kind: 'first' }, { kind: 'second' }, { kind: 'third' }];
const formatted = events_to_format.map(formatSSEEvent).join('');
const response = createMockResponse(formatted);

const parsedEvents: SseEvent[] = [];
for await (const event of parseSseStream(response)) {
parsedEvents.push(event);
if (parsedEvents.length === 1) {
break;
}
}

expect(parsedEvents).toHaveLength(1);
expect(JSON.parse(parsedEvents[0].data)).toEqual({ kind: 'first' });
});

it('should release reader lock when breaking early without async iterator', async () => {
Comment thread
ishymko marked this conversation as resolved.
Outdated
const events_to_format = [{ kind: 'first' }, { kind: 'second' }];
const formatted = events_to_format.map(formatSSEEvent).join('');
const response = createMockResponseWithoutAsyncIterator(formatted);

const parsedEvents: SseEvent[] = [];
for await (const event of parseSseStream(response)) {
parsedEvents.push(event);
break;
}

expect(parsedEvents).toHaveLength(1);
});
});
});