Skip to content

Commit d34a539

Browse files
Apply PR #18579: effectify Bus service: migrate to Effect PubSub + InstanceState
2 parents 8e14487 + 36930fd commit d34a539

File tree

29 files changed

+401
-189
lines changed

29 files changed

+401
-189
lines changed

packages/opencode/specs/effect-migration.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ Practical reference for new and migrated Effect code in `packages/opencode`.
66

77
Use `InstanceState` (from `src/effect/instance-state.ts`) for services that need per-directory state, per-instance cleanup, or project-bound background work. InstanceState uses a `ScopedCache` keyed by directory, so each open project gets its own copy of the state that is automatically cleaned up on disposal.
88

9-
Use `makeRunPromise` (from `src/effect/run-service.ts`) to create a per-service `ManagedRuntime` that lazily initializes and shares layers via a global `memoMap`.
9+
Use `makeRuntime` (from `src/effect/run-service.ts`) to create a per-service `ManagedRuntime` that lazily initializes and shares layers via a global `memoMap`. Returns `{ runPromise, runFork }`.
1010

1111
- Global services (no per-directory state): Account, Auth, Installation, Truncate
1212
- Instance-scoped (per-directory state via InstanceState): File, FileTime, FileWatcher, Format, Permission, Question, Skill, Snapshot, Vcs, ProviderAuth
@@ -46,7 +46,7 @@ export namespace Foo {
4646
export const defaultLayer = layer.pipe(Layer.provide(FooDep.layer))
4747

4848
// Per-service runtime (inside the namespace)
49-
const runPromise = makeRunPromise(Service, defaultLayer)
49+
const { runPromise } = makeRuntime(Service, defaultLayer)
5050

5151
// Async facade functions
5252
export async function get(id: FooID) {

packages/opencode/src/account/index.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import { Clock, Duration, Effect, Layer, Option, Schema, SchemaGetter, ServiceMap } from "effect"
22
import { FetchHttpClient, HttpClient, HttpClientRequest, HttpClientResponse } from "effect/unstable/http"
33

4-
import { makeRunPromise } from "@/effect/run-service"
4+
import { makeRuntime } from "@/effect/run-service"
55
import { withTransientReadRetry } from "@/util/effect-http-client"
66
import { AccountRepo, type AccountRow } from "./repo"
77
import {
@@ -379,7 +379,7 @@ export namespace Account {
379379

380380
export const defaultLayer = layer.pipe(Layer.provide(AccountRepo.layer), Layer.provide(FetchHttpClient.layer))
381381

382-
export const runPromise = makeRunPromise(Service, defaultLayer)
382+
export const { runPromise } = makeRuntime(Service, defaultLayer)
383383

384384
export async function active(): Promise<Info | undefined> {
385385
return Option.getOrUndefined(await runPromise((service) => service.active()))

packages/opencode/src/auth/index.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import path from "path"
22
import { Effect, Layer, Record, Result, Schema, ServiceMap } from "effect"
3-
import { makeRunPromise } from "@/effect/run-service"
3+
import { makeRuntime } from "@/effect/run-service"
44
import { zod } from "@/util/effect-zod"
55
import { Global } from "../global"
66
import { Filesystem } from "../util/filesystem"
@@ -95,7 +95,7 @@ export namespace Auth {
9595
}),
9696
)
9797

98-
const runPromise = makeRunPromise(Service, layer)
98+
const { runPromise } = makeRuntime(Service, layer)
9999

100100
export async function get(providerID: string) {
101101
return runPromise((service) => service.get(providerID))

packages/opencode/src/bus/index.ts

Lines changed: 112 additions & 72 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,14 @@
11
import z from "zod"
2+
import { Effect, Layer, PubSub, ServiceMap, Stream } from "effect"
23
import { Log } from "../util/log"
34
import { Instance } from "../project/instance"
45
import { BusEvent } from "./bus-event"
56
import { GlobalBus } from "./global"
7+
import { InstanceState } from "@/effect/instance-state"
8+
import { makeRuntime } from "@/effect/run-service"
69

710
export namespace Bus {
811
const log = Log.create({ service: "bus" })
9-
type Subscription = (event: any) => void
1012

1113
export const InstanceDisposed = BusEvent.define(
1214
"server.instance.disposed",
@@ -15,91 +17,129 @@ export namespace Bus {
1517
}),
1618
)
1719

18-
const state = Instance.state(
19-
() => {
20-
const subscriptions = new Map<any, Subscription[]>()
20+
type Payload<D extends BusEvent.Definition = BusEvent.Definition> = {
21+
type: D["type"]
22+
properties: z.infer<D["properties"]>
23+
}
24+
25+
type State = {
26+
wildcard: PubSub.PubSub<Payload>
27+
typed: Map<string, PubSub.PubSub<Payload>>
28+
}
29+
30+
export interface Interface {
31+
readonly publish: <D extends BusEvent.Definition>(
32+
def: D,
33+
properties: z.output<D["properties"]>,
34+
) => Effect.Effect<void>
35+
readonly subscribe: <D extends BusEvent.Definition>(def: D) => Stream.Stream<Payload<D>>
36+
readonly subscribeAll: () => Stream.Stream<Payload>
37+
}
38+
39+
export class Service extends ServiceMap.Service<Service, Interface>()("@opencode/Bus") {}
40+
41+
export const layer = Layer.effect(
42+
Service,
43+
Effect.gen(function* () {
44+
const cache = yield* InstanceState.make<State>(
45+
Effect.fn("Bus.state")(function* (ctx) {
46+
const wildcard = yield* PubSub.unbounded<Payload>()
47+
const typed = new Map<string, PubSub.PubSub<Payload>>()
48+
49+
yield* Effect.addFinalizer(() =>
50+
Effect.gen(function* () {
51+
// Publish InstanceDisposed before shutting down so subscribers see it
52+
yield* PubSub.publish(wildcard, {
53+
type: InstanceDisposed.type,
54+
properties: { directory: ctx.directory },
55+
})
56+
yield* PubSub.shutdown(wildcard)
57+
for (const ps of typed.values()) {
58+
yield* PubSub.shutdown(ps)
59+
}
60+
}),
61+
)
62+
63+
return { wildcard, typed }
64+
}),
65+
)
66+
67+
function getOrCreate(state: State, type: string) {
68+
return Effect.gen(function* () {
69+
let ps = state.typed.get(type)
70+
if (!ps) {
71+
ps = yield* PubSub.unbounded<Payload>()
72+
state.typed.set(type, ps)
73+
}
74+
return ps
75+
})
76+
}
2177

22-
return {
23-
subscriptions,
78+
function publish<D extends BusEvent.Definition>(def: D, properties: z.output<D["properties"]>) {
79+
return Effect.gen(function* () {
80+
const state = yield* InstanceState.get(cache)
81+
const payload: Payload = { type: def.type, properties }
82+
log.info("publishing", { type: def.type })
83+
84+
const ps = state.typed.get(def.type)
85+
if (ps) yield* PubSub.publish(ps, payload)
86+
yield* PubSub.publish(state.wildcard, payload)
87+
88+
GlobalBus.emit("event", {
89+
directory: Instance.directory,
90+
payload,
91+
})
92+
})
2493
}
25-
},
26-
async (entry) => {
27-
const wildcard = entry.subscriptions.get("*")
28-
if (!wildcard) return
29-
const event = {
30-
type: InstanceDisposed.type,
31-
properties: {
32-
directory: Instance.directory,
33-
},
94+
95+
function subscribe<D extends BusEvent.Definition>(def: D): Stream.Stream<Payload<D>> {
96+
log.info("subscribing", { type: def.type })
97+
return Stream.unwrap(
98+
Effect.gen(function* () {
99+
const state = yield* InstanceState.get(cache)
100+
const ps = yield* getOrCreate(state, def.type)
101+
return Stream.fromPubSub(ps) as Stream.Stream<Payload<D>>
102+
}),
103+
).pipe(Stream.ensuring(Effect.sync(() => log.info("unsubscribing", { type: def.type }))))
34104
}
35-
for (const sub of [...wildcard]) {
36-
sub(event)
105+
106+
function subscribeAll(): Stream.Stream<Payload> {
107+
log.info("subscribing", { type: "*" })
108+
return Stream.unwrap(
109+
Effect.gen(function* () {
110+
const state = yield* InstanceState.get(cache)
111+
return Stream.fromPubSub(state.wildcard)
112+
}),
113+
).pipe(Stream.ensuring(Effect.sync(() => log.info("unsubscribing", { type: "*" }))))
37114
}
38-
},
115+
116+
return Service.of({ publish, subscribe, subscribeAll })
117+
}),
39118
)
40119

41-
export async function publish<Definition extends BusEvent.Definition>(
42-
def: Definition,
43-
properties: z.output<Definition["properties"]>,
44-
) {
45-
const payload = {
46-
type: def.type,
47-
properties,
48-
}
49-
log.info("publishing", {
50-
type: def.type,
51-
})
52-
const pending = []
53-
for (const key of [def.type, "*"]) {
54-
const match = [...(state().subscriptions.get(key) ?? [])]
55-
for (const sub of match) {
56-
pending.push(sub(payload))
57-
}
58-
}
59-
GlobalBus.emit("event", {
60-
directory: Instance.directory,
61-
payload,
62-
})
63-
return Promise.all(pending)
120+
const { runPromise, runCallback } = makeRuntime(Service, layer)
121+
122+
function forkStream<T>(streamFn: (svc: Interface) => Stream.Stream<T>, callback: (msg: T) => void) {
123+
return runCallback((svc) =>
124+
streamFn(svc).pipe(Stream.runForEach((msg) => Effect.sync(() => callback(msg)))),
125+
)
64126
}
65127

66-
export function subscribe<Definition extends BusEvent.Definition>(
67-
def: Definition,
68-
callback: (event: { type: Definition["type"]; properties: z.infer<Definition["properties"]> }) => void,
128+
export async function publish<D extends BusEvent.Definition>(
129+
def: D,
130+
properties: z.output<D["properties"]>,
69131
) {
70-
return raw(def.type, callback)
132+
return runPromise((svc) => svc.publish(def, properties))
71133
}
72134

73-
export function once<Definition extends BusEvent.Definition>(
74-
def: Definition,
75-
callback: (event: {
76-
type: Definition["type"]
77-
properties: z.infer<Definition["properties"]>
78-
}) => "done" | undefined,
135+
export function subscribe<D extends BusEvent.Definition>(
136+
def: D,
137+
callback: (event: { type: D["type"]; properties: z.infer<D["properties"]> }) => void,
79138
) {
80-
const unsub = subscribe(def, (event) => {
81-
if (callback(event)) unsub()
82-
})
139+
return forkStream((svc) => svc.subscribe(def), callback)
83140
}
84141

85142
export function subscribeAll(callback: (event: any) => void) {
86-
return raw("*", callback)
87-
}
88-
89-
function raw(type: string, callback: (event: any) => void) {
90-
log.info("subscribing", { type })
91-
const subscriptions = state().subscriptions
92-
let match = subscriptions.get(type) ?? []
93-
match.push(callback)
94-
subscriptions.set(type, match)
95-
96-
return () => {
97-
log.info("unsubscribing", { type })
98-
const match = subscriptions.get(type)
99-
if (!match) return
100-
const index = match.indexOf(callback)
101-
if (index === -1) return
102-
match.splice(index, 1)
103-
}
143+
return forkStream((svc) => svc.subscribeAll(), callback)
104144
}
105145
}

packages/opencode/src/command/index.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import { BusEvent } from "@/bus/bus-event"
22
import { InstanceState } from "@/effect/instance-state"
3-
import { makeRunPromise } from "@/effect/run-service"
3+
import { makeRuntime } from "@/effect/run-service"
44
import { SessionID, MessageID } from "@/session/schema"
55
import { Effect, Layer, ServiceMap } from "effect"
66
import z from "zod"
@@ -173,7 +173,7 @@ export namespace Command {
173173
}),
174174
)
175175

176-
const runPromise = makeRunPromise(Service, layer)
176+
const { runPromise } = makeRuntime(Service, layer)
177177

178178
export async function get(name: string) {
179179
return runPromise((svc) => svc.get(name))

packages/opencode/src/effect/run-service.ts

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,16 @@ import * as ServiceMap from "effect/ServiceMap"
33

44
export const memoMap = Layer.makeMemoMapUnsafe()
55

6-
export function makeRunPromise<I, S, E>(service: ServiceMap.Service<I, S>, layer: Layer.Layer<I, E>) {
6+
export function makeRuntime<I, S, E>(service: ServiceMap.Service<I, S>, layer: Layer.Layer<I, E>) {
77
let rt: ManagedRuntime.ManagedRuntime<I, E> | undefined
8+
const getRuntime = () => (rt ??= ManagedRuntime.make(layer, { memoMap }))
89

9-
return <A, Err>(fn: (svc: S) => Effect.Effect<A, Err, I>, options?: Effect.RunOptions) => {
10-
rt ??= ManagedRuntime.make(layer, { memoMap })
11-
return rt.runPromise(service.use(fn), options)
10+
return {
11+
runPromise: <A, Err>(fn: (svc: S) => Effect.Effect<A, Err, I>, options?: Effect.RunOptions) =>
12+
getRuntime().runPromise(service.use(fn), options),
13+
runFork: <A, Err>(fn: (svc: S) => Effect.Effect<A, Err, I>) => getRuntime().runFork(service.use(fn)),
14+
runCallback: <A, Err>(fn: (svc: S) => Effect.Effect<A, Err, I>) =>
15+
getRuntime().runCallback(service.use(fn)),
1216
}
1317
}
18+

packages/opencode/src/file/index.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import { BusEvent } from "@/bus/bus-event"
22
import { InstanceState } from "@/effect/instance-state"
3-
import { makeRunPromise } from "@/effect/run-service"
3+
import { makeRuntime } from "@/effect/run-service"
44
import { git } from "@/util/git"
55
import { Effect, Fiber, Layer, Scope, ServiceMap } from "effect"
66
import { formatPatch, structuredPatch } from "diff"
@@ -688,7 +688,7 @@ export namespace File {
688688
}),
689689
)
690690

691-
const runPromise = makeRunPromise(Service, layer)
691+
const { runPromise } = makeRuntime(Service, layer)
692692

693693
export function init() {
694694
return runPromise((svc) => svc.init())

packages/opencode/src/file/time.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import { DateTime, Effect, Layer, Semaphore, ServiceMap } from "effect"
22
import { InstanceState } from "@/effect/instance-state"
3-
import { makeRunPromise } from "@/effect/run-service"
3+
import { makeRuntime } from "@/effect/run-service"
44
import { Flag } from "@/flag/flag"
55
import type { SessionID } from "@/session/schema"
66
import { Filesystem } from "../util/filesystem"
@@ -108,7 +108,7 @@ export namespace FileTime {
108108
}),
109109
).pipe(Layer.orDie)
110110

111-
const runPromise = makeRunPromise(Service, layer)
111+
const { runPromise } = makeRuntime(Service, layer)
112112

113113
export function read(sessionID: SessionID, file: string) {
114114
return runPromise((s) => s.read(sessionID, file))

packages/opencode/src/file/watcher.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ import z from "zod"
88
import { Bus } from "@/bus"
99
import { BusEvent } from "@/bus/bus-event"
1010
import { InstanceState } from "@/effect/instance-state"
11-
import { makeRunPromise } from "@/effect/run-service"
11+
import { makeRuntime } from "@/effect/run-service"
1212
import { Flag } from "@/flag/flag"
1313
import { Instance } from "@/project/instance"
1414
import { git } from "@/util/git"
@@ -159,7 +159,7 @@ export namespace FileWatcher {
159159
}),
160160
)
161161

162-
const runPromise = makeRunPromise(Service, layer)
162+
const { runPromise } = makeRuntime(Service, layer)
163163

164164
export function init() {
165165
return runPromise((svc) => svc.init())

0 commit comments

Comments
 (0)