Skip to content

Commit f3c3d38

Browse files
Apply PR #20374: refactor: add Effect-returning versions of MessageV2 functions
2 parents 4226c0d + af3f77f commit f3c3d38

File tree

7 files changed

+143
-76
lines changed

7 files changed

+143
-76
lines changed

packages/opencode/src/session/compaction.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -218,7 +218,7 @@ When constructing the summary, try to stick to this template:
218218
const prompt = compacting.prompt ?? [defaultPrompt, ...compacting.context].join("\n\n")
219219
const msgs = structuredClone(messages)
220220
yield* plugin.trigger("experimental.chat.messages.transform", {}, { messages: msgs })
221-
const modelMessages = yield* Effect.promise(() => MessageV2.toModelMessages(msgs, model, { stripMedia: true }))
221+
const modelMessages = yield* MessageV2.toModelMessagesEffect(msgs, model, { stripMedia: true })
222222
const ctx = yield* InstanceState.context
223223
const msg: MessageV2.Assistant = {
224224
id: MessageID.ascending(),

packages/opencode/src/session/index.ts

Lines changed: 7 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -593,15 +593,13 @@ export namespace Session {
593593
})
594594

595595
const messages = Effect.fn("Session.messages")(function* (input: { sessionID: SessionID; limit?: number }) {
596-
return yield* Effect.promise(async () => {
597-
const result = [] as MessageV2.WithParts[]
598-
for await (const msg of MessageV2.stream(input.sessionID)) {
599-
if (input.limit && result.length >= input.limit) break
600-
result.push(msg)
601-
}
602-
result.reverse()
603-
return result
604-
})
596+
if (input.limit) {
597+
const result = yield* MessageV2.pageEffect({ sessionID: input.sessionID, limit: input.limit })
598+
return result.items
599+
}
600+
const all = yield* MessageV2.streamEffect(input.sessionID)
601+
all.reverse()
602+
return all
605603
})
606604

607605
const removeMessage = Effect.fn("Session.removeMessage")(function* (input: {

packages/opencode/src/session/message-v2.ts

Lines changed: 122 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import { errorMessage } from "@/util/error"
1515
import type { SystemError } from "bun"
1616
import type { Provider } from "@/provider/provider"
1717
import { ModelID, ProviderID } from "@/provider/schema"
18+
import { Effect } from "effect"
1819

1920
/** Error shape thrown by Bun's fetch() when gzip/br decompression fails mid-stream */
2021
interface FetchDecompressionError extends Error {
@@ -547,7 +548,7 @@ export namespace MessageV2 {
547548
and(eq(MessageTable.time_created, row.time), lt(MessageTable.id, row.id)),
548549
)
549550

550-
async function hydrate(rows: (typeof MessageTable.$inferSelect)[]) {
551+
function hydrate(rows: (typeof MessageTable.$inferSelect)[]) {
551552
const ids = rows.map((row) => row.id)
552553
const partByMessage = new Map<string, MessageV2.Part[]>()
553554
if (ids.length > 0) {
@@ -809,48 +810,58 @@ export namespace MessageV2 {
809810
)
810811
}
811812

813+
export const toModelMessagesEffect = Effect.fnUntraced(function* (
814+
input: WithParts[],
815+
model: Provider.Model,
816+
options?: { stripMedia?: boolean },
817+
) {
818+
return yield* Effect.promise(() => toModelMessages(input, model, options))
819+
})
820+
821+
function pageSync(input: { sessionID: SessionID; limit: number; before?: string }) {
822+
const before = input.before ? cursor.decode(input.before) : undefined
823+
const where = before
824+
? and(eq(MessageTable.session_id, input.sessionID), older(before))
825+
: eq(MessageTable.session_id, input.sessionID)
826+
const rows = Database.use((db) =>
827+
db
828+
.select()
829+
.from(MessageTable)
830+
.where(where)
831+
.orderBy(desc(MessageTable.time_created), desc(MessageTable.id))
832+
.limit(input.limit + 1)
833+
.all(),
834+
)
835+
if (rows.length === 0) {
836+
const row = Database.use((db) =>
837+
db.select({ id: SessionTable.id }).from(SessionTable).where(eq(SessionTable.id, input.sessionID)).get(),
838+
)
839+
if (!row) throw new NotFoundError({ message: `Session not found: ${input.sessionID}` })
840+
return {
841+
items: [] as MessageV2.WithParts[],
842+
more: false,
843+
}
844+
}
845+
846+
const more = rows.length > input.limit
847+
const slice = more ? rows.slice(0, input.limit) : rows
848+
const items = hydrate(slice)
849+
items.reverse()
850+
const tail = slice.at(-1)
851+
return {
852+
items,
853+
more,
854+
cursor: more && tail ? cursor.encode({ id: tail.id, time: tail.time_created }) : undefined,
855+
}
856+
}
857+
812858
export const page = fn(
813859
z.object({
814860
sessionID: SessionID.zod,
815861
limit: z.number().int().positive(),
816862
before: z.string().optional(),
817863
}),
818-
async (input) => {
819-
const before = input.before ? cursor.decode(input.before) : undefined
820-
const where = before
821-
? and(eq(MessageTable.session_id, input.sessionID), older(before))
822-
: eq(MessageTable.session_id, input.sessionID)
823-
const rows = Database.use((db) =>
824-
db
825-
.select()
826-
.from(MessageTable)
827-
.where(where)
828-
.orderBy(desc(MessageTable.time_created), desc(MessageTable.id))
829-
.limit(input.limit + 1)
830-
.all(),
831-
)
832-
if (rows.length === 0) {
833-
const row = Database.use((db) =>
834-
db.select({ id: SessionTable.id }).from(SessionTable).where(eq(SessionTable.id, input.sessionID)).get(),
835-
)
836-
if (!row) throw new NotFoundError({ message: `Session not found: ${input.sessionID}` })
837-
return {
838-
items: [] as MessageV2.WithParts[],
839-
more: false,
840-
}
841-
}
842-
843-
const more = rows.length > input.limit
844-
const page = more ? rows.slice(0, input.limit) : rows
845-
const items = await hydrate(page)
846-
items.reverse()
847-
const tail = page.at(-1)
848-
return {
849-
items,
850-
more,
851-
cursor: more && tail ? cursor.encode({ id: tail.id, time: tail.time_created }) : undefined,
852-
}
853-
},
864+
async (input) => pageSync(input),
854865
)
855866

856867
export const stream = fn(SessionID.zod, async function* (sessionID) {
@@ -867,7 +878,7 @@ export namespace MessageV2 {
867878
}
868879
})
869880

870-
export const parts = fn(MessageID.zod, async (message_id) => {
881+
function partsSync(message_id: MessageID) {
871882
const rows = Database.use((db) =>
872883
db.select().from(PartTable).where(eq(PartTable.message_id, message_id)).orderBy(PartTable.id).all(),
873884
)
@@ -880,29 +891,83 @@ export namespace MessageV2 {
880891
messageID: row.message_id,
881892
}) as MessageV2.Part,
882893
)
883-
})
894+
}
895+
896+
export const parts = fn(MessageID.zod, async (message_id) => partsSync(message_id))
897+
898+
function getSync(input: { sessionID: SessionID; messageID: MessageID }): WithParts {
899+
const row = Database.use((db) =>
900+
db
901+
.select()
902+
.from(MessageTable)
903+
.where(and(eq(MessageTable.id, input.messageID), eq(MessageTable.session_id, input.sessionID)))
904+
.get(),
905+
)
906+
if (!row) throw new NotFoundError({ message: `Message not found: ${input.messageID}` })
907+
return {
908+
info: info(row),
909+
parts: partsSync(input.messageID),
910+
}
911+
}
884912

885913
export const get = fn(
886914
z.object({
887915
sessionID: SessionID.zod,
888916
messageID: MessageID.zod,
889917
}),
890-
async (input): Promise<WithParts> => {
891-
const row = Database.use((db) =>
892-
db
893-
.select()
894-
.from(MessageTable)
895-
.where(and(eq(MessageTable.id, input.messageID), eq(MessageTable.session_id, input.sessionID)))
896-
.get(),
897-
)
898-
if (!row) throw new NotFoundError({ message: `Message not found: ${input.messageID}` })
899-
return {
900-
info: info(row),
901-
parts: await parts(input.messageID),
902-
}
903-
},
918+
async (input): Promise<WithParts> => getSync(input),
904919
)
905920

921+
export const partsEffect = Effect.fnUntraced(function* (id: MessageID) {
922+
return partsSync(id)
923+
})
924+
925+
export const getEffect = Effect.fnUntraced(function* (input: { sessionID: SessionID; messageID: MessageID }) {
926+
return getSync(input)
927+
})
928+
929+
export const pageEffect = Effect.fnUntraced(function* (input: {
930+
sessionID: SessionID
931+
limit: number
932+
before?: string
933+
}) {
934+
return pageSync(input)
935+
})
936+
937+
export const streamEffect = Effect.fnUntraced(function* (sessionID: SessionID) {
938+
const result: WithParts[] = []
939+
const size = 50
940+
let before: string | undefined
941+
while (true) {
942+
const next = pageSync({ sessionID, limit: size, before })
943+
if (next.items.length === 0) break
944+
for (let i = next.items.length - 1; i >= 0; i--) {
945+
result.push(next.items[i])
946+
}
947+
if (!next.more || !next.cursor) break
948+
before = next.cursor
949+
}
950+
return result
951+
})
952+
953+
function applyCompactionFilter(msgs: MessageV2.WithParts[]) {
954+
const result = [] as MessageV2.WithParts[]
955+
const completed = new Set<string>()
956+
for (const msg of msgs) {
957+
result.push(msg)
958+
if (
959+
msg.info.role === "user" &&
960+
completed.has(msg.info.id) &&
961+
msg.parts.some((part) => part.type === "compaction")
962+
)
963+
break
964+
if (msg.info.role === "assistant" && msg.info.summary && msg.info.finish && !msg.info.error)
965+
completed.add(msg.info.parentID)
966+
}
967+
result.reverse()
968+
return result
969+
}
970+
906971
export async function filterCompacted(stream: AsyncIterable<MessageV2.WithParts>) {
907972
const result = [] as MessageV2.WithParts[]
908973
const completed = new Set<string>()
@@ -921,6 +986,10 @@ export namespace MessageV2 {
921986
return result
922987
}
923988

989+
export const filterCompactedEffect = Effect.fnUntraced(function* (sessionID: SessionID) {
990+
return applyCompactionFilter(yield* streamEffect(sessionID))
991+
})
992+
924993
export function fromError(
925994
e: unknown,
926995
ctx: { providerID: ProviderID; aborted?: boolean },

packages/opencode/src/session/processor.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -180,7 +180,7 @@ export namespace SessionProcessor {
180180
metadata: value.providerMetadata,
181181
} satisfies MessageV2.ToolPart)
182182

183-
const parts = yield* Effect.promise(() => MessageV2.parts(ctx.assistantMessage.id))
183+
const parts = yield* MessageV2.partsEffect(ctx.assistantMessage.id)
184184
const recentParts = parts.slice(-DOOM_LOOP_THRESHOLD)
185185

186186
if (
@@ -392,7 +392,7 @@ export namespace SessionProcessor {
392392
}
393393
ctx.reasoningMap = {}
394394

395-
const parts = yield* Effect.promise(() => MessageV2.parts(ctx.assistantMessage.id))
395+
const parts = yield* MessageV2.partsEffect(ctx.assistantMessage.id)
396396
for (const part of parts) {
397397
if (part.type !== "tool" || part.state.status === "completed" || part.state.status === "error") continue
398398
yield* session.updatePart({

packages/opencode/src/session/prompt.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -213,7 +213,7 @@ export namespace SessionPrompt {
213213
(yield* provider.getModel(input.providerID, input.modelID)))
214214
const msgs = onlySubtasks
215215
? [{ role: "user" as const, content: subtasks.map((p) => p.prompt).join("\n") }]
216-
: yield* Effect.promise(() => MessageV2.toModelMessages(context, mdl))
216+
: yield* MessageV2.toModelMessagesEffect(context, mdl)
217217
const text = yield* Effect.promise(async (signal) => {
218218
const result = await LLM.stream({
219219
agent: ag,
@@ -1360,7 +1360,7 @@ NOTE: At any point in time through this workflow you should feel free to ask the
13601360
yield* status.set(sessionID, { type: "busy" })
13611361
log.info("loop", { step, sessionID })
13621362

1363-
let msgs = yield* Effect.promise(() => MessageV2.filterCompacted(MessageV2.stream(sessionID)))
1363+
let msgs = yield* MessageV2.filterCompactedEffect(sessionID)
13641364

13651365
let lastUser: MessageV2.User | undefined
13661366
let lastAssistant: MessageV2.Assistant | undefined

packages/opencode/test/session/processor-effect.test.ts

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -207,7 +207,7 @@ it.live("session.processor effect tests capture llm input cleanly", () =>
207207
} satisfies LLM.StreamInput
208208

209209
const value = yield* handle.process(input)
210-
const parts = yield* Effect.promise(() => MessageV2.parts(msg.id))
210+
const parts = yield* MessageV2.partsEffect(msg.id)
211211
const calls = yield* llm.calls
212212

213213
expect(value).toBe("continue")
@@ -254,7 +254,7 @@ it.live("session.processor effect tests stop after token overflow requests compa
254254
tools: {},
255255
})
256256

257-
const parts = yield* Effect.promise(() => MessageV2.parts(msg.id))
257+
const parts = yield* MessageV2.partsEffect(msg.id)
258258

259259
expect(value).toBe("compact")
260260
expect(parts.some((part) => part.type === "text" && part.text === "after")).toBe(true)
@@ -347,7 +347,7 @@ it.live("session.processor effect tests reset reasoning state across retries", (
347347
tools: {},
348348
})
349349

350-
const parts = yield* Effect.promise(() => MessageV2.parts(msg.id))
350+
const parts = yield* MessageV2.partsEffect(msg.id)
351351
const reasoning = parts.filter((part): part is MessageV2.ReasoningPart => part.type === "reasoning")
352352

353353
expect(value).toBe("continue")
@@ -438,7 +438,7 @@ it.live("session.processor effect tests retry recognized structured json errors"
438438
tools: {},
439439
})
440440

441-
const parts = yield* Effect.promise(() => MessageV2.parts(msg.id))
441+
const parts = yield* MessageV2.partsEffect(msg.id)
442442

443443
expect(value).toBe("continue")
444444
expect(yield* llm.calls).toBe(2)
@@ -596,7 +596,7 @@ it.live("session.processor effect tests mark pending tools as aborted on cleanup
596596
if (Exit.isFailure(exit) && Cause.hasInterruptsOnly(exit.cause)) {
597597
yield* handle.abort()
598598
}
599-
const parts = yield* Effect.promise(() => MessageV2.parts(msg.id))
599+
const parts = yield* MessageV2.partsEffect(msg.id)
600600
const call = parts.find((part): part is MessageV2.ToolPart => part.type === "tool")
601601

602602
expect(Exit.isFailure(exit)).toBe(true)
@@ -669,7 +669,7 @@ it.live("session.processor effect tests record aborted errors and idle state", (
669669
yield* handle.abort()
670670
}
671671
yield* Effect.promise(() => seen.promise)
672-
const stored = yield* Effect.promise(() => MessageV2.get({ sessionID: chat.id, messageID: msg.id }))
672+
const stored = yield* MessageV2.getEffect({ sessionID: chat.id, messageID: msg.id })
673673
const state = yield* sts.get(chat.id)
674674
off()
675675

@@ -731,7 +731,7 @@ it.live("session.processor effect tests mark interruptions aborted without manua
731731
yield* Fiber.interrupt(run)
732732

733733
const exit = yield* Fiber.await(run)
734-
const stored = yield* Effect.promise(() => MessageV2.get({ sessionID: chat.id, messageID: msg.id }))
734+
const stored = yield* MessageV2.getEffect({ sessionID: chat.id, messageID: msg.id })
735735
const state = yield* sts.get(chat.id)
736736

737737
expect(Exit.isFailure(exit)).toBe(true)

packages/opencode/test/session/prompt-effect.test.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -469,7 +469,7 @@ it.live("failed subtask preserves metadata on error tool state", () =>
469469
expect(result.info.role).toBe("assistant")
470470
expect(yield* llm.calls).toBe(2)
471471

472-
const msgs = yield* Effect.promise(() => MessageV2.filterCompacted(MessageV2.stream(chat.id)))
472+
const msgs = yield* MessageV2.filterCompactedEffect(chat.id)
473473
const taskMsg = msgs.find((item) => item.info.role === "assistant" && item.info.agent === "general")
474474
expect(taskMsg?.info.role).toBe("assistant")
475475
if (!taskMsg || taskMsg.info.role !== "assistant") return
@@ -628,7 +628,7 @@ it.live(
628628
const exit = yield* Fiber.await(fiber)
629629
expect(Exit.isSuccess(exit)).toBe(true)
630630

631-
const msgs = yield* Effect.promise(() => MessageV2.filterCompacted(MessageV2.stream(chat.id)))
631+
const msgs = yield* MessageV2.filterCompactedEffect(chat.id)
632632
const taskMsg = msgs.find((item) => item.info.role === "assistant" && item.info.agent === "general")
633633
expect(taskMsg?.info.role).toBe("assistant")
634634
if (!taskMsg || taskMsg.info.role !== "assistant") return

0 commit comments

Comments
 (0)