Skip to content

Commit d24048d

Browse files
authored
Add support for blueprint dataset (#11758)
1 parent 0d56db3 commit d24048d

19 files changed

+680
-63
lines changed

crates/store/re_protos/src/v1alpha1/rerun.cloud.v1alpha1.ext.rs

Lines changed: 27 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -649,16 +649,38 @@ impl From<DatasetEntry> for crate::cloud::v1alpha1::DatasetEntry {
649649

650650
// --- CreateDatasetEntryRequest ---
651651

652-
impl TryFrom<crate::cloud::v1alpha1::CreateDatasetEntryRequest> for String {
652+
#[derive(Debug, Clone)]
653+
pub struct CreateDatasetEntryRequest {
654+
/// Entry name (must be unique in catalog).
655+
pub name: String,
656+
657+
/// Override, use at your own risk.
658+
pub id: Option<EntryId>,
659+
}
660+
661+
impl From<CreateDatasetEntryRequest> for crate::cloud::v1alpha1::CreateDatasetEntryRequest {
662+
fn from(value: CreateDatasetEntryRequest) -> Self {
663+
Self {
664+
name: Some(value.name),
665+
id: value.id.map(Into::into),
666+
}
667+
}
668+
}
669+
670+
impl TryFrom<crate::cloud::v1alpha1::CreateDatasetEntryRequest> for CreateDatasetEntryRequest {
653671
type Error = TypeConversionError;
654672

655673
fn try_from(
656674
value: crate::cloud::v1alpha1::CreateDatasetEntryRequest,
657675
) -> Result<Self, Self::Error> {
658-
Ok(value.name.ok_or(missing_field!(
659-
crate::cloud::v1alpha1::CreateDatasetEntryRequest,
660-
"name"
661-
))?)
676+
Ok(Self {
677+
name: value.name.ok_or(missing_field!(
678+
crate::cloud::v1alpha1::CreateDatasetEntryRequest,
679+
"name"
680+
))?,
681+
682+
id: value.id.map(TryInto::try_into).transpose()?,
683+
})
662684
}
663685
}
664686

crates/store/re_redap_tests/src/lib.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,8 @@ pub use self::utils::{
2626
rerun::{
2727
TuidPrefix, create_nasty_recording, create_recording_with_embeddings,
2828
create_recording_with_properties, create_recording_with_scalars,
29-
create_recording_with_text, create_simple_recording, create_simple_recording_in,
29+
create_recording_with_text, create_simple_blueprint, create_simple_recording,
30+
create_simple_recording_in,
3031
},
3132
};
3233

crates/store/re_redap_tests/src/tests/common.rs

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
1+
use std::collections::BTreeMap;
2+
13
use arrow::array::RecordBatch;
24
use futures::StreamExt as _;
35
use itertools::Itertools as _;
4-
use std::collections::BTreeMap;
56
use tonic::async_trait;
67
use url::Url;
78

@@ -182,6 +183,9 @@ pub enum LayerType {
182183
embeddings: u32,
183184
embeddings_per_row: u32,
184185
},
186+
187+
/// See [`crate::create_simple_blueprint`]
188+
SimpleBlueprint,
185189
}
186190

187191
impl LayerType {
@@ -216,6 +220,10 @@ impl LayerType {
216220
}
217221
}
218222

223+
pub fn simple_blueprint() -> Self {
224+
Self::SimpleBlueprint
225+
}
226+
219227
fn into_recording(
220228
self,
221229
tuid_prefix: TuidPrefix,
@@ -253,6 +261,8 @@ impl LayerType {
253261
embeddings,
254262
embeddings_per_row,
255263
),
264+
265+
Self::SimpleBlueprint => crate::create_simple_blueprint(tuid_prefix, partition_id),
256266
}
257267
}
258268
}
@@ -326,6 +336,14 @@ impl LayerDefinition {
326336
}
327337
}
328338

339+
pub fn simple_blueprint(partition_id: &'static str) -> Self {
340+
Self {
341+
partition_id,
342+
layer_name: None,
343+
layer_type: LayerType::simple_blueprint(),
344+
}
345+
}
346+
329347
pub fn layer_name(mut self, layer_name: &'static str) -> Self {
330348
self.layer_name = Some(layer_name);
331349
self
Lines changed: 270 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,270 @@
1+
use arrow::datatypes::{DataType, Field, Schema};
2+
3+
use re_log_types::EntryId;
4+
use re_protos::{
5+
cloud::v1alpha1::{
6+
EntryFilter, EntryKind, FindEntriesRequest, ReadDatasetEntryRequest,
7+
ext::{
8+
CreateDatasetEntryRequest, CreateTableEntryRequest, DatasetDetails, DatasetEntry,
9+
EntryDetails, LanceTable, ProviderDetails as _,
10+
},
11+
rerun_cloud_service_server::RerunCloudService,
12+
},
13+
headers::RerunHeadersInjectorExt as _,
14+
};
15+
16+
pub async fn create_dataset_tests(service: impl RerunCloudService) {
17+
//
18+
// Create a dataset with just a name
19+
//
20+
21+
let dataset1_name = "dataset1";
22+
23+
create_dataset_entry(
24+
&service,
25+
CreateDatasetEntryRequest {
26+
name: dataset1_name.to_owned(),
27+
id: None,
28+
},
29+
)
30+
.await
31+
.unwrap();
32+
33+
let entry_details = entry_details_from_name(&service, dataset1_name, EntryKind::Dataset)
34+
.await
35+
.unwrap();
36+
37+
let dataset_details = dataset_details_from_id(&service, entry_details.id)
38+
.await
39+
.unwrap();
40+
41+
assert!(dataset_details.blueprint_dataset.is_some());
42+
assert!(dataset_details.default_blueprint.is_none());
43+
44+
//
45+
// Check the dataset got a matching blueprint dataset
46+
//
47+
48+
let bp_entry_id = dataset_details
49+
.blueprint_dataset
50+
.expect("there should be a blueprint dataset");
51+
52+
let _ = entry_details_from_id(&service, bp_entry_id, EntryKind::BlueprintDataset)
53+
.await
54+
.unwrap();
55+
56+
let bp_dataset_details = dataset_details_from_id(&service, bp_entry_id)
57+
.await
58+
.unwrap();
59+
60+
assert!(bp_dataset_details.blueprint_dataset.is_none());
61+
assert!(bp_dataset_details.default_blueprint.is_none());
62+
63+
//
64+
// Check a duplicate entry name is rejected.
65+
//
66+
67+
let status = create_dataset_entry(
68+
&service,
69+
CreateDatasetEntryRequest {
70+
name: dataset1_name.to_owned(),
71+
id: None,
72+
},
73+
)
74+
.await
75+
.unwrap_err();
76+
77+
assert_eq!(
78+
status.code(),
79+
tonic::Code::AlreadyExists,
80+
"unexpected status: {status:?}"
81+
);
82+
83+
//
84+
// Check a duplicate entry id is rejected.
85+
//
86+
87+
let status = create_dataset_entry(
88+
&service,
89+
CreateDatasetEntryRequest {
90+
name: "this name is for sure not used, but the id might".to_owned(),
91+
id: Some(entry_details.id),
92+
},
93+
)
94+
.await
95+
.unwrap_err();
96+
97+
assert_eq!(
98+
status.code(),
99+
tonic::Code::AlreadyExists,
100+
"unexpected status: {status:?}"
101+
);
102+
103+
//
104+
// Create another dataset with an enforced entry id
105+
//
106+
107+
let dataset2_name = "dataset2";
108+
let dataset2_id = EntryId::from(re_tuid::Tuid::from_u128(123));
109+
110+
create_dataset_entry(
111+
&service,
112+
CreateDatasetEntryRequest {
113+
name: dataset2_name.to_owned(),
114+
id: Some(dataset2_id),
115+
},
116+
)
117+
.await
118+
.unwrap();
119+
120+
let _ = entry_details_from_name(&service, dataset2_name, EntryKind::Dataset)
121+
.await
122+
.unwrap();
123+
124+
let _ = entry_details_from_id(&service, dataset2_id, EntryKind::Dataset)
125+
.await
126+
.unwrap();
127+
128+
let dataset_details = dataset_details_from_id(&service, dataset2_id)
129+
.await
130+
.unwrap();
131+
132+
assert!(dataset_details.blueprint_dataset.is_some());
133+
assert!(dataset_details.default_blueprint.is_none());
134+
135+
//
136+
// Create a table
137+
//
138+
139+
let tmp_dir = tempfile::tempdir().expect("create temp dir");
140+
let table_name = "created_table";
141+
let schema = Schema::new(vec![Field::new("column_a", DataType::Utf8, false)]);
142+
143+
let table_url =
144+
url::Url::from_directory_path(tmp_dir.path()).expect("create url from tmp directory");
145+
let provider_details = LanceTable { table_url }
146+
.try_as_any()
147+
.expect("convert provider details to any");
148+
149+
let create_table_request = CreateTableEntryRequest {
150+
name: table_name.to_owned(),
151+
schema: schema.clone(),
152+
provider_details,
153+
}
154+
.try_into()
155+
.expect("Unable to create table request");
156+
157+
let _ = service
158+
.create_table_entry(tonic::Request::new(create_table_request))
159+
.await
160+
.expect("create table entry");
161+
162+
//
163+
// Dataset with same name as table fails
164+
//
165+
166+
let status = create_dataset_entry(
167+
&service,
168+
CreateDatasetEntryRequest {
169+
name: table_name.to_owned(),
170+
id: None,
171+
},
172+
)
173+
.await
174+
.unwrap_err();
175+
176+
assert_eq!(
177+
status.code(),
178+
tonic::Code::AlreadyExists,
179+
"unexpected status: {status:?}"
180+
);
181+
}
182+
183+
// ---
184+
185+
async fn create_dataset_entry(
186+
service: &impl RerunCloudService,
187+
request: CreateDatasetEntryRequest,
188+
) -> Result<DatasetEntry, tonic::Status> {
189+
service
190+
.create_dataset_entry(tonic::Request::new(request.clone().into()))
191+
.await
192+
.map(|result| result.into_inner().dataset.unwrap().try_into().unwrap())
193+
}
194+
195+
/// Get the entry details or return the endpoint error (all other errors panic)
196+
async fn entry_details_from_name(
197+
service: &impl RerunCloudService,
198+
name: &str,
199+
entry_kind: EntryKind,
200+
) -> Result<EntryDetails, tonic::Status> {
201+
let mut result = service
202+
.find_entries(tonic::Request::new(FindEntriesRequest {
203+
filter: Some(EntryFilter {
204+
id: None,
205+
name: Some(name.to_owned()),
206+
entry_kind: Some(entry_kind as i32),
207+
}),
208+
}))
209+
.await?
210+
.into_inner()
211+
.entries;
212+
213+
assert_eq!(result.len(), 1);
214+
215+
let entry_details = result.pop().unwrap();
216+
assert_eq!(entry_details.name.as_deref(), Some(name));
217+
assert_eq!(entry_details.entry_kind, entry_kind as i32);
218+
219+
Ok(entry_details.try_into().unwrap())
220+
}
221+
222+
/// Get the entry details or return the endpoint error (all other errors panic)
223+
async fn entry_details_from_id(
224+
service: &impl RerunCloudService,
225+
entry_id: EntryId,
226+
entry_kind: EntryKind,
227+
) -> Result<EntryDetails, tonic::Status> {
228+
let mut result = service
229+
.find_entries(tonic::Request::new(FindEntriesRequest {
230+
filter: Some(EntryFilter {
231+
id: Some(entry_id.into()),
232+
name: None,
233+
entry_kind: Some(entry_kind as i32),
234+
}),
235+
}))
236+
.await?
237+
.into_inner()
238+
.entries;
239+
240+
assert_eq!(result.len(), 1);
241+
242+
let entry_details = result.pop().unwrap();
243+
assert_eq!(entry_details.id, Some(entry_id.into()));
244+
assert_eq!(entry_details.entry_kind, entry_kind as i32);
245+
246+
Ok(entry_details.try_into().unwrap())
247+
}
248+
249+
/// Get the dataset details or return the endpoint error (all other errors panic)
250+
async fn dataset_details_from_id(
251+
service: &impl RerunCloudService,
252+
entry_id: EntryId,
253+
) -> Result<DatasetDetails, tonic::Status> {
254+
service
255+
.read_dataset_entry(
256+
tonic::Request::new(ReadDatasetEntryRequest {})
257+
.with_entry_id(entry_id)
258+
.unwrap(),
259+
)
260+
.await
261+
.map(|resp| {
262+
resp.into_inner()
263+
.dataset
264+
.unwrap()
265+
.dataset_details
266+
.unwrap()
267+
.try_into()
268+
.unwrap()
269+
})
270+
}

0 commit comments

Comments
 (0)