Skip to content

Commit b634ac4

Browse files
committed
fix: vTrust convergence - remove remote fallback, preserve emission on WASM upload, deterministic dedup
- Remove the remote weight fallback (chain.platform.network), which caused validators to diverge
- Preserve the existing challenge config (emission_weight, mechanism_id) on WASM re-upload
- Use BTreeMap instead of HashMap for deterministic mechanism dedup ordering
- Add WASM dedup guards (DedupFlags) to prevent concurrent execution of the same function
- Increase RESPONSE_BUF_LARGE from 256KB to 4MB for large storage reads
1 parent aec5fe7 commit b634ac4

File tree

6 files changed

+226
-83
lines changed

6 files changed

+226
-83
lines changed

bins/validator-node/src/main.rs

Lines changed: 55 additions & 76 deletions
Original file line numberDiff line numberDiff line change
@@ -1221,6 +1221,9 @@ async fn main() -> Result<()> {
12211221
chain_state.clone(),
12221222
"wasm_upload_local",
12231223
|cs| {
1224+
// Preserve existing config (emission_weight, mechanism_id)
1225+
let existing_config = cs.wasm_challenge_configs.get(&challenge_id)
1226+
.map(|c| c.config.clone());
12241227
let wasm_config = platform_core::WasmChallengeConfig {
12251228
challenge_id,
12261229
name: challenge_name,
@@ -1232,7 +1235,7 @@ async fn main() -> Result<()> {
12321235
version,
12331236
..Default::default()
12341237
},
1235-
config: platform_core::ChallengeConfig::default(),
1238+
config: existing_config.unwrap_or_default(),
12361239
is_active: true,
12371240
};
12381241
cs.register_wasm_challenge(wasm_config);
@@ -3226,6 +3229,10 @@ async fn handle_network_event(
32263229
chain_state.clone(),
32273230
"wasm_consensus",
32283231
|cs| {
3232+
// Preserve existing config (emission_weight, mechanism_id)
3233+
// if this challenge was already registered.
3234+
let existing_config = cs.wasm_challenge_configs.get(&cid)
3235+
.map(|c| c.config.clone());
32293236
let wasm_config = platform_core::WasmChallengeConfig {
32303237
challenge_id: cid,
32313238
name: cname,
@@ -3237,7 +3244,7 @@ async fn handle_network_event(
32373244
version,
32383245
..Default::default()
32393246
},
3240-
config: platform_core::ChallengeConfig::default(),
3247+
config: existing_config.unwrap_or_default(),
32413248
is_active: true,
32423249
};
32433250
cs.register_wasm_challenge(wasm_config);
@@ -3848,90 +3855,62 @@ async fn handle_block_event(
38483855
// Deduplicate by mechanism_id: pick the best (most UIDs) entry per
38493856
// mechanism to avoid nonce conflicts from multiple submissions.
38503857
// Burn-only entries (single UID 0) are used only if no real weights exist.
3851-
let weights_to_submit: Vec<(u8, Vec<u16>, Vec<u16>)> = if mechanism_weights
3852-
.is_empty()
3853-
{
3854-
let mechanism_id = {
3855-
let cs = chain_state.read();
3856-
cs.mechanism_configs.keys().next().copied().unwrap_or(0u8)
3857-
};
3858-
info!("No weights - submitting burn weights to UID 0");
3859-
vec![(mechanism_id, vec![0u16], vec![65535u16])]
3860-
} else {
3861-
use std::collections::HashMap;
3862-
let mut best_per_mechanism: HashMap<u8, (Vec<u16>, Vec<u16>)> = HashMap::new();
3863-
for (mid, uids, vals) in &mechanism_weights {
3864-
let is_burn_only = uids.len() == 1 && uids[0] == 0;
3865-
let existing = best_per_mechanism.get(mid);
3866-
let replace = match existing {
3867-
None => true,
3868-
Some((ex_uids, _)) => {
3869-
let ex_is_burn = ex_uids.len() == 1 && ex_uids[0] == 0;
3870-
// Prefer real weights over burn; among real, prefer more UIDs
3871-
if is_burn_only && !ex_is_burn {
3872-
false
3873-
} else if !is_burn_only && ex_is_burn {
3874-
true
3875-
} else {
3876-
uids.len() > ex_uids.len()
3877-
}
3878-
}
3879-
};
3880-
if replace {
3881-
best_per_mechanism.insert(*mid, (uids.clone(), vals.clone()));
3882-
}
3883-
}
3884-
let mut result = Vec::new();
3885-
for (mid, (uids, vals)) in best_per_mechanism {
3886-
info!(
3887-
"Selected weights for mechanism {}: {} UIDs",
3888-
mid,
3889-
uids.len()
3890-
);
3891-
debug!(" UIDs: {:?}, Weights: {:?}", uids, vals);
3892-
result.push((mid, uids, vals));
3893-
}
3894-
if result.is_empty() {
3858+
let weights_to_submit: Vec<(u8, Vec<u16>, Vec<u16>)> =
3859+
if mechanism_weights.is_empty() {
38953860
let mechanism_id = {
38963861
let cs = chain_state.read();
38973862
cs.mechanism_configs.keys().next().copied().unwrap_or(0u8)
38983863
};
3864+
info!("No weights - submitting burn weights to UID 0");
38993865
vec![(mechanism_id, vec![0u16], vec![65535u16])]
39003866
} else {
3901-
result
3902-
}
3903-
};
3904-
3905-
// Fallback: if all weights are burn-only, try fetching from
3906-
// the primary validator's RPC (chain.platform.network).
3907-
let all_burn_only = weights_to_submit
3908-
.iter()
3909-
.all(|(_, uids, _)| uids.len() == 1 && uids[0] == 0);
3910-
let weights_to_submit = if all_burn_only {
3911-
info!("All local weights are burn-only, attempting HTTP fallback from chain.platform.network");
3912-
match fetch_remote_weights().await {
3913-
Ok(remote) if !remote.is_empty() => {
3867+
use std::collections::BTreeMap;
3868+
let mut best_per_mechanism: BTreeMap<u8, (Vec<u16>, Vec<u16>)> =
3869+
BTreeMap::new();
3870+
for (mid, uids, vals) in &mechanism_weights {
3871+
let is_burn_only = uids.len() == 1 && uids[0] == 0;
3872+
let existing = best_per_mechanism.get(mid);
3873+
let replace = match existing {
3874+
None => true,
3875+
Some((ex_uids, _)) => {
3876+
let ex_is_burn = ex_uids.len() == 1 && ex_uids[0] == 0;
3877+
// Prefer real weights over burn; among real, prefer more UIDs
3878+
if is_burn_only && !ex_is_burn {
3879+
false
3880+
} else if !is_burn_only && ex_is_burn {
3881+
true
3882+
} else {
3883+
uids.len() > ex_uids.len()
3884+
}
3885+
}
3886+
};
3887+
if replace {
3888+
best_per_mechanism.insert(*mid, (uids.clone(), vals.clone()));
3889+
}
3890+
}
3891+
let mut result = Vec::new();
3892+
for (mid, (uids, vals)) in best_per_mechanism {
39143893
info!(
3915-
"Fetched {} mechanism weight entries from remote",
3916-
remote.len()
3894+
"Selected weights for mechanism {}: {} UIDs",
3895+
mid,
3896+
uids.len()
39173897
);
3918-
remote
3919-
}
3920-
Ok(_) => {
3921-
warn!("Remote returned empty weights, using local burn-only");
3922-
weights_to_submit
3898+
debug!(" UIDs: {:?}, Weights: {:?}", uids, vals);
3899+
result.push((mid, uids, vals));
39233900
}
3924-
Err(e) => {
3925-
warn!(
3926-
"Failed to fetch remote weights: {}, using local burn-only",
3927-
e
3928-
);
3929-
weights_to_submit
3901+
if result.is_empty() {
3902+
let mechanism_id = {
3903+
let cs = chain_state.read();
3904+
cs.mechanism_configs.keys().next().copied().unwrap_or(0u8)
3905+
};
3906+
vec![(mechanism_id, vec![0u16], vec![65535u16])]
3907+
} else {
3908+
result
39303909
}
3931-
}
3932-
} else {
3933-
weights_to_submit
3934-
};
3910+
};
3911+
3912+
// No remote fallback -- each validator must compute weights
3913+
// independently from consensus storage to ensure vTrust convergence.
39353914

39363915
// Store computed weights in chain state for the subnet_getWeights RPC
39373916
{

bins/validator-node/src/wasm_executor.rs

Lines changed: 139 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,9 @@
11
use anyhow::{Context, Result};
22
use parking_lot::RwLock;
3-
use platform_challenge_sdk_wasm::{EvaluationInput, EvaluationOutput, WeightEntry};
3+
use platform_challenge_sdk_wasm::{DedupFlags, EvaluationInput, EvaluationOutput, WeightEntry};
44
use std::collections::HashMap;
55
use std::path::PathBuf;
6+
use std::sync::atomic::{AtomicBool, Ordering};
67
use std::sync::Arc;
78
use std::time::Instant;
89
use tracing::{debug, info};
@@ -72,10 +73,63 @@ pub struct ExecutionMetrics {
7273
pub fuel_consumed: Option<u64>,
7374
}
7475

76+
/// Per-challenge deduplication state. Each function that the WASM module
77+
/// requested deduplication for gets an [`AtomicBool`] guard.
78+
struct DedupState {
79+
flags: i32,
80+
sync_running: AtomicBool,
81+
get_weights_running: AtomicBool,
82+
evaluate_running: AtomicBool,
83+
}
84+
85+
impl DedupState {
86+
fn new(flags: i32) -> Self {
87+
Self {
88+
flags,
89+
sync_running: AtomicBool::new(false),
90+
get_weights_running: AtomicBool::new(false),
91+
evaluate_running: AtomicBool::new(false),
92+
}
93+
}
94+
95+
fn try_acquire(&self, flag: i32) -> Option<DedupGuard<'_>> {
96+
if self.flags & flag == 0 {
97+
return Some(DedupGuard { atom: None });
98+
}
99+
let atom = match flag {
100+
DedupFlags::SYNC => &self.sync_running,
101+
DedupFlags::GET_WEIGHTS => &self.get_weights_running,
102+
DedupFlags::EVALUATE => &self.evaluate_running,
103+
_ => return Some(DedupGuard { atom: None }),
104+
};
105+
if atom
106+
.compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed)
107+
.is_ok()
108+
{
109+
Some(DedupGuard { atom: Some(atom) })
110+
} else {
111+
None
112+
}
113+
}
114+
}
115+
116+
struct DedupGuard<'a> {
117+
atom: Option<&'a AtomicBool>,
118+
}
119+
120+
impl Drop for DedupGuard<'_> {
121+
fn drop(&mut self) {
122+
if let Some(atom) = self.atom {
123+
atom.store(false, Ordering::Release);
124+
}
125+
}
126+
}
127+
75128
pub struct WasmChallengeExecutor {
76129
runtime: WasmRuntime,
77130
config: WasmExecutorConfig,
78131
module_cache: RwLock<HashMap<String, Arc<WasmModule>>>,
132+
dedup_state: RwLock<HashMap<String, Arc<DedupState>>>,
79133
}
80134

81135
impl WasmChallengeExecutor {
@@ -101,9 +155,49 @@ impl WasmChallengeExecutor {
101155
runtime,
102156
config,
103157
module_cache: RwLock::new(HashMap::new()),
158+
dedup_state: RwLock::new(HashMap::new()),
104159
})
105160
}
106161

162+
fn get_or_init_dedup(&self, challenge_id: &str, module: &WasmModule) -> Arc<DedupState> {
163+
{
164+
let cache = self.dedup_state.read();
165+
if let Some(state) = cache.get(challenge_id) {
166+
return Arc::clone(state);
167+
}
168+
}
169+
let flags = self.query_dedup_flags(module);
170+
let state = Arc::new(DedupState::new(flags));
171+
let mut cache = self.dedup_state.write();
172+
cache
173+
.entry(challenge_id.to_string())
174+
.or_insert_with(|| Arc::clone(&state));
175+
Arc::clone(cache.get(challenge_id).unwrap())
176+
}
177+
178+
fn query_dedup_flags(&self, module: &WasmModule) -> i32 {
179+
let instance_config = InstanceConfig {
180+
challenge_id: "dedup-probe".to_string(),
181+
validator_id: "validator".to_string(),
182+
storage_host_config: self.config.storage_host_config.clone(),
183+
storage_backend: Arc::clone(&self.config.storage_backend),
184+
..Default::default()
185+
};
186+
let mut instance = match self.runtime.instantiate(module, instance_config, None) {
187+
Ok(i) => i,
188+
Err(_) => return DedupFlags::NONE,
189+
};
190+
match instance.call_return_i32("get_dedup_flags") {
191+
Ok(flags) => {
192+
if flags != 0 {
193+
info!(flags, "WASM module declares dedup flags");
194+
}
195+
flags
196+
}
197+
Err(_) => DedupFlags::NONE,
198+
}
199+
}
200+
107201
pub fn execute_evaluation(
108202
&self,
109203
module_path: &str,
@@ -137,6 +231,24 @@ impl WasmChallengeExecutor {
137231
.load_module(module_path)
138232
.context("Failed to load WASM module")?;
139233

234+
let dedup = self.get_or_init_dedup(module_path, &module);
235+
let _guard = match dedup.try_acquire(DedupFlags::EVALUATE) {
236+
Some(g) => g,
237+
None => {
238+
debug!(module = module_path, "evaluate skipped: already running");
239+
let metrics = ExecutionMetrics {
240+
execution_time_ms: 0,
241+
memory_used_bytes: 0,
242+
network_requests_made: 0,
243+
fuel_consumed: None,
244+
};
245+
return Ok((
246+
EvaluationOutput::failure("skipped: already running"),
247+
metrics,
248+
));
249+
}
250+
};
251+
140252
let input = EvaluationInput {
141253
agent_data: agent_data.to_vec(),
142254
challenge_id: challenge_id.to_string(),
@@ -941,6 +1053,15 @@ impl WasmChallengeExecutor {
9411053
.load_module(module_path)
9421054
.context("Failed to load WASM module")?;
9431055

1056+
let dedup = self.get_or_init_dedup(module_path, &module);
1057+
let _guard = match dedup.try_acquire(DedupFlags::GET_WEIGHTS) {
1058+
Some(g) => g,
1059+
None => {
1060+
debug!(module = module_path, "get_weights skipped: already running");
1061+
return Ok(Vec::new());
1062+
}
1063+
};
1064+
9441065
let instance_config = InstanceConfig {
9451066
challenge_id: module_path.to_string(),
9461067
validator_id: "validator".to_string(),
@@ -1030,6 +1151,22 @@ impl WasmChallengeExecutor {
10301151
.load_module(module_path)
10311152
.context("Failed to load WASM module")?;
10321153

1154+
let dedup = self.get_or_init_dedup(module_path, &module);
1155+
let _guard = match dedup.try_acquire(DedupFlags::SYNC) {
1156+
Some(g) => g,
1157+
None => {
1158+
debug!(module = module_path, "sync skipped: already running");
1159+
return Ok(platform_challenge_sdk_wasm::WasmSyncResult {
1160+
leaderboard_hash: [0u8; 32],
1161+
total_users: 0,
1162+
total_valid_issues: 0,
1163+
total_invalid_issues: 0,
1164+
total_pending_issues: 0,
1165+
sync_timestamp: 0,
1166+
});
1167+
}
1168+
};
1169+
10331170
let instance_config = InstanceConfig {
10341171
challenge_id: module_path.to_string(),
10351172
validator_id: "validator".to_string(),
@@ -1187,6 +1324,7 @@ impl WasmChallengeExecutor {
11871324
if cache.remove(module_path).is_some() {
11881325
info!(module = module_path, "WASM module cache entry invalidated");
11891326
}
1327+
self.dedup_state.write().remove(module_path);
11901328
}
11911329

11921330
#[allow(dead_code)]

crates/challenge-sdk-wasm/src/host_functions.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ use alloc::vec::Vec;
33

44
const RESPONSE_BUF_SMALL: usize = 4096;
55
const RESPONSE_BUF_MEDIUM: usize = 64 * 1024;
6-
const RESPONSE_BUF_LARGE: usize = 256 * 1024;
6+
const RESPONSE_BUF_LARGE: usize = 4 * 1024 * 1024;
77

88
#[link(wasm_import_module = "platform_network")]
99
extern "C" {

0 commit comments

Comments
 (0)