Skip to content
Merged
8 changes: 4 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -114,8 +114,8 @@ url = { version = "2.5.4", features = ["serde"] }
kzg-rs = "0.2.6"
libsql = "0.9.10"
futures = "0.3.31"
spawned-concurrency = "0.4.1"
spawned-rt = "0.4.1"
spawned-concurrency = "0.4.2"
spawned-rt = "0.4.2"
lambdaworks-crypto = "0.11.0"
tui-logger = { version = "0.17.3", features = ["tracing-support"] }
crossbeam = "0.8.4"
Expand Down
8 changes: 4 additions & 4 deletions crates/l2/tee/quote-gen/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion crates/l2/tee/quote-gen/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,14 @@ mod sender;

use configfs_tsm::create_tdx_quote;
use ethrex_common::Bytes;
use ethrex_common::utils::keccak;
use ethrex_l2::sequencer::proof_coordinator::get_commit_hash;
use ethrex_l2_common::{
calldata::Value,
prover::{BatchProof, ProofCalldata, ProverType},
utils::get_address_from_secret_key,
};
use guest_program::input::ProgramInput;
use ethrex_common::utils::keccak;
use secp256k1::{Message, SecretKey, generate_keypair, rand};
use sender::{get_batch, submit_proof, submit_quote};
use std::time::Duration;
Expand Down
15 changes: 13 additions & 2 deletions crates/networking/p2p/discv4/peer_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use indexmap::{IndexMap, map::Entry};
use rand::seq::SliceRandom;
use spawned_concurrency::{
error::GenServerError,
tasks::{CallResponse, CastResponse, GenServer, GenServerHandle},
tasks::{CallResponse, CastResponse, GenServer, GenServerHandle, InitResult, send_message_on},
};
use std::{
collections::HashSet,
Expand Down Expand Up @@ -127,7 +127,7 @@ pub struct PeerTable {
impl PeerTable {
pub fn spawn(target_peers: usize) -> PeerTable {
PeerTable {
handle: PeerTableServer::new(target_peers).start(),
handle: PeerTableServer::new(target_peers).start_on_thread(),
}
}

Expand Down Expand Up @@ -805,6 +805,7 @@ enum CastMessage {
node_id: H256,
},
Prune,
Shutdown,
}

#[derive(Clone, Debug)]
Expand Down Expand Up @@ -862,6 +863,15 @@ impl GenServer for PeerTableServer {
type OutMsg = OutMessage;
type Error = PeerTableError;

async fn init(self, handle: &GenServerHandle<Self>) -> Result<InitResult<Self>, Self::Error> {
send_message_on(
handle.clone(),
tokio::signal::ctrl_c(),
CastMessage::Shutdown,
);
Ok(InitResult::Success(self))
}

async fn handle_call(
&mut self,
message: Self::CallMsg,
Expand Down Expand Up @@ -1049,6 +1059,7 @@ impl GenServer for PeerTableServer {
.and_modify(|c| c.knows_us = true);
}
CastMessage::Prune => self.prune(),
CastMessage::Shutdown => return CastResponse::Stop,
}
CastResponse::NoReply
}
Expand Down
8 changes: 6 additions & 2 deletions crates/networking/p2p/discv4/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use spawned_concurrency::{
messages::Unused,
tasks::{
CastResponse, GenServer, GenServerHandle, InitResult::Success, send_after, send_interval,
spawn_listener,
send_message_on, spawn_listener,
},
};
use std::{net::SocketAddr, sync::Arc, time::Duration};
Expand Down Expand Up @@ -65,6 +65,7 @@ pub enum InMessage {
Revalidate,
Lookup,
Prune,
Shutdown,
}

#[derive(Debug, Clone)]
Expand Down Expand Up @@ -112,7 +113,7 @@ impl DiscoveryServer {
.new_contacts(bootnodes, local_node.node_id())
.await?;

discovery_server.start();
discovery_server.start_on_thread();
Ok(())
}

Expand Down Expand Up @@ -512,6 +513,8 @@ impl GenServer for DiscoveryServer {
send_interval(PRUNE_INTERVAL, handle.clone(), InMessage::Prune);
let _ = handle.clone().cast(InMessage::Lookup).await;

send_message_on(handle.clone(), tokio::signal::ctrl_c(), InMessage::Shutdown);

Ok(Success(self))
}

Expand Down Expand Up @@ -551,6 +554,7 @@ impl GenServer for DiscoveryServer {
.await
.inspect_err(|e| error!(err=?e, "Error Pruning peer table"));
}
Self::CastMsg::Shutdown => return CastResponse::Stop,
}
CastResponse::NoReply
}
Expand Down
8 changes: 4 additions & 4 deletions crates/networking/p2p/rlpx/connection/handshake.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ use crate::{
error::PeerConnectionError,
message::EthCapVersion,
utils::{
compress_pubkey, decompress_pubkey, ecdh_xchng, kdf, log_peer_debug, sha256,
sha256_hmac,
compress_pubkey, decompress_pubkey, ecdh_xchng, kdf, log_peer_debug, log_peer_trace,
sha256, sha256_hmac,
},
},
types::Node,
Expand Down Expand Up @@ -83,7 +83,7 @@ pub(crate) async fn perform(
let hashed_nonces: [u8; 32] =
Keccak256::digest([remote_state.nonce.0, local_state.nonce.0].concat()).into();
let codec = RLPxCodec::new(&local_state, &remote_state, hashed_nonces, eth_version)?;
log_peer_debug(&node, "Completed handshake as initiator");
log_peer_trace(&node, "Completed handshake as initiator");
(context, node, Framed::new(stream, codec))
}
ConnectionState::Receiver(Receiver {
Expand All @@ -109,7 +109,7 @@ pub(crate) async fn perform(
peer_addr.port(),
remote_state.public_key,
);
log_peer_debug(&node, "Completed handshake as receiver");
log_peer_trace(&node, "Completed handshake as receiver");
(context, node, Framed::new(stream, codec))
}
ConnectionState::Established(_) => {
Expand Down
40 changes: 20 additions & 20 deletions crates/networking/p2p/rlpx/connection/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ use crate::{
SUPPORTED_ETH_CAPABILITIES, SUPPORTED_SNAP_CAPABILITIES,
},
snap::TrieNodes,
utils::{log_peer_debug, log_peer_error, log_peer_warn},
utils::{log_peer_debug, log_peer_error, log_peer_trace, log_peer_warn},
},
snap::{
process_account_range_request, process_byte_codes_request, process_storage_ranges_request,
Expand Down Expand Up @@ -265,7 +265,7 @@ impl GenServer for PeerConnectionServer {
let eth_version = Arc::new(RwLock::new(EthCapVersion::default()));
match handshake::perform(self.state, eth_version.clone()).await {
Ok((mut established_state, stream)) => {
log_peer_debug(&established_state.node, "Starting RLPx connection");
log_peer_trace(&established_state.node, "Starting RLPx connection");

if let Err(reason) =
initialize_connection(handle, &mut established_state, stream, eth_version).await
Expand Down Expand Up @@ -326,21 +326,21 @@ impl GenServer for PeerConnectionServer {
let peer_supports_l2 = established_state.l2_state.connection_state().is_ok();
let result = match message {
Self::CastMsg::IncomingMessage(message) => {
log_peer_debug(
log_peer_trace(
&established_state.node,
&format!("Received incomming message: {message}"),
);
handle_incoming_message(established_state, message).await
}
Self::CastMsg::OutgoingMessage(message) => {
log_peer_debug(
log_peer_trace(
&established_state.node,
&format!("Received outgoing request: {message}"),
);
handle_outgoing_message(established_state, message).await
}
Self::CastMsg::OutgoingRequest(message, sender) => {
log_peer_debug(
log_peer_trace(
&established_state.node,
&format!("Received outgoing request: {message}"),
);
Expand All @@ -366,19 +366,19 @@ impl GenServer for PeerConnectionServer {
send(established_state, Message::Ping(PingMessage {})).await
}
Self::CastMsg::BroadcastMessage(id, msg) => {
log_peer_debug(
log_peer_trace(
&established_state.node,
&format!("Received broadcasted message: {msg}"),
);
handle_broadcast(established_state, (id, msg)).await
}
Self::CastMsg::BlockRangeUpdate => {
log_peer_debug(&established_state.node, "Block Range Update");
log_peer_trace(&established_state.node, "Block Range Update");
handle_block_range_update(established_state).await
}
#[cfg(feature = "l2")]
Self::CastMsg::L2(msg) if peer_supports_l2 => {
log_peer_debug(&established_state.node, "Handling cast for L2 msg: {msg:?}");
log_peer_trace(&established_state.node, "Handling cast for L2 msg: {msg:?}");
match msg {
L2Cast::BatchBroadcast => {
l2_connection::send_sealed_batch(established_state).await
Expand Down Expand Up @@ -458,7 +458,7 @@ impl GenServer for PeerConnectionServer {
async fn teardown(self, _handle: &GenServerHandle<Self>) -> Result<(), Self::Error> {
match self.state {
ConnectionState::Established(mut established_state) => {
log_peer_debug(
log_peer_trace(
&established_state.node,
"Closing connection with established peer",
);
Expand Down Expand Up @@ -516,7 +516,7 @@ where
)
.await?;

log_peer_debug(&state.node, "Peer connection initialized.");
log_peer_trace(&state.node, "Peer connection initialized.");

// Send transactions transaction hashes from mempool at connection start
send_all_pooled_tx_hashes(state, &mut connection).await?;
Expand Down Expand Up @@ -612,7 +612,7 @@ async fn send_block_range_update(state: &mut Established) -> Result<(), PeerConn
.as_ref()
.is_some_and(|eth| eth.version >= 69)
{
log_peer_debug(&state.node, "Sending BlockRangeUpdate");
log_peer_trace(&state.node, "Sending BlockRangeUpdate");
let update = BlockRangeUpdate::new(&state.storage).await?;
let lastet_block = update.latest_block;
send(state, Message::BlockRangeUpdate(update)).await?;
Expand Down Expand Up @@ -651,7 +651,7 @@ where
)));
}
};
log_peer_debug(&state.node, "Sending status");
log_peer_trace(&state.node, "Sending status");
send(state, status).await?;
// The next immediate message in the ETH protocol is the
// status, reference here:
Expand All @@ -662,11 +662,11 @@ where
};
match msg {
Message::Status68(msg_data) => {
log_peer_debug(&state.node, "Received Status(68)");
log_peer_trace(&state.node, "Received Status(68)");
backend::validate_status(msg_data, &state.storage, &eth).await?
}
Message::Status69(msg_data) => {
log_peer_debug(&state.node, "Received Status(69)");
log_peer_trace(&state.node, "Received Status(69)");
backend::validate_status(msg_data, &state.storage, &eth).await?
}
Message::Disconnect(disconnect) => {
Expand Down Expand Up @@ -772,7 +772,7 @@ where
let mut negotiated_eth_version = 0;
let mut negotiated_snap_version = 0;

log_peer_debug(
log_peer_trace(
&state.node,
&format!(
"Hello message capabilities {:?}",
Expand Down Expand Up @@ -870,7 +870,7 @@ async fn handle_incoming_message(
Message::Disconnect(msg_data) => {
let reason = msg_data.reason();

log_peer_debug(&state.node, &format!("Received Disconnect: {reason}"));
log_peer_trace(&state.node, &format!("Received Disconnect: {reason}"));

METRICS
.record_new_rlpx_conn_disconnection(
Expand All @@ -886,7 +886,7 @@ async fn handle_incoming_message(
return Err(PeerConnectionError::DisconnectReceived(reason));
}
Message::Ping(_) => {
log_peer_debug(&state.node, "Sending pong message");
log_peer_trace(&state.node, "Sending pong message");
send(state, Message::Pong(PongMessage {})).await?;
}
Message::Pong(_) => {
Expand Down Expand Up @@ -970,7 +970,7 @@ async fn handle_incoming_message(
}
}
Message::BlockRangeUpdate(update) => {
log_peer_debug(
log_peer_trace(
&state.node,
&format!(
"Block range update: {} to {}",
Expand Down Expand Up @@ -1084,7 +1084,7 @@ async fn handle_outgoing_message(
state: &mut Established,
message: Message,
) -> Result<(), PeerConnectionError> {
log_peer_debug(&state.node, &format!("Sending message {message}"));
log_peer_trace(&state.node, &format!("Sending message {message}"));
send(state, message).await?;
Ok(())
}
Expand All @@ -1100,7 +1100,7 @@ async fn handle_outgoing_request(
.current_requests
.insert(id, (format!("{message}"), sender))
});
log_peer_debug(&state.node, &format!("Sending request {message}"));
log_peer_trace(&state.node, &format!("Sending request {message}"));
send(state, message).await?;
Ok(())
}
Expand Down
Loading