Skip to content

Commit 4ba9da8

Browse files
rymncxgreenx
andauthored
feat(gas_price_service): update block committer da source with established contract (#2265)
> [!NOTE] > This is PR 7/7 for #2139 ## Linked Issues/PRs <!-- List of related issues/PRs --> - #2139 ## Description <!-- List of detailed changes --> Updates the `BlockCommitterApi` to implement the contract established with us and the block committer team. There is some business logic that is slightly unclear, need @MitchTurner for insights. ## Checklist - [x] Breaking changes are clearly marked as such in the PR description and changelog - [x] New behavior is reflected in tests - [x] [The specification](https://github.com/FuelLabs/fuel-specs/) matches the implemented behavior (link update PR if changes are needed) ### Before requesting review - [x] I have reviewed the code myself - [ ] I have created follow-up issues caused by this PR and linked them here ### After merging, notify other teams [Add or remove entries as needed] - [ ] [Rust SDK](https://github.com/FuelLabs/fuels-rs/) - [ ] [Sway compiler](https://github.com/FuelLabs/sway/) - [ ] [Platform documentation](https://github.com/FuelLabs/devrel-requests/issues/new?assignees=&labels=new+request&projects=&template=NEW-REQUEST.yml&title=%5BRequest%5D%3A+) (for out-of-organization contributors, the person merging the PR will do this) - [ ] Someone else? --------- Co-authored-by: Green Baneling <[email protected]>
1 parent 538b2a4 commit 4ba9da8

File tree

3 files changed

+291
-29
lines changed

3 files changed

+291
-29
lines changed

CHANGELOG.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/).
1111
- [2131](https://github.com/FuelLabs/fuel-core/pull/2131): Add flow in TxPool in order to ask to newly connected peers to share their transaction pool
1212
- [2182](https://github.com/FuelLabs/fuel-core/pull/2151): Limit number of transactions that can be fetched via TxSource::next
1313
- [2189](https://github.com/FuelLabs/fuel-core/pull/2151): Select next DA height to never include more than u16::MAX -1 transactions from L1.
14-
14+
- [2265](https://github.com/FuelLabs/fuel-core/pull/2265): Integrate Block Committer API for DA Block Costs.
1515

1616
### Changed
1717

crates/services/gas_price_service/src/v1/da_source_adapter.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ pub const POLLING_INTERVAL_MS: u64 = 10_000;
2424

2525
#[derive(Debug, Default, Clone, Eq, Hash, PartialEq)]
2626
pub struct DaBlockCosts {
27-
pub l2_block_range: core::ops::Range<u32>,
27+
pub l2_block_range: core::ops::Range<u64>,
2828
pub blob_size_bytes: u32,
2929
pub blob_cost_wei: u128,
3030
}
Lines changed: 289 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
#![allow(clippy::arithmetic_side_effects)]
2+
13
use crate::v1::da_source_adapter::{
24
service::{
35
DaBlockCostsSource,
@@ -6,57 +8,317 @@ use crate::v1::da_source_adapter::{
68
DaBlockCosts,
79
};
810
use anyhow::anyhow;
9-
use reqwest::Url;
11+
use fuel_core_types::blockchain::primitives::DaBlockHeight;
1012
use serde::{
1113
Deserialize,
1214
Serialize,
1315
};
1416

15-
/// This struct is used to denote the block committer da gas price source,,
17+
#[async_trait::async_trait]
18+
trait BlockCommitterApi: Send + Sync {
19+
/// Used on first run to get the latest costs and seqno
20+
async fn get_latest_costs(&self) -> DaBlockCostsResult<Option<RawDaBlockCosts>>;
21+
/// Used to get the costs for a specific seqno
22+
async fn get_costs_by_seqno(
23+
&self,
24+
number: u64,
25+
) -> DaBlockCostsResult<Option<RawDaBlockCosts>>;
26+
/// Used to get the costs for a range of blocks (inclusive)
27+
async fn get_cost_bundles_by_range(
28+
&self,
29+
range: core::ops::Range<u64>,
30+
) -> DaBlockCostsResult<Vec<Option<RawDaBlockCosts>>>;
31+
}
32+
33+
/// This struct is used to denote the block committer da block costs source
1634
/// which receives data from the block committer (only http api for now)
17-
pub struct BlockCommitterDaBlockCosts {
18-
client: reqwest::Client,
19-
url: Url,
35+
pub struct BlockCommitterDaBlockCosts<BlockCommitter> {
36+
client: BlockCommitter,
37+
last_raw_da_block_costs: Option<RawDaBlockCosts>,
2038
}
2139

2240
#[derive(Debug, Deserialize, Serialize, Clone, Default, PartialEq)]
23-
struct RawDaBlockCosts {
24-
pub l2_block_range: core::ops::Range<u32>,
25-
pub blob_size_bytes: u32,
26-
pub blob_cost: u128,
41+
pub struct RawDaBlockCosts {
42+
/// Sequence number (Monotonically increasing nonce)
43+
pub sequence_number: u64,
44+
/// The range of blocks that the costs apply to
45+
pub blocks_range: core::ops::Range<u64>,
46+
/// The DA block height of the last transaction for the range of blocks
47+
pub da_block_height: DaBlockHeight,
48+
/// Rolling sum cost of posting blobs (wei)
49+
pub total_cost: u128,
50+
/// Rolling sum size of blobs (bytes)
51+
pub total_size_bytes: u32,
2752
}
2853

29-
impl From<RawDaBlockCosts> for DaBlockCosts {
30-
fn from(raw: RawDaBlockCosts) -> Self {
54+
impl From<&RawDaBlockCosts> for DaBlockCosts {
55+
fn from(raw_da_block_costs: &RawDaBlockCosts) -> Self {
3156
DaBlockCosts {
32-
l2_block_range: raw.l2_block_range,
33-
blob_size_bytes: raw.blob_size_bytes,
34-
blob_cost_wei: raw.blob_cost,
57+
l2_block_range: raw_da_block_costs.blocks_range.clone(),
58+
blob_size_bytes: raw_da_block_costs.total_size_bytes,
59+
blob_cost_wei: raw_da_block_costs.total_cost,
3560
}
3661
}
3762
}
3863

39-
impl BlockCommitterDaBlockCosts {
40-
/// Create a new instance of the block committer da gas price source
41-
pub fn new(url: String) -> Self {
64+
impl<BlockCommitter> BlockCommitterDaBlockCosts<BlockCommitter> {
65+
/// Create a new instance of the block committer da block costs source
66+
pub fn new(client: BlockCommitter, last_value: Option<RawDaBlockCosts>) -> Self {
4267
Self {
43-
client: reqwest::Client::new(),
44-
url: Url::parse(&url).unwrap(),
68+
client,
69+
last_raw_da_block_costs: last_value,
4570
}
4671
}
4772
}
4873

4974
#[async_trait::async_trait]
50-
impl DaBlockCostsSource for BlockCommitterDaBlockCosts {
75+
impl<BlockCommitter> DaBlockCostsSource for BlockCommitterDaBlockCosts<BlockCommitter>
76+
where
77+
BlockCommitter: BlockCommitterApi,
78+
{
5179
async fn request_da_block_cost(&mut self) -> DaBlockCostsResult<DaBlockCosts> {
52-
let response = self.client.get(self.url.clone()).send().await?;
53-
if !response.status().is_success() {
54-
return Err(anyhow!("failed with response: {}", response.status()));
80+
let raw_da_block_costs = match self.last_raw_da_block_costs {
81+
Some(ref last_value) => self
82+
.client
83+
.get_costs_by_seqno(last_value.sequence_number + 1),
84+
_ => self.client.get_latest_costs(),
5585
}
56-
let response = response
86+
.await?;
87+
88+
let Some(ref raw_da_block_costs) = raw_da_block_costs else {
89+
return Err(anyhow!("No response from block committer"))
90+
};
91+
92+
let da_block_costs = self.last_raw_da_block_costs.iter().fold(
93+
Ok(raw_da_block_costs.into()),
94+
|costs: DaBlockCostsResult<DaBlockCosts>, last_value| {
95+
let costs = costs.expect("Defined to be OK");
96+
let blob_size_bytes = costs
97+
.blob_size_bytes
98+
.checked_sub(last_value.total_size_bytes)
99+
.ok_or(anyhow!("Blob size bytes underflow"))?;
100+
let blob_cost_wei = raw_da_block_costs
101+
.total_cost
102+
.checked_sub(last_value.total_cost)
103+
.ok_or(anyhow!("Blob cost wei underflow"))?;
104+
Ok(DaBlockCosts {
105+
blob_size_bytes,
106+
blob_cost_wei,
107+
..costs
108+
})
109+
},
110+
)?;
111+
112+
self.last_raw_da_block_costs = Some(raw_da_block_costs.clone());
113+
Ok(da_block_costs)
114+
}
115+
}
116+
117+
pub struct BlockCommitterHttpApi {
118+
client: reqwest::Client,
119+
url: String,
120+
}
121+
122+
impl BlockCommitterHttpApi {
123+
pub fn new(url: String) -> Self {
124+
Self {
125+
client: reqwest::Client::new(),
126+
url,
127+
}
128+
}
129+
}
130+
131+
#[async_trait::async_trait]
132+
impl BlockCommitterApi for BlockCommitterHttpApi {
133+
async fn get_latest_costs(&self) -> DaBlockCostsResult<Option<RawDaBlockCosts>> {
134+
let response = self
135+
.client
136+
.get(&self.url)
137+
.send()
138+
.await?
139+
.json::<RawDaBlockCosts>()
140+
.await?;
141+
Ok(Some(response))
142+
}
143+
144+
async fn get_costs_by_seqno(
145+
&self,
146+
number: u64,
147+
) -> DaBlockCostsResult<Option<RawDaBlockCosts>> {
148+
let response = self
149+
.client
150+
.get(&format!("{}/{}", self.url, number))
151+
.send()
152+
.await?
57153
.json::<RawDaBlockCosts>()
58-
.await
59-
.map_err(|err| anyhow!(err))?;
60-
Ok(response.into())
154+
.await?;
155+
Ok(Some(response))
156+
}
157+
158+
async fn get_cost_bundles_by_range(
159+
&self,
160+
range: core::ops::Range<u64>,
161+
) -> DaBlockCostsResult<Vec<Option<RawDaBlockCosts>>> {
162+
let response = self
163+
.client
164+
.get(&format!("{}/{}-{}", self.url, range.start, range.end))
165+
.send()
166+
.await?
167+
.json::<Vec<RawDaBlockCosts>>()
168+
.await?;
169+
Ok(response.into_iter().map(Some).collect())
170+
}
171+
}
172+
173+
#[cfg(test)]
174+
#[allow(non_snake_case)]
175+
mod tests {
176+
use super::*;
177+
178+
struct MockBlockCommitterApi {
179+
value: Option<RawDaBlockCosts>,
180+
}
181+
182+
impl MockBlockCommitterApi {
183+
fn new(value: Option<RawDaBlockCosts>) -> Self {
184+
Self { value }
185+
}
186+
}
187+
188+
#[async_trait::async_trait]
189+
impl BlockCommitterApi for MockBlockCommitterApi {
190+
async fn get_latest_costs(&self) -> DaBlockCostsResult<Option<RawDaBlockCosts>> {
191+
Ok(self.value.clone())
192+
}
193+
async fn get_costs_by_seqno(
194+
&self,
195+
seq_no: u64,
196+
) -> DaBlockCostsResult<Option<RawDaBlockCosts>> {
197+
// arbitrary logic to generate a new value
198+
let mut value = self.value.clone();
199+
if let Some(value) = &mut value {
200+
value.sequence_number = seq_no;
201+
value.blocks_range =
202+
value.blocks_range.end * seq_no..value.blocks_range.end * seq_no + 10;
203+
value.da_block_height = value.da_block_height + (seq_no + 1).into();
204+
value.total_cost += 1;
205+
value.total_size_bytes += 1;
206+
}
207+
Ok(value)
208+
}
209+
async fn get_cost_bundles_by_range(
210+
&self,
211+
_: core::ops::Range<u64>,
212+
) -> DaBlockCostsResult<Vec<Option<RawDaBlockCosts>>> {
213+
Ok(vec![self.value.clone()])
214+
}
215+
}
216+
217+
fn test_da_block_costs() -> RawDaBlockCosts {
218+
RawDaBlockCosts {
219+
sequence_number: 1,
220+
blocks_range: 0..10,
221+
da_block_height: 1u64.into(),
222+
total_cost: 1,
223+
total_size_bytes: 1,
224+
}
225+
}
226+
227+
#[tokio::test]
228+
async fn request_da_block_cost__when_last_value_is_none__then_get_latest_costs_is_called(
229+
) {
230+
// given
231+
let da_block_costs = test_da_block_costs();
232+
let expected = (&da_block_costs).into();
233+
let mock_api = MockBlockCommitterApi::new(Some(da_block_costs));
234+
let mut block_committer = BlockCommitterDaBlockCosts::new(mock_api, None);
235+
236+
// when
237+
let actual = block_committer.request_da_block_cost().await.unwrap();
238+
239+
// then
240+
assert_eq!(actual, expected);
241+
}
242+
243+
#[tokio::test]
244+
async fn request_da_block_cost__when_last_value_is_some__then_get_costs_by_seqno_is_called(
245+
) {
246+
// given
247+
let mut da_block_costs = test_da_block_costs();
248+
let mock_api = MockBlockCommitterApi::new(Some(da_block_costs.clone()));
249+
let mut block_committer =
250+
BlockCommitterDaBlockCosts::new(mock_api, Some(da_block_costs.clone()));
251+
252+
// when
253+
let actual = block_committer.request_da_block_cost().await.unwrap();
254+
255+
// then
256+
assert_ne!(da_block_costs.blocks_range, actual.l2_block_range);
257+
}
258+
259+
#[tokio::test]
260+
async fn request_da_block_cost__when_response_is_none__then_error() {
261+
// given
262+
let mock_api = MockBlockCommitterApi::new(None);
263+
let mut block_committer = BlockCommitterDaBlockCosts::new(mock_api, None);
264+
265+
// when
266+
let result = block_committer.request_da_block_cost().await;
267+
268+
// then
269+
assert!(result.is_err());
270+
}
271+
272+
struct UnderflowingMockBlockCommitterApi {
273+
value: Option<RawDaBlockCosts>,
274+
}
275+
276+
impl UnderflowingMockBlockCommitterApi {
277+
fn new(value: Option<RawDaBlockCosts>) -> Self {
278+
Self { value }
279+
}
280+
}
281+
282+
#[async_trait::async_trait]
283+
impl BlockCommitterApi for UnderflowingMockBlockCommitterApi {
284+
async fn get_latest_costs(&self) -> DaBlockCostsResult<Option<RawDaBlockCosts>> {
285+
Ok(self.value.clone())
286+
}
287+
async fn get_costs_by_seqno(
288+
&self,
289+
seq_no: u64,
290+
) -> DaBlockCostsResult<Option<RawDaBlockCosts>> {
291+
// arbitrary logic to generate a new value
292+
let mut value = self.value.clone();
293+
if let Some(value) = &mut value {
294+
value.sequence_number = seq_no;
295+
value.blocks_range = value.blocks_range.end..value.blocks_range.end + 10;
296+
value.da_block_height = value.da_block_height + 1u64.into();
297+
value.total_cost -= 1;
298+
value.total_size_bytes -= 1;
299+
}
300+
Ok(value)
301+
}
302+
async fn get_cost_bundles_by_range(
303+
&self,
304+
_: core::ops::Range<u64>,
305+
) -> DaBlockCostsResult<Vec<Option<RawDaBlockCosts>>> {
306+
Ok(vec![self.value.clone()])
307+
}
308+
}
309+
310+
#[tokio::test]
311+
async fn request_da_block_cost__when_underflow__then_error() {
312+
// given
313+
let da_block_costs = test_da_block_costs();
314+
let mock_api = UnderflowingMockBlockCommitterApi::new(Some(da_block_costs));
315+
let mut block_committer = BlockCommitterDaBlockCosts::new(mock_api, None);
316+
let _ = block_committer.request_da_block_cost().await.unwrap();
317+
318+
// when
319+
let result = block_committer.request_da_block_cost().await;
320+
321+
// then
322+
assert!(result.is_err());
61323
}
62324
}

0 commit comments

Comments
 (0)