Skip to content

Commit 36cec49

Browse files
committed
Fix flushing stdout/stderr after cell idle
Add a 50ms delay before finalizing cell execution to avoid dropping trailing stdout/stderr events, since output flushes every 10ms. The finalize step is deferred via a runtime fork and captures the lastRunId. If a new execution starts before finalization, the runId comparison will detect the change and skip stale finalization.
1 parent 8dc6854 commit 36cec49

File tree

1 file changed

+65
-23
lines changed

1 file changed

+65
-23
lines changed

extension/src/services/ExecutionRegistry.ts

Lines changed: 65 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,13 @@
1-
import { type Brand, Data, Effect, HashMap, Option, Ref } from "effect";
1+
import {
2+
type Brand,
3+
Data,
4+
Effect,
5+
FiberSet,
6+
HashMap,
7+
Option,
8+
Ref,
9+
String,
10+
} from "effect";
211
import type * as vscode from "vscode";
312
import { assert } from "../assert.ts";
413
import {
@@ -17,6 +26,7 @@ export class ExecutionRegistry extends Effect.Service<ExecutionRegistry>()(
1726
scoped: Effect.gen(function* () {
1827
const code = yield* VsCode;
1928
const ref = yield* Ref.make(HashMap.empty<NotebookCellId, CellEntry>());
29+
2030
yield* Effect.addFinalizer(() =>
2131
Ref.update(ref, (map) => {
2232
HashMap.forEach(map, (entry) =>
@@ -25,23 +35,20 @@ export class ExecutionRegistry extends Effect.Service<ExecutionRegistry>()(
2535
return HashMap.empty();
2636
}),
2737
);
38+
const runFork = yield* FiberSet.makeRuntime();
2839
return {
29-
handleInterrupted: Effect.fnUntraced(function* (
30-
editor: vscode.NotebookEditor,
31-
) {
32-
yield* Ref.update(ref, (map) =>
33-
HashMap.map(map, (cell) => {
34-
if (
35-
cell.editor === editor &&
36-
Option.isSome(cell.pendingExecution)
37-
) {
38-
return CellEntry.interrupt(cell);
39-
} else {
40-
return cell;
41-
}
42-
}),
40+
handleInterrupted(editor: vscode.NotebookEditor) {
41+
return Ref.update(ref, (map) =>
42+
HashMap.map(map, (cell) =>
43+
Option.match(cell.pendingExecution, {
44+
onSome: () => cell.editor === editor,
45+
onNone: () => false,
46+
})
47+
? CellEntry.interrupt(cell)
48+
: cell,
49+
),
4350
);
44-
}),
51+
},
4552
handleCellOperation: Effect.fnUntraced(function* (
4653
msg: CellMessage,
4754
deps: {
@@ -106,13 +113,45 @@ export class ExecutionRegistry extends Effect.Service<ExecutionRegistry>()(
106113
}
107114
// MUST modify cell output before `ExecutionHandle.end`
108115
yield* CellEntry.maybeUpdateCellOutput(cell, code);
109-
yield* Ref.update(ref, (map) =>
110-
HashMap.set(
111-
map,
112-
cellId,
113-
CellEntry.end(cell, true, msg.timestamp),
114-
),
115-
);
116+
117+
{
118+
// FIXME: stdin/stdout are flushed every 10ms, so wait 50ms
119+
// to ensure all related events arrive before finalizing.
120+
//
121+
// marimo doesn't set a run_id for idle messages, so we can't compare
122+
// against the incoming message to detect if a new execution has started.
123+
//
124+
// Ref: https://github.com/marimo-team/marimo/blob/3644b6f/marimo/_messaging/ops.py#L148-L151
125+
//
126+
// Instead, we capture the `lastRunId` before the timeout and compare it
127+
// when finalizing. If a new execution starts before the timeout fires,
128+
// the `lastRunId` will have changed and we skip finalization.
129+
const lastRunId = cell.lastRunId;
130+
const finalize = Ref.update(ref, (map) => {
131+
const fresh = HashMap.get(map, cellId);
132+
133+
if (Option.isNone(fresh)) {
134+
return map;
135+
}
136+
137+
const isDifferentRun = !Option.getEquivalence(
138+
String.Equivalence,
139+
)(fresh.value.lastRunId, lastRunId);
140+
141+
if (isDifferentRun) {
142+
return map;
143+
}
144+
145+
return HashMap.set(
146+
map,
147+
cellId,
148+
CellEntry.end(fresh.value, true, msg.timestamp),
149+
);
150+
});
151+
152+
setTimeout(() => runFork(finalize), 50);
153+
}
154+
116155
yield* Effect.logDebug("Cell execution completed").pipe(
117156
Effect.annotateLogs({ cellId }),
118157
);
@@ -172,13 +211,15 @@ class CellEntry extends Data.TaggedClass("CellEntry")<{
172211
readonly state: CellRuntimeState;
173212
readonly editor: vscode.NotebookEditor;
174213
readonly pendingExecution: Option.Option<ExecutionHandle>;
214+
readonly lastRunId: Option.Option<RunId>;
175215
}> {
176216
static make(id: NotebookCellId, editor: vscode.NotebookEditor) {
177217
return new CellEntry({
178218
id,
179219
editor,
180220
state: createCellRuntimeState(),
181221
pendingExecution: Option.none(),
222+
lastRunId: Option.none(),
182223
});
183224
}
184225
static transition(cell: CellEntry, message: CellMessage) {
@@ -190,6 +231,7 @@ class CellEntry extends Data.TaggedClass("CellEntry")<{
190231
static withExecution(cell: CellEntry, execution: ExecutionHandle) {
191232
return new CellEntry({
192233
...cell,
234+
lastRunId: Option.some(execution.runId),
193235
pendingExecution: Option.some(execution),
194236
});
195237
}

0 commit comments

Comments
 (0)