Skip to content
Merged
Show file tree
Hide file tree
Changes from 37 commits
Commits
Show all changes
39 commits
Select commit Hold shift + click to select a range
cc4e8c2
Introduce separate module for GraphQL extensions
Feb 13, 2025
3b48d47
Revert changes to test structure
Feb 13, 2025
9bd97b7
Add scaffolding for the `current_stf_version` extension
Feb 13, 2025
ac861c4
Add possible way to go with the SFT update notification
Feb 13, 2025
c022ba7
Add `current_consensus_parameters_version` GraphQL extension
Feb 13, 2025
56d4c57
Elaborate on the possible ways to go for STF
Feb 13, 2025
51237b3
Use `latest_consensus_parameters_version()` instead of `latest_consen…
Feb 13, 2025
335b243
Add `current_stf_version` GraphQL extension
Feb 13, 2025
946e315
Update comment
Feb 13, 2025
2616cd7
Update changelog
Feb 13, 2025
da57953
Merge remote-tracking branch 'upstream/master' into rafal/2596_additi…
Feb 14, 2025
2772ba7
Add test checking that proper consensus parameters version is returne…
Feb 14, 2025
3359d61
Add test checking that proper stf version is returned via GraphQL ext…
Feb 14, 2025
d51222f
Clean-up
Feb 14, 2025
5c24c74
Merge remote-tracking branch 'origin/master' into rafal/2596_addition…
Feb 17, 2025
bd610e7
Merge STF and CP extensions into one
Feb 17, 2025
3f3e971
Provide current block height via 'chain state info' extension
Feb 17, 2025
c3c0148
Typos
Feb 17, 2025
ef9c445
Access latest STF and CP version via `FuelClient`
Feb 17, 2025
b861a51
Cache STF version in consensus parameters provider
Feb 18, 2025
0b27037
Move all GraphQL extensions into a single place
Feb 18, 2025
946fc41
Worker service now uses STF provider, not ConsensusParameters provider
Feb 18, 2025
accfae0
Unify `current` vs `latest` naming in the code around grapqhql extens…
Feb 18, 2025
4cd21b4
Merge remote-tracking branch 'origin/master' into rafal/2596_addition…
Feb 20, 2025
bbecdb8
Update changelog according to the new changelog management system
Feb 20, 2025
e0f21bf
Merge remote-tracking branch 'origin/master' into rafal/2596_addition…
Feb 21, 2025
6d1ec52
Move const into the test code
Feb 21, 2025
3f32362
Remove unnecessary comment
Feb 21, 2025
d8ea573
`ConsensusParametersProvider` uses `RwLock` instead of `Mutex`
Feb 21, 2025
69b0da5
Match all variants explicitly
Feb 21, 2025
3aa2d2d
Chain state info is updated in `decode_response` to also be included …
Feb 21, 2025
e03baac
Fix formatting
Feb 21, 2025
932a1c5
Do not add ConsensusParametersProvider to worker service
Feb 21, 2025
ed1dd26
Clean-up
Feb 21, 2025
bfc5f58
Clean-up
Feb 21, 2025
03f9645
Merge remote-tracking branch 'origin/master' into rafal/2596_addition…
Feb 21, 2025
b44cac9
Applied comments from the PR
xgreenx Feb 22, 2025
9c963d5
Merge branch 'master' into rafal/2596_additional_graphql_extensions
rafal-ch Feb 24, 2025
2d682fc
Merge branch 'master' into rafal/2596_additional_graphql_extensions
rafal-ch Feb 24, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .changes/changed/2715.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Each GraphQL response contains `current_consensus_parameters_version` and `current_stf_version` in the `extensions` section.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,8 @@ codegen-units = 1
lto = "fat"
# The difference in performance for "fat" and "thin" is small,
# but "thin" LTO is much faster to compile.
# If you play with benchamrks or flamegraph, it is better to use "thin"
# To speedup iterations between compialtion.
# If you play with benchmarks or flamegraphs, it is better to use "thin"
# To speedup iterations between compilation.
#lto = "thin"
panic = "unwind"

Expand Down
148 changes: 107 additions & 41 deletions crates/client/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,10 @@ use cynic::{
QueryBuilder,
};
use fuel_core_types::{
blockchain::header::{
ConsensusParametersVersion,
StateTransitionBytecodeVersion,
},
fuel_asm::{
Instruction,
Word,
Expand Down Expand Up @@ -210,13 +214,36 @@ impl Clone for ConsistencyPolicy {
}
}

#[derive(Debug, Default)]
struct ChainStateInfo {
current_stf_version: Arc<Mutex<Option<StateTransitionBytecodeVersion>>>,
current_consensus_parameters_version: Arc<Mutex<Option<ConsensusParametersVersion>>>,
}

impl Clone for ChainStateInfo {
fn clone(&self) -> Self {
Self {
current_stf_version: Arc::new(Mutex::new(
self.current_stf_version.lock().ok().and_then(|v| *v),
)),
current_consensus_parameters_version: Arc::new(Mutex::new(
self.current_consensus_parameters_version
.lock()
.ok()
.and_then(|v| *v),
)),
}
}
}

#[derive(Debug, Clone)]
pub struct FuelClient {
client: reqwest::Client,
#[cfg(feature = "subscriptions")]
cookie: std::sync::Arc<reqwest::cookie::Jar>,
url: reqwest::Url,
require_height: ConsistencyPolicy,
chain_state_info: ChainStateInfo,
}

impl FromStr for FuelClient {
Expand Down Expand Up @@ -247,6 +274,7 @@ impl FromStr for FuelClient {
require_height: ConsistencyPolicy::Auto {
height: Arc::new(Mutex::new(None)),
},
chain_state_info: Default::default(),
})
}

Expand All @@ -259,6 +287,7 @@ impl FromStr for FuelClient {
require_height: ConsistencyPolicy::Auto {
height: Arc::new(Mutex::new(None)),
},
chain_state_info: Default::default(),
})
}
}
Expand Down Expand Up @@ -322,6 +351,51 @@ impl FuelClient {
}
}

fn update_chain_state_info<R, E>(&self, response: &FuelGraphQlResponse<R, E>) {
if let Some(current_sft_version) = response
.extensions
.as_ref()
.and_then(|e| e.current_stf_version)
{
if let Ok(mut c) = self.chain_state_info.current_stf_version.lock() {
*c = Some(current_sft_version);
}
}

if let Some(current_consensus_parameters_version) = response
.extensions
.as_ref()
.and_then(|e| e.current_consensus_parameters_version)
{
if let Ok(mut c) = self
.chain_state_info
.current_consensus_parameters_version
.lock()
{
*c = Some(current_consensus_parameters_version);
}
}

let inner_required_height = match &self.require_height {
ConsistencyPolicy::Auto { height } => Some(height.clone()),
ConsistencyPolicy::Manual { .. } => None,
};

if let Some(inner_required_height) = inner_required_height {
if let Some(current_fuel_block_height) = response
.extensions
.as_ref()
.and_then(|e| e.current_fuel_block_height)
{
let mut lock = inner_required_height.lock().expect("Mutex poisoned");

if current_fuel_block_height >= lock.unwrap_or_default() {
*lock = Some(current_fuel_block_height);
}
}
}
}

/// Send the GraphQL query to the client.
pub async fn query<ResponseData, Vars>(
&self,
Expand All @@ -340,34 +414,14 @@ impl FuelClient {
.await
.map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;

let inner_required_height = match &self.require_height {
ConsistencyPolicy::Auto { height } => Some(height.clone()),
_ => None,
};

Self::decode_response(response, inner_required_height)
self.decode_response(response)
}

fn decode_response<R, E>(
response: FuelGraphQlResponse<R, E>,
inner_required_height: Option<Arc<Mutex<Option<BlockHeight>>>>,
) -> io::Result<R>
fn decode_response<R, E>(&self, response: FuelGraphQlResponse<R, E>) -> io::Result<R>
where
R: serde::de::DeserializeOwned + 'static,
{
if let Some(inner_required_height) = inner_required_height {
if let Some(current_fuel_block_height) = response
.extensions
.as_ref()
.and_then(|e| e.current_fuel_block_height)
{
let mut lock = inner_required_height.lock().expect("Mutex poisoned");

if current_fuel_block_height >= lock.unwrap_or_default() {
*lock = Some(current_fuel_block_height);
}
}
}
self.update_chain_state_info(&response);

if let Some(failed) = response
.extensions
Expand Down Expand Up @@ -398,7 +452,7 @@ impl FuelClient {
async fn subscribe<ResponseData, Vars>(
&self,
q: StreamingOperation<ResponseData, Vars>,
) -> io::Result<impl futures::Stream<Item = io::Result<ResponseData>>>
) -> io::Result<impl futures::Stream<Item = io::Result<ResponseData>> + '_>
where
Vars: serde::Serialize,
ResponseData: serde::de::DeserializeOwned + 'static,
Expand Down Expand Up @@ -471,25 +525,19 @@ impl FuelClient {

let mut last = None;

let inner_required_height = match &self.require_height {
ConsistencyPolicy::Auto { height } => Some(height.clone()),
_ => None,
};

let stream = es::Client::stream(&client)
.zip(futures::stream::repeat(inner_required_height))
.take_while(|(result, _)| {
.take_while(|result| {
futures::future::ready(!matches!(result, Err(es::Error::Eof)))
})
.filter_map(move |(result, inner_required_height)| {
.filter_map(move |result| {
tracing::debug!("Got result: {result:?}");
let r = match result {
Ok(es::SSE::Event(es::Event { data, .. })) => {
match serde_json::from_str::<FuelGraphQlResponse<ResponseData>>(
&data,
) {
Ok(resp) => {
match Self::decode_response(resp, inner_required_height) {
match self.decode_response(resp) {
Ok(resp) => {
match last.replace(data) {
// Remove duplicates
Expand Down Expand Up @@ -527,6 +575,24 @@ impl FuelClient {
Ok(stream)
}

pub fn latest_stf_version(&self) -> Option<StateTransitionBytecodeVersion> {
self.chain_state_info
.current_stf_version
.lock()
.ok()
.and_then(|value| *value)
}

pub fn latest_consensus_parameters_version(
&self,
) -> Option<ConsensusParametersVersion> {
self.chain_state_info
.current_consensus_parameters_version
.lock()
.ok()
.and_then(|value| *value)
}

pub async fn health(&self) -> io::Result<bool> {
let query = schema::Health::build(());
self.query(query).await.map(|r| r.health)
Expand Down Expand Up @@ -764,10 +830,10 @@ impl FuelClient {
/// Compared to the `submit_and_await_commit`, the stream also contains
/// `SubmittedStatus` as an intermediate state.
#[cfg(feature = "subscriptions")]
pub async fn submit_and_await_status(
&self,
tx: &Transaction,
) -> io::Result<impl Stream<Item = io::Result<TransactionStatus>>> {
pub async fn submit_and_await_status<'a>(
&'a self,
tx: &'a Transaction,
) -> io::Result<impl Stream<Item = io::Result<TransactionStatus>> + 'a> {
use cynic::SubscriptionBuilder;
let tx = tx.clone().to_bytes();
let s = schema::tx::SubmitAndAwaitStatusSubscription::build(TxArg {
Expand Down Expand Up @@ -926,10 +992,10 @@ impl FuelClient {
#[tracing::instrument(skip(self), level = "debug")]
#[cfg(feature = "subscriptions")]
/// Subscribe to the status of a transaction
pub async fn subscribe_transaction_status(
&self,
id: &TxId,
) -> io::Result<impl futures::Stream<Item = io::Result<TransactionStatus>>> {
pub async fn subscribe_transaction_status<'a>(
&'a self,
id: &'a TxId,
) -> io::Result<impl futures::Stream<Item = io::Result<TransactionStatus>> + 'a> {
use cynic::SubscriptionBuilder;
let tx_id: TransactionId = (*id).into();
let s = schema::tx::StatusChangeSubscription::build(TxIdArgs { id: tx_id });
Expand Down
10 changes: 9 additions & 1 deletion crates/client/src/reqwest_ext.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,13 @@ use cynic::{
GraphQlResponse,
Operation,
};
use fuel_core_types::fuel_types::BlockHeight;
use fuel_core_types::{
blockchain::header::{
ConsensusParametersVersion,
StateTransitionBytecodeVersion,
},
fuel_types::BlockHeight,
};
use std::{
future::Future,
marker::PhantomData,
Expand All @@ -20,6 +26,8 @@ pub struct ExtensionsResponse {
pub required_fuel_block_height: Option<BlockHeight>,
pub current_fuel_block_height: Option<BlockHeight>,
pub fuel_block_height_precondition_failed: Option<bool>,
pub current_stf_version: Option<StateTransitionBytecodeVersion>,
pub current_consensus_parameters_version: Option<ConsensusParametersVersion>,
}

#[derive(Debug, serde::Serialize)]
Expand Down
4 changes: 1 addition & 3 deletions crates/fuel-core/src/graphql_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,10 @@ pub mod api_service;
pub(crate) mod block_height_subscription;
pub mod da_compression;
pub mod database;
pub(crate) mod extensions;
pub(crate) mod indexation;
pub(crate) mod metrics_extension;
pub mod ports;
pub(crate) mod required_fuel_block_height_extension;
pub mod storage;
pub(crate) mod validation_extension;
pub mod worker_service;

#[derive(Clone, Debug)]
Expand Down
12 changes: 8 additions & 4 deletions crates/fuel-core/src/graphql_api/api_service.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use crate::{
fuel_core_graphql_api::{
metrics_extension::MetricsExtension,
ports::{
BlockProducerPort,
ConsensusModulePort,
Expand All @@ -11,12 +10,16 @@ use crate::{
P2pPort,
TxPoolPort,
},
validation_extension::ValidationExtension,
Config,
},
graphql_api::{
self,
required_fuel_block_height_extension::RequiredFuelBlockHeightExtension,
extensions::{
chain_state_info::ChainStateInfoExtension,
metrics::MetricsExtension,
required_fuel_block_height::RequiredFuelBlockHeightExtension,
validation::ValidationExtension,
},
},
schema::{
CoreSchema,
Expand Down Expand Up @@ -294,8 +297,9 @@ where
.extension(RequiredFuelBlockHeightExtension::new(
required_fuel_block_height_tolerance,
required_fuel_block_height_timeout,
block_height_subscriber,
block_height_subscriber.clone(),
))
.extension(ChainStateInfoExtension::new(block_height_subscriber))
.finish();

let graphql_endpoint = "/v1/graphql";
Expand Down
14 changes: 7 additions & 7 deletions crates/fuel-core/src/graphql_api/block_height_subscription.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ impl Handler {
// get all sending endpoint corresponding to subscribers that are waiting for a block height
// that is at most `block_height`.
let to_notify = inner_map.tx_handles.split_off(&Reverse(block_height));
inner_map.latest_seen_block_height = block_height;
inner_map.current_block_height = block_height;

to_notify.into_values().flatten()
};
Expand All @@ -59,7 +59,7 @@ impl Subscriber {
let future = {
let mut inner_map = self.inner.write();

if inner_map.latest_seen_block_height >= block_height {
if inner_map.current_block_height >= block_height {
return Ok(());
}

Expand All @@ -77,22 +77,22 @@ impl Subscriber {
})
}

pub fn latest_seen_block_height(&self) -> BlockHeight {
self.inner.read().latest_seen_block_height
pub fn current_block_height(&self) -> BlockHeight {
self.inner.read().current_block_height
}
}

#[derive(Debug, Default)]
struct HandlersMapInner {
tx_handles: BTreeMap<Reverse<BlockHeight>, Vec<oneshot::Sender<()>>>,
latest_seen_block_height: BlockHeight,
current_block_height: BlockHeight,
}

impl HandlersMapInner {
fn new(latest_seen_block_height: BlockHeight) -> Self {
fn new(current_block_height: BlockHeight) -> Self {
Self {
tx_handles: BTreeMap::new(),
latest_seen_block_height,
current_block_height,
}
}
}
4 changes: 4 additions & 0 deletions crates/fuel-core/src/graphql_api/extensions.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
pub(crate) mod chain_state_info;
pub(crate) mod metrics;
pub(crate) mod required_fuel_block_height;
pub(crate) mod validation;
Loading
Loading