Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 12 additions & 7 deletions src/commands/multi_block/monitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,13 @@ where
T::VoterSnapshotPerBlock: Send + Sync,
T::MaxVotesPerVoter: Send + Sync,
{
let (at, block_hash) = tokio::select! {
maybe_block = subscription.next() => {
let (at, block_hash) = match tokio::time::timeout(
std::time::Duration::from_secs(BLOCK_SUBSCRIPTION_TIMEOUT_SECS),
subscription.next(),
)
.await
{
Ok(maybe_block) => {
match maybe_block {
Some(Ok(block)) => {
*last_block_time = std::time::Instant::now();
Expand All @@ -99,10 +104,10 @@ where
None => {
log::error!(target: LOG_TARGET, "Subscription to finalized blocks terminated unexpectedly");
return Err(Error::Other("Subscription terminated unexpectedly".to_string()));
}
},
}
}
_ = tokio::time::sleep_until(tokio::time::Instant::from_std(*last_block_time + std::time::Duration::from_secs(BLOCK_SUBSCRIPTION_TIMEOUT_SECS))) => {
},
Err(_) => {
log::warn!(target: LOG_TARGET, "No blocks received for {} seconds - subscription may be stalled, recreating subscription...", BLOCK_SUBSCRIPTION_TIMEOUT_SECS);
crate::prometheus::on_listener_subscription_stall();
// Recreate the subscription
Expand All @@ -116,9 +121,9 @@ where
Err(e) => {
log::error!(target: LOG_TARGET, "Failed to recreate subscription: {:?}", e);
return Err(e.into());
}
},
}
}
},
};

let (_storage, phase, current_round) = get_block_state(client, block_hash).await?;
Expand Down
85 changes: 46 additions & 39 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,6 @@ async fn run_command(
/// Runs until the RPC connection fails or updating the metadata failed.
async fn runtime_upgrade_task(client: ChainClient, tx: oneshot::Sender<Error>) {
let updater = client.updater();
let mut last_update_time = std::time::Instant::now();

let mut update_stream = match updater.runtime_updates().await {
Ok(u) => u,
Expand All @@ -212,18 +211,33 @@ async fn runtime_upgrade_task(client: ChainClient, tx: oneshot::Sender<Error>) {
},
};

// Health check interval - check RPC connection every 1 hour
const HEALTH_CHECK_INTERVAL_SECS: u64 = 60 * 60;
let mut health_check_interval =
tokio::time::interval(std::time::Duration::from_secs(HEALTH_CHECK_INTERVAL_SECS));
health_check_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);

loop {
// Handle runtime upgrade subscription responses with 1-hour timeout:
// - Some(Ok(update)): process the update
// - Some(Err(e)): retry if recoverable, otherwise quit
// - None: stream ended (connection dead), quit immediately
// - Timeout: subscription may be stalled, recreate it
let update = tokio::select! {
// Use select to handle both runtime updates and periodic health checks
tokio::select! {
maybe_update = update_stream.next() => {
match maybe_update {
Some(Ok(update)) => {
last_update_time = std::time::Instant::now();
update
// Process the update
let version = update.runtime_version().spec_version;
match updater.apply_update(update) {
Ok(()) => {
if let Err(e) = dynamic::update_metadata_constants(&client) {
let _ = tx.send(e);
return;
}
prometheus::on_runtime_upgrade();
log::info!(target: LOG_TARGET, "upgrade to version: {} successful", version);
},
Err(e) => {
log::trace!(target: LOG_TARGET, "upgrade to version: {} failed: {:?}", version, e);
},
}
},
Some(Err(e)) => {
if e.is_disconnected_will_reconnect() {
Expand All @@ -241,39 +255,32 @@ async fn runtime_upgrade_task(client: ChainClient, tx: oneshot::Sender<Error>) {
},
}
},
_ = tokio::time::sleep_until(tokio::time::Instant::from_std(last_update_time + std::time::Duration::from_secs(60 * 60))) => {
log::warn!(target: LOG_TARGET, "No runtime updates received for 1 hour - subscription may be stalled, recreating subscription...");
crate::prometheus::on_updater_subscription_stall();

// Recreate the subscription
match updater.runtime_updates().await {
Ok(new_stream) => {
update_stream = new_stream;
last_update_time = std::time::Instant::now();
log::info!(target: LOG_TARGET, "Successfully recreated runtime upgrade subscription");
continue;
_ = health_check_interval.tick() => {
log::trace!(target: LOG_TARGET, "Runtime upgrade subscription: periodic RPC health check");

// Try to get the current block number as a health check
match client.blocks().at_latest().await {
Ok(_) => {
log::trace!(target: LOG_TARGET, "RPC health check OK");
},
Err(e) => {
log::error!(target: LOG_TARGET, "Failed to recreate runtime upgrade subscription: {:?}", e);
let _ = tx.send(e.into());
return;
},
}
},
};

let version = update.runtime_version().spec_version;
match updater.apply_update(update) {
Ok(()) => {
if let Err(e) = dynamic::update_metadata_constants(&client) {
let _ = tx.send(e);
return;
log::warn!(target: LOG_TARGET, "RPC health check failed: {:?} - recreating runtime upgrade subscription", e);
crate::prometheus::on_updater_subscription_stall();

// Recreate the subscription
match updater.runtime_updates().await {
Ok(new_stream) => {
update_stream = new_stream;
log::info!(target: LOG_TARGET, "Successfully recreated runtime upgrade subscription after health check failure");
},
Err(e) => {
log::error!(target: LOG_TARGET, "Failed to recreate runtime upgrade subscription: {:?}", e);
let _ = tx.send(e.into());
return;
},
}
}
}
prometheus::on_runtime_upgrade();
log::info!(target: LOG_TARGET, "upgrade to version: {} successful", version);
},
Err(e) => {
log::trace!(target: LOG_TARGET, "upgrade to version: {} failed: {:?}", version, e);
},
}
}
Expand Down
Loading