From 3a2c7d5012f518430adec1b2e0b38fdf076047bc Mon Sep 17 00:00:00 2001 From: Eitan Seri-Levi Date: Wed, 5 Feb 2025 13:37:04 +0200 Subject: [PATCH 1/3] Add builder ssz flow --- Cargo.lock | 29 ++- Cargo.toml | 4 + crates/common/Cargo.toml | 4 + crates/common/src/pbs/types/beacon_block.rs | 7 +- .../src/pbs/types/blinded_block_body.rs | 35 ++-- crates/common/src/pbs/types/blobs_bundle.rs | 3 +- .../common/src/pbs/types/execution_payload.rs | 7 +- crates/common/src/pbs/types/get_header.rs | 5 +- crates/common/src/pbs/types/kzg.rs | 39 ++++ crates/common/src/pbs/types/utils.rs | 13 ++ crates/common/src/utils.rs | 166 +++++++++++++++++- crates/pbs/Cargo.toml | 1 + crates/pbs/src/routes/get_header.rs | 33 +++- crates/pbs/src/routes/submit_block.rs | 37 +++- 14 files changed, 339 insertions(+), 44 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 9a9a6389..0cebdf56 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -225,7 +225,7 @@ dependencies = [ "c-kzg", "derive_more", "ethereum_ssz 0.8.0", - "ethereum_ssz_derive", + "ethereum_ssz_derive 0.8.0", "once_cell", "serde", "sha2 0.10.8", @@ -479,7 +479,7 @@ dependencies = [ "alloy-rpc-types-engine", "alloy-serde", "ethereum_ssz 0.8.0", - "ethereum_ssz_derive", + "ethereum_ssz_derive 0.8.0", "serde", "serde_with", "thiserror 2.0.6", @@ -498,7 +498,7 @@ dependencies = [ "alloy-serde", "derive_more", "ethereum_ssz 0.8.0", - "ethereum_ssz_derive", + "ethereum_ssz_derive 0.8.0", "serde", "strum", ] @@ -1307,13 +1307,17 @@ dependencies = [ "base64 0.22.1", "bimap", "blst", + "bytes", "cipher 0.4.4", "ctr 0.9.2", "derive_more", "eth2_keystore", "ethereum_serde_utils 0.7.0", + "ethereum_ssz 0.7.1", + "ethereum_ssz_derive 0.7.1", "eyre", "k256", + "mediatype", "pbkdf2 0.12.2", "rand", "reqwest", @@ -1358,6 +1362,7 @@ dependencies = [ "cb-common", "cb-metrics", "dashmap 5.5.3", + "ethereum_ssz 0.7.1", "eyre", "futures", "lazy_static", @@ -2135,6 +2140,18 @@ dependencies = [ "typenum", ] +[[package]] +name = "ethereum_ssz_derive" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f3deae99c8e74829a00ba7a92d49055732b3c1f093f2ccfa3cbc621679b6fa91" +dependencies = [ + "darling", + "proc-macro2", + "quote", + "syn 2.0.90", +] + [[package]] name = "ethereum_ssz_derive" version = "0.8.0" @@ -2845,6 +2862,12 @@ version = "0.8.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "47e1ffaa40ddd1f3ed91f717a33c8c0ee23fff369e3aa8772b9605cc1d22f4c3" +[[package]] +name = "mediatype" +version = "0.19.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8878cd8d1b3c8c8ae4b2ba0a36652b7cf192f618a599a7fbdfa25cffd4ea72dd" + [[package]] name = "memchr" version = "2.7.2" diff --git a/Cargo.toml b/Cargo.toml index 1bf3de64..b60d4f30 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -39,6 +39,8 @@ alloy = { version = "0.8.0", features = [ ] } ssz_types = "0.8" ethereum_serde_utils = "0.7.0" +ethereum_ssz = "0.7" +ethereum_ssz_derive = "0.7" # networking axum = { version = "0.8.1", features = ["macros"] } @@ -101,3 +103,5 @@ derive_more = { version = "1.0.0", features = [ "deref", "display", ] } +mediatype = "0.19.13" +bytes = "1.6" diff --git a/crates/common/Cargo.toml b/crates/common/Cargo.toml index 1ff1c979..a77dd25c 100644 --- a/crates/common/Cargo.toml +++ b/crates/common/Cargo.toml @@ -9,6 +9,8 @@ publish = false # ethereum alloy.workspace = true ssz_types.workspace = true +ethereum_ssz.workspace = true +ethereum_ssz_derive.workspace = true ethereum_serde_utils.workspace = true # networking @@ -49,6 +51,8 @@ url.workspace = true rand.workspace = true bimap.workspace = true derive_more.workspace = true +mediatype.workspace = true +bytes.workspace = true unicode-normalization.workspace = true base64.workspace = true diff --git a/crates/common/src/pbs/types/beacon_block.rs b/crates/common/src/pbs/types/beacon_block.rs index e17f1f8a..289637bf 100644 --- a/crates/common/src/pbs/types/beacon_block.rs +++ b/crates/common/src/pbs/types/beacon_block.rs @@ -1,12 +1,13 @@ use alloy::{primitives::B256, rpc::types::beacon::BlsSignature}; use serde::{Deserialize, Serialize}; +use ssz_derive::{Decode, Encode}; use super::{ blinded_block_body::BlindedBeaconBlockBody, blobs_bundle::BlobsBundle, execution_payload::ExecutionPayload, spec::DenebSpec, utils::VersionedResponse, }; -#[derive(Debug, Default, Clone, Serialize, Deserialize)] +#[derive(Debug, Default, Clone, Serialize, Deserialize, Decode)] /// Sent to relays in submit_block pub struct SignedBlindedBeaconBlock { pub message: BlindedBeaconBlock, @@ -19,7 +20,7 @@ impl SignedBlindedBeaconBlock { } } -#[derive(Debug, Default, Clone, Serialize, Deserialize)] +#[derive(Debug, Default, Clone, Serialize, Deserialize, Decode)] pub struct BlindedBeaconBlock { #[serde(with = "serde_utils::quoted_u64")] pub slot: u64, @@ -33,7 +34,7 @@ pub struct BlindedBeaconBlock { /// Returned by relay in submit_block pub type SubmitBlindedBlockResponse = VersionedResponse; -#[derive(Debug, Default, Clone, Serialize, Deserialize)] +#[derive(Debug, Default, Clone, Serialize, Deserialize, Encode)] pub struct PayloadAndBlobs { pub execution_payload: ExecutionPayload, pub blobs_bundle: Option>, diff --git a/crates/common/src/pbs/types/blinded_block_body.rs b/crates/common/src/pbs/types/blinded_block_body.rs index ded4f215..20785954 100644 --- a/crates/common/src/pbs/types/blinded_block_body.rs +++ b/crates/common/src/pbs/types/blinded_block_body.rs @@ -3,6 +3,7 @@ use alloy::{ rpc::types::beacon::{BlsPublicKey, BlsSignature}, }; use serde::{Deserialize, Serialize}; +use ssz_derive::Decode; use ssz_types::{typenum, BitList, BitVector, FixedVector, VariableList}; use super::{ @@ -10,7 +11,7 @@ use super::{ }; use crate::utils::as_str; -#[derive(Debug, Default, Clone, Serialize, Deserialize)] +#[derive(Debug, Default, Clone, Serialize, Deserialize, Decode)] pub struct BlindedBeaconBlockBody { pub randao_reveal: BlsSignature, pub eth1_data: Eth1Data, @@ -27,7 +28,7 @@ pub struct BlindedBeaconBlockBody { pub blob_kzg_commitments: KzgCommitments, } -#[derive(Debug, Default, Clone, Serialize, Deserialize)] +#[derive(Debug, Default, Clone, Serialize, Deserialize, Decode)] pub struct Eth1Data { pub deposit_root: B256, #[serde(with = "serde_utils::quoted_u64")] @@ -35,7 +36,7 @@ pub struct Eth1Data { pub block_hash: B256, } -#[derive(Debug, Default, Clone, Serialize, Deserialize)] +#[derive(Debug, Default, Clone, Serialize, Deserialize, Decode)] pub struct BeaconBlockHeader { #[serde(with = "serde_utils::quoted_u64")] pub slot: u64, @@ -46,13 +47,13 @@ pub struct BeaconBlockHeader { pub body_root: B256, } -#[derive(Debug, Default, Clone, Serialize, Deserialize)] +#[derive(Debug, Default, Clone, Serialize, Deserialize, Decode)] pub struct SignedBeaconBlockHeader { pub message: BeaconBlockHeader, pub signature: BlsSignature, } -#[derive(Debug, Default, Clone, Serialize, Deserialize)] +#[derive(Debug, Default, Clone, Serialize, Deserialize, Decode)] pub struct BlsToExecutionChange { #[serde(with = "as_str")] pub validator_index: u64, @@ -60,25 +61,25 @@ pub struct BlsToExecutionChange { pub to_execution_address: Address, } -#[derive(Debug, Default, Clone, Serialize, Deserialize)] +#[derive(Debug, Default, Clone, Serialize, Deserialize, Decode)] pub struct SignedBlsToExecutionChange { pub message: BlsToExecutionChange, pub signature: BlsSignature, } -#[derive(Debug, Default, Clone, Serialize, Deserialize)] +#[derive(Debug, Default, Clone, Serialize, Deserialize, Decode)] pub struct ProposerSlashing { pub signed_header_1: SignedBeaconBlockHeader, pub signed_header_2: SignedBeaconBlockHeader, } -#[derive(Debug, Default, Clone, Serialize, Deserialize)] +#[derive(Debug, Default, Clone, Serialize, Deserialize, Decode)] pub struct AttesterSlashing { pub attestation_1: IndexedAttestation, pub attestation_2: IndexedAttestation, } -#[derive(Debug, Default, Clone, Serialize, Deserialize)] +#[derive(Debug, Default, Clone, Serialize, Deserialize, Decode)] #[serde(bound = "T: EthSpec")] pub struct IndexedAttestation { /// Lists validator registry indices, not committee indices. @@ -88,7 +89,7 @@ pub struct IndexedAttestation { pub signature: BlsSignature, } -#[derive(Debug, Default, Clone, Serialize, Deserialize)] +#[derive(Debug, Default, Clone, Serialize, Deserialize, Decode)] pub struct AttestationData { #[serde(with = "serde_utils::quoted_u64")] pub slot: u64, @@ -101,14 +102,14 @@ pub struct AttestationData { pub target: Checkpoint, } -#[derive(Debug, Default, Clone, Serialize, Deserialize)] +#[derive(Debug, Default, Clone, Serialize, Deserialize, Decode)] pub struct Checkpoint { #[serde(with = "serde_utils::quoted_u64")] pub epoch: u64, pub root: B256, } -#[derive(Debug, Clone, Serialize, Deserialize)] +#[derive(Debug, Clone, Serialize, Deserialize, Decode)] #[serde(bound = "T: EthSpec")] pub struct Attestation { pub aggregation_bits: BitList, @@ -116,13 +117,13 @@ pub struct Attestation { pub signature: BlsSignature, } -#[derive(Debug, Default, Clone, Serialize, Deserialize)] +#[derive(Debug, Default, Clone, Serialize, Deserialize, Decode)] pub struct Deposit { pub proof: FixedVector, // put this in EthSpec? pub data: DepositData, } -#[derive(Debug, Default, Clone, Serialize, Deserialize)] +#[derive(Debug, Default, Clone, Serialize, Deserialize, Decode)] pub struct DepositData { pub pubkey: BlsPublicKey, pub withdrawal_credentials: B256, @@ -131,13 +132,13 @@ pub struct DepositData { pub signature: BlsSignature, } -#[derive(Debug, Default, Clone, Serialize, Deserialize)] +#[derive(Debug, Default, Clone, Serialize, Deserialize, Decode)] pub struct SignedVoluntaryExit { pub message: VoluntaryExit, pub signature: BlsSignature, } -#[derive(Debug, Default, Clone, Serialize, Deserialize)] +#[derive(Debug, Default, Clone, Serialize, Deserialize, Decode)] pub struct VoluntaryExit { /// Earliest epoch when voluntary exit can be processed. #[serde(with = "serde_utils::quoted_u64")] @@ -146,7 +147,7 @@ pub struct VoluntaryExit { pub validator_index: u64, } -#[derive(Debug, Default, Clone, Serialize, Deserialize)] +#[derive(Debug, Default, Clone, Serialize, Deserialize, Decode)] #[serde(bound = "T: EthSpec")] pub struct SyncAggregate { pub sync_committee_bits: BitVector, diff --git a/crates/common/src/pbs/types/blobs_bundle.rs b/crates/common/src/pbs/types/blobs_bundle.rs index 778679b2..4f95beda 100644 --- a/crates/common/src/pbs/types/blobs_bundle.rs +++ b/crates/common/src/pbs/types/blobs_bundle.rs @@ -1,4 +1,5 @@ use serde::{Deserialize, Serialize}; +use ssz_derive::Encode; use ssz_types::{FixedVector, VariableList}; use super::{ @@ -6,7 +7,7 @@ use super::{ spec::EthSpec, }; -#[derive(Debug, Default, Clone, Serialize, Deserialize)] +#[derive(Debug, Default, Clone, Serialize, Deserialize, Encode)] #[serde(bound = "T: EthSpec")] pub struct BlobsBundle { pub commitments: KzgCommitments, diff --git a/crates/common/src/pbs/types/execution_payload.rs b/crates/common/src/pbs/types/execution_payload.rs index e593f5f1..6116fac5 100644 --- a/crates/common/src/pbs/types/execution_payload.rs +++ b/crates/common/src/pbs/types/execution_payload.rs @@ -1,5 +1,6 @@ use alloy::primitives::{b256, Address, B256, U256}; use serde::{Deserialize, Serialize}; +use ssz_derive::{Decode, Encode}; use ssz_types::{FixedVector, VariableList}; use tree_hash_derive::TreeHash; @@ -9,7 +10,7 @@ use crate::utils::as_str; pub const EMPTY_TX_ROOT_HASH: B256 = b256!("7ffe241ea60187fdb0187bfa22de35d1f9bed7ab061d9401fd47e34a54fbede1"); -#[derive(Debug, Default, Clone, Serialize, Deserialize)] +#[derive(Debug, Default, Clone, Serialize, Deserialize, Encode)] pub struct ExecutionPayload { pub parent_hash: B256, pub fee_recipient: Address, @@ -46,7 +47,7 @@ pub type Transactions = VariableList< >; pub type Transaction = VariableList; -#[derive(Debug, Default, Clone, Serialize, Deserialize)] +#[derive(Debug, Default, Clone, Serialize, Deserialize, Encode)] pub struct Withdrawal { #[serde(with = "serde_utils::quoted_u64")] pub index: u64, @@ -57,7 +58,7 @@ pub struct Withdrawal { pub amount: u64, } -#[derive(Debug, Default, Clone, Serialize, Deserialize, TreeHash)] +#[derive(Debug, Default, Clone, Serialize, Deserialize, TreeHash, Encode, Decode)] pub struct ExecutionPayloadHeader { pub parent_hash: B256, pub fee_recipient: Address, diff --git a/crates/common/src/pbs/types/get_header.rs b/crates/common/src/pbs/types/get_header.rs index ebffa946..55b893db 100644 --- a/crates/common/src/pbs/types/get_header.rs +++ b/crates/common/src/pbs/types/get_header.rs @@ -3,6 +3,7 @@ use alloy::{ rpc::types::beacon::{BlsPublicKey, BlsSignature}, }; use serde::{Deserialize, Serialize}; +use ssz_derive::Encode; use tree_hash_derive::TreeHash; use super::{ @@ -37,13 +38,13 @@ impl GetHeaderResponse { } } -#[derive(Debug, Default, Clone, Serialize, Deserialize)] +#[derive(Debug, Default, Clone, Serialize, Deserialize, Encode)] pub struct SignedExecutionPayloadHeader { pub message: ExecutionPayloadHeaderMessage, pub signature: BlsSignature, } -#[derive(Debug, Default, Clone, Serialize, Deserialize, TreeHash)] +#[derive(Debug, Default, Clone, Serialize, Deserialize, TreeHash, Encode)] pub struct ExecutionPayloadHeaderMessage { pub header: ExecutionPayloadHeader, pub blob_kzg_commitments: KzgCommitments, diff --git a/crates/common/src/pbs/types/kzg.rs b/crates/common/src/pbs/types/kzg.rs index e5b3fe6f..165302ea 100644 --- a/crates/common/src/pbs/types/kzg.rs +++ b/crates/common/src/pbs/types/kzg.rs @@ -4,6 +4,7 @@ use std::{ }; use serde::{Deserialize, Deserializer, Serialize, Serializer}; +use ssz::{Encode, Decode}; use ssz_types::VariableList; use tree_hash::{PackedEncoding, TreeHash}; @@ -21,6 +22,30 @@ impl From for [u8; 48] { } } +impl Decode for KzgCommitment { + fn is_ssz_fixed_len() -> bool { + <[u8; BYTES_PER_COMMITMENT] as ssz::Decode>::is_ssz_fixed_len() + } + + fn from_ssz_bytes(bytes: &[u8]) -> Result { + <[u8; BYTES_PER_COMMITMENT]>::from_ssz_bytes(bytes).and_then(|o| Ok(Self(o))) + } +} + +impl Encode for KzgCommitment { + fn is_ssz_fixed_len() -> bool { + <[u8; BYTES_PER_COMMITMENT] as ssz::Encode>::is_ssz_fixed_len() + } + + fn ssz_append(&self, buf: &mut Vec) { + self.0.ssz_append(buf) + } + + fn ssz_bytes_len(&self) -> usize { + self.0.ssz_bytes_len() + } +} + impl TreeHash for KzgCommitment { fn tree_hash_type() -> tree_hash::TreeHashType { <[u8; BYTES_PER_COMMITMENT] as TreeHash>::tree_hash_type() @@ -118,6 +143,20 @@ impl From<[u8; BYTES_PER_PROOF]> for KzgProof { } } +impl Encode for KzgProof { + fn is_ssz_fixed_len() -> bool { + <[u8; BYTES_PER_PROOF] as ssz::Encode>::is_ssz_fixed_len() + } + + fn ssz_append(&self, buf: &mut Vec) { + self.0.ssz_append(buf) + } + + fn ssz_bytes_len(&self) -> usize { + self.0.ssz_bytes_len() + } +} + impl Serialize for KzgProof { fn serialize(&self, serializer: S) -> Result where diff --git a/crates/common/src/pbs/types/utils.rs b/crates/common/src/pbs/types/utils.rs index 2aeb1793..1fc7e4c6 100644 --- a/crates/common/src/pbs/types/utils.rs +++ b/crates/common/src/pbs/types/utils.rs @@ -1,3 +1,5 @@ +use std::fmt; + use serde::{Deserialize, Serialize}; pub mod quoted_variable_list_u64 { @@ -40,4 +42,15 @@ pub enum Version { #[serde(rename = "deneb")] #[default] Deneb, + #[serde(rename = "electra")] + Electra, +} + +impl fmt::Display for Version { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + Version::Deneb => write!(f, "deneb"), + Version::Electra => write!(f, "electra"), + } + } } diff --git a/crates/common/src/utils.rs b/crates/common/src/utils.rs index 42cb0427..378e5aa3 100644 --- a/crates/common/src/utils.rs +++ b/crates/common/src/utils.rs @@ -1,16 +1,23 @@ use std::{ + fmt, net::Ipv4Addr, + str::FromStr, time::{SystemTime, UNIX_EPOCH}, }; - +use bytes::Bytes; +use axum::{ + extract::{FromRequest, Request}, + response::{IntoResponse, Response}, + http::HeaderValue +}; use alloy::{ primitives::U256, rpc::types::beacon::{BlsPublicKey, BlsSignature}, }; -use axum::http::HeaderValue; use blst::min_pk::{PublicKey, Signature}; +use mediatype::{names, MediaType, MediaTypeList}; use rand::{distributions::Alphanumeric, Rng}; -use reqwest::header::HeaderMap; +use reqwest::{header::{HeaderMap, ACCEPT, CONTENT_TYPE}, StatusCode}; use serde::{de::DeserializeOwned, Serialize}; use serde_json::Value; use tracing::Level; @@ -24,6 +31,7 @@ use crate::{ }; const MILLIS_PER_SECOND: u64 = 1_000; +pub const CONSENSUS_VERSION_HEADER: &str = "Eth-Consensus-Version"; pub fn timestamp_of_slot_start_sec(slot: u64, chain: Chain) -> u64 { chain.genesis_time_sec() + slot * chain.slot_time_sec() @@ -273,6 +281,158 @@ pub fn get_user_agent_with_version(req_headers: &HeaderMap) -> eyre::Result Accept { + Accept::from_str( + req_headers.get(ACCEPT).and_then(|value| value.to_str().ok()).unwrap_or("application/json"), + ) + .unwrap_or(Accept::Json) +} + +/// Parse CONTENT TYPE header, default to JSON if missing or mal-formatted +pub fn get_content_type_header(req_headers: &HeaderMap) -> ContentType { + ContentType::from_str( + req_headers.get(CONTENT_TYPE).and_then(|value| value.to_str().ok()).unwrap_or("application/json"), + ) + .unwrap_or(ContentType::Json) +} + +#[derive(Debug, Clone, Copy, PartialEq)] +pub enum ContentType { + Json, + Ssz +} + +impl std::fmt::Display for ContentType { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + ContentType::Json => write!(f, "application/json"), + ContentType::Ssz => write!(f, "application/octet-stream"), + } + } +} + +impl FromStr for ContentType { + type Err = String; + fn from_str(value: &str) -> Result { + match value { + "application/json" => Ok(ContentType::Json), + "application/octet-stream" => Ok(ContentType::Ssz), + _ => Err(format!("unknown content type: {}", value)), + } + } +} + +#[derive(Debug, Clone, Copy, PartialEq)] +pub enum Accept { + Json, + Ssz, + Any, +} + +impl fmt::Display for Accept { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + Accept::Ssz => write!(f, "application/octet-stream"), + Accept::Json => write!(f, "application/json"), + Accept::Any => write!(f, "*/*"), + } + } +} + +impl FromStr for Accept { + type Err = String; + + fn from_str(s: &str) -> Result { + let media_type_list = MediaTypeList::new(s); + + // [q-factor weighting]: https://datatracker.ietf.org/doc/html/rfc7231#section-5.3.2 + // find the highest q-factor supported accept type + let mut highest_q = 0_u16; + let mut accept_type = None; + + const APPLICATION: &str = names::APPLICATION.as_str(); + const OCTET_STREAM: &str = names::OCTET_STREAM.as_str(); + const JSON: &str = names::JSON.as_str(); + const STAR: &str = names::_STAR.as_str(); + const Q: &str = names::Q.as_str(); + + media_type_list.into_iter().for_each(|item| { + if let Ok(MediaType { ty, subty, suffix: _, params }) = item { + let q_accept = match (ty.as_str(), subty.as_str()) { + (APPLICATION, OCTET_STREAM) => Some(Accept::Ssz), + (APPLICATION, JSON) => Some(Accept::Json), + (STAR, STAR) => Some(Accept::Any), + _ => None, + } + .map(|item_accept_type| { + let q_val = params + .iter() + .find_map(|(n, v)| match n.as_str() { + Q => { + Some((v.as_str().parse::().unwrap_or(0_f32) * 1000_f32) as u16) + } + _ => None, + }) + .or(Some(1000_u16)); + + (q_val.unwrap(), item_accept_type) + }); + + match q_accept { + Some((q, accept)) if q > highest_q => { + highest_q = q; + accept_type = Some(accept); + } + _ => (), + } + } + }); + accept_type.ok_or_else(|| "accept header is not supported".to_string()) + } +} + + +#[must_use] +#[derive(Debug, Clone, Copy, Default)] +pub struct JsonOrSsz(pub T); + +impl FromRequest for JsonOrSsz +where + T: serde::de::DeserializeOwned + ssz::Decode + 'static, + S: Send + Sync, +{ + type Rejection = Response; + + async fn from_request(req: Request, _state: &S) -> Result { + let headers = req.headers().clone(); + let content_type = headers + .get(CONTENT_TYPE) + .and_then(|value| value.to_str().ok()); + + let bytes = Bytes::from_request(req, _state) + .await + .map_err(IntoResponse::into_response)?; + + if let Some(content_type) = content_type { + if content_type.starts_with(&ContentType::Json.to_string()) { + let payload: T = serde_json::from_slice(&bytes) + .map_err(|_| StatusCode::BAD_REQUEST.into_response())?; + return Ok(Self(payload)); + } + + if content_type.starts_with(&ContentType::Ssz.to_string()) { + let payload = T::from_ssz_bytes(&bytes) + .map_err(|_| StatusCode::BAD_REQUEST.into_response())?; + return Ok(Self(payload)); + } + } + + Err(StatusCode::UNSUPPORTED_MEDIA_TYPE.into_response()) + } +} + + #[cfg(unix)] pub async fn wait_for_signal() -> eyre::Result<()> { use tokio::signal::unix::{signal, SignalKind}; diff --git a/crates/pbs/Cargo.toml b/crates/pbs/Cargo.toml index 5ef8bc64..1b259a59 100644 --- a/crates/pbs/Cargo.toml +++ b/crates/pbs/Cargo.toml @@ -11,6 +11,7 @@ cb-metrics.workspace = true # ethereum alloy.workspace = true +ethereum_ssz.workspace = true # networking axum.workspace = true diff --git a/crates/pbs/src/routes/get_header.rs b/crates/pbs/src/routes/get_header.rs index 919bad11..77b0395a 100644 --- a/crates/pbs/src/routes/get_header.rs +++ b/crates/pbs/src/routes/get_header.rs @@ -1,14 +1,15 @@ use alloy::primitives::utils::format_ether; use axum::{ extract::{Path, State}, - http::HeaderMap, + http::{HeaderMap, HeaderValue}, response::IntoResponse, }; use cb_common::{ pbs::{BuilderEvent, GetHeaderParams}, - utils::{get_user_agent, ms_into_slot}, + utils::{get_accept_header, get_user_agent, ms_into_slot, Accept, CONSENSUS_VERSION_HEADER}, }; -use reqwest::StatusCode; +use reqwest::{header::CONTENT_TYPE, StatusCode}; +use ssz::Encode; use tracing::{error, info}; use uuid::Uuid; @@ -33,17 +34,37 @@ pub async fn handle_get_header>( let ua = get_user_agent(&req_headers); let ms_into_slot = ms_into_slot(params.slot, state.config.chain); + let accept_header = get_accept_header(&req_headers); info!(ua, parent_hash=%params.parent_hash, validator_pubkey=%params.pubkey, ms_into_slot); match A::get_header(params, req_headers, state.clone()).await { Ok(res) => { state.publish_event(BuilderEvent::GetHeaderResponse(Box::new(res.clone()))); - if let Some(max_bid) = res { info!(value_eth = format_ether(max_bid.value()), block_hash =% max_bid.block_hash(), "received header"); - BEACON_NODE_STATUS.with_label_values(&["200", GET_HEADER_ENDPOINT_TAG]).inc(); - Ok((StatusCode::OK, axum::Json(max_bid)).into_response()) + let response = match accept_header { + Accept::Ssz => { + let mut res = { + info!("sending response as JSON"); + (StatusCode::OK, max_bid.data.as_ssz_bytes()).into_response() + }; + let Ok(consensus_version_header) = HeaderValue::from_str(&format!("{}", max_bid.version)) else { + info!("sending response as JSON"); + return Ok((StatusCode::OK, axum::Json(max_bid)).into_response()) + }; + let Ok(content_type_header) = HeaderValue::from_str(&format!("{}", Accept::Ssz)) else { + info!("sending response as JSON"); + return Ok((StatusCode::OK, axum::Json(max_bid)).into_response()) + }; + res.headers_mut().insert(CONSENSUS_VERSION_HEADER, consensus_version_header); + res.headers_mut().insert(CONTENT_TYPE, content_type_header); + info!("sending response as SSZ"); + res + }, + Accept::Json | Accept::Any => (StatusCode::OK, axum::Json(max_bid)).into_response(), + }; + Ok(response) } else { // spec: return 204 if request is valid but no bid available info!("no header available for slot"); diff --git a/crates/pbs/src/routes/submit_block.rs b/crates/pbs/src/routes/submit_block.rs index c9d206a1..8e7b7072 100644 --- a/crates/pbs/src/routes/submit_block.rs +++ b/crates/pbs/src/routes/submit_block.rs @@ -1,9 +1,10 @@ -use axum::{extract::State, http::HeaderMap, response::IntoResponse, Json}; +use axum::{extract::State, http::{HeaderMap, HeaderValue}, response::IntoResponse, Json}; use cb_common::{ pbs::{BuilderEvent, SignedBlindedBeaconBlock}, - utils::{get_user_agent, timestamp_of_slot_start_millis, utcnow_ms}, + utils::{get_content_type_header, get_user_agent, timestamp_of_slot_start_millis, utcnow_ms, JsonOrSsz, CONSENSUS_VERSION_HEADER}, }; -use reqwest::StatusCode; +use reqwest::{header::CONTENT_TYPE, StatusCode}; +use ssz::Encode; use tracing::{error, info, trace}; use uuid::Uuid; @@ -19,7 +20,7 @@ use crate::{ pub async fn handle_submit_block>( State(state): State>, req_headers: HeaderMap, - Json(signed_blinded_block): Json, + JsonOrSsz(signed_blinded_block): JsonOrSsz, ) -> Result { let state = state.read().clone(); @@ -31,6 +32,8 @@ pub async fn handle_submit_block>( let block_hash = signed_blinded_block.message.body.execution_payload_header.block_hash; let slot_start_ms = timestamp_of_slot_start_millis(slot, state.config.chain); let ua = get_user_agent(&req_headers); + let content_type_header = get_content_type_header(&req_headers); + info!(ua, ms_into_slot=now.saturating_sub(slot_start_ms), %block_hash); @@ -39,9 +42,31 @@ pub async fn handle_submit_block>( trace!(?res); state.publish_event(BuilderEvent::SubmitBlockResponse(Box::new(res.clone()))); info!("received unblinded block"); - BEACON_NODE_STATUS.with_label_values(&["200", SUBMIT_BLINDED_BLOCK_ENDPOINT_TAG]).inc(); - Ok((StatusCode::OK, Json(res).into_response())) + + let response = match content_type_header { + cb_common::utils::ContentType::Json => { + info!("sending response as JSON"); + (StatusCode::OK, Json(res)).into_response() + }, + cb_common::utils::ContentType::Ssz => { + let mut response = (StatusCode::OK, res.data.as_ssz_bytes()).into_response(); + let Ok(consensus_version_header) = HeaderValue::from_str(&format!("{}", res.version)) else { + info!("sending response as JSON"); + return Ok((StatusCode::OK, axum::Json(res)).into_response()) + }; + let Ok(content_type_header) = HeaderValue::from_str(&content_type_header.to_string()) else { + info!("sending response as JSON"); + return Ok((StatusCode::OK, axum::Json(res)).into_response()) + }; + response.headers_mut().insert(CONSENSUS_VERSION_HEADER, consensus_version_header); + response.headers_mut().insert(CONTENT_TYPE, content_type_header); + info!("sending response as SSZ"); + response + }, + }; + + Ok(response) } Err(err) => { From 50595934e84181cc13de3bfc6f6ba8b877de3c83 Mon Sep 17 00:00:00 2001 From: Eitan Seri-Levi Date: Tue, 18 Feb 2025 13:22:26 -0800 Subject: [PATCH 2/3] Integration tests, fixes, and clippy/linting --- Cargo.lock | 1 + crates/common/src/pbs/types/beacon_block.rs | 4 +- .../src/pbs/types/blinded_block_body.rs | 36 ++++----- crates/common/src/pbs/types/get_header.rs | 6 +- crates/common/src/pbs/types/kzg.rs | 6 +- crates/common/src/utils.rs | 37 +++++----- crates/pbs/src/routes/get_header.rs | 27 ++++--- crates/pbs/src/routes/submit_block.rs | 41 ++++++---- tests/Cargo.toml | 2 + tests/src/mock_relay.rs | 37 +++++++--- tests/src/mock_validator.rs | 51 +++++++++++-- tests/tests/pbs_integration.rs | 74 +++++++++++++++++-- 12 files changed, 229 insertions(+), 93 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 0cebdf56..49a5728b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1410,6 +1410,7 @@ dependencies = [ "axum", "cb-common", "cb-pbs", + "ethereum_ssz 0.7.1", "eyre", "reqwest", "serde_json", diff --git a/crates/common/src/pbs/types/beacon_block.rs b/crates/common/src/pbs/types/beacon_block.rs index 289637bf..a1fd8d11 100644 --- a/crates/common/src/pbs/types/beacon_block.rs +++ b/crates/common/src/pbs/types/beacon_block.rs @@ -7,7 +7,7 @@ use super::{ execution_payload::ExecutionPayload, spec::DenebSpec, utils::VersionedResponse, }; -#[derive(Debug, Default, Clone, Serialize, Deserialize, Decode)] +#[derive(Debug, Default, Clone, Serialize, Deserialize, Encode, Decode)] /// Sent to relays in submit_block pub struct SignedBlindedBeaconBlock { pub message: BlindedBeaconBlock, @@ -20,7 +20,7 @@ impl SignedBlindedBeaconBlock { } } -#[derive(Debug, Default, Clone, Serialize, Deserialize, Decode)] +#[derive(Debug, Default, Clone, Serialize, Deserialize, Encode, Decode)] pub struct BlindedBeaconBlock { #[serde(with = "serde_utils::quoted_u64")] pub slot: u64, diff --git a/crates/common/src/pbs/types/blinded_block_body.rs b/crates/common/src/pbs/types/blinded_block_body.rs index 20785954..ebad3671 100644 --- a/crates/common/src/pbs/types/blinded_block_body.rs +++ b/crates/common/src/pbs/types/blinded_block_body.rs @@ -3,7 +3,7 @@ use alloy::{ rpc::types::beacon::{BlsPublicKey, BlsSignature}, }; use serde::{Deserialize, Serialize}; -use ssz_derive::Decode; +use ssz_derive::{Decode, Encode}; use ssz_types::{typenum, BitList, BitVector, FixedVector, VariableList}; use super::{ @@ -11,7 +11,7 @@ use super::{ }; use crate::utils::as_str; -#[derive(Debug, Default, Clone, Serialize, Deserialize, Decode)] +#[derive(Debug, Default, Clone, Serialize, Deserialize, Encode, Decode)] pub struct BlindedBeaconBlockBody { pub randao_reveal: BlsSignature, pub eth1_data: Eth1Data, @@ -28,7 +28,7 @@ pub struct BlindedBeaconBlockBody { pub blob_kzg_commitments: KzgCommitments, } -#[derive(Debug, Default, Clone, Serialize, Deserialize, Decode)] +#[derive(Debug, Default, Clone, Serialize, Deserialize, Encode, Decode)] pub struct Eth1Data { pub deposit_root: B256, #[serde(with = "serde_utils::quoted_u64")] @@ -36,7 +36,7 @@ pub struct Eth1Data { pub block_hash: B256, } -#[derive(Debug, Default, Clone, Serialize, Deserialize, Decode)] +#[derive(Debug, Default, Clone, Serialize, Deserialize, Encode, Decode)] pub struct BeaconBlockHeader { #[serde(with = "serde_utils::quoted_u64")] pub slot: u64, @@ -47,13 +47,13 @@ pub struct BeaconBlockHeader { pub body_root: B256, } -#[derive(Debug, Default, Clone, Serialize, Deserialize, Decode)] +#[derive(Debug, Default, Clone, Serialize, Deserialize, Encode, Decode)] pub struct SignedBeaconBlockHeader { pub message: BeaconBlockHeader, pub signature: BlsSignature, } -#[derive(Debug, Default, Clone, Serialize, Deserialize, Decode)] +#[derive(Debug, Default, Clone, Serialize, Deserialize, Encode, Decode)] pub struct BlsToExecutionChange { #[serde(with = "as_str")] pub validator_index: u64, @@ -61,25 +61,25 @@ pub struct BlsToExecutionChange { pub to_execution_address: Address, } -#[derive(Debug, Default, Clone, Serialize, Deserialize, Decode)] +#[derive(Debug, Default, Clone, Serialize, Deserialize, Encode, Decode)] pub struct SignedBlsToExecutionChange { pub message: BlsToExecutionChange, pub signature: BlsSignature, } -#[derive(Debug, Default, Clone, Serialize, Deserialize, Decode)] +#[derive(Debug, Default, Clone, Serialize, Deserialize, Encode, Decode)] pub struct ProposerSlashing { pub signed_header_1: SignedBeaconBlockHeader, pub signed_header_2: SignedBeaconBlockHeader, } -#[derive(Debug, Default, Clone, Serialize, Deserialize, Decode)] +#[derive(Debug, Default, Clone, Serialize, Deserialize, Encode, Decode)] pub struct AttesterSlashing { pub attestation_1: IndexedAttestation, pub attestation_2: IndexedAttestation, } -#[derive(Debug, Default, Clone, Serialize, Deserialize, Decode)] +#[derive(Debug, Default, Clone, Serialize, Deserialize, Encode, Decode)] #[serde(bound = "T: EthSpec")] pub struct IndexedAttestation { /// Lists validator registry indices, not committee indices. @@ -89,7 +89,7 @@ pub struct IndexedAttestation { pub signature: BlsSignature, } -#[derive(Debug, Default, Clone, Serialize, Deserialize, Decode)] +#[derive(Debug, Default, Clone, Serialize, Deserialize, Encode, Decode)] pub struct AttestationData { #[serde(with = "serde_utils::quoted_u64")] pub slot: u64, @@ -102,14 +102,14 @@ pub struct AttestationData { pub target: Checkpoint, } -#[derive(Debug, Default, Clone, Serialize, Deserialize, Decode)] +#[derive(Debug, Default, Clone, Serialize, Deserialize, Encode, Decode)] pub struct Checkpoint { #[serde(with = "serde_utils::quoted_u64")] pub epoch: u64, pub root: B256, } -#[derive(Debug, Clone, Serialize, Deserialize, Decode)] +#[derive(Debug, Clone, Serialize, Deserialize, Encode, Decode)] #[serde(bound = "T: EthSpec")] pub struct Attestation { pub aggregation_bits: BitList, @@ -117,13 +117,13 @@ pub struct Attestation { pub signature: BlsSignature, } -#[derive(Debug, Default, Clone, Serialize, Deserialize, Decode)] +#[derive(Debug, Default, Clone, Serialize, Deserialize, Encode, Decode)] pub struct Deposit { pub proof: FixedVector, // put this in EthSpec? pub data: DepositData, } -#[derive(Debug, Default, Clone, Serialize, Deserialize, Decode)] +#[derive(Debug, Default, Clone, Serialize, Deserialize, Encode, Decode)] pub struct DepositData { pub pubkey: BlsPublicKey, pub withdrawal_credentials: B256, @@ -132,13 +132,13 @@ pub struct DepositData { pub signature: BlsSignature, } -#[derive(Debug, Default, Clone, Serialize, Deserialize, Decode)] +#[derive(Debug, Default, Clone, Serialize, Deserialize, Encode, Decode)] pub struct SignedVoluntaryExit { pub message: VoluntaryExit, pub signature: BlsSignature, } -#[derive(Debug, Default, Clone, Serialize, Deserialize, Decode)] +#[derive(Debug, Default, Clone, Serialize, Deserialize, Encode, Decode)] pub struct VoluntaryExit { /// Earliest epoch when voluntary exit can be processed. #[serde(with = "serde_utils::quoted_u64")] @@ -147,7 +147,7 @@ pub struct VoluntaryExit { pub validator_index: u64, } -#[derive(Debug, Default, Clone, Serialize, Deserialize, Decode)] +#[derive(Debug, Default, Clone, Serialize, Deserialize, Encode, Decode)] #[serde(bound = "T: EthSpec")] pub struct SyncAggregate { pub sync_committee_bits: BitVector, diff --git a/crates/common/src/pbs/types/get_header.rs b/crates/common/src/pbs/types/get_header.rs index 55b893db..796c8fcb 100644 --- a/crates/common/src/pbs/types/get_header.rs +++ b/crates/common/src/pbs/types/get_header.rs @@ -3,7 +3,7 @@ use alloy::{ rpc::types::beacon::{BlsPublicKey, BlsSignature}, }; use serde::{Deserialize, Serialize}; -use ssz_derive::Encode; +use ssz_derive::{Decode, Encode}; use tree_hash_derive::TreeHash; use super::{ @@ -38,13 +38,13 @@ impl GetHeaderResponse { } } -#[derive(Debug, Default, Clone, Serialize, Deserialize, Encode)] +#[derive(Debug, Default, Clone, Serialize, Deserialize, Encode, Decode)] pub struct SignedExecutionPayloadHeader { pub message: ExecutionPayloadHeaderMessage, pub signature: BlsSignature, } -#[derive(Debug, Default, Clone, Serialize, Deserialize, TreeHash, Encode)] +#[derive(Debug, Default, Clone, Serialize, Deserialize, TreeHash, Encode, Decode)] pub struct ExecutionPayloadHeaderMessage { pub header: ExecutionPayloadHeader, pub blob_kzg_commitments: KzgCommitments, diff --git a/crates/common/src/pbs/types/kzg.rs b/crates/common/src/pbs/types/kzg.rs index 165302ea..61d618be 100644 --- a/crates/common/src/pbs/types/kzg.rs +++ b/crates/common/src/pbs/types/kzg.rs @@ -4,7 +4,7 @@ use std::{ }; use serde::{Deserialize, Deserializer, Serialize, Serializer}; -use ssz::{Encode, Decode}; +use ssz::{Decode, Encode}; use ssz_types::VariableList; use tree_hash::{PackedEncoding, TreeHash}; @@ -28,7 +28,7 @@ impl Decode for KzgCommitment { } fn from_ssz_bytes(bytes: &[u8]) -> Result { - <[u8; BYTES_PER_COMMITMENT]>::from_ssz_bytes(bytes).and_then(|o| Ok(Self(o))) + <[u8; BYTES_PER_COMMITMENT]>::from_ssz_bytes(bytes).map(Self) } } @@ -38,7 +38,7 @@ impl Encode for KzgCommitment { } fn ssz_append(&self, buf: &mut Vec) { - self.0.ssz_append(buf) + self.0.ssz_append(buf) } fn ssz_bytes_len(&self) -> usize { diff --git a/crates/common/src/utils.rs b/crates/common/src/utils.rs index 378e5aa3..0a4c88a0 100644 --- a/crates/common/src/utils.rs +++ b/crates/common/src/utils.rs @@ -4,20 +4,24 @@ use std::{ str::FromStr, time::{SystemTime, UNIX_EPOCH}, }; -use bytes::Bytes; -use axum::{ - extract::{FromRequest, Request}, - response::{IntoResponse, Response}, - http::HeaderValue -}; + use alloy::{ primitives::U256, rpc::types::beacon::{BlsPublicKey, BlsSignature}, }; +use axum::{ + extract::{FromRequest, Request}, + http::HeaderValue, + response::{IntoResponse, Response}, +}; use blst::min_pk::{PublicKey, Signature}; +use bytes::Bytes; use mediatype::{names, MediaType, MediaTypeList}; use rand::{distributions::Alphanumeric, Rng}; -use reqwest::{header::{HeaderMap, ACCEPT, CONTENT_TYPE}, StatusCode}; +use reqwest::{ + header::{HeaderMap, ACCEPT, CONTENT_TYPE}, + StatusCode, +}; use serde::{de::DeserializeOwned, Serialize}; use serde_json::Value; use tracing::Level; @@ -292,7 +296,10 @@ pub fn get_accept_header(req_headers: &HeaderMap) -> Accept { /// Parse CONTENT TYPE header, default to JSON if missing or mal-formatted pub fn get_content_type_header(req_headers: &HeaderMap) -> ContentType { ContentType::from_str( - req_headers.get(CONTENT_TYPE).and_then(|value| value.to_str().ok()).unwrap_or("application/json"), + req_headers + .get(CONTENT_TYPE) + .and_then(|value| value.to_str().ok()) + .unwrap_or("application/json"), ) .unwrap_or(ContentType::Json) } @@ -300,7 +307,7 @@ pub fn get_content_type_header(req_headers: &HeaderMap) -> ContentType { #[derive(Debug, Clone, Copy, PartialEq)] pub enum ContentType { Json, - Ssz + Ssz, } impl std::fmt::Display for ContentType { @@ -318,7 +325,7 @@ impl FromStr for ContentType { match value { "application/json" => Ok(ContentType::Json), "application/octet-stream" => Ok(ContentType::Ssz), - _ => Err(format!("unknown content type: {}", value)), + _ => Ok(ContentType::Json), } } } @@ -392,7 +399,6 @@ impl FromStr for Accept { } } - #[must_use] #[derive(Debug, Clone, Copy, Default)] pub struct JsonOrSsz(pub T); @@ -406,13 +412,9 @@ where async fn from_request(req: Request, _state: &S) -> Result { let headers = req.headers().clone(); - let content_type = headers - .get(CONTENT_TYPE) - .and_then(|value| value.to_str().ok()); + let content_type = headers.get(CONTENT_TYPE).and_then(|value| value.to_str().ok()); - let bytes = Bytes::from_request(req, _state) - .await - .map_err(IntoResponse::into_response)?; + let bytes = Bytes::from_request(req, _state).await.map_err(IntoResponse::into_response)?; if let Some(content_type) = content_type { if content_type.starts_with(&ContentType::Json.to_string()) { @@ -432,7 +434,6 @@ where } } - #[cfg(unix)] pub async fn wait_for_signal() -> eyre::Result<()> { use tokio::signal::unix::{signal, SignalKind}; diff --git a/crates/pbs/src/routes/get_header.rs b/crates/pbs/src/routes/get_header.rs index 77b0395a..2e859aef 100644 --- a/crates/pbs/src/routes/get_header.rs +++ b/crates/pbs/src/routes/get_header.rs @@ -45,24 +45,29 @@ pub async fn handle_get_header>( BEACON_NODE_STATUS.with_label_values(&["200", GET_HEADER_ENDPOINT_TAG]).inc(); let response = match accept_header { Accept::Ssz => { - let mut res = { + let mut res = + { (StatusCode::OK, max_bid.data.as_ssz_bytes()).into_response() }; + let Ok(consensus_version_header) = + HeaderValue::from_str(&format!("{}", max_bid.version)) + else { info!("sending response as JSON"); - (StatusCode::OK, max_bid.data.as_ssz_bytes()).into_response() + return Ok((StatusCode::OK, axum::Json(max_bid)).into_response()); }; - let Ok(consensus_version_header) = HeaderValue::from_str(&format!("{}", max_bid.version)) else { + let Ok(content_type_header) = + HeaderValue::from_str(&format!("{}", Accept::Ssz)) + else { info!("sending response as JSON"); - return Ok((StatusCode::OK, axum::Json(max_bid)).into_response()) + return Ok((StatusCode::OK, axum::Json(max_bid)).into_response()); }; - let Ok(content_type_header) = HeaderValue::from_str(&format!("{}", Accept::Ssz)) else { - info!("sending response as JSON"); - return Ok((StatusCode::OK, axum::Json(max_bid)).into_response()) - }; - res.headers_mut().insert(CONSENSUS_VERSION_HEADER, consensus_version_header); + res.headers_mut() + .insert(CONSENSUS_VERSION_HEADER, consensus_version_header); res.headers_mut().insert(CONTENT_TYPE, content_type_header); info!("sending response as SSZ"); res - }, - Accept::Json | Accept::Any => (StatusCode::OK, axum::Json(max_bid)).into_response(), + } + Accept::Json | Accept::Any => { + (StatusCode::OK, axum::Json(max_bid)).into_response() + } }; Ok(response) } else { diff --git a/crates/pbs/src/routes/submit_block.rs b/crates/pbs/src/routes/submit_block.rs index 8e7b7072..c40d098c 100644 --- a/crates/pbs/src/routes/submit_block.rs +++ b/crates/pbs/src/routes/submit_block.rs @@ -1,7 +1,15 @@ -use axum::{extract::State, http::{HeaderMap, HeaderValue}, response::IntoResponse, Json}; +use axum::{ + extract::State, + http::{HeaderMap, HeaderValue}, + response::IntoResponse, + Json, +}; use cb_common::{ pbs::{BuilderEvent, SignedBlindedBeaconBlock}, - utils::{get_content_type_header, get_user_agent, timestamp_of_slot_start_millis, utcnow_ms, JsonOrSsz, CONSENSUS_VERSION_HEADER}, + utils::{ + get_accept_header, get_user_agent, timestamp_of_slot_start_millis, utcnow_ms, ContentType, + JsonOrSsz, CONSENSUS_VERSION_HEADER, + }, }; use reqwest::{header::CONTENT_TYPE, StatusCode}; use ssz::Encode; @@ -32,8 +40,7 @@ pub async fn handle_submit_block>( let block_hash = signed_blinded_block.message.body.execution_payload_header.block_hash; let slot_start_ms = timestamp_of_slot_start_millis(slot, state.config.chain); let ua = get_user_agent(&req_headers); - let content_type_header = get_content_type_header(&req_headers); - + let accept_header = get_accept_header(&req_headers); info!(ua, ms_into_slot=now.saturating_sub(slot_start_ms), %block_hash); @@ -44,26 +51,32 @@ pub async fn handle_submit_block>( info!("received unblinded block"); BEACON_NODE_STATUS.with_label_values(&["200", SUBMIT_BLINDED_BLOCK_ENDPOINT_TAG]).inc(); - let response = match content_type_header { - cb_common::utils::ContentType::Json => { + let response = match accept_header { + cb_common::utils::Accept::Json | cb_common::utils::Accept::Any => { info!("sending response as JSON"); (StatusCode::OK, Json(res)).into_response() - }, - cb_common::utils::ContentType::Ssz => { + } + cb_common::utils::Accept::Ssz => { let mut response = (StatusCode::OK, res.data.as_ssz_bytes()).into_response(); - let Ok(consensus_version_header) = HeaderValue::from_str(&format!("{}", res.version)) else { + let Ok(consensus_version_header) = + HeaderValue::from_str(&format!("{}", res.version)) + else { info!("sending response as JSON"); - return Ok((StatusCode::OK, axum::Json(res)).into_response()) + return Ok((StatusCode::OK, axum::Json(res)).into_response()); }; - let Ok(content_type_header) = HeaderValue::from_str(&content_type_header.to_string()) else { + let Ok(content_type_header) = + HeaderValue::from_str(&ContentType::Ssz.to_string()) + else { info!("sending response as JSON"); - return Ok((StatusCode::OK, axum::Json(res)).into_response()) + return Ok((StatusCode::OK, axum::Json(res)).into_response()); }; - response.headers_mut().insert(CONSENSUS_VERSION_HEADER, consensus_version_header); + response + .headers_mut() + .insert(CONSENSUS_VERSION_HEADER, consensus_version_header); response.headers_mut().insert(CONTENT_TYPE, content_type_header); info!("sending response as SSZ"); response - }, + } }; Ok(response) diff --git a/tests/Cargo.toml b/tests/Cargo.toml index 04b66330..17491294 100644 --- a/tests/Cargo.toml +++ b/tests/Cargo.toml @@ -23,3 +23,5 @@ tracing-subscriber.workspace = true tree_hash.workspace = true eyre.workspace = true + +ethereum_ssz.workspace = true diff --git a/tests/src/mock_relay.rs b/tests/src/mock_relay.rs index 672ca806..f338e220 100644 --- a/tests/src/mock_relay.rs +++ b/tests/src/mock_relay.rs @@ -9,22 +9,28 @@ use std::{ use alloy::{primitives::U256, rpc::types::beacon::relay::ValidatorRegistration}; use axum::{ extract::{Path, State}, - http::StatusCode, + http::{HeaderMap, HeaderValue, StatusCode}, response::{IntoResponse, Response}, routing::{get, post}, Json, Router, }; use cb_common::{ pbs::{ - GetHeaderParams, GetHeaderResponse, SubmitBlindedBlockResponse, BUILDER_API_PATH, - GET_HEADER_PATH, GET_STATUS_PATH, REGISTER_VALIDATOR_PATH, SUBMIT_BLOCK_PATH, + GetHeaderParams, GetHeaderResponse, PayloadAndBlobs, SubmitBlindedBlockResponse, Version, + BUILDER_API_PATH, GET_HEADER_PATH, GET_STATUS_PATH, REGISTER_VALIDATOR_PATH, + SUBMIT_BLOCK_PATH, }, signature::sign_builder_root, signer::BlsSecretKey, types::Chain, - utils::{blst_pubkey_to_alloy, timestamp_of_slot_start_sec}, + utils::{ + blst_pubkey_to_alloy, get_content_type_header, timestamp_of_slot_start_sec, + CONSENSUS_VERSION_HEADER, + }, }; use cb_pbs::MAX_SIZE_SUBMIT_BLOCK; +use reqwest::header::CONTENT_TYPE; +use ssz::Encode; use tokio::net::TcpListener; use tracing::debug; use tree_hash::TreeHash; @@ -125,14 +131,27 @@ async fn handle_register_validator( StatusCode::OK } -async fn handle_submit_block(State(state): State>) -> impl IntoResponse { +async fn handle_submit_block( + State(state): State>, + headers: HeaderMap, +) -> impl IntoResponse { state.received_submit_block.fetch_add(1, Ordering::Relaxed); - - let response = if state.large_body { + let accept_header = get_content_type_header(&headers); + let data = if state.large_body { vec![1u8; 1 + MAX_SIZE_SUBMIT_BLOCK] } else { - serde_json::to_vec(&SubmitBlindedBlockResponse::default()).unwrap() + match accept_header { + cb_common::utils::ContentType::Json => { + serde_json::to_vec(&SubmitBlindedBlockResponse::default()).unwrap() + } + cb_common::utils::ContentType::Ssz => PayloadAndBlobs::default().as_ssz_bytes(), + } }; - (StatusCode::OK, Json(response)).into_response() + let mut response = (StatusCode::OK, data).into_response(); + let consensus_version_header = HeaderValue::from_str(&Version::Deneb.to_string()).unwrap(); + let content_type_header = HeaderValue::from_str(&accept_header.to_string()).unwrap(); + response.headers_mut().insert(CONSENSUS_VERSION_HEADER, consensus_version_header); + response.headers_mut().insert(CONTENT_TYPE, content_type_header); + response } diff --git a/tests/src/mock_validator.rs b/tests/src/mock_validator.rs index 9b7ff2c6..5ef01eb8 100644 --- a/tests/src/mock_validator.rs +++ b/tests/src/mock_validator.rs @@ -2,8 +2,18 @@ use alloy::{ primitives::B256, rpc::types::beacon::{relay::ValidatorRegistration, BlsPublicKey}, }; -use cb_common::pbs::{GetHeaderResponse, RelayClient, SignedBlindedBeaconBlock}; -use reqwest::Error; +use cb_common::{ + pbs::{ + GetHeaderResponse, RelayClient, SignedBlindedBeaconBlock, SignedExecutionPayloadHeader, + Version, + }, + utils::{get_content_type_header, Accept, ContentType, CONSENSUS_VERSION_HEADER}, +}; +use reqwest::{ + header::{ACCEPT, CONTENT_TYPE}, + Error, +}; +use ssz::{Decode, Encode}; use crate::utils::generate_mock_relay; @@ -16,13 +26,28 @@ impl MockValidator { Ok(Self { comm_boost: generate_mock_relay(port, BlsPublicKey::default())? }) } - pub async fn do_get_header(&self, pubkey: Option) -> Result<(), Error> { + pub async fn do_get_header( + &self, + pubkey: Option, + accept: Accept, + ) -> Result<(), Error> { let url = self .comm_boost .get_header_url(0, B256::ZERO, pubkey.unwrap_or(BlsPublicKey::ZERO)) .unwrap(); - let res = self.comm_boost.client.get(url).send().await?.bytes().await?; - assert!(serde_json::from_slice::(&res).is_ok()); + let res = + self.comm_boost.client.get(url).header(ACCEPT, &accept.to_string()).send().await?; + let content_type = get_content_type_header(res.headers()); + let res_bytes = res.bytes().await?; + + match content_type { + ContentType::Json => { + assert!(serde_json::from_slice::(&res_bytes).is_ok()) + } + ContentType::Ssz => { + assert!(SignedExecutionPayloadHeader::from_ssz_bytes(&res_bytes).is_ok()) + } + } Ok(()) } @@ -50,15 +75,27 @@ impl MockValidator { Ok(()) } - pub async fn do_submit_block(&self) -> Result<(), Error> { + pub async fn do_submit_block( + &self, + accept: Accept, + content_type: ContentType, + ) -> Result<(), Error> { let url = self.comm_boost.submit_block_url().unwrap(); let signed_blinded_block = SignedBlindedBeaconBlock::default(); + let body = match content_type { + ContentType::Json => serde_json::to_vec(&signed_blinded_block).unwrap(), + ContentType::Ssz => signed_blinded_block.as_ssz_bytes(), + }; + self.comm_boost .client .post(url) - .json(&signed_blinded_block) + .body(body) + .header(CONSENSUS_VERSION_HEADER, Version::Deneb.to_string()) + .header(CONTENT_TYPE, &content_type.to_string()) + .header(ACCEPT, &accept.to_string()) .send() .await? .error_for_status()?; diff --git a/tests/tests/pbs_integration.rs b/tests/tests/pbs_integration.rs index 2e7f95e8..a230939b 100644 --- a/tests/tests/pbs_integration.rs +++ b/tests/tests/pbs_integration.rs @@ -12,7 +12,7 @@ use cb_common::{ pbs::RelayClient, signer::{random_secret, BlsPublicKey}, types::Chain, - utils::blst_pubkey_to_alloy, + utils::{blst_pubkey_to_alloy, Accept, ContentType}, }; use cb_pbs::{DefaultBuilderApi, PbsService, PbsState}; use cb_tests::{ @@ -76,7 +76,36 @@ async fn test_get_header() -> Result<()> { let mock_validator = MockValidator::new(port)?; info!("Sending get header"); - let res = mock_validator.do_get_header(None).await; + let res = mock_validator.do_get_header(None, Accept::Json).await; + + assert!(res.is_ok()); + assert_eq!(mock_state.received_get_header(), 1); + Ok(()) +} + +#[tokio::test] +async fn test_get_header_ssz() -> Result<()> { + setup_test_env(); + let signer = random_secret(); + let pubkey: BlsPublicKey = blst_pubkey_to_alloy(&signer.sk_to_pk()).into(); + + let chain = Chain::Holesky; + let port = 3000; + + let mock_state = Arc::new(MockRelayState::new(chain, signer)); + let mock_relay = generate_mock_relay(port + 1, *pubkey)?; + tokio::spawn(start_mock_relay_service(mock_state.clone(), port + 1)); + + let config = to_pbs_config(chain, get_pbs_static_config(port), vec![mock_relay]); + let state = PbsState::new(config); + tokio::spawn(PbsService::run::<(), DefaultBuilderApi>(state)); + + // leave some time to start servers + tokio::time::sleep(Duration::from_millis(100)).await; + + let mock_validator = MockValidator::new(port)?; + info!("Sending get header"); + let res = mock_validator.do_get_header(None, Accept::Ssz).await; assert!(res.is_ok()); assert_eq!(mock_state.received_get_header(), 1); @@ -207,9 +236,38 @@ async fn test_submit_block() -> Result<()> { let mock_validator = MockValidator::new(port)?; info!("Sending submit block"); - let res = mock_validator.do_submit_block().await; + let res = mock_validator.do_submit_block(Accept::Json, ContentType::Json).await; - assert!(res.is_err()); + assert!(!res.is_err()); + assert_eq!(mock_state.received_submit_block(), 1); + Ok(()) +} + +#[tokio::test] +async fn test_submit_block_ssz() -> Result<()> { + setup_test_env(); + let signer = random_secret(); + let pubkey: BlsPublicKey = blst_pubkey_to_alloy(&signer.sk_to_pk()).into(); + + let chain = Chain::Holesky; + let port = 3400; + + let relays = vec![generate_mock_relay(port + 1, *pubkey)?]; + let mock_state = Arc::new(MockRelayState::new(chain, signer)); + tokio::spawn(start_mock_relay_service(mock_state.clone(), port + 1)); + + let config = to_pbs_config(chain, get_pbs_static_config(port), relays); + let state = PbsState::new(config); + tokio::spawn(PbsService::run::<(), DefaultBuilderApi>(state)); + + // leave some time to start servers + tokio::time::sleep(Duration::from_millis(100)).await; + + let mock_validator = MockValidator::new(port)?; + info!("Sending submit block"); + let res = mock_validator.do_submit_block(Accept::Ssz, ContentType::Ssz).await; + + assert!(!res.is_err()); assert_eq!(mock_state.received_submit_block(), 1); Ok(()) } @@ -236,7 +294,7 @@ async fn test_submit_block_too_large() -> Result<()> { let mock_validator = MockValidator::new(port)?; info!("Sending submit block"); - let res = mock_validator.do_submit_block().await; + let res = mock_validator.do_submit_block(Accept::Json, ContentType::Json).await; assert!(res.is_err()); assert_eq!(mock_state.received_submit_block(), 1); @@ -283,13 +341,13 @@ async fn test_mux() -> Result<()> { let mock_validator = MockValidator::new(port)?; info!("Sending get header with default"); - let res = mock_validator.do_get_header(None).await; + let res = mock_validator.do_get_header(None, Accept::Json).await; assert!(res.is_ok()); assert_eq!(mock_state.received_get_header(), 1); // only default relay was used info!("Sending get header with mux"); - let res = mock_validator.do_get_header(Some(validator_pubkey)).await; + let res = mock_validator.do_get_header(Some(validator_pubkey), Accept::Json).await; assert!(res.is_ok()); assert_eq!(mock_state.received_get_header(), 3); // two mux relays were used @@ -304,7 +362,7 @@ async fn test_mux() -> Result<()> { assert!(res.is_ok()); assert_eq!(mock_state.received_register_validator(), 3); // default + 2 mux relays were used - let res = mock_validator.do_submit_block().await; + let res = mock_validator.do_submit_block(Accept::Json, ContentType::Json).await; assert!(res.is_err()); assert_eq!(mock_state.received_submit_block(), 3); // default + 2 mux relays were used From 7005422f24ca90107b7b515830588c13747c752e Mon Sep 17 00:00:00 2001 From: Eitan Seri-Levi Date: Sun, 23 Feb 2025 11:13:33 -0800 Subject: [PATCH 3/3] fix tests --- tests/src/mock_relay.rs | 86 +++-- tests/tests/pbs_get_header.rs | 49 ++- tests/tests/pbs_integration.rs | 474 ------------------------- tests/tests/pbs_mux.rs | 10 +- tests/tests/pbs_post_blinded_blocks.rs | 44 ++- 5 files changed, 164 insertions(+), 499 deletions(-) delete mode 100644 tests/tests/pbs_integration.rs diff --git a/tests/src/mock_relay.rs b/tests/src/mock_relay.rs index abbf8700..8b14f93c 100644 --- a/tests/src/mock_relay.rs +++ b/tests/src/mock_relay.rs @@ -16,17 +16,17 @@ use axum::{ }; use cb_common::{ pbs::{ - ExecutionPayloadHeaderMessageDeneb, GetHeaderParams, GetHeaderResponse, + ExecutionPayloadHeaderMessageDeneb, ExecutionPayloadHeaderMessageElectra, GetHeaderParams, PayloadAndBlobsDeneb, PayloadAndBlobsElectra, SignedExecutionPayloadHeader, - SubmitBlindedBlockResponse, BUILDER_API_PATH, GET_HEADER_PATH, GET_STATUS_PATH, - REGISTER_VALIDATOR_PATH, SUBMIT_BLOCK_PATH, + SubmitBlindedBlockResponse, VersionedResponse, BUILDER_API_PATH, GET_HEADER_PATH, + GET_STATUS_PATH, REGISTER_VALIDATOR_PATH, SUBMIT_BLOCK_PATH, }, signature::sign_builder_root, signer::BlsSecretKey, types::Chain, utils::{ blst_pubkey_to_alloy, get_accept_header, get_consensus_version_header, - timestamp_of_slot_start_sec, CONSENSUS_VERSION_HEADER, + timestamp_of_slot_start_sec, Accept, ForkName, CONSENSUS_VERSION_HEADER, }, }; use cb_pbs::MAX_SIZE_SUBMIT_BLOCK; @@ -106,23 +106,67 @@ pub fn mock_relay_app_router(state: Arc) -> Router { async fn handle_get_header( State(state): State>, Path(GetHeaderParams { parent_hash, .. }): Path, + headers: HeaderMap, ) -> Response { state.received_get_header.fetch_add(1, Ordering::Relaxed); + let accept_header = get_accept_header(&headers); + let consensus_version_header = + get_consensus_version_header(&headers).unwrap_or(ForkName::Electra); + + let data = match consensus_version_header { + ForkName::Deneb => { + let mut response: SignedExecutionPayloadHeader = + SignedExecutionPayloadHeader::default(); + response.message.header.parent_hash = parent_hash; + response.message.header.block_hash.0[0] = 1; + response.message.value = U256::from(10); + response.message.pubkey = blst_pubkey_to_alloy(&state.signer.sk_to_pk()); + response.message.header.timestamp = timestamp_of_slot_start_sec(0, state.chain); + + let object_root = response.message.tree_hash_root().0; + response.signature = sign_builder_root(state.chain, &state.signer, object_root); + match accept_header { + Accept::Json | Accept::Any => { + let versioned_response: VersionedResponse< + SignedExecutionPayloadHeader, + SignedExecutionPayloadHeader, + > = VersionedResponse::Deneb(response); + serde_json::to_vec(&versioned_response).unwrap() + } + Accept::Ssz => response.as_ssz_bytes(), + } + } + ForkName::Electra => { + let mut response: SignedExecutionPayloadHeader = + SignedExecutionPayloadHeader::default(); + response.message.header.parent_hash = parent_hash; + response.message.header.block_hash.0[0] = 1; + response.message.value = U256::from(10); + response.message.pubkey = blst_pubkey_to_alloy(&state.signer.sk_to_pk()); + response.message.header.timestamp = timestamp_of_slot_start_sec(0, state.chain); + + let object_root = response.message.tree_hash_root().0; + response.signature = sign_builder_root(state.chain, &state.signer, object_root); + match accept_header { + Accept::Json | Accept::Any => { + let versioned_response: VersionedResponse< + SignedExecutionPayloadHeader, + SignedExecutionPayloadHeader, + > = VersionedResponse::Electra(response); + serde_json::to_vec(&versioned_response).unwrap() + } + Accept::Ssz => response.as_ssz_bytes(), + } + } + }; - let mut response: SignedExecutionPayloadHeader = - SignedExecutionPayloadHeader::default(); - - response.message.header.parent_hash = parent_hash; - response.message.header.block_hash.0[0] = 1; - response.message.value = U256::from(10); - response.message.pubkey = blst_pubkey_to_alloy(&state.signer.sk_to_pk()); - response.message.header.timestamp = timestamp_of_slot_start_sec(0, state.chain); - - let object_root = response.message.tree_hash_root().0; - response.signature = sign_builder_root(state.chain, &state.signer, object_root); - - let response = GetHeaderResponse::Deneb(response); - (StatusCode::OK, Json(response)).into_response() + let mut response = (StatusCode::OK, data).into_response(); + let consensus_version_header = + HeaderValue::from_str(&consensus_version_header.to_string()).unwrap(); + let content_type_header = HeaderValue::from_str(&accept_header.to_string()).unwrap(); + response.headers_mut().insert(CONSENSUS_VERSION_HEADER, consensus_version_header); + response.headers_mut().insert(CONTENT_TYPE, content_type_header); + response } async fn handle_get_status(State(state): State>) -> impl IntoResponse { @@ -145,8 +189,10 @@ async fn handle_submit_block( ) -> Response { state.received_submit_block.fetch_add(1, Ordering::Relaxed); let accept_header = get_accept_header(&headers); - let consensus_version_header = get_consensus_version_header(&headers).unwrap(); - let data = if state.large_body { + let consensus_version_header = + get_consensus_version_header(&headers).unwrap_or(ForkName::Electra); + + let data = if state.large_body() { vec![1u8; 1 + MAX_SIZE_SUBMIT_BLOCK] } else { match accept_header { diff --git a/tests/tests/pbs_get_header.rs b/tests/tests/pbs_get_header.rs index 519b2985..bc46cdf5 100644 --- a/tests/tests/pbs_get_header.rs +++ b/tests/tests/pbs_get_header.rs @@ -2,11 +2,11 @@ use std::{sync::Arc, time::Duration}; use alloy::primitives::{B256, U256}; use cb_common::{ - pbs::GetHeaderResponse, + pbs::{ExecutionPayloadHeaderMessageElectra, GetHeaderResponse, SignedExecutionPayloadHeader}, signature::sign_builder_root, signer::{random_secret, BlsPublicKey}, types::Chain, - utils::{blst_pubkey_to_alloy, timestamp_of_slot_start_sec, ForkName}, + utils::{blst_pubkey_to_alloy, timestamp_of_slot_start_sec, Accept, ForkName}, }; use cb_pbs::{DefaultBuilderApi, PbsService, PbsState}; use cb_tests::{ @@ -16,6 +16,7 @@ use cb_tests::{ }; use eyre::Result; use reqwest::StatusCode; +use ssz::Decode; use tracing::info; use tree_hash::TreeHash; @@ -69,6 +70,50 @@ async fn test_get_header() -> Result<()> { Ok(()) } +#[tokio::test] +async fn test_get_header_ssz() -> Result<()> { + setup_test_env(); + let signer = random_secret(); + let pubkey: BlsPublicKey = blst_pubkey_to_alloy(&signer.sk_to_pk()).into(); + + let chain = Chain::Holesky; + let pbs_port = 3200; + let relay_port = pbs_port + 1; + + // Run a mock relay + let mock_state = Arc::new(MockRelayState::new(chain, signer)); + let mock_relay = generate_mock_relay(relay_port, *pubkey)?; + tokio::spawn(start_mock_relay_service(mock_state.clone(), relay_port)); + + // Run the PBS service + let config = to_pbs_config(chain, get_pbs_static_config(pbs_port), vec![mock_relay.clone()]); + let state = PbsState::new(config); + tokio::spawn(PbsService::run::<(), DefaultBuilderApi>(state)); + + // leave some time to start servers + tokio::time::sleep(Duration::from_millis(100)).await; + + let mock_validator = MockValidator::new(pbs_port)?; + info!("Sending get header"); + let res = mock_validator.do_get_header(None, Some(Accept::Ssz), ForkName::Electra).await?; + assert_eq!(res.status(), StatusCode::OK); + + let res: SignedExecutionPayloadHeader = + SignedExecutionPayloadHeader::from_ssz_bytes(&res.bytes().await?).unwrap(); + + assert_eq!(mock_state.received_get_header(), 1); + assert_eq!(res.message.header.block_hash.0[0], 1); + assert_eq!(res.message.header.parent_hash, B256::ZERO); + assert_eq!(res.message.value, U256::from(10)); + assert_eq!(res.message.pubkey, blst_pubkey_to_alloy(&mock_state.signer.sk_to_pk())); + assert_eq!(res.message.header.timestamp, timestamp_of_slot_start_sec(0, chain)); + assert_eq!( + res.signature, + sign_builder_root(chain, &mock_state.signer, res.message.tree_hash_root().0) + ); + Ok(()) +} + #[tokio::test] async fn test_get_header_returns_204_if_relay_down() -> Result<()> { setup_test_env(); diff --git a/tests/tests/pbs_integration.rs b/tests/tests/pbs_integration.rs deleted file mode 100644 index 3556037a..00000000 --- a/tests/tests/pbs_integration.rs +++ /dev/null @@ -1,474 +0,0 @@ -use std::{ - collections::HashMap, - net::{Ipv4Addr, SocketAddr}, - sync::Arc, - time::Duration, - u64, -}; - -use alloy::{primitives::U256, rpc::types::beacon::relay::ValidatorRegistration}; -use cb_common::{ - config::{PbsConfig, PbsModuleConfig, RuntimeMuxConfig}, - pbs::{ - ExecutionPayloadHeaderMessageDeneb, ExecutionPayloadHeaderMessageElectra, RelayClient, - SignedExecutionPayloadHeader, VersionedResponse, - }, - signer::{random_secret, BlsPublicKey}, - types::Chain, - utils::{ - blst_pubkey_to_alloy, get_consensus_version_header, get_content_type_header, Accept, - ContentType, ForkName, - }, -}; -use cb_pbs::{DefaultBuilderApi, PbsService, PbsState}; -use cb_tests::{ - mock_relay::{start_mock_relay_service, MockRelayState}, - mock_validator::MockValidator, - utils::{generate_mock_relay, generate_mock_relay_with_batch_size, setup_test_env}, -}; -use eyre::Result; -use ssz::Decode; -use tracing::info; - -fn get_pbs_static_config(port: u16) -> PbsConfig { - PbsConfig { - host: Ipv4Addr::UNSPECIFIED, - port, - wait_all_registrations: true, - relay_check: true, - timeout_get_header_ms: u64::MAX, - timeout_get_payload_ms: u64::MAX, - timeout_register_validator_ms: u64::MAX, - skip_sigverify: false, - min_bid_wei: U256::ZERO, - late_in_slot_time_ms: u64::MAX, - extra_validation_enabled: false, - rpc_url: None, - } -} - -fn to_pbs_config(chain: Chain, pbs_config: PbsConfig, relays: Vec) -> PbsModuleConfig { - PbsModuleConfig { - chain, - endpoint: SocketAddr::new(pbs_config.host.into(), pbs_config.port), - pbs_config: Arc::new(pbs_config), - signer_client: None, - event_publisher: None, - all_relays: relays.clone(), - relays, - muxes: None, - } -} - -#[tokio::test] -async fn test_get_header() -> Result<()> { - setup_test_env(); - let signer = random_secret(); - let pubkey: BlsPublicKey = blst_pubkey_to_alloy(&signer.sk_to_pk()).into(); - - let chain = Chain::Holesky; - let port = 3000; - - let mock_state = Arc::new(MockRelayState::new(chain, signer)); - let mock_relay = generate_mock_relay(port + 1, *pubkey)?; - tokio::spawn(start_mock_relay_service(mock_state.clone(), port + 1)); - - let config = to_pbs_config(chain, get_pbs_static_config(port), vec![mock_relay]); - let state = PbsState::new(config); - tokio::spawn(PbsService::run::<(), DefaultBuilderApi>(state)); - - // leave some time to start servers - tokio::time::sleep(Duration::from_millis(100)).await; - - let mock_validator = MockValidator::new(port)?; - info!("Sending get header"); - let res = mock_validator.do_get_header(None, Some(Accept::Json), ForkName::Electra).await; - - assert!(res.is_ok()); - let response = res.unwrap(); - let content_type = get_content_type_header(&response.headers()); - let payload = response.bytes().await.unwrap(); - let _ = match content_type { - ContentType::Json => serde_json::from_slice::< - VersionedResponse< - SignedExecutionPayloadHeader, - SignedExecutionPayloadHeader, - >, - >(&payload), - ContentType::Ssz => panic!("Should be JSON"), - }; - assert_eq!(mock_state.received_get_header(), 1); - Ok(()) -} - -#[tokio::test] -async fn test_get_header_ssz() -> Result<()> { - setup_test_env(); - let signer = random_secret(); - let pubkey: BlsPublicKey = blst_pubkey_to_alloy(&signer.sk_to_pk()).into(); - - let chain = Chain::Holesky; - let port = 3000; - - let mock_state = Arc::new(MockRelayState::new(chain, signer)); - let mock_relay = generate_mock_relay(port + 1, *pubkey)?; - tokio::spawn(start_mock_relay_service(mock_state.clone(), port + 1)); - - let config = to_pbs_config(chain, get_pbs_static_config(port), vec![mock_relay]); - let state = PbsState::new(config); - tokio::spawn(PbsService::run::<(), DefaultBuilderApi>(state)); - - // leave some time to start servers - tokio::time::sleep(Duration::from_millis(100)).await; - - let mock_validator = MockValidator::new(port)?; - info!("Sending get header"); - let res = mock_validator.do_get_header(None, Some(Accept::Ssz), ForkName::Electra).await; - assert!(res.is_ok()); - let response = res.unwrap(); - let content_type = get_content_type_header(&response.headers()); - let consensus_version = get_consensus_version_header(&response.headers()).unwrap(); - let payload = response.bytes().await.unwrap(); - match content_type { - ContentType::Json => panic!("Should be SSZ"), - ContentType::Ssz => { - match consensus_version { - ForkName::Deneb => { - SignedExecutionPayloadHeader::::from_ssz_bytes(&payload).unwrap(); - } - ForkName::Electra => { - SignedExecutionPayloadHeader::::from_ssz_bytes(&payload).unwrap(); - } - }; - } - }; - assert_eq!(mock_state.received_get_header(), 1); - Ok(()) -} - -#[tokio::test] -async fn test_get_status() -> Result<()> { - setup_test_env(); - let signer = random_secret(); - let pubkey: BlsPublicKey = blst_pubkey_to_alloy(&signer.sk_to_pk()).into(); - - let chain = Chain::Holesky; - let port = 3100; - - let relays = - vec![generate_mock_relay(port + 1, *pubkey)?, generate_mock_relay(port + 2, *pubkey)?]; - let mock_state = Arc::new(MockRelayState::new(chain, signer)); - tokio::spawn(start_mock_relay_service(mock_state.clone(), port + 1)); - tokio::spawn(start_mock_relay_service(mock_state.clone(), port + 2)); - - let config = to_pbs_config(chain, get_pbs_static_config(port), relays); - let state = PbsState::new(config); - tokio::spawn(PbsService::run::<(), DefaultBuilderApi>(state)); - - // leave some time to start servers - tokio::time::sleep(Duration::from_millis(100)).await; - - let mock_validator = MockValidator::new(port)?; - info!("Sending get status"); - let res = mock_validator.do_get_status().await; - - assert!(res.is_ok()); - assert_eq!(mock_state.received_get_status(), 2); - Ok(()) -} - -#[tokio::test] -async fn test_register_validators() -> Result<()> { - setup_test_env(); - let signer = random_secret(); - let pubkey: BlsPublicKey = blst_pubkey_to_alloy(&signer.sk_to_pk()).into(); - - let chain = Chain::Holesky; - let port = 3300; - - let relays = vec![generate_mock_relay(port + 1, *pubkey)?]; - let mock_state = Arc::new(MockRelayState::new(chain, signer)); - tokio::spawn(start_mock_relay_service(mock_state.clone(), port + 1)); - - let config = to_pbs_config(chain, get_pbs_static_config(port), relays); - let state = PbsState::new(config); - tokio::spawn(PbsService::run::<(), DefaultBuilderApi>(state)); - - // leave some time to start servers - tokio::time::sleep(Duration::from_millis(100)).await; - - let mock_validator = MockValidator::new(port)?; - info!("Sending register validator"); - let res = mock_validator.do_register_validator().await; - - assert!(res.is_ok()); - assert_eq!(mock_state.received_register_validator(), 1); - Ok(()) -} - -#[tokio::test] -async fn test_batch_register_validators() -> Result<()> { - setup_test_env(); - let signer = random_secret(); - let pubkey: BlsPublicKey = blst_pubkey_to_alloy(&signer.sk_to_pk()).into(); - - let chain = Chain::Holesky; - let port = 3310; - - let relays = vec![generate_mock_relay_with_batch_size(port + 1, *pubkey, 5)?]; - let mock_state = Arc::new(MockRelayState::new(chain, signer)); - tokio::spawn(start_mock_relay_service(mock_state.clone(), port + 1)); - - let config = to_pbs_config(chain, get_pbs_static_config(port), relays); - let state = PbsState::new(config); - tokio::spawn(PbsService::run::<(), DefaultBuilderApi>(state)); - - // leave some time to start servers - tokio::time::sleep(Duration::from_millis(100)).await; - - let data = include_str!("../data/registration_holesky.json"); - let registrations: Vec = serde_json::from_str(data)?; - - let mock_validator = MockValidator::new(port)?; - info!("Sending register validator"); - let res = mock_validator.do_register_custom_validators(registrations.clone()).await; - - // registrations.len() == 17. 5 per batch, 4 batches - assert!(res.is_ok()); - assert_eq!(mock_state.received_register_validator(), 4); - - let mock_validator = MockValidator::new(port)?; - info!("Sending register validator"); - let res = mock_validator.do_register_custom_validators(registrations[..2].to_vec()).await; - - // Expected one more registration request - assert!(res.is_ok()); - assert_eq!(mock_state.received_register_validator(), 5); - - Ok(()) -} - -#[tokio::test] -async fn test_submit_block_deneb() -> Result<()> { - setup_test_env(); - let signer = random_secret(); - let pubkey: BlsPublicKey = blst_pubkey_to_alloy(&signer.sk_to_pk()).into(); - - let chain = Chain::Holesky; - let port = 3400; - - let relays = vec![generate_mock_relay(port + 1, *pubkey)?]; - let mock_state = Arc::new(MockRelayState::new(chain, signer)); - tokio::spawn(start_mock_relay_service(mock_state.clone(), port + 1)); - - let config = to_pbs_config(chain, get_pbs_static_config(port), relays); - let state = PbsState::new(config); - tokio::spawn(PbsService::run::<(), DefaultBuilderApi>(state)); - - // leave some time to start servers - tokio::time::sleep(Duration::from_millis(100)).await; - - let mock_validator = MockValidator::new(port)?; - info!("Sending submit block"); - let res = mock_validator - .do_submit_block(None, Accept::Json, ContentType::Json, ForkName::Deneb) - .await; - - assert!(!res.is_err()); - assert_eq!(mock_state.received_submit_block(), 1); - Ok(()) -} - -#[tokio::test] -async fn test_submit_block_electra() -> Result<()> { - setup_test_env(); - let signer = random_secret(); - let pubkey: BlsPublicKey = blst_pubkey_to_alloy(&signer.sk_to_pk()).into(); - - let chain = Chain::Holesky; - let port = 3400; - - let relays = vec![generate_mock_relay(port + 1, *pubkey)?]; - let mock_state = Arc::new(MockRelayState::new(chain, signer)); - tokio::spawn(start_mock_relay_service(mock_state.clone(), port + 1)); - - let config = to_pbs_config(chain, get_pbs_static_config(port), relays); - let state = PbsState::new(config); - tokio::spawn(PbsService::run::<(), DefaultBuilderApi>(state)); - - // leave some time to start servers - tokio::time::sleep(Duration::from_millis(100)).await; - - let mock_validator = MockValidator::new(port)?; - info!("Sending submit block"); - let res = mock_validator - .do_submit_block(None, Accept::Json, ContentType::Json, ForkName::Electra) - .await; - - assert!(!res.is_err()); - assert_eq!(mock_state.received_submit_block(), 1); - Ok(()) -} - -#[tokio::test] -async fn test_submit_block_ssz_deneb() -> Result<()> { - setup_test_env(); - let signer = random_secret(); - let pubkey: BlsPublicKey = blst_pubkey_to_alloy(&signer.sk_to_pk()).into(); - - let chain = Chain::Holesky; - let port = 3400; - - let relays = vec![generate_mock_relay(port + 1, *pubkey)?]; - let mock_state = Arc::new(MockRelayState::new(chain, signer)); - tokio::spawn(start_mock_relay_service(mock_state.clone(), port + 1)); - - let config = to_pbs_config(chain, get_pbs_static_config(port), relays); - let state = PbsState::new(config); - tokio::spawn(PbsService::run::<(), DefaultBuilderApi>(state)); - - // leave some time to start servers - tokio::time::sleep(Duration::from_millis(100)).await; - - let mock_validator = MockValidator::new(port)?; - info!("Sending submit block"); - let res = - mock_validator.do_submit_block(None, Accept::Ssz, ContentType::Ssz, ForkName::Deneb).await; - - assert!(!res.is_err()); - assert_eq!(mock_state.received_submit_block(), 1); - Ok(()) -} - -#[tokio::test] -async fn test_submit_block_ssz_electra() -> Result<()> { - setup_test_env(); - let signer = random_secret(); - let pubkey: BlsPublicKey = blst_pubkey_to_alloy(&signer.sk_to_pk()).into(); - - let chain = Chain::Holesky; - let port = 3400; - - let relays = vec![generate_mock_relay(port + 1, *pubkey)?]; - let mock_state = Arc::new(MockRelayState::new(chain, signer)); - tokio::spawn(start_mock_relay_service(mock_state.clone(), port + 1)); - - let config = to_pbs_config(chain, get_pbs_static_config(port), relays); - let state = PbsState::new(config); - tokio::spawn(PbsService::run::<(), DefaultBuilderApi>(state)); - - // leave some time to start servers - tokio::time::sleep(Duration::from_millis(100)).await; - - let mock_validator = MockValidator::new(port)?; - info!("Sending submit block"); - let res = mock_validator - .do_submit_block(None, Accept::Ssz, ContentType::Ssz, ForkName::Electra) - .await; - - assert!(!res.is_err()); - assert_eq!(mock_state.received_submit_block(), 1); - Ok(()) -} - -#[tokio::test] -async fn test_submit_block_too_large() -> Result<()> { - setup_test_env(); - let signer = random_secret(); - let pubkey: BlsPublicKey = blst_pubkey_to_alloy(&signer.sk_to_pk()).into(); - - let chain = Chain::Holesky; - let port = 3500; - - let relays = vec![generate_mock_relay(port + 1, *pubkey)?]; - let mock_state = Arc::new(MockRelayState::new(chain, signer).with_large_body()); - tokio::spawn(start_mock_relay_service(mock_state.clone(), port + 1)); - - let config = to_pbs_config(chain, get_pbs_static_config(port), relays); - let state = PbsState::new(config); - tokio::spawn(PbsService::run::<(), DefaultBuilderApi>(state)); - - // leave some time to start servers - tokio::time::sleep(Duration::from_millis(100)).await; - - let mock_validator = MockValidator::new(port)?; - info!("Sending submit block"); - let res = mock_validator - .do_submit_block(None, Accept::Json, ContentType::Json, ForkName::Electra) - .await; - - assert!(res.is_err()); - assert_eq!(mock_state.received_submit_block(), 1); - Ok(()) -} - -#[tokio::test] -async fn test_mux() -> Result<()> { - setup_test_env(); - let signer = random_secret(); - let pubkey: BlsPublicKey = blst_pubkey_to_alloy(&signer.sk_to_pk()).into(); - - let chain = Chain::Holesky; - let port = 3600; - - let mux_relay_1 = generate_mock_relay(port + 1, *pubkey)?; - let mux_relay_2 = generate_mock_relay(port + 2, *pubkey)?; - let default_relay = generate_mock_relay(port + 3, *pubkey)?; - - let mock_state = Arc::new(MockRelayState::new(chain, signer)); - tokio::spawn(start_mock_relay_service(mock_state.clone(), port + 1)); - tokio::spawn(start_mock_relay_service(mock_state.clone(), port + 2)); - tokio::spawn(start_mock_relay_service(mock_state.clone(), port + 3)); - - let relays = vec![default_relay.clone()]; - let mut config = to_pbs_config(chain, get_pbs_static_config(port), relays); - config.all_relays = vec![mux_relay_1.clone(), mux_relay_2.clone(), default_relay.clone()]; - - let mux = RuntimeMuxConfig { - id: String::from("test"), - config: config.pbs_config.clone(), - relays: vec![mux_relay_1, mux_relay_2], - }; - - let validator_pubkey = blst_pubkey_to_alloy(&random_secret().sk_to_pk()); - - config.muxes = Some(HashMap::from([(validator_pubkey, mux)])); - - let state = PbsState::new(config); - tokio::spawn(PbsService::run::<(), DefaultBuilderApi>(state)); - - // leave some time to start servers - tokio::time::sleep(Duration::from_millis(100)).await; - - let mock_validator = MockValidator::new(port)?; - info!("Sending get header with default"); - let res = mock_validator.do_get_header(None, Some(Accept::Json), ForkName::Electra).await; - - assert!(res.is_ok()); - assert_eq!(mock_state.received_get_header(), 1); // only default relay was used - - info!("Sending get header with mux"); - let res = mock_validator.do_get_header(Some(validator_pubkey), Some(Accept::Json), ForkName::Electra).await; - - assert!(res.is_ok()); - assert_eq!(mock_state.received_get_header(), 3); // two mux relays were used - - let res = mock_validator.do_get_status().await; - - assert!(res.is_ok()); - assert_eq!(mock_state.received_get_status(), 3); // default + 2 mux relays were used - - let res = mock_validator.do_register_validator().await; - - assert!(res.is_ok()); - assert_eq!(mock_state.received_register_validator(), 3); // default + 2 mux relays were used - - let res = mock_validator - .do_submit_block(None, Accept::Json, ContentType::Json, ForkName::Electra) - .await; - - assert!(res.is_err()); - assert_eq!(mock_state.received_submit_block(), 3); // default + 2 mux relays were used - - Ok(()) -} diff --git a/tests/tests/pbs_mux.rs b/tests/tests/pbs_mux.rs index 8dd6e75d..ee0bc7f0 100644 --- a/tests/tests/pbs_mux.rs +++ b/tests/tests/pbs_mux.rs @@ -61,13 +61,19 @@ async fn test_mux() -> Result<()> { // Send default request without specifying a validator key let mock_validator = MockValidator::new(pbs_port)?; info!("Sending get header with default"); - assert_eq!(mock_validator.do_get_header(None, None, ForkName::Electra).await?.status(), StatusCode::OK); + assert_eq!( + mock_validator.do_get_header(None, None, ForkName::Electra).await?.status(), + StatusCode::OK + ); assert_eq!(mock_state.received_get_header(), 1); // only default relay was used // Send request specifying a validator key to use mux info!("Sending get header with mux"); assert_eq!( - mock_validator.do_get_header(Some(validator_pubkey), None, ForkName::Electra).await?.status(), + mock_validator + .do_get_header(Some(validator_pubkey), None, ForkName::Electra) + .await? + .status(), StatusCode::OK ); assert_eq!(mock_state.received_get_header(), 3); // two mux relays were used diff --git a/tests/tests/pbs_post_blinded_blocks.rs b/tests/tests/pbs_post_blinded_blocks.rs index c7a332c6..a468ca1e 100644 --- a/tests/tests/pbs_post_blinded_blocks.rs +++ b/tests/tests/pbs_post_blinded_blocks.rs @@ -1,7 +1,7 @@ use std::{sync::Arc, time::Duration}; use cb_common::{ - pbs::{SignedBlindedBeaconBlock, SubmitBlindedBlockResponse}, + pbs::{PayloadAndBlobsElectra, SignedBlindedBeaconBlock, SubmitBlindedBlockResponse}, signer::{random_secret, BlsPublicKey}, types::Chain, utils::{blst_pubkey_to_alloy, Accept, ContentType, ForkName}, @@ -14,6 +14,7 @@ use cb_tests::{ }; use eyre::Result; use reqwest::StatusCode; +use ssz::Decode; use tracing::info; #[tokio::test] @@ -57,6 +58,47 @@ async fn test_submit_block() -> Result<()> { Ok(()) } +#[tokio::test] +async fn test_submit_block_ssz() -> Result<()> { + setup_test_env(); + let signer = random_secret(); + let pubkey: BlsPublicKey = blst_pubkey_to_alloy(&signer.sk_to_pk()).into(); + + let chain = Chain::Holesky; + let pbs_port = 3800; + + // Run a mock relay + let relays = vec![generate_mock_relay(pbs_port + 1, *pubkey)?]; + let mock_state = Arc::new(MockRelayState::new(chain, signer)); + tokio::spawn(start_mock_relay_service(mock_state.clone(), pbs_port + 1)); + + // Run the PBS service + let config = to_pbs_config(chain, get_pbs_static_config(pbs_port), relays); + let state = PbsState::new(config); + tokio::spawn(PbsService::run::<(), DefaultBuilderApi>(state)); + + // leave some time to start servers + tokio::time::sleep(Duration::from_millis(100)).await; + + let mock_validator = MockValidator::new(pbs_port)?; + info!("Sending submit block"); + let res = mock_validator + .do_submit_block( + Some(SignedBlindedBeaconBlock::default()), + Accept::Ssz, + ContentType::Ssz, + ForkName::Electra, + ) + .await?; + + assert_eq!(res.status(), StatusCode::OK); + assert_eq!(mock_state.received_submit_block(), 1); + + let response_body = PayloadAndBlobsElectra::from_ssz_bytes(&res.bytes().await?).unwrap(); + assert_eq!(response_body.block_hash(), SubmitBlindedBlockResponse::default().block_hash()); + Ok(()) +} + #[tokio::test] async fn test_submit_block_too_large() -> Result<()> { setup_test_env();