Skip to content

Commit 970b172

Browse files
sanityclaude
andauthored
fix: allow parallel connection attempts (max 3 concurrent) (#2174)
Co-authored-by: Claude <[email protected]>
1 parent 8bd39fc commit 970b172

File tree

4 files changed

+121
-86
lines changed

4 files changed

+121
-86
lines changed

crates/core/src/operations/subscribe.rs

Lines changed: 27 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@ use crate::{
1111
message::{InnerMessage, NetMessage, Transaction},
1212
node::{NetworkBridge, OpManager, PeerId},
1313
ring::{Location, PeerKeyLocation, RingError},
14-
transport::ObservedAddr,
1514
};
1615
use freenet_stdlib::{
1716
client_api::{ContractResponse, ErrorKind, HostResponse},
@@ -275,7 +274,7 @@ async fn complete_local_subscription(
275274
key: ContractKey,
276275
) -> Result<(), OpError> {
277276
let subscriber = op_manager.ring.connection_manager.own_location();
278-
// Local subscription - no upstream_addr needed since it's our own peer
277+
// Local subscription - no upstream NAT address
279278
if let Err(err) = op_manager
280279
.ring
281280
.add_subscriber(&key, subscriber.clone(), None)
@@ -310,7 +309,7 @@ pub(crate) struct SubscribeOp {
310309
state: Option<SubscribeState>,
311310
/// The address we received this operation's message from.
312311
/// Used for connection-based routing: responses are sent back to this address.
313-
upstream_addr: Option<ObservedAddr>,
312+
upstream_addr: Option<std::net::SocketAddr>,
314313
}
315314

316315
impl SubscribeOp {
@@ -364,16 +363,11 @@ impl Operation for SubscribeOp {
364363
}
365364
Ok(None) => {
366365
// new request to subscribe to a contract, initialize the machine
367-
tracing::debug!(
368-
tx = %id,
369-
?source_addr,
370-
"subscribe: load_or_init creating new op with source_addr as upstream_addr"
371-
);
372366
Ok(OpInitialization {
373367
op: Self {
374368
state: Some(SubscribeState::ReceivedRequest),
375369
id,
376-
upstream_addr: source_addr.map(ObservedAddr::new), // Connection-based routing: store who sent us this request
370+
upstream_addr: source_addr, // Connection-based routing: store who sent us this request
377371
},
378372
source_addr,
379373
})
@@ -413,29 +407,31 @@ impl Operation for SubscribeOp {
413407
target: _,
414408
subscriber,
415409
} => {
416-
// ALWAYS use the transport-level source address when available.
417-
// This is critical for NAT peers: they may embed a "known" but wrong address
418-
// (e.g., 127.0.0.1:31337 for loopback). The transport address is the only
419-
// reliable way to route responses back through the NAT.
410+
// Fill in subscriber's external address from transport layer if unknown.
411+
// This is the key step where the first recipient (gateway) determines the
412+
// subscriber's external address from the actual packet source address.
413+
// IMPORTANT: Must fill address BEFORE any .peer() calls to avoid panic.
420414
let mut subscriber = subscriber.clone();
421415

416+
if subscriber.peer_addr.is_unknown() {
417+
if let Some(addr) = source_addr {
418+
subscriber.set_addr(addr);
419+
tracing::debug!(
420+
tx = %id,
421+
%key,
422+
subscriber_addr = %addr,
423+
"subscribe: filled subscriber address from source_addr"
424+
);
425+
}
426+
}
427+
422428
tracing::debug!(
423429
tx = %id,
424430
%key,
425-
subscriber_orig = %subscriber.peer(),
431+
subscriber = %subscriber.peer(),
426432
source_addr = ?source_addr,
427433
"subscribe: processing RequestSub"
428434
);
429-
430-
if let Some(addr) = source_addr {
431-
subscriber.set_addr(addr);
432-
tracing::debug!(
433-
tx = %id,
434-
%key,
435-
subscriber_updated = %subscriber.peer(),
436-
"subscribe: updated subscriber address from transport source"
437-
);
438-
}
439435
let own_loc = op_manager.ring.connection_manager.own_location();
440436

441437
if !matches!(
@@ -462,10 +458,10 @@ impl Operation for SubscribeOp {
462458
"subscribe: handling RequestSub locally (contract available)"
463459
);
464460

465-
// Use upstream_addr for NAT routing - subscriber may embed wrong address
461+
// Local registration - no upstream NAT address
466462
if op_manager
467463
.ring
468-
.add_subscriber(key, subscriber.clone(), self.upstream_addr)
464+
.add_subscriber(key, subscriber.clone(), None)
469465
.is_err()
470466
{
471467
tracing::warn!(
@@ -532,13 +528,6 @@ impl Operation for SubscribeOp {
532528
subscribed: true,
533529
};
534530

535-
tracing::debug!(
536-
tx = %id,
537-
%key,
538-
upstream_addr = ?self.upstream_addr,
539-
"subscribe: creating ReturnSub with upstream_addr"
540-
);
541-
542531
return build_op_result(
543532
self.id,
544533
None,
@@ -741,10 +730,7 @@ impl Operation for SubscribeOp {
741730
subscribers_before = ?before_direct,
742731
"subscribe: attempting to register direct subscriber"
743732
);
744-
// Pass None: subscriber address was already corrected by Gateway at the
745-
// start of the subscribe flow. Using self.upstream_addr here would
746-
// incorrectly overwrite with the forwarder's address instead of the
747-
// original subscriber's Gateway-corrected address.
733+
// Local registration - no upstream NAT address
748734
if op_manager
749735
.ring
750736
.add_subscriber(key, subscriber.clone(), None)
@@ -895,7 +881,7 @@ impl Operation for SubscribeOp {
895881
subscribers_before = ?before_upstream,
896882
"subscribe: attempting to register upstream link"
897883
);
898-
// upstream_subscriber was stored in op state, no transport address available
884+
// Local registration - no upstream NAT address
899885
if op_manager
900886
.ring
901887
.add_subscriber(key, upstream_subscriber.clone(), None)
@@ -928,10 +914,7 @@ impl Operation for SubscribeOp {
928914
subscribers_before = ?before_provider,
929915
"subscribe: registering provider/subscription source"
930916
);
931-
// Pass None: sender was already looked up from source_addr (line ~866),
932-
// so it has the correct transport address. Using self.upstream_addr
933-
// would incorrectly use the original requester's address instead of
934-
// the provider's address.
917+
// Local registration - no upstream NAT address
935918
if op_manager
936919
.ring
937920
.add_subscriber(key, sender.clone(), None)
@@ -993,26 +976,17 @@ fn build_op_result(
993976
id: Transaction,
994977
state: Option<SubscribeState>,
995978
msg: Option<SubscribeMsg>,
996-
upstream_addr: Option<ObservedAddr>,
979+
upstream_addr: Option<std::net::SocketAddr>,
997980
) -> Result<OperationResult, OpError> {
998981
// For response messages (ReturnSub), use upstream_addr directly for routing.
999982
// This is more reliable than extracting from the message's target field, which
1000983
// may have been looked up from connection_manager (subject to race conditions).
1001984
// For forward messages (SeekNode, RequestSub, FetchRouting), use the message's target.
1002985
let target_addr = match &msg {
1003-
// Convert ObservedAddr to SocketAddr at the transport boundary
1004-
Some(SubscribeMsg::ReturnSub { .. }) => upstream_addr.map(|a| a.socket_addr()),
986+
Some(SubscribeMsg::ReturnSub { .. }) => upstream_addr,
1005987
_ => msg.as_ref().and_then(|m| m.target_addr()),
1006988
};
1007989

1008-
tracing::debug!(
1009-
tx = %id,
1010-
msg_type = ?msg.as_ref().map(|m| std::any::type_name_of_val(m)),
1011-
?upstream_addr,
1012-
?target_addr,
1013-
"build_op_result: computed target_addr"
1014-
);
1015-
1016990
let output_op = state.map(|state| SubscribeOp {
1017991
id,
1018992
state: Some(state),

crates/core/src/ring/live_tx.rs

Lines changed: 64 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -47,11 +47,71 @@ impl LiveTransactionTracker {
4747
self.tx_per_peer.contains_key(&peer_addr)
4848
}
4949

50-
pub(crate) fn still_alive(&self, tx: &Transaction) -> bool {
51-
self.tx_per_peer.iter().any(|e| e.value().contains(tx))
52-
}
53-
5450
pub(crate) fn len(&self) -> usize {
5551
self.tx_per_peer.len()
5652
}
53+
54+
/// Returns the total number of active transactions across all peers.
55+
pub(crate) fn active_transaction_count(&self) -> usize {
56+
self.tx_per_peer
57+
.iter()
58+
.map(|entry| entry.value().len())
59+
.sum()
60+
}
61+
}
62+
63+
#[cfg(test)]
64+
mod tests {
65+
use super::*;
66+
use crate::operations::connect::ConnectMsg;
67+
68+
#[test]
69+
fn active_transaction_count_empty() {
70+
let tracker = LiveTransactionTracker::new();
71+
assert_eq!(tracker.active_transaction_count(), 0);
72+
}
73+
74+
#[test]
75+
fn active_transaction_count_single_peer() {
76+
let tracker = LiveTransactionTracker::new();
77+
let addr: SocketAddr = "127.0.0.1:8080".parse().unwrap();
78+
79+
tracker.add_transaction(addr, Transaction::new::<ConnectMsg>());
80+
assert_eq!(tracker.active_transaction_count(), 1);
81+
82+
tracker.add_transaction(addr, Transaction::new::<ConnectMsg>());
83+
assert_eq!(tracker.active_transaction_count(), 2);
84+
}
85+
86+
#[test]
87+
fn active_transaction_count_multiple_peers() {
88+
let tracker = LiveTransactionTracker::new();
89+
let addr1: SocketAddr = "127.0.0.1:8080".parse().unwrap();
90+
let addr2: SocketAddr = "127.0.0.1:8081".parse().unwrap();
91+
92+
tracker.add_transaction(addr1, Transaction::new::<ConnectMsg>());
93+
tracker.add_transaction(addr1, Transaction::new::<ConnectMsg>());
94+
tracker.add_transaction(addr2, Transaction::new::<ConnectMsg>());
95+
96+
assert_eq!(tracker.active_transaction_count(), 3);
97+
}
98+
99+
#[test]
100+
fn active_transaction_count_after_removal() {
101+
let tracker = LiveTransactionTracker::new();
102+
let addr: SocketAddr = "127.0.0.1:8080".parse().unwrap();
103+
104+
let tx1 = Transaction::new::<ConnectMsg>();
105+
let tx2 = Transaction::new::<ConnectMsg>();
106+
107+
tracker.add_transaction(addr, tx1);
108+
tracker.add_transaction(addr, tx2);
109+
assert_eq!(tracker.active_transaction_count(), 2);
110+
111+
tracker.remove_finished_transaction(tx1);
112+
assert_eq!(tracker.active_transaction_count(), 1);
113+
114+
tracker.remove_finished_transaction(tx2);
115+
assert_eq!(tracker.active_transaction_count(), 0);
116+
}
57117
}

crates/core/src/ring/mod.rs

Lines changed: 21 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -330,7 +330,8 @@ impl Ring {
330330
///
331331
/// The `upstream_addr` parameter is the transport-level address from which the subscribe
332332
/// message was received. This is used instead of the address embedded in `subscriber`
333-
/// because NAT peers may embed incorrect addresses in their messages.
333+
/// because NAT peers may embed incorrect (e.g., loopback) addresses in their messages.
334+
/// The transport address is the only reliable way to route back to them.
334335
pub fn add_subscriber(
335336
&self,
336337
contract: &ContractKey,
@@ -389,6 +390,11 @@ impl Ring {
389390

390391
const REGENERATE_DENSITY_MAP_INTERVAL: Duration = Duration::from_secs(60);
391392

393+
/// Maximum number of concurrent connection acquisition attempts.
394+
/// Allows parallel connection attempts to speed up network formation
395+
/// instead of serial blocking on a single connection at a time.
396+
const MAX_CONCURRENT_CONNECTIONS: usize = 3;
397+
392398
let mut check_interval = tokio::time::interval(CHECK_TICK_DURATION);
393399
check_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
394400
let mut refresh_density_map = tokio::time::interval(REGENERATE_DENSITY_MAP_INTERVAL);
@@ -399,7 +405,6 @@ impl Ring {
399405
tokio::time::sleep(Duration::from_secs(2)).await;
400406
tracing::info!("Connection maintenance task: initial sleep completed");
401407

402-
let mut live_tx = None;
403408
let mut pending_conn_adds = BTreeSet::new();
404409
let mut this_peer = None;
405410
loop {
@@ -422,20 +427,17 @@ impl Ring {
422427
let mut skip_list = HashSet::new();
423428
skip_list.insert(this_peer);
424429

425-
// if there are no open connections, we need to acquire more
426-
if let Some(tx) = &live_tx {
427-
if !live_tx_tracker.still_alive(tx) {
428-
let _ = live_tx.take();
429-
}
430-
}
431-
430+
// Acquire new connections up to MAX_CONCURRENT_CONNECTIONS limit
431+
let active_count = live_tx_tracker.active_transaction_count();
432432
if let Some(ideal_location) = pending_conn_adds.pop_first() {
433-
if live_tx.is_none() {
433+
if active_count < MAX_CONCURRENT_CONNECTIONS {
434434
tracing::info!(
435+
active_connections = active_count,
436+
max_concurrent = MAX_CONCURRENT_CONNECTIONS,
435437
"Attempting to acquire new connection for location: {:?}",
436438
ideal_location
437439
);
438-
live_tx = self
440+
let tx = self
439441
.acquire_new(
440442
ideal_location,
441443
&skip_list,
@@ -451,18 +453,23 @@ impl Ring {
451453
);
452454
error
453455
})?;
454-
if live_tx.is_none() {
456+
if tx.is_none() {
455457
let conns = self.connection_manager.connection_count();
456458
tracing::warn!(
457459
"acquire_new returned None - likely no peers to query through (connections: {})",
458460
conns
459461
);
460462
} else {
461-
tracing::info!("Successfully initiated connection acquisition");
463+
tracing::info!(
464+
active_connections = active_count + 1,
465+
"Successfully initiated connection acquisition"
466+
);
462467
}
463468
} else {
464469
tracing::debug!(
465-
"Skipping connection attempt - live transaction still active, re-queuing location {}",
470+
active_connections = active_count,
471+
max_concurrent = MAX_CONCURRENT_CONNECTIONS,
472+
"At max concurrent connections, re-queuing location {}",
466473
ideal_location
467474
);
468475
pending_conn_adds.insert(ideal_location);

crates/core/src/transport/mod.rs

Lines changed: 9 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -25,16 +25,7 @@ type MessagePayload = Vec<u8>;
2525

2626
type PacketId = u32;
2727

28-
pub use self::crypto::{TransportKeypair, TransportPublicKey};
29-
pub(crate) use self::{
30-
connection_handler::{
31-
create_connection_handler, InboundConnectionHandler, OutboundConnectionHandler,
32-
},
33-
peer_connection::PeerConnection,
34-
};
35-
36-
/// Address observed at the transport layer (from UDP packet source).
37-
///
28+
/// A wrapper around SocketAddr that represents an address observed at the transport layer.
3829
/// This is the "ground truth" for NAT scenarios - it's the actual address we see
3930
/// at the network layer, not what the peer claims in protocol messages.
4031
///
@@ -44,11 +35,6 @@ pub(crate) use self::{
4435
pub struct ObservedAddr(SocketAddr);
4536

4637
impl ObservedAddr {
47-
/// Create a new observed address from a socket address.
48-
pub fn new(addr: SocketAddr) -> Self {
49-
Self(addr)
50-
}
51-
5238
/// Get the underlying socket address.
5339
pub fn socket_addr(&self) -> SocketAddr {
5440
self.0
@@ -67,6 +53,14 @@ impl From<SocketAddr> for ObservedAddr {
6753
}
6854
}
6955

56+
pub use self::crypto::{TransportKeypair, TransportPublicKey};
57+
pub(crate) use self::{
58+
connection_handler::{
59+
create_connection_handler, InboundConnectionHandler, OutboundConnectionHandler,
60+
},
61+
peer_connection::PeerConnection,
62+
};
63+
7064
#[derive(Debug, thiserror::Error)]
7165
pub(crate) enum TransportError {
7266
#[error("transport handler channel closed, socket likely closed")]

0 commit comments

Comments
 (0)