feat: DHT consumer error reporting and trust feedback #21
Conversation
Pull request overview
Adds richer per-peer diagnostics and automatic trust feedback around DHT operations and a new request/response messaging API, plus adjusts tests to respect the 512-byte DHT value limit.
Changes:
- Introduces `PeerFailureReason` and richer trust-stat updates + reporting APIs.
- Adds per-peer replication outcomes and extra diagnostics to DHT results/events.
- Adds `/rr/` request/response envelopes with oneshot correlation and timeout handling; updates/extends integration tests.
Reviewed changes
Copilot reviewed 9 out of 9 changed files in this pull request and generated 4 comments.
| File | Description |
|---|---|
| src/network.rs | Adds request/response API with envelope parsing and trust feedback routing |
| src/dht_network_manager.rs | Adds per-peer replication outcomes, diagnostics on not-found, and replication event emission |
| src/error.rs | Adds PeerFailureReason with transient/severity semantics |
| src/adaptive/trust.rs | Adds new trust statistic update variants and scoring behavior |
| src/lib.rs | Re-exports new public types |
| tests/request_response_trust_test.rs | Adds integration tests for new enums/results/envelope parsing |
| tests/dht_cross_node_test.rs | Fixes large-value test to align with 512-byte limit and validates oversize rejection |
| tests/dht_parallel_replication_e2e_test.rs | Fixes stress test value sizes to remain under 512-byte limit |
src/network.rs
Outdated
```rust
// Register the pending request
self.active_requests
    .write()
    .await
    .insert(message_id.clone(), tx);

// Wrap in envelope
let envelope = RequestResponseEnvelope {
    message_id: message_id.clone(),
    is_response: false,
    payload: data,
};
let envelope_bytes = postcard::to_allocvec(&envelope).map_err(|e| {
    P2PError::Serialization(format!("Failed to serialize request envelope: {e}").into())
})?;

// Send on /rr/<protocol> prefix
let wire_protocol = format!("/rr/{}", protocol);
self.send_message(peer_id, &wire_protocol, envelope_bytes)
    .await?;
```
send_request() inserts into active_requests before envelope serialization and before send_message(). If serialization or send_message() returns an error, the function exits early and the active_requests entry is never removed, leaking the map entry until process exit. Consider registering the pending request only after successful serialization (still before send_message()), and ensure removal happens on all early-return paths (e.g., via a guard that removes message_id on drop, or by explicitly removing before returning errors from serialization/send).
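A minimal sketch of the drop-guard approach this comment suggests, with hypothetical names (`CleanupGuard`, `send_request_sketch`) and a synchronous `std::sync::Mutex` standing in for the async map in saorsa-core — illustrative only, not the project's actual implementation:

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

// RAII guard: removes the pending-request entry when dropped, so every
// early-return path (serialization error, send failure) cleans up the map.
struct CleanupGuard {
    map: Arc<Mutex<HashMap<String, Vec<u8>>>>,
    message_id: String,
    disarmed: bool,
}

impl CleanupGuard {
    /// Call once responsibility for the entry has been handed off elsewhere
    /// (e.g. the receive loop will remove it on response delivery).
    fn disarm(mut self) {
        self.disarmed = true;
    }
}

impl Drop for CleanupGuard {
    fn drop(&mut self) {
        if !self.disarmed {
            if let Ok(mut map) = self.map.lock() {
                map.remove(&self.message_id);
            }
        }
    }
}

fn send_request_sketch(
    active: &Arc<Mutex<HashMap<String, Vec<u8>>>>,
    send_fails: bool,
) -> Result<(), String> {
    let message_id = "msg-1".to_string();
    active.lock().unwrap().insert(message_id.clone(), Vec::new());
    let guard = CleanupGuard {
        map: Arc::clone(active),
        message_id,
        disarmed: false,
    };
    if send_fails {
        // Early return: the guard drops here and removes the entry — no leak.
        return Err("send_message failed".to_string());
    }
    guard.disarm();
    Ok(())
}
```

The same guard also covers the cancellation case: if the future holding it is dropped mid-flight, `Drop` still runs and the entry is removed.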
src/network.rs
Outdated
```rust
    let _ = pending.send(envelope.payload);
    continue; // Don't broadcast responses
}
// No matching request — fall through to broadcast
```
If pending.send(...) fails (e.g., the requester dropped the receiver early), the code still continues and suppresses broadcasting the response event, effectively dropping the message silently. Consider only continue when the send succeeds; on send failure, fall through to broadcast_event (or at least trace/log) so responses aren’t inexplicably lost during debugging.
Suggested change:

```rust
match pending.send(envelope.payload) {
    Ok(_) => {
        // Successfully delivered to requester — don't broadcast
        continue;
    }
    Err(_unclaimed_payload) => {
        // oneshot::Sender::send returns the unclaimed value on failure,
        // which is not Display — log the message_id instead.
        warn!(
            "Failed to deliver /rr/ response to requester (message_id {}): receiver dropped",
            envelope.message_id
        );
        // Fall through to broadcast below so the message isn't silently lost
    }
}
// No matching request or failed delivery — fall through to broadcast
```
src/dht_network_manager.rs
Outdated
```rust
/// Per-peer outcome from a DHT PUT replication attempt.
///
/// Captures whether each target peer successfully stored the value,
/// along with optional error details and latency measurements.
```
The doc comment mentions “latency measurements”, but PeerStoreOutcome has no latency field. Either remove the latency mention from the struct docs, or add a latency-related field (and wire it up where outcomes are collected).
Suggested change:

```diff
-/// along with optional error details and latency measurements.
+/// along with optional error details.
```
tests/request_response_trust_test.rs
Outdated
```rust
    assert!(outcome.error.is_none());
}
```
Duplicate assertion on outcome.error.is_none() (lines 158–159). Remove one to keep the test minimal and avoid confusion when extending it with new defaulted fields later.
Suggested change:

```diff
-    assert!(outcome.error.is_none());
-}
+}
```
Greptile Overview

Greptile Summary

This PR adds three major capabilities to saorsa-core: enriched trust feedback, detailed DHT error reporting, and a new request/response API with automatic trust integration.

Key Changes

Phase 1: Enriched Trust Feedback
Phase 2: DHT Error Reporting
Phase 3: Request/Response API
Test Fixes
Issues Found
Confidence Score: 4/5
| Filename | Overview |
|---|---|
| src/error.rs | Added PeerFailureReason enum with severity scoring (is_transient(), trust_severity()) to enable rich failure context for trust feedback. Clean implementation with proper serialization support. |
| src/adaptive/trust.rs | Extended NodeStatisticsUpdate with three new variants (DataUnavailable, CorruptedData, ProtocolViolation) for granular trust penalties. Severity weighting (1x vs 2x) properly applied in update_node_stats(). |
| src/network.rs | Implemented request/response API with send_request()/send_response(), oneshot correlation, automatic trust reporting, and /rr/ protocol isolation. Added report_peer_failure_with_reason() that maps failure reasons to trust updates. Minor issue: potential memory leak in active_requests if responses never arrive. |
| src/dht_network_manager.rs | Added PeerStoreOutcome per-peer details on PutSuccess, diagnostic fields on GetNotFound, ReplicationResult event, and collect_replication_outcomes() helper. Trust feedback wired into replication failures. Clean backward-compatible implementation. |
| tests/request_response_trust_test.rs | New comprehensive test file covering PeerFailureReason semantics, PeerStoreOutcome serialization, and enriched DHT result variants. Excellent test coverage with 15 tests. |
Sequence Diagram
```mermaid
sequenceDiagram
    participant Client as P2PNode (Client)
    participant Server as P2PNode (Server)
    participant TrustEngine as EigenTrustEngine
    Note over Client,Server: Request/Response Flow with Automatic Trust
    Client->>Client: Generate UUID message_id
    Client->>Client: Create oneshot channel
    Client->>Client: Register in active_requests
    Client->>Server: send_message(/rr/protocol, RequestResponseEnvelope)
    Note right of Client: Envelope: {message_id, is_response=false, payload}
    alt Response received within timeout
        Server->>Server: Receive on /rr/protocol
        Server->>Server: Parse RequestResponseEnvelope
        Server->>Server: Process application payload
        Server->>Client: send_response(message_id, response_data)
        Note left of Server: Envelope: {message_id, is_response=true, payload}
        Client->>Client: Receive loop matches message_id
        Client->>Client: Remove from active_requests
        Client->>Client: Send via oneshot to waiting caller
        Client->>TrustEngine: report_peer_success(peer_id)
        Note right of TrustEngine: NodeStatisticsUpdate::CorrectResponse
        Client-->>Client: Return PeerResponse{data, latency}
    else Timeout occurs
        Client->>Client: Oneshot channel times out
        Client->>Client: Remove from active_requests
        Client->>TrustEngine: report_peer_failure_with_reason(Timeout)
        Note right of TrustEngine: NodeStatisticsUpdate::FailedResponse (severity 0.2)
        Client-->>Client: Return Timeout error
    else Connection closed
        Server->>Client: Connection dropped
        Client->>Client: Oneshot channel closed
        Client->>Client: Remove from active_requests
        Client->>TrustEngine: report_peer_failure_with_reason(ConnectionFailed)
        Note right of TrustEngine: NodeStatisticsUpdate::FailedResponse (severity 0.2)
        Client-->>Client: Return ConnectionClosed error
    end
    Note over Client,TrustEngine: DHT Replication with Per-Peer Trust
    Client->>Client: dht_manager.put(key, value)
    Client->>Server: Parallel replication to N peers
    loop For each peer
        alt Peer stores successfully
            Server-->>Client: PutSuccess
            Client->>TrustEngine: record_peer_success(peer_id)
            Note right of Client: PeerStoreOutcome{success=true}
        else Peer fails
            Server-->>Client: Error or timeout
            Client->>TrustEngine: record_peer_failure(peer_id)
            Note right of Client: PeerStoreOutcome{success=false, error}
        end
    end
    Client->>Client: Emit ReplicationResult event
    Note right of Client: {total_peers, successful_peers, outcomes}
```
src/network.rs
Outdated
```rust
    .insert(message_id.clone(), tx);

// Wrap in envelope
```
If send_message() fails here, the function returns early via ? without cleaning up the active_requests entry, causing a memory leak.
Suggested change:

```rust
.insert(message_id.clone(), tx);
// Wrap in envelope
// ...
// Send on /rr/<protocol> prefix
let wire_protocol = format!("/rr/{}", protocol);
if let Err(e) = self.send_message(peer_id, &wire_protocol, envelope_bytes).await {
    self.active_requests.write().await.remove(&message_id);
    return Err(e);
}
```
Code Review: Security and Reliability Issues Found

I've completed a comprehensive code review of PR #21. While the implementation is well-structured with excellent test coverage, I've identified 5 issues that should be addressed before merging:

🔴 Critical Issues

1. Resource Leak in
- Clean up active_requests on serialization/send_message early returns to prevent map entry leaks (Copilot + Greptile)
- Log warning when response receiver is dropped before delivery instead of silently discarding (Copilot)
- Remove stale "latency" mention from PeerStoreOutcome doc (Copilot)
- Remove duplicate assertion in serde default fields test (Copilot)
- Fix rustfmt formatting in dht_cross_node_test.rs

Co-Authored-By: Claude Opus 4.6 <[email protected]>
Addressing Code Review Findings

Thanks for the thorough review! All 5 issues have been addressed in commit cc44d45:

Issue 1: Resource Leak in
```rust
    .send_message(peer_id, &wire_protocol, envelope_bytes)
    .await
{
    self.active_requests.write().await.remove(&message_id);
```
send_request() documents “automatic trust reporting” on failure, but the early-return path when send_message() fails does not report a peer failure. Consider reporting PeerFailureReason::ConnectionFailed (under adaptive-ml) before returning the error so trust feedback is consistent across all failure modes.
Suggested change:

```rust
self.active_requests.write().await.remove(&message_id);
// Report connection failure to trust engine for consistency with other failure modes
#[cfg(feature = "adaptive-ml")]
{
    let _ = self
        .report_peer_failure_with_reason(
            peer_id,
            PeerFailureReason::ConnectionFailed,
        )
        .await;
}
```
src/network.rs
Outdated
```rust
}
// No matching request — fall through to broadcast
```
The comment says “Don’t broadcast responses”, but the current logic will broadcast /rr/ responses when there is no matching pending request (e.g., after a timeout or if the map entry was removed). If /rr/ is intended to be internal-only, consider suppressing broadcasting for all /rr/ responses (matched or unmatched) and optionally log/trace unexpected unmatched responses instead.
Suggested change:

```rust
} else {
    // No matching request — log and do not broadcast internal /rr/ responses
    warn!(
        message_id = %envelope.message_id,
        from = %transport_peer_id,
        "Received /rr/ response with no matching pending request — dropping"
    );
    continue;
}
```
```rust
pub async fn send_response(
    &self,
    peer_id: &PeerId,
    protocol: &str,
    message_id: &str,
    data: Vec<u8>,
) -> Result<()> {
    let envelope = RequestResponseEnvelope {
        message_id: message_id.to_string(),
        is_response: true,
        payload: data,
    };
    let envelope_bytes = postcard::to_allocvec(&envelope).map_err(|e| {
        P2PError::Serialization(format!("Failed to serialize response envelope: {e}").into())
    })?;

    let wire_protocol = format!("/rr/{}", protocol);
    self.send_message(peer_id, &wire_protocol, envelope_bytes)
        .await
}
```
send_request() validates protocol (rejecting empty names and names containing `/`, `\`, or `\0`), but send_response() does not. This makes the API inconsistent and can generate invalid or surprising /rr/ topics. Consider reusing the same protocol validation logic in send_response() for symmetry and safety.
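A sketch of the shared helper this comment asks for; the function name and the plain-`String` error are illustrative, not saorsa-core's actual API:

```rust
// Shared validation used by both send_request() and send_response().
// The protocol is embedded into the "/rr/<protocol>" wire topic, so
// reject empty names and names containing '/', '\' or NUL.
fn validate_protocol_name(protocol: &str) -> Result<(), String> {
    if protocol.is_empty() || protocol.contains(&['/', '\\', '\0'][..]) {
        return Err(format!("Invalid protocol name: {:?}", protocol));
    }
    Ok(())
}
```

Both entry points would call this once at the top, so the accepted name set cannot drift between the request and response paths.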
src/network.rs
Outdated
```rust
// Reject if at concurrency limit
{
    let reqs = self.active_requests.read().await;
    if reqs.len() >= MAX_ACTIVE_REQUESTS {
        return Err(P2PError::Transport(
            crate::error::TransportError::StreamError(
                format!("Too many active requests ({MAX_ACTIVE_REQUESTS}); try again later")
                    .into(),
            ),
        ));
    }
}

let message_id = uuid::Uuid::new_v4().to_string();
let (tx, rx) = tokio::sync::oneshot::channel();
let started_at = tokio::time::Instant::now();

// Register the pending request with the expected peer for origin validation
self.active_requests.write().await.insert(
    message_id.clone(),
    PendingRequest {
        response_tx: tx,
        expected_peer: peer_id.to_string(),
    },
);
```
The concurrency limit check and the insertion are performed under different locks (read then write), so multiple concurrent callers can pass the len() check and exceed MAX_ACTIVE_REQUESTS. If the limit is meant to be strict, consider making the check+insert atomic (e.g., do both under a single write lock, or use a semaphore/permit-based approach).
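A sketch of the single-lock variant this comment suggests, simplified to a synchronous `std::sync::Mutex` (the real code uses `tokio::sync::RwLock`); names like `try_register` are illustrative:

```rust
use std::collections::HashMap;
use std::sync::Mutex;

const MAX_ACTIVE_REQUESTS: usize = 256;

// Atomic check-and-insert: the capacity check and the insertion happen
// under one lock acquisition, so two concurrent callers cannot both pass
// the len() check and push the map past the limit.
fn try_register(
    active: &Mutex<HashMap<String, String>>, // message_id -> expected_peer
    message_id: String,
    expected_peer: String,
) -> Result<(), String> {
    let mut map = active.lock().map_err(|_| "lock poisoned".to_string())?;
    if map.len() >= MAX_ACTIVE_REQUESTS {
        return Err("Too many active requests; try again later".to_string());
    }
    map.insert(message_id, expected_peer);
    Ok(())
}
```

A `tokio::sync::Semaphore` with `MAX_ACTIVE_REQUESTS` permits is the other option mentioned, and has the advantage that the permit can be held for the whole request lifetime.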
```rust
// Validate protocol name
if protocol.is_empty() || protocol.contains(&['/', '\\', '\0'][..]) {
    return Err(P2PError::Transport(
        crate::error::TransportError::StreamError(
            format!("Invalid protocol name: {:?}", protocol).into(),
        ),
    ));
}
```
This is input validation, but it returns a transport-layer StreamError. If the project has (or expects) a validation/bad-argument error variant, using it here would make error semantics clearer for callers (and easier to match on) than a transport error.
```rust
let err_msg = format!("{}", oversized_result.unwrap_err());
assert!(
    err_msg.contains("exceeds maximum"),
    "Error should mention size limit, got: {}",
    err_msg
);
```
This assertion is fairly brittle because it depends on a specific substring in the error message. If possible, prefer asserting on an error variant/category (e.g., a validation error type) or check for more stable details (like the limit value) to reduce false failures when error wording changes.
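Illustrative only: a stand-in error enum showing how a test can assert on an error variant and its stable details (the 513/512 sizes) rather than a message substring. The real P2PError/DHT error types differ:

```rust
// Hypothetical error type for demonstration — not saorsa-core's P2PError.
#[derive(Debug, PartialEq)]
enum PutError {
    ValueTooLarge { actual: usize, max: usize },
    Other(String),
}

// Matching on the variant and the stable max value survives wording changes
// in the Display implementation.
fn is_oversize_rejection(err: &PutError) -> bool {
    matches!(err, PutError::ValueTooLarge { max: 512, .. })
}
```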
```rust
pub async fn send_request(
    &self,
    peer_id: &PeerId,
    protocol: &str,
    data: Vec<u8>,
    timeout: Duration,
) -> Result<PeerResponse> {
```
send_request()/send_response() and the receive-loop routing introduce significant new behavior (timeout handling, origin validation, concurrency limiting, and “automatic trust reporting”), but the added integration tests in this PR appear to focus on envelope parsing and enum/result serde compatibility rather than exercising the end-to-end request/response flow. Adding tests that cover: successful request/response routing, timeout cleanup behavior, origin-mismatch suppression, invalid protocol rejection, MAX_ACTIVE_REQUESTS enforcement, and trust reporting on send failure/timeout would better protect this API.
Addressing Second Round of Review Comments

All actionable findings from the latest Copilot review have been fixed in commit ba845e2:

Fixed
Acknowledged (not changed)
All checks pass locally:
@claude please review
Claude finished @grumbach's task in 1m 32s

Code Review Complete

Todo List
Review Summary

I've completed a comprehensive code review of PR #21. The implementation is well-structured and thoroughly tested, with all previously identified critical issues properly addressed. However, I've identified 2 additional issues that should be addressed before merging.

Previous Issues - Verification Status ✅

All 5 previously identified issues have been correctly fixed:
New Issues Found

🟡 Issue 6: Potential Race Condition in Receive Loop (Medium Severity)

Location:

Issue: The receive loop acquires a write lock on
Impact:
Suggested Fix: Drop the lock immediately after removing the entry:

```rust
let pending = {
    let mut reqs = active_requests.write().await;
    reqs.remove(&envelope.message_id)
};
if let Some(pending) = pending {
    // Origin validation without holding lock
    if pending.expected_peer != transport_peer_id {
        warn!(...);
        continue;
    }
    // Send without holding lock
    if pending.response_tx.send(envelope.payload).is_err() {
        warn!(...);
    }
    continue;
}
```
}Reference: src/network.rs:1835-1860 🟢 Issue 7: Minor Documentation Inconsistency (Low Severity)Location: Issue: The Impact: Minor - documentation could be more accurate Suggested Fix: Update the doc comment to reflect both internal and external usage: /// Used both internally (by `send_request()`) and by consumers (like saorsa-node)
/// to provide rich context when reporting failures to the trust/reputation system.
Reference: src/error.rs:580-639

Positive Observations

Excellent Implementation Quality
Code Quality Highlights
Recommendations

Before Merging (Medium Priority)
Optional Improvements (Low Priority)
Future Enhancements (Not Blocking)

As noted in the Copilot review comments, end-to-end integration tests for the request/response flow would be valuable (timeout cleanup, origin mismatch handling, trust reporting on send failure). These can be added when the multi-node test infrastructure is extended.

Conclusion

This is a high-quality PR with thoughtful design and comprehensive testing. The implementation properly addresses all previously identified security and reliability concerns. The single medium-severity issue (lock contention in receive loop) should be fixed before merging, but it's a straightforward refactoring that doesn't require architectural changes.

Recommendation: Address Issue 6, then approve for merge. Great work on the comprehensive error reporting and trust integration! 🎉
@claude fix both issues and commit them here directly
Claude finished @grumbach's task in 1m 19s

Fixes Complete ✅

Todo List
Summary

Both issues have been fixed and committed to the PR branch:

Issue 6: Lock Contention in Receive Loop (Medium Severity) ✅

Fixed in src/network.rs:1833-1869

The receive loop now drops the write lock immediately after removing the pending request entry, before performing:
This prevents blocking concurrent

Key changes:

```rust
// Before: Held lock during validation, send, and logging
let mut reqs = active_requests.write().await;
if let Some(pending) = reqs.remove(&envelope.message_id) {
    // validation, send, logging all while holding lock
}

// After: Drop lock immediately after removal
let pending = {
    let mut reqs = active_requests.write().await;
    reqs.remove(&envelope.message_id)
};
if let Some(pending) = pending {
    // validation, send, logging without holding lock
}
```

Issue 7: Documentation Inconsistency (Low Severity) ✅

Fixed in src/error.rs:580-584

Updated
Commit Details
---
@claude fix the compile/CI issues
Claude finished @grumbach's task in 3m 19s

Fixes Complete ✅

Todo List
Issue Fixed

Compilation Error (src/network.rs:1298):
Commit
The fix ensures the code compiles correctly regardless of which feature flags are enabled. CI should now pass.
Three-phase implementation closing the gap between the consumer layer (saorsa-node) and saorsa-core's trust/adaptive layers:

Phase 1 — Enriched trust feedback:
- PeerFailureReason enum with severity scoring and transient classification
- NodeStatisticsUpdate extended with DataUnavailable, CorruptedData, ProtocolViolation variants (corrupted/protocol count as 2x failures)
- report_peer_failure_with_reason() maps reasons to trust penalties
- Trust wiring in put() replication failures

Phase 2 — Enriched DHT error reporting:
- PeerStoreOutcome struct for per-peer replication detail
- PutSuccess enriched with peer_outcomes vec
- GetNotFound enriched with peers_queried, peers_failed, last_error
- ReplicationResult event emitted on put()
- All new fields use #[serde(default)] for wire compatibility

Phase 3 — Request/response with automatic trust:
- send_request() with timeout, oneshot correlation, auto trust reporting
- send_response() for consumer reply path
- parse_request_envelope() public helper
- /rr/ protocol prefix isolates traffic from existing protocols

Also fixes two pre-existing test failures:
- test_dht_put_large_value: now validates both max-size success and oversized rejection (was trying 1MB against 512B limit)
- test_stress_50_values: value sizes reduced to 10-500B range (was 1KB-10KB, all exceeding 512B limit)

Co-Authored-By: Claude Opus 4.6 <[email protected]>
- Clean up active_requests on serialization/send_message early returns to prevent map entry leaks (Copilot + Greptile)
- Log warning when response receiver is dropped before delivery instead of silently discarding (Copilot)
- Remove stale "latency" mention from PeerStoreOutcome doc (Copilot)
- Remove duplicate assertion in serde default fields test (Copilot)
- Fix rustfmt formatting in dht_cross_node_test.rs

Co-Authored-By: Claude Opus 4.6 <[email protected]>
- Sanitize error strings in collect_replication_outcomes to avoid leaking internal DhtNetworkResult Debug output over the wire; now only emits the variant name (e.g. "GetSuccess")
- Add response origin validation: store expected peer_id in active_requests and reject responses from unexpected peers
- Cap request timeout at 5 minutes and limit concurrent active requests to 256 to prevent unbounded resource growth
- Validate protocol parameter in send_request: reject empty strings and names containing /, \, or null bytes

Co-Authored-By: Claude Opus 4.6 <[email protected]>
- Report trust failure (ConnectionFailed) when send_message() fails in send_request(), ensuring trust feedback on all failure paths
- Suppress unmatched /rr/ responses instead of broadcasting them; these are internal protocol messages (likely from timed-out requests)
- Add protocol validation to send_response() matching send_request() for API consistency
- Fix TOCTOU race in concurrency limit: atomic check-and-insert under a single write lock instead of separate read-then-write
- Make test assertion more stable: check for actual/max size values (513/512) instead of substring "exceeds maximum"

Co-Authored-By: Claude Opus 4.6 <[email protected]>
After rebasing onto main (which includes PR #20's feature gate removal), our new code still had #[cfg(feature = "adaptive-ml")] guards that referenced the now-removed feature, causing compilation failures in CI.

Co-Authored-By: Claude Opus 4.6 <[email protected]>
527e11a to a68b4ab
Claude finished @dirvine's task in 4m 8s

Code Review Complete

Todo List
Review Summary

After rebasing onto main to incorporate PR #20 (feature gate removal), this PR is in good shape but 2 issues from the previous review remain unfixed. The rebase successfully removed the stale

✅ Compilation Fix Verified (Commit a68b4ab)

The latest commit correctly addresses the CI failures by:
Status: Working correctly. Lint & Format CI has passed.

Status of Previously Identified Issues

✅ Issues 1-5: Properly Fixed

1. Resource Leak in
```rust
let mut reqs = active_requests.write().await;
if let Some(pending) = reqs.remove(&envelope.message_id) {
    if pending.expected_peer != transport_peer_id {
        warn!(
            message_id = %envelope.message_id,
            expected = %pending.expected_peer,
            actual = %transport_peer_id,
            "Response origin mismatch — ignoring"
        );
        // Don't deliver; don't broadcast
        continue;
    }
```
On response-origin mismatch, the pending request entry is removed (and its oneshot sender dropped), which will cause the waiting send_request() to fail immediately and prevents a subsequent valid response from the expected peer from being delivered. Fix by not removing the entry until after origin validation (e.g., check via get first, or re-insert pending back into the map on mismatch).
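A sketch of the `get`-before-`remove` fix this comment describes, using a plain `HashMap` of `message_id -> expected_peer` (the real entry also carries the oneshot sender); names are illustrative:

```rust
use std::collections::HashMap;

// Validate the response origin with get() first; only consume the entry
// once the origin matches, so a spoofed response cannot evict a legitimate
// in-flight request. Returns true when the response should be delivered.
fn route_response(
    pending: &mut HashMap<String, String>, // message_id -> expected_peer
    message_id: &str,
    from_peer: &str,
) -> bool {
    match pending.get(message_id) {
        Some(expected) if expected == from_peer => {
            // Origin verified — now it is safe to remove and deliver.
            pending.remove(message_id);
            true
        }
        // Mismatch: keep the entry so the real peer's response can still arrive.
        Some(_) => false,
        // No matching request.
        None => false,
    }
}
```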
```rust
let successful_peers = peer_outcomes.iter().filter(|o| o.success).count();
let _ = self.event_tx.send(DhtNetworkEvent::ReplicationResult {
    key,
    total_peers,
    successful_peers,
    outcomes: peer_outcomes.clone(),
});
```
DhtNetworkEvent::ReplicationResult computes total_peers/successful_peers solely from peer_outcomes, but replicated_to in PutSuccess appears to include the local store (at least in several places it’s initialized with 1). This mismatch can confuse consumers (event says N successes, result says N+1). Consider either (a) including the local store as an explicit PeerStoreOutcome, or (b) clearly documenting/renaming fields to indicate these counts are for remote replication targets only.
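A sketch of option (a) from this comment — representing the local store as an explicit outcome so the event counts and the result's replication count agree. Field names mirror those quoted in the thread; the shape is illustrative, not saorsa-core's actual types:

```rust
// Hypothetical simplified outcome type for demonstration.
#[derive(Clone)]
struct PeerStoreOutcome {
    peer: String,
    success: bool,
}

// Fold the local store into the outcome list before computing counts, so
// total_peers / successful_peers match what PutSuccess::replicated_to reports.
fn replication_counts(local_stored: bool, remote: &[PeerStoreOutcome]) -> (usize, usize) {
    let mut outcomes: Vec<PeerStoreOutcome> = Vec::new();
    if local_stored {
        outcomes.push(PeerStoreOutcome { peer: "local".to_string(), success: true });
    }
    outcomes.extend_from_slice(remote);
    let total = outcomes.len();
    let successful = outcomes.iter().filter(|o| o.success).count();
    (total, successful)
}
```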
```rust
return Err(P2PError::Transport(
    crate::error::TransportError::StreamError(
        format!("Invalid protocol name: {:?}", protocol).into(),
    ),
));
```
Invalid protocol is an input validation failure, but the code reports it as a transport StreamError. Prefer returning a validation-style error (if the error type supports it) so callers can distinguish malformed input from network failures; also consider extracting this protocol validation into a shared helper since the same logic exists in both send_request() and send_response().
Suggested change:

```diff
-return Err(P2PError::Transport(
-    crate::error::TransportError::StreamError(
-        format!("Invalid protocol name: {:?}", protocol).into(),
-    ),
-));
+return Err(NetworkError::Validation(
+    format!("Invalid protocol name: {:?}", protocol).into(),
+)
+.into());
```
Greptile Overview

Greptile Summary

This PR adds richer trust/error reporting across the P2P and DHT layers.
Main items needing attention are around lifecycle management for in-flight requests (

Confidence Score: 3/5
| Filename | Overview |
|---|---|
| src/adaptive/dht_integration.rs | Populates new peer_outcomes field with empty vec for local PUT success; no functional issues spotted. |
| src/adaptive/trust.rs | Adds NodeStatisticsUpdate variants for richer trust signals and maps them into failure counters; change is straightforward. |
| src/dht_network_manager.rs | Adds per-peer replication outcomes, GET-not-found diagnostics, and emits a replication-result event; watch event field semantics (remote-only counts) and consumer expectations. |
| src/error.rs | Introduces PeerFailureReason with transient/severity helpers and Display; appears consistent with intended trust mapping. |
| src/lib.rs | Re-exports new public types (PeerStoreOutcome, PeerFailureReason, PeerResponse) to support consumers; no issues found. |
| src/network.rs | Implements request/response API with active request tracking and automatic trust reporting; main concern is active_requests lifecycle on cancellation/timeouts and edge-case timeout semantics. |
| tests/dht_cross_node_test.rs | Updates large-value test to respect 512-byte limit and asserts oversize rejection; looks correct. |
| tests/dht_parallel_replication_e2e_test.rs | Adjusts stress test payload sizes to fit 512-byte limit and relaxes PutSuccess match to ignore new fields; looks correct. |
| tests/request_response_trust_test.rs | Adds integration tests for new enums/results and envelope parsing; provides backward-compat serde checks. |
Sequence Diagram
```mermaid
sequenceDiagram
    participant Caller
    participant Node as P2PNode::send_request
    participant AR as active_requests (RwLock<HashMap>)
    participant Tx as send_message(/rr/<protocol>)
    participant Remote as Remote peer handler
    participant RxLoop as message receive loop
    Caller->>Node: send_request(peer, protocol, payload, timeout)
    Node->>AR: insert(message_id -> PendingRequest{expected_peer, oneshot_tx})
    Node->>Tx: send_message(peer, /rr/protocol, postcard(envelope{is_response=false}))
    Tx-->>Node: Ok / Err
    alt send_message error
        Node->>AR: remove(message_id)
        Node->>Node: report_peer_failure_with_reason(ConnectionFailed)
        Node-->>Caller: Err
    else sent ok
        Node->>Node: await timeout(oneshot_rx)
        alt response arrives
            Remote-->>RxLoop: /rr/protocol postcard(envelope{is_response=true, message_id})
            RxLoop->>AR: remove(message_id)
            RxLoop->>RxLoop: validate origin == expected_peer
            RxLoop-->>Node: oneshot_tx.send(payload)
            Node->>Node: report_peer_success()
            Node->>AR: remove(message_id) (idempotent)
            Node-->>Caller: PeerResponse{data, latency}
        else timeout
            Node->>Node: report_peer_failure_with_reason(Timeout)
            Node->>AR: remove(message_id)
            Node-->>Caller: Err(timeout)
        else caller cancels future
            Note over Node,AR: cleanup after await is skipped;
            Note over AR: entry can remain until process exit
        end
    end
```
```rust
}

let message_id = uuid::Uuid::new_v4().to_string();
```
Request leak on timeout
send_request() cleans up the active_requests entry after waiting on tokio::time::timeout(timeout, rx). If the caller drops the future before the await completes (common with select! cancellation or task abort), the cleanup code is never reached and the entry can remain until process exit (bounded but still leaks up to MAX_ACTIVE_REQUESTS). Consider ensuring removal happens on drop/cancellation (e.g., RAII guard / scopeguard) so entries are removed even if the future is cancelled.
```rust
        format!("Failed to serialize request envelope: {e}").into(),
    ));
}
```
Timeout=0 becomes 5min
let timeout = timeout.min(MAX_REQUEST_TIMEOUT); only caps the upper bound; it doesn’t prevent Duration::ZERO. With timeout=0, tokio::time::timeout fires immediately and treats it as a failure, but the code then reports a peer timeout and still leaves the request entry until the end of the function (and until cancellation, if cancelled). If callers pass 0 as “no timeout” (a common pattern), this behavior is likely surprising. Either document that 0 means immediate timeout or enforce a sensible minimum / treat 0 as the max cap explicitly.
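A sketch of the clamping this comment implies: `min()` alone only caps the upper bound, while `clamp()` also lifts `Duration::ZERO` to a sane floor. The constant values mirror the 100ms / 5min bounds discussed in the thread; names are illustrative:

```rust
use std::time::Duration;

const MIN_REQUEST_TIMEOUT: Duration = Duration::from_millis(100);
const MAX_REQUEST_TIMEOUT: Duration = Duration::from_secs(300);

// Duration implements Ord, so Ord::clamp enforces both bounds in one call.
fn effective_timeout(requested: Duration) -> Duration {
    requested.clamp(MIN_REQUEST_TIMEOUT, MAX_REQUEST_TIMEOUT)
}
```

If 0 is instead meant to mean "no timeout", that convention should be documented and mapped explicitly (e.g. to `MAX_REQUEST_TIMEOUT`) rather than left to fire immediately.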
Additional Comments (1)
Code review

No issues found. Checked for bugs and CLAUDE.md compliance.
Findings
Code review

No issues found. Checked for bugs and CLAUDE.md compliance.
…ing) Implements 11 hardening improvements from PR #21 review comments:

1. Request Leak/Cleanup:
- Added RAII RequestCleanupGuard for automatic cleanup on drop/cancel
- Atomic check+insert under single write lock to prevent races

2. Input Validation:
- Added TransportError::ValidationError for input validation errors
- validate_protocol_name() now uses ValidationError instead of StreamError

3. Timeout Handling:
- Added MIN_REQUEST_TIMEOUT (100ms) to prevent Duration::ZERO immediate timeout
- send_request() clamps timeout to [100ms, 5min] range

4. Response Routing:
- Improved logging for failed pending.send() to clarify timeout scenario

5. Documentation:
- Added documentation to ReplicationResult clarifying remote-only counts
- Fixed brittle error assertion in tests to check error variant

6. Testing:
- New tests/request_response_e2e_test.rs with 7 comprehensive tests:
  * Successful request/response routing
  * Timeout cleanup behavior
  * Invalid protocol rejection (empty, /, \, \0)
  * Protocol validation in send_response()
  * Minimum timeout enforcement
  * Trust reporting on failure

Items already correct (verified):
- #3: Protocol validation in send_response() already present
- #6: Response-origin mismatch uses get() before remove()
- #7: Unmatched /rr/ responses already suppressed
- #9: Trust reporting on send_message() failure already implemented
- #10: PeerStoreOutcome docs correct (no latency mention)

Closes #23

Co-authored-by: David Irvine <[email protected]>
Summary
- `PeerFailureReason` enum with severity scoring, extended `NodeStatisticsUpdate` (DataUnavailable, CorruptedData, ProtocolViolation), `report_peer_failure_with_reason()` method, trust wiring in `put()` replication failures
- `PeerStoreOutcome` per-peer detail on `PutSuccess`, diagnostic fields (`peers_queried`, `peers_failed`, `last_error`) on `GetNotFound`, `ReplicationResult` event emission
- `send_request()`/`send_response()` with timeout, oneshot correlation, automatic trust reporting on success/failure/timeout, `/rr/` protocol prefix isolation
- Fixes two pre-existing test failures (`test_dht_put_large_value` and `test_stress_50_values`) that exceeded the 512-byte DHT value size limit

All new fields use
`#[serde(default)]` for backward wire compatibility. `send_message()` is untouched — existing consumers continue to work.

Files changed
- `src/error.rs`: `PeerFailureReason` enum with `is_transient()`, `trust_severity()`
- `src/adaptive/trust.rs`: new `NodeStatisticsUpdate` variants
- `src/network.rs`: `send_request()`, `send_response()`, `parse_request_envelope()`, `PeerResponse`, `report_peer_failure_with_reason()`
- `src/dht_network_manager.rs`: `PeerStoreOutcome`, enriched `PutSuccess`/`GetNotFound`, `collect_replication_outcomes()` helper, `ReplicationResult` event
- `src/lib.rs`: re-exports new public types
- `tests/request_response_trust_test.rs`: new integration tests
- `tests/dht_cross_node_test.rs`: `test_dht_put_large_value`
- `tests/dht_parallel_replication_e2e_test.rs`: `test_stress_50_values`

Test plan
- `cargo check --all-features --all-targets` — zero errors
- `cargo clippy --all-features --all-targets -- -D warnings` — zero warnings
- `cargo fmt --all -- --check` — clean
- `cargo doc --all-features --no-deps` — zero warnings
- `cargo nextest run --all-features --no-fail-fast` — 1452 passed, 0 failed, 33 skipped

🤖 Generated with Claude Code