Skip to content

Commit ba1fe68

Browse files
authored
Fix stalling issue if subscription.next() is stuck (#1119)
* Fix stalling issue if subscription.next() is stuck: 1. Replaced tokio::select! with tokio::time::timeout that directly wraps subscription.next(). 2. Simplified the timeout logic — it is now a straightforward 60-second timeout on each subscription call. 3. Fixed the stalling issue — the timeout will always fire after 60 seconds, even if subscription.next() is stuck. The key improvement is that the timeout now wraps the potentially hanging operation directly, guaranteeing detection of a stalled subscription. This eliminates the future-starvation issue that could prevent the original timeout from firing. * Fix potential stalling in the runtime_upgrade_task. * Add periodic health checks to the runtime upgrade subscription: RPC health checks run every hour to detect hung connections, without needing to recreate the updater subscription every hour. * Fix fmt
1 parent f215897 commit ba1fe68

2 files changed

Lines changed: 58 additions & 46 deletions

File tree

src/commands/multi_block/monitor.rs

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -79,8 +79,13 @@ where
7979
T::VoterSnapshotPerBlock: Send + Sync,
8080
T::MaxVotesPerVoter: Send + Sync,
8181
{
82-
let (at, block_hash) = tokio::select! {
83-
maybe_block = subscription.next() => {
82+
let (at, block_hash) = match tokio::time::timeout(
83+
std::time::Duration::from_secs(BLOCK_SUBSCRIPTION_TIMEOUT_SECS),
84+
subscription.next(),
85+
)
86+
.await
87+
{
88+
Ok(maybe_block) => {
8489
match maybe_block {
8590
Some(Ok(block)) => {
8691
*last_block_time = std::time::Instant::now();
@@ -99,10 +104,10 @@ where
99104
None => {
100105
log::error!(target: LOG_TARGET, "Subscription to finalized blocks terminated unexpectedly");
101106
return Err(Error::Other("Subscription terminated unexpectedly".to_string()));
102-
}
107+
},
103108
}
104-
}
105-
_ = tokio::time::sleep_until(tokio::time::Instant::from_std(*last_block_time + std::time::Duration::from_secs(BLOCK_SUBSCRIPTION_TIMEOUT_SECS))) => {
109+
},
110+
Err(_) => {
106111
log::warn!(target: LOG_TARGET, "No blocks received for {} seconds - subscription may be stalled, recreating subscription...", BLOCK_SUBSCRIPTION_TIMEOUT_SECS);
107112
crate::prometheus::on_listener_subscription_stall();
108113
// Recreate the subscription
@@ -116,9 +121,9 @@ where
116121
Err(e) => {
117122
log::error!(target: LOG_TARGET, "Failed to recreate subscription: {:?}", e);
118123
return Err(e.into());
119-
}
124+
},
120125
}
121-
}
126+
},
122127
};
123128

124129
let (_storage, phase, current_round) = get_block_state(client, block_hash).await?;

src/main.rs

Lines changed: 46 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -202,7 +202,6 @@ async fn run_command(
202202
/// Runs until the RPC connection fails or updating the metadata failed.
203203
async fn runtime_upgrade_task(client: ChainClient, tx: oneshot::Sender<Error>) {
204204
let updater = client.updater();
205-
let mut last_update_time = std::time::Instant::now();
206205

207206
let mut update_stream = match updater.runtime_updates().await {
208207
Ok(u) => u,
@@ -212,18 +211,33 @@ async fn runtime_upgrade_task(client: ChainClient, tx: oneshot::Sender<Error>) {
212211
},
213212
};
214213

214+
// Health check interval - check RPC connection every 1 hour
215+
const HEALTH_CHECK_INTERVAL_SECS: u64 = 60 * 60;
216+
let mut health_check_interval =
217+
tokio::time::interval(std::time::Duration::from_secs(HEALTH_CHECK_INTERVAL_SECS));
218+
health_check_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
219+
215220
loop {
216-
// Handle runtime upgrade subscription responses with 1-hour timeout:
217-
// - Some(Ok(update)): process the update
218-
// - Some(Err(e)): retry if recoverable, otherwise quit
219-
// - None: stream ended (connection dead), quit immediately
220-
// - Timeout: subscription may be stalled, recreate it
221-
let update = tokio::select! {
221+
// Use select to handle both runtime updates and periodic health checks
222+
tokio::select! {
222223
maybe_update = update_stream.next() => {
223224
match maybe_update {
224225
Some(Ok(update)) => {
225-
last_update_time = std::time::Instant::now();
226-
update
226+
// Process the update
227+
let version = update.runtime_version().spec_version;
228+
match updater.apply_update(update) {
229+
Ok(()) => {
230+
if let Err(e) = dynamic::update_metadata_constants(&client) {
231+
let _ = tx.send(e);
232+
return;
233+
}
234+
prometheus::on_runtime_upgrade();
235+
log::info!(target: LOG_TARGET, "upgrade to version: {} successful", version);
236+
},
237+
Err(e) => {
238+
log::trace!(target: LOG_TARGET, "upgrade to version: {} failed: {:?}", version, e);
239+
},
240+
}
227241
},
228242
Some(Err(e)) => {
229243
if e.is_disconnected_will_reconnect() {
@@ -241,39 +255,32 @@ async fn runtime_upgrade_task(client: ChainClient, tx: oneshot::Sender<Error>) {
241255
},
242256
}
243257
},
244-
_ = tokio::time::sleep_until(tokio::time::Instant::from_std(last_update_time + std::time::Duration::from_secs(60 * 60))) => {
245-
log::warn!(target: LOG_TARGET, "No runtime updates received for 1 hour - subscription may be stalled, recreating subscription...");
246-
crate::prometheus::on_updater_subscription_stall();
247-
248-
// Recreate the subscription
249-
match updater.runtime_updates().await {
250-
Ok(new_stream) => {
251-
update_stream = new_stream;
252-
last_update_time = std::time::Instant::now();
253-
log::info!(target: LOG_TARGET, "Successfully recreated runtime upgrade subscription");
254-
continue;
258+
_ = health_check_interval.tick() => {
259+
log::trace!(target: LOG_TARGET, "Runtime upgrade subscription: periodic RPC health check");
260+
261+
// Try to get the current block number as a health check
262+
match client.blocks().at_latest().await {
263+
Ok(_) => {
264+
log::trace!(target: LOG_TARGET, "RPC health check OK");
255265
},
256266
Err(e) => {
257-
log::error!(target: LOG_TARGET, "Failed to recreate runtime upgrade subscription: {:?}", e);
258-
let _ = tx.send(e.into());
259-
return;
260-
},
261-
}
262-
},
263-
};
264-
265-
let version = update.runtime_version().spec_version;
266-
match updater.apply_update(update) {
267-
Ok(()) => {
268-
if let Err(e) = dynamic::update_metadata_constants(&client) {
269-
let _ = tx.send(e);
270-
return;
267+
log::warn!(target: LOG_TARGET, "RPC health check failed: {:?} - recreating runtime upgrade subscription", e);
268+
crate::prometheus::on_updater_subscription_stall();
269+
270+
// Recreate the subscription
271+
match updater.runtime_updates().await {
272+
Ok(new_stream) => {
273+
update_stream = new_stream;
274+
log::info!(target: LOG_TARGET, "Successfully recreated runtime upgrade subscription after health check failure");
275+
},
276+
Err(e) => {
277+
log::error!(target: LOG_TARGET, "Failed to recreate runtime upgrade subscription: {:?}", e);
278+
let _ = tx.send(e.into());
279+
return;
280+
},
281+
}
282+
}
271283
}
272-
prometheus::on_runtime_upgrade();
273-
log::info!(target: LOG_TARGET, "upgrade to version: {} successful", version);
274-
},
275-
Err(e) => {
276-
log::trace!(target: LOG_TARGET, "upgrade to version: {} failed: {:?}", version, e);
277284
},
278285
}
279286
}

0 commit comments

Comments (0)