Skip to content

Commit 45765ac

Browse files
authored
Merge branch 'yk/worker_pool_acc' into yk/pool_clean
2 parents bc4ecf5 + f692d75 commit 45765ac

File tree

26 files changed

+1120
-234
lines changed

26 files changed

+1120
-234
lines changed

.github/workflows/hive.yml

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,14 +34,39 @@ jobs:
3434
repository: ethereum/hive
3535
path: hivetests
3636

37+
- name: Get hive commit hash
38+
id: hive-commit
39+
run: echo "hash=$(cd hivetests && git rev-parse HEAD)" >> $GITHUB_OUTPUT
40+
3741
- uses: actions/setup-go@v6
3842
with:
3943
go-version: "^1.13.1"
4044
- run: go version
4145

46+
- name: Restore hive assets cache
47+
id: cache-hive
48+
uses: actions/cache@v4
49+
with:
50+
path: ./hive_assets
51+
key: hive-assets-${{ steps.hive-commit.outputs.hash }}-${{ hashFiles('.github/assets/hive/build_simulators.sh') }}
52+
4253
- name: Build hive assets
54+
if: steps.cache-hive.outputs.cache-hit != 'true'
4355
run: .github/assets/hive/build_simulators.sh
4456

57+
- name: Load cached Docker images
58+
if: steps.cache-hive.outputs.cache-hit == 'true'
59+
run: |
60+
cd hive_assets
61+
for tar_file in *.tar; do
62+
if [ -f "$tar_file" ]; then
63+
echo "Loading $tar_file..."
64+
docker load -i "$tar_file"
65+
fi
66+
done
67+
# Make hive binary executable
68+
chmod +x hive
69+
4570
- name: Upload hive assets
4671
uses: actions/upload-artifact@v4
4772
with:

crates/chain-state/src/in_memory.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1380,8 +1380,7 @@ mod tests {
13801380
#[test]
13811381
fn test_canonical_in_memory_state_canonical_chain_empty() {
13821382
let state: CanonicalInMemoryState = CanonicalInMemoryState::empty();
1383-
let chain: Vec<_> = state.canonical_chain().collect();
1384-
assert!(chain.is_empty());
1383+
assert!(state.canonical_chain().next().is_none());
13851384
}
13861385

13871386
#[test]

crates/engine/primitives/src/config.rs

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -26,14 +26,12 @@ fn default_storage_worker_count() -> usize {
2626
}
2727
}
2828

29-
/// Returns the default number of account worker threads optimized for I/O-bound coordination.
29+
/// Returns the default number of account worker threads.
3030
///
31-
/// Account workers primarily coordinate storage proof collection and account trie traversal.
32-
/// They spend significant time blocked on `receiver.recv()` calls waiting for storage proofs,
33-
/// so we use higher concurrency (1.5x storage workers) to maximize throughput and overlap.
34-
/// While storage workers are CPU-bound, account workers are I/O-bound coordinators.
31+
/// Account workers coordinate storage proof collection and account trie traversal.
32+
/// They are set to the same count as storage workers for simplicity.
3533
fn default_account_worker_count() -> usize {
36-
((default_storage_worker_count() * 3) / 2).max(MIN_WORKER_COUNT)
34+
default_storage_worker_count()
3735
}
3836

3937
/// The size of proof targets chunk to spawn in one multiproof calculation.

crates/engine/tree/src/tree/mod.rs

Lines changed: 174 additions & 85 deletions
Large diffs are not rendered by default.

crates/engine/tree/src/tree/tests.rs

Lines changed: 303 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@ impl reth_engine_primitives::PayloadValidator<EthEngineTypes> for MockEngineVali
5656
reth_payload_primitives::NewPayloadError::Other(format!("{e:?}").into())
5757
})?;
5858
let sealed = block.seal_slow();
59+
5960
sealed.try_recover().map_err(|e| reth_payload_primitives::NewPayloadError::Other(e.into()))
6061
}
6162
}
@@ -1705,3 +1706,305 @@ mod payload_execution_tests {
17051706
}
17061707
}
17071708
}
1709+
1710+
/// Test suite for the refactored `on_forkchoice_updated` helper methods
1711+
#[cfg(test)]
1712+
mod forkchoice_updated_tests {
1713+
use super::*;
1714+
use alloy_primitives::Address;
1715+
1716+
/// Test that validates the forkchoice state pre-validation logic
1717+
#[tokio::test]
1718+
async fn test_validate_forkchoice_state() {
1719+
let chain_spec = MAINNET.clone();
1720+
let mut test_harness = TestHarness::new(chain_spec);
1721+
1722+
// Test 1: Zero head block hash should return early with invalid state
1723+
let zero_state = ForkchoiceState {
1724+
head_block_hash: B256::ZERO,
1725+
safe_block_hash: B256::ZERO,
1726+
finalized_block_hash: B256::ZERO,
1727+
};
1728+
1729+
let result = test_harness.tree.validate_forkchoice_state(zero_state).unwrap();
1730+
assert!(result.is_some(), "Zero head block hash should return early");
1731+
let outcome = result.unwrap();
1732+
// For invalid state, we expect an error response
1733+
assert!(matches!(outcome, OnForkChoiceUpdated { .. }));
1734+
1735+
// Test 2: Valid state with backfill active should return syncing
1736+
test_harness.tree.backfill_sync_state = BackfillSyncState::Active;
1737+
let valid_state = ForkchoiceState {
1738+
head_block_hash: B256::random(),
1739+
safe_block_hash: B256::ZERO,
1740+
finalized_block_hash: B256::ZERO,
1741+
};
1742+
1743+
let result = test_harness.tree.validate_forkchoice_state(valid_state).unwrap();
1744+
assert!(result.is_some(), "Backfill active should return early");
1745+
let outcome = result.unwrap();
1746+
// We need to await the outcome to check the payload status
1747+
let fcu_result = outcome.await.unwrap();
1748+
assert!(fcu_result.payload_status.is_syncing());
1749+
1750+
// Test 3: Valid state with idle backfill should continue processing
1751+
test_harness.tree.backfill_sync_state = BackfillSyncState::Idle;
1752+
let valid_state = ForkchoiceState {
1753+
head_block_hash: B256::random(),
1754+
safe_block_hash: B256::ZERO,
1755+
finalized_block_hash: B256::ZERO,
1756+
};
1757+
1758+
let result = test_harness.tree.validate_forkchoice_state(valid_state).unwrap();
1759+
assert!(result.is_none(), "Valid state should continue processing");
1760+
}
1761+
1762+
/// Test that verifies canonical head handling
1763+
#[tokio::test]
1764+
async fn test_handle_canonical_head() {
1765+
let chain_spec = MAINNET.clone();
1766+
let mut test_harness = TestHarness::new(chain_spec);
1767+
1768+
// Create test blocks
1769+
let blocks: Vec<_> = test_harness.block_builder.get_executed_blocks(0..3).collect();
1770+
test_harness = test_harness.with_blocks(blocks);
1771+
1772+
let canonical_head = test_harness.tree.state.tree_state.canonical_block_hash();
1773+
1774+
// Test 1: Head is already canonical, no payload attributes
1775+
let state = ForkchoiceState {
1776+
head_block_hash: canonical_head,
1777+
safe_block_hash: B256::ZERO,
1778+
finalized_block_hash: B256::ZERO,
1779+
};
1780+
1781+
let result = test_harness
1782+
.tree
1783+
.handle_canonical_head(state, &None, EngineApiMessageVersion::default())
1784+
.unwrap();
1785+
assert!(result.is_some(), "Should return outcome for canonical head");
1786+
let outcome = result.unwrap();
1787+
let fcu_result = outcome.outcome.await.unwrap();
1788+
assert!(fcu_result.payload_status.is_valid());
1789+
1790+
// Test 2: Head is not canonical - should return None to continue processing
1791+
let non_canonical_state = ForkchoiceState {
1792+
head_block_hash: B256::random(),
1793+
safe_block_hash: B256::ZERO,
1794+
finalized_block_hash: B256::ZERO,
1795+
};
1796+
1797+
let result = test_harness
1798+
.tree
1799+
.handle_canonical_head(non_canonical_state, &None, EngineApiMessageVersion::default())
1800+
.unwrap();
1801+
assert!(result.is_none(), "Non-canonical head should return None");
1802+
}
1803+
1804+
/// Test that verifies chain update application
1805+
#[tokio::test]
1806+
async fn test_apply_chain_update() {
1807+
let chain_spec = MAINNET.clone();
1808+
let mut test_harness = TestHarness::new(chain_spec);
1809+
1810+
// Create a chain of blocks
1811+
let blocks: Vec<_> = test_harness.block_builder.get_executed_blocks(0..5).collect();
1812+
test_harness = test_harness.with_blocks(blocks.clone());
1813+
1814+
let new_head = blocks[2].recovered_block().hash();
1815+
1816+
// Test 1: Apply chain update to a new head
1817+
let state = ForkchoiceState {
1818+
head_block_hash: new_head,
1819+
safe_block_hash: B256::ZERO,
1820+
finalized_block_hash: B256::ZERO,
1821+
};
1822+
1823+
let result = test_harness
1824+
.tree
1825+
.apply_chain_update(state, &None, EngineApiMessageVersion::default())
1826+
.unwrap();
1827+
assert!(result.is_some(), "Should apply chain update for new head");
1828+
let outcome = result.unwrap();
1829+
let fcu_result = outcome.outcome.await.unwrap();
1830+
assert!(fcu_result.payload_status.is_valid());
1831+
1832+
// Test 2: Try to apply chain update to missing block
1833+
let missing_state = ForkchoiceState {
1834+
head_block_hash: B256::random(),
1835+
safe_block_hash: B256::ZERO,
1836+
finalized_block_hash: B256::ZERO,
1837+
};
1838+
1839+
let result = test_harness
1840+
.tree
1841+
.apply_chain_update(missing_state, &None, EngineApiMessageVersion::default())
1842+
.unwrap();
1843+
assert!(result.is_none(), "Missing block should return None");
1844+
}
1845+
1846+
/// Test that verifies missing block handling
1847+
#[tokio::test]
1848+
async fn test_handle_missing_block() {
1849+
let chain_spec = MAINNET.clone();
1850+
let test_harness = TestHarness::new(chain_spec);
1851+
1852+
let state = ForkchoiceState {
1853+
head_block_hash: B256::random(),
1854+
safe_block_hash: B256::ZERO,
1855+
finalized_block_hash: B256::ZERO,
1856+
};
1857+
1858+
let result = test_harness.tree.handle_missing_block(state).unwrap();
1859+
1860+
// Should return syncing status with download event
1861+
let fcu_result = result.outcome.await.unwrap();
1862+
assert!(fcu_result.payload_status.is_syncing());
1863+
assert!(result.event.is_some());
1864+
1865+
if let Some(TreeEvent::Download(download_request)) = result.event {
1866+
match download_request {
1867+
DownloadRequest::BlockSet(block_set) => {
1868+
assert_eq!(block_set.len(), 1);
1869+
}
1870+
_ => panic!("Expected single block download request"),
1871+
}
1872+
}
1873+
}
1874+
1875+
/// Test the complete `on_forkchoice_updated` flow with all helper methods
1876+
#[tokio::test]
1877+
async fn test_on_forkchoice_updated_integration() {
1878+
reth_tracing::init_test_tracing();
1879+
1880+
let chain_spec = MAINNET.clone();
1881+
let mut test_harness = TestHarness::new(chain_spec);
1882+
1883+
// Create test blocks
1884+
let blocks: Vec<_> = test_harness.block_builder.get_executed_blocks(0..3).collect();
1885+
test_harness = test_harness.with_blocks(blocks.clone());
1886+
1887+
let canonical_head = test_harness.tree.state.tree_state.canonical_block_hash();
1888+
1889+
// Test Case 1: FCU to existing canonical head
1890+
let state = ForkchoiceState {
1891+
head_block_hash: canonical_head,
1892+
safe_block_hash: canonical_head,
1893+
finalized_block_hash: canonical_head,
1894+
};
1895+
1896+
let result = test_harness
1897+
.tree
1898+
.on_forkchoice_updated(state, None, EngineApiMessageVersion::default())
1899+
.unwrap();
1900+
let fcu_result = result.outcome.await.unwrap();
1901+
assert!(fcu_result.payload_status.is_valid());
1902+
1903+
// Test Case 2: FCU to missing block
1904+
let missing_state = ForkchoiceState {
1905+
head_block_hash: B256::random(),
1906+
safe_block_hash: B256::ZERO,
1907+
finalized_block_hash: B256::ZERO,
1908+
};
1909+
1910+
let result = test_harness
1911+
.tree
1912+
.on_forkchoice_updated(missing_state, None, EngineApiMessageVersion::default())
1913+
.unwrap();
1914+
let fcu_result = result.outcome.await.unwrap();
1915+
assert!(fcu_result.payload_status.is_syncing());
1916+
assert!(result.event.is_some(), "Should trigger download event for missing block");
1917+
1918+
// Test Case 3: FCU during backfill sync
1919+
test_harness.tree.backfill_sync_state = BackfillSyncState::Active;
1920+
let state = ForkchoiceState {
1921+
head_block_hash: canonical_head,
1922+
safe_block_hash: B256::ZERO,
1923+
finalized_block_hash: B256::ZERO,
1924+
};
1925+
1926+
let result = test_harness
1927+
.tree
1928+
.on_forkchoice_updated(state, None, EngineApiMessageVersion::default())
1929+
.unwrap();
1930+
let fcu_result = result.outcome.await.unwrap();
1931+
assert!(fcu_result.payload_status.is_syncing(), "Should return syncing during backfill");
1932+
}
1933+
1934+
/// Test metrics recording in forkchoice updated
1935+
#[tokio::test]
1936+
async fn test_record_forkchoice_metrics() {
1937+
let chain_spec = MAINNET.clone();
1938+
let test_harness = TestHarness::new(chain_spec);
1939+
1940+
// Get initial metrics state by checking if metrics are recorded
1941+
// We can't directly get counter values, but we can verify the methods are called
1942+
1943+
// Test without attributes
1944+
let attrs_none = None;
1945+
test_harness.tree.record_forkchoice_metrics(&attrs_none);
1946+
1947+
// Test with attributes
1948+
let attrs_some = Some(alloy_rpc_types_engine::PayloadAttributes {
1949+
timestamp: 1000,
1950+
prev_randao: B256::random(),
1951+
suggested_fee_recipient: Address::random(),
1952+
withdrawals: None,
1953+
parent_beacon_block_root: None,
1954+
});
1955+
test_harness.tree.record_forkchoice_metrics(&attrs_some);
1956+
1957+
// We can't directly verify counter values since they're private metrics
1958+
// But we can verify the methods don't panic and execute successfully
1959+
}
1960+
1961+
/// Test edge case: FCU with invalid ancestor
1962+
#[tokio::test]
1963+
async fn test_fcu_with_invalid_ancestor() {
1964+
let chain_spec = MAINNET.clone();
1965+
let mut test_harness = TestHarness::new(chain_spec);
1966+
1967+
// Mark a block as invalid
1968+
let invalid_block_hash = B256::random();
1969+
test_harness.tree.state.invalid_headers.insert(BlockWithParent {
1970+
block: NumHash::new(1, invalid_block_hash),
1971+
parent: B256::ZERO,
1972+
});
1973+
1974+
// Test FCU that points to a descendant of the invalid block
1975+
// This is a bit tricky to test directly, but we can verify the check_invalid_ancestor
1976+
// method
1977+
let result = test_harness.tree.check_invalid_ancestor(invalid_block_hash).unwrap();
1978+
assert!(result.is_some(), "Should detect invalid ancestor");
1979+
}
1980+
1981+
/// Test `OpStack` specific behavior with canonical head
1982+
#[tokio::test]
1983+
async fn test_opstack_canonical_head_behavior() {
1984+
let chain_spec = MAINNET.clone();
1985+
let mut test_harness = TestHarness::new(chain_spec);
1986+
1987+
// Set engine kind to OpStack
1988+
test_harness.tree.engine_kind = EngineApiKind::OpStack;
1989+
1990+
// Create test blocks
1991+
let blocks: Vec<_> = test_harness.block_builder.get_executed_blocks(0..3).collect();
1992+
test_harness = test_harness.with_blocks(blocks);
1993+
1994+
let canonical_head = test_harness.tree.state.tree_state.canonical_block_hash();
1995+
1996+
// For OpStack, even if head is already canonical, we should still process payload
1997+
// attributes
1998+
let state = ForkchoiceState {
1999+
head_block_hash: canonical_head,
2000+
safe_block_hash: B256::ZERO,
2001+
finalized_block_hash: B256::ZERO,
2002+
};
2003+
2004+
let result = test_harness
2005+
.tree
2006+
.handle_canonical_head(state, &None, EngineApiMessageVersion::default())
2007+
.unwrap();
2008+
assert!(result.is_some(), "OpStack should handle canonical head");
2009+
}
2010+
}

crates/node/builder/src/engine_api_ext.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@
55
use crate::rpc::EngineApiBuilder;
66
use eyre::Result;
77
use reth_node_api::{AddOnsContext, FullNodeComponents};
8-
use reth_rpc_api::IntoEngineApiRpcModule;
98

109
/// Provides access to an `EngineApi` instance with a callback
1110
#[derive(Debug)]
@@ -27,7 +26,7 @@ impl<N, B, F> EngineApiBuilder<N> for EngineApiExt<B, F>
2726
where
2827
B: EngineApiBuilder<N>,
2928
N: FullNodeComponents,
30-
B::EngineApi: IntoEngineApiRpcModule + Send + Sync + Clone + 'static,
29+
B::EngineApi: Clone,
3130
F: FnOnce(B::EngineApi) + Send + Sync + 'static,
3231
{
3332
type EngineApi = B::EngineApi;

0 commit comments

Comments
 (0)