Skip to content

Commit 2e23da0

Browse files
Apply PR #20374: refactor: add Effect-returning versions of MessageV2 functions
2 parents 8a16956 + 05d2aa5 commit 2e23da0

File tree

8 files changed

+99
-102
lines changed

8 files changed

+99
-102
lines changed

packages/opencode/src/session/compaction.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -218,7 +218,7 @@ When constructing the summary, try to stick to this template:
218218
const prompt = compacting.prompt ?? [defaultPrompt, ...compacting.context].join("\n\n")
219219
const msgs = structuredClone(messages)
220220
yield* plugin.trigger("experimental.chat.messages.transform", {}, { messages: msgs })
221-
const modelMessages = yield* Effect.promise(() => MessageV2.toModelMessages(msgs, model, { stripMedia: true }))
221+
const modelMessages = yield* MessageV2.toModelMessagesEffect(msgs, model, { stripMedia: true })
222222
const ctx = yield* InstanceState.context
223223
const msg: MessageV2.Assistant = {
224224
id: MessageID.ascending(),

packages/opencode/src/session/index.ts

Lines changed: 4 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -593,15 +593,10 @@ export namespace Session {
593593
})
594594

595595
const messages = Effect.fn("Session.messages")(function* (input: { sessionID: SessionID; limit?: number }) {
596-
return yield* Effect.promise(async () => {
597-
const result = [] as MessageV2.WithParts[]
598-
for await (const msg of MessageV2.stream(input.sessionID)) {
599-
if (input.limit && result.length >= input.limit) break
600-
result.push(msg)
601-
}
602-
result.reverse()
603-
return result
604-
})
596+
if (input.limit) {
597+
return MessageV2.page({ sessionID: input.sessionID, limit: input.limit }).items
598+
}
599+
return Array.from(MessageV2.stream(input.sessionID)).reverse()
605600
})
606601

607602
const removeMessage = Effect.fn("Session.removeMessage")(function* (input: {

packages/opencode/src/session/message-v2.ts

Lines changed: 79 additions & 77 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ import { NamedError } from "@opencode-ai/util/error"
55
import { APICallError, convertToModelMessages, LoadAPIKeyError, type ModelMessage, type UIMessage } from "ai"
66
import { LSP } from "../lsp"
77
import { Snapshot } from "@/snapshot"
8-
import { fn } from "@/util/fn"
98
import { SyncEvent } from "../sync"
109
import { Database, NotFoundError, and, desc, eq, inArray, lt, or } from "@/storage/db"
1110
import { MessageTable, PartTable, SessionTable } from "./session.sql"
@@ -15,6 +14,7 @@ import { errorMessage } from "@/util/error"
1514
import type { SystemError } from "bun"
1615
import type { Provider } from "@/provider/provider"
1716
import { ModelID, ProviderID } from "@/provider/schema"
17+
import { Effect } from "effect"
1818

1919
/** Error shape thrown by Bun's fetch() when gzip/br decompression fails mid-stream */
2020
interface FetchDecompressionError extends Error {
@@ -547,7 +547,7 @@ export namespace MessageV2 {
547547
and(eq(MessageTable.time_created, row.time), lt(MessageTable.id, row.id)),
548548
)
549549

550-
async function hydrate(rows: (typeof MessageTable.$inferSelect)[]) {
550+
function hydrate(rows: (typeof MessageTable.$inferSelect)[]) {
551551
const ids = rows.map((row) => row.id)
552552
const partByMessage = new Map<string, MessageV2.Part[]>()
553553
if (ids.length > 0) {
@@ -573,11 +573,11 @@ export namespace MessageV2 {
573573
}))
574574
}
575575

576-
export async function toModelMessages(
576+
export const toModelMessagesEffect = Effect.fnUntraced(function* (
577577
input: WithParts[],
578578
model: Provider.Model,
579579
options?: { stripMedia?: boolean },
580-
): Promise<ModelMessage[]> {
580+
) {
581581
const result: UIMessage[] = []
582582
const toolNames = new Set<string>()
583583
// Track media from tool results that need to be injected as user messages
@@ -800,74 +800,77 @@ export namespace MessageV2 {
800800

801801
const tools = Object.fromEntries(Array.from(toolNames).map((toolName) => [toolName, { toModelOutput }]))
802802

803-
return await convertToModelMessages(
804-
result.filter((msg) => msg.parts.some((part) => part.type !== "step-start")),
805-
{
806-
//@ts-expect-error (convertToModelMessages expects a ToolSet but only actually needs tools[name]?.toModelOutput)
807-
tools,
808-
},
803+
return yield* Effect.promise(() =>
804+
convertToModelMessages(
805+
result.filter((msg) => msg.parts.some((part) => part.type !== "step-start")),
806+
{
807+
//@ts-expect-error (convertToModelMessages expects a ToolSet but only actually needs tools[name]?.toModelOutput)
808+
tools,
809+
},
810+
),
809811
)
812+
})
813+
814+
export function toModelMessages(
815+
input: WithParts[],
816+
model: Provider.Model,
817+
options?: { stripMedia?: boolean },
818+
): Promise<ModelMessage[]> {
819+
return Effect.runPromise(toModelMessagesEffect(input, model, options))
810820
}
811821

812-
export const page = fn(
813-
z.object({
814-
sessionID: SessionID.zod,
815-
limit: z.number().int().positive(),
816-
before: z.string().optional(),
817-
}),
818-
async (input) => {
819-
const before = input.before ? cursor.decode(input.before) : undefined
820-
const where = before
821-
? and(eq(MessageTable.session_id, input.sessionID), older(before))
822-
: eq(MessageTable.session_id, input.sessionID)
823-
const rows = Database.use((db) =>
824-
db
825-
.select()
826-
.from(MessageTable)
827-
.where(where)
828-
.orderBy(desc(MessageTable.time_created), desc(MessageTable.id))
829-
.limit(input.limit + 1)
830-
.all(),
822+
export function page(input: { sessionID: SessionID; limit: number; before?: string }) {
823+
const before = input.before ? cursor.decode(input.before) : undefined
824+
const where = before
825+
? and(eq(MessageTable.session_id, input.sessionID), older(before))
826+
: eq(MessageTable.session_id, input.sessionID)
827+
const rows = Database.use((db) =>
828+
db
829+
.select()
830+
.from(MessageTable)
831+
.where(where)
832+
.orderBy(desc(MessageTable.time_created), desc(MessageTable.id))
833+
.limit(input.limit + 1)
834+
.all(),
835+
)
836+
if (rows.length === 0) {
837+
const row = Database.use((db) =>
838+
db.select({ id: SessionTable.id }).from(SessionTable).where(eq(SessionTable.id, input.sessionID)).get(),
831839
)
832-
if (rows.length === 0) {
833-
const row = Database.use((db) =>
834-
db.select({ id: SessionTable.id }).from(SessionTable).where(eq(SessionTable.id, input.sessionID)).get(),
835-
)
836-
if (!row) throw new NotFoundError({ message: `Session not found: ${input.sessionID}` })
837-
return {
838-
items: [] as MessageV2.WithParts[],
839-
more: false,
840-
}
841-
}
842-
843-
const more = rows.length > input.limit
844-
const page = more ? rows.slice(0, input.limit) : rows
845-
const items = await hydrate(page)
846-
items.reverse()
847-
const tail = page.at(-1)
840+
if (!row) throw new NotFoundError({ message: `Session not found: ${input.sessionID}` })
848841
return {
849-
items,
850-
more,
851-
cursor: more && tail ? cursor.encode({ id: tail.id, time: tail.time_created }) : undefined,
842+
items: [] as MessageV2.WithParts[],
843+
more: false,
852844
}
853-
},
854-
)
845+
}
846+
847+
const more = rows.length > input.limit
848+
const slice = more ? rows.slice(0, input.limit) : rows
849+
const items = hydrate(slice)
850+
items.reverse()
851+
const tail = slice.at(-1)
852+
return {
853+
items,
854+
more,
855+
cursor: more && tail ? cursor.encode({ id: tail.id, time: tail.time_created }) : undefined,
856+
}
857+
}
855858

856-
export const stream = fn(SessionID.zod, async function* (sessionID) {
859+
export function* stream(sessionID: SessionID) {
857860
const size = 50
858861
let before: string | undefined
859862
while (true) {
860-
const next = await page({ sessionID, limit: size, before })
863+
const next = page({ sessionID, limit: size, before })
861864
if (next.items.length === 0) break
862865
for (let i = next.items.length - 1; i >= 0; i--) {
863866
yield next.items[i]
864867
}
865868
if (!next.more || !next.cursor) break
866869
before = next.cursor
867870
}
868-
})
871+
}
869872

870-
export const parts = fn(MessageID.zod, async (message_id) => {
873+
export function parts(message_id: MessageID) {
871874
const rows = Database.use((db) =>
872875
db.select().from(PartTable).where(eq(PartTable.message_id, message_id)).orderBy(PartTable.id).all(),
873876
)
@@ -880,33 +883,28 @@ export namespace MessageV2 {
880883
messageID: row.message_id,
881884
}) as MessageV2.Part,
882885
)
883-
})
886+
}
887+
888+
export function get(input: { sessionID: SessionID; messageID: MessageID }): WithParts {
889+
const row = Database.use((db) =>
890+
db
891+
.select()
892+
.from(MessageTable)
893+
.where(and(eq(MessageTable.id, input.messageID), eq(MessageTable.session_id, input.sessionID)))
894+
.get(),
895+
)
896+
if (!row) throw new NotFoundError({ message: `Message not found: ${input.messageID}` })
897+
return {
898+
info: info(row),
899+
parts: parts(input.messageID),
900+
}
901+
}
884902

885-
export const get = fn(
886-
z.object({
887-
sessionID: SessionID.zod,
888-
messageID: MessageID.zod,
889-
}),
890-
async (input): Promise<WithParts> => {
891-
const row = Database.use((db) =>
892-
db
893-
.select()
894-
.from(MessageTable)
895-
.where(and(eq(MessageTable.id, input.messageID), eq(MessageTable.session_id, input.sessionID)))
896-
.get(),
897-
)
898-
if (!row) throw new NotFoundError({ message: `Message not found: ${input.messageID}` })
899-
return {
900-
info: info(row),
901-
parts: await parts(input.messageID),
902-
}
903-
},
904-
)
905903

906-
export async function filterCompacted(stream: AsyncIterable<MessageV2.WithParts>) {
904+
export function filterCompacted(msgs: Iterable<MessageV2.WithParts>) {
907905
const result = [] as MessageV2.WithParts[]
908906
const completed = new Set<string>()
909-
for await (const msg of stream) {
907+
for (const msg of msgs) {
910908
result.push(msg)
911909
if (
912910
msg.info.role === "user" &&
@@ -921,6 +919,10 @@ export namespace MessageV2 {
921919
return result
922920
}
923921

922+
export const filterCompactedEffect = Effect.fnUntraced(function* (sessionID: SessionID) {
923+
return filterCompacted(stream(sessionID))
924+
})
925+
924926
export function fromError(
925927
e: unknown,
926928
ctx: { providerID: ProviderID; aborted?: boolean },

packages/opencode/src/session/processor.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -180,7 +180,7 @@ export namespace SessionProcessor {
180180
metadata: value.providerMetadata,
181181
} satisfies MessageV2.ToolPart)
182182

183-
const parts = yield* Effect.promise(() => MessageV2.parts(ctx.assistantMessage.id))
183+
const parts = MessageV2.parts(ctx.assistantMessage.id)
184184
const recentParts = parts.slice(-DOOM_LOOP_THRESHOLD)
185185

186186
if (
@@ -392,7 +392,7 @@ export namespace SessionProcessor {
392392
}
393393
ctx.reasoningMap = {}
394394

395-
const parts = yield* Effect.promise(() => MessageV2.parts(ctx.assistantMessage.id))
395+
const parts = MessageV2.parts(ctx.assistantMessage.id)
396396
for (const part of parts) {
397397
if (part.type !== "tool" || part.state.status === "completed" || part.state.status === "error") continue
398398
yield* session.updatePart({

packages/opencode/src/session/prompt.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -213,7 +213,7 @@ export namespace SessionPrompt {
213213
(yield* provider.getModel(input.providerID, input.modelID)))
214214
const msgs = onlySubtasks
215215
? [{ role: "user" as const, content: subtasks.map((p) => p.prompt).join("\n") }]
216-
: yield* Effect.promise(() => MessageV2.toModelMessages(context, mdl))
216+
: yield* MessageV2.toModelMessagesEffect(context, mdl)
217217
const text = yield* Effect.promise(async (signal) => {
218218
const result = await LLM.stream({
219219
agent: ag,
@@ -1360,7 +1360,7 @@ NOTE: At any point in time through this workflow you should feel free to ask the
13601360
yield* status.set(sessionID, { type: "busy" })
13611361
log.info("loop", { step, sessionID })
13621362

1363-
let msgs = yield* Effect.promise(() => MessageV2.filterCompacted(MessageV2.stream(sessionID)))
1363+
let msgs = yield* MessageV2.filterCompactedEffect(sessionID)
13641364

13651365
let lastUser: MessageV2.User | undefined
13661366
let lastAssistant: MessageV2.Assistant | undefined

packages/opencode/test/session/messages-pagination.test.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,7 @@ describe("session message pagination", () => {
105105
const b = await Session.create({})
106106
const [id] = await fill(a.id, 1)
107107

108-
await expect(MessageV2.get({ sessionID: b.id, messageID: id })).rejects.toMatchObject({ name: "NotFoundError" })
108+
expect(() => MessageV2.get({ sessionID: b.id, messageID: id })).toThrow("NotFoundError")
109109

110110
await Session.remove(a.id)
111111
await Session.remove(b.id)

packages/opencode/test/session/processor-effect.test.ts

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -207,7 +207,7 @@ it.live("session.processor effect tests capture llm input cleanly", () =>
207207
} satisfies LLM.StreamInput
208208

209209
const value = yield* handle.process(input)
210-
const parts = yield* Effect.promise(() => MessageV2.parts(msg.id))
210+
const parts = MessageV2.parts(msg.id)
211211
const calls = yield* llm.calls
212212

213213
expect(value).toBe("continue")
@@ -254,7 +254,7 @@ it.live("session.processor effect tests stop after token overflow requests compa
254254
tools: {},
255255
})
256256

257-
const parts = yield* Effect.promise(() => MessageV2.parts(msg.id))
257+
const parts = MessageV2.parts(msg.id)
258258

259259
expect(value).toBe("compact")
260260
expect(parts.some((part) => part.type === "text" && part.text === "after")).toBe(true)
@@ -299,7 +299,7 @@ it.live("session.processor effect tests capture reasoning from http mock", () =>
299299
tools: {},
300300
})
301301

302-
const parts = yield* Effect.promise(() => MessageV2.parts(msg.id))
302+
const parts = MessageV2.parts(msg.id)
303303
const reasoning = parts.find((part): part is MessageV2.ReasoningPart => part.type === "reasoning")
304304
const text = parts.find((part): part is MessageV2.TextPart => part.type === "text")
305305

@@ -347,7 +347,7 @@ it.live("session.processor effect tests reset reasoning state across retries", (
347347
tools: {},
348348
})
349349

350-
const parts = yield* Effect.promise(() => MessageV2.parts(msg.id))
350+
const parts = MessageV2.parts(msg.id)
351351
const reasoning = parts.filter((part): part is MessageV2.ReasoningPart => part.type === "reasoning")
352352

353353
expect(value).toBe("continue")
@@ -438,7 +438,7 @@ it.live("session.processor effect tests retry recognized structured json errors"
438438
tools: {},
439439
})
440440

441-
const parts = yield* Effect.promise(() => MessageV2.parts(msg.id))
441+
const parts = MessageV2.parts(msg.id)
442442

443443
expect(value).toBe("continue")
444444
expect(yield* llm.calls).toBe(2)
@@ -596,7 +596,7 @@ it.live("session.processor effect tests mark pending tools as aborted on cleanup
596596
if (Exit.isFailure(exit) && Cause.hasInterruptsOnly(exit.cause)) {
597597
yield* handle.abort()
598598
}
599-
const parts = yield* Effect.promise(() => MessageV2.parts(msg.id))
599+
const parts = MessageV2.parts(msg.id)
600600
const call = parts.find((part): part is MessageV2.ToolPart => part.type === "tool")
601601

602602
expect(Exit.isFailure(exit)).toBe(true)
@@ -669,7 +669,7 @@ it.live("session.processor effect tests record aborted errors and idle state", (
669669
yield* handle.abort()
670670
}
671671
yield* Effect.promise(() => seen.promise)
672-
const stored = yield* Effect.promise(() => MessageV2.get({ sessionID: chat.id, messageID: msg.id }))
672+
const stored = MessageV2.get({ sessionID: chat.id, messageID: msg.id })
673673
const state = yield* sts.get(chat.id)
674674
off()
675675

@@ -731,7 +731,7 @@ it.live("session.processor effect tests mark interruptions aborted without manua
731731
yield* Fiber.interrupt(run)
732732

733733
const exit = yield* Fiber.await(run)
734-
const stored = yield* Effect.promise(() => MessageV2.get({ sessionID: chat.id, messageID: msg.id }))
734+
const stored = MessageV2.get({ sessionID: chat.id, messageID: msg.id })
735735
const state = yield* sts.get(chat.id)
736736

737737
expect(Exit.isFailure(exit)).toBe(true)

packages/opencode/test/session/prompt-effect.test.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -469,7 +469,7 @@ it.live("failed subtask preserves metadata on error tool state", () =>
469469
expect(result.info.role).toBe("assistant")
470470
expect(yield* llm.calls).toBe(2)
471471

472-
const msgs = yield* Effect.promise(() => MessageV2.filterCompacted(MessageV2.stream(chat.id)))
472+
const msgs = yield* MessageV2.filterCompactedEffect(chat.id)
473473
const taskMsg = msgs.find((item) => item.info.role === "assistant" && item.info.agent === "general")
474474
expect(taskMsg?.info.role).toBe("assistant")
475475
if (!taskMsg || taskMsg.info.role !== "assistant") return
@@ -628,7 +628,7 @@ it.live(
628628
const exit = yield* Fiber.await(fiber)
629629
expect(Exit.isSuccess(exit)).toBe(true)
630630

631-
const msgs = yield* Effect.promise(() => MessageV2.filterCompacted(MessageV2.stream(chat.id)))
631+
const msgs = yield* MessageV2.filterCompactedEffect(chat.id)
632632
const taskMsg = msgs.find((item) => item.info.role === "assistant" && item.info.agent === "general")
633633
expect(taskMsg?.info.role).toBe("assistant")
634634
if (!taskMsg || taskMsg.info.role !== "assistant") return

0 commit comments

Comments
 (0)