diff --git a/apis/python/operator/src/lib.rs b/apis/python/operator/src/lib.rs
index ea34b3b47..59c9dbc15 100644
--- a/apis/python/operator/src/lib.rs
+++ b/apis/python/operator/src/lib.rs
@@ -289,7 +289,7 @@ pub fn metadata_to_pydict<'a>(
 #[cfg(test)]
 mod tests {
-    use std::sync::Arc;
+    use std::{ptr::NonNull, sync::Arc};
 
     use aligned_vec::{AVec, ConstAlign};
     use arrow::{
@@ -301,9 +301,8 @@ mod tests {
     };
     use arrow_schema::{DataType, Field};
 
-    use dora_node_api::{
-        arrow_utils::{copy_array_into_sample, required_data_size},
-        RawData,
+    use dora_node_api::arrow_utils::{
+        buffer_into_arrow_array, copy_array_into_sample, required_data_size,
     };
     use eyre::{Context, Result};
@@ -313,9 +312,16 @@ mod tests {
 
         let info = copy_array_into_sample(&mut sample, arrow_array);
 
-        let serialized_deserialized_arrow_array = RawData::Vec(sample)
-            .into_arrow_array(&info)
-            .context("Could not create arrow array")?;
+        let serialized_deserialized_arrow_array = {
+            let ptr = NonNull::new(sample.as_ptr() as *mut _).unwrap();
+            let len = sample.len();
+
+            let raw_buffer = unsafe {
+                arrow::buffer::Buffer::from_custom_allocation(ptr, len, Arc::new(sample))
+            };
+            buffer_into_arrow_array(&raw_buffer, &info)?
+        };
+
         assert_eq!(arrow_array, &serialized_deserialized_arrow_array);
 
         Ok(())
diff --git a/apis/rust/node/src/event_stream/data_conversion.rs b/apis/rust/node/src/event_stream/data_conversion.rs
new file mode 100644
index 000000000..acc29bb19
--- /dev/null
+++ b/apis/rust/node/src/event_stream/data_conversion.rs
@@ -0,0 +1,80 @@
+use std::{ptr::NonNull, sync::Arc};
+
+use aligned_vec::{AVec, ConstAlign};
+use dora_arrow_convert::IntoArrow;
+use dora_message::metadata::ArrowTypeInfo;
+use eyre::Context;
+use shared_memory_extended::{Shmem, ShmemConf};
+
+use crate::arrow_utils::buffer_into_arrow_array;
+
+pub enum RawData {
+    Empty,
+    Vec(AVec<u8, ConstAlign<128>>),
+    SharedMemory(SharedMemoryData),
+}
+
+impl RawData {
+    pub fn into_arrow_array(
+        self,
+        type_info: &ArrowTypeInfo,
+    ) -> eyre::Result<arrow::array::ArrayData> {
+        let raw_buffer = match self {
+            RawData::Empty => return Ok(().into_arrow().into()),
+            RawData::Vec(data) => {
+                let ptr = NonNull::new(data.as_ptr() as *mut _).unwrap();
+                let len = data.len();
+
+                unsafe { arrow::buffer::Buffer::from_custom_allocation(ptr, len, Arc::new(data)) }
+            }
+            RawData::SharedMemory(data) => {
+                let ptr = NonNull::new(data.data.as_ptr() as *mut _).unwrap();
+                let len = data.data.len();
+
+                unsafe { arrow::buffer::Buffer::from_custom_allocation(ptr, len, Arc::new(data)) }
+            }
+        };
+
+        buffer_into_arrow_array(&raw_buffer, type_info)
+    }
+}
+
+impl std::fmt::Debug for RawData {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("Data").finish_non_exhaustive()
+    }
+}
+
+pub struct SharedMemoryData {
+    pub data: MappedInputData,
+    pub _drop: flume::Sender<()>,
+}
+
+pub struct MappedInputData {
+    memory: Box<Shmem>,
+    len: usize,
+}
+
+impl MappedInputData {
+    pub(crate) unsafe fn map(shared_memory_id: &str, len: usize) -> eyre::Result<Self> {
+        let memory = Box::new(
+            ShmemConf::new()
+                .os_id(shared_memory_id)
+                .writable(false)
+                .open()
+                .wrap_err("failed to map shared memory input")?,
+        );
+        Ok(MappedInputData { memory, len })
+    }
+}
+
+impl std::ops::Deref for MappedInputData {
+    type Target = [u8];
+
+    fn deref(&self) -> &Self::Target {
+        unsafe { &self.memory.as_slice()[..self.len] }
+    }
+}
+
+unsafe impl Send for MappedInputData {}
+unsafe impl Sync for MappedInputData {}
diff --git a/apis/rust/node/src/event_stream/event.rs b/apis/rust/node/src/event_stream/event.rs
index 22997f4b8..72dfe1187 100644
--- a/apis/rust/node/src/event_stream/event.rs
+++ b/apis/rust/node/src/event_stream/event.rs
@@ -1,133 +1,90 @@
-use std::{ptr::NonNull, sync::Arc};
-
-use aligned_vec::{AVec, ConstAlign};
-use dora_arrow_convert::{ArrowData, IntoArrow};
+use dora_arrow_convert::ArrowData;
 use dora_core::config::{DataId, OperatorId};
-use dora_message::metadata::{ArrowTypeInfo, BufferOffset, Metadata};
-use eyre::{Context, Result};
-use shared_memory_extended::{Shmem, ShmemConf};
-
+use dora_message::metadata::Metadata;
+
+/// Represents an incoming Dora event.
+///
+/// Events might be triggered by other nodes, by Dora itself, or by some external user input.
+///
+/// It's safe to ignore event types that are not relevant to the node.
+///
+/// This enum is marked as `non_exhaustive` because we might add additional
+/// variants in the future. Please ignore unknown event types instead of throwing an
+/// error to avoid breakage when updating Dora.
 #[derive(Debug)]
 #[non_exhaustive]
 pub enum Event {
-    Stop(StopCause),
-    Reload {
-        operator_id: Option<OperatorId>,
-    },
+    /// An input was received from another node.
+    ///
+    /// This event corresponds to one of the `inputs` of the node as specified
+    /// in the dataflow YAML file.
     Input {
+        /// The input ID, as specified in the YAML file.
+        ///
+        /// Note that this is not the output ID of the sender, but the ID
+        /// assigned to the input in the YAML file.
         id: DataId,
+        /// Meta information about this input, e.g. the timestamp.
         metadata: Metadata,
+        /// The actual data in the Apache Arrow data format.
         data: ArrowData,
     },
+    /// An input was closed by the sender.
+    ///
+    /// The sending node mapped to this input exited, so the input will receive
+    /// no more data.
     InputClosed {
+        /// The ID of the input that was closed, as specified in the YAML file.
+        ///
+        /// Note that this is not the output ID of the sender, but the ID
+        /// assigned to the input in the YAML file.
         id: DataId,
     },
+    /// Notification that the event stream is about to close.
+    ///
+    /// The [`StopCause`] field contains the reason for the event stream closure.
+    ///
+    /// Typically, nodes should exit once the event stream closes. One notable
+    /// exception is nodes without inputs, which will receive an
+    /// `Event::Stop(StopCause::AllInputsClosed)` right at startup. Source nodes
+    /// might still want to keep producing outputs. (There is currently an open
+    /// discussion about changing this behavior and no longer sending `AllInputsClosed`
+    /// to nodes without inputs.)
+    ///
+    /// Note: Stop events with `StopCause::Manual` indicate a manual stop operation
+    /// issued through `dora stop` or a `ctrl-c`. Nodes **must exit** upon receiving
+    /// such a stop event, otherwise they will be killed by Dora.
+    Stop(StopCause),
+    /// Instructs the node to reload itself or one of its operators.
+    ///
+    /// This event is currently only used for reloading Python operators that are
+    /// started by a `dora runtime` process. So this event should not be sent to normal
+    /// nodes yet.
+    Reload {
+        /// The ID of the operator that should be reloaded.
+        ///
+        /// There is currently no case where `operator_id` is `None`.
+        operator_id: Option<OperatorId>,
+    },
+    /// Notifies the node about an unexpected error that happened inside Dora.
+    ///
+    /// It's a good idea to output or log this error for debugging.
     Error(String),
 }
 
+/// The reason for closing the event stream.
+///
+/// This enum is marked as `non_exhaustive` because we might add additional
+/// variants in the future.
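+///
+/// ## Example
+///
+/// A typical way to react to the different stop causes (a minimal sketch):
+///
+/// ```no_run
+/// use dora_node_api::{DoraNode, Event, StopCause};
+///
+/// let (mut node, mut events) = DoraNode::init_from_env()?;
+/// while let Some(event) = events.recv() {
+///     match event {
+///         // exit quickly, otherwise Dora may kill the node
+///         Event::Stop(StopCause::Manual) => break,
+///         // all inputs are closed; finish up and exit
+///         Event::Stop(StopCause::AllInputsClosed) => break,
+///         _ => {}
+///     }
+/// }
+/// # Ok::<(), eyre::Report>(())
+/// ```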
 #[derive(Debug, Clone)]
 #[non_exhaustive]
 pub enum StopCause {
+    /// The dataflow is stopped early after a `dora stop` command (or on `ctrl-c`).
+    ///
+    /// Nodes should exit as soon as possible if they receive a stop event of
+    /// this type. Dora will kill nodes that keep running for too long after
+    /// receiving such a stop event.
     Manual,
+    /// The event stream is closed because all of the node's inputs were closed.
     AllInputsClosed,
 }
-
-pub enum RawData {
-    Empty,
-    Vec(AVec<u8, ConstAlign<128>>),
-    SharedMemory(SharedMemoryData),
-}
-
-impl RawData {
-    pub fn into_arrow_array(self, type_info: &ArrowTypeInfo) -> Result<arrow::array::ArrayData> {
-        let raw_buffer = match self {
-            RawData::Empty => return Ok(().into_arrow().into()),
-            RawData::Vec(data) => {
-                let ptr = NonNull::new(data.as_ptr() as *mut _).unwrap();
-                let len = data.len();
-
-                unsafe { arrow::buffer::Buffer::from_custom_allocation(ptr, len, Arc::new(data)) }
-            }
-            RawData::SharedMemory(data) => {
-                let ptr = NonNull::new(data.data.as_ptr() as *mut _).unwrap();
-                let len = data.data.len();
-
-                unsafe { arrow::buffer::Buffer::from_custom_allocation(ptr, len, Arc::new(data)) }
-            }
-        };
-
-        buffer_into_arrow_array(&raw_buffer, type_info)
-    }
-}
-
-pub struct SharedMemoryData {
-    pub data: MappedInputData,
-    pub _drop: flume::Sender<()>,
-}
-
-fn buffer_into_arrow_array(
-    raw_buffer: &arrow::buffer::Buffer,
-    type_info: &ArrowTypeInfo,
-) -> eyre::Result<arrow::array::ArrayData> {
-    if raw_buffer.is_empty() {
-        return Ok(arrow::array::ArrayData::new_empty(&type_info.data_type));
-    }
-
-    let mut buffers = Vec::new();
-    for BufferOffset { offset, len } in &type_info.buffer_offsets {
-        buffers.push(raw_buffer.slice_with_length(*offset, *len));
-    }
-
-    let mut child_data = Vec::new();
-    for child_type_info in &type_info.child_data {
-        child_data.push(buffer_into_arrow_array(raw_buffer, child_type_info)?)
-    }
-
-    arrow::array::ArrayData::try_new(
-        type_info.data_type.clone(),
-        type_info.len,
-        type_info
-            .validity
-            .clone()
-            .map(arrow::buffer::Buffer::from_vec),
-        type_info.offset,
-        buffers,
-        child_data,
-    )
-    .context("Error creating Arrow array")
-}
-
-impl std::fmt::Debug for RawData {
-    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
-        f.debug_struct("Data").finish_non_exhaustive()
-    }
-}
-
-pub struct MappedInputData {
-    memory: Box<Shmem>,
-    len: usize,
-}
-
-impl MappedInputData {
-    pub(crate) unsafe fn map(shared_memory_id: &str, len: usize) -> eyre::Result<Self> {
-        let memory = Box::new(
-            ShmemConf::new()
-                .os_id(shared_memory_id)
-                .writable(false)
-                .open()
-                .wrap_err("failed to map shared memory input")?,
-        );
-        Ok(MappedInputData { memory, len })
-    }
-}
-
-impl std::ops::Deref for MappedInputData {
-    type Target = [u8];
-
-    fn deref(&self) -> &Self::Target {
-        unsafe { &self.memory.as_slice()[..self.len] }
-    }
-}
-
-unsafe impl Send for MappedInputData {}
-unsafe impl Sync for MappedInputData {}
diff --git a/apis/rust/node/src/event_stream/merged.rs b/apis/rust/node/src/event_stream/merged.rs
index 43c456bef..adeeafada 100644
--- a/apis/rust/node/src/event_stream/merged.rs
+++ b/apis/rust/node/src/event_stream/merged.rs
@@ -1,18 +1,32 @@
+//! Merge an external stream into an [`EventStream`][super::EventStream].
+//!
+//! Sometimes nodes need to listen to external events in addition to Dora events.
+//! This module provides support for that through the [`MergeExternal`] trait.
+
 use futures::{Stream, StreamExt};
 use futures_concurrency::stream::Merge;
 
+/// A Dora event or an event from an external source.
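+///
+/// A merged stream might be consumed like this (sketch; `external_stream` is a
+/// placeholder for some user-provided stream):
+///
+/// ```ignore
+/// use dora_node_api::merged::{MergeExternal, MergedEvent};
+/// use dora_node_api::futures::StreamExt;
+///
+/// let mut merged = events.merge_external(Box::pin(external_stream));
+/// while let Some(event) = merged.next().await {
+///     match event {
+///         MergedEvent::Dora(event) => { /* handle the Dora event */ }
+///         MergedEvent::External(other) => { /* handle the external event */ }
+///     }
+/// }
+/// ```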
 #[derive(Debug)]
 pub enum MergedEvent<E> {
+    /// A Dora event
     Dora(super::Event),
+    /// An external event
+    ///
+    /// Yielded by the stream that was merged into the Dora [`EventStream`][super::EventStream].
     External(E),
 }
 
+/// A general enum to represent a value of two possible types.
 pub enum Either<A, B> {
+    /// Value is of the first type, type `A`.
     First(A),
+    /// Value is of the second type, type `B`.
     Second(B),
 }
 
 impl<A> Either<A, A> {
+    /// Unwraps an `Either` instance where both types are identical.
     pub fn flatten(self) -> A {
         match self {
             Either::First(a) => a,
@@ -21,19 +35,33 @@ impl<A> Either<A, A> {
     }
 }
 
+/// Allows merging an external event stream into an existing event stream.
 // TODO: use impl trait return type once stable
 pub trait MergeExternal<'a, E> {
+    /// The item type yielded from the merged stream.
     type Item;
 
+    /// Merge the given stream into an existing event stream.
+    ///
+    /// Returns a new event stream that yields items from both streams.
+    /// The ordering between the two streams is not guaranteed.
     fn merge_external(
         self,
         external_events: impl Stream<Item = E> + Unpin + 'a,
     ) -> Box<dyn Stream<Item = Self::Item> + Unpin + 'a>;
 }
 
+/// Allows merging a sendable external event stream into an existing (sendable) event stream.
+///
+/// By implementing [`Send`], the streams can be sent to different threads.
 pub trait MergeExternalSend<'a, E> {
+    /// The item type yielded from the merged stream.
     type Item;
 
+    /// Merge the given stream into an existing event stream.
+    ///
+    /// Returns a new event stream that yields items from both streams.
+    /// The ordering between the two streams is not guaranteed.
     fn merge_external_send(
         self,
         external_events: impl Stream<Item = E> + Unpin + Send + Sync + 'a,
diff --git a/apis/rust/node/src/event_stream/mod.rs b/apis/rust/node/src/event_stream/mod.rs
index 565f87132..c7a3aefcf 100644
--- a/apis/rust/node/src/event_stream/mod.rs
+++ b/apis/rust/node/src/event_stream/mod.rs
@@ -11,7 +11,7 @@ use dora_message::{
     node_to_daemon::{DaemonRequest, Timestamped},
     DataflowId,
 };
-pub use event::{Event, MappedInputData, RawData, StopCause};
+pub use event::{Event, StopCause};
 use futures::{
     future::{select, Either},
     Stream, StreamExt,
@@ -19,22 +19,44 @@ use futures_timer::Delay;
 use scheduler::{Scheduler, NON_INPUT_EVENT};
 
-use self::{
-    event::SharedMemoryData,
-    thread::{EventItem, EventStreamThreadHandle},
+use self::thread::{EventItem, EventStreamThreadHandle};
+use crate::{
+    daemon_connection::DaemonChannel,
+    event_stream::data_conversion::{MappedInputData, RawData, SharedMemoryData},
 };
-use crate::daemon_connection::DaemonChannel;
 use dora_core::{
     config::{Input, NodeId},
     uhlc,
 };
 use eyre::{eyre, Context};
+pub use scheduler::Scheduler as EventScheduler;
+
+mod data_conversion;
 mod event;
 pub mod merged;
 mod scheduler;
 mod thread;
 
+/// Asynchronous iterator over the incoming [`Event`]s destined for this node.
+///
+/// This struct [implements](#impl-Stream-for-EventStream) the [`Stream`] trait,
+/// so you can use methods of the [`StreamExt`] trait
+/// on this struct. A common pattern is `while let Some(event) = event_stream.next().await`.
+///
+/// Nodes should iterate over this event stream and react to events that they are interested in.
+/// Typically, the most important event type is [`Event::Input`].
+/// You don't need to handle all events; it's fine to ignore events that are not relevant to your node.
+///
+/// The event stream will close itself after an [`Event::Stop`] was received.
+/// A manual `break` on [`Event::Stop`] is typically not needed.
+/// _(You probably do need to use a manual `break` on stop events when using the
+/// [`StreamExt::merge`][`futures_concurrency::stream::StreamExt::merge`] implementation on
+/// [`EventStream`] to combine the stream with an external one.)_
+///
+/// Once the event stream has finished, nodes should exit.
+/// Note that Dora kills nodes that don't exit quickly after an [`Event::Stop`] of type
+/// [`StopCause::Manual`] was received.
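+///
+/// ## Example
+///
+/// A minimal asynchronous event loop (sketch):
+///
+/// ```no_run
+/// use dora_node_api::{DoraNode, Event};
+/// use dora_node_api::futures::StreamExt;
+///
+/// # async fn run() -> eyre::Result<()> {
+/// let (mut node, mut events) = DoraNode::init_from_env()?;
+/// while let Some(event) = events.next().await {
+///     if let Event::Input { id, .. } = event {
+///         println!("received input `{id}`");
+///     }
+/// }
+/// # Ok(())
+/// # }
+/// ```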
 pub struct EventStream {
     node_id: NodeId,
     receiver: flume::r#async::RecvStream<'static, EventItem>,
@@ -157,16 +179,61 @@ impl EventStream {
         })
     }
 
-    /// wait for the next event on the events stream.
+    /// Synchronously waits for the next event.
+    ///
+    /// Blocks the thread until the next event arrives.
+    /// Returns [`None`] once the event stream is closed.
+    ///
+    /// For an asynchronous variant of this method, see [`recv_async`][Self::recv_async].
+    ///
+    /// ## Event Reordering
+    ///
+    /// This method uses an [`EventScheduler`] internally to **reorder events**. This means that the
+    /// events might be returned in a different order than they occurred. For details, check the
+    /// documentation of the [`EventScheduler`] struct.
+    ///
+    /// If you want to receive the events in their original chronological order, use the
+    /// asynchronous [`StreamExt::next`] method instead ([`EventStream`] implements the
+    /// [`Stream`] trait).
     pub fn recv(&mut self) -> Option<Event> {
         futures::executor::block_on(self.recv_async())
     }
 
-    /// wait for the next event on the events stream until timeout
+    /// Receives the next incoming [`Event`] synchronously with a timeout.
+    ///
+    /// Blocks the thread until the next event arrives or the timeout is reached.
+    /// Returns an [`Event::Error`] if no event was received within the given duration.
+    ///
+    /// Returns [`None`] once the event stream is closed.
+    ///
+    /// For an asynchronous variant of this method, see [`recv_async_timeout`][Self::recv_async_timeout].
+    ///
+    /// ## Event Reordering
+    ///
+    /// This method uses an [`EventScheduler`] internally to **reorder events**. This means that the
+    /// events might be returned in a different order than they occurred. For details, check the
+    /// documentation of the [`EventScheduler`] struct.
+    ///
+    /// If you want to receive the events in their original chronological order, use the
+    /// asynchronous [`StreamExt::next`] method instead ([`EventStream`] implements the
+    /// [`Stream`] trait).
     pub fn recv_timeout(&mut self, dur: Duration) -> Option<Event> {
         futures::executor::block_on(self.recv_async_timeout(dur))
     }
 
+    /// Receives the next incoming [`Event`] asynchronously, using an [`EventScheduler`] for fairness.
+    ///
+    /// Returns [`None`] once the event stream is closed.
+    ///
+    /// ## Event Reordering
+    ///
+    /// This method uses an [`EventScheduler`] internally to **reorder events**. This means that the
+    /// events might be returned in a different order than they occurred. For details, check the
+    /// documentation of the [`EventScheduler`] struct.
+    ///
+    /// If you want to receive the events in their original chronological order, use the
+    /// [`StreamExt::next`] method instead
+    /// ([`EventStream`] implements the [`Stream`] trait).
     pub async fn recv_async(&mut self) -> Option<Event> {
         loop {
             if self.scheduler.is_empty() {
@@ -187,6 +254,21 @@ impl EventStream {
         event.map(Self::convert_event_item)
     }
 
+    /// Receives the next incoming [`Event`] asynchronously with a timeout.
+    ///
+    /// Returns an [`Event::Error`] if no event was received within the given duration.
+    ///
+    /// Returns [`None`] once the event stream is closed.
+    ///
+    /// ## Event Reordering
+    ///
+    /// This method uses an [`EventScheduler`] internally to **reorder events**. This means that the
+    /// events might be returned in a different order than they occurred. For details, check the
+    /// documentation of the [`EventScheduler`] struct.
+    ///
+    /// If you want to receive the events in their original chronological order, use the
+    /// [`StreamExt::next`] method with a custom timeout future instead
+    /// ([`EventStream`] implements the [`Stream`] trait).
     pub async fn recv_async_timeout(&mut self, dur: Duration) -> Option<Event> {
         match select(Delay::new(dur), pin!(self.recv_async())).await {
             Either::Left((_elapsed, _)) => Some(Self::convert_event_item(EventItem::TimeoutError(
diff --git a/apis/rust/node/src/event_stream/scheduler.rs b/apis/rust/node/src/event_stream/scheduler.rs
index defdc2098..6ef8c14f0 100644
--- a/apis/rust/node/src/event_stream/scheduler.rs
+++ b/apis/rust/node/src/event_stream/scheduler.rs
@@ -3,27 +3,89 @@ use std::collections::{HashMap, VecDeque};
 use dora_message::{daemon_to_node::NodeEvent, id::DataId};
 
 use super::thread::EventItem;
 
-pub const NON_INPUT_EVENT: &str = "dora/non_input_event";
+pub(crate) const NON_INPUT_EVENT: &str = "dora/non_input_event";
 
-// This scheduler will make sure that there is fairness between
-// inputs.
-//
-// It is going to always make sure that the input that has not been used for the longest period is the first one to be used next.
-//
-// Ex:
-// In the case that one input has a very high frequency and another one with a very slow frequency.\
-//
-// The Node will always alternate between the two inputs when each input is available
-// Avoiding one input to be overwhelmingly present.
-//
+/// This scheduler will make sure that there is fairness between inputs.
+///
+/// The scheduler reorders events in the following way:
+///
+/// - **Non-input events are prioritized**
+///
+///   If the node received any events that are not input events, they are returned first. The
+///   intention of this reordering is that the nodes can react quickly to dataflow-related events
+///   even when their input queues are very full.
+///
+///   This reordering has some side effects that might be unexpected:
+///   - An [`InputClosed`][super::Event::InputClosed] event might be yielded before the last
+///     input events of that ID.
+///
+///     Usually, an `InputClosed` event indicates that there won't be any subsequent inputs
+///     of a certain ID. This invariant does not hold anymore for a scheduled event stream.
+///   - The [`Stop`][super::Event::Stop] event might not be the last event of the stream anymore.
+///
+///     Usually, the `Stop` event is the last event that is sent to a node before the event stream
+///     is closed. Because of the reordering, the stream might return more events after a `Stop`
+///     event.
+/// - **Input events are grouped by ID** and yielded in a **least-recently used order (by ID)**.
+///
+///   The scheduler keeps a separate queue for each input ID, where the incoming input events are
+///   placed in their chronological order. When yielding the next event, the scheduler iterates over
+///   these queues in least-recently used order. This means that the queue corresponding to the
+///   last yielded event will be checked last. The scheduler will return the oldest event from the
+///   first non-empty queue.
+///
+///   The side effect of this change is that input events of different IDs are no longer in
+///   chronological order.
+///   This might lead to unexpected results for input events that are caused by each other.
+///
+/// ## Example 1
+/// Consider the case that one input has a very high frequency and another one a very low
+/// frequency. The event stream will always alternate between the two inputs when each input is
+/// available.
+/// Without the scheduling, the high-frequency input would be returned much more often.
+///
+/// ## Example 2
+/// Again, let's consider the case that one input has a very high frequency and the other has a
+/// very low frequency. This time, we define a small maximum queue size for the low-frequency
+/// input, but a large queue size for the high-frequency one.
+/// Using the scheduler, the event stream will always alternate between high- and low-frequency
+/// inputs as long as inputs of both types are available.
+///
+/// Without scheduling, the low-frequency input might never be yielded before
+/// it's dropped because there is almost always an older high-frequency input available that is
+/// yielded first. By the time the low-frequency input would be the next one chronologically, it
+/// might have been dropped already because the node received newer low-frequency inputs in the
+/// meantime (the queue length is small). At this point, the next-oldest input is a high-frequency
+/// input again.
+///
+/// ## Example 3
+/// Consider a high-frequency camera input and a low-frequency bounding box input, which is based
+/// on the latest camera image. The dataflow YAML file specifies a large queue size for the camera
+/// input and a small queue size for the bounding box input.
+///
+/// With scheduling, the number of
+/// buffered camera inputs might grow over time. As a result, the camera inputs yielded from the
+/// stream (in oldest-first order) are not synchronized with the bounding box inputs anymore. So
+/// the node receives an up-to-date bounding box, but a considerably outdated image.
+///
+/// Without scheduling, the events are returned in chronological order. This time, the bounding
+/// box might be slightly outdated if the camera sent new images before the bounding box was
+/// ready. However, the time difference between the two input types is independent of the
+/// queue size this time.
+///
+/// (If a perfectly matching bounding box is required, we recommend forwarding the input image as
+/// part of the bounding box output. This way, the receiving node only needs to subscribe to one
+/// input, so no mismatches can happen.)
 #[derive(Debug)]
 pub struct Scheduler {
-    last_used: VecDeque<DataId>, // Tracks the last-used event ID
-    event_queues: HashMap<DataId, (usize, VecDeque<EventItem>)>, // Tracks events per ID
+    /// Tracks the last-used event ID
+    last_used: VecDeque<DataId>,
+    /// Tracks events per ID
+    event_queues: HashMap<DataId, (usize, VecDeque<EventItem>)>,
 }
 
 impl Scheduler {
-    pub fn new(event_queues: HashMap<DataId, (usize, VecDeque<EventItem>)>) -> Self {
+    pub(crate) fn new(event_queues: HashMap<DataId, (usize, VecDeque<EventItem>)>) -> Self {
         let topic = VecDeque::from_iter(
             event_queues
                 .keys()
@@ -36,7 +98,7 @@ impl Scheduler {
         }
     }
 
-    pub fn add_event(&mut self, event: EventItem) {
+    pub(crate) fn add_event(&mut self, event: EventItem) {
         let event_id = match &event {
             EventItem::NodeEvent {
                 event:
@@ -63,7 +125,7 @@ impl Scheduler {
         }
     }
 
-    pub fn next(&mut self) -> Option<EventItem> {
+    pub(crate) fn next(&mut self) -> Option<EventItem> {
         // Retrieve message from the non input event first that have priority over input message.
         if let Some((_size, queue)) = self
             .event_queues
@@ -89,7 +151,7 @@
         None
     }
 
-    pub fn is_empty(&self) -> bool {
+    pub(crate) fn is_empty(&self) -> bool {
         self.event_queues
             .iter()
             .all(|(_id, (_size, queue))| queue.is_empty())
diff --git a/apis/rust/node/src/lib.rs b/apis/rust/node/src/lib.rs
index e1b17b6fd..90f266217 100644
--- a/apis/rust/node/src/lib.rs
+++ b/apis/rust/node/src/lib.rs
@@ -1,18 +1,85 @@
-//! The custom node API allow you to integrate `dora` into your application.
-//! It allows you to retrieve input and send output in any fashion you want.
+//! This crate enables you to create nodes for the [Dora] dataflow framework.
 //!
-//! Try it out with:
+//! [Dora]: https://dora-rs.ai/
 //!
-//! ```bash
-//! dora new node --kind node
-//! ```
+//! ## The Dora Framework
+//!
+//! Dora is a dataflow framework that models applications as a directed graph, with nodes
+//! representing operations and edges representing data transfer.
+//! In Dora, the layout of the dataflow graph is defined through a YAML file.
+//! For details, see our [Dataflow Specification](https://dora-rs.ai/docs/api/dataflow-config/)
+//! chapter.
+//!
+//! Dora nodes are typically spawned by the Dora framework rather than spawned manually.
+//! If you want to spawn a node manually, define it as a [_dynamic_ node](#dynamic-nodes).
 //!
-//! You can also generate a dora rust project with
+//! ## Normal Usage
 //!
-//! ```bash
-//! dora new project_xyz --kind dataflow
+//! In order to connect your executable to Dora, you need to initialize a [`DoraNode`].
+//! For standard nodes, the recommended initialization function is [`init_from_env`][`DoraNode::init_from_env`].
+//! This function will return two values, a [`DoraNode`] instance and an [`EventStream`]:
+//!
+//! ```no_run
+//! use dora_node_api::DoraNode;
+//!
+//! let (mut node, mut events) = DoraNode::init_from_env()?;
+//! # Ok::<(), eyre::Report>(())
 //! ```
 //!
+//! You can use the `node` instance to send outputs and retrieve information about the node and
+//! the dataflow. The `events` stream yields the inputs that the node defines in the dataflow
+//! YAML file and other incoming events.
+//!
+//! ### Sending Outputs
+//!
+//! The [`DoraNode`] instance enables you to send outputs in different formats.
+//! For best performance, use the [Arrow](https://arrow.apache.org/docs/index.html) data format
+//! and one of the output functions that utilize shared memory.
+//!
+//! ### Receiving Events
+//!
+//! The [`EventStream`] is an [`AsyncIterator`][std::async_iter::AsyncIterator] that yields the incoming [`Event`]s.
+//!
+//! Nodes should iterate over this event stream and react to events that they are interested in.
+//! Typically, the most important event type is [`Event::Input`].
+//! You don't need to handle all events; it's fine to ignore events that are not relevant to your node.
+//!
+//! The event stream will close itself after an [`Event::Stop`] was received.
+//! A manual `break` on [`Event::Stop`] is typically not needed.
+//! _(You probably do need to use a manual `break` on stop events when using the
+//! [`StreamExt::merge`][`futures_concurrency::stream::StreamExt::merge`] implementation on
+//! [`EventStream`] to combine the stream with an external one.)_
+//!
+//! Once the event stream has finished, nodes should exit.
+//! Note that Dora kills nodes that don't exit quickly after an [`Event::Stop`] of type
+//! [`StopCause::Manual`] was received.
+//!
+//!
+//!
+//! ## Dynamic Nodes
+//!
+//! <div class="warning">
+//!
+//! Dynamic nodes have certain [limitations](#limitations). Use with care.
+//!
+//! </div>
+//!
+//! Nodes can be defined as `dynamic` by setting their `path` attribute to `dynamic` in the
+//! dataflow YAML file. Dynamic nodes are not spawned by the Dora framework and need to be started
+//! manually.
+//!
+//! Dynamic nodes cannot use the [`DoraNode::init_from_env`] function for initialization.
+//! Instead, they can be initialized through the [`DoraNode::init_from_node_id`] function.
+//!
+//! ### Limitations
+//!
+//! - Dynamic nodes **don't work with `dora run`**.
+//! - As dynamic nodes are identified by their node ID, this **ID must be unique**
+//!   across all running dataflows.
+//! - For distributed dataflows, nodes need to be manually spawned on the correct machine.
+
+#![warn(missing_docs)]
+
 pub use arrow;
 pub use dora_arrow_convert::*;
 pub use dora_core::{self, uhlc};
@@ -20,8 +87,9 @@ pub use dora_message::{
     metadata::{Metadata, MetadataParameters, Parameter},
     DataflowId,
 };
-pub use event_stream::{merged, Event, EventStream, MappedInputData, RawData, StopCause};
+pub use event_stream::{merged, Event, EventScheduler, EventStream, StopCause};
 pub use flume::Receiver;
+pub use futures;
 pub use node::{arrow_utils, DataSample, DoraNode, ZERO_COPY_THRESHOLD};
 
 mod daemon_connection;
diff --git a/apis/rust/node/src/node/arrow_utils.rs b/apis/rust/node/src/node/arrow_utils.rs
index 5a21337a0..9a75e9f73 100644
--- a/apis/rust/node/src/node/arrow_utils.rs
+++ b/apis/rust/node/src/node/arrow_utils.rs
@@ -1,6 +1,11 @@
+//! Utility functions for converting Arrow arrays to/from raw data.
+//!
 use arrow::array::{ArrayData, BufferSpec};
 use dora_message::metadata::{ArrowTypeInfo, BufferOffset};
+use eyre::Context;
 
+/// Calculates the data size in bytes required for storing a contiguous copy of the given Arrow
+/// array.
 pub fn required_data_size(array: &ArrayData) -> usize {
     let mut next_offset = 0;
     required_data_size_inner(array, &mut next_offset);
@@ -10,8 +15,8 @@ fn required_data_size_inner(array: &ArrayData, next_offset: &mut usize) {
     let layout = arrow::array::layout(array.data_type());
     for (buffer, spec) in array.buffers().iter().zip(&layout.buffers) {
         // consider alignment padding
-        if let BufferSpec::FixedWidth { alignment, .. } = spec {
-            *next_offset = (*next_offset).div_ceil(*alignment) * alignment;
+        if let BufferSpec::FixedWidth { alignment, .. } = *spec {
+            *next_offset = (*next_offset).div_ceil(alignment) * alignment;
         }
         *next_offset += buffer.len();
     }
@@ -20,6 +25,12 @@ fn required_data_size_inner(array: &ArrayData, next_offset: &mut usize) {
     }
 }
 
+/// Copy the given Arrow array into the provided buffer.
+///
+/// If the Arrow array consists of multiple buffers, they are placed contiguously in the target
+/// buffer (there might be some padding for alignment).
+///
+/// Panics if the buffer is not large enough.
 pub fn copy_array_into_sample(target_buffer: &mut [u8], arrow_array: &ArrayData) -> ArrowTypeInfo {
     let mut next_offset = 0;
     copy_array_into_sample_inner(target_buffer, &mut next_offset, arrow_array)
@@ -41,8 +52,8 @@ fn copy_array_into_sample_inner(
             *next_offset,
         );
         // add alignment padding
-        if let BufferSpec::FixedWidth { alignment, .. } = spec {
-            *next_offset = (*next_offset).div_ceil(*alignment) * alignment;
+        if let BufferSpec::FixedWidth { alignment, .. } = *spec {
+            *next_offset = (*next_offset).div_ceil(alignment) * alignment;
         }
 
         target_buffer[*next_offset..][..len].copy_from_slice(buffer.as_slice());
@@ -69,3 +80,38 @@ fn copy_array_into_sample_inner(
         child_data,
     }
 }
+
+/// Tries to convert the given raw Arrow buffer into an Arrow array.
+///
+/// The `type_info` is required for decoding the `raw_buffer` correctly.
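+///
+/// ## Example
+///
+/// A round-trip through a raw buffer (a minimal sketch; uses a `u8` array to
+/// sidestep buffer alignment concerns):
+///
+/// ```no_run
+/// use arrow::array::{Array, UInt8Array};
+/// use dora_node_api::arrow_utils::{
+///     buffer_into_arrow_array, copy_array_into_sample, required_data_size,
+/// };
+///
+/// let array = UInt8Array::from(vec![1u8, 2, 3]).into_data();
+/// let mut sample = vec![0; required_data_size(&array)];
+/// let type_info = copy_array_into_sample(&mut sample, &array);
+///
+/// let raw_buffer = arrow::buffer::Buffer::from_vec(sample);
+/// let decoded = buffer_into_arrow_array(&raw_buffer, &type_info)?;
+/// assert_eq!(array, decoded);
+/// # Ok::<(), eyre::Report>(())
+/// ```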
+pub fn buffer_into_arrow_array(
+    raw_buffer: &arrow::buffer::Buffer,
+    type_info: &ArrowTypeInfo,
+) -> eyre::Result<arrow::array::ArrayData> {
+    if raw_buffer.is_empty() {
+        return Ok(arrow::array::ArrayData::new_empty(&type_info.data_type));
+    }
+
+    let mut buffers = Vec::new();
+    for BufferOffset { offset, len } in &type_info.buffer_offsets {
+        buffers.push(raw_buffer.slice_with_length(*offset, *len));
+    }
+
+    let mut child_data = Vec::new();
+    for child_type_info in &type_info.child_data {
+        child_data.push(buffer_into_arrow_array(raw_buffer, child_type_info)?)
+    }
+
+    arrow::array::ArrayData::try_new(
+        type_info.data_type.clone(),
+        type_info.len,
+        type_info
+            .validity
+            .clone()
+            .map(arrow::buffer::Buffer::from_vec),
+        type_info.offset,
+        buffers,
+        child_data,
+    )
+    .context("Error creating Arrow array")
+}
diff --git a/apis/rust/node/src/node/mod.rs b/apis/rust/node/src/node/mod.rs
index 0c986ad52..e6a9b4e96 100644
--- a/apis/rust/node/src/node/mod.rs
+++ b/apis/rust/node/src/node/mod.rs
@@ -42,6 +42,20 @@ pub mod arrow_utils;
 mod control_channel;
 mod drop_stream;
 
+/// The data size threshold at which we start using shared memory.
+///
+/// Shared memory works by sharing memory pages. This means that the smallest
+/// memory region that can be shared is one memory page, which is typically
+/// 4KiB.
+///
+/// Using shared memory for messages smaller than the page size still requires
+/// sharing a full page, so we have some memory overhead. We also have some
+/// performance overhead because we need to issue multiple syscalls. For small
+/// messages it is faster to send them over a traditional TCP stream (or similar).
+///
+/// This hardcoded threshold value specifies which messages are sent through
+/// shared memory. Messages that are smaller than this threshold are sent through
+/// TCP.
 pub const ZERO_COPY_THRESHOLD: usize = 4096;
 
 #[allow(dead_code)]
@@ -50,6 +64,10 @@ enum TokioRuntime {
     Handle(Handle),
 }
 
+/// Allows sending outputs and retrieving node information.
+///
+/// The main purpose of this struct is to send outputs via Dora. There are also functions available
+/// for retrieving the node configuration.
 pub struct DoraNode {
     id: NodeId,
     dataflow_id: DataflowId,
@@ -67,7 +85,11 @@ pub struct DoraNode {
 }
 
 impl DoraNode {
-    /// Initiate a node from environment variables set by `dora-coordinator`
+    /// Initialize a node from environment variables set by the Dora daemon.
+    ///
+    /// This is the recommended initialization function for Dora nodes, which are spawned by
+    /// Dora daemon instances.
+    ///
     ///
     /// ```no_run
     /// use dora_node_api::DoraNode;
@@ -94,6 +116,8 @@ impl DoraNode {
     /// Initiate a node from a dataflow id and a node id.
     ///
+    /// This initialization function should be used for [_dynamic nodes_](index.html#dynamic-nodes).
+    ///
     /// ```no_run
     /// use dora_node_api::DoraNode;
     /// use dora_node_api::dora_core::config::NodeId;
@@ -126,6 +150,11 @@ impl DoraNode {
         }
     }
 
+    /// Flexible initialization function for nodes that are sometimes run as dynamic nodes.
+    ///
+    /// This function first tries initializing the traditional way through
+    /// [`init_from_env`][Self::init_from_env]. If this fails, it falls back to
+    /// [`init_from_node_id`][Self::init_from_node_id].
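+    ///
+    /// Example (sketch; the node ID is a placeholder):
+    ///
+    /// ```no_run
+    /// use dora_node_api::DoraNode;
+    /// use dora_node_api::dora_core::config::NodeId;
+    ///
+    /// let (mut node, mut events) =
+    ///     DoraNode::init_flexible(NodeId::from("my-node".to_string()))?;
+    /// # Ok::<(), eyre::Report>(())
+    /// ```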
     pub fn init_flexible(node_id: NodeId) -> eyre::Result<(Self, EventStream)> {
         if std::env::var("DORA_NODE_CONFIG").is_ok() {
             info!("Skipping {node_id} specified within the node initialization in favor of `DORA_NODE_CONFIG` specified by `dora start`");
@@ -135,6 +164,8 @@ impl DoraNode {
         }
     }
 
+    /// Internal initialization routine that should not be used outside of Dora.
+    #[doc(hidden)]
     #[tracing::instrument]
     pub fn init(node_config: NodeConfig) -> eyre::Result<(Self, EventStream)> {
         let NodeConfig {
@@ -219,7 +250,8 @@ impl DoraNode {
         }
     }
 
-    /// Send data from the node to the other nodes.
+    /// Send raw data from the node to the other nodes.
+    ///
     /// We take a closure as an input to enable zero copy on send.
     ///
     /// ```no_run
@@ -242,6 +274,8 @@ impl DoraNode {
     /// }).expect("Could not send output");
     /// ```
     ///
+    /// Ignores the output if the given `output_id` is not specified as node output in the dataflow
+    /// configuration file.
     pub fn send_output_raw(
         &mut self,
         output_id: DataId,
@@ -263,6 +297,14 @@ impl DoraNode {
         self.send_output_sample(output_id, type_info, parameters, Some(sample))
     }
 
+    /// Sends the given Arrow array as an output message.
+    ///
+    /// Uses shared memory for efficient data transfer if suitable.
+    ///
+    /// This method might copy the message once to move it to shared memory.
+    ///
+    /// Ignores the output if the given `output_id` is not specified as node output in the dataflow
+    /// configuration file.
     pub fn send_output(
         &mut self,
         output_id: DataId,
@@ -286,6 +328,12 @@ impl DoraNode {
         Ok(())
     }
 
+    /// Send the given raw byte data as output.
+    ///
+    /// Might copy the data once to move it into shared memory.
+    ///
+    /// Ignores the output if the given `output_id` is not specified as node output in the dataflow
+    /// configuration file.
     pub fn send_output_bytes(
         &mut self,
         output_id: DataId,
@@ -301,6 +349,12 @@ impl DoraNode {
         })
     }
 
+    /// Send the given raw byte data with the provided type information.
+    ///
+    /// It is recommended to use a function like [`send_output`][Self::send_output] instead.
+    ///
+    /// Ignores the output if the given `output_id` is not specified as node output in the dataflow
+    /// configuration file.
     pub fn send_typed_output(
         &mut self,
         output_id: DataId,
@@ -322,6 +376,12 @@ impl DoraNode {
         self.send_output_sample(output_id, type_info, parameters, Some(sample))
     }
 
+    /// Sends the given [`DataSample`] as output, combined with the given type information.
+    ///
+    /// It is recommended to use a function like [`send_output`][Self::send_output] instead.
+    ///
+    /// Ignores the output if the given `output_id` is not specified as node output in the dataflow
+    /// configuration file.
     pub fn send_output_sample(
         &mut self,
         output_id: DataId,
@@ -350,32 +410,46 @@ impl DoraNode {
         Ok(())
     }
 
-    pub fn close_outputs(&mut self, outputs: Vec<DataId>) -> eyre::Result<()> {
-        for output_id in &outputs {
+    /// Report the given output IDs as closed.
+    ///
+    /// The node is not allowed to send more outputs with the closed IDs.
+    ///
+    /// Closing outputs early can be helpful to receivers.
+    pub fn close_outputs(&mut self, outputs_ids: Vec<DataId>) -> eyre::Result<()> {
+        for output_id in &outputs_ids {
             if !self.node_config.outputs.remove(output_id) {
                 eyre::bail!("unknown output {output_id}");
             }
         }
 
         self.control_channel
-            .report_closed_outputs(outputs)
+            .report_closed_outputs(outputs_ids)
             .wrap_err("failed to report closed outputs to daemon")?;
 
         Ok(())
     }
 
+    /// Returns the ID of the node as specified in the dataflow configuration file.
     pub fn id(&self) -> &NodeId {
         &self.id
     }
 
+    /// Returns the unique identifier for the running dataflow instance.
+    ///
+    /// Dora assigns each dataflow instance a random identifier when started.
     pub fn dataflow_id(&self) -> &DataflowId {
         &self.dataflow_id
     }
 
+    /// Returns the input and output configuration of this node.
     pub fn node_config(&self) -> &NodeRunConfig {
         &self.node_config
     }
 
+    /// Allocates a [`DataSample`] of the specified size.
+    ///
+    /// The data sample will use shared memory when suitable to enable efficient data transfer
+    /// when sending an output message.
     pub fn allocate_data_sample(&mut self, data_len: usize) -> eyre::Result<DataSample> {
         let data = if data_len >= ZERO_COPY_THRESHOLD {
             // create shared memory region
@@ -514,6 +588,11 @@ impl Drop for DoraNode {
     }
 }
 
+/// A data region suitable for sending as an output message.
+///
+/// The region is stored in shared memory when suitable to enable efficient data transfer.
+///
+/// `DataSample` implements the [`Deref`] and [`DerefMut`] traits to read and write the mapped data.
 pub struct DataSample {
     inner: DataSampleInner,
     len: usize,
diff --git a/binaries/cli/src/command/build/mod.rs b/binaries/cli/src/command/build/mod.rs
index 32eed2e08..04f16f55f 100644
--- a/binaries/cli/src/command/build/mod.rs
+++ b/binaries/cli/src/command/build/mod.rs
@@ -1,3 +1,52 @@
+//! Provides the `dora build` command.
+//!
+//! The `dora build` command works like this:
+//!
+//! - Dataflows can specify a `build` command for each node in their YAML definition
+//! - Dora will run the `build` command when `dora build` is invoked
+//! - If the dataflow is distributed across multiple machines, each `build` command will be run on the target machine of the corresponding node.
+//!   - i.e. the machine specified under the `deploy` key
+//!   - this requires a connection to the dora coordinator, so you need to specify the coordinator IP/port for this
+//!   - to run the build commands of all nodes _locally_, you can use `dora build --local`
+//! - If the dataflow does not specify any `deploy` keys, all build commands will be run locally (i.e. `dora build` behaves like `dora build --local`)
+//!
+//! #### Git Source
+//!
+//! - Nodes can have a git repository as source
+//!   - set the `git` config key to the URL of the repository
+//! - by default, the default branch is used
+//!   - you can also specify a specific `branch` name
+//!   - alternatively, you can specify a `tag` name or a `rev` key with the commit hash
+//!   - you can only specify one of `branch`, `tag`, and `rev`, otherwise an error will occur
+//! - Dora will automatically clone and checkout the requested branch/tag/commit on `dora build`
+//!   - the `build` command will be run after cloning
+//!   - for distributed dataflows, the clone/checkout will happen on the target machine
+//! - subsequent `dora build` commands will automatically fetch the latest changes for nodes
+//!   - except when using `tag` or `rev`, because these are not expected to change
+//!   - after fetching changes, the `build` command will be executed again
+//!     - _tip:_ use a build tool that supports incremental builds (e.g. `cargo`) to make this rebuild faster
+//!
+//! The **working directory** will be set to the git repository.
+//! This means that both the `build` command and the `path` key are resolved relative to this folder.
+//! This allows you to use relative paths.
+//!
+//! #### Example
+//!
+//! ```yml
+//! nodes:
+//!   - id: rust-node
+//!     # URL of your repository
+//!     git: https://github.com/dora-rs/dora.git
+//!     # the build command that should be invoked after cloning
+//!     build: cargo build -p rust-dataflow-example-node
+//!     # path to the executable that should be run on start
+//!     path: target/debug/rust-dataflow-example-node
+//!     inputs:
+//!       tick: dora/timer/millis/10
+//!     outputs:
+//!       - random
+//! ```
+
 use communication_layer_request_reply::TcpRequestReplyConnection;
 use dora_core::{
     descriptor::{CoreNodeKind, CustomNode, Descriptor, DescriptorExt},
diff --git a/binaries/cli/src/command/run.rs b/binaries/cli/src/command/run.rs
index 61b9f0db7..feb5947c0 100644
--- a/binaries/cli/src/command/run.rs
+++ b/binaries/cli/src/command/run.rs
@@ -1,3 +1,10 @@
+//! The `dora run` command is a quick and easy way to run a dataflow locally.
+//! It does not support distributed dataflows and will return an error if there are any `deploy` keys in the YAML file.
+//!
+//! The `dora run` command does not interact with any `dora coordinator` or `dora daemon` instances, or with any other parallel `dora run` commands.
+//!
+//! Use `dora build --local` or manual build commands to build your nodes.
+
 use super::Executable;
 use crate::{
     common::{handle_dataflow_result, resolve_dataflow},
diff --git a/binaries/cli/src/command/start/mod.rs b/binaries/cli/src/command/start/mod.rs
index 455fb756a..077a67b4b 100644
--- a/binaries/cli/src/command/start/mod.rs
+++ b/binaries/cli/src/command/start/mod.rs
@@ -1,3 +1,7 @@
+//! The `dora start` command is used to spawn a dataflow in a pre-existing _dora network_. To create a dora network, spawn a `dora coordinator` and one or more `dora daemon` instances.
+//!
+//! The `dora start` command does not run any build commands, nor does it update git dependencies or similar. Use `dora build` for that.
+
 use super::{default_tracing, Executable};
 use crate::{
     command::start::attach_dataflow,
diff --git a/libraries/arrow-convert/src/lib.rs b/libraries/arrow-convert/src/lib.rs
index f9b8f21e6..32185a3fe 100644
--- a/libraries/arrow-convert/src/lib.rs
+++ b/libraries/arrow-convert/src/lib.rs
@@ -1,3 +1,7 @@
+//! Provides functions for converting between Apache Arrow arrays and Rust data types.
+
+#![warn(missing_docs)]
+
 use arrow::array::{
     Array, Float32Array, Float64Array, Int16Array, Int32Array, Int64Array, Int8Array, UInt16Array,
     UInt32Array, UInt8Array,
 };
@@ -10,12 +14,16 @@ use std::ops::{Deref, DerefMut};
 mod from_impls;
 mod into_impls;
 
+/// Data that can be converted to an Arrow array.
 pub trait IntoArrow {
+    /// The Array type that the data can be converted to.
     type A: Array;
 
+    /// Convert the data into an Arrow array.
     fn into_arrow(self) -> Self::A;
 }
 
+/// Wrapper type for an Arrow [`ArrayRef`](arrow::array::ArrayRef).
 #[derive(Debug)]
 pub struct ArrowData(pub arrow::array::ArrayRef);
 
@@ -35,6 +43,7 @@ impl DerefMut for ArrowData {
 macro_rules! register_array_handlers {
     ($(($variant:path, $array_type:ty, $type_name:expr)),* $(,)?) => {
+        /// Tries to convert the given Arrow array into a `Vec` of integers or floats.
         pub fn into_vec<T>(data: &ArrowData) -> Result<Vec<T>>
         where
             T: Copy + NumCast + 'static,
diff --git a/libraries/message/src/config.rs b/libraries/message/src/config.rs
index 203a302f7..21d34a267 100644
--- a/libraries/message/src/config.rs
+++ b/libraries/message/src/config.rs
@@ -10,6 +10,7 @@ use serde::{Deserialize, Serialize};
 
 pub use crate::id::{DataId, NodeId, OperatorId};
 
+/// Contains the input and output configuration of the node.
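+///
+/// In the dataflow YAML file, this corresponds to the node's `inputs` and `outputs`
+/// sections (a sketch; the IDs are placeholders):
+///
+/// ```yml
+/// inputs:
+///   tick: dora/timer/millis/100
+///   image: camera/image
+/// outputs:
+///   - status
+/// ```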
 #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, JsonSchema)]
 pub struct NodeRunConfig {
     /// Inputs for the nodes as a map from input ID to `node_id/output_id`.
diff --git a/libraries/message/src/lib.rs b/libraries/message/src/lib.rs
index e5e2e33fa..dfaba4b9d 100644
--- a/libraries/message/src/lib.rs
+++ b/libraries/message/src/lib.rs
@@ -26,6 +26,9 @@ pub use arrow_data;
 pub use arrow_schema;
 use uuid::{Timestamp, Uuid};
 
+/// Unique identifier for a dataflow instance.
+///
+/// Dora assigns each dataflow instance a unique ID on start.
 pub type DataflowId = uuid::Uuid;
 
 #[derive(
diff --git a/libraries/message/src/metadata.rs b/libraries/message/src/metadata.rs
index 9b9fa2b25..46a937f5e 100644
--- a/libraries/message/src/metadata.rs
+++ b/libraries/message/src/metadata.rs
@@ -3,6 +3,9 @@ use std::collections::BTreeMap;
 use arrow_schema::DataType;
 use serde::{Deserialize, Serialize};
 
+/// Additional data that is sent as part of output messages.
+///
+/// Includes a timestamp, type information, and additional user-provided parameters.
 #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
 pub struct Metadata {
     metadata_version: u16,
@@ -42,6 +45,7 @@ impl Metadata {
     }
 }
 
+/// Additional metadata that can be sent as part of output messages.
 pub type MetadataParameters = BTreeMap<String, Parameter>;
 
 #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
@@ -55,6 +59,7 @@ pub struct ArrowTypeInfo {
     pub child_data: Vec<ArrowTypeInfo>,
 }
 
+/// A metadata parameter that can be sent as part of output messages.
 #[derive(Debug, PartialEq, Clone, Serialize, Deserialize)]
 pub enum Parameter {
     Bool(bool),