diff --git a/crates/core/src/contract/executor.rs b/crates/core/src/contract/executor.rs index eb3d09dcd..f355f81eb 100644 --- a/crates/core/src/contract/executor.rs +++ b/crates/core/src/contract/executor.rs @@ -547,6 +547,27 @@ pub(crate) trait ContractExecutor: Send + 'static { /// A WASM executor which will run any contracts, delegates, etc. registered. /// /// This executor will monitor the store directories and databases to detect state changes. +/// Represents an operation that's waiting for a contract to finish initializing +#[derive(Debug)] +struct QueuedOperation { + update: Either>, + related_contracts: RelatedContracts<'static>, + /// When this operation was queued + queued_at: std::time::Instant, +} + +/// Tracks the initialization state of a contract +#[derive(Debug)] +enum ContractInitState { + /// Contract is currently being initialized (validation in progress) + Initializing { + /// Operations waiting for initialization to complete + queued_ops: Vec, + /// When initialization started + started_at: std::time::Instant, + }, +} + /// Consumers of the executor are required to poll for new changes in order to be notified /// of changes or can alternatively use the notification channel. pub struct Executor { @@ -559,6 +580,8 @@ pub struct Executor { subscriber_summaries: HashMap>>>, /// Attested contract instances for a given delegate. delegate_attested_ids: HashMap>, + /// Tracks contracts that are being initialized and operations queued for them + contract_init_state: HashMap, event_loop_channel: Option>, } @@ -580,6 +603,7 @@ impl Executor { update_notifications: HashMap::default(), subscriber_summaries: HashMap::default(), delegate_attested_ids: HashMap::default(), + contract_init_state: HashMap::default(), event_loop_channel, }) } diff --git a/crates/core/src/contract/executor/runtime.rs b/crates/core/src/contract/executor/runtime.rs index 2a48ec925..dcb5e9927 100644 --- a/crates/core/src/contract/executor/runtime.rs +++ b/crates/core/src/contract/executor/runtime.rs @@ -1,7 +1,8 @@ use super::*; use super::{ - ContractExecutor, ContractRequest, ContractResponse, ExecutorError, ExecutorHalve, - ExecutorToEventLoopChannel, RequestError, Response, StateStoreError, + ContractExecutor, ContractInitState, ContractRequest, ContractResponse, ExecutorError, + ExecutorHalve, ExecutorToEventLoopChannel, QueuedOperation, RequestError, Response, + StateStoreError, }; impl ContractExecutor for Executor { @@ -39,6 +40,45 @@ impl ContractExecutor for Executor { related_contracts: RelatedContracts<'static>, code: Option, ) -> Result { + // Check if this contract is currently being initialized + if let Some(ContractInitState::Initializing { .. }) = self.contract_init_state.get(&key) { + // If we're trying to PUT while already initializing, that's an error + if code.is_some() { + return Err(ExecutorError::request(StdContractError::Put { + key, + cause: "contract is already being initialized".into(), + })); + } + + // This is an UPDATE arriving during initialization - queue it + tracing::info!( + contract = %key, + "Operation arrived during contract initialization - queueing for later processing" + ); + + if let Some(ContractInitState::Initializing { + ref mut queued_ops, .. + }) = self.contract_init_state.get_mut(&key) + { + queued_ops.push(QueuedOperation { + update: update.clone(), + related_contracts: related_contracts.clone(), + queued_at: std::time::Instant::now(), + }); + + // Return a placeholder response indicating the operation is queued + // The caller should retry later once initialization is complete + tracing::debug!( + contract = %key, + queued_count = queued_ops.len(), + "Operation queued, initialization still in progress" + ); + + // For now, return NoChange to indicate the operation didn't fail but also didn't complete + // In the future, we might want a specific QueuedResult variant + return Ok(UpsertResult::NoChange); + } + } if let Either::Left(ref state) = update { let hash = blake3::hash(state.as_ref()); tracing::debug!( @@ -51,11 +91,19 @@ impl ContractExecutor for Executor { let params = if let Some(code) = &code { code.params() } else { + // Contract not provided, need to get params from state_store self.state_store .get_params(&key) .await .map_err(ExecutorError::other)? .ok_or_else(|| { + // This error occurs when an UPDATE arrives for a contract whose + // parameters haven't been stored yet (race condition) + tracing::warn!( + contract = %key, + is_delta = matches!(update, Either::Right(_)), + "Contract parameters not found in state_store" + ); ExecutorError::request(StdContractError::Put { key, cause: "missing contract parameters".into(), @@ -63,43 +111,48 @@ impl ContractExecutor for Executor { })? }; - let remove_if_fail = if self + // Track if we stored a new contract + let (remove_if_fail, contract_was_provided) = if self .runtime .contract_store .fetch_contract(&key, ¶ms) .is_none() { - let code = code.ok_or_else(|| { - ExecutorError::request(StdContractError::MissingContract { key: key.into() }) - })?; - // DEBUG: Log before and after store_contract - tracing::debug!( - "DEBUG PUT: Before store_contract - key={}, key.code_hash={:?}", - key, - key.code_hash() - ); - - self.runtime - .contract_store - .store_contract(code.clone()) - .map_err(ExecutorError::other)?; - - // Immediately verify the contract was stored - let fetch_result = self.runtime.contract_store.fetch_contract(&key, ¶ms); - tracing::debug!( - "DEBUG PUT: After store_contract - key={}, stored successfully={}, immediate fetch result={}", - key, - true, - fetch_result.is_some() - ); + if let Some(ref contract_code) = code { + tracing::debug!("Storing new contract - key={}", key); - true + self.runtime + .contract_store + .store_contract(contract_code.clone()) + .map_err(ExecutorError::other)?; + (true, true) + } else { + return Err(ExecutorError::request(StdContractError::MissingContract { + key: key.into(), + })); + } } else { - false + (false, code.is_some()) }; let is_new_contract = self.state_store.get(&key).await.is_err(); + // If this is a new contract being stored, mark it as initializing + if remove_if_fail && is_new_contract && contract_was_provided { + tracing::debug!( + contract = %key, + "Starting contract initialization - queueing subsequent operations" + ); + + self.contract_init_state.insert( + key, + ContractInitState::Initializing { + queued_ops: Vec::new(), + started_at: std::time::Instant::now(), + }, + ); + } + let mut updates = match update { Either::Left(incoming_state) => { let result = self @@ -124,13 +177,67 @@ impl ContractExecutor for Executor { .await .map_err(ExecutorError::other)?; + // Contract initialization complete - mark as ready and get queued ops + let queued_ops = if let Some(ContractInitState::Initializing { + queued_ops, + started_at, + }) = self.contract_init_state.remove(&key) + { + let init_duration = started_at.elapsed(); + tracing::info!( + contract = %key, + queued_operations = queued_ops.len(), + init_duration_ms = init_duration.as_millis(), + "Contract initialization complete, will process queued operations" + ); + queued_ops + } else { + Vec::new() + }; + + // Log details about queued operations + if !queued_ops.is_empty() { + for op in &queued_ops { + let queue_time = op.queued_at.elapsed(); + tracing::info!( + contract = %key, + queue_time_ms = queue_time.as_millis(), + is_delta = matches!(op.update, Either::Right(_)), + has_related = op.related_contracts.states().next().is_some(), + "Queued operation ready for retry after initialization" + ); + } + // The queued operations will be handled when the sender retries them + // Now that initialization is complete, they will succeed + } + return Ok(UpsertResult::Updated(incoming_state)); } } ValidateResult::Invalid => { + // Validation failed - clear any queued operations + if let Some(ContractInitState::Initializing { queued_ops, .. }) = + self.contract_init_state.remove(&key) + { + tracing::warn!( + contract = %key, + dropped_operations = queued_ops.len(), + "Contract validation failed, dropping queued operations" + ); + } return Err(ExecutorError::request(StdContractError::invalid_put(key))); } ValidateResult::RequestRelated(mut related) => { + // Clear any queued operations since we're missing related contracts + if let Some(ContractInitState::Initializing { queued_ops, .. }) = + self.contract_init_state.remove(&key) + { + tracing::warn!( + contract = %key, + dropped_operations = queued_ops.len(), + "Missing related contracts, dropping queued operations" + ); + } if let Some(key) = related.pop() { return Err(ExecutorError::request(StdContractError::MissingRelated { key, @@ -1122,8 +1229,16 @@ impl Executor { .await .map_err(ExecutorError::other)? else { + // Parameters not in state_store yet + // This can happen when a contract was just stored but state hasn't been stored yet + // In this case, we can't fetch the contract because fetch_contract requires params + tracing::debug!( + contract = %key, + "Contract parameters not in state_store, cannot fetch contract" + ); return Ok(None); }; + let Some(contract) = self.runtime.contract_store.fetch_contract(key, ¶meters) else { return Ok(None); };