Skip to content
Open
Show file tree
Hide file tree
Changes from 11 commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
7bdd21b
refactor(poa): move redis publish into PoA, go native-async with cach…
Voxelot Apr 23, 2026
1475373
chore: rename changelog entry to PR number
Voxelot Apr 23, 2026
2d0cc91
test(importer): drop reconciliation-writer fakes after publish moved …
Voxelot Apr 23, 2026
afda814
refactor(poa): spawn per-node publish as tokio tasks for write-path d…
Voxelot Apr 23, 2026
5c2002f
refactor(poa): collapse ReadPort + WritePort into BlockReconciliation…
Voxelot Apr 23, 2026
bf28529
fix(poa): don't pin lease lifetime via spawned-task adapter clones
Voxelot Apr 23, 2026
b5a04f9
metrics(poa): add poa_outstanding_publish_tasks gauge
Voxelot Apr 23, 2026
34531de
use explicit types instead of implicit Arc to track if the adapter sh…
MitchTurner Apr 27, 2026
765d9d2
Improve usage
MitchTurner Apr 27, 2026
9475561
appease Clippy-sama
MitchTurner Apr 27, 2026
6efacea
refactor(poa): cancel-on-quorum read paths via FuturesUnordered
Voxelot Apr 27, 2026
580c678
refactor(poa): parallelize spawn_expand_to_non_owners promotions
Voxelot Apr 27, 2026
478556e
fix(poa): fold late-arriving promotion tokens into current_epoch_token
Voxelot Apr 27, 2026
59231ec
fix(poa): route spawn_expand_to_non_owners epoch updates through fold…
Voxelot Apr 27, 2026
2d2e64a
fix(poa): gate should_reconcile_from_stream backlog short-circuit on …
Voxelot Apr 28, 2026
6ce791c
make spell-check happy
MitchTurner Apr 28, 2026
f653a00
fix docs
MitchTurner Apr 28, 2026
1891e64
refactor(poa): rename agent-coined identifiers to descriptive names
Voxelot Apr 28, 2026
b660184
test(poa): use PING readiness probe for redis test server startup
Voxelot Apr 29, 2026
ccca052
Merge branch 'master' into refactor/poa-publish-to-poa
Voxelot Apr 29, 2026
e17ab28
Revert "fix docs"
Voxelot Apr 29, 2026
60028f0
test(poa): wait_for late epoch fold in higher-epoch adoption test
Voxelot Apr 29, 2026
dc69a1a
fix(poa): drop racy has_lease_owner_quorum check from release_if_owner
Voxelot Apr 29, 2026
a1fa5d6
test(poa): wait_for all-nodes-have-block after publish_produced_block
Voxelot Apr 29, 2026
33b881d
test(poa): wait_for epoch heal write in heal_epoch test
Voxelot Apr 29, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .changes/changed/3280.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Move the PoA redis-publish path out of the importer and into the PoA service itself. `BlockReconciliationWritePort` now lives in `fuel_core_poa::ports` and is async; `RedisLeaderLeaseAdapter::publish_produced_block` runs natively on the tokio runtime, fans out to all nodes in parallel via `FuturesUnordered`, reuses cached per-node `MultiplexedConnection`s, wraps each per-node call in `tokio::time::timeout(node_timeout, ...)`, and short-circuits on quorum by dropping the remaining futures. No more rayon bridge, no more `std::thread::spawn` leak from the 2026-04-22 hotfix.
43 changes: 1 addition & 42 deletions crates/fuel-core/src/service/adapters/block_importer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ use crate::{
BlockImporterAdapter,
ExecutorAdapter,
VerifierAdapter,
consensus_module::poa::RedisLeaderLeaseAdapter,
},
};
use fuel_core_importer::{
Expand Down Expand Up @@ -62,30 +61,16 @@ use fuel_core_types::{
use itertools::Itertools;
use std::sync::Arc;

#[allow(clippy::large_enum_variant)]
pub enum BlockReconciliationWriteAdapter {
Redis(RedisLeaderLeaseAdapter),
Noop(NoopBlockReconciliationWriteAdapter),
}

impl BlockImporterAdapter {
pub fn new(
chain_id: ChainId,
config: Config,
database: Database,
executor: ExecutorAdapter,
verifier: VerifierAdapter,
block_reconciliation_write_adapter: BlockReconciliationWriteAdapter,
) -> Self {
let database_for_height = database.clone();
let importer = Importer::new(
chain_id,
config,
database,
executor,
verifier,
block_reconciliation_write_adapter,
);
let importer = Importer::new(chain_id, config, database, executor, verifier);
Self {
block_importer: Arc::new(importer),
database: database_for_height,
Expand All @@ -111,32 +96,6 @@ impl BlockVerifier for VerifierAdapter {
}
}

#[derive(Default)]
pub struct NoopBlockReconciliationWriteAdapter;

impl fuel_core_importer::ports::BlockReconciliationWritePort
for BlockReconciliationWriteAdapter
{
fn publish_produced_block(&self, block: &SealedBlock) -> anyhow::Result<()> {
match self {
Self::Redis(adapter) => {
fuel_core_importer::ports::BlockReconciliationWritePort::publish_produced_block(adapter, block)
}
Self::Noop(adapter) => {
fuel_core_importer::ports::BlockReconciliationWritePort::publish_produced_block(adapter, block)
}
}
}
}

impl fuel_core_importer::ports::BlockReconciliationWritePort
for NoopBlockReconciliationWriteAdapter
{
fn publish_produced_block(&self, _block: &SealedBlock) -> anyhow::Result<()> {
Ok(())
}
}

impl ImporterDatabase for Database {
fn latest_block_height(&self) -> StorageResult<Option<BlockHeight>> {
self.iter_all_keys::<FuelBlocks>(Some(IterDirection::Reverse))
Expand Down
Loading
Loading