Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 8 additions & 3 deletions examples/spec_usage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,11 @@

//! Minimal example aligned with current saorsa-core API.
use anyhow::Result;
use saorsa_core::fwid::{Key, Word, compute_key, fw_check, fw_to_key};
use saorsa_core::types::{
Device, DeviceId, Endpoint, MlDsaKeyPair, presence::DeviceType as DevType,
};
use saorsa_core::{get_data, identity_fetch, register_identity, register_presence, store_data};
use saorsa_core::fwid::{fw_check, fw_to_key, compute_key, Key, Word};
use saorsa_core::types::{presence::DeviceType as DevType, Device, DeviceId, Endpoint, MlDsaKeyPair};

#[tokio::main]
async fn main() -> Result<()> {
Expand Down Expand Up @@ -49,7 +51,10 @@ async fn quick_start_identity_presence_storage() -> Result<()> {
id: DeviceId::generate(),
device_type: DevType::Desktop,
storage_gb: 128,
endpoint: Endpoint { protocol: "quic".into(), address: "127.0.0.1:9000".into() },
endpoint: Endpoint {
protocol: "quic".into(),
address: "127.0.0.1:9000".into(),
},
capabilities: Default::default(),
}];
let _ = register_presence(&handle, devices.clone(), devices[0].id).await?;
Expand Down
12 changes: 6 additions & 6 deletions examples/test_api.rs
Original file line number Diff line number Diff line change
@@ -1,20 +1,20 @@
use saorsa_core::{register_identity, get_identity};
use saorsa_core::types::MlDsaKeyPair;
use saorsa_core::{get_identity, register_identity};

#[tokio::main]
async fn main() -> anyhow::Result<()> {
println!("Testing API registration...");

let words = ["welfare", "absurd", "king", "ridge"];
let keypair = MlDsaKeyPair::generate()?;

println!("Registering identity with words: {:?}", words);
let handle = register_identity(words, &keypair).await?;
println!("Registration successful! Key: {}", handle.key());

println!("Fetching identity...");
let fetched = get_identity(handle.key()).await?;
println!("Identity fetched! Words: {:?}", fetched.words);

Ok(())
}
}
2 changes: 1 addition & 1 deletion examples/test_keygen.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,4 @@ fn main() {
Ok(_) => println!("Key generation successful!"),
Err(e) => println!("Key generation failed: {:?}", e),
}
}
}
16 changes: 9 additions & 7 deletions examples/test_words.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use four_word_networking::FourWordEncoder;

fn main() {
let encoder = FourWordEncoder::new();

// Try to encode some IP addresses to get valid words
let test_ips = vec![
(std::net::Ipv4Addr::new(192, 168, 1, 1), 8080),
Expand All @@ -23,22 +23,24 @@ fn main() {
(std::net::Ipv4Addr::new(172, 16, 2, 1), 5000),
(std::net::Ipv4Addr::new(192, 168, 4, 1), 8080),
];

for (ip, port) in test_ips {
match encoder.encode_ipv4(ip, port) {
Ok(encoding) => {
println!("Valid words for IP {}:{}", ip, port);
// Use Display trait to get the words
let words_str = format!("{}", encoding);
println!(" Words string: {}", words_str);

// Parse back to get individual words
let words: Vec<&str> = words_str.split(' ').collect();
if words.len() == 4 {
println!(" Words array: [\"{}\", \"{}\", \"{}\", \"{}\"]",
words[0], words[1], words[2], words[3]);
println!(
" Words array: [\"{}\", \"{}\", \"{}\", \"{}\"]",
words[0], words[1], words[2], words[3]
);
}

// Test decoding back
let result = encoder.decode_ipv4(&encoding);
if result.is_ok() {
Expand All @@ -50,4 +52,4 @@ fn main() {
}
}
}
}
}
91 changes: 57 additions & 34 deletions src/adaptive/churn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,7 @@ use crate::adaptive::{
replication::ReplicationManager,
routing::AdaptiveRouter,
};
use crate::dht::{
NodeFailureTracker, ReplicationGracePeriodConfig,
};
use crate::dht::{NodeFailureTracker, ReplicationGracePeriodConfig};
use anyhow::Result;
use std::{
collections::{HashMap, HashSet},
Expand Down Expand Up @@ -336,7 +334,9 @@ impl ChurnHandler {

/// Set the node failure tracker for grace period management
pub async fn set_failure_tracker(&self, failure_tracker: Arc<dyn NodeFailureTracker>) {
self.recovery_manager.set_failure_tracker(failure_tracker).await;
self.recovery_manager
.set_failure_tracker(failure_tracker)
.await;
}

/// Start monitoring network for churn
Expand Down Expand Up @@ -467,8 +467,12 @@ impl ChurnHandler {

// 4. Queue recovery tasks with grace period consideration
let grace_config = ReplicationGracePeriodConfig::default();
tracing::info!("Node {} failed, queuing recovery for {} content items with {}s grace period",
node_id, lost_content.len(), grace_config.grace_period_duration.as_secs());
tracing::info!(
"Node {} failed, queuing recovery for {} content items with {}s grace period",
node_id,
lost_content.len(),
grace_config.grace_period_duration.as_secs()
);

for content_hash in lost_content {
self.recovery_manager
Expand Down Expand Up @@ -496,7 +500,13 @@ impl ChurnHandler {
/ stats.failed_nodes as f64;

// Update grace period metrics
if self.recovery_manager.node_failure_tracker.read().await.is_some() {
if self
.recovery_manager
.node_failure_tracker
.read()
.await
.is_some()
{
stats.grace_period_preventions += 1; // Assuming this failure used grace period
}

Expand Down Expand Up @@ -810,17 +820,21 @@ impl RecoveryManager {
config: &ReplicationGracePeriodConfig,
) -> Result<()> {
if failed_nodes.is_empty() {
return self.queue_recovery(content_hash, failed_nodes, priority).await;
return self
.queue_recovery(content_hash, failed_nodes, priority)
.await;
}

if let Some(ref failure_tracker) = *self.node_failure_tracker.read().await {
// Record failures and check grace periods
for node_id in &failed_nodes {
failure_tracker.record_node_failure(
node_id.clone(),
crate::dht::replication_grace_period::NodeFailureReason::NetworkTimeout,
config,
).await?;
failure_tracker
.record_node_failure(
node_id.clone(),
crate::dht::replication_grace_period::NodeFailureReason::NetworkTimeout,
config,
)
.await?;
}

let mut immediate_recovery_nodes = Vec::new();
Expand All @@ -836,27 +850,36 @@ impl RecoveryManager {

// Queue immediate recovery for nodes past grace period
if !immediate_recovery_nodes.is_empty() {
tracing::info!("Queuing immediate recovery for {} nodes (past grace period) for content {:?}",
immediate_recovery_nodes.len(), content_hash);
self.queue_recovery(content_hash, immediate_recovery_nodes, priority).await?;
tracing::info!(
"Queuing immediate recovery for {} nodes (past grace period) for content {:?}",
immediate_recovery_nodes.len(),
content_hash
);
self.queue_recovery(content_hash, immediate_recovery_nodes, priority)
.await?;
}

// Schedule delayed checks for nodes in grace period
if !delayed_recovery_nodes.is_empty() {
tracing::info!("Scheduling delayed recovery check for {} nodes (in grace period) for content {:?}",
delayed_recovery_nodes.len(), content_hash);
tracing::info!(
"Scheduling delayed recovery check for {} nodes (in grace period) for content {:?}",
delayed_recovery_nodes.len(),
content_hash
);
self.schedule_grace_period_check(
content_hash,
delayed_recovery_nodes,
priority,
failure_tracker.clone(),
).await?;
)
.await?;
}

Ok(())
} else {
// No failure tracker, use immediate recovery
self.queue_recovery(content_hash, failed_nodes, priority).await
self.queue_recovery(content_hash, failed_nodes, priority)
.await
}
}

Expand Down Expand Up @@ -885,20 +908,20 @@ impl RecoveryManager {
}

if !nodes_to_recover.is_empty() {
// Create a new RecoveryManager instance to queue recovery
// In practice, this would be handled by the owning ChurnHandler
if !nodes_to_recover.is_empty() {
tracing::info!(
"Grace period expired for {} nodes, queuing recovery for content {:?}",
nodes_to_recover.len(),
content_hash
);
} else {
tracing::debug!(
"Grace period check completed for content {:?}, no nodes require recovery",
content_hash
);
}
// Create a new RecoveryManager instance to queue recovery
// In practice, this would be handled by the owning ChurnHandler
if !nodes_to_recover.is_empty() {
tracing::info!(
"Grace period expired for {} nodes, queuing recovery for content {:?}",
nodes_to_recover.len(),
content_hash
);
} else {
tracing::debug!(
"Grace period check completed for content {:?}, no nodes require recovery",
content_hash
);
}
}
}
});
Expand Down
31 changes: 18 additions & 13 deletions src/address_book.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,14 @@
//! - Public helpers are async and safe to call from other modules/apps.

use crate::identity::four_words::FourWordAddress;
use sha2::{Digest, Sha256};
use crate::{error::{IdentityError, P2PError}, fwid, Result};
use crate::{
Result,
error::{IdentityError, P2PError},
fwid,
};
use once_cell::sync::OnceCell;
use serde::{Deserialize, Serialize};
use sha2::{Digest, Sha256};
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::RwLock;
Expand Down Expand Up @@ -57,16 +61,15 @@ impl AddressBook {
/// Register mapping (overwrites existing).
pub async fn register(&self, user_id: String, four_words: String) -> Result<()> {
// Validate format (4 hyphen-separated words)
let parts: Vec<String> = four_words
.split('-')
.map(|s| s.to_string())
.collect();
if parts.len() != 4 || !fwid::fw_check([
parts[0].clone(),
parts[1].clone(),
parts[2].clone(),
parts[3].clone(),
]) {
let parts: Vec<String> = four_words.split('-').map(|s| s.to_string()).collect();
if parts.len() != 4
|| !fwid::fw_check([
parts[0].clone(),
parts[1].clone(),
parts[2].clone(),
parts[3].clone(),
])
{
return Err(P2PError::Identity(IdentityError::InvalidFourWordAddress(
"invalid four-word address format".into(),
)));
Expand All @@ -88,7 +91,9 @@ impl AddressBook {
four_words: four_words.clone(),
};
let _ = client.put_object(key_user_to_words(&user_id), &entry).await;
let _ = client.put_object(key_words_to_user(&four_words), &entry).await;
let _ = client
.put_object(key_words_to_user(&four_words), &entry)
.await;
}
Ok(())
}
Expand Down
Loading
Loading