Skip to content
Merged
30 changes: 26 additions & 4 deletions src/protocol/libp2p/kademlia/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,10 @@ use tokio::sync::mpsc::{channel, Receiver, Sender};
use std::{collections::HashMap, time::Duration};

/// Default TTL for the records.
const DEFAULT_TTL: u64 = 36 * 60 * 60;
const DEFAULT_TTL: Duration = Duration::from_secs(36 * 60 * 60);

/// Default provider record TTL.
const DEFAULT_PROVIDER_TTL: Duration = Duration::from_secs(48 * 60 * 60);

/// Protocol name.
const PROTOCOL_NAME: &str = "/ipfs/kad/1.0.0";
Expand Down Expand Up @@ -65,9 +68,12 @@ pub struct Config {
/// Incoming records validation mode.
pub(super) validation_mode: IncomingRecordValidationMode,

/// Default record TTl.
/// Default record TTL.
pub(super) record_ttl: Duration,

/// Provider record TTL.
pub(super) provider_ttl: Duration,

/// TX channel for sending events to `KademliaHandle`.
pub(super) event_tx: Sender<KademliaEvent>,

Expand All @@ -83,6 +89,7 @@ impl Config {
update_mode: RoutingTableUpdateMode,
validation_mode: IncomingRecordValidationMode,
record_ttl: Duration,
provider_ttl: Duration,
) -> (Self, KademliaHandle) {
let (cmd_tx, cmd_rx) = channel(DEFAULT_CHANNEL_SIZE);
let (event_tx, event_rx) = channel(DEFAULT_CHANNEL_SIZE);
Expand All @@ -98,6 +105,7 @@ impl Config {
update_mode,
validation_mode,
record_ttl,
provider_ttl,
codec: ProtocolCodec::UnsignedVarint(None),
replication_factor,
known_peers,
Expand All @@ -116,7 +124,8 @@ impl Config {
Vec::new(),
RoutingTableUpdateMode::Automatic,
IncomingRecordValidationMode::Automatic,
Duration::from_secs(DEFAULT_TTL),
DEFAULT_TTL,
DEFAULT_PROVIDER_TTL,
)
}
}
Expand All @@ -141,6 +150,9 @@ pub struct ConfigBuilder {

/// Default TTL for the records.
pub(super) record_ttl: Duration,

/// Default TTL for the provider records.
pub(super) provider_ttl: Duration,
}

impl Default for ConfigBuilder {
Expand All @@ -158,7 +170,8 @@ impl ConfigBuilder {
protocol_names: Vec::new(),
update_mode: RoutingTableUpdateMode::Automatic,
validation_mode: IncomingRecordValidationMode::Automatic,
record_ttl: Duration::from_secs(DEFAULT_TTL),
record_ttl: DEFAULT_TTL,
provider_ttl: DEFAULT_PROVIDER_TTL,
}
}

Expand Down Expand Up @@ -211,6 +224,14 @@ impl ConfigBuilder {
self
}

/// Set default TTL for the provider records. Recommended value is 2 * (refresh interval) + 20%.
///
/// If unspecified, the default TTL is 48 hours.
pub fn with_provider_record_ttl(mut self, provider_record_ttl: Duration) -> Self {
self.provider_ttl = provider_record_ttl;
self
}

/// Build Kademlia [`Config`].
pub fn build(self) -> (Config, KademliaHandle) {
Config::new(
Expand All @@ -220,6 +241,7 @@ impl ConfigBuilder {
self.update_mode,
self.validation_mode,
self.record_ttl,
self.provider_ttl,
)
}
}
139 changes: 134 additions & 5 deletions src/protocol/libp2p/kademlia/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@

use crate::{
protocol::libp2p::kademlia::{
record::{Key as RecordKey, Record},
record::{Key as RecordKey, ProviderRecord, Record},
schema,
types::KademliaPeer,
types::{ConnectionType, KademliaPeer},
},
PeerId,
};
Expand Down Expand Up @@ -60,9 +60,31 @@ pub enum KademliaMessage {
/// Record.
record: Option<Record>,

/// Peers closest to key.
/// Peers closer to the key.
peers: Vec<KademliaPeer>,
},

/// `ADD_PROVIDER` message.
AddProvider {
/// Key.
key: RecordKey,

/// Peers, providing the data for `key`. Must contain exactly one peer matching the sender
/// of the message.
providers: Vec<KademliaPeer>,
},

/// `GET_PROVIDERS` message.
GetProviders {
/// Key. `None` in response.
key: Option<RecordKey>,

/// Peers closer to the key.
peers: Vec<KademliaPeer>,

/// Peers, providing the data for `key`.
providers: Vec<KademliaPeer>,
},
}

impl KademliaMessage {
Expand Down Expand Up @@ -149,10 +171,84 @@ impl KademliaMessage {
buf
}

/// Create `ADD_PROVIDER` message with `provider`.
#[allow(unused)]
pub fn add_provider(provider: ProviderRecord) -> Vec<u8> {
let peer = KademliaPeer::new(
provider.provider,
provider.addresses,
ConnectionType::CanConnect, // ignored by message recipient
);
let message = schema::kademlia::Message {
key: provider.key.clone().to_vec(),
cluster_level_raw: 10,
r#type: schema::kademlia::MessageType::AddProvider.into(),
provider_peers: std::iter::once((&peer).into()).collect(),
..Default::default()
};

let mut buf = Vec::with_capacity(message.encoded_len());
message.encode(&mut buf).expect("Vec<u8> to provide needed capacity");

buf
}

/// Create `GET_PROVIDERS` request for `key`.
#[allow(unused)]
pub fn get_providers_request(key: RecordKey) -> Vec<u8> {
let message = schema::kademlia::Message {
key: key.to_vec(),
cluster_level_raw: 10,
r#type: schema::kademlia::MessageType::GetProviders.into(),
..Default::default()
};

let mut buf = Vec::with_capacity(message.encoded_len());
message.encode(&mut buf).expect("Vec<u8> to provide needed capacity");

buf
}

/// Create `GET_PROVIDERS` response.
pub fn get_providers_response(
key: RecordKey,
providers: Vec<ProviderRecord>,
closer_peers: &[KademliaPeer],
) -> Vec<u8> {
debug_assert!(providers.iter().all(|p| p.key == key));

let provider_peers = providers
.into_iter()
.map(|p| {
KademliaPeer::new(
p.provider,
p.addresses,
ConnectionType::CanConnect, // ignored by recipient
)
})
.map(|p| (&p).into())
.collect();

let message = schema::kademlia::Message {
key: key.to_vec(),
cluster_level_raw: 10,
r#type: schema::kademlia::MessageType::GetProviders.into(),
closer_peers: closer_peers.iter().map(Into::into).collect(),
provider_peers,
..Default::default()
};

let mut buf = Vec::with_capacity(message.encoded_len());
message.encode(&mut buf).expect("Vec<u8> to provide needed capacity");

buf
}

/// Get [`KademliaMessage`] from bytes.
pub fn from_bytes(bytes: BytesMut) -> Option<Self> {
match schema::kademlia::Message::decode(bytes) {
Ok(message) => match message.r#type {
// FIND_NODE
4 => {
let peers = message
.closer_peers
Expand All @@ -165,13 +261,15 @@ impl KademliaMessage {
peers,
})
}
// PUT_VALUE
0 => {
let record = message.record?;

Some(Self::PutValue {
record: record_from_schema(record)?,
})
}
// GET_VALUE
1 => {
let key = match message.key.is_empty() {
true => message.record.as_ref().and_then(|record| {
Expand All @@ -196,8 +294,39 @@ impl KademliaMessage {
.collect(),
})
}
message => {
tracing::warn!(target: LOG_TARGET, ?message, "unhandled message");
// ADD_PROVIDER
2 => {
let key = (!message.key.is_empty()).then_some(message.key.into())?;
let providers = message
.provider_peers
.iter()
.filter_map(|peer| KademliaPeer::try_from(peer).ok())
.collect();

Some(Self::AddProvider { key, providers })
}
// GET_PROVIDERS
3 => {
let key = (!message.key.is_empty()).then_some(message.key.into());
let peers = message
.closer_peers
.iter()
.filter_map(|peer| KademliaPeer::try_from(peer).ok())
.collect();
let providers = message
.provider_peers
.iter()
.filter_map(|peer| KademliaPeer::try_from(peer).ok())
.collect();

Some(Self::GetProviders {
key,
peers,
providers,
})
}
message_type => {
tracing::warn!(target: LOG_TARGET, ?message_type, "unhandled message");
None
}
},
Expand Down
Loading