diff --git a/Cargo.lock b/Cargo.lock index dffc8eff53a..f2318bc792d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -152,9 +152,9 @@ dependencies = [ [[package]] name = "anyhow" -version = "1.0.97" +version = "1.0.98" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dcfed56ad506cb2c684a14971b8861fdc3baaaae314b9e5f9bb532cbe3ba7a4f" +checksum = "e16d2d3311acee920a9eb8d33b8cbc1787ce4a264e85f964c2404b969bdcd487" dependencies = [ "backtrace", ] @@ -716,7 +716,7 @@ dependencies = [ "bitflags 2.9.0", "cexpr", "clang-sys", - "itertools 0.10.5", + "itertools 0.12.1", "lazy_static", "lazycell", "log", @@ -739,7 +739,7 @@ dependencies = [ "bitflags 2.9.0", "cexpr", "clang-sys", - "itertools 0.10.5", + "itertools 0.13.0", "log", "prettyplease", "proc-macro2", @@ -1615,7 +1615,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "117725a109d387c937a1533ce01b450cbde6b88abceea8473c4d7a85853cda3c" dependencies = [ "lazy_static", - "windows-sys 0.48.0", + "windows-sys 0.59.0", ] [[package]] @@ -5304,7 +5304,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4979f22fdb869068da03c9f7528f8297c6fd2606bc3a4affe42e6a823fdb8da4" dependencies = [ "cfg-if", - "windows-targets 0.48.5", + "windows-targets 0.52.6", ] [[package]] @@ -6062,6 +6062,7 @@ dependencies = [ "nexus-db-schema", "nexus-inventory", "nexus-reconfigurator-planning", + "nexus-saga-interface", "nexus-sled-agent-shared", "nexus-test-utils", "nexus-types", @@ -6483,6 +6484,25 @@ dependencies = [ "uuid", ] +[[package]] +name = "nexus-saga-interface" +version = "0.1.0" +dependencies = [ + "async-trait", + "nexus-auth", + "nexus-background-task-interface", + "nexus-db-lookup", + "omicron-common", + "omicron-workspace-hack", + "paste", + "serde", + "serde_json", + "slog", + "steno", + "thiserror 1.0.69", + "uuid", +] + [[package]] name = "nexus-saga-recovery" version = "0.1.0" @@ -6510,6 +6530,52 @@ dependencies = [ "uuid", ] +[[package]] +name = 
"nexus-saga-tests" +version = "0.1.0" +dependencies = [ + "async-bb8-diesel", + "diesel", + "dropshot 0.16.0", + "futures", + "nexus-db-lookup", + "nexus-db-model", + "nexus-db-queries", + "nexus-db-schema", + "nexus-saga-interface", + "nexus-sagas", + "nexus-test-utils", + "nexus-test-utils-macros", + "nexus-types", + "omicron-common", + "omicron-nexus", + "omicron-sled-agent", + "omicron-test-utils", + "omicron-uuid-kinds", + "omicron-workspace-hack", + "sled-agent-client", + "slog", + "steno", + "tokio", + "uuid", +] + +[[package]] +name = "nexus-sagas" +version = "0.1.0" +dependencies = [ + "nexus-auth", + "nexus-db-lookup", + "nexus-db-model", + "nexus-saga-interface", + "omicron-common", + "omicron-uuid-kinds", + "omicron-workspace-hack", + "serde", + "steno", + "uuid", +] + [[package]] name = "nexus-sled-agent-shared" version = "0.1.0" @@ -7322,7 +7388,9 @@ dependencies = [ "nexus-reconfigurator-planning", "nexus-reconfigurator-preparation", "nexus-reconfigurator-rendezvous", + "nexus-saga-interface", "nexus-saga-recovery", + "nexus-sagas", "nexus-sled-agent-shared", "nexus-test-interface", "nexus-test-utils", @@ -7880,7 +7948,7 @@ dependencies = [ "inout", "ipnetwork", "itertools 0.10.5", - "itertools 0.13.0", + "itertools 0.12.1", "lalrpop-util", "lazy_static", "libc", @@ -7901,7 +7969,6 @@ dependencies = [ "peg-runtime", "pem-rfc7468", "percent-encoding", - "petgraph 0.6.5", "phf", "phf_shared", "pkcs8", @@ -9033,8 +9100,6 @@ checksum = "b4c5cc86750666a3ed20bdaf5ca2a0344f9c67674cae0515bec2da16fbaa47db" dependencies = [ "fixedbitset 0.4.2", "indexmap 2.7.1", - "serde", - "serde_derive", ] [[package]] @@ -9047,6 +9112,19 @@ dependencies = [ "indexmap 2.7.1", ] +[[package]] +name = "petgraph" +version = "0.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7a98c6720655620a521dcc722d0ad66cd8afd5d86e34a89ef691c50b7b24de06" +dependencies = [ + "fixedbitset 0.5.7", + "hashbrown 0.15.1", + "indexmap 2.7.1", + "serde", + 
"serde_derive", +] + [[package]] name = "petname" version = "2.0.2" @@ -9054,7 +9132,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9cd31dcfdbbd7431a807ef4df6edd6473228e94d5c805e8cf671227a21bad068" dependencies = [ "anyhow", - "itertools 0.13.0", + "itertools 0.14.0", "proc-macro2", "quote", "rand 0.8.5", @@ -11940,9 +12018,8 @@ checksum = "a2eb9349b6444b326872e140eb1cf5e7c522154d69e7a0ffb0fb81c06b37543f" [[package]] name = "steno" -version = "0.4.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a77bea0b19d52c04783569b87bb3e8d47c85e285be239a9b0428241bdbb95460" +version = "0.4.2-dev" +source = "git+https://github.com/oxidecomputer/steno?branch=sunshowers%2Fspr%2Fdraft-add-actioncontextmap_user_data#bc9409ea1295b7a8c1ddef1f61ebdda411fe0022" dependencies = [ "anyhow", "async-trait", @@ -11950,12 +12027,12 @@ dependencies = [ "futures", "lazy_static", "newtype_derive", - "petgraph 0.6.5", + "petgraph 0.8.1", "schemars", "serde", "serde_json", "slog", - "thiserror 1.0.69", + "thiserror 2.0.12", "tokio", "uuid", ] @@ -14126,7 +14203,7 @@ version = "0.1.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cf221c93e13a30d793f7645a0e7762c55d169dbb0a49671918a2319d289b10bb" dependencies = [ - "windows-sys 0.48.0", + "windows-sys 0.59.0", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index fb00326b9e4..1bfb526abfe 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -100,7 +100,10 @@ members = [ "nexus/reconfigurator/preparation", "nexus/reconfigurator/rendezvous", "nexus/reconfigurator/simulation", + "nexus/saga-interface", "nexus/saga-recovery", + "nexus/saga-tests", + "nexus/sagas", "nexus/test-interface", "nexus/test-utils-macros", "nexus/test-utils", @@ -249,7 +252,10 @@ default-members = [ "nexus/reconfigurator/preparation", "nexus/reconfigurator/rendezvous", "nexus/reconfigurator/simulation", + "nexus/saga-interface", "nexus/saga-recovery", + "nexus/saga-tests", + "nexus/sagas", 
"nexus/test-interface", "nexus/test-utils-macros", "nexus/test-utils", @@ -519,7 +525,10 @@ nexus-reconfigurator-planning = { path = "nexus/reconfigurator/planning" } nexus-reconfigurator-preparation = { path = "nexus/reconfigurator/preparation" } nexus-reconfigurator-rendezvous = { path = "nexus/reconfigurator/rendezvous" } nexus-reconfigurator-simulation = { path = "nexus/reconfigurator/simulation" } +nexus-saga-interface = { path = "nexus/saga-interface" } nexus-saga-recovery = { path = "nexus/saga-recovery" } +nexus-saga-tests = { path = "nexus/saga-tests" } +nexus-sagas = { path = "nexus/sagas" } nexus-sled-agent-shared = { path = "nexus-sled-agent-shared" } nexus-test-interface = { path = "nexus/test-interface" } nexus-test-utils-macros = { path = "nexus/test-utils-macros" } @@ -677,7 +686,7 @@ static_assertions = "1.1.0" # Please do not change the Steno version to a Git dependency. It makes it # harder than expected to make breaking changes (even if you specify a specific # SHA). Cut a new Steno release instead. See omicron#2117. -steno = "0.4.1" +steno = { version = "0.4.2-dev" } strum = { version = "0.26", features = [ "derive" ] } subprocess = "0.2.9" supports-color = "3.0.2" @@ -896,13 +905,13 @@ opt-level = 3 # It's common during development to use a local copy of various complex # dependencies. If you want to use those, uncomment one of these blocks. 
# -# [patch.crates-io] +[patch.crates-io] # diesel = { path = "../../diesel/diesel" } # dropshot = { path = "../dropshot/dropshot" } # dropshot_endpoint = { path = "../dropshot/dropshot_endpoint" } # progenitor = { path = "../progenitor/progenitor" } # progenitor-client = { path = "../progenitor/progenitor-client" } -# steno = { path = "../steno" } +steno = { git = "https://github.com/oxidecomputer/steno", branch = "sunshowers/spr/draft-add-actioncontextmap_user_data" } # [patch."https://github.com/oxidecomputer/crucible"] # crucible-agent-client = { path = "../crucible/agent-client" } diff --git a/nexus/Cargo.toml b/nexus/Cargo.toml index 0570aeb0690..19f882f9d9e 100644 --- a/nexus/Cargo.toml +++ b/nexus/Cargo.toml @@ -60,7 +60,9 @@ nexus-external-api.workspace = true nexus-internal-api.workspace = true nexus-mgs-updates.workspace = true nexus-networking.workspace = true +nexus-saga-interface.workspace = true nexus-saga-recovery.workspace = true +nexus-sagas.workspace = true nexus-test-interface.workspace = true num-integer.workspace = true openssl.workspace = true diff --git a/nexus/db-queries/Cargo.toml b/nexus/db-queries/Cargo.toml index 5b40b7b271b..0685b30f83e 100644 --- a/nexus/db-queries/Cargo.toml +++ b/nexus/db-queries/Cargo.toml @@ -62,6 +62,7 @@ nexus-db-fixed-data.workspace = true nexus-db-model.workspace = true nexus-db-lookup.workspace = true nexus-db-schema.workspace = true +nexus-saga-interface.workspace = true nexus-sled-agent-shared.workspace = true nexus-types.workspace = true omicron-common.workspace = true diff --git a/nexus/db-queries/src/db/mod.rs b/nexus/db-queries/src/db/mod.rs index cabb3cffefb..59b46a5e79a 100644 --- a/nexus/db-queries/src/db/mod.rs +++ b/nexus/db-queries/src/db/mod.rs @@ -25,6 +25,7 @@ mod pool_connection; // sagas. 
pub mod queries; mod raw_query_builder; +mod saga_interface; mod sec_store; pub(crate) mod true_or_cast_error; mod update_and_check; diff --git a/nexus/db-queries/src/db/saga_interface.rs b/nexus/db-queries/src/db/saga_interface.rs new file mode 100644 index 00000000000..690976ba025 --- /dev/null +++ b/nexus/db-queries/src/db/saga_interface.rs @@ -0,0 +1,47 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at https://mozilla.org/MPL/2.0/. + +use nexus_auth::authz; +use nexus_auth::context::OpContext; +use nexus_saga_interface::DataStoreImpl; +use omicron_common::api::external::DeleteResult; +use omicron_common::api::external::Error; +use uuid::Uuid; + +use super::DataStore; + +#[async_trait::async_trait] +impl DataStoreImpl for DataStore { + async fn project_delete_instance( + &self, + opctx: &OpContext, + authz_instance: &authz::Instance, + ) -> DeleteResult { + self.project_delete_instance(opctx, authz_instance).await + } + + async fn instance_delete_all_network_interfaces( + &self, + opctx: &OpContext, + authz_instance: &authz::Instance, + ) -> DeleteResult { + self.instance_delete_all_network_interfaces(opctx, authz_instance).await + } + + async fn deallocate_external_ip_by_instance_id( + &self, + opctx: &OpContext, + instance_id: Uuid, + ) -> Result { + self.deallocate_external_ip_by_instance_id(opctx, instance_id).await + } + + async fn detach_floating_ips_by_instance_id( + &self, + opctx: &OpContext, + instance_id: Uuid, + ) -> Result { + self.detach_floating_ips_by_instance_id(opctx, instance_id).await + } +} diff --git a/nexus/saga-interface/Cargo.toml b/nexus/saga-interface/Cargo.toml new file mode 100644 index 00000000000..c40294033b4 --- /dev/null +++ b/nexus/saga-interface/Cargo.toml @@ -0,0 +1,25 @@ +[package] +name = "nexus-saga-interface" +version = "0.1.0" +edition = "2021" +license = "MPL-2.0" + +[lints] +workspace = true + 
+[dependencies] +async-trait.workspace = true +nexus-auth.workspace = true +nexus-background-task-interface.workspace = true +# Note: this crate depends on nexus-db-lookup but not nexus-db-queries, for +# build speed reasons. +nexus-db-lookup.workspace = true +omicron-common.workspace = true +omicron-workspace-hack.workspace = true +paste.workspace = true +serde.workspace = true +serde_json.workspace = true +slog.workspace = true +steno.workspace = true +thiserror.workspace = true +uuid.workspace = true diff --git a/nexus/saga-interface/src/datastore_context.rs b/nexus/saga-interface/src/datastore_context.rs new file mode 100644 index 00000000000..74d0d5401d3 --- /dev/null +++ b/nexus/saga-interface/src/datastore_context.rs @@ -0,0 +1,116 @@ +use std::sync::Arc; + +use nexus_auth::{authz, context::OpContext, storage::Storage}; +use nexus_db_lookup::LookupDataStore; +use omicron_common::api::external::{DeleteResult, Error}; +use uuid::Uuid; + +pub struct DataStoreContext { + // These are references to the same datastore with different vtables. This + // allows returning `Arc` objects in functions that require them. + datastore: Arc<dyn DataStoreImpl>, + storage: Arc<dyn Storage>, +} + +impl DataStoreContext { + pub fn new(datastore: Arc<dyn DataStoreImpl>) -> Self { + let storage = datastore.clone(); + Self { datastore, storage } + } + + #[inline] + pub fn as_storage(&self) -> &Arc<dyn Storage> { + &self.storage + } + + /// Deletes the provided `authz_instance`, as long as it is eligible for + /// deletion (in either the `InstanceState::NoVmm` or + /// `InstanceState::Failed` state, or it has already started being + /// deleted successfully). + /// + /// This function is idempotent, but not atomic. + pub async fn project_delete_instance( + &self, + opctx: &OpContext, + authz_instance: &authz::Instance, + ) -> DeleteResult { + self.datastore.project_delete_instance(opctx, authz_instance).await + } + + /// Delete all network interfaces attached to the given instance. 
+ pub async fn instance_delete_all_network_interfaces( + &self, + opctx: &OpContext, + authz_instance: &authz::Instance, + ) -> DeleteResult { + self.datastore + .instance_delete_all_network_interfaces(opctx, authz_instance) + .await + } + + /// Delete all non-floating IP addresses associated with the provided instance + /// ID. + /// + /// This method returns the number of records deleted, rather than the usual + /// `DeleteResult`. That's mostly useful for tests, but could be important + /// if callers have some invariants they'd like to check. + pub async fn deallocate_external_ip_by_instance_id( + &self, + opctx: &OpContext, + instance_id: Uuid, + ) -> Result { + self.datastore + .deallocate_external_ip_by_instance_id(opctx, instance_id) + .await + } + + /// Detach an individual Floating IP address from their parent instance. + /// + /// As in `deallocate_external_ip_by_instance_id`, this method returns the + /// number of records altered, rather than an `UpdateResult`. + /// + /// This method ignores ongoing state transitions, and is only safely + /// usable from within the instance_delete saga. 
+ pub async fn detach_floating_ips_by_instance_id( + &self, + opctx: &OpContext, + instance_id: Uuid, + ) -> Result { + self.datastore + .detach_floating_ips_by_instance_id(opctx, instance_id) + .await + } +} + +#[async_trait::async_trait] +pub trait DataStoreImpl: Storage + LookupDataStore { + async fn project_delete_instance( + &self, + opctx: &OpContext, + authz_instance: &authz::Instance, + ) -> DeleteResult; + + async fn instance_delete_all_network_interfaces( + &self, + opctx: &OpContext, + authz_instance: &authz::Instance, + ) -> DeleteResult; + + async fn deallocate_external_ip_by_instance_id( + &self, + opctx: &OpContext, + instance_id: Uuid, + ) -> Result; + + async fn detach_floating_ips_by_instance_id( + &self, + opctx: &OpContext, + instance_id: Uuid, + ) -> Result; +} + +impl<'a> From<&'a DataStoreContext> for &'a dyn LookupDataStore { + fn from(datastore: &'a DataStoreContext) -> &'a dyn LookupDataStore { + &*datastore.datastore + } +} diff --git a/nexus/saga-interface/src/lib.rs b/nexus/saga-interface/src/lib.rs new file mode 100644 index 00000000000..8c401ecc6e6 --- /dev/null +++ b/nexus/saga-interface/src/lib.rs @@ -0,0 +1,14 @@ +mod datastore_context; +mod macros; +mod nexus_saga; +mod nexus_saga_2; +mod saga_context; + +pub use datastore_context::*; +pub use nexus_saga::*; +pub use saga_context::*; + +pub mod macro_support { + pub use paste; + pub use steno; +} diff --git a/nexus/saga-interface/src/macros.rs b/nexus/saga-interface/src/macros.rs new file mode 100644 index 00000000000..c07d1ed8fcb --- /dev/null +++ b/nexus/saga-interface/src/macros.rs @@ -0,0 +1,135 @@ +/// A macro intended to reduce boilerplate when writing saga actions. 
+/// +/// This macro aims to reduce this boilerplate, by requiring only the following: +/// - The name of the saga +/// - The name of each action +/// - The output of each action +/// - The "forward" action function +/// - (Optional) The "undo" action function +/// +/// For this input: +/// +/// ```ignore +/// declare_saga_actions! { +/// my_saga; +/// SAGA_NODE1 -> "output1" { +/// + do1 +/// - undo1 +/// } +/// SAGA_NODE2 -> "output2" { +/// + do2 +/// } +/// } +/// ``` +/// +/// We generate the following: +/// - For `SAGA_NODE1`: +/// - A `NexusAction` labeled "my_saga.saga_node1" (containing "do1" and "undo1"). +/// - `fn saga_node1_action() -> steno::Node` referencing this node, with an +/// output named "output1". +/// - For `SAGA_NODE2`: +/// - A `NexusAction` labeled "my_saga.saga_node2" (containing "do2"). +/// - `fn saga_node2_action() -> steno::Node` referencing this node, with an +/// output named "output2". +/// - For `my_saga`: +/// - `fn my_saga_list_actions() -> Vec<NexusAction>`, which returns every +/// action declared above, e.g. for use in `NexusSaga2::actions`. +#[macro_export] +macro_rules! declare_saga_actions { + // The entrypoint to the macro. + // We expect the input to be of the form: + // + // saga-name; + ($saga:ident; $($tail:tt)*) => { + declare_saga_actions!(S = $saga <> $($tail)*); + }; + // Subsequent lines of the saga action declaration. + // These take the form: + // + // ACTION_NAME -> "output" { + // + action + // - undo_action + // } + // + // However, we also want to propagate the Saga structure and collection of + // all node names, so this is *actually* parsed with a hidden prefix: + // + // S = SagaName <> ... + // + // Basically, everything to the left of "<>" is just us propagating state + // through the macro, and everything to the right of it is user input. 
+ (S = $saga:ident $($nodes:ident),* <> $node:ident -> $out:literal { + $a:ident - $u:ident } $($tail:tt)*) => { + static $node: ::std::sync::LazyLock<$crate::NexusAction> = + ::std::sync::LazyLock::new(|| { + $crate::macro_support::steno::ActionFunc::new_action( + $crate::__action_name!($saga, $node), $a, $u, + ) + }); + $crate::__emit_action!($node, $out); + declare_saga_actions!(S = $saga $($nodes,)* $node <> $($tail)*); + }; + // Same as the prior match, but without the undo action. + (S = $saga:ident $($nodes:ident),* <> $node:ident -> $out:literal { + $a:ident } $($tail:tt)*) => { + static $node: ::std::sync::LazyLock<$crate::NexusAction> = + ::std::sync::LazyLock::new(|| { + $crate::macro_support::steno::new_action_noop_undo( + $crate::__action_name!($saga, $node), $a, + ) + }); + $crate::__emit_action!($node, $out); + declare_saga_actions!(S = $saga $($nodes,)* $node <> $($tail)*); + }; + // The end of the macro, which collects all previously generated saga nodes. + // + // We generate a new function, rather than implementing + // "NexusSaga::register_actions", because traits cannot be partially + // implemented, and "make_saga_dag" is not being generated through this + // macro. + (S = $saga:ident $($nodes:ident),* <>) => { + $crate::macro_support::paste::paste! { + fn [<$saga _list_actions>]() -> Vec<$crate::NexusAction> { + vec![ + $( + ::std::sync::Arc::clone(&* $nodes ), + )* + ] + } + } + }; +} + +#[macro_export] +macro_rules! __emit_action { + ($node:ident, $output:literal) => { + $crate::macro_support::paste::paste! { + #[allow(dead_code)] + fn [<$node:lower _action>]() -> $crate::macro_support::steno::Node { + $crate::macro_support::steno::Node::action( + $output, + $crate::__stringify_ident!([<$node:camel>]), + $node.as_ref(), + ) + } + } + }; +} + +#[macro_export] +macro_rules! __action_name { + ($saga:ident, $node:ident) => { + $crate::macro_support::paste::paste! 
{ + concat!( + stringify!($saga), + ".", + $crate::__stringify_ident!([<$node:lower>]), + ) + } + }; +} + +#[macro_export] +macro_rules! __stringify_ident { + ($i:ident) => { + stringify!($i) + }; +} diff --git a/nexus/saga-interface/src/nexus_saga.rs b/nexus/saga-interface/src/nexus_saga.rs new file mode 100644 index 00000000000..2d26ded5038 --- /dev/null +++ b/nexus/saga-interface/src/nexus_saga.rs @@ -0,0 +1,90 @@ +use std::sync::Arc; + +use omicron_common::api::external::Error; +use steno::{DagBuilder, SagaDag, SagaName}; +use thiserror::Error; + +use crate::saga_context::SagaContext; + +#[derive(Debug)] +pub struct NexusSagaType; + +impl steno::SagaType for NexusSagaType { + type ExecContextType = Arc<SagaContext>; +} + +pub type ActionRegistry = steno::ActionRegistry<NexusSagaType>; +pub type NexusAction = Arc<dyn steno::Action<NexusSagaType>>; +pub type NexusActionContext = steno::ActionContext<NexusSagaType>; + +/// Given a particular kind of Nexus saga (the type parameter `N`) and +/// parameters for that saga, construct a [`SagaDag`] for it. +pub fn create_saga_dag<N: NexusSaga2>( + params: N::Params, +) -> Result<SagaDag, SagaInitError> { + N::prepare(&params) +} + +pub trait NexusSaga2 { + const NAME: &'static str; + + type Params: serde::Serialize + + serde::de::DeserializeOwned + + std::fmt::Debug; + + fn actions() -> Vec<NexusAction>; + + fn make_saga_dag( + params: &Self::Params, + builder: steno::DagBuilder, + ) -> Result<steno::Dag, SagaInitError>; + + fn prepare( + params: &Self::Params, + ) -> Result<SagaDag, SagaInitError> { + let builder = DagBuilder::new(SagaName::new(Self::NAME)); + let dag = Self::make_saga_dag(&params, builder)?; + let params = serde_json::to_value(&params).map_err(|e| { + SagaInitError::SerializeError(format!("saga params: {params:?}"), e) + })?; + Ok(SagaDag::new(dag, params)) + } +} + +#[derive(Debug, Error)] +pub enum SagaInitError { + #[error("internal error building saga graph: {0:#}")] + DagBuildError(steno::DagBuilderError), + + #[error("failed to serialize {0:?}: {1:#}")] + SerializeError(String, serde_json::Error), + + #[error("invalid parameter: {0}")] + InvalidParameter(String), +} + +impl From<steno::DagBuilderError> for 
SagaInitError { + fn from(error: steno::DagBuilderError) -> Self { + SagaInitError::DagBuildError(error) + } +} + +impl From<SagaInitError> for omicron_common::api::external::Error { + fn from(error: SagaInitError) -> Self { + match error { + SagaInitError::DagBuildError(_) + | SagaInitError::SerializeError(_, _) => { + // All of these errors reflect things that shouldn't be possible. + // They're basically bugs. + omicron_common::api::external::Error::internal_error(&format!( + "creating saga: {:#}", + error + )) + } + + SagaInitError::InvalidParameter(s) => { + omicron_common::api::external::Error::invalid_request(&s) + } + } + } +} diff --git a/nexus/saga-interface/src/nexus_saga_2.rs b/nexus/saga-interface/src/nexus_saga_2.rs new file mode 100644 index 00000000000..8b137891791 --- /dev/null +++ b/nexus/saga-interface/src/nexus_saga_2.rs @@ -0,0 +1 @@ + diff --git a/nexus/saga-interface/src/saga_context.rs b/nexus/saga-interface/src/saga_context.rs new file mode 100644 index 00000000000..73129104257 --- /dev/null +++ b/nexus/saga-interface/src/saga_context.rs @@ -0,0 +1,134 @@ +use std::{fmt, sync::Arc}; + +use nexus_auth::{ + authn, + authz::{self, Authz}, + context::{OpContext, OpKind}, +}; +use nexus_background_task_interface::BackgroundTasks; +use omicron_common::api::external::Error; +use slog::{Logger, o}; + +use crate::DataStoreContext; + +pub struct SagaContext { + log: Logger, + nexus: NexusContext, +} + +impl fmt::Debug for SagaContext { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.write_str("SagaContext { (nexus) ... 
}") + } +} + +impl SagaContext { + pub fn new(log: Logger, nexus: NexusContext) -> Self { + Self { log, nexus } + } + + #[inline] + pub fn log(&self) -> &Logger { + &self.log + } + + #[inline] + pub fn authz(&self) -> &Arc<Authz> { + self.nexus.authz() + } + + #[inline] + pub fn nexus(&self) -> &NexusContext { + &self.nexus + } + + #[inline] + pub fn datastore(&self) -> &DataStoreContext { + self.nexus.datastore() + } +} + +pub struct NexusContext { + nexus: Arc<dyn NexusInterface>, +} + +impl NexusContext { + pub fn new(nexus: Arc<dyn NexusInterface>) -> Self { + Self { nexus } + } + + #[inline] + pub fn authz(&self) -> &Arc<Authz> { + self.nexus.authz() + } + + #[inline] + pub fn datastore(&self) -> &DataStoreContext { + self.nexus.datastore() + } + + #[inline] + pub fn background_tasks(&self) -> &BackgroundTasks { + self.nexus.background_tasks() + } + + /// Attempt to delete all of the Dendrite NAT configuration for the + /// instance identified by `authz_instance`. + pub async fn instance_delete_dpd_config( + &self, + opctx: &OpContext, + authz_instance: &authz::Instance, + ) -> Result<(), Error> { + self.nexus.instance_delete_dpd_config(opctx, authz_instance).await + } +} + +#[async_trait::async_trait] +pub trait NexusInterface: Send + Sync + 'static { + fn authz(&self) -> &Arc<Authz>; + fn datastore(&self) -> &DataStoreContext; + fn background_tasks(&self) -> &BackgroundTasks; + + async fn instance_delete_dpd_config( + &self, + opctx: &OpContext, + authz_instance: &authz::Instance, + ) -> Result<(), Error>; +} + +pub fn op_context_for_saga_action<T>( + sagactx: &steno::ActionContext<T>, + serialized_authn: &authn::saga::Serialized, +) -> OpContext +where + T: steno::SagaType<ExecContextType = Arc<SagaContext>>, +{ + let osagactx = sagactx.user_data(); + let nexus = osagactx.nexus(); + let datastore = Arc::clone(nexus.datastore().as_storage()); + + // TODO-debugging This would be a good place to put the saga name, but + // we don't have it available here. 
This log should perhaps come from + // steno, prepopulated with useful metadata. + let log = osagactx.log().new(o!( + "saga_node" => sagactx.node_label() + )); + + OpContext::new( + &log, + || { + let authn = Arc::new(serialized_authn.to_authn()); + let authz = authz::Context::new( + Arc::clone(&authn), + Arc::clone(&osagactx.authz()), + datastore, + ); + Ok::<_, std::convert::Infallible>((authn, authz)) + }, + |metadata| { + metadata.insert(String::from("saga_node"), sagactx.node_label()); + }, + OpKind::Saga, + ) + .expect("infallible") +} diff --git a/nexus/saga-tests/Cargo.toml b/nexus/saga-tests/Cargo.toml new file mode 100644 index 00000000000..88c63f93708 --- /dev/null +++ b/nexus/saga-tests/Cargo.toml @@ -0,0 +1,36 @@ +[package] +name = "nexus-saga-tests" +version = "0.1.0" +edition = "2021" +license = "MPL-2.0" + +[lints] +workspace = true + +[dependencies] +async-bb8-diesel.workspace = true +diesel.workspace = true +futures.workspace = true +nexus-db-lookup.workspace = true +nexus-db-model.workspace = true +nexus-db-queries.workspace = true +nexus-db-schema.workspace = true +nexus-saga-interface.workspace = true +nexus-sagas.workspace = true +nexus-test-utils.workspace = true +nexus-types.workspace = true +omicron-common.workspace = true +omicron-nexus.workspace = true +omicron-test-utils.workspace = true +omicron-uuid-kinds.workspace = true +omicron-workspace-hack.workspace = true +sled-agent-client.workspace = true +slog.workspace = true +steno.workspace = true + +[dev-dependencies] +dropshot.workspace = true +nexus-test-utils-macros.workspace = true +omicron-sled-agent.workspace = true +tokio.workspace = true +uuid.workspace = true diff --git a/nexus/saga-tests/src/lib.rs b/nexus/saga-tests/src/lib.rs new file mode 100644 index 00000000000..18a4abcdc6b --- /dev/null +++ b/nexus/saga-tests/src/lib.rs @@ -0,0 +1,3 @@ +mod test_helpers; + +pub use test_helpers::*; diff --git a/nexus/saga-tests/src/test_helpers.rs 
b/nexus/saga-tests/src/test_helpers.rs new file mode 100644 index 00000000000..5c7f7f3aa63 --- /dev/null +++ b/nexus/saga-tests/src/test_helpers.rs @@ -0,0 +1,778 @@ +//! Helper functions for writing saga undo tests and working with instances in +//! saga tests. + +use async_bb8_diesel::{AsyncRunQueryDsl, AsyncSimpleConnection}; +use diesel::{ + BoolExpressionMethods, ExpressionMethods, QueryDsl, SelectableHelper, +}; +use futures::future::BoxFuture; +use nexus_db_lookup::LookupPath; +use nexus_db_model::InstanceState; +use nexus_db_queries::{ + authz, + context::OpContext, + db::{ + DataStore, + datastore::{InstanceAndActiveVmm, InstanceGestalt}, + }, +}; +use nexus_saga_interface::{NexusSaga2, create_saga_dag}; +use nexus_sagas::sagas::{ + instance_common::VmmAndSledIds, instance_start::InstanceStartReason, +}; +use nexus_types::identity::Resource; +use omicron_common::api::external::Error; +use omicron_common::api::external::NameOrId; +use omicron_nexus::Nexus; +use omicron_test_utils::dev::poll; +use omicron_uuid_kinds::{GenericUuid, InstanceUuid, PropolisUuid, SledUuid}; +use sled_agent_client::TestInterfaces as _; +use slog::{Logger, error, info, o, warn}; +use std::{num::NonZeroU32, sync::Arc, time::Duration}; +use steno::SagaDag; + +type ControlPlaneTestContext = + nexus_test_utils::ControlPlaneTestContext; + +pub fn test_opctx(cptestctx: &ControlPlaneTestContext) -> OpContext { + OpContext::for_tests( + cptestctx.logctx.log.new(o!()), + cptestctx.server.server_context().nexus.datastore().clone(), + ) +} + +pub async fn instance_start( + cptestctx: &ControlPlaneTestContext, + id: &InstanceUuid, +) { + let nexus = &cptestctx.server.server_context().nexus; + let opctx = test_opctx(&cptestctx); + let instance_selector = + nexus_types::external_api::params::InstanceSelector { + project: None, + instance: NameOrId::from(id.into_untyped_uuid()), + }; + + let instance_lookup = + nexus.instance_lookup(&opctx, instance_selector).unwrap(); + nexus + 
.instance_start(&opctx, &instance_lookup, InstanceStartReason::User) + .await + .expect("Failed to start instance"); +} + +pub async fn instance_stop( + cptestctx: &ControlPlaneTestContext, + id: &InstanceUuid, +) { + let nexus = &cptestctx.server.server_context().nexus; + let opctx = test_opctx(&cptestctx); + let instance_selector = + nexus_types::external_api::params::InstanceSelector { + project: None, + instance: NameOrId::from(id.into_untyped_uuid()), + }; + + let instance_lookup = + nexus.instance_lookup(&opctx, instance_selector).unwrap(); + nexus + .instance_stop(&opctx, &instance_lookup) + .await + .expect("Failed to stop instance"); +} + +pub async fn instance_stop_by_name( + cptestctx: &ControlPlaneTestContext, + name: &str, + project_name: &str, +) { + let nexus = &cptestctx.server.server_context().nexus; + let opctx = test_opctx(&cptestctx); + let instance_selector = + nexus_types::external_api::params::InstanceSelector { + project: Some(project_name.to_string().try_into().unwrap()), + instance: name.to_string().try_into().unwrap(), + }; + + let instance_lookup = + nexus.instance_lookup(&opctx, instance_selector).unwrap(); + nexus + .instance_stop(&opctx, &instance_lookup) + .await + .expect("Failed to stop instance"); +} + +pub async fn instance_delete_by_name( + cptestctx: &ControlPlaneTestContext, + name: &str, + project_name: &str, +) { + let nexus = &cptestctx.server.server_context().nexus; + let opctx = test_opctx(&cptestctx); + let instance_selector = + nexus_types::external_api::params::InstanceSelector { + project: Some(project_name.to_string().try_into().unwrap()), + instance: name.to_string().try_into().unwrap(), + }; + + let instance_lookup = + nexus.instance_lookup(&opctx, instance_selector).unwrap(); + nexus + .project_destroy_instance(&opctx, &instance_lookup) + .await + .expect("Failed to destroy instance"); +} + +pub async fn instance_simulate( + cptestctx: &ControlPlaneTestContext, + instance_id: &InstanceUuid, +) { + 
info!(&cptestctx.logctx.log, "Poking simulated instance"; + "instance_id" => %instance_id); + let nexus = &cptestctx.server.server_context().nexus; + let VmmAndSledIds { vmm_id, sled_id } = + instance_fetch_vmm_and_sled_ids(cptestctx, instance_id).await; + let sa = nexus + .sled_client(&sled_id) + .await + .expect("instance must be on a sled to simulate a state change"); + + sa.vmm_finish_transition(vmm_id).await; +} + +pub async fn instance_single_step_on_sled( + cptestctx: &ControlPlaneTestContext, + instance_id: &InstanceUuid, + sled_id: &SledUuid, +) { + info!( + &cptestctx.logctx.log, + "Single-stepping simulated instance on sled"; + "instance_id" => %instance_id, + "sled_id" => %sled_id, + ); + let nexus = &cptestctx.server.server_context().nexus; + // NOTE(review): the destructuring below shadows the `sled_id` parameter, so + // the caller-supplied sled is only logged above; the sled agent client is + // looked up from the instance's fetched VMM record. Confirm this is intended. + let VmmAndSledIds { vmm_id, sled_id } = + instance_fetch_vmm_and_sled_ids(cptestctx, instance_id).await; + let sa = nexus + .sled_client(&sled_id) + .await + .expect("instance must be on a sled to simulate a state change"); + + sa.vmm_single_step(vmm_id).await; +} + +pub async fn instance_simulate_by_name( + cptestctx: &ControlPlaneTestContext, + name: &str, + project_name: &str, +) { + info!(&cptestctx.logctx.log, "Poking simulated instance"; + "instance_name" => %name, + "project_name" => %project_name); + + let nexus = &cptestctx.server.server_context().nexus; + let opctx = test_opctx(&cptestctx); + let instance_selector = + nexus_types::external_api::params::InstanceSelector { + project: Some(project_name.to_string().try_into().unwrap()), + instance: name.to_string().try_into().unwrap(), + }; + + let instance_lookup = + nexus.instance_lookup(&opctx, instance_selector).unwrap(); + let (.., instance) = instance_lookup.fetch().await.unwrap(); + let instance_id = InstanceUuid::from_untyped_uuid(instance.id()); + let VmmAndSledIds { vmm_id, sled_id } = + instance_fetch_vmm_and_sled_ids(cptestctx, &instance_id).await; + let sa = nexus + .sled_client(&sled_id) + .await + .expect("instance must be on a sled to 
simulate a state change"); + sa.vmm_finish_transition(vmm_id).await; +} + +pub async fn instance_fetch( + cptestctx: &ControlPlaneTestContext, + instance_id: InstanceUuid, +) -> InstanceAndActiveVmm { + let datastore = cptestctx.server.server_context().nexus.datastore().clone(); + let opctx = test_opctx(&cptestctx); + let (.., authz_instance) = LookupPath::new(&opctx, &datastore) + .instance_id(instance_id.into_untyped_uuid()) + .lookup_for(authz::Action::Read) + .await + .expect("test instance should be present in datastore"); + + let db_state = datastore + .instance_fetch_with_vmm(&opctx, &authz_instance) + .await + .expect("test instance's info should be fetchable"); + + info!(&cptestctx.logctx.log, "refetched instance info from db"; + "instance_id" => %instance_id, + "instance_and_vmm" => ?db_state); + + db_state +} + +pub(super) async fn instance_fetch_vmm_and_sled_ids( + cptestctx: &ControlPlaneTestContext, + instance_id: &InstanceUuid, +) -> VmmAndSledIds { + let instance_and_vmm = instance_fetch(cptestctx, *instance_id).await; + let vmm = instance_and_vmm + .vmm() + .as_ref() + .expect("can only fetch VMM and sled IDs for an active instance"); + + let vmm_id = PropolisUuid::from_untyped_uuid(vmm.id); + let sled_id = SledUuid::from_untyped_uuid(vmm.sled_id); + VmmAndSledIds { vmm_id, sled_id } +} + +pub async fn instance_fetch_all( + cptestctx: &ControlPlaneTestContext, + instance_id: InstanceUuid, +) -> InstanceGestalt { + let datastore = cptestctx.server.server_context().nexus.datastore().clone(); + let opctx = test_opctx(&cptestctx); + let (.., authz_instance) = LookupPath::new(&opctx, &datastore) + .instance_id(instance_id.into_untyped_uuid()) + .lookup_for(authz::Action::Read) + .await + .expect("test instance should be present in datastore"); + + let db_state = datastore + .instance_fetch_all(&opctx, &authz_instance) + .await + .expect("test instance's info should be fetchable"); + + info!(&cptestctx.logctx.log, "refetched all instance info from db"; + 
"instance_id" => %instance_id, + "instance" => ?db_state.instance, + "active_vmm" => ?db_state.active_vmm, + "target_vmm" => ?db_state.target_vmm, + "migration" => ?db_state.migration, + ); + + db_state +} +pub async fn instance_fetch_by_name( + cptestctx: &ControlPlaneTestContext, + name: &str, + project_name: &str, +) -> InstanceAndActiveVmm { + let nexus = &cptestctx.server.server_context().nexus; + let datastore = nexus.datastore(); + let opctx = test_opctx(&cptestctx); + let instance_selector = + nexus_types::external_api::params::InstanceSelector { + project: Some(project_name.to_string().try_into().unwrap()), + instance: name.to_string().try_into().unwrap(), + }; + + let instance_lookup = + nexus.instance_lookup(&opctx, instance_selector).unwrap(); + let (_, _, authz_instance, ..) = instance_lookup.fetch().await.unwrap(); + + let db_state = datastore + .instance_fetch_with_vmm(&opctx, &authz_instance) + .await + .expect("test instance's info should be fetchable"); + + info!(&cptestctx.logctx.log, "refetched instance info from db"; + "instance_name" => name, + "project_name" => project_name, + "instance_id" => %authz_instance.id(), + "instance_and_vmm" => ?db_state, + ); + + db_state +} + +pub async fn instance_wait_for_state_by_name( + cptestctx: &ControlPlaneTestContext, + name: &str, + project_name: &str, + desired_state: InstanceState, +) -> InstanceAndActiveVmm { + let nexus = &cptestctx.server.server_context().nexus; + let opctx = test_opctx(&cptestctx); + let instance_selector = + nexus_types::external_api::params::InstanceSelector { + project: Some(project_name.to_string().try_into().unwrap()), + instance: name.to_string().try_into().unwrap(), + }; + + let instance_lookup = + nexus.instance_lookup(&opctx, instance_selector).unwrap(); + let (_, _, authz_instance, ..) 
= instance_lookup.fetch().await.unwrap(); + + instance_poll_state(cptestctx, &opctx, authz_instance, desired_state).await +} + +async fn instance_poll_state( + cptestctx: &ControlPlaneTestContext, + opctx: &OpContext, + authz_instance: authz::Instance, + desired_state: InstanceState, +) -> InstanceAndActiveVmm { + const MAX_WAIT: Duration = Duration::from_secs(120); + + let datastore = cptestctx.server.server_context().nexus.datastore(); + let log = &cptestctx.logctx.log; + let instance_id = authz_instance.id(); + + info!( + log, + "waiting for instance {instance_id} to transition to {desired_state}..."; + "instance_id" => %instance_id, + ); + let result = poll::wait_for_condition( + || async { + let db_state = datastore + .instance_fetch_with_vmm(&opctx, &authz_instance) + .await + .map_err(poll::CondCheckError::::Failed)?; + + if db_state.instance.runtime().nexus_state == desired_state { + info!( + log, + "instance {instance_id} transitioned to {desired_state}"; + "instance_id" => %instance_id, + "instance" => ?db_state.instance(), + "active_vmm" => ?db_state.vmm(), + ); + Ok(db_state) + } else { + info!( + log, + "instance {instance_id} has not yet transitioned to {desired_state}"; + "instance_id" => %instance_id, + "instance" => ?db_state.instance(), + "active_vmm" => ?db_state.vmm(), + ); + Err(poll::CondCheckError::::NotYet) + } + }, + &Duration::from_secs(1), + &MAX_WAIT, + ) + .await; + + match result { + Ok(i) => i, + Err(e) => panic!( + "instance {instance_id} did not transition to {desired_state} \ + after {MAX_WAIT:?}: {e}" + ), + } +} + +pub async fn no_virtual_provisioning_resource_records_exist( + cptestctx: &ControlPlaneTestContext, +) -> bool { + count_virtual_provisioning_resource_records(cptestctx).await == 0 +} + +pub async fn count_virtual_provisioning_resource_records( + cptestctx: &ControlPlaneTestContext, +) -> usize { + use nexus_db_queries::db::model::VirtualProvisioningResource; + use 
nexus_db_schema::schema::virtual_provisioning_resource::dsl; + + let datastore = cptestctx.server.server_context().nexus.datastore().clone(); + let conn = datastore.pool_connection_for_tests().await.unwrap(); + + datastore + .transaction_retry_wrapper("count_virtual_provisioning_resource_records") + .transaction(&conn, |conn| async move { + conn + .batch_execute_async(nexus_test_utils::db::ALLOW_FULL_TABLE_SCAN_SQL) + .await + .unwrap(); + + Ok( + dsl::virtual_provisioning_resource + .filter(dsl::resource_type.eq(nexus_db_queries::db::model::ResourceTypeProvisioned::Instance.to_string())) + .select(VirtualProvisioningResource::as_select()) + .get_results_async::(&conn) + .await + .unwrap() + .len() + ) + }).await.unwrap() +} + +pub async fn no_virtual_provisioning_collection_records_using_instances( + cptestctx: &ControlPlaneTestContext, +) -> bool { + count_virtual_provisioning_collection_records_using_instances(cptestctx) + .await + == 0 +} + +pub async fn count_virtual_provisioning_collection_records_using_instances( + cptestctx: &ControlPlaneTestContext, +) -> usize { + use nexus_db_queries::db::model::VirtualProvisioningCollection; + use nexus_db_schema::schema::virtual_provisioning_collection::dsl; + + let datastore = cptestctx.server.server_context().nexus.datastore().clone(); + let conn = datastore.pool_connection_for_tests().await.unwrap(); + + datastore + .transaction_retry_wrapper( + "count_virtual_provisioning_collection_records_using_instances", + ) + .transaction(&conn, |conn| async move { + conn.batch_execute_async( + nexus_test_utils::db::ALLOW_FULL_TABLE_SCAN_SQL, + ) + .await + .unwrap(); + Ok(dsl::virtual_provisioning_collection + .filter( + dsl::cpus_provisioned.ne(0).or(dsl::ram_provisioned.ne(0)), + ) + .select(VirtualProvisioningCollection::as_select()) + .get_results_async::(&conn) + .await + .unwrap() + .len()) + }) + .await + .unwrap() +} + +pub async fn no_sled_resource_vmm_records_exist( + cptestctx: &ControlPlaneTestContext, +) -> bool 
{ + use nexus_db_queries::db::model::SledResourceVmm; + use nexus_db_schema::schema::sled_resource_vmm::dsl; + + let datastore = cptestctx.server.server_context().nexus.datastore(); + let conn = datastore.pool_connection_for_tests().await.unwrap(); + + datastore + .transaction_retry_wrapper("no_sled_resource_vmm_records_exist") + .transaction(&conn, |conn| async move { + conn.batch_execute_async( + nexus_test_utils::db::ALLOW_FULL_TABLE_SCAN_SQL, + ) + .await + .unwrap(); + + Ok(dsl::sled_resource_vmm + .select(SledResourceVmm::as_select()) + .get_results_async::(&conn) + .await + .unwrap() + .is_empty()) + }) + .await + .unwrap() +} + +pub async fn sled_resource_vmms_exist_for_vmm( + cptestctx: &ControlPlaneTestContext, + vmm_id: PropolisUuid, +) -> bool { + use nexus_db_queries::db::model::SledResourceVmm; + use nexus_db_schema::schema::sled_resource_vmm::dsl; + + let datastore = cptestctx.server.server_context().nexus.datastore(); + let conn = datastore.pool_connection_for_tests().await.unwrap(); + + let results = dsl::sled_resource_vmm + .filter(dsl::id.eq(vmm_id.into_untyped_uuid())) + .select(SledResourceVmm::as_select()) + .load_async(&*conn) + .await + .unwrap(); + info!( + cptestctx.logctx.log, + "queried sled reservation records for VMM"; + "vmm_id" => %vmm_id, + "results" => ?results, + ); + !results.is_empty() +} + +/// Tests that the saga described by `dag` succeeds if each of its nodes is +/// repeated. +/// +/// # Panics +/// +/// Asserts that a saga can be created from the supplied DAG and that it +/// succeeds when it is executed. 
+pub async fn actions_succeed_idempotently(nexus: &Arc<Nexus>, dag: SagaDag) {
+    // NOTE(review): the type argument of `Arc` was lost when this patch was
+    // extracted; `Nexus` is presumed from the `sagas()`/`sec()` calls below.
+    // Confirm against the crate's imports.
+    //
+    // `dag` is cloned because it is still needed below to enumerate nodes.
+    let runnable_saga = nexus.sagas().saga_prepare(dag.clone()).await.unwrap();
+
+    // For every node, request a repeat count of 2 for its action and 1 for its
+    // undo step before the saga starts running.
+    for node in dag.get_nodes() {
+        nexus
+            .sec()
+            .saga_inject_repeat(
+                runnable_saga.id(),
+                node.index(),
+                steno::RepeatInjected {
+                    action: NonZeroU32::new(2).unwrap(),
+                    undo: NonZeroU32::new(1).unwrap(),
+                },
+            )
+            .await
+            .unwrap();
+    }
+
+    // Both the start of the saga and its final result must be successful.
+    runnable_saga
+        .run_to_completion()
+        .await
+        .expect("Saga should have started")
+        .into_omicron_result()
+        .expect("Saga should have succeeded");
+}
+
+/// Tests that a saga `S` functions properly when any of its nodes fails and
+/// causes the saga to unwind by iterating over all saga nodes, creating a new
+/// saga DAG for each node, injecting an error at the chosen node, and verifying
+/// both that the saga failed and that the node at which the failure was
+/// injected was the one that actually caused the saga to fail. This last check
+/// ensures that all possible unwindings are executed.
+///
+/// # Arguments
+///
+/// - `nexus`: A reference to the Nexus that should execute the saga.
+/// - `before_saga`: A function that runs before each execution of the saga
+///   under test. This function returns the set of parameters to use for the
+///   next saga execution. It may also set up other aspects of the test
+///   environment needed to test the target saga (e.g. creating a test
+///   instance).
+/// - `after_saga`: A function that runs after each execution of the saga under
+///   test. This function checks any post-saga invariants and cleans up any
+///   objects that should be destroyed before the next test iteration.
+/// - `log`: A logger to which the scaffold should log messages.
+///
+/// # Panics
+///
+/// This function asserts that each saga it executes (a) starts successfully,
+/// (b) fails, and (c) fails at the specific node at which the function injected
+/// a failure.
+pub async fn action_failure_can_unwind<'a, S, B, A>( + nexus: &Arc, + before_saga: B, + after_saga: A, + log: &Logger, +) where + S: NexusSaga2, + B: Fn() -> BoxFuture<'a, S::Params>, + A: Fn() -> BoxFuture<'a, ()>, +{ + // Construct the failure index by hand (instead of iterating over a range) + // to avoid having to pre-construct a DAG for a saga of type S, which + // requires a separate `S::Params`. (Obtaining parameters from `before_saga` + // for this purpose may not be correct because that function may have side + // effects.) + let mut failure_index = 0; + let mut previous_node_count = None; + loop { + let params = before_saga().await; + let dag = create_saga_dag::(params).unwrap(); + let node_count = dag.get_nodes().count(); + + // Verify that the DAG is not empty and that, if this is not the first + // iteration, the node count has not changed between iterations (it + // might be a function of the generated parameters). + assert_ne!(node_count, 0); + if let Some(prev_count) = previous_node_count { + assert_eq!(prev_count, node_count); + } else { + previous_node_count = Some(node_count); + } + + let node = dag.get_nodes().nth(failure_index).unwrap(); + info!( + log, + "Creating new saga that will fail at index {:?}", node.index(); + "node_name" => node.name().as_ref(), + "label" => node.label() + ); + + let runnable_saga = + nexus.sagas().saga_prepare(dag.clone()).await.unwrap(); + + nexus + .sec() + .saga_inject_error(runnable_saga.id(), node.index()) + .await + .unwrap(); + + let saga_result = runnable_saga + .run_to_completion() + .await + .expect("saga should have started successfully") + .into_raw_result(); + + let saga_error = + saga_result.kind.expect_err("saga execution should have failed"); + + assert_eq!(saga_error.error_node_name, *node.name()); + + after_saga().await; + + failure_index += 1; + if failure_index >= node_count { + break; + } + } + + assert_no_failed_undo_steps(log, nexus.datastore()).await; +} + +/// Tests that saga `S` functions 
properly when any of its nodes fails and the +/// prior node's undo step is repeated during unwind. Like +/// `action_failure_can_unwind`, this routine creates a new DAG with new +/// parameters for each node and verifies that the saga failed at the expected +/// point. +/// +/// # Arguments +/// +/// - `nexus`: A reference to the Nexus that should execute the saga. +/// - `before_saga`: A function that runs before each execution of the saga +/// under test. This function returns the set of parameters to use for the +/// next saga execution. It may also set up other aspects of the test +/// environment needed to test the target saga (e.g. creating a test +/// instance). +/// - `after_saga`: A function that runs after each execution of the saga under +/// test. This function checks any post-saga invariants and cleans up any +/// objects that should be destroyed before the next test iteration. +/// - `log`: A logger to which the scaffold should log messages. +/// +/// # Panics +/// +/// This function asserts that each saga it executes (a) starts successfully, +/// (b) fails, and (c) fails at the specific node at which the function injected +/// a failure. +pub async fn action_failure_can_unwind_idempotently<'a, S, B, A>( + nexus: &Arc, + before_saga: B, + after_saga: A, + log: &Logger, +) where + S: NexusSaga2, + B: Fn() -> BoxFuture<'a, S::Params>, + A: Fn() -> BoxFuture<'a, ()>, +{ + // Construct the error index by hand (instead of iterating over a range) to + // avoid having to pre-construct a DAG for a saga of type S, which requires + // a separate `S::Params`. (Obtaining parameters from `before_saga` for this + // purpose may not be correct because that function may have side effects.) + // + // To test the effects of repeating an undo node, start injecting failures + // at the second node in the DAG so that there's always at least one + // preceding node whose undo step will run. 
+ let mut error_index = 1; + let mut previous_node_count = None; + loop { + let params = before_saga().await; + let dag = create_saga_dag::(params).unwrap(); + let node_count = dag.get_nodes().count(); + + // Verify that the DAG's node count doesn't change between iterations. + // The DAG must have at least two nodes so that there's always a step + // preceding the error step. + if let Some(prev_count) = previous_node_count { + assert_eq!(prev_count, node_count); + } else { + if node_count < 2 { + warn!(log, "Saga has fewer than 2 nodes; nothing to undo"); + return; + } + + previous_node_count = Some(node_count); + } + + let undo_node = dag.get_nodes().nth(error_index - 1).unwrap(); + let error_node = dag.get_nodes().nth(error_index).unwrap(); + info!( + log, + "Creating new saga that will fail at index {:?}", error_node.index(); + "node_name" => error_node.name().as_ref(), + "label" => error_node.label(), + ); + + let runnable_saga = + nexus.sagas().saga_prepare(dag.clone()).await.unwrap(); + + nexus + .sec() + .saga_inject_error(runnable_saga.id(), error_node.index()) + .await + .unwrap(); + + nexus + .sec() + .saga_inject_repeat( + runnable_saga.id(), + undo_node.index(), + steno::RepeatInjected { + action: NonZeroU32::new(1).unwrap(), + undo: NonZeroU32::new(2).unwrap(), + }, + ) + .await + .unwrap(); + + let saga_error = runnable_saga + .run_to_completion() + .await + .expect("saga should have started successfully") + .into_raw_result() + .kind + .expect_err("saga execution should have failed"); + + assert_eq!(saga_error.error_node_name, *error_node.name()); + + after_saga().await; + + error_index += 1; + if error_index >= node_count { + break; + } + } + + assert_no_failed_undo_steps(log, nexus.datastore()).await; +} + +/// Asserts that there are no sagas in the supplied `datastore` for which an +/// undo step failed. 
+pub async fn assert_no_failed_undo_steps(log: &Logger, datastore: &DataStore) {
+    use nexus_db_queries::db::model::saga_types::SagaNodeEvent;
+
+    let conn = datastore.pool_connection_for_tests().await.unwrap();
+    let saga_node_events: Vec<SagaNodeEvent> = datastore
+        .transaction_retry_wrapper("assert_no_failed_undo_steps")
+        .transaction(&conn, |conn| async move {
+            use nexus_db_schema::schema::saga_node_event::dsl;
+
+            // The query below filters on `event_type` across the whole table,
+            // so full table scans are enabled for this transaction first.
+            conn.batch_execute_async(
+                nexus_test_utils::db::ALLOW_FULL_TABLE_SCAN_SQL,
+            )
+            .await
+            .unwrap();
+
+            Ok(dsl::saga_node_event
+                .filter(dsl::event_type.eq(String::from("undo_failed")))
+                .select(SagaNodeEvent::as_select())
+                .load_async::<SagaNodeEvent>(&conn)
+                .await
+                .unwrap())
+        })
+        .await
+        .unwrap();
+
+    // Log every offending saga before failing so the test output names them.
+    for saga_node_event in &saga_node_events {
+        error!(log, "saga {:?} is stuck!", saga_node_event.saga_id);
+    }
+
+    assert!(saga_node_events.is_empty());
+}
diff --git a/nexus/saga-tests/tests/config.test.toml b/nexus/saga-tests/tests/config.test.toml
new file mode 120000
index 00000000000..52f00171fdb
--- /dev/null
+++ b/nexus/saga-tests/tests/config.test.toml
@@ -0,0 +1 @@
+../../tests/config.test.toml
\ No newline at end of file
diff --git a/nexus/saga-tests/tests/integration/instance_create.rs b/nexus/saga-tests/tests/integration/instance_create.rs
new file mode 100644
index 00000000000..785522ab64c
--- /dev/null
+++ b/nexus/saga-tests/tests/integration/instance_create.rs
@@ -0,0 +1,111 @@
+use async_bb8_diesel::AsyncRunQueryDsl;
+use diesel::prelude::*;
+use nexus_db_queries::db::DataStore;
+use omicron_sled_agent::sim::SledAgent;
+
+type ControlPlaneTestContext =
+    nexus_test_utils::ControlPlaneTestContext<omicron_nexus::Server>;
+const DISK_NAME: &str = "my-disk";
+
+/// Asserts that no instance-related records or sled-agent state remain and
+/// that the test disk is detached.
+pub(crate) async fn verify_clean_slate(cptestctx: &ControlPlaneTestContext) {
+    let sled_agent = cptestctx.first_sled_agent();
+    let datastore = cptestctx.server.server_context().nexus.datastore();
+
+    // Check that no partial artifacts of instance creation exist
+    assert!(no_instance_records_exist(datastore).await);
+    assert!(no_network_interface_records_exist(datastore).await);
+    assert!(no_external_ip_records_exist(datastore).await);
+    assert!(
+        nexus_saga_tests::no_sled_resource_vmm_records_exist(cptestctx).await
+    );
+    assert!(
+        nexus_saga_tests::no_virtual_provisioning_resource_records_exist(
+            cptestctx
+        )
+        .await
+    );
+    assert!(
+        nexus_saga_tests::no_virtual_provisioning_collection_records_using_instances(
+            cptestctx
+        )
+        .await
+    );
+    assert!(disk_is_detached(datastore).await);
+    assert!(no_instances_or_disks_on_sled(&sled_agent).await);
+
+    let v2p_mappings = &*sled_agent.v2p_mappings.lock().unwrap();
+    assert!(v2p_mappings.is_empty());
+}
+
+/// Returns true if no instance row with a null `time_deleted` exists.
+async fn no_instance_records_exist(datastore: &DataStore) -> bool {
+    use nexus_db_queries::db::model::Instance;
+    use nexus_db_schema::schema::instance::dsl;
+
+    dsl::instance
+        .filter(dsl::time_deleted.is_null())
+        .select(Instance::as_select())
+        .first_async::<Instance>(
+            &*datastore.pool_connection_for_tests().await.unwrap(),
+        )
+        .await
+        .optional()
+        .unwrap()
+        .is_none()
+}
+
+/// Returns true if no instance-kind network interface row with a null
+/// `time_deleted` exists.
+async fn no_network_interface_records_exist(datastore: &DataStore) -> bool {
+    use nexus_db_queries::db::model::NetworkInterface;
+    use nexus_db_queries::db::model::NetworkInterfaceKind;
+    use nexus_db_schema::schema::network_interface::dsl;
+
+    dsl::network_interface
+        .filter(dsl::time_deleted.is_null())
+        .filter(dsl::kind.eq(NetworkInterfaceKind::Instance))
+        .select(NetworkInterface::as_select())
+        .first_async::<NetworkInterface>(
+            &*datastore.pool_connection_for_tests().await.unwrap(),
+        )
+        .await
+        .optional()
+        .unwrap()
+        .is_none()
+}
+
+/// Returns true if no non-service external IP row with a null `time_deleted`
+/// exists.
+async fn no_external_ip_records_exist(datastore: &DataStore) -> bool {
+    use nexus_db_queries::db::model::ExternalIp;
+    use nexus_db_schema::schema::external_ip::dsl;
+
+    dsl::external_ip
+        .filter(dsl::time_deleted.is_null())
+        .filter(dsl::is_service.eq(false))
+        .select(ExternalIp::as_select())
+        .first_async::<ExternalIp>(
+            &*datastore.pool_connection_for_tests().await.unwrap(),
+        )
+        .await
+        .optional()
+        .unwrap()
+        .is_none()
+}
+
+/// Returns true if the disk named `DISK_NAME` is in the "detached" state.
+///
+/// Panics if no such disk row with a null `time_deleted` exists.
+async fn disk_is_detached(datastore: &DataStore) -> bool {
+    use nexus_db_queries::db::model::Disk;
+    use nexus_db_schema::schema::disk::dsl;
+
+    dsl::disk
+        .filter(dsl::time_deleted.is_null())
+        .filter(dsl::name.eq(DISK_NAME))
+        .select(Disk::as_select())
+        .first_async::<Disk>(
+            &*datastore.pool_connection_for_tests().await.unwrap(),
+        )
+        .await
+        .unwrap()
+        .runtime_state
+        .disk_state
+        == "detached"
+}
+
+/// Returns true if the simulated sled agent reports zero VMMs and zero disks.
+async fn no_instances_or_disks_on_sled(sled_agent: &SledAgent) -> bool {
+    sled_agent.vmm_count().await == 0 && sled_agent.disk_count().await == 0
+}
diff --git a/nexus/saga-tests/tests/integration/instance_delete.rs b/nexus/saga-tests/tests/integration/instance_delete.rs
new file mode 100644
index 00000000000..46fa3045da0
--- /dev/null
+++ b/nexus/saga-tests/tests/integration/instance_delete.rs
@@ -0,0 +1,161 @@
+use crate::instance_create::verify_clean_slate;
+use dropshot::test_util::ClientTestContext;
+use nexus_db_lookup::LookupPath;
+use nexus_db_queries::{authn::saga::Serialized, context::OpContext, db};
+use nexus_saga_interface::create_saga_dag;
+use nexus_sagas::{
+    sagas::instance_delete::Params, sagas::instance_delete::SagaInstanceDelete,
+};
+use nexus_test_utils::resource_helpers::DiskTest;
+use nexus_test_utils::resource_helpers::create_default_ip_pool;
+use nexus_test_utils::resource_helpers::create_disk;
+use nexus_test_utils::resource_helpers::create_project;
+use nexus_test_utils_macros::nexus_test;
+use nexus_types::{external_api::params, identity::Resource};
+use omicron_common::api::external::{
+    ByteCount, IdentityMetadataCreateParams, InstanceCpuCount,
+};
+use omicron_common::api::internal::shared::SwitchLocation;
+use slog::o;
+use std::collections::HashSet;
+use uuid::Uuid;
+
+type ControlPlaneTestContext =
+    nexus_test_utils::ControlPlaneTestContext<omicron_nexus::Server>;
+
+const INSTANCE_NAME: &str = "my-instance";
+const PROJECT_NAME: &str = "springfield-squidport";
+const DISK_NAME: &str = "my-disk";
+
+/// Creates the default IP pool, the test project, and the test disk, and
+/// returns the project's ID.
+async fn create_org_project_and_disk(client: &ClientTestContext) -> Uuid {
+    create_default_ip_pool(&client).await;
+    let project = create_project(client, PROJECT_NAME).await;
+    create_disk(&client, PROJECT_NAME, DISK_NAME).await;
+    project.identity.id
+}
+
+/// Builds instance-delete saga parameters for the instance with the given ID.
+async fn new_test_params(
+    cptestctx: &ControlPlaneTestContext,
+    instance_id: Uuid,
+) -> Params {
+    let opctx = test_opctx(&cptestctx);
+    let datastore = cptestctx.server.server_context().nexus.datastore();
+
+    let (.., authz_instance, instance) = LookupPath::new(&opctx, datastore)
+        .instance_id(instance_id)
+        .fetch()
+        .await
+        .expect("Failed to lookup instance");
+    Params {
+        serialized_authn: Serialized::for_opctx(&opctx),
+        authz_instance,
+        instance,
+        boundary_switches: HashSet::from([SwitchLocation::Switch0]),
+    }
+}
+
+// Helper for creating instance create parameters
+fn new_instance_create_params() -> params::InstanceCreate {
+    params::InstanceCreate {
+        identity: IdentityMetadataCreateParams {
+            name: INSTANCE_NAME.parse().unwrap(),
+            description: "My instance".to_string(),
+        },
+        ncpus: InstanceCpuCount::try_from(2).unwrap(),
+        memory: ByteCount::from_gibibytes_u32(4),
+        hostname: "inst".parse().unwrap(),
+        user_data: vec![],
+        ssh_public_keys: Some(Vec::new()),
+        network_interfaces: params::InstanceNetworkInterfaceAttachment::Default,
+        external_ips: vec![params::ExternalIpCreate::Ephemeral { pool: None }],
+        boot_disk: Some(params::InstanceDiskAttachment::Attach(
+            params::InstanceDiskAttach { name: DISK_NAME.parse().unwrap() },
+        )),
+        disks: Vec::new(),
+        start: false,
+        auto_restart_policy: Default::default(),
+        anti_affinity_groups: None,
+    }
+}
+
+/// Builds a test `OpContext` from the test context's logger and datastore.
+pub fn test_opctx(cptestctx: &ControlPlaneTestContext) -> OpContext {
+    OpContext::for_tests(
+        cptestctx.logctx.log.new(o!()),
+        cptestctx.server.server_context().nexus.datastore().clone(),
+    )
+}
+
+#[nexus_test(server = omicron_nexus::Server)]
+async fn test_saga_basic_usage_succeeds(cptestctx: &ControlPlaneTestContext) {
+    DiskTest::new(cptestctx).await;
+    let client = &cptestctx.external_client;
+    let nexus = &cptestctx.server.server_context().nexus;
+    create_org_project_and_disk(&client).await;
+
+    // Build the saga DAG with the provided test parameters and run it.
+    let params = new_test_params(
+        &cptestctx,
+        create_instance(&cptestctx, new_instance_create_params()).await.id(),
+    )
+    .await;
+    nexus
+        .sagas()
+        .saga_execute2::<SagaInstanceDelete>(params)
+        .await
+        .expect("Saga should have succeeded");
+}
+
+/// Creates an instance in the test project and returns its database record.
+async fn create_instance(
+    cptestctx: &ControlPlaneTestContext,
+    params: params::InstanceCreate,
+) -> db::model::Instance {
+    let nexus = &cptestctx.server.server_context().nexus;
+    let opctx = test_opctx(&cptestctx);
+
+    let project_selector = params::ProjectSelector {
+        project: PROJECT_NAME.to_string().try_into().unwrap(),
+    };
+    let project_lookup =
+        nexus.project_lookup(&opctx, project_selector).unwrap();
+
+    let instance_state = nexus
+        .project_create_instance(&opctx, &project_lookup, &params)
+        .await
+        .unwrap();
+
+    // Re-fetch the instance from the datastore to return the database model.
+    let datastore = cptestctx.server.server_context().nexus.datastore().clone();
+    let (.., db_instance) = LookupPath::new(&opctx, &datastore)
+        .instance_id(instance_state.instance().id())
+        .fetch()
+        .await
+        .expect("test instance should be present in datastore");
+
+    db_instance
+}
+
+#[nexus_test(server = omicron_nexus::Server)]
+async fn test_actions_succeed_idempotently(
+    cptestctx: &ControlPlaneTestContext,
+) {
+    DiskTest::new(cptestctx).await;
+
+    let client = &cptestctx.external_client;
+    let nexus = &cptestctx.server.server_context().nexus;
+    create_org_project_and_disk(&client).await;
+
+    // Build the saga DAG with the provided test parameters
+    let dag = create_saga_dag::<SagaInstanceDelete>(
+        new_test_params(
+            &cptestctx,
+            create_instance(&cptestctx, new_instance_create_params())
+                .await
+                .id(),
+        )
+        .await,
+    )
+    .unwrap();
+
+    nexus_saga_tests::actions_succeed_idempotently(nexus, dag).await;
+
+    verify_clean_slate(&cptestctx).await;
+}
diff --git
a/nexus/saga-tests/tests/integration/main.rs b/nexus/saga-tests/tests/integration/main.rs
new file mode 100644
index 00000000000..a5d70d5ea0f
--- /dev/null
+++ b/nexus/saga-tests/tests/integration/main.rs
@@ -0,0 +1,2 @@
+mod instance_create;
+mod instance_delete;
diff --git a/nexus/sagas/Cargo.toml b/nexus/sagas/Cargo.toml
new file mode 100644
index 00000000000..e45a6dd9e3b
--- /dev/null
+++ b/nexus/sagas/Cargo.toml
@@ -0,0 +1,25 @@
+[package]
+name = "nexus-sagas"
+version = "0.1.0"
+edition = "2021"
+license = "MPL-2.0"
+
+[lints]
+workspace = true
+
+[dependencies]
+nexus-auth.workspace = true
+nexus-db-lookup.workspace = true
+nexus-db-model.workspace = true
+# Note: this crate does not depend on nexus-db-queries so that the two crates
+# can be built in parallel.
+nexus-saga-interface.workspace = true
+omicron-common.workspace = true
+omicron-uuid-kinds.workspace = true
+omicron-workspace-hack.workspace = true
+serde.workspace = true
+steno.workspace = true
+uuid.workspace = true
+
+# Note: integration tests for sagas live in nexus/saga-tests. This is so that
+# smaller unit tests don't have to pay the cost of building all of Nexus.
diff --git a/nexus/sagas/src/lib.rs b/nexus/sagas/src/lib.rs
new file mode 100644
index 00000000000..c6dd99dae52
--- /dev/null
+++ b/nexus/sagas/src/lib.rs
@@ -0,0 +1 @@
+pub mod sagas;
diff --git a/nexus/sagas/src/sagas/disk_delete.rs b/nexus/sagas/src/sagas/disk_delete.rs
new file mode 100644
index 00000000000..e69de29bb2d
diff --git a/nexus/sagas/src/sagas/instance_common.rs b/nexus/sagas/src/sagas/instance_common.rs
new file mode 100644
index 00000000000..8855fe34b2c
--- /dev/null
+++ b/nexus/sagas/src/sagas/instance_common.rs
@@ -0,0 +1,8 @@
+use omicron_uuid_kinds::PropolisUuid;
+use omicron_uuid_kinds::SledUuid;
+use serde::Deserialize;
+use serde::Serialize;
+
+/// A Propolis VMM ID paired with a sled ID.
+///
+/// NOTE(review): presumably `sled_id` is the sled hosting the VMM; confirm
+/// against the code that constructs this type.
+#[derive(Clone, Debug, Deserialize, Serialize)]
+pub struct VmmAndSledIds {
+    /// ID of the VMM.
+    pub vmm_id: PropolisUuid,
+    /// ID of the sled.
+    pub sled_id: SledUuid,
+}
diff --git a/nexus/sagas/src/sagas/instance_delete.rs b/nexus/sagas/src/sagas/instance_delete.rs
new file mode 100644
index 00000000000..f750cc34eda
--- /dev/null
+++ b/nexus/sagas/src/sagas/instance_delete.rs
@@ -0,0 +1,150 @@
+// This Source Code Form is subject to the terms of the Mozilla Public
+// License, v. 2.0. If a copy of the MPL was not distributed with this
+// file, You can obtain one at https://mozilla.org/MPL/2.0/.
+ +use std::collections::HashSet; + +use nexus_auth::authn; +use nexus_auth::authz; +use nexus_db_lookup::LookupPath; +use nexus_saga_interface::NexusAction; +use nexus_saga_interface::NexusActionContext; +use nexus_saga_interface::NexusSaga2; +use nexus_saga_interface::SagaInitError; +use nexus_saga_interface::declare_saga_actions; +use nexus_saga_interface::op_context_for_saga_action; +use omicron_common::api::internal::shared::SwitchLocation; +use serde::Deserialize; +use serde::Serialize; +use steno::ActionError; + +// instance delete saga: input parameters + +#[derive(Debug, Deserialize, Serialize)] +pub struct Params { + pub serialized_authn: authn::saga::Serialized, + pub authz_instance: authz::Instance, + pub instance: nexus_db_model::Instance, + pub boundary_switches: HashSet, +} + +// instance delete saga: actions + +declare_saga_actions! { + instance_delete; + + INSTANCE_DELETE_RECORD -> "no_result1" { + + sid_delete_instance_record + } + DELETE_NETWORK_INTERFACES -> "no_result2" { + + sid_delete_network_interfaces + } + DEALLOCATE_EXTERNAL_IP -> "no_result3" { + + sid_deallocate_external_ip + } + INSTANCE_DELETE_NAT -> "no_result4" { + + sid_delete_nat + } +} + +// instance delete saga: definition + +#[derive(Debug)] +pub struct SagaInstanceDelete; +impl NexusSaga2 for SagaInstanceDelete { + const NAME: &'static str = "instance-delete"; + type Params = Params; + + fn actions() -> Vec { + instance_delete_list_actions() + } + + fn make_saga_dag( + _params: &Self::Params, + mut builder: steno::DagBuilder, + ) -> Result { + builder.append(instance_delete_nat_action()); + builder.append(instance_delete_record_action()); + builder.append(delete_network_interfaces_action()); + builder.append(deallocate_external_ip_action()); + Ok(builder.build()?) 
+ } +} + +// instance delete saga: action implementations + +async fn sid_delete_instance_record( + sagactx: NexusActionContext, +) -> Result<(), ActionError> { + let osagactx = sagactx.user_data(); + let params = sagactx.saga_params::()?; + let opctx = op_context_for_saga_action(&sagactx, ¶ms.serialized_authn); + osagactx + .datastore() + .project_delete_instance(&opctx, ¶ms.authz_instance) + .await + .map_err(ActionError::action_failed)?; + Ok(()) +} + +async fn sid_delete_network_interfaces( + sagactx: NexusActionContext, +) -> Result<(), ActionError> { + let osagactx = sagactx.user_data(); + let nexus = osagactx.nexus(); + let params = sagactx.saga_params::()?; + let opctx = op_context_for_saga_action(&sagactx, ¶ms.serialized_authn); + osagactx + .datastore() + .instance_delete_all_network_interfaces(&opctx, ¶ms.authz_instance) + .await + .map_err(ActionError::action_failed)?; + let background_tasks = nexus.background_tasks(); + background_tasks.activate(&background_tasks.task_v2p_manager); + Ok(()) +} + +async fn sid_delete_nat( + sagactx: NexusActionContext, +) -> Result<(), ActionError> { + let params = sagactx.saga_params::()?; + let instance_id = params.authz_instance.id(); + let osagactx = sagactx.user_data(); + let opctx = op_context_for_saga_action(&sagactx, ¶ms.serialized_authn); + + let (.., authz_instance) = LookupPath::new(&opctx, osagactx.datastore()) + .instance_id(instance_id) + .lookup_for(authz::Action::Modify) + .await + .map_err(ActionError::action_failed)?; + + osagactx + .nexus() + .instance_delete_dpd_config(&opctx, &authz_instance) + .await + .map_err(ActionError::action_failed)?; + + Ok(()) +} + +async fn sid_deallocate_external_ip( + sagactx: NexusActionContext, +) -> Result<(), ActionError> { + let osagactx = sagactx.user_data(); + let params = sagactx.saga_params::()?; + let opctx = op_context_for_saga_action(&sagactx, ¶ms.serialized_authn); + osagactx + .datastore() + .deallocate_external_ip_by_instance_id( + &opctx, + 
params.authz_instance.id(), + ) + .await + .map_err(ActionError::action_failed)?; + osagactx + .datastore() + .detach_floating_ips_by_instance_id(&opctx, params.authz_instance.id()) + .await + .map_err(ActionError::action_failed)?; + Ok(()) +} diff --git a/nexus/sagas/src/sagas/instance_start.rs b/nexus/sagas/src/sagas/instance_start.rs new file mode 100644 index 00000000000..965000729c9 --- /dev/null +++ b/nexus/sagas/src/sagas/instance_start.rs @@ -0,0 +1,33 @@ +use nexus_auth::authn; +use serde::{Deserialize, Serialize}; + +/// Parameters to the instance start saga. +#[derive(Debug, Deserialize, Serialize)] +pub struct Params { + pub db_instance: nexus_db_model::Instance, + + /// Authentication context to use to fetch the instance's current state from + /// the database. + pub serialized_authn: authn::saga::Serialized, + + /// Why is this instance being started? + pub reason: InstanceStartReason, +} + +/// Reasons an instance may be started. +/// +/// Currently, this is primarily used to determine whether the instance's +/// auto-restart timestamp must be updated. It's also included in log messages +/// in the start saga. +#[derive(Copy, Clone, Debug, Deserialize, Serialize, PartialEq, Eq)] +pub enum InstanceStartReason { + /// The instance was automatically started upon being created. + AutoStart, + /// The instance was started by a user action. + User, + /// The instance has failed and is being automatically restarted by the + /// control plane. + AutoRestart, +} + +// TODO: move the actual saga over. 
diff --git a/nexus/sagas/src/sagas/mod.rs b/nexus/sagas/src/sagas/mod.rs new file mode 100644 index 00000000000..77d4c190637 --- /dev/null +++ b/nexus/sagas/src/sagas/mod.rs @@ -0,0 +1,3 @@ +pub mod instance_common; +pub mod instance_delete; +pub mod instance_start; diff --git a/nexus/src/app/background/tasks/instance_reincarnation.rs b/nexus/src/app/background/tasks/instance_reincarnation.rs index 68c411d6e74..6a9c67f316d 100644 --- a/nexus/src/app/background/tasks/instance_reincarnation.rs +++ b/nexus/src/app/background/tasks/instance_reincarnation.rs @@ -188,10 +188,10 @@ impl InstanceReincarnation { let running_saga = async { let dag = instance_start::SagaInstanceStart::prepare( - &instance_start::Params { + &nexus_sagas::sagas::instance_start::Params { db_instance, serialized_authn: serialized_authn.clone(), - reason: instance_start::Reason::AutoRestart, + reason: nexus_sagas::sagas::instance_start::InstanceStartReason::AutoRestart, }, )?; self.sagas.saga_run(dag).await diff --git a/nexus/src/app/instance.rs b/nexus/src/app/instance.rs index bcaa81d899e..1bafb247e84 100644 --- a/nexus/src/app/instance.rs +++ b/nexus/src/app/instance.rs @@ -35,6 +35,7 @@ use nexus_db_queries::db::DataStore; use nexus_db_queries::db::datastore::InstanceAndActiveVmm; use nexus_db_queries::db::datastore::InstanceStateComputer; use nexus_db_queries::db::identity::Resource; +use nexus_sagas::sagas::instance_start::InstanceStartReason; use nexus_types::external_api::views; use omicron_common::api::external::ByteCount; use omicron_common::api::external::CreateResult; @@ -62,7 +63,6 @@ use propolis_client::support::tungstenite::Message as WebSocketMessage; use propolis_client::support::tungstenite::protocol::CloseFrame; use propolis_client::support::tungstenite::protocol::frame::coding::CloseCode; use sagas::instance_common::ExternalIpAttach; -use sagas::instance_start; use sagas::instance_update; use sled_agent_client::types::InstanceBootSettings; use 
sled_agent_client::types::InstanceMigrationTargetParams; @@ -390,7 +390,7 @@ impl super::Nexus { .await } - pub(crate) async fn project_create_instance( + pub async fn project_create_instance( self: &Arc, opctx: &OpContext, project_lookup: &lookup::Project<'_>, @@ -528,11 +528,7 @@ impl super::Nexus { .instance_id(instance_id); let start_result = self - .instance_start( - opctx, - &lookup, - instance_start::Reason::AutoStart, - ) + .instance_start(opctx, &lookup, InstanceStartReason::AutoStart) .await; if let Err(e) = start_result { info!(self.log, "failed to start newly-created instance"; @@ -575,7 +571,7 @@ impl super::Nexus { // This operation may only occur on stopped instances, which implies that // the attached disks do not have any running "upstairs" process running // within the sled. - pub(crate) async fn project_destroy_instance( + pub async fn project_destroy_instance( self: &Arc, opctx: &OpContext, instance_lookup: &lookup::Instance<'_>, @@ -594,14 +590,14 @@ impl super::Nexus { let boundary_switches = self.boundary_switches(&self.opctx_alloc).await?; - let saga_params = sagas::instance_delete::Params { + let saga_params = nexus_sagas::sagas::instance_delete::Params { serialized_authn: authn::saga::Serialized::for_opctx(opctx), authz_instance, instance, boundary_switches, }; self.sagas - .saga_execute::( + .saga_execute::( saga_params, ) .await?; @@ -710,11 +706,11 @@ impl super::Nexus { } /// Attempts to start an instance if it is currently stopped. - pub(crate) async fn instance_start( + pub async fn instance_start( self: &Arc, opctx: &OpContext, instance_lookup: &lookup::Instance<'_>, - reason: instance_start::Reason, + reason: InstanceStartReason, ) -> Result { let (.., authz_instance) = instance_lookup.lookup_for(authz::Action::Modify).await?; @@ -727,7 +723,7 @@ impl super::Nexus { match instance_start_allowed(&self.log, &state, reason)? 
{ InstanceStartDisposition::AlreadyStarted => Ok(state), InstanceStartDisposition::Start => { - let saga_params = sagas::instance_start::Params { + let saga_params = nexus_sagas::sagas::instance_start::Params { serialized_authn: authn::saga::Serialized::for_opctx(opctx), db_instance: state.instance().clone(), reason, @@ -748,7 +744,7 @@ impl super::Nexus { } /// Make sure the given Instance is stopped. - pub(crate) async fn instance_stop( + pub async fn instance_stop( &self, opctx: &OpContext, instance_lookup: &lookup::Instance<'_>, @@ -2274,7 +2270,7 @@ fn check_instance_cpu_memory_sizes( fn instance_start_allowed( log: &slog::Logger, state: &InstanceAndActiveVmm, - reason: instance_start::Reason, + reason: InstanceStartReason, ) -> Result { let (instance, vmm) = (state.instance(), state.vmm()); @@ -2521,7 +2517,7 @@ mod tests { instance_start_allowed( &logctx.log, &state, - instance_start::Reason::User + InstanceStartReason::User ) .is_ok() ); @@ -2542,7 +2538,7 @@ mod tests { instance_start_allowed( &logctx.log, &state, - instance_start::Reason::User + InstanceStartReason::User ) .is_ok() ); @@ -2560,7 +2556,7 @@ mod tests { instance_start_allowed( &logctx.log, &state, - instance_start::Reason::User + InstanceStartReason::User ) .is_err() ); @@ -2580,7 +2576,7 @@ mod tests { instance_start_allowed( &logctx.log, &state, - instance_start::Reason::User + InstanceStartReason::User ) .is_ok() ); @@ -2592,7 +2588,7 @@ mod tests { instance_start_allowed( &logctx.log, &state, - instance_start::Reason::User + InstanceStartReason::User ) .is_ok() ); @@ -2604,7 +2600,7 @@ mod tests { instance_start_allowed( &logctx.log, &state, - instance_start::Reason::User + InstanceStartReason::User ) .is_ok() ); @@ -2615,7 +2611,7 @@ mod tests { instance_start_allowed( &logctx.log, &state, - instance_start::Reason::User + InstanceStartReason::User ) .is_ok() ); diff --git a/nexus/src/app/mod.rs b/nexus/src/app/mod.rs index ad79b2b5dda..7bc3f001cea 100644 --- a/nexus/src/app/mod.rs +++ 
b/nexus/src/app/mod.rs @@ -544,6 +544,11 @@ impl Nexus { &self.authz } + /// For testing only: provides a reference to the saga executor. + pub fn sagas(&self) -> &Arc { + &self.sagas + } + pub(crate) async fn wait_for_populate(&self) -> Result<(), anyhow::Error> { let mut my_rx = self.populate_status.clone(); loop { @@ -584,6 +589,10 @@ impl Nexus { Some(rustls_cfg) } + pub(crate) fn background_tasks(&self) -> &BackgroundTasks { + &self.background_tasks + } + // Called to trigger inventory collection. pub(crate) fn activate_inventory_collection(&self) { self.background_tasks diff --git a/nexus/src/app/saga.rs b/nexus/src/app/saga.rs index 1d1b77b020e..9e8a9242285 100644 --- a/nexus/src/app/saga.rs +++ b/nexus/src/app/saga.rs @@ -58,6 +58,7 @@ use futures::StreamExt; use futures::future::BoxFuture; use nexus_db_queries::authz; use nexus_db_queries::context::OpContext; +use nexus_saga_interface::NexusSaga2; use nexus_types::internal_api::views::DemoSaga; use omicron_common::api::external::DataPageParams; use omicron_common::api::external::Error; @@ -153,7 +154,7 @@ impl StartSaga for SagaExecutor { /// Handle to a self-contained subsystem for kicking off sagas /// /// See the module-level documentation for details. -pub(crate) struct SagaExecutor { +pub struct SagaExecutor { sec_client: Arc, log: slog::Logger, nexus: OnceLock>, @@ -222,7 +223,7 @@ impl SagaExecutor { /// execute sagas either from API calls and background tasks, neither of /// which can be cancelled. 
**This function should not be used in a /// `tokio::select!` with a `timeout` or the like.** - pub(crate) async fn saga_prepare( + pub async fn saga_prepare( &self, dag: SagaDag, ) -> Result { @@ -326,6 +327,17 @@ impl SagaExecutor { let stopped_saga = running_saga.wait_until_stopped().await; stopped_saga.into_omicron_result() } + + pub async fn saga_execute2( + &self, + params: N::Params, + ) -> Result { + let dag = nexus_saga_interface::create_saga_dag::(params)?; + let runnable_saga = self.saga_prepare(dag).await?; + let running_saga = runnable_saga.start().await?; + let stopped_saga = running_saga.wait_until_stopped().await; + stopped_saga.into_omicron_result() + } } /// Encapsulates a saga to be run before we actually start running it @@ -333,7 +345,7 @@ impl SagaExecutor { /// At this point, we've built the DAG, loaded it into the SEC, etc. but haven't /// started it running. This is a useful point to inject errors, inspect the /// DAG, etc. -pub(crate) struct RunnableSaga { +pub struct RunnableSaga { id: SagaId, saga_completion_future: BoxFuture<'static, SagaResult>, log: slog::Logger, @@ -341,7 +353,7 @@ pub(crate) struct RunnableSaga { } impl RunnableSaga { - pub(crate) fn id(&self) -> SagaId { + pub fn id(&self) -> SagaId { self.id } @@ -367,10 +379,7 @@ impl RunnableSaga { /// Start the saga running and wait for it to complete. /// /// This is a shorthand for `start().await?.wait_until_stopped().await`. - // There is no reason this needs to be limited to tests, but it's only used - // by the tests today. 
- #[cfg(test)] - pub(crate) async fn run_to_completion(self) -> Result { + pub async fn run_to_completion(self) -> Result { Ok(self.start().await?.wait_until_stopped().await) } } @@ -407,7 +416,7 @@ impl RunningSaga { } /// Describes a saga that's finished -pub(crate) struct StoppedSaga { +pub struct StoppedSaga { id: SagaId, result: SagaResult, log: slog::Logger, @@ -415,13 +424,13 @@ pub(crate) struct StoppedSaga { impl StoppedSaga { /// Fetches the raw Steno result for the saga's execution - pub(crate) fn into_raw_result(self) -> SagaResult { + pub fn into_raw_result(self) -> SagaResult { self.result } /// Interprets the result of saga execution as a `Result` whose error type /// is `Error`. - pub(crate) fn into_omicron_result(self) -> Result { + pub fn into_omicron_result(self) -> Result { self.result.kind.map_err(|saga_error| { let mut error = saga_error .error_source @@ -497,8 +506,7 @@ impl super::Nexus { /// For testing only: provides direct access to the underlying SecClient so /// that tests can inject errors - #[cfg(test)] - pub(crate) fn sec(&self) -> &steno::SecClient { + pub fn sec(&self) -> &steno::SecClient { &self.sagas.sec_client } diff --git a/nexus/src/app/sagas/instance_delete.rs b/nexus/src/app/sagas/instance_delete.rs deleted file mode 100644 index 2056a14784c..00000000000 --- a/nexus/src/app/sagas/instance_delete.rs +++ /dev/null @@ -1,334 +0,0 @@ -// This Source Code Form is subject to the terms of the Mozilla Public -// License, v. 2.0. If a copy of the MPL was not distributed with this -// file, You can obtain one at https://mozilla.org/MPL/2.0/. 
- -use std::collections::HashSet; - -use super::ActionRegistry; -use super::NexusActionContext; -use super::NexusSaga; -use crate::app::sagas::declare_saga_actions; -use nexus_db_lookup::LookupPath; -use nexus_db_queries::{authn, authz, db}; -use omicron_common::api::internal::shared::SwitchLocation; -use serde::Deserialize; -use serde::Serialize; -use steno::ActionError; - -// instance delete saga: input parameters - -#[derive(Debug, Deserialize, Serialize)] -pub struct Params { - pub serialized_authn: authn::saga::Serialized, - pub authz_instance: authz::Instance, - pub instance: db::model::Instance, - pub boundary_switches: HashSet, -} - -// instance delete saga: actions - -declare_saga_actions! { - instance_delete; - - INSTANCE_DELETE_RECORD -> "no_result1" { - + sid_delete_instance_record - } - DELETE_NETWORK_INTERFACES -> "no_result2" { - + sid_delete_network_interfaces - } - DEALLOCATE_EXTERNAL_IP -> "no_result3" { - + sid_deallocate_external_ip - } - INSTANCE_DELETE_NAT -> "no_result4" { - + sid_delete_nat - } -} - -// instance delete saga: definition - -#[derive(Debug)] -pub struct SagaInstanceDelete; -impl NexusSaga for SagaInstanceDelete { - const NAME: &'static str = "instance-delete"; - type Params = Params; - - fn register_actions(registry: &mut ActionRegistry) { - instance_delete_register_actions(registry); - } - - fn make_saga_dag( - _params: &Self::Params, - mut builder: steno::DagBuilder, - ) -> Result { - builder.append(instance_delete_nat_action()); - builder.append(instance_delete_record_action()); - builder.append(delete_network_interfaces_action()); - builder.append(deallocate_external_ip_action()); - Ok(builder.build()?) 
- } -} - -// instance delete saga: action implementations - -async fn sid_delete_instance_record( - sagactx: NexusActionContext, -) -> Result<(), ActionError> { - let osagactx = sagactx.user_data(); - let params = sagactx.saga_params::()?; - let opctx = crate::context::op_context_for_saga_action( - &sagactx, - ¶ms.serialized_authn, - ); - osagactx - .datastore() - .project_delete_instance(&opctx, ¶ms.authz_instance) - .await - .map_err(ActionError::action_failed)?; - Ok(()) -} - -async fn sid_delete_network_interfaces( - sagactx: NexusActionContext, -) -> Result<(), ActionError> { - let osagactx = sagactx.user_data(); - let nexus = osagactx.nexus(); - let params = sagactx.saga_params::()?; - let opctx = crate::context::op_context_for_saga_action( - &sagactx, - ¶ms.serialized_authn, - ); - osagactx - .datastore() - .instance_delete_all_network_interfaces(&opctx, ¶ms.authz_instance) - .await - .map_err(ActionError::action_failed)?; - nexus.background_tasks.activate(&nexus.background_tasks.task_v2p_manager); - Ok(()) -} - -async fn sid_delete_nat( - sagactx: NexusActionContext, -) -> Result<(), ActionError> { - let params = sagactx.saga_params::()?; - let instance_id = params.authz_instance.id(); - let osagactx = sagactx.user_data(); - let opctx = crate::context::op_context_for_saga_action( - &sagactx, - ¶ms.serialized_authn, - ); - - let (.., authz_instance) = LookupPath::new(&opctx, osagactx.datastore()) - .instance_id(instance_id) - .lookup_for(authz::Action::Modify) - .await - .map_err(ActionError::action_failed)?; - - osagactx - .nexus() - .instance_delete_dpd_config(&opctx, &authz_instance) - .await - .map_err(ActionError::action_failed)?; - - Ok(()) -} - -async fn sid_deallocate_external_ip( - sagactx: NexusActionContext, -) -> Result<(), ActionError> { - let osagactx = sagactx.user_data(); - let params = sagactx.saga_params::()?; - let opctx = crate::context::op_context_for_saga_action( - &sagactx, - ¶ms.serialized_authn, - ); - osagactx - .datastore() - 
.deallocate_external_ip_by_instance_id( - &opctx, - params.authz_instance.id(), - ) - .await - .map_err(ActionError::action_failed)?; - osagactx - .datastore() - .detach_floating_ips_by_instance_id(&opctx, params.authz_instance.id()) - .await - .map_err(ActionError::action_failed)?; - Ok(()) -} - -#[cfg(test)] -mod test { - use crate::{ - app::saga::create_saga_dag, - app::sagas::instance_create::test::verify_clean_slate, - app::sagas::instance_delete::Params, - app::sagas::instance_delete::SagaInstanceDelete, external_api::params, - }; - use dropshot::test_util::ClientTestContext; - use nexus_db_lookup::LookupPath; - use nexus_db_queries::{authn::saga::Serialized, context::OpContext, db}; - use nexus_test_utils::resource_helpers::DiskTest; - use nexus_test_utils::resource_helpers::create_default_ip_pool; - use nexus_test_utils::resource_helpers::create_disk; - use nexus_test_utils::resource_helpers::create_project; - use nexus_test_utils_macros::nexus_test; - use nexus_types::identity::Resource; - use omicron_common::api::external::{ - ByteCount, IdentityMetadataCreateParams, InstanceCpuCount, - }; - use omicron_common::api::internal::shared::SwitchLocation; - use std::collections::HashSet; - use uuid::Uuid; - - type ControlPlaneTestContext = - nexus_test_utils::ControlPlaneTestContext; - - const INSTANCE_NAME: &str = "my-instance"; - const PROJECT_NAME: &str = "springfield-squidport"; - const DISK_NAME: &str = "my-disk"; - - async fn create_org_project_and_disk(client: &ClientTestContext) -> Uuid { - create_default_ip_pool(&client).await; - let project = create_project(client, PROJECT_NAME).await; - create_disk(&client, PROJECT_NAME, DISK_NAME).await; - project.identity.id - } - - async fn new_test_params( - cptestctx: &ControlPlaneTestContext, - instance_id: Uuid, - ) -> Params { - let opctx = test_opctx(&cptestctx); - let datastore = cptestctx.server.server_context().nexus.datastore(); - - let (.., authz_instance, instance) = LookupPath::new(&opctx, datastore) 
- .instance_id(instance_id) - .fetch() - .await - .expect("Failed to lookup instance"); - Params { - serialized_authn: Serialized::for_opctx(&opctx), - authz_instance, - instance, - boundary_switches: HashSet::from([SwitchLocation::Switch0]), - } - } - - // Helper for creating instance create parameters - fn new_instance_create_params() -> params::InstanceCreate { - params::InstanceCreate { - identity: IdentityMetadataCreateParams { - name: INSTANCE_NAME.parse().unwrap(), - description: "My instance".to_string(), - }, - ncpus: InstanceCpuCount::try_from(2).unwrap(), - memory: ByteCount::from_gibibytes_u32(4), - hostname: "inst".parse().unwrap(), - user_data: vec![], - ssh_public_keys: Some(Vec::new()), - network_interfaces: - params::InstanceNetworkInterfaceAttachment::Default, - external_ips: vec![params::ExternalIpCreate::Ephemeral { - pool: None, - }], - boot_disk: Some(params::InstanceDiskAttachment::Attach( - params::InstanceDiskAttach { name: DISK_NAME.parse().unwrap() }, - )), - disks: Vec::new(), - start: false, - auto_restart_policy: Default::default(), - anti_affinity_groups: Vec::new(), - } - } - - pub fn test_opctx(cptestctx: &ControlPlaneTestContext) -> OpContext { - OpContext::for_tests( - cptestctx.logctx.log.new(o!()), - cptestctx.server.server_context().nexus.datastore().clone(), - ) - } - - #[nexus_test(server = crate::Server)] - async fn test_saga_basic_usage_succeeds( - cptestctx: &ControlPlaneTestContext, - ) { - DiskTest::new(cptestctx).await; - let client = &cptestctx.external_client; - let nexus = &cptestctx.server.server_context().nexus; - create_org_project_and_disk(&client).await; - - // Build the saga DAG with the provided test parameters and run it. 
- let params = new_test_params( - &cptestctx, - create_instance(&cptestctx, new_instance_create_params()) - .await - .id(), - ) - .await; - nexus - .sagas - .saga_execute::(params) - .await - .expect("Saga should have succeeded"); - } - - async fn create_instance( - cptestctx: &ControlPlaneTestContext, - params: params::InstanceCreate, - ) -> db::model::Instance { - let nexus = &cptestctx.server.server_context().nexus; - let opctx = test_opctx(&cptestctx); - - let project_selector = params::ProjectSelector { - project: PROJECT_NAME.to_string().try_into().unwrap(), - }; - let project_lookup = - nexus.project_lookup(&opctx, project_selector).unwrap(); - - let instance_state = nexus - .project_create_instance(&opctx, &project_lookup, ¶ms) - .await - .unwrap(); - - let datastore = - cptestctx.server.server_context().nexus.datastore().clone(); - let (.., db_instance) = LookupPath::new(&opctx, &datastore) - .instance_id(instance_state.instance().id()) - .fetch() - .await - .expect("test instance should be present in datastore"); - - db_instance - } - - #[nexus_test(server = crate::Server)] - async fn test_actions_succeed_idempotently( - cptestctx: &ControlPlaneTestContext, - ) { - DiskTest::new(cptestctx).await; - - let client = &cptestctx.external_client; - let nexus = &cptestctx.server.server_context().nexus; - create_org_project_and_disk(&client).await; - - // Build the saga DAG with the provided test parameters - let dag = create_saga_dag::( - new_test_params( - &cptestctx, - create_instance(&cptestctx, new_instance_create_params()) - .await - .id(), - ) - .await, - ) - .unwrap(); - - crate::app::sagas::test_helpers::actions_succeed_idempotently( - nexus, dag, - ) - .await; - - verify_clean_slate(&cptestctx).await; - } -} diff --git a/nexus/src/app/sagas/instance_start.rs b/nexus/src/app/sagas/instance_start.rs index 4045f55984a..a340fe16f80 100644 --- a/nexus/src/app/sagas/instance_start.rs +++ b/nexus/src/app/sagas/instance_start.rs @@ -18,42 +18,13 @@ use 
crate::app::sagas::declare_saga_actions; use chrono::Utc; use nexus_db_lookup::LookupPath; use nexus_db_queries::db::identity::Resource; -use nexus_db_queries::{authn, authz, db}; +use nexus_db_queries::{authz, db}; +use nexus_sagas::sagas::instance_start::{InstanceStartReason, Params}; use omicron_common::api::external::Error; use omicron_uuid_kinds::{GenericUuid, InstanceUuid, PropolisUuid, SledUuid}; -use serde::{Deserialize, Serialize}; use slog::info; use steno::ActionError; -/// Parameters to the instance start saga. -#[derive(Debug, Deserialize, Serialize)] -pub(crate) struct Params { - pub db_instance: db::model::Instance, - - /// Authentication context to use to fetch the instance's current state from - /// the database. - pub serialized_authn: authn::saga::Serialized, - - /// Why is this instance being started? - pub reason: Reason, -} - -/// Reasons an instance may be started. -/// -/// Currently, this is primarily used to determine whether the instance's -/// auto-restart timestamp must be updated. It's also included in log messages -/// in the start saga. -#[derive(Copy, Clone, Debug, Deserialize, Serialize, PartialEq, Eq)] -pub(crate) enum Reason { - /// The instance was automatically started upon being created. - AutoStart, - /// The instance was started by a user action. - User, - /// The instance has failed and is being automatically restarted by the - /// control plane. - AutoRestart, -} - declare_saga_actions! { instance_start; @@ -351,11 +322,12 @@ async fn sis_move_to_starting( let new_runtime = { // If we are performing an automated restart of a Failed instance, // remember to update the timestamp. 
- let time_last_auto_restarted = if params.reason == Reason::AutoRestart { - Some(Utc::now()) - } else { - db_instance.runtime().time_last_auto_restarted - }; + let time_last_auto_restarted = + if params.reason == InstanceStartReason::AutoRestart { + Some(Utc::now()) + } else { + db_instance.runtime().time_last_auto_restarted + }; db::model::InstanceRuntimeState { nexus_state: db::model::InstanceState::Vmm, propolis_id: Some(propolis_id.into_untyped_uuid()), @@ -869,7 +841,7 @@ mod test { let params = Params { serialized_authn: authn::saga::Serialized::for_opctx(&opctx), db_instance, - reason: Reason::User, + reason: InstanceStartReason::User, }; nexus @@ -921,7 +893,7 @@ mod test { serialized_authn: authn::saga::Serialized::for_opctx(&opctx), db_instance, - reason: Reason::User, + reason: InstanceStartReason::User, } } }) @@ -968,7 +940,7 @@ mod test { let params = Params { serialized_authn: authn::saga::Serialized::for_opctx(&opctx), db_instance, - reason: Reason::User, + reason: InstanceStartReason::User, }; let dag = create_saga_dag::(params).unwrap(); @@ -1009,7 +981,7 @@ mod test { let params = Params { serialized_authn: authn::saga::Serialized::for_opctx(&opctx), db_instance, - reason: Reason::User, + reason: InstanceStartReason::User, }; let dag = create_saga_dag::(params).unwrap(); diff --git a/nexus/src/app/sagas/mod.rs b/nexus/src/app/sagas/mod.rs index c7d3298ccb5..f17b98fd917 100644 --- a/nexus/src/app/sagas/mod.rs +++ b/nexus/src/app/sagas/mod.rs @@ -10,6 +10,8 @@ // easier it will be to test, version, and update in deployed systems. 
use crate::saga_interface::SagaContext; +use nexus_saga_interface::NexusSaga2; +use nexus_saga_interface::SagaInitError; use std::sync::Arc; use std::sync::LazyLock; use steno::ActionContext; @@ -19,7 +21,6 @@ use steno::SagaDag; use steno::SagaName; use steno::SagaType; use steno::new_action_noop_undo; -use thiserror::Error; use uuid::Uuid; pub mod demo; @@ -30,7 +31,6 @@ pub mod image_create; pub mod image_delete; pub(crate) mod instance_common; pub mod instance_create; -pub mod instance_delete; pub mod instance_ip_attach; pub mod instance_ip_detach; pub mod instance_migrate; @@ -96,42 +96,59 @@ pub(crate) trait NexusSaga { } } -#[derive(Debug, Error)] -pub enum SagaInitError { - #[error("internal error building saga graph: {0:#}")] - DagBuildError(steno::DagBuilderError), +// Adapter for NexusSaga2 +impl NexusSaga for T { + const NAME: &'static str = T::NAME; - #[error("failed to serialize {0:?}: {1:#}")] - SerializeError(String, serde_json::Error), + type Params = T::Params; - #[error("invalid parameter: {0}")] - InvalidParameter(String), -} + fn register_actions(registry: &mut ActionRegistry) { + for action in T::actions() { + registry.register(map_action(action)); + } + } -impl From for SagaInitError { - fn from(error: steno::DagBuilderError) -> Self { - SagaInitError::DagBuildError(error) + fn make_saga_dag( + params: &Self::Params, + builder: steno::DagBuilder, + ) -> Result { + T::make_saga_dag(params, builder) } } -impl From for omicron_common::api::external::Error { - fn from(error: SagaInitError) -> Self { - match error { - SagaInitError::DagBuildError(_) - | SagaInitError::SerializeError(_, _) => { - // All of these errors reflect things that shouldn't be possible. - // They're basically bugs. 
- omicron_common::api::external::Error::internal_error(&format!( - "creating saga: {:#}", - error - )) - } +fn map_action(action: nexus_saga_interface::NexusAction) -> NexusAction { + Arc::new(ActionWrapper(action)) +} - SagaInitError::InvalidParameter(s) => { - omicron_common::api::external::Error::invalid_request(&s) - } - } +#[derive(Debug)] +struct ActionWrapper(nexus_saga_interface::NexusAction); + +impl steno::Action for ActionWrapper { + fn do_it( + &self, + sgctx: ActionContext, + ) -> futures::future::BoxFuture<'_, steno::ActionResult> { + self.0.do_it(map_action_context(sgctx)) + } + + fn undo_it( + &self, + sgctx: ActionContext, + ) -> futures::future::BoxFuture<'_, steno::UndoResult> { + self.0.undo_it(map_action_context(sgctx)) } + + fn name(&self) -> steno::ActionName { + self.0.name() + } +} + +fn map_action_context( + sgctx: ActionContext, +) -> ActionContext { + sgctx.map_user_data::<_, nexus_saga_interface::NexusSagaType>(|data| { + Arc::new(Arc::clone(data.context2())) + }) } pub(super) static ACTION_GENERATE_ID: LazyLock = @@ -160,7 +177,7 @@ fn make_action_registry() -> ActionRegistry { disk_delete::SagaDiskDelete, finalize_disk::SagaFinalizeDisk, instance_create::SagaInstanceCreate, - instance_delete::SagaInstanceDelete, + nexus_sagas::sagas::instance_delete::SagaInstanceDelete, instance_ip_attach::SagaInstanceIpAttach, instance_ip_detach::SagaInstanceIpDetach, instance_migrate::SagaInstanceMigrate, diff --git a/nexus/src/app/sagas/test_helpers.rs b/nexus/src/app/sagas/test_helpers.rs index 0e2f76d9ee5..d4005ef78fa 100644 --- a/nexus/src/app/sagas/test_helpers.rs +++ b/nexus/src/app/sagas/test_helpers.rs @@ -5,7 +5,7 @@ //! Helper functions for writing saga undo tests and working with instances in //! saga tests. 
-use super::{NexusSaga, instance_common::VmmAndSledIds, instance_start}; +use super::{NexusSaga, instance_common::VmmAndSledIds}; use crate::{Nexus, app::saga::create_saga_dag}; use async_bb8_diesel::{AsyncRunQueryDsl, AsyncSimpleConnection}; use diesel::{ @@ -22,6 +22,7 @@ use nexus_db_queries::{ datastore::{InstanceAndActiveVmm, InstanceGestalt}, }, }; +use nexus_sagas::sagas::instance_start::InstanceStartReason; use nexus_types::identity::Resource; use omicron_common::api::external::Error; use omicron_common::api::external::NameOrId; @@ -57,7 +58,7 @@ pub(crate) async fn instance_start( let instance_lookup = nexus.instance_lookup(&opctx, instance_selector).unwrap(); nexus - .instance_start(&opctx, &instance_lookup, instance_start::Reason::User) + .instance_start(&opctx, &instance_lookup, InstanceStartReason::User) .await .expect("Failed to start instance"); } diff --git a/nexus/src/external_api/http_entrypoints.rs b/nexus/src/external_api/http_entrypoints.rs index 1e9b407aed1..3061bfe3edc 100644 --- a/nexus/src/external_api/http_entrypoints.rs +++ b/nexus/src/external_api/http_entrypoints.rs @@ -2242,7 +2242,7 @@ impl NexusExternalApi for NexusExternalApiImpl { .instance_start( &opctx, &instance_lookup, - crate::app::sagas::instance_start::Reason::User, + nexus_sagas::sagas::instance_start::InstanceStartReason::User, ) .await?; Ok(HttpResponseAccepted(instance.into())) diff --git a/nexus/src/saga_interface.rs b/nexus/src/saga_interface.rs index aef7044408f..2233fcbbb40 100644 --- a/nexus/src/saga_interface.rs +++ b/nexus/src/saga_interface.rs @@ -5,7 +5,11 @@ //! 
Interfaces available to saga actions and undo actions use crate::Nexus; -use nexus_db_queries::{authz, db}; +use crate::app::background::BackgroundTasks; +use nexus_auth::{authz, context::OpContext}; +use nexus_db_queries::db; +use nexus_saga_interface::{DataStoreContext, NexusInterface}; +use omicron_common::api::external::Error; use slog::Logger; use std::fmt; use std::sync::Arc; @@ -16,6 +20,7 @@ use std::sync::Arc; pub struct SagaContext { nexus: Arc, log: Logger, + context2: Arc, } impl fmt::Debug for SagaContext { @@ -26,7 +31,18 @@ impl fmt::Debug for SagaContext { impl SagaContext { pub(crate) fn new(nexus: Arc, log: Logger) -> SagaContext { - SagaContext { nexus, log } + let nexus_context = nexus_saga_interface::NexusContext::new(Arc::new( + NexusWrapper::new(nexus.clone()), + )); + let context2 = Arc::new(nexus_saga_interface::SagaContext::new( + log.clone(), + nexus_context, + )); + SagaContext { nexus, log, context2 } + } + + pub(crate) fn context2(&self) -> &Arc { + &self.context2 } pub(crate) fn log(&self) -> &Logger { @@ -45,3 +61,39 @@ impl SagaContext { self.nexus.datastore() } } + +struct NexusWrapper { + nexus: Arc, + datastore_context: DataStoreContext, +} + +impl NexusWrapper { + fn new(nexus: Arc) -> Self { + let datastore = nexus.datastore().clone(); + let datastore_context = DataStoreContext::new(datastore); + Self { nexus, datastore_context } + } +} + +#[async_trait::async_trait] +impl NexusInterface for NexusWrapper { + fn authz(&self) -> &Arc { + self.nexus.authz() + } + + fn datastore(&self) -> &DataStoreContext { + &self.datastore_context + } + + fn background_tasks(&self) -> &BackgroundTasks { + self.nexus.background_tasks() + } + + async fn instance_delete_dpd_config( + &self, + opctx: &OpContext, + authz_instance: &authz::Instance, + ) -> Result<(), Error> { + self.nexus.instance_delete_dpd_config(opctx, authz_instance).await + } +} diff --git a/workspace-hack/Cargo.toml b/workspace-hack/Cargo.toml index 7eff9c3a240..03037c40f09 
100644 --- a/workspace-hack/Cargo.toml +++ b/workspace-hack/Cargo.toml @@ -20,7 +20,7 @@ workspace = true [dependencies] ahash = { version = "0.8.11" } aho-corasick = { version = "1.1.3" } -anyhow = { version = "1.0.97", features = ["backtrace"] } +anyhow = { version = "1.0.98", features = ["backtrace"] } aws-lc-rs = { version = "1.12.4", features = ["prebuilt-nasm"] } base16ct = { version = "0.2.0", default-features = false, features = ["alloc"] } base64 = { version = "0.22.1" } @@ -69,7 +69,6 @@ idna = { version = "1.0.3" } indexmap = { version = "2.7.1", features = ["serde"] } inout = { version = "0.1.3", default-features = false, features = ["std"] } ipnetwork = { version = "0.21.1", features = ["schemars", "serde"] } -itertools-594e8ee84c453af0 = { package = "itertools", version = "0.13.0" } itertools-93f6ce9d446188ac = { package = "itertools", version = "0.10.5" } lalrpop-util = { version = "0.19.12" } lazy_static = { version = "1.5.0", default-features = false, features = ["spin_no_std"] } @@ -89,7 +88,6 @@ openapiv3 = { version = "2.0.0", default-features = false, features = ["skip_ser peg-runtime = { version = "0.8.5", default-features = false, features = ["std"] } pem-rfc7468 = { version = "0.7.0", default-features = false, features = ["std"] } percent-encoding = { version = "2.3.1" } -petgraph = { version = "0.6.5", features = ["serde-1"] } phf = { version = "0.11.2" } phf_shared = { version = "0.11.2" } pkcs8 = { version = "0.10.2", default-features = false, features = ["encryption", "pem", "std"] } @@ -142,7 +140,7 @@ zip-f595c2ba2a3f28df = { package = "zip", version = "2.6.1", default-features = [build-dependencies] ahash = { version = "0.8.11" } aho-corasick = { version = "1.1.3" } -anyhow = { version = "1.0.97", features = ["backtrace"] } +anyhow = { version = "1.0.98", features = ["backtrace"] } aws-lc-rs = { version = "1.12.4", features = ["prebuilt-nasm"] } base16ct = { version = "0.2.0", default-features = false, features = ["alloc"] } base64 = 
{ version = "0.22.1" } @@ -192,7 +190,6 @@ idna = { version = "1.0.3" } indexmap = { version = "2.7.1", features = ["serde"] } inout = { version = "0.1.3", default-features = false, features = ["std"] } ipnetwork = { version = "0.21.1", features = ["schemars", "serde"] } -itertools-594e8ee84c453af0 = { package = "itertools", version = "0.13.0" } itertools-93f6ce9d446188ac = { package = "itertools", version = "0.10.5" } lalrpop-util = { version = "0.19.12" } lazy_static = { version = "1.5.0", default-features = false, features = ["spin_no_std"] } @@ -212,7 +209,6 @@ openapiv3 = { version = "2.0.0", default-features = false, features = ["skip_ser peg-runtime = { version = "0.8.5", default-features = false, features = ["std"] } pem-rfc7468 = { version = "0.7.0", default-features = false, features = ["std"] } percent-encoding = { version = "2.3.1" } -petgraph = { version = "0.6.5", features = ["serde-1"] } phf = { version = "0.11.2" } phf_shared = { version = "0.11.2" } pkcs8 = { version = "0.10.2", default-features = false, features = ["encryption", "pem", "std"] } @@ -337,6 +333,7 @@ getrandom-468e82937335b1c9 = { package = "getrandom", version = "0.3.1", default hyper-rustls = { version = "0.27.3", default-features = false, features = ["http1", "http2", "ring", "tls12", "webpki-tokio"] } hyper-util = { version = "0.1.11", features = ["full"] } indicatif = { version = "0.17.11", features = ["rayon"] } +itertools-5ef9efb8ec2df382 = { package = "itertools", version = "0.12.1" } mio = { version = "1.0.2", features = ["net", "os-ext"] } rustix = { version = "0.38.37", features = ["event", "fs", "net", "pipe", "process", "stdio", "system", "termios", "time"] } tokio-rustls = { version = "0.26.0", default-features = false, features = ["logging", "ring", "tls12"] } @@ -351,6 +348,7 @@ getrandom-468e82937335b1c9 = { package = "getrandom", version = "0.3.1", default hyper-rustls = { version = "0.27.3", default-features = false, features = ["http1", "http2", "ring", "tls12", 
"webpki-tokio"] } hyper-util = { version = "0.1.11", features = ["full"] } indicatif = { version = "0.17.11", features = ["rayon"] } +itertools-5ef9efb8ec2df382 = { package = "itertools", version = "0.12.1" } mio = { version = "1.0.2", features = ["net", "os-ext"] } rustix = { version = "0.38.37", features = ["event", "fs", "net", "pipe", "process", "stdio", "system", "termios", "time"] } tokio-rustls = { version = "0.26.0", default-features = false, features = ["logging", "ring", "tls12"] }