157 changes: 79 additions & 78 deletions cmd/ethrex/l2/initializers.rs
@@ -11,7 +11,6 @@ use ethrex_blockchain::{Blockchain, BlockchainType, L2Config};
use ethrex_common::fd_limit::raise_fd_limit;
use ethrex_common::types::fee_config::{FeeConfig, L1FeeConfig, OperatorFeeConfig};
use ethrex_common::{Address, types::DEFAULT_BUILDER_GAS_CEIL};
use ethrex_l2::SequencerConfig;
use ethrex_l2::sequencer::l1_committer::regenerate_head_state;
use ethrex_p2p::{
discv4::peer_table::PeerTable,
@@ -23,11 +22,12 @@ use ethrex_p2p::{
};
use ethrex_storage::Store;
use ethrex_storage_rollup::{EngineTypeRollup, StoreRollup};
use eyre::OptionExt;
use secp256k1::SecretKey;
use std::{fs::read_to_string, path::Path, sync::Arc, time::Duration};
use tokio::task::JoinSet;
use tokio_util::{sync::CancellationToken, task::TaskTracker};
use tracing::{error, info, warn};
use tracing::{info, warn};
use tracing_subscriber::{EnvFilter, Registry, layer::SubscriberExt, reload};
use tui_logger::{LevelFilter, TuiTracingSubscriberLayer};
use url::Url;
@@ -36,30 +36,19 @@ use url::Url;
async fn init_rpc_api(
opts: &L1Options,
l2_opts: &L2Options,
peer_handler: PeerHandler,
peer_handler: Option<PeerHandler>,
local_p2p_node: Node,
local_node_record: NodeRecord,
store: Store,
blockchain: Arc<Blockchain>,
cancel_token: CancellationToken,
syncer: Option<Arc<SyncManager>>,
tracker: TaskTracker,
rollup_store: StoreRollup,
log_filter_handler: Option<reload::Handle<EnvFilter, Registry>>,
gas_ceil: Option<u64>,
) {
init_datadir(&opts.datadir);

// Create SyncManager
let syncer = SyncManager::new(
peer_handler.clone(),
&opts.syncmode,
cancel_token,
blockchain.clone(),
store.clone(),
opts.datadir.clone(),
)
.await;

let rpc_api = ethrex_l2_rpc::start_api(
get_http_socket_addr(opts),
get_authrpc_socket_addr(opts),
@@ -197,46 +186,77 @@ pub async fn init_l2(

let local_node_record = get_local_node_record(&datadir, &local_p2p_node, &signer);

let peer_table = PeerTable::spawn(opts.node_opts.target_peers);

// TODO: Check every module starts properly.
let tracker = TaskTracker::new();
let mut join_set = JoinSet::new();

let p2p_context = P2PContext::new(
local_p2p_node.clone(),
tracker.clone(),
signer,
peer_table.clone(),
store.clone(),
blockchain.clone(),
get_client_version(),
#[cfg(feature = "l2")]
Some(P2PBasedContext {
store_rollup: rollup_store.clone(),
// TODO: The Web3Signer refactor introduced a limitation where the committer key cannot be accessed directly because the signer could be either Local or Remote.
// The Signer enum cannot be used in the P2PBasedContext struct due to cyclic dependencies between the l2-rpc and p2p crates.
// As a temporary solution, a dummy committer key is used until a proper mechanism to utilize the Signer enum is implemented.
// This should be replaced with the Signer enum once the refactor is complete.
committer_key: Arc::new(
SecretKey::from_slice(
&hex::decode(
"385c546456b6a603a1cfcaa9ec9494ba4832da08dd6bcf4de9a71e4a01b74924",
let cancel_token = tokio_util::sync::CancellationToken::new();

let based = opts.sequencer_opts.based;

let (peer_handler, syncer) = if based {
let peer_table = PeerTable::spawn(opts.node_opts.target_peers);
let p2p_context = P2PContext::new(
local_p2p_node.clone(),
tracker.clone(),
signer,
peer_table.clone(),
store.clone(),
blockchain.clone(),
get_client_version(),
#[cfg(feature = "l2")]
Some(P2PBasedContext {
store_rollup: rollup_store.clone(),
// TODO: The Web3Signer refactor introduced a limitation where the committer key cannot be accessed directly because the signer could be either Local or Remote.
// The Signer enum cannot be used in the P2PBasedContext struct due to cyclic dependencies between the l2-rpc and p2p crates.
// As a temporary solution, a dummy committer key is used until a proper mechanism to utilize the Signer enum is implemented.
// This should be replaced with the Signer enum once the refactor is complete.
committer_key: Arc::new(
SecretKey::from_slice(
&hex::decode(
"385c546456b6a603a1cfcaa9ec9494ba4832da08dd6bcf4de9a71e4a01b74924",
)
.expect("Invalid committer key"),
)
.expect("Invalid committer key"),
)
.expect("Failed to create committer key"),
),
}),
opts.node_opts.tx_broadcasting_time_interval,
)
.await
.expect("P2P context could not be created");
.expect("Failed to create committer key"),
),
}),
opts.node_opts.tx_broadcasting_time_interval,
)
.await
.expect("P2P context could not be created");
let initiator = RLPxInitiator::spawn(p2p_context.clone()).await;
let peer_handler = PeerHandler::new(peer_table, initiator);

let initiator = RLPxInitiator::spawn(p2p_context.clone()).await;
let peer_handler = PeerHandler::new(peer_table, initiator);
// Create SyncManager
let syncer = SyncManager::new(
peer_handler.clone(),
&opts.node_opts.syncmode,
cancel_token.clone(),
blockchain.clone(),
store.clone(),
opts.node_opts.datadir.clone(),
)
.await;

let cancel_token = tokio_util::sync::CancellationToken::new();
// TODO: This should be handled differently, the current problem
// with using opts.node_opts.p2p_disabled is that with the removal
// of the l2 feature flag, p2p_disabled is set to false by default
// prioritizing the L1 UX.
init_network(
&opts.node_opts,
&network,
&datadir,
peer_handler.clone(),
tracker.clone(),
blockchain.clone(),
p2p_context,
)
.await;
(Some(peer_handler), Some(Arc::new(syncer)))
} else {
(None, None)
};

init_rpc_api(
&opts.node_opts,
@@ -246,7 +266,7 @@ pub async fn init_l2(
local_node_record.clone(),
store.clone(),
blockchain.clone(),
cancel_token.clone(),
syncer,
tracker.clone(),
rollup_store.clone(),
log_filter_handler,
@@ -256,32 +276,10 @@ pub async fn init_l2(

// Initialize metrics if enabled
if opts.node_opts.metrics_enabled {
init_metrics(&opts.node_opts, tracker.clone());
init_metrics(&opts.node_opts, tracker);
}

let l2_sequencer_cfg = SequencerConfig::try_from(opts.sequencer_opts).inspect_err(|err| {
error!("{err}");
})?;
let cancellation_token = CancellationToken::new();

// TODO: This should be handled differently, the current problem
// with using opts.node_opts.p2p_diabled is that with the removal
// of the l2 feature flag, p2p_diabled is set to false by default
// prioritizing the L1 UX.
if l2_sequencer_cfg.based.enabled {
init_network(
&opts.node_opts,
&network,
&datadir,
peer_handler.clone(),
tracker,
blockchain.clone(),
p2p_context,
)
.await;
} else {
info!("P2P is disabled");
}
let sequencer_cancellation_token = CancellationToken::new();

let l2_url = Url::parse(&format!(
"http://{}:{}",
@@ -293,8 +291,8 @@ pub async fn init_l2(
store,
rollup_store,
blockchain,
l2_sequencer_cfg,
cancellation_token.clone(),
opts.sequencer_opts.try_into()?,
sequencer_cancellation_token.clone(),
l2_url,
genesis,
checkpoints_dir,
@@ -307,15 +305,18 @@ pub async fn init_l2(
_ = tokio::signal::ctrl_c() => {
join_set.abort_all();
}
_ = cancellation_token.cancelled() => {
_ = sequencer_cancellation_token.cancelled() => {
}
}
info!("Server shut down started...");
let node_config_path = datadir.join("node_config.json");
info!(path = %node_config_path.display(), "Storing node config");
cancel_token.cancel();
let node_config = NodeConfigFile::new(peer_handler.peer_table, local_node_record).await;
store_node_config_file(node_config, node_config_path).await;
if based {
let peer_handler = peer_handler.ok_or_eyre("Peer handler not initialized")?;
let node_config = NodeConfigFile::new(peer_handler.peer_table, local_node_record).await;
store_node_config_file(node_config, node_config_path).await;
}
tokio::time::sleep(Duration::from_secs(1)).await;
info!("Server shutting down!");
Ok(())
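Taken together, the hunks above restructure startup so that the peer table, `P2PContext`, RLPx initiator, `SyncManager`, and network task are only created when the sequencer runs in based mode, with both resulting handles surfaced as `Option`s. A minimal standalone sketch of that gating shape, using stand-in types rather than the real ethrex ones:

```rust
use std::sync::Arc;

// Stand-ins for the real ethrex types, so the sketch compiles on its own.
struct PeerHandler;
struct SyncManager;

fn init_p2p(based: bool) -> (Option<PeerHandler>, Option<Arc<SyncManager>>) {
    if based {
        // In the real code this path spawns the peer table, builds the
        // P2PContext, starts the RLPx initiator and the SyncManager, and
        // calls init_network before handing both handles back.
        (Some(PeerHandler), Some(Arc::new(SyncManager)))
    } else {
        // Non-based nodes skip P2P entirely; every downstream consumer
        // (RPC, shutdown persistence) must now tolerate the `None`s.
        (None, None)
    }
}

fn main() {
    let (peer_handler, syncer) = init_p2p(false);
    assert!(peer_handler.is_none() && syncer.is_none());
}
```

The shutdown path mirrors the same flag: `node_config.json` is only persisted when `based` is set, and `ok_or_eyre` converts a missing peer handler into an error instead of a panic.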
6 changes: 3 additions & 3 deletions crates/l2/networking/rpc/rpc.rs
@@ -76,8 +76,8 @@ pub async fn start_api(
jwt_secret: Bytes,
local_p2p_node: Node,
local_node_record: NodeRecord,
syncer: SyncManager,
peer_handler: PeerHandler,
syncer: Option<Arc<SyncManager>>,
peer_handler: Option<PeerHandler>,
client_version: String,
valid_delegation_addresses: Vec<Address>,
sponsor_pk: SecretKey,
@@ -94,7 +94,7 @@ pub async fn start_api(
storage,
blockchain,
active_filters: active_filters.clone(),
syncer: Arc::new(syncer),
syncer,
peer_handler,
node_data: NodeData {
jwt_secret,
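An ownership change rides along with the `Option` here: `start_api` now receives an already-wrapped `Arc<SyncManager>` instead of wrapping a bare value itself (the caller in `initializers.rs` builds `Some(Arc::new(syncer))`), so the caller could keep cloning the same handle for other consumers. A minimal illustration with a stand-in type:

```rust
use std::sync::Arc;

struct SyncManager; // stand-in for the real type

fn start_api(syncer: Option<Arc<SyncManager>>) {
    // The RPC context stores the Option as-is; the former
    // `Arc::new(syncer)` at this layer is gone.
    let _ctx_syncer = syncer;
}

fn main() {
    let syncer = Arc::new(SyncManager);
    start_api(Some(syncer.clone())); // the API holds one clone...
    assert_eq!(Arc::strong_count(&syncer), 1); // ...dropped when start_api returns
}
```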
15 changes: 11 additions & 4 deletions crates/networking/rpc/admin/peers.rs
@@ -86,13 +86,17 @@ impl From<PeerData> for RpcPeer {
}

pub async fn peers(context: &mut RpcApiContext) -> Result<Value, RpcErr> {
let peers = context
.peer_handler
let Some(peer_handler) = &mut context.peer_handler else {
return Err(RpcErr::Internal("Peer handler not initialized".to_string()));
};

let peers = peer_handler
.read_connected_peers()
.await
.into_iter()
.map(RpcPeer::from)
.collect::<Vec<_>>();

Ok(serde_json::to_value(peers)?)
}

@@ -116,7 +120,10 @@ fn parse(request: &RpcRequest) -> Result<Node, RpcErr> {
}

pub async fn add_peer(context: &mut RpcApiContext, request: &RpcRequest) -> Result<Value, RpcErr> {
let mut server = context.peer_handler.initiator.clone();
let Some(peer_handler) = context.peer_handler.as_mut() else {
return Err(RpcErr::Internal("Peer handler not initialized".to_string()));
};
let mut server = peer_handler.initiator.clone();
let node = parse(request)?;

let start = Instant::now();
@@ -128,7 +135,7 @@ pub async fn add_peer(context: &mut RpcApiContext, request: &RpcRequest) -> Resu
// This loop is necessary because connections are asynchronous, so to check if the connection with the peer was actually
// established we need to wait.
loop {
if peer_is_connected(&mut context.peer_handler, &node.enode_url()).await {
if peer_is_connected(peer_handler, &node.enode_url()).await {
return Ok(serde_json::to_value(true)?);
}

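Both handlers above use the same `let … else` guard (stable since Rust 1.65) to turn an absent peer handler into an early `RpcErr::Internal` return rather than unwrapping. A self-contained sketch of the shape, with hypothetical stand-in types:

```rust
struct Context {
    peer_handler: Option<Vec<String>>, // stand-in for PeerHandler
}

fn peers(ctx: &mut Context) -> Result<Vec<String>, String> {
    // Early return when P2P was never started (non-based mode).
    let Some(handler) = ctx.peer_handler.as_mut() else {
        return Err("Peer handler not initialized".to_string());
    };
    Ok(handler.clone())
}

fn main() {
    let mut ctx = Context { peer_handler: None };
    assert!(peers(&mut ctx).is_err());
    ctx.peer_handler = Some(vec!["enode://example".to_string()]);
    assert_eq!(peers(&mut ctx).unwrap().len(), 1);
}
```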
9 changes: 6 additions & 3 deletions crates/networking/rpc/engine/fork_choice.rs
@@ -169,6 +169,11 @@ async fn handle_forkchoice(
context: RpcApiContext,
version: usize,
) -> Result<(Option<BlockHeader>, ForkChoiceResponse), RpcErr> {
let Some(syncer) = &context.syncer else {
return Err(RpcErr::Internal(
"Fork choice requested but syncer is not initialized".to_string(),
));
};
info!(
version = %format!("v{}", version),
head = %format!("{:#x}", fork_choice_state.head_block_hash),
@@ -278,9 +283,7 @@
),
InvalidForkChoice::Syncing => {
// Start sync
context
.syncer
.sync_to_head(fork_choice_state.head_block_hash);
syncer.sync_to_head(fork_choice_state.head_block_hash);
ForkChoiceResponse::from(PayloadStatus::syncing())
}
InvalidForkChoice::Disconnected(_, _) | InvalidForkChoice::ElementNotFound(_) => {
14 changes: 12 additions & 2 deletions crates/networking/rpc/engine/payload.rs
@@ -591,6 +591,11 @@ async fn handle_new_payload_v1_v2(
block: Block,
context: RpcApiContext,
) -> Result<PayloadStatus, RpcErr> {
let Some(syncer) = &context.syncer else {
return Err(RpcErr::Internal(
"New payload requested but syncer is not initialized".to_string(),
));
};
// Validate block hash
if let Err(RpcErr::Internal(error_msg)) = validate_block_hash(payload, &block) {
return Ok(PayloadStatus::invalid_with_err(&error_msg));
@@ -604,7 +609,7 @@
// We have validated ancestors, the parent is correct
let latest_valid_hash = block.header.parent_hash;

if context.syncer.sync_mode() == SyncMode::Snap {
if syncer.sync_mode() == SyncMode::Snap {
debug!("Snap sync in progress, skipping new payload validation");
return Ok(PayloadStatus::syncing());
}
@@ -698,6 +703,11 @@ async fn try_execute_payload(
context: &RpcApiContext,
latest_valid_hash: H256,
) -> Result<PayloadStatus, RpcErr> {
let Some(syncer) = &context.syncer else {
return Err(RpcErr::Internal(
"New payload requested but syncer is not initialized".to_string(),
));
};
let block_hash = block.hash();
let block_number = block.header.number;
let storage = &context.storage;
@@ -714,7 +724,7 @@
match add_block(context, block).await {
Err(ChainError::ParentNotFound) => {
// Start sync
context.syncer.sync_to_head(block_hash);
syncer.sync_to_head(block_hash);
Ok(PayloadStatus::syncing())
}
// Under the current implementation this is not possible: we always calculate the state
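The same inline guard now appears in `handle_forkchoice`, `handle_new_payload_v1_v2`, and `try_execute_payload`. If the pattern keeps spreading, one option would be a small accessor on the context that centralizes the check; a hypothetical sketch (`require_syncer` is not part of this PR, and the types below are stand-ins so it compiles in isolation):

```rust
use std::sync::Arc;

// Stand-ins so the sketch compiles on its own.
struct SyncManager;
enum RpcErr {
    Internal(String),
}
struct RpcApiContext {
    syncer: Option<Arc<SyncManager>>,
}

impl RpcApiContext {
    // Hypothetical helper: one place to turn an absent syncer into the
    // RPC error each handler currently builds inline.
    fn require_syncer(&self) -> Result<&Arc<SyncManager>, RpcErr> {
        self.syncer
            .as_ref()
            .ok_or_else(|| RpcErr::Internal("Syncer not initialized".to_string()))
    }
}

fn main() {
    let ctx = RpcApiContext { syncer: None };
    assert!(ctx.require_syncer().is_err());
}
```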
8 changes: 6 additions & 2 deletions crates/networking/rpc/eth/client.rs
@@ -50,14 +50,18 @@ impl RpcHandler for Syncing {
}

async fn handle(&self, context: RpcApiContext) -> Result<Value, RpcErr> {
let Some(syncer) = &context.syncer else {
return Err(RpcErr::Internal(
"Syncing status requested but syncer is not initialized".to_string(),
));
};
if context.blockchain.is_synced() {
Ok(Value::Bool(!context.blockchain.is_synced()))
} else {
let syncing_status = SyncingStatusRpc {
starting_block: context.storage.get_earliest_block_number().await?,
current_block: context.storage.get_latest_block_number().await?,
highest_block: context
.syncer
highest_block: syncer
.get_last_fcu_head()
.map_err(|error| RpcErr::Internal(error.to_string()))?
.to_low_u64_be(),