Skip to content

Commit e6e7538

Browse files
sanityclaude
andcommitted
fix: allow parallel connection attempts (max 3 concurrent)
Previously the connection maintenance loop only allowed one connection attempt at a time. If a connection took 60 seconds to complete, all other attempts were blocked. This caused star topology formation in small networks since peers couldn't form connections fast enough. Now we allow up to MAX_CONCURRENT_CONNECTIONS (3) parallel connection attempts by using live_tx_tracker.active_transaction_count() instead of tracking a single live_tx. Fixes the serialized connection acquisition from #2173. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <[email protected]>
1 parent 9328929 commit e6e7538

File tree

2 files changed

+27
-17
lines changed

2 files changed

+27
-17
lines changed

crates/core/src/ring/live_tx.rs

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -47,11 +47,15 @@ impl LiveTransactionTracker {
4747
self.tx_per_peer.contains_key(&peer_addr)
4848
}
4949

50-
pub(crate) fn still_alive(&self, tx: &Transaction) -> bool {
51-
self.tx_per_peer.iter().any(|e| e.value().contains(tx))
52-
}
53-
5450
pub(crate) fn len(&self) -> usize {
5551
self.tx_per_peer.len()
5652
}
53+
54+
/// Returns the total number of active transactions across all peers.
55+
pub(crate) fn active_transaction_count(&self) -> usize {
56+
self.tx_per_peer
57+
.iter()
58+
.map(|entry| entry.value().len())
59+
.sum()
60+
}
5761
}

crates/core/src/ring/mod.rs

Lines changed: 19 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -383,6 +383,11 @@ impl Ring {
383383

384384
const REGENERATE_DENSITY_MAP_INTERVAL: Duration = Duration::from_secs(60);
385385

386+
/// Maximum number of concurrent connection acquisition attempts.
387+
/// Allows parallel connection attempts to speed up network formation
388+
/// instead of serial blocking on a single connection at a time.
389+
const MAX_CONCURRENT_CONNECTIONS: usize = 3;
390+
386391
let mut check_interval = tokio::time::interval(CHECK_TICK_DURATION);
387392
check_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
388393
let mut refresh_density_map = tokio::time::interval(REGENERATE_DENSITY_MAP_INTERVAL);
@@ -393,7 +398,6 @@ impl Ring {
393398
tokio::time::sleep(Duration::from_secs(2)).await;
394399
tracing::info!("Connection maintenance task: initial sleep completed");
395400

396-
let mut live_tx = None;
397401
let mut pending_conn_adds = BTreeSet::new();
398402
let mut this_peer = None;
399403
loop {
@@ -416,20 +420,17 @@ impl Ring {
416420
let mut skip_list = HashSet::new();
417421
skip_list.insert(this_peer);
418422

419-
// if there are no open connections, we need to acquire more
420-
if let Some(tx) = &live_tx {
421-
if !live_tx_tracker.still_alive(tx) {
422-
let _ = live_tx.take();
423-
}
424-
}
425-
423+
// Acquire new connections up to MAX_CONCURRENT_CONNECTIONS limit
424+
let active_count = live_tx_tracker.active_transaction_count();
426425
if let Some(ideal_location) = pending_conn_adds.pop_first() {
427-
if live_tx.is_none() {
426+
if active_count < MAX_CONCURRENT_CONNECTIONS {
428427
tracing::info!(
428+
active_connections = active_count,
429+
max_concurrent = MAX_CONCURRENT_CONNECTIONS,
429430
"Attempting to acquire new connection for location: {:?}",
430431
ideal_location
431432
);
432-
live_tx = self
433+
let tx = self
433434
.acquire_new(
434435
ideal_location,
435436
&skip_list,
@@ -445,18 +446,23 @@ impl Ring {
445446
);
446447
error
447448
})?;
448-
if live_tx.is_none() {
449+
if tx.is_none() {
449450
let conns = self.connection_manager.connection_count();
450451
tracing::warn!(
451452
"acquire_new returned None - likely no peers to query through (connections: {})",
452453
conns
453454
);
454455
} else {
455-
tracing::info!("Successfully initiated connection acquisition");
456+
tracing::info!(
457+
active_connections = active_count + 1,
458+
"Successfully initiated connection acquisition"
459+
);
456460
}
457461
} else {
458462
tracing::debug!(
459-
"Skipping connection attempt - live transaction still active, re-queuing location {}",
463+
active_connections = active_count,
464+
max_concurrent = MAX_CONCURRENT_CONNECTIONS,
465+
"At max concurrent connections, re-queuing location {}",
460466
ideal_location
461467
);
462468
pending_conn_adds.insert(ideal_location);

0 commit comments

Comments
 (0)