Harden zenoh subscriber threads against panics and shutdown hangs #1746
Merged
…utdown races

Three related fixes salvaged from the closed #1378:

1. Panic handling. The subscriber threads are spawned from `std::thread::Builder`, so an uncaught panic would silently kill sample delivery for that input. Wrap the body in `catch_unwind(AssertUnwindSafe(...))` and surface a panic as an `EventItem::FatalError` so the node observes the failure.
2. Clean shutdown. The threads previously blocked on `subscriber.recv()` and only exited once the zenoh session itself tore down. Dropping the `EventStream` was not enough to unblock them, which caused test hangs. Replace the blocking `recv()` with `futures::future::select` on `subscriber.recv_async()` and a dedicated `flume` shutdown channel; dropping the new `_zenoh_shutdown_tx` on the `EventStream` disconnects the channel and wakes every subscriber immediately.
3. `stop_received` flag. Subscriber threads hold clones of the event channel sender, so after `AllInputsClosed`/`Stop` the daemon thread dropping its own sender is no longer sufficient to close the `tokio::sync::mpsc::Receiver`: `recv_async`/`poll_next` would hang waiting on those live subscriber senders. Track delivery of `Event::Stop` and return `None` on subsequent calls so the node exits and `EventStream::drop` triggers the shutdown path.

Add regression tests covering the `Stop` → `None` invariant via both `recv()` and `Stream::next()`.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
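The panic-handling fix in item 1 can be illustrated with a minimal standard-library sketch. This is not the code from the PR: `spawn_guarded` is a hypothetical helper, `EventItem` is a simplified stand-in that only borrows the variant name `FatalError`, and `std::sync::mpsc` replaces the real event channel.

```rust
use std::panic::{catch_unwind, AssertUnwindSafe};
use std::sync::mpsc;
use std::thread;

// Simplified stand-in for the real event type (assumption, not the PR's definition).
#[derive(Debug, PartialEq)]
enum EventItem {
    Sample(u32),
    FatalError(String),
}

/// Hypothetical helper: runs `body` on a named thread and reports a panic
/// through the channel instead of letting the thread die silently.
fn spawn_guarded<F>(
    name: &str,
    tx: mpsc::Sender<EventItem>,
    body: F,
) -> thread::JoinHandle<()>
where
    F: FnOnce(&mpsc::Sender<EventItem>) + Send + 'static,
{
    let thread_name = name.to_owned();
    thread::Builder::new()
        .name(thread_name.clone())
        .spawn(move || {
            // AssertUnwindSafe: we promise not to rely on state the panic may
            // have left half-updated; the sender is only used to report.
            let result = catch_unwind(AssertUnwindSafe(|| body(&tx)));
            if let Err(payload) = result {
                // Panic payloads are usually `&'static str` or `String`.
                let msg = payload
                    .downcast_ref::<&str>()
                    .map(|s| s.to_string())
                    .or_else(|| payload.downcast_ref::<String>().cloned())
                    .unwrap_or_else(|| "unknown panic".to_string());
                // Ignore the send error: the receiver may already be gone.
                let _ = tx.send(EventItem::FatalError(format!(
                    "{thread_name} panicked: {msg}"
                )));
            }
        })
        .expect("failed to spawn thread")
}
```

Because the panic is caught inside the thread, `JoinHandle::join` returns `Ok(())` and the failure reaches the consumer only as a `FatalError` item, which is the behaviour the commit message describes at a high level.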
@Mergifyio queue
Merge Queue Status
This pull request spent 34 minutes 13 seconds in the queue, including 31 minutes 57 seconds running CI.
Three related fixes salvaged from the closed #1378 that were not covered by #1745:
- Wrap subscriber thread bodies in `catch_unwind` so a panic surfaces as `EventItem::FatalError` instead of silently killing sample delivery.
- Replace the blocking `subscriber.recv()` with `futures::future::select` on `recv_async()` and a shutdown channel; dropping `EventStream` now wakes every subscriber immediately.
- Add a `stop_received` flag so `recv_async`/`poll_next` return `None` after `Event::Stop`, because subscriber threads hold event-channel sender clones that would otherwise keep the receiver open.

Includes regression tests for the Stop → None invariant.
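The Stop → None invariant can be sketched with the standard library alone. The types below are simplified assumptions for illustration: this `EventStream` wraps a blocking `std::sync::mpsc::Receiver` rather than the `tokio` receiver used in the PR, and `Event` has only two variants.

```rust
use std::sync::mpsc;

// Simplified stand-in for the real event type (assumption).
#[derive(Debug, PartialEq)]
enum Event {
    Input(u32),
    Stop,
}

// Simplified stand-in for the real stream; only the flag name echoes the PR.
struct EventStream {
    receiver: mpsc::Receiver<Event>,
    stop_received: bool,
}

impl EventStream {
    fn new(receiver: mpsc::Receiver<Event>) -> Self {
        Self { receiver, stop_received: false }
    }

    /// Returns the next event, or `None` once `Stop` has been delivered,
    /// even if some sender clones are still alive.
    fn recv(&mut self) -> Option<Event> {
        if self.stop_received {
            // Without this check, the call below would block for as long
            // as any sender clone is held by another thread.
            return None;
        }
        let event = self.receiver.recv().ok()?;
        if event == Event::Stop {
            self.stop_received = true;
        }
        Some(event)
    }
}
```

In this sketch, a caller that keeps a sender clone alive (as the subscriber threads do) still sees `None` right after `Event::Stop`, so its receive loop can end and drop the stream.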