Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

13 changes: 13 additions & 0 deletions e2e/helpers/src/committer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -379,6 +379,19 @@ impl CommitterProcess {
Ok(response)
}

pub async fn latest_costs(&self) -> anyhow::Result<Vec<BundleCost>> {
let response = reqwest::get(format!(
"http://localhost:{}/v1/costs?variant=latest",
self.port,
))
.await?
.error_for_status()?
.json()
.await?;

Ok(response)
}

pub fn port(&self) -> u16 {
self.port
}
Expand Down
66 changes: 66 additions & 0 deletions e2e/tests/tests/eigen_costs.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
use anyhow::Result;
use e2e_helpers::whole_stack::{
create_and_fund_kms_signers, deploy_contract, start_db, start_eigen_committer, start_eth,
start_fuel_node, start_kms,
};
use k256::ecdsa::SigningKey as K256SigningKey;
use std::time::Duration;
use tracing::info;

#[tokio::test]
async fn test_eigen_costs() -> Result<()> {
// Start required services
let logs = true;
let kms = start_kms(logs).await?;
let eth_node = start_eth(logs).await?;
let eth_signers = create_and_fund_kms_signers(&kms, &eth_node).await?;

// Get Eigen key from environment and inject into KMS
let eigen_key_hex = std::env::var("EIGEN_KEY")
.expect("EIGEN_KEY environment variable must be set for Eigen tests");

// Convert hex string to bytes
let key_bytes: [u8; 32] = hex::decode(&eigen_key_hex)
.expect("Failed to decode EIGEN_KEY hex")
.try_into()
.expect("EIGEN_KEY must be 32 bytes");

// Create signing key and inject into KMS
let secret_key = k256::elliptic_curve::SecretKey::from_slice(&key_bytes)?;
let k256_signing_key = K256SigningKey::from(&secret_key);
let kms_key_id = kms.inject_secp256k1_key(&k256_signing_key).await?;

// Deploy contract and start services
let request_timeout = Duration::from_secs(50);
let max_fee = 1_000_000_000_000;
let (_contract_args, deployed_contract) =
deploy_contract(&eth_node, eth_signers.clone(), max_fee, request_timeout).await?;
let db = start_db().await?;
let fuel_node = start_fuel_node(logs, Some(Duration::from_millis(200))).await?;

// Start Eigen committer with KMS key
let logs = true;
let committer = start_eigen_committer(
logs,
db.clone(),
&eth_node,
fuel_node.url(),
&deployed_contract,
eth_signers.main,
kms_key_id, // Use the KMS key ID instead of raw EIGEN_KEY
"1 KB",
)
.await?;

// Wait for some blocks to be processed
tokio::time::sleep(Duration::from_secs(100)).await;

// Check if committer has updated the costs
let costs = committer.latest_costs().await?;

info!("received {costs:?}");

assert!(costs.iter().all(|cost| cost.size > 0));

Ok(())
}
8 changes: 8 additions & 0 deletions packages/adapters/storage/migrations/0010_eigen_indexes.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
BEGIN;

CREATE INDEX idx_eigen_submission_fragments_id_desc ON eigen_submission_fragments (id DESC);
CREATE INDEX idx_eigen_submission_status ON eigen_submission (status);
CREATE INDEX idx_l1_fragments_bundle_id ON l1_fragments (bundle_id);
CREATE INDEX idx_eigen_submission_status_id ON eigen_submission (status, id);

COMMIT;
4 changes: 4 additions & 0 deletions packages/adapters/storage/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,10 @@ impl services::state_listener::port::Storage for Postgres {
.await
.map_err(Into::into)
}

async fn update_eigen_costs(&self) -> services::Result<()> {
self._update_eigen_costs().await.map_err(Into::into)
}
}

impl services::cost_reporter::port::Storage for Postgres {
Expand Down
35 changes: 35 additions & 0 deletions packages/adapters/storage/src/postgres.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1468,6 +1468,41 @@ impl Postgres {
Ok(())
}

pub(crate) async fn _update_eigen_costs(&self) -> Result<()> {
let mut tx = self.connection_pool.begin().await?;

sqlx::query!(
r#"
INSERT INTO bundle_cost (bundle_id, da_block_height, cost, size, is_finalized)
SELECT DISTINCT
b.id as bundle_id,
0 as da_block_height,
0 as cost,
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hmmmm

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i feel like we need some more metadata i.e over how much time the bundle was posted, and then compute it with the fixed cost we have with eigen that is in bytes/sec

COALESCE(SUM(lf.total_bytes), 0) as size,
true as is_finalized
FROM eigen_submission_fragments esf
JOIN eigen_submission es ON esf.submission_id = es.id
JOIN l1_fragments lf ON esf.fragment_id = lf.id
JOIN bundles b ON lf.bundle_id = b.id
LEFT JOIN bundle_cost bc ON b.id = bc.bundle_id
WHERE es.status = $1
AND bc.bundle_id IS NULL -- Don't insert duplicates
AND esf.id IN (
SELECT id FROM eigen_submission_fragments
ORDER BY id DESC
)
GROUP BY b.id;
"#,
i16::from(eigen_tables::SubmissionStatus::Finalized),
)
.execute(&mut *tx)
.await?;

tx.commit().await?;

Ok(())
}

pub(crate) async fn _get_latest_block_height(&self) -> Result<u32> {
let result = sqlx::query!(r#"SELECT MAX(height) as max_height FROM fuel_blocks"#)
.fetch_one(&self.connection_pool)
Expand Down
4 changes: 4 additions & 0 deletions packages/adapters/storage/src/test_instance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,10 @@ impl services::state_listener::port::Storage for DbWithProcess {
) -> services::Result<()> {
unimplemented!();
}

async fn update_eigen_costs(&self) -> services::Result<()> {
unimplemented!()
}
}

impl block_importer::port::Storage for DbWithProcess {
Expand Down
2 changes: 2 additions & 0 deletions packages/services/src/state_listener/eigen_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,8 @@ where

self.storage.update_eigen_submissions(changes).await?;

self.storage.update_eigen_costs().await?;

Ok(())
}
}
Expand Down
1 change: 1 addition & 0 deletions packages/services/src/state_listener/port.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ pub trait Storage: Sync {
request_id: &EigenDARequestId,
) -> Result<Option<DateTime<Utc>>>;
async fn update_eigen_submissions(&self, changes: Vec<(u32, DispersalStatus)>) -> Result<()>;
async fn update_eigen_costs(&self) -> Result<()>;
}

pub trait Clock {
Expand Down
Loading