@@ -52,6 +52,10 @@ use cynic::{
5252 QueryBuilder ,
5353} ;
5454use fuel_core_types:: {
55+ blockchain:: header:: {
56+ ConsensusParametersVersion ,
57+ StateTransitionBytecodeVersion ,
58+ } ,
5559 fuel_asm:: {
5660 Instruction ,
5761 Word ,
@@ -210,13 +214,36 @@ impl Clone for ConsistencyPolicy {
210214 }
211215}
212216
217+ #[ derive( Debug , Default ) ]
218+ struct ChainStateInfo {
219+ current_stf_version : Arc < Mutex < Option < StateTransitionBytecodeVersion > > > ,
220+ current_consensus_parameters_version : Arc < Mutex < Option < ConsensusParametersVersion > > > ,
221+ }
222+
223+ impl Clone for ChainStateInfo {
224+ fn clone ( & self ) -> Self {
225+ Self {
226+ current_stf_version : Arc :: new ( Mutex :: new (
227+ self . current_stf_version . lock ( ) . ok ( ) . and_then ( |v| * v) ,
228+ ) ) ,
229+ current_consensus_parameters_version : Arc :: new ( Mutex :: new (
230+ self . current_consensus_parameters_version
231+ . lock ( )
232+ . ok ( )
233+ . and_then ( |v| * v) ,
234+ ) ) ,
235+ }
236+ }
237+ }
238+
213239#[ derive( Debug , Clone ) ]
214240pub struct FuelClient {
215241 client : reqwest:: Client ,
216242 #[ cfg( feature = "subscriptions" ) ]
217243 cookie : std:: sync:: Arc < reqwest:: cookie:: Jar > ,
218244 url : reqwest:: Url ,
219245 require_height : ConsistencyPolicy ,
246+ chain_state_info : ChainStateInfo ,
220247}
221248
222249impl FromStr for FuelClient {
@@ -247,6 +274,7 @@ impl FromStr for FuelClient {
247274 require_height : ConsistencyPolicy :: Auto {
248275 height : Arc :: new ( Mutex :: new ( None ) ) ,
249276 } ,
277+ chain_state_info : Default :: default ( ) ,
250278 } )
251279 }
252280
@@ -259,6 +287,7 @@ impl FromStr for FuelClient {
259287 require_height : ConsistencyPolicy :: Auto {
260288 height : Arc :: new ( Mutex :: new ( None ) ) ,
261289 } ,
290+ chain_state_info : Default :: default ( ) ,
262291 } )
263292 }
264293 }
@@ -322,6 +351,51 @@ impl FuelClient {
322351 }
323352 }
324353
354+ fn update_chain_state_info < R , E > ( & self , response : & FuelGraphQlResponse < R , E > ) {
355+ if let Some ( current_sft_version) = response
356+ . extensions
357+ . as_ref ( )
358+ . and_then ( |e| e. current_stf_version )
359+ {
360+ if let Ok ( mut c) = self . chain_state_info . current_stf_version . lock ( ) {
361+ * c = Some ( current_sft_version) ;
362+ }
363+ }
364+
365+ if let Some ( current_consensus_parameters_version) = response
366+ . extensions
367+ . as_ref ( )
368+ . and_then ( |e| e. current_consensus_parameters_version )
369+ {
370+ if let Ok ( mut c) = self
371+ . chain_state_info
372+ . current_consensus_parameters_version
373+ . lock ( )
374+ {
375+ * c = Some ( current_consensus_parameters_version) ;
376+ }
377+ }
378+
379+ let inner_required_height = match & self . require_height {
380+ ConsistencyPolicy :: Auto { height } => Some ( height. clone ( ) ) ,
381+ ConsistencyPolicy :: Manual { .. } => None ,
382+ } ;
383+
384+ if let Some ( inner_required_height) = inner_required_height {
385+ if let Some ( current_fuel_block_height) = response
386+ . extensions
387+ . as_ref ( )
388+ . and_then ( |e| e. current_fuel_block_height )
389+ {
390+ let mut lock = inner_required_height. lock ( ) . expect ( "Mutex poisoned" ) ;
391+
392+ if current_fuel_block_height >= lock. unwrap_or_default ( ) {
393+ * lock = Some ( current_fuel_block_height) ;
394+ }
395+ }
396+ }
397+ }
398+
325399 /// Send the GraphQL query to the client.
326400 pub async fn query < ResponseData , Vars > (
327401 & self ,
@@ -340,34 +414,14 @@ impl FuelClient {
340414 . await
341415 . map_err ( |e| io:: Error :: new ( io:: ErrorKind :: Other , e) ) ?;
342416
343- let inner_required_height = match & self . require_height {
344- ConsistencyPolicy :: Auto { height } => Some ( height. clone ( ) ) ,
345- _ => None ,
346- } ;
347-
348- Self :: decode_response ( response, inner_required_height)
417+ self . decode_response ( response)
349418 }
350419
351- fn decode_response < R , E > (
352- response : FuelGraphQlResponse < R , E > ,
353- inner_required_height : Option < Arc < Mutex < Option < BlockHeight > > > > ,
354- ) -> io:: Result < R >
420+ fn decode_response < R , E > ( & self , response : FuelGraphQlResponse < R , E > ) -> io:: Result < R >
355421 where
356422 R : serde:: de:: DeserializeOwned + ' static ,
357423 {
358- if let Some ( inner_required_height) = inner_required_height {
359- if let Some ( current_fuel_block_height) = response
360- . extensions
361- . as_ref ( )
362- . and_then ( |e| e. current_fuel_block_height )
363- {
364- let mut lock = inner_required_height. lock ( ) . expect ( "Mutex poisoned" ) ;
365-
366- if current_fuel_block_height >= lock. unwrap_or_default ( ) {
367- * lock = Some ( current_fuel_block_height) ;
368- }
369- }
370- }
424+ self . update_chain_state_info ( & response) ;
371425
372426 if let Some ( failed) = response
373427 . extensions
@@ -398,7 +452,7 @@ impl FuelClient {
398452 async fn subscribe < ResponseData , Vars > (
399453 & self ,
400454 q : StreamingOperation < ResponseData , Vars > ,
401- ) -> io:: Result < impl futures:: Stream < Item = io:: Result < ResponseData > > >
455+ ) -> io:: Result < impl futures:: Stream < Item = io:: Result < ResponseData > > + ' _ >
402456 where
403457 Vars : serde:: Serialize ,
404458 ResponseData : serde:: de:: DeserializeOwned + ' static ,
@@ -471,25 +525,19 @@ impl FuelClient {
471525
472526 let mut last = None ;
473527
474- let inner_required_height = match & self . require_height {
475- ConsistencyPolicy :: Auto { height } => Some ( height. clone ( ) ) ,
476- _ => None ,
477- } ;
478-
479528 let stream = es:: Client :: stream ( & client)
480- . zip ( futures:: stream:: repeat ( inner_required_height) )
481- . take_while ( |( result, _) | {
529+ . take_while ( |result| {
482530 futures:: future:: ready ( !matches ! ( result, Err ( es:: Error :: Eof ) ) )
483531 } )
484- . filter_map ( move |( result, inner_required_height ) | {
532+ . filter_map ( move |result| {
485533 tracing:: debug!( "Got result: {result:?}" ) ;
486534 let r = match result {
487535 Ok ( es:: SSE :: Event ( es:: Event { data, .. } ) ) => {
488536 match serde_json:: from_str :: < FuelGraphQlResponse < ResponseData > > (
489537 & data,
490538 ) {
491539 Ok ( resp) => {
492- match Self :: decode_response ( resp, inner_required_height ) {
540+ match self . decode_response ( resp) {
493541 Ok ( resp) => {
494542 match last. replace ( data) {
495543 // Remove duplicates
@@ -527,6 +575,24 @@ impl FuelClient {
527575 Ok ( stream)
528576 }
529577
578+ pub fn latest_stf_version ( & self ) -> Option < StateTransitionBytecodeVersion > {
579+ self . chain_state_info
580+ . current_stf_version
581+ . lock ( )
582+ . ok ( )
583+ . and_then ( |value| * value)
584+ }
585+
586+ pub fn latest_consensus_parameters_version (
587+ & self ,
588+ ) -> Option < ConsensusParametersVersion > {
589+ self . chain_state_info
590+ . current_consensus_parameters_version
591+ . lock ( )
592+ . ok ( )
593+ . and_then ( |value| * value)
594+ }
595+
530596 pub async fn health ( & self ) -> io:: Result < bool > {
531597 let query = schema:: Health :: build ( ( ) ) ;
532598 self . query ( query) . await . map ( |r| r. health )
@@ -764,10 +830,10 @@ impl FuelClient {
764830 /// Compared to the `submit_and_await_commit`, the stream also contains
765831 /// `SubmittedStatus` as an intermediate state.
766832 #[ cfg( feature = "subscriptions" ) ]
767- pub async fn submit_and_await_status (
768- & self ,
769- tx : & Transaction ,
770- ) -> io:: Result < impl Stream < Item = io:: Result < TransactionStatus > > > {
833+ pub async fn submit_and_await_status < ' a > (
834+ & ' a self ,
835+ tx : & ' a Transaction ,
836+ ) -> io:: Result < impl Stream < Item = io:: Result < TransactionStatus > > + ' a > {
771837 use cynic:: SubscriptionBuilder ;
772838 let tx = tx. clone ( ) . to_bytes ( ) ;
773839 let s = schema:: tx:: SubmitAndAwaitStatusSubscription :: build ( TxArg {
@@ -926,10 +992,10 @@ impl FuelClient {
926992 #[ tracing:: instrument( skip( self ) , level = "debug" ) ]
927993 #[ cfg( feature = "subscriptions" ) ]
928994 /// Subscribe to the status of a transaction
929- pub async fn subscribe_transaction_status (
930- & self ,
931- id : & TxId ,
932- ) -> io:: Result < impl futures:: Stream < Item = io:: Result < TransactionStatus > > > {
995+ pub async fn subscribe_transaction_status < ' a > (
996+ & ' a self ,
997+ id : & ' a TxId ,
998+ ) -> io:: Result < impl futures:: Stream < Item = io:: Result < TransactionStatus > > + ' a > {
933999 use cynic:: SubscriptionBuilder ;
9341000 let tx_id: TransactionId = ( * id) . into ( ) ;
9351001 let s = schema:: tx:: StatusChangeSubscription :: build ( TxIdArgs { id : tx_id } ) ;
0 commit comments