Skip to content

Commit 6c2567a

Browse files
committed
refactor: address PR review — remove sync Effect wrappers, add Stream pagination
- Remove partsEffect/getEffect/pageEffect; call sync functions directly - Convert streamMessages to lazy Stream.paginate (chunk-efficient) - Add filterCompactedMessages using Stream.takeUntil on the stream - Make toModelMessagesEffect the primary impl, derive promise version
1 parent af3f77f commit 6c2567a

File tree

4 files changed

+56
-79
lines changed

4 files changed

+56
-79
lines changed

packages/opencode/src/session/index.ts

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ import { ModelID, ProviderID } from "@/provider/schema"
3333
import { Permission } from "@/permission"
3434
import { Global } from "@/global"
3535
import type { LanguageModelV2Usage } from "@ai-sdk/provider"
36-
import { Effect, Layer, Scope, ServiceMap } from "effect"
36+
import { Effect, Layer, Scope, ServiceMap, Stream } from "effect"
3737
import { makeRuntime } from "@/effect/run-service"
3838

3939
export namespace Session {
@@ -594,12 +594,10 @@ export namespace Session {
594594

595595
const messages = Effect.fn("Session.messages")(function* (input: { sessionID: SessionID; limit?: number }) {
596596
if (input.limit) {
597-
const result = yield* MessageV2.pageEffect({ sessionID: input.sessionID, limit: input.limit })
598-
return result.items
597+
return MessageV2.pageSync({ sessionID: input.sessionID, limit: input.limit }).items
599598
}
600-
const all = yield* MessageV2.streamEffect(input.sessionID)
601-
all.reverse()
602-
return all
599+
const all = yield* Stream.runCollect(MessageV2.streamMessages(input.sessionID))
600+
return Array.from(all).reverse()
603601
})
604602

605603
const removeMessage = Effect.fn("Session.removeMessage")(function* (input: {

packages/opencode/src/session/message-v2.ts

Lines changed: 42 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ import { errorMessage } from "@/util/error"
1515
import type { SystemError } from "bun"
1616
import type { Provider } from "@/provider/provider"
1717
import { ModelID, ProviderID } from "@/provider/schema"
18-
import { Effect } from "effect"
18+
import { Effect, Option, Stream } from "effect"
1919

2020
/** Error shape thrown by Bun's fetch() when gzip/br decompression fails mid-stream */
2121
interface FetchDecompressionError extends Error {
@@ -574,11 +574,11 @@ export namespace MessageV2 {
574574
}))
575575
}
576576

577-
export async function toModelMessages(
577+
export const toModelMessagesEffect = Effect.fnUntraced(function* (
578578
input: WithParts[],
579579
model: Provider.Model,
580580
options?: { stripMedia?: boolean },
581-
): Promise<ModelMessage[]> {
581+
) {
582582
const result: UIMessage[] = []
583583
const toolNames = new Set<string>()
584584
// Track media from tool results that need to be injected as user messages
@@ -801,24 +801,26 @@ export namespace MessageV2 {
801801

802802
const tools = Object.fromEntries(Array.from(toolNames).map((toolName) => [toolName, { toModelOutput }]))
803803

804-
return await convertToModelMessages(
805-
result.filter((msg) => msg.parts.some((part) => part.type !== "step-start")),
806-
{
807-
//@ts-expect-error (convertToModelMessages expects a ToolSet but only actually needs tools[name]?.toModelOutput)
808-
tools,
809-
},
804+
return yield* Effect.promise(() =>
805+
convertToModelMessages(
806+
result.filter((msg) => msg.parts.some((part) => part.type !== "step-start")),
807+
{
808+
//@ts-expect-error (convertToModelMessages expects a ToolSet but only actually needs tools[name]?.toModelOutput)
809+
tools,
810+
},
811+
),
810812
)
811-
}
813+
})
812814

813-
export const toModelMessagesEffect = Effect.fnUntraced(function* (
815+
export function toModelMessages(
814816
input: WithParts[],
815817
model: Provider.Model,
816818
options?: { stripMedia?: boolean },
817-
) {
818-
return yield* Effect.promise(() => toModelMessages(input, model, options))
819-
})
819+
): Promise<ModelMessage[]> {
820+
return Effect.runPromise(toModelMessagesEffect(input, model, options))
821+
}
820822

821-
function pageSync(input: { sessionID: SessionID; limit: number; before?: string }) {
823+
export function pageSync(input: { sessionID: SessionID; limit: number; before?: string }) {
822824
const before = input.before ? cursor.decode(input.before) : undefined
823825
const where = before
824826
? and(eq(MessageTable.session_id, input.sessionID), older(before))
@@ -878,7 +880,7 @@ export namespace MessageV2 {
878880
}
879881
})
880882

881-
function partsSync(message_id: MessageID) {
883+
export function partsSync(message_id: MessageID) {
882884
const rows = Database.use((db) =>
883885
db.select().from(PartTable).where(eq(PartTable.message_id, message_id)).orderBy(PartTable.id).all(),
884886
)
@@ -895,7 +897,7 @@ export namespace MessageV2 {
895897

896898
export const parts = fn(MessageID.zod, async (message_id) => partsSync(message_id))
897899

898-
function getSync(input: { sessionID: SessionID; messageID: MessageID }): WithParts {
900+
export function getSync(input: { sessionID: SessionID; messageID: MessageID }): WithParts {
899901
const row = Database.use((db) =>
900902
db
901903
.select()
@@ -918,54 +920,30 @@ export namespace MessageV2 {
918920
async (input): Promise<WithParts> => getSync(input),
919921
)
920922

921-
export const partsEffect = Effect.fnUntraced(function* (id: MessageID) {
922-
return partsSync(id)
923-
})
924-
925-
export const getEffect = Effect.fnUntraced(function* (input: { sessionID: SessionID; messageID: MessageID }) {
926-
return getSync(input)
927-
})
928-
929-
export const pageEffect = Effect.fnUntraced(function* (input: {
930-
sessionID: SessionID
931-
limit: number
932-
before?: string
933-
}) {
934-
return pageSync(input)
935-
})
936923

937-
export const streamEffect = Effect.fnUntraced(function* (sessionID: SessionID) {
938-
const result: WithParts[] = []
939-
const size = 50
940-
let before: string | undefined
941-
while (true) {
942-
const next = pageSync({ sessionID, limit: size, before })
943-
if (next.items.length === 0) break
944-
for (let i = next.items.length - 1; i >= 0; i--) {
945-
result.push(next.items[i])
946-
}
947-
if (!next.more || !next.cursor) break
948-
before = next.cursor
949-
}
950-
return result
951-
})
924+
export const streamMessages = (sessionID: SessionID): Stream.Stream<WithParts> =>
925+
Stream.paginate(undefined as string | undefined, (before) =>
926+
Effect.sync(() => {
927+
const next = pageSync({ sessionID, limit: 50, before })
928+
const items = next.items.toReversed()
929+
const nextCursor = next.more && next.cursor ? Option.some(next.cursor) : Option.none()
930+
return [items, nextCursor] as const
931+
}),
932+
)
952933

953-
function applyCompactionFilter(msgs: MessageV2.WithParts[]) {
954-
const result = [] as MessageV2.WithParts[]
934+
export const filterCompactedMessages = (sessionID: SessionID): Stream.Stream<WithParts> => {
955935
const completed = new Set<string>()
956-
for (const msg of msgs) {
957-
result.push(msg)
958-
if (
959-
msg.info.role === "user" &&
960-
completed.has(msg.info.id) &&
961-
msg.parts.some((part) => part.type === "compaction")
962-
)
963-
break
964-
if (msg.info.role === "assistant" && msg.info.summary && msg.info.finish && !msg.info.error)
965-
completed.add(msg.info.parentID)
966-
}
967-
result.reverse()
968-
return result
936+
return streamMessages(sessionID).pipe(
937+
Stream.takeUntil((msg) => {
938+
if (msg.info.role === "assistant" && msg.info.summary && msg.info.finish && !msg.info.error)
939+
completed.add(msg.info.parentID)
940+
return (
941+
msg.info.role === "user" &&
942+
completed.has(msg.info.id) &&
943+
msg.parts.some((part) => part.type === "compaction")
944+
)
945+
}),
946+
)
969947
}
970948

971949
export async function filterCompacted(stream: AsyncIterable<MessageV2.WithParts>) {
@@ -987,7 +965,8 @@ export namespace MessageV2 {
987965
}
988966

989967
export const filterCompactedEffect = Effect.fnUntraced(function* (sessionID: SessionID) {
990-
return applyCompactionFilter(yield* streamEffect(sessionID))
968+
const all = yield* Stream.runCollect(filterCompactedMessages(sessionID))
969+
return Array.from(all).reverse()
991970
})
992971

993972
export function fromError(

packages/opencode/src/session/processor.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -180,7 +180,7 @@ export namespace SessionProcessor {
180180
metadata: value.providerMetadata,
181181
} satisfies MessageV2.ToolPart)
182182

183-
const parts = yield* MessageV2.partsEffect(ctx.assistantMessage.id)
183+
const parts = MessageV2.partsSync(ctx.assistantMessage.id)
184184
const recentParts = parts.slice(-DOOM_LOOP_THRESHOLD)
185185

186186
if (
@@ -392,7 +392,7 @@ export namespace SessionProcessor {
392392
}
393393
ctx.reasoningMap = {}
394394

395-
const parts = yield* MessageV2.partsEffect(ctx.assistantMessage.id)
395+
const parts = MessageV2.partsSync(ctx.assistantMessage.id)
396396
for (const part of parts) {
397397
if (part.type !== "tool" || part.state.status === "completed" || part.state.status === "error") continue
398398
yield* session.updatePart({

packages/opencode/test/session/processor-effect.test.ts

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -207,7 +207,7 @@ it.live("session.processor effect tests capture llm input cleanly", () =>
207207
} satisfies LLM.StreamInput
208208

209209
const value = yield* handle.process(input)
210-
const parts = yield* MessageV2.partsEffect(msg.id)
210+
const parts = MessageV2.partsSync(msg.id)
211211
const calls = yield* llm.calls
212212

213213
expect(value).toBe("continue")
@@ -254,7 +254,7 @@ it.live("session.processor effect tests stop after token overflow requests compa
254254
tools: {},
255255
})
256256

257-
const parts = yield* MessageV2.partsEffect(msg.id)
257+
const parts = MessageV2.partsSync(msg.id)
258258

259259
expect(value).toBe("compact")
260260
expect(parts.some((part) => part.type === "text" && part.text === "after")).toBe(true)
@@ -299,7 +299,7 @@ it.live("session.processor effect tests capture reasoning from http mock", () =>
299299
tools: {},
300300
})
301301

302-
const parts = yield* Effect.promise(() => MessageV2.parts(msg.id))
302+
const parts = MessageV2.partsSync(msg.id)
303303
const reasoning = parts.find((part): part is MessageV2.ReasoningPart => part.type === "reasoning")
304304
const text = parts.find((part): part is MessageV2.TextPart => part.type === "text")
305305

@@ -347,7 +347,7 @@ it.live("session.processor effect tests reset reasoning state across retries", (
347347
tools: {},
348348
})
349349

350-
const parts = yield* MessageV2.partsEffect(msg.id)
350+
const parts = MessageV2.partsSync(msg.id)
351351
const reasoning = parts.filter((part): part is MessageV2.ReasoningPart => part.type === "reasoning")
352352

353353
expect(value).toBe("continue")
@@ -438,7 +438,7 @@ it.live("session.processor effect tests retry recognized structured json errors"
438438
tools: {},
439439
})
440440

441-
const parts = yield* MessageV2.partsEffect(msg.id)
441+
const parts = MessageV2.partsSync(msg.id)
442442

443443
expect(value).toBe("continue")
444444
expect(yield* llm.calls).toBe(2)
@@ -596,7 +596,7 @@ it.live("session.processor effect tests mark pending tools as aborted on cleanup
596596
if (Exit.isFailure(exit) && Cause.hasInterruptsOnly(exit.cause)) {
597597
yield* handle.abort()
598598
}
599-
const parts = yield* MessageV2.partsEffect(msg.id)
599+
const parts = MessageV2.partsSync(msg.id)
600600
const call = parts.find((part): part is MessageV2.ToolPart => part.type === "tool")
601601

602602
expect(Exit.isFailure(exit)).toBe(true)
@@ -669,7 +669,7 @@ it.live("session.processor effect tests record aborted errors and idle state", (
669669
yield* handle.abort()
670670
}
671671
yield* Effect.promise(() => seen.promise)
672-
const stored = yield* MessageV2.getEffect({ sessionID: chat.id, messageID: msg.id })
672+
const stored = MessageV2.getSync({ sessionID: chat.id, messageID: msg.id })
673673
const state = yield* sts.get(chat.id)
674674
off()
675675

@@ -731,7 +731,7 @@ it.live("session.processor effect tests mark interruptions aborted without manua
731731
yield* Fiber.interrupt(run)
732732

733733
const exit = yield* Fiber.await(run)
734-
const stored = yield* MessageV2.getEffect({ sessionID: chat.id, messageID: msg.id })
734+
const stored = MessageV2.getSync({ sessionID: chat.id, messageID: msg.id })
735735
const state = yield* sts.get(chat.id)
736736

737737
expect(Exit.isFailure(exit)).toBe(true)

0 commit comments

Comments
 (0)