Skip to content

Commit a997992

Browse files
committed
perf(session): lazy boundary scan + context-window message windowing
Two optimizations to drastically reduce memory during prompting: 1. filterCompactedLazy: probe newest 50 message infos (1 query, no parts) to detect compaction. If none found, fall back to original single-pass filterCompacted(stream()) — avoids 155+ wasted info-only queries for uncompacted sessions. Compacted sessions still use the efficient two-pass scan. 2. Context-window windowing: before calling toModelMessages, estimate which messages from the tail fit in the LLM context window using a character budget of (model.limit.input, falling back to model.limit.context, then 200,000) * 4 chars/token. Only convert those messages to ModelMessage format. For a 7,704-message session where ~200 fit in context, this reduces toModelMessages input from 7,704 to ~200 messages — cutting ~300MB of wrapper objects across 4-5 copy layers down to ~10MB. Also caches conversation across prompt loop iterations — full reload only after compaction, incremental merge for tool-call steps.
1 parent d500a84 commit a997992

File tree

2 files changed

+146
-2
lines changed

2 files changed

+146
-2
lines changed

packages/opencode/src/session/message-v2.ts

Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -913,10 +913,113 @@ export namespace MessageV2 {
913913
return result
914914
}
915915

916+
// ── Lightweight conversation loading ──────────────────────────────────
917+
//
918+
// filterCompactedLazy avoids materializing the full WithParts[] array.
919+
// Phase 1: scan message *info only* (no parts) newest→oldest to find
920+
// the compaction boundary and collect message IDs.
921+
// Phase 2: load parts only for messages after the boundary.
922+
//
923+
// For a 7,000-message session with no compaction this still loads all
924+
// parts, but for compacted sessions it skips everything before the
925+
// summary — which is the common case for long-running sessions.
926+
927+
/**
 * Pages through a session's message rows newest→oldest (50 per page) without
 * selecting parts, stopping at the compaction boundary if one exists.
 *
 * The boundary is a user message that (a) was recorded as the `parentID` of
 * an assistant message with `summary` and `finish` set and no `error`, and
 * (b) owns at least one part whose `data.type` is "compaction". Parts are
 * queried only for user messages that already satisfy (a).
 *
 * @param sessionID session whose messages are scanned
 * @returns every scanned row up to and including the boundary (or all rows
 *   of the session when no boundary is found), in oldest-first order
 */
async function scanBoundary(sessionID: SessionID) {
  const pageSize = 50
  const collected: (typeof MessageTable.$inferSelect)[] = []
  // IDs of user messages that a finished, error-free summary reply points at.
  const summarized = new Set<string>()
  let token: string | undefined

  // Loads the parts of a single message and reports whether any is a compaction part.
  const hasCompactionPart = (row: typeof MessageTable.$inferSelect) =>
    Database.use((db) => db.select().from(PartTable).where(eq(PartTable.message_id, row.id)).all()).some(
      (p) => (p.data as any).type === "compaction",
    )

  for (;;) {
    const decoded = token ? cursor.decode(token) : undefined
    const filter = decoded
      ? and(eq(MessageTable.session_id, sessionID), older(decoded))
      : eq(MessageTable.session_id, sessionID)

    // Ask for one row more than a page so we can tell whether older rows remain.
    const fetched = Database.use((db) =>
      db
        .select()
        .from(MessageTable)
        .where(filter)
        .orderBy(desc(MessageTable.time_created), desc(MessageTable.id))
        .limit(pageSize + 1)
        .all(),
    )
    if (fetched.length === 0) break

    const hasOlder = fetched.length > pageSize
    const current = hasOlder ? fetched.slice(0, pageSize) : fetched

    // Stop after this page when it is the last one, or as soon as the boundary is hit.
    let stop = !hasOlder
    for (const row of current) {
      collected.push(row)
      const msg = info(row)
      if (msg.role === "assistant") {
        const reply = msg as Assistant
        if (reply.summary && reply.finish && !reply.error) summarized.add(msg.parentID)
        continue
      }
      if (msg.role === "user" && summarized.has(msg.id) && hasCompactionPart(row)) {
        stop = true
        break
      }
    }
    if (stop) break

    // Continue from the oldest row of the page just processed.
    const oldest = current.at(-1)!
    token = cursor.encode({ id: oldest.id, time: oldest.time_created })
  }

  // Rows were gathered newest-first; callers expect oldest-first.
  return collected.reverse()
}
983+
984+
/**
 * Loads the conversation from the compaction boundary forward, with parts.
 *
 * First probes the newest 50 message rows (one query, no parts) for an
 * assistant message with `summary` and `finish` set and no `error`.
 * - Found: run the info-only `scanBoundary` pass, then `hydrate` only the
 *   rows it returns.
 * - Not found: delegate to the single-pass `filterCompacted(stream(...))`,
 *   skipping the separate info-only scan.
 *
 * @param sessionID session whose conversation is loaded
 * @returns the result of `hydrate(...)` or `filterCompacted(...)` for the session
 */
export async function filterCompactedLazy(sessionID: SessionID) {
  const newest = Database.use((db) =>
    db
      .select()
      .from(MessageTable)
      .where(eq(MessageTable.session_id, sessionID))
      .orderBy(desc(MessageTable.time_created), desc(MessageTable.id))
      .limit(50)
      .all(),
  )

  // Look for a finished, error-free summary reply among the probed rows.
  let sawSummary = false
  for (const row of newest) {
    const msg = info(row)
    if (msg.role !== "assistant") continue
    const reply = msg as Assistant
    if (reply.summary && reply.finish && !reply.error) {
      sawSummary = true
      break
    }
  }

  if (sawSummary) {
    // Two-pass path: find the boundary from info alone, then load parts
    // only for the rows at or after it.
    const boundaryRows = await scanBoundary(sessionID)
    return hydrate(boundaryRows)
  }

  // No recent summary: the single pass loads parts alongside info.
  return filterCompacted(stream(sessionID))
}
1017+
9161018
export function fromError(
9171019
e: unknown,
9181020
ctx: { providerID: ProviderID; aborted?: boolean },
9191021
): NonNullable<Assistant["error"]> {
1022+
9201023
switch (true) {
9211024
case e instanceof DOMException && e.name === "AbortError":
9221025
return new MessageV2.AbortedError(

packages/opencode/src/session/prompt.ts

Lines changed: 43 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -295,11 +295,20 @@ export namespace SessionPrompt {
295295

296296
let step = 0
297297
const session = await Session.get(sessionID)
298+
// filterCompactedLazy scans message info without loading parts to find
299+
// the compaction boundary, then hydrates parts only for messages after
300+
// it. For a 7K-message session that was compacted ~100 messages ago, this
301+
// loads ~100 messages' parts instead of all 7K.
302+
let msgs = await MessageV2.filterCompactedLazy(sessionID)
303+
let needsFullReload = false
298304
while (true) {
305+
if (needsFullReload) {
306+
msgs = await MessageV2.filterCompactedLazy(sessionID)
307+
needsFullReload = false
308+
}
299309
await SessionStatus.set(sessionID, { type: "busy" })
300310
log.info("loop", { step, sessionID })
301311
if (abort.aborted) break
302-
let msgs = await MessageV2.filterCompacted(MessageV2.stream(sessionID))
303312

304313
let lastUser: MessageV2.User | undefined
305314
let lastAssistant: MessageV2.Assistant | undefined
@@ -536,6 +545,7 @@ export namespace SessionPrompt {
536545
} satisfies MessageV2.TextPart)
537546
}
538547

548+
needsFullReload = true
539549
continue
540550
}
541551

@@ -550,6 +560,7 @@ export namespace SessionPrompt {
550560
overflow: task.overflow,
551561
})
552562
if (result === "stop") break
563+
needsFullReload = true
553564
continue
554565
}
555566

@@ -565,6 +576,7 @@ export namespace SessionPrompt {
565576
model: lastUser.model,
566577
auto: true,
567578
})
579+
needsFullReload = true
568580
continue
569581
}
570582

@@ -684,6 +696,24 @@ export namespace SessionPrompt {
684696
system.push(STRUCTURED_OUTPUT_SYSTEM_PROMPT)
685697
}
686698

699+
// Context-window windowing: only convert messages that fit in the
700+
// LLM context window to ModelMessage format. This avoids creating
701+
// ~300MB of wrapper objects for messages the provider will discard.
702+
const budget = (model.limit.input || model.limit.context || 200_000) * 4 // chars
703+
let used = 0
704+
let windowStart = msgs.length
705+
for (let i = msgs.length - 1; i >= 0; i--) {
706+
for (const part of msgs[i].parts) {
707+
if (part.type === "text") used += part.text.length
708+
else if (part.type === "tool" && part.state.status === "completed")
709+
used += (part.state.output?.length ?? 0) + JSON.stringify(part.state.input).length
710+
else if (part.type === "reasoning") used += part.text.length
711+
}
712+
if (used > budget) break
713+
windowStart = i
714+
}
715+
const window = windowStart > 0 ? msgs.slice(windowStart) : msgs
716+
687717
const result = await processor.process({
688718
user: lastUser,
689719
agent,
@@ -692,7 +722,7 @@ export namespace SessionPrompt {
692722
sessionID,
693723
system,
694724
messages: [
695-
...MessageV2.toModelMessages(msgs, model),
725+
...MessageV2.toModelMessages(window, model),
696726
...(isLastStep
697727
? [
698728
{
@@ -740,6 +770,17 @@ export namespace SessionPrompt {
740770
auto: true,
741771
overflow: !processor.message.finish,
742772
})
773+
needsFullReload = true
774+
} else {
775+
// Normal tool-call continuation: fetch the latest page to pick up
776+
// new assistant messages and tool results, then merge with the
777+
// cached history to avoid reloading the entire conversation.
778+
const fresh = await MessageV2.page({ sessionID, limit: 200 })
779+
const existing = new Map(msgs.map((m) => [m.info.id, m]))
780+
for (const msg of fresh.items) existing.set(msg.info.id, msg)
781+
msgs = Array.from(existing.values()).sort((a, b) =>
782+
a.info.id < b.info.id ? -1 : a.info.id > b.info.id ? 1 : 0,
783+
)
743784
}
744785
continue
745786
}

0 commit comments

Comments
 (0)