Skip to content

Commit 9236bce

Browse files
committed
fix: batch drain local proposals + increase P2P channel to 4096
Local proposals are now drained in batch (try_recv loop) instead of one per select iteration. This prevents the event loop from being starved when hundreds of proposals queue up during sync, allowing rpc_p2p_rx (WASM uploads, sudo actions) to be processed promptly. Also increased P2P command channel from 256 to 4096 to prevent silent proposal broadcast drops.
1 parent 209ed55 commit 9236bce

File tree

1 file changed

+57
-48
lines changed

1 file changed

+57
-48
lines changed

bins/validator-node/src/main.rs

Lines changed: 57 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -472,7 +472,7 @@ async fn main() -> Result<()> {
472472

473473
// Create command channel for P2P network
474474
let (p2p_cmd_tx, p2p_cmd_rx) =
475-
tokio::sync::mpsc::channel::<platform_p2p_consensus::P2PCommand>(256);
475+
tokio::sync::mpsc::channel::<platform_p2p_consensus::P2PCommand>(4096);
476476

477477
// Spawn P2P network task
478478
let network_clone = network.clone();
@@ -1390,55 +1390,64 @@ async fn main() -> Result<()> {
13901390
}
13911391
}
13921392

1393-
// Local storage proposals (from our own WASM executions)
1394-
Some(proposal) = local_proposal_rx.recv() => {
1395-
debug!(
1396-
proposal_id = %hex::encode(&proposal.proposal_id[..8]),
1397-
challenge_id = %proposal.challenge_id,
1398-
"Adding local storage proposal to state"
1399-
);
1400-
let proposal_id = proposal.proposal_id;
1401-
state_manager.apply(|state| {
1402-
state.add_storage_proposal(proposal.clone());
1403-
});
1404-
// Vote for our own proposal
1405-
let my_hotkey = keypair.hotkey();
1406-
let consensus_result = state_manager.apply(|state| {
1407-
state.vote_storage_proposal(&proposal_id, my_hotkey, true)
1408-
});
1393+
// Local storage proposals (from our own WASM executions) - batch drain
1394+
Some(first_proposal) = local_proposal_rx.recv() => {
1395+
let mut batch = vec![first_proposal];
1396+
while let Ok(p) = local_proposal_rx.try_recv() {
1397+
batch.push(p);
1398+
}
1399+
let batch_size = batch.len();
1400+
if batch_size > 1 {
1401+
debug!("Processing batch of {} local proposals", batch_size);
1402+
}
14091403

1410-
// If consensus reached with just our vote (high stake), write immediately
1411-
if let Some(true) = consensus_result {
1412-
let proposal_opt = state_manager.apply(|state| {
1413-
state.remove_storage_proposal(&proposal_id)
1404+
// Vote for all proposals and collect those that reached consensus
1405+
let my_hotkey = keypair.hotkey();
1406+
let mut to_write = Vec::new();
1407+
for proposal in batch {
1408+
let proposal_id = proposal.proposal_id;
1409+
state_manager.apply(|state| {
1410+
state.add_storage_proposal(proposal);
14141411
});
1415-
if let Some(p) = proposal_opt {
1416-
let storage_key = StorageKey::new(
1417-
&p.challenge_id.to_string(),
1418-
hex::encode(&p.key),
1419-
);
1420-
let result = if p.value.is_empty() {
1421-
storage.delete(&storage_key).await
1422-
} else {
1423-
storage.put(storage_key.clone(), p.value.clone(), put_options_with_block(&state_manager)).await.map(|_| true)
1424-
};
1425-
match result {
1426-
Ok(_) => {
1427-
let op = if p.value.is_empty() { "deleted" } else { "written" };
1428-
info!(
1429-
proposal_id = %hex::encode(&p.proposal_id[..8]),
1430-
challenge_id = %p.challenge_id,
1431-
key_len = p.key.len(),
1432-
"Local proposal consensus reached, data {}", op
1433-
);
1434-
}
1435-
Err(e) => {
1436-
error!(
1437-
proposal_id = %hex::encode(&p.proposal_id[..8]),
1438-
error = %e,
1439-
"Failed to write local consensus storage"
1440-
);
1441-
}
1412+
let consensus_result = state_manager.apply(|state| {
1413+
state.vote_storage_proposal(&proposal_id, my_hotkey.clone(), true)
1414+
});
1415+
if let Some(true) = consensus_result {
1416+
if let Some(p) = state_manager.apply(|state| {
1417+
state.remove_storage_proposal(&proposal_id)
1418+
}) {
1419+
to_write.push(p);
1420+
}
1421+
}
1422+
}
1423+
1424+
// Write all consensus-reached proposals
1425+
for p in to_write {
1426+
let storage_key = StorageKey::new(
1427+
&p.challenge_id.to_string(),
1428+
hex::encode(&p.key),
1429+
);
1430+
let result = if p.value.is_empty() {
1431+
storage.delete(&storage_key).await
1432+
} else {
1433+
storage.put(storage_key.clone(), p.value.clone(), put_options_with_block(&state_manager)).await.map(|_| true)
1434+
};
1435+
match result {
1436+
Ok(_) => {
1437+
let op = if p.value.is_empty() { "deleted" } else { "written" };
1438+
info!(
1439+
proposal_id = %hex::encode(&p.proposal_id[..8]),
1440+
challenge_id = %p.challenge_id,
1441+
key_len = p.key.len(),
1442+
"Local proposal consensus reached, data {}", op
1443+
);
1444+
}
1445+
Err(e) => {
1446+
error!(
1447+
proposal_id = %hex::encode(&p.proposal_id[..8]),
1448+
error = %e,
1449+
"Failed to write local consensus storage"
1450+
);
14421451
}
14431452
}
14441453
}

0 commit comments

Comments
 (0)