1- use std:: collections:: BTreeMap ;
2-
31use arrow:: {
42 array:: {
53 ArrayBuilder , BinaryBuilder , BooleanBuilder , FixedSizeListBuilder , Float32Builder ,
@@ -20,7 +18,7 @@ use crate::{Error, LayerIdentifier, MessageLayer};
2018
2119struct ProtobufMessageParser {
2220 message_descriptor : MessageDescriptor ,
23- fields : BTreeMap < String , FixedSizeListBuilder < Box < dyn ArrayBuilder > > > ,
21+ builder : FixedSizeListBuilder < StructBuilder > ,
2422}
2523
2624#[ derive( Debug , thiserror:: Error ) ]
@@ -47,9 +45,6 @@ enum ProtobufError {
4745 #[ error( "unknown enum number {0}" ) ]
4846 UnknownEnumNumber ( i32 ) ,
4947
50- #[ error( "unknown field name {0}" ) ]
51- UnknownFieldName ( String ) ,
52-
5348 #[ error( "type {0} is not supported yet" ) ]
5449 UnsupportedType ( & ' static str ) ,
5550
@@ -59,19 +54,6 @@ enum ProtobufError {
5954
6055impl ProtobufMessageParser {
6156 fn new ( num_rows : usize , message_descriptor : MessageDescriptor ) -> Self {
62- let mut fields = BTreeMap :: new ( ) ;
63-
64- // We recursively build up the Arrow builders for this particular message.
65- for field_descr in message_descriptor. fields ( ) {
66- let name = field_descr. name ( ) . to_owned ( ) ;
67- let builder = arrow_builder_from_field ( & field_descr) ;
68- fields. insert (
69- name,
70- FixedSizeListBuilder :: with_capacity ( builder, 1 , num_rows) ,
71- ) ;
72- re_log:: trace!( "Added Arrow builder for fields: {}" , field_descr. name( ) ) ;
73- }
74-
7557 if message_descriptor. oneofs ( ) . len ( ) > 0 {
7658 re_log:: warn_once!(
7759 "`oneof` in schema {} is not supported yet." ,
@@ -84,9 +66,12 @@ impl ProtobufMessageParser {
8466 ) ;
8567 }
8668
69+ let struct_builder = struct_builder_from_message ( & message_descriptor) ;
70+ let builder = FixedSizeListBuilder :: with_capacity ( struct_builder, 1 , num_rows) ;
71+
8772 Self {
8873 message_descriptor,
89- fields ,
74+ builder ,
9075 }
9176 }
9277}
@@ -103,22 +88,31 @@ impl MessageParser for ProtobufMessageParser {
10388 } ,
10489 ) ?;
10590
106- // We always need to make sure to iterate over all our builders, adding null values whenever
107- // a field is missing from the message that we received.
108- for ( field, builder) in & mut self . fields {
109- if let Some ( val) = dynamic_message. get_field_by_name ( field. as_str ( ) ) {
91+ let struct_builder = self . builder . values ( ) ;
92+
93+ for ( ith_arrow_field, field_builder) in
94+ struct_builder. field_builders_mut ( ) . iter_mut ( ) . enumerate ( )
95+ {
96+ // Protobuf fields are 1-indexed, so we need to map the i-th builder.
97+ let protobuf_number = ith_arrow_field as u32 + 1 ;
98+
99+ if let Some ( val) = dynamic_message. get_field_by_number ( protobuf_number) {
110100 let field = dynamic_message
111101 . descriptor ( )
112- . get_field_by_name ( field)
113- . ok_or_else ( || ProtobufError :: UnknownFieldName ( field. to_owned ( ) ) ) ?;
114- append_value ( builder. values ( ) , & field, val. as_ref ( ) ) ?;
115- builder. append ( true ) ;
102+ . get_field ( protobuf_number)
103+ . ok_or_else ( || ProtobufError :: MissingField {
104+ field : protobuf_number,
105+ } ) ?;
106+ append_value ( field_builder, & field, val. as_ref ( ) ) ?;
116107 re_log:: trace!( "Field {}: Finished writing to builders" , field. full_name( ) ) ;
117108 } else {
118- builder . append ( false ) ;
109+ append_null_to_builder ( field_builder ) ? ;
119110 }
120111 }
121112
113+ struct_builder. append ( true ) ;
114+ self . builder . append ( true ) ;
115+
122116 Ok ( ( ) )
123117 }
124118
@@ -129,23 +123,19 @@ impl MessageParser for ProtobufMessageParser {
129123
130124 let Self {
131125 message_descriptor,
132- fields ,
126+ mut builder ,
133127 } = * self ;
134128
135129 let message_chunk = Chunk :: from_auto_row_ids (
136130 ChunkId :: new ( ) ,
137131 entity_path,
138132 timelines,
139- fields
140- . into_iter ( )
141- . map ( |( field, mut builder) | {
142- (
143- ComponentDescriptor :: partial ( field)
144- . with_builtin_archetype ( message_descriptor. full_name ( ) ) ,
145- builder. finish ( ) . into ( ) ,
146- )
147- } )
148- . collect ( ) ,
133+ std:: iter:: once ( (
134+ ComponentDescriptor :: partial ( "message" )
135+ . with_builtin_archetype ( message_descriptor. full_name ( ) ) ,
136+ builder. finish ( ) . into ( ) ,
137+ ) )
138+ . collect ( ) ,
149139 )
150140 . map_err ( |err| Error :: Other ( anyhow:: anyhow!( err) ) ) ?;
151141
@@ -166,6 +156,41 @@ fn downcast_err<'a, T: std::any::Any>(
166156 } )
167157}
168158
159+ fn append_null_to_builder ( builder : & mut dyn ArrayBuilder ) -> Result < ( ) , ProtobufError > {
160+ // Try to append null by downcasting to known builder types
161+ if let Some ( b) = builder. as_any_mut ( ) . downcast_mut :: < BooleanBuilder > ( ) {
162+ b. append_null ( ) ;
163+ } else if let Some ( b) = builder. as_any_mut ( ) . downcast_mut :: < Int32Builder > ( ) {
164+ b. append_null ( ) ;
165+ } else if let Some ( b) = builder. as_any_mut ( ) . downcast_mut :: < Int64Builder > ( ) {
166+ b. append_null ( ) ;
167+ } else if let Some ( b) = builder. as_any_mut ( ) . downcast_mut :: < UInt32Builder > ( ) {
168+ b. append_null ( ) ;
169+ } else if let Some ( b) = builder. as_any_mut ( ) . downcast_mut :: < UInt64Builder > ( ) {
170+ b. append_null ( ) ;
171+ } else if let Some ( b) = builder. as_any_mut ( ) . downcast_mut :: < Float32Builder > ( ) {
172+ b. append_null ( ) ;
173+ } else if let Some ( b) = builder. as_any_mut ( ) . downcast_mut :: < Float64Builder > ( ) {
174+ b. append_null ( ) ;
175+ } else if let Some ( b) = builder. as_any_mut ( ) . downcast_mut :: < StringBuilder > ( ) {
176+ b. append_null ( ) ;
177+ } else if let Some ( b) = builder. as_any_mut ( ) . downcast_mut :: < BinaryBuilder > ( ) {
178+ b. append_null ( ) ;
179+ } else if let Some ( b) = builder. as_any_mut ( ) . downcast_mut :: < StructBuilder > ( ) {
180+ b. append_null ( ) ;
181+ } else if let Some ( b) = builder
182+ . as_any_mut ( )
183+ . downcast_mut :: < ListBuilder < Box < dyn ArrayBuilder > > > ( )
184+ {
185+ b. append_null ( ) ;
186+ } else {
187+ return Err ( ProtobufError :: UnsupportedType (
188+ "Unknown builder type for append_null" ,
189+ ) ) ;
190+ }
191+ Ok ( ( ) )
192+ }
193+
169194fn append_value (
170195 builder : & mut dyn ArrayBuilder ,
171196 field : & FieldDescriptor ,
0 commit comments