Skip to content
Merged
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions substrate/client/rpc-spec-v2/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ sc-transaction-pool-api = { path = "../transaction-pool/api" }
sp-core = { path = "../../primitives/core" }
sp-runtime = { path = "../../primitives/runtime" }
sp-api = { path = "../../primitives/api" }
sp-rpc = { path = "../../primitives/rpc" }
sp-blockchain = { path = "../../primitives/blockchain" }
sp-version = { path = "../../primitives/version" }
sc-client-api = { path = "../api" }
Expand Down
11 changes: 9 additions & 2 deletions substrate/client/rpc-spec-v2/src/chain_head/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
//! API trait of the chain head.
use crate::chain_head::event::{FollowEvent, MethodResponse, StorageQuery};
use jsonrpsee::{core::RpcResult, proc_macros::rpc};
use sp_rpc::list::ListOrValue;

#[rpc(client, server)]
pub trait ChainHeadApi<Hash> {
Expand Down Expand Up @@ -109,16 +110,22 @@ pub trait ChainHeadApi<Hash> {
call_parameters: String,
) -> RpcResult<MethodResponse>;

/// Unpin a block reported by the `follow` method.
/// Unpin a block or multiple blocks reported by the `follow` method.
///
/// Ongoing operations that require the provided block
/// will continue normally.
///
/// When this method returns an error, it is guaranteed that no blocks have been unpinned.
///
/// # Unstable
///
/// This method is unstable and subject to change in the future.
#[method(name = "chainHead_unstable_unpin", blocking)]
fn chain_head_unstable_unpin(&self, follow_subscription: String, hash: Hash) -> RpcResult<()>;
fn chain_head_unstable_unpin(
&self,
follow_subscription: String,
hash_or_hashes: ListOrValue<Hash>,
) -> RpcResult<()>;

/// Resumes a storage fetch started with `chainHead_storage` after it has generated an
/// `operationWaitingForContinue` event.
Expand Down
10 changes: 8 additions & 2 deletions substrate/client/rpc-spec-v2/src/chain_head/chain_head.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ use sc_client_api::{
use sp_api::CallApiAt;
use sp_blockchain::{Error as BlockChainError, HeaderBackend, HeaderMetadata};
use sp_core::{traits::CallContext, Bytes};
use sp_rpc::list::ListOrValue;
use sp_runtime::traits::Block as BlockT;
use std::{marker::PhantomData, sync::Arc, time::Duration};

Expand Down Expand Up @@ -432,9 +433,14 @@ where
fn chain_head_unstable_unpin(
&self,
follow_subscription: String,
hash: Block::Hash,
hash_or_hashes: ListOrValue<Block::Hash>,
) -> RpcResult<()> {
match self.subscriptions.unpin_block(&follow_subscription, hash) {
let hashes = match hash_or_hashes {
ListOrValue::Value(hash) => vec![hash],
ListOrValue::List(hashes) => hashes,
};

match self.subscriptions.unpin_blocks(&follow_subscription, hashes) {
Ok(()) => Ok(()),
Err(SubscriptionManagementError::SubscriptionAbsent) => {
// Invalid invalid subscription ID.
Expand Down
50 changes: 37 additions & 13 deletions substrate/client/rpc-spec-v2/src/chain_head/subscription/inner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -750,22 +750,46 @@ impl<Block: BlockT, BE: Backend<Block>> SubscriptionsInner<Block, BE> {
}
}

pub fn unpin_block(
pub fn unpin_blocks(
&mut self,
sub_id: &str,
hash: Block::Hash,
hashes: Vec<Block::Hash>,
) -> Result<(), SubscriptionManagementError> {
let Some(sub) = self.subs.get_mut(sub_id) else {
// Check if the subscription ID is valid or not.
if self.subs.get(sub_id).is_none() {
return Err(SubscriptionManagementError::SubscriptionAbsent)
};

// Check that unpin was not called before and the block was pinned
// for this subscription.
if !sub.unregister_block(hash) {
return Err(SubscriptionManagementError::BlockHashAbsent)
// Ensure that all blocks are part of the subscription.
{
let sub = self.subs.get(sub_id).expect("Subscription ID is present from above; qed");

for hash in &hashes {
if !sub.contains_block(*hash) {
return Err(SubscriptionManagementError::BlockHashAbsent);
}
}
}

for hash in hashes {
// Get a short-lived `&mut SubscriptionState` to avoid borrowing self twice.
// - once from `self.subs`
// - once from `self.global_unregister_block()`.
//
// Borrowing `self.sub` and `self.global_unregister_block` is again safe because they
// operate on different fields, but the compiler is not capable of introspecting at that
// level, not even with `#[inline(always)]` on `global_unregister_block`.
//
// This is safe because we already checked that the subscription ID is valid and we
// operate under a lock.
let sub =
self.subs.get_mut(sub_id).expect("Subscription ID is present from above; qed");

// Block was checked above for presence.
sub.unregister_block(hash);
self.global_unregister_block(hash);
}

self.global_unregister_block(hash);
Ok(())
}

Expand Down Expand Up @@ -1029,11 +1053,11 @@ mod tests {
assert_eq!(block.has_runtime(), true);

let invalid_id = "abc-invalid".to_string();
let err = subs.unpin_block(&invalid_id, hash).unwrap_err();
let err = subs.unpin_blocks(&invalid_id, vec![hash]).unwrap_err();
assert_eq!(err, SubscriptionManagementError::SubscriptionAbsent);

// Unpin the block.
subs.unpin_block(&id, hash).unwrap();
subs.unpin_blocks(&id, vec![hash]).unwrap();
let err = subs.lock_block(&id, hash, 1).unwrap_err();
assert_eq!(err, SubscriptionManagementError::BlockHashAbsent);
}
Expand Down Expand Up @@ -1077,13 +1101,13 @@ mod tests {
// Ensure the block propagated to the subscription.
subs.subs.get(&id_second).unwrap().blocks.get(&hash).unwrap();

subs.unpin_block(&id, hash).unwrap();
subs.unpin_blocks(&id, vec![hash]).unwrap();
assert_eq!(*subs.global_blocks.get(&hash).unwrap(), 1);
// Cannot unpin a block twice for the same subscription.
let err = subs.unpin_block(&id, hash).unwrap_err();
let err = subs.unpin_blocks(&id, vec![hash]).unwrap_err();
assert_eq!(err, SubscriptionManagementError::BlockHashAbsent);

subs.unpin_block(&id_second, hash).unwrap();
subs.unpin_blocks(&id_second, vec![hash]).unwrap();
// Block unregistered from the memory.
assert!(subs.global_blocks.get(&hash).is_none());
}
Expand Down
17 changes: 9 additions & 8 deletions substrate/client/rpc-spec-v2/src/chain_head/subscription/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,22 +94,23 @@ impl<Block: BlockT, BE: Backend<Block>> SubscriptionManagement<Block, BE> {
inner.pin_block(sub_id, hash)
}

/// Unpin the block from the subscription.
/// Unpin the blocks from the subscription.
///
/// The last subscription that unpins the block is also unpinning the block
/// from the backend.
/// Blocks are reference counted and when the last subscription unpins a given block, the block
/// is also unpinned from the backend.
///
/// This method is called only once per subscription.
///
/// Returns an error if the block is not pinned for the subscription or
/// the subscription ID is invalid.
pub fn unpin_block(
/// Returns an error if the subscription ID is invalid, or any of the blocks are not pinned
/// for the subscriptions. When an error is returned, it is guaranteed that no blocks have
/// been unpinned.
pub fn unpin_blocks(
&self,
sub_id: &str,
hash: Block::Hash,
hashes: Vec<Block::Hash>,
) -> Result<(), SubscriptionManagementError> {
let mut inner = self.inner.write();
inner.unpin_block(sub_id, hash)
inner.unpin_blocks(sub_id, hashes)
}

/// Ensure the block remains pinned until the return object is dropped.
Expand Down
Loading