Skip to content
Merged
39 changes: 33 additions & 6 deletions cmd/ethrex/l2/initializers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ use ethrex_blockchain::{Blockchain, BlockchainType, L2Config};
use ethrex_common::fd_limit::raise_fd_limit;
use ethrex_common::types::fee_config::{FeeConfig, L1FeeConfig, OperatorFeeConfig};
use ethrex_common::{Address, types::DEFAULT_BUILDER_GAS_CEIL};
use ethrex_l2::sequencer::block_producer;
use ethrex_l2::sequencer::l1_committer;
use ethrex_l2::sequencer::l1_committer::regenerate_head_state;
use ethrex_p2p::{
discv4::peer_table::PeerTable,
Expand All @@ -24,6 +26,7 @@ use ethrex_storage::Store;
use ethrex_storage_rollup::{EngineTypeRollup, StoreRollup};
use eyre::OptionExt;
use secp256k1::SecretKey;
use spawned_concurrency::tasks::GenServerHandle;
use std::{fs::read_to_string, path::Path, sync::Arc, time::Duration};
use tokio::task::JoinSet;
use tokio_util::{sync::CancellationToken, task::TaskTracker};
Expand Down Expand Up @@ -133,14 +136,36 @@ pub fn init_tracing(opts: &L2Options) -> Option<reload::Handle<EnvFilter, Regist
}
}

/// Asks the L1 committer and the block producer to stop by casting an
/// `Abort` message to each handle that is present.
///
/// Per the original note on this function, these GenServers run via
/// `start_blocking`, so aborting the `JoinSet` alone never stops them; the
/// `Abort` cast elicits `CastResponse::Stop`, which lets the blocking loop
/// unwind cleanly.
///
/// A failed cast is logged as a warning and otherwise ignored, so shutdown
/// always proceeds to the next handle.
async fn shutdown_sequencer_handles(
    committer_handle: Option<GenServerHandle<l1_committer::L1Committer>>,
    block_producer_handle: Option<GenServerHandle<block_producer::BlockProducer>>,
) {
    // Committer first, then block producer: same order as before.
    if let Some(mut committer) = committer_handle {
        let outcome = committer.cast(l1_committer::InMessage::Abort).await;
        if let Err(err) = outcome {
            warn!("Failed to send committer abort: {err:?}");
        }
    }

    if let Some(mut producer) = block_producer_handle {
        let outcome = producer.cast(block_producer::InMessage::Abort).await;
        if let Err(err) = outcome {
            warn!("Failed to send block producer abort: {err:?}");
        }
    }
}

pub async fn init_l2(
opts: L2Options,
log_filter_handler: Option<reload::Handle<EnvFilter, Registry>>,
) -> eyre::Result<()> {
raise_fd_limit()?;

let datadir = opts.node_opts.datadir.clone();
init_datadir(&opts.node_opts.datadir);

let rollup_store_dir = datadir.join("rollup_store");

// Checkpoints are stored in the main datadir
Expand Down Expand Up @@ -280,14 +305,12 @@ pub async fn init_l2(
}

let sequencer_cancellation_token = CancellationToken::new();

let l2_url = Url::parse(&format!(
"http://{}:{}",
opts.node_opts.http_addr, opts.node_opts.http_port
))
.map_err(|err| eyre::eyre!("Failed to parse L2 RPC URL: {err}"))?;

let l2_sequencer = ethrex_l2::start_l2(
let (committer_handle, block_producer_handle, l2_sequencer) = ethrex_l2::start_l2(
store,
rollup_store,
blockchain,
Expand All @@ -297,15 +320,19 @@ pub async fn init_l2(
genesis,
checkpoints_dir,
)
.into_future();

.await?;
join_set.spawn(l2_sequencer);

tokio::select! {
_ = tokio::signal::ctrl_c() => {
shutdown_sequencer_handles(
committer_handle.clone(),
block_producer_handle.clone()
).await;
join_set.abort_all();
}
_ = sequencer_cancellation_token.cancelled() => {
shutdown_sequencer_handles(committer_handle.clone(), block_producer_handle.clone()).await;
}
}
info!("Server shut down started...");
Expand Down
35 changes: 22 additions & 13 deletions crates/l2/sequencer/block_producer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ pub enum CallMessage {
/// Cast messages accepted by the block producer's `handle_cast`.
#[derive(Clone)]
pub enum InMessage {
/// Produce a block if the sequencer status is `Sequencing`, then schedule
/// the next `Produce` after `block_time_ms` milliseconds.
Produce,
/// Stop the GenServer: `handle_cast` answers this with `CastResponse::Stop`.
Abort,
}

#[derive(Clone)]
Expand Down Expand Up @@ -260,22 +261,30 @@ impl GenServer for BlockProducer {

async fn handle_cast(
&mut self,
_message: Self::CastMsg,
message: Self::CastMsg,
handle: &GenServerHandle<Self>,
) -> CastResponse {
// Right now we only have the Produce message, so we ignore the message
if let SequencerStatus::Sequencing = self.sequencer_state.status().await {
let _ = self
.produce_block()
.await
.inspect_err(|e| error!("Block Producer Error: {e}"));
match message {
InMessage::Produce => {
if let SequencerStatus::Sequencing = self.sequencer_state.status().await {
let _ = self
.produce_block()
.await
.inspect_err(|e| error!("Block Producer Error: {e}"));
}
send_after(
Duration::from_millis(self.block_time_ms),
handle.clone(),
Self::CastMsg::Produce,
);
CastResponse::NoReply
}
InMessage::Abort => {
// start_blocking keeps this GenServer alive even if the JoinSet aborts the task.
// Returning CastResponse::Stop is how the blocking runner actually shuts down.
CastResponse::Stop
}
}
send_after(
Duration::from_millis(self.block_time_ms),
handle.clone(),
Self::CastMsg::Produce,
);
CastResponse::NoReply
}

async fn handle_call(
Expand Down
43 changes: 29 additions & 14 deletions crates/l2/sequencer/l1_committer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ pub enum CallMessage {
/// Cast messages accepted by the L1 committer's `handle_cast`.
#[derive(Clone)]
pub enum InMessage {
/// Run a commit cycle; `handle_cast` forwards this to `handle_commit_message`.
Commit,
/// Stop the GenServer: `handle_cast` cancels the stored cancellation token
/// (if one is present) and answers with `CastResponse::Stop`.
Abort,
}

#[derive(Clone)]
Expand Down Expand Up @@ -1086,21 +1087,8 @@ impl L1Committer {
on_chain_proposer_address: self.on_chain_proposer_address,
})))
}
}

impl GenServer for L1Committer {
type CallMsg = CallMessage;
type CastMsg = InMessage;
type OutMsg = OutMessage;

type Error = CommitterError;

// Right now we only have the `Commit` message, so we ignore the `message` parameter
async fn handle_cast(
&mut self,
_message: Self::CastMsg,
handle: &GenServerHandle<Self>,
) -> CastResponse {
async fn handle_commit_message(&mut self, handle: &GenServerHandle<Self>) -> CastResponse {
if let SequencerStatus::Sequencing = self.sequencer_state.status().await {
let current_last_committed_batch =
get_last_committed_batch(&self.eth_client, self.on_chain_proposer_address)
Expand Down Expand Up @@ -1151,6 +1139,33 @@ impl GenServer for L1Committer {
self.schedule_commit(self.committer_wake_up_ms, handle.clone());
CastResponse::NoReply
}
}

impl GenServer for L1Committer {
type CallMsg = CallMessage;
type CastMsg = InMessage;
type OutMsg = OutMessage;

type Error = CommitterError;

/// Dispatches a cast message received by the committer.
///
/// * `InMessage::Commit` — forwarded to `handle_commit_message`, whose
///   `CastResponse` is returned unchanged.
/// * `InMessage::Abort` — cancels the stored cancellation token, if one is
///   still present, and returns `CastResponse::Stop`.
async fn handle_cast(
    &mut self,
    message: Self::CastMsg,
    handle: &GenServerHandle<Self>,
) -> CastResponse {
    match message {
        InMessage::Commit => self.handle_commit_message(handle).await,
        InMessage::Abort => {
            // start_blocking keeps the committer loop alive even if the JoinSet aborts the task.
            // Returning CastResponse::Stop is what unblocks shutdown by ending that blocking loop.
            // `take()` leaves `None` behind, so the token is cancelled at most once.
            if let Some(ct) = self.cancellation_token.take() {
                ct.cancel();
            }
            CastResponse::Stop
        }
    }
}

async fn handle_call(
&mut self,
Expand Down
63 changes: 41 additions & 22 deletions crates/l2/sequencer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ use l1_watcher::L1Watcher;
use metrics::MetricsGatherer;
use proof_coordinator::ProofCoordinator;
use reqwest::Url;
use spawned_concurrency::tasks::GenServerHandle;
use std::pin::Pin;
use tokio_util::sync::CancellationToken;
use tracing::{error, info};
use utils::get_needed_proof_types;
Expand Down Expand Up @@ -49,7 +51,14 @@ pub async fn start_l2(
_l2_url: Url,
genesis: Genesis,
checkpoints_dir: PathBuf,
) -> Result<(), errors::SequencerError> {
) -> Result<
(
Option<GenServerHandle<L1Committer>>,
Option<GenServerHandle<BlockProducer>>,
Pin<Box<dyn Future<Output = Result<(), errors::SequencerError>> + Send>>,
),
errors::SequencerError,
> {
let initial_status = if cfg.based.enabled {
SequencerStatus::default()
} else {
Expand All @@ -76,7 +85,11 @@ pub async fn start_l2(
)
.await
.inspect_err(|e| error!("Error starting Sequencer: {e}")) else {
return Ok(());
return Ok((
None,
None,
Box::pin(async { Ok::<(), errors::SequencerError>(()) }),
));
};

if needed_proof_types.contains(&ProverType::TDX)
Expand All @@ -85,7 +98,11 @@ pub async fn start_l2(
error!(
"A private key for TDX is required. Please set the flag `--proof-coordinator.tdx-private-key <KEY>` or use the `ETHREX_PROOF_COORDINATOR_TDX_PRIVATE_KEY` environment variable to set the private key"
);
return Ok(());
return Ok((
None,
None,
Box::pin(async { Ok::<(), errors::SequencerError>(()) }),
));
}

let l1_watcher = L1Watcher::spawn(
Expand Down Expand Up @@ -120,7 +137,6 @@ pub async fn start_l2(
.inspect_err(|err| {
error!("Error starting Proof Coordinator: {err}");
});

let l1_proof_sender = L1ProofSender::spawn(
cfg.clone(),
shared_state.clone(),
Expand Down Expand Up @@ -196,15 +212,17 @@ pub async fn start_l2(
.await?;
}

let l1_committer_handle = l1_committer.ok();
let block_producer_handle = block_producer.ok();
let admin_server = start_api(
format!(
"{}:{}",
cfg.admin_server.listen_ip, cfg.admin_server.listen_port
),
l1_committer.ok(),
l1_committer_handle.clone(),
l1_watcher.ok(),
l1_proof_sender.ok(),
block_producer.ok(),
block_producer_handle.clone(),
#[cfg(feature = "metrics")]
metrics_gatherer.ok(),
)
Expand All @@ -214,26 +232,27 @@ pub async fn start_l2(
})
.ok();

match (verifier_handle, admin_server) {
(Some(handle), Some(admin_server)) => {
let (server_res, verifier_res) = tokio::join!(admin_server.into_future(), handle);
if let Err(e) = server_res {
error!("Admin server task error: {e}");
let driver = Box::pin(async move {
match (verifier_handle, admin_server) {
(Some(handle), Some(admin_server)) => {
let (server_res, verifier_res) = tokio::join!(admin_server.into_future(), handle);
if let Err(e) = server_res {
error!("Admin server task error: {e}");
}
handle_verifier_result(verifier_res).await;
}
handle_verifier_result(verifier_res).await;
}
(Some(handle), None) => {
handle_verifier_result(tokio::join!(handle).0).await;
}
(None, Some(admin_server)) => {
if let Err(e) = admin_server.into_future().await {
error!("Admin server task error: {e}");
(Some(handle), None) => handle_verifier_result(tokio::join!(handle).0).await,
(None, Some(admin_server)) => {
if let Err(e) = admin_server.into_future().await {
error!("Admin server task error: {e}");
}
}
(None, None) => {}
}
(None, None) => {}
}

Ok(())
Ok(())
});
Ok((l1_committer_handle, block_producer_handle, driver))
}

async fn handle_verifier_result(res: Result<Result<(), SequencerError>, tokio::task::JoinError>) {
Expand Down