Skip to content

Commit abcfd97

Browse files
alexgghhitchhooker
authored andcommitted
Forward put_record requests to authorithy-discovery (paritytech#4683)
Signed-off-by: Alexandru Gheorghe <alexandru.gheorghe@parity.io>
1 parent 36e954f commit abcfd97

9 files changed

Lines changed: 425 additions & 61 deletions

File tree

substrate/client/authority-discovery/src/error.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,4 +75,10 @@ pub enum Error {
7575

7676
#[error("Unable to fetch best block.")]
7777
BestBlockFetchingError,
78+
79+
#[error("Publisher not present.")]
80+
MissingPublisher,
81+
82+
#[error("Unknown authority.")]
83+
UnknownAuthority,
7884
}

substrate/client/authority-discovery/src/worker.rs

Lines changed: 131 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ use sc_network_types::{
4949
multihash::{Code, Multihash},
5050
PeerId,
5151
};
52+
use schema::PeerSignature;
5253
use sp_api::{ApiError, ProvideRuntimeApi};
5354
use sp_authority_discovery::{
5455
AuthorityDiscoveryApi, AuthorityId, AuthorityPair, AuthoritySignature,
@@ -111,7 +112,7 @@ pub enum Role {
111112
/// network peerset.
112113
///
113114
/// 5. Allow querying of the collected addresses via the [`crate::Service`].
114-
pub struct Worker<Client, Block, DhtEventStream> {
115+
pub struct Worker<Client, Block: BlockT, DhtEventStream> {
115116
/// Channel receiver for messages send by a [`crate::Service`].
116117
from_service: Fuse<mpsc::Receiver<ServicetoWorkerMsg>>,
117118

@@ -152,6 +153,12 @@ pub struct Worker<Client, Block, DhtEventStream> {
152153
/// Queue of throttled lookups pending to be passed to the network.
153154
pending_lookups: Vec<AuthorityId>,
154155

156+
/// The list of all known authorities.
157+
known_authorities: HashMap<KademliaKey, AuthorityId>,
158+
159+
/// The last time we requested the list of authorities.
160+
authorities_queried_at: Option<Block::Hash>,
161+
155162
/// Set of in-flight lookups.
156163
in_flight_lookups: HashMap<KademliaKey, AuthorityId>,
157164

@@ -268,6 +275,8 @@ where
268275
network,
269276
dht_event_rx,
270277
publish_interval,
278+
known_authorities: Default::default(),
279+
authorities_queried_at: None,
271280
publish_if_changed_interval,
272281
latest_published_keys: HashSet::new(),
273282
latest_published_kad_keys: HashSet::new(),
@@ -482,6 +491,13 @@ where
482491
.filter(|id| !local_keys.contains(id.as_ref()))
483492
.collect::<Vec<_>>();
484493

494+
self.known_authorities = authorities
495+
.clone()
496+
.into_iter()
497+
.map(|authority| (hash_authority_id(authority.as_ref()), authority))
498+
.collect::<HashMap<_, _>>();
499+
self.authorities_queried_at = Some(best_hash);
500+
485501
self.addr_cache.retain_ids(&authorities);
486502

487503
authorities.shuffle(&mut thread_rng());
@@ -581,7 +597,112 @@ where
581597

582598
debug!(target: LOG_TARGET, "Failed to put hash '{:?}' on Dht.", hash)
583599
},
600+
DhtEvent::PutRecordRequest(record_key, record_value, publisher, expires) => {
601+
if let Err(e) = self
602+
.handle_put_record_requested(record_key, record_value, publisher, expires)
603+
.await
604+
{
605+
debug!(target: LOG_TARGET, "Failed to handle put record request: {}", e)
606+
}
607+
608+
if let Some(metrics) = &self.metrics {
609+
metrics.dht_event_received.with_label_values(&["put_record_req"]).inc();
610+
}
611+
},
612+
}
613+
}
614+
615+
async fn handle_put_record_requested(
616+
&mut self,
617+
record_key: KademliaKey,
618+
record_value: Vec<u8>,
619+
publisher: Option<PeerId>,
620+
expires: Option<std::time::Instant>,
621+
) -> Result<()> {
622+
let publisher = publisher.ok_or(Error::MissingPublisher)?;
623+
624+
// Make sure we don't ever work with an outdated set of authorities
625+
// and that we do not update known_authorithies too often.
626+
let best_hash = self.client.best_hash().await?;
627+
if !self.known_authorities.contains_key(&record_key) &&
628+
self.authorities_queried_at
629+
.map(|authorities_queried_at| authorities_queried_at != best_hash)
630+
.unwrap_or(true)
631+
{
632+
let authorities = self
633+
.client
634+
.authorities(best_hash)
635+
.await
636+
.map_err(|e| Error::CallingRuntime(e.into()))?
637+
.into_iter()
638+
.collect::<Vec<_>>();
639+
640+
self.known_authorities = authorities
641+
.into_iter()
642+
.map(|authority| (hash_authority_id(authority.as_ref()), authority))
643+
.collect::<HashMap<_, _>>();
644+
645+
self.authorities_queried_at = Some(best_hash);
646+
}
647+
648+
let authority_id =
649+
self.known_authorities.get(&record_key).ok_or(Error::UnknownAuthority)?;
650+
let signed_record =
651+
Self::check_record_signed_with_authority_id(record_value.as_slice(), authority_id)?;
652+
self.check_record_signed_with_network_key(
653+
&signed_record.record,
654+
signed_record.peer_signature,
655+
publisher,
656+
authority_id,
657+
)?;
658+
self.network.store_record(record_key, record_value, Some(publisher), expires);
659+
Ok(())
660+
}
661+
662+
fn check_record_signed_with_authority_id(
663+
record: &[u8],
664+
authority_id: &AuthorityId,
665+
) -> Result<schema::SignedAuthorityRecord> {
666+
let signed_record: schema::SignedAuthorityRecord =
667+
schema::SignedAuthorityRecord::decode(record).map_err(Error::DecodingProto)?;
668+
669+
let auth_signature = AuthoritySignature::decode(&mut &signed_record.auth_signature[..])
670+
.map_err(Error::EncodingDecodingScale)?;
671+
672+
if !AuthorityPair::verify(&auth_signature, &signed_record.record, &authority_id) {
673+
return Err(Error::VerifyingDhtPayload)
584674
}
675+
676+
Ok(signed_record)
677+
}
678+
679+
fn check_record_signed_with_network_key(
680+
&self,
681+
record: &Vec<u8>,
682+
peer_signature: Option<PeerSignature>,
683+
remote_peer_id: PeerId,
684+
authority_id: &AuthorityId,
685+
) -> Result<()> {
686+
if let Some(peer_signature) = peer_signature {
687+
match self.network.verify(
688+
remote_peer_id.into(),
689+
&peer_signature.public_key,
690+
&peer_signature.signature,
691+
record,
692+
) {
693+
Ok(true) => {},
694+
Ok(false) => return Err(Error::VerifyingDhtPayload),
695+
Err(error) => return Err(Error::ParsingLibp2pIdentity(error)),
696+
}
697+
} else if self.strict_record_validation {
698+
return Err(Error::MissingPeerIdSignature)
699+
} else {
700+
debug!(
701+
target: LOG_TARGET,
702+
"Received unsigned authority discovery record from {}", authority_id
703+
);
704+
}
705+
Ok(())
585706
}
586707

587708
fn handle_dht_value_found_event(&mut self, values: Vec<(KademliaKey, Vec<u8>)>) -> Result<()> {
@@ -600,16 +721,8 @@ where
600721
let remote_addresses: Vec<Multiaddr> = values
601722
.into_iter()
602723
.map(|(_k, v)| {
603-
let schema::SignedAuthorityRecord { record, auth_signature, peer_signature } =
604-
schema::SignedAuthorityRecord::decode(v.as_slice())
605-
.map_err(Error::DecodingProto)?;
606-
607-
let auth_signature = AuthoritySignature::decode(&mut &auth_signature[..])
608-
.map_err(Error::EncodingDecodingScale)?;
609-
610-
if !AuthorityPair::verify(&auth_signature, &record, &authority_id) {
611-
return Err(Error::VerifyingDhtPayload)
612-
}
724+
let schema::SignedAuthorityRecord { record, peer_signature, .. } =
725+
Self::check_record_signed_with_authority_id(&v, &authority_id)?;
613726

614727
let addresses: Vec<Multiaddr> = schema::AuthorityRecord::decode(record.as_slice())
615728
.map(|a| a.addresses)
@@ -638,26 +751,12 @@ where
638751
// At this point we know all the valid multiaddresses from the record, know that
639752
// each of them belong to the same PeerId, we just need to check if the record is
640753
// properly signed by the owner of the PeerId
641-
642-
if let Some(peer_signature) = peer_signature {
643-
match self.network.verify(
644-
remote_peer_id.into(),
645-
&peer_signature.public_key,
646-
&peer_signature.signature,
647-
&record,
648-
) {
649-
Ok(true) => {},
650-
Ok(false) => return Err(Error::VerifyingDhtPayload),
651-
Err(error) => return Err(Error::ParsingLibp2pIdentity(error)),
652-
}
653-
} else if self.strict_record_validation {
654-
return Err(Error::MissingPeerIdSignature)
655-
} else {
656-
debug!(
657-
target: LOG_TARGET,
658-
"Received unsigned authority discovery record from {}", authority_id
659-
);
660-
}
754+
self.check_record_signed_with_network_key(
755+
&record,
756+
peer_signature,
757+
remote_peer_id,
758+
&authority_id,
759+
)?;
661760
Ok(addresses)
662761
})
663762
.collect::<Result<Vec<Vec<Multiaddr>>>>()?
@@ -870,7 +969,7 @@ impl Metrics {
870969

871970
// Helper functions for unit testing.
872971
#[cfg(test)]
873-
impl<Block, Client, DhtEventStream> Worker<Client, Block, DhtEventStream> {
972+
impl<Block: BlockT, Client, DhtEventStream> Worker<Client, Block, DhtEventStream> {
874973
pub(crate) fn inject_addresses(&mut self, authority: AuthorityId, addresses: Vec<Multiaddr>) {
875974
self.addr_cache.insert(authority, addresses);
876975
}

0 commit comments

Comments
 (0)