Skip to content
16 changes: 14 additions & 2 deletions src/protocol/libp2p/kademlia/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -153,8 +153,8 @@ pub enum KademliaEvent {
/// Query ID.
query_id: QueryId,

/// Found record.
record: PeerRecord,
/// Found records.
records: RecordsType,
},

/// `PUT_VALUE` query succeeded.
Expand All @@ -173,6 +173,18 @@ pub enum KademliaEvent {
},
}

/// The type of the DHT records.
#[derive(Debug, Clone)]
pub enum RecordsType {
/// Record was found in the local store.
///
/// This contains only a single result.
LocalStore(Record),

/// Records found in the network.
Network(Vec<PeerRecord>),
}

/// Handle for communicating with the Kademlia protocol.
pub struct KademliaHandle {
/// TX channel for sending commands to `Kademlia`.
Expand Down
114 changes: 108 additions & 6 deletions src/protocol/libp2p/kademlia/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ pub use handle::{KademliaEvent, KademliaHandle, Quorum, RoutingTableUpdateMode};
pub use query::QueryId;
pub use record::{Key as RecordKey, PeerRecord, Record};

pub use self::handle::RecordsType;

/// Logging target for the file.
const LOG_TARGET: &str = "litep2p::ipfs::kademlia";

Expand Down Expand Up @@ -636,11 +638,39 @@ impl Kademlia {

Ok(())
}
QueryAction::GetRecordQueryDone { query_id, record } => {
self.store.put(record.record.clone());
QueryAction::GetRecordQueryDone { query_id, records } => {
// Considering this gives a view of all peers and their records, some peers may have
// outdated records. Store only the record which is backed by most
// peers.
let now = std::time::Instant::now();
let rec = records
.iter()
.filter_map(|peer_record| {
if peer_record.record.is_expired(now) {
None
} else {
Some(&peer_record.record)
}
})
.fold(HashMap::new(), |mut acc, rec| {
*acc.entry(rec).or_insert(0) += 1;
acc
})
.into_iter()
.max_by_key(|(_, v)| *v)
.map(|(k, _)| k);

let _ =
self.event_tx.send(KademliaEvent::GetRecordSuccess { query_id, record }).await;
if let Some(record) = rec {
self.store.put(record.clone());
}

let _ = self
.event_tx
.send(KademliaEvent::GetRecordSuccess {
query_id,
records: RecordsType::Network(records),
})
.await;
Ok(())
}
QueryAction::QueryFailed { query } => {
Expand Down Expand Up @@ -782,7 +812,7 @@ impl Kademlia {
(Some(record), Quorum::One) => {
let _ = self
.event_tx
.send(KademliaEvent::GetRecordSuccess { query_id, record: PeerRecord { record: record.clone(), peer: None } })
.send(KademliaEvent::GetRecordSuccess { query_id, records: RecordsType::LocalStore(record.clone()) })
.await;
}
(record, _) => {
Expand Down Expand Up @@ -840,7 +870,7 @@ mod tests {
event_rx: Receiver<KademliaEvent>,
}

fn _make_kademlia() -> (Kademlia, Context, TransportManager) {
fn make_kademlia() -> (Kademlia, Context, TransportManager) {
let (manager, handle) = TransportManager::new(
Keypair::generate(),
HashSet::new(),
Expand Down Expand Up @@ -875,4 +905,76 @@ mod tests {
manager,
)
}

#[tokio::test]
async fn check_get_records_update() {
let (mut kademlia, _context, _manager) = make_kademlia();

let key = RecordKey::from(vec![1, 2, 3]);
let records = vec![
// 2 peers backing the same record.
PeerRecord {
peer: PeerId::random(),
record: Record::new(key.clone(), vec![0x1]),
},
PeerRecord {
peer: PeerId::random(),
record: Record::new(key.clone(), vec![0x1]),
},
// only 1 peer backing the record.
PeerRecord {
peer: PeerId::random(),
record: Record::new(key.clone(), vec![0x2]),
},
];

let query_id = QueryId(1);
let action = QueryAction::GetRecordQueryDone { query_id, records };
assert!(kademlia.on_query_action(action).await.is_ok());

// Check the local storage was updated.
let record = kademlia.store.get(&key).unwrap();
assert_eq!(record.value, vec![0x1]);
}

#[tokio::test]
async fn check_get_records_update_with_expired_records() {
let (mut kademlia, _context, _manager) = make_kademlia();

let key = RecordKey::from(vec![1, 2, 3]);
let expired = std::time::Instant::now() - std::time::Duration::from_secs(10);
let records = vec![
// 2 peers backing the same record, one record is expired.
PeerRecord {
peer: PeerId::random(),
record: Record {
key: key.clone(),
value: vec![0x1],
publisher: None,
expires: Some(expired),
},
},
PeerRecord {
peer: PeerId::random(),
record: Record::new(key.clone(), vec![0x1]),
},
// 2 peer backing the record.
PeerRecord {
peer: PeerId::random(),
record: Record::new(key.clone(), vec![0x2]),
},
PeerRecord {
peer: PeerId::random(),
record: Record::new(key.clone(), vec![0x2]),
},
];

let query_id = QueryId(1);
let action = QueryAction::GetRecordQueryDone { query_id, records };
assert!(kademlia.on_query_action(action).await.is_ok());

// Check the local storage was updated.
let record = kademlia.store.get(&key).unwrap();
assert_eq!(record.value, vec![0x2]);
}
}
17 changes: 9 additions & 8 deletions src/protocol/libp2p/kademlia/query/get_record.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,9 +109,9 @@ impl GetRecordContext {
}
}

/// Get the found record.
pub fn found_record(mut self) -> PeerRecord {
self.found_records.pop().expect("record to exist since query succeeded")
/// Get the found records.
pub fn found_records(mut self) -> Vec<PeerRecord> {
self.found_records
}

/// Register response failure for `peer`.
Expand All @@ -136,12 +136,13 @@ impl GetRecordContext {
return;
};

// TODO: validate record
if let Some(record) = record {
self.found_records.push(PeerRecord {
record,
peer: Some(peer.peer),
});
if !record.is_expired(std::time::Instant::now()) {
self.found_records.push(PeerRecord {
peer: peer.peer,
record,
});
}
}

// add the queried peer to `queried` and all new peers which haven't been
Expand Down
29 changes: 20 additions & 9 deletions src/protocol/libp2p/kademlia/query/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@ use crate::{
protocol::libp2p::kademlia::{
message::KademliaMessage,
query::{find_node::FindNodeContext, get_record::GetRecordContext},
record::{Key as RecordKey, PeerRecord, Record},
record::{Key as RecordKey, Record},
types::{KademliaPeer, Key},
Quorum,
PeerRecord, Quorum,
},
PeerId,
};
Expand Down Expand Up @@ -124,8 +124,8 @@ pub enum QueryAction {
/// Query ID.
query_id: QueryId,

/// Found record.
record: PeerRecord,
/// Found records.
records: Vec<PeerRecord>,
},

// TODO: remove
Expand Down Expand Up @@ -396,7 +396,7 @@ impl QueryEngine {
},
QueryType::GetRecord { context } => QueryAction::GetRecordQueryDone {
query_id: context.query,
record: context.found_record(),
records: context.found_records(),
},
}
}
Expand Down Expand Up @@ -748,10 +748,21 @@ mod tests {

let peers: std::collections::HashSet<_> = peers.into_iter().map(|p| p.peer).collect();
match engine.next_action() {
Some(QueryAction::GetRecordQueryDone { record, .. }) => {
assert!(peers.contains(&record.peer.expect("Peer Id must be provided")));
assert_eq!(record.record.key, original_record.key);
assert_eq!(record.record.value, original_record.value);
Some(QueryAction::GetRecordQueryDone { records, .. }) => {
let query_peers = records
.iter()
.map(|peer_record| peer_record.peer)
.collect::<std::collections::HashSet<_>>();
assert_eq!(peers, query_peers);

let records: std::collections::HashSet<_> =
records.into_iter().map(|peer_record| peer_record.record).collect();
// One single record found across peers.
assert_eq!(records.len(), 1);
let record = records.into_iter().next().unwrap();

assert_eq!(record.key, original_record.key);
assert_eq!(record.value, original_record.value);
}
_ => panic!("invalid event received"),
}
Expand Down
11 changes: 5 additions & 6 deletions src/protocol/libp2p/kademlia/record.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ impl From<Multihash> for Key {
}

/// A record stored in the DHT.
#[derive(Clone, Debug, Eq, PartialEq)]
#[derive(Clone, Debug, Eq, PartialEq, Hash)]
pub struct Record {
/// Key of the record.
pub key: Key,
Expand Down Expand Up @@ -109,13 +109,12 @@ impl Record {
}
}

/// A record either received by the given peer or retrieved from the local
/// record store.
/// A record received by the given peer.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PeerRecord {
/// The peer from whom the record was received. `None` if the record was
/// retrieved from local storage.
pub peer: Option<PeerId>,
/// The peer from whom the record was received
pub peer: PeerId,

/// The provided record.
pub record: Record,
}