Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 27 additions & 1 deletion packages/opencode/src/server/routes/event.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,11 @@ export const EventRoutes = () =>
c.header("X-Accel-Buffering", "no")
c.header("X-Content-Type-Options", "nosniff")
return streamSSE(c, async (stream) => {
const q = new AsyncQueue<string | null>()
const q = new AsyncQueue<string | null>(1000)
let done = false
let drop = 0
let streak = 0
let drain = Date.now()

q.push(
JSON.stringify({
Expand All @@ -52,10 +55,32 @@ export const EventRoutes = () =>
)
}, 10_000)

const watch = setInterval(() => {
const delta = q.dropped - drop
if (delta > 0) {
log.warn("event queue dropped items", { dropped: q.dropped, size: q.size })
streak += delta
drop = q.dropped
}
if (delta === 0) {
streak = 0
}
if (streak >= 100) {
log.warn("disconnecting slow event client (drop threshold)", { dropped: q.dropped, size: q.size })
stop()
return
}
if (q.size > 0 && Date.now() - drain > 30_000) {
log.warn("disconnecting slow event client (backlog timeout)", { dropped: q.dropped, size: q.size })
stop()
}
}, 5_000)

const stop = () => {
if (done) return
done = true
clearInterval(heartbeat)
clearInterval(watch)
unsub()
q.push(null)
log.info("event disconnected")
Expand All @@ -74,6 +99,7 @@ export const EventRoutes = () =>
for await (const data of q) {
if (data === null) return
await stream.writeSSE({ data })
drain = Date.now()
}
} finally {
stop()
Expand Down
28 changes: 27 additions & 1 deletion packages/opencode/src/server/routes/global.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,11 @@ export const GlobalDisposedEvent = BusEvent.define("global.disposed", z.object({

async function streamEvents(c: Context, subscribe: (q: AsyncQueue<string | null>) => () => void) {
return streamSSE(c, async (stream) => {
const q = new AsyncQueue<string | null>()
const q = new AsyncQueue<string | null>(1000)
let done = false
let drop = 0
let streak = 0
let drain = Date.now()

q.push(
JSON.stringify({
Expand All @@ -43,10 +46,32 @@ async function streamEvents(c: Context, subscribe: (q: AsyncQueue<string | null>
)
}, 10_000)

const watch = setInterval(() => {
const delta = q.dropped - drop
if (delta > 0) {
log.warn("global event queue dropped items", { dropped: q.dropped, size: q.size })
streak += delta
drop = q.dropped
}
if (delta === 0) {
streak = 0
}
if (streak >= 100) {
log.warn("disconnecting slow global event client (drop threshold)", { dropped: q.dropped, size: q.size })
stop()
return
}
if (q.size > 0 && Date.now() - drain > 30_000) {
log.warn("disconnecting slow global event client (backlog timeout)", { dropped: q.dropped, size: q.size })
stop()
}
}, 5_000)

const stop = () => {
if (done) return
done = true
clearInterval(heartbeat)
clearInterval(watch)
unsub()
q.push(null)
log.info("global event disconnected")
Expand All @@ -60,6 +85,7 @@ async function streamEvents(c: Context, subscribe: (q: AsyncQueue<string | null>
for await (const data of q) {
if (data === null) return
await stream.writeSSE({ data })
drain = Date.now()
}
} finally {
stop()
Expand Down
18 changes: 16 additions & 2 deletions packages/opencode/src/util/queue.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,25 @@
export class AsyncQueue<T> implements AsyncIterable<T> {
private queue: T[] = []
private resolvers: ((value: T) => void)[] = []
dropped = 0

constructor(private capacity?: number) {}

get size() {
return this.queue.length
}

push(item: T) {
const resolve = this.resolvers.shift()
if (resolve) resolve(item)
else this.queue.push(item)
if (resolve) {
resolve(item)
return
}
if (this.capacity !== undefined && this.queue.length >= this.capacity) {
this.queue.shift()
this.dropped += 1
}
this.queue.push(item)
}

async next(): Promise<T> {
Expand Down
Loading