8 changes: 4 additions & 4 deletions crates/core/src/node/mod.rs
@@ -61,7 +61,7 @@ use crate::operations::handle_op_request;
 pub(crate) use network_bridge::{ConnectionError, EventLoopNotificationsSender, NetworkBridge};
 
 use crate::topology::rate::Rate;
-use crate::transport::{ObservedAddr, TransportKeypair, TransportPublicKey};
+use crate::transport::{TransportKeypair, TransportPublicKey};
 pub(crate) use op_state_manager::{OpManager, OpNotAvailable};
 
 mod message_processor;
@@ -620,7 +620,7 @@ pub(super) async fn process_message<CB>(
 #[allow(clippy::too_many_arguments)]
 pub(crate) async fn process_message_decoupled<CB>(
     msg: NetMessage,
-    source_addr: Option<ObservedAddr>,
+    source_addr: Option<std::net::SocketAddr>,
     op_manager: Arc<OpManager>,
     conn_manager: CB,
     mut event_listener: Box<dyn NetEventRegister>,
@@ -681,7 +681,7 @@ pub(crate) async fn process_message_decoupled<CB>(
 #[allow(clippy::too_many_arguments)]
 async fn handle_pure_network_message<CB>(
     msg: NetMessage,
-    source_addr: Option<ObservedAddr>,
+    source_addr: Option<std::net::SocketAddr>,
     op_manager: Arc<OpManager>,
     conn_manager: CB,
     event_listener: &mut dyn NetEventRegister,
@@ -876,7 +876,7 @@ async fn process_message_v1<CB>(
 #[allow(clippy::too_many_arguments)]
 async fn handle_pure_network_message_v1<CB>(
     msg: NetMessageV1,
-    source_addr: Option<ObservedAddr>,
+    source_addr: Option<std::net::SocketAddr>,
     op_manager: Arc<OpManager>,
     mut conn_manager: CB,
     event_listener: &mut dyn NetEventRegister,
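
The change to crates/core/src/node/mod.rs is purely a signature change: the decoupled message-processing entry points now take the source address as a plain `Option<std::net::SocketAddr>` instead of the transport-layer `ObservedAddr` wrapper. A minimal standalone sketch of handling such an optional source address; the `describe_source` helper is illustrative only and not part of the crate:

```rust
use std::net::SocketAddr;

// Hypothetical helper mirroring the new parameter shape: an optional plain
// socket address rather than a transport-specific wrapper type.
fn describe_source(source_addr: Option<SocketAddr>) -> String {
    match source_addr {
        Some(addr) => format!("message received from {addr}"),
        None => "message with no known source address".to_string(),
    }
}

fn main() {
    let addr: SocketAddr = "127.0.0.1:31337".parse().expect("valid socket address");
    println!("{}", describe_source(Some(addr)));
    println!("{}", describe_source(None));
}
```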
166 changes: 68 additions & 98 deletions crates/core/src/node/network_bridge/p2p_protoc.rs

Large diffs are not rendered by default.

117 changes: 94 additions & 23 deletions crates/core/src/node/p2p_impl.rs
@@ -1,4 +1,4 @@
-use std::{convert::Infallible, sync::Arc, time::Duration};
+use std::{collections::HashSet, convert::Infallible, sync::Arc, time::Duration};
 
 use futures::{future::BoxFuture, FutureExt};
 use tokio::task::JoinHandle;
@@ -21,9 +21,12 @@ use crate::{
         self, ContractHandler, ContractHandlerChannel, ExecutorToEventLoopChannel,
         NetworkEventListenerHalve, WaitingResolution,
     },
-    message::NodeEvent,
+    message::{NetMessage, NetMessageV1, NodeEvent},
     node::NodeConfig,
-    operations::connect::{self},
+    operations::{
+        connect::{self, ConnectOp},
+        OpEnum,
+    },
 };
 
 use super::OpManager;
@@ -46,10 +49,9 @@ pub(crate) struct NodeP2P {
 }
 
 impl NodeP2P {
-    /// Aggressively monitor connections during startup.
-    /// This is a static version that can be spawned as a background task.
-    async fn aggressive_initial_connections_static(op_manager: &OpManager) {
-        let min_connections = op_manager.ring.connection_manager.min_connections;
+    /// Aggressively establish connections during startup to avoid on-demand delays
+    async fn aggressive_initial_connections(&self) {
+        let min_connections = self.op_manager.ring.connection_manager.min_connections;
 
         tracing::info!(
             "Starting aggressive connection acquisition phase (target: {} connections)",
@@ -60,13 +62,15 @@ impl NodeP2P {
         // to avoid the 10+ second delays on first GET operations
         let start = std::time::Instant::now();
         let max_duration = Duration::from_secs(10);
+        let mut last_connection_count = 0;
+        let mut stable_rounds = 0;
 
         while start.elapsed() < max_duration {
             // Cooperative yielding for CI environments with limited CPU cores
             // Research shows CI (2 cores) needs explicit yields to prevent task starvation
             tokio::task::yield_now().await;
 
-            let current_connections = op_manager.ring.open_connections();
+            let current_connections = self.op_manager.ring.open_connections();
 
             // If we've reached our target, we're done
             if current_connections >= min_connections {
@@ -78,6 +82,33 @@
                 break;
             }
 
+            // If connection count is stable for 3 rounds, actively trigger more connections
+            if current_connections == last_connection_count {
+                stable_rounds += 1;
+                if stable_rounds >= 3 && current_connections > 0 {
+                    tracing::info!(
+                        "Connection count stable at {}, triggering active peer discovery",
+                        current_connections
+                    );
+
+                    // Trigger the connection maintenance task to actively look for more peers
+                    // In small networks, we want to be more aggressive
+                    for _ in 0..3 {
+                        // Yield before each connection attempt to prevent blocking other tasks
+                        tokio::task::yield_now().await;
+
+                        if let Err(e) = self.trigger_connection_maintenance().await {
+                            tracing::warn!("Failed to trigger connection maintenance: {}", e);
+                        }
+                        tokio::time::sleep(Duration::from_millis(100)).await;
+                    }
+                    stable_rounds = 0;
+                }
+            } else {
+                stable_rounds = 0;
+                last_connection_count = current_connections;
+            }
+
             tracing::debug!(
                 "Current connections: {}/{}, waiting for more peers (elapsed: {}s)",
                 current_connections,
@@ -94,7 +125,7 @@
             tokio::time::sleep(sleep_duration).await;
         }
 
-        let final_connections = op_manager.ring.open_connections();
+        let final_connections = self.op_manager.ring.open_connections();
         tracing::info!(
             "Aggressive connection phase complete. Final connections: {}/{} (took {}s)",
             final_connections,
@@ -103,6 +134,56 @@
         );
     }
 
+    /// Trigger the connection maintenance task to actively look for more peers
+    async fn trigger_connection_maintenance(&self) -> anyhow::Result<()> {
+        let ideal_location = Location::random();
+
+        // Find a connected peer to query
+        let query_target = {
+            let router = self.op_manager.ring.router.read();
+            self.op_manager.ring.connection_manager.routing(
+                ideal_location,
+                None,
+                &HashSet::<PeerId>::new(),
+                &router,
+            )
+        };
+
+        if let Some(query_target) = query_target {
+            let joiner = self.op_manager.ring.connection_manager.own_location();
+            let ttl = self
+                .op_manager
+                .ring
+                .max_hops_to_live
+                .max(1)
+                .min(u8::MAX as usize) as u8;
+            let target_connections = self.op_manager.ring.connection_manager.min_connections;
+
+            let (tx, op, msg) = ConnectOp::initiate_join_request(
+                joiner,
+                query_target.clone(),
+                ideal_location,
+                ttl,
+                target_connections,
+                self.op_manager.connect_forward_estimator.clone(),
+            );
+
+            tracing::debug!(
+                %tx,
+                query_peer = %query_target.peer(),
+                %ideal_location,
+                "Triggering connection maintenance connect request"
+            );
+            self.op_manager
+                .notify_op_change(
+                    NetMessage::V1(NetMessageV1::Connect(msg)),
+                    OpEnum::Connect(Box::new(op)),
+                )
+                .await?;
+        }
+
+        Ok(())
+    }
     pub(super) async fn run_node(mut self) -> anyhow::Result<Infallible> {
         if self.should_try_connect {
             let join_handle = connect::initial_join_procedure(
@@ -111,21 +192,11 @@
             )
             .await?;
             self.initial_join_task = Some(join_handle);
-        }
 
-        // Spawn aggressive connection acquisition as a background task.
-        // This MUST run concurrently with the event listener, not before it,
-        // because the event listener is what binds the UDP socket and processes
-        // incoming messages - without it running, no connections can be established.
-        let op_manager_for_connections = self.op_manager.clone();
-        let should_try_connect = self.should_try_connect;
-        tokio::spawn(async move {
-            if should_try_connect {
-                // Small delay to ensure event listener has started
-                tokio::time::sleep(std::time::Duration::from_millis(100)).await;
-                Self::aggressive_initial_connections_static(&op_manager_for_connections).await;
-            }
-        });
+            // After connecting to gateways, aggressively try to reach min_connections
+            // This is important for fast startup and avoiding on-demand connection delays
+            self.aggressive_initial_connections().await;
+        }
 
         let f = self.conn_manager.run_event_listener(
             self.op_manager.clone(),
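
Taken together, the p2p_impl.rs changes amount to a startup loop that polls the ring's open-connection count until min_connections is reached or a 10-second budget runs out, and that actively fires extra connect requests once the count has been flat for three polls. Below is a condensed, standalone sketch of that poll-with-stall-detection pattern, with plain closures standing in for the crate's OpManager/ring state and a fixed 200 ms poll interval in place of the adaptive sleep; all names here are illustrative, not the crate's API.

```rust
use std::time::{Duration, Instant};

// Illustrative stand-ins for the crate's connection state and maintenance hook.
// Requires the `tokio` crate (features: macros, rt, time).
async fn acquire_initial_connections(
    min_connections: usize,
    open_connections: impl Fn() -> usize,
    trigger_discovery: impl Fn(),
) {
    let start = Instant::now();
    let max_duration = Duration::from_secs(10);
    let mut last_count = 0;
    let mut stable_rounds = 0;

    while start.elapsed() < max_duration {
        // Keep other tasks responsive on small runtimes (e.g. CI with few cores).
        tokio::task::yield_now().await;

        let current = open_connections();
        if current >= min_connections {
            break; // target reached
        }

        if current == last_count {
            stable_rounds += 1;
            if stable_rounds >= 3 && current > 0 {
                // No growth for three polls: actively ask for more peers.
                trigger_discovery();
                stable_rounds = 0;
            }
        } else {
            stable_rounds = 0;
            last_count = current;
        }

        tokio::time::sleep(Duration::from_millis(200)).await;
    }
}

#[tokio::main]
async fn main() {
    // Toy scenario: pretend one connection exists and never grows.
    acquire_initial_connections(4, || 1, || println!("triggering peer discovery")).await;
}
```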