diff --git a/src/protocol/libp2p/kademlia/handle.rs b/src/protocol/libp2p/kademlia/handle.rs index e10c5ad35..df63dcafa 100644 --- a/src/protocol/libp2p/kademlia/handle.rs +++ b/src/protocol/libp2p/kademlia/handle.rs @@ -153,8 +153,8 @@ pub enum KademliaEvent { /// Query ID. query_id: QueryId, - /// Found record. - record: PeerRecord, + /// Found records. + records: RecordsType, }, /// `PUT_VALUE` query succeeded. @@ -173,6 +173,18 @@ pub enum KademliaEvent { }, } +/// The type of the DHT records. +#[derive(Debug, Clone)] +pub enum RecordsType { + /// Record was found in the local store. + /// + /// This contains only a single result. + LocalStore(Record), + + /// Records found in the network. + Network(Vec), +} + /// Handle for communicating with the Kademlia protocol. pub struct KademliaHandle { /// TX channel for sending commands to `Kademlia`. diff --git a/src/protocol/libp2p/kademlia/mod.rs b/src/protocol/libp2p/kademlia/mod.rs index 256cc62c1..78f5c6a6e 100644 --- a/src/protocol/libp2p/kademlia/mod.rs +++ b/src/protocol/libp2p/kademlia/mod.rs @@ -52,6 +52,8 @@ pub use handle::{KademliaEvent, KademliaHandle, Quorum, RoutingTableUpdateMode}; pub use query::QueryId; pub use record::{Key as RecordKey, PeerRecord, Record}; +pub use self::handle::RecordsType; + /// Logging target for the file. const LOG_TARGET: &str = "litep2p::ipfs::kademlia"; @@ -636,11 +638,39 @@ impl Kademlia { Ok(()) } - QueryAction::GetRecordQueryDone { query_id, record } => { - self.store.put(record.record.clone()); + QueryAction::GetRecordQueryDone { query_id, records } => { + // Considering this gives a view of all peers and their records, some peers may have + // outdated records. Store only the record which is backed by most + // peers. + let now = std::time::Instant::now(); + let rec = records + .iter() + .filter_map(|peer_record| { + if peer_record.record.is_expired(now) { + None + } else { + Some(&peer_record.record) + } + }) + .fold(HashMap::new(), |mut acc, rec| { + *acc.entry(rec).or_insert(0) += 1; + acc + }) + .into_iter() + .max_by_key(|(_, v)| *v) + .map(|(k, _)| k); - let _ = - self.event_tx.send(KademliaEvent::GetRecordSuccess { query_id, record }).await; + if let Some(record) = rec { + self.store.put(record.clone()); + } + + let _ = self + .event_tx + .send(KademliaEvent::GetRecordSuccess { + query_id, + records: RecordsType::Network(records), + }) + .await; Ok(()) } QueryAction::QueryFailed { query } => { @@ -782,7 +812,7 @@ impl Kademlia { (Some(record), Quorum::One) => { let _ = self .event_tx - .send(KademliaEvent::GetRecordSuccess { query_id, record: PeerRecord { record: record.clone(), peer: None } }) + .send(KademliaEvent::GetRecordSuccess { query_id, records: RecordsType::LocalStore(record.clone()) }) .await; } (record, _) => { @@ -840,7 +870,7 @@ mod tests { event_rx: Receiver, } - fn _make_kademlia() -> (Kademlia, Context, TransportManager) { + fn make_kademlia() -> (Kademlia, Context, TransportManager) { let (manager, handle) = TransportManager::new( Keypair::generate(), HashSet::new(), @@ -875,4 +905,76 @@ mod tests { manager, ) } + + #[tokio::test] + async fn check_get_records_update() { + let (mut kademlia, _context, _manager) = make_kademlia(); + + let key = RecordKey::from(vec![1, 2, 3]); + let records = vec![ + // 2 peers backing the same record. + PeerRecord { + peer: PeerId::random(), + record: Record::new(key.clone(), vec![0x1]), + }, + PeerRecord { + peer: PeerId::random(), + record: Record::new(key.clone(), vec![0x1]), + }, + // only 1 peer backing the record. + PeerRecord { + peer: PeerId::random(), + record: Record::new(key.clone(), vec![0x2]), + }, + ]; + + let query_id = QueryId(1); + let action = QueryAction::GetRecordQueryDone { query_id, records }; + assert!(kademlia.on_query_action(action).await.is_ok()); + + // Check the local storage was updated. + let record = kademlia.store.get(&key).unwrap(); + assert_eq!(record.value, vec![0x1]); + } + + #[tokio::test] + async fn check_get_records_update_with_expired_records() { + let (mut kademlia, _context, _manager) = make_kademlia(); + + let key = RecordKey::from(vec![1, 2, 3]); + let expired = std::time::Instant::now() - std::time::Duration::from_secs(10); + let records = vec![ + // 2 peers backing the same record, one record is expired. + PeerRecord { + peer: PeerId::random(), + record: Record { + key: key.clone(), + value: vec![0x1], + publisher: None, + expires: Some(expired), + }, + }, + PeerRecord { + peer: PeerId::random(), + record: Record::new(key.clone(), vec![0x1]), + }, + // 2 peer backing the record. + PeerRecord { + peer: PeerId::random(), + record: Record::new(key.clone(), vec![0x2]), + }, + PeerRecord { + peer: PeerId::random(), + record: Record::new(key.clone(), vec![0x2]), + }, + ]; + + let query_id = QueryId(1); + let action = QueryAction::GetRecordQueryDone { query_id, records }; + assert!(kademlia.on_query_action(action).await.is_ok()); + + // Check the local storage was updated. + let record = kademlia.store.get(&key).unwrap(); + assert_eq!(record.value, vec![0x2]); + } } diff --git a/src/protocol/libp2p/kademlia/query/get_record.rs b/src/protocol/libp2p/kademlia/query/get_record.rs index 5f16e2b25..4d766af77 100644 --- a/src/protocol/libp2p/kademlia/query/get_record.rs +++ b/src/protocol/libp2p/kademlia/query/get_record.rs @@ -109,9 +109,9 @@ impl GetRecordContext { } } - /// Get the found record. - pub fn found_record(mut self) -> PeerRecord { - self.found_records.pop().expect("record to exist since query succeeded") + /// Get the found records. + pub fn found_records(mut self) -> Vec { + self.found_records } /// Register response failure for `peer`. @@ -136,12 +136,13 @@ impl GetRecordContext { return; }; - // TODO: validate record if let Some(record) = record { - self.found_records.push(PeerRecord { - record, - peer: Some(peer.peer), - }); + if !record.is_expired(std::time::Instant::now()) { + self.found_records.push(PeerRecord { + peer: peer.peer, + record, + }); + } } // add the queried peer to `queried` and all new peers which haven't been diff --git a/src/protocol/libp2p/kademlia/query/mod.rs b/src/protocol/libp2p/kademlia/query/mod.rs index 89fa4fab6..6177163db 100644 --- a/src/protocol/libp2p/kademlia/query/mod.rs +++ b/src/protocol/libp2p/kademlia/query/mod.rs @@ -22,9 +22,9 @@ use crate::{ protocol::libp2p::kademlia::{ message::KademliaMessage, query::{find_node::FindNodeContext, get_record::GetRecordContext}, - record::{Key as RecordKey, PeerRecord, Record}, + record::{Key as RecordKey, Record}, types::{KademliaPeer, Key}, - Quorum, + PeerRecord, Quorum, }, PeerId, }; @@ -124,8 +124,8 @@ pub enum QueryAction { /// Query ID. query_id: QueryId, - /// Found record. - record: PeerRecord, + /// Found records. + records: Vec, }, // TODO: remove @@ -396,7 +396,7 @@ impl QueryEngine { }, QueryType::GetRecord { context } => QueryAction::GetRecordQueryDone { query_id: context.query, - record: context.found_record(), + records: context.found_records(), }, } } @@ -748,10 +748,21 @@ mod tests { let peers: std::collections::HashSet<_> = peers.into_iter().map(|p| p.peer).collect(); match engine.next_action() { - Some(QueryAction::GetRecordQueryDone { record, .. }) => { - assert!(peers.contains(&record.peer.expect("Peer Id must be provided"))); - assert_eq!(record.record.key, original_record.key); - assert_eq!(record.record.value, original_record.value); + Some(QueryAction::GetRecordQueryDone { records, .. }) => { + let query_peers = records + .iter() + .map(|peer_record| peer_record.peer) + .collect::>(); + assert_eq!(peers, query_peers); + + let records: std::collections::HashSet<_> = + records.into_iter().map(|peer_record| peer_record.record).collect(); + // One single record found across peers. + assert_eq!(records.len(), 1); + let record = records.into_iter().next().unwrap(); + + assert_eq!(record.key, original_record.key); + assert_eq!(record.value, original_record.value); } _ => panic!("invalid event received"), } diff --git a/src/protocol/libp2p/kademlia/record.rs b/src/protocol/libp2p/kademlia/record.rs index 1f58fb160..3f00a9eba 100644 --- a/src/protocol/libp2p/kademlia/record.rs +++ b/src/protocol/libp2p/kademlia/record.rs @@ -74,7 +74,7 @@ impl From for Key { } /// A record stored in the DHT. -#[derive(Clone, Debug, Eq, PartialEq)] +#[derive(Clone, Debug, Eq, PartialEq, Hash)] pub struct Record { /// Key of the record. pub key: Key, @@ -109,13 +109,12 @@ impl Record { } } -/// A record either received by the given peer or retrieved from the local -/// record store. +/// A record received by the given peer. #[derive(Debug, Clone, PartialEq, Eq)] pub struct PeerRecord { - /// The peer from whom the record was received. `None` if the record was - /// retrieved from local storage. - pub peer: Option, + /// The peer from whom the record was received + pub peer: PeerId, + /// The provided record. pub record: Record, }