Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion packages/opencode/src/session/compaction.ts
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ When constructing the summary, try to stick to this template:
const prompt = compacting.prompt ?? [defaultPrompt, ...compacting.context].join("\n\n")
const msgs = structuredClone(messages)
yield* plugin.trigger("experimental.chat.messages.transform", {}, { messages: msgs })
const modelMessages = yield* Effect.promise(() => MessageV2.toModelMessages(msgs, model, { stripMedia: true }))
const modelMessages = yield* MessageV2.toModelMessagesEffect(msgs, model, { stripMedia: true })
const ctx = yield* InstanceState.context
const msg: MessageV2.Assistant = {
id: MessageID.ascending(),
Expand Down
16 changes: 7 additions & 9 deletions packages/opencode/src/session/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -593,15 +593,13 @@ export namespace Session {
})

const messages = Effect.fn("Session.messages")(function* (input: { sessionID: SessionID; limit?: number }) {
return yield* Effect.promise(async () => {
const result = [] as MessageV2.WithParts[]
for await (const msg of MessageV2.stream(input.sessionID)) {
if (input.limit && result.length >= input.limit) break
result.push(msg)
}
result.reverse()
return result
})
if (input.limit) {
const result = yield* MessageV2.pageEffect({ sessionID: input.sessionID, limit: input.limit })
return result.items
}
const all = yield* MessageV2.streamEffect(input.sessionID)
all.reverse()
return all
})

const removeMessage = Effect.fn("Session.removeMessage")(function* (input: {
Expand Down
175 changes: 122 additions & 53 deletions packages/opencode/src/session/message-v2.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import { errorMessage } from "@/util/error"
import type { SystemError } from "bun"
import type { Provider } from "@/provider/provider"
import { ModelID, ProviderID } from "@/provider/schema"
import { Effect } from "effect"

/** Error shape thrown by Bun's fetch() when gzip/br decompression fails mid-stream */
interface FetchDecompressionError extends Error {
Expand Down Expand Up @@ -547,7 +548,7 @@ export namespace MessageV2 {
and(eq(MessageTable.time_created, row.time), lt(MessageTable.id, row.id)),
)

async function hydrate(rows: (typeof MessageTable.$inferSelect)[]) {
function hydrate(rows: (typeof MessageTable.$inferSelect)[]) {
const ids = rows.map((row) => row.id)
const partByMessage = new Map<string, MessageV2.Part[]>()
if (ids.length > 0) {
Expand Down Expand Up @@ -809,48 +810,58 @@ export namespace MessageV2 {
)
}

export const toModelMessagesEffect = Effect.fnUntraced(function* (
input: WithParts[],
model: Provider.Model,
options?: { stripMedia?: boolean },
) {
return yield* Effect.promise(() => toModelMessages(input, model, options))
})

function pageSync(input: { sessionID: SessionID; limit: number; before?: string }) {
const before = input.before ? cursor.decode(input.before) : undefined
const where = before
? and(eq(MessageTable.session_id, input.sessionID), older(before))
: eq(MessageTable.session_id, input.sessionID)
const rows = Database.use((db) =>
db
.select()
.from(MessageTable)
.where(where)
.orderBy(desc(MessageTable.time_created), desc(MessageTable.id))
.limit(input.limit + 1)
.all(),
)
if (rows.length === 0) {
const row = Database.use((db) =>
db.select({ id: SessionTable.id }).from(SessionTable).where(eq(SessionTable.id, input.sessionID)).get(),
)
if (!row) throw new NotFoundError({ message: `Session not found: ${input.sessionID}` })
return {
items: [] as MessageV2.WithParts[],
more: false,
}
}

const more = rows.length > input.limit
const slice = more ? rows.slice(0, input.limit) : rows
const items = hydrate(slice)
items.reverse()
const tail = slice.at(-1)
return {
items,
more,
cursor: more && tail ? cursor.encode({ id: tail.id, time: tail.time_created }) : undefined,
}
}

export const page = fn(
z.object({
sessionID: SessionID.zod,
limit: z.number().int().positive(),
before: z.string().optional(),
}),
async (input) => {
const before = input.before ? cursor.decode(input.before) : undefined
const where = before
? and(eq(MessageTable.session_id, input.sessionID), older(before))
: eq(MessageTable.session_id, input.sessionID)
const rows = Database.use((db) =>
db
.select()
.from(MessageTable)
.where(where)
.orderBy(desc(MessageTable.time_created), desc(MessageTable.id))
.limit(input.limit + 1)
.all(),
)
if (rows.length === 0) {
const row = Database.use((db) =>
db.select({ id: SessionTable.id }).from(SessionTable).where(eq(SessionTable.id, input.sessionID)).get(),
)
if (!row) throw new NotFoundError({ message: `Session not found: ${input.sessionID}` })
return {
items: [] as MessageV2.WithParts[],
more: false,
}
}

const more = rows.length > input.limit
const page = more ? rows.slice(0, input.limit) : rows
const items = await hydrate(page)
items.reverse()
const tail = page.at(-1)
return {
items,
more,
cursor: more && tail ? cursor.encode({ id: tail.id, time: tail.time_created }) : undefined,
}
},
async (input) => pageSync(input),
)

export const stream = fn(SessionID.zod, async function* (sessionID) {
Expand All @@ -867,7 +878,7 @@ export namespace MessageV2 {
}
})

export const parts = fn(MessageID.zod, async (message_id) => {
function partsSync(message_id: MessageID) {
const rows = Database.use((db) =>
db.select().from(PartTable).where(eq(PartTable.message_id, message_id)).orderBy(PartTable.id).all(),
)
Expand All @@ -880,29 +891,83 @@ export namespace MessageV2 {
messageID: row.message_id,
}) as MessageV2.Part,
)
})
}

export const parts = fn(MessageID.zod, async (message_id) => partsSync(message_id))

function getSync(input: { sessionID: SessionID; messageID: MessageID }): WithParts {
const row = Database.use((db) =>
db
.select()
.from(MessageTable)
.where(and(eq(MessageTable.id, input.messageID), eq(MessageTable.session_id, input.sessionID)))
.get(),
)
if (!row) throw new NotFoundError({ message: `Message not found: ${input.messageID}` })
return {
info: info(row),
parts: partsSync(input.messageID),
}
}

export const get = fn(
z.object({
sessionID: SessionID.zod,
messageID: MessageID.zod,
}),
async (input): Promise<WithParts> => {
const row = Database.use((db) =>
db
.select()
.from(MessageTable)
.where(and(eq(MessageTable.id, input.messageID), eq(MessageTable.session_id, input.sessionID)))
.get(),
)
if (!row) throw new NotFoundError({ message: `Message not found: ${input.messageID}` })
return {
info: info(row),
parts: await parts(input.messageID),
}
},
async (input): Promise<WithParts> => getSync(input),
)

export const partsEffect = Effect.fnUntraced(function* (id: MessageID) {
return partsSync(id)
})

export const getEffect = Effect.fnUntraced(function* (input: { sessionID: SessionID; messageID: MessageID }) {
return getSync(input)
})

export const pageEffect = Effect.fnUntraced(function* (input: {
sessionID: SessionID
limit: number
before?: string
}) {
return pageSync(input)
})

export const streamEffect = Effect.fnUntraced(function* (sessionID: SessionID) {
const result: WithParts[] = []
const size = 50
let before: string | undefined
while (true) {
const next = pageSync({ sessionID, limit: size, before })
if (next.items.length === 0) break
for (let i = next.items.length - 1; i >= 0; i--) {
result.push(next.items[i])
}
if (!next.more || !next.cursor) break
before = next.cursor
}
return result
})

function applyCompactionFilter(msgs: MessageV2.WithParts[]) {
const result = [] as MessageV2.WithParts[]
const completed = new Set<string>()
for (const msg of msgs) {
result.push(msg)
if (
msg.info.role === "user" &&
completed.has(msg.info.id) &&
msg.parts.some((part) => part.type === "compaction")
)
break
if (msg.info.role === "assistant" && msg.info.summary && msg.info.finish && !msg.info.error)
completed.add(msg.info.parentID)
}
result.reverse()
return result
}

export async function filterCompacted(stream: AsyncIterable<MessageV2.WithParts>) {
const result = [] as MessageV2.WithParts[]
const completed = new Set<string>()
Expand All @@ -921,6 +986,10 @@ export namespace MessageV2 {
return result
}

export const filterCompactedEffect = Effect.fnUntraced(function* (sessionID: SessionID) {
return applyCompactionFilter(yield* streamEffect(sessionID))
})

export function fromError(
e: unknown,
ctx: { providerID: ProviderID; aborted?: boolean },
Expand Down
4 changes: 2 additions & 2 deletions packages/opencode/src/session/processor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ export namespace SessionProcessor {
metadata: value.providerMetadata,
} satisfies MessageV2.ToolPart)

const parts = yield* Effect.promise(() => MessageV2.parts(ctx.assistantMessage.id))
const parts = yield* MessageV2.partsEffect(ctx.assistantMessage.id)
const recentParts = parts.slice(-DOOM_LOOP_THRESHOLD)

if (
Expand Down Expand Up @@ -392,7 +392,7 @@ export namespace SessionProcessor {
}
ctx.reasoningMap = {}

const parts = yield* Effect.promise(() => MessageV2.parts(ctx.assistantMessage.id))
const parts = yield* MessageV2.partsEffect(ctx.assistantMessage.id)
for (const part of parts) {
if (part.type !== "tool" || part.state.status === "completed" || part.state.status === "error") continue
yield* session.updatePart({
Expand Down
4 changes: 2 additions & 2 deletions packages/opencode/src/session/prompt.ts
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ export namespace SessionPrompt {
(yield* provider.getModel(input.providerID, input.modelID)))
const msgs = onlySubtasks
? [{ role: "user" as const, content: subtasks.map((p) => p.prompt).join("\n") }]
: yield* Effect.promise(() => MessageV2.toModelMessages(context, mdl))
: yield* MessageV2.toModelMessagesEffect(context, mdl)
const text = yield* Effect.promise(async (signal) => {
const result = await LLM.stream({
agent: ag,
Expand Down Expand Up @@ -1360,7 +1360,7 @@ NOTE: At any point in time through this workflow you should feel free to ask the
yield* status.set(sessionID, { type: "busy" })
log.info("loop", { step, sessionID })

let msgs = yield* Effect.promise(() => MessageV2.filterCompacted(MessageV2.stream(sessionID)))
let msgs = yield* MessageV2.filterCompactedEffect(sessionID)

let lastUser: MessageV2.User | undefined
let lastAssistant: MessageV2.Assistant | undefined
Expand Down
14 changes: 7 additions & 7 deletions packages/opencode/test/session/processor-effect.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ it.live("session.processor effect tests capture llm input cleanly", () =>
} satisfies LLM.StreamInput

const value = yield* handle.process(input)
const parts = yield* Effect.promise(() => MessageV2.parts(msg.id))
const parts = yield* MessageV2.partsEffect(msg.id)
const calls = yield* llm.calls

expect(value).toBe("continue")
Expand Down Expand Up @@ -254,7 +254,7 @@ it.live("session.processor effect tests stop after token overflow requests compa
tools: {},
})

const parts = yield* Effect.promise(() => MessageV2.parts(msg.id))
const parts = yield* MessageV2.partsEffect(msg.id)

expect(value).toBe("compact")
expect(parts.some((part) => part.type === "text" && part.text === "after")).toBe(true)
Expand Down Expand Up @@ -347,7 +347,7 @@ it.live("session.processor effect tests reset reasoning state across retries", (
tools: {},
})

const parts = yield* Effect.promise(() => MessageV2.parts(msg.id))
const parts = yield* MessageV2.partsEffect(msg.id)
const reasoning = parts.filter((part): part is MessageV2.ReasoningPart => part.type === "reasoning")

expect(value).toBe("continue")
Expand Down Expand Up @@ -438,7 +438,7 @@ it.live("session.processor effect tests retry recognized structured json errors"
tools: {},
})

const parts = yield* Effect.promise(() => MessageV2.parts(msg.id))
const parts = yield* MessageV2.partsEffect(msg.id)

expect(value).toBe("continue")
expect(yield* llm.calls).toBe(2)
Expand Down Expand Up @@ -596,7 +596,7 @@ it.live("session.processor effect tests mark pending tools as aborted on cleanup
if (Exit.isFailure(exit) && Cause.hasInterruptsOnly(exit.cause)) {
yield* handle.abort()
}
const parts = yield* Effect.promise(() => MessageV2.parts(msg.id))
const parts = yield* MessageV2.partsEffect(msg.id)
const call = parts.find((part): part is MessageV2.ToolPart => part.type === "tool")

expect(Exit.isFailure(exit)).toBe(true)
Expand Down Expand Up @@ -669,7 +669,7 @@ it.live("session.processor effect tests record aborted errors and idle state", (
yield* handle.abort()
}
yield* Effect.promise(() => seen.promise)
const stored = yield* Effect.promise(() => MessageV2.get({ sessionID: chat.id, messageID: msg.id }))
const stored = yield* MessageV2.getEffect({ sessionID: chat.id, messageID: msg.id })
const state = yield* sts.get(chat.id)
off()

Expand Down Expand Up @@ -731,7 +731,7 @@ it.live("session.processor effect tests mark interruptions aborted without manua
yield* Fiber.interrupt(run)

const exit = yield* Fiber.await(run)
const stored = yield* Effect.promise(() => MessageV2.get({ sessionID: chat.id, messageID: msg.id }))
const stored = yield* MessageV2.getEffect({ sessionID: chat.id, messageID: msg.id })
const state = yield* sts.get(chat.id)

expect(Exit.isFailure(exit)).toBe(true)
Expand Down
4 changes: 2 additions & 2 deletions packages/opencode/test/session/prompt-effect.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -469,7 +469,7 @@ it.live("failed subtask preserves metadata on error tool state", () =>
expect(result.info.role).toBe("assistant")
expect(yield* llm.calls).toBe(2)

const msgs = yield* Effect.promise(() => MessageV2.filterCompacted(MessageV2.stream(chat.id)))
const msgs = yield* MessageV2.filterCompactedEffect(chat.id)
const taskMsg = msgs.find((item) => item.info.role === "assistant" && item.info.agent === "general")
expect(taskMsg?.info.role).toBe("assistant")
if (!taskMsg || taskMsg.info.role !== "assistant") return
Expand Down Expand Up @@ -628,7 +628,7 @@ it.live(
const exit = yield* Fiber.await(fiber)
expect(Exit.isSuccess(exit)).toBe(true)

const msgs = yield* Effect.promise(() => MessageV2.filterCompacted(MessageV2.stream(chat.id)))
const msgs = yield* MessageV2.filterCompactedEffect(chat.id)
const taskMsg = msgs.find((item) => item.info.role === "assistant" && item.info.agent === "general")
expect(taskMsg?.info.role).toBe("assistant")
if (!taskMsg || taskMsg.info.role !== "assistant") return
Expand Down
Loading