Skip to content
654 changes: 467 additions & 187 deletions src/dht_network_manager.rs

Large diffs are not rendered by default.

193 changes: 95 additions & 98 deletions src/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,6 @@ struct WireMessage {
/// Payload bytes used for keepalive messages to prevent connection timeouts.
const KEEPALIVE_PAYLOAD: &[u8] = b"keepalive";

/// Channel capacity for the per-stack recv task mpsc channel.
const RECV_CHANNEL_CAPACITY: usize = 256;

/// Configuration for a P2P node
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct NodeConfig {
Expand Down Expand Up @@ -1202,8 +1199,9 @@ impl P2PNode {
let listen_addrs = self.listen_addrs.read().await;
info!("P2P node started on addresses: {:?}", *listen_addrs);

// Start message receiving system
self.start_message_receiving_system().await?;
// NOTE: Message receiving is now integrated into the accept loop in start_network_listeners().
// The old start_message_receiving_system() is no longer needed: it competed with the accept
// loop for incoming connections, causing messages to be lost.

// Connect to bootstrap peers
self.connect_bootstrap_peers().await?;
Expand All @@ -1225,7 +1223,9 @@ impl P2PNode {
*la = addrs.clone();
}

// Spawn a background accept loop that handles incoming connections from either stack
// Spawn a background loop that accepts connections AND handles their messages.
// It uses accept_with_message(), which is the ONLY place calling transport.accept(),
// avoiding competition between multiple accept loops.
let event_tx = self.event_tx.clone();
let peers = self.peers.clone();
let active_connections = self.active_connections.clone();
Expand All @@ -1237,27 +1237,76 @@ impl P2PNode {
if shutdown.load(Ordering::Relaxed) {
break;
}
match dual.accept_any().await {
Ok((ant_peer_id, remote_sock)) => {
// Enforce rate limiting — skip peer registration when rate-limited
match dual.accept_with_message().await {
Ok((ant_peer_id, remote_sock, message_data)) => {
let peer_id =
crate::transport::ant_quic_adapter::ant_peer_id_to_string(&ant_peer_id);
let remote_addr = NetworkAddress::from(remote_sock);

// Rate limiting - log if rate limit exceeded
if let Err(e) = rate_limiter.check_ip(&remote_sock.ip()) {
warn!(
"Rate-limited incoming connection from {}: {}",
remote_sock, e
);
continue;
warn!("Rate limit exceeded for IP {}: {}", remote_sock.ip(), e);
}

debug!(
"Accepted connection from {} (protocol validation pending ant-quic implementation)",
remote_sock
);

let peer_id = ant_peer_id_to_string(&ant_peer_id);
let remote_addr = NetworkAddress::from(remote_sock);
// Register peer
let _ = event_tx.send(P2PEvent::PeerConnected(peer_id.clone()));
register_new_peer(&peers, &peer_id, &remote_addr).await;
active_connections.write().await.insert(peer_id);
active_connections.write().await.insert(peer_id.clone());

// Process the message
trace!("Received {} bytes from {}", message_data.len(), peer_id);
match serde_json::from_slice::<serde_json::Value>(&message_data) {
Ok(value) => {
if let (Some(protocol), Some(data), Some(from)) = (
value.get("protocol").and_then(|v| v.as_str()),
value.get("data").and_then(|v| v.as_array()),
value.get("from").and_then(|v| v.as_str()),
) {
// Security: Validate that 'from' matches the actual connection peer ID
// This prevents peer ID spoofing attacks
if from != peer_id {
warn!(
"Peer ID mismatch: message claims from='{}' but connection is from '{}'. Using connection peer ID.",
from, peer_id
);
}
Comment on lines +1269 to +1274
Copy link

Copilot AI Feb 3, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

While the code correctly uses the authenticated peer_id over the claimed from field, it only logs a warning for peer ID spoofing attempts. Consider tracking these incidents and implementing rate limiting or temporary bans for peers that repeatedly attempt ID spoofing, as this indicates malicious behavior.

Copilot uses AI. Check for mistakes.
// Always use the authenticated peer_id from the connection, not the claimed 'from'
let verified_source = peer_id.clone();

// Security: Validate u8 bounds on payload bytes
let payload: Vec<u8> = data
.iter()
.filter_map(|v| {
v.as_u64().and_then(|n| {
if n <= 255 {
Some(n as u8)
} else {
warn!("Invalid byte value {} in message payload, skipping", n);
None
}
})
})
.collect();
trace!(
"Forwarding message on topic {} from {}",
protocol, verified_source
);
let _ = event_tx.send(P2PEvent::Message {
topic: protocol.to_string(),
source: verified_source,
data: payload,
});
Comment on lines +1271 to +1300
Copy link

Copilot AI Feb 3, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The code logs a warning for peer ID spoofing but continues processing the message. Consider rejecting the message entirely or adding metrics to track spoofing attempts, as this could indicate an active attack.

Suggested change
"Peer ID mismatch: message claims from='{}' but connection is from '{}'. Using connection peer ID.",
from, peer_id
);
}
// Always use the authenticated peer_id from the connection, not the claimed 'from'
let verified_source = peer_id.clone();
// Security: Validate u8 bounds on payload bytes
let payload: Vec<u8> = data
.iter()
.filter_map(|v| {
v.as_u64().and_then(|n| {
if n <= 255 {
Some(n as u8)
} else {
warn!("Invalid byte value {} in message payload, skipping", n);
None
}
})
})
.collect();
trace!(
"Forwarding message on topic {} from {}",
protocol, verified_source
);
let _ = event_tx.send(P2PEvent::Message {
topic: protocol.to_string(),
source: verified_source,
data: payload,
});
"Peer ID mismatch: message claims from='{}' but connection is from '{}'. Dropping message.",
from, peer_id
);
} else {
// Always use the authenticated peer_id from the connection, not the claimed 'from'
let verified_source = peer_id.clone();
// Security: Validate u8 bounds on payload bytes
let payload: Vec<u8> = data
.iter()
.filter_map(|v| {
v.as_u64().and_then(|n| {
if n <= 255 {
Some(n as u8)
} else {
warn!("Invalid byte value {} in message payload, skipping", n);
None
}
})
})
.collect();
trace!(
"Forwarding message on topic {} from {}",
protocol, verified_source
);
let _ = event_tx.send(P2PEvent::Message {
topic: protocol.to_string(),
source: verified_source,
data: payload,
});
}

Copilot uses AI. Check for mistakes.
} else {
debug!(
"Message from {} missing required fields (protocol/data/from)",
peer_id
);
}
}
Err(e) => {
debug!("Failed to parse JSON message from {}: {}", peer_id, e);
}
}
}
Err(e) => {
warn!("Accept failed: {}", e);
Expand All @@ -1268,68 +1317,13 @@ impl P2PNode {
});
*self.listener_handle.write().await = Some(handle);

info!("Dual-stack listeners active on: {:?}", addrs);
Ok(())
}

/// Start the message receiving system with channel-based background tasks
///
/// Spawns per-stack recv tasks that feed into a single mpsc channel,
/// giving instant wakeup on data arrival without poll/timeout latency.
async fn start_message_receiving_system(&self) -> Result<()> {
info!("Starting message receiving system");

let (tx, mut rx) = tokio::sync::mpsc::channel(RECV_CHANNEL_CAPACITY);
let shutdown = Arc::clone(&self.shutdown);

let mut handles = Vec::new();

// Spawn per-stack recv tasks — both feed into the same channel
if let Some(v6) = self.dual_node.v6.as_ref() {
handles.push(v6.spawn_recv_task(tx.clone(), Arc::clone(&shutdown)));
}
if let Some(v4) = self.dual_node.v4.as_ref() {
handles.push(v4.spawn_recv_task(tx.clone(), Arc::clone(&shutdown)));
}
drop(tx); // drop original sender so rx closes when all tasks exit

let event_tx = self.event_tx.clone();
handles.push(tokio::spawn(async move {
info!("Message receive loop started");
while let Some((peer_id, bytes)) = rx.recv().await {
let transport_peer_id = ant_peer_id_to_string(&peer_id);
info!(
"Received {} bytes from peer {}",
bytes.len(),
transport_peer_id
);

if bytes == KEEPALIVE_PAYLOAD {
trace!("Received keepalive from {}", transport_peer_id);
continue;
}

match parse_protocol_message(&bytes, &transport_peer_id) {
Some(event) => {
let _ = event_tx.send(event);
}
None => {
warn!("Failed to parse protocol message ({} bytes)", bytes.len());
}
}
}
info!("Message receive loop ended — channel closed");
}));

*self.recv_handles.write().await = handles;
// NOTE: start_message_receiving_system() is NOT called here
// because accept_with_message() handles both connection acceptance and message receiving.

info!("Dual-stack listeners active on: {:?}", addrs);
Ok(())
}

// MCP removed

// MCP removed

/// Run the P2P node (blocks until shutdown)
pub async fn run(&self) -> Result<()> {
if !*self.running.read().await {
Expand Down Expand Up @@ -1698,14 +1692,9 @@ impl P2PNode {
)));
}

// **NEW**: Check if the ant-quic connection is actually active
// Check if the ant-quic connection is actually active
// This is the critical fix for the connection state synchronization issue
if !self.is_connection_active(peer_id).await {
debug!(
"Connection to peer {} exists in peers map but ant-quic connection is closed",
peer_id
);

// Clean up stale peer entry
self.remove_peer(peer_id).await;

Expand Down Expand Up @@ -1799,10 +1788,13 @@ impl P2PNode {
/// Returns `None` if the bytes cannot be deserialized as a valid `WireMessage`.
///
/// The `from` field is a required part of the wire protocol but is **not**
/// used as the event source. Instead, `source` — the transport-level peer ID
/// used as the event source. Instead, `source` — the transport-level peer ID
/// derived from the authenticated QUIC connection — is used so that consumers
/// can pass it directly to `send_message()`. This eliminates a spoofing
/// can pass it directly to `send_message()`. This eliminates a spoofing
/// vector where a peer could claim an arbitrary identity via the payload.
///
/// **Note**: This function is only used in tests to verify message parsing logic.
Copy link

Copilot AI Feb 3, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The comment claims this function is only used in tests, but the #[cfg(test)] attribute on line 1798 already enforces this. The comment is redundant and should be removed or simplified.

Suggested change
/// **Note**: This function is only used in tests to verify message parsing logic.
/// **Note**: This function is compiled only under `#[cfg(test)]` and is used to verify message parsing logic.

Copilot uses AI. Check for mistakes.
#[cfg(test)]
fn parse_protocol_message(bytes: &[u8], source: &str) -> Option<P2PEvent> {
let message: WireMessage = bincode::deserialize(bytes).ok()?;

Expand Down Expand Up @@ -2388,19 +2380,20 @@ impl P2PNode {
self.dht.as_ref()
}

/// Store a value in the DHT
/// Store a value in the local DHT
///
/// This method stores data in the local DHT instance only. For network-wide
/// replication across multiple nodes, use `DhtNetworkManager` instead,
/// which owns a P2PNode for transport and coordinates replication.
pub async fn dht_put(&self, key: crate::dht::Key, value: Vec<u8>) -> Result<()> {
if let Some(ref dht) = self.dht {
let mut dht_instance = dht.write().await;
let dht_key = crate::dht::DhtKey::from_bytes(key);
dht_instance
.store(&dht_key, value.clone())
.await
.map_err(|e| {
P2PError::Dht(crate::error::DhtError::StoreFailed(
format!("{:?}: {e}", key).into(),
))
})?;
dht_instance.store(&dht_key, value).await.map_err(|e| {
P2PError::Dht(crate::error::DhtError::StoreFailed(
format!("{:?}: {e}", key).into(),
))
})?;

Ok(())
} else {
Expand All @@ -2410,7 +2403,11 @@ impl P2PNode {
}
}

/// Retrieve a value from the DHT
/// Retrieve a value from the local DHT
///
/// This method retrieves data from the local DHT instance only. For network-wide
/// lookups across multiple nodes, use `DhtNetworkManager` instead,
/// which owns a P2PNode for transport and coordinates distributed queries.
pub async fn dht_get(&self, key: crate::dht::Key) -> Result<Option<Vec<u8>>> {
if let Some(ref dht) = self.dht {
let dht_instance = dht.read().await;
Expand Down
Loading
Loading