Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
469 changes: 434 additions & 35 deletions Cargo.lock

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ members = [
"node-hub/dora-rerun",
"node-hub/terminal-print",
"node-hub/openai-proxy-server",
"node-hub/dora-openai-websocket",
"node-hub/dora-kit-car",
"node-hub/dora-object-to-pose",
"node-hub/dora-mistral-rs",
Expand Down
20 changes: 0 additions & 20 deletions apis/python/node/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,10 @@
#![allow(clippy::borrow_deref_ref)] // clippy warns about code generated by #[pymethods]

use std::env::current_dir;
use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;

use arrow::pyarrow::{FromPyArrow, ToPyArrow};
use dora_download::download_file;
use dora_node_api::dora_core::config::NodeId;
use dora_node_api::dora_core::descriptor::source_is_url;
use dora_node_api::merged::{MergeExternalSend, MergedEvent};
use dora_node_api::{DataflowId, DoraNode, EventStream};
use dora_operator_api_python::{pydict_to_metadata, DelayedCleanup, NodeCleanupHandle, PyEvent};
Expand Down Expand Up @@ -360,22 +356,6 @@ pub fn start_runtime() -> eyre::Result<()> {
dora_runtime::main().wrap_err("Dora Runtime raised an error.")
}

pub fn resolve_dataflow(dataflow: String) -> eyre::Result<PathBuf> {
let dataflow = if source_is_url(&dataflow) {
// try to download the shared library
let target_path = current_dir().context("Could not access the current dir")?;
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.context("tokio runtime failed")?;
rt.block_on(async { download_file(&dataflow, &target_path).await })
.wrap_err("failed to download dataflow yaml file")?
} else {
PathBuf::from(dataflow)
};
Ok(dataflow)
}

/// Run a Dataflow
///
/// :rtype: None
Expand Down
1 change: 1 addition & 0 deletions apis/rust/node/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,3 +95,4 @@ pub use node::{arrow_utils, DataSample, DoraNode, ZERO_COPY_THRESHOLD};
mod daemon_connection;
mod event_stream;
mod node;
pub mod requests;
42 changes: 42 additions & 0 deletions apis/rust/node/src/requests.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
use std::sync::Arc;

use dora_core::{
topics::{DORA_DAEMON_LOCAL_LISTEN_PORT_DEFAULT, LOCALHOST},
uhlc,
};
use dora_message::{
common::Timestamped, daemon_to_node::DaemonReply, node_to_daemon::DaemonRequest, DataflowId,
};
use eyre::{bail, Context};

use crate::daemon_connection::DaemonChannel;

pub fn start_dataflow(
dataflow: String,
name: Option<String>,
uv: bool,
) -> eyre::Result<DataflowId> {
let mut channel = init_daemon_channel()?;
let clock = Arc::new(uhlc::HLC::default());

let request = DaemonRequest::StartDataflow { dataflow, name, uv };
let reply = channel
.request(&Timestamped {
inner: request,
timestamp: clock.new_timestamp(),
})
.wrap_err("failed to trigger dataflow start through daemon")?;
match reply {
DaemonReply::StartDataflowResult(Ok(dataflow_id)) => Ok(dataflow_id),
DaemonReply::StartDataflowResult(Err(err)) => bail!("failed to start dataflow: {err}"),
other => bail!("unexpected StartDataflow reply from daemon: {other:?}"),
}
}

fn init_daemon_channel() -> eyre::Result<DaemonChannel> {
let daemon_address = (LOCALHOST, DORA_DAEMON_LOCAL_LISTEN_PORT_DEFAULT).into();

let channel =
DaemonChannel::new_tcp(daemon_address).context("Could not connect to the daemon")?;
Ok(channel)
}
1 change: 0 additions & 1 deletion binaries/cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ dora-core = { workspace = true }
dora-message = { workspace = true }
dora-node-api-c = { workspace = true }
dora-operator-api-c = { workspace = true }
dora-download = { workspace = true }
serde = { version = "1.0.136", features = ["derive"] }
serde_yaml = { workspace = true }
webbrowser = "0.8.3"
Expand Down
4 changes: 2 additions & 2 deletions binaries/cli/src/command/build/distributed.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use communication_layer_request_reply::{TcpConnection, TcpRequestReplyConnection};
use dora_core::descriptor::Descriptor;
use dora_core::{descriptor::Descriptor, session::DataflowSession};
use dora_message::{
cli_to_coordinator::ControlRequest,
common::{GitSource, LogMessage},
Expand All @@ -13,7 +13,7 @@ use std::{
net::{SocketAddr, TcpStream},
};

use crate::{output::print_log_message, session::DataflowSession};
use crate::output::print_log_message;

pub fn build_distributed_dataflow(
session: &mut TcpRequestReplyConnection,
Expand Down
3 changes: 1 addition & 2 deletions binaries/cli/src/command/build/local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,11 @@ use colored::Colorize;
use dora_core::{
build::{BuildInfo, BuildLogger, Builder, GitManager, LogLevelOrStdout, PrevGitSource},
descriptor::{Descriptor, DescriptorExt},
session::DataflowSession,
};
use dora_message::{common::GitSource, id::NodeId};
use eyre::Context;

use crate::session::DataflowSession;

pub fn build_dataflow_locally(
dataflow: Descriptor,
git_sources: &BTreeMap<NodeId, GitSource>,
Expand Down
7 changes: 3 additions & 4 deletions binaries/cli/src/command/build/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,17 +50,16 @@
use communication_layer_request_reply::TcpRequestReplyConnection;
use dora_core::{
descriptor::{CoreNodeKind, CustomNode, Descriptor, DescriptorExt},
resolve_dataflow,
session::DataflowSession,
topics::{DORA_COORDINATOR_PORT_CONTROL_DEFAULT, LOCALHOST},
};
use dora_message::{descriptor::NodeSource, BuildId};
use eyre::Context;
use std::{collections::BTreeMap, net::IpAddr};

use super::{default_tracing, Executable};
use crate::{
common::{connect_to_coordinator, local_working_dir, resolve_dataflow},
session::DataflowSession,
};
use crate::common::{connect_to_coordinator, local_working_dir};

use distributed::{build_distributed_dataflow, wait_until_dataflow_built};
use local::build_dataflow_locally;
Expand Down
1 change: 1 addition & 0 deletions binaries/cli/src/command/coordinator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use dora_tracing::TracingBuilder;
use eyre::Context;
use std::net::{IpAddr, SocketAddr};
use tokio::runtime::Builder;
#[cfg(feature = "tracing")]
use tracing::level_filters::LevelFilter;

#[derive(Debug, clap::Args)]
Expand Down
8 changes: 5 additions & 3 deletions binaries/cli/src/command/daemon.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
use super::Executable;
use crate::{common::handle_dataflow_result, session::DataflowSession};
use dora_core::topics::{
DORA_COORDINATOR_PORT_DEFAULT, DORA_DAEMON_LOCAL_LISTEN_PORT_DEFAULT, LOCALHOST,
use crate::common::handle_dataflow_result;
use dora_core::{
session::DataflowSession,
topics::{DORA_COORDINATOR_PORT_DEFAULT, DORA_DAEMON_LOCAL_LISTEN_PORT_DEFAULT, LOCALHOST},
};

use dora_daemon::LogDestination;
Expand All @@ -14,6 +15,7 @@ use std::{
path::PathBuf,
};
use tokio::runtime::Builder;
#[cfg(feature = "tracing")]
use tracing::level_filters::LevelFilter;

#[derive(Debug, clap::Args)]
Expand Down
8 changes: 3 additions & 5 deletions binaries/cli/src/command/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,10 @@
//! Use `dora build --local` or manual build commands to build your nodes.

use super::Executable;
use crate::{
common::{handle_dataflow_result, resolve_dataflow},
output::print_log_message,
session::DataflowSession,
};
use crate::{common::handle_dataflow_result, output::print_log_message};
use dora_core::{resolve_dataflow, session::DataflowSession};
use dora_daemon::{flume, Daemon, LogDestination};
#[cfg(feature = "tracing")]
use dora_tracing::TracingBuilder;
use eyre::Context;
use tokio::runtime::Builder;
Expand Down
29 changes: 16 additions & 13 deletions binaries/cli/src/command/start/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,20 @@
use super::{default_tracing, Executable};
use crate::{
command::start::attach::attach_dataflow,
common::{connect_to_coordinator, local_working_dir, resolve_dataflow},
common::{connect_to_coordinator, local_working_dir},
output::print_log_message,
session::DataflowSession,
};
use communication_layer_request_reply::{TcpConnection, TcpRequestReplyConnection};
use dora_core::{
descriptor::{Descriptor, DescriptorExt},
resolve_dataflow,
session::DataflowSession,
topics::{DORA_COORDINATOR_PORT_CONTROL_DEFAULT, LOCALHOST},
};
use dora_message::{
cli_to_coordinator::ControlRequest, common::LogMessage, coordinator_to_cli::ControlRequestReply,
cli_to_coordinator::{ControlRequest, StartRequest},
common::LogMessage,
coordinator_to_cli::ControlRequestReply,
};
use eyre::{bail, Context};
use std::{
Expand All @@ -31,28 +34,28 @@ mod attach;
pub struct Start {
/// Path to the dataflow descriptor file
#[clap(value_name = "PATH")]
dataflow: String,
pub dataflow: String,
/// Assign a name to the dataflow
#[clap(long)]
name: Option<String>,
pub name: Option<String>,
/// Address of the dora coordinator
#[clap(long, value_name = "IP", default_value_t = LOCALHOST)]
coordinator_addr: IpAddr,
pub coordinator_addr: IpAddr,
/// Port number of the coordinator control server
#[clap(long, value_name = "PORT", default_value_t = DORA_COORDINATOR_PORT_CONTROL_DEFAULT)]
coordinator_port: u16,
pub coordinator_port: u16,
/// Attach to the dataflow and wait for its completion
#[clap(long, action)]
attach: bool,
pub attach: bool,
/// Run the dataflow in background
#[clap(long, action)]
detach: bool,
pub detach: bool,
/// Enable hot reloading (Python only)
#[clap(long, action)]
hot_reload: bool,
pub hot_reload: bool,
// Use UV to run nodes.
#[clap(long, action)]
uv: bool,
pub uv: bool,
}

impl Executable for Start {
Expand Down Expand Up @@ -125,14 +128,14 @@ fn start_dataflow(
let session: &mut TcpRequestReplyConnection = &mut *session;
let reply_raw = session
.request(
&serde_json::to_vec(&ControlRequest::Start {
&serde_json::to_vec(&ControlRequest::Start(StartRequest {
build_id: dataflow_session.build_id,
session_id: dataflow_session.session_id,
dataflow,
name,
local_working_dir,
uv,
})
}))
.unwrap(),
)
.wrap_err("failed to send start dataflow message")?;
Expand Down
21 changes: 1 addition & 20 deletions binaries/cli/src/common.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,15 @@
use crate::formatting::FormatDataflowError;
use communication_layer_request_reply::{RequestReplyLayer, TcpLayer, TcpRequestReplyConnection};
use dora_core::descriptor::{source_is_url, Descriptor};
use dora_download::download_file;
use dora_core::descriptor::Descriptor;
use dora_message::{
cli_to_coordinator::ControlRequest,
coordinator_to_cli::{ControlRequestReply, DataflowList, DataflowResult},
};
use eyre::{bail, Context, ContextCompat};
use std::{
env::current_dir,
net::SocketAddr,
path::{Path, PathBuf},
};
use tokio::runtime::Builder;
use uuid::Uuid;

pub(crate) fn handle_dataflow_result(
Expand Down Expand Up @@ -56,22 +53,6 @@ pub(crate) fn connect_to_coordinator(
TcpLayer::new().connect(coordinator_addr)
}

pub(crate) fn resolve_dataflow(dataflow: String) -> eyre::Result<PathBuf> {
let dataflow = if source_is_url(&dataflow) {
// try to download the shared library
let target_path = current_dir().context("Could not access the current dir")?;
let rt = Builder::new_current_thread()
.enable_all()
.build()
.context("tokio runtime failed")?;
rt.block_on(async { download_file(&dataflow, &target_path).await })
.wrap_err("failed to download dataflow yaml file")?
} else {
PathBuf::from(dataflow)
};
Ok(dataflow)
}

pub(crate) fn local_working_dir(
dataflow_path: &Path,
dataflow_descriptor: &Descriptor,
Expand Down
2 changes: 1 addition & 1 deletion binaries/cli/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ mod command;
mod common;
mod formatting;
pub mod output;
pub mod session;

mod template;

pub use command::run_func;
Expand Down
Loading
Loading