Skip to content
Merged
24 changes: 24 additions & 0 deletions crates/core/src/contract/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -547,6 +547,27 @@ pub(crate) trait ContractExecutor: Send + 'static {
/// A WASM executor which will run any contracts, delegates, etc. registered.
///
/// This executor will monitor the store directories and databases to detect state changes.
/// Represents an operation that's waiting for a contract to finish initializing
#[derive(Debug)]
struct QueuedOperation {
update: Either<WrappedState, StateDelta<'static>>,
related_contracts: RelatedContracts<'static>,
/// When this operation was queued
queued_at: std::time::Instant,
}

/// Tracks the initialization state of a contract
#[derive(Debug)]
enum ContractInitState {
/// Contract is currently being initialized (validation in progress)
Initializing {
/// Operations waiting for initialization to complete
queued_ops: Vec<QueuedOperation>,
/// When initialization started
started_at: std::time::Instant,
},
}

/// Consumers of the executor are required to poll for new changes in order to be notified
/// of changes or can alternatively use the notification channel.
pub struct Executor<R = Runtime> {
Expand All @@ -559,6 +580,8 @@ pub struct Executor<R = Runtime> {
subscriber_summaries: HashMap<ContractKey, HashMap<ClientId, Option<StateSummary<'static>>>>,
/// Attested contract instances for a given delegate.
delegate_attested_ids: HashMap<DelegateKey, Vec<ContractInstanceId>>,
/// Tracks contracts that are being initialized and operations queued for them
contract_init_state: HashMap<ContractKey, ContractInitState>,

event_loop_channel: Option<ExecutorToEventLoopChannel<ExecutorHalve>>,
}
Expand All @@ -580,6 +603,7 @@ impl<R> Executor<R> {
update_notifications: HashMap::default(),
subscriber_summaries: HashMap::default(),
delegate_attested_ids: HashMap::default(),
contract_init_state: HashMap::default(),
event_loop_channel,
})
}
Expand Down
171 changes: 143 additions & 28 deletions crates/core/src/contract/executor/runtime.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
use super::*;
use super::{
ContractExecutor, ContractRequest, ContractResponse, ExecutorError, ExecutorHalve,
ExecutorToEventLoopChannel, RequestError, Response, StateStoreError,
ContractExecutor, ContractInitState, ContractRequest, ContractResponse, ExecutorError,
ExecutorHalve, ExecutorToEventLoopChannel, QueuedOperation, RequestError, Response,
StateStoreError,
};

impl ContractExecutor for Executor<Runtime> {
Expand Down Expand Up @@ -39,6 +40,45 @@ impl ContractExecutor for Executor<Runtime> {
related_contracts: RelatedContracts<'static>,
code: Option<ContractContainer>,
) -> Result<UpsertResult, ExecutorError> {
// Check if this contract is currently being initialized
if let Some(ContractInitState::Initializing { .. }) = self.contract_init_state.get(&key) {
// If we're trying to PUT while already initializing, that's an error
if code.is_some() {
return Err(ExecutorError::request(StdContractError::Put {
key,
cause: "contract is already being initialized".into(),
}));
}

// This is an UPDATE arriving during initialization - queue it
tracing::info!(
contract = %key,
"Operation arrived during contract initialization - queueing for later processing"
);

if let Some(ContractInitState::Initializing {
ref mut queued_ops, ..
}) = self.contract_init_state.get_mut(&key)
{
queued_ops.push(QueuedOperation {
update: update.clone(),
related_contracts: related_contracts.clone(),
queued_at: std::time::Instant::now(),
});

// Return a placeholder response indicating the operation is queued
// The caller should retry later once initialization is complete
tracing::debug!(
contract = %key,
queued_count = queued_ops.len(),
"Operation queued, initialization still in progress"
);

// For now, return NoChange to indicate the operation didn't fail but also didn't complete
// In the future, we might want a specific QueuedResult variant
return Ok(UpsertResult::NoChange);
}
}
if let Either::Left(ref state) = update {
let hash = blake3::hash(state.as_ref());
tracing::debug!(
Expand All @@ -51,55 +91,68 @@ impl ContractExecutor for Executor<Runtime> {
let params = if let Some(code) = &code {
code.params()
} else {
// Contract not provided, need to get params from state_store
self.state_store
.get_params(&key)
.await
.map_err(ExecutorError::other)?
.ok_or_else(|| {
// This error occurs when an UPDATE arrives for a contract whose
// parameters haven't been stored yet (race condition)
tracing::warn!(
contract = %key,
is_delta = matches!(update, Either::Right(_)),
"Contract parameters not found in state_store"
);
ExecutorError::request(StdContractError::Put {
key,
cause: "missing contract parameters".into(),
})
})?
};

let remove_if_fail = if self
// Track if we stored a new contract
let (remove_if_fail, contract_was_provided) = if self
.runtime
.contract_store
.fetch_contract(&key, &params)
.is_none()
{
let code = code.ok_or_else(|| {
ExecutorError::request(StdContractError::MissingContract { key: key.into() })
})?;
// DEBUG: Log before and after store_contract
tracing::debug!(
"DEBUG PUT: Before store_contract - key={}, key.code_hash={:?}",
key,
key.code_hash()
);

self.runtime
.contract_store
.store_contract(code.clone())
.map_err(ExecutorError::other)?;

// Immediately verify the contract was stored
let fetch_result = self.runtime.contract_store.fetch_contract(&key, &params);
tracing::debug!(
"DEBUG PUT: After store_contract - key={}, stored successfully={}, immediate fetch result={}",
key,
true,
fetch_result.is_some()
);
if let Some(ref contract_code) = code {
tracing::debug!("Storing new contract - key={}", key);

true
self.runtime
.contract_store
.store_contract(contract_code.clone())
.map_err(ExecutorError::other)?;
(true, true)
} else {
return Err(ExecutorError::request(StdContractError::MissingContract {
key: key.into(),
}));
}
} else {
false
(false, code.is_some())
};

let is_new_contract = self.state_store.get(&key).await.is_err();

// If this is a new contract being stored, mark it as initializing
if remove_if_fail && is_new_contract && contract_was_provided {
tracing::debug!(
contract = %key,
"Starting contract initialization - queueing subsequent operations"
);

self.contract_init_state.insert(
key,
ContractInitState::Initializing {
queued_ops: Vec::new(),
started_at: std::time::Instant::now(),
},
);
}

let mut updates = match update {
Either::Left(incoming_state) => {
let result = self
Expand All @@ -124,13 +177,67 @@ impl ContractExecutor for Executor<Runtime> {
.await
.map_err(ExecutorError::other)?;

// Contract initialization complete - mark as ready and get queued ops
let queued_ops = if let Some(ContractInitState::Initializing {
queued_ops,
started_at,
}) = self.contract_init_state.remove(&key)
{
let init_duration = started_at.elapsed();
tracing::info!(
contract = %key,
queued_operations = queued_ops.len(),
init_duration_ms = init_duration.as_millis(),
"Contract initialization complete, will process queued operations"
);
queued_ops
} else {
Vec::new()
};

// Log details about queued operations
if !queued_ops.is_empty() {
for op in &queued_ops {
let queue_time = op.queued_at.elapsed();
tracing::info!(
contract = %key,
queue_time_ms = queue_time.as_millis(),
is_delta = matches!(op.update, Either::Right(_)),
has_related = op.related_contracts.states().next().is_some(),
"Queued operation ready for retry after initialization"
);
}
// The queued operations will be handled when the sender retries them
// Now that initialization is complete, they will succeed
}

return Ok(UpsertResult::Updated(incoming_state));
}
}
ValidateResult::Invalid => {
// Validation failed - clear any queued operations
if let Some(ContractInitState::Initializing { queued_ops, .. }) =
self.contract_init_state.remove(&key)
{
tracing::warn!(
contract = %key,
dropped_operations = queued_ops.len(),
"Contract validation failed, dropping queued operations"
);
}
return Err(ExecutorError::request(StdContractError::invalid_put(key)));
}
ValidateResult::RequestRelated(mut related) => {
// Clear any queued operations since we're missing related contracts
if let Some(ContractInitState::Initializing { queued_ops, .. }) =
self.contract_init_state.remove(&key)
{
tracing::warn!(
contract = %key,
dropped_operations = queued_ops.len(),
"Missing related contracts, dropping queued operations"
);
}
if let Some(key) = related.pop() {
return Err(ExecutorError::request(StdContractError::MissingRelated {
key,
Expand Down Expand Up @@ -1122,8 +1229,16 @@ impl Executor<Runtime> {
.await
.map_err(ExecutorError::other)?
else {
// Parameters not in state_store yet
// This can happen when a contract was just stored but state hasn't been stored yet
// In this case, we can't fetch the contract because fetch_contract requires params
tracing::debug!(
contract = %key,
"Contract parameters not in state_store, cannot fetch contract"
);
return Ok(None);
};

let Some(contract) = self.runtime.contract_store.fetch_contract(key, &parameters) else {
return Ok(None);
};
Expand Down
Loading