
feat(dht): implement K-replication for DHT store operations #3

Closed
mickvandijke wants to merge 5 commits into main from feat/dht-k-replication

Conversation

mickvandijke (Collaborator) commented Jan 28, 2026

Summary

  • Implement K-way replication when storing data in the DHT
  • Add background repair task to restore degraded replicas after node failures
  • Skip duplicate replication requests for existing keys (content-addressed dedup)
  • Use standard Kademlia timing parameters (1-hour replication interval)

Changes

K-Replication in store()

  • Add optional transport field to DhtCoreEngine for network communication
  • store() now sends DhtMessage::Replicate to K-1 remote nodes
  • Track successful replicas in StoreReceipt.stored_at
  • Schedule repair for keys with partial replication

ReplicationManager

  • Use HashSet<DhtKey> for O(1) pending repair lookups
  • Add schedule_repair(), take_pending_repairs_batch() methods
  • Throttle repairs with MAX_REPAIRS_PER_CYCLE = 20 (matches Kademlia k)
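The scheduling and batching described above can be sketched as follows; `DhtKey` and the struct internals are stand-ins for the PR's real types, not the actual implementation:

```rust
use std::collections::HashSet;

// Throttle constant from the PR; DhtKey is a stand-in for the real key type.
const MAX_REPAIRS_PER_CYCLE: usize = 20;
type DhtKey = [u8; 32];

struct ReplicationManager {
    pending_repairs: HashSet<DhtKey>,
}

impl ReplicationManager {
    fn new() -> Self {
        Self { pending_repairs: HashSet::new() }
    }

    // O(1) insert; re-scheduling the same key is a no-op.
    fn schedule_repair(&mut self, key: DhtKey) {
        self.pending_repairs.insert(key);
    }

    // Drain at most MAX_REPAIRS_PER_CYCLE keys per maintenance cycle.
    fn take_pending_repairs_batch(&mut self) -> Vec<DhtKey> {
        let batch: Vec<DhtKey> = self
            .pending_repairs
            .iter()
            .take(MAX_REPAIRS_PER_CYCLE)
            .copied()
            .collect();
        for key in &batch {
            self.pending_repairs.remove(key);
        }
        batch
    }
}
```

The `HashSet` gives both the O(1) lookup and the deduplication for free: scheduling an already-pending key does nothing.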

Background Repair Task

  • Added to start_maintenance_tasks()
  • REPLICATION_INTERVAL_SECS = 3600 (1 hour, per Kademlia tReplicate spec)
  • Process pending repairs and DataIntegrityMonitor recommendations
  • Throttled to ~160 network messages max per cycle

Node Failure Handling

  • handle_node_failure() now finds affected keys and schedules repairs
  • Added DataIntegrityMonitor::remove_node_from_all() helper

Deduplication

  • Added DhtCoreEngine::has_key() for O(1) existence check
  • Replication handler skips storing if key already exists
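A minimal sketch of that dedup path, assuming a hypothetical `LocalStore` in place of the engine's real storage layer:

```rust
use std::collections::HashMap;

type DhtKey = [u8; 32];

// Hypothetical local store standing in for DhtCoreEngine's storage layer.
struct LocalStore {
    records: HashMap<DhtKey, Vec<u8>>,
}

impl LocalStore {
    fn new() -> Self {
        Self { records: HashMap::new() }
    }

    // O(1) average-case existence check, as in has_key().
    fn has_key(&self, key: &DhtKey) -> bool {
        self.records.contains_key(key)
    }

    // Replicate handler: storage is content-addressed, so an existing
    // key already holds identical bytes and the write can be skipped.
    // Returns true only when data was actually written.
    fn handle_replicate(&mut self, key: DhtKey, value: Vec<u8>) -> bool {
        if self.has_key(&key) {
            return false; // duplicate replication request: no I/O
        }
        self.records.insert(key, value);
        true
    }
}
```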

Constants (per Kademlia spec)

Constant                      Value  Rationale
DEFAULT_REPLICATION_FACTOR    8      K closest nodes store each key
REPLICATION_INTERVAL_SECS     3600   Standard Kademlia tReplicate (1 hour)
MAX_REPAIRS_PER_CYCLE         20     Matches Kademlia k parameter
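The ~160 messages per cycle and ~480 repairs per day cited elsewhere in this PR follow directly from these constants; a sketch of the arithmetic:

```rust
// Constants as listed in the table above.
const DEFAULT_REPLICATION_FACTOR: usize = 8; // K
const REPLICATION_INTERVAL_SECS: u64 = 3600; // Kademlia tReplicate
const MAX_REPAIRS_PER_CYCLE: usize = 20;

// Worst case, each repair messages all K replica holders.
fn max_messages_per_cycle() -> usize {
    MAX_REPAIRS_PER_CYCLE * DEFAULT_REPLICATION_FACTOR
}

// Hourly cycles give the repairs-per-day ceiling cited in the PR.
fn max_repairs_per_day() -> usize {
    MAX_REPAIRS_PER_CYCLE * (86_400 / REPLICATION_INTERVAL_SECS) as usize
}
```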

Test plan

  • cargo test --lib - 1318 tests pass
  • cargo test --test dht_replication_test - 10 tests pass
  • cargo clippy -- -D warnings -D clippy::unwrap_used -D clippy::expect_used - passes

References

🤖 Generated with Claude Code

mickvandijke and others added 5 commits January 28, 2026 16:54
Add K-way replication to ensure data redundancy across the DHT network:

- Add optional transport to DhtCoreEngine for network communication
- Modify store() to replicate data to K-1 remote nodes via DhtMessage::Replicate
- Activate ReplicationManager with schedule_repair, take_pending_repairs methods
- Add background repair task to maintenance loop (60s interval)
- Implement node failure handling that schedules repairs for affected keys
- Add DataIntegrityMonitor accessors: get_storage_nodes, remove_node_from_all, add_storage_node

Partial replication is accepted - background repair completes missing replicas.
Content-addressed storage means identical keys guarantee identical data.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Add O(1) has_key check to DhtCoreEngine and use it in the replication
handler to skip storing data we already have. Since storage is
content-addressed, identical keys guarantee identical data.

This prevents wasted I/O when multiple nodes send replication requests
for the same key to the same target node.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Change replication maintenance interval from 60 seconds to 3600 seconds
(1 hour) to match the standard Kademlia tReplicate parameter.

Also increase MAX_REPAIRS_PER_CYCLE from 10 to 100 since with hourly
checks we can afford larger batches.

References:
- Kademlia spec: tReplicate = 3600s
- libp2p/IPFS: 1 hour default replication interval

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Reduce from 100 to 20 to align with standard Kademlia k parameter
used in libp2p/IPFS. With K=8 replication this means ~160 network
messages max per hourly cycle, handling ~480 repairs/day.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Add store_local_only() method for Replicate handlers to store without
triggering further replication. Original Store requests still replicate
to K-1 nodes, but Replicate message receivers now persist locally only.

This prevents the cascade where node A sends Replicate to B, then B
would also try to replicate to C, D, etc.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
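The Store-vs-Replicate asymmetry that breaks the cascade can be reduced to a toy dispatch; the enum here is a simplification of the PR's real `DhtMessage`, not its actual definition:

```rust
// Hypothetical simplification: only an original Store fans out;
// a Replicate is persisted locally only (store_local_only()).
enum DhtMessage {
    Store,
    Replicate,
}

// Number of outbound Replicate messages a node sends on receipt.
fn outbound_replications(msg: &DhtMessage, k: usize) -> usize {
    match msg {
        DhtMessage::Store => k.saturating_sub(1), // replicate to K-1 peers
        DhtMessage::Replicate => 0,               // local persist, no fan-out
    }
}
```

With K=8, an original Store produces 7 Replicate messages and each receiver produces none, so total replication traffic per stored key is bounded at K-1.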
mickvandijke (Collaborator, Author) commented Jan 28, 2026

@claude Please review this

@mickvandijke mickvandijke marked this pull request as draft January 28, 2026 19:21
dirvine (Collaborator) commented Jan 28, 2026

@mickvandijke be careful here: it's not a simple DHT; it uses a multi-layer model for trust, LSTM, etc. So this needs a deeper dive into the layers, how they choose nodes, how they use geo-distribution, and so on.

dirvine (Collaborator) commented Jan 28, 2026

@claude Please review PR #3 focusing on how the K-replication integrates with the multi-layer trust-weighted routing, geographic distribution, and adaptive network components. Verify it doesn't bypass the intelligent placement system.

claude bot commented Jan 28, 2026

Claude finished @dirvine's task in 3m 45s


Review: K-Replication Integration with Multi-Layer Architecture ✅

Tasks

  • Understand the multi-layer architecture (placement, adaptive, trust)
  • Review K-replication implementation in DHT core
  • Verify integration with placement system and EigenTrust
  • Check geographic distribution compliance
  • Analyze adaptive network layer interaction
  • Provide comprehensive review feedback

Critical Issues Found

🚨 MAJOR: Bypassing the Intelligent Placement System

The K-replication implementation in DhtCoreEngine::store() (src/dht/core_engine.rs:936-1062) completely bypasses the trust-weighted placement system. This is exactly what @dirvine warned about.

Current flow (WRONG):

DhtCoreEngine::store() 
  → find_closest_nodes() [simple XOR distance]
  → select_least_loaded() [only considers load]
  → replicate to nodes

Expected flow (from architecture):

PlacementOrchestrator::place_data()
  → PlacementEngine::select_nodes()
    → WeightedPlacementStrategy
      → Calculate weights: w_i = (τ_i^α) * (p_i^β) * (c_i^γ) * d_i
        - τ_i: EigenTrust reputation (currently IGNORED)
        - p_i: Performance/stability score (currently IGNORED)  
        - c_i: Capacity factor (only partially used)
        - d_i: Geographic diversity bonus (NOT calculated)
      → Efraimidis-Spirakis weighted sampling
      → DiversityEnforcer validation
  → DhtCoreEngine::store() [should be lower-level operation]

Evidence:

  1. src/dht/core_engine.rs:940-948 - Node selection uses basic Kademlia + load balancing:

    let target_nodes = routing.find_closest_nodes(key, DEFAULT_REPLICATION_FACTOR);
    let selected_nodes = load_balancer.select_least_loaded(&target_nodes, 8);

    This is pure DHT logic with NO trust scores, NO EigenTrust, NO diversity enforcement.

  2. src/placement/orchestrator.rs:119-166 - The proper placement flow exists but is NOT used by DHT:

    let decision = placement_engine.select_nodes(
        &available_nodes,
        replication_factor,
        &trust_system,  // ← EigenTrust integration
        &performance_monitor,  // ← Performance scores
        &node_metadata,  // ← Geographic data
    ).await?;
  3. src/placement/algorithms.rs:115-167 - The trust-weighted selection algorithm exists but is bypassed:

    pub fn calculate_weight(
        &self,
        node_id: &NodeId,
        trust_score: f64,        // ← Should be from EigenTrust
        stability_score: f64,    // ← Should be from churn predictor
        capacity_factor: f64,
        diversity_factor: f64,   // ← Should be from DiversityEnforcer
        alpha: f64, beta: f64, gamma: f64,
    ) -> PlacementResult<f64>
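For reference, the quoted weight formula w_i = (τ_i^α) * (p_i^β) * (c_i^γ) * d_i reduces to a one-liner; the exponent values used below are illustrative, not the crate's defaults:

```rust
// Sketch of the placement weight w_i = τ_i^α · p_i^β · c_i^γ · d_i.
// All score inputs are assumed to lie in [0, 1].
fn placement_weight(
    trust: f64,      // τ_i, from EigenTrust
    stability: f64,  // p_i, from the performance monitor / churn predictor
    capacity: f64,   // c_i
    diversity: f64,  // d_i, from the DiversityEnforcer
    alpha: f64,
    beta: f64,
    gamma: f64,
) -> f64 {
    trust.powf(alpha) * stability.powf(beta) * capacity.powf(gamma) * diversity
}
```

Because the factors multiply, a node scoring zero on any dimension (e.g. zero trust) gets zero weight, which is exactly the filtering property the review argues the current store() path is missing.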

🔴 CRITICAL: No EigenTrust Integration

Current state in src/dht/core_engine.rs:

  • Line 665-672: EigenTrust scores only used for EVICTION during bucket refresh (security validation)
  • Lines 940-1013: Store operation uses ZERO trust data for replica selection
  • Trust system exists (self.close_group_validator, eviction_manager.get_trust_score()) but is NOT consulted during replication

What should happen (from src/placement/algorithms.rs:403-404):

let trust_score = 0.8; // Mock trust score  // ← Should be: trust_system.get_trust(&node_id)
let stability_score = 0.9; // Mock stability score  // ← Should be: churn_predictor.predict(&node_id)

The placement algorithm has mock values hardcoded, showing this integration was never completed.

🟠 HIGH: Geographic Diversity Only at Add, Not Replication

src/dht/core_engine.rs:1317-1337 - Geographic diversity enforcer exists and is used when adding nodes to routing table.

BUT src/dht/core_engine.rs:940-1013 (store operation) does NOT calculate geographic diversity factors when selecting replica targets. The GeographicDiversityEnforcer is initialized (line 556) but never queried during find_closest_nodes().

Compare to placement system:

  • src/placement/algorithms.rs:256-295 - DiversityEnforcer::calculate_diversity_factor() actively penalizes nodes that are:
    • Too close geographically (<100km minimum, line 249)
    • Clustered in same region (max 2 per region, line 250)
    • Clustered in same ASN (max 3 per ASN, line 251)
    • Applies 50% penalty for violations (line 252)

This logic exists but is never called in the replication path!
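The listed constraints can be sketched as a single predicate; the real DiversityEnforcer interface surely differs, and the region/ASN values below are made up for illustration:

```rust
// Existing replica holders for a key (regions and ASNs of chosen nodes).
struct ExistingReplicas<'a> {
    regions: &'a [&'a str],
    asns: &'a [u32],
}

// Diversity factor per the review: 50% penalty when a candidate is
// too close (<100km), would be the 3rd node in a region, or the 4th
// node in an ASN; otherwise no penalty.
fn diversity_factor(
    candidate_region: &str,
    candidate_asn: u32,
    nearest_replica_km: f64,
    existing: &ExistingReplicas,
) -> f64 {
    let same_region = existing.regions.iter().filter(|&&r| r == candidate_region).count();
    let same_asn = existing.asns.iter().filter(|&&a| a == candidate_asn).count();
    let violation = nearest_replica_km < 100.0 // minimum geographic spacing
        || same_region >= 2                    // max 2 replicas per region
        || same_asn >= 3;                      // max 3 replicas per ASN
    if violation { 0.5 } else { 1.0 }          // 50% weight penalty
}
```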

🟡 MEDIUM: Adaptive Network Layer Not Consulted

The PR description mentions "multi-layer model for trust lstm etc." but:

src/adaptive/multi_armed_bandit.rs:82-106 - MultiArmedBandit::select_route() with Thompson Sampling is designed to adaptively choose between routing strategies based on performance.

src/adaptive/routing.rs defines strategy selection for:

  • Kademlia DHT (what's currently hardcoded)
  • Hyperbolic routing
  • Trust-based routing
  • SOM region routing

BUT src/dht/core_engine.rs:943 hardcodes Kademlia XOR distance. No MAB consultation, no strategy adaptation, no learning from past successes/failures.


Architecture Violations

Layer Separation Broken

The current implementation conflates concerns:

Layer                        Responsibility                       Current PR Behavior
Placement (src/placement/)   Trust-weighted selection, diversity  ❌ Bypassed
Adaptive (src/adaptive/)     Strategy selection (MAB/Thompson)    ❌ Not used
DHT (src/dht/)               Low-level storage, routing table     ✅ Used (but should be lower-level)

Missing Integration Points

  1. No PlacementEngine call - DhtCoreEngine::store() should delegate to PlacementEngine::select_nodes()
  2. No trust system query - Never calls EigenTrustEngine::get_trust() (src/adaptive/trust.rs)
  3. No performance monitoring - Never calls PerformanceMonitor for stability scores
  4. No diversity calculation - Never calls DiversityEnforcer::calculate_diversity_factor()

Recommendations

Option A: Fix Integration (Recommended)

Refactor DhtCoreEngine::store() to use placement system:

Add to DhtCoreEngine:

pub struct DhtCoreEngine {
    // ... existing fields ...
    placement_engine: Option<Arc<RwLock<PlacementEngine>>>,
    trust_system: Option<Arc<EigenTrustEngine>>,
    performance_monitor: Option<Arc<PerformanceMonitor>>,
}

Then in store() (around line 940):

pub async fn store(&mut self, key: &DhtKey, value: Vec<u8>) -> Result<StoreReceipt> {
    // Get candidate nodes from routing table
    let routing = self.routing_table.read().await;
    let candidate_nodes = routing.find_closest_nodes(key, 20); // Get more candidates
    drop(routing);
    
    let selected_nodes = if let (Some(placement), Some(trust), Some(perf)) = 
        (&self.placement_engine, &self.trust_system, &self.performance_monitor) 
    {
        // Use trust-weighted placement
        let available: HashSet<NodeId> = candidate_nodes.iter()
            .map(|n| n.id.clone()).collect();
        let metadata = self.get_node_metadata(&candidate_nodes).await?;
        
        let mut engine = placement.write().await;
        let decision = engine.select_nodes(
            &available,
            DEFAULT_REPLICATION_FACTOR as u8,
            trust,              // ← EigenTrust scores
            perf,               // ← Stability/churn prediction
            &metadata,          // ← Geographic diversity
        ).await?;
        
        decision.selected_nodes
    } else {
        // Fallback to basic Kademlia (existing logic)
        let load_balancer = self.load_balancer.read().await;
        load_balancer.select_least_loaded(&candidate_nodes, 8)
    };
    
    // Now replicate to selected_nodes (existing code from line 955+)
    // ...
}

Benefits:

  • ✅ Respects trust scores from EigenTrust
  • ✅ Enforces geographic diversity constraints
  • ✅ Uses performance/stability predictions
  • ✅ Proper weighted sampling (Efraimidis-Spirakis)
  • ✅ Maintains architectural layering
  • ✅ Backward compatible (fallback to basic mode)

Option B: Document the Tradeoff

If Option A is too complex for this PR, explicitly document that this is a simplified implementation and file an issue to integrate with placement system later.

Add to src/dht/core_engine.rs:936 docstring:

/// Store data in the DHT with K-replication
///
/// # Current Limitations (FIXME: Issue #XXX)
/// 
/// This implementation uses basic Kademlia XOR distance and load balancing.
/// It does NOT integrate with:
/// - PlacementEngine's trust-weighted selection (EigenTrust scores)
/// - Geographic diversity enforcement beyond routing table limits
/// - Adaptive routing strategy selection (Multi-Armed Bandit)
/// - Churn prediction for stability scoring
///
/// Replica selection currently ignores:
/// - Node reputation (τ_i from EigenTrust)
/// - Performance/stability scores (p_i from PerformanceMonitor)
/// - Geographic diversity bonuses (d_i from DiversityEnforcer)
///
/// This means malicious or unreliable nodes may be selected for storage,
/// and geographic concentration attacks are not mitigated during replication.

Also required: Create tracking issue with title like "Integrate DHT K-replication with trust-weighted placement system"


Positive Aspects

Good infrastructure:

  • ReplicationManager with deduplication (HashSet) - src/dht/core_engine.rs:329-411
  • Background repair task with throttling (MAX_REPAIRS_PER_CYCLE = 20) - src/dht/core_engine.rs:50
  • store_local_only() to prevent cascading replication - src/dht/core_engine.rs:1095
  • Proper separation of concerns in repair logic - src/dht/core_engine.rs:834-929
  • Standard Kademlia timing (1-hour replication interval) - src/dht/core_engine.rs:43

Security components present:

  • CloseGroupValidator for trust-based validation - src/dht/core_engine.rs:504
  • IPDiversityEnforcer and GeographicDiversityEnforcer - src/dht/core_engine.rs:505, 507
  • Security metrics tracking - src/dht/core_engine.rs:502
  • Eviction manager with trust score tracking - src/dht/core_engine.rs:506

But none of these are used during replica selection in store()!


Test Coverage Gap

tests/dht_replication_test.rs tests:

  • ✅ ReplicationManager scheduling and deduplication (lines 24-60)
  • ✅ Consistency level configuration (lines 63-78)
  • ✅ Store without transport / local mode (lines 81-100)
  • ❌ NO tests verifying trust scores affect replica selection
  • ❌ NO tests verifying geographic diversity constraints
  • ❌ NO tests comparing DHT selection with PlacementEngine behavior
  • ❌ NO tests with malicious nodes (low trust scores)

Suggested additional tests:

#[tokio::test]
async fn test_store_respects_trust_scores() {
    // Verify high-trust nodes are preferred over low-trust nodes
}

#[tokio::test]  
async fn test_store_enforces_geographic_diversity() {
    // Verify replicas spread across regions, not clustered
}

#[tokio::test]
async fn test_store_uses_placement_engine_when_available() {
    // Verify PlacementEngine is consulted if provided
}

Summary

This PR implements solid DHT replication mechanics but bypasses the intelligent placement system that is core to saorsa's architecture. Before merging, the team must decide:

  1. Integrate now - Refactor to use PlacementEngine (Option A) - ~1-2 days work
  2. Track for later - Document limitation and create follow-up issue (Option B) - ~30 min work
  3. Accept risk - Merge knowing replication ignores trust/diversity (NOT recommended)

The current implementation is essentially "traditional Kademlia" when the architecture calls for "trust-weighted, geo-diverse, adaptive Kademlia."

Security Implications

Without trust-weighted selection:

  • ❌ Malicious nodes can be selected as replica holders (no EigenTrust filtering)
  • ❌ Geographic concentration attacks not mitigated (no diversity enforcement)
  • ❌ Sybil attacks easier (no trust-based node filtering)
  • ❌ Eclipse attacks more feasible (no adaptive routing strategy selection)

The Byzantine fault tolerance and witness systems (src/dht/witness.rs) are present but weakened if replica selection ignores trust scores.


Recommendation: Implement Option B immediately (document + issue), then prioritize Option A for next sprint. This PR has good foundational work but needs the trust layer integrated before production use.


mickvandijke (Collaborator, Author) commented

Ah thanks for raising that, I'll make sure to consider these other layers when I implement replication on the node side.

Closing this PR because the logic will be moved to the node.

claude bot pushed a commit that referenced this pull request Feb 12, 2026
…ing)

Implements 11 hardening improvements from PR #21 review comments:

1. Request Leak/Cleanup:
   - Added RAII RequestCleanupGuard for automatic cleanup on drop/cancel
   - Atomic check+insert under single write lock to prevent races

2. Input Validation:
   - Added TransportError::ValidationError for input validation errors
   - validate_protocol_name() now uses ValidationError instead of StreamError

3. Timeout Handling:
   - Added MIN_REQUEST_TIMEOUT (100ms) to prevent Duration::ZERO immediate timeout
   - send_request() clamps timeout to [100ms, 5min] range

4. Response Routing:
   - Improved logging for failed pending.send() to clarify timeout scenario

5. Documentation:
   - Added documentation to ReplicationResult clarifying remote-only counts
   - Fixed brittle error assertion in tests to check error variant

6. Testing:
   - New tests/request_response_e2e_test.rs with 7 comprehensive tests:
     * Successful request/response routing
     * Timeout cleanup behavior
     * Invalid protocol rejection (empty, /, \, \0)
     * Protocol validation in send_response()
     * Minimum timeout enforcement
     * Trust reporting on failure

Items already correct (verified):
- #3: Protocol validation in send_response() already present
- #6: Response-origin mismatch uses get() before remove()
- #7: Unmatched /rr/ responses already suppressed
- #9: Trust reporting on send_message() failure already implemented
- #10: PeerStoreOutcome docs correct (no latency mention)

Closes #23

Co-authored-by: David Irvine <dirvine@users.noreply.github.com>