diff --git a/src/commands/multi_block/monitor.rs b/src/commands/multi_block/monitor.rs index 0480a5cd0..61912780d 100644 --- a/src/commands/multi_block/monitor.rs +++ b/src/commands/multi_block/monitor.rs @@ -79,8 +79,13 @@ where T::VoterSnapshotPerBlock: Send + Sync, T::MaxVotesPerVoter: Send + Sync, { - let (at, block_hash) = tokio::select! { - maybe_block = subscription.next() => { + let (at, block_hash) = match tokio::time::timeout( + std::time::Duration::from_secs(BLOCK_SUBSCRIPTION_TIMEOUT_SECS), + subscription.next(), + ) + .await + { + Ok(maybe_block) => { match maybe_block { Some(Ok(block)) => { *last_block_time = std::time::Instant::now(); @@ -99,10 +104,10 @@ where None => { log::error!(target: LOG_TARGET, "Subscription to finalized blocks terminated unexpectedly"); return Err(Error::Other("Subscription terminated unexpectedly".to_string())); - } + }, } - } - _ = tokio::time::sleep_until(tokio::time::Instant::from_std(*last_block_time + std::time::Duration::from_secs(BLOCK_SUBSCRIPTION_TIMEOUT_SECS))) => { + }, + Err(_) => { log::warn!(target: LOG_TARGET, "No blocks received for {} seconds - subscription may be stalled, recreating subscription...", BLOCK_SUBSCRIPTION_TIMEOUT_SECS); crate::prometheus::on_listener_subscription_stall(); // Recreate the subscription @@ -116,9 +121,9 @@ where Err(e) => { log::error!(target: LOG_TARGET, "Failed to recreate subscription: {:?}", e); return Err(e.into()); - } + }, } - } + }, }; let (_storage, phase, current_round) = get_block_state(client, block_hash).await?; diff --git a/src/main.rs b/src/main.rs index 9a5578f5d..7f617a673 100644 --- a/src/main.rs +++ b/src/main.rs @@ -202,7 +202,6 @@ async fn run_command( /// Runs until the RPC connection fails or updating the metadata failed. async fn runtime_upgrade_task(client: ChainClient, tx: oneshot::Sender) { let updater = client.updater(); - let mut last_update_time = std::time::Instant::now(); let mut update_stream = match updater.runtime_updates().await { Ok(u) => u, @@ -212,18 +211,33 @@ async fn runtime_upgrade_task(client: ChainClient, tx: oneshot::Sender) { }, }; + // Health check interval - check RPC connection every 1 hour + const HEALTH_CHECK_INTERVAL_SECS: u64 = 60 * 60; + let mut health_check_interval = + tokio::time::interval(std::time::Duration::from_secs(HEALTH_CHECK_INTERVAL_SECS)); + health_check_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); + loop { - // Handle runtime upgrade subscription responses with 1-hour timeout: - // - Some(Ok(update)): process the update - // - Some(Err(e)): retry if recoverable, otherwise quit - // - None: stream ended (connection dead), quit immediately - // - Timeout: subscription may be stalled, recreate it - let update = tokio::select! { + // Use select to handle both runtime updates and periodic health checks + tokio::select! { maybe_update = update_stream.next() => { match maybe_update { Some(Ok(update)) => { - last_update_time = std::time::Instant::now(); - update + // Process the update + let version = update.runtime_version().spec_version; + match updater.apply_update(update) { + Ok(()) => { + if let Err(e) = dynamic::update_metadata_constants(&client) { + let _ = tx.send(e); + return; + } + prometheus::on_runtime_upgrade(); + log::info!(target: LOG_TARGET, "upgrade to version: {} successful", version); + }, + Err(e) => { + log::trace!(target: LOG_TARGET, "upgrade to version: {} failed: {:?}", version, e); + }, + } }, Some(Err(e)) => { if e.is_disconnected_will_reconnect() { @@ -241,39 +255,32 @@ async fn runtime_upgrade_task(client: ChainClient, tx: oneshot::Sender) { }, } }, - _ = tokio::time::sleep_until(tokio::time::Instant::from_std(last_update_time + std::time::Duration::from_secs(60 * 60))) => { - log::warn!(target: LOG_TARGET, "No runtime updates received for 1 hour - subscription may be stalled, recreating subscription..."); - crate::prometheus::on_updater_subscription_stall(); - - // Recreate the subscription - match updater.runtime_updates().await { - Ok(new_stream) => { - update_stream = new_stream; - last_update_time = std::time::Instant::now(); - log::info!(target: LOG_TARGET, "Successfully recreated runtime upgrade subscription"); - continue; + _ = health_check_interval.tick() => { + log::trace!(target: LOG_TARGET, "Runtime upgrade subscription: periodic RPC health check"); + + // Try to get the current block number as a health check + match client.blocks().at_latest().await { + Ok(_) => { + log::trace!(target: LOG_TARGET, "RPC health check OK"); }, Err(e) => { - log::error!(target: LOG_TARGET, "Failed to recreate runtime upgrade subscription: {:?}", e); - let _ = tx.send(e.into()); - return; - }, - } - }, - }; - - let version = update.runtime_version().spec_version; - match updater.apply_update(update) { - Ok(()) => { - if let Err(e) = dynamic::update_metadata_constants(&client) { - let _ = tx.send(e); - return; + log::warn!(target: LOG_TARGET, "RPC health check failed: {:?} - recreating runtime upgrade subscription", e); + crate::prometheus::on_updater_subscription_stall(); + + // Recreate the subscription + match updater.runtime_updates().await { + Ok(new_stream) => { + update_stream = new_stream; + log::info!(target: LOG_TARGET, "Successfully recreated runtime upgrade subscription after health check failure"); + }, + Err(e) => { + log::error!(target: LOG_TARGET, "Failed to recreate runtime upgrade subscription: {:?}", e); + let _ = tx.send(e.into()); + return; + }, + } + } } - prometheus::on_runtime_upgrade(); - log::info!(target: LOG_TARGET, "upgrade to version: {} successful", version); - }, - Err(e) => { - log::trace!(target: LOG_TARGET, "upgrade to version: {} failed: {:?}", version, e); }, } }