-
-
Notifications
You must be signed in to change notification settings - Fork 107
fix: allow parallel connection attempts (max 3 concurrent) #2174
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
fix: allow parallel connection attempts (max 3 concurrent) #2174
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull request overview
This PR aims to enable parallel connection attempts (up to 3 concurrent) to improve network formation speed by replacing a single-transaction tracking mechanism with a concurrent limit approach. However, the implementation does not achieve the intended parallelism.
Key Changes:
- Added
MAX_CONCURRENT_CONNECTIONSconstant (set to 3) to limit concurrent connection attempts - Replaced single
live_tx: Option<Transaction>withactive_transaction_count()method for tracking - Removed unused
still_alive()method fromLiveTransactionTracker
Reviewed changes
Copilot reviewed 2 out of 2 changed files in this pull request and generated 1 comment.
| File | Description |
|---|---|
| crates/core/src/ring/mod.rs | Added concurrent connection limit constant and modified maintenance loop to check active transaction count instead of single transaction tracking |
| crates/core/src/ring/live_tx.rs | Removed still_alive() method and added active_transaction_count() to sum all active transactions |
Comments suppressed due to low confidence (1)
crates/core/src/ring/mod.rs:486
- The loop only processes one location per iteration, which prevents achieving the desired parallelism. To actually run up to
MAX_CONCURRENT_CONNECTIONSconcurrent connections, the code should loop whileactive_count < MAX_CONCURRENT_CONNECTIONSand attempt to start multiple connections until the limit is reached orpending_conn_addsis empty.
Suggested fix:
// Acquire new connections up to MAX_CONCURRENT_CONNECTIONS limit
let mut active_count = live_tx_tracker.active_transaction_count();
while active_count < MAX_CONCURRENT_CONNECTIONS {
if let Some(ideal_location) = pending_conn_adds.pop_first() {
// ... existing connection attempt code ...
if tx.is_some() {
active_count += 1; // Update count for next iteration
}
} else {
break; // No more pending connections
}
} if let Some(ideal_location) = pending_conn_adds.pop_first() {
if active_count < MAX_CONCURRENT_CONNECTIONS {
tracing::info!(
active_connections = active_count,
max_concurrent = MAX_CONCURRENT_CONNECTIONS,
"Attempting to acquire new connection for location: {:?}",
ideal_location
);
let tx = self
.acquire_new(
ideal_location,
&skip_list,
¬ifier,
&live_tx_tracker,
&op_manager,
)
.await
.map_err(|error| {
tracing::error!(
?error,
"FATAL: Connection maintenance task failed - shutting down"
);
error
})?;
if tx.is_none() {
let conns = self.connection_manager.connection_count();
tracing::warn!(
"acquire_new returned None - likely no peers to query through (connections: {})",
conns
);
} else {
tracing::info!(
active_connections = active_count + 1,
"Successfully initiated connection acquisition"
);
}
} else {
tracing::debug!(
active_connections = active_count,
max_concurrent = MAX_CONCURRENT_CONNECTIONS,
"At max concurrent connections, re-queuing location {}",
ideal_location
);
pending_conn_adds.insert(ideal_location);
}
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| /// Returns the total number of active transactions across all peers. | ||
| pub(crate) fn active_transaction_count(&self) -> usize { | ||
| self.tx_per_peer | ||
| .iter() | ||
| .map(|entry| entry.value().len()) | ||
| .sum() | ||
| } |
Copilot
AI
Nov 30, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The new active_transaction_count() method lacks test coverage. Consider adding unit tests to verify:
- Returns 0 when no transactions are tracked
- Correctly counts a single transaction for one peer
- Correctly sums transactions across multiple peers
- Updates correctly after transactions are removed via
remove_finished_transaction()
Example test:
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn active_transaction_count_empty() {
let tracker = LiveTransactionTracker::new();
assert_eq!(tracker.active_transaction_count(), 0);
}
#[test]
fn active_transaction_count_multiple_peers() {
let tracker = LiveTransactionTracker::new();
let addr1 = "127.0.0.1:8080".parse().unwrap();
let addr2 = "127.0.0.1:8081".parse().unwrap();
tracker.add_transaction(addr1, Transaction::new::<()>());
tracker.add_transaction(addr1, Transaction::new::<()>());
tracker.add_transaction(addr2, Transaction::new::<()>());
assert_eq!(tracker.active_transaction_count(), 3);
}
}|
Re: Copilot's suggestion for Added comprehensive unit tests for
All 4 tests pass. [AI-assisted - Claude] |
4cac273 to
44f2a6d
Compare
d596c99 to
872bee8
Compare
44f2a6d to
caf6ce8
Compare
872bee8 to
26e7660
Compare
caf6ce8 to
bf00c2c
Compare
26e7660 to
e960382
Compare
bf00c2c to
075fb1f
Compare
e960382 to
99ab273
Compare
075fb1f to
9b8f5db
Compare
99ab273 to
01943b7
Compare
9b8f5db to
21f2780
Compare
01943b7 to
3359306
Compare
- Add ObservedAddr type to transport/mod.rs for NAT address tracking - Update add_subscriber wrapper in ring/mod.rs to accept upstream_addr param - Add upstream_addr (None) to all local subscription call sites - Remove duplicate get_peer_location_by_addr functions - Fix socket_addr() calls (use dereference instead) - Remove extra is_gateway argument from initiate_join_request 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <[email protected]>
3359306 to
d01029d
Compare
1c5b683 to
daa05b9
Compare
d01029d to
0be445d
Compare
daa05b9 to
9db2219
Compare
This commit applies all wire protocol cleanup changes from PR #2169 on top of the rebased PR #2167 base: - Remove sender field from GetMsg, PutMsg, SubscribeMsg, UpdateMsg, ConnectMsg - Use upstream_addr for routing responses instead of embedded sender fields - Delete transient_manager.rs (no longer needed) - Update freenet-macros code generation for new message structure The routing logic now derives the response target from the connection's observed address (upstream_addr) rather than trusting sender fields in messages. This is more reliable for NAT traversal scenarios. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <[email protected]>
Phase 1.3 of peer identity restructuring (issue #2164). Uses rust-analyzer SSR to convert: - .peer.pub_key -> .pub_key() - .peer.addr -> .addr() (for read accesses) Assignment operations (.peer.addr = x) are kept as direct field access for now since the addr() method returns a copy, not a reference. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <[email protected]>
… address Key changes: - Replace `peer: PeerId` with `pub_key: TransportPublicKey` + `peer_addr: PeerAddr` - Add PeerAddr enum with Unknown/Known variants for explicit address state - Add accessor methods: pub_key(), addr(), socket_addr(), peer() - Add constructors: new(), with_unknown_addr(), with_location() - Implement Ord/PartialOrd based on socket address This separates cryptographic identity (pub_key) from network address (peer_addr), enabling proper handling of peers behind NAT who don't know their external address. Part of #2164 peer identity restructuring. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <[email protected]>
Addresses Nacho's architectural feedback to avoid raw SocketAddr in protocol layer. Uses ObservedAddr newtype to wrap transport-layer addresses, making the address semantics explicit at the type level. Changes: - Add ObservedAddr newtype in transport/mod.rs - Update Operation trait to use Option<ObservedAddr> for source_addr - Update all operation implementations (connect, get, put, subscribe, update) - Update node/mod.rs and p2p_protoc.rs to use ObservedAddr - Wrap incoming source_addr in ObservedAddr::new() at transport boundary - Convert back to SocketAddr at network send boundaries The conversion to raw SocketAddr happens at transport boundaries: - build_op_result() converts for target_addr - network_bridge.send() calls use .socket_addr() 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <[email protected]>
This commit applies all wire protocol cleanup changes from PR #2169 on top of the rebased PR #2167 base: - Remove sender field from GetMsg, PutMsg, SubscribeMsg, UpdateMsg, ConnectMsg - Use upstream_addr for routing responses instead of embedded sender fields - Delete transient_manager.rs (no longer needed) - Update freenet-macros code generation for new message structure The routing logic now derives the response target from the connection's observed address (upstream_addr) rather than trusting sender fields in messages. This is more reliable for NAT traversal scenarios. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <[email protected]>
Previously the connection maintenance loop only allowed one connection attempt at a time. If a connection took 60 seconds to complete, all other attempts were blocked. This caused star topology formation in small networks since peers couldn't form connections fast enough. Now we allow up to MAX_CONCURRENT_CONNECTIONS (3) parallel connection attempts by using live_tx_tracker.active_transaction_count() instead of tracking a single live_tx. Fixes the serialized connection acquisition from #2173. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <[email protected]>
Tests verify the method: - Returns 0 when no transactions are tracked - Correctly counts transactions for a single peer - Correctly sums transactions across multiple peers - Updates correctly after transactions are removed 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <[email protected]>
- Add ObservedAddr type to transport/mod.rs for NAT address tracking - Update add_subscriber wrapper in ring/mod.rs to accept upstream_addr param - Add upstream_addr (None) to all local subscription call sites - Remove duplicate get_peer_location_by_addr functions The add_subscriber method now takes an optional ObservedAddr parameter to track the transport-level address for NAT peers. For local subscriptions (no remote upstream), None is passed. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <[email protected]>
The acceptor's PeerAddr is intentionally Unknown (NAT scenario) since acceptors don't know their external address. Tracing calls now use pub_key() instead of peer() to avoid panics on unknown addresses. This fix was needed after rebase because a prior commit replayed changes that reintroduced the old code. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <[email protected]>
0be445d to
bf5356f
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull request overview
Copilot reviewed 5 out of 5 changed files in this pull request and generated 1 comment.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| // Fill in subscriber's external address from transport layer if unknown. | ||
| // This is the key step where the first recipient (gateway) determines the | ||
| // subscriber's external address from the actual packet source address. | ||
| let mut subscriber = subscriber.clone(); | ||
| if let Some(addr) = source_addr { | ||
| subscriber.set_addr(addr); | ||
| tracing::debug!( | ||
| tx = %id, | ||
| %key, | ||
| subscriber_addr = %addr, | ||
| "subscribe: using transport source_addr for subscriber" | ||
| ); | ||
| if subscriber.peer_addr.is_unknown() { | ||
| if let Some(addr) = source_addr { | ||
| subscriber.set_addr(addr); | ||
| tracing::debug!( | ||
| tx = %id, | ||
| %key, | ||
| subscriber_addr = %addr, | ||
| "subscribe: filled subscriber address from source_addr" | ||
| ); | ||
| } | ||
| } |
Copilot
AI
Dec 1, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This behavioral change from the previous implementation removes protection against known-but-incorrect NAT addresses. The old code (visible in removed lines elsewhere in this diff) would always use the transport source address when available, specifically to handle cases where NAT peers embed known-but-wrong addresses (e.g., 127.0.0.1:31337 or internal network addresses like 192.168.x.x).
The new conditional logic only fills in addresses marked as PeerAddr::Unknown, meaning if a NAT peer mistakenly sends PeerAddr::Known(127.0.0.1:31337) instead of PeerAddr::Unknown, the loopback address will not be corrected.
While this change aligns subscribe.rs with the pattern used in connect.rs and put.rs, it removes a safety mechanism that was explicitly documented as "critical for NAT peers" in the old code comments. This could break existing clients behind NAT that haven't been updated to send PeerAddr::Unknown when they don't know their external address.
Consider either:
- Reverting to unconditional address correction for backward compatibility, or
- Adding validation to reject obviously incorrect addresses (loopback, private ranges) when received from remote peers, or
- Documenting this as a breaking change requiring client updates
Merges branch fix/seeding-subscriber-nat-2164 to bring in Nacho's address validation fixes: - Better tracing for subscribe address updates - Clearer comments for subscription registration 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <[email protected]>
PR #2172 added detailed tracing that calls subscriber.peer() BEFORE filling in the address from source_addr. This causes a panic when the subscriber has PeerAddr::Unknown (NAT peers). The correct order (from PR #2171) is: 1. Check if address is unknown 2. Fill in address from source_addr 3. THEN call .peer() for tracing This regression was introduced when merging PR #2172's tracing improvements without preserving the correct order of operations. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <[email protected]>
|
Re: Copilot's suggestion for Already addressed - comprehensive unit tests for
The test coverage Copilot suggested is already in place. Re: NAT address handling concern Copilot raises a valid concern about backward compatibility with clients that might send However, this PR is part of a larger refactor (#2164) that introduces the explicit
The "old code" that always overwrote addresses was a workaround for not having an explicit "unknown" state. With If backward compatibility with very old clients is a concern, that would be a separate discussion - but for the core network protocol being refactored, using explicit state is the cleaner approach. [AI-assisted - Claude] |
Consolidates changes from PRs #2172, #2174, and #2175: This builds on PR #2191 (wire protocol cleanup) and adds: - Fix seeding/subscribe operations to handle PeerAddr::Unknown for NAT scenarios - Gateway properly fills in observed addresses from packet source - Improved subscriber address tracking in seeding manager - Update live_tx and connection tests for new address model NOTE: This PR requires review - previous PRs (#2174, #2175) had CHANGES_REQUESTED from Nacho. Submitting consolidated changes for fresh review. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <[email protected]>
Adds NAT address handling to subscribe/seeding operations: - Subscribers with PeerAddr::Unknown have their address filled in by gateway - Gateway observes real UDP source address and updates subscriber address - SeedingManager tracks subscriber addresses properly - live_tx tests updated for new address model - In-memory testing infrastructure updated for PeerAddr Supersedes PRs #2172, #2174, #2175 (which had changes requested). 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <[email protected]>
Problem
In small network tests, peers form a star topology (all connected only to gateway) instead of a mesh topology. The root cause is serialized connection acquisition in the maintenance loop.
Location:
crates/core/src/ring/mod.rsThe code used a single
live_tx: Option<Transaction>to track pending connections. This means only ONE connection attempt could proceed at a time. If a connection takes 60 seconds (OPERATION_TTL), ALL other attempts are blocked for 60 seconds.With 25
min_connectionstarget and serial acquisition, reaching minimum connections takes far longer than test windows allow.Solution
Replace the single
live_txtracking with a concurrent limit usinglive_tx_tracker.active_transaction_count():MAX_CONCURRENT_CONNECTIONS = 3constantactive_count < MAX_CONCURRENT_CONNECTIONSstill_alivemethod fromLiveTransactionTrackeractive_transaction_count()method to count all active transactionsTesting
Stack
This PR stacks on #2172 (seeding/subscriber NAT routing).
Part of #2173 (Priority 1).
[AI-assisted - Claude]