35 changes: 5 additions & 30 deletions crates/store/re_data_loader/src/lib.rs
@@ -53,8 +53,6 @@ pub struct DataLoaderSettings {
pub application_id: Option<re_log_types::ApplicationId>,

/// The [`re_log_types::ApplicationId`] that is currently opened in the viewer, if any.
//
// TODO(#5350): actually support this
pub opened_application_id: Option<re_log_types::ApplicationId>,

/// The recommended [`re_log_types::StoreId`] to log the data to, based on the surrounding context.
@@ -64,8 +62,6 @@ pub struct DataLoaderSettings {
pub store_id: re_log_types::StoreId,

/// The [`re_log_types::StoreId`] that is currently opened in the viewer, if any.
//
// TODO(#5350): actually support this
pub opened_store_id: Option<re_log_types::StoreId>,

/// What should the logged entity paths be prefixed with?
@@ -318,39 +314,18 @@ impl DataLoaderError {
/// most convenient for them, whether it is raw components, arrow chunks or even
/// full-on [`LogMsg`]s.
pub enum LoadedData {
Chunk(Chunk),
ArrowMsg(ArrowMsg),
Chunk(re_log_types::StoreId, Chunk),
ArrowMsg(re_log_types::StoreId, ArrowMsg),
LogMsg(LogMsg),
}

impl From<Chunk> for LoadedData {
#[inline]
fn from(value: Chunk) -> Self {
Self::Chunk(value)
}
}

impl From<ArrowMsg> for LoadedData {
#[inline]
fn from(value: ArrowMsg) -> Self {
Self::ArrowMsg(value)
}
}

impl From<LogMsg> for LoadedData {
#[inline]
fn from(value: LogMsg) -> Self {
Self::LogMsg(value)
}
}

impl LoadedData {
/// Pack the data into a [`LogMsg`].
pub fn into_log_msg(self, store_id: &re_log_types::StoreId) -> ChunkResult<LogMsg> {
pub fn into_log_msg(self) -> ChunkResult<LogMsg> {
match self {
Self::Chunk(chunk) => Ok(LogMsg::ArrowMsg(store_id.clone(), chunk.to_arrow_msg()?)),
Self::Chunk(store_id, chunk) => Ok(LogMsg::ArrowMsg(store_id, chunk.to_arrow_msg()?)),

Self::ArrowMsg(msg) => Ok(LogMsg::ArrowMsg(store_id.clone(), msg)),
Self::ArrowMsg(store_id, msg) => Ok(LogMsg::ArrowMsg(store_id, msg)),

Self::LogMsg(msg) => Ok(msg),
}
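To make the reworked API concrete, here is a minimal usage sketch (not part of the PR itself): the chunk and store id are placeholder inputs, and the exact path of `ChunkResult` may differ slightly from what is shown.

use re_chunk::Chunk;
use re_data_loader::LoadedData;
use re_log_types::{LogMsg, StoreId};

// The store id now travels inside the LoadedData variant itself…
fn pack(chunk: Chunk, store_id: StoreId) -> re_chunk::ChunkResult<LogMsg> {
    let data = LoadedData::Chunk(store_id, chunk);
    // …so packing into a LogMsg no longer takes a &StoreId parameter.
    data.into_log_msg()
}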
100 changes: 52 additions & 48 deletions crates/store/re_data_loader/src/load_file.rs
@@ -1,9 +1,10 @@
use std::borrow::Cow;

use ahash::{HashMap, HashMapExt};
use re_log_types::{FileSource, LogMsg};
use re_smart_channel::Sender;

use crate::{extension, DataLoaderError, LoadedData};
use crate::{DataLoaderError, LoadedData};

// ---

@@ -36,16 +37,7 @@ pub fn load_from_path(

let rx = load(settings, path, None)?;

// TODO(cmc): should we always unconditionally set store info though?
// If we reach this point, then at least one compatible `DataLoader` has been found.
let store_info = prepare_store_info(&settings.store_id, file_source, path);
if let Some(store_info) = store_info {
if tx.send(store_info).is_err() {
return Ok(()); // other end has hung up.
}
}

send(&settings.store_id, rx, tx);
send(settings.clone(), file_source, path.to_owned(), rx, tx);

Ok(())
}
@@ -72,16 +64,7 @@ pub fn load_from_file_contents(

let data = load(settings, filepath, Some(contents))?;

// TODO(cmc): should we always unconditionally set store info though?
// If we reach this point, then at least one compatible `DataLoader` has been found.
let store_info = prepare_store_info(&settings.store_id, file_source, filepath);
if let Some(store_info) = store_info {
if tx.send(store_info).is_err() {
return Ok(()); // other end has hung up.
}
}

send(&settings.store_id, data, tx);
send(settings.clone(), file_source, filepath.to_owned(), data, tx);

Ok(())
}
@@ -93,35 +76,25 @@ pub(crate) fn prepare_store_info(
store_id: &re_log_types::StoreId,
file_source: FileSource,
path: &std::path::Path,
) -> Option<LogMsg> {
) -> LogMsg {
re_tracing::profile_function!(path.display().to_string());

use re_log_types::SetStoreInfo;

let app_id = re_log_types::ApplicationId(path.display().to_string());
let store_source = re_log_types::StoreSource::File { file_source };

let ext = extension(path);
let is_rrd = crate::SUPPORTED_RERUN_EXTENSIONS.contains(&ext.as_str());

(!is_rrd).then(|| {
LogMsg::SetStoreInfo(SetStoreInfo {
row_id: *re_chunk::RowId::new(),
info: re_log_types::StoreInfo {
application_id: app_id.clone(),
store_id: store_id.clone(),
cloned_from: None,
is_official_example: false,
started: re_log_types::Time::now(),
store_source,
// NOTE: If this is a natively supported file, it will go through one of the
// builtin dataloaders, i.e. the local version.
// Otherwise, it will go through an arbitrary external loader, at which point we
// have no certainty what the version is.
store_version: crate::is_supported_file_extension(ext.as_str())
.then_some(re_build_info::CrateVersion::LOCAL),
},
})
LogMsg::SetStoreInfo(SetStoreInfo {
row_id: *re_chunk::RowId::new(),
info: re_log_types::StoreInfo {
application_id: app_id.clone(),
store_id: store_id.clone(),
cloned_from: None,
is_official_example: false,
started: re_log_types::Time::now(),
store_source,
store_version: Some(re_build_info::CrateVersion::LOCAL),
},
})
}

@@ -288,32 +261,63 @@ pub(crate) fn load(
///
/// Runs asynchronously from another thread on native, synchronously on wasm.
pub(crate) fn send(
store_id: &re_log_types::StoreId,
settings: crate::DataLoaderSettings,
file_source: FileSource,
path: std::path::PathBuf,
rx_loader: std::sync::mpsc::Receiver<LoadedData>,
tx: &Sender<LogMsg>,
) {
spawn({
re_tracing::profile_function!();

let mut store_info_tracker: HashMap<re_log_types::StoreId, bool> = HashMap::new();

let tx = tx.clone();
let store_id = store_id.clone();
move || {
// ## Ignoring channel errors
//
// Not our problem whether or not the other end has hung up, but we still want to
// poll the channel in any case so as to make sure that the data producer
// doesn't get stuck.
for data in rx_loader {
let msg = match data.into_log_msg(&store_id) {
Ok(msg) => msg,
let msg = match data.into_log_msg() {
Ok(msg) => {
let store_info = match &msg {
LogMsg::SetStoreInfo(set_store_info) => {
Some((set_store_info.info.store_id.clone(), true))
}
LogMsg::ArrowMsg(store_id, _arrow_msg) => {
Some((store_id.clone(), false))
}
LogMsg::BlueprintActivationCommand(_) => None,
};

if let Some((store_id, store_info_created)) = store_info {
*store_info_tracker.entry(store_id).or_default() |= store_info_created;
}

msg
}
Err(err) => {
re_log::error!(%err, %store_id, "Couldn't serialize component data");
re_log::error!(%err, "Couldn't serialize component data");
continue;
}
};
tx.send(msg).ok();
}

for (store_id, store_info_already_created) in store_info_tracker {
let is_a_preexisting_recording =
Some(&store_id) == settings.opened_store_id.as_ref();

if store_info_already_created || is_a_preexisting_recording {
continue;
}

let store_info = prepare_store_info(&store_id, file_source.clone(), &path);
tx.send(store_info).ok();
}

tx.quit(None).ok();
}
});
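The per-store bookkeeping introduced in `send` boils down to the following standalone sketch (simplified types and hypothetical data; the real code keys on `re_log_types::StoreId`, uses `ahash::HashMap`, and compares against `settings.opened_store_id`).

use std::collections::HashMap;

fn main() {
    // true == the loader itself already emitted a SetStoreInfo for that store.
    let mut store_info_tracker: HashMap<String, bool> = HashMap::new();

    // One entry per message seen on the loader channel: (store id, did it carry its own store info?).
    let seen = [("rec_a", false), ("rec_a", true), ("rec_b", false)];
    for (store_id, has_own_store_info) in seen {
        // `|=` keeps the flag sticky: once any message created the store info, we never synthesize one.
        *store_info_tracker.entry(store_id.to_owned()).or_default() |= has_own_store_info;
    }

    for (store_id, already_created) in store_info_tracker {
        let is_a_preexisting_recording = false; // would be: Some(&store_id) == settings.opened_store_id.as_ref()
        if already_created || is_a_preexisting_recording {
            continue;
        }
        println!("would synthesize a SetStoreInfo for {store_id}");
    }
}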
9 changes: 7 additions & 2 deletions crates/store/re_data_loader/src/loader_archetype.rs
@@ -50,7 +50,7 @@ impl DataLoader for ArchetypeLoader {

fn load_from_file_contents(
&self,
_settings: &crate::DataLoaderSettings,
settings: &crate::DataLoaderSettings,
filepath: std::path::PathBuf,
contents: std::borrow::Cow<'_, [u8]>,
tx: std::sync::mpsc::Sender<LoadedData>,
@@ -133,8 +133,13 @@ impl DataLoader for ArchetypeLoader {
)?);
}

let store_id = settings
.opened_store_id
.clone()
.unwrap_or_else(|| settings.store_id.clone());
for row in rows {
if tx.send(row.into()).is_err() {
let data = LoadedData::Chunk(store_id.clone(), row);
if tx.send(data).is_err() {
break; // The other end has decided to hang up, not our problem.
}
}
6 changes: 5 additions & 1 deletion crates/store/re_data_loader/src/loader_external.rs
@@ -6,6 +6,8 @@ use std::{
use ahash::HashMap;
use once_cell::sync::Lazy;

use crate::LoadedData;

// ---

/// To register a new external data loader, simply add an executable in your $PATH whose name
@@ -318,7 +320,9 @@ fn decode_and_stream<R: std::io::Read>(
continue;
}
};
if tx.send(msg.into()).is_err() {

let data = LoadedData::LogMsg(msg);
if tx.send(data).is_err() {
break; // The other end has decided to hang up, not our problem.
}
}
5 changes: 4 additions & 1 deletion crates/store/re_data_loader/src/loader_rrd.rs
@@ -4,6 +4,8 @@ use re_log_encoding::decoder::Decoder;
use crossbeam::channel::Receiver;
use re_log_types::{ApplicationId, StoreId};

use crate::LoadedData;

// ---

/// Loads data from any `rrd` file or in-memory contents.
@@ -193,7 +195,8 @@ fn decode_and_stream<R: std::io::Read>(
msg
};

if tx.send(msg.into()).is_err() {
let data = LoadedData::LogMsg(msg);
if tx.send(data).is_err() {
break; // The other end has decided to hang up, not our problem.
}
}
23 changes: 17 additions & 6 deletions crates/store/re_data_source/src/data_source.rs
@@ -164,7 +164,11 @@ impl DataSource {
// or not.
let shared_store_id =
re_log_types::StoreId::random(re_log_types::StoreKind::Recording);
let settings = re_data_loader::DataLoaderSettings::recommended(shared_store_id);
let settings = re_data_loader::DataLoaderSettings {
opened_application_id: file_source.recommended_application_id().cloned(),
opened_store_id: file_source.recommended_recording_id().cloned(),
..re_data_loader::DataLoaderSettings::recommended(shared_store_id)
};
re_data_loader::load_from_path(&settings, file_source, &path, &tx)
.with_context(|| format!("{path:?}"))?;

@@ -188,7 +192,11 @@ impl DataSource {
// or not.
let shared_store_id =
re_log_types::StoreId::random(re_log_types::StoreKind::Recording);
let settings = re_data_loader::DataLoaderSettings::recommended(shared_store_id);
let settings = re_data_loader::DataLoaderSettings {
opened_application_id: file_source.recommended_application_id().cloned(),
opened_store_id: file_source.recommended_recording_id().cloned(),
..re_data_loader::DataLoaderSettings::recommended(shared_store_id)
};
re_data_loader::load_from_file_contents(
&settings,
file_source,
@@ -248,12 +256,15 @@ fn test_data_source_from_uri() {
];
let ws = ["ws://foo.zip", "wss://foo.zip", "127.0.0.1"];

let file_source = FileSource::DragAndDrop;
let file_source = FileSource::DragAndDrop {
recommended_application_id: None,
recommended_recording_id: None,
};

for uri in file {
assert!(
matches!(
DataSource::from_uri(file_source, uri.to_owned()),
DataSource::from_uri(file_source.clone(), uri.to_owned()),
DataSource::FilePath { .. }
),
"Expected {uri:?} to be categorized as FilePath"
Expand All @@ -263,7 +274,7 @@ fn test_data_source_from_uri() {
for uri in http {
assert!(
matches!(
DataSource::from_uri(file_source, uri.to_owned()),
DataSource::from_uri(file_source.clone(), uri.to_owned()),
DataSource::RrdHttpUrl { .. }
),
"Expected {uri:?} to be categorized as RrdHttpUrl"
Expand All @@ -273,7 +284,7 @@ fn test_data_source_from_uri() {
for uri in ws {
assert!(
matches!(
DataSource::from_uri(file_source, uri.to_owned()),
DataSource::from_uri(file_source.clone(), uri.to_owned()),
DataSource::WebSocketAddr(_)
),
"Expected {uri:?} to be categorized as WebSocketAddr"
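For reference, a minimal sketch of how the recommended ids plumbed through this PR are intended to flow from the file source into the loader settings (the "my_app" id and the Some(...) values are hypothetical; the wiring mirrors the change in data_source.rs above).

use re_log_types::{ApplicationId, FileSource, StoreId, StoreKind};

fn main() {
    // Hypothetical caller that already knows which app/recording the dropped file belongs to:
    let file_source = FileSource::DragAndDrop {
        recommended_application_id: Some(ApplicationId("my_app".to_owned())),
        recommended_recording_id: Some(StoreId::random(StoreKind::Recording)),
    };

    // The recommendations become the "opened" ids that the data loaders reuse
    // instead of synthesizing new ones.
    let shared_store_id = StoreId::random(StoreKind::Recording);
    let settings = re_data_loader::DataLoaderSettings {
        opened_application_id: file_source.recommended_application_id().cloned(),
        opened_store_id: file_source.recommended_recording_id().cloned(),
        ..re_data_loader::DataLoaderSettings::recommended(shared_store_id)
    };
    let _ = settings;
}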