Skip to content

Commit 962d257

Browse files
committed
fix: critical weight submission bugs - per-mechanism rate limit, retry on Priority-too-low, Sentry logging
- Fix can_set_weights to check correct mechanism_id storage slot (was hardcoded to 0)
- Move rate limit check inside per-mechanism loop so one blocked mechanism doesn't skip all
- Do NOT mark epoch as submitted on Priority-too-low/1010 errors (allows retry next window)
- Track per-mechanism success: only mark epoch done when ALL mechanisms succeed
- Retry non-success responses (Ok with success=false) via reconnect path
- Add structured warn/error logs for all weight submission outcomes (visible in Sentry)
- Fix hardcoded tempo=360 in Path B: now reads real tempo from chain via Subtensor
- Log reconnect failures as errors for Sentry visibility
1 parent 00b94b2 commit 962d257

File tree

2 files changed

+161
-70
lines changed

2 files changed

+161
-70
lines changed

bins/validator-node/src/main.rs

Lines changed: 148 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -1346,7 +1346,11 @@ async fn main() -> Result<()> {
13461346
// All validators see the same block numbers so they all submit at the same time.
13471347
if is_weight_block {
13481348
let block_number = new_block_number.unwrap();
1349-
let tempo = 360u64;
1349+
let tempo = if let Some(st) = subtensor.as_ref() {
1350+
st.tempo(netuid).await.unwrap_or(360) as u64
1351+
} else {
1352+
360u64
1353+
};
13501354
let netuid_plus_one = (netuid as u64).saturating_add(1);
13511355
let epoch = block_number.saturating_add(netuid_plus_one) / (tempo + 1);
13521356
info!(
@@ -1928,7 +1932,11 @@ async fn main() -> Result<()> {
19281932
if challenges.is_empty() {
19291933
warn!("RPC pre-compute skipped: no active challenges loaded");
19301934
} else {
1931-
let tempo = 360u64;
1935+
let tempo = if let Some(st) = subtensor.as_ref() {
1936+
st.tempo(netuid).await.unwrap_or(360) as u64
1937+
} else {
1938+
360u64
1939+
};
19321940
let netuid_plus_one = (netuid as u64).saturating_add(1);
19331941
let epoch = current_block.saturating_add(netuid_plus_one) / (tempo + 1);
19341942
let mut precomputed: Vec<(u8, Vec<u16>, Vec<u16>)> = Vec::new();
@@ -4942,43 +4950,27 @@ async fn handle_block_event(
49424950
);
49434951

49444952
if epoch <= *last_weight_submission_epoch {
4945-
info!(
4953+
debug!(
49464954
"Skipping weight submission for epoch {} (already submitted in epoch {})",
49474955
epoch, *last_weight_submission_epoch
49484956
);
49494957
return;
49504958
}
49514959

4952-
// Single submission path: collect WASM weights, convert hotkey->UID,
4953-
// apply emission_weight, and submit directly via Subtensor.
49544960
if let (Some(st), Some(sig)) = (subtensor.as_ref(), signer.as_ref()) {
4955-
// Check rate limit before submitting to avoid CommittingWeightsTooFast
4956-
if let Some(sc) = client.as_ref() {
4957-
let our_ss58 = _keypair.ss58_address();
4958-
let our_uid: u16 = sc
4959-
.metagraph()
4960-
.as_ref()
4961-
.and_then(|mg| {
4961+
let our_uid: u16 = client
4962+
.as_ref()
4963+
.and_then(|sc| {
4964+
let our_ss58 = _keypair.ss58_address();
4965+
sc.metagraph().as_ref().and_then(|mg| {
49624966
mg.neurons
49634967
.values()
49644968
.find(|n| n.hotkey.to_string() == our_ss58)
49654969
.map(|n| n.uid as u16)
49664970
})
4967-
.unwrap_or(0);
4968-
match st.can_set_weights(netuid, our_uid).await {
4969-
Ok(false) => {
4970-
warn!(
4971-
"Rate limit: cannot set weights yet (uid={}), skipping epoch {}",
4972-
our_uid, epoch
4973-
);
4974-
return;
4975-
}
4976-
Err(e) => {
4977-
warn!("Failed to check rate limit: {}, proceeding anyway", e);
4978-
}
4979-
Ok(true) => {}
4980-
}
4981-
}
4971+
})
4972+
.unwrap_or(0);
4973+
49824974
let mut mechanism_weights: Vec<(u8, Vec<u16>, Vec<u16>)> = Vec::new();
49834975

49844976
if let Some(ref executor) = wasm_executor {
@@ -5010,7 +5002,6 @@ async fn handle_block_event(
50105002
Ok(assignments) if !assignments.is_empty() => {
50115003
let total_weight: f64 = assignments.iter().map(|a| a.weight).sum();
50125004

5013-
// Use a map to merge duplicate UIDs (same hotkey appearing multiple times)
50145005
let mut uid_weight_map: std::collections::BTreeMap<u16, f64> =
50155006
std::collections::BTreeMap::new();
50165007
let mut assigned_weight: f64 = 0.0;
@@ -5045,7 +5036,6 @@ async fn handle_block_event(
50455036
}
50465037
}
50475038

5048-
// Remaining weight goes to burn (UID 0)
50495039
let burn_weight = 1.0 - assigned_weight;
50505040
if burn_weight > 0.001 {
50515041
let burn_u16 = (burn_weight * 65535.0).round() as u16;
@@ -5059,7 +5049,6 @@ async fn handle_block_event(
50595049
}
50605050

50615051
if !uids.is_empty() {
5062-
// Max-upscale so largest = 65535 (matches convert_weights_and_uids_for_emit)
50635052
let max_val = *vals.iter().max().unwrap() as f64;
50645053
if max_val > 0.0 && max_val < 65535.0 {
50655054
vals = vals
@@ -5106,16 +5095,18 @@ async fn handle_block_event(
51065095
}
51075096
}
51085097

5109-
// Deduplicate by mechanism_id: pick the best (most UIDs) entry per
5110-
// mechanism to avoid nonce conflicts from multiple submissions.
5111-
// Burn-only entries (single UID 0) are used only if no real weights exist.
5098+
// Deduplicate by mechanism_id: pick the best (most UIDs) entry per mechanism.
51125099
let weights_to_submit: Vec<(u8, Vec<u16>, Vec<u16>)> =
51135100
if mechanism_weights.is_empty() {
51145101
let mechanism_id = {
51155102
let cs = chain_state.read();
51165103
cs.mechanism_configs.keys().next().copied().unwrap_or(0u8)
51175104
};
5118-
info!("No weights - submitting burn weights to UID 0");
5105+
warn!(
5106+
epoch = epoch,
5107+
block = block,
5108+
"No WASM weights available - submitting burn weights to UID 0"
5109+
);
51195110
vec![(mechanism_id, vec![0u16], vec![65535u16])]
51205111
} else {
51215112
use std::collections::BTreeMap;
@@ -5128,7 +5119,6 @@ async fn handle_block_event(
51285119
None => true,
51295120
Some((ex_uids, _)) => {
51305121
let ex_is_burn = ex_uids.len() == 1 && ex_uids[0] == 0;
5131-
// Prefer real weights over burn; among real, prefer more UIDs
51325122
if is_burn_only && !ex_is_burn {
51335123
false
51345124
} else if !is_burn_only && ex_is_burn {
@@ -5165,7 +5155,6 @@ async fn handle_block_event(
51655155

51665156
// ALL validators (including bootstrap) fetch weights from the
51675157
// bootstrap RPC to guarantee everyone submits identical weights.
5168-
// Local computation is used only as fallback if the RPC is unreachable.
51695158
let weights_to_submit = {
51705159
info!("Fetching weights from bootstrap validator RPC");
51715160
match fetch_remote_weights().await {
@@ -5177,13 +5166,19 @@ async fn handle_block_event(
51775166
remote
51785167
}
51795168
Ok(_) => {
5180-
warn!("Remote weights empty, falling back to locally computed");
5169+
warn!(
5170+
epoch = epoch,
5171+
block = block,
5172+
"Remote weights empty, falling back to locally computed"
5173+
);
51815174
weights_to_submit
51825175
}
51835176
Err(e) => {
51845177
warn!(
5185-
"Failed to fetch remote weights ({}), falling back to locally computed",
5186-
e
5178+
epoch = epoch,
5179+
block = block,
5180+
error = %e,
5181+
"Failed to fetch remote weights, falling back to locally computed"
51875182
);
51885183
weights_to_submit
51895184
}
@@ -5196,10 +5191,35 @@ async fn handle_block_event(
51965191
cs.last_computed_weights = weights_to_submit.clone();
51975192
}
51985193

5194+
let total_mechanisms = weights_to_submit.len();
5195+
let mut succeeded_mechanisms: Vec<u8> = Vec::new();
5196+
let mut failed_mechanisms: Vec<u8> = Vec::new();
51995197
let mut needs_reconnect = false;
52005198
let mut failed_submissions: Vec<(u8, Vec<u16>, Vec<u16>)> = Vec::new();
52015199

52025200
for (mechanism_id, uids, weights) in &weights_to_submit {
5201+
// Per-mechanism rate limit check
5202+
match st.can_set_weights(netuid, our_uid, *mechanism_id).await {
5203+
Ok(false) => {
5204+
warn!(
5205+
epoch = epoch,
5206+
mechanism_id = mechanism_id,
5207+
uid = our_uid,
5208+
"Rate limit: cannot set weights yet for mechanism, will retry next window"
5209+
);
5210+
failed_mechanisms.push(*mechanism_id);
5211+
continue;
5212+
}
5213+
Err(e) => {
5214+
warn!(
5215+
mechanism_id = mechanism_id,
5216+
error = %e,
5217+
"Failed to check rate limit, proceeding anyway"
5218+
);
5219+
}
5220+
Ok(true) => {}
5221+
}
5222+
52035223
info!(
52045224
"Submitting weights for mechanism {} ({} UIDs)",
52055225
mechanism_id,
@@ -5218,39 +5238,57 @@ async fn handle_block_event(
52185238
.await
52195239
{
52205240
Ok(resp) if resp.success => {
5221-
info!(
5222-
"Mechanism {} weights submitted: {:?}",
5223-
mechanism_id, resp.tx_hash
5241+
warn!(
5242+
epoch = epoch,
5243+
mechanism_id = mechanism_id,
5244+
tx_hash = ?resp.tx_hash,
5245+
uid_count = uids.len(),
5246+
"Weight submission SUCCESS"
52245247
);
5225-
*last_weight_submission_epoch = epoch;
5248+
succeeded_mechanisms.push(*mechanism_id);
52265249
}
52275250
Ok(resp) => {
5228-
warn!("Mechanism {} issue: {}", mechanism_id, resp.message);
5251+
error!(
5252+
epoch = epoch,
5253+
mechanism_id = mechanism_id,
5254+
message = %resp.message,
5255+
"Weight submission returned non-success response, will retry"
5256+
);
5257+
failed_mechanisms.push(*mechanism_id);
5258+
failed_submissions.push((*mechanism_id, uids.clone(), weights.clone()));
52295259
}
52305260
Err(e) => {
52315261
let err_str = format!("{}", e);
52325262
if err_str.contains("Priority is too low") || err_str.contains("1010") {
52335263
warn!(
5234-
"Mechanism {}: transaction rejected ({}) - marking epoch as submitted to avoid retry loop",
5235-
mechanism_id, err_str
5264+
epoch = epoch,
5265+
mechanism_id = mechanism_id,
5266+
error = %err_str,
5267+
"Transaction priority conflict, will retry next window (NOT marking epoch as done)"
52365268
);
5237-
*last_weight_submission_epoch = epoch;
5269+
failed_mechanisms.push(*mechanism_id);
52385270
} else if is_transport_error(&err_str) {
52395271
error!(
5240-
"Mechanism {} weight submission failed (transport error): {}",
5241-
mechanism_id, e
5272+
epoch = epoch,
5273+
mechanism_id = mechanism_id,
5274+
error = %e,
5275+
"Weight submission transport error, will reconnect and retry"
52425276
);
52435277
needs_reconnect = true;
5278+
failed_mechanisms.push(*mechanism_id);
52445279
failed_submissions.push((
52455280
*mechanism_id,
52465281
uids.clone(),
52475282
weights.clone(),
52485283
));
52495284
} else {
52505285
error!(
5251-
"Mechanism {} weight submission failed: {}",
5252-
mechanism_id, e
5286+
epoch = epoch,
5287+
mechanism_id = mechanism_id,
5288+
error = %e,
5289+
"Weight submission failed (unknown error), will retry next window"
52535290
);
5291+
failed_mechanisms.push(*mechanism_id);
52545292
}
52555293
}
52565294
}
@@ -5259,15 +5297,17 @@ async fn handle_block_event(
52595297
// Drop immutable borrows of subtensor (st, sig) before reconnecting
52605298
drop(weights_to_submit);
52615299

5300+
// Reconnect and retry transport failures
52625301
if needs_reconnect && !failed_submissions.is_empty() {
52635302
if try_reconnect_subtensor(subtensor, subtensor_endpoint, subtensor_state_path)
52645303
.await
52655304
{
52665305
if let (Some(new_st), Some(sig)) = (subtensor.as_ref(), signer.as_ref()) {
52675306
for (mechanism_id, uids, weights) in &failed_submissions {
5268-
info!(
5269-
"Retrying weight submission after reconnect for mechanism {}",
5270-
mechanism_id
5307+
warn!(
5308+
epoch = epoch,
5309+
mechanism_id = mechanism_id,
5310+
"Retrying weight submission after reconnect"
52715311
);
52725312
match new_st
52735313
.set_mechanism_weights(
@@ -5282,31 +5322,75 @@ async fn handle_block_event(
52825322
.await
52835323
{
52845324
Ok(resp) if resp.success => {
5285-
info!(
5286-
"Mechanism {} weights submitted after reconnect: {:?}",
5287-
mechanism_id, resp.tx_hash
5325+
warn!(
5326+
epoch = epoch,
5327+
mechanism_id = mechanism_id,
5328+
tx_hash = ?resp.tx_hash,
5329+
"Weight submission SUCCESS after reconnect"
52885330
);
5289-
*last_weight_submission_epoch = epoch;
5331+
succeeded_mechanisms.push(*mechanism_id);
5332+
failed_mechanisms.retain(|m| m != mechanism_id);
52905333
}
52915334
Ok(resp) => {
5292-
warn!(
5293-
"Mechanism {} issue after reconnect: {}",
5294-
mechanism_id, resp.message
5335+
error!(
5336+
epoch = epoch,
5337+
mechanism_id = mechanism_id,
5338+
message = %resp.message,
5339+
"Weight submission non-success after reconnect"
52955340
);
52965341
}
52975342
Err(e2) => {
52985343
error!(
5299-
"Mechanism {} weight submission failed even after reconnect: {}",
5300-
mechanism_id, e2
5344+
epoch = epoch,
5345+
mechanism_id = mechanism_id,
5346+
error = %e2,
5347+
"Weight submission failed even after reconnect"
53015348
);
53025349
}
53035350
}
53045351
}
53055352
}
5353+
} else {
5354+
error!(
5355+
epoch = epoch,
5356+
"Subtensor reconnect failed, weight submissions lost for this window"
5357+
);
53065358
}
53075359
}
5360+
5361+
// Only mark epoch as submitted if ALL mechanisms succeeded
5362+
if !succeeded_mechanisms.is_empty() && failed_mechanisms.is_empty() {
5363+
*last_weight_submission_epoch = epoch;
5364+
warn!(
5365+
epoch = epoch,
5366+
succeeded = succeeded_mechanisms.len(),
5367+
total = total_mechanisms,
5368+
"All weight submissions succeeded, epoch marked complete"
5369+
);
5370+
} else if !succeeded_mechanisms.is_empty() {
5371+
// Partial success: do NOT mark epoch as done so failed mechanisms can retry
5372+
warn!(
5373+
epoch = epoch,
5374+
succeeded = succeeded_mechanisms.len(),
5375+
failed = failed_mechanisms.len(),
5376+
total = total_mechanisms,
5377+
failed_ids = ?failed_mechanisms,
5378+
"Partial weight submission: NOT marking epoch as done, will retry failed mechanisms"
5379+
);
5380+
} else {
5381+
error!(
5382+
epoch = epoch,
5383+
total = total_mechanisms,
5384+
failed_ids = ?failed_mechanisms,
5385+
"All weight submissions failed for this epoch, will retry next window"
5386+
);
5387+
}
53085388
} else {
5309-
warn!("No Subtensor/signer - cannot submit weights");
5389+
error!(
5390+
epoch = epoch,
5391+
block = block,
5392+
"No Subtensor/signer available - cannot submit weights"
5393+
);
53105394
}
53115395
}
53125396
BlockSyncEvent::RevealWindowOpen { epoch, block } => {

0 commit comments

Comments
 (0)