Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
77 changes: 77 additions & 0 deletions src/protocol/libp2p/kademlia/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ use std::{
Arc,
},
task::{Context, Poll},
time::Instant,
};

/// Quorum.
Expand Down Expand Up @@ -174,6 +175,25 @@ pub enum KademliaCommand {
// Record.
record: Record,
},

/// Resume providing a key without publishing it to the network immediately.
///
/// This restores provider state after a node restart. The provider is inserted into the local
/// store and a refresh is scheduled for `next_refresh_at`. If that instant is already in the
/// past the refresh is triggered as soon as possible.
ResumeProviding {
/// Provided key.
key: RecordKey,

/// [`Quorum`] for the query when the refresh fires.
quorum: Quorum,

/// Absolute instant at which the next network refresh should be sent.
next_refresh_at: Instant,
},

/// Request all local providers and their next scheduled refresh times.
GetLocalProviders,
}

/// Kademlia events.
Expand Down Expand Up @@ -276,6 +296,17 @@ pub enum KademliaEvent {
/// Provider.
provider: ContentProvider,
},

/// Response to [`KademliaCommand::GetLocalProviders`].
///
/// Contains every key that this node is currently providing along with the [`Instant`] at
/// which the next network refresh for that key is scheduled. Persist these before shutting
/// down the node and restore them on the next start via
/// [`KademliaHandle::resume_providing()`].
LocalProviders {
/// List of `(key, next_refresh_at)` pairs for all local providers.
providers: Vec<(RecordKey, Instant)>,
},
}

/// Handle for communicating with the Kademlia protocol.
Expand Down Expand Up @@ -424,6 +455,35 @@ impl KademliaHandle {
let _ = self.cmd_tx.send(KademliaCommand::StoreRecord { record }).await;
}

/// Resume providing `key` without publishing it to the network.
///
/// Intended to be used on node restart to restore provider state that was previously saved via
/// [`KademliaHandle::local_providers()`]. The provider is inserted into the local store and a
/// network refresh is scheduled for `next_refresh_at`. If `next_refresh_at` is already in the
/// past, the refresh fires as soon as possible.
///
/// This avoids the overhead of immediately re-publishing every provider on every restart when
/// there are hundreds of thousands or millions of keys.
pub async fn resume_providing(&self, key: RecordKey, quorum: Quorum, next_refresh_at: Instant) {
let _ = self
.cmd_tx
.send(KademliaCommand::ResumeProviding {
key,
quorum,
next_refresh_at,
})
.await;
}

/// Request the list of local providers and their next refresh times.
///
/// The result is delivered asynchronously as [`KademliaEvent::LocalProviders`]. Use this
/// before shutting down the node to persist provider state, then call
/// [`KademliaHandle::resume_providing()`] on the next start to restore it.
pub async fn local_providers(&self) {
let _ = self.cmd_tx.send(KademliaCommand::GetLocalProviders).await;
}

/// Try to add known peer and if the channel is clogged, return an error.
pub fn try_add_known_peer(&self, peer: PeerId, addresses: Vec<Multiaddr>) -> Result<(), ()> {
self.cmd_tx
Expand Down Expand Up @@ -494,6 +554,23 @@ impl KademliaHandle {
self.cmd_tx.try_send(KademliaCommand::StoreRecord { record }).map_err(|_| ())
}

/// Try to resume providing `key` without publishing immediately.
/// If the channel is clogged, returns an error.
pub fn try_resume_providing(
&self,
key: RecordKey,
quorum: Quorum,
next_refresh_at: Instant,
) -> Result<(), ()> {
self.cmd_tx
.try_send(KademliaCommand::ResumeProviding {
key,
quorum,
next_refresh_at,
})
.map_err(|_| ())
}

#[cfg(feature = "fuzz")]
/// Expose functionality for fuzzing
pub async fn fuzz_send_message(&mut self, command: KademliaCommand) -> crate::Result<()> {
Expand Down
32 changes: 32 additions & 0 deletions src/protocol/libp2p/kademlia/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1377,6 +1377,38 @@ impl Kademlia {

self.store.put(record);
}
Some(KademliaCommand::ResumeProviding {
key,
quorum,
next_refresh_at,
}) => {
tracing::debug!(
target: LOG_TARGET,
?key,
?next_refresh_at,
"resume providing without immediate publish",
);

self.store.resume_provider(key, quorum, next_refresh_at);
}
Some(KademliaCommand::GetLocalProviders) => {
tracing::debug!(
target: LOG_TARGET,
"querying local providers",
);

// Collect (key, next_refresh_at) from the store and send back as an event.
let providers = self
.store
.local_providers()
.into_iter()
.collect::<Vec<_>>();

let _ = self
.event_tx
.send(KademliaEvent::LocalProviders { providers })
.await;
}
None => return Err(Error::EssentialTaskClosed),
}
},
Expand Down
Loading