Merged
164 changes: 137 additions & 27 deletions parquet/src/arrow/arrow_writer.rs
@@ -17,9 +17,12 @@

//! Contains writer which writes arrow data into parquet data.

use std::collections::VecDeque;
use std::sync::Arc;

use arrow::array as arrow_array;
use arrow::array::ArrayRef;
use arrow::compute::concat;
use arrow::datatypes::{DataType as ArrowDataType, IntervalUnit, SchemaRef};
use arrow::record_batch::RecordBatch;
use arrow_array::Array;
@@ -40,14 +43,23 @@ use crate::{

/// Arrow writer
///
/// Writes Arrow `RecordBatch`es to a Parquet writer
/// Writes Arrow `RecordBatch`es to a Parquet writer, buffering up `RecordBatch`es in order
/// to produce row groups with `max_row_group_size` rows. Any remaining rows will be
/// flushed on close, leading the final row group in the output file to potentially
/// contain fewer than `max_row_group_size` rows
pub struct ArrowWriter<W: ParquetWriter> {
/// Underlying Parquet writer
writer: SerializedFileWriter<W>,

buffer: Vec<VecDeque<ArrayRef>>,

buffered_rows: usize,

/// A copy of the Arrow schema.
///
/// The schema is used to verify that each record batch written has the correct schema
arrow_schema: SchemaRef,

/// The length of arrays to write to each row group
max_row_group_size: usize,
}
@@ -75,54 +87,109 @@ impl<W: 'static + ParquetWriter> ArrowWriter<W> {

Ok(Self {
writer: file_writer,
buffer: vec![Default::default(); arrow_schema.fields().len()],
buffered_rows: 0,
arrow_schema,
max_row_group_size,
})
}

/// Write a RecordBatch to writer
/// Enqueues the provided `RecordBatch` to be written
///
/// The writer will slice the `batch` into `max_row_group_size`,
/// but if a batch has left-over rows less than the row group size,
/// the last row group will have fewer records.
/// This is currently a limitation because we close the row group
/// instead of keeping it open for the next batch.
/// If following this there are more than `max_row_group_size` rows buffered,
/// this will flush out one or more row groups with `max_row_group_size` rows,
/// and drop any fully written `RecordBatch`
pub fn write(&mut self, batch: &RecordBatch) -> Result<()> {
// validate batch schema against writer's supplied schema
if self.arrow_schema != batch.schema() {
return Err(ParquetError::ArrowError(
"Record batch schema does not match writer schema".to_string(),
));
}
// Track the number of rows being written in the batch.
// We currently do not have a way of slicing nested arrays, thus we
// track this manually.
let num_rows = batch.num_rows();
let batches = (num_rows + self.max_row_group_size - 1) / self.max_row_group_size;
let min_batch = num_rows.min(self.max_row_group_size);
for batch_index in 0..batches {
// Determine the offset and length of arrays
let offset = batch_index * min_batch;
let length = (num_rows - offset).min(self.max_row_group_size);

// Compute the definition and repetition levels of the batch
let batch_level = LevelInfo::new(offset, length);
let mut row_group_writer = self.writer.next_row_group()?;
for (array, field) in batch.columns().iter().zip(batch.schema().fields()) {
let mut levels = batch_level.calculate_array_levels(array, field);
// Reverse levels as we pop() them when writing arrays
levels.reverse();
write_leaves(&mut row_group_writer, array, &mut levels)?;

for (buffer, column) in self.buffer.iter_mut().zip(batch.columns()) {
buffer.push_back(column.clone())
}

self.buffered_rows += batch.num_rows();

self.flush_completed()?;

Ok(())
}

/// Flushes buffered data until there are less than `max_row_group_size` rows buffered
fn flush_completed(&mut self) -> Result<()> {
while self.buffered_rows >= self.max_row_group_size {
self.flush_row_group(self.max_row_group_size)?;
}
Ok(())
}

/// Flushes `num_rows` from the buffer into a new row group
fn flush_row_group(&mut self, num_rows: usize) -> Result<()> {
if num_rows == 0 {
return Ok(());
}

assert!(
num_rows <= self.buffered_rows,
"cannot flush {} rows only have {}",
num_rows,
self.buffered_rows
);

assert!(
num_rows <= self.max_row_group_size,
"cannot flush {} rows would exceed max row group size of {}",
num_rows,
self.max_row_group_size
);

let batch_level = LevelInfo::new(0, num_rows);
let mut row_group_writer = self.writer.next_row_group()?;

for (col_buffer, field) in self.buffer.iter_mut().zip(self.arrow_schema.fields())
{
// Collect enough buffered arrays to cover `num_rows` rows
let mut remaining = num_rows;
let mut arrays = Vec::with_capacity(col_buffer.len());
while remaining != 0 {
match col_buffer.pop_front() {
Some(next) if next.len() > remaining => {
col_buffer
.push_front(next.slice(remaining, next.len() - remaining));
arrays.push(next.slice(0, remaining));
remaining = 0;
}
Some(next) => {
remaining -= next.len();
arrays.push(next);
}
_ => break,
}
}

self.writer.close_row_group(row_group_writer)?;
// Workaround for the write logic expecting a single array per column
let array_refs: Vec<_> = arrays.iter().map(|x| x.as_ref()).collect();
let array = concat(array_refs.as_slice())?;

let mut levels = batch_level.calculate_array_levels(&array, field);
// Reverse levels as we pop() them when writing arrays
levels.reverse();
write_leaves(&mut row_group_writer, &array, &mut levels)?;
}

self.writer.close_row_group(row_group_writer)?;
self.buffered_rows -= num_rows;

Ok(())
}

/// Close and finalize the underlying Parquet writer
pub fn close(&mut self) -> Result<parquet_format::FileMetaData> {
self.flush_completed()?;
self.flush_row_group(self.buffered_rows)?;
self.writer.close()
}
}
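
As a side note (not part of the patch): below is a minimal sketch of the per-column slice-and-concat step that `flush_row_group` performs, using the same `Array::slice` and `arrow::compute::concat` APIs as the code above. The array contents and the row group size of 120 are illustrative assumptions.

use std::sync::Arc;

use arrow::array::{Array, ArrayRef, Int32Array};
use arrow::compute::concat;

fn main() -> arrow::error::Result<()> {
    // Two buffered chunks for one column: 100 rows and 50 rows, with a
    // hypothetical max_row_group_size of 120
    let a: ArrayRef = Arc::new(Int32Array::from((0..100).collect::<Vec<i32>>()));
    let b: ArrayRef = Arc::new(Int32Array::from((100..150).collect::<Vec<i32>>()));

    // `a` is taken whole; the remaining 20 rows are sliced off the front of `b`,
    // and the rest of `b` is pushed back onto the buffer for the next row group
    let head = b.slice(0, 20);
    let remainder = b.slice(20, 30);

    // The write path expects a single array per column, so the chunks are
    // concatenated before levels are calculated
    let row_group: ArrayRef = concat(&[a.as_ref(), head.as_ref()])?;
    assert_eq!(row_group.len(), 120);
    assert_eq!(remainder.len(), 30);
    Ok(())
}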
@@ -1723,4 +1790,47 @@ mod tests {

one_column_roundtrip(array, true, Some(10));
}

#[test]
fn test_aggregates_records() {
let arrays = [
Int32Array::from((0..100).collect::<Vec<_>>()),
Int32Array::from((0..50).collect::<Vec<_>>()),
Int32Array::from((200..500).collect::<Vec<_>>()),
];

let schema = Arc::new(Schema::new(vec![Field::new(
"int",
ArrowDataType::Int32,
false,
)]));

let file = tempfile::tempfile().unwrap();

let props = WriterProperties::builder()
.set_max_row_group_size(200)
.build();

let mut writer =
ArrowWriter::try_new(file.try_clone().unwrap(), schema.clone(), Some(props))
.unwrap();

for array in arrays {
let batch =
RecordBatch::try_new(schema.clone(), vec![Arc::new(array)]).unwrap();
writer.write(&batch).unwrap();
}

writer.close().unwrap();

let reader = SerializedFileReader::new(file).unwrap();
let row_group_sizes: Vec<_> = reader
.metadata()
.row_groups()
.iter()
.map(|x| x.num_rows())
.collect();

assert_eq!(row_group_sizes, vec![200, 200, 50]);
}
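
// Illustrative trace of the buffering in the test above: after the 100- and
// 50-row batches, 150 rows are buffered (< 200, nothing flushed); the 300-row
// batch brings the total to 450, so two row groups of 200 rows are flushed,
// leaving 50 rows that close() writes as the final, smaller row group.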
}
9 changes: 4 additions & 5 deletions parquet/src/arrow/levels.rs
@@ -40,7 +40,7 @@
//!
//! \[1\] [parquet-format#nested-encoding](https://github.com/apache/parquet-format#nested-encoding)

use arrow::array::{make_array, ArrayRef, MapArray, StructArray};
use arrow::array::{make_array, Array, ArrayRef, MapArray, StructArray};
use arrow::datatypes::{DataType, Field};

/// Keeps track of the level information per array that is needed to write an Arrow array to Parquet.
@@ -711,12 +711,11 @@ impl LevelInfo {
((0..=(len as i64)).collect(), array_mask)
}
DataType::List(_) | DataType::Map(_, _) => {
let data = array.data();
let offsets = unsafe { data.buffers()[0].typed_data::<i32>() };
let offsets = unsafe { array.data().buffers()[0].typed_data::<i32>() };
let offsets = offsets
.to_vec()
.into_iter()
.skip(offset)
.skip(array.offset() + offset)
Contributor (author): This was a pre-existing bug that people would have run into if they wrote a sliced RecordBatch.

Contributor: Filed #1226 to track (so that we can document this in the release notes).

.take(len + 1)
.map(|v| v as i64)
.collect::<Vec<i64>>();
@@ -729,7 +728,7 @@
DataType::LargeList(_) => {
let offsets = unsafe { array.data().buffers()[0].typed_data::<i64>() }
.iter()
.skip(offset)
.skip(array.offset() + offset)
.take(len + 1)
.copied()
.collect();
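The review comment above notes a pre-existing bug when writing a sliced RecordBatch. Here is a minimal sketch (not from the patch) of why the fix adds `array.offset()`: slicing a ListArray moves its logical offset but shares the parent's offsets buffer, so level calculation must skip the extra entries. It uses the same `data().buffers()[0]` access as the code above; the example values are assumptions.

use arrow::array::{Array, ListArray};
use arrow::datatypes::Int32Type;

fn main() {
    // Parent list array [[0, 1], [2], [3, 4, 5]] with offsets buffer [0, 2, 3, 6]
    let list = ListArray::from_iter_primitive::<Int32Type, _, _>(vec![
        Some(vec![Some(0), Some(1)]),
        Some(vec![Some(2)]),
        Some(vec![Some(3), Some(4), Some(5)]),
    ]);

    // Slicing moves the logical offset but shares the parent's offsets buffer
    let sliced = list.slice(1, 2);
    assert_eq!(sliced.offset(), 1);

    let offsets = unsafe { sliced.data().buffers()[0].typed_data::<i32>() };
    assert_eq!(offsets, &[0, 2, 3, 6][..]);

    // Skipping only `offset` would read [0, 2, 3]; skipping `array.offset() + offset`
    // reads the correct window [2, 3, 6] for the two sliced lists
    let window: Vec<i32> = offsets
        .iter()
        .skip(sliced.offset())
        .take(sliced.len() + 1)
        .copied()
        .collect();
    assert_eq!(window, vec![2, 3, 6]);
}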
6 changes: 3 additions & 3 deletions parquet/src/file/properties.rs
@@ -62,7 +62,7 @@ const DEFAULT_DICTIONARY_ENABLED: bool = true;
const DEFAULT_DICTIONARY_PAGE_SIZE_LIMIT: usize = DEFAULT_PAGE_SIZE;
const DEFAULT_STATISTICS_ENABLED: bool = true;
const DEFAULT_MAX_STATISTICS_SIZE: usize = 4096;
const DEFAULT_MAX_ROW_GROUP_SIZE: usize = 128 * 1024 * 1024;
const DEFAULT_MAX_ROW_GROUP_SIZE: usize = 1024 * 1024;
const DEFAULT_CREATED_BY: &str = env!("PARQUET_CREATED_BY");

/// Parquet writer version.
@@ -129,7 +129,7 @@ impl WriterProperties {
self.write_batch_size
}

/// Returns max size for a row group.
/// Returns maximum number of rows in a row group.
pub fn max_row_group_size(&self) -> usize {
self.max_row_group_size
}
@@ -288,7 +288,7 @@ impl WriterPropertiesBuilder {
self
}

/// Sets max size for a row group.
/// Sets maximum number of rows in a row group.
pub fn set_max_row_group_size(mut self, value: usize) -> Self {
assert!(value > 0, "Cannot have a 0 max row group size");
self.max_row_group_size = value;
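
A minimal usage sketch (not from the patch) of the builder and getter touched above, to underline that `max_row_group_size` is measured in rows rather than bytes; the chosen value of 64 * 1024 is arbitrary.

use parquet::file::properties::WriterProperties;

fn main() {
    // max_row_group_size counts rows, not bytes; the new default is 1024 * 1024 rows
    let props = WriterProperties::builder()
        .set_max_row_group_size(64 * 1024)
        .build();
    assert_eq!(props.max_row_group_size(), 64 * 1024);
}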