diff --git a/Cargo.lock b/Cargo.lock index 4f53bbf1eb6..aa42fa01714 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -10762,9 +10762,9 @@ dependencies = [ [[package]] name = "spawned-concurrency" -version = "0.4.1" +version = "0.4.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "aa00b753ef7c942c13ee953f13609746a41c0fb8cf221849bbf3f654811a6669" +checksum = "c34171afa9ef68114f07b4be6cf294f49a388380b62a036010b31534ecc0f927" dependencies = [ "futures", "pin-project-lite", @@ -10775,9 +10775,9 @@ dependencies = [ [[package]] name = "spawned-rt" -version = "0.4.1" +version = "0.4.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "42396ff1bc8bfdcad31f099a3af74b4830fb7bdc09a70d843dcfa8bab74ecea4" +checksum = "b83e35512b851b51d27a6dcee6025da1c61e7d842e2ec79971124eab2be3e974" dependencies = [ "crossbeam 0.7.3", "tokio", diff --git a/Cargo.toml b/Cargo.toml index eedb1eaeb12..00d95287098 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -114,8 +114,8 @@ url = { version = "2.5.4", features = ["serde"] } kzg-rs = "0.2.6" libsql = "0.9.10" futures = "0.3.31" -spawned-concurrency = "0.4.1" -spawned-rt = "0.4.1" +spawned-concurrency = "0.4.2" +spawned-rt = "0.4.2" lambdaworks-crypto = "0.11.0" tui-logger = { version = "0.17.3", features = ["tracing-support"] } crossbeam = "0.8.4" diff --git a/crates/l2/tee/quote-gen/Cargo.lock b/crates/l2/tee/quote-gen/Cargo.lock index 379a3772642..2f3df124901 100644 --- a/crates/l2/tee/quote-gen/Cargo.lock +++ b/crates/l2/tee/quote-gen/Cargo.lock @@ -5858,9 +5858,9 @@ dependencies = [ [[package]] name = "spawned-concurrency" -version = "0.4.1" +version = "0.4.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "aa00b753ef7c942c13ee953f13609746a41c0fb8cf221849bbf3f654811a6669" +checksum = "c34171afa9ef68114f07b4be6cf294f49a388380b62a036010b31534ecc0f927" dependencies = [ "futures", "pin-project-lite", @@ -5871,9 +5871,9 @@ dependencies = [ [[package]] name = "spawned-rt" -version = "0.4.1" +version = "0.4.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "42396ff1bc8bfdcad31f099a3af74b4830fb7bdc09a70d843dcfa8bab74ecea4" +checksum = "b83e35512b851b51d27a6dcee6025da1c61e7d842e2ec79971124eab2be3e974" dependencies = [ "crossbeam 0.7.3", "tokio", diff --git a/crates/l2/tee/quote-gen/src/main.rs b/crates/l2/tee/quote-gen/src/main.rs index d868852b1b6..f48ac2c51e0 100644 --- a/crates/l2/tee/quote-gen/src/main.rs +++ b/crates/l2/tee/quote-gen/src/main.rs @@ -2,6 +2,7 @@ mod sender; use configfs_tsm::create_tdx_quote; use ethrex_common::Bytes; +use ethrex_common::utils::keccak; use ethrex_l2::sequencer::proof_coordinator::get_commit_hash; use ethrex_l2_common::{ calldata::Value, @@ -9,7 +10,6 @@ use ethrex_l2_common::{ utils::get_address_from_secret_key, }; use guest_program::input::ProgramInput; -use ethrex_common::utils::keccak; use secp256k1::{Message, SecretKey, generate_keypair, rand}; use sender::{get_batch, submit_proof, submit_quote}; use std::time::Duration; diff --git a/crates/networking/p2p/discv4/peer_table.rs b/crates/networking/p2p/discv4/peer_table.rs index b7dd5ee5358..18f1278293e 100644 --- a/crates/networking/p2p/discv4/peer_table.rs +++ b/crates/networking/p2p/discv4/peer_table.rs @@ -9,7 +9,7 @@ use indexmap::{IndexMap, map::Entry}; use rand::seq::SliceRandom; use spawned_concurrency::{ error::GenServerError, - tasks::{CallResponse, CastResponse, GenServer, GenServerHandle}, + tasks::{CallResponse, CastResponse, GenServer, GenServerHandle, InitResult, send_message_on}, }; use std::{ collections::HashSet, @@ -127,7 +127,7 @@ pub struct PeerTable { impl PeerTable { pub fn spawn(target_peers: usize) -> PeerTable { PeerTable { - handle: PeerTableServer::new(target_peers).start(), + handle: PeerTableServer::new(target_peers).start_on_thread(), } } @@ -805,6 +805,7 @@ enum CastMessage { node_id: H256, }, Prune, + Shutdown, } #[derive(Clone, Debug)] @@ -862,6 +863,15 @@ impl GenServer for PeerTableServer { type OutMsg = OutMessage; type Error = PeerTableError; + async fn init(self, handle: &GenServerHandle) -> Result, Self::Error> { + send_message_on( + handle.clone(), + tokio::signal::ctrl_c(), + CastMessage::Shutdown, + ); + Ok(InitResult::Success(self)) + } + async fn handle_call( &mut self, message: Self::CallMsg, @@ -1049,6 +1059,7 @@ impl GenServer for PeerTableServer { .and_modify(|c| c.knows_us = true); } CastMessage::Prune => self.prune(), + CastMessage::Shutdown => return CastResponse::Stop, } CastResponse::NoReply } diff --git a/crates/networking/p2p/discv4/server.rs b/crates/networking/p2p/discv4/server.rs index 37b90943dc9..7a57579157e 100644 --- a/crates/networking/p2p/discv4/server.rs +++ b/crates/networking/p2p/discv4/server.rs @@ -22,7 +22,7 @@ use spawned_concurrency::{ messages::Unused, tasks::{ CastResponse, GenServer, GenServerHandle, InitResult::Success, send_after, send_interval, - spawn_listener, + send_message_on, spawn_listener, }, }; use std::{net::SocketAddr, sync::Arc, time::Duration}; @@ -65,6 +65,7 @@ pub enum InMessage { Revalidate, Lookup, Prune, + Shutdown, } #[derive(Debug, Clone)] @@ -112,7 +113,7 @@ impl DiscoveryServer { .new_contacts(bootnodes, local_node.node_id()) .await?; - discovery_server.start(); + discovery_server.start_on_thread(); Ok(()) } @@ -512,6 +513,8 @@ impl GenServer for DiscoveryServer { send_interval(PRUNE_INTERVAL, handle.clone(), InMessage::Prune); let _ = handle.clone().cast(InMessage::Lookup).await; + send_message_on(handle.clone(), tokio::signal::ctrl_c(), InMessage::Shutdown); + Ok(Success(self)) } @@ -551,6 +554,7 @@ impl GenServer for DiscoveryServer { .await .inspect_err(|e| error!(err=?e, "Error Pruning peer table")); } + Self::CastMsg::Shutdown => return CastResponse::Stop, } CastResponse::NoReply } diff --git a/crates/networking/p2p/rlpx/connection/handshake.rs b/crates/networking/p2p/rlpx/connection/handshake.rs index 1dc22687453..55f35880b16 100644 --- a/crates/networking/p2p/rlpx/connection/handshake.rs +++ b/crates/networking/p2p/rlpx/connection/handshake.rs @@ -16,8 +16,8 @@ use crate::{ error::PeerConnectionError, message::EthCapVersion, utils::{ - compress_pubkey, decompress_pubkey, ecdh_xchng, kdf, log_peer_debug, sha256, - sha256_hmac, + compress_pubkey, decompress_pubkey, ecdh_xchng, kdf, log_peer_debug, log_peer_trace, + sha256, sha256_hmac, }, }, types::Node, @@ -83,7 +83,7 @@ pub(crate) async fn perform( let hashed_nonces: [u8; 32] = Keccak256::digest([remote_state.nonce.0, local_state.nonce.0].concat()).into(); let codec = RLPxCodec::new(&local_state, &remote_state, hashed_nonces, eth_version)?; - log_peer_debug(&node, "Completed handshake as initiator"); + log_peer_trace(&node, "Completed handshake as initiator"); (context, node, Framed::new(stream, codec)) } ConnectionState::Receiver(Receiver { @@ -109,7 +109,7 @@ pub(crate) async fn perform( peer_addr.port(), remote_state.public_key, ); - log_peer_debug(&node, "Completed handshake as receiver"); + log_peer_trace(&node, "Completed handshake as receiver"); (context, node, Framed::new(stream, codec)) } ConnectionState::Established(_) => { diff --git a/crates/networking/p2p/rlpx/connection/server.rs b/crates/networking/p2p/rlpx/connection/server.rs index d6486c6460b..4e915218688 100644 --- a/crates/networking/p2p/rlpx/connection/server.rs +++ b/crates/networking/p2p/rlpx/connection/server.rs @@ -61,7 +61,7 @@ use crate::{ SUPPORTED_ETH_CAPABILITIES, SUPPORTED_SNAP_CAPABILITIES, }, snap::TrieNodes, - utils::{log_peer_debug, log_peer_error, log_peer_warn}, + utils::{log_peer_debug, log_peer_error, log_peer_trace, log_peer_warn}, }, snap::{ process_account_range_request, process_byte_codes_request, process_storage_ranges_request, @@ -265,7 +265,7 @@ impl GenServer for PeerConnectionServer { let eth_version = Arc::new(RwLock::new(EthCapVersion::default())); match handshake::perform(self.state, eth_version.clone()).await { Ok((mut established_state, stream)) => { - log_peer_debug(&established_state.node, "Starting RLPx connection"); + log_peer_trace(&established_state.node, "Starting RLPx connection"); if let Err(reason) = initialize_connection(handle, &mut established_state, stream, eth_version).await @@ -326,21 +326,21 @@ impl GenServer for PeerConnectionServer { let peer_supports_l2 = established_state.l2_state.connection_state().is_ok(); let result = match message { Self::CastMsg::IncomingMessage(message) => { - log_peer_debug( + log_peer_trace( &established_state.node, &format!("Received incomming message: {message}"), ); handle_incoming_message(established_state, message).await } Self::CastMsg::OutgoingMessage(message) => { - log_peer_debug( + log_peer_trace( &established_state.node, &format!("Received outgoing request: {message}"), ); handle_outgoing_message(established_state, message).await } Self::CastMsg::OutgoingRequest(message, sender) => { - log_peer_debug( + log_peer_trace( &established_state.node, &format!("Received outgoing request: {message}"), ); @@ -366,19 +366,19 @@ impl GenServer for PeerConnectionServer { send(established_state, Message::Ping(PingMessage {})).await } Self::CastMsg::BroadcastMessage(id, msg) => { - log_peer_debug( + log_peer_trace( &established_state.node, &format!("Received broadcasted message: {msg}"), ); handle_broadcast(established_state, (id, msg)).await } Self::CastMsg::BlockRangeUpdate => { - log_peer_debug(&established_state.node, "Block Range Update"); + log_peer_trace(&established_state.node, "Block Range Update"); handle_block_range_update(established_state).await } #[cfg(feature = "l2")] Self::CastMsg::L2(msg) if peer_supports_l2 => { - log_peer_debug(&established_state.node, "Handling cast for L2 msg: {msg:?}"); + log_peer_trace(&established_state.node, "Handling cast for L2 msg: {msg:?}"); match msg { L2Cast::BatchBroadcast => { l2_connection::send_sealed_batch(established_state).await @@ -458,7 +458,7 @@ impl GenServer for PeerConnectionServer { async fn teardown(self, _handle: &GenServerHandle) -> Result<(), Self::Error> { match self.state { ConnectionState::Established(mut established_state) => { - log_peer_debug( + log_peer_trace( &established_state.node, "Closing connection with established peer", ); @@ -516,7 +516,7 @@ where ) .await?; - log_peer_debug(&state.node, "Peer connection initialized."); + log_peer_trace(&state.node, "Peer connection initialized."); // Send transactions transaction hashes from mempool at connection start send_all_pooled_tx_hashes(state, &mut connection).await?; @@ -612,7 +612,7 @@ async fn send_block_range_update(state: &mut Established) -> Result<(), PeerConn .as_ref() .is_some_and(|eth| eth.version >= 69) { - log_peer_debug(&state.node, "Sending BlockRangeUpdate"); + log_peer_trace(&state.node, "Sending BlockRangeUpdate"); let update = BlockRangeUpdate::new(&state.storage).await?; let lastet_block = update.latest_block; send(state, Message::BlockRangeUpdate(update)).await?; @@ -651,7 +651,7 @@ where ))); } }; - log_peer_debug(&state.node, "Sending status"); + log_peer_trace(&state.node, "Sending status"); send(state, status).await?; // The next immediate message in the ETH protocol is the // status, reference here: @@ -662,11 +662,11 @@ where }; match msg { Message::Status68(msg_data) => { - log_peer_debug(&state.node, "Received Status(68)"); + log_peer_trace(&state.node, "Received Status(68)"); backend::validate_status(msg_data, &state.storage, ð).await? } Message::Status69(msg_data) => { - log_peer_debug(&state.node, "Received Status(69)"); + log_peer_trace(&state.node, "Received Status(69)"); backend::validate_status(msg_data, &state.storage, ð).await? } Message::Disconnect(disconnect) => { @@ -772,7 +772,7 @@ where let mut negotiated_eth_version = 0; let mut negotiated_snap_version = 0; - log_peer_debug( + log_peer_trace( &state.node, &format!( "Hello message capabilities {:?}", @@ -870,7 +870,7 @@ async fn handle_incoming_message( Message::Disconnect(msg_data) => { let reason = msg_data.reason(); - log_peer_debug(&state.node, &format!("Received Disconnect: {reason}")); + log_peer_trace(&state.node, &format!("Received Disconnect: {reason}")); METRICS .record_new_rlpx_conn_disconnection( @@ -886,7 +886,7 @@ async fn handle_incoming_message( return Err(PeerConnectionError::DisconnectReceived(reason)); } Message::Ping(_) => { - log_peer_debug(&state.node, "Sending pong message"); + log_peer_trace(&state.node, "Sending pong message"); send(state, Message::Pong(PongMessage {})).await?; } Message::Pong(_) => { @@ -970,7 +970,7 @@ async fn handle_incoming_message( } } Message::BlockRangeUpdate(update) => { - log_peer_debug( + log_peer_trace( &state.node, &format!( "Block range update: {} to {}", @@ -1084,7 +1084,7 @@ async fn handle_outgoing_message( state: &mut Established, message: Message, ) -> Result<(), PeerConnectionError> { - log_peer_debug(&state.node, &format!("Sending message {message}")); + log_peer_trace(&state.node, &format!("Sending message {message}")); send(state, message).await?; Ok(()) } @@ -1100,7 +1100,7 @@ async fn handle_outgoing_request( .current_requests .insert(id, (format!("{message}"), sender)) }); - log_peer_debug(&state.node, &format!("Sending request {message}")); + log_peer_trace(&state.node, &format!("Sending request {message}")); send(state, message).await?; Ok(()) } diff --git a/crates/networking/p2p/rlpx/initiator.rs b/crates/networking/p2p/rlpx/initiator.rs index 73f4dd366c2..94cb2b97027 100644 --- a/crates/networking/p2p/rlpx/initiator.rs +++ b/crates/networking/p2p/rlpx/initiator.rs @@ -4,7 +4,7 @@ use crate::{ }; use spawned_concurrency::{ messages::Unused, - tasks::{CastResponse, GenServer, send_after}, + tasks::{CastResponse, GenServer, GenServerHandle, InitResult, send_after, send_message_on}, }; use std::time::Duration; use tracing::{debug, error, info}; @@ -44,7 +44,7 @@ impl RLPxInitiator { pub async fn spawn(context: P2PContext) { info!("Starting RLPx Initiator"); let state = RLPxInitiator::new(context); - let mut server = RLPxInitiator::start(state.clone()); + let mut server = RLPxInitiator::start_on_thread(state.clone()); let _ = server.cast(InMessage::LookForPeers).await; } @@ -81,6 +81,7 @@ impl RLPxInitiator { #[derive(Debug, Clone)] pub enum InMessage { LookForPeers, + Shutdown, } #[derive(Debug, Clone)] @@ -94,10 +95,15 @@ impl GenServer for RLPxInitiator { type OutMsg = OutMessage; type Error = std::convert::Infallible; + async fn init(self, handle: &GenServerHandle) -> Result, Self::Error> { + send_message_on(handle.clone(), tokio::signal::ctrl_c(), InMessage::Shutdown); + Ok(InitResult::Success(self)) + } + async fn handle_cast( &mut self, message: Self::CastMsg, - handle: &spawned_concurrency::tasks::GenServerHandle, + handle: &GenServerHandle, ) -> CastResponse { match message { Self::CastMsg::LookForPeers => { @@ -115,6 +121,7 @@ impl GenServer for RLPxInitiator { CastResponse::NoReply } + Self::CastMsg::Shutdown => CastResponse::Stop, } } } diff --git a/crates/networking/p2p/rlpx/utils.rs b/crates/networking/p2p/rlpx/utils.rs index 43465435dfe..0c9e80a2a01 100644 --- a/crates/networking/p2p/rlpx/utils.rs +++ b/crates/networking/p2p/rlpx/utils.rs @@ -6,7 +6,7 @@ use secp256k1::{PublicKey, SecretKey}; use sha3::{Digest, Keccak256}; use snap::raw::{Decoder as SnappyDecoder, Encoder as SnappyEncoder, max_compress_len}; use std::array::TryFromSliceError; -use tracing::{debug, error, warn}; +use tracing::{debug, error, trace, warn}; pub fn sha256(data: &[u8]) -> [u8; 32] { use sha2::{Digest, Sha256}; @@ -82,6 +82,10 @@ pub fn snappy_decompress(msg_data: &[u8]) -> Result, RLPDecodeError> { Ok(snappy_decoder.decompress_vec(msg_data)?) } +pub(crate) fn log_peer_trace(node: &Node, text: &str) { + trace!("{0}/[{1}]: {2}", node.client_name(), node, text) +} + pub(crate) fn log_peer_debug(node: &Node, text: &str) { debug!("{0}/[{1}]: {2}", node.client_name(), node, text) } @@ -89,6 +93,7 @@ pub(crate) fn log_peer_debug(node: &Node, text: &str) { pub(crate) fn log_peer_error(node: &Node, text: &str) { error!("{0}/[{1}]: {2}", node.client_name(), node, text) } + pub(crate) fn log_peer_warn(node: &Node, text: &str) { warn!("{0}/[{1}]: {2}", node.client_name(), node, text) }