diff --git a/cmd/ethrex/l2/initializers.rs b/cmd/ethrex/l2/initializers.rs
index 9ef702ed796..98ab8f04dc9 100644
--- a/cmd/ethrex/l2/initializers.rs
+++ b/cmd/ethrex/l2/initializers.rs
@@ -11,7 +11,6 @@ use ethrex_blockchain::{Blockchain, BlockchainType, L2Config};
 use ethrex_common::fd_limit::raise_fd_limit;
 use ethrex_common::types::fee_config::{FeeConfig, L1FeeConfig, OperatorFeeConfig};
 use ethrex_common::{Address, types::DEFAULT_BUILDER_GAS_CEIL};
-use ethrex_l2::SequencerConfig;
 use ethrex_l2::sequencer::l1_committer::regenerate_head_state;
 use ethrex_p2p::{
     discv4::peer_table::PeerTable,
@@ -23,11 +22,12 @@ use ethrex_p2p::{
 };
 use ethrex_storage::Store;
 use ethrex_storage_rollup::{EngineTypeRollup, StoreRollup};
+use eyre::OptionExt;
 use secp256k1::SecretKey;
 use std::{fs::read_to_string, path::Path, sync::Arc, time::Duration};
 use tokio::task::JoinSet;
 use tokio_util::{sync::CancellationToken, task::TaskTracker};
-use tracing::{error, info, warn};
+use tracing::{info, warn};
 use tracing_subscriber::{EnvFilter, Registry, layer::SubscriberExt, reload};
 use tui_logger::{LevelFilter, TuiTracingSubscriberLayer};
 use url::Url;
@@ -36,12 +36,12 @@ use url::Url;
 async fn init_rpc_api(
     opts: &L1Options,
     l2_opts: &L2Options,
-    peer_handler: PeerHandler,
+    peer_handler: Option<PeerHandler>,
     local_p2p_node: Node,
     local_node_record: NodeRecord,
     store: Store,
     blockchain: Arc<Blockchain>,
-    cancel_token: CancellationToken,
+    syncer: Option<Arc<SyncManager>>,
     tracker: TaskTracker,
     rollup_store: StoreRollup,
     log_filter_handler: Option<reload::Handle<EnvFilter, Registry>>,
@@ -49,17 +49,6 @@ async fn init_rpc_api(
 ) {
     init_datadir(&opts.datadir);
 
-    // Create SyncManager
-    let syncer = SyncManager::new(
-        peer_handler.clone(),
-        &opts.syncmode,
-        cancel_token,
-        blockchain.clone(),
-        store.clone(),
-        opts.datadir.clone(),
-    )
-    .await;
-
     let rpc_api = ethrex_l2_rpc::start_api(
         get_http_socket_addr(opts),
         get_authrpc_socket_addr(opts),
@@ -197,46 +186,77 @@ pub async fn init_l2(
 
     let local_node_record = get_local_node_record(&datadir, &local_p2p_node, &signer);
 
-    let peer_table = PeerTable::spawn(opts.node_opts.target_peers);
-
     // TODO: Check every module starts properly.
     let tracker = TaskTracker::new();
     let mut join_set = JoinSet::new();
 
-    let p2p_context = P2PContext::new(
-        local_p2p_node.clone(),
-        tracker.clone(),
-        signer,
-        peer_table.clone(),
-        store.clone(),
-        blockchain.clone(),
-        get_client_version(),
-        #[cfg(feature = "l2")]
-        Some(P2PBasedContext {
-            store_rollup: rollup_store.clone(),
-            // TODO: The Web3Signer refactor introduced a limitation where the committer key cannot be accessed directly because the signer could be either Local or Remote.
-            // The Signer enum cannot be used in the P2PBasedContext struct due to cyclic dependencies between the l2-rpc and p2p crates.
-            // As a temporary solution, a dummy committer key is used until a proper mechanism to utilize the Signer enum is implemented.
-            // This should be replaced with the Signer enum once the refactor is complete.
-            committer_key: Arc::new(
-                SecretKey::from_slice(
-                    &hex::decode(
-                        "385c546456b6a603a1cfcaa9ec9494ba4832da08dd6bcf4de9a71e4a01b74924",
+    let cancel_token = tokio_util::sync::CancellationToken::new();
+
+    let based = opts.sequencer_opts.based;
+
+    let (peer_handler, syncer) = if based {
+        let peer_table = PeerTable::spawn(opts.node_opts.target_peers);
+        let p2p_context = P2PContext::new(
+            local_p2p_node.clone(),
+            tracker.clone(),
+            signer,
+            peer_table.clone(),
+            store.clone(),
+            blockchain.clone(),
+            get_client_version(),
+            #[cfg(feature = "l2")]
+            Some(P2PBasedContext {
+                store_rollup: rollup_store.clone(),
+                // TODO: The Web3Signer refactor introduced a limitation where the committer key cannot be accessed directly because the signer could be either Local or Remote.
+                // The Signer enum cannot be used in the P2PBasedContext struct due to cyclic dependencies between the l2-rpc and p2p crates.
+                // As a temporary solution, a dummy committer key is used until a proper mechanism to utilize the Signer enum is implemented.
+                // This should be replaced with the Signer enum once the refactor is complete.
+                committer_key: Arc::new(
+                    SecretKey::from_slice(
+                        &hex::decode(
+                            "385c546456b6a603a1cfcaa9ec9494ba4832da08dd6bcf4de9a71e4a01b74924",
+                        )
+                        .expect("Invalid committer key"),
                     )
-                    .expect("Invalid committer key"),
-                )
-                .expect("Failed to create committer key"),
-            ),
-        }),
-        opts.node_opts.tx_broadcasting_time_interval,
-    )
-    .await
-    .expect("P2P context could not be created");
+                    .expect("Failed to create committer key"),
+                ),
+            }),
+            opts.node_opts.tx_broadcasting_time_interval,
+        )
+        .await
+        .expect("P2P context could not be created");
+        let initiator = RLPxInitiator::spawn(p2p_context.clone()).await;
+        let peer_handler = PeerHandler::new(peer_table, initiator);
 
-    let initiator = RLPxInitiator::spawn(p2p_context.clone()).await;
-    let peer_handler = PeerHandler::new(peer_table, initiator);
+        // Create SyncManager
+        let syncer = SyncManager::new(
+            peer_handler.clone(),
+            &opts.node_opts.syncmode,
+            cancel_token.clone(),
+            blockchain.clone(),
+            store.clone(),
+            opts.node_opts.datadir.clone(),
+        )
+        .await;
 
-    let cancel_token = tokio_util::sync::CancellationToken::new();
+        // TODO: This should be handled differently, the current problem
+        // with using opts.node_opts.p2p_disabled is that with the removal
+        // of the l2 feature flag, p2p_disabled is set to false by default
+        // prioritizing the L1 UX.
+        init_network(
+            &opts.node_opts,
+            &network,
+            &datadir,
+            peer_handler.clone(),
+            tracker.clone(),
+            blockchain.clone(),
+            p2p_context,
+        )
+        .await;
+        (Some(peer_handler), Some(Arc::new(syncer)))
+    } else {
+        (None, None)
+    };
 
     init_rpc_api(
         &opts.node_opts,
@@ -246,7 +266,7 @@ pub async fn init_l2(
         local_node_record.clone(),
         store.clone(),
         blockchain.clone(),
-        cancel_token.clone(),
+        syncer,
         tracker.clone(),
         rollup_store.clone(),
         log_filter_handler,
@@ -256,32 +276,10 @@ pub async fn init_l2(
 
     // Initialize metrics if enabled
     if opts.node_opts.metrics_enabled {
-        init_metrics(&opts.node_opts, tracker.clone());
+        init_metrics(&opts.node_opts, tracker);
     }
 
-    let l2_sequencer_cfg = SequencerConfig::try_from(opts.sequencer_opts).inspect_err(|err| {
-        error!("{err}");
-    })?;
-    let cancellation_token = CancellationToken::new();
-
-    // TODO: This should be handled differently, the current problem
-    // with using opts.node_opts.p2p_diabled is that with the removal
-    // of the l2 feature flag, p2p_diabled is set to false by default
-    // prioritizing the L1 UX.
-    if l2_sequencer_cfg.based.enabled {
-        init_network(
-            &opts.node_opts,
-            &network,
-            &datadir,
-            peer_handler.clone(),
-            tracker,
-            blockchain.clone(),
-            p2p_context,
-        )
-        .await;
-    } else {
-        info!("P2P is disabled");
-    }
+    let sequencer_cancellation_token = CancellationToken::new();
 
     let l2_url = Url::parse(&format!(
         "http://{}:{}",
@@ -293,8 +291,8 @@ pub async fn init_l2(
         store,
         rollup_store,
         blockchain,
-        l2_sequencer_cfg,
-        cancellation_token.clone(),
+        opts.sequencer_opts.try_into()?,
+        sequencer_cancellation_token.clone(),
         l2_url,
         genesis,
         checkpoints_dir,
@@ -307,15 +305,18 @@ pub async fn init_l2(
         _ = tokio::signal::ctrl_c() => {
             join_set.abort_all();
         }
-        _ = cancellation_token.cancelled() => {
+        _ = sequencer_cancellation_token.cancelled() => {
         }
     }
     info!("Server shut down started...");
     let node_config_path = datadir.join("node_config.json");
    info!(path = %node_config_path.display(), "Storing node config");
     cancel_token.cancel();
-    let node_config = NodeConfigFile::new(peer_handler.peer_table, local_node_record).await;
-    store_node_config_file(node_config, node_config_path).await;
+    if based {
+        let peer_handler = peer_handler.ok_or_eyre("Peer handler not initialized")?;
+        let node_config = NodeConfigFile::new(peer_handler.peer_table, local_node_record).await;
+        store_node_config_file(node_config, node_config_path).await;
+    }
     tokio::time::sleep(Duration::from_secs(1)).await;
     info!("Server shutting down!");
     Ok(())
diff --git a/crates/l2/networking/rpc/rpc.rs b/crates/l2/networking/rpc/rpc.rs
index 2e0f895f39a..159819996e5 100644
--- a/crates/l2/networking/rpc/rpc.rs
+++ b/crates/l2/networking/rpc/rpc.rs
@@ -76,8 +76,8 @@ pub async fn start_api(
     jwt_secret: Bytes,
     local_p2p_node: Node,
     local_node_record: NodeRecord,
-    syncer: SyncManager,
-    peer_handler: PeerHandler,
+    syncer: Option<Arc<SyncManager>>,
+    peer_handler: Option<PeerHandler>,
     client_version: String,
     valid_delegation_addresses: Vec<Address>,
     sponsor_pk: SecretKey,
@@ -94,7 +94,7 @@ pub async fn start_api(
         storage,
         blockchain,
         active_filters: active_filters.clone(),
-        syncer: Arc::new(syncer),
+        syncer,
         peer_handler,
         node_data: NodeData {
             jwt_secret,
diff --git a/crates/networking/rpc/admin/peers.rs b/crates/networking/rpc/admin/peers.rs
index f125f69e621..95525cfcf2c 100644
--- a/crates/networking/rpc/admin/peers.rs
+++ b/crates/networking/rpc/admin/peers.rs
@@ -86,13 +86,17 @@ impl From for RpcPeer {
 }
 
 pub async fn peers(context: &mut RpcApiContext) -> Result<Value, RpcErr> {
-    let peers = context
-        .peer_handler
+    let Some(peer_handler) = &mut context.peer_handler else {
+        return Err(RpcErr::Internal("Peer handler not initialized".to_string()));
+    };
+
+    let peers = peer_handler
         .read_connected_peers()
         .await
         .into_iter()
         .map(RpcPeer::from)
         .collect::<Vec<_>>();
+
     Ok(serde_json::to_value(peers)?)
 }
 
@@ -116,7 +120,10 @@ fn parse(request: &RpcRequest) -> Result {
 }
 
 pub async fn add_peer(context: &mut RpcApiContext, request: &RpcRequest) -> Result<Value, RpcErr> {
-    let mut server = context.peer_handler.initiator.clone();
+    let Some(peer_handler) = context.peer_handler.as_mut() else {
+        return Err(RpcErr::Internal("Peer handler not initialized".to_string()));
+    };
+    let mut server = peer_handler.initiator.clone();
     let node = parse(request)?;
 
     let start = Instant::now();
@@ -128,7 +135,7 @@ pub async fn add_peer(context: &mut RpcApiContext, request: &RpcRequest) -> Resu
     // This loop is necessary because connections are asynchronous, so to check if the connection with the peer was actually
     // established we need to wait.
     loop {
-        if peer_is_connected(&mut context.peer_handler, &node.enode_url()).await {
+        if peer_is_connected(peer_handler, &node.enode_url()).await {
             return Ok(serde_json::to_value(true)?);
         }
 
diff --git a/crates/networking/rpc/engine/fork_choice.rs b/crates/networking/rpc/engine/fork_choice.rs
index 5ee2984015c..e400d72f3d3 100644
--- a/crates/networking/rpc/engine/fork_choice.rs
+++ b/crates/networking/rpc/engine/fork_choice.rs
@@ -169,6 +169,11 @@ async fn handle_forkchoice(
     context: RpcApiContext,
     version: usize,
 ) -> Result<(Option<BlockHeader>, ForkChoiceResponse), RpcErr> {
+    let Some(syncer) = &context.syncer else {
+        return Err(RpcErr::Internal(
+            "Fork choice requested but syncer is not initialized".to_string(),
+        ));
+    };
     info!(
         version = %format!("v{}", version),
         head = %format!("{:#x}", fork_choice_state.head_block_hash),
@@ -278,9 +283,7 @@
             ),
             InvalidForkChoice::Syncing => {
                 // Start sync
-                context
-                    .syncer
-                    .sync_to_head(fork_choice_state.head_block_hash);
+                syncer.sync_to_head(fork_choice_state.head_block_hash);
                 ForkChoiceResponse::from(PayloadStatus::syncing())
             }
             InvalidForkChoice::Disconnected(_, _) | InvalidForkChoice::ElementNotFound(_) => {
diff --git a/crates/networking/rpc/engine/payload.rs b/crates/networking/rpc/engine/payload.rs
index a99ef9b9cc4..4f3e2c2e7dd 100644
--- a/crates/networking/rpc/engine/payload.rs
+++ b/crates/networking/rpc/engine/payload.rs
@@ -591,6 +591,11 @@ async fn handle_new_payload_v1_v2(
     block: Block,
     context: RpcApiContext,
 ) -> Result<PayloadStatus, RpcErr> {
+    let Some(syncer) = &context.syncer else {
+        return Err(RpcErr::Internal(
+            "New payload requested but syncer is not initialized".to_string(),
+        ));
+    };
     // Validate block hash
     if let Err(RpcErr::Internal(error_msg)) = validate_block_hash(payload, &block) {
         return Ok(PayloadStatus::invalid_with_err(&error_msg));
     }
@@ -604,7 +609,7 @@
 
     // We have validated ancestors, the parent is correct
     let latest_valid_hash = block.header.parent_hash;
-    if context.syncer.sync_mode() == SyncMode::Snap {
+    if syncer.sync_mode() == SyncMode::Snap {
         debug!("Snap sync in progress, skipping new payload validation");
         return Ok(PayloadStatus::syncing());
     }
@@ -698,6 +703,11 @@ async fn try_execute_payload(
     context: &RpcApiContext,
     latest_valid_hash: H256,
 ) -> Result<PayloadStatus, RpcErr> {
+    let Some(syncer) = &context.syncer else {
+        return Err(RpcErr::Internal(
+            "New payload requested but syncer is not initialized".to_string(),
+        ));
+    };
     let block_hash = block.hash();
     let block_number = block.header.number;
     let storage = &context.storage;
@@ -714,7 +724,7 @@ async fn try_execute_payload(
     match add_block(context, block).await {
         Err(ChainError::ParentNotFound) => {
             // Start sync
-            context.syncer.sync_to_head(block_hash);
+            syncer.sync_to_head(block_hash);
             Ok(PayloadStatus::syncing())
         }
         // Under the current implementation this is not possible: we always calculate the state
diff --git a/crates/networking/rpc/eth/client.rs b/crates/networking/rpc/eth/client.rs
index 04a6181c963..271cb4cd495 100644
--- a/crates/networking/rpc/eth/client.rs
+++ b/crates/networking/rpc/eth/client.rs
@@ -50,14 +50,18 @@ impl RpcHandler for Syncing {
     }
 
     async fn handle(&self, context: RpcApiContext) -> Result<Value, RpcErr> {
+        let Some(syncer) = &context.syncer else {
+            return Err(RpcErr::Internal(
+                "Syncing status requested but syncer is not initialized".to_string(),
+            ));
+        };
         if context.blockchain.is_synced() {
             Ok(Value::Bool(!context.blockchain.is_synced()))
         } else {
             let syncing_status = SyncingStatusRpc {
                 starting_block: context.storage.get_earliest_block_number().await?,
                 current_block: context.storage.get_latest_block_number().await?,
-                highest_block: context
-                    .syncer
+                highest_block: syncer
                     .get_last_fcu_head()
                     .map_err(|error| RpcErr::Internal(error.to_string()))?
                     .to_low_u64_be(),
diff --git a/crates/networking/rpc/net/mod.rs b/crates/networking/rpc/net/mod.rs
index 0b2395faf62..5d95a8cbf4f 100644
--- a/crates/networking/rpc/net/mod.rs
+++ b/crates/networking/rpc/net/mod.rs
@@ -13,8 +13,10 @@ pub fn version(_req: &RpcRequest, context: RpcApiContext) -> Result<Value, RpcErr> {
 }
 
 pub async fn peer_count(_req: &RpcRequest, context: &mut RpcApiContext) -> Result<Value, RpcErr> {
-    let total_peers = context
-        .peer_handler
+    let Some(peer_handler) = &mut context.peer_handler else {
+        return Err(RpcErr::Internal("Peer handler not initialized".to_string()));
+    };
+    let total_peers = peer_handler
         .count_total_peers()
         .await
         .map_err(|e| RpcErr::Internal(format!("Could not retrieve peer count: {e}")))?;
diff --git a/crates/networking/rpc/rpc.rs b/crates/networking/rpc/rpc.rs
index d61a81a7c59..dd50394fe50 100644
--- a/crates/networking/rpc/rpc.rs
+++ b/crates/networking/rpc/rpc.rs
@@ -165,8 +165,10 @@ pub struct RpcApiContext {
     pub storage: Store,
     pub blockchain: Arc<Blockchain>,
     pub active_filters: ActiveFilters,
-    pub syncer: Arc<SyncManager>,
-    pub peer_handler: PeerHandler,
+    // L2 nodes don't need to initialize the syncer
+    pub syncer: Option<Arc<SyncManager>>,
+    // L2 nodes don't need to initialize the peer handler
+    pub peer_handler: Option<PeerHandler>,
     pub node_data: NodeData,
     pub gas_tip_estimator: Arc<Mutex<GasTipEstimator>>,
     pub log_filter_handler: Option<reload::Handle<EnvFilter, Registry>>,
@@ -246,8 +248,8 @@ pub async fn start_api(
         storage,
         blockchain,
         active_filters: active_filters.clone(),
-        syncer: Arc::new(syncer),
-        peer_handler,
+        syncer: Some(Arc::new(syncer)),
+        peer_handler: Some(peer_handler),
         node_data: NodeData {
             jwt_secret,
             local_p2p_node,
diff --git a/crates/networking/rpc/test_utils.rs b/crates/networking/rpc/test_utils.rs
index 9dd87e59d8a..1b29f45e4eb 100644
--- a/crates/networking/rpc/test_utils.rs
+++ b/crates/networking/rpc/test_utils.rs
@@ -266,8 +266,8 @@ pub async fn default_context_with_storage(storage: Store) -> RpcApiContext {
         storage,
         blockchain,
         active_filters: Default::default(),
-        syncer: Arc::new(dummy_sync_manager().await),
-        peer_handler: dummy_peer_handler().await,
+        syncer: Some(Arc::new(dummy_sync_manager().await)),
+        peer_handler: Some(dummy_peer_handler().await),
         node_data: NodeData {
             jwt_secret: Default::default(),
             local_p2p_node: example_p2p_node(),
diff --git a/docs/CLI.md b/docs/CLI.md
index 1b8ae492669..6974d2a009c 100644
--- a/docs/CLI.md
+++ b/docs/CLI.md
@@ -147,7 +147,7 @@ Block building options:
       --builder.extra-data
           Block extra data message.
 
-          [default: "ethrex 5.0.0"]
+          [default: "ethrex 6.0.0"]
 
       --builder.gas-limit
           Target block gas limit.
@@ -310,7 +310,7 @@ Block building options:
       --builder.extra-data
           Block extra data message.
 
-          [default: "ethrex 5.0.0"]
+          [default: "ethrex 6.0.0"]
 
       --builder.gas-limit
           Target block gas limit.