Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/adaptive/dht_integration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -676,6 +676,7 @@ impl AdaptiveDHT {
.map(|_| DhtNetworkResult::PutSuccess {
key,
replicated_to: 1,
peer_outcomes: Vec::new(),
})
.map_err(|e| AdaptiveNetworkError::Other(e.to_string())),
AdaptiveDhtBackend::Network { manager } => {
Expand Down
23 changes: 23 additions & 0 deletions src/adaptive/trust.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,11 +85,25 @@ pub struct NodeStatistics {
/// A single incremental change to apply to a node's [`NodeStatistics`].
///
/// Each variant describes one observation about a peer. The response-related
/// variants differ in how heavily they count against the peer: generic
/// failures and missing data count once, while corrupted data and protocol
/// violations count twice.
#[derive(Debug, Clone)]
pub enum NodeStatisticsUpdate {
/// Node uptime increased by the given number of seconds.
Uptime(u64),
/// Peer provided a correct response.
CorrectResponse,
/// Peer failed to provide a response (generic failure).
/// Counts as 1 failure.
FailedResponse,
/// Peer did not have the requested data.
/// Counts as 1 failure, the same weight as [`Self::FailedResponse`].
DataUnavailable,
/// Peer returned data that failed integrity verification.
/// Counts as 2 failures due to severity.
CorruptedData,
/// Peer violated the expected wire protocol.
/// Counts as 2 failures due to severity.
ProtocolViolation,
/// Storage contributed (in GB).
StorageContributed(u64),
/// Bandwidth contributed (in GB).
BandwidthContributed(u64),
/// Compute cycles contributed.
ComputeContributed(u64),
}

Expand Down Expand Up @@ -155,6 +169,15 @@ impl EigenTrustEngine {
NodeStatisticsUpdate::Uptime(seconds) => node_stats.uptime += seconds,
NodeStatisticsUpdate::CorrectResponse => node_stats.correct_responses += 1,
NodeStatisticsUpdate::FailedResponse => node_stats.failed_responses += 1,
NodeStatisticsUpdate::DataUnavailable => node_stats.failed_responses += 1,
NodeStatisticsUpdate::CorruptedData => {
// Corrupted data is a severe violation — counts as 2 failures
node_stats.failed_responses += 2;
}
NodeStatisticsUpdate::ProtocolViolation => {
// Protocol violations are severe — counts as 2 failures
node_stats.failed_responses += 2;
}
NodeStatisticsUpdate::StorageContributed(gb) => node_stats.storage_contributed += gb,
NodeStatisticsUpdate::BandwidthContributed(gb) => {
node_stats.bandwidth_contributed += gb
Expand Down
173 changes: 140 additions & 33 deletions src/dht_network_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,19 +131,51 @@ pub enum DhtNetworkOperation {
Leave,
}

/// Per-peer outcome from a DHT PUT replication attempt.
///
/// Captures whether each target peer successfully stored the value,
/// along with optional error details.
///
/// One value is produced per targeted peer. The values are returned to the
/// caller in `DhtNetworkResult::PutSuccess::peer_outcomes` and are also
/// carried by `DhtNetworkEvent::ReplicationResult::outcomes`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PeerStoreOutcome {
/// The peer that was targeted for replication.
pub peer_id: PeerId,
/// Whether the store operation succeeded on this peer.
pub success: bool,
/// Error description if the operation failed.
///
/// Omitted from the serialized form when `None`; a missing field
/// deserializes to `None`.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub error: Option<String>,
}

/// DHT network operation result
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum DhtNetworkResult {
/// Successful PUT operation
PutSuccess { key: Key, replicated_to: usize },
PutSuccess {
key: Key,
replicated_to: usize,
/// Per-peer replication outcomes (empty for remote handlers).
#[serde(default)]
peer_outcomes: Vec<PeerStoreOutcome>,
},
/// Successful GET operation
GetSuccess {
key: Key,
value: Vec<u8>,
source: PeerId,
},
/// GET operation found no value
GetNotFound { key: Key },
GetNotFound {
key: Key,
/// Number of peers queried during the lookup.
#[serde(default)]
peers_queried: usize,
/// Number of peers that returned errors during the lookup.
#[serde(default)]
peers_failed: usize,
/// Last error encountered during the lookup, if any.
#[serde(default, skip_serializing_if = "Option::is_none")]
last_error: Option<String>,
},
/// Nodes found for FIND_NODE
NodesFound {
key: Key,
Expand Down Expand Up @@ -171,6 +203,21 @@ pub enum DhtNetworkResult {
Error { operation: String, error: String },
}

/// Maps a [`DhtNetworkResult`] to the bare name of its variant.
///
/// Only the variant tag is inspected; no field of the result is read, so the
/// returned string is safe to log or embed in error messages without leaking
/// keys, values or peer details.
fn dht_network_result_variant_name(result: &DhtNetworkResult) -> &'static str {
    // No wildcard arm on purpose: adding a variant must fail to compile here.
    let variant_name: &'static str = match *result {
        // Store / fetch outcomes.
        DhtNetworkResult::PutSuccess { .. } => "PutSuccess",
        DhtNetworkResult::GetSuccess { .. } => "GetSuccess",
        DhtNetworkResult::GetNotFound { .. } => "GetNotFound",
        // Lookup outcomes.
        DhtNetworkResult::NodesFound { .. } => "NodesFound",
        DhtNetworkResult::ValueFound { .. } => "ValueFound",
        // Liveness and membership outcomes.
        DhtNetworkResult::PongReceived { .. } => "PongReceived",
        DhtNetworkResult::JoinSuccess { .. } => "JoinSuccess",
        DhtNetworkResult::LeaveSuccess => "LeaveSuccess",
        // Failure.
        DhtNetworkResult::Error { .. } => "Error",
    };
    variant_name
}

/// DHT message envelope for network transmission
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DhtNetworkMessage {
Expand Down Expand Up @@ -326,6 +373,17 @@ pub enum DhtNetworkEvent {
},
/// Error occurred
Error { error: String },
/// Replication result for a PUT operation with per-peer details
ReplicationResult {
/// The key being replicated
key: Key,
/// Total number of peers targeted
total_peers: usize,
/// Number of peers that successfully stored the value
successful_peers: usize,
/// Per-peer outcomes
outcomes: Vec<PeerStoreOutcome>,
},
}

/// DHT network statistics
Expand Down Expand Up @@ -577,6 +635,7 @@ impl DhtNetworkManager {
return Ok(DhtNetworkResult::PutSuccess {
key,
replicated_to: 1,
peer_outcomes: Vec::new(),
});
}

Expand Down Expand Up @@ -608,21 +667,18 @@ impl DhtNetworkManager {
// Execute all replication requests in parallel
let results = futures::future::join_all(replication_futures).await;

// Count successful replications
for (peer_id, result) in results {
match result {
Ok(DhtNetworkResult::PutSuccess { .. }) => {
replicated_count += 1;
debug!("Replicated to peer: {}", peer_id);
}
Ok(result) => {
debug!("Unexpected result from peer {}: {:?}", peer_id, result);
}
Err(e) => {
debug!("Failed to replicate to peer {}: {}", peer_id, e);
}
}
}
let (remote_successes, peer_outcomes) = self.collect_replication_outcomes(results).await;
replicated_count += remote_successes;

// Emit replication result event
let total_peers = peer_outcomes.len();
let successful_peers = peer_outcomes.iter().filter(|o| o.success).count();
let _ = self.event_tx.send(DhtNetworkEvent::ReplicationResult {
key,
total_peers,
successful_peers,
outcomes: peer_outcomes.clone(),
});
Comment on lines +675 to +681
Copy link

Copilot AI Feb 9, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

DhtNetworkEvent::ReplicationResult computes total_peers/successful_peers solely from peer_outcomes, but replicated_to in PutSuccess appears to include the local store (at least in several places it’s initialized with 1). This mismatch can confuse consumers (event says N successes, result says N+1). Consider either (a) including the local store as an explicit PeerStoreOutcome, or (b) clearly documenting/renaming fields to indicate these counts are for remote replication targets only.

Copilot uses AI. Check for mistakes.

info!(
"PUT operation completed: key={}, replicated_to={}/{}",
Expand All @@ -634,6 +690,7 @@ impl DhtNetworkManager {
Ok(DhtNetworkResult::PutSuccess {
key,
replicated_to: replicated_count,
peer_outcomes,
})
}

Expand Down Expand Up @@ -714,24 +771,13 @@ impl DhtNetworkManager {
});

let results = futures::future::join_all(replication_futures).await;
for (peer_id, result) in results {
match result {
Ok(DhtNetworkResult::PutSuccess { .. }) => {
replicated_count += 1;
debug!("Replicated to peer: {}", peer_id);
}
Ok(other) => {
debug!("Unexpected result from peer {}: {:?}", peer_id, other);
}
Err(e) => {
debug!("Failed to replicate to peer {}: {}", peer_id, e);
}
}
}
let (remote_successes, peer_outcomes) = self.collect_replication_outcomes(results).await;
replicated_count += remote_successes;

Ok(DhtNetworkResult::PutSuccess {
key,
replicated_to: replicated_count,
peer_outcomes,
})
}

Expand Down Expand Up @@ -768,6 +814,8 @@ impl DhtNetworkManager {
let mut queried_nodes = HashSet::new();
let mut candidate_nodes = VecDeque::new();
let mut queued_peer_ids: HashSet<String> = HashSet::new();
let mut peers_failed: usize = 0;
let mut last_error: Option<String> = None;

// Get initial candidates from local routing table and connected peers
// IMPORTANT: Use find_closest_nodes_local to avoid making network requests
Expand Down Expand Up @@ -915,10 +963,14 @@ impl DhtNetworkManager {
}
Err(e) => {
debug!("Query to {} failed: {}", peer_id, e);
peers_failed += 1;
last_error = Some(e.to_string());
self.record_peer_failure(&peer_id).await;
}
Ok(other) => {
debug!("Unexpected result from {}: {:?}", peer_id, other);
peers_failed += 1;
last_error = Some(format!("Unexpected result: {:?}", other));
self.record_peer_failure(&peer_id).await;
}
}
Expand All @@ -945,7 +997,12 @@ impl DhtNetworkManager {
hex::encode(key),
queried_nodes.len()
);
Ok(DhtNetworkResult::GetNotFound { key: *key })
Ok(DhtNetworkResult::GetNotFound {
key: *key,
peers_queried: queried_nodes.len(),
peers_failed,
last_error,
})
}

/// Backwards-compatible API that performs a full iterative lookup.
Expand Down Expand Up @@ -1468,6 +1525,55 @@ impl DhtNetworkManager {
}
}

/// Fold the results of a parallel PUT fan-out into per-peer outcomes.
///
/// For every `(peer_id, result)` pair, in input order:
/// - `Ok(PutSuccess { .. })` is reported through `record_peer_success` and
///   recorded as a successful outcome with no error;
/// - any other `Ok` variant, or an `Err`, is reported through
///   `record_peer_failure` and recorded as a failed outcome carrying a
///   message.
///
/// Returns `(successes, outcomes)`, where `outcomes` holds exactly one entry
/// per input result and `successes` is the number of successful entries.
async fn collect_replication_outcomes(
    &self,
    results: Vec<(PeerId, Result<DhtNetworkResult>)>,
) -> (usize, Vec<PeerStoreOutcome>) {
    let mut outcomes = Vec::with_capacity(results.len());

    for (peer_id, result) in results {
        // `None` means the peer stored the value; `Some(msg)` says why it did not.
        let error = match result {
            Ok(DhtNetworkResult::PutSuccess { .. }) => {
                self.record_peer_success(&peer_id).await;
                debug!("Replicated to peer: {}", peer_id);
                None
            }
            Ok(other) => {
                self.record_peer_failure(&peer_id).await;
                // Only the variant name is reported, never the payload.
                let err_msg = format!(
                    "Unexpected result variant: {}",
                    dht_network_result_variant_name(&other)
                );
                debug!("Unexpected result from peer {}: {}", peer_id, err_msg);
                Some(err_msg)
            }
            Err(e) => {
                self.record_peer_failure(&peer_id).await;
                let err_msg = e.to_string();
                debug!("Failed to replicate to peer {}: {}", peer_id, err_msg);
                Some(err_msg)
            }
        };

        outcomes.push(PeerStoreOutcome {
            success: error.is_none(),
            peer_id,
            error,
        });
    }

    // Every successful peer produced exactly one outcome with `success == true`.
    let successes = outcomes.iter().filter(|outcome| outcome.success).count();
    (successes, outcomes)
}

async fn record_peer_success(&self, peer_id: &str) {
if let Err(e) = self.node.report_peer_success(peer_id).await {
trace!(peer_id = peer_id, error = %e, "Failed to record EigenTrust success");
Expand Down Expand Up @@ -1756,6 +1862,7 @@ impl DhtNetworkManager {
Ok(DhtNetworkResult::PutSuccess {
key: *key,
replicated_to: 1,
peer_outcomes: Vec::new(),
})
}
DhtNetworkOperation::Get { key } => {
Expand Down Expand Up @@ -2012,7 +2119,7 @@ impl DhtNetworkManager {
value: vec![],
},
DhtNetworkResult::GetSuccess { key, .. } => DhtNetworkOperation::Get { key: *key },
DhtNetworkResult::GetNotFound { key } => DhtNetworkOperation::Get { key: *key },
DhtNetworkResult::GetNotFound { key, .. } => DhtNetworkOperation::Get { key: *key },
DhtNetworkResult::NodesFound { key, .. } => DhtNetworkOperation::FindNode { key: *key },
DhtNetworkResult::ValueFound { key, .. } => {
DhtNetworkOperation::FindValue { key: *key }
Expand Down
Loading
Loading