Skip to content
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 27 additions & 5 deletions crates/store/re_protos/src/v1alpha1/rerun.cloud.v1alpha1.ext.rs
Original file line number Diff line number Diff line change
Expand Up @@ -649,16 +649,38 @@ impl From<DatasetEntry> for crate::cloud::v1alpha1::DatasetEntry {

// --- CreateDatasetEntryRequest ---

impl TryFrom<crate::cloud::v1alpha1::CreateDatasetEntryRequest> for String {
#[derive(Debug, Clone)]
pub struct CreateDatasetEntryRequest {
/// Entry name (must be unique in catalog).
pub name: String,

/// Override, use at your own risk.
pub id: Option<EntryId>,
}

impl From<CreateDatasetEntryRequest> for crate::cloud::v1alpha1::CreateDatasetEntryRequest {
fn from(value: CreateDatasetEntryRequest) -> Self {
Self {
name: Some(value.name),
id: value.id.map(Into::into),
}
}
}

impl TryFrom<crate::cloud::v1alpha1::CreateDatasetEntryRequest> for CreateDatasetEntryRequest {
type Error = TypeConversionError;

fn try_from(
value: crate::cloud::v1alpha1::CreateDatasetEntryRequest,
) -> Result<Self, Self::Error> {
Ok(value.name.ok_or(missing_field!(
crate::cloud::v1alpha1::CreateDatasetEntryRequest,
"name"
))?)
Ok(Self {
name: value.name.ok_or(missing_field!(
crate::cloud::v1alpha1::CreateDatasetEntryRequest,
"name"
))?,

id: value.id.map(TryInto::try_into).transpose()?,
})
}
}

Expand Down
3 changes: 2 additions & 1 deletion crates/store/re_redap_tests/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ pub use self::utils::{
rerun::{
TuidPrefix, create_nasty_recording, create_recording_with_embeddings,
create_recording_with_properties, create_recording_with_scalars,
create_recording_with_text, create_simple_recording, create_simple_recording_in,
create_recording_with_text, create_simple_blueprint, create_simple_recording,
create_simple_recording_in,
},
};

Expand Down
20 changes: 19 additions & 1 deletion crates/store/re_redap_tests/src/tests/common.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
use std::collections::BTreeMap;

use arrow::array::RecordBatch;
use futures::StreamExt as _;
use itertools::Itertools as _;
use std::collections::BTreeMap;
use tonic::async_trait;
use url::Url;

Expand Down Expand Up @@ -169,6 +170,9 @@ pub enum LayerType {
Properties {
properties: BTreeMap<String, Vec<Box<dyn AsComponents>>>,
},

/// See [`crate::create_simple_blueprint`]
SimpleBlueprint,
}

impl LayerType {
Expand All @@ -188,6 +192,10 @@ impl LayerType {
}
}

pub fn simple_blueprint() -> Self {
Self::SimpleBlueprint
}

fn into_recording(
self,
tuid_prefix: TuidPrefix,
Expand All @@ -211,6 +219,8 @@ impl LayerType {
.map(|(k, v)| (k.clone(), v.iter().map(|v| v.as_ref()).collect()))
.collect(),
),

Self::SimpleBlueprint => crate::create_simple_blueprint(tuid_prefix, partition_id),
}
}
}
Expand Down Expand Up @@ -252,6 +262,14 @@ impl LayerDefinition {
}
}

pub fn simple_blueprint(partition_id: &'static str) -> Self {
Self {
partition_id,
layer_name: None,
layer_type: LayerType::simple_blueprint(),
}
}

pub fn layer_name(mut self, layer_name: &'static str) -> Self {
self.layer_name = Some(layer_name);
self
Expand Down
270 changes: 270 additions & 0 deletions crates/store/re_redap_tests/src/tests/create_dataset.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,270 @@
use arrow::datatypes::{DataType, Field, Schema};

use re_log_types::EntryId;
use re_protos::{
cloud::v1alpha1::{
EntryFilter, EntryKind, FindEntriesRequest, ReadDatasetEntryRequest,
ext::{
CreateDatasetEntryRequest, CreateTableEntryRequest, DatasetDetails, DatasetEntry,
EntryDetails, LanceTable, ProviderDetails as _,
},
rerun_cloud_service_server::RerunCloudService,
},
headers::RerunHeadersInjectorExt as _,
};

pub async fn create_dataset_tests(service: impl RerunCloudService) {
//
// Create a dataset with just a name
//

let dataset1_name = "dataset1";

create_dataset_entry(
&service,
CreateDatasetEntryRequest {
name: dataset1_name.to_owned(),
id: None,
},
)
.await
.unwrap();

let entry_details = entry_details_from_name(&service, dataset1_name, EntryKind::Dataset)
.await
.unwrap();

let dataset_details = dataset_details_from_id(&service, entry_details.id)
.await
.unwrap();

assert!(dataset_details.blueprint_dataset.is_some());
assert!(dataset_details.default_blueprint.is_none());

//
// Check the dataset got a matching blueprint dataset
//

let bp_entry_id = dataset_details
.blueprint_dataset
.expect("there should be a blueprint dataset");

let _ = entry_details_from_id(&service, bp_entry_id, EntryKind::BlueprintDataset)
.await
.unwrap();

let bp_dataset_details = dataset_details_from_id(&service, bp_entry_id)
.await
.unwrap();

assert!(bp_dataset_details.blueprint_dataset.is_none());
assert!(bp_dataset_details.default_blueprint.is_none());

//
// Check a duplicate entry name is rejected.
//

let status = create_dataset_entry(
&service,
CreateDatasetEntryRequest {
name: dataset1_name.to_owned(),
id: None,
},
)
.await
.unwrap_err();

assert_eq!(
status.code(),
tonic::Code::AlreadyExists,
"unexpected status: {status:?}"
);

//
// Check a duplicate entry id is rejected.
//

let status = create_dataset_entry(
&service,
CreateDatasetEntryRequest {
name: "this name is for sure not used, but the id might".to_owned(),
id: Some(entry_details.id),
},
)
.await
.unwrap_err();

assert_eq!(
status.code(),
tonic::Code::AlreadyExists,
"unexpected status: {status:?}"
);

//
// Create another dataset with an enforced entry id
//

let dataset2_name = "dataset2";
let dataset2_id = EntryId::from(re_tuid::Tuid::from_u128(123));

create_dataset_entry(
&service,
CreateDatasetEntryRequest {
name: dataset2_name.to_owned(),
id: Some(dataset2_id),
},
)
.await
.unwrap();

let _ = entry_details_from_name(&service, dataset2_name, EntryKind::Dataset)
.await
.unwrap();

let _ = entry_details_from_id(&service, dataset2_id, EntryKind::Dataset)
.await
.unwrap();

let dataset_details = dataset_details_from_id(&service, dataset2_id)
.await
.unwrap();

assert!(dataset_details.blueprint_dataset.is_some());
assert!(dataset_details.default_blueprint.is_none());

//
// Create a table
//

let tmp_dir = tempfile::tempdir().expect("create temp dir");
let table_name = "created_table";
let schema = Schema::new(vec![Field::new("column_a", DataType::Utf8, false)]);

let table_url =
url::Url::from_directory_path(tmp_dir.path()).expect("create url from tmp directory");
let provider_details = LanceTable { table_url }
.try_as_any()
.expect("convert provider details to any");

let create_table_request = CreateTableEntryRequest {
name: table_name.to_owned(),
schema: schema.clone(),
provider_details,
}
.try_into()
.expect("Unable to create table request");

let _ = service
.create_table_entry(tonic::Request::new(create_table_request))
.await
.expect("create table entry");

//
// Dataset with same name as table fails
//

let status = create_dataset_entry(
&service,
CreateDatasetEntryRequest {
name: table_name.to_owned(),
id: None,
},
)
.await
.unwrap_err();

assert_eq!(
status.code(),
tonic::Code::AlreadyExists,
"unexpected status: {status:?}"
);
}

// ---

async fn create_dataset_entry(
service: &impl RerunCloudService,
request: CreateDatasetEntryRequest,
) -> Result<DatasetEntry, tonic::Status> {
service
.create_dataset_entry(tonic::Request::new(request.clone().into()))
.await
.map(|result| result.into_inner().dataset.unwrap().try_into().unwrap())
}

/// Get the entry details or return the endpoint error (all other errors panic)
async fn entry_details_from_name(
service: &impl RerunCloudService,
name: &str,
entry_kind: EntryKind,
) -> Result<EntryDetails, tonic::Status> {
let mut result = service
.find_entries(tonic::Request::new(FindEntriesRequest {
filter: Some(EntryFilter {
id: None,
name: Some(name.to_owned()),
entry_kind: Some(entry_kind as i32),
}),
}))
.await?
.into_inner()
.entries;

assert_eq!(result.len(), 1);

let entry_details = result.pop().unwrap();
assert_eq!(entry_details.name.as_deref(), Some(name));
assert_eq!(entry_details.entry_kind, entry_kind as i32);

Ok(entry_details.try_into().unwrap())
}

/// Get the entry details or return the endpoint error (all other errors panic)
async fn entry_details_from_id(
service: &impl RerunCloudService,
entry_id: EntryId,
entry_kind: EntryKind,
) -> Result<EntryDetails, tonic::Status> {
let mut result = service
.find_entries(tonic::Request::new(FindEntriesRequest {
filter: Some(EntryFilter {
id: Some(entry_id.into()),
name: None,
entry_kind: Some(entry_kind as i32),
}),
}))
.await?
.into_inner()
.entries;

assert_eq!(result.len(), 1);

let entry_details = result.pop().unwrap();
assert_eq!(entry_details.id, Some(entry_id.into()));
assert_eq!(entry_details.entry_kind, entry_kind as i32);

Ok(entry_details.try_into().unwrap())
}

/// Get the dataset details or return the endpoint error (all other errors panic)
async fn dataset_details_from_id(
service: &impl RerunCloudService,
entry_id: EntryId,
) -> Result<DatasetDetails, tonic::Status> {
service
.read_dataset_entry(
tonic::Request::new(ReadDatasetEntryRequest {})
.with_entry_id(entry_id)
.unwrap(),
)
.await
.map(|resp| {
resp.into_inner()
.dataset
.unwrap()
.dataset_details
.unwrap()
.try_into()
.unwrap()
})
}
Loading