diff --git a/examples/examples/custom_config.rs b/examples/examples/custom_config.rs index d9b2c95f5ad..d0caba54b16 100644 --- a/examples/examples/custom_config.rs +++ b/examples/examples/custom_config.rs @@ -39,7 +39,6 @@ impl Config for MyConfig { type Address = ::Address; type Header = ::Header; type Signature = ::Signature; - type Extrinsic = ::Extrinsic; // ExtrinsicParams makes use of the index type, so we need to adjust it // too to align with our modified index type, above: type ExtrinsicParams = SubstrateExtrinsicParams; diff --git a/examples/examples/rpc_call_subscribe_blocks.rs b/examples/examples/rpc_call_subscribe_blocks.rs index cc2095200de..232ef46ae1e 100644 --- a/examples/examples/rpc_call_subscribe_blocks.rs +++ b/examples/examples/rpc_call_subscribe_blocks.rs @@ -29,7 +29,7 @@ async fn main() -> Result<(), Box> { // For non-finalised blocks use `.subscribe_blocks()` let mut blocks: Subscription> = - api.rpc().subscribe_finalized_blocks().await?; + api.rpc().subscribe_finalized_block_headers().await?; while let Some(Ok(block)) = blocks.next().await { println!( diff --git a/examples/examples/subscribe_all_events.rs b/examples/examples/subscribe_block_events.rs similarity index 88% rename from examples/examples/subscribe_all_events.rs rename to examples/examples/subscribe_block_events.rs index 5eef7447d41..b1c60580e5d 100644 --- a/examples/examples/subscribe_all_events.rs +++ b/examples/examples/subscribe_block_events.rs @@ -31,8 +31,8 @@ async fn main() -> Result<(), Box> { // Create a client to use: let api = OnlineClient::::new().await?; - // Subscribe to any events that occur: - let mut event_sub = api.events().subscribe().await?; + // Subscribe to (in this case, finalized) blocks. + let mut block_sub = api.blocks().subscribe_finalized().await?; // While this subscription is active, balance transfers are made somewhere: tokio::task::spawn({ @@ -58,10 +58,14 @@ async fn main() -> Result<(), Box> { } }); - // Our subscription will see the events emitted as a result of this: - while let Some(events) = event_sub.next().await { - let events = events?; - let block_hash = events.block_hash(); + // Get each finalized block as it arrives. + while let Some(block) = block_sub.next().await { + let block = block?; + + // Ask for the events for this block. + let events = block.events().await?; + + let block_hash = block.hash(); // We can dynamically decode events: println!(" Dynamic event details: {block_hash:?}:"); diff --git a/examples/examples/subscribe_blocks.rs b/examples/examples/subscribe_blocks.rs new file mode 100644 index 00000000000..1838952a46b --- /dev/null +++ b/examples/examples/subscribe_blocks.rs @@ -0,0 +1,64 @@ +// Copyright 2019-2022 Parity Technologies (UK) Ltd. +// This file is dual-licensed as Apache-2.0 or GPL-3.0. +// see LICENSE for license details. + +//! To run this example, a local polkadot node should be running. Example verified against polkadot 0.9.29-41a9d84b152. +//! +//! E.g. +//! ```bash +//! curl "https://github.com/paritytech/polkadot/releases/download/v0.9.29/polkadot" --output /usr/local/bin/polkadot --location +//! polkadot --dev --tmp +//! ``` + +use futures::StreamExt; +use subxt::{ + OnlineClient, + PolkadotConfig, +}; + +#[subxt::subxt(runtime_metadata_path = "../artifacts/polkadot_metadata.scale")] +pub mod polkadot {} + +#[tokio::main] +async fn main() -> Result<(), Box> { + tracing_subscriber::fmt::init(); + + // Create a client to use: + let api = OnlineClient::::new().await?; + + // Subscribe to all finalized blocks: + let mut blocks_sub = api.blocks().subscribe_finalized().await?; + + while let Some(block) = blocks_sub.next().await { + let block = block?; + + let block_number = block.header().number; + let block_hash = block.hash(); + + println!("Block #{block_number}:"); + println!(" Hash: {block_hash}"); + println!(" Extrinsics:"); + + let body = block.body().await?; + for ext in body.extrinsics() { + let idx = ext.index(); + let events = ext.events().await?; + let bytes_hex = format!("0x{}", hex::encode(ext.bytes())); + + println!(" Extrinsic #{idx}:"); + println!(" Bytes: {bytes_hex}"); + println!(" Events:"); + + for evt in events.iter() { + let evt = evt?; + + let pallet_name = evt.pallet_name(); + let event_name = evt.variant_name(); + + println!(" {pallet_name}_{event_name}"); + } + } + } + + Ok(()) +} diff --git a/examples/examples/subscribe_one_event.rs b/examples/examples/subscribe_one_event.rs deleted file mode 100644 index 72baa1a21c5..00000000000 --- a/examples/examples/subscribe_one_event.rs +++ /dev/null @@ -1,72 +0,0 @@ -// Copyright 2019-2022 Parity Technologies (UK) Ltd. -// This file is dual-licensed as Apache-2.0 or GPL-3.0. -// see LICENSE for license details. - -//! To run this example, a local polkadot node should be running. Example verified against polkadot v0.9.28-9ffe6e9e3da. -//! -//! E.g. -//! ```bash -//! curl "https://github.com/paritytech/polkadot/releases/download/v0.9.28/polkadot" --output /usr/local/bin/polkadot --location -//! polkadot --dev --tmp -//! ``` - -use futures::StreamExt; -use sp_keyring::AccountKeyring; -use std::time::Duration; -use subxt::{ - tx::PairSigner, - OnlineClient, - PolkadotConfig, -}; - -#[subxt::subxt(runtime_metadata_path = "../artifacts/polkadot_metadata.scale")] -pub mod polkadot {} - -/// Subscribe to all events, and then manually look through them and -/// pluck out the events that we care about. -#[tokio::main] -async fn main() -> Result<(), Box> { - tracing_subscriber::fmt::init(); - - // Create a client to use: - let api = OnlineClient::::new().await?; - - // Subscribe to just balance transfer events, making use of `filter_events` - // to select a single event type (note the 1-tuple) to filter out and return. - let mut transfer_events = api - .events() - .subscribe() - .await? - .filter_events::<(polkadot::balances::events::Transfer,)>(); - - // While this subscription is active, balance transfers are made somewhere: - tokio::task::spawn({ - let api = api.clone(); - async move { - let signer = PairSigner::new(AccountKeyring::Alice.pair()); - let mut transfer_amount = 1_000_000_000; - - // Make small balance transfers from Alice to Bob in a loop: - loop { - let transfer_tx = polkadot::tx().balances().transfer( - AccountKeyring::Bob.to_account_id().into(), - transfer_amount, - ); - api.tx() - .sign_and_submit_default(&transfer_tx, &signer) - .await - .unwrap(); - - tokio::time::sleep(Duration::from_secs(10)).await; - transfer_amount += 100_000_000; - } - } - }); - - // Our subscription will see all of the transfer events emitted as a result of this: - while let Some(transfer_event) = transfer_events.next().await { - println!("Balance transfer event: {transfer_event:?}"); - } - - Ok(()) -} diff --git a/examples/examples/subscribe_some_events.rs b/examples/examples/subscribe_some_events.rs deleted file mode 100644 index bea82460aaf..00000000000 --- a/examples/examples/subscribe_some_events.rs +++ /dev/null @@ -1,87 +0,0 @@ -// Copyright 2019-2022 Parity Technologies (UK) Ltd. -// This file is dual-licensed as Apache-2.0 or GPL-3.0. -// see LICENSE for license details. - -//! To run this example, a local polkadot node should be running. Example verified against polkadot v0.9.28-9ffe6e9e3da. -//! -//! E.g. -//! ```bash -//! curl "https://github.com/paritytech/polkadot/releases/download/v0.9.28/polkadot" --output /usr/local/bin/polkadot --location -//! polkadot --dev --tmp -//! ``` - -use futures::StreamExt; -use sp_keyring::AccountKeyring; -use std::time::Duration; -use subxt::{ - tx::PairSigner, - OnlineClient, - PolkadotConfig, -}; - -#[subxt::subxt(runtime_metadata_path = "../artifacts/polkadot_metadata.scale")] -pub mod polkadot {} - -/// Subscribe to all events, and then manually look through them and -/// pluck out the events that we care about. -#[tokio::main] -async fn main() -> Result<(), Box> { - tracing_subscriber::fmt::init(); - - // Create a client to use: - let api = OnlineClient::::new().await?; - - // Subscribe to several balance related events. If we ask for more than one event, - // we'll be given a correpsonding tuple of `Option`'s, with exactly one - // variant populated each time. - let mut balance_events = api.events().subscribe().await?.filter_events::<( - polkadot::balances::events::Withdraw, - polkadot::balances::events::Transfer, - polkadot::balances::events::Deposit, - )>(); - - // While this subscription is active, balance transfers are made somewhere: - tokio::task::spawn({ - let api = api.clone(); - async move { - let signer = PairSigner::new(AccountKeyring::Alice.pair()); - let mut transfer_amount = 1_000_000_000; - - // Make small balance transfers from Alice to Bob in a loop: - loop { - let transfer_tx = polkadot::tx().balances().transfer( - AccountKeyring::Bob.to_account_id().into(), - transfer_amount, - ); - api.tx() - .sign_and_submit_default(&transfer_tx, &signer) - .await - .unwrap(); - - tokio::time::sleep(Duration::from_secs(10)).await; - transfer_amount += 100_000_000; - } - } - }); - - // Our subscription will see all of the balance events we're filtering on: - while let Some(ev) = balance_events.next().await { - let event_details = ev?; - - let block_hash = event_details.block_hash; - let event = event_details.event; - println!("Event at {:?}:", block_hash); - - if let (Some(withdraw), _, _) = &event { - println!(" Withdraw event: {withdraw:?}"); - } - if let (_, Some(transfer), _) = &event { - println!(" Transfer event: {transfer:?}"); - } - if let (_, _, Some(deposit)) = &event { - println!(" Deposit event: {deposit:?}"); - } - } - - Ok(()) -} diff --git a/subxt/src/blocks/block_types.rs b/subxt/src/blocks/block_types.rs new file mode 100644 index 00000000000..3522991521e --- /dev/null +++ b/subxt/src/blocks/block_types.rs @@ -0,0 +1,289 @@ +// Copyright 2019-2022 Parity Technologies (UK) Ltd. +// This file is dual-licensed as Apache-2.0 or GPL-3.0. +// see LICENSE for license details. + +use crate::{ + client::{ + OfflineClientT, + OnlineClientT, + }, + error::{ + BlockError, + Error, + }, + events, + rpc::ChainBlockResponse, + Config, +}; +use derivative::Derivative; +use futures::lock::Mutex as AsyncMutex; +use sp_runtime::traits::{ + Hash, + Header, +}; +use std::sync::Arc; + +/// A representation of a block. +pub struct Block { + header: T::Header, + client: C, + // Since we obtain the same events for every extrinsic, let's + // cache them so that we only ever do that once: + cached_events: CachedEvents, +} + +// A cache for our events so we don't fetch them more than once when +// iterating over events for extrinsics. +type CachedEvents = Arc>>>; + +impl Block +where + T: Config, + C: OfflineClientT, +{ + pub(crate) fn new(header: T::Header, client: C) -> Self { + Block { + header, + client, + cached_events: Default::default(), + } + } + + /// Return the block hash. + pub fn hash(&self) -> T::Hash { + self.header.hash() + } + + /// Return the block number. + pub fn number(&self) -> T::BlockNumber { + *self.header().number() + } + + /// Return the entire block header. + pub fn header(&self) -> &T::Header { + &self.header + } +} + +impl Block +where + T: Config, + C: OnlineClientT, +{ + /// Return the events associated with the block, fetching them from the node if necessary. + pub async fn events(&self) -> Result, Error> { + get_events(&self.client, self.header.hash(), &self.cached_events).await + } + + /// Fetch and return the block body. + pub async fn body(&self) -> Result, Error> { + let block_hash = self.header.hash(); + let block_details = match self.client.rpc().block(Some(block_hash)).await? { + Some(block) => block, + None => return Err(BlockError::block_hash_not_found(block_hash).into()), + }; + + Ok(BlockBody::new( + self.client.clone(), + block_details, + self.cached_events.clone(), + )) + } +} + +/// The body of a block. +pub struct BlockBody { + details: ChainBlockResponse, + client: C, + cached_events: CachedEvents, +} + +impl BlockBody +where + T: Config, + C: OfflineClientT, +{ + pub(crate) fn new( + client: C, + details: ChainBlockResponse, + cached_events: CachedEvents, + ) -> Self { + Self { + details, + client, + cached_events, + } + } + + /// Returns an iterator over the extrinsics in the block body. + pub fn extrinsics(&self) -> impl Iterator> { + self.details + .block + .extrinsics + .iter() + .enumerate() + .map(|(idx, e)| { + Extrinsic { + index: idx as u32, + bytes: &e.0, + client: self.client.clone(), + block_hash: self.details.block.header.hash(), + cached_events: self.cached_events.clone(), + _marker: std::marker::PhantomData, + } + }) + } +} + +/// A single extrinsic in a block. +pub struct Extrinsic<'a, T: Config, C> { + index: u32, + bytes: &'a [u8], + client: C, + block_hash: T::Hash, + cached_events: CachedEvents, + _marker: std::marker::PhantomData, +} + +impl<'a, T, C> Extrinsic<'a, T, C> +where + T: Config, + C: OfflineClientT, +{ + /// The index of the extrinsic in the block. + pub fn index(&self) -> u32 { + self.index + } + + /// The bytes of the extrinsic. + pub fn bytes(&self) -> &'a [u8] { + self.bytes + } +} + +impl<'a, T, C> Extrinsic<'a, T, C> +where + T: Config, + C: OnlineClientT, +{ + /// The events associated with the extrinsic. + pub async fn events(&self) -> Result, Error> { + let events = + get_events(&self.client, self.block_hash, &self.cached_events).await?; + let ext_hash = T::Hashing::hash_of(&self.bytes); + Ok(ExtrinsicEvents::new(ext_hash, self.index, events)) + } +} + +/// The events associated with a given extrinsic. +#[derive(Derivative)] +#[derivative(Debug(bound = ""))] +pub struct ExtrinsicEvents { + // The hash of the extrinsic (handy to expose here because + // this type is returned from TxProgress things in the most + // basic flows, so it's the only place people can access it + // without complicating things for themselves). + ext_hash: T::Hash, + // The index of the extrinsic: + idx: u32, + // All of the events in the block: + events: events::Events, +} + +impl ExtrinsicEvents { + pub(crate) fn new(ext_hash: T::Hash, idx: u32, events: events::Events) -> Self { + Self { + ext_hash, + idx, + events, + } + } + + /// Return the hash of the block that the extrinsic is in. + pub fn block_hash(&self) -> T::Hash { + self.events.block_hash() + } + + /// The index of the extrinsic that these events are produced from. + pub fn extrinsic_index(&self) -> u32 { + self.idx + } + + /// Return the hash of the extrinsic. + pub fn extrinsic_hash(&self) -> T::Hash { + self.ext_hash + } + + /// Return all of the events in the block that the extrinsic is in. + pub fn all_events_in_block(&self) -> &events::Events { + &self.events + } + + /// Iterate over all of the raw events associated with this transaction. + /// + /// This works in the same way that [`events::Events::iter()`] does, with the + /// exception that it filters out events not related to the submitted extrinsic. + pub fn iter(&self) -> impl Iterator> + '_ { + self.events.iter().filter(|ev| { + ev.as_ref() + .map(|ev| ev.phase() == events::Phase::ApplyExtrinsic(self.idx)) + .unwrap_or(true) // Keep any errors. + }) + } + + /// Find all of the transaction events matching the event type provided as a generic parameter. + /// + /// This works in the same way that [`events::Events::find()`] does, with the + /// exception that it filters out events not related to the submitted extrinsic. + pub fn find( + &self, + ) -> impl Iterator> + '_ { + self.iter().filter_map(|ev| { + ev.and_then(|ev| ev.as_event::().map_err(Into::into)) + .transpose() + }) + } + + /// Iterate through the transaction events using metadata to dynamically decode and skip + /// them, and return the first event found which decodes to the provided `Ev` type. + /// + /// This works in the same way that [`events::Events::find_first()`] does, with the + /// exception that it ignores events not related to the submitted extrinsic. + pub fn find_first(&self) -> Result, Error> { + self.find::().next().transpose() + } + + /// Find an event in those associated with this transaction. Returns true if it was found. + /// + /// This works in the same way that [`events::Events::has()`] does, with the + /// exception that it ignores events not related to the submitted extrinsic. + pub fn has(&self) -> Result { + Ok(self.find::().next().transpose()?.is_some()) + } +} + +// Return Events from the cache, or fetch from the node if needed. +async fn get_events( + client: &C, + block_hash: T::Hash, + cached_events: &AsyncMutex>>, +) -> Result, Error> +where + T: Config, + C: OnlineClientT, +{ + // Acquire lock on the events cache. We either get back our events or we fetch and set them + // before unlocking, so only one fetch call should ever be made. We do this because the + // same events can be shared across all extrinsics in the block. + let lock = cached_events.lock().await; + let events = match &*lock { + Some(events) => events.clone(), + None => { + events::EventsClient::new(client.clone()) + .at(Some(block_hash)) + .await? + } + }; + + Ok(events) +} diff --git a/subxt/src/blocks/blocks_client.rs b/subxt/src/blocks/blocks_client.rs index 190ec9f4f18..9fa82b66ccc 100644 --- a/subxt/src/blocks/blocks_client.rs +++ b/subxt/src/blocks/blocks_client.rs @@ -2,9 +2,13 @@ // This file is dual-licensed as Apache-2.0 or GPL-3.0. // see LICENSE for license details. +use super::Block; use crate::{ client::OnlineClientT, - error::Error, + error::{ + BlockError, + Error, + }, utils::PhantomDataSendSync, Config, }; @@ -16,7 +20,13 @@ use futures::{ StreamExt, }; use sp_runtime::traits::Header; -use std::future::Future; +use std::{ + future::Future, + pin::Pin, +}; + +type BlockStream = Pin> + Send>>; +type BlockStreamRes = Result, Error>; /// A client for working with blocks. #[derive(Derivative)] @@ -41,64 +51,132 @@ where T: Config, Client: OnlineClientT, { - /// Subscribe to new best block headers. + /// Obtain block details given the provided block hash, or the latest block if `None` is + /// provided. /// - /// # Note + /// # Warning /// - /// This does not produce all the blocks from the chain, just the best blocks. - /// The best block is selected by the consensus algorithm. - /// This calls under the hood the `chain_subscribeNewHeads` RPC method, if you need - /// a subscription of all the blocks please use the `chain_subscribeAllHeads` method. + /// This call only supports blocks produced since the most recent + /// runtime upgrade. You can attempt to retrieve older blocks, + /// but may run into errors attempting to work with them. + pub fn at( + &self, + block_hash: Option, + ) -> impl Future, Error>> + Send + 'static { + let client = self.client.clone(); + async move { + // If block hash is not provided, get the hash + // for the latest block and use that. + let block_hash = match block_hash { + Some(hash) => hash, + None => { + client + .rpc() + .block_hash(None) + .await? + .expect("didn't pass a block number; qed") + } + }; + + let block_header = match client.rpc().header(Some(block_hash)).await? { + Some(header) => header, + None => return Err(BlockError::block_hash_not_found(block_hash).into()), + }; + + Ok(Block::new(block_header, client)) + } + } + + /// Subscribe to all new blocks imported by the node. /// - /// These blocks haven't necessarily been finalised yet. Prefer - /// [`BlocksClient::subscribe_finalized_headers()`] if that is important. - pub fn subscribe_headers( + /// **Note:** You probably want to use [`Self::subscribe_finalized()`] most of + /// the time. + pub fn subscribe_all( &self, - ) -> impl Future>, Error>> - + Send - + 'static { + ) -> impl Future>, Error>> + Send + 'static + where + Client: Send + Sync + 'static, + { let client = self.client.clone(); - async move { client.rpc().subscribe_blocks().await } + header_sub_fut_to_block_sub(self.clone(), async move { + let sub = client.rpc().subscribe_all_block_headers().await?; + BlockStreamRes::Ok(Box::pin(sub)) + }) } - /// Subscribe to finalized block headers. + /// Subscribe to all new blocks imported by the node onto the current best fork. /// - /// While the Substrate RPC method does not guarantee that all finalized block headers are - /// provided, this function does. - /// ``` - pub fn subscribe_finalized_headers( + /// **Note:** You probably want to use [`Self::subscribe_finalized()`] most of + /// the time. + pub fn subscribe_best( &self, - ) -> impl Future>, Error>> - + Send - + 'static { + ) -> impl Future>, Error>> + Send + 'static + where + Client: Send + Sync + 'static, + { let client = self.client.clone(); - async move { subscribe_finalized_headers(client).await } + header_sub_fut_to_block_sub(self.clone(), async move { + let sub = client.rpc().subscribe_best_block_headers().await?; + BlockStreamRes::Ok(Box::pin(sub)) + }) + } + + /// Subscribe to finalized blocks. + pub fn subscribe_finalized( + &self, + ) -> impl Future>, Error>> + Send + 'static + where + Client: Send + Sync + 'static, + { + let client = self.client.clone(); + header_sub_fut_to_block_sub(self.clone(), async move { + // Fetch the last finalised block details immediately, so that we'll get + // all blocks after this one. + let last_finalized_block_hash = client.rpc().finalized_head().await?; + let last_finalized_block_num = client + .rpc() + .header(Some(last_finalized_block_hash)) + .await? + .map(|h| (*h.number()).into()); + + let sub = client.rpc().subscribe_finalized_block_headers().await?; + + // Adjust the subscription stream to fill in any missing blocks. + BlockStreamRes::Ok( + subscribe_to_block_headers_filling_in_gaps( + client, + last_finalized_block_num, + sub, + ) + .boxed(), + ) + }) } } -async fn subscribe_finalized_headers( - client: Client, -) -> Result>, Error> +/// Take a promise that will return a subscription to some block headers, +/// and return a subscription to some blocks based on this. +async fn header_sub_fut_to_block_sub( + blocks_client: BlocksClient, + sub: S, +) -> Result>, Error> where T: Config, - Client: OnlineClientT, + S: Future, Error>> + Send + 'static, + Client: OnlineClientT + Send + Sync + 'static, { - // Fetch the last finalised block details immediately, so that we'll get - // all blocks after this one. - let last_finalized_block_hash = client.rpc().finalized_head().await?; - let last_finalized_block_num = client - .rpc() - .header(Some(last_finalized_block_hash)) - .await? - .map(|h| (*h.number()).into()); - - let sub = client.rpc().subscribe_finalized_blocks().await?; - - // Adjust the subscription stream to fill in any missing blocks. - Ok( - subscribe_to_block_headers_filling_in_gaps(client, last_finalized_block_num, sub) - .boxed(), - ) + let sub = sub.await?.then(move |header| { + let client = blocks_client.client.clone(); + async move { + let header = match header { + Ok(header) => header, + Err(e) => return Err(e), + }; + + Ok(Block::new(header, client)) + } + }); + BlockStreamRes::Ok(Box::pin(sub)) } /// Note: This is exposed for testing but is not considered stable and may change diff --git a/subxt/src/blocks/mod.rs b/subxt/src/blocks/mod.rs index f3a575e8bdc..ae7f212f484 100644 --- a/subxt/src/blocks/mod.rs +++ b/subxt/src/blocks/mod.rs @@ -4,8 +4,14 @@ //! This module exposes the necessary functionality for working with events. +mod block_types; mod blocks_client; +pub use block_types::{ + Block, + Extrinsic, + ExtrinsicEvents, +}; pub use blocks_client::{ subscribe_to_block_headers_filling_in_gaps, BlocksClient, diff --git a/subxt/src/config.rs b/subxt/src/config.rs index e7a7a0efbab..ddd04d763cc 100644 --- a/subxt/src/config.rs +++ b/subxt/src/config.rs @@ -16,7 +16,6 @@ use codec::{ use core::fmt::Debug; use sp_runtime::traits::{ AtLeast32Bit, - Extrinsic, Hash, Header, MaybeSerializeDeserialize, @@ -78,9 +77,6 @@ pub trait Config: 'static { /// Signature type. type Signature: Verify + Encode + Send + Sync + 'static; - /// Extrinsic type within blocks. - type Extrinsic: Parameter + Extrinsic + Debug + MaybeSerializeDeserialize + Send; - /// This type defines the extrinsic extra and additional parameters. type ExtrinsicParams: crate::tx::ExtrinsicParams; } @@ -104,7 +100,6 @@ impl Config for SubstrateConfig { type Header = sp_runtime::generic::Header; type Signature = sp_runtime::MultiSignature; - type Extrinsic = sp_runtime::OpaqueExtrinsic; type ExtrinsicParams = crate::tx::SubstrateExtrinsicParams; } @@ -145,6 +140,5 @@ impl> Config type Address = T::Address; type Header = T::Header; type Signature = T::Signature; - type Extrinsic = T::Extrinsic; type ExtrinsicParams = E; } diff --git a/subxt/src/error.rs b/subxt/src/error.rs index 4e848b15a0a..0d91adbab5b 100644 --- a/subxt/src/error.rs +++ b/subxt/src/error.rs @@ -63,6 +63,9 @@ pub enum Error { /// Transaction progress error. #[error("Transaction error: {0}")] Transaction(#[from] TransactionError), + /// Block related error. + #[error("Block error: {0}")] + Block(#[from] BlockError), /// An error encoding a storage address. #[error("Error encoding storage address: {0}")] StorageAddress(#[from] StorageAddressError), @@ -237,6 +240,24 @@ impl DispatchError { } } +/// Block error +#[derive(Clone, Debug, Eq, thiserror::Error, PartialEq)] +pub enum BlockError { + /// The block + #[error( + "Could not find a block with hash {0} (perhaps it was on a non-finalized fork?)" + )] + BlockHashNotFound(String), +} + +impl BlockError { + /// Produce an error that a block with the given hash cannot be found. + pub fn block_hash_not_found(hash: impl AsRef<[u8]>) -> BlockError { + let hash = format!("0x{}", hex::encode(hash)); + BlockError::BlockHashNotFound(hash) + } +} + /// Transaction error. #[derive(Clone, Debug, Eq, thiserror::Error, PartialEq)] pub enum TransactionError { diff --git a/subxt/src/events/event_subscription.rs b/subxt/src/events/event_subscription.rs deleted file mode 100644 index 85fd4f221ac..00000000000 --- a/subxt/src/events/event_subscription.rs +++ /dev/null @@ -1,208 +0,0 @@ -// Copyright 2019-2022 Parity Technologies (UK) Ltd. -// This file is dual-licensed as Apache-2.0 or GPL-3.0. -// see LICENSE for license details. - -//! Subscribing to events. - -use crate::{ - client::OnlineClientT, - error::Error, - events::EventsClient, - Config, -}; -use derivative::Derivative; -use futures::{ - stream::BoxStream, - Future, - FutureExt, - Stream, - StreamExt, -}; -use sp_runtime::traits::Header; -use std::{ - marker::Unpin, - task::Poll, -}; - -pub use super::{ - EventDetails, - EventFilter, - Events, - FilterEvents, -}; - -/// A Subscription. This forms a part of the `EventSubscription` type handed back -/// in codegen from `subscribe_finalized`, and is exposed to be used in codegen. -#[doc(hidden)] -pub type FinalizedEventSub
= BoxStream<'static, Result>; - -/// A Subscription. This forms a part of the `EventSubscription` type handed back -/// in codegen from `subscribe`, and is exposed to be used in codegen. -#[doc(hidden)] -pub type EventSub = BoxStream<'static, Result>; - -/// A subscription to events that implements [`Stream`], and returns [`Events`] objects for each block. -#[derive(Derivative)] -#[derivative(Debug(bound = "Sub: std::fmt::Debug, Client: std::fmt::Debug"))] -pub struct EventSubscription { - finished: bool, - client: Client, - block_header_subscription: Sub, - #[derivative(Debug = "ignore")] - at: Option, Error>> + Send>>>, -} - -impl> EventSubscription -where - Sub: Stream> + Unpin, -{ - /// Create a new [`EventSubscription`] from a client and a subscription - /// which returns block headers. - pub fn new(client: Client, block_header_subscription: Sub) -> Self { - EventSubscription { - finished: false, - client, - block_header_subscription, - at: None, - } - } - - /// Return only specific events matching the tuple of 1 or more event - /// types that has been provided as the `Filter` type parameter. - /// - /// # Example - /// - /// ```no_run - /// use futures::StreamExt; - /// use subxt::{OnlineClient, PolkadotConfig}; - /// - /// #[subxt::subxt(runtime_metadata_path = "../artifacts/polkadot_metadata.scale")] - /// pub mod polkadot {} - /// - /// # #[tokio::main] - /// # async fn main() { - /// let api = OnlineClient::::new().await.unwrap(); - /// - /// let mut events = api - /// .events() - /// .subscribe() - /// .await - /// .unwrap() - /// .filter_events::<( - /// polkadot::balances::events::Transfer, - /// polkadot::balances::events::Deposit - /// )>(); - /// - /// while let Some(ev) = events.next().await { - /// let event_details = ev.unwrap(); - /// match event_details.event { - /// (Some(transfer), None) => println!("Balance transfer event: {transfer:?}"), - /// (None, Some(deposit)) => println!("Balance deposit event: {deposit:?}"), - /// _ => unreachable!() - /// } - /// } - /// # } - /// ``` - pub fn filter_events( - self, - ) -> FilterEvents<'static, Self, T, Filter> { - FilterEvents::new(self) - } -} - -impl Unpin for EventSubscription {} - -// We want `EventSubscription` to implement Stream. The below implementation is the rather verbose -// way to roughly implement the following function: -// -// ``` -// fn subscribe_events(client: &'_ Client, block_sub: Subscription) -> impl Stream, Error>> + '_ { -// use futures::StreamExt; -// block_sub.then(move |block_header_res| async move { -// use sp_runtime::traits::Header; -// let block_header = block_header_res?; -// let block_hash = block_header.hash(); -// at(client, block_hash).await -// }) -// } -// ``` -// -// The advantage of this manual implementation is that we have a named type that we (and others) -// can derive things on, store away, alias etc. -impl Stream for EventSubscription -where - T: Config, - Client: OnlineClientT, - Sub: Stream> + Unpin, - E: Into, -{ - type Item = Result, Error>; - - fn poll_next( - mut self: std::pin::Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - ) -> std::task::Poll> { - // We are finished; return None. - if self.finished { - return Poll::Ready(None) - } - - // If there isn't an `at` function yet that's busy resolving a block hash into - // some event details, then poll the block header subscription to get one. - if self.at.is_none() { - match futures::ready!(self.block_header_subscription.poll_next_unpin(cx)) { - None => { - self.finished = true; - return Poll::Ready(None) - } - Some(Err(e)) => { - self.finished = true; - return Poll::Ready(Some(Err(e.into()))) - } - Some(Ok(block_header)) => { - // Note [jsdw]: We may be able to get rid of the per-item allocation - // with https://github.com/oblique/reusable-box-future. - let at = EventsClient::new(self.client.clone()) - .at(Some(block_header.hash())); - self.at = Some(Box::pin(at)); - // Continue, so that we poll this function future we've just created. - } - } - } - - // If we get here, there will be an `at` function stored. Unwrap it and poll it to - // completion to get our events, throwing it away as soon as it is ready. - let at_fn = self - .at - .as_mut() - .expect("'at' function should have been set above'"); - let events = futures::ready!(at_fn.poll_unpin(cx)); - self.at = None; - Poll::Ready(Some(events)) - } -} - -#[cfg(test)] -mod test { - use super::*; - - // Ensure `EventSubscription` can be sent; only actually a compile-time check. - #[allow(unused)] - fn check_sendability() { - fn assert_send() {} - assert_send::< - EventSubscription< - crate::SubstrateConfig, - (), - EventSub<::Header>, - >, - >(); - assert_send::< - EventSubscription< - crate::SubstrateConfig, - (), - FinalizedEventSub<::Header>, - >, - >(); - } -} diff --git a/subxt/src/events/events_client.rs b/subxt/src/events/events_client.rs index fc19231cf98..d774ff1fa75 100644 --- a/subxt/src/events/events_client.rs +++ b/subxt/src/events/events_client.rs @@ -5,12 +5,7 @@ use crate::{ client::OnlineClientT, error::Error, - events::{ - EventSub, - EventSubscription, - Events, - FinalizedEventSub, - }, + events::Events, Config, }; use derivative::Derivative; @@ -44,6 +39,12 @@ where Client: OnlineClientT, { /// Obtain events at some block hash. + /// + /// # Warning + /// + /// This call only supports blocks produced since the most recent + /// runtime upgrade. You can attempt to retrieve events from older blocks, + /// but may run into errors attempting to work with them. pub fn at( &self, block_hash: Option, @@ -51,122 +52,30 @@ where // Clone and pass the client in like this so that we can explicitly // return a Future that's Send + 'static, rather than tied to &self. let client = self.client.clone(); - async move { at(client, block_hash).await } - } + async move { + // If block hash is not provided, get the hash + // for the latest block and use that. + let block_hash = match block_hash { + Some(hash) => hash, + None => { + client + .rpc() + .block_hash(None) + .await? + .expect("didn't pass a block number; qed") + } + }; - /// Subscribe to all events from blocks. - /// - /// **Note:** these blocks haven't necessarily been finalised yet; prefer - /// [`EventsClient::subscribe_finalized()`] if that is important. - /// - /// # Example - /// - /// ```no_run - /// # #[tokio::main] - /// # async fn main() { - /// use futures::StreamExt; - /// use subxt::{ OnlineClient, PolkadotConfig }; - /// - /// let api = OnlineClient::::new().await.unwrap(); - /// - /// let mut events = api.events().subscribe().await.unwrap(); - /// - /// while let Some(ev) = events.next().await { - /// // Obtain all events from this block. - /// let ev = ev.unwrap(); - /// // Print block hash. - /// println!("Event at block hash {:?}", ev.block_hash()); - /// // Iterate over all events. - /// let mut iter = ev.iter(); - /// while let Some(event_details) = iter.next() { - /// println!("Event details {:?}", event_details); - /// } - /// } - /// # } - /// ``` - pub fn subscribe( - &self, - ) -> impl Future< - Output = Result>, Error>, - > + Send - + 'static - where - Client: Send + Sync + 'static, - { - let client = self.client.clone(); - async move { subscribe(client).await } - } - - /// Subscribe to events from finalized blocks. See [`EventsClient::subscribe()`] for details. - pub fn subscribe_finalized( - &self, - ) -> impl Future< - Output = Result< - EventSubscription>, - Error, - >, - > + Send - + 'static - where - Client: Send + Sync + 'static, - { - let client = self.client.clone(); - async move { subscribe_finalized(client).await } - } -} - -async fn at( - client: Client, - block_hash: Option, -) -> Result, Error> -where - T: Config, - Client: OnlineClientT, -{ - // If block hash is not provided, get the hash - // for the latest block and use that. - let block_hash = match block_hash { - Some(hash) => hash, - None => { - client + let event_bytes = client .rpc() - .block_hash(None) + .storage(&*system_events_key().0, Some(block_hash)) .await? - .expect("didn't pass a block number; qed") - } - }; - - let event_bytes = client - .rpc() - .storage(&*system_events_key().0, Some(block_hash)) - .await? - .map(|e| e.0) - .unwrap_or_else(Vec::new); - - Ok(Events::new(client.metadata(), block_hash, event_bytes)) -} - -async fn subscribe( - client: Client, -) -> Result>, Error> -where - T: Config, - Client: OnlineClientT, -{ - let block_subscription = client.blocks().subscribe_headers().await?; - Ok(EventSubscription::new(client, Box::pin(block_subscription))) -} + .map(|e| e.0) + .unwrap_or_else(Vec::new); -/// Subscribe to events from finalized blocks. -async fn subscribe_finalized( - client: Client, -) -> Result>, Error> -where - T: Config, - Client: OnlineClientT, -{ - let block_subscription = client.blocks().subscribe_finalized_headers().await?; - Ok(EventSubscription::new(client, Box::pin(block_subscription))) + Ok(Events::new(client.metadata(), block_hash, event_bytes)) + } + } } // The storage key needed to access events. diff --git a/subxt/src/events/events_type.rs b/subxt/src/events/events_type.rs index f14e2458777..d0b8ade853e 100644 --- a/subxt/src/events/events_type.rs +++ b/subxt/src/events/events_type.rs @@ -25,7 +25,7 @@ use std::sync::Arc; /// A collection of events obtained from a block, bundled with the necessary /// information needed to decode and iterate over them. #[derive(Derivative)] -#[derivative(Debug(bound = ""))] +#[derivative(Debug(bound = ""), Clone(bound = ""))] pub struct Events { metadata: Metadata, block_hash: T::Hash, @@ -83,6 +83,8 @@ impl Events { /// Iterate over all of the events, using metadata to dynamically /// decode them as we go, and returning the raw bytes and other associated /// details. If an error occurs, all subsequent iterations return `None`. + // Dev note: The returned iterator is 'static + Send so that we can box it up and make + // use of it with our `FilterEvents` stuff. pub fn iter( &self, ) -> impl Iterator> + Send + Sync + 'static { diff --git a/subxt/src/events/filter_events.rs b/subxt/src/events/filter_events.rs deleted file mode 100644 index 9507c1c4121..00000000000 --- a/subxt/src/events/filter_events.rs +++ /dev/null @@ -1,403 +0,0 @@ -// Copyright 2019-2022 Parity Technologies (UK) Ltd. -// This file is dual-licensed as Apache-2.0 or GPL-3.0. -// see LICENSE for license details. - -//! Filtering individual events from subscriptions. - -use super::{ - Events, - Phase, - StaticEvent, -}; -use crate::{ - Config, - Error, -}; -use futures::{ - Stream, - StreamExt, -}; -use std::{ - marker::Unpin, - task::Poll, -}; - -/// A stream which filters events based on the `Filter` param provided. -/// If `Filter` is a 1-tuple of a single `Event` type, it will return every -/// instance of that event as it's found. If `filter` is ` tuple of multiple -/// `Event` types, it will return a corresponding tuple of `Option`s, where -/// exactly one of these will be `Some(event)` each iteration. -pub struct FilterEvents<'a, Sub: 'a, T: Config, Filter: EventFilter> { - // A subscription; in order for the Stream impl to apply, this will - // impl `Stream, Error>> + Unpin + 'a`. - sub: Sub, - // Each time we get Events from our subscription, they are stored here - // and iterated through in future stream iterations until exhausted. - events: Option< - Box< - dyn Iterator< - Item = Result< - FilteredEventDetails, - Error, - >, - > + Send - + 'a, - >, - >, -} - -impl<'a, Sub: 'a, T: Config, Filter: EventFilter> Unpin - for FilterEvents<'a, Sub, T, Filter> -{ -} - -impl<'a, Sub: 'a, T: Config, Filter: EventFilter> FilterEvents<'a, Sub, T, Filter> { - pub(crate) fn new(sub: Sub) -> Self { - Self { sub, events: None } - } -} - -impl<'a, Sub, T, Filter> Stream for FilterEvents<'a, Sub, T, Filter> -where - Sub: Stream, Error>> + Unpin + 'a, - T: Config, - Filter: EventFilter, -{ - type Item = Result, Error>; - fn poll_next( - mut self: std::pin::Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - ) -> std::task::Poll> { - loop { - // Drain the current events we're iterating over first: - if let Some(events_iter) = self.events.as_mut() { - match events_iter.next() { - Some(res) => return Poll::Ready(Some(res)), - None => { - self.events = None; - } - } - } - - // Wait for new events to come in: - match futures::ready!(self.sub.poll_next_unpin(cx)) { - None => return Poll::Ready(None), - Some(Err(e)) => return Poll::Ready(Some(Err(e))), - Some(Ok(events)) => { - self.events = Some(Filter::filter(events)); - } - }; - } - } -} - -/// This is returned from the [`FilterEvents`] impl of [`Stream`]. It contains -/// some type representing an event we've filtered on, along with couple of additional -/// pieces of information about that event. -#[derive(Debug, Clone, Eq, PartialEq)] -pub struct FilteredEventDetails { - /// During which [`Phase`] was the event produced? - pub phase: Phase, - /// Hash of the block that this event came from. - pub block_hash: BlockHash, - /// A type containing an event that we've filtered on. - /// Depending on the filter type, this may be a tuple - /// or a single event. - pub event: Evs, -} - -/// This trait is implemented for tuples of Event types; any such tuple (up to size 8) can be -/// used to filter an event subscription to return only the specified events. -pub trait EventFilter: private::Sealed { - /// The type we'll be handed back from filtering. - type ReturnType; - /// Filter the events based on the type implementing this trait. - fn filter( - events: Events, - ) -> Box< - dyn Iterator< - Item = Result, Error>, - > + Send, - >; -} - -// Prevent userspace implementations of the above trait; the interface is not considered stable -// and is not a particularly nice API to work with (particularly because it involves boxing, which -// would be nice to get rid of eventually). -pub(crate) mod private { - pub trait Sealed {} -} - -// A special case impl for searching for a tuple of exactly one event (in this case, we don't -// need to return an `(Option,)`; we can just return `Event`. -impl private::Sealed for (Ev,) {} -impl EventFilter for (Ev,) { - type ReturnType = Ev; - fn filter( - events: Events, - ) -> Box, Error>> + Send> - { - let block_hash = events.block_hash(); - let mut iter = events.iter(); - Box::new(std::iter::from_fn(move || { - for ev in iter.by_ref() { - // Forward any error immediately: - let raw_event = match ev { - Ok(ev) => ev, - Err(e) => return Some(Err(e)), - }; - // Try decoding each type until we hit a match or an error: - let ev = raw_event.as_event::(); - if let Ok(Some(event)) = ev { - // We found a match; return our tuple. - return Some(Ok(FilteredEventDetails { - phase: raw_event.phase(), - block_hash, - event, - })) - } - if let Err(e) = ev { - // We hit an error. Return it. - return Some(Err(e.into())) - } - } - None - })) - } -} - -// A generalised impl for tuples of sizes greater than 1: -macro_rules! impl_event_filter { - ($($ty:ident $idx:tt),+) => { - impl <$($ty: StaticEvent),+> private::Sealed for ( $($ty,)+ ) {} - impl <$($ty: StaticEvent),+> EventFilter for ( $($ty,)+ ) { - type ReturnType = ( $(Option<$ty>,)+ ); - fn filter( - events: Events - ) -> Box, Error>> + Send> { - let block_hash = events.block_hash(); - let mut iter = events.iter(); - Box::new(std::iter::from_fn(move || { - let mut out: ( $(Option<$ty>,)+ ) = Default::default(); - for ev in iter.by_ref() { - // Forward any error immediately: - let raw_event = match ev { - Ok(ev) => ev, - Err(e) => return Some(Err(e)) - }; - // Try decoding each type until we hit a match or an error: - $({ - let ev = raw_event.as_event::<$ty>(); - if let Ok(Some(ev)) = ev { - // We found a match; return our tuple. - out.$idx = Some(ev); - return Some(Ok(FilteredEventDetails { - phase: raw_event.phase(), - block_hash, - event: out - })) - } - if let Err(e) = ev { - // We hit an error. Return it. - return Some(Err(e.into())) - } - })+ - } - None - })) - } - } - } -} - -impl_event_filter!(A 0, B 1); -impl_event_filter!(A 0, B 1, C 2); -impl_event_filter!(A 0, B 1, C 2, D 3); -impl_event_filter!(A 0, B 1, C 2, D 3, E 4); -impl_event_filter!(A 0, B 1, C 2, D 3, E 4, F 5); -impl_event_filter!(A 0, B 1, C 2, D 3, E 4, F 5, G 6); -impl_event_filter!(A 0, B 1, C 2, D 3, E 4, F 5, G 6, H 7); - -#[cfg(test)] -mod test { - use super::{ - super::events_type::test_utils::{ - event_record, - events, - metadata, - }, - *, - }; - use crate::{ - Config, - Metadata, - SubstrateConfig, - }; - use codec::{ - Decode, - Encode, - }; - use futures::{ - stream, - Stream, - StreamExt, - }; - use scale_info::TypeInfo; - - // Some pretend events in a pallet - #[derive(Clone, Debug, PartialEq, Decode, Encode, TypeInfo)] - enum PalletEvents { - A(EventA), - B(EventB), - C(EventC), - } - - // An event in our pallet that we can filter on. - #[derive(Clone, Debug, PartialEq, Decode, Encode, TypeInfo)] - struct EventA(u8); - impl StaticEvent for EventA { - const PALLET: &'static str = "Test"; - const EVENT: &'static str = "A"; - } - - // An event in our pallet that we can filter on. - #[derive(Clone, Debug, PartialEq, Decode, Encode, TypeInfo)] - struct EventB(bool); - impl StaticEvent for EventB { - const PALLET: &'static str = "Test"; - const EVENT: &'static str = "B"; - } - - // An event in our pallet that we can filter on. - #[derive(Clone, Debug, PartialEq, Decode, Encode, TypeInfo)] - struct EventC(u8, bool); - impl StaticEvent for EventC { - const PALLET: &'static str = "Test"; - const EVENT: &'static str = "C"; - } - - // A stream of fake events for us to try filtering on. - fn events_stream( - metadata: Metadata, - ) -> impl Stream, Error>> { - stream::iter(vec![ - events::( - metadata.clone(), - vec![ - event_record(Phase::Initialization, PalletEvents::A(EventA(1))), - event_record(Phase::ApplyExtrinsic(0), PalletEvents::B(EventB(true))), - event_record(Phase::Finalization, PalletEvents::A(EventA(2))), - ], - ), - events::( - metadata.clone(), - vec![event_record( - Phase::ApplyExtrinsic(1), - PalletEvents::B(EventB(false)), - )], - ), - events::( - metadata, - vec![ - event_record(Phase::ApplyExtrinsic(2), PalletEvents::B(EventB(true))), - event_record(Phase::ApplyExtrinsic(3), PalletEvents::A(EventA(3))), - ], - ), - ]) - .map(Ok::<_, Error>) - } - - #[tokio::test] - async fn filter_one_event_from_stream() { - let metadata = metadata::(); - - // Filter out fake event stream to select events matching `EventA` only. - let actual: Vec<_> = - FilterEvents::<_, SubstrateConfig, (EventA,)>::new(events_stream(metadata)) - .map(|e| e.unwrap()) - .collect() - .await; - - let expected = vec![ - FilteredEventDetails { - phase: Phase::Initialization, - block_hash: ::Hash::default(), - event: EventA(1), - }, - FilteredEventDetails { - phase: Phase::Finalization, - block_hash: ::Hash::default(), - event: EventA(2), - }, - FilteredEventDetails { - phase: Phase::ApplyExtrinsic(3), - block_hash: ::Hash::default(), - event: EventA(3), - }, - ]; - - assert_eq!(actual, expected); - } - - #[tokio::test] - async fn filter_some_events_from_stream() { - let metadata = metadata::(); - - // Filter out fake event stream to select events matching `EventA` or `EventB`. - let actual: Vec<_> = FilterEvents::<_, SubstrateConfig, (EventA, EventB)>::new( - events_stream(metadata), - ) - .map(|e| e.unwrap()) - .collect() - .await; - - let expected = vec![ - FilteredEventDetails { - phase: Phase::Initialization, - block_hash: ::Hash::default(), - event: (Some(EventA(1)), None), - }, - FilteredEventDetails { - phase: Phase::ApplyExtrinsic(0), - block_hash: ::Hash::default(), - event: (None, Some(EventB(true))), - }, - FilteredEventDetails { - phase: Phase::Finalization, - block_hash: ::Hash::default(), - event: (Some(EventA(2)), None), - }, - FilteredEventDetails { - phase: Phase::ApplyExtrinsic(1), - block_hash: ::Hash::default(), - event: (None, Some(EventB(false))), - }, - FilteredEventDetails { - phase: Phase::ApplyExtrinsic(2), - block_hash: ::Hash::default(), - event: (None, Some(EventB(true))), - }, - FilteredEventDetails { - phase: Phase::ApplyExtrinsic(3), - block_hash: ::Hash::default(), - event: (Some(EventA(3)), None), - }, - ]; - - assert_eq!(actual, expected); - } - - #[tokio::test] - async fn filter_no_events_from_stream() { - let metadata = metadata::(); - - // Filter out fake event stream to select events matching `EventC` (none exist). - let actual: Vec<_> = - FilterEvents::<_, SubstrateConfig, (EventC,)>::new(events_stream(metadata)) - .map(|e| e.unwrap()) - .collect() - .await; - - assert_eq!(actual, vec![]); - } -} diff --git a/subxt/src/events/mod.rs b/subxt/src/events/mod.rs index fb66f1502c2..69213532b6c 100644 --- a/subxt/src/events/mod.rs +++ b/subxt/src/events/mod.rs @@ -6,26 +6,14 @@ //! The two main entry points into events are [`crate::OnlineClient::events()`] //! and calls like [crate::tx::TxProgress::wait_for_finalized_success()]. -mod event_subscription; mod events_client; mod events_type; -mod filter_events; -pub use event_subscription::{ - EventSub, - EventSubscription, - FinalizedEventSub, -}; pub use events_client::EventsClient; pub use events_type::{ EventDetails, Events, }; -pub use filter_events::{ - EventFilter, - FilterEvents, - FilteredEventDetails, -}; use codec::{ Decode, diff --git a/subxt/src/lib.rs b/subxt/src/lib.rs index 38287f4ca9e..86a4db7496f 100644 --- a/subxt/src/lib.rs +++ b/subxt/src/lib.rs @@ -133,6 +133,11 @@ )] #![allow(clippy::type_complexity)] +// Suppress an unused dependency warning because tokio is +// only used in example code snippets at the time of writing. +#[cfg(test)] +use tokio as _; + pub use subxt_macro::subxt; pub mod blocks; diff --git a/subxt/src/rpc/rpc.rs b/subxt/src/rpc/rpc.rs index f8f54823df2..47f716c8db9 100644 --- a/subxt/src/rpc/rpc.rs +++ b/subxt/src/rpc/rpc.rs @@ -69,13 +69,7 @@ use sp_core::{ Bytes, U256, }; -use sp_runtime::{ - generic::{ - Block, - SignedBlock, - }, - ApplyExtrinsicResult, -}; +use sp_runtime::ApplyExtrinsicResult; use std::{ collections::HashMap, sync::Arc, @@ -98,9 +92,40 @@ pub enum NumberOrHex { Hex(U256), } -/// Alias for the type of a block returned by `chain_getBlock` -pub type ChainBlock = - SignedBlock::Header, ::Extrinsic>>; +/// The response from `chain_getBlock` +#[derive(Debug, Deserialize)] +#[serde(bound = "T: Config")] +pub struct ChainBlockResponse { + /// The block itself. + pub block: ChainBlock, + /// Block justification. + pub justifications: Option, +} + +/// Block details in the [`ChainBlockResponse`]. +#[derive(Debug, Deserialize)] +pub struct ChainBlock { + /// The block header. + pub header: T::Header, + /// The accompanying extrinsics. + pub extrinsics: Vec, +} + +/// Bytes representing an extrinsic in a [`ChainBlock`]. +#[derive(Debug)] +pub struct ChainBlockExtrinsic(pub Vec); + +impl<'a> ::serde::Deserialize<'a> for ChainBlockExtrinsic { + fn deserialize(de: D) -> Result + where + D: ::serde::Deserializer<'a>, + { + let r = sp_core::bytes::deserialize(de)?; + let bytes = Decode::decode(&mut &r[..]) + .map_err(|e| ::serde::de::Error::custom(format!("Decode error: {}", e)))?; + Ok(ChainBlockExtrinsic(bytes)) + } +} /// Wrapper for NumberOrHex to allow custom From impls #[derive(Serialize)] @@ -498,7 +523,7 @@ impl Rpc { pub async fn block( &self, hash: Option, - ) -> Result>, Error> { + ) -> Result>, Error> { let params = rpc_params![hash]; let block = self.client.request("chain_getBlock", params).await?; Ok(block) @@ -543,11 +568,16 @@ impl Rpc { Ok(version) } - /// Subscribe to blocks. - pub async fn subscribe_blocks(&self) -> Result, Error> { + /// Subscribe to all new best block headers. + pub async fn subscribe_best_block_headers( + &self, + ) -> Result, Error> { let subscription = self .client .subscribe( + // Despite the name, this returns a stream of all new blocks + // imported by the node that happen to be added to the current best chain + // (ie all best blocks). "chain_subscribeNewHeads", rpc_params![], "chain_unsubscribeNewHeads", @@ -557,8 +587,32 @@ impl Rpc { Ok(subscription) } - /// Subscribe to finalized blocks. - pub async fn subscribe_finalized_blocks( + /// Subscribe to all new block headers. + pub async fn subscribe_all_block_headers( + &self, + ) -> Result, Error> { + let subscription = self + .client + .subscribe( + // Despite the name, this returns a stream of all new blocks + // imported by the node that happen to be added to the current best chain + // (ie all best blocks). + "chain_subscribeAllHeads", + rpc_params![], + "chain_unsubscribeAllHeads", + ) + .await?; + + Ok(subscription) + } + + /// Subscribe to finalized block headers. + /// + /// Note: this may not produce _every_ block in the finalized chain; + /// sometimes multiple blocks are finalized at once, and in this case only the + /// latest one is returned. the higher level APIs that use this "fill in" the + /// gaps for us. + pub async fn subscribe_finalized_block_headers( &self, ) -> Result, Error> { let subscription = self diff --git a/subxt/src/tx/mod.rs b/subxt/src/tx/mod.rs index 8288898efb6..cb0fcd4e518 100644 --- a/subxt/src/tx/mod.rs +++ b/subxt/src/tx/mod.rs @@ -52,7 +52,6 @@ pub use self::{ TxPayload, }, tx_progress::{ - TxEvents, TxInBlock, TxProgress, TxStatus, diff --git a/subxt/src/tx/tx_progress.rs b/subxt/src/tx/tx_progress.rs index 45d409b4747..0db6a53d80f 100644 --- a/subxt/src/tx/tx_progress.rs +++ b/subxt/src/tx/tx_progress.rs @@ -14,14 +14,7 @@ use crate::{ RpcError, TransactionError, }, - events::{ - self, - EventDetails, - Events, - EventsClient, - Phase, - StaticEvent, - }, + events::EventsClient, rpc::{ Subscription, SubstrateTxStatus, @@ -147,7 +140,9 @@ where /// may well indicate with some probability that the transaction will not make it into a block, /// there is no guarantee that this is true. Thus, we prefer to "play it safe" here. Use the lower /// level [`TxProgress::next_item()`] API if you'd like to handle these statuses yourself. - pub async fn wait_for_finalized_success(self) -> Result, Error> { + pub async fn wait_for_finalized_success( + self, + ) -> Result, Error> { let evs = self.wait_for_finalized().await?.wait_for_success().await?; Ok(evs) } @@ -343,7 +338,9 @@ impl> TxInBlock { /// /// **Note:** This has to download block details from the node and decode events /// from them. - pub async fn wait_for_success(&self) -> Result, Error> { + pub async fn wait_for_success( + &self, + ) -> Result, Error> { let events = self.fetch_events().await?; // Try to find any errors; return the first one we encounter. @@ -365,7 +362,7 @@ impl> TxInBlock { /// /// **Note:** This has to download block details from the node and decode events /// from them. - pub async fn fetch_events(&self) -> Result, Error> { + pub async fn fetch_events(&self) -> Result, Error> { let block = self .client .rpc() @@ -376,7 +373,7 @@ impl> TxInBlock { let extrinsic_idx = block.block.extrinsics .iter() .position(|ext| { - let hash = T::Hashing::hash_of(ext); + let hash = T::Hashing::hash_of(&ext.0); hash == self.ext_hash }) // If we successfully obtain the block hash we think contains our @@ -387,77 +384,10 @@ impl> TxInBlock { .at(Some(self.block_hash)) .await?; - Ok(TxEvents { - ext_hash: self.ext_hash, - ext_idx: extrinsic_idx as u32, + Ok(crate::blocks::ExtrinsicEvents::new( + self.ext_hash, + extrinsic_idx as u32, events, - }) - } -} - -/// This represents the events related to our transaction. -/// We can iterate over the events, or look for a specific one. -#[derive(Derivative)] -#[derivative(Debug(bound = ""))] -pub struct TxEvents { - ext_hash: T::Hash, - ext_idx: u32, - events: Events, -} - -impl TxEvents { - /// Return the hash of the block that the transaction has made it into. - pub fn block_hash(&self) -> T::Hash { - self.events.block_hash() - } - - /// Return the hash of the extrinsic. - pub fn extrinsic_hash(&self) -> T::Hash { - self.ext_hash - } - - /// Return all of the events in the block that the transaction made it into. - pub fn all_events_in_block(&self) -> &events::Events { - &self.events - } - - /// Iterate over all of the raw events associated with this transaction. - /// - /// This works in the same way that [`events::Events::iter()`] does, with the - /// exception that it filters out events not related to the submitted extrinsic. - pub fn iter(&self) -> impl Iterator> + '_ { - self.events.iter().filter(|ev| { - ev.as_ref() - .map(|ev| ev.phase() == Phase::ApplyExtrinsic(self.ext_idx)) - .unwrap_or(true) // Keep any errors. - }) - } - - /// Find all of the transaction events matching the event type provided as a generic parameter. - /// - /// This works in the same way that [`events::Events::find()`] does, with the - /// exception that it filters out events not related to the submitted extrinsic. - pub fn find(&self) -> impl Iterator> + '_ { - self.iter().filter_map(|ev| { - ev.and_then(|ev| ev.as_event::().map_err(Into::into)) - .transpose() - }) - } - - /// Iterate through the transaction events using metadata to dynamically decode and skip - /// them, and return the first event found which decodes to the provided `Ev` type. - /// - /// This works in the same way that [`events::Events::find_first()`] does, with the - /// exception that it ignores events not related to the submitted extrinsic. - pub fn find_first(&self) -> Result, Error> { - self.find::().next().transpose() - } - - /// Find an event in those associated with this transaction. Returns true if it was found. - /// - /// This works in the same way that [`events::Events::has()`] does, with the - /// exception that it ignores events not related to the submitted extrinsic. - pub fn has(&self) -> Result { - Ok(self.find::().next().transpose()?.is_some()) + )) } } diff --git a/testing/integration-tests/src/blocks/mod.rs b/testing/integration-tests/src/blocks/mod.rs index 3ec348b6d20..d3449ecc663 100644 --- a/testing/integration-tests/src/blocks/mod.rs +++ b/testing/integration-tests/src/blocks/mod.rs @@ -11,7 +11,7 @@ async fn non_finalized_headers_subscription() -> Result<(), subxt::Error> { let ctx = test_context().await; let api = ctx.client(); - let mut sub = api.blocks().subscribe_headers().await?; + let mut sub = api.blocks().subscribe_best().await?; // Wait for the next set of headers, and check that the // associated block hash is the one we just finalized. @@ -30,7 +30,7 @@ async fn finalized_headers_subscription() -> Result<(), subxt::Error> { let ctx = test_context().await; let api = ctx.client(); - let mut sub = api.blocks().subscribe_finalized_headers().await?; + let mut sub = api.blocks().subscribe_finalized().await?; // Wait for the next set of headers, and check that the // associated block hash is the one we just finalized. @@ -52,7 +52,7 @@ async fn missing_block_headers_will_be_filled_in() -> Result<(), subxt::Error> { // that there will be some gaps, even if there aren't any from the subscription. let some_finalized_blocks = api .rpc() - .subscribe_finalized_blocks() + .subscribe_finalized_block_headers() .await? .enumerate() .take(6) diff --git a/testing/integration-tests/src/client/mod.rs b/testing/integration-tests/src/client/mod.rs index b8aa43f5356..92cdbf2a5cd 100644 --- a/testing/integration-tests/src/client/mod.rs +++ b/testing/integration-tests/src/client/mod.rs @@ -74,11 +74,20 @@ async fn fetch_read_proof() { } #[tokio::test] -async fn chain_subscribe_blocks() { +async fn chain_subscribe_all_blocks() { let ctx = test_context().await; let api = ctx.client(); - let mut blocks = api.rpc().subscribe_blocks().await.unwrap(); + let mut blocks = api.rpc().subscribe_all_block_headers().await.unwrap(); + blocks.next().await.unwrap().unwrap(); +} + +#[tokio::test] +async fn chain_subscribe_best_blocks() { + let ctx = test_context().await; + let api = ctx.client(); + + let mut blocks = api.rpc().subscribe_best_block_headers().await.unwrap(); blocks.next().await.unwrap().unwrap(); } @@ -87,7 +96,7 @@ async fn chain_subscribe_finalized_blocks() { let ctx = test_context().await; let api = ctx.client(); - let mut blocks = api.rpc().subscribe_finalized_blocks().await.unwrap(); + let mut blocks = api.rpc().subscribe_finalized_block_headers().await.unwrap(); blocks.next().await.unwrap().unwrap(); } diff --git a/testing/integration-tests/src/events/mod.rs b/testing/integration-tests/src/events/mod.rs deleted file mode 100644 index bf629d600e7..00000000000 --- a/testing/integration-tests/src/events/mod.rs +++ /dev/null @@ -1,212 +0,0 @@ -// Copyright 2019-2022 Parity Technologies (UK) Ltd. -// This file is dual-licensed as Apache-2.0 or GPL-3.0. -// see LICENSE for license details. - -use crate::{ - node_runtime::{ - self, - balances, - system, - }, - pair_signer, - test_context, - utils::wait_for_blocks, -}; -use futures::StreamExt; -use sp_keyring::AccountKeyring; - -// Check that we can subscribe to non-finalized block events. -#[tokio::test] -async fn non_finalized_block_subscription() -> Result<(), subxt::Error> { - let ctx = test_context().await; - let api = ctx.client(); - - let mut event_sub = api.events().subscribe().await?; - - // Wait for the next set of events, and check that the - // associated block hash is not finalized yet. - let events = event_sub.next().await.unwrap()?; - let event_block_hash = events.block_hash(); - let current_block_hash = api.rpc().block_hash(None).await?.unwrap(); - - assert_eq!(event_block_hash, current_block_hash); - Ok(()) -} - -// Check that we can subscribe to finalized block events. -#[tokio::test] -async fn finalized_block_subscription() -> Result<(), subxt::Error> { - let ctx = test_context().await; - let api = ctx.client(); - - let mut event_sub = api.events().subscribe_finalized().await?; - - // Wait for the next set of events, and check that the - // associated block hash is the one we just finalized. - // (this can be a bit slow as we have to wait for finalization) - let events = event_sub.next().await.unwrap()?; - let event_block_hash = events.block_hash(); - let finalized_hash = api.rpc().finalized_head().await?; - - assert_eq!(event_block_hash, finalized_hash); - Ok(()) -} - -// Check that our subscription actually keeps producing events for -// a few blocks. -#[tokio::test] -async fn subscription_produces_events_each_block() -> Result<(), subxt::Error> { - let ctx = test_context().await; - let api = ctx.client(); - - wait_for_blocks(&api).await; - - let mut event_sub = api.events().subscribe().await?; - - for i in 0..3 { - let events = event_sub - .next() - .await - .expect("events expected each block")?; - - let success_event = events - .find_first::() - .expect("decode error"); - - if success_event.is_none() { - let n = events.len(); - panic!("Expected an extrinsic success event on iteration {i} (saw {n} other events)") - } - } - - Ok(()) -} - -// Iterate all of the events in a few blocks to ensure we can decode them properly. -#[tokio::test] -async fn decoding_all_events_in_a_block_works() -> Result<(), subxt::Error> { - let ctx = test_context().await; - let api = ctx.client(); - - wait_for_blocks(&api).await; - - let mut event_sub = api.events().subscribe().await?; - - tokio::spawn(async move { - let alice = pair_signer(AccountKeyring::Alice.pair()); - let bob = AccountKeyring::Bob.to_account_id(); - let transfer_tx = node_runtime::tx() - .balances() - .transfer(bob.clone().into(), 10_000); - - // Make a load of transfers to get lots of events going. - for _i in 0..10 { - api.tx() - .sign_and_submit_then_watch_default(&transfer_tx, &alice) - .await - .expect("can submit_transaction"); - } - }); - - for _ in 0..4 { - let events = event_sub - .next() - .await - .expect("events expected each block")?; - - for event in events.iter() { - // make sure that we can get every event properly. - let event = event.expect("valid event decoded"); - // make sure that we can decode the field values from every event. - event.field_values().expect("can decode fields"); - } - } - - Ok(()) -} - -// Check that our subscription receives events, and we can filter them based on -// it's Stream impl, and ultimately see the event we expect. -#[tokio::test] -async fn balance_transfer_subscription() -> Result<(), subxt::Error> { - let ctx = test_context().await; - let api = ctx.client(); - - // Subscribe to balance transfer events, ignoring all else. - let event_sub = api - .events() - .subscribe() - .await? - .filter_events::<(balances::events::Transfer,)>(); - - // Calling `.next()` on the above borrows it, and the `filter_map` - // means it's no longer `Unpin`, so we pin it on the stack: - futures::pin_mut!(event_sub); - - // Make a transfer: - let alice = pair_signer(AccountKeyring::Alice.pair()); - let bob = AccountKeyring::Bob.to_account_id(); - let transfer_tx = node_runtime::tx() - .balances() - .transfer(bob.clone().into(), 10_000); - - api.tx() - .sign_and_submit_then_watch_default(&transfer_tx, &alice) - .await?; - - // Wait for the next balance transfer event in our subscription stream - // and check that it lines up: - let event = event_sub.next().await.unwrap().unwrap().event; - assert_eq!( - event, - balances::events::Transfer { - from: alice.account_id().clone(), - to: bob.clone(), - amount: 10_000 - } - ); - - Ok(()) -} - -// This is just a compile-time check that we can subscribe to events in -// a context that requires the event subscription/filtering to be Send-able. -// We test a typical use of EventSubscription and FilterEvents. We don't need -// to run this code; just check that it compiles. -#[allow(unused)] -async fn check_events_are_sendable() { - // check that EventSubscription can be used across await points. - tokio::task::spawn(async { - let ctx = test_context().await; - - let mut event_sub = ctx.client().events().subscribe().await?; - - while let Some(ev) = event_sub.next().await { - // if `event_sub` doesn't implement Send, we can't hold - // it across an await point inside of a tokio::spawn, which - // requires Send. This will lead to a compile error. - } - - Ok::<_, subxt::Error>(()) - }); - - // Check that FilterEvents can be used across await points. - tokio::task::spawn(async { - let ctx = test_context().await; - - let mut event_sub = ctx - .client() - .events() - .subscribe() - .await? - .filter_events::<(balances::events::Transfer,)>(); - - while let Some(ev) = event_sub.next().await { - // if `event_sub` doesn't implement Send, we can't hold - // it across an await point inside of a tokio::spawn, which - // requires Send; This will lead to a compile error. - } - - Ok::<_, subxt::Error>(()) - }); -} diff --git a/testing/integration-tests/src/lib.rs b/testing/integration-tests/src/lib.rs index 4959d675063..12f61e56677 100644 --- a/testing/integration-tests/src/lib.rs +++ b/testing/integration-tests/src/lib.rs @@ -14,8 +14,6 @@ mod blocks; #[cfg(test)] mod client; #[cfg(test)] -mod events; -#[cfg(test)] mod frame; #[cfg(test)] mod metadata; diff --git a/testing/integration-tests/src/utils/wait_for_blocks.rs b/testing/integration-tests/src/utils/wait_for_blocks.rs index 0d190ec8ec3..d17ae33076e 100644 --- a/testing/integration-tests/src/utils/wait_for_blocks.rs +++ b/testing/integration-tests/src/utils/wait_for_blocks.rs @@ -11,7 +11,7 @@ use subxt::{ /// (the genesis block and another one) seems to be enough to allow tests /// like `dry_run_passes` to work properly. pub async fn wait_for_blocks(api: &impl OnlineClientT) { - let mut sub = api.rpc().subscribe_blocks().await.unwrap(); + let mut sub = api.rpc().subscribe_all_block_headers().await.unwrap(); sub.next().await; sub.next().await; }