Skip to content

Commit 34b2e92

Browse files
authored
chainHead: Support multiple hashes for chainHead_unpin method (paritytech#2295)
This PR adds support for multiple hashes being passed to the `chainHeda_unpin` parameters. The `hash` parameter is renamed to `hash_or_hashes` per paritytech/json-rpc-interface-spec#111. While at it, a new integration test is added to check the unpinning of multiple hashes. The API is checked against a hash or a vector of hashes. cc @paritytech/subxt-team --------- Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>
1 parent b516dcb commit 34b2e92

6 files changed

Lines changed: 223 additions & 29 deletions

File tree

substrate/client/rpc-spec-v2/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ sc-transaction-pool-api = { path = "../transaction-pool/api" }
2121
sp-core = { path = "../../primitives/core" }
2222
sp-runtime = { path = "../../primitives/runtime" }
2323
sp-api = { path = "../../primitives/api" }
24+
sp-rpc = { path = "../../primitives/rpc" }
2425
sp-blockchain = { path = "../../primitives/blockchain" }
2526
sp-version = { path = "../../primitives/version" }
2627
sc-client-api = { path = "../api" }

substrate/client/rpc-spec-v2/src/chain_head/api.rs

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
//! API trait of the chain head.
2222
use crate::chain_head::event::{FollowEvent, MethodResponse, StorageQuery};
2323
use jsonrpsee::{core::RpcResult, proc_macros::rpc};
24+
use sp_rpc::list::ListOrValue;
2425

2526
#[rpc(client, server)]
2627
pub trait ChainHeadApi<Hash> {
@@ -109,16 +110,22 @@ pub trait ChainHeadApi<Hash> {
109110
call_parameters: String,
110111
) -> RpcResult<MethodResponse>;
111112

112-
/// Unpin a block reported by the `follow` method.
113+
/// Unpin a block or multiple blocks reported by the `follow` method.
113114
///
114115
/// Ongoing operations that require the provided block
115116
/// will continue normally.
116117
///
118+
/// When this method returns an error, it is guaranteed that no blocks have been unpinned.
119+
///
117120
/// # Unstable
118121
///
119122
/// This method is unstable and subject to change in the future.
120123
#[method(name = "chainHead_unstable_unpin", blocking)]
121-
fn chain_head_unstable_unpin(&self, follow_subscription: String, hash: Hash) -> RpcResult<()>;
124+
fn chain_head_unstable_unpin(
125+
&self,
126+
follow_subscription: String,
127+
hash_or_hashes: ListOrValue<Hash>,
128+
) -> RpcResult<()>;
122129

123130
/// Resumes a storage fetch started with `chainHead_storage` after it has generated an
124131
/// `operationWaitingForContinue` event.

substrate/client/rpc-spec-v2/src/chain_head/chain_head.rs

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ use sc_client_api::{
4848
use sp_api::CallApiAt;
4949
use sp_blockchain::{Error as BlockChainError, HeaderBackend, HeaderMetadata};
5050
use sp_core::{traits::CallContext, Bytes};
51+
use sp_rpc::list::ListOrValue;
5152
use sp_runtime::traits::Block as BlockT;
5253
use std::{marker::PhantomData, sync::Arc, time::Duration};
5354

@@ -432,9 +433,16 @@ where
432433
fn chain_head_unstable_unpin(
433434
&self,
434435
follow_subscription: String,
435-
hash: Block::Hash,
436+
hash_or_hashes: ListOrValue<Block::Hash>,
436437
) -> RpcResult<()> {
437-
match self.subscriptions.unpin_block(&follow_subscription, hash) {
438+
let result = match hash_or_hashes {
439+
ListOrValue::Value(hash) =>
440+
self.subscriptions.unpin_blocks(&follow_subscription, [hash]),
441+
ListOrValue::List(hashes) =>
442+
self.subscriptions.unpin_blocks(&follow_subscription, hashes),
443+
};
444+
445+
match result {
438446
Ok(()) => Ok(()),
439447
Err(SubscriptionManagementError::SubscriptionAbsent) => {
440448
// Invalid invalid subscription ID.

substrate/client/rpc-spec-v2/src/chain_head/subscription/inner.rs

Lines changed: 26 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -750,22 +750,36 @@ impl<Block: BlockT, BE: Backend<Block>> SubscriptionsInner<Block, BE> {
750750
}
751751
}
752752

753-
pub fn unpin_block(
753+
pub fn unpin_blocks(
754754
&mut self,
755755
sub_id: &str,
756-
hash: Block::Hash,
756+
hashes: impl IntoIterator<Item = Block::Hash> + Clone,
757757
) -> Result<(), SubscriptionManagementError> {
758758
let Some(sub) = self.subs.get_mut(sub_id) else {
759759
return Err(SubscriptionManagementError::SubscriptionAbsent)
760760
};
761761

762-
// Check that unpin was not called before and the block was pinned
763-
// for this subscription.
764-
if !sub.unregister_block(hash) {
765-
return Err(SubscriptionManagementError::BlockHashAbsent)
762+
// Ensure that all blocks are part of the subscription before removing individual
763+
// blocks.
764+
for hash in hashes.clone() {
765+
if !sub.contains_block(hash) {
766+
return Err(SubscriptionManagementError::BlockHashAbsent);
767+
}
768+
}
769+
770+
// Note: this needs to be separate from the global mappings to avoid barrow checker
771+
// thinking we borrow `&mut self` twice: once from `self.subs.get_mut` and once from
772+
// `self.global_unregister_block`. Although the borrowing is correct, since different
773+
// fields of the structure are borrowed, one at a time.
774+
for hash in hashes.clone() {
775+
sub.unregister_block(hash);
776+
}
777+
778+
// Block have been removed from the subscription. Remove them from the global tracking.
779+
for hash in hashes {
780+
self.global_unregister_block(hash);
766781
}
767782

768-
self.global_unregister_block(hash);
769783
Ok(())
770784
}
771785

@@ -1029,11 +1043,11 @@ mod tests {
10291043
assert_eq!(block.has_runtime(), true);
10301044

10311045
let invalid_id = "abc-invalid".to_string();
1032-
let err = subs.unpin_block(&invalid_id, hash).unwrap_err();
1046+
let err = subs.unpin_blocks(&invalid_id, vec![hash]).unwrap_err();
10331047
assert_eq!(err, SubscriptionManagementError::SubscriptionAbsent);
10341048

10351049
// Unpin the block.
1036-
subs.unpin_block(&id, hash).unwrap();
1050+
subs.unpin_blocks(&id, vec![hash]).unwrap();
10371051
let err = subs.lock_block(&id, hash, 1).unwrap_err();
10381052
assert_eq!(err, SubscriptionManagementError::BlockHashAbsent);
10391053
}
@@ -1077,13 +1091,13 @@ mod tests {
10771091
// Ensure the block propagated to the subscription.
10781092
subs.subs.get(&id_second).unwrap().blocks.get(&hash).unwrap();
10791093

1080-
subs.unpin_block(&id, hash).unwrap();
1094+
subs.unpin_blocks(&id, vec![hash]).unwrap();
10811095
assert_eq!(*subs.global_blocks.get(&hash).unwrap(), 1);
10821096
// Cannot unpin a block twice for the same subscription.
1083-
let err = subs.unpin_block(&id, hash).unwrap_err();
1097+
let err = subs.unpin_blocks(&id, vec![hash]).unwrap_err();
10841098
assert_eq!(err, SubscriptionManagementError::BlockHashAbsent);
10851099

1086-
subs.unpin_block(&id_second, hash).unwrap();
1100+
subs.unpin_blocks(&id_second, vec![hash]).unwrap();
10871101
// Block unregistered from the memory.
10881102
assert!(subs.global_blocks.get(&hash).is_none());
10891103
}

substrate/client/rpc-spec-v2/src/chain_head/subscription/mod.rs

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -94,22 +94,23 @@ impl<Block: BlockT, BE: Backend<Block>> SubscriptionManagement<Block, BE> {
9494
inner.pin_block(sub_id, hash)
9595
}
9696

97-
/// Unpin the block from the subscription.
97+
/// Unpin the blocks from the subscription.
9898
///
99-
/// The last subscription that unpins the block is also unpinning the block
100-
/// from the backend.
99+
/// Blocks are reference counted and when the last subscription unpins a given block, the block
100+
/// is also unpinned from the backend.
101101
///
102102
/// This method is called only once per subscription.
103103
///
104-
/// Returns an error if the block is not pinned for the subscription or
105-
/// the subscription ID is invalid.
106-
pub fn unpin_block(
104+
/// Returns an error if the subscription ID is invalid, or any of the blocks are not pinned
105+
/// for the subscriptions. When an error is returned, it is guaranteed that no blocks have
106+
/// been unpinned.
107+
pub fn unpin_blocks(
107108
&self,
108109
sub_id: &str,
109-
hash: Block::Hash,
110+
hashes: impl IntoIterator<Item = Block::Hash> + Clone,
110111
) -> Result<(), SubscriptionManagementError> {
111112
let mut inner = self.inner.write();
112-
inner.unpin_block(sub_id, hash)
113+
inner.unpin_blocks(sub_id, hashes)
113114
}
114115

115116
/// Ensure the block remains pinned until the return object is dropped.

substrate/client/rpc-spec-v2/src/chain_head/tests.rs

Lines changed: 168 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1591,22 +1591,28 @@ async fn follow_with_unpin() {
15911591
// Unpin an invalid subscription ID must return Ok(()).
15921592
let invalid_hash = hex_string(&INVALID_HASH);
15931593
let _res: () = api
1594-
.call("chainHead_unstable_unpin", ["invalid_sub_id", &invalid_hash])
1594+
.call("chainHead_unstable_unpin", rpc_params!["invalid_sub_id", &invalid_hash])
15951595
.await
15961596
.unwrap();
15971597

15981598
// Valid subscription with invalid block hash.
15991599
let invalid_hash = hex_string(&INVALID_HASH);
16001600
let err = api
1601-
.call::<_, serde_json::Value>("chainHead_unstable_unpin", [&sub_id, &invalid_hash])
1601+
.call::<_, serde_json::Value>(
1602+
"chainHead_unstable_unpin",
1603+
rpc_params![&sub_id, &invalid_hash],
1604+
)
16021605
.await
16031606
.unwrap_err();
16041607
assert_matches!(err,
16051608
Error::Call(CallError::Custom(ref err)) if err.code() == 2001 && err.message() == "Invalid block hash"
16061609
);
16071610

16081611
// To not exceed the number of pinned blocks, we need to unpin before the next import.
1609-
let _res: () = api.call("chainHead_unstable_unpin", [&sub_id, &block_hash]).await.unwrap();
1612+
let _res: () = api
1613+
.call("chainHead_unstable_unpin", rpc_params![&sub_id, &block_hash])
1614+
.await
1615+
.unwrap();
16101616

16111617
// Block tree:
16121618
// finalized_block -> block -> block2
@@ -1645,6 +1651,160 @@ async fn follow_with_unpin() {
16451651
assert!(sub.next::<FollowEvent<String>>().await.is_none());
16461652
}
16471653

1654+
#[tokio::test]
1655+
async fn follow_with_multiple_unpin_hashes() {
1656+
let builder = TestClientBuilder::new();
1657+
let backend = builder.backend();
1658+
let mut client = Arc::new(builder.build());
1659+
1660+
let api = ChainHead::new(
1661+
client.clone(),
1662+
backend,
1663+
Arc::new(TaskExecutor::default()),
1664+
CHAIN_GENESIS,
1665+
ChainHeadConfig {
1666+
global_max_pinned_blocks: MAX_PINNED_BLOCKS,
1667+
subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS),
1668+
subscription_max_ongoing_operations: MAX_OPERATIONS,
1669+
operation_max_storage_items: MAX_PAGINATION_LIMIT,
1670+
},
1671+
)
1672+
.into_rpc();
1673+
1674+
let mut sub = api.subscribe("chainHead_unstable_follow", [false]).await.unwrap();
1675+
let sub_id = sub.subscription_id();
1676+
let sub_id = serde_json::to_string(&sub_id).unwrap();
1677+
1678+
// Import 3 blocks.
1679+
let block_1 = BlockBuilderBuilder::new(&*client)
1680+
.on_parent_block(client.chain_info().genesis_hash)
1681+
.with_parent_block_number(0)
1682+
.build()
1683+
.unwrap()
1684+
.build()
1685+
.unwrap()
1686+
.block;
1687+
let block_1_hash = block_1.header.hash();
1688+
client.import(BlockOrigin::Own, block_1.clone()).await.unwrap();
1689+
1690+
let block_2 = BlockBuilderBuilder::new(&*client)
1691+
.on_parent_block(block_1.hash())
1692+
.with_parent_block_number(1)
1693+
.build()
1694+
.unwrap()
1695+
.build()
1696+
.unwrap()
1697+
.block;
1698+
let block_2_hash = block_2.header.hash();
1699+
client.import(BlockOrigin::Own, block_2.clone()).await.unwrap();
1700+
1701+
let block_3 = BlockBuilderBuilder::new(&*client)
1702+
.on_parent_block(block_2.hash())
1703+
.with_parent_block_number(2)
1704+
.build()
1705+
.unwrap()
1706+
.build()
1707+
.unwrap()
1708+
.block;
1709+
let block_3_hash = block_3.header.hash();
1710+
client.import(BlockOrigin::Own, block_3.clone()).await.unwrap();
1711+
1712+
// Ensure the imported block is propagated and pinned for this subscription.
1713+
assert_matches!(
1714+
get_next_event::<FollowEvent<String>>(&mut sub).await,
1715+
FollowEvent::Initialized(_)
1716+
);
1717+
assert_matches!(
1718+
get_next_event::<FollowEvent<String>>(&mut sub).await,
1719+
FollowEvent::NewBlock(_)
1720+
);
1721+
assert_matches!(
1722+
get_next_event::<FollowEvent<String>>(&mut sub).await,
1723+
FollowEvent::BestBlockChanged(_)
1724+
);
1725+
assert_matches!(
1726+
get_next_event::<FollowEvent<String>>(&mut sub).await,
1727+
FollowEvent::NewBlock(_)
1728+
);
1729+
assert_matches!(
1730+
get_next_event::<FollowEvent<String>>(&mut sub).await,
1731+
FollowEvent::BestBlockChanged(_)
1732+
);
1733+
assert_matches!(
1734+
get_next_event::<FollowEvent<String>>(&mut sub).await,
1735+
FollowEvent::NewBlock(_)
1736+
);
1737+
assert_matches!(
1738+
get_next_event::<FollowEvent<String>>(&mut sub).await,
1739+
FollowEvent::BestBlockChanged(_)
1740+
);
1741+
1742+
// Unpin an invalid subscription ID must return Ok(()).
1743+
let invalid_hash = hex_string(&INVALID_HASH);
1744+
let _res: () = api
1745+
.call("chainHead_unstable_unpin", rpc_params!["invalid_sub_id", &invalid_hash])
1746+
.await
1747+
.unwrap();
1748+
1749+
// Valid subscription with invalid block hash.
1750+
let err = api
1751+
.call::<_, serde_json::Value>(
1752+
"chainHead_unstable_unpin",
1753+
rpc_params![&sub_id, &invalid_hash],
1754+
)
1755+
.await
1756+
.unwrap_err();
1757+
assert_matches!(err,
1758+
Error::Call(CallError::Custom(ref err)) if err.code() == 2001 && err.message() == "Invalid block hash"
1759+
);
1760+
1761+
let _res: () = api
1762+
.call("chainHead_unstable_unpin", rpc_params![&sub_id, &block_1_hash])
1763+
.await
1764+
.unwrap();
1765+
1766+
// One block hash is invalid. Block 1 is already unpinned.
1767+
let err = api
1768+
.call::<_, serde_json::Value>(
1769+
"chainHead_unstable_unpin",
1770+
rpc_params![&sub_id, vec![&block_1_hash, &block_2_hash, &block_3_hash]],
1771+
)
1772+
.await
1773+
.unwrap_err();
1774+
assert_matches!(err,
1775+
Error::Call(CallError::Custom(ref err)) if err.code() == 2001 && err.message() == "Invalid block hash"
1776+
);
1777+
1778+
// Unpin multiple blocks.
1779+
let _res: () = api
1780+
.call("chainHead_unstable_unpin", rpc_params![&sub_id, vec![&block_2_hash, &block_3_hash]])
1781+
.await
1782+
.unwrap();
1783+
1784+
// Check block 2 and 3 are unpinned.
1785+
let err = api
1786+
.call::<_, serde_json::Value>(
1787+
"chainHead_unstable_unpin",
1788+
rpc_params![&sub_id, &block_2_hash],
1789+
)
1790+
.await
1791+
.unwrap_err();
1792+
assert_matches!(err,
1793+
Error::Call(CallError::Custom(ref err)) if err.code() == 2001 && err.message() == "Invalid block hash"
1794+
);
1795+
1796+
let err = api
1797+
.call::<_, serde_json::Value>(
1798+
"chainHead_unstable_unpin",
1799+
rpc_params![&sub_id, &block_3_hash],
1800+
)
1801+
.await
1802+
.unwrap_err();
1803+
assert_matches!(err,
1804+
Error::Call(CallError::Custom(ref err)) if err.code() == 2001 && err.message() == "Invalid block hash"
1805+
);
1806+
}
1807+
16481808
#[tokio::test]
16491809
async fn follow_prune_best_block() {
16501810
let builder = TestClientBuilder::new();
@@ -1828,7 +1988,7 @@ async fn follow_prune_best_block() {
18281988
let sub_id = sub.subscription_id();
18291989
let sub_id = serde_json::to_string(&sub_id).unwrap();
18301990
let hash = format!("{:?}", block_2_hash);
1831-
let _res: () = api.call("chainHead_unstable_unpin", [&sub_id, &hash]).await.unwrap();
1991+
let _res: () = api.call("chainHead_unstable_unpin", rpc_params![&sub_id, &hash]).await.unwrap();
18321992
}
18331993

18341994
#[tokio::test]
@@ -2305,7 +2465,10 @@ async fn pin_block_references() {
23052465
wait_pinned_references(&backend, &hash, 1).await;
23062466

23072467
// To not exceed the number of pinned blocks, we need to unpin before the next import.
2308-
let _res: () = api.call("chainHead_unstable_unpin", [&sub_id, &block_hash]).await.unwrap();
2468+
let _res: () = api
2469+
.call("chainHead_unstable_unpin", rpc_params![&sub_id, &block_hash])
2470+
.await
2471+
.unwrap();
23092472

23102473
// Make sure unpin clears out the reference.
23112474
let refs = backend.pin_refs(&hash).unwrap();

0 commit comments

Comments
 (0)