-
Notifications
You must be signed in to change notification settings - Fork 672
Fix handling of components that only vary by descriptors #11593
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 27 commits
759d2f0
71b3e6e
cfbcda9
fed3575
0145417
47d4eca
2be5b36
3c84184
f80b908
997a715
d934344
3552281
bf05e3b
35e40b2
f60e88f
71c78cd
bab425b
d578c66
a842522
63c7ad0
ccd327a
5f5b181
a2e4abf
06340fc
48af60b
3c7e251
d1fd07a
3b8c2e6
ff98994
fbeaf50
f43c889
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -12,7 +12,7 @@ use nohash_hasher::IntMap; | |
| use re_arrow_util::arrays_to_list_array_opt; | ||
| use re_byte_size::SizeBytes as _; | ||
| use re_log_types::{AbsoluteTimeRange, EntityPath, TimeInt, TimePoint, Timeline, TimelineName}; | ||
| use re_types_core::ComponentDescriptor; | ||
| use re_types_core::{ComponentDescriptor, ComponentIdentifier}; | ||
|
|
||
| use crate::{Chunk, ChunkId, ChunkResult, RowId, TimeColumn, chunk::ChunkComponents}; | ||
|
|
||
|
|
@@ -784,18 +784,35 @@ pub struct PendingRow { | |
| /// The component data. | ||
| /// | ||
| /// Each array is a single component, i.e. _not_ a list array. | ||
| pub components: IntMap<ComponentDescriptor, ArrayRef>, | ||
| pub components: IntMap<ComponentIdentifier, (ComponentDescriptor, ArrayRef)>, | ||
| } | ||
|
|
||
| impl PendingRow { | ||
| #[inline] | ||
| pub fn new(timepoint: TimePoint, components: IntMap<ComponentDescriptor, ArrayRef>) -> Self { | ||
| pub fn new( | ||
| timepoint: TimePoint, | ||
| components: IntMap<ComponentIdentifier, (ComponentDescriptor, ArrayRef)>, | ||
|
||
| ) -> Self { | ||
| Self { | ||
| row_id: RowId::new(), | ||
| timepoint, | ||
| components, | ||
| } | ||
| } | ||
|
|
||
| #[inline] | ||
| pub fn from_iter( | ||
| timepoint: TimePoint, | ||
| components: impl IntoIterator<Item = (ComponentDescriptor, ArrayRef)>, | ||
| ) -> Self { | ||
| Self::new( | ||
| timepoint, | ||
| components | ||
| .into_iter() | ||
| .map(|(desc, array)| (desc.component, (desc, array))) | ||
| .collect(), | ||
| ) | ||
| } | ||
| } | ||
|
|
||
| impl re_byte_size::SizeBytes for PendingRow { | ||
|
|
@@ -836,10 +853,10 @@ impl PendingRow { | |
| .collect(); | ||
|
|
||
| let mut per_desc = ChunkComponents::default(); | ||
| for (component_desc, array) in components { | ||
| for (component, (desc, array)) in components { | ||
| let list_array = arrays_to_list_array_opt(&[Some(&*array as _)]); | ||
| if let Some(list_array) = list_array { | ||
| per_desc.insert(component_desc, list_array); | ||
| per_desc.insert(component, (desc, list_array)); | ||
| } | ||
| } | ||
|
|
||
|
|
@@ -925,7 +942,7 @@ impl PendingRow { | |
| let mut hasher = ahash::AHasher::default(); | ||
| row.components | ||
| .values() | ||
| .for_each(|array| array.data_type().hash(&mut hasher)); | ||
| .for_each(|(_, array)| array.data_type().hash(&mut hasher)); | ||
| per_datatype_set | ||
| .entry(hasher.finish()) | ||
| .or_default() | ||
|
|
@@ -943,11 +960,12 @@ impl PendingRow { | |
|
|
||
| // Create all the logical list arrays that we're going to need, accounting for the | ||
| // possibility of sparse components in the data. | ||
| let mut all_components: IntMap<ComponentDescriptor, Vec<Option<&dyn ArrowArray>>> = | ||
| IntMap::default(); | ||
| let mut all_components: IntMap<ComponentIdentifier, _> = IntMap::default(); | ||
| for row in &rows { | ||
| for component_desc in row.components.keys() { | ||
| all_components.entry(component_desc.clone()).or_default(); | ||
| for (component, (desc, _)) in &row.components { | ||
| all_components | ||
| .entry(*component) | ||
| .or_insert_with(|| (desc.clone(), Vec::new())); | ||
| } | ||
| } | ||
|
|
||
|
|
@@ -983,15 +1001,16 @@ impl PendingRow { | |
| .map(|(name, time_column)| (name, time_column.finish())) | ||
| .collect(), | ||
| { | ||
| let mut per_desc = ChunkComponents::default(); | ||
| for (component_desc, arrays) in std::mem::take(&mut components) | ||
| let mut per_component = ChunkComponents::default(); | ||
| for (component, (desc, arrays)) in | ||
| std::mem::take(&mut components) | ||
| { | ||
| let list_array = arrays_to_list_array_opt(&arrays); | ||
| if let Some(list_array) = list_array { | ||
| per_desc.insert(component_desc, list_array); | ||
| per_component.insert(component, (desc, list_array)); | ||
| } | ||
| } | ||
| per_desc | ||
| per_component | ||
| }, | ||
| )); | ||
|
|
||
|
|
@@ -1008,13 +1027,13 @@ impl PendingRow { | |
| time_column.push(cell.into()); | ||
| } | ||
|
|
||
| for (component_desc, arrays) in &mut components { | ||
| for (component, (_desc, arrays)) in &mut components { | ||
| // NOTE: This will push `None` if the row doesn't actually hold a value for this | ||
| // component -- these are sparse list arrays! | ||
| arrays.push( | ||
| row_components | ||
| .get(component_desc) | ||
| .map(|array| &**array as &dyn ArrowArray), | ||
| .get(component) | ||
| .map(|(_, array)| &**array as &dyn ArrowArray), | ||
| ); | ||
| } | ||
| } | ||
|
|
@@ -1030,10 +1049,10 @@ impl PendingRow { | |
| .collect(), | ||
| { | ||
| let mut per_desc = ChunkComponents::default(); | ||
| for (component_desc, arrays) in components { | ||
| for (component, (desc, arrays)) in components { | ||
| let list_array = arrays_to_list_array_opt(&arrays); | ||
| if let Some(list_array) = list_array { | ||
| per_desc.insert(component_desc, list_array); | ||
| per_desc.insert(component, (desc, list_array)); | ||
| } | ||
| } | ||
| per_desc | ||
|
|
@@ -1152,9 +1171,9 @@ mod tests { | |
| (MyIndex::partial_descriptor(), indices3.clone()), | ||
| ]; | ||
|
|
||
| let row1 = PendingRow::new(timepoint1.clone(), components1.into_iter().collect()); | ||
| let row2 = PendingRow::new(timepoint2.clone(), components2.into_iter().collect()); | ||
| let row3 = PendingRow::new(timepoint3.clone(), components3.into_iter().collect()); | ||
| let row1 = PendingRow::from_iter(timepoint1.clone(), components1); | ||
| let row2 = PendingRow::from_iter(timepoint2.clone(), components2); | ||
| let row3 = PendingRow::from_iter(timepoint3.clone(), components3); | ||
|
|
||
| let entity_path1: EntityPath = "a/b/c".into(); | ||
| batcher.push_row(entity_path1.clone(), row1.clone()); | ||
|
|
@@ -1261,9 +1280,9 @@ mod tests { | |
| (MyPoints::descriptor_points(), points3.clone()), | ||
| ]; | ||
|
|
||
| let row1 = PendingRow::new(timepoint1.clone(), components1.into_iter().collect()); | ||
| let row2 = PendingRow::new(timepoint2.clone(), components2.into_iter().collect()); | ||
| let row3 = PendingRow::new(timepoint3.clone(), components3.into_iter().collect()); | ||
| let row1 = PendingRow::from_iter(timepoint1.clone(), components1); | ||
| let row2 = PendingRow::from_iter(timepoint2.clone(), components2); | ||
| let row3 = PendingRow::from_iter(timepoint3.clone(), components3); | ||
|
|
||
| let entity_path1: EntityPath = "a/b/c".into(); | ||
| batcher.push_row(entity_path1.clone(), row1.clone()); | ||
|
|
@@ -1349,9 +1368,9 @@ mod tests { | |
| let components2 = [(MyPoints::descriptor_points(), points2.clone())]; | ||
| let components3 = [(MyPoints::descriptor_points(), points3.clone())]; | ||
|
|
||
| let row1 = PendingRow::new(static_.clone(), components1.into_iter().collect()); | ||
| let row2 = PendingRow::new(static_.clone(), components2.into_iter().collect()); | ||
| let row3 = PendingRow::new(static_.clone(), components3.into_iter().collect()); | ||
| let row1 = PendingRow::from_iter(static_.clone(), components1); | ||
| let row2 = PendingRow::from_iter(static_.clone(), components2); | ||
| let row3 = PendingRow::from_iter(static_.clone(), components3); | ||
|
|
||
| let entity_path1: EntityPath = "a/b/c".into(); | ||
| batcher.push_row(entity_path1.clone(), row1.clone()); | ||
|
|
@@ -1424,9 +1443,9 @@ mod tests { | |
| let components2 = [(MyPoints::descriptor_points(), points2.clone())]; | ||
| let components3 = [(MyPoints::descriptor_points(), points3.clone())]; | ||
|
|
||
| let row1 = PendingRow::new(timepoint1.clone(), components1.into_iter().collect()); | ||
| let row2 = PendingRow::new(timepoint2.clone(), components2.into_iter().collect()); | ||
| let row3 = PendingRow::new(timepoint3.clone(), components3.into_iter().collect()); | ||
| let row1 = PendingRow::from_iter(timepoint1.clone(), components1); | ||
| let row2 = PendingRow::from_iter(timepoint2.clone(), components2); | ||
| let row3 = PendingRow::from_iter(timepoint3.clone(), components3); | ||
|
|
||
| let entity_path1: EntityPath = "ent1".into(); | ||
| let entity_path2: EntityPath = "ent2".into(); | ||
|
|
@@ -1532,9 +1551,9 @@ mod tests { | |
| let components2 = [(MyPoints::descriptor_points(), points2.clone())]; | ||
| let components3 = [(MyPoints::descriptor_points(), points3.clone())]; | ||
|
|
||
| let row1 = PendingRow::new(timepoint1.clone(), components1.into_iter().collect()); | ||
| let row2 = PendingRow::new(timepoint2.clone(), components2.into_iter().collect()); | ||
| let row3 = PendingRow::new(timepoint3.clone(), components3.into_iter().collect()); | ||
| let row1 = PendingRow::from_iter(timepoint1.clone(), components1); | ||
| let row2 = PendingRow::from_iter(timepoint2.clone(), components2); | ||
| let row3 = PendingRow::from_iter(timepoint3.clone(), components3); | ||
|
|
||
| let entity_path1: EntityPath = "a/b/c".into(); | ||
| batcher.push_row(entity_path1.clone(), row1.clone()); | ||
|
|
@@ -1641,9 +1660,9 @@ mod tests { | |
| let components2 = [(MyPoints::descriptor_points(), points2.clone())]; // same name, different datatype | ||
| let components3 = [(MyPoints::descriptor_points(), points3.clone())]; | ||
|
|
||
| let row1 = PendingRow::new(timepoint1.clone(), components1.into_iter().collect()); | ||
| let row2 = PendingRow::new(timepoint2.clone(), components2.into_iter().collect()); | ||
| let row3 = PendingRow::new(timepoint3.clone(), components3.into_iter().collect()); | ||
| let row1 = PendingRow::from_iter(timepoint1.clone(), components1); | ||
| let row2 = PendingRow::from_iter(timepoint2.clone(), components2); | ||
| let row3 = PendingRow::from_iter(timepoint3.clone(), components3); | ||
|
|
||
| let entity_path1: EntityPath = "a/b/c".into(); | ||
| batcher.push_row(entity_path1.clone(), row1.clone()); | ||
|
|
@@ -1763,10 +1782,10 @@ mod tests { | |
| let components3 = [(MyPoints::descriptor_points(), points3.clone())]; | ||
| let components4 = [(MyPoints::descriptor_points(), points4.clone())]; | ||
|
|
||
| let row1 = PendingRow::new(timepoint4.clone(), components1.into_iter().collect()); | ||
| let row2 = PendingRow::new(timepoint1.clone(), components2.into_iter().collect()); | ||
| let row3 = PendingRow::new(timepoint2.clone(), components3.into_iter().collect()); | ||
| let row4 = PendingRow::new(timepoint3.clone(), components4.into_iter().collect()); | ||
| let row1 = PendingRow::from_iter(timepoint4.clone(), components1); | ||
| let row2 = PendingRow::from_iter(timepoint1.clone(), components2); | ||
| let row3 = PendingRow::from_iter(timepoint2.clone(), components3); | ||
| let row4 = PendingRow::from_iter(timepoint3.clone(), components4); | ||
|
|
||
| let entity_path1: EntityPath = "a/b/c".into(); | ||
| batcher.push_row(entity_path1.clone(), row1.clone()); | ||
|
|
@@ -1870,10 +1889,10 @@ mod tests { | |
| let components3 = [(MyPoints::descriptor_points(), points3.clone())]; | ||
| let components4 = [(MyPoints::descriptor_points(), points4.clone())]; | ||
|
|
||
| let row1 = PendingRow::new(timepoint4.clone(), components1.into_iter().collect()); | ||
| let row2 = PendingRow::new(timepoint1.clone(), components2.into_iter().collect()); | ||
| let row3 = PendingRow::new(timepoint2.clone(), components3.into_iter().collect()); | ||
| let row4 = PendingRow::new(timepoint3.clone(), components4.into_iter().collect()); | ||
| let row1 = PendingRow::from_iter(timepoint4.clone(), components1); | ||
| let row2 = PendingRow::from_iter(timepoint1.clone(), components2); | ||
| let row3 = PendingRow::from_iter(timepoint2.clone(), components3); | ||
| let row4 = PendingRow::from_iter(timepoint3.clone(), components4); | ||
|
|
||
| let entity_path1: EntityPath = "a/b/c".into(); | ||
| batcher.push_row(entity_path1.clone(), row1.clone()); | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.