Skip to content

Commit daf343f

Browse files
committed
Make approval-voting runable on a worker thread
Signed-off-by: Alexandru Gheorghe <[email protected]>
1 parent e4f883e commit daf343f

5 files changed

Lines changed: 469 additions & 136 deletions

File tree

polkadot/cli/src/command.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -373,16 +373,16 @@ pub fn run() -> Result<()> {
373373
Ok(runner.async_run(|mut config| {
374374
let (client, backend, _, task_manager) =
375375
polkadot_service::new_chain_ops(&mut config, None)?;
376+
let task_handle = task_manager.spawn_handle();
376377
let aux_revert = Box::new(|client, backend, blocks| {
377-
polkadot_service::revert_backend(client, backend, blocks, config).map_err(
378-
|err| {
378+
polkadot_service::revert_backend(client, backend, blocks, config, task_handle)
379+
.map_err(|err| {
379380
match err {
380381
polkadot_service::Error::Blockchain(err) => err.into(),
381382
// Generic application-specific error.
382383
err => sc_cli::Error::Application(err.into()),
383384
}
384-
},
385-
)
385+
})
386386
});
387387
Ok((
388388
cmd.run(client, backend, Some(aux_revert)).map_err(Error::SubstrateCli),

polkadot/node/core/approval-voting/src/import.rs

Lines changed: 48 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ use polkadot_node_subsystem::{
4444
overseer, RuntimeApiError, SubsystemError, SubsystemResult,
4545
};
4646
use polkadot_node_subsystem_util::{determine_new_blocks, runtime::RuntimeInfo};
47+
use polkadot_overseer::SubsystemSender;
4748
use polkadot_primitives::{
4849
node_features, BlockNumber, CandidateEvent, CandidateHash, CandidateReceipt, ConsensusLog,
4950
CoreIndex, GroupIndex, Hash, Header, SessionIndex,
@@ -110,8 +111,8 @@ enum ImportedBlockInfoError {
110111
/// Computes information about the imported block. Returns an error if the info couldn't be
111112
/// extracted.
112113
#[overseer::contextbounds(ApprovalVoting, prefix = self::overseer)]
113-
async fn imported_block_info<Context>(
114-
ctx: &mut Context,
114+
async fn imported_block_info<Sender: SubsystemSender<RuntimeApiMessage>>(
115+
sender: &mut Sender,
115116
env: ImportedBlockInfoEnv<'_>,
116117
block_hash: Hash,
117118
block_header: &Header,
@@ -123,11 +124,12 @@ async fn imported_block_info<Context>(
123124
// fetch candidates
124125
let included_candidates: Vec<_> = {
125126
let (c_tx, c_rx) = oneshot::channel();
126-
ctx.send_message(RuntimeApiMessage::Request(
127-
block_hash,
128-
RuntimeApiRequest::CandidateEvents(c_tx),
129-
))
130-
.await;
127+
sender
128+
.send_message(RuntimeApiMessage::Request(
129+
block_hash,
130+
RuntimeApiRequest::CandidateEvents(c_tx),
131+
))
132+
.await;
131133

132134
let events: Vec<CandidateEvent> = match c_rx.await {
133135
Ok(Ok(events)) => events,
@@ -150,11 +152,12 @@ async fn imported_block_info<Context>(
150152
// short, that shouldn't happen.
151153
let session_index = {
152154
let (s_tx, s_rx) = oneshot::channel();
153-
ctx.send_message(RuntimeApiMessage::Request(
154-
block_header.parent_hash,
155-
RuntimeApiRequest::SessionIndexForChild(s_tx),
156-
))
157-
.await;
155+
sender
156+
.send_message(RuntimeApiMessage::Request(
157+
block_header.parent_hash,
158+
RuntimeApiRequest::SessionIndexForChild(s_tx),
159+
))
160+
.await;
158161

159162
let session_index = match s_rx.await {
160163
Ok(Ok(s)) => s,
@@ -200,11 +203,12 @@ async fn imported_block_info<Context>(
200203
// by one block. This gives us the opposite invariant for sessions - the parent block's
201204
// post-state gives us the canonical information about the session index for any of its
202205
// children, regardless of which slot number they might be produced at.
203-
ctx.send_message(RuntimeApiMessage::Request(
204-
block_hash,
205-
RuntimeApiRequest::CurrentBabeEpoch(s_tx),
206-
))
207-
.await;
206+
sender
207+
.send_message(RuntimeApiMessage::Request(
208+
block_hash,
209+
RuntimeApiRequest::CurrentBabeEpoch(s_tx),
210+
))
211+
.await;
208212

209213
match s_rx.await {
210214
Ok(Ok(s)) => s,
@@ -215,7 +219,7 @@ async fn imported_block_info<Context>(
215219
};
216220

217221
let extended_session_info =
218-
get_extended_session_info(env.runtime_info, ctx.sender(), block_hash, session_index).await;
222+
get_extended_session_info(env.runtime_info, sender, block_hash, session_index).await;
219223
let enable_v2_assignments = extended_session_info.map_or(false, |extended_session_info| {
220224
*extended_session_info
221225
.node_features
@@ -224,7 +228,7 @@ async fn imported_block_info<Context>(
224228
.unwrap_or(&false)
225229
});
226230

227-
let session_info = get_session_info(env.runtime_info, ctx.sender(), block_hash, session_index)
231+
let session_info = get_session_info(env.runtime_info, sender, block_hash, session_index)
228232
.await
229233
.ok_or(ImportedBlockInfoError::SessionInfoUnavailable)?;
230234

@@ -328,9 +332,15 @@ pub struct BlockImportedCandidates {
328332
/// * and return information about all candidates imported under each block.
329333
///
330334
/// It is the responsibility of the caller to schedule wakeups for each block.
331-
#[overseer::contextbounds(ApprovalVoting, prefix = self::overseer)]
332-
pub(crate) async fn handle_new_head<Context, B: Backend>(
333-
ctx: &mut Context,
335+
pub(crate) async fn handle_new_head<
336+
Sender: SubsystemSender<ChainApiMessage>
337+
+ SubsystemSender<RuntimeApiMessage>
338+
+ SubsystemSender<ChainSelectionMessage>,
339+
AVSender: SubsystemSender<ApprovalDistributionMessage>,
340+
B: Backend,
341+
>(
342+
sender: &mut Sender,
343+
approval_voting_sender: &mut AVSender,
334344
state: &State,
335345
db: &mut OverlayedBackend<'_, B>,
336346
session_info_provider: &mut RuntimeInfo,
@@ -348,7 +358,7 @@ pub(crate) async fn handle_new_head<Context, B: Backend>(
348358

349359
let header = {
350360
let (h_tx, h_rx) = oneshot::channel();
351-
ctx.send_message(ChainApiMessage::BlockHeader(head, h_tx)).await;
361+
sender.send_message(ChainApiMessage::BlockHeader(head, h_tx)).await;
352362
match h_rx.await? {
353363
Err(e) => {
354364
gum::debug!(
@@ -374,7 +384,7 @@ pub(crate) async fn handle_new_head<Context, B: Backend>(
374384
let lower_bound_number = finalized_number.unwrap_or(lower_bound_number).max(lower_bound_number);
375385

376386
let new_blocks = determine_new_blocks(
377-
ctx.sender(),
387+
sender,
378388
|h| db.load_block_entry(h).map(|e| e.is_some()),
379389
head,
380390
&header,
@@ -400,12 +410,15 @@ pub(crate) async fn handle_new_head<Context, B: Backend>(
400410
keystore: &state.keystore,
401411
};
402412

403-
match imported_block_info(ctx, env, block_hash, &block_header, finalized_number).await {
413+
match imported_block_info(sender, env, block_hash, &block_header, finalized_number)
414+
.await
415+
{
404416
Ok(i) => imported_blocks_and_info.push((block_hash, block_header, i)),
405417
Err(error) => {
406418
// It's possible that we've lost a race with finality.
407419
let (tx, rx) = oneshot::channel();
408-
ctx.send_message(ChainApiMessage::FinalizedBlockHash(block_header.number, tx))
420+
sender
421+
.send_message(ChainApiMessage::FinalizedBlockHash(block_header.number, tx))
409422
.await;
410423

411424
let lost_to_finality = match rx.await {
@@ -449,17 +462,11 @@ pub(crate) async fn handle_new_head<Context, B: Backend>(
449462
force_approve,
450463
} = imported_block_info;
451464

452-
let session_info = match get_session_info(
453-
session_info_provider,
454-
ctx.sender(),
455-
head,
456-
session_index,
457-
)
458-
.await
459-
{
460-
Some(session_info) => session_info,
461-
None => return Ok(Vec::new()),
462-
};
465+
let session_info =
466+
match get_session_info(session_info_provider, sender, head, session_index).await {
467+
Some(session_info) => session_info,
468+
None => return Ok(Vec::new()),
469+
};
463470

464471
let (block_tick, no_show_duration) = {
465472
let block_tick = slot_number_to_tick(state.slot_duration_millis, slot);
@@ -509,7 +516,7 @@ pub(crate) async fn handle_new_head<Context, B: Backend>(
509516
};
510517
// If all bits are already set, then send an approve message.
511518
if approved_bitfield.count_ones() == approved_bitfield.len() {
512-
ctx.send_message(ChainSelectionMessage::Approved(block_hash)).await;
519+
sender.send_message(ChainSelectionMessage::Approved(block_hash)).await;
513520
}
514521

515522
let block_entry = v3::BlockEntry {
@@ -566,7 +573,7 @@ pub(crate) async fn handle_new_head<Context, B: Backend>(
566573

567574
// Notify chain-selection of all approved hashes.
568575
for hash in approved_hashes {
569-
ctx.send_message(ChainSelectionMessage::Approved(hash)).await;
576+
sender.send_message(ChainSelectionMessage::Approved(hash)).await;
570577
}
571578
}
572579

@@ -602,7 +609,8 @@ pub(crate) async fn handle_new_head<Context, B: Backend>(
602609
"Informing distribution of newly imported chain",
603610
);
604611

605-
ctx.send_unbounded_message(ApprovalDistributionMessage::NewBlocks(approval_meta));
612+
approval_voting_sender
613+
.send_unbounded_message(ApprovalDistributionMessage::NewBlocks(approval_meta));
606614
Ok(imported_candidates)
607615
}
608616

0 commit comments

Comments
 (0)