2 changes: 2 additions & 0 deletions Cargo.lock

(Cargo.lock is a generated file; its diff is not rendered.)

1 change: 1 addition & 0 deletions Cargo.toml
@@ -87,6 +87,7 @@ pyo3 = { version = "0.23", features = [
pythonize = "0.23"
git2 = { version = "0.18.0", features = ["vendored-openssl"] }
serde_yaml = "0.9.33"
serde_json = "1.0.145"

[package]
name = "dora-examples"
2 changes: 1 addition & 1 deletion apis/rust/node/Cargo.toml
@@ -32,7 +32,7 @@ futures-concurrency = "7.3.0"
futures-timer = "3.0.2"
dora-arrow-convert = { workspace = true }
aligned-vec = "0.5.0"
serde_json = "1.0.86"
serde_json = { version = "1.0.86", features = ["preserve_order"] }
tokio = { version = "1.24.2", features = ["rt", "rt-multi-thread"] }
inquire = { version = "0.7.5", default-features = false, features = [
"console",
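The `preserve_order` feature swaps serde_json's default BTreeMap backing for an insertion-ordered map, so the integration-testing writer introduced below emits its keys (`id`, `time_offset_secs`, `data`, `type`) in the order they are inserted rather than alphabetically. A minimal standalone sketch of the difference:

fn main() {
    let mut output = serde_json::Map::new();
    output.insert("time_offset_secs".into(), 0.5.into());
    output.insert("id".into(), "tick".into());

    // with `preserve_order`:  {"time_offset_secs":0.5,"id":"tick"}
    // without the feature:    {"id":"tick","time_offset_secs":0.5} (sorted keys)
    println!("{}", serde_json::Value::Object(output));
}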
283 changes: 283 additions & 0 deletions apis/rust/node/src/daemon_connection/integration_testing.rs
@@ -0,0 +1,283 @@
use std::{
fs::File,
io::Write as _,
path::PathBuf,
time::{Duration, Instant},
};

use arrow::array::{Array, RecordBatch, StructArray};
use colored::Colorize;
use dora_core::{
metadata::ArrowTypeInfoExt,
uhlc::{self, HLC, NTP64, Timestamp},
};
use dora_message::{
common::{DataMessage, Timestamped},
daemon_to_node::{DaemonReply, NodeEvent},
integration_testing::{InputData, InputEvent, IntegrationTestInput, TimedInputEvent},
metadata::{ArrowTypeInfo, Metadata},
node_to_daemon::DaemonRequest,
};
use eyre::{Context, ContextCompat};

use crate::{
arrow_utils::{copy_array_into_sample, required_data_size},
daemon_connection::json_to_arrow::read_json_value_as_arrow,
event_stream::data_to_arrow_array,
};

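/// Stand-in for a daemon connection in integration tests: replays recorded
/// input events from the input file and appends each output the node sends
/// to the output file as one JSON object per line.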
pub struct IntegrationTestingEvents {
events: std::vec::IntoIter<TimedInputEvent>,
output_file: File,
start_timestamp: uhlc::Timestamp,
start_time: Instant,
skip_output_time_offsets: bool,
}

impl IntegrationTestingEvents {
pub fn new(
input_file_path: PathBuf,
output_file_path: PathBuf,
skip_output_time_offsets: bool,
) -> eyre::Result<Self> {
let mut node_info: IntegrationTestInput = serde_json::from_slice(
&std::fs::read(&input_file_path)
.with_context(|| format!("failed to open {}", input_file_path.display()))?,
)
.with_context(|| format!("failed to deserialize {}", input_file_path.display()))?;
let output_file = File::create(&output_file_path)
.with_context(|| format!("failed to create {}", output_file_path.display()))?;

node_info
.events
.sort_by(|a, b| a.time_offset_secs.total_cmp(&b.time_offset_secs));
let inputs = std::mem::take(&mut node_info.events).into_iter();

let clock = HLC::default();
let start_timestamp = clock.new_timestamp();
let start_time = Instant::now();
Ok(Self {
events: inputs,
output_file,
start_timestamp,
start_time,
skip_output_time_offsets,
})
}

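/// Answers a single node-to-daemon request, emulating the daemon's replies
/// from the recorded events instead of a live dataflow.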
pub fn request(&mut self, request: &Timestamped<DaemonRequest>) -> eyre::Result<DaemonReply> {
let reply = match &request.inner {
DaemonRequest::Register(_) => DaemonReply::Result(Ok(())),
DaemonRequest::Subscribe => DaemonReply::Result(Ok(())),
DaemonRequest::SubscribeDrop => DaemonReply::Result(Ok(())),
DaemonRequest::NextEvent { .. } => {
let events = if let Some(event) = self.next_event()? {
vec![event]
} else {
vec![]
};
DaemonReply::NextEvents(events)
}
DaemonRequest::SendMessage {
output_id,
metadata,
data,
} => {
let mut output = serde_json::Map::new();
output.insert("id".into(), output_id.to_string().into());

if !self.skip_output_time_offsets {
let time_offset = metadata
.timestamp()
.get_diff_duration(&self.start_timestamp);
output.insert("time_offset_secs".into(), time_offset.as_secs_f64().into());
}

if data.is_some() {
let (drop_tx, drop_rx) = flume::unbounded();
let data_array = data_to_arrow_array(data.clone(), metadata, drop_tx)
.context("failed to convert output to arrow array")?;
// integration testing doesn't use shared memory -> no drop tokens
let _ = drop_rx;

let data_type_json = serde_json::to_value(data_array.data_type())
.context("failed to serialize data type as JSON")?;

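// arrow_json serializes RecordBatches rather than bare arrays, so wrap the
// array into a single-column batch named `inner`; the wrapper is stripped
// again after re-parsing below.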
let batch = RecordBatch::try_from_iter([("inner", data_array)])
.context("failed to create RecordBatch")?;

let mut writer = arrow_json::ArrayWriter::new(Vec::new());
writer
.write(&batch)
.context("failed to encode data as JSON")?;
writer
.finish()
.context("failed to finish writing JSON data")?;
let json_data_encoded = writer.into_inner();

// Re-parse the encoded JSON using serde_json
let json_data: Vec<serde_json::Map<String, serde_json::Value>> =
serde_json::from_reader(json_data_encoded.as_slice())
.context("failed to parse JSON data again")?;
// remove `inner` field again
let json_data_flattened: Vec<_> = json_data
.into_iter()
.map(|mut m| m.remove("inner"))
.collect();
output.insert("data".into(), json_data_flattened.into());
output.insert("type".into(), data_type_json);
}

serde_json::to_writer(&mut self.output_file, &output)
.context("failed to write output as JSON")?;
writeln!(&mut self.output_file)
.context("failed to write newline to output file")?;

DaemonReply::Empty
}
DaemonRequest::CloseOutputs(data_ids) => {
println!("{} {data_ids:?}", "node reports closed outputs".blue());
DaemonReply::Result(Ok(()))
}
DaemonRequest::OutputsDone => {
println!("{}", "node reports OutputsDone".blue());
DaemonReply::Result(Ok(()))
}
DaemonRequest::ReportDropTokens { drop_tokens } => {
println!("{} {drop_tokens:?}", "node reports drop tokens".blue());
DaemonReply::Empty
}
DaemonRequest::NextFinishedDropTokens => {
// integration testing doesn't use shared memory -> no drop tokens
DaemonReply::NextDropEvents(vec![])
}
DaemonRequest::EventStreamDropped => {
println!("{}", "node reports EventStreamDropped".blue());
DaemonReply::Result(Ok(()))
}
DaemonRequest::NodeConfig { .. } => {
eyre::bail!("unexpected NodeConfig in interactive mode")
}
};
Ok(reply)
}

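/// Returns the next recorded input event, sleeping until the event's
/// `time_offset_secs` relative to the test start has elapsed.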
fn next_event(&mut self) -> eyre::Result<Option<Timestamped<NodeEvent>>> {
let Some(event) = self.events.next() else {
return Ok(None);
};

let TimedInputEvent {
time_offset_secs,
event,
} = event;
let time_offset = Duration::from_secs_f64(time_offset_secs);
let elapsed = self.start_time.elapsed();
if let Some(wait_time) = time_offset.checked_sub(elapsed) {
std::thread::sleep(wait_time);
}

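// Derive a deterministic event timestamp: the test's start timestamp plus
// the configured offset, keeping the start timestamp's HLC node id.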
let timestamp = Timestamp::new(
self.start_timestamp.get_time() + NTP64::from(time_offset),
*self.start_timestamp.get_id(),
);

let converted = match event {
InputEvent::Stop => NodeEvent::Stop,
InputEvent::Input { id, metadata, data } => {
let (data, type_info) = if let Some(data) = data {
let array = match data {
InputData::JsonObject { value, schema } => {
// input is JSON data
let array = json_value_to_list(value);
let schema = match schema {
Some(schema) => schema,
None => arrow_json::reader::infer_json_schema_from_iterator(
array.iter().map(Ok),
)?,
};
read_json_value_as_arrow(&array, schema)
.context("failed to read data")?
}
InputData::ArrowFile {
path,
batch_index,
column,
} => {
let file = std::fs::File::open(&path).with_context(|| {
format!("failed to open arrow file {}", path.display())
})?;
let mut reader = arrow::ipc::reader::FileReader::try_new(file, None)
.context("failed to create arrow file reader")?;
reader.set_index(batch_index).with_context(|| {
format!(
"failed to seek to batch index {} in arrow file {}",
batch_index,
path.display()
)
})?;
let batch = reader
.next()
.context("no batch at given index")?
.context("failed to read batch from arrow file")?;
match column {
Some(name) => batch.column_by_name(&name).with_context(|| {
format!(
"failed to find column '{}' in batch at index {} of arrow file {}",
name,
batch_index,
path.display()
)
})?.to_data(),
None => StructArray::from(batch).to_data()
}
}
};

let total_len = required_data_size(&array);
let mut buf = vec![0; total_len];
let type_info = copy_array_into_sample(buf.as_mut_slice(), &array);

(Some(buf), type_info)
} else {
(None, ArrowTypeInfo::empty())
};
let mut meta = Metadata::new(timestamp, type_info);
meta.parameters = metadata.unwrap_or_default();
NodeEvent::Input {
id,
metadata: meta,
data: data.map(|d| DataMessage::Vec(aligned_vec::AVec::from_slice(1, &d))),
}
}
InputEvent::InputClosed { id } => NodeEvent::InputClosed { id },
InputEvent::AllInputsClosed => NodeEvent::AllInputsClosed,
};
Ok(Some(Timestamped {
inner: converted,
timestamp,
}))
}
}

fn json_value_to_list(value: serde_json::Value) -> Vec<serde_json::Value> {
match value {
value @ serde_json::Value::Object(_) => {
vec![value]
}
serde_json::Value::Array(inner) => inner.into_iter().map(wrap_value_into_object).collect(),
_ => {
// wrap into object to allow bare values
let object = wrap_value_into_object(value);
vec![object]
}
}
}

fn wrap_value_into_object(value: serde_json::Value) -> serde_json::Value {
let mut map = serde_json::Map::new();
map.insert("inner".into(), value);

serde_json::Value::Object(map)
}
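To make the wrapping behavior above concrete: bare values and arrays are boxed into `{"inner": ...}` objects so that arrow_json's schema inference always sees a top-level object. A small illustrative sketch (assuming serde_json's `json!` macro is available):

fn wrapping_examples() {
    use serde_json::json;

    // a bare value becomes a single wrapped object
    assert_eq!(json_value_to_list(json!(42)), vec![json!({ "inner": 42 })]);

    // an array is wrapped element-wise
    assert_eq!(
        json_value_to_list(json!([1, 2])),
        vec![json!({ "inner": 1 }), json!({ "inner": 2 })]
    );

    // objects pass through unchanged
    assert_eq!(json_value_to_list(json!({ "x": 1 })), vec![json!({ "x": 1 })]);
}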
50 changes: 3 additions & 47 deletions apis/rust/node/src/daemon_connection/interactive.rs
@@ -1,10 +1,5 @@
-use std::{
-    io::{BufRead, Read, stdout},
-    sync::Arc,
-    time::Duration,
-};
+use std::{io::stdout, time::Duration};

-use arrow::array::{Array, ArrayData};
use colored::Colorize;
use dora_core::{metadata::ArrowTypeInfoExt, uhlc::HLC};
use dora_message::{
@@ -13,10 +8,10 @@ use dora_message::{
metadata::{ArrowTypeInfo, Metadata},
node_to_daemon::DaemonRequest,
};
-use eyre::{Context, ContextCompat};

use crate::{
arrow_utils::{copy_array_into_sample, required_data_size},
+daemon_connection::json_to_arrow::read_json_bytes_as_arrow,
event_stream::data_to_arrow_array,
};

@@ -123,7 +118,7 @@ impl InteractiveEvents {
std::mem::drop(stdout_lock);
let typed_data = if let Some(data) = data {
// input is JSON data
-let array = match read_json_as_arrow(data.as_bytes()) {
+let array = match read_json_bytes_as_arrow(data.as_bytes()) {
Ok(array) => array,
Err(err) => {
eprintln!("{}", format!("{err}").red());
Expand Down Expand Up @@ -151,42 +146,3 @@ impl InteractiveEvents {
Ok(Some(event))
}
}
-
-fn read_json_as_arrow(data: &[u8]) -> eyre::Result<ArrayData> {
-    match arrow_json::reader::infer_json_schema(wrapped(data), None) {
-        Ok((schema, _)) => read_from_json_with_schema(wrapped(data), schema),
-        Err(_) => {
-            // try again with quoting the input to treat it as a string
-            match arrow_json::reader::infer_json_schema(wrapped_quoted(data), None) {
-                Ok((schema, _)) => read_from_json_with_schema(wrapped_quoted(data), schema),
-                Err(err) => eyre::bail!("failed to infer JSON schema: {err}"),
-            }
-        }
-    }
-}
-
-fn read_from_json_with_schema(
-    data: impl BufRead,
-    schema: arrow_schema::Schema,
-) -> eyre::Result<ArrayData> {
-    let mut reader = arrow_json::reader::ReaderBuilder::new(Arc::new(schema))
-        .build(data)
-        .context("failed to build JSON reader")?;
-    let batch = reader
-        .next()
-        .context("no record batch in JSON")?
-        .context("failed to read record batch")?;
-
-    Ok(batch.column(0).to_data())
-}
-
-// wrap data into JSON object to also allow bare JSON values
-fn wrapped(data: impl BufRead) -> impl BufRead {
-    "{ \"inner\":".as_bytes().chain(data).chain("}".as_bytes())
-}
-
-// wrap data into JSON object to also allow bare JSON values
-fn wrapped_quoted(data: impl BufRead) -> impl BufRead {
-    let quoted = [b'"'].chain(data).chain([b'"'].as_slice());
-    wrapped(quoted)
-}
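The JSON-to-arrow helpers removed here appear to have moved into the new `json_to_arrow` module (imported above as `read_json_bytes_as_arrow`). For reference, a quick standalone check of the wrapping trick they rely on, copying the removed `wrapped` helper:

use std::io::{BufRead, Read};

fn wrapped(data: impl BufRead) -> impl BufRead {
    "{ \"inner\":".as_bytes().chain(data).chain("}".as_bytes())
}

fn main() {
    let mut wrapped_input = String::new();
    wrapped("5".as_bytes())
        .read_to_string(&mut wrapped_input)
        .unwrap();
    // the bare value 5 is streamed as a JSON object
    assert_eq!(wrapped_input, "{ \"inner\":5}");
}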