Replace custom POSIX SHM with zenoh SHM for node-direct publish#1378
Replace custom POSIX SHM with zenoh SHM for node-direct publish#1378Copilot wants to merge 33 commits into
Conversation
0a8aa22 to
c15cd77
Compare
…add SHM provider, remove DropToken system Co-authored-by: phil-opp <[email protected]>
…ons, fix zenoh 1.7 compatibility Co-authored-by: phil-opp <[email protected]>
…blishing Co-authored-by: phil-opp <[email protected]>
…_addr to NodeConfig Co-authored-by: phil-opp <[email protected]>
…back metadata warning Co-authored-by: phil-opp <[email protected]>
…assing On the receiver side, use payload.as_shm() to get a direct reference to the zenoh shared memory buffer instead of copying bytes via to_bytes(). The ZShm buffer is wrapped in an Arrow Buffer via from_custom_allocation, keeping the SHM region alive while Arrow references the underlying bytes. This eliminates the receiver-side memcpy that was previously required, leaving only the unavoidable sender-side Arrow serialization copy. Non-SHM payloads (e.g. from remote subscribers) fall back to the previous copy-based path. Co-Authored-By: Claude Opus 4.6 <[email protected]>
Wrap the zenoh subscriber async task body in catch_unwind so that a panic doesn't silently kill sample delivery. On panic, the error is logged and a FatalError event is sent through the event channel to surface the failure to the node. Co-Authored-By: Claude Opus 4.6 <[email protected]>
Add doc comments explaining that allocate_data_sample() blocks when the SHM pool is full (via BlockOn<GarbageCollect>), and that users can increase the pool via DORA_NODE_SHM_POOL_SIZE env var. Co-Authored-By: Claude Opus 4.6 <[email protected]>
…tibility Nodes spawned by the daemon (custom binaries) don't have a tokio runtime, causing "there is no reactor running" panics on init. Replace all Handle::try_current() + block_on() calls with zenoh's synchronous Wait trait: session open, subscriber/publisher declaration, and publish. Subscriber receive loops now run on OS threads with blocking recv() instead of tokio tasks. Also skip SHM provider allocation for nodes without outputs. Co-Authored-By: Claude Opus 4.6 <[email protected]>
Allow users to tune the zenoh SHM pool size per-node instead of relying on the hardcoded 64MB default or the DORA_NODE_SHM_POOL_SIZE env var. The new optional `shared_memory_pool_size` field on nodes accepts either an integer (raw bytes) or a human-friendly string with unit suffix (e.g. "128MB", "1GB", "512KB"). Priority: YAML config > env var > 64MB. Co-Authored-By: Claude Opus 4.6 <[email protected]>
- Reduce default SHM pool size from 64MB to 8MB to avoid ENOMEM on CI runners where multiple nodes each allocate their own pool - Guard zero-length allocations (return empty Vec instead of calling zenoh SHM allocator, which rejects zero-size on some platforms) - Fall back to heap Vec with a warning when SHM allocation fails (handles platform-specific layout issues on macOS/Windows) - Consolidate duplicated send_message calls in send_output_sample - Update doc comments to reflect new 8MB default Co-Authored-By: Claude Opus 4.6 <[email protected]>
c15cd77 to
b266471
Compare
Zenoh's SHM provider calls mlock() on the shared memory pool, which fails with ENOMEM on systems where RLIMIT_MEMLOCK is too low (e.g. 64KB default on many Linux systems). Previously this was fatal and crashed the node process. Now the SHM provider creation is non-fatal: on failure, the node logs a warning and falls back to heap-allocated buffers sent over zenoh. Data still flows correctly, just without zero-copy SHM optimization. To enforce SHM in environments where it must work (e.g. CI), set DORA_SHM_REQUIRED=1 — this makes the failure fatal again. Co-Authored-By: Claude Opus 4.6 <[email protected]>
Zenoh's SHM provider calls mlock() which requires RLIMIT_MEMLOCK to be higher than the default 64KB on GitHub Actions runners. Add `sudo prlimit --memlock=unlimited` to all test steps that run dataflows. Also set DORA_SHM_REQUIRED=1 on all test jobs so that SHM provider failures are fatal in CI, ensuring the zero-copy path is always tested. Co-Authored-By: Claude Opus 4.6 <[email protected]>
prlimit is Linux-only and doesn't exist on macOS. Use `ulimit -l unlimited` on macOS to raise RLIMIT_MEMLOCK for zenoh's mlock() call on SHM segments. Co-Authored-By: Claude Opus 4.6 <[email protected]>
Replace blocking subscriber.recv() with futures::future::select on both the zenoh subscriber and a shutdown channel. When EventStream is dropped, the shutdown sender is dropped, disconnecting the channel and immediately unblocking all subscriber threads. This fixes CI timeouts where nodes hung on shutdown because subscriber threads blocked indefinitely on recv(). Co-Authored-By: Claude Opus 4.6 <[email protected]>
Conflict resolutions: - Adopt main's NodeEventOrUnknown wrapper in daemon_to_node protocol - Keep branch's drop-token removal and zenoh-based input delivery - Combine prlimit/memlock handling with uv .python-version isolation in CI - Drop Unix Domain Socket CI step (yml removed on this branch) - Switch event_stream receiver to tokio mpsc (from main) while keeping zenoh_input_ids, shutdown tx, and stop_received handling Co-Authored-By: Claude Opus 4.6 (1M context) <[email protected]>
- Drop unused shared_memory_extended dep from dora-daemon.
- Stop sending a data:None SendMessage to the daemon after publishing via
zenoh. The daemon no longer forwards user data, so the notification was
only triggering a redundant NodeEvent::Input{data:None} at receivers.
- Drop the corresponding filter and zenoh_input_ids plumbing in the node
event stream, since those None events can no longer arrive for zenoh
inputs.
The fallback path (Interactive/Testing mode with no zenoh publisher) still
goes through the daemon control channel unchanged.
Co-Authored-By: Claude Opus 4.6 (1M context) <[email protected]>
- send_out no longer computes remote_receivers / data_bytes just to discard them; it now simply delegates to send_output_to_local_receivers on the fallback path. - send_output_to_local_receivers drops its unused return value. - Remove the dead InterDaemonEvent::Output handler: this variant is never constructed anywhere in the tree (data flows via zenoh now), so the arm was unreachable. Keep a silent catch-all for backward-compat with outdated peers. - Drop the unused publish_all_messages_to_zenoh field from RunningDataflow — the daemon no longer branches on it. Co-Authored-By: Claude Opus 4.6 (1M context) <[email protected]>
The topic echo/hz/info commands still decoded the zenoh payload as a Timestamped<InterDaemonEvent>, which was the wire format under the old daemon-forwarded architecture. Nodes now publish the raw arrow buffer as the zenoh payload and the bincode-serialized Metadata as a zenoh attachment (see DoraNode::send_output_sample), so every sample was silently rejected as "invalid event". - echo: read payload bytes directly, deserialize Metadata from the attachment. - hz: count samples; no need to decode anything. - info: pull the payload length and type_info from the attachment metadata. - Add bincode dep to dora-cli for attachment deserialization. Co-Authored-By: Claude Opus 4.6 (1M context) <[email protected]>
- Remove the SharedMemoryId type alias and its re-exports; nothing references it now that DataMessage::SharedMemory is gone. - Drop the unused `local_node_inputs` return value in send_output_closed_events. - Drop unused imports (aligned_vec in dora-daemon, tokio::sync::oneshot in runtime/python). - Rename DoraNode.zenoh_session to _zenoh_session and document that it is kept alive only so the session outlives derived publishers and subscribers. Co-Authored-By: Claude Opus 4.6 (1M context) <[email protected]>
|
I pushed some simplifications and cleanups. I think our goal should be to merge this, provided that the performance is good enough. The benchmarks I ran locally seem to indicate this, but please give it a try too |
Is there anything else that you think can be removed? |
| | DaemonRequest::NextEvent { .. } | ||
| | DaemonRequest::SubscribeDrop | ||
| | DaemonRequest::NextFinishedDropTokens | ||
| | DaemonRequest::NextEvent |
There was a problem hiding this comment.
So is this still necessary? Do we need to query for next event still?
There was a problem hiding this comment.
Mainly for the interactive and the integration testing code paths. But you're right, it still seems to be part of the normal execution as well. I look into it.
| }, | ||
| SubscribeDrop, | ||
| NextFinishedDropTokens, | ||
| NextEvent, |
There was a problem hiding this comment.
So is this still necessary? Do we need to query the daemon for next event still?
| pub enum DaemonReply { | ||
| Result(Result<(), String>), | ||
| PreparedMessage { shared_memory_id: SharedMemoryId }, | ||
| NextEvents(Vec<Timestamped<NodeEventOrUnknown>>), |
There was a problem hiding this comment.
Is this still the message definition that message are received? Because it wouldn't be a daemon reply is it? or is it not used anymore?
| /// Size of the zenoh shared memory pool for zero-copy output publishing. | ||
| /// | ||
| /// Accepts an integer (raw bytes) or a string with a unit suffix | ||
| /// (`KB`, `MB`, `GB`, case-insensitive). Defaults to 8 MB if not set. | ||
| /// | ||
| /// ## Example | ||
| /// | ||
| /// ```yaml | ||
| /// nodes: | ||
| /// - id: camera-node | ||
| /// shared_memory_pool_size: 128MB | ||
| /// ``` | ||
| #[serde(default, skip_serializing_if = "Option::is_none")] | ||
| pub shared_memory_pool_size: Option<ByteSize>, | ||
|
|
There was a problem hiding this comment.
Is this shared memory pool size including the queue as well as in processing messages or is it per individual messages?
| Ok(zenoh_session) | ||
| } | ||
|
|
||
| #[cfg(feature = "zenoh")] |
There was a problem hiding this comment.
What is the difference between this and open_zenoh_session?
There was a problem hiding this comment.
FYI I commented on one line too high should be line 154
- Remove unused ack_channel field from EventItem::NodeEvent and the matching _drop_channel parameter from data_to_arrow_array; the channel was always built with its receiver immediately dropped. - Remove dead OperatorEvent::AllocateOutputSample variant and match arm. - Collapse open_zenoh_session_sync onto the shared build_zenoh_config helper; extract the repeated "default" network id as a NETWORK_ID const shared by the output and daemon control topic builders. - Drop the format_arrow_type reimplementation in topic info and rely on arrow's Display impl for DataType. - Trim narrating "// Normal path / Fallback path" block in send_output_sample, drop the commented-out Zenoh variant / TODO stub in RemoteCommunicationConfig. Co-Authored-By: Claude Opus 4.6 (1M context) <[email protected]>
|
Tried this branch (ee19709) against the dora-benchmark py-latency test — not reproducing a win vs. the prior Setup: py-latency (avg μs, dora Python node → node):
* rough envelope from my earlier runs on Consistent ~3–4× regression on small messages. Every run also logs Is there a feature flag / zenoh config I'm missing? Happy to rerun with a specific threshold sweep or zenoh mode if you have one in mind. |
|
● rs-latency on PR #1378 (units: μs, sizes in u64 elements → bytes = ×8): Rust floor is ~350-500 μs — lower than Python's 600-700, but still much higher than the typical ~50-150 μs people expect from dora Rust small-message latency. Same pattern as py-latency: flat baseline |
|
Heads up: when a message exceeds the SHM pool size, the node silently hangs instead of erroring — no log, no Repro with the dora-rs/rs-latency benchmark (UInt64Array sender/receiver, 100 samples × [8 B … 40 MB]):
The doc comment on the pool-size logic in apis/rust/node/src/node/mod.rs:493-501 already flags the behaviour — "If Suggestion: detect when a requested allocation is larger than the configured pool and either return an error or On the bright side, once sized correctly the zenoh-SHM path is slightly faster than feat/unix-domain-sockets at ┌───────┬─────────────────────┬─────────────────────┐ |
… + #314 opened) Verification on 2026-04-17 showed the D-7c premise in the plan doc was broken: - Upstream PR #1378 (the purported source for cherry-picking) is still OPEN, mergeable=false, Copilot-authored, 3,000 lines. - Upstream main doesn't use zenoh SHM at all. It's TCP-control + shared_memory_extended-data per commit 01995ad (2026-03-18) which *removed* the shmem control channel entirely. - Fork's shmem control channel is disabled-by-default (verified: LocalCommunicationConfig has #[default] Tcp). Applying Option D (upstream's cleanup equivalent) is safe but a real refactor. Per owner direction: multiple v1.0.0-rc.N tags during Phase 5b will progressively land architectural changes, with independent dogfood between each. 1.0 GA ships with both D and zenoh SHM. Plan doc changes: - §1 Status line updated to reflect Phases 0/1/3 landed, 3b deferred to RC-window iteration. Release flow line now names rc.2 = D, rc.3 = zenoh SHM. - §5 Phase 3b section rewritten as a deferred placeholder. No code work happens in Phase 3b. Landing sequence (rc.1, rc.2, rc.3, GA) documented inline. - §18 changelog entry for the rescope. Tracking: - #309 closed with full rationale. - #313 opened for rc.2 Option D. - #314 opened for rc.3 zenoh SHM port. - Both linked as sub-issues of #292 (Phase 5b tracker). Co-Authored-By: Claude Opus 4.7 (1M context) <[email protected]>
|
This seems to be heavily conflicting with 145ccce. A similar change as this was already applied by @heyong4725 in Because of that, I'm going to close this PR for now. I'll put claude on it to try to understand the differences and what's still missing. Either way, this has to be a new PR. |
…etter (#1741) Salvaged from the closed #1378 (zenoh-SHM refactor), which was written against pre-rewrite main and became unmergeable after the Q1 2026 consolidation. These are the portable pieces: - New optional `shared_memory_pool_size` field on dataflow nodes. Accepts an integer (raw bytes) or a string with unit suffix (`KB`, `MB`, `GB`). Priority: YAML > `DORA_NODE_SHM_POOL_SIZE` env var > built-in default. - `ByteSize` newtype in `dora-message` with string/integer serde, `Display`, `FromStr`, `JsonSchema`. - `DoraNode::zero_copy_threshold()` getter.
…ld getter - New optional `shared_memory_pool_size` field on dataflow nodes. Accepts an integer (raw bytes) or a string with unit suffix (`KB`, `MB`, `GB`, case-insensitive). Priority: YAML config > `DORA_NODE_SHM_POOL_SIZE` env var > built-in default. - Add `ByteSize` newtype in `dora-message` with string/integer serde support, Display, FromStr, and JsonSchema. - Add `DoraNode::zero_copy_threshold()` getter to surface the already-existing `DORA_ZERO_COPY_THRESHOLD`-backed value. Salvaged from the closed dora-rs#1378 (zenoh-SHM refactor), which was written against pre-rewrite main and became unmergeable after the Q1 2026 consolidation. These are the portable pieces that don't conflict with the new architecture. Co-Authored-By: Claude Opus 4.7 (1M context) <[email protected]>
) Three related fixes salvaged from the closed #1378 that were not covered by #1745: 1. Wrap each zenoh subscriber thread in `catch_unwind` so a panic surfaces as `EventItem::FatalError` instead of silently killing sample delivery. 2. Replace blocking `subscriber.recv()` with `futures::future::select` on `recv_async()` and a shutdown channel; dropping `EventStream` now wakes every subscriber immediately. 3. Add a `stop_received` flag so `recv_async` / `poll_next` return `None` after `Event::Stop` — subscriber threads hold event-channel sender clones that would otherwise keep the receiver open. Includes regression tests for the Stop → None invariant.
…utdown races Three related fixes salvaged from the closed dora-rs#1378: 1. Panic handling. The subscriber threads are spawned from `std::thread::Builder`, so an uncaught panic would silently kill sample delivery for that input. Wrap the body in `catch_unwind(AssertUnwindSafe(...))` and surface a panic as an `EventItem::FatalError` so the node observes the failure. 2. Clean shutdown. The threads previously blocked on `subscriber.recv()` and only exited once the zenoh session itself tore down — dropping the `EventStream` was not enough to unblock them, causing test hangs. Replace the blocking `recv()` with `futures::future::select` on `subscriber.recv_async()` and a dedicated `flume` shutdown channel; dropping the new `_zenoh_shutdown_tx` on the `EventStream` disconnects the channel and wakes every subscriber immediately. 3. `stop_received` flag. Subscriber threads hold clones of the event channel sender, so after `AllInputsClosed`/`Stop` the daemon thread dropping its own sender is no longer sufficient to close the `tokio::sync::mpsc::Receiver` — `recv_async`/`poll_next` would hang waiting on those live subscriber senders. Track delivery of `Event::Stop` and return `None` on subsequent calls so the node exits and `EventStream::drop` triggers the shutdown path. Add regression tests covering the `Stop` → `None` invariant via both `recv()` and `Stream::next()`. Co-Authored-By: Claude Opus 4.7 (1M context) <[email protected]>
* refactor: remove DropToken system now that zenoh SHM is the data plane Continuation of #1741, salvaging more from the closed #1378. Zenoh's SHM provider now handles buffer lifecycle via its own reference counting, so the custom shmem + DropToken tracking path is pure legacy. - Node API: zenoh session + SHM provider are now mandatory in standard mode (no more DropStream fallback, no custom shmem allocation cache, no `DataSampleInner::Shmem`). `allocate_data_sample` always returns a heap buffer; zenoh publishes large payloads zero-copy. - Message types: removed `DataMessage::SharedMemory`, `DropToken`, `NodeDropEvent`, `DaemonReply::NextDropEvents`, the `SubscribeDrop` / `ReportDropTokens` / `NextFinishedDropTokens` requests, and `drop_tokens` on `NextEvent`. `DataMessage` is now just `Vec`. - DaemonCommunication::Shmem drops `daemon_drop_region_id`. - Daemon: removed `drop_channels`, `pending_drop_tokens`, `DropTokenInformation`, `check_drop_token`, the drop listener loop, and the SHM mmap+copy fan-out path (only `DataMessage::Vec` remains). - Dropped the `shared_memory_extended` dependency from dora-node-api. Nodes that were implicitly relying on the non-tokio fallback will now fail init with a clear error instead of silently degrading. * cleanup: remove dead SHM-protocol leftovers from message enums - DaemonReply::PreparedMessage: no construction or match sites remain after the SHM data-plane removal. - node_to_daemon::InputData: single-variant enum with no callers (the InputData used in integration tests comes from a different module). - Collapse the now-single-arm DataMessage match in the daemon into an irrefutable let. Co-Authored-By: Claude Opus 4.7 (1M context) <[email protected]> * Trigger CI * Opt-in owned tokio runtime for nodes without ambient executor (#1748) Support opt-in owned tokio runtime for nodes Embedders that lack an ambient tokio runtime can now set DORA_CREATE_OWNED_TOKIO_RUNTIME=1 to have the node build its own multi-threaded runtime for the zenoh SHM data plane. Default remains strict: init errors when no runtime is available. Follow-up to #1745. Co-authored-by: Claude Opus 4.7 (1M context) <[email protected]> * Create owned tokio runtime by default when none is ambient Node authors shouldn't have to set up a tokio runtime just to call `DoraNode::init_from_env()`. Drop the `DORA_CREATE_OWNED_TOKIO_RUNTIME` opt-in gate introduced in #1748 and always build an owned multi-threaded runtime when `Handle::try_current()` returns `Err`. Callers with an ambient runtime keep using it. Co-Authored-By: Claude Opus 4.7 (1M context) <[email protected]> * Carry ParamUpdate value as JSON bytes over the bincode wire `NodeEvent` is serialized with bincode on the daemon→node TCP channel. `serde_json::Value::deserialize` calls `Deserializer::deserialize_any`, which bincode does not support: the first `ParamUpdate` would kill the node's event stream with "Bincode does not support the serde::Deserializer::deserialize_any method". Change `NodeEvent::ParamUpdate.value` to `value_json: Vec<u8>` (JSON-encoded), and serialize/deserialize at the daemon and node boundaries. The public `Event::ParamUpdate.value: serde_json::Value` stays unchanged for callers. Adds a bincode round-trip regression test over representative JSON shapes so we don't slip a `deserialize_any` field back into `NodeEvent`. Co-Authored-By: Claude Opus 4.7 (1M context) <[email protected]> * Fall back to heap publishes when zenoh SHM provider creation fails CI runners with a small `/dev/shm` (and tests that spawn many nodes in sequence without segment cleanup between runs) now hit `ShmProviderBuilder::default_backend` returning `OS error 12` (ENOMEM), which used to abort node init outright. Treat the SHM provider as best-effort instead: log a warning and proceed with `zenoh_shm_provider = None`. The send path already publishes via heap buffers when the provider is missing, so messages still flow — just without the SHM zero-copy fast path. Restores the contract-test suite that started failing once nodes actually reached the SHM allocation step (previously they failed earlier on tokio runtime init, masking the issue). Co-Authored-By: Claude Opus 4.7 (1M context) <[email protected]> --------- Co-authored-by: Claude <[email protected]>
Replaces the custom
shared_memory_extended-based data plane (POSIX SHM allocation, OS ID passing, read-only mapping, DropToken lifecycle tracking) with zenoh's native shared memory support. Nodes open their own zenoh sessions, allocate from a zenoh SHM provider, and publish outputs directly on zenoh topics. Subscribers receive zero-copy SHM-backed data on the same machine, or network-copied data across machines. The daemon control channel remains unchanged for lifecycle/routing metadata.Sender side — zenoh SHM allocation + publish
DoraNodeholds azenoh::Session,PosixShmProvider, and per-outputPublisherallocate_data_sample()allocates from the SHM provider viaprovider.alloc().with_policy::<BlockOn<GarbageCollect>>()send_output_sample()publishes via zenoh with metadata serialized as a zenoh attachment, then sends a lightweightdata: Nonenotification to the daemon control channelReceiver side — zenoh subscribers
EventStream::init_on_channel()declares a zenoh subscriber perInputMapping::Userinput on the topicdora/default/{dataflow_id}/output/{source_node}/{output_id}NodeEvent::Inputevents into the existing flume channelNodeEvent::Input { data: None }for zenoh-subscribed inputs to avoid duplicatesDropToken system removal
drop_stream.rs,DropToken,SharedMemoryId,ShmemHandle, all token tracking in daemon/node/messagesDataMessage::SharedMemory,DaemonRequest::ReportDropTokens/SubscribeDrop/NextFinishedDropTokens,NodeDropEvent,DaemonReply::NextDropEventsDaemonCommunication::Shmemfrom 4 SHM regions to 2 (control + events only)Dependency changes
1.1.1→1.7withshared-memory+unstablefeatureszenohdependency todora-node-apishared_memory_extendedfromdora-node-apiNodeConfig
coordinator_addr: Option<IpAddr>for zenoh peer discovery🔒 GitHub Advanced Security automatically protects Copilot coding agent pull requests. You can protect all pull requests by enabling Advanced Security for your repositories. Learn more about Advanced Security.