feat: ForwardPassMetrics dynamo event plane integration #7250
Signed-off-by: hongkuanz <hongkuanz@nvidia.com>
Walkthrough
This change introduces a complete forward-pass metrics (FPM) infrastructure that bridges local ZMQ metrics from vLLM to the Dynamo event plane. The implementation includes Rust bindings for relay and subscriber components, Python utilities for metrics consumption, vLLM integration for lifecycle management, and architectural documentation describing the two-layer event flow system.
Estimated code review effort: 🎯 3 (Moderate) | ⏱️ ~25 minutes
🚥 Pre-merge checks: ✅ 3 passed
PeaBrane
left a comment
Clean integration that follows the existing KV event relay patterns well. One minor nit on cleanup consistency.
Actionable comments posted: 4
🧹 Nitpick comments (1)
components/src/dynamo/vllm/handlers.py (1)
879-882: Keep the shutdown traceback here.
Lines 881-882 log only `str(e)`, which makes relay teardown failures much harder to debug across the Python/Rust boundary. Prefer `logger.exception(...)` or `exc_info=True` so the full stack is preserved.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@components/src/dynamo/common/recv_forward_pass_metrics.py`:
- Around line 60-62: The runtime is being created with enable_nats forcibly set
to False which breaks topologies where the event plane uses NATS; update the
DistributedRuntime instantiation (the runtime variable / DistributedRuntime
constructor call) to stop forcing enable_nats=False — either remove that
argument so the binding uses its default behavior, or wire an explicit
CLI/runtime flag into the constructor (e.g., pass args.enable_nats) so callers
can control NATS instead of hardcoding False.
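A minimal sketch of the second option (wiring a flag through), assuming a hypothetical `--enable-nats` CLI flag and a hypothetical `runtime_kwargs` helper. The real `DistributedRuntime` signature is not shown in this review, so the sketch only builds the keyword arguments and leaves the constructor call to the caller:

```python
import argparse


def build_parser() -> argparse.ArgumentParser:
    parser = argparse.ArgumentParser(description="FPM consumer (sketch)")
    # Hypothetical flag. default=None means "not specified", so the
    # binding's own default applies unless the caller overrides it.
    parser.add_argument(
        "--enable-nats",
        action=argparse.BooleanOptionalAction,
        default=None,
        help="Force NATS on or off; omit to use the runtime default.",
    )
    return parser


def runtime_kwargs(args: argparse.Namespace) -> dict:
    """Forward enable_nats only when it was set explicitly on the CLI."""
    kwargs = {}
    if args.enable_nats is not None:
        kwargs["enable_nats"] = args.enable_nats
    return kwargs
```

Leaving the key out when the flag is absent covers both suggestions at once: the default behavior is untouched, and `--enable-nats` / `--no-enable-nats` give callers explicit control.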
In `@components/src/dynamo/vllm/main.py`:
- Around line 724-727: The FPM relays are being started for engines built from
snapshots without a seeded worker ID, causing blank worker_id in
ForwardPassMetrics; update the checkpoint/snapshot path in worker()—where
snapshot_engine is created via setup_vllm_engine()—to either seed fpm_worker_id
before engine construction (pass the fpm_worker_id into setup_vllm_engine) or
skip calling setup_fpm_relay entirely for snapshot-created engines;
specifically, ensure the code that currently calls
setup_fpm_relay(generate_endpoint, vllm_config) and assigns handler.fpm_relays
only runs when a valid fpm_worker_id is present (or when the engine was not
created from a snapshot), or alternatively inject fpm_worker_id into
setup_vllm_engine so subsequent setup_fpm_relay sees a non-empty worker id
(affecting fpm_relays, setup_fpm_relay, handler.fpm_relays, snapshot_engine,
worker(), setup_vllm_engine, and fpm_worker_id).
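The "skip when no worker id is seeded" option could look roughly like the guard below. `maybe_setup_fpm_relays` and its `setup_relays` callback are hypothetical names used for illustration; in the PR the callback would stand in for the existing `setup_fpm_relay(generate_endpoint, vllm_config)` call:

```python
from typing import Callable, List, Optional


def maybe_setup_fpm_relays(
    fpm_worker_id: Optional[str],
    setup_relays: Callable[[], List[object]],
) -> List[object]:
    """Start relays only when a non-empty worker id has been seeded."""
    if not fpm_worker_id:
        # Snapshot-built engine without a seeded id: skip the relay rather
        # than publish ForwardPassMetrics with a blank worker_id.
        return []
    return setup_relays()
```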
In `@lib/bindings/python/rust/llm/fpm.rs`:
- Around line 68-69: Change the unbounded handoff to a bounded channel to
prevent unbounded memory growth: replace the rx field type
Arc<std::sync::Mutex<tokio::sync::mpsc::UnboundedReceiver<Vec<u8>>>> with
Arc<std::sync::Mutex<tokio::sync::mpsc::Receiver<Vec<u8>>>> (or the equivalent
bounded receiver type), create the channel with
tokio::sync::mpsc::channel(CAPACITY) instead of unbounded_channel in the
producer/initialization code (the site currently calling unbounded_channel
around line 86), and update the producer send logic to use .send().await
(backpressure) or an explicit overflow policy (e.g., call try_send() and drop
oldest when Err(TrySendError::Full) if you prefer a drop policy); also adjust
any recv/send call sites and error handling to handle the bounded channel
semantics and closed-channel errors while keeping the CancellationToken usage
unchanged.
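The drop-oldest overflow policy is sketched below as a Python analogue using the standard `queue.Queue`, not the Rust code under review. One caveat for the Rust side: with `tokio::sync::mpsc` only the receiving half can remove queued items, so a sender-side `try_send()` can drop the newest payload but cannot evict the oldest. The helper name `offer_drop_oldest` is hypothetical, and a single producer is assumed:

```python
import queue


def offer_drop_oldest(q: "queue.Queue[bytes]", item: bytes) -> bool:
    """Enqueue without blocking; on overflow evict the oldest item first.

    Returns True if an older item was dropped to make room.
    """
    dropped = False
    while True:
        try:
            q.put_nowait(item)
            return dropped
        except queue.Full:
            try:
                q.get_nowait()  # evict the oldest pending payload
                dropped = True
            except queue.Empty:
                pass  # a consumer drained the queue first; retry the put
```

For metrics, a lossy bounded buffer is usually an acceptable trade: a slow consumer sees slightly stale data instead of the process growing without limit.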
In `@lib/llm/src/fpm_publisher.rs`:
- Around line 89-103: The code currently accepts any ZMQ multipart message with
frames.len() >= 3 and silently discards extra frames; update the validation so
only messages with exactly 3 frames are accepted: check frames.len() == 3,
extract the payload only when that is true (instead of swap_remove on index 2
for >=3), and treat any other frame count as a malformed message (log/warn and
skip publishing via publisher.publish_bytes). Ensure you reference the existing
frames Vec<Vec<u8>> handling and the call to
publisher.publish_bytes(payload).await when implementing this strict check.
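The strict frame check, expressed as a small Python sketch rather than the Rust code itself. `extract_payload` is a hypothetical helper, and it assumes (as the comment above implies) that the payload is the third frame:

```python
from typing import List, Optional


def extract_payload(frames: List[bytes]) -> Optional[bytes]:
    """Return the payload only for well-formed 3-frame messages."""
    if len(frames) != 3:
        # Malformed message: the caller should log a warning and skip
        # publishing instead of silently discarding the extra frames.
        return None
    return frames[2]
```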
---
Nitpick comments:
In `@components/src/dynamo/vllm/handlers.py`:
- Around line 879-882: The current except block around relay.shutdown() only
logs str(e) which loses the traceback; update the handler catching Exception in
the shutdown try/except so it logs the full exception context (either call
logger.exception("Failed to shut down FPM relay") or logger.warning("Failed to
shut down FPM relay", exc_info=True)) instead of logger.warning(f"Failed to shut
down FPM relay: {e}"), leaving the message text intact and ensuring the
stacktrace from relay.shutdown() is preserved for debugging.
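A minimal sketch of the suggested logging change. `shutdown_relay` is a hypothetical wrapper for illustration; in the PR the try/except lives inside the handler's cleanup path:

```python
import logging


def shutdown_relay(relay, logger: logging.Logger) -> None:
    """Shut down a relay, keeping the full traceback if it fails."""
    try:
        relay.shutdown()
    except Exception:
        # exc_info=True attaches the active exception and its traceback to
        # the log record, unlike formatting str(e) into the message.
        logger.warning("Failed to shut down FPM relay", exc_info=True)
```

`logger.exception(...)` behaves the same way but logs at ERROR level, so `exc_info=True` is the option that keeps the original WARNING severity.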
📒 Files selected for processing (11)
- components/src/dynamo/common/forward_pass_metrics.py
- components/src/dynamo/common/recv_forward_pass_metrics.py
- components/src/dynamo/vllm/handlers.py
- components/src/dynamo/vllm/main.py
- lib/bindings/python/rust/lib.rs
- lib/bindings/python/rust/llm.rs
- lib/bindings/python/rust/llm/fpm.rs
- lib/bindings/python/src/dynamo/_core.pyi
- lib/bindings/python/src/dynamo/llm/__init__.py
- lib/llm/src/fpm_publisher.rs
- lib/llm/src/lib.rs
On naming, FPM is not a well-understood acronym yet.
FPM Event Plane Integration
Summary
Adds a Dynamo event plane relay for ForwardPassMetrics, enabling automatic discovery and transport-agnostic delivery (NATS Core / ZMQ). Follows the same two-layer bridge architecture as KV events.
Motivation
The initial FPM implementation (merged in the prior PR) uses raw ZMQ PUB/SUB with manual port configuration. This works for local testing but lacks the automatic discovery and transport-agnostic delivery that the event plane provides.
This PR bridges the gap by adding a relay in the parent process that subscribes to the child's raw ZMQ and re-publishes to the Dynamo event plane, exactly like KV events do.
Architecture
The raw ZMQ hop is needed because the scheduler runs in a forked child process without access to the Dynamo runtime. The `FpmEventRelay` bridge in the parent process handles event plane transport and discovery registration automatically.
Why two layers (same as KV events)
vLLM's `EngineCore` runs in a forked child process. After `fork()`, the child has no access to the parent's Dynamo runtime (tokio executor, NATS connections, etcd leases, and K8s watchers don't survive fork). The child can only open new raw sockets. The parent process has the runtime and acts as the bridge.
Changes
Rust: `dynamo-llm` crate
`lib/llm/src/fpm_publisher.rs` (new)
`FpmEventRelay` -- subscribes to a local ZMQ PUB socket and relays raw bytes to `EventPublisher::for_component("forward-pass-metrics")`. Simpler than `KvEventPublisher` (no event transformation, no batching, no local indexer). Includes:
- `CancellationToken` for graceful shutdown
- `Drop` impl that cancels the relay task
Rust: `dynamo-py3` crate (Python bindings)
`lib/bindings/python/rust/llm/fpm.rs` (new)
- `FpmEventRelay` -- thin `#[pyclass]` wrapper around `dynamo_llm::fpm_publisher::FpmEventRelay`
- `FpmEventSubscriber` -- wraps `EventSubscriber::for_component` for the consumer side. Background tokio task feeds an `mpsc` channel; `recv()` releases the GIL while blocking. Exits cleanly when the receiver is dropped.
Python
`components/src/dynamo/vllm/main.py`
- `setup_fpm_relay()` that creates one `FpmEventRelay` per dp_rank (parallel to `setup_kv_event_publisher()`)
- `init_prefill()` and `init()` paths
- `DYN_VLLM_FORWARDPASS_METRIC_PORT` is set
`components/src/dynamo/vllm/handlers.py`
- `self.fpm_relays` field on handler class
- `cleanup()` to prevent resource leaks
`components/src/dynamo/common/recv_forward_pass_metrics.py` (new)
Standalone consumer that uses `FpmEventSubscriber` with the Dynamo runtime for auto-discovery. Usage:
`lib/bindings/python/src/dynamo/llm/__init__.py`
Re-exports `FpmEventRelay` and `FpmEventSubscriber` from `_core`.
Files not changed
- `instrumented_scheduler.py` -- stays as-is (raw ZMQ PUB from child process)
- `forward_pass_metrics.py` -- schema unchanged
- `envs.py` -- `DYN_VLLM_FORWARDPASS_METRIC_PORT` still needed for child-to-parent ZMQ
- `args.py` -- injection logic unchanged
Discovery
Each `FpmEventRelay` creates an `EventPublisher::for_component("forward-pass-metrics")` which automatically:
- Registers a `DiscoveryInstance::EventChannel` with `topic: "forward-pass-metrics"` in the discovery plane
Subscribers use `EventSubscriber::for_component("forward-pass-metrics")`, which watches discovery and dynamically connects/disconnects as engines come and go. Works across all discovery backends (K8s CRD, etcd, file, mem).
Testing