
Commit 059d1e1

svyatonik authored and hitchhooker committed
Fixed RPC subscriptions leak when subscription stream is finished (paritytech#4533)
closes paritytech/parity-bridges-common#3000

Recently we've changed our bridge configuration for Rococo <> Westend and our new relayer has started to submit transactions every ~ `30` seconds. Eventually, it switches itself into a limbo state where it can't submit more transactions: all `author_submitAndWatchExtrinsic` calls fail with the following error: `ERROR bridge Failed to send transaction to BridgeHubRococo node: Call(ErrorObject { code: ServerError(-32006), message: "Too many subscriptions on the connection", data: Some(RawValue("Exceeded max limit of 1024")) })`.

Some links for those who want to explore:

- the server side (node) has a strict limit on the number of active subscriptions. It fails to open a new subscription if this limit is hit: https://github.com/paritytech/jsonrpsee/blob/a4533966b997e83632509ad97eea010fc7c3efc0/server/src/middleware/rpc/layer/rpc_service.rs#L122-L132. The limit is set to `1024` by default;
- internally this limit is a semaphore with `limit` permits: https://github.com/paritytech/jsonrpsee/blob/a4533966b997e83632509ad97eea010fc7c3efc0/core/src/server/subscription.rs#L461-L485;
- the semaphore permit is acquired in the first link;
- the permit is "returned" when the `SubscriptionSink` is dropped: https://github.com/paritytech/jsonrpsee/blob/a4533966b997e83632509ad97eea010fc7c3efc0/core/src/server/subscription.rs#L310-L325;
- the `SubscriptionSink` is dropped when [this `polkadot-sdk` function](https://github.com/paritytech/polkadot-sdk/blob/278486f9bf7db06c174203f098eec2f91839757a/substrate/client/rpc/src/utils.rs#L58-L94) returns. In other words, when the connection is closed, the stream is finished or the internal subscription buffer limit is hit;
- the subscription has an internal buffer, so sending an item consists of two steps: [reading an item from the underlying stream](https://github.com/paritytech/polkadot-sdk/blob/278486f9bf7db06c174203f098eec2f91839757a/substrate/client/rpc/src/utils.rs#L125-L141) and [sending it over the connection](https://github.com/paritytech/polkadot-sdk/blob/278486f9bf7db06c174203f098eec2f91839757a/substrate/client/rpc/src/utils.rs#L111-L116);
- when the underlying stream is finished, `inner_pipe_from_stream` wants to ensure that all items are sent to the subscriber. So it [waits until the current send operation completes](https://github.com/paritytech/polkadot-sdk/blob/278486f9bf7db06c174203f098eec2f91839757a/substrate/client/rpc/src/utils.rs#L146-L148) and then [sends all remaining items from the internal buffer](https://github.com/paritytech/polkadot-sdk/blob/278486f9bf7db06c174203f098eec2f91839757a/substrate/client/rpc/src/utils.rs#L150-L155). Once it is done, the function returns, the `SubscriptionSink` is dropped, the semaphore permit is dropped and we are ready to accept new subscriptions;
- unfortunately, the code just calls `pending_fut.await.is_err()` to ensure that [the current send operation completes](https://github.com/paritytech/polkadot-sdk/blob/278486f9bf7db06c174203f098eec2f91839757a/substrate/client/rpc/src/utils.rs#L146-L148). But if there is no current send operation (which is normal), then `pending_fut` is set to a terminated future and the `await` never completes. Hence, no return from the function, no drop of the `SubscriptionSink`, no drop of the semaphore permit, and no new subscriptions allowed (once the number of subscriptions hits the limit).

I've illustrated the issue with a small test: if e.g. the stream is initially empty, `subscription_is_dropped_when_stream_is_empty` will hang without the fix because `pipe_from_stream` never exits.
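The permit accounting described above can be modelled with the standard library alone. The following is only a sketch under stated assumptions: `Limit`, `Sink` and `count_accepted` are hypothetical names invented for illustration, and an atomic counter stands in for the real semaphore. It is not the jsonrpsee implementation; it only shows why a handler that never returns (and so never drops its sink) eventually blocks every new subscription.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

/// Hypothetical stand-in for the server-side subscription limit.
struct Limit {
    available: Arc<AtomicUsize>,
}

/// Hypothetical stand-in for `SubscriptionSink`: owns one permit until dropped.
struct Sink {
    available: Arc<AtomicUsize>,
}

impl Limit {
    fn new(permits: usize) -> Self {
        Limit { available: Arc::new(AtomicUsize::new(permits)) }
    }

    /// Try to open a subscription; returns `None` once all permits are taken.
    fn try_subscribe(&self) -> Option<Sink> {
        let mut current = self.available.load(Ordering::Acquire);
        loop {
            if current == 0 {
                return None;
            }
            match self.available.compare_exchange(
                current,
                current - 1,
                Ordering::AcqRel,
                Ordering::Acquire,
            ) {
                Ok(_) => return Some(Sink { available: self.available.clone() }),
                Err(actual) => current = actual,
            }
        }
    }
}

impl Drop for Sink {
    /// Dropping the sink is the only way a permit is returned.
    fn drop(&mut self) {
        self.available.fetch_add(1, Ordering::Release);
    }
}

/// Attempts `attempts` subscriptions against a limit of `limit` permits.
/// With `leak == true` every sink is kept alive, which models a handler
/// that never returns; otherwise each sink is dropped right away.
fn count_accepted(limit: usize, attempts: usize, leak: bool) -> usize {
    let server = Limit::new(limit);
    let mut held = Vec::new();
    let mut accepted = 0;
    for _ in 0..attempts {
        if let Some(sink) = server.try_subscribe() {
            accepted += 1;
            if leak {
                held.push(sink); // permit is never released
            }
            // otherwise `sink` is dropped here and the permit is returned
        }
    }
    accepted
}

fn main() {
    println!("accepted while leaking: {}", count_accepted(4, 10, true));
    println!("accepted without leak:  {}", count_accepted(4, 10, false));
}
```

With a limit of 4 and 10 attempts, the leaking variant accepts only the first 4 subscriptions, while the non-leaking variant accepts all 10. The real limit of `1024` behaves the same way, just later.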
1 parent 3d24d5a commit 059d1e1

2 files changed

Lines changed: 35 additions & 1 deletion


prdoc/pr_4533.prdoc

Lines changed: 10 additions & 0 deletions
@@ -0,0 +1,10 @@
+title: "Fixed RPC subscriptions leak when subscription stream is finished"
+
+doc:
+  - audience: Node Operator
+    description: |
+      The node may leak RPC subscriptions in some cases, e.g. during
+      `author_submitAndWatchExtrinsic` calls. This PR fixes the issue.
+
+crates:
+  - name: sc-rpc

substrate/client/rpc/src/utils.rs

Lines changed: 25 additions & 1 deletion
@@ -143,7 +143,7 @@ async fn inner_pipe_from_stream<S, T>(
 			//
 			// Process remaining items and terminate.
 			Either::Right((Either::Right((None, pending_fut)), _)) => {
-				if pending_fut.await.is_err() {
+				if !pending_fut.is_terminated() && pending_fut.await.is_err() {
 					return;
 				}
 
@@ -231,4 +231,28 @@ mod tests {
 		_ = rx.next().await.unwrap();
 		assert!(sub.next::<usize>().await.is_none());
 	}
+
+	#[tokio::test]
+	async fn subscription_is_dropped_when_stream_is_empty() {
+		let notify_rx = std::sync::Arc::new(tokio::sync::Notify::new());
+		let notify_tx = notify_rx.clone();
+
+		let mut module = RpcModule::new(notify_tx);
+		module
+			.register_subscription("sub", "my_sub", "unsub", |_, pending, notify_tx| async move {
+				// emulate empty stream for simplicity: otherwise we need some mechanism
+				// to sync buffer and channel send operations
+				let stream = futures::stream::empty::<()>();
+				// this should exit immediately
+				pipe_from_stream(pending, stream).await;
+				// notify that the `pipe_from_stream` has returned
+				notify_tx.notify_one();
+				Ok(())
+			})
+			.unwrap();
+		module.subscribe("sub", EmptyServerParams::new(), 1).await.unwrap();
+
+		// it should fire once `pipe_from_stream` returns
+		notify_rx.notified().await;
+	}
 }
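
The effect of the one-line guard can be reproduced without `futures` or `tokio`. The sketch below is a hand-rolled model, not the code from `utils.rs`: `PendingSend`, `Outcome`, `finish` and `poll_once` are hypothetical names, and a single manual poll stands in for `.await`. It assumes, as the commit message states, that a terminated future never completes when awaited.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical model of the "current send operation" slot.
/// `None` means the slot is terminated: there is nothing in flight.
struct PendingSend(Option<Result<(), ()>>);

impl PendingSend {
    fn terminated() -> Self {
        PendingSend(None)
    }
    fn in_flight(result: Result<(), ()>) -> Self {
        PendingSend(Some(result))
    }
    fn is_terminated(&self) -> bool {
        self.0.is_none()
    }
}

impl Future for PendingSend {
    type Output = Result<(), ()>;

    fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> {
        match self.0.take() {
            Some(result) => Poll::Ready(result),
            // A terminated slot never resolves, so awaiting it hangs.
            None => Poll::Pending,
        }
    }
}

/// Waker that does nothing; enough for a single manual poll.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

fn poll_once<F: Future + Unpin>(fut: &mut F) -> Poll<F::Output> {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    Pin::new(fut).poll(&mut cx)
}

#[derive(Debug, PartialEq)]
enum Outcome {
    /// The handler may go on to drain the buffer and return.
    Drained,
    /// The in-flight send failed, so the handler returns early.
    SendFailed,
    /// The handler would wait forever and never release its permit.
    Stuck,
}

/// Models the "stream finished" branch, with or without the guard.
fn finish(mut pending: PendingSend, guarded: bool) -> Outcome {
    if guarded && pending.is_terminated() {
        return Outcome::Drained; // nothing in flight: skip the wait
    }
    match poll_once(&mut pending) {
        Poll::Ready(Ok(())) => Outcome::Drained,
        Poll::Ready(Err(())) => Outcome::SendFailed,
        Poll::Pending => Outcome::Stuck,
    }
}

fn main() {
    println!("unguarded: {:?}", finish(PendingSend::terminated(), false));
    println!("guarded:   {:?}", finish(PendingSend::terminated(), true));
}
```

In this model the unguarded branch reports `Stuck` for a terminated slot, while the guarded branch reports `Drained`. An in-flight send behaves the same either way, which matches the intent of checking `is_terminated()` before awaiting.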
