-
Notifications
You must be signed in to change notification settings - Fork 2.9k
RPC Consistency Proposal #2473
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
RPC Consistency Proposal #2473
Changes from 72 commits
a8f5f41
778dd7c
d1b1c33
b77506c
b764cd4
d9091e6
90c411f
625bf8f
085d173
a98ef9b
c95f09a
6099651
3b81537
11e6876
88b9c52
c5f82ff
844e4e6
71f8087
cbea694
da7f057
f5e63f6
5f4500b
37dd63e
9913620
f849eb5
1fddd80
e1f71d4
be7f927
0b1e31f
8e1b0a5
abe13c2
eae1e22
0a3da1f
e7e1db7
6b76769
b6f206d
2f52aaa
cd3be50
dbc5cdc
c5db12f
24b8d69
4a8d839
51ec4a9
37b03f1
396a953
af0c6d6
b1e89d6
6e50f3b
dcec472
8c6af1f
818a717
0717b2d
6397726
f6c9e70
a87f998
27cebde
3bd9b96
e41b8ab
ee36ce1
d479306
14fcc5f
3429123
bf3f85d
2972279
1219775
cf656bf
e630285
2435cfe
385eda2
703bc24
511431f
48d508d
461fbe7
af2484a
44cffec
5cb1639
eb25459
0f6f319
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,33 +1,40 @@ | ||
| #[cfg(feature = "subscriptions")] | ||
| use crate::client::types::StatusWithTransaction; | ||
| use crate::client::{ | ||
| schema::{ | ||
| block::BlockByHeightArgs, | ||
| coins::{ | ||
| ExcludeInput, | ||
| SpendQueryElementInput, | ||
| use crate::{ | ||
| client::{ | ||
| schema::{ | ||
| block::BlockByHeightArgs, | ||
| coins::{ | ||
| ExcludeInput, | ||
| SpendQueryElementInput, | ||
| }, | ||
| contract::ContractBalanceQueryArgs, | ||
| gas_price::EstimateGasPrice, | ||
| message::MessageStatusArgs, | ||
| relayed_tx::RelayedTransactionStatusArgs, | ||
| tx::DryRunArg, | ||
| Tai64Timestamp, | ||
| TransactionId, | ||
| }, | ||
| contract::ContractBalanceQueryArgs, | ||
| gas_price::EstimateGasPrice, | ||
| message::MessageStatusArgs, | ||
| relayed_tx::RelayedTransactionStatusArgs, | ||
| tx::DryRunArg, | ||
| Tai64Timestamp, | ||
| TransactionId, | ||
| }, | ||
| types::{ | ||
| asset::AssetDetail, | ||
| gas_price::LatestGasPrice, | ||
| message::MessageStatus, | ||
| primitives::{ | ||
| Address, | ||
| AssetId, | ||
| BlockId, | ||
| ContractId, | ||
| UtxoId, | ||
| types::{ | ||
| asset::AssetDetail, | ||
| gas_price::LatestGasPrice, | ||
| message::MessageStatus, | ||
| primitives::{ | ||
| Address, | ||
| AssetId, | ||
| BlockId, | ||
| ContractId, | ||
| UtxoId, | ||
| }, | ||
| upgrades::StateTransitionBytecode, | ||
| RelayedTransactionStatus, | ||
| }, | ||
| upgrades::StateTransitionBytecode, | ||
| RelayedTransactionStatus, | ||
| }, | ||
| reqwest_ext::{ | ||
| FuelGraphQlResponse, | ||
| FuelOperation, | ||
| ReqwestExt, | ||
| }, | ||
| }; | ||
| use anyhow::Context; | ||
|
|
@@ -39,8 +46,6 @@ use base64::prelude::{ | |
| #[cfg(feature = "subscriptions")] | ||
| use cynic::StreamingOperation; | ||
| use cynic::{ | ||
| http::ReqwestExt, | ||
| GraphQlResponse, | ||
| Id, | ||
| MutationBuilder, | ||
| Operation, | ||
|
|
@@ -129,6 +134,10 @@ use std::{ | |
| self, | ||
| FromStr, | ||
| }, | ||
| sync::{ | ||
| Arc, | ||
| Mutex, | ||
| }, | ||
| }; | ||
| use tai64::Tai64; | ||
| use tracing as _; | ||
|
|
@@ -151,12 +160,58 @@ pub mod types; | |
|
|
||
| type RegisterId = u32; | ||
|
|
||
| #[derive(Debug, derive_more::Display, derive_more::From)] | ||
| #[non_exhaustive] | ||
| /// Error occurring during interaction with the FuelClient | ||
| // anyhow::Error is wrapped inside a custom Error type, | ||
| // so that we can specific error variants in the future. | ||
| pub enum Error { | ||
| /// Unknown or not expected(by architecture) error. | ||
| #[from] | ||
| Other(anyhow::Error), | ||
| } | ||
|
|
||
| /// Consistency policy for the [`FuelClient`] to define the strategy | ||
| /// for the required height feature. | ||
| #[derive(Debug)] | ||
| pub enum ConsistencyPolicy { | ||
| /// Automatically fetch the next block height from the response and | ||
| /// use it as an input to the next query to guarantee consistency | ||
| /// of the results for the queries. | ||
| Auto { | ||
| /// The required block height for the queries. | ||
| height: Arc<Mutex<Option<BlockHeight>>>, | ||
| }, | ||
| /// Use manually sets the block height for all queries | ||
| /// via the [`FuelClient::with_required_fuel_block_height`]. | ||
| Manual { | ||
| /// The required block height for the queries. | ||
| height: Option<BlockHeight>, | ||
| }, | ||
| } | ||
|
|
||
| impl Clone for ConsistencyPolicy { | ||
| fn clone(&self) -> Self { | ||
| match self { | ||
| Self::Auto { height } => Self::Auto { | ||
| // We don't want to share the same mutex between the different | ||
| // instances of the `FuelClient`. | ||
| height: Arc::new(Mutex::new(height.lock().ok().and_then(|h| *h))), | ||
| }, | ||
| Self::Manual { height } => Self::Manual { | ||
| height: height.clone(), | ||
| }, | ||
| } | ||
| } | ||
| } | ||
|
|
||
| #[derive(Debug, Clone)] | ||
| pub struct FuelClient { | ||
| client: reqwest::Client, | ||
| #[cfg(feature = "subscriptions")] | ||
| cookie: std::sync::Arc<reqwest::cookie::Jar>, | ||
| url: reqwest::Url, | ||
| require_height: ConsistencyPolicy, | ||
| } | ||
|
|
||
| impl FromStr for FuelClient { | ||
|
|
@@ -184,13 +239,20 @@ impl FromStr for FuelClient { | |
| client, | ||
| cookie, | ||
| url, | ||
| require_height: ConsistencyPolicy::Auto { | ||
| height: Arc::new(Mutex::new(None)), | ||
| }, | ||
| }) | ||
| } | ||
|
|
||
| #[cfg(not(feature = "subscriptions"))] | ||
| { | ||
| let client = reqwest::Client::new(); | ||
| Ok(Self { client, url }) | ||
| Ok(Self { | ||
| client, | ||
| url, | ||
| extensions: HashMap::new(), | ||
| }) | ||
| } | ||
| } | ||
| } | ||
|
|
@@ -223,6 +285,36 @@ impl FuelClient { | |
| Self::from_str(url.as_ref()) | ||
| } | ||
|
|
||
| pub fn with_required_fuel_block_height( | ||
| &mut self, | ||
| new_height: Option<BlockHeight>, | ||
| ) -> &mut Self { | ||
| match &mut self.require_height { | ||
| ConsistencyPolicy::Auto { height } => { | ||
| *height.lock().expect("Mutex poisoned") = new_height; | ||
| } | ||
| ConsistencyPolicy::Manual { height } => { | ||
| *height = new_height; | ||
| } | ||
| } | ||
| self | ||
| } | ||
|
|
||
| pub fn use_manual_consistency_policy( | ||
| &mut self, | ||
| height: Option<BlockHeight>, | ||
| ) -> &mut Self { | ||
| self.require_height = ConsistencyPolicy::Manual { height }; | ||
| self | ||
| } | ||
|
|
||
| pub fn required_block_height(&self) -> Option<BlockHeight> { | ||
| match &self.require_height { | ||
| ConsistencyPolicy::Auto { height } => height.lock().ok().and_then(|h| *h), | ||
| ConsistencyPolicy::Manual { height } => *height, | ||
| } | ||
| } | ||
|
|
||
| /// Send the GraphQL query to the client. | ||
| pub async fn query<ResponseData, Vars>( | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What about
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. done in d479306 |
||
| &self, | ||
|
|
@@ -232,20 +324,53 @@ impl FuelClient { | |
| Vars: serde::Serialize, | ||
| ResponseData: serde::de::DeserializeOwned + 'static, | ||
| { | ||
| let required_fuel_block_height = self.required_block_height(); | ||
| let fuel_operation = FuelOperation::new(q, required_fuel_block_height); | ||
| let response = self | ||
| .client | ||
| .post(self.url.clone()) | ||
| .run_graphql(q) | ||
| .run_fuel_graphql(fuel_operation) | ||
| .await | ||
| .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?; | ||
|
|
||
| Self::decode_response(response) | ||
| let inner_required_height = match &self.require_height { | ||
| ConsistencyPolicy::Auto { height } => Some(height.clone()), | ||
| _ => None, | ||
| }; | ||
|
|
||
| Self::decode_response(response, inner_required_height) | ||
| } | ||
|
|
||
| fn decode_response<R>(response: GraphQlResponse<R>) -> io::Result<R> | ||
| fn decode_response<R, E>( | ||
| response: FuelGraphQlResponse<R, E>, | ||
| inner_required_height: Option<Arc<Mutex<Option<BlockHeight>>>>, | ||
| ) -> io::Result<R> | ||
|
Comment on lines
+344
to
+347
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. While this is okay, I think it would be cleaner to separate the height extraction to a separate |
||
| where | ||
| R: serde::de::DeserializeOwned + 'static, | ||
| { | ||
| if let Some(inner_required_height) = inner_required_height { | ||
| if let Some(current_fuel_block_height) = | ||
| response.extensions.current_fuel_block_height | ||
| { | ||
| let mut lock = inner_required_height.lock().expect("Mutex poisoned"); | ||
|
|
||
| if current_fuel_block_height >= lock.unwrap_or_default() { | ||
| *lock = Some(current_fuel_block_height); | ||
| } | ||
| } | ||
| } | ||
|
|
||
| if let Some(failed) = response.extensions.fuel_block_height_precondition_failed { | ||
| if failed { | ||
| return Err(io::Error::new( | ||
| io::ErrorKind::Other, | ||
| "The required block height was not met", | ||
| )); | ||
| } | ||
| } | ||
|
|
||
| let response = response.response; | ||
|
|
||
| match (response.data, response.errors) { | ||
| (Some(d), _) => Ok(d), | ||
| (_, Some(e)) => Err(from_strings_errors_to_std_error( | ||
|
|
@@ -271,7 +396,11 @@ impl FuelClient { | |
| use reqwest::cookie::CookieStore; | ||
| let mut url = self.url.clone(); | ||
| url.set_path("/v1/graphql-sub"); | ||
| let json_query = serde_json::to_string(&q)?; | ||
|
|
||
| let required_fuel_block_height = self.required_block_height(); | ||
| let fuel_operation = FuelOperation::new(q, required_fuel_block_height); | ||
|
|
||
| let json_query = serde_json::to_string(&fuel_operation)?; | ||
| let mut client_builder = es::ClientBuilder::for_url(url.as_str()) | ||
| .map_err(|e| { | ||
| io::Error::new( | ||
|
|
@@ -329,18 +458,25 @@ impl FuelClient { | |
|
|
||
| let mut last = None; | ||
|
|
||
| let inner_required_height = match &self.require_height { | ||
| ConsistencyPolicy::Auto { height } => Some(height.clone()), | ||
| _ => None, | ||
| }; | ||
|
|
||
| let stream = es::Client::stream(&client) | ||
| .take_while(|result| { | ||
| .zip(futures::stream::repeat(inner_required_height)) | ||
| .take_while(|(result, _)| { | ||
| futures::future::ready(!matches!(result, Err(es::Error::Eof))) | ||
| }) | ||
| .filter_map(move |result| { | ||
| .filter_map(move |(result, inner_required_height)| { | ||
| tracing::debug!("Got result: {result:?}"); | ||
| let r = match result { | ||
| Ok(es::SSE::Event(es::Event { data, .. })) => { | ||
| match serde_json::from_str::<GraphQlResponse<ResponseData>>(&data) | ||
| { | ||
| match serde_json::from_str::<FuelGraphQlResponse<ResponseData>>( | ||
| &data, | ||
| ) { | ||
| Ok(resp) => { | ||
| match Self::decode_response(resp) { | ||
| match Self::decode_response(resp, inner_required_height) { | ||
| Ok(resp) => { | ||
| match last.replace(data) { | ||
| // Remove duplicates | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should we use
parking_lotMutex, or use directly aAtomicU32and avoid a Mutex altogether?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do we need locks for this at all? We have a mutable reference to self here already.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So I see that we need some type of interior mutability since the
queryfunction takes an immutable self receiver. However, I think it would be cleaner to change the signature of thequeryfunction to take a mutable self receiver.Since that's a breaking change, how about we leave this as-is for now but create a follow-up to simplify this which we can include in the next breaking release?