diff --git a/Cargo.lock b/Cargo.lock index 9859c0d9e..a168c0c84 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1525,6 +1525,7 @@ dependencies = [ "semver", "serde", "serde-with-expand-env", + "serde_json", "serde_yaml 0.9.34+deprecated", "tokio", "uhlc 0.5.2", @@ -5559,6 +5560,7 @@ version = "1.0.145" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "402a6f66d8c709116cf22f558eab210f5a50187f702eb4d7e5ef38d9a7f1c79c" dependencies = [ + "indexmap 2.12.0", "itoa", "memchr", "ryu", diff --git a/Cargo.toml b/Cargo.toml index 3d0711843..ced171602 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -87,6 +87,7 @@ pyo3 = { version = "0.23", features = [ pythonize = "0.23" git2 = { version = "0.18.0", features = ["vendored-openssl"] } serde_yaml = "0.9.33" +serde_json = "1.0.145" [package] name = "dora-examples" diff --git a/apis/rust/node/Cargo.toml b/apis/rust/node/Cargo.toml index d36610387..f47652059 100644 --- a/apis/rust/node/Cargo.toml +++ b/apis/rust/node/Cargo.toml @@ -32,7 +32,7 @@ futures-concurrency = "7.3.0" futures-timer = "3.0.2" dora-arrow-convert = { workspace = true } aligned-vec = "0.5.0" -serde_json = "1.0.86" +serde_json = { version = "1.0.86", features = ["preserve_order"] } tokio = { version = "1.24.2", features = ["rt", "rt-multi-thread"] } inquire = { version = "0.7.5", default-features = false, features = [ "console", diff --git a/apis/rust/node/src/daemon_connection/integration_testing.rs b/apis/rust/node/src/daemon_connection/integration_testing.rs new file mode 100644 index 000000000..9affd00a6 --- /dev/null +++ b/apis/rust/node/src/daemon_connection/integration_testing.rs @@ -0,0 +1,283 @@ +use std::{ + fs::File, + io::Write as _, + path::PathBuf, + time::{Duration, Instant}, +}; + +use arrow::array::{Array, RecordBatch, StructArray}; +use colored::Colorize; +use dora_core::{ + metadata::ArrowTypeInfoExt, + uhlc::{self, HLC, NTP64, Timestamp}, +}; +use dora_message::{ + common::{DataMessage, Timestamped}, + daemon_to_node::{DaemonReply, NodeEvent}, + integration_testing::{InputData, InputEvent, IntegrationTestInput, TimedInputEvent}, + metadata::{ArrowTypeInfo, Metadata}, + node_to_daemon::DaemonRequest, +}; +use eyre::{Context, ContextCompat}; + +use crate::{ + arrow_utils::{copy_array_into_sample, required_data_size}, + daemon_connection::json_to_arrow::read_json_value_as_arrow, + event_stream::data_to_arrow_array, +}; + +pub struct IntegrationTestingEvents { + events: std::vec::IntoIter, + output_file: File, + start_timestamp: uhlc::Timestamp, + start_time: Instant, + skip_output_time_offsets: bool, +} + +impl IntegrationTestingEvents { + pub fn new( + input_file_path: PathBuf, + output_file_path: PathBuf, + skip_output_time_offsets: bool, + ) -> eyre::Result { + let mut node_info: IntegrationTestInput = serde_json::from_slice( + &std::fs::read(&input_file_path) + .with_context(|| format!("failed to open {}", input_file_path.display()))?, + ) + .with_context(|| format!("failed to deserialize {}", input_file_path.display()))?; + let output_file = File::create(&output_file_path) + .with_context(|| format!("failed to create {}", output_file_path.display()))?; + + node_info + .events + .as_mut_slice() + .sort_by(|a, b| a.time_offset_secs.total_cmp(&b.time_offset_secs)); + let inputs = std::mem::take(&mut node_info.events).into_iter(); + + let clock = HLC::default(); + let start_timestamp = clock.new_timestamp(); + let start_time = Instant::now(); + Ok(Self { + events: inputs, + output_file, + start_timestamp, + start_time, + skip_output_time_offsets, + }) + } + + pub fn request(&mut self, request: &Timestamped) -> eyre::Result { + let reply = match &request.inner { + DaemonRequest::Register(_) => DaemonReply::Result(Ok(())), + DaemonRequest::Subscribe => DaemonReply::Result(Ok(())), + DaemonRequest::SubscribeDrop => DaemonReply::Result(Ok(())), + DaemonRequest::NextEvent { .. } => { + let events = if let Some(event) = self.next_event()? { + vec![event] + } else { + vec![] + }; + DaemonReply::NextEvents(events) + } + DaemonRequest::SendMessage { + output_id, + metadata, + data, + } => { + let mut output = serde_json::Map::new(); + output.insert("id".into(), output_id.to_string().into()); + + if !self.skip_output_time_offsets { + let time_offset = metadata + .timestamp() + .get_diff_duration(&self.start_timestamp); + output.insert("time_offset_secs".into(), time_offset.as_secs_f64().into()); + } + + if data.is_some() { + let (drop_tx, drop_rx) = flume::unbounded(); + let data_array = data_to_arrow_array(data.clone(), metadata, drop_tx) + .context("failed to convert output to arrow array")?; + // integration testing doesn't use shared memory -> no drop tokens + let _ = drop_rx; + + let data_type_json = serde_json::to_value(data_array.data_type()) + .context("failed to serialize data type as JSON")?; + + let batch = RecordBatch::try_from_iter([("inner", data_array)]) + .context("failed to create RecordBatch")?; + + let mut writer = arrow_json::ArrayWriter::new(Vec::new()); + writer + .write(&batch) + .context("failed to encode data as JSON")?; + writer + .finish() + .context("failed to finish writing JSON data")?; + let json_data_encoded = writer.into_inner(); + + // Reparse the string using serde_json + let json_data: Vec> = + serde_json::from_reader(json_data_encoded.as_slice()) + .context("failed to parse JSON data again")?; + // remove `inner` field again + let json_data_flattened: Vec<_> = json_data + .into_iter() + .map(|mut m| m.remove("inner")) + .collect(); + output.insert("data".into(), json_data_flattened.into()); + output.insert("type".into(), data_type_json); + } + + serde_json::to_writer(&mut self.output_file, &output) + .context("failed to write output as JSON")?; + writeln!(&mut self.output_file) + .context("failed to write newline to output file")?; + + DaemonReply::Empty + } + DaemonRequest::CloseOutputs(data_ids) => { + println!("{} {data_ids:?}", "node reports closed outputs".blue()); + DaemonReply::Result(Ok(())) + } + DaemonRequest::OutputsDone => { + println!("{}", "node reports OutputsDone".blue()); + DaemonReply::Result(Ok(())) + } + DaemonRequest::ReportDropTokens { drop_tokens } => { + println!("{} {drop_tokens:?}", "node reports drop tokens".blue()); + DaemonReply::Empty + } + DaemonRequest::NextFinishedDropTokens => { + // interactive nodes don't use shared memory -> no drop tokens + DaemonReply::NextDropEvents(vec![]) + } + DaemonRequest::EventStreamDropped => { + println!("{}", "node reports EventStreamDropped".blue()); + DaemonReply::Result(Ok(())) + } + DaemonRequest::NodeConfig { .. } => { + eyre::bail!("unexpected NodeConfig in interactive mode") + } + }; + Ok(reply) + } + + fn next_event(&mut self) -> eyre::Result>> { + let Some(event) = self.events.next() else { + return Ok(None); + }; + + let TimedInputEvent { + time_offset_secs, + event, + } = event; + let time_offset = Duration::from_secs_f64(time_offset_secs); + let elapsed = self.start_time.elapsed(); + if let Some(wait_time) = time_offset.checked_sub(elapsed) { + std::thread::sleep(wait_time); + } + + let timestamp = Timestamp::new( + self.start_timestamp.get_time() + NTP64::from(time_offset), + *self.start_timestamp.get_id(), + ); + + let converted = match event { + InputEvent::Stop => NodeEvent::Stop, + InputEvent::Input { id, metadata, data } => { + let (data, type_info) = if let Some(data) = data { + let array = match data { + InputData::JsonObject { value, schema } => { + // input is JSON data + let array = json_value_to_list(value); + let schema = match schema { + Some(schema) => schema, + None => arrow_json::reader::infer_json_schema_from_iterator( + array.iter().map(Ok), + )?, + }; + read_json_value_as_arrow(&array, schema) + .context("failed to read data")? + } + InputData::ArrowFile { + path, + batch_index, + column, + } => { + let file = std::fs::File::open(&path).with_context(|| { + format!("failed to open arrow file {}", path.display()) + })?; + let mut reader = arrow::ipc::reader::FileReader::try_new(file, None) + .context("failed to create arrow file reader")?; + reader.set_index(batch_index).with_context(|| { + format!( + "failed to seek to batch index {} in arrow file {}", + batch_index, + path.display() + ) + })?; + let batch = reader + .next() + .context("no batch at given index")? + .context("failed to read batch from arrow file")?; + match column { + Some(name) => batch.column_by_name(&name).with_context(|| { + format!( + "failed to find column '{}' in batch at index {} of arrow file {}", + name, + batch_index, + path.display() + ) + })?.to_data(), + None => StructArray::from(batch).to_data() + } + } + }; + + let total_len = required_data_size(&array); + let mut buf = vec![0; total_len]; + let type_info = copy_array_into_sample(buf.as_mut_slice(), &array); + + (Some(buf), type_info) + } else { + (None, ArrowTypeInfo::empty()) + }; + let mut meta = Metadata::new(timestamp, type_info); + meta.parameters = metadata.unwrap_or_default(); + NodeEvent::Input { + id, + metadata: meta, + data: data.map(|d| DataMessage::Vec(aligned_vec::AVec::from_slice(1, &d))), + } + } + InputEvent::InputClosed { id } => NodeEvent::InputClosed { id }, + InputEvent::AllInputsClosed => NodeEvent::AllInputsClosed, + }; + Ok(Some(Timestamped { + inner: converted, + timestamp, + })) + } +} + +fn json_value_to_list(value: serde_json::Value) -> Vec { + match value { + value @ serde_json::Value::Object(_) => { + vec![value] + } + serde_json::Value::Array(inner) => inner.into_iter().map(wrap_value_into_object).collect(), + _ => { + // wrap into object to allow bare values + let object = wrap_value_into_object(value); + vec![object] + } + } +} + +fn wrap_value_into_object(value: serde_json::Value) -> serde_json::Value { + let mut map = serde_json::Map::new(); + map.insert("inner".into(), value); + + serde_json::Value::Object(map) +} diff --git a/apis/rust/node/src/daemon_connection/interactive.rs b/apis/rust/node/src/daemon_connection/interactive.rs index b4168484f..55ace06ff 100644 --- a/apis/rust/node/src/daemon_connection/interactive.rs +++ b/apis/rust/node/src/daemon_connection/interactive.rs @@ -1,10 +1,5 @@ -use std::{ - io::{BufRead, Read, stdout}, - sync::Arc, - time::Duration, -}; +use std::{io::stdout, time::Duration}; -use arrow::array::{Array, ArrayData}; use colored::Colorize; use dora_core::{metadata::ArrowTypeInfoExt, uhlc::HLC}; use dora_message::{ @@ -13,10 +8,10 @@ use dora_message::{ metadata::{ArrowTypeInfo, Metadata}, node_to_daemon::DaemonRequest, }; -use eyre::{Context, ContextCompat}; use crate::{ arrow_utils::{copy_array_into_sample, required_data_size}, + daemon_connection::json_to_arrow::read_json_bytes_as_arrow, event_stream::data_to_arrow_array, }; @@ -123,7 +118,7 @@ impl InteractiveEvents { std::mem::drop(stdout_lock); let typed_data = if let Some(data) = data { // input is JSON data - let array = match read_json_as_arrow(data.as_bytes()) { + let array = match read_json_bytes_as_arrow(data.as_bytes()) { Ok(array) => array, Err(err) => { eprintln!("{}", format!("{err}").red()); @@ -151,42 +146,3 @@ impl InteractiveEvents { Ok(Some(event)) } } - -fn read_json_as_arrow(data: &[u8]) -> eyre::Result { - match arrow_json::reader::infer_json_schema(wrapped(data), None) { - Ok((schema, _)) => read_from_json_with_schema(wrapped(data), schema), - Err(_) => { - // try again with quoting the input to treat it as a string - match arrow_json::reader::infer_json_schema(wrapped_quoted(data), None) { - Ok((schema, _)) => read_from_json_with_schema(wrapped_quoted(data), schema), - Err(err) => eyre::bail!("failed to infer JSON schema: {err}"), - } - } - } -} - -fn read_from_json_with_schema( - data: impl BufRead, - schema: arrow_schema::Schema, -) -> eyre::Result { - let mut reader = arrow_json::reader::ReaderBuilder::new(Arc::new(schema)) - .build(data) - .context("failed to build JSON reader")?; - let batch = reader - .next() - .context("no record batch in JSON")? - .context("failed to read record batch")?; - - Ok(batch.column(0).to_data()) -} - -// wrap data into JSON object to also allow bare JSON values -fn wrapped(data: impl BufRead) -> impl BufRead { - "{ \"inner\":".as_bytes().chain(data).chain("}".as_bytes()) -} - -// wrap data into JSON object to also allow bare JSON values -fn wrapped_quoted(data: impl BufRead) -> impl BufRead { - let quoted = [b'"'].chain(data).chain([b'"'].as_slice()); - wrapped(quoted) -} diff --git a/apis/rust/node/src/daemon_connection/json_to_arrow.rs b/apis/rust/node/src/daemon_connection/json_to_arrow.rs new file mode 100644 index 000000000..2ad2b4c60 --- /dev/null +++ b/apis/rust/node/src/daemon_connection/json_to_arrow.rs @@ -0,0 +1,64 @@ +use std::{ + io::{BufRead, Read}, + sync::Arc, +}; + +use arrow::array::{Array, ArrayData}; +use eyre::{Context, ContextCompat}; + +pub fn read_json_bytes_as_arrow(data: &[u8]) -> eyre::Result { + match arrow_json::reader::infer_json_schema(wrapped(data), None) { + Ok((schema, _)) => read_from_json_with_schema(wrapped(data), schema), + Err(_) => { + // try again with quoting the input to treat it as a string + match arrow_json::reader::infer_json_schema(wrapped_quoted(data), None) { + Ok((schema, _)) => read_from_json_with_schema(wrapped_quoted(data), schema), + Err(err) => eyre::bail!("failed to infer JSON schema: {err}"), + } + } + } +} + +fn read_from_json_with_schema( + data: impl BufRead, + schema: arrow_schema::Schema, +) -> eyre::Result { + let mut reader = arrow_json::reader::ReaderBuilder::new(Arc::new(schema)) + .build(data) + .context("failed to build JSON reader")?; + let batch = reader + .next() + .context("no record batch in JSON")? + .context("failed to read record batch")?; + + Ok(batch.column(0).to_data()) +} + +// wrap data into JSON object to also allow bare JSON values +fn wrapped(data: impl BufRead) -> impl BufRead { + "{ \"inner\":".as_bytes().chain(data).chain("}".as_bytes()) +} + +// wrap data into JSON object to also allow bare JSON values +fn wrapped_quoted(data: impl BufRead) -> impl BufRead { + let quoted = [b'"'].chain(data).chain([b'"'].as_slice()); + wrapped(quoted) +} + +/// convert the given JSON object to the closed arrow representation +pub fn read_json_value_as_arrow( + data: &[serde_json::Value], + schema: arrow_schema::Schema, +) -> eyre::Result { + let mut decoder = arrow_json::reader::ReaderBuilder::new(Arc::new(schema)) + .build_decoder() + .context("failed to build JSON decoder")?; + decoder + .serialize(data) + .context("failed to decode JSON to arrow array")?; + let batch = decoder + .flush() + .context("failed to read record batch")? + .context("no record batch in JSON")?; + Ok(batch.column(0).to_data()) +} diff --git a/apis/rust/node/src/daemon_connection/mod.rs b/apis/rust/node/src/daemon_connection/mod.rs index a21a703e6..e77096945 100644 --- a/apis/rust/node/src/daemon_connection/mod.rs +++ b/apis/rust/node/src/daemon_connection/mod.rs @@ -1,3 +1,4 @@ +use crate::daemon_connection::interactive::InteractiveEvents; use dora_core::{config::NodeId, uhlc::Timestamp}; use dora_message::{ DataflowId, @@ -5,6 +6,7 @@ use dora_message::{ node_to_daemon::{DaemonRequest, NodeRegisterRequest, Timestamped}, }; use eyre::{Context, bail, eyre}; +pub use integration_testing::IntegrationTestingEvents; use shared_memory_server::{ShmemClient, ShmemConf}; #[cfg(unix)] use std::os::unix::net::UnixStream; @@ -12,20 +14,28 @@ use std::{ net::{SocketAddr, TcpStream}, time::Duration, }; +use tokio::sync::oneshot; -use crate::daemon_connection::interactive::InteractiveEvents; - +mod integration_testing; mod interactive; mod tcp; #[cfg(unix)] mod unix_domain; +mod json_to_arrow; + pub enum DaemonChannel { Shmem(ShmemClient, DaemonReply>), Tcp(TcpStream), #[cfg(unix)] UnixDomain(UnixStream), Interactive(InteractiveEvents), + IntegrationTestChannel( + tokio::sync::mpsc::Sender<( + Timestamped, + tokio::sync::oneshot::Sender, + )>, + ), } impl DaemonChannel { @@ -86,6 +96,15 @@ impl DaemonChannel { #[cfg(unix)] DaemonChannel::UnixDomain(stream) => unix_domain::request(stream, request), DaemonChannel::Interactive(events) => events.request(request), + DaemonChannel::IntegrationTestChannel(channel) => { + let (reply_tx, reply) = oneshot::channel(); + channel + .blocking_send((request.clone(), reply_tx)) + .expect("failed to send request to IntegrationTestChannel"); + reply + .blocking_recv() + .context("failed to receive oneshot reply") + } } } } diff --git a/apis/rust/node/src/event_stream/mod.rs b/apis/rust/node/src/event_stream/mod.rs index 43a6d7859..0db5eb0f5 100644 --- a/apis/rust/node/src/event_stream/mod.rs +++ b/apis/rust/node/src/event_stream/mod.rs @@ -91,6 +91,12 @@ impl EventStream { })? } DaemonCommunication::Interactive => DaemonChannel::Interactive(Default::default()), + DaemonCommunication::IntegrationTest { .. } => { + unreachable!("integration test channel should be initialized at this point") + } + DaemonCommunication::IntegrationTestInitialized { channel } => { + DaemonChannel::IntegrationTestChannel(channel.clone().expect("channel is None")) + } }; let close_channel = match daemon_communication { @@ -111,6 +117,12 @@ impl EventStream { })? } DaemonCommunication::Interactive => DaemonChannel::Interactive(Default::default()), + DaemonCommunication::IntegrationTest { .. } => { + unreachable!("integration test channel should be initialized at this point") + } + DaemonCommunication::IntegrationTestInitialized { channel } => { + DaemonChannel::IntegrationTestChannel(channel.clone().expect("channel is None")) + } }; let mut queue_size_limit: HashMap)> = input_config diff --git a/apis/rust/node/src/event_stream/thread.rs b/apis/rust/node/src/event_stream/thread.rs index b360ad4c6..5d4593775 100644 --- a/apis/rust/node/src/event_stream/thread.rs +++ b/apis/rust/node/src/event_stream/thread.rs @@ -117,13 +117,19 @@ fn event_stream_loop( events } } + Ok(DaemonReply::Result(Err(err))) => { + let err = eyre!(err).wrap_err("error in incoming event"); + tracing::error!("{err:?}"); + continue; + } + Ok(other) => { let err = eyre!("unexpected control reply: {other:?}"); tracing::warn!("{err:?}"); continue; } Err(err) => { - let err = eyre!(err).wrap_err("failed to receive incoming event"); + let err = err.wrap_err("failed to receive incoming event"); tracing::warn!("{err:?}"); continue; } diff --git a/apis/rust/node/src/node/control_channel.rs b/apis/rust/node/src/node/control_channel.rs index eebf4911a..b80ffc3fa 100644 --- a/apis/rust/node/src/node/control_channel.rs +++ b/apis/rust/node/src/node/control_channel.rs @@ -40,6 +40,12 @@ impl ControlChannel { .wrap_err("failed to connect control channel")? } DaemonCommunication::Interactive => DaemonChannel::Interactive(Default::default()), + DaemonCommunication::IntegrationTest { .. } => { + unreachable!("integration test channel should be initialized at this point") + } + DaemonCommunication::IntegrationTestInitialized { channel } => { + DaemonChannel::IntegrationTestChannel(channel.clone().expect("channel is None")) + } }; Self::init_on_channel(dataflow_id, node_id, channel, clock) diff --git a/apis/rust/node/src/node/drop_stream.rs b/apis/rust/node/src/node/drop_stream.rs index c7da712cc..5a7ec3189 100644 --- a/apis/rust/node/src/node/drop_stream.rs +++ b/apis/rust/node/src/node/drop_stream.rs @@ -41,6 +41,12 @@ impl DropStream { })? } DaemonCommunication::Interactive => DaemonChannel::Interactive(Default::default()), + DaemonCommunication::IntegrationTest { .. } => { + unreachable!("integration test channel should be initialized at this point") + } + DaemonCommunication::IntegrationTestInitialized { channel } => { + DaemonChannel::IntegrationTestChannel(channel.clone().expect("channel is None")) + } }; Self::init_on_channel(dataflow_id, node_id, channel, hlc) diff --git a/apis/rust/node/src/node/mod.rs b/apis/rust/node/src/node/mod.rs index df06b0392..dec3f8094 100644 --- a/apis/rust/node/src/node/mod.rs +++ b/apis/rust/node/src/node/mod.rs @@ -1,4 +1,7 @@ -use crate::{EventStream, daemon_connection::DaemonChannel}; +use crate::{ + EventStream, + daemon_connection::{DaemonChannel, IntegrationTestingEvents}, +}; use self::{ arrow_utils::{copy_array_into_sample, required_data_size}, @@ -27,6 +30,7 @@ use shared_memory_extended::{Shmem, ShmemConf}; use std::{ collections::{BTreeSet, HashMap, VecDeque}, ops::{Deref, DerefMut}, + path::PathBuf, sync::Arc, time::Duration, }; @@ -118,32 +122,61 @@ impl DoraNode { } fn init_from_env_inner(fallback_to_interactive: bool) -> eyre::Result<(Self, EventStream)> { - let node_config: NodeConfig = match std::env::var("DORA_NODE_CONFIG") { - Ok(raw) => serde_yaml::from_str(&raw).context("failed to deserialize node config")?, + // normal execution (started by dora daemon) + match std::env::var("DORA_NODE_CONFIG") { + Ok(raw) => { + let node_config: NodeConfig = + serde_yaml::from_str(&raw).context("failed to deserialize node config")?; + #[cfg(feature = "tracing")] + { + TracingBuilder::new(node_config.node_id.as_ref()) + .build() + .wrap_err("failed to set up tracing subscriber")?; + } + + return Self::init(node_config); + } Err(std::env::VarError::NotUnicode(_)) => { bail!("DORA_NODE_CONFIG env variable is not valid unicode") } - Err(std::env::VarError::NotPresent) => { - if fallback_to_interactive && std::io::stdin().is_terminal() { - println!( - "{}", - "Starting node in interactive mode as DORA_NODE_CONFIG env variable is not set" - .green() - ); - return Self::init_interactive(); - } else { - bail!("DORA_NODE_CONFIG env variable is not set") - } + Err(std::env::VarError::NotPresent) => {} // continue trying other init methods + } + + // node integration test mode + match std::env::var("DORA_TEST_WITH_INPUTS") { + Ok(raw) => { + let input_file = PathBuf::from(raw); + let output_file = match std::env::var("DORA_TEST_WRITE_OUTPUTS_TO") { + Ok(raw) => PathBuf::from(raw), + Err(std::env::VarError::NotUnicode(_)) => { + bail!("DORA_TEST_WRITE_OUTPUTS_TO env variable is not valid unicode") + } + Err(std::env::VarError::NotPresent) => { + input_file.with_file_name("outputs.jsonl") + } + }; + let skip_output_time_offsets = + std::env::var_os("DORA_TEST_NO_OUTPUT_TIME_OFFSET").is_some(); + return Self::init_testing(input_file, output_file, skip_output_time_offsets); } - }; - #[cfg(feature = "tracing")] - { - TracingBuilder::new(node_config.node_id.as_ref()) - .build() - .wrap_err("failed to set up tracing subscriber")?; + Err(std::env::VarError::NotUnicode(_)) => { + bail!("DORA_TEST_WITH_INPUTS env variable is not valid unicode") + } + Err(std::env::VarError::NotPresent) => {} // continue trying other init methods } - Self::init(node_config) + // interactive mode + if fallback_to_interactive && std::io::stdin().is_terminal() { + println!( + "{}", + "Starting node in interactive mode as DORA_NODE_CONFIG env variable is not set" + .green() + ); + return Self::init_interactive(); + } + + // no run mode applicable + bail!("DORA_NODE_CONFIG env variable is not set") } /// Initiate a node from a dataflow id and a node id. @@ -323,6 +356,39 @@ impl DoraNode { Ok((node, events)) } + fn init_testing( + input_file: PathBuf, + output_file: PathBuf, + skip_output_time_offsets: bool, + ) -> eyre::Result<(Self, EventStream)> { + #[cfg(feature = "tracing")] + { + TracingBuilder::new("node") + .with_stdout("debug") + .build() + .wrap_err("failed to set up tracing subscriber")?; + } + + let node_config = NodeConfig { + dataflow_id: DataflowId::new_v4(), + node_id: "".parse()?, + run_config: NodeRunConfig { + inputs: Default::default(), + outputs: Default::default(), + }, + daemon_communication: DaemonCommunication::IntegrationTest { + input_file, + output_file, + skip_output_time_offsets, + }, + dataflow_descriptor: serde_yaml::Value::Null, + dynamic: false, + }; + let (mut node, events) = Self::init(node_config)?; + node.interactive = true; + Ok((node, events)) + } + /// Internal initialization routine that should not be used outside of Dora. #[doc(hidden)] #[tracing::instrument] @@ -366,6 +432,40 @@ impl DoraNode { }; } + let daemon_communication = + match daemon_communication { + DaemonCommunication::IntegrationTest { + input_file, + output_file, + skip_output_time_offsets, + } => { + let (sender, mut receiver) = tokio::sync::mpsc::channel(5); + let new_communication = DaemonCommunication::IntegrationTestInitialized { + channel: Some(sender), + }; + let mut events = IntegrationTestingEvents::new( + input_file, + output_file, + skip_output_time_offsets, + )?; + std::thread::spawn(move || { + while let Some((request, reply_sender)) = receiver.blocking_recv() { + let reply = events.request(&request); + if reply_sender + .send(reply.unwrap_or_else(|err| { + DaemonReply::Result(Err(format!("{err:?}"))) + })) + .is_err() + { + eprintln!("failed to send reply"); + } + } + }); + new_communication + } + other => other, + }; + let event_stream = EventStream::init( dataflow_id, &node_id, diff --git a/examples/rust-dataflow/node/src/main.rs b/examples/rust-dataflow/node/src/main.rs index 4a4cc3fda..f5b1fc72a 100644 --- a/examples/rust-dataflow/node/src/main.rs +++ b/examples/rust-dataflow/node/src/main.rs @@ -14,14 +14,10 @@ fn main() -> eyre::Result<()> { }; match event { - Event::Input { - id, - metadata, - data: _, - } => match id.as_str() { + Event::Input { id, metadata, data } => match id.as_str() { "tick" => { let random: u64 = rand::random(); - println!("tick {i}, sending {random:#x}"); + println!("tick {i} with data {data:?}, sending {random:#x}"); node.send_output(output.clone(), metadata.parameters, random.into_arrow())?; } other => eprintln!("Ignoring unexpected input `{other}`"), diff --git a/libraries/message/Cargo.toml b/libraries/message/Cargo.toml index 2bd3a73c7..a4b6db5db 100644 --- a/libraries/message/Cargo.toml +++ b/libraries/message/Cargo.toml @@ -28,3 +28,4 @@ serde_yaml = { workspace = true } once_cell = "1.13.0" serde-with-expand-env = "1.1.0" bincode = "1.3.3" +serde_json = { workspace = true } diff --git a/libraries/message/src/daemon_to_node.rs b/libraries/message/src/daemon_to_node.rs index f80673003..63fbd76a0 100644 --- a/libraries/message/src/daemon_to_node.rs +++ b/libraries/message/src/daemon_to_node.rs @@ -1,11 +1,14 @@ use std::{net::SocketAddr, path::PathBuf}; +use tokio::sync::oneshot; + use crate::{ DataflowId, config::NodeRunConfig, descriptor::OperatorDefinition, id::{DataId, NodeId, OperatorId}, metadata::Metadata, + node_to_daemon::DaemonRequest, }; pub use crate::common::{DataMessage, DropToken, SharedMemoryId, Timestamped}; @@ -43,6 +46,17 @@ pub enum DaemonCommunication { socket_file: PathBuf, }, Interactive, + IntegrationTest { + input_file: PathBuf, + output_file: PathBuf, + skip_output_time_offsets: bool, + }, + IntegrationTestInitialized { + #[serde(skip)] + channel: Option< + tokio::sync::mpsc::Sender<(Timestamped, oneshot::Sender)>, + >, + }, } #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] diff --git a/libraries/message/src/descriptor.rs b/libraries/message/src/descriptor.rs index e7f4811c0..c079b5dd1 100644 --- a/libraries/message/src/descriptor.rs +++ b/libraries/message/src/descriptor.rs @@ -677,7 +677,7 @@ pub enum GitRepoRev { Rev(String), } -#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)] +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, JsonSchema)] #[serde(untagged)] pub enum EnvValue { #[serde(deserialize_with = "with_expand_envs")] diff --git a/libraries/message/src/integration_testing.rs b/libraries/message/src/integration_testing.rs new file mode 100644 index 000000000..ca3acf18d --- /dev/null +++ b/libraries/message/src/integration_testing.rs @@ -0,0 +1,211 @@ +//! Use these types for integration testing nodes. + +use std::{ + collections::{BTreeMap, BTreeSet}, + path::PathBuf, +}; + +use crate::{ + config::Input, + descriptor::EnvValue, + id::{DataId, NodeId}, + metadata::MetadataParameters, +}; + +#[derive(Debug, PartialEq, serde::Serialize, serde::Deserialize)] +pub struct IntegrationTestInput { + /// Unique node identifier. Must not contain `/` characters. + /// + /// Node IDs can be arbitrary strings with the following limitations: + /// + /// - They must not contain any `/` characters (slashes). + /// - We do not recommend using whitespace characters (e.g. spaces) in IDs + /// + /// Each node must have an ID field. + /// + /// ## Example + /// + /// ```yaml + /// nodes: + /// - id: camera_node + /// - id: some_other_node + /// ``` + pub id: NodeId, + + /// Human-readable node name for documentation. + /// + /// This optional field can be used to define a more descriptive name in addition to a short + /// [`id`](Self::id). + /// + /// ## Example + /// + /// ```yaml + /// nodes: + /// - id: camera_node + /// name: "Camera Input Handler" + pub name: Option, + + /// Detailed description of the node's functionality. + /// + /// ## Example + /// + /// ```yaml + /// nodes: + /// - id: camera_node + /// description: "Captures video frames from webcam" + /// ``` + pub description: Option, + + /// Command-line arguments passed to the executable. + /// + /// The command-line arguments that should be passed to the executable/script specified in `path`. + /// The arguments should be separated by space. + /// This field is optional and defaults to an empty argument list. + /// + /// ## Example + /// ```yaml + /// nodes: + /// - id: example + /// path: example-node + /// args: -v --some-flag foo + /// ``` + #[serde(default, skip_serializing_if = "Option::is_none")] + pub args: Option, + + /// Environment variables for node builds and execution. + /// + /// Key-value map of environment variables that should be set for both the + /// [`build`](Self::build) operation and the node execution (i.e. when the node is spawned + /// through [`path`](Self::path)). + /// + /// Supports strings, numbers, and booleans. + /// + /// ## Example + /// + /// ```yaml + /// nodes: + /// - id: example-node + /// path: path/to/node + /// env: + /// DEBUG: true + /// PORT: 8080 + /// API_KEY: "secret-key" + /// ``` + pub env: Option>, + + /// Output data identifiers produced by this node. + /// + /// List of output identifiers that the node sends. + /// Must contain all `output_id` values that the node uses when sending output, e.g. through the + /// [`send_output`](https://docs.rs/dora-node-api/latest/dora_node_api/struct.DoraNode.html#method.send_output) + /// function. + /// + /// ## Example + /// + /// ```yaml + /// nodes: + /// - id: example-node + /// outputs: + /// - processed_image + /// - metadata + /// ``` + #[serde(default)] + pub outputs: BTreeSet, + + /// Input data connections from other nodes. + /// + /// Defines the inputs that this node is subscribing to. + /// + /// The `inputs` field should be a key-value map of the following format: + /// + /// `input_id: source_node_id/source_node_output_id` + /// + /// The components are defined as follows: + /// + /// - `input_id` is the local identifier that should be used for this input. + /// + /// This will map to the `id` field of + /// [`Event::Input`](https://docs.rs/dora-node-api/latest/dora_node_api/enum.Event.html#variant.Input) + /// events sent to the node event loop. + /// - `source_node_id` should be the `id` field of the node that sends the output that we want + /// to subscribe to + /// - `source_node_output_id` should be the identifier of the output that that we want + /// to subscribe to + /// + /// ## Example + /// + /// ```yaml + /// nodes: + /// - id: example-node + /// outputs: + /// - one + /// - two + /// - id: receiver + /// inputs: + /// my_input: example-node/two + /// ``` + #[serde(default)] + pub inputs: BTreeMap, + + /// Redirect stdout/stderr to a data output. + /// + /// This field can be used to send all stdout and stderr output of the node as a Dora output. + /// Each output line is sent as a separate message. + /// + /// + /// ## Example + /// + /// ```yaml + /// nodes: + /// - id: example + /// send_stdout_as: stdout_output + /// - id: logger + /// inputs: + /// example_output: example/stdout_output + /// ``` + #[serde(skip_serializing_if = "Option::is_none")] + pub send_stdout_as: Option, + + pub events: Vec, +} + +#[derive(Debug, PartialEq, serde::Serialize, serde::Deserialize)] +pub struct TimedInputEvent { + pub time_offset_secs: f64, + #[serde(flatten)] + pub event: InputEvent, +} + +#[derive(Debug, PartialEq, serde::Serialize, serde::Deserialize)] +#[serde(tag = "type")] +pub enum InputEvent { + Stop, + Input { + id: DataId, + metadata: Option, + data: Option, + }, + InputClosed { + id: DataId, + }, + AllInputsClosed, +} + +#[derive(Debug, PartialEq, Eq, Clone, serde::Deserialize, serde::Serialize)] +#[serde(untagged)] +pub enum InputData { + /// Converts the given JSON object to the closest Arrow representation. + /// + /// No schema is required, but not all arrow types are representable. + JsonObject { + value: serde_json::Value, + schema: Option, + }, + /// Use [Arrow file format](https://arrow.apache.org/docs/python/ipc.html#writing-and-reading-random-access-files) + ArrowFile { + path: PathBuf, + #[serde(default)] + batch_index: usize, + column: Option, + }, +} diff --git a/libraries/message/src/lib.rs b/libraries/message/src/lib.rs index dfaba4b9d..fa602adf2 100644 --- a/libraries/message/src/lib.rs +++ b/libraries/message/src/lib.rs @@ -22,6 +22,8 @@ pub mod node_to_daemon; pub mod cli_to_coordinator; pub mod coordinator_to_cli; +pub mod integration_testing; + pub use arrow_data; pub use arrow_schema; use uuid::{Timestamp, Uuid}; diff --git a/libraries/message/src/node_to_daemon.rs b/libraries/message/src/node_to_daemon.rs index d8d22e7fb..603634a3e 100644 --- a/libraries/message/src/node_to_daemon.rs +++ b/libraries/message/src/node_to_daemon.rs @@ -8,7 +8,7 @@ use crate::{ versions_compatible, }; -#[derive(Debug, serde::Serialize, serde::Deserialize)] +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] pub enum DaemonRequest { Register(NodeRegisterRequest), Subscribe, @@ -71,7 +71,7 @@ impl DaemonRequest { } } -#[derive(Debug, serde::Serialize, serde::Deserialize)] +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] pub struct NodeRegisterRequest { pub dataflow_id: DataflowId, pub node_id: NodeId,