From 03cb4ca515c4c60cf68c5590ae81152bc7a05923 Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Thu, 20 Jan 2022 18:35:27 +0000 Subject: [PATCH 1/8] Batch multiple records in ArrowWriter --- parquet/src/arrow/arrow_writer.rs | 164 +++++++++++++++++++++++++----- parquet/src/arrow/levels.rs | 9 +- 2 files changed, 141 insertions(+), 32 deletions(-) diff --git a/parquet/src/arrow/arrow_writer.rs b/parquet/src/arrow/arrow_writer.rs index fc3a567a57c2..98d88445f567 100644 --- a/parquet/src/arrow/arrow_writer.rs +++ b/parquet/src/arrow/arrow_writer.rs @@ -17,9 +17,12 @@ //! Contains writer which writes arrow data into parquet data. +use std::collections::VecDeque; use std::sync::Arc; use arrow::array as arrow_array; +use arrow::array::ArrayRef; +use arrow::compute::concat; use arrow::datatypes::{DataType as ArrowDataType, IntervalUnit, SchemaRef}; use arrow::record_batch::RecordBatch; use arrow_array::Array; @@ -40,14 +43,23 @@ use crate::{ /// Arrow writer /// -/// Writes Arrow `RecordBatch`es to a Parquet writer +/// Writes Arrow `RecordBatch`es to a Parquet writer, buffering up `RecordBatch` in order +/// to produce row groups with `max_row_group_size` rows. Any remaining rows will be +/// flushed on close, leading the final row group in the output file to potentially +/// contain fewer than `max_row_group_size` rows pub struct ArrowWriter { /// Underlying Parquet writer writer: SerializedFileWriter, + + buffer: Vec>, + + buffered_rows: usize, + /// A copy of the Arrow schema. /// /// The schema is used to verify that each record batch written has the correct schema arrow_schema: SchemaRef, + /// The length of arrays to write to each row group max_row_group_size: usize, } @@ -75,18 +87,18 @@ impl ArrowWriter { Ok(Self { writer: file_writer, + buffer: vec![Default::default(); arrow_schema.fields().len()], + buffered_rows: 0, arrow_schema, max_row_group_size, }) } - /// Write a RecordBatch to writer + /// Enqueues the provided `RecordBatch` to be written /// - /// The writer will slice the `batch` into `max_row_group_size`, - /// but if a batch has left-over rows less than the row group size, - /// the last row group will have fewer records. - /// This is currently a limitation because we close the row group - /// instead of keeping it open for the next batch. + /// If following this there are more than `max_row_group_size` rows buffered, + /// this will flush out one or more row groups with `max_row_group_size` rows, + /// and drop any fully written `RecordBatch` pub fn write(&mut self, batch: &RecordBatch) -> Result<()> { // validate batch schema against writer's supplied schema if self.arrow_schema != batch.schema() { @@ -94,35 +106,90 @@ impl ArrowWriter { "Record batch schema does not match writer schema".to_string(), )); } - // Track the number of rows being written in the batch. - // We currently do not have a way of slicing nested arrays, thus we - // track this manually. 
- let num_rows = batch.num_rows(); - let batches = (num_rows + self.max_row_group_size - 1) / self.max_row_group_size; - let min_batch = num_rows.min(self.max_row_group_size); - for batch_index in 0..batches { - // Determine the offset and length of arrays - let offset = batch_index * min_batch; - let length = (num_rows - offset).min(self.max_row_group_size); - - // Compute the definition and repetition levels of the batch - let batch_level = LevelInfo::new(offset, length); - let mut row_group_writer = self.writer.next_row_group()?; - for (array, field) in batch.columns().iter().zip(batch.schema().fields()) { - let mut levels = batch_level.calculate_array_levels(array, field); - // Reverse levels as we pop() them when writing arrays - levels.reverse(); - write_leaves(&mut row_group_writer, array, &mut levels)?; + + for (buffer, batch) in self.buffer.iter_mut().zip(batch.columns()) { + buffer.push_back(batch.clone()) + } + + self.buffered_rows += batch.num_rows(); + + self.flush_completed()?; + + Ok(()) + } + + /// Flushes buffered data until there are less than `max_row_group_size` rows buffered + fn flush_completed(&mut self) -> Result<()> { + while self.buffered_rows >= self.max_row_group_size { + self.flush_row_group(self.max_row_group_size)?; + } + Ok(()) + } + + /// Flushes `num_rows` from the buffer into a new row group + fn flush_row_group(&mut self, num_rows: usize) -> Result<()> { + if num_rows == 0 { + return Ok(()); + } + + assert!( + num_rows <= self.buffered_rows, + "cannot flush {} rows only have {}", + num_rows, + self.buffered_rows + ); + + assert!( + num_rows <= self.max_row_group_size, + "cannot flush {} rows would exceed max row group size of {}", + num_rows, + self.max_row_group_size + ); + + let batch_level = LevelInfo::new(0, num_rows); + let mut row_group_writer = self.writer.next_row_group()?; + + for (col_buffer, field) in self.buffer.iter_mut().zip(self.arrow_schema.fields()) + { + // Collect the number of arrays to append + let mut remaining = num_rows; + let mut arrays = Vec::with_capacity(col_buffer.len()); + while remaining != 0 { + match col_buffer.pop_front() { + Some(next) if next.len() > remaining => { + col_buffer + .push_front(next.slice(remaining, next.len() - remaining)); + arrays.push(next.slice(0, remaining)); + remaining = 0; + } + Some(next) => { + remaining -= next.len(); + arrays.push(next); + } + _ => break, + } } - self.writer.close_row_group(row_group_writer)?; + // Workaround write logic expecting a single array + let array_refs: Vec<_> = arrays.iter().map(|x| x.as_ref()).collect(); + let array = concat(array_refs.as_slice())?; + + let mut levels = batch_level.calculate_array_levels(&array, field); + // Reverse levels as we pop() them when writing arrays + levels.reverse(); + write_leaves(&mut row_group_writer, &array, &mut levels)?; } + self.writer.close_row_group(row_group_writer)?; + self.buffered_rows -= num_rows; + Ok(()) } /// Close and finalize the underlying Parquet writer pub fn close(&mut self) -> Result { + self.flush_completed()?; + self.flush_row_group(self.buffered_rows)?; self.writer.close() } } @@ -1723,4 +1790,47 @@ mod tests { one_column_roundtrip(array, true, Some(10)); } + + #[test] + fn test_aggregates_records() { + let arrays = [ + Int32Array::from((0..100).collect::>()), + Int32Array::from((0..50).collect::>()), + Int32Array::from((200..500).collect::>()), + ]; + + let schema = Arc::new(Schema::new(vec![Field::new( + "int", + ArrowDataType::Int32, + false, + )])); + + let file = tempfile::tempfile().unwrap(); + + let 
props = WriterProperties::builder() + .set_max_row_group_size(200) + .build(); + + let mut writer = + ArrowWriter::try_new(file.try_clone().unwrap(), schema.clone(), Some(props)) + .unwrap(); + + for array in arrays { + let batch = + RecordBatch::try_new(schema.clone(), vec![Arc::new(array)]).unwrap(); + writer.write(&batch).unwrap(); + } + + writer.close().unwrap(); + + let reader = SerializedFileReader::new(file).unwrap(); + let row_group_sizes: Vec<_> = reader + .metadata() + .row_groups() + .iter() + .map(|x| x.num_rows()) + .collect(); + + assert_eq!(row_group_sizes, vec![200, 200, 50]); + } } diff --git a/parquet/src/arrow/levels.rs b/parquet/src/arrow/levels.rs index ea1f2125069e..20b0ff73b8d7 100644 --- a/parquet/src/arrow/levels.rs +++ b/parquet/src/arrow/levels.rs @@ -40,7 +40,7 @@ //! //! \[1\] [parquet-format#nested-encoding](https://github.com/apache/parquet-format#nested-encoding) -use arrow::array::{make_array, ArrayRef, MapArray, StructArray}; +use arrow::array::{make_array, Array, ArrayRef, MapArray, StructArray}; use arrow::datatypes::{DataType, Field}; /// Keeps track of the level information per array that is needed to write an Arrow array to Parquet. @@ -711,12 +711,11 @@ impl LevelInfo { ((0..=(len as i64)).collect(), array_mask) } DataType::List(_) | DataType::Map(_, _) => { - let data = array.data(); - let offsets = unsafe { data.buffers()[0].typed_data::() }; + let offsets = unsafe { array.data().buffers()[0].typed_data::() }; let offsets = offsets .to_vec() .into_iter() - .skip(offset) + .skip(array.offset() + offset) .take(len + 1) .map(|v| v as i64) .collect::>(); @@ -729,7 +728,7 @@ impl LevelInfo { DataType::LargeList(_) => { let offsets = unsafe { array.data().buffers()[0].typed_data::() } .iter() - .skip(offset) + .skip(array.offset() + offset) .take(len + 1) .copied() .collect(); From 9dbfdf20e1cc47dd705b3d0456cb938c994923a9 Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Fri, 21 Jan 2022 18:34:37 +0000 Subject: [PATCH 2/8] Document max_group_size and reduce default (#1213) --- parquet/src/file/properties.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/parquet/src/file/properties.rs b/parquet/src/file/properties.rs index c48e4e7a07b0..b42516e971bb 100644 --- a/parquet/src/file/properties.rs +++ b/parquet/src/file/properties.rs @@ -62,7 +62,7 @@ const DEFAULT_DICTIONARY_ENABLED: bool = true; const DEFAULT_DICTIONARY_PAGE_SIZE_LIMIT: usize = DEFAULT_PAGE_SIZE; const DEFAULT_STATISTICS_ENABLED: bool = true; const DEFAULT_MAX_STATISTICS_SIZE: usize = 4096; -const DEFAULT_MAX_ROW_GROUP_SIZE: usize = 128 * 1024 * 1024; +const DEFAULT_MAX_ROW_GROUP_SIZE: usize = 1024 * 1024; const DEFAULT_CREATED_BY: &str = env!("PARQUET_CREATED_BY"); /// Parquet writer version. @@ -129,7 +129,7 @@ impl WriterProperties { self.write_batch_size } - /// Returns max size for a row group. + /// Returns maximum number of rows in a row group. pub fn max_row_group_size(&self) -> usize { self.max_row_group_size } @@ -288,7 +288,7 @@ impl WriterPropertiesBuilder { self } - /// Sets max size for a row group. + /// Sets maximum number of rows in a row group. 
pub fn set_max_row_group_size(mut self, value: usize) -> Self { assert!(value > 0, "Cannot have a 0 max row group size"); self.max_row_group_size = value; From 65454bb45346530abb31f781bf4e0292bd6985e1 Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Mon, 31 Jan 2022 21:36:14 +0000 Subject: [PATCH 3/8] Review feedback --- parquet/src/arrow/arrow_writer.rs | 40 ++++++++++++++++++++++++++++--- 1 file changed, 37 insertions(+), 3 deletions(-) diff --git a/parquet/src/arrow/arrow_writer.rs b/parquet/src/arrow/arrow_writer.rs index 98d88445f567..4f9b6f14e5d9 100644 --- a/parquet/src/arrow/arrow_writer.rs +++ b/parquet/src/arrow/arrow_writer.rs @@ -51,8 +51,10 @@ pub struct ArrowWriter { /// Underlying Parquet writer writer: SerializedFileWriter, + /// For each column, maintain an ordered queue of arrays to write buffer: Vec>, + /// The total number of rows currently buffered buffered_rows: usize, /// A copy of the Arrow schema. @@ -107,8 +109,8 @@ impl ArrowWriter { )); } - for (buffer, batch) in self.buffer.iter_mut().zip(batch.columns()) { - buffer.push_back(batch.clone()) + for (buffer, column) in self.buffer.iter_mut().zip(batch.columns()) { + buffer.push_back(column.clone()) } self.buffered_rows += batch.num_rows(); @@ -660,6 +662,7 @@ mod tests { use arrow::datatypes::ToByteSlice; use arrow::datatypes::{DataType, Field, Schema, UInt32Type, UInt8Type}; + use arrow::error::Result as ArrowResult; use arrow::record_batch::RecordBatch; use arrow::{array::*, buffer::Buffer}; @@ -1831,6 +1834,37 @@ mod tests { .map(|x| x.num_rows()) .collect(); - assert_eq!(row_group_sizes, vec![200, 200, 50]); + assert_eq!(&row_group_sizes, &[200, 200, 50]); + + let mut arrow_reader = ParquetFileArrowReader::new(Arc::new(reader)); + let batches = arrow_reader + .get_record_reader(100) + .unwrap() + .collect::>>() + .unwrap(); + + assert_eq!(batches.len(), 5); + assert!(batches.iter().all(|x| x.num_columns() == 1)); + + let batch_sizes: Vec<_> = batches.iter().map(|x| x.num_rows()).collect(); + + assert_eq!(&batch_sizes, &[100, 100, 100, 100, 50]); + + let values: Vec<_> = batches + .iter() + .flat_map(|x| { + x.column(0) + .as_any() + .downcast_ref::() + .unwrap() + .values() + .iter() + .cloned() + }) + .collect(); + + let expected_values: Vec<_> = + [0..100, 0..50, 200..500].into_iter().flatten().collect(); + assert_eq!(&values, &expected_values) } } From 4185fa781a3afc012b199a080fd667d05ef082ff Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Mon, 31 Jan 2022 23:24:16 +0000 Subject: [PATCH 4/8] Write multiple arrays without concat --- parquet/src/arrow/arrow_writer.rs | 128 ++++++++++++++++++------------ 1 file changed, 79 insertions(+), 49 deletions(-) diff --git a/parquet/src/arrow/arrow_writer.rs b/parquet/src/arrow/arrow_writer.rs index 4f9b6f14e5d9..e13e3b451b77 100644 --- a/parquet/src/arrow/arrow_writer.rs +++ b/parquet/src/arrow/arrow_writer.rs @@ -22,7 +22,6 @@ use std::sync::Arc; use arrow::array as arrow_array; use arrow::array::ArrayRef; -use arrow::compute::concat; use arrow::datatypes::{DataType as ArrowDataType, IntervalUnit, SchemaRef}; use arrow::record_batch::RecordBatch; use arrow_array::Array; @@ -148,7 +147,6 @@ impl ArrowWriter { self.max_row_group_size ); - let batch_level = LevelInfo::new(0, num_rows); let mut row_group_writer = self.writer.next_row_group()?; for (col_buffer, field) in self.buffer.iter_mut().zip(self.arrow_schema.fields()) @@ -172,14 +170,18 @@ impl ArrowWriter { } } - // Workaround write logic expecting a single array - let array_refs: Vec<_> = 
arrays.iter().map(|x| x.as_ref()).collect(); - let array = concat(array_refs.as_slice())?; + let mut levels: Vec<_> = arrays + .iter() + .map(|array| { + let batch_level = LevelInfo::new(0, array.len()); + let mut levels = batch_level.calculate_array_levels(&array, field); + // Reverse levels as we pop() them when writing arrays + levels.reverse(); + levels + }) + .collect(); - let mut levels = batch_level.calculate_array_levels(&array, field); - // Reverse levels as we pop() them when writing arrays - levels.reverse(); - write_leaves(&mut row_group_writer, &array, &mut levels)?; + write_leaves(row_group_writer.as_mut(), &arrays, &mut levels)?; } self.writer.close_row_group(row_group_writer)?; @@ -198,23 +200,25 @@ impl ArrowWriter { /// Convenience method to get the next ColumnWriter from the RowGroupWriter #[inline] -#[allow(clippy::borrowed_box)] -fn get_col_writer( - row_group_writer: &mut Box, -) -> Result { +fn get_col_writer(row_group_writer: &mut dyn RowGroupWriter) -> Result { let col_writer = row_group_writer .next_column()? .expect("Unable to get column writer"); Ok(col_writer) } -#[allow(clippy::borrowed_box)] fn write_leaves( - row_group_writer: &mut Box, - array: &arrow_array::ArrayRef, - levels: &mut Vec, + row_group_writer: &mut dyn RowGroupWriter, + arrays: &[ArrayRef], + levels: &mut [Vec], ) -> Result<()> { - match array.data_type() { + assert_eq!(arrays.len(), levels.len()); + assert!(!arrays.is_empty()); + + let data_type = arrays.first().unwrap().data_type().clone(); + assert!(arrays.iter().all(|a| a.data_type() == &data_type)); + + match &data_type { ArrowDataType::Null | ArrowDataType::Boolean | ArrowDataType::Int8 @@ -241,50 +245,76 @@ fn write_leaves( | ArrowDataType::Decimal(_, _) | ArrowDataType::FixedSizeBinary(_) => { let mut col_writer = get_col_writer(row_group_writer)?; - write_leaf( - &mut col_writer, - array, - levels.pop().expect("Levels exhausted"), - )?; + for (array, levels) in arrays.iter().zip(levels.iter_mut()) { + write_leaf( + &mut col_writer, + array, + levels.pop().expect("Levels exhausted"), + )?; + } row_group_writer.close_column(col_writer)?; Ok(()) } ArrowDataType::List(_) | ArrowDataType::LargeList(_) => { - // write the child list - let data = array.data(); - let child_array = arrow_array::make_array(data.child_data()[0].clone()); - write_leaves(row_group_writer, &child_array, levels)?; + let arrays: Vec<_> = arrays.iter().map(|array|{ + // write the child list + let data = array.data(); + arrow_array::make_array(data.child_data()[0].clone()) + }).collect(); + + write_leaves(row_group_writer, &arrays, levels)?; Ok(()) } - ArrowDataType::Struct(_) => { - let struct_array: &arrow_array::StructArray = array - .as_any() - .downcast_ref::() - .expect("Unable to get struct array"); - for field in struct_array.columns() { - write_leaves(row_group_writer, field, levels)?; + ArrowDataType::Struct(fields) => { + // Groups child arrays by field + let mut field_arrays = vec![Vec::with_capacity(arrays.len()); fields.len()]; + + for array in arrays { + let struct_array: &arrow_array::StructArray = array + .as_any() + .downcast_ref::() + .expect("Unable to get struct array"); + + assert_eq!(struct_array.columns().len(), fields.len()); + + for (child_array, field) in field_arrays.iter_mut().zip(struct_array.columns()) { + child_array.push(field.clone()) + } + } + + for field in field_arrays { + write_leaves(row_group_writer, &field, levels)?; } + Ok(()) } ArrowDataType::Map(_, _) => { - let map_array: &arrow_array::MapArray = array - .as_any() - 
.downcast_ref::() - .expect("Unable to get map array"); - write_leaves(row_group_writer, &map_array.keys(), levels)?; - write_leaves(row_group_writer, &map_array.values(), levels)?; + let mut keys = Vec::with_capacity(arrays.len()); + let mut values = Vec::with_capacity(arrays.len()); + for array in arrays { + let map_array: &arrow_array::MapArray = array + .as_any() + .downcast_ref::() + .expect("Unable to get map array"); + keys.push(map_array.keys()); + values.push(map_array.values()); + } + + write_leaves(row_group_writer, &keys, levels)?; + write_leaves(row_group_writer, &values, levels)?; Ok(()) } ArrowDataType::Dictionary(_, value_type) => { - // cast dictionary to a primitive - let array = arrow::compute::cast(array, value_type)?; - let mut col_writer = get_col_writer(row_group_writer)?; - write_leaf( - &mut col_writer, - &array, - levels.pop().expect("Levels exhausted"), - )?; + for (array, levels) in arrays.iter().zip(levels.iter_mut()) { + // cast dictionary to a primitive + let array = arrow::compute::cast(array, value_type)?; + write_leaf( + &mut col_writer, + &array, + levels.pop().expect("Levels exhausted"), + )?; + } row_group_writer.close_column(col_writer)?; Ok(()) } @@ -295,7 +325,7 @@ fn write_leaves( Err(ParquetError::NYI( format!( "Attempting to write an Arrow type {:?} to parquet that is not yet implemented", - array.data_type() + data_type ) )) } From e894fa897923773cfd89847fb0cdb4f48e560110 Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Mon, 31 Jan 2022 23:24:53 +0000 Subject: [PATCH 5/8] Clippy --- parquet/src/arrow/arrow_writer.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/parquet/src/arrow/arrow_writer.rs b/parquet/src/arrow/arrow_writer.rs index e13e3b451b77..f4abf98ec4d8 100644 --- a/parquet/src/arrow/arrow_writer.rs +++ b/parquet/src/arrow/arrow_writer.rs @@ -174,7 +174,7 @@ impl ArrowWriter { .iter() .map(|array| { let batch_level = LevelInfo::new(0, array.len()); - let mut levels = batch_level.calculate_array_levels(&array, field); + let mut levels = batch_level.calculate_array_levels(array, field); // Reverse levels as we pop() them when writing arrays levels.reverse(); levels From e552b7af1f188291e6d7c6a702b0e1709a83624b Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Tue, 1 Feb 2022 09:50:51 +0000 Subject: [PATCH 6/8] Test aggregating complex types --- arrow/src/util/pretty.rs | 10 +-- parquet/Cargo.toml | 2 +- parquet/src/arrow/arrow_writer.rs | 127 ++++++++++++++++++++++++++++++ 3 files changed, 133 insertions(+), 6 deletions(-) diff --git a/arrow/src/util/pretty.rs b/arrow/src/util/pretty.rs index 91343ec0f161..4b67f3d376ed 100644 --- a/arrow/src/util/pretty.rs +++ b/arrow/src/util/pretty.rs @@ -74,7 +74,7 @@ fn create_table(results: &[RecordBatch]) -> Result { let mut cells = Vec::new(); for col in 0..batch.num_columns() { let column = batch.column(col); - cells.push(Cell::new(&array_value_to_string(&column, row)?)); + cells.push(Cell::new(&array_value_to_string(column, row)?)); } table.add_row(cells); } @@ -96,7 +96,7 @@ fn create_column(field: &str, columns: &[ArrayRef]) -> Result
{ for col in columns { for row in 0..col.len() { - let cells = vec![Cell::new(&array_value_to_string(&col, row)?)]; + let cells = vec![Cell::new(&array_value_to_string(col, row)?)]; table.add_row(cells); } } @@ -320,7 +320,7 @@ mod tests { let mut builder = FixedSizeBinaryBuilder::new(3, 3); builder.append_value(&[1, 2, 3]).unwrap(); - builder.append_null(); + builder.append_null().unwrap(); builder.append_value(&[7, 8, 9]).unwrap(); let array = Arc::new(builder.finish()); @@ -677,7 +677,7 @@ mod tests { )?; let mut buf = String::new(); - write!(&mut buf, "{}", pretty_format_batches(&[batch])?.to_string()).unwrap(); + write!(&mut buf, "{}", pretty_format_batches(&[batch])?).unwrap(); let s = vec![ "+---+-----+", @@ -689,7 +689,7 @@ mod tests { "| d | 100 |", "+---+-----+", ]; - let expected = String::from(s.join("\n")); + let expected = s.join("\n"); assert_eq!(expected, buf); Ok(()) diff --git a/parquet/Cargo.toml b/parquet/Cargo.toml index 0765da13c586..3dd65f650cc7 100644 --- a/parquet/Cargo.toml +++ b/parquet/Cargo.toml @@ -55,7 +55,7 @@ brotli = "3.3" flate2 = "1.0" lz4 = "1.23" serde_json = { version = "1.0", features = ["preserve_order"] } -arrow = { path = "../arrow", version = "8.0.0", default-features = false, features = ["test_utils"] } +arrow = { path = "../arrow", version = "8.0.0", default-features = false, features = ["test_utils", "prettyprint"] } [features] default = ["arrow", "snap", "brotli", "flate2", "lz4", "zstd", "base64"] diff --git a/parquet/src/arrow/arrow_writer.rs b/parquet/src/arrow/arrow_writer.rs index f4abf98ec4d8..fd4743b8c499 100644 --- a/parquet/src/arrow/arrow_writer.rs +++ b/parquet/src/arrow/arrow_writer.rs @@ -694,6 +694,7 @@ mod tests { use arrow::datatypes::{DataType, Field, Schema, UInt32Type, UInt8Type}; use arrow::error::Result as ArrowResult; use arrow::record_batch::RecordBatch; + use arrow::util::pretty::pretty_format_batches; use arrow::{array::*, buffer::Buffer}; use crate::arrow::{ArrowReader, ParquetFileArrowReader}; @@ -1897,4 +1898,130 @@ mod tests { [0..100, 0..50, 200..500].into_iter().flatten().collect(); assert_eq!(&values, &expected_values) } + + #[test] + fn complex_aggregate() { + // Tests aggregating nested data + let field_a = Field::new("leaf_a", DataType::Int32, false); + let field_b = Field::new("leaf_b", DataType::Int32, true); + let struct_a = Field::new( + "struct_a", + DataType::Struct(vec![field_a.clone(), field_b.clone()]), + true, + ); + + let list_a = Field::new("list", DataType::List(Box::new(struct_a)), true); + let struct_b = + Field::new("struct_b", DataType::Struct(vec![list_a.clone()]), false); + + let schema = Arc::new(Schema::new(vec![struct_b])); + + // create nested data + let field_a_array = Int32Array::from(vec![1, 2, 3, 4, 5, 6]); + let field_b_array = + Int32Array::from_iter(vec![Some(1), None, Some(2), None, None, Some(6)]); + + let struct_a_array = StructArray::from(vec![ + (field_a.clone(), Arc::new(field_a_array) as ArrayRef), + (field_b.clone(), Arc::new(field_b_array) as ArrayRef), + ]); + + let list_data = ArrayDataBuilder::new(list_a.data_type().clone()) + .len(5) + .add_buffer(Buffer::from_iter(vec![ + 0_i32, 1_i32, 1_i32, 3_i32, 3_i32, 5_i32, + ])) + .null_bit_buffer(Buffer::from_iter(vec![true, false, true, false, true])) + .child_data(vec![struct_a_array.data().clone()]) + .build() + .unwrap(); + + let list_a_array = Arc::new(ListArray::from(list_data)) as ArrayRef; + let struct_b_array = StructArray::from(vec![(list_a.clone(), list_a_array)]); + + let batch1 = 
RecordBatch::try_from_iter(vec![( + "struct_b", + Arc::new(struct_b_array) as ArrayRef, + )]) + .unwrap(); + + let field_a_array = Int32Array::from(vec![6, 7, 8, 9, 10]); + let field_b_array = Int32Array::from_iter(vec![None, None, None, Some(1), None]); + + let struct_a_array = StructArray::from(vec![ + (field_a, Arc::new(field_a_array) as ArrayRef), + (field_b, Arc::new(field_b_array) as ArrayRef), + ]); + + let list_data = ArrayDataBuilder::new(list_a.data_type().clone()) + .len(2) + .add_buffer(Buffer::from_iter(vec![0_i32, 4_i32, 5_i32])) + .child_data(vec![struct_a_array.data().clone()]) + .build() + .unwrap(); + + let list_a_array = Arc::new(ListArray::from(list_data)) as ArrayRef; + let struct_b_array = StructArray::from(vec![(list_a, list_a_array)]); + + let batch2 = RecordBatch::try_from_iter(vec![( + "struct_b", + Arc::new(struct_b_array) as ArrayRef, + )]) + .unwrap(); + + let batches = &[batch1, batch2]; + + // Verify data is as expected + + let expected = r#" + +-------------------------------------------------------------------------------------------------------------------------------------+ + | struct_b | + +-------------------------------------------------------------------------------------------------------------------------------------+ + | {"list": [{"leaf_a": 1, "leaf_b": 1}]} | + | {"list": null} | + | {"list": [{"leaf_a": 2, "leaf_b": null}, {"leaf_a": 3, "leaf_b": 2}]} | + | {"list": null} | + | {"list": [{"leaf_a": 4, "leaf_b": null}, {"leaf_a": 5, "leaf_b": null}]} | + | {"list": [{"leaf_a": 6, "leaf_b": null}, {"leaf_a": 7, "leaf_b": null}, {"leaf_a": 8, "leaf_b": null}, {"leaf_a": 9, "leaf_b": 1}]} | + | {"list": [{"leaf_a": 10, "leaf_b": null}]} | + +-------------------------------------------------------------------------------------------------------------------------------------+ + "#.trim().split('\n').map(|x| x.trim()).collect::>().join("\n"); + + let actual = pretty_format_batches(batches).unwrap().to_string(); + assert_eq!(actual, expected); + + // Write data + let file = tempfile::tempfile().unwrap(); + let props = WriterProperties::builder() + .set_max_row_group_size(200) + .build(); + + let mut writer = + ArrowWriter::try_new(file.try_clone().unwrap(), schema, Some(props)).unwrap(); + + for batch in batches { + writer.write(batch).unwrap(); + } + writer.close().unwrap(); + + // Read Data + let reader = SerializedFileReader::new(file).unwrap(); + + // Should have written a single row group + assert_eq!(reader.metadata().num_row_groups(), 1); + + let mut arrow_reader = ParquetFileArrowReader::new(Arc::new(reader)); + let batches = arrow_reader + .get_record_reader(2) + .unwrap() + .collect::>>() + .unwrap(); + + assert_eq!(batches.len(), 4); + let batch_counts: Vec<_> = batches.iter().map(|x| x.num_rows()).collect(); + assert_eq!(&batch_counts, &[2, 2, 2, 1]); + + let actual = pretty_format_batches(&batches).unwrap().to_string(); + assert_eq!(actual, expected); + } } From 9d10c2d43a54dfe3926cf044a6923add933ae696 Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Tue, 1 Feb 2022 09:57:42 +0000 Subject: [PATCH 7/8] Test complex slice --- parquet/src/arrow/arrow_writer.rs | 21 ++++++++++----------- 1 file changed, 10 insertions(+), 11 deletions(-) diff --git a/parquet/src/arrow/arrow_writer.rs b/parquet/src/arrow/arrow_writer.rs index fd4743b8c499..6f391b500a16 100644 --- a/parquet/src/arrow/arrow_writer.rs +++ b/parquet/src/arrow/arrow_writer.rs @@ -698,6 +698,7 @@ mod tests { use arrow::{array::*, buffer::Buffer}; use 
crate::arrow::{ArrowReader, ParquetFileArrowReader}; + use crate::file::metadata::{FileMetaData, ParquetMetaData}; use crate::file::{ reader::{FileReader, SerializedFileReader}, statistics::Statistics, @@ -1825,6 +1826,10 @@ mod tests { one_column_roundtrip(array, true, Some(10)); } + fn row_group_sizes(metadata: &ParquetMetaData) -> Vec { + metadata.row_groups().iter().map(|x| x.num_rows()).collect() + } + #[test] fn test_aggregates_records() { let arrays = [ @@ -1858,14 +1863,7 @@ mod tests { writer.close().unwrap(); let reader = SerializedFileReader::new(file).unwrap(); - let row_group_sizes: Vec<_> = reader - .metadata() - .row_groups() - .iter() - .map(|x| x.num_rows()) - .collect(); - - assert_eq!(&row_group_sizes, &[200, 200, 50]); + assert_eq!(&row_group_sizes(reader.metadata()), &[200, 200, 50]); let mut arrow_reader = ParquetFileArrowReader::new(Arc::new(reader)); let batches = arrow_reader @@ -1993,7 +1991,7 @@ mod tests { // Write data let file = tempfile::tempfile().unwrap(); let props = WriterProperties::builder() - .set_max_row_group_size(200) + .set_max_row_group_size(6) .build(); let mut writer = @@ -2007,8 +2005,9 @@ mod tests { // Read Data let reader = SerializedFileReader::new(file).unwrap(); - // Should have written a single row group - assert_eq!(reader.metadata().num_row_groups(), 1); + // Should have written entire first batch and first row of second to the first row group + // leaving a single row in the second row group + assert_eq!(&row_group_sizes(reader.metadata()), &[6, 1]); let mut arrow_reader = ParquetFileArrowReader::new(Arc::new(reader)); let batches = arrow_reader From 326461b91b57a24ccb00ede2fc3931f09e2cf1b9 Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Tue, 1 Feb 2022 10:57:26 +0000 Subject: [PATCH 8/8] Clippy --- parquet/src/arrow/arrow_writer.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/parquet/src/arrow/arrow_writer.rs b/parquet/src/arrow/arrow_writer.rs index 6f391b500a16..c0205c422a2a 100644 --- a/parquet/src/arrow/arrow_writer.rs +++ b/parquet/src/arrow/arrow_writer.rs @@ -698,7 +698,7 @@ mod tests { use arrow::{array::*, buffer::Buffer}; use crate::arrow::{ArrowReader, ParquetFileArrowReader}; - use crate::file::metadata::{FileMetaData, ParquetMetaData}; + use crate::file::metadata::ParquetMetaData; use crate::file::{ reader::{FileReader, SerializedFileReader}, statistics::Statistics,
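
A minimal usage sketch of the buffering behaviour introduced by this series, mirroring `test_aggregates_records` above: `write` enqueues each `RecordBatch`, full row groups of `max_row_group_size` rows are flushed once enough rows have accumulated, and `close` flushes the remainder into a final, smaller row group. The `tempfile` dependency, the `main`/error-handling scaffolding, and the external `parquet::`/`arrow::` paths are illustrative assumptions, not part of the patch itself.

    use std::sync::Arc;

    use arrow::array::Int32Array;
    use arrow::datatypes::{DataType, Field, Schema};
    use arrow::record_batch::RecordBatch;
    use parquet::arrow::ArrowWriter;
    use parquet::file::properties::WriterProperties;
    use parquet::file::reader::{FileReader, SerializedFileReader};

    fn main() -> Result<(), Box<dyn std::error::Error>> {
        let schema = Arc::new(Schema::new(vec![Field::new("int", DataType::Int32, false)]));
        let file = tempfile::tempfile()?;

        // Cap row groups at 200 rows; the writer buffers incoming batches until it
        // has at least that many rows, then flushes a full row group.
        let props = WriterProperties::builder()
            .set_max_row_group_size(200)
            .build();
        let mut writer =
            ArrowWriter::try_new(file.try_clone()?, schema.clone(), Some(props))?;

        // 100 + 50 + 300 = 450 rows written across three batches.
        for range in [0..100, 0..50, 200..500] {
            let array = Int32Array::from(range.collect::<Vec<i32>>());
            let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(array)])?;
            writer.write(&batch)?;
        }
        // close() flushes the 50 still-buffered rows into a final, smaller row group.
        writer.close()?;

        // Row groups come out as [200, 200, 50] rather than one group per batch.
        let reader = SerializedFileReader::new(file)?;
        let sizes: Vec<i64> = reader
            .metadata()
            .row_groups()
            .iter()
            .map(|rg| rg.num_rows())
            .collect();
        assert_eq!(sizes, vec![200, 200, 50]);
        Ok(())
    }

The assertion at the end reflects the behaviour the series is after: row group boundaries are decided by `max_row_group_size` rather than by the size of each incoming batch.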