18 changes: 1 addition & 17 deletions crates/networking/p2p/peer_handler.rs
@@ -558,28 +558,12 @@ impl PeerHandler {
         }
     }

-    /// Requests block bodies from any suitable peer given their block hashes
-    /// Returns the block bodies or None if:
-    /// - There are no available peers (the node just started up or was rejected by all other nodes)
-    /// - No peer returned a valid response in the given time and retry limits
-    pub async fn request_block_bodies(
-        &mut self,
-        block_hashes: &[H256],
-    ) -> Result<Option<Vec<BlockBody>>, PeerHandlerError> {
-        for _ in 0..REQUEST_RETRY_ATTEMPTS {
-            if let Some((block_bodies, _)) = self.request_block_bodies_inner(block_hashes).await? {
-                return Ok(Some(block_bodies));
-            }
-        }
-        Ok(None)
-    }
-
     /// Requests block bodies from any suitable peer given their block headers and validates them
     /// Returns the requested block bodies or None if:
     /// - There are no available peers (the node just started up or was rejected by all other nodes)
     /// - No peer returned a valid response in the given time and retry limits
     /// - The block bodies are invalid given the block headers
-    pub async fn request_and_validate_block_bodies(
+    pub async fn request_block_bodies(
        &mut self,
        block_headers: &[BlockHeader],
    ) -> Result<Option<Vec<BlockBody>>, PeerHandlerError> {
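
With the hash-based variant removed, the surviving request_block_bodies takes block headers and validates the returned bodies against them before handing them back. A minimal caller sketch under that assumption follows; the function name fetch_bodies_for_headers and the hash/body pairing it returns are illustrative, not part of this diff.

// Illustrative sketch only (not from the PR): request bodies for a batch of
// headers and pair each returned body with its header's hash. Assumes the
// types shown in the diff above (PeerHandler, BlockHeader, BlockBody, H256,
// PeerHandlerError) and that BlockHeader::hash() takes &self.
async fn fetch_bodies_for_headers(
    peers: &mut PeerHandler,
    headers: &[BlockHeader],
) -> Result<Option<Vec<(H256, BlockBody)>>, PeerHandlerError> {
    // The renamed request_block_bodies may return fewer bodies than requested;
    // zip simply truncates the pairing to the bodies that actually arrived.
    let Some(bodies) = peers.request_block_bodies(headers).await? else {
        return Ok(None); // no peer produced a valid response within the retry limits
    };
    Ok(Some(headers.iter().map(|h| h.hash()).zip(bodies).collect()))
}
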
24 changes: 16 additions & 8 deletions crates/networking/p2p/sync.rs
@@ -397,7 +397,7 @@ impl Syncer {
         let header_batch = &headers[..min(MAX_BLOCK_BODIES_TO_REQUEST, headers.len())];
         let bodies = self
             .peers
-            .request_and_validate_block_bodies(header_batch)
+            .request_block_bodies(header_batch)
             .await?
             .ok_or(SyncError::BodiesNotFound)?;
         debug!("Obtained: {} block bodies", bodies.len());
@@ -556,25 +556,28 @@ impl Syncer {
     }
 }

-/// Fetches all block bodies for the given block hashes via p2p and stores them
+/// Fetches all block bodies for the given block headers via p2p and stores them
 async fn store_block_bodies(
-    mut block_hashes: Vec<BlockHash>,
+    mut block_headers: Vec<BlockHeader>,
     mut peers: PeerHandler,
     store: Store,
 ) -> Result<(), SyncError> {
     loop {
         debug!("Requesting Block Bodies ");
-        if let Some(block_bodies) = peers.request_block_bodies(&block_hashes).await? {
+        if let Some(block_bodies) = peers.request_block_bodies(&block_headers).await? {
             debug!(" Received {} Block Bodies", block_bodies.len());
             // Track which bodies we have already fetched
-            let current_block_hashes = block_hashes.drain(..block_bodies.len());
+            let current_block_headers = block_headers.drain(..block_bodies.len());
             // Add bodies to storage
-            for (hash, body) in current_block_hashes.zip(block_bodies.into_iter()) {
+            for (hash, body) in current_block_headers
+                .map(|h| h.hash())
+                .zip(block_bodies.into_iter())
+            {
                 store.add_block_body(hash, body).await?;
             }

             // Check if we need to ask for another batch
-            if block_hashes.is_empty() {
+            if block_headers.is_empty() {
                 break;
             }
         }
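
The loop above leans on Vec::drain(..n) to consume exactly the headers whose bodies arrived, so a short response from a peer simply leaves the remainder pending for the next iteration. A standalone sketch of that bookkeeping pattern, with plain strings standing in for headers and bodies (every name here is illustrative):

// Toy illustration of the drain-and-zip bookkeeping used in store_block_bodies.
fn drain_fetched(pending: &mut Vec<String>, fetched: Vec<String>) -> Vec<(String, String)> {
    // Remove only the entries whose counterparts arrived; the rest stay pending.
    pending.drain(..fetched.len()).zip(fetched).collect()
}

fn main() {
    let mut pending = vec!["h1".to_string(), "h2".to_string(), "h3".to_string()];
    let pairs = drain_fetched(&mut pending, vec!["b1".to_string(), "b2".to_string()]);
    assert_eq!(pairs.len(), 2); // two header/body pairs matched up
    assert_eq!(pending, vec!["h3".to_string()]); // one header still awaiting its body
}
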
@@ -967,7 +970,12 @@ impl Syncer {

         debug_assert!(validate_bytecodes(store.clone(), pivot_header.state_root).await);

-        store_block_bodies(vec![pivot_header.hash()], self.peers.clone(), store.clone()).await?;
+        store_block_bodies(
+            vec![pivot_header.clone()],
+            self.peers.clone(),
+            store.clone(),
+        )
+        .await?;

         let block = store
             .get_block_by_hash(pivot_header.hash())