Commit a25a6d5

sanityclaude and claude committed
refactor: wire protocol cleanup and NAT routing improvements

Consolidates approved changes from PRs #2169 and #2171:
- Remove ObservedAddr in favor of std::net::SocketAddr
- Add PeerAddr enum to explicitly represent known/unknown addresses
- Update PeerKeyLocation to use PeerAddr for NAT scenarios
- Clean up wire protocol types for address handling
- Remove unused transient_manager module

This is part of the larger #2164 effort to simplify peer identity.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <[email protected]>
1 parent 4e4bb2e commit a25a6d5
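To make the commit message concrete, here is a minimal hedged sketch of a PeerAddr enum that distinguishes known from unknown addresses. The name matches the commit message, but the shape, variants, derives, and helper method below are illustrative assumptions, not the definition actually added by this commit.

use std::net::SocketAddr;

// Hypothetical sketch only: the real PeerAddr lives in the crate's wire
// protocol types and may differ in naming, variants, and derives.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
enum PeerAddr {
    /// The peer's socket address is known and usable for routing.
    Known(SocketAddr),
    /// The address is not yet known, e.g. a NAT'd peer before its reachable
    /// address has been learned.
    Unknown,
}

impl PeerAddr {
    /// Returns the address if it is known.
    fn known(&self) -> Option<SocketAddr> {
        match self {
            PeerAddr::Known(addr) => Some(*addr),
            PeerAddr::Unknown => None,
        }
    }
}

fn main() {
    let addr: SocketAddr = "198.51.100.7:31337".parse().expect("valid socket address");
    assert_eq!(PeerAddr::Known(addr).known(), Some(addr));
    assert_eq!(PeerAddr::Unknown.known(), None);
}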

File tree: 21 files changed (+965 / -1300 lines)

crates/core/src/node/mod.rs

Lines changed: 4 additions & 4 deletions

@@ -61,7 +61,7 @@ use crate::operations::handle_op_request;
 pub(crate) use network_bridge::{ConnectionError, EventLoopNotificationsSender, NetworkBridge};
 
 use crate::topology::rate::Rate;
-use crate::transport::{ObservedAddr, TransportKeypair, TransportPublicKey};
+use crate::transport::{TransportKeypair, TransportPublicKey};
 pub(crate) use op_state_manager::{OpManager, OpNotAvailable};
 
 mod message_processor;
@@ -620,7 +620,7 @@ pub(super) async fn process_message<CB>(
 #[allow(clippy::too_many_arguments)]
 pub(crate) async fn process_message_decoupled<CB>(
     msg: NetMessage,
-    source_addr: Option<ObservedAddr>,
+    source_addr: Option<std::net::SocketAddr>,
     op_manager: Arc<OpManager>,
     conn_manager: CB,
     mut event_listener: Box<dyn NetEventRegister>,
@@ -681,7 +681,7 @@ pub(crate) async fn process_message_decoupled<CB>(
 #[allow(clippy::too_many_arguments)]
 async fn handle_pure_network_message<CB>(
     msg: NetMessage,
-    source_addr: Option<ObservedAddr>,
+    source_addr: Option<std::net::SocketAddr>,
     op_manager: Arc<OpManager>,
     conn_manager: CB,
     event_listener: &mut dyn NetEventRegister,
@@ -876,7 +876,7 @@ async fn process_message_v1<CB>(
 #[allow(clippy::too_many_arguments)]
 async fn handle_pure_network_message_v1<CB>(
     msg: NetMessageV1,
-    source_addr: Option<ObservedAddr>,
+    source_addr: Option<std::net::SocketAddr>,
     op_manager: Arc<OpManager>,
     mut conn_manager: CB,
     event_listener: &mut dyn NetEventRegister,
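The signature changes above swap the ObservedAddr wrapper for a plain std::net::SocketAddr in the message-processing entry points. Below is a small illustrative sketch of a caller handing the source address through under that kind of signature; describe_source is a hypothetical helper invented for this example, not part of the codebase.

use std::net::{IpAddr, Ipv4Addr, SocketAddr};

// Hypothetical caller: where the old code wrapped the sender's address in
// ObservedAddr, callers now pass the plain socket address (or None when the
// message has no network origin) straight through.
fn describe_source(source_addr: Option<SocketAddr>) -> String {
    match source_addr {
        Some(addr) => format!("message received from {addr}"),
        None => "message has no network source (locally generated)".to_string(),
    }
}

fn main() {
    let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 31337);
    println!("{}", describe_source(Some(addr)));
    println!("{}", describe_source(None));
}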

crates/core/src/node/network_bridge/p2p_protoc.rs

Lines changed: 68 additions & 98 deletions (large diff not rendered by default)

crates/core/src/node/p2p_impl.rs

Lines changed: 94 additions & 23 deletions

@@ -1,4 +1,4 @@
-use std::{convert::Infallible, sync::Arc, time::Duration};
+use std::{collections::HashSet, convert::Infallible, sync::Arc, time::Duration};
 
 use futures::{future::BoxFuture, FutureExt};
 use tokio::task::JoinHandle;
@@ -21,9 +21,12 @@ use crate::{
         self, ContractHandler, ContractHandlerChannel, ExecutorToEventLoopChannel,
         NetworkEventListenerHalve, WaitingResolution,
     },
-    message::NodeEvent,
+    message::{NetMessage, NetMessageV1, NodeEvent},
     node::NodeConfig,
-    operations::connect::{self},
+    operations::{
+        connect::{self, ConnectOp},
+        OpEnum,
+    },
 };
 
 use super::OpManager;
@@ -46,10 +49,9 @@ pub(crate) struct NodeP2P {
 }
 
 impl NodeP2P {
-    /// Aggressively monitor connections during startup.
-    /// This is a static version that can be spawned as a background task.
-    async fn aggressive_initial_connections_static(op_manager: &OpManager) {
-        let min_connections = op_manager.ring.connection_manager.min_connections;
+    /// Aggressively establish connections during startup to avoid on-demand delays
+    async fn aggressive_initial_connections(&self) {
+        let min_connections = self.op_manager.ring.connection_manager.min_connections;
 
         tracing::info!(
             "Starting aggressive connection acquisition phase (target: {} connections)",
@@ -60,13 +62,15 @@ impl NodeP2P {
         // to avoid the 10+ second delays on first GET operations
         let start = std::time::Instant::now();
        let max_duration = Duration::from_secs(10);
+        let mut last_connection_count = 0;
+        let mut stable_rounds = 0;
 
         while start.elapsed() < max_duration {
             // Cooperative yielding for CI environments with limited CPU cores
             // Research shows CI (2 cores) needs explicit yields to prevent task starvation
             tokio::task::yield_now().await;
 
-            let current_connections = op_manager.ring.open_connections();
+            let current_connections = self.op_manager.ring.open_connections();
 
             // If we've reached our target, we're done
             if current_connections >= min_connections {
@@ -78,6 +82,33 @@
                 break;
             }
 
+            // If connection count is stable for 3 rounds, actively trigger more connections
+            if current_connections == last_connection_count {
+                stable_rounds += 1;
+                if stable_rounds >= 3 && current_connections > 0 {
+                    tracing::info!(
+                        "Connection count stable at {}, triggering active peer discovery",
+                        current_connections
+                    );
+
+                    // Trigger the connection maintenance task to actively look for more peers
+                    // In small networks, we want to be more aggressive
+                    for _ in 0..3 {
+                        // Yield before each connection attempt to prevent blocking other tasks
+                        tokio::task::yield_now().await;
+
+                        if let Err(e) = self.trigger_connection_maintenance().await {
+                            tracing::warn!("Failed to trigger connection maintenance: {}", e);
+                        }
+                        tokio::time::sleep(Duration::from_millis(100)).await;
+                    }
+                    stable_rounds = 0;
+                }
+            } else {
+                stable_rounds = 0;
+                last_connection_count = current_connections;
+            }
+
             tracing::debug!(
                 "Current connections: {}/{}, waiting for more peers (elapsed: {}s)",
                 current_connections,
@@ -94,7 +125,7 @@
             tokio::time::sleep(sleep_duration).await;
         }
 
-        let final_connections = op_manager.ring.open_connections();
+        let final_connections = self.op_manager.ring.open_connections();
         tracing::info!(
             "Aggressive connection phase complete. Final connections: {}/{} (took {}s)",
             final_connections,
@@ -103,6 +134,56 @@
         );
     }
 
+    /// Trigger the connection maintenance task to actively look for more peers
+    async fn trigger_connection_maintenance(&self) -> anyhow::Result<()> {
+        let ideal_location = Location::random();
+
+        // Find a connected peer to query
+        let query_target = {
+            let router = self.op_manager.ring.router.read();
+            self.op_manager.ring.connection_manager.routing(
+                ideal_location,
+                None,
+                &HashSet::<PeerId>::new(),
+                &router,
+            )
+        };
+
+        if let Some(query_target) = query_target {
+            let joiner = self.op_manager.ring.connection_manager.own_location();
+            let ttl = self
+                .op_manager
+                .ring
+                .max_hops_to_live
+                .max(1)
+                .min(u8::MAX as usize) as u8;
+            let target_connections = self.op_manager.ring.connection_manager.min_connections;
+
+            let (tx, op, msg) = ConnectOp::initiate_join_request(
+                joiner,
+                query_target.clone(),
+                ideal_location,
+                ttl,
+                target_connections,
+                self.op_manager.connect_forward_estimator.clone(),
+            );
+
+            tracing::debug!(
+                %tx,
+                query_peer = %query_target.peer(),
+                %ideal_location,
+                "Triggering connection maintenance connect request"
+            );
+            self.op_manager
+                .notify_op_change(
+                    NetMessage::V1(NetMessageV1::Connect(msg)),
+                    OpEnum::Connect(Box::new(op)),
+                )
+                .await?;
+        }
+
+        Ok(())
+    }
     pub(super) async fn run_node(mut self) -> anyhow::Result<Infallible> {
         if self.should_try_connect {
             let join_handle = connect::initial_join_procedure(
@@ -111,21 +192,11 @@
             )
             .await?;
             self.initial_join_task = Some(join_handle);
-        }
 
-        // Spawn aggressive connection acquisition as a background task.
-        // This MUST run concurrently with the event listener, not before it,
-        // because the event listener is what binds the UDP socket and processes
-        // incoming messages - without it running, no connections can be established.
-        let op_manager_for_connections = self.op_manager.clone();
-        let should_try_connect = self.should_try_connect;
-        tokio::spawn(async move {
-            if should_try_connect {
-                // Small delay to ensure event listener has started
-                tokio::time::sleep(std::time::Duration::from_millis(100)).await;
-                Self::aggressive_initial_connections_static(&op_manager_for_connections).await;
-            }
-        });
+            // After connecting to gateways, aggressively try to reach min_connections
+            // This is important for fast startup and avoiding on-demand connection delays
+            self.aggressive_initial_connections().await;
+        }
 
         let f = self.conn_manager.run_event_listener(
             self.op_manager.clone(),
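The loop added above polls the ring's open connection count and, once the count has stayed unchanged for three consecutive rounds, actively triggers peer discovery instead of waiting passively. Below is a simplified, synchronous sketch of that stability-detection pattern, with closures standing in for the ring and the ConnectOp machinery; run_until_stable_or_target and its arguments are made-up names for illustration only.

use std::cell::Cell;
use std::time::{Duration, Instant};

// Simplified sketch of the stable-rounds pattern used in
// aggressive_initial_connections: poll a connection count and, when it has
// been unchanged for three rounds, fire a discovery action.
fn run_until_stable_or_target(
    mut current_connections: impl FnMut() -> usize,
    mut trigger_discovery: impl FnMut(),
    min_connections: usize,
    max_duration: Duration,
) {
    let start = Instant::now();
    let mut last_connection_count = 0;
    let mut stable_rounds = 0;

    while start.elapsed() < max_duration {
        let current = current_connections();
        if current >= min_connections {
            break; // target reached
        }
        if current == last_connection_count {
            stable_rounds += 1;
            if stable_rounds >= 3 && current > 0 {
                trigger_discovery(); // count has plateaued: ask for more peers
                stable_rounds = 0;
            }
        } else {
            stable_rounds = 0;
            last_connection_count = current;
        }
        std::thread::sleep(Duration::from_millis(100));
    }
}

fn main() {
    let connections = Cell::new(1usize);
    run_until_stable_or_target(
        || connections.get(),
        // In the real code this issues a ConnectOp join request; here we just
        // pretend one more connection appears per discovery attempt.
        || connections.set(connections.get() + 1),
        4,
        Duration::from_secs(10),
    );
    assert!(connections.get() >= 4);
    println!("reached {} connections", connections.get());
}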
