diff --git a/src/adaptive/dht_integration.rs b/src/adaptive/dht_integration.rs index aa85c80..766230c 100644 --- a/src/adaptive/dht_integration.rs +++ b/src/adaptive/dht_integration.rs @@ -676,6 +676,7 @@ impl AdaptiveDHT { .map(|_| DhtNetworkResult::PutSuccess { key, replicated_to: 1, + peer_outcomes: Vec::new(), }) .map_err(|e| AdaptiveNetworkError::Other(e.to_string())), AdaptiveDhtBackend::Network { manager } => { diff --git a/src/adaptive/trust.rs b/src/adaptive/trust.rs index 0f71817..659c5e2 100644 --- a/src/adaptive/trust.rs +++ b/src/adaptive/trust.rs @@ -85,11 +85,25 @@ pub struct NodeStatistics { /// Statistics update type #[derive(Debug, Clone)] pub enum NodeStatisticsUpdate { + /// Node uptime increased by the given number of seconds. Uptime(u64), + /// Peer provided a correct response. CorrectResponse, + /// Peer failed to provide a response (generic failure). FailedResponse, + /// Peer did not have the requested data. + DataUnavailable, + /// Peer returned data that failed integrity verification. + /// Counts as 2 failures due to severity. + CorruptedData, + /// Peer violated the expected wire protocol. + /// Counts as 2 failures due to severity. + ProtocolViolation, + /// Storage contributed (in GB). StorageContributed(u64), + /// Bandwidth contributed (in GB). BandwidthContributed(u64), + /// Compute cycles contributed. 
ComputeContributed(u64), } @@ -155,6 +169,15 @@ impl EigenTrustEngine { NodeStatisticsUpdate::Uptime(seconds) => node_stats.uptime += seconds, NodeStatisticsUpdate::CorrectResponse => node_stats.correct_responses += 1, NodeStatisticsUpdate::FailedResponse => node_stats.failed_responses += 1, + NodeStatisticsUpdate::DataUnavailable => node_stats.failed_responses += 1, + NodeStatisticsUpdate::CorruptedData => { + // Corrupted data is a severe violation — counts as 2 failures + node_stats.failed_responses += 2; + } + NodeStatisticsUpdate::ProtocolViolation => { + // Protocol violations are severe — counts as 2 failures + node_stats.failed_responses += 2; + } NodeStatisticsUpdate::StorageContributed(gb) => node_stats.storage_contributed += gb, NodeStatisticsUpdate::BandwidthContributed(gb) => { node_stats.bandwidth_contributed += gb diff --git a/src/dht_network_manager.rs b/src/dht_network_manager.rs index 0383ce5..880d209 100644 --- a/src/dht_network_manager.rs +++ b/src/dht_network_manager.rs @@ -131,11 +131,32 @@ pub enum DhtNetworkOperation { Leave, } +/// Per-peer outcome from a DHT PUT replication attempt. +/// +/// Captures whether each target peer successfully stored the value, +/// along with optional error details. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct PeerStoreOutcome { + /// The peer that was targeted for replication. + pub peer_id: PeerId, + /// Whether the store operation succeeded on this peer. + pub success: bool, + /// Error description if the operation failed. + #[serde(default, skip_serializing_if = "Option::is_none")] + pub error: Option, +} + /// DHT network operation result #[derive(Debug, Clone, Serialize, Deserialize)] pub enum DhtNetworkResult { /// Successful PUT operation - PutSuccess { key: Key, replicated_to: usize }, + PutSuccess { + key: Key, + replicated_to: usize, + /// Per-peer replication outcomes (empty for remote handlers). 
+ #[serde(default)] + peer_outcomes: Vec, + }, /// Successful GET operation GetSuccess { key: Key, @@ -143,7 +164,18 @@ pub enum DhtNetworkResult { source: PeerId, }, /// GET operation found no value - GetNotFound { key: Key }, + GetNotFound { + key: Key, + /// Number of peers queried during the lookup. + #[serde(default)] + peers_queried: usize, + /// Number of peers that returned errors during the lookup. + #[serde(default)] + peers_failed: usize, + /// Last error encountered during the lookup, if any. + #[serde(default, skip_serializing_if = "Option::is_none")] + last_error: Option, + }, /// Nodes found for FIND_NODE NodesFound { key: Key, @@ -171,6 +203,21 @@ pub enum DhtNetworkResult { Error { operation: String, error: String }, } +/// Returns the variant name of a [`DhtNetworkResult`] without exposing internal data. +fn dht_network_result_variant_name(result: &DhtNetworkResult) -> &'static str { + match result { + DhtNetworkResult::PutSuccess { .. } => "PutSuccess", + DhtNetworkResult::GetSuccess { .. } => "GetSuccess", + DhtNetworkResult::GetNotFound { .. } => "GetNotFound", + DhtNetworkResult::NodesFound { .. } => "NodesFound", + DhtNetworkResult::ValueFound { .. } => "ValueFound", + DhtNetworkResult::PongReceived { .. } => "PongReceived", + DhtNetworkResult::JoinSuccess { .. } => "JoinSuccess", + DhtNetworkResult::LeaveSuccess => "LeaveSuccess", + DhtNetworkResult::Error { .. 
} => "Error", + } +} + /// DHT message envelope for network transmission #[derive(Debug, Clone, Serialize, Deserialize)] pub struct DhtNetworkMessage { @@ -326,6 +373,17 @@ pub enum DhtNetworkEvent { }, /// Error occurred Error { error: String }, + /// Replication result for a PUT operation with per-peer details + ReplicationResult { + /// The key being replicated + key: Key, + /// Total number of peers targeted + total_peers: usize, + /// Number of peers that successfully stored the value + successful_peers: usize, + /// Per-peer outcomes + outcomes: Vec, + }, } /// DHT network statistics @@ -577,6 +635,7 @@ impl DhtNetworkManager { return Ok(DhtNetworkResult::PutSuccess { key, replicated_to: 1, + peer_outcomes: Vec::new(), }); } @@ -608,21 +667,18 @@ impl DhtNetworkManager { // Execute all replication requests in parallel let results = futures::future::join_all(replication_futures).await; - // Count successful replications - for (peer_id, result) in results { - match result { - Ok(DhtNetworkResult::PutSuccess { .. 
}) => { - replicated_count += 1; - debug!("Replicated to peer: {}", peer_id); - } - Ok(result) => { - debug!("Unexpected result from peer {}: {:?}", peer_id, result); - } - Err(e) => { - debug!("Failed to replicate to peer {}: {}", peer_id, e); - } - } - } + let (remote_successes, peer_outcomes) = self.collect_replication_outcomes(results).await; + replicated_count += remote_successes; + + // Emit replication result event + let total_peers = peer_outcomes.len(); + let successful_peers = peer_outcomes.iter().filter(|o| o.success).count(); + let _ = self.event_tx.send(DhtNetworkEvent::ReplicationResult { + key, + total_peers, + successful_peers, + outcomes: peer_outcomes.clone(), + }); info!( "PUT operation completed: key={}, replicated_to={}/{}", @@ -634,6 +690,7 @@ impl DhtNetworkManager { Ok(DhtNetworkResult::PutSuccess { key, replicated_to: replicated_count, + peer_outcomes, }) } @@ -714,24 +771,13 @@ impl DhtNetworkManager { }); let results = futures::future::join_all(replication_futures).await; - for (peer_id, result) in results { - match result { - Ok(DhtNetworkResult::PutSuccess { .. 
}) => { - replicated_count += 1; - debug!("Replicated to peer: {}", peer_id); - } - Ok(other) => { - debug!("Unexpected result from peer {}: {:?}", peer_id, other); - } - Err(e) => { - debug!("Failed to replicate to peer {}: {}", peer_id, e); - } - } - } + let (remote_successes, peer_outcomes) = self.collect_replication_outcomes(results).await; + replicated_count += remote_successes; Ok(DhtNetworkResult::PutSuccess { key, replicated_to: replicated_count, + peer_outcomes, }) } @@ -768,6 +814,8 @@ impl DhtNetworkManager { let mut queried_nodes = HashSet::new(); let mut candidate_nodes = VecDeque::new(); let mut queued_peer_ids: HashSet = HashSet::new(); + let mut peers_failed: usize = 0; + let mut last_error: Option = None; // Get initial candidates from local routing table and connected peers // IMPORTANT: Use find_closest_nodes_local to avoid making network requests @@ -915,10 +963,14 @@ impl DhtNetworkManager { } Err(e) => { debug!("Query to {} failed: {}", peer_id, e); + peers_failed += 1; + last_error = Some(e.to_string()); self.record_peer_failure(&peer_id).await; } Ok(other) => { debug!("Unexpected result from {}: {:?}", peer_id, other); + peers_failed += 1; + last_error = Some(format!("Unexpected result: {:?}", other)); self.record_peer_failure(&peer_id).await; } } @@ -945,7 +997,12 @@ impl DhtNetworkManager { hex::encode(key), queried_nodes.len() ); - Ok(DhtNetworkResult::GetNotFound { key: *key }) + Ok(DhtNetworkResult::GetNotFound { + key: *key, + peers_queried: queried_nodes.len(), + peers_failed, + last_error, + }) } /// Backwards-compatible API that performs a full iterative lookup. @@ -1468,6 +1525,55 @@ impl DhtNetworkManager { } } + /// Process replication results from parallel PUT requests. + /// + /// Returns the number of successful replications and the per-peer outcomes. 
+ async fn collect_replication_outcomes( + &self, + results: Vec<(PeerId, Result)>, + ) -> (usize, Vec) { + let mut successes = 0usize; + let mut outcomes = Vec::with_capacity(results.len()); + for (peer_id, result) in results { + match result { + Ok(DhtNetworkResult::PutSuccess { .. }) => { + successes += 1; + self.record_peer_success(&peer_id).await; + debug!("Replicated to peer: {}", peer_id); + outcomes.push(PeerStoreOutcome { + peer_id, + success: true, + error: None, + }); + } + Ok(other) => { + self.record_peer_failure(&peer_id).await; + let err_msg = format!( + "Unexpected result variant: {}", + dht_network_result_variant_name(&other) + ); + debug!("Unexpected result from peer {}: {}", peer_id, err_msg); + outcomes.push(PeerStoreOutcome { + peer_id, + success: false, + error: Some(err_msg), + }); + } + Err(e) => { + self.record_peer_failure(&peer_id).await; + let err_msg = e.to_string(); + debug!("Failed to replicate to peer {}: {}", peer_id, err_msg); + outcomes.push(PeerStoreOutcome { + peer_id, + success: false, + error: Some(err_msg), + }); + } + } + } + (successes, outcomes) + } + async fn record_peer_success(&self, peer_id: &str) { if let Err(e) = self.node.report_peer_success(peer_id).await { trace!(peer_id = peer_id, error = %e, "Failed to record EigenTrust success"); @@ -1756,6 +1862,7 @@ impl DhtNetworkManager { Ok(DhtNetworkResult::PutSuccess { key: *key, replicated_to: 1, + peer_outcomes: Vec::new(), }) } DhtNetworkOperation::Get { key } => { @@ -2012,7 +2119,7 @@ impl DhtNetworkManager { value: vec![], }, DhtNetworkResult::GetSuccess { key, .. } => DhtNetworkOperation::Get { key: *key }, - DhtNetworkResult::GetNotFound { key } => DhtNetworkOperation::Get { key: *key }, + DhtNetworkResult::GetNotFound { key, .. } => DhtNetworkOperation::Get { key: *key }, DhtNetworkResult::NodesFound { key, .. } => DhtNetworkOperation::FindNode { key: *key }, DhtNetworkResult::ValueFound { key, .. 
} => { DhtNetworkOperation::FindValue { key: *key } diff --git a/src/error.rs b/src/error.rs index f4b1e00..efd3455 100644 --- a/src/error.rs +++ b/src/error.rs @@ -577,6 +577,80 @@ impl GeographicConfig { } } +/// Categorizes why a peer interaction failed. +/// +/// Used by consumers (like saorsa-node) to provide rich context when reporting +/// failures to the trust/reputation system. Each variant carries a severity +/// that the trust engine uses to weight the penalty. +/// +/// # Example +/// +/// ```rust,ignore +/// use saorsa_core::error::PeerFailureReason; +/// +/// let reason = PeerFailureReason::Timeout; +/// assert!(reason.is_transient()); +/// assert!(reason.trust_severity() < 0.5); +/// ``` +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)] +pub enum PeerFailureReason { + /// The peer did not respond within the expected time window. + Timeout, + /// Could not establish or maintain a connection to the peer. + ConnectionFailed, + /// The peer was reachable but did not have the requested data. + DataUnavailable, + /// The peer returned data that failed integrity/hash verification. + CorruptedData, + /// The peer violated the expected wire protocol. + ProtocolError, + /// The peer explicitly refused the request. + Refused, +} + +impl PeerFailureReason { + /// Whether this failure is transient (likely to succeed on retry). + /// + /// Transient failures (timeout, connection issues) should not be penalized + /// as heavily as persistent ones (corrupted data, protocol errors). + pub fn is_transient(&self) -> bool { + matches!( + self, + PeerFailureReason::Timeout | PeerFailureReason::ConnectionFailed + ) + } + + /// Trust severity score in the range `[0.0, 1.0]`. 
+ /// + /// Higher values indicate more severe trust violations: + /// - `0.2` — transient issues (timeout, connection) + /// - `0.5` — data unavailable / refused + /// - `1.0` — data corruption or protocol violation + pub fn trust_severity(&self) -> f64 { + match self { + PeerFailureReason::Timeout => 0.2, + PeerFailureReason::ConnectionFailed => 0.2, + PeerFailureReason::DataUnavailable => 0.5, + PeerFailureReason::CorruptedData => 1.0, + PeerFailureReason::ProtocolError => 1.0, + PeerFailureReason::Refused => 0.5, + } + } +} + +impl std::fmt::Display for PeerFailureReason { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + PeerFailureReason::Timeout => write!(f, "timeout"), + PeerFailureReason::ConnectionFailed => write!(f, "connection_failed"), + PeerFailureReason::DataUnavailable => write!(f, "data_unavailable"), + PeerFailureReason::CorruptedData => write!(f, "corrupted_data"), + PeerFailureReason::ProtocolError => write!(f, "protocol_error"), + PeerFailureReason::Refused => write!(f, "refused"), + } + } +} + /// Result type alias for P2P operations pub type P2pResult = Result; @@ -975,6 +1049,8 @@ mod tests { assert!(json.contains("peer123")); } + // PeerFailureReason transient/severity tests are in tests/request_response_trust_test.rs + #[test] fn test_anyhow_conversion() { let p2p_result: P2pResult<()> = Err(P2PError::validation("Invalid input")); diff --git a/src/lib.rs b/src/lib.rs index c53d7ce..5aa084a 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -173,13 +173,13 @@ pub use bootstrap::{BootstrapConfig, BootstrapManager, CacheConfig, ContactEntry pub use dht::{Key, Record}; pub use dht_network_manager::{ BootstrapNode, DhtNetworkConfig, DhtNetworkEvent, DhtNetworkManager, DhtNetworkOperation, - DhtNetworkResult, DhtPeerInfo, + DhtNetworkResult, DhtPeerInfo, PeerStoreOutcome, }; pub use encrypted_key_storage::{ Argon2Config, DerivationPriority as KeyDerivationPriority, EncryptedKeyStorageManager, KeyMetadata, 
PasswordValidation, SecurityLevel, StorageStats, }; -pub use error::{P2PError, P2pResult as Result}; +pub use error::{P2PError, P2pResult as Result, PeerFailureReason}; pub use events::{Subscription, TopologyEvent, device_subscribe, dht_watch, subscribe_topology}; pub use fwid::{FourWordsV1, Key as FwKey, fw_check, fw_to_key}; pub use health::{ @@ -196,6 +196,7 @@ pub use monotonic_counter::{ }; pub use network::{ ConnectionStatus, NetworkSender, NodeBuilder, NodeConfig, P2PEvent, P2PNode, PeerInfo, + PeerResponse, }; // Trust system exports for saorsa-node integration pub use adaptive::{EigenTrustEngine, NodeStatistics, NodeStatisticsUpdate, TrustProvider}; diff --git a/src/network.rs b/src/network.rs index 2a2025c..d297716 100644 --- a/src/network.rs +++ b/src/network.rs @@ -21,7 +21,7 @@ use crate::bgp_geo_provider::BgpGeoProvider; use crate::bootstrap::{BootstrapManager, ContactEntry, QualityMetrics}; use crate::config::Config; use crate::dht::DHT; -use crate::error::{NetworkError, P2PError, P2pResult as Result}; +use crate::error::{NetworkError, P2PError, P2pResult as Result, PeerFailureReason}; use crate::security::GeoProvider; use crate::production::{ProductionConfig, ResourceManager, ResourceMetrics}; @@ -61,6 +61,12 @@ const KEEPALIVE_PAYLOAD: &[u8] = b"keepalive"; /// Capacity of the internal channel used by the message receiving system. const MESSAGE_RECV_CHANNEL_CAPACITY: usize = 256; +/// Maximum number of concurrent in-flight request/response operations. +const MAX_ACTIVE_REQUESTS: usize = 256; + +/// Maximum allowed timeout for a single request (5 minutes). +const MAX_REQUEST_TIMEOUT: Duration = Duration::from_secs(300); + /// Configuration for a P2P node #[derive(Debug, Clone, Serialize, Deserialize)] pub struct NodeConfig { @@ -600,6 +606,42 @@ pub enum P2PEvent { PeerDisconnected(PeerId), } +/// Response from a peer to a request sent via [`P2PNode::send_request`]. 
+/// +/// Contains the response payload along with metadata about the responder +/// and round-trip latency. +#[derive(Debug, Clone)] +pub struct PeerResponse { + /// The peer that sent the response. + pub peer_id: PeerId, + /// Raw response payload bytes. + pub data: Vec, + /// Round-trip latency from request to response. + pub latency: Duration, +} + +/// Wire format for request/response correlation. +/// +/// Wraps application payloads with a message ID and direction flag +/// so the receive loop can route responses back to waiting callers. +#[derive(Debug, Clone, Serialize, Deserialize)] +struct RequestResponseEnvelope { + /// Unique identifier to correlate request ↔ response. + message_id: String, + /// `false` for requests, `true` for responses. + is_response: bool, + /// Application payload. + payload: Vec, +} + +/// An in-flight request awaiting a response from a specific peer. +struct PendingRequest { + /// Oneshot sender for delivering the response payload. + response_tx: tokio::sync::oneshot::Sender>, + /// The peer we expect the response from (for origin validation). + expected_peer: String, +} + /// Main P2P node structure /// Main P2P network node that manages connections, routing, and communication /// @@ -687,6 +729,12 @@ pub struct P2PNode { /// Consumers (like saorsa-node) should report successes and failures /// via `report_peer_success()` and `report_peer_failure()` methods. trust_engine: Option>, + + /// Active request/response operations awaiting a response from a peer. + /// + /// Keyed by message ID (UUID). Entries are added by `send_request()` and + /// consumed by the message receive loop when a response envelope arrives. 
+ active_requests: Arc>>, } /// Normalize wildcard bind addresses to localhost loopback addresses @@ -786,6 +834,7 @@ impl P2PNode { security_dashboard: None, is_bootstrapped: Arc::new(AtomicBool::new(false)), trust_engine: None, + active_requests: Arc::new(RwLock::new(HashMap::new())), }) } /// Create a new P2P node with the given configuration @@ -1075,6 +1124,7 @@ impl P2PNode { geo_provider, is_bootstrapped: Arc::new(AtomicBool::new(false)), trust_engine, + active_requests: Arc::new(RwLock::new(HashMap::new())), }; info!( "Created P2P node with peer ID: {} (call start() to begin networking)", @@ -1230,12 +1280,57 @@ impl P2PNode { /// } /// ``` pub async fn report_peer_failure(&self, peer_id: &str) -> Result<()> { + // Delegate to the enriched version with a generic transport-level reason + self.report_peer_failure_with_reason(peer_id, PeerFailureReason::ConnectionFailed) + .await + } + + /// Report a failed interaction with a peer, providing a specific failure reason. + /// + /// This is the enriched version of [`P2PNode::report_peer_failure`] that maps the failure + /// reason to the appropriate trust penalty. Use this when you know *why* the + /// interaction failed to give the trust engine more accurate data. + /// + /// - Transport-level failures (`Timeout`, `ConnectionFailed`) map to `FailedResponse` + /// - `DataUnavailable` maps to `DataUnavailable` + /// - `CorruptedData` maps to `CorruptedData` (counts as 2 failures) + /// - `ProtocolError` maps to `ProtocolViolation` (counts as 2 failures) + /// - `Refused` maps to `FailedResponse` + /// + /// Requires the `adaptive-ml` feature to be enabled. 
+ /// + /// # Arguments + /// + /// * `peer_id` - The peer ID of the node that failed + /// * `reason` - Why the interaction failed + /// + /// # Example + /// + /// ```rust,ignore + /// use saorsa_core::error::PeerFailureReason; + /// + /// // After a chunk retrieval returns corrupted data + /// node.report_peer_failure_with_reason(&peer_id, PeerFailureReason::CorruptedData).await?; + /// ``` + pub async fn report_peer_failure_with_reason( + &self, + peer_id: &str, + reason: PeerFailureReason, + ) -> Result<()> { if let Some(ref engine) = self.trust_engine { let node_id = Self::peer_id_to_trust_node_id(peer_id); - engine - .update_node_stats(&node_id, NodeStatisticsUpdate::FailedResponse) - .await; + let update = match reason { + PeerFailureReason::Timeout | PeerFailureReason::ConnectionFailed => { + NodeStatisticsUpdate::FailedResponse + } + PeerFailureReason::DataUnavailable => NodeStatisticsUpdate::DataUnavailable, + PeerFailureReason::CorruptedData => NodeStatisticsUpdate::CorruptedData, + PeerFailureReason::ProtocolError => NodeStatisticsUpdate::ProtocolViolation, + PeerFailureReason::Refused => NodeStatisticsUpdate::FailedResponse, + }; + + engine.update_node_stats(&node_id, update).await; Ok(()) } else { // Trust engine not initialized - this is not an error, just a no-op @@ -1273,6 +1368,216 @@ impl P2PNode { } } + // ========================================================================= + // Request/Response API — Automatic Trust Feedback + // ========================================================================= + + /// Send a request to a peer and wait for a response with automatic trust reporting. + /// + /// Unlike fire-and-forget `send_message()`, this method: + /// 1. Wraps the payload in a `RequestResponseEnvelope` with a unique message ID + /// 2. Sends it on the `/rr/` protocol prefix + /// 3. Waits for a matching response (or timeout) + /// 4. 
Automatically reports success or failure to the trust engine + /// + /// The remote peer's handler should call `send_response()` with the + /// incoming message ID to route the response back. + /// + /// # Arguments + /// + /// * `peer_id` - Target peer + /// * `protocol` - Application protocol name (e.g. `"chunk_fetch"`) + /// * `data` - Request payload bytes + /// * `timeout` - Maximum time to wait for a response + /// + /// # Returns + /// + /// A [`PeerResponse`] on success, or an error on timeout / connection failure. + /// + /// # Example + /// + /// ```rust,ignore + /// let response = node.send_request(&peer_id, "chunk_fetch", chunk_id.to_vec(), Duration::from_secs(10)).await?; + /// println!("Got {} bytes from {}", response.data.len(), response.peer_id); + /// ``` + pub async fn send_request( + &self, + peer_id: &PeerId, + protocol: &str, + data: Vec, + timeout: Duration, + ) -> Result { + // Cap timeout to prevent unbounded resource retention + let timeout = timeout.min(MAX_REQUEST_TIMEOUT); + + // Validate protocol name + if protocol.is_empty() || protocol.contains(&['/', '\\', '\0'][..]) { + return Err(P2PError::Transport( + crate::error::TransportError::StreamError( + format!("Invalid protocol name: {:?}", protocol).into(), + ), + )); + } + + let message_id = uuid::Uuid::new_v4().to_string(); + let (tx, rx) = tokio::sync::oneshot::channel(); + let started_at = tokio::time::Instant::now(); + + // Atomic check-and-insert under a single write lock to prevent TOCTOU races + { + let mut reqs = self.active_requests.write().await; + if reqs.len() >= MAX_ACTIVE_REQUESTS { + return Err(P2PError::Transport( + crate::error::TransportError::StreamError( + format!( + "Too many active requests ({MAX_ACTIVE_REQUESTS}); try again later" + ) + .into(), + ), + )); + } + reqs.insert( + message_id.clone(), + PendingRequest { + response_tx: tx, + expected_peer: peer_id.to_string(), + }, + ); + } + + // Wrap in envelope + let envelope = RequestResponseEnvelope { + 
message_id: message_id.clone(), + is_response: false, + payload: data, + }; + let envelope_bytes = match postcard::to_allocvec(&envelope) { + Ok(bytes) => bytes, + Err(e) => { + self.active_requests.write().await.remove(&message_id); + return Err(P2PError::Serialization( + format!("Failed to serialize request envelope: {e}").into(), + )); + } + }; + + // Send on /rr/ prefix + let wire_protocol = format!("/rr/{}", protocol); + if let Err(e) = self + .send_message(peer_id, &wire_protocol, envelope_bytes) + .await + { + self.active_requests.write().await.remove(&message_id); + + // Report trust failure for send errors (connection refused, etc.) + let _ = self + .report_peer_failure_with_reason(peer_id, PeerFailureReason::ConnectionFailed) + .await; + + return Err(e); + } + + // Wait for response with timeout + let result = match tokio::time::timeout(timeout, rx).await { + Ok(Ok(response_bytes)) => { + let latency = started_at.elapsed(); + + // Auto-report success to trust engine + let _ = self.report_peer_success(peer_id).await; + + Ok(PeerResponse { + peer_id: peer_id.clone(), + data: response_bytes, + latency, + }) + } + Ok(Err(_)) => { + // Channel closed — peer disconnected or request was cancelled + let _ = self + .report_peer_failure_with_reason(peer_id, PeerFailureReason::ConnectionFailed) + .await; + + Err(P2PError::Network(NetworkError::ConnectionClosed { + peer_id: peer_id.to_string().into(), + })) + } + Err(_) => { + // Timeout + let _ = self + .report_peer_failure_with_reason(peer_id, PeerFailureReason::Timeout) + .await; + + Err(P2PError::Transport( + crate::error::TransportError::StreamError( + format!( + "Request to {} on {} timed out after {:?}", + peer_id, protocol, timeout + ) + .into(), + ), + )) + } + }; + + // Clean up the pending request entry (receive loop may have already + // removed it on the happy path, but remove is idempotent). 
+ self.active_requests.write().await.remove(&message_id); + + result + } + + /// Send a response to a previously received request. + /// + /// This should be called by the consumer's message handler when it receives + /// a request on a `/rr/` topic. The `message_id` should be extracted + /// from the incoming `RequestResponseEnvelope`. + /// + /// # Arguments + /// + /// * `peer_id` - The peer that sent the original request + /// * `protocol` - The application protocol (without `/rr/` prefix) + /// * `message_id` - The message ID from the incoming request envelope + /// * `data` - Response payload bytes + pub async fn send_response( + &self, + peer_id: &PeerId, + protocol: &str, + message_id: &str, + data: Vec, + ) -> Result<()> { + // Validate protocol name (same rules as send_request) + if protocol.is_empty() || protocol.contains(&['/', '\\', '\0'][..]) { + return Err(P2PError::Transport( + crate::error::TransportError::StreamError( + format!("Invalid protocol name: {:?}", protocol).into(), + ), + )); + } + + let envelope = RequestResponseEnvelope { + message_id: message_id.to_string(), + is_response: true, + payload: data, + }; + let envelope_bytes = postcard::to_allocvec(&envelope).map_err(|e| { + P2PError::Serialization(format!("Failed to serialize response envelope: {e}").into()) + })?; + + let wire_protocol = format!("/rr/{}", protocol); + self.send_message(peer_id, &wire_protocol, envelope_bytes) + .await + } + + /// Parse a request/response envelope from incoming message bytes. + /// + /// Returns `None` if the bytes are not a valid envelope. Consumers should + /// call this when they receive a message on a `/rr/` protocol to extract + /// the message ID and payload. 
+ pub fn parse_request_envelope(data: &[u8]) -> Option<(String, bool, Vec)> { + let envelope: RequestResponseEnvelope = postcard::from_bytes(data).ok()?; + Some((envelope.message_id, envelope.is_response, envelope.payload)) + } + pub async fn subscribe(&self, topic: &str) -> Result<()> { // In a real implementation, this would register the topic with the pubsub mechanism. // For now, we just log it. @@ -1457,6 +1762,7 @@ impl P2PNode { drop(tx); // drop original sender so rx closes when all tasks exit let event_tx = self.event_tx.clone(); + let active_requests = Arc::clone(&self.active_requests); handles.push(tokio::spawn(async move { info!("Message receive loop started"); while let Some((peer_id, bytes)) = rx.recv().await { @@ -1474,7 +1780,60 @@ impl P2PNode { } match parse_protocol_message(&bytes, &transport_peer_id) { - Some(event) => broadcast_event(&event_tx, event), + Some(event) => { + // Check if this is a /rr/ response that should be routed + // to a waiting send_request() caller + if let P2PEvent::Message { + ref topic, + ref data, + .. 
+ } = event + && topic.starts_with("/rr/") + && let Ok(envelope) = + postcard::from_bytes::(data) + && envelope.is_response + { + // Route response to waiting caller with origin validation + let mut reqs = active_requests.write().await; + let expected_peer = match reqs.get(&envelope.message_id) { + Some(pending) => pending.expected_peer.clone(), + None => { + // No matching request — suppress internal /rr/ traffic + trace!( + message_id = %envelope.message_id, + "Unmatched /rr/ response (likely timed out) — suppressing" + ); + continue; + } + }; + if expected_peer != transport_peer_id { + warn!( + message_id = %envelope.message_id, + expected = %expected_peer, + actual = %transport_peer_id, + "Response origin mismatch — ignoring" + ); + // Don't deliver; don't broadcast + continue; + } + if let Some(pending) = reqs.remove(&envelope.message_id) { + if pending.response_tx.send(envelope.payload).is_err() { + warn!( + message_id = %envelope.message_id, + "Response receiver dropped before delivery" + ); + } + continue; // Don't broadcast responses + } + // No matching request — suppress internal /rr/ traffic + trace!( + message_id = %envelope.message_id, + "Unmatched /rr/ response (likely timed out) — suppressing" + ); + continue; + } + broadcast_event(&event_tx, event); + } None => { warn!("Failed to parse protocol message ({} bytes)", bytes.len()); } diff --git a/tests/dht_cross_node_test.rs b/tests/dht_cross_node_test.rs index 18e7e58..3bf0792 100644 --- a/tests/dht_cross_node_test.rs +++ b/tests/dht_cross_node_test.rs @@ -300,25 +300,24 @@ async fn test_concurrent_dht_operations() -> Result<()> { Ok(()) } -/// Test DHT put with timeout (large value) +/// Test DHT put at the maximum allowed value size (512 bytes) succeeds, +/// and that oversized values are correctly rejected with a validation error. 
#[tokio::test] async fn test_dht_put_large_value() -> Result<()> { let config = create_test_dht_config("large_value_test_node", 0); let manager = Arc::new(DhtNetworkManager::new(config).await?); manager.start().await?; - // Create a large value (1MB) - let key = key_from_str("large_value_key"); - let value = vec![0u8; 1024 * 1024]; + // A value at exactly the 512-byte limit should succeed + let key = key_from_str("max_size_value_key"); + let value = vec![0xABu8; 512]; - // Put should complete within timeout let put_result = timeout(Duration::from_secs(30), manager.put(key, value.clone())).await??; assert!( matches!(put_result, DhtNetworkResult::PutSuccess { .. }), - "Large value put should succeed" + "Value at max size should succeed" ); - // Get should return the large value let get_result = timeout(Duration::from_secs(30), manager.get(&key)).await??; match get_result { DhtNetworkResult::GetSuccess { @@ -330,10 +329,30 @@ async fn test_dht_put_large_value() -> Result<()> { value.len(), "Retrieved value size should match" ); + assert_eq!( + retrieved_value, value, + "Retrieved value content should match" + ); } - _ => panic!("Get for large value should succeed"), + _ => panic!("Get for max-size value should succeed"), } + // A value exceeding the 512-byte limit should be rejected + let oversized_key = key_from_str("oversized_value_key"); + let oversized_value = vec![0xFFu8; 513]; + + let oversized_result = manager.put(oversized_key, oversized_value).await; + assert!( + oversized_result.is_err(), + "Oversized value should be rejected with a validation error" + ); + let err_msg = format!("{}", oversized_result.unwrap_err()); + assert!( + err_msg.contains("513") && err_msg.contains("512"), + "Error should mention both actual (513) and max (512) sizes, got: {}", + err_msg + ); + manager.stop().await?; Ok(()) } diff --git a/tests/dht_parallel_replication_e2e_test.rs b/tests/dht_parallel_replication_e2e_test.rs index 4fe5d05..d412243 100644 --- 
a/tests/dht_parallel_replication_e2e_test.rs +++ b/tests/dht_parallel_replication_e2e_test.rs @@ -184,6 +184,7 @@ async fn test_replication_count_isolated_node() -> Result<()> { DhtNetworkResult::PutSuccess { replicated_to, key: result_key, + .. } => { assert_eq!(result_key, key, "Returned key should match"); assert_eq!( @@ -198,7 +199,8 @@ async fn test_replication_count_isolated_node() -> Result<()> { Ok(()) } -/// Stress test: 50 values of varying sizes (1KB–10KB), all stored and retrieved. +/// Stress test: 50 values of varying sizes (10–500 bytes), all stored and retrieved. +/// Values stay within the 512-byte DHT value size limit. #[tokio::test] async fn test_stress_50_values() -> Result<()> { let config = create_test_dht_config("stress_node", 0, 8); @@ -209,7 +211,8 @@ async fn test_stress_50_values() -> Result<()> { for i in 0..50 { let key = key_from_str(&format!("stress_key_{}", i)); - let value_size = 1024 * (i % 10 + 1); + // Vary sizes from 10 to 500 bytes (within 512-byte limit) + let value_size = 10 + (i % 10) * 50; let value = vec![i as u8; value_size]; match manager.put(key, value).await { @@ -223,7 +226,7 @@ async fn test_stress_50_values() -> Result<()> { for i in 0..50 { let key = key_from_str(&format!("stress_key_{}", i)); - let expected_size = 1024 * (i % 10 + 1); + let expected_size = 10 + (i % 10) * 50; match manager.get(&key).await { Ok(DhtNetworkResult::GetSuccess { value, .. }) => { diff --git a/tests/request_response_trust_test.rs b/tests/request_response_trust_test.rs new file mode 100644 index 0000000..d481b83 --- /dev/null +++ b/tests/request_response_trust_test.rs @@ -0,0 +1,313 @@ +// Copyright 2024 Saorsa Labs Limited +// +// This software is dual-licensed under: +// - GNU Affero General Public License v3.0 or later (AGPL-3.0-or-later) +// - Commercial License +// +// For AGPL-3.0 license, see LICENSE-AGPL-3.0 +// For commercial licensing, contact: david@saorsalabs.com + +#![allow(clippy::unwrap_used, clippy::expect_used)] + +//! 
Integration tests for the request/response API and trust feedback. +//! +//! Tests cover: +//! - `PeerFailureReason` enum semantics +//! - `PeerStoreOutcome` construction and enriched `DhtNetworkResult` variants +//! - `RequestResponseEnvelope` serialization round-trip +//! - `P2PNode::parse_request_envelope` helper + +use saorsa_core::error::PeerFailureReason; +use saorsa_core::{DhtNetworkResult, PeerStoreOutcome}; + +/// Mirror of the private `RequestResponseEnvelope` for constructing test bytes. +#[derive(serde::Serialize, serde::Deserialize)] +struct TestEnvelope { + message_id: String, + is_response: bool, + payload: Vec<u8>, +} + +// ---- PeerFailureReason tests ---- + +#[test] +fn test_failure_reason_transient_classification() { + assert!(PeerFailureReason::Timeout.is_transient()); + assert!(PeerFailureReason::ConnectionFailed.is_transient()); + assert!(!PeerFailureReason::DataUnavailable.is_transient()); + assert!(!PeerFailureReason::CorruptedData.is_transient()); + assert!(!PeerFailureReason::ProtocolError.is_transient()); + assert!(!PeerFailureReason::Refused.is_transient()); +} + +#[test] +fn test_failure_reason_severity_ranges() { + let all_reasons = [ + PeerFailureReason::Timeout, + PeerFailureReason::ConnectionFailed, + PeerFailureReason::DataUnavailable, + PeerFailureReason::CorruptedData, + PeerFailureReason::ProtocolError, + PeerFailureReason::Refused, + ]; + + for reason in &all_reasons { + let severity = reason.trust_severity(); + assert!( + (0.0..=1.0).contains(&severity), + "{:?} severity {} out of range", + reason, + severity + ); + } + + // Transient failures have lower severity than data integrity failures + assert!( + PeerFailureReason::Timeout.trust_severity() + < PeerFailureReason::CorruptedData.trust_severity() + ); + assert!( + PeerFailureReason::ConnectionFailed.trust_severity() + < PeerFailureReason::ProtocolError.trust_severity() + ); +} + +#[test] +fn test_failure_reason_display() { + assert_eq!(PeerFailureReason::Timeout.to_string(), 
"timeout"); + assert_eq!( + PeerFailureReason::ConnectionFailed.to_string(), + "connection_failed" + ); + assert_eq!( + PeerFailureReason::DataUnavailable.to_string(), + "data_unavailable" + ); + assert_eq!( + PeerFailureReason::CorruptedData.to_string(), + "corrupted_data" + ); + assert_eq!( + PeerFailureReason::ProtocolError.to_string(), + "protocol_error" + ); + assert_eq!(PeerFailureReason::Refused.to_string(), "refused"); +} + +#[test] +fn test_failure_reason_serde_roundtrip() { + for reason in &[ + PeerFailureReason::Timeout, + PeerFailureReason::ConnectionFailed, + PeerFailureReason::DataUnavailable, + PeerFailureReason::CorruptedData, + PeerFailureReason::ProtocolError, + PeerFailureReason::Refused, + ] { + let json = serde_json::to_string(reason).unwrap(); + let roundtripped: PeerFailureReason = serde_json::from_str(&json).unwrap(); + assert_eq!(*reason, roundtripped); + } +} + +// ---- PeerStoreOutcome tests ---- + +#[test] +fn test_peer_store_outcome_success() { + let outcome = PeerStoreOutcome { + peer_id: "peer_abc123".to_string(), + success: true, + error: None, + }; + assert!(outcome.success); + assert!(outcome.error.is_none()); +} + +#[test] +fn test_peer_store_outcome_failure() { + let outcome = PeerStoreOutcome { + peer_id: "peer_def456".to_string(), + success: false, + error: Some("Connection refused".to_string()), + }; + assert!(!outcome.success); + assert_eq!(outcome.error.as_deref(), Some("Connection refused")); +} + +#[test] +fn test_peer_store_outcome_serde_roundtrip() { + let outcome = PeerStoreOutcome { + peer_id: "peer_test".to_string(), + success: false, + error: Some("timeout".to_string()), + }; + let json = serde_json::to_string(&outcome).unwrap(); + let roundtripped: PeerStoreOutcome = serde_json::from_str(&json).unwrap(); + assert_eq!(roundtripped.peer_id, "peer_test"); + assert!(!roundtripped.success); + assert_eq!(roundtripped.error.as_deref(), Some("timeout")); +} + +#[test] +fn test_peer_store_outcome_serde_default_fields() { + 
// Simulate old wire format without optional fields + let json = r#"{"peer_id":"peer_old","success":true}"#; + let outcome: PeerStoreOutcome = serde_json::from_str(json).unwrap(); + assert_eq!(outcome.peer_id, "peer_old"); + assert!(outcome.success); + assert!(outcome.error.is_none()); +} + +// ---- Enriched DhtNetworkResult tests ---- + +#[test] +fn test_put_success_with_peer_outcomes() { + let key = [42u8; 32]; + let outcomes = vec![ + PeerStoreOutcome { + peer_id: "peer_a".to_string(), + success: true, + error: None, + }, + PeerStoreOutcome { + peer_id: "peer_b".to_string(), + success: false, + error: Some("timeout".to_string()), + }, + ]; + + let result = DhtNetworkResult::PutSuccess { + key, + replicated_to: 2, + peer_outcomes: outcomes.clone(), + }; + + match result { + DhtNetworkResult::PutSuccess { + replicated_to, + peer_outcomes, + .. + } => { + assert_eq!(replicated_to, 2); + assert_eq!(peer_outcomes.len(), 2); + assert!(peer_outcomes[0].success); + assert!(!peer_outcomes[1].success); + } + _ => panic!("Expected PutSuccess"), + } +} + +#[test] +fn test_put_success_backward_compat_serde() { + // Old wire format without peer_outcomes + let json = r#"{"PutSuccess":{"key":[0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0],"replicated_to":1}}"#; + let result: DhtNetworkResult = serde_json::from_str(json).unwrap(); + match result { + DhtNetworkResult::PutSuccess { + peer_outcomes, + replicated_to, + .. + } => { + assert_eq!(replicated_to, 1); + assert!(peer_outcomes.is_empty()); + } + _ => panic!("Expected PutSuccess"), + } +} + +#[test] +fn test_get_not_found_with_diagnostics() { + let key = [7u8; 32]; + let result = DhtNetworkResult::GetNotFound { + key, + peers_queried: 5, + peers_failed: 2, + last_error: Some("connection timeout".to_string()), + }; + + match result { + DhtNetworkResult::GetNotFound { + peers_queried, + peers_failed, + last_error, + .. 
+ } => { + assert_eq!(peers_queried, 5); + assert_eq!(peers_failed, 2); + assert_eq!(last_error.as_deref(), Some("connection timeout")); + } + _ => panic!("Expected GetNotFound"), + } +} + +#[test] +fn test_get_not_found_backward_compat_serde() { + // Old wire format without diagnostic fields + let json = r#"{"GetNotFound":{"key":[0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0]}}"#; + let result: DhtNetworkResult = serde_json::from_str(json).unwrap(); + match result { + DhtNetworkResult::GetNotFound { + peers_queried, + peers_failed, + last_error, + .. + } => { + assert_eq!(peers_queried, 0); + assert_eq!(peers_failed, 0); + assert!(last_error.is_none()); + } + _ => panic!("Expected GetNotFound"), + } +} + +// ---- RequestResponseEnvelope tests ---- + +#[test] +fn test_request_envelope_roundtrip() { + use saorsa_core::P2PNode; + + let data = b"hello world".to_vec(); + + let envelope = TestEnvelope { + message_id: "test-msg-id-123".to_string(), + is_response: false, + payload: data.clone(), + }; + let bytes = postcard::to_allocvec(&envelope).unwrap(); + + let parsed = P2PNode::parse_request_envelope(&bytes); + assert!(parsed.is_some()); + let (msg_id, is_response, payload) = parsed.unwrap(); + assert_eq!(msg_id, "test-msg-id-123"); + assert!(!is_response); + assert_eq!(payload, data); +} + +#[test] +fn test_response_envelope_roundtrip() { + use saorsa_core::P2PNode; + + let response_data = b"response payload".to_vec(); + let envelope = TestEnvelope { + message_id: "resp-456".to_string(), + is_response: true, + payload: response_data.clone(), + }; + let bytes = postcard::to_allocvec(&envelope).unwrap(); + + let parsed = P2PNode::parse_request_envelope(&bytes); + assert!(parsed.is_some()); + let (msg_id, is_response, payload) = parsed.unwrap(); + assert_eq!(msg_id, "resp-456"); + assert!(is_response); + assert_eq!(payload, response_data); +} + +#[test] +fn test_parse_invalid_envelope() { + use saorsa_core::P2PNode; + + // Random bytes should fail to 
parse + let garbage = vec![0xFF, 0xFE, 0xFD]; + assert!(P2PNode::parse_request_envelope(&garbage).is_none()); +}