From dd6b8fa9ed31b13d15c798e5fb26d21d7a40956f Mon Sep 17 00:00:00 2001 From: Tat Dat Duong Date: Fri, 27 Jun 2025 00:00:57 +0200 Subject: [PATCH 1/2] feat(langgraph): add "tasks" and "checkpoints" stream mode Split out the "debug" stream mode into more descriptive "tasks" and "checkpoints" stream mode, while removing the unnecessary wrapper object. Port of https://github.com/langchain-ai/langgraph/pull/5117 --- libs/langgraph/src/pregel/debug.ts | 57 ++++++--------------- libs/langgraph/src/pregel/loop.ts | 51 +++++++++++++------ libs/langgraph/src/pregel/types.ts | 60 ++++++++++++++++++++++- libs/langgraph/src/tests/pregel.test-d.ts | 55 ++++++++++++--------- libs/langgraph/src/utils.ts | 2 + 5 files changed, 143 insertions(+), 82 deletions(-) diff --git a/libs/langgraph/src/pregel/debug.ts b/libs/langgraph/src/pregel/debug.ts index f315225f4..9decdf214 100644 --- a/libs/langgraph/src/pregel/debug.ts +++ b/libs/langgraph/src/pregel/debug.ts @@ -88,10 +88,8 @@ export function* _readChannels( } export function* mapDebugTasks( - step: number, tasks: readonly PregelExecutableTask[] ) { - const ts = new Date().toISOString(); for (const { id, name, input, config, triggers, writes } of tasks) { if (config?.tags?.includes(TAG_HIDDEN)) continue; @@ -102,18 +100,7 @@ export function* mapDebugTasks( .map(([, v]) => { return v; }); - yield { - type: "task", - timestamp: ts, - step, - payload: { - id, - name, - input, - triggers, - interrupts, - }, - }; + yield { id, name, input, triggers, interrupts }; } } @@ -121,27 +108,20 @@ export function* mapDebugTaskResults< N extends PropertyKey, C extends PropertyKey >( - step: number, tasks: readonly [PregelExecutableTask, PendingWrite[]][], streamChannels: PropertyKey | Array ) { - const ts = new Date().toISOString(); for (const [{ id, name, config }, writes] of tasks) { if (config?.tags?.includes(TAG_HIDDEN)) continue; yield { - type: "task_result", - timestamp: ts, - step, - payload: { - id, - name, - result: writes.filter(([channel]) => { - return Array.isArray(streamChannels) - ? streamChannels.includes(channel) - : channel === streamChannels; - }), - interrupts: writes.filter((w) => w[0] === INTERRUPT).map((w) => w[1]), - }, + id, + name, + result: writes.filter(([channel]) => { + return Array.isArray(streamChannels) + ? streamChannels.includes(channel) + : channel === streamChannels; + }), + interrupts: writes.filter((w) => w[0] === INTERRUPT).map((w) => w[1]), }; } } @@ -152,7 +132,6 @@ export function* mapDebugCheckpoint< N extends PropertyKey, C extends PropertyKey >( - step: number, config: RunnableConfig, channels: Record, streamChannels: string | string[], @@ -213,19 +192,13 @@ export function* mapDebugCheckpoint< }; } - const ts = new Date().toISOString(); yield { - type: "checkpoint", - timestamp: ts, - step, - payload: { - config: formatConfig(config), - values: readChannels(channels, streamChannels), - metadata, - next: tasks.map((task) => task.name), - tasks: tasksWithWrites(tasks, pendingWrites, taskStates, outputKeys), - parentConfig: parentConfig ? formatConfig(parentConfig) : undefined, - }, + config: formatConfig(config), + values: readChannels(channels, streamChannels), + metadata, + next: tasks.map((task) => task.name), + tasks: tasksWithWrites(tasks, pendingWrites, taskStates, outputKeys), + parentConfig: parentConfig ? formatConfig(parentConfig) : undefined, }; } diff --git a/libs/langgraph/src/pregel/loop.ts b/libs/langgraph/src/pregel/loop.ts index 25b6591fd..871087c4f 100644 --- a/libs/langgraph/src/pregel/loop.ts +++ b/libs/langgraph/src/pregel/loop.ts @@ -633,8 +633,8 @@ export class PregelLoop { this._emit( gatherIteratorSync( prefixGenerator( - mapDebugTaskResults(this.step, [[task, writes]], this.streamKeys), - "debug" + mapDebugTaskResults([[task, writes]], this.streamKeys), + "tasks" ) ) ); @@ -782,7 +782,6 @@ export class PregelLoop { await gatherIterator( prefixGenerator( mapDebugCheckpoint( - this.step - 1, // printing checkpoint for previous step this.checkpointConfig, this.channels, this.streamKeys, @@ -792,7 +791,7 @@ export class PregelLoop { this.prevCheckpointConfig, this.outputKeys ), - "debug" + "checkpoints" ) ) ); @@ -838,10 +837,7 @@ export class PregelLoop { // Produce debug output const debugOutput = await gatherIterator( - prefixGenerator( - mapDebugTasks(this.step, Object.values(this.tasks)), - "debug" - ) + prefixGenerator(mapDebugTasks(Object.values(this.tasks)), "tasks") ); this._emit(debugOutput); @@ -952,9 +948,7 @@ export class PregelLoop { } this._emit( - gatherIteratorSync( - prefixGenerator(mapDebugTasks(this.step, [pushed]), "debug") - ) + gatherIteratorSync(prefixGenerator(mapDebugTasks([pushed]), "tasks")) ); if (this.debug) printStepTasks(this.step, [pushed]); @@ -1127,9 +1121,38 @@ export class PregelLoop { } protected _emit(values: [StreamMode, unknown][]) { - for (const chunk of values) { - if (this.stream.modes.has(chunk[0])) { - this.stream.push([this.checkpointNamespace, ...chunk]); + for (const [mode, payload] of values) { + if (this.stream.modes.has(mode)) { + this.stream.push([this.checkpointNamespace, mode, payload]); + } + + // debug mode is a "checkpoints" or "tasks" wrapped in an object + // TODO: consider deprecating this in 1.x + if ( + (mode === "checkpoints" || mode === "tasks") && + this.stream.modes.has("debug") + ) { + const step = mode === "checkpoints" ? this.step - 1 : this.step; + const timestamp = new Date().toISOString(); + const type = (() => { + if (mode === "checkpoints") { + return "checkpoint"; + } else if ( + typeof payload === "object" && + payload != null && + "result" in payload + ) { + return "task_result"; + } else { + return "task"; + } + })(); + + this.stream.push([ + this.checkpointNamespace, + "debug", + { step, type, timestamp, payload }, + ]); } } } diff --git a/libs/langgraph/src/pregel/types.ts b/libs/langgraph/src/pregel/types.ts index 77aebebcd..7bb9d469f 100644 --- a/libs/langgraph/src/pregel/types.ts +++ b/libs/langgraph/src/pregel/types.ts @@ -21,7 +21,14 @@ import { LangGraphRunnableConfig } from "./runnable_types.js"; /** * Selects the type of output you'll receive when streaming from the graph. See [Streaming](/langgraphjs/how-tos/#streaming) for more details. */ -export type StreamMode = "values" | "updates" | "debug" | "messages" | "custom"; +export type StreamMode = + | "values" + | "updates" + | "debug" + | "messages" + | "checkpoints" + | "tasks" + | "custom"; // eslint-disable-next-line @typescript-eslint/no-explicit-any export type PregelInputType = any; @@ -38,6 +45,36 @@ type StreamCustomOutput = any; // eslint-disable-next-line @typescript-eslint/no-explicit-any type StreamDebugOutput = Record; +type StreamCheckpointsOutput = { + values: StreamValues; + next: string[]; + config: RunnableConfig; + metadata?: CheckpointMetadata; + parentConfig?: RunnableConfig | undefined; + tasks: PregelTaskDescription[]; +}; + +interface StreamTasksOutputBase { + id: string; + name: string; + interrupts: Interrupt[]; +} + +export interface StreamTasksCreateOutput + extends StreamTasksOutputBase { + input: StreamValues; + triggers: string[]; +} + +export interface StreamTasksResultOutput + extends StreamTasksOutputBase { + result: [Keys, StreamUpdates][]; +} + +export type StreamTasksOutput = + | StreamTasksCreateOutput + | StreamTasksResultOutput; + type DefaultStreamMode = "updates"; export type StreamOutputMap< @@ -45,7 +82,7 @@ export type StreamOutputMap< TStreamSubgraphs extends boolean, StreamUpdates, StreamValues, - Nodes = string + Nodes > = ( undefined extends TStreamMode ? [] @@ -67,6 +104,16 @@ export type StreamOutputMap< ]; messages: [string[], "messages", StreamMessageOutput]; custom: [string[], "custom", StreamCustomOutput]; + checkpoints: [ + string[], + "checkpoints", + StreamCheckpointsOutput + ]; + tasks: [ + string[], + "tasks", + StreamTasksOutput + ]; debug: [string[], "debug", StreamDebugOutput]; }[Multiple] : { @@ -77,6 +124,8 @@ export type StreamOutputMap< ]; messages: ["messages", StreamMessageOutput]; custom: ["custom", StreamCustomOutput]; + checkpoints: ["checkpoints", StreamCheckpointsOutput]; + tasks: ["tasks", StreamTasksOutput]; debug: ["debug", StreamDebugOutput]; }[Multiple] : ( @@ -91,6 +140,11 @@ export type StreamOutputMap< ]; messages: [string[], StreamMessageOutput]; custom: [string[], StreamCustomOutput]; + checkpoints: [string[], StreamCheckpointsOutput]; + tasks: [ + string[], + StreamTasksOutput + ]; debug: [string[], StreamDebugOutput]; }[Single] : { @@ -98,6 +152,8 @@ export type StreamOutputMap< updates: Record; messages: StreamMessageOutput; custom: StreamCustomOutput; + checkpoints: StreamCheckpointsOutput; + tasks: StreamTasksOutput; debug: StreamDebugOutput; }[Single] : never; diff --git a/libs/langgraph/src/tests/pregel.test-d.ts b/libs/langgraph/src/tests/pregel.test-d.ts index 2cacf97aa..a3c388960 100644 --- a/libs/langgraph/src/tests/pregel.test-d.ts +++ b/libs/langgraph/src/tests/pregel.test-d.ts @@ -67,7 +67,10 @@ it("state graph annotation", async () => { expectTypeOf( await gatherIterator( - graph.stream(input, { streamMode: ["values"], subgraphs: true }) + graph.stream(input, { + streamMode: ["values"], + subgraphs: true, + }) ) ).toExtend<[string[], "values", { foo: string[] }][]>(); @@ -83,7 +86,10 @@ it("state graph annotation", async () => { expectTypeOf( await gatherIterator( - graph.stream(input, { streamMode: ["updates"], subgraphs: true }) + graph.stream(input, { + streamMode: ["updates"], + subgraphs: true, + }) ) ).toExtend< [ @@ -145,30 +151,14 @@ it("state graph annotation", async () => { | ["debug", Record] | ["messages", [BaseMessage, Record]] | ["custom", any] - )[] - >(); - - expectTypeOf( - await gatherIterator( - graph.stream(input, { - streamMode: ["updates", "values"] as - | StreamMode - | StreamMode[] - | undefined, - subgraphs: true, - }) - ) - ).toExtend< - ( + | ["checkpoints", { values: { foo: string[] } }] | [ - string[], - "updates", - Record<"one" | "two" | "three", { foo?: string[] | string }> + "tasks", + { id: string; name: string } & ( + | { input: unknown } + | { result: [string, unknown][] } + ) ] - | [string[], "values", { foo: string[] }] - | [string[], "debug", Record] - | [string[], "messages", [BaseMessage, Record]] - | [string[], "custom", any] )[] >(); }); @@ -296,6 +286,14 @@ it("state graph zod", async () => { | ["debug", Record] | ["messages", [BaseMessage, Record]] | ["custom", any] + | ["checkpoints", { values: { foo: string[] } }] + | [ + "tasks", + { id: string; name: string } & ( + | { input: unknown } + | { result: [string, unknown][] } + ) + ] )[] >(); @@ -320,6 +318,15 @@ it("state graph zod", async () => { | [string[], "debug", Record] | [string[], "messages", [BaseMessage, Record]] | [string[], "custom", any] + | [string[], "checkpoints", { values: { foo: string[] } }] + | [ + string[], + "tasks", + { id: string; name: string } & ( + | { input: unknown } + | { result: [string, unknown][] } + ) + ] )[] >(); }); diff --git a/libs/langgraph/src/utils.ts b/libs/langgraph/src/utils.ts index bc587417a..a71ee4970 100644 --- a/libs/langgraph/src/utils.ts +++ b/libs/langgraph/src/utils.ts @@ -103,10 +103,12 @@ export function prefixGenerator( generator: Generator, prefix: Prefix ): Generator<[Prefix, T]>; + export function prefixGenerator( generator: Generator, prefix?: undefined ): Generator; + export function prefixGenerator< T, Prefix extends string | undefined = undefined From 6a7dc805467d7bf698b949a8f712c0d86204d03a Mon Sep 17 00:00:00 2001 From: Tat Dat Duong Date: Fri, 27 Jun 2025 00:06:35 +0200 Subject: [PATCH 2/2] Remove export --- libs/langgraph/src/pregel/types.ts | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/libs/langgraph/src/pregel/types.ts b/libs/langgraph/src/pregel/types.ts index 7bb9d469f..8b15af3ce 100644 --- a/libs/langgraph/src/pregel/types.ts +++ b/libs/langgraph/src/pregel/types.ts @@ -60,18 +60,17 @@ interface StreamTasksOutputBase { interrupts: Interrupt[]; } -export interface StreamTasksCreateOutput - extends StreamTasksOutputBase { +interface StreamTasksCreateOutput extends StreamTasksOutputBase { input: StreamValues; triggers: string[]; } -export interface StreamTasksResultOutput +interface StreamTasksResultOutput extends StreamTasksOutputBase { result: [Keys, StreamUpdates][]; } -export type StreamTasksOutput = +type StreamTasksOutput = | StreamTasksCreateOutput | StreamTasksResultOutput;