Skip to content
20 changes: 13 additions & 7 deletions apis/python/operator/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,7 @@ pub fn metadata_to_pydict<'a>(

#[cfg(test)]
mod tests {
use std::sync::Arc;
use std::{ptr::NonNull, sync::Arc};

use aligned_vec::{AVec, ConstAlign};
use arrow::{
Expand All @@ -301,9 +301,8 @@ mod tests {
};

use arrow_schema::{DataType, Field};
use dora_node_api::{
arrow_utils::{copy_array_into_sample, required_data_size},
RawData,
use dora_node_api::arrow_utils::{
buffer_into_arrow_array, copy_array_into_sample, required_data_size,
};
use eyre::{Context, Result};

Expand All @@ -313,9 +312,16 @@ mod tests {

let info = copy_array_into_sample(&mut sample, arrow_array);

let serialized_deserialized_arrow_array = RawData::Vec(sample)
.into_arrow_array(&info)
.context("Could not create arrow array")?;
let serialized_deserialized_arrow_array = {
let ptr = NonNull::new(sample.as_ptr() as *mut _).unwrap();
let len = sample.len();

let raw_buffer = unsafe {
arrow::buffer::Buffer::from_custom_allocation(ptr, len, Arc::new(sample))
};
buffer_into_arrow_array(&raw_buffer, &info)?
};

assert_eq!(arrow_array, &serialized_deserialized_arrow_array);

Ok(())
Expand Down
80 changes: 80 additions & 0 deletions apis/rust/node/src/event_stream/data_conversion.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
use std::{ptr::NonNull, sync::Arc};

use aligned_vec::{AVec, ConstAlign};
use dora_arrow_convert::IntoArrow;
use dora_message::metadata::ArrowTypeInfo;
use eyre::Context;
use shared_memory_server::{Shmem, ShmemConf};

use crate::arrow_utils::buffer_into_arrow_array;

/// Raw bytes that have not yet been interpreted as Arrow data.
///
/// The bytes can be stored in one of several ways; `into_arrow_array` turns any
/// of them into Arrow `ArrayData`.
pub enum RawData {
/// No payload. Converts to the Arrow representation of the unit value `()`.
Empty,
/// Bytes owned by a vector that uses a constant alignment of 128.
Vec(AVec<u8, ConstAlign<128>>),
/// Bytes that live in a mapped shared-memory region.
SharedMemory(SharedMemoryData),
}

impl RawData {
    /// Consumes the raw bytes and builds Arrow `ArrayData` from them.
    ///
    /// `type_info` describes how the bytes are to be interpreted; it is passed on
    /// unchanged to `buffer_into_arrow_array`.
    ///
    /// The bytes are not copied: the owning value is moved into an `Arc` that is
    /// handed to the Arrow buffer as its allocation owner.
    ///
    /// # Errors
    ///
    /// Returns whatever error `buffer_into_arrow_array` reports.
    ///
    /// # Panics
    ///
    /// Panics if the data pointer is null (the `NonNull::new(..).unwrap()` calls).
    pub fn into_arrow_array(
        self,
        type_info: &ArrowTypeInfo,
    ) -> eyre::Result<arrow::array::ArrayData> {
        let backing_buffer = match self {
            // Nothing to wrap: hand back the Arrow form of the unit value.
            RawData::Empty => {
                let unit_array = ().into_arrow();
                return Ok(unit_array.into());
            }
            RawData::Vec(bytes) => {
                let byte_count = bytes.len();
                let start = NonNull::new(bytes.as_ptr() as *mut _).unwrap();

                // SAFETY: NOTE(review): relies on `start`/`byte_count` staying valid after
                // `bytes` is moved into the `Arc`, i.e. on the vector's storage living on
                // the heap, and on the buffer keeping the `Arc` alive — confirm.
                unsafe {
                    arrow::buffer::Buffer::from_custom_allocation(
                        start,
                        byte_count,
                        Arc::new(bytes),
                    )
                }
            }
            RawData::SharedMemory(shared) => {
                let byte_count = shared.data.len();
                let start = NonNull::new(shared.data.as_ptr() as *mut _).unwrap();

                // SAFETY: NOTE(review): relies on the mapping address not changing when
                // `shared` is moved into the `Arc`, and on the buffer keeping the `Arc`
                // (and with it the mapping) alive — confirm.
                unsafe {
                    arrow::buffer::Buffer::from_custom_allocation(
                        start,
                        byte_count,
                        Arc::new(shared),
                    )
                }
            }
        };

        buffer_into_arrow_array(&backing_buffer, type_info)
    }
}

impl std::fmt::Debug for RawData {
    /// Formats the value as an opaque struct named `Data` without listing any contents.
    ///
    /// NOTE(review): the label is "Data" although the type is `RawData` — presumably a
    /// leftover from an earlier name; confirm before changing it.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut opaque = f.debug_struct("Data");
        opaque.finish_non_exhaustive()
    }
}

/// Payload of `RawData::SharedMemory`: mapped bytes plus a channel sender.
pub struct SharedMemoryData {
/// The mapped bytes.
pub data: MappedInputData,
/// Sender that is dropped together with this value.
///
/// NOTE(review): presumably the receiving side treats the disconnect as the signal
/// that the mapping is no longer in use — confirm against the code creating this struct.
pub _drop: flume::Sender<()>,
}

/// View onto the first `len` bytes of a shared-memory mapping.
///
/// Dereferences to `[u8]`.
pub struct MappedInputData {
// The opened mapping; `map` opens it with `writable(false)`.
memory: Box<Shmem>,
// Number of bytes exposed through `Deref`.
len: usize,
}

impl MappedInputData {
    /// Opens the shared-memory region `shared_memory_id` non-writable and exposes its
    /// first `len` bytes.
    ///
    /// # Errors
    ///
    /// Returns an error if the region cannot be opened, or if `len` is larger than the
    /// opened mapping. The second check reports the problem here instead of letting it
    /// surface later as a slice-index panic when the value is dereferenced.
    ///
    /// # Safety
    ///
    /// NOTE(review): the contract is not written down in this file. Presumably the
    /// caller must guarantee that the region identified by `shared_memory_id` stays
    /// valid and is not modified while this value exists — confirm.
    pub(crate) unsafe fn map(shared_memory_id: &str, len: usize) -> eyre::Result<Self> {
        let memory = Box::new(
            ShmemConf::new()
                .os_id(shared_memory_id)
                .writable(false)
                .open()
                .wrap_err("failed to map shared memory input")?,
        );

        // SAFETY: only the length of the slice is inspected; no bytes are read.
        let mapped_len = unsafe { memory.as_slice() }.len();
        if len > mapped_len {
            eyre::bail!(
                "shared memory region `{}` holds {} bytes, but an input of {} bytes was requested",
                shared_memory_id,
                mapped_len,
                len
            );
        }

        Ok(MappedInputData { memory, len })
    }
}

impl std::ops::Deref for MappedInputData {
    type Target = [u8];

    /// Returns the first `len` bytes of the mapping.
    ///
    /// # Panics
    ///
    /// Panics if `len` exceeds the length of the mapped slice.
    fn deref(&self) -> &Self::Target {
        // SAFETY: NOTE(review): no justification is recorded in this file; presumably the
        // mapping is not modified while this value exists — confirm.
        let whole_mapping = unsafe { self.memory.as_slice() };
        &whole_mapping[..self.len]
    }
}

// SAFETY: NOTE(review): no justification is recorded here. Presumably sound because the
// mapping is opened with `writable(false)` and only handed out as `&[u8]` — confirm that
// `Shmem` has no thread-affine state.
unsafe impl Send for MappedInputData {}
unsafe impl Sync for MappedInputData {}
181 changes: 69 additions & 112 deletions apis/rust/node/src/event_stream/event.rs
Original file line number Diff line number Diff line change
@@ -1,133 +1,90 @@
use std::{ptr::NonNull, sync::Arc};

use aligned_vec::{AVec, ConstAlign};
use dora_arrow_convert::{ArrowData, IntoArrow};
use dora_arrow_convert::ArrowData;
use dora_core::config::{DataId, OperatorId};
use dora_message::metadata::{ArrowTypeInfo, BufferOffset, Metadata};
use eyre::{Context, Result};
use shared_memory_extended::{Shmem, ShmemConf};

use dora_message::metadata::Metadata;

/// Represents an incoming Dora event.
///
/// Events might be triggered by other nodes, by Dora itself, or by some external user input.
///
/// It's safe to ignore event types that are not relevant to the node.
///
/// This enum is marked as `non_exhaustive` because we might add additional
/// variants in the future. Please ignore unknown event types instead of throwing an
/// error to avoid breakage when updating Dora.
#[derive(Debug)]
#[non_exhaustive]
pub enum Event {
Stop(StopCause),
Reload {
operator_id: Option<OperatorId>,
},
/// An input was received from another node.
///
/// This event corresponds to one of the `inputs` of the node as specified
/// in the dataflow YAML file.
Input {
/// The input ID, as specified in the YAML file.
///
/// Note that this is not the output ID of the sender, but the ID
/// assigned to the input in the YAML file.
id: DataId,
/// Meta information about this input, e.g. the timestamp.
metadata: Metadata,
/// The actual data in the Apache Arrow data format.
data: ArrowData,
},
/// An input was closed by the sender.
///
/// The sending node mapped to an input exited, so this input will receive
/// no more data.
InputClosed {
/// The ID of the input that was closed, as specified in the YAML file.
///
/// Note that this is not the output ID of the sender, but the ID
/// assigned to the input in the YAML file.
id: DataId,
},
/// Notification that the event stream is about to close.
///
/// The [`StopCause`] field contains the reason for the event stream closure.
///
/// Typically, nodes should exit once the event stream closes. One notable
/// exception are nodes with no inputs, which will receive a
/// `Event::Stop(StopCause::AllInputsClosed)` right at startup. Source nodes
/// might want to keep producing outputs still. (There is currently an open
/// discussion of changing this behavior and not sending `AllInputsClosed`
/// to nodes without inputs.)
///
/// Note: Stop events with `StopCause::Manual` indicate a manual stop operation
/// issued through `dora stop` or a `ctrl-c`. Nodes **must exit** once receiving
/// such a stop event, otherwise they will be killed by Dora.
Stop(StopCause),
/// Instructs the node to reload itself or one of its operators.
///
/// This event is currently only used for reloading Python operators that are
/// started by a `dora runtime` process. So this event should not be sent to normal
/// nodes yet.
Reload {
/// The ID of the operator that should be reloaded.
///
/// There is currently no case where `operator_id` is `None`.
operator_id: Option<OperatorId>,
},
/// Notifies the node about an unexpected error that happened inside Dora.
///
/// It's a good idea to output or log this error for debugging.
Error(String),
}

/// The reason for closing the event stream.
///
/// Carried by [`Event::Stop`].
///
/// This enum is marked as `non_exhaustive` because we might add additional
/// variants in the future.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub enum StopCause {
/// The dataflow is stopped early after a `dora stop` command (or on `ctrl-c`).
///
/// Nodes should exit as soon as possible if they receive a stop event of
/// this type. Dora will kill nodes that keep running for too long after
/// receiving such a stop event.
Manual,
/// The event stream is closed because all of the node's inputs were closed.
AllInputsClosed,
}

/// Raw bytes that have not yet been interpreted as Arrow data.
///
/// The bytes can be stored in one of several ways; `into_arrow_array` turns any
/// of them into Arrow `ArrayData`.
pub enum RawData {
/// No payload. Converts to the Arrow representation of the unit value `()`.
Empty,
/// Bytes owned by a vector that uses a constant alignment of 128.
Vec(AVec<u8, ConstAlign<128>>),
/// Bytes that live in a mapped shared-memory region.
SharedMemory(SharedMemoryData),
}

impl RawData {
    /// Consumes the raw bytes and builds Arrow `ArrayData` from them.
    ///
    /// `type_info` describes how the bytes are to be interpreted; it is passed on
    /// unchanged to `buffer_into_arrow_array`.
    ///
    /// The bytes are not copied: the owning value is moved into an `Arc` that is
    /// handed to the Arrow buffer as its allocation owner.
    ///
    /// # Errors
    ///
    /// Returns whatever error `buffer_into_arrow_array` reports.
    ///
    /// # Panics
    ///
    /// Panics if the data pointer is null (the `NonNull::new(..).unwrap()` calls).
    pub fn into_arrow_array(self, type_info: &ArrowTypeInfo) -> Result<arrow::array::ArrayData> {
        let backing_buffer = match self {
            // Nothing to wrap: hand back the Arrow form of the unit value.
            RawData::Empty => {
                let unit_array = ().into_arrow();
                return Ok(unit_array.into());
            }
            RawData::Vec(bytes) => {
                let byte_count = bytes.len();
                let start = NonNull::new(bytes.as_ptr() as *mut _).unwrap();

                // SAFETY: NOTE(review): relies on `start`/`byte_count` staying valid after
                // `bytes` is moved into the `Arc`, i.e. on the vector's storage living on
                // the heap, and on the buffer keeping the `Arc` alive — confirm.
                unsafe {
                    arrow::buffer::Buffer::from_custom_allocation(
                        start,
                        byte_count,
                        Arc::new(bytes),
                    )
                }
            }
            RawData::SharedMemory(shared) => {
                let byte_count = shared.data.len();
                let start = NonNull::new(shared.data.as_ptr() as *mut _).unwrap();

                // SAFETY: NOTE(review): relies on the mapping address not changing when
                // `shared` is moved into the `Arc`, and on the buffer keeping the `Arc`
                // (and with it the mapping) alive — confirm.
                unsafe {
                    arrow::buffer::Buffer::from_custom_allocation(
                        start,
                        byte_count,
                        Arc::new(shared),
                    )
                }
            }
        };

        buffer_into_arrow_array(&backing_buffer, type_info)
    }
}

/// Payload of `RawData::SharedMemory`: mapped bytes plus a channel sender.
pub struct SharedMemoryData {
/// The mapped bytes.
pub data: MappedInputData,
/// Sender that is dropped together with this value.
///
/// NOTE(review): presumably the receiving side treats the disconnect as the signal
/// that the mapping is no longer in use — confirm against the code creating this struct.
pub _drop: flume::Sender<()>,
}

/// Builds Arrow `ArrayData` on top of `raw_buffer`, following the layout in `type_info`.
///
/// Each entry of `type_info.buffer_offsets` becomes a slice of `raw_buffer`, and each
/// entry of `type_info.child_data` is converted recursively against the same buffer.
/// An empty `raw_buffer` yields an empty array of `type_info.data_type`.
///
/// # Errors
///
/// Returns an error if a buffer offset/length pair does not fit inside `raw_buffer`
/// (slicing would otherwise panic), if converting a child fails, or if Arrow rejects
/// the assembled array data.
fn buffer_into_arrow_array(
    raw_buffer: &arrow::buffer::Buffer,
    type_info: &ArrowTypeInfo,
) -> eyre::Result<arrow::array::ArrayData> {
    if raw_buffer.is_empty() {
        return Ok(arrow::array::ArrayData::new_empty(&type_info.data_type));
    }

    let mut buffers = Vec::new();
    for BufferOffset { offset, len } in &type_info.buffer_offsets {
        // `slice_with_length` panics on an out-of-range request, and the offsets come
        // from metadata, so validate them (overflow included) before slicing.
        let fits = matches!(offset.checked_add(*len), Some(end) if end <= raw_buffer.len());
        if !fits {
            eyre::bail!(
                "buffer offset {} with length {} exceeds the raw buffer size of {} bytes",
                offset,
                len,
                raw_buffer.len()
            );
        }
        buffers.push(raw_buffer.slice_with_length(*offset, *len));
    }

    let mut child_data = Vec::new();
    for child_type_info in &type_info.child_data {
        child_data.push(buffer_into_arrow_array(raw_buffer, child_type_info)?)
    }

    arrow::array::ArrayData::try_new(
        type_info.data_type.clone(),
        type_info.len,
        type_info
            .validity
            .clone()
            .map(arrow::buffer::Buffer::from_vec),
        type_info.offset,
        buffers,
        child_data,
    )
    .context("Error creating Arrow array")
}

impl std::fmt::Debug for RawData {
    /// Formats the value as an opaque struct named `Data` without listing any contents.
    ///
    /// NOTE(review): the label is "Data" although the type is `RawData` — presumably a
    /// leftover from an earlier name; confirm before changing it.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut opaque = f.debug_struct("Data");
        opaque.finish_non_exhaustive()
    }
}

/// View onto the first `len` bytes of a shared-memory mapping.
///
/// Dereferences to `[u8]`.
pub struct MappedInputData {
// The opened mapping; `map` opens it with `writable(false)`.
memory: Box<Shmem>,
// Number of bytes exposed through `Deref`.
len: usize,
}

impl MappedInputData {
    /// Opens the shared-memory region `shared_memory_id` non-writable and exposes its
    /// first `len` bytes.
    ///
    /// # Errors
    ///
    /// Returns an error if the region cannot be opened, or if `len` is larger than the
    /// opened mapping. The second check reports the problem here instead of letting it
    /// surface later as a slice-index panic when the value is dereferenced.
    ///
    /// # Safety
    ///
    /// NOTE(review): the contract is not written down in this file. Presumably the
    /// caller must guarantee that the region identified by `shared_memory_id` stays
    /// valid and is not modified while this value exists — confirm.
    pub(crate) unsafe fn map(shared_memory_id: &str, len: usize) -> eyre::Result<Self> {
        let memory = Box::new(
            ShmemConf::new()
                .os_id(shared_memory_id)
                .writable(false)
                .open()
                .wrap_err("failed to map shared memory input")?,
        );

        // SAFETY: only the length of the slice is inspected; no bytes are read.
        let mapped_len = unsafe { memory.as_slice() }.len();
        if len > mapped_len {
            eyre::bail!(
                "shared memory region `{}` holds {} bytes, but an input of {} bytes was requested",
                shared_memory_id,
                mapped_len,
                len
            );
        }

        Ok(MappedInputData { memory, len })
    }
}

impl std::ops::Deref for MappedInputData {
    type Target = [u8];

    /// Returns the first `len` bytes of the mapping.
    ///
    /// # Panics
    ///
    /// Panics if `len` exceeds the length of the mapped slice.
    fn deref(&self) -> &Self::Target {
        // SAFETY: NOTE(review): no justification is recorded in this file; presumably the
        // mapping is not modified while this value exists — confirm.
        let whole_mapping = unsafe { self.memory.as_slice() };
        &whole_mapping[..self.len]
    }
}

// SAFETY: NOTE(review): no justification is recorded here. Presumably sound because the
// mapping is opened with `writable(false)` and only handed out as `&[u8]` — confirm that
// `Shmem` has no thread-affine state.
unsafe impl Send for MappedInputData {}
unsafe impl Sync for MappedInputData {}
28 changes: 28 additions & 0 deletions apis/rust/node/src/event_stream/merged.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,32 @@
//! Merge external stream into an [`EventStream`][super::EventStream].
//!
//! Sometimes nodes need to listen to external events, in addition to Dora events.
//! This module provides support for that by providing the [`MergeExternal`] trait.

use futures::{Stream, StreamExt};
use futures_concurrency::stream::Merge;

/// A Dora event or an event from an external source.
///
/// `E` is the type of the external events.
#[derive(Debug)]
pub enum MergedEvent<E> {
/// A Dora event
Dora(super::Event),
/// An external event
///
/// Yielded by the stream that was merged into the Dora [`EventStream`][super::EventStream].
External(E),
}

/// A general enum to represent a value of two possible types.
///
/// Note that, unlike [`MergedEvent`], this type does not derive `Debug`.
pub enum Either<A, B> {
/// Value is of the first type, type `A`.
First(A),
/// Value is of the second type, type `B`.
Second(B),
}

impl<A> Either<A, A> {
/// Unwraps an `Either` instance where both types are identical.
pub fn flatten(self) -> A {
match self {
Either::First(a) => a,
Expand All @@ -21,19 +35,33 @@ impl<A> Either<A, A> {
}
}

/// Allows merging an external event stream into an existing event stream.
///
/// `E` is the item type of the external stream. The lifetime `'a` bounds both the
/// external stream and the boxed stream that is returned.
// TODO: use impl trait return type once stable
pub trait MergeExternal<'a, E> {
/// The item type yielded from the merged stream.
type Item;

/// Merge the given stream into an existing event stream.
///
/// Consumes `self` and returns a new, boxed event stream that yields items from
/// both streams. The ordering between the two streams is not guaranteed.
fn merge_external(
self,
external_events: impl Stream<Item = E> + Unpin + 'a,
) -> Box<dyn Stream<Item = Self::Item> + Unpin + 'a>;
}

/// Allows merging a sendable external event stream into an existing (sendable) event stream.
///
/// By implementing [`Send`], the streams can be sent to different threads.
pub trait MergeExternalSend<'a, E> {
/// The item type yielded from the merged stream.
type Item;

/// Merge the given stream into an existing event stream.
///
/// Returns a new event stream that yields items from both streams.
/// The ordering between the two streams is not guaranteed.
fn merge_external_send(
self,
external_events: impl Stream<Item = E> + Unpin + Send + Sync + 'a,
Expand Down
Loading
Loading