-
Notifications
You must be signed in to change notification settings - Fork 155
fix(l2): shutdown node with ctrl+c #5208
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 10 commits
f588d8e
d2d77e1
66a69eb
c1843c4
3c180ca
e7c1d6e
11d514d
c3b8088
1c41453
7b686d5
bde0e8e
8a164a0
57da92d
cc0fd39
5788c22
52e78bb
9c15a20
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -12,6 +12,8 @@ use ethrex_common::fd_limit::raise_fd_limit; | |
| use ethrex_common::types::fee_config::{FeeConfig, L1FeeConfig, OperatorFeeConfig}; | ||
| use ethrex_common::{Address, types::DEFAULT_BUILDER_GAS_CEIL}; | ||
| use ethrex_l2::SequencerConfig; | ||
| use ethrex_l2::sequencer::block_producer; | ||
| use ethrex_l2::sequencer::l1_committer; | ||
| use ethrex_l2::sequencer::l1_committer::regenerate_head_state; | ||
| use ethrex_p2p::{ | ||
| discv4::peer_table::PeerTable, | ||
|
|
@@ -149,9 +151,9 @@ pub async fn init_l2( | |
| log_filter_handler: Option<reload::Handle<EnvFilter, Registry>>, | ||
| ) -> eyre::Result<()> { | ||
| raise_fd_limit()?; | ||
|
|
||
| let datadir = opts.node_opts.datadir.clone(); | ||
| init_datadir(&opts.node_opts.datadir); | ||
|
|
||
| let rollup_store_dir = datadir.join("rollup_store"); | ||
|
|
||
| // Checkpoints are stored in the main datadir | ||
|
|
@@ -282,14 +284,12 @@ pub async fn init_l2( | |
| } else { | ||
| info!("P2P is disabled"); | ||
| } | ||
|
|
||
| let l2_url = Url::parse(&format!( | ||
| "http://{}:{}", | ||
| opts.node_opts.http_addr, opts.node_opts.http_port | ||
| )) | ||
| .map_err(|err| eyre::eyre!("Failed to parse L2 RPC URL: {err}"))?; | ||
|
|
||
| let l2_sequencer = ethrex_l2::start_l2( | ||
| let (committer_handle, block_producer_handle, l2_sequencer) = ethrex_l2::start_l2( | ||
| store, | ||
| rollup_store, | ||
| blockchain, | ||
|
|
@@ -299,15 +299,43 @@ pub async fn init_l2( | |
| genesis, | ||
| checkpoints_dir, | ||
| ) | ||
| .into_future(); | ||
|
|
||
| .await?; | ||
| join_set.spawn(l2_sequencer); | ||
|
|
||
| tokio::select! { | ||
| _ = tokio::signal::ctrl_c() => { | ||
| if let Some(mut handle) = committer_handle.clone() { | ||
| handle | ||
| .cast(l1_committer::InMessage::Abort) | ||
| .await | ||
| .inspect_err(|err| warn!("Failed to send committer abort: {err:?}")) | ||
| .ok(); | ||
| } | ||
| if let Some(mut handle) = block_producer_handle.clone() { | ||
| handle | ||
| .cast(block_producer::InMessage::Abort) | ||
| .await | ||
| .inspect_err(|err| warn!("Failed to send block producer abort: {err:?}")) | ||
| .ok(); | ||
| } | ||
|
||
| join_set.abort_all(); | ||
| } | ||
|
|
||
| _ = cancellation_token.cancelled() => { | ||
| if let Some(mut handle) = committer_handle.clone() { | ||
| handle | ||
| .cast(l1_committer::InMessage::Abort) | ||
| .await | ||
| .inspect_err(|err| warn!("Failed to send committer abort: {err:?}")) | ||
| .ok(); | ||
| } | ||
| if let Some(mut handle) = block_producer_handle.clone() { | ||
| handle | ||
| .cast(block_producer::InMessage::Abort) | ||
| .await | ||
| .inspect_err(|err| warn!("Failed to send block producer abort: {err:?}")) | ||
| .ok(); | ||
| } | ||
| } | ||
| } | ||
| info!("Server shut down started..."); | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -81,6 +81,7 @@ pub enum CallMessage { | |
| #[derive(Clone)] | ||
| pub enum InMessage { | ||
| Commit, | ||
| Abort, | ||
| } | ||
|
|
||
| #[derive(Clone)] | ||
|
|
@@ -1098,58 +1099,69 @@ impl GenServer for L1Committer { | |
| // Dispatch on the incoming message: `Commit` runs the commit cycle, `Abort` cancels and stops | ||
| async fn handle_cast( | ||
| &mut self, | ||
| _message: Self::CastMsg, | ||
| message: Self::CastMsg, | ||
| handle: &GenServerHandle<Self>, | ||
| ) -> CastResponse { | ||
| if let SequencerStatus::Sequencing = self.sequencer_state.status().await { | ||
| let current_last_committed_batch = | ||
| get_last_committed_batch(&self.eth_client, self.on_chain_proposer_address) | ||
| .await | ||
| .unwrap_or(self.last_committed_batch); | ||
| let Some(current_time) = utils::system_now_ms() else { | ||
| self.schedule_commit(self.committer_wake_up_ms, handle.clone()); | ||
| return CastResponse::NoReply; | ||
| }; | ||
|
|
||
| // In the event that the current batch in L1 is greater than the one we have recorded we shouldn't send a new batch | ||
| if current_last_committed_batch > self.last_committed_batch { | ||
| info!( | ||
| l1_batch = current_last_committed_batch, | ||
| last_batch_registered = self.last_committed_batch, | ||
| "Committer was not aware of new L1 committed batches, updating internal state accordingly" | ||
| ); | ||
| self.last_committed_batch = current_last_committed_batch; | ||
| self.last_committed_batch_timestamp = current_time; | ||
| match message { | ||
| InMessage::Commit => { | ||
|
||
| if let SequencerStatus::Sequencing = self.sequencer_state.status().await { | ||
| let current_last_committed_batch = | ||
| get_last_committed_batch(&self.eth_client, self.on_chain_proposer_address) | ||
| .await | ||
| .unwrap_or(self.last_committed_batch); | ||
| let Some(current_time) = utils::system_now_ms() else { | ||
| self.schedule_commit(self.committer_wake_up_ms, handle.clone()); | ||
| return CastResponse::NoReply; | ||
| }; | ||
|
|
||
| // In the event that the current batch in L1 is greater than the one we have recorded we shouldn't send a new batch | ||
| if current_last_committed_batch > self.last_committed_batch { | ||
| info!( | ||
| l1_batch = current_last_committed_batch, | ||
| last_batch_registered = self.last_committed_batch, | ||
| "Committer was not aware of new L1 committed batches, updating internal state accordingly" | ||
| ); | ||
| self.last_committed_batch = current_last_committed_batch; | ||
| self.last_committed_batch_timestamp = current_time; | ||
| self.schedule_commit(self.committer_wake_up_ms, handle.clone()); | ||
| return CastResponse::NoReply; | ||
| } | ||
|
|
||
| let commit_time: u128 = self.commit_time_ms.into(); | ||
| let should_send_commitment = | ||
| current_time - self.last_committed_batch_timestamp > commit_time; | ||
|
|
||
| debug!( | ||
| last_committed_batch_at = self.last_committed_batch_timestamp, | ||
| will_send_commitment = should_send_commitment, | ||
| last_committed_batch = self.last_committed_batch, | ||
| "Committer woke up" | ||
| ); | ||
|
|
||
| #[expect(clippy::collapsible_if)] | ||
| if should_send_commitment { | ||
| if self | ||
| .commit_next_batch_to_l1() | ||
| .await | ||
| .inspect_err(|e| error!("L1 Committer Error: {e}")) | ||
| .is_ok() | ||
| { | ||
| self.last_committed_batch_timestamp = | ||
| system_now_ms().unwrap_or(current_time); | ||
| self.last_committed_batch = current_last_committed_batch + 1; | ||
| } | ||
| } | ||
| } | ||
| self.schedule_commit(self.committer_wake_up_ms, handle.clone()); | ||
| return CastResponse::NoReply; | ||
| CastResponse::NoReply | ||
| } | ||
|
|
||
| let commit_time: u128 = self.commit_time_ms.into(); | ||
| let should_send_commitment = | ||
| current_time - self.last_committed_batch_timestamp > commit_time; | ||
|
|
||
| debug!( | ||
| last_committed_batch_at = self.last_committed_batch_timestamp, | ||
| will_send_commitment = should_send_commitment, | ||
| last_committed_batch = self.last_committed_batch, | ||
| "Committer woke up" | ||
| ); | ||
|
|
||
| #[expect(clippy::collapsible_if)] | ||
| if should_send_commitment { | ||
| if self | ||
| .commit_next_batch_to_l1() | ||
| .await | ||
| .inspect_err(|e| error!("L1 Committer Error: {e}")) | ||
| .is_ok() | ||
| { | ||
| self.last_committed_batch_timestamp = system_now_ms().unwrap_or(current_time); | ||
| self.last_committed_batch = current_last_committed_batch + 1; | ||
| } | ||
| InMessage::Abort => { | ||
| if let Some(ct) = self.cancellation_token.take() { | ||
| ct.cancel() | ||
| }; | ||
| CastResponse::Stop | ||
| } | ||
| } | ||
| self.schedule_commit(self.committer_wake_up_ms, handle.clone()); | ||
| CastResponse::NoReply | ||
| } | ||
|
|
||
| async fn handle_call( | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.