Skip to content

Commit 76abed0

Browse files
sanityclaude
andcommitted
fix: allow parallel connection attempts (max 3 concurrent)
Previously the connection maintenance loop only allowed one connection attempt at a time. If a connection took 60 seconds to complete, all other attempts were blocked. This caused star topology formation in small networks since peers couldn't form connections fast enough. Now we allow up to MAX_CONCURRENT_CONNECTIONS (3) parallel connection attempts by using live_tx_tracker.active_transaction_count() instead of tracking a single live_tx. Fixes the serialized connection acquisition from #2173. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <[email protected]>
1 parent 9aad6f5 commit 76abed0

File tree

2 files changed

+27
-17
lines changed

2 files changed

+27
-17
lines changed

crates/core/src/ring/live_tx.rs

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -47,11 +47,15 @@ impl LiveTransactionTracker {
4747
self.tx_per_peer.contains_key(&peer_addr)
4848
}
4949

50-
pub(crate) fn still_alive(&self, tx: &Transaction) -> bool {
51-
self.tx_per_peer.iter().any(|e| e.value().contains(tx))
52-
}
53-
5450
pub(crate) fn len(&self) -> usize {
5551
self.tx_per_peer.len()
5652
}
53+
54+
/// Returns the total number of active transactions across all peers.
55+
pub(crate) fn active_transaction_count(&self) -> usize {
56+
self.tx_per_peer
57+
.iter()
58+
.map(|entry| entry.value().len())
59+
.sum()
60+
}
5761
}

crates/core/src/ring/mod.rs

Lines changed: 19 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -400,6 +400,11 @@ impl Ring {
400400

401401
const REGENERATE_DENSITY_MAP_INTERVAL: Duration = Duration::from_secs(60);
402402

403+
/// Maximum number of concurrent connection acquisition attempts.
404+
/// Allows parallel connection attempts to speed up network formation
405+
/// instead of serial blocking on a single connection at a time.
406+
const MAX_CONCURRENT_CONNECTIONS: usize = 3;
407+
403408
let mut check_interval = tokio::time::interval(CHECK_TICK_DURATION);
404409
check_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
405410
let mut refresh_density_map = tokio::time::interval(REGENERATE_DENSITY_MAP_INTERVAL);
@@ -410,7 +415,6 @@ impl Ring {
410415
tokio::time::sleep(Duration::from_secs(2)).await;
411416
tracing::info!("Connection maintenance task: initial sleep completed");
412417

413-
let mut live_tx = None;
414418
let mut pending_conn_adds = BTreeSet::new();
415419
let mut this_peer = None;
416420
loop {
@@ -433,20 +437,17 @@ impl Ring {
433437
let mut skip_list = HashSet::new();
434438
skip_list.insert(this_peer);
435439

436-
// if there are no open connections, we need to acquire more
437-
if let Some(tx) = &live_tx {
438-
if !live_tx_tracker.still_alive(tx) {
439-
let _ = live_tx.take();
440-
}
441-
}
442-
440+
// Acquire new connections up to MAX_CONCURRENT_CONNECTIONS limit
441+
let active_count = live_tx_tracker.active_transaction_count();
443442
if let Some(ideal_location) = pending_conn_adds.pop_first() {
444-
if live_tx.is_none() {
443+
if active_count < MAX_CONCURRENT_CONNECTIONS {
445444
tracing::info!(
445+
active_connections = active_count,
446+
max_concurrent = MAX_CONCURRENT_CONNECTIONS,
446447
"Attempting to acquire new connection for location: {:?}",
447448
ideal_location
448449
);
449-
live_tx = self
450+
let tx = self
450451
.acquire_new(
451452
ideal_location,
452453
&skip_list,
@@ -462,18 +463,23 @@ impl Ring {
462463
);
463464
error
464465
})?;
465-
if live_tx.is_none() {
466+
if tx.is_none() {
466467
let conns = self.connection_manager.connection_count();
467468
tracing::warn!(
468469
"acquire_new returned None - likely no peers to query through (connections: {})",
469470
conns
470471
);
471472
} else {
472-
tracing::info!("Successfully initiated connection acquisition");
473+
tracing::info!(
474+
active_connections = active_count + 1,
475+
"Successfully initiated connection acquisition"
476+
);
473477
}
474478
} else {
475479
tracing::debug!(
476-
"Skipping connection attempt - live transaction still active, re-queuing location {}",
480+
active_connections = active_count,
481+
max_concurrent = MAX_CONCURRENT_CONNECTIONS,
482+
"At max concurrent connections, re-queuing location {}",
477483
ideal_location
478484
);
479485
pending_conn_adds.insert(ideal_location);

0 commit comments

Comments
 (0)