Skip to content

Commit 3e7010f

Browse files
committed
Lint
1 parent 25a5944 commit 3e7010f

File tree

5 files changed

+87
-82
lines changed

5 files changed

+87
-82
lines changed

libs/langgraph/langgraph/pregel/__init__.py

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2236,8 +2236,6 @@ def _defaults(
22362236
stream_mode = ["values"]
22372237
elif stream_mode is None:
22382238
stream_mode = self.stream_mode
2239-
elif stream_mode == "debug":
2240-
stream_mode = ["checkpoints", "tasks"]
22412239
if not isinstance(stream_mode, list):
22422240
stream_mode = [stream_mode]
22432241
if self.checkpointer is False:

libs/langgraph/langgraph/pregel/debug.py

Lines changed: 45 additions & 71 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@
33
from collections import defaultdict
44
from collections.abc import Iterable, Iterator, Mapping, Sequence
55
from dataclasses import asdict
6-
from datetime import datetime, timezone
76
from pprint import pformat
87
from typing import (
98
Any,
@@ -17,7 +16,7 @@
1716
from typing_extensions import TypedDict
1817

1918
from langgraph.channels.base import BaseChannel
20-
from langgraph.checkpoint.base import Checkpoint, CheckpointMetadata, PendingWrite
19+
from langgraph.checkpoint.base import CheckpointMetadata, PendingWrite
2120
from langgraph.constants import (
2221
CONF,
2322
CONFIG_KEY_CHECKPOINT_NS,
@@ -92,30 +91,21 @@ class DebugOutputCheckpoint(DebugOutputBase):
9291
TASK_NAMESPACE = UUID("6ba7b831-9dad-11d1-80b4-00c04fd430c8")
9392

9493

95-
def map_debug_tasks(
96-
step: int, tasks: Iterable[PregelExecutableTask]
97-
) -> Iterator[DebugOutputTask]:
94+
def map_debug_tasks(tasks: Iterable[PregelExecutableTask]) -> Iterator[DebugOutputTask]:
9895
"""Produce "task" events for stream_mode=debug."""
99-
ts = datetime.now(timezone.utc).isoformat()
10096
for task in tasks:
10197
if task.config is not None and TAG_HIDDEN in task.config.get("tags", []):
10298
continue
10399

104100
yield {
105-
"type": "task",
106-
"timestamp": ts,
107-
"step": step,
108-
"payload": {
109-
"id": task.id,
110-
"name": task.name,
111-
"input": task.input,
112-
"triggers": task.triggers,
113-
},
101+
"id": task.id,
102+
"name": task.name,
103+
"input": task.input,
104+
"triggers": task.triggers,
114105
}
115106

116107

117108
def map_debug_task_results(
118-
step: int,
119109
task_tup: tuple[PregelExecutableTask, Sequence[tuple[str, Any]]],
120110
stream_keys: str | Sequence[str],
121111
) -> Iterator[DebugOutputTaskResult]:
@@ -125,23 +115,16 @@ def map_debug_task_results(
125115
)
126116
task, writes = task_tup
127117
yield {
128-
"type": "task_result",
129-
"timestamp": datetime.now(timezone.utc).isoformat(),
130-
"step": step,
131-
"payload": {
132-
"id": task.id,
133-
"name": task.name,
134-
"error": next((w[1] for w in writes if w[0] == ERROR), None),
135-
"result": [
136-
w for w in writes if w[0] in stream_channels_list or w[0] == RETURN
137-
],
138-
"interrupts": [
139-
asdict(v)
140-
for w in writes
141-
if w[0] == INTERRUPT
142-
for v in (w[1] if isinstance(w[1], Sequence) else [w[1]])
143-
],
144-
},
118+
"id": task.id,
119+
"name": task.name,
120+
"error": next((w[1] for w in writes if w[0] == ERROR), None),
121+
"result": [w for w in writes if w[0] in stream_channels_list or w[0] == RETURN],
122+
"interrupts": [
123+
asdict(v)
124+
for w in writes
125+
if w[0] == INTERRUPT
126+
for v in (w[1] if isinstance(w[1], Sequence) else [w[1]])
127+
],
145128
}
146129

147130

@@ -159,12 +142,10 @@ def rm_pregel_keys(config: RunnableConfig | None) -> RunnableConfig | None:
159142

160143

161144
def map_debug_checkpoint(
162-
step: int,
163145
config: RunnableConfig,
164146
channels: Mapping[str, BaseChannel],
165147
stream_channels: str | Sequence[str],
166148
metadata: CheckpointMetadata,
167-
checkpoint: Checkpoint,
168149
tasks: Iterable[PregelExecutableTask],
169150
pending_writes: list[PendingWrite],
170151
parent_config: RunnableConfig | None,
@@ -193,42 +174,35 @@ def map_debug_checkpoint(
193174
}
194175

195176
yield {
196-
"type": "checkpoint",
197-
"timestamp": checkpoint["ts"],
198-
"step": step,
199-
"payload": {
200-
"config": rm_pregel_keys(patch_checkpoint_map(config, metadata)),
201-
"parent_config": rm_pregel_keys(
202-
patch_checkpoint_map(parent_config, metadata)
203-
),
204-
"values": read_channels(channels, stream_channels),
205-
"metadata": metadata,
206-
"next": [t.name for t in tasks],
207-
"tasks": [
208-
{
209-
"id": t.id,
210-
"name": t.name,
211-
"error": t.error,
212-
"state": t.state,
213-
}
214-
if t.error
215-
else {
216-
"id": t.id,
217-
"name": t.name,
218-
"result": t.result,
219-
"interrupts": tuple(asdict(i) for i in t.interrupts),
220-
"state": t.state,
221-
}
222-
if t.result
223-
else {
224-
"id": t.id,
225-
"name": t.name,
226-
"interrupts": tuple(asdict(i) for i in t.interrupts),
227-
"state": t.state,
228-
}
229-
for t in tasks_w_writes(tasks, pending_writes, task_states, output_keys)
230-
],
231-
},
177+
"config": rm_pregel_keys(patch_checkpoint_map(config, metadata)),
178+
"parent_config": rm_pregel_keys(patch_checkpoint_map(parent_config, metadata)),
179+
"values": read_channels(channels, stream_channels),
180+
"metadata": metadata,
181+
"next": [t.name for t in tasks],
182+
"tasks": [
183+
{
184+
"id": t.id,
185+
"name": t.name,
186+
"error": t.error,
187+
"state": t.state,
188+
}
189+
if t.error
190+
else {
191+
"id": t.id,
192+
"name": t.name,
193+
"result": t.result,
194+
"interrupts": tuple(asdict(i) for i in t.interrupts),
195+
"state": t.state,
196+
}
197+
if t.result
198+
else {
199+
"id": t.id,
200+
"name": t.name,
201+
"interrupts": tuple(asdict(i) for i in t.interrupts),
202+
"state": t.state,
203+
}
204+
for t in tasks_w_writes(tasks, pending_writes, task_states, output_keys)
205+
],
232206
}
233207

234208

libs/langgraph/langgraph/pregel/loop.py

Lines changed: 27 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
AsyncExitStack,
1212
ExitStack,
1313
)
14+
from datetime import datetime, timezone
1415
from inspect import signature
1516
from types import TracebackType
1617
from typing import (
@@ -423,7 +424,7 @@ def accept_push(
423424
),
424425
):
425426
# produce debug output
426-
self._emit("tasks", map_debug_tasks, self.step, [pushed])
427+
self._emit("tasks", map_debug_tasks, [pushed])
427428
# debug flag
428429
if self.debug:
429430
print_step_tasks(self.step, [pushed])
@@ -475,7 +476,6 @@ def tick(self) -> bool:
475476
self._emit(
476477
"checkpoints",
477478
map_debug_checkpoint,
478-
self.step - 1, # printing checkpoint for previous step
479479
{
480480
**self.checkpoint_config,
481481
CONF: {
@@ -486,7 +486,6 @@ def tick(self) -> bool:
486486
self.channels,
487487
self.stream_keys,
488488
self.checkpoint_metadata,
489-
self.checkpoint,
490489
self.tasks.values(),
491490
self.checkpoint_pending_writes,
492491
self.prev_checkpoint_config,
@@ -510,7 +509,7 @@ def tick(self) -> bool:
510509
raise GraphInterrupt()
511510

512511
# produce debug output
513-
self._emit("tasks", map_debug_tasks, self.step, self.tasks.values())
512+
self._emit("tasks", map_debug_tasks, self.tasks.values())
514513

515514
# debug flag
516515
if self.debug:
@@ -842,10 +841,32 @@ def _emit(
842841
) -> None:
843842
if self.stream is None:
844843
return
845-
if mode not in self.stream.modes:
844+
debug_remap = mode in ("checkpoints", "tasks") and "debug" in self.stream.modes
845+
if mode not in self.stream.modes and not debug_remap:
846846
return
847847
for v in values(*args, **kwargs):
848-
self.stream((self.checkpoint_ns, mode, v))
848+
if mode in self.stream.modes:
849+
self.stream((self.checkpoint_ns, mode, v))
850+
# "debug" mode is "checkpoints" or "tasks" with a wrapper dict
851+
if debug_remap:
852+
self.stream(
853+
(
854+
self.checkpoint_ns,
855+
"debug",
856+
{
857+
"step": self.step - 1
858+
if mode == "checkpoints"
859+
else self.step,
860+
"timestamp": datetime.now(timezone.utc).isoformat(),
861+
"type": "checkpoint"
862+
if mode == "checkpoints"
863+
else "task_result"
864+
if "result" in v
865+
else "task",
866+
"payload": v,
867+
},
868+
)
869+
)
849870

850871
def output_writes(
851872
self, task_id: str, writes: WritesT, *, cached: bool = False
@@ -888,7 +909,6 @@ def output_writes(
888909
self._emit(
889910
"tasks",
890911
map_debug_task_results,
891-
self.step,
892912
(task, writes),
893913
self.stream_keys,
894914
)

libs/langgraph/langgraph/types.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,9 @@ class ToolOutputMixin: # type: ignore[no-redef]
4646
- False disables checkpointing, even if the parent graph has a checkpointer.
4747
- None inherits checkpointer from the parent graph."""
4848

49-
StreamMode = Literal["values", "updates", "checkpoints", "tasks", "messages", "custom"]
49+
StreamMode = Literal[
50+
"values", "updates", "checkpoints", "tasks", "debug", "messages", "custom"
51+
]
5052
"""How the stream method should emit outputs.
5153
5254
- `"values"`: Emit all values in the state after each step, including interrupts.
@@ -57,6 +59,7 @@ class ToolOutputMixin: # type: ignore[no-redef]
5759
- `"messages"`: Emit LLM messages token-by-token together with metadata for any LLM invocations inside nodes or tasks.
5860
- `"checkpoints"`: Emit an event when a checkpoint is created, in the same format as returned by get_state().
5961
- `"tasks"`: Emit events when tasks start and finish, including their results and errors.
62+
- `"debug"`: Emit "checlkpoints" and "tasks" events, for debugging purposes.
6063
"""
6164

6265
StreamWriter = Callable[[Any], None]

libs/sdk-py/langgraph_sdk/schema.py

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,14 +36,24 @@
3636
"""
3737

3838
StreamMode = Literal[
39-
"values", "messages", "updates", "events", "debug", "custom", "messages-tuple"
39+
"values",
40+
"messages",
41+
"updates",
42+
"events",
43+
"tasks",
44+
"checkpoints",
45+
"debug",
46+
"custom",
47+
"messages-tuple",
4048
]
4149
"""
4250
Defines the mode of streaming:
4351
- "values": Stream only the values.
4452
- "messages": Stream complete messages.
4553
- "updates": Stream updates to the state.
4654
- "events": Stream events occurring during execution.
55+
- "checkpoints": Stream checkpoints as they are created.
56+
- "tasks": Stream task start and finish events.
4757
- "debug": Stream detailed debug information.
4858
- "custom": Stream custom events.
4959
"""

0 commit comments

Comments
 (0)