diff --git a/packages/opencode/src/server/routes/event.ts b/packages/opencode/src/server/routes/event.ts index 989b857710d..3905d352dc4 100644 --- a/packages/opencode/src/server/routes/event.ts +++ b/packages/opencode/src/server/routes/event.ts @@ -32,7 +32,7 @@ export const EventRoutes = () => c.header("X-Accel-Buffering", "no") c.header("X-Content-Type-Options", "nosniff") return streamSSE(c, async (stream) => { - const q = new AsyncQueue() + const q = new AsyncQueue(1024) let done = false q.push( diff --git a/packages/opencode/src/server/routes/global.ts b/packages/opencode/src/server/routes/global.ts index 16b9e559f27..5383c6b9d73 100644 --- a/packages/opencode/src/server/routes/global.ts +++ b/packages/opencode/src/server/routes/global.ts @@ -19,7 +19,7 @@ export const GlobalDisposedEvent = BusEvent.define("global.disposed", z.object({ async function streamEvents(c: Context, subscribe: (q: AsyncQueue) => () => void) { return streamSSE(c, async (stream) => { - const q = new AsyncQueue() + const q = new AsyncQueue(1024) let done = false q.push( diff --git a/packages/opencode/src/util/queue.ts b/packages/opencode/src/util/queue.ts index a1af53fe8f0..9238b9f34bb 100644 --- a/packages/opencode/src/util/queue.ts +++ b/packages/opencode/src/util/queue.ts @@ -1,11 +1,17 @@ export class AsyncQueue implements AsyncIterable { private queue: T[] = [] private resolvers: ((value: T) => void)[] = [] + private readonly capacity: number | undefined + + constructor(capacity?: number) { + this.capacity = capacity + } push(item: T) { const resolve = this.resolvers.shift() - if (resolve) resolve(item) - else this.queue.push(item) + if (resolve) return resolve(item) + this.queue.push(item) + if (this.capacity !== undefined) while (this.queue.length > this.capacity) this.queue.shift() } async next(): Promise {