38 changes: 32 additions & 6 deletions cmd/ethrex/l2/initializers.rs
@@ -12,6 +12,8 @@ use ethrex_common::fd_limit::raise_fd_limit;
use ethrex_common::types::fee_config::{FeeConfig, L1FeeConfig, OperatorFeeConfig};
use ethrex_common::{Address, types::DEFAULT_BUILDER_GAS_CEIL};
use ethrex_l2::SequencerConfig;
use ethrex_l2::sequencer::block_producer;
use ethrex_l2::sequencer::l1_committer;
use ethrex_l2::sequencer::l1_committer::regenerate_head_state;
use ethrex_p2p::{
discv4::peer_table::PeerTable,
@@ -24,6 +26,7 @@ use ethrex_p2p::{
use ethrex_storage::Store;
use ethrex_storage_rollup::{EngineTypeRollup, StoreRollup};
use secp256k1::SecretKey;
use spawned_concurrency::tasks::GenServerHandle;
use std::{fs::read_to_string, path::Path, sync::Arc, time::Duration};
use tokio::task::JoinSet;
use tokio_util::{sync::CancellationToken, task::TaskTracker};
@@ -144,14 +147,34 @@ pub fn init_tracing(opts: &L2Options) -> Option<reload::Handle<EnvFilter, Registry
}
}

async fn shutdown_sequencer_handles(
committer_handle: Option<GenServerHandle<l1_committer::L1Committer>>,
block_producer_handle: Option<GenServerHandle<block_producer::BlockProducer>>,
) {
if let Some(mut handle) = committer_handle {
handle
.cast(l1_committer::InMessage::Abort)
.await
.inspect_err(|err| warn!("Failed to send committer abort: {err:?}"))
.ok();
}
if let Some(mut handle) = block_producer_handle {
handle
.cast(block_producer::InMessage::Abort)
.await
.inspect_err(|err| warn!("Failed to send block producer abort: {err:?}"))
.ok();
}
}

pub async fn init_l2(
opts: L2Options,
log_filter_handler: Option<reload::Handle<EnvFilter, Registry>>,
) -> eyre::Result<()> {
raise_fd_limit()?;

let datadir = opts.node_opts.datadir.clone();
init_datadir(&opts.node_opts.datadir);

let rollup_store_dir = datadir.join("rollup_store");

// Checkpoints are stored in the main datadir
@@ -282,14 +305,12 @@ pub async fn init_l2(
} else {
info!("P2P is disabled");
}

let l2_url = Url::parse(&format!(
"http://{}:{}",
opts.node_opts.http_addr, opts.node_opts.http_port
))
.map_err(|err| eyre::eyre!("Failed to parse L2 RPC URL: {err}"))?;

let l2_sequencer = ethrex_l2::start_l2(
let (committer_handle, block_producer_handle, l2_sequencer) = ethrex_l2::start_l2(
store,
rollup_store,
blockchain,
@@ -299,15 +320,20 @@
genesis,
checkpoints_dir,
)
.into_future();

.await?;
join_set.spawn(l2_sequencer);

tokio::select! {
_ = tokio::signal::ctrl_c() => {
shutdown_sequencer_handles(
committer_handle.clone(),
block_producer_handle.clone()
).await;
join_set.abort_all();
}

_ = cancellation_token.cancelled() => {
shutdown_sequencer_handles(committer_handle.clone(), block_producer_handle.clone()).await;
}
}
info!("Server shut down started...");
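For context, the sketch below mirrors the shutdown flow this change introduces in `init_l2`: the caller keeps abortable handles to the committer and block producer, asks them to stop on Ctrl-C or cancellation, and only then tears down the remaining tasks. It is a simplified stand-in, not the actual ethrex code — plain tokio mpsc channels replace the `spawned_concurrency` `GenServerHandle`/`cast` API, and the names (`AbortHandle`, `shutdown`, `driver`) are hypothetical.

```rust
use tokio::sync::mpsc;
use tokio_util::sync::CancellationToken;

// Hypothetical stand-in for a GenServerHandle that accepts an Abort cast.
#[derive(Clone)]
struct AbortHandle(mpsc::Sender<()>);

impl AbortHandle {
    async fn abort(&self) {
        // Ignore send errors, mirroring the `.ok()` in the PR.
        let _ = self.0.send(()).await;
    }
}

// Counterpart of shutdown_sequencer_handles: ask both actors to stop.
async fn shutdown(committer: Option<AbortHandle>, producer: Option<AbortHandle>) {
    if let Some(h) = committer {
        h.abort().await;
    }
    if let Some(h) = producer {
        h.abort().await;
    }
}

#[tokio::main]
async fn main() {
    let cancellation_token = CancellationToken::new();
    let (tx, mut rx) = mpsc::channel::<()>(1);
    let committer = Some(AbortHandle(tx.clone()));
    let producer = Some(AbortHandle(tx));

    // Stand-in for the sequencer driver future returned by start_l2:
    // it runs until one of the actors is told to abort.
    let driver = tokio::spawn(async move {
        let _ = rx.recv().await;
    });

    tokio::select! {
        _ = tokio::signal::ctrl_c() => {
            shutdown(committer.clone(), producer.clone()).await;
        }
        _ = cancellation_token.cancelled() => {
            shutdown(committer.clone(), producer.clone()).await;
        }
    }
    let _ = driver.await;
}
```

The point of the two-step teardown is that `abort_all()` alone would cancel the actors mid-work; sending an explicit abort message first lets them stop at a message boundary.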
31 changes: 18 additions & 13 deletions crates/l2/sequencer/block_producer.rs
@@ -43,6 +43,7 @@ pub enum CallMessage {
#[derive(Clone)]
pub enum InMessage {
Produce,
Abort,
}

#[derive(Clone)]
@@ -260,22 +261,26 @@ impl GenServer for BlockProducer {

async fn handle_cast(
&mut self,
_message: Self::CastMsg,
message: Self::CastMsg,
handle: &GenServerHandle<Self>,
) -> CastResponse {
// Right now we only have the Produce message, so we ignore the message
if let SequencerStatus::Sequencing = self.sequencer_state.status().await {
let _ = self
.produce_block()
.await
.inspect_err(|e| error!("Block Producer Error: {e}"));
match message {
InMessage::Produce => {
if let SequencerStatus::Sequencing = self.sequencer_state.status().await {
let _ = self
.produce_block()
.await
.inspect_err(|e| error!("Block Producer Error: {e}"));
}
send_after(
Duration::from_millis(self.block_time_ms),
handle.clone(),
Self::CastMsg::Produce,
);
CastResponse::NoReply
}
InMessage::Abort => CastResponse::Stop,
}
send_after(
Duration::from_millis(self.block_time_ms),
handle.clone(),
Self::CastMsg::Produce,
);
CastResponse::NoReply
}

async fn handle_call(
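To illustrate the pattern the `BlockProducer` change relies on — each `Produce` cast does its work and re-schedules itself, while `Abort` stops the loop — here is a minimal, self-contained sketch. It uses a plain tokio mpsc loop in place of the GenServer/`send_after` machinery; the function name `run` and the timing values are illustrative only.

```rust
use std::time::Duration;
use tokio::sync::mpsc;

enum InMessage {
    Produce,
    Abort,
}

// Toy actor loop mimicking the Produce/Abort pattern: each Produce does the
// work and re-schedules itself after block_time_ms; Abort stops the loop.
async fn run(mut rx: mpsc::Receiver<InMessage>, tx: mpsc::Sender<InMessage>, block_time_ms: u64) {
    while let Some(message) = rx.recv().await {
        match message {
            InMessage::Produce => {
                println!("produce block");
                let tx = tx.clone();
                // Stand-in for spawned_concurrency's send_after.
                tokio::spawn(async move {
                    tokio::time::sleep(Duration::from_millis(block_time_ms)).await;
                    let _ = tx.send(InMessage::Produce).await;
                });
            }
            InMessage::Abort => break, // equivalent of CastResponse::Stop
        }
    }
}

#[tokio::main]
async fn main() {
    let (tx, rx) = mpsc::channel(8);
    let _ = tx.send(InMessage::Produce).await;
    let actor = tokio::spawn(run(rx, tx.clone(), 100));
    tokio::time::sleep(Duration::from_millis(350)).await;
    let _ = tx.send(InMessage::Abort).await;
    let _ = actor.await;
}
```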
104 changes: 58 additions & 46 deletions crates/l2/sequencer/l1_committer.rs
@@ -81,6 +81,7 @@ pub enum CallMessage {
#[derive(Clone)]
pub enum InMessage {
Commit,
Abort,
}

#[derive(Clone)]
@@ -1098,58 +1099,69 @@ impl GenServer for L1Committer {
// Right now we only have the `Commit` message, so we ignore the `message` parameter
async fn handle_cast(
&mut self,
_message: Self::CastMsg,
message: Self::CastMsg,
handle: &GenServerHandle<Self>,
) -> CastResponse {
if let SequencerStatus::Sequencing = self.sequencer_state.status().await {
let current_last_committed_batch =
get_last_committed_batch(&self.eth_client, self.on_chain_proposer_address)
.await
.unwrap_or(self.last_committed_batch);
let Some(current_time) = utils::system_now_ms() else {
self.schedule_commit(self.committer_wake_up_ms, handle.clone());
return CastResponse::NoReply;
};

// In the event that the current batch in L1 is greater than the one we have recorded we shouldn't send a new batch
if current_last_committed_batch > self.last_committed_batch {
info!(
l1_batch = current_last_committed_batch,
last_batch_registered = self.last_committed_batch,
"Committer was not aware of new L1 committed batches, updating internal state accordingly"
);
self.last_committed_batch = current_last_committed_batch;
self.last_committed_batch_timestamp = current_time;
match message {
InMessage::Commit => {
Contributor: Maybe we can move the handling of this message to its own function for clarity.

Contributor Author: Done cc0fd39

if let SequencerStatus::Sequencing = self.sequencer_state.status().await {
let current_last_committed_batch =
get_last_committed_batch(&self.eth_client, self.on_chain_proposer_address)
.await
.unwrap_or(self.last_committed_batch);
let Some(current_time) = utils::system_now_ms() else {
self.schedule_commit(self.committer_wake_up_ms, handle.clone());
return CastResponse::NoReply;
};

// In the event that the current batch in L1 is greater than the one we have recorded we shouldn't send a new batch
if current_last_committed_batch > self.last_committed_batch {
info!(
l1_batch = current_last_committed_batch,
last_batch_registered = self.last_committed_batch,
"Committer was not aware of new L1 committed batches, updating internal state accordingly"
);
self.last_committed_batch = current_last_committed_batch;
self.last_committed_batch_timestamp = current_time;
self.schedule_commit(self.committer_wake_up_ms, handle.clone());
return CastResponse::NoReply;
}

let commit_time: u128 = self.commit_time_ms.into();
let should_send_commitment =
current_time - self.last_committed_batch_timestamp > commit_time;

debug!(
last_committed_batch_at = self.last_committed_batch_timestamp,
will_send_commitment = should_send_commitment,
last_committed_batch = self.last_committed_batch,
"Committer woke up"
);

#[expect(clippy::collapsible_if)]
if should_send_commitment {
if self
.commit_next_batch_to_l1()
.await
.inspect_err(|e| error!("L1 Committer Error: {e}"))
.is_ok()
{
self.last_committed_batch_timestamp =
system_now_ms().unwrap_or(current_time);
self.last_committed_batch = current_last_committed_batch + 1;
}
}
}
self.schedule_commit(self.committer_wake_up_ms, handle.clone());
return CastResponse::NoReply;
CastResponse::NoReply
}

let commit_time: u128 = self.commit_time_ms.into();
let should_send_commitment =
current_time - self.last_committed_batch_timestamp > commit_time;

debug!(
last_committed_batch_at = self.last_committed_batch_timestamp,
will_send_commitment = should_send_commitment,
last_committed_batch = self.last_committed_batch,
"Committer woke up"
);

#[expect(clippy::collapsible_if)]
if should_send_commitment {
if self
.commit_next_batch_to_l1()
.await
.inspect_err(|e| error!("L1 Committer Error: {e}"))
.is_ok()
{
self.last_committed_batch_timestamp = system_now_ms().unwrap_or(current_time);
self.last_committed_batch = current_last_committed_batch + 1;
}
InMessage::Abort => {
if let Some(ct) = self.cancellation_token.take() {
ct.cancel()
};
CastResponse::Stop
}
}
self.schedule_commit(self.committer_wake_up_ms, handle.clone());
CastResponse::NoReply
}

async fn handle_call(
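Regarding the review exchange above (moving the `Commit` handling into its own function, resolved in cc0fd39): the shape of that refactor would look roughly like the sketch below. The types here are minimal stand-ins rather than the real ethrex/`spawned_concurrency` API, and `handle_commit` is a hypothetical name — the actual commit may differ.

```rust
// Minimal stand-ins; not the real ethrex types or GenServer trait.
enum InMessage {
    Commit,
    Abort,
}

enum CastResponse {
    NoReply,
    Stop,
}

struct L1Committer {
    last_committed_batch: u64,
}

impl L1Committer {
    // The match in handle_cast stays small and only dispatches...
    async fn handle_cast(&mut self, message: InMessage) -> CastResponse {
        match message {
            InMessage::Commit => self.handle_commit().await,
            InMessage::Abort => CastResponse::Stop,
        }
    }

    // ...while the commit/reschedule logic lives in its own method.
    async fn handle_commit(&mut self) -> CastResponse {
        // The real body would check sequencer status, compare the L1 batch
        // against local state, and schedule the next wake-up; elided here.
        self.last_committed_batch += 1;
        CastResponse::NoReply
    }
}

#[tokio::main]
async fn main() {
    let mut committer = L1Committer { last_committed_batch: 0 };
    committer.handle_cast(InMessage::Commit).await;
    committer.handle_cast(InMessage::Abort).await;
}
```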
63 changes: 41 additions & 22 deletions crates/l2/sequencer/mod.rs
@@ -20,6 +20,8 @@ use l1_watcher::L1Watcher;
use metrics::MetricsGatherer;
use proof_coordinator::ProofCoordinator;
use reqwest::Url;
use spawned_concurrency::tasks::GenServerHandle;
use std::pin::Pin;
use tokio_util::sync::CancellationToken;
use tracing::{error, info};
use utils::get_needed_proof_types;
@@ -49,7 +51,14 @@ pub async fn start_l2(
_l2_url: Url,
genesis: Genesis,
checkpoints_dir: PathBuf,
) -> Result<(), errors::SequencerError> {
) -> Result<
(
Option<GenServerHandle<L1Committer>>,
Option<GenServerHandle<BlockProducer>>,
Pin<Box<dyn Future<Output = Result<(), errors::SequencerError>> + Send>>,
),
errors::SequencerError,
> {
let initial_status = if cfg.based.enabled {
SequencerStatus::default()
} else {
@@ -76,7 +85,11 @@ )
)
.await
.inspect_err(|e| error!("Error starting Sequencer: {e}")) else {
return Ok(());
return Ok((
None,
None,
Box::pin(async { Ok::<(), errors::SequencerError>(()) }),
));
};

if needed_proof_types.contains(&ProverType::TDX)
@@ -85,7 +98,11 @@
error!(
"A private key for TDX is required. Please set the flag `--proof-coordinator.tdx-private-key <KEY>` or use the `ETHREX_PROOF_COORDINATOR_TDX_PRIVATE_KEY` environment variable to set the private key"
);
return Ok(());
return Ok((
None,
None,
Box::pin(async { Ok::<(), errors::SequencerError>(()) }),
));
}

let l1_watcher = L1Watcher::spawn(
@@ -120,7 +137,6 @@
.inspect_err(|err| {
error!("Error starting Proof Coordinator: {err}");
});

let l1_proof_sender = L1ProofSender::spawn(
cfg.clone(),
shared_state.clone(),
@@ -196,15 +212,17 @@
.await?;
}

let l1_committer_handle = l1_committer.ok();
let block_producer_handle = block_producer.ok();
let admin_server = start_api(
format!(
"{}:{}",
cfg.admin_server.listen_ip, cfg.admin_server.listen_port
),
l1_committer.ok(),
l1_committer_handle.clone(),
l1_watcher.ok(),
l1_proof_sender.ok(),
block_producer.ok(),
block_producer_handle.clone(),
#[cfg(feature = "metrics")]
metrics_gatherer.ok(),
)
@@ -214,26 +232,27 @@
})
.ok();

match (verifier_handle, admin_server) {
(Some(handle), Some(admin_server)) => {
let (server_res, verifier_res) = tokio::join!(admin_server.into_future(), handle);
if let Err(e) = server_res {
error!("Admin server task error: {e}");
let driver = Box::pin(async move {
match (verifier_handle, admin_server) {
(Some(handle), Some(admin_server)) => {
let (server_res, verifier_res) = tokio::join!(admin_server.into_future(), handle);
if let Err(e) = server_res {
error!("Admin server task error: {e}");
}
handle_verifier_result(verifier_res).await;
}
handle_verifier_result(verifier_res).await;
}
(Some(handle), None) => {
handle_verifier_result(tokio::join!(handle).0).await;
}
(None, Some(admin_server)) => {
if let Err(e) = admin_server.into_future().await {
error!("Admin server task error: {e}");
(Some(handle), None) => handle_verifier_result(tokio::join!(handle).0).await,
(None, Some(admin_server)) => {
if let Err(e) = admin_server.into_future().await {
error!("Admin server task error: {e}");
}
}
(None, None) => {}
}
(None, None) => {}
}

Ok(())
Ok(())
});
Ok((l1_committer_handle, block_producer_handle, driver))
}

async fn handle_verifier_result(res: Result<Result<(), SequencerError>, tokio::task::JoinError>) {
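The `start_l2` signature change — returning the handles up front plus a boxed driver future instead of running to completion — follows a common pattern: construct everything, hand back control handles, and let the caller decide where the long-running work is awaited. A minimal sketch of that shape follows; the names and the `SequencerError` stand-in are hypothetical, only the `Pin<Box<dyn Future ...>>` return shape is taken from the diff.

```rust
use std::future::Future;
use std::pin::Pin;

// Stand-in for errors::SequencerError.
#[derive(Debug)]
struct SequencerError;

// Same shape as the driver future returned by the new start_l2.
type Driver = Pin<Box<dyn Future<Output = Result<(), SequencerError>> + Send>>;

// Start-up returns a handle immediately and defers the long-running
// supervision work to a boxed future the caller awaits (or spawns) later.
fn start() -> (Option<u32>, Driver) {
    let handle = Some(42); // stand-in for a GenServerHandle
    let driver: Driver = Box::pin(async {
        // Long-running work (joining the admin server, verifier, etc.) goes here.
        Ok::<(), SequencerError>(())
    });
    (handle, driver)
}

#[tokio::main]
async fn main() -> Result<(), SequencerError> {
    let (_handle, driver) = start();
    // The caller decides when and where the driver runs, e.g. inside a JoinSet.
    driver.await
}
```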