Skip to content

Commit a25eecf

Browse files
committed
fix(presence): fan out beacons concurrently
Send presence beacons to broadcast peers concurrently under a bounded timeout so one slow peer cannot delay delivery to the rest of the mesh. Increase the per-peer timeout to 15s now that fanout no longer has head-of-line blocking.\n\nBumps workspace crate version to 0.5.23 and documents the 0.5.22/0.5.23 presence fixes.
1 parent 658499c commit a25eecf

3 files changed

Lines changed: 48 additions & 13 deletions

File tree

CHANGELOG.md

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,28 @@ All notable changes to this project will be documented in this file.
55
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
66
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
77

8+
## [0.5.23] - 2026-04-27
9+
10+
### Fixed
11+
12+
- Presence beacon fanout now sends to peers concurrently with a bounded timeout,
13+
preventing one slow or wedged peer from delaying delivery to the rest of the
14+
mesh.
15+
- Increased the per-peer presence beacon send timeout to 15 seconds now that
16+
fanout is concurrent, reducing false timeouts on high-latency live links
17+
without reintroducing head-of-line blocking.
18+
19+
## [0.5.22] - 2026-04-26
20+
21+
### Fixed
22+
23+
- Presence beacon sends are bounded by a timeout so a stalled peer cannot wedge
24+
the beacon task indefinitely.
25+
- Presence beacon fanout snapshots groups and broadcast peers before network I/O,
26+
so no broadcast peer lock is held across awaited sends.
27+
- Added `PresenceManager::replace_broadcast_peers()` for authoritative fanout
28+
refresh and stale-peer pruning by callers.
29+
830
## [0.5.20] - 2026-04-23
931

1032
### Changed

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ members = [
1818
resolver = "2"
1919

2020
[workspace.package]
21-
version = "0.5.22"
21+
version = "0.5.23"
2222
edition = "2021"
2323
authors = ["David Irvine <david@saorsalabs.com>"]
2424
license = "MIT OR Apache-2.0"

crates/presence/src/lib.rs

Lines changed: 25 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -24,14 +24,14 @@ use std::sync::atomic::{AtomicU64, Ordering};
2424
use std::sync::Arc;
2525
use std::time::{Duration, Instant};
2626
use tokio::sync::RwLock;
27-
use tokio::task::JoinHandle;
27+
use tokio::task::{JoinHandle, JoinSet};
2828
use tracing::{debug, warn};
2929

3030
/// Maximum time to spend attempting a single presence beacon send.
3131
///
3232
/// Presence beacons are periodic best-effort liveness hints. A stalled peer must
3333
/// not be able to wedge the single beacon broadcast task indefinitely.
34-
const BEACON_SEND_TIMEOUT: Duration = Duration::from_secs(5);
34+
const BEACON_SEND_TIMEOUT: Duration = Duration::from_secs(15);
3535

3636
/// Presence status for a peer
3737
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
@@ -508,27 +508,40 @@ impl PresenceManager {
508508
.iter()
509509
.copied()
510510
.collect();
511-
let mut timed_out_peers = Vec::new();
511+
let mut send_tasks = JoinSet::new();
512512
for target_peer in peers.iter().copied() {
513-
let send = transport.send_to_peer(
514-
target_peer,
515-
GossipStreamType::Bulk,
516-
data.clone(),
517-
);
518-
match tokio::time::timeout(BEACON_SEND_TIMEOUT, send).await {
519-
Ok(Ok(())) => {}
520-
Ok(Err(e)) => {
513+
let transport = Arc::clone(&transport);
514+
let data = data.clone();
515+
send_tasks.spawn(async move {
516+
let send = transport.send_to_peer(
517+
target_peer,
518+
GossipStreamType::Bulk,
519+
data,
520+
);
521+
let result = tokio::time::timeout(BEACON_SEND_TIMEOUT, send).await;
522+
(target_peer, result)
523+
});
524+
}
525+
526+
let mut timed_out_peers = Vec::new();
527+
while let Some(result) = send_tasks.join_next().await {
528+
match result {
529+
Ok((_, Ok(Ok(())))) => {}
530+
Ok((target_peer, Ok(Err(e)))) => {
521531
debug!(?target_peer, ?e, "Failed to send beacon to peer");
522532
// Continue to next peer - don't fail entire broadcast
523533
}
524-
Err(_) => {
534+
Ok((target_peer, Err(_))) => {
525535
warn!(
526536
?target_peer,
527537
timeout_secs = BEACON_SEND_TIMEOUT.as_secs(),
528538
"Timed out sending beacon to peer"
529539
);
530540
timed_out_peers.push(target_peer);
531541
}
542+
Err(e) => {
543+
warn!(?e, "Presence beacon send task failed");
544+
}
532545
}
533546
}
534547

0 commit comments

Comments
 (0)