Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
105 changes: 52 additions & 53 deletions arrow-select/src/coalesce.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,7 @@
//! [`filter`]: crate::filter::filter
//! [`take`]: crate::take::take
use crate::concat::concat_batches;
use arrow_array::{
builder::StringViewBuilder, cast::AsArray, Array, ArrayRef, RecordBatch, RecordBatchOptions,
};
use arrow_array::{builder::StringViewBuilder, cast::AsArray, Array, RecordBatch};
use arrow_schema::{ArrowError, SchemaRef};
use std::collections::VecDeque;
use std::sync::Arc;
Expand Down Expand Up @@ -164,7 +162,7 @@ impl BatchCoalescer {
return Ok(());
}

let mut batch = gc_string_view_batch(&batch);
let mut batch = gc_string_view_batch(batch);

// If pushing this batch would exceed the target batch size,
// finish the current batch and start a new one
Expand Down Expand Up @@ -242,64 +240,65 @@ impl BatchCoalescer {
/// However, after a while (e.g., after `FilterExec` or `HashJoinExec`) the
/// `StringViewArray` may only refer to a small portion of the buffer,
/// significantly increasing memory usage.
fn gc_string_view_batch(batch: &RecordBatch) -> RecordBatch {
let new_columns: Vec<ArrayRef> = batch
.columns()
.iter()
.map(|c| {
// Try to re-create the `StringViewArray` to prevent holding the underlying buffer too long.
let Some(s) = c.as_string_view_opt() else {
return Arc::clone(c);
};
let ideal_buffer_size: usize = s
.views()
.iter()
.map(|v| {
let len = (*v as u32) as usize;
if len > 12 {
len
} else {
0
}
})
.sum();
let actual_buffer_size = s.get_buffer_memory_size();

// Re-creating the array copies data and can be time consuming.
// We only do it if the array is sparse
if actual_buffer_size > (ideal_buffer_size * 2) {
// We set the block size to `ideal_buffer_size` so that the new StringViewArray only has one buffer, which accelerate later concat_batches.
// See https://github.com/apache/arrow-rs/issues/6094 for more details.
let mut builder = StringViewBuilder::with_capacity(s.len());
if ideal_buffer_size > 0 {
builder = builder.with_fixed_block_size(ideal_buffer_size as u32);
#[inline(never)]
fn gc_string_view_batch(batch: RecordBatch) -> RecordBatch {
let (schema, mut columns, row_count) = batch.into_parts();

for c in columns.iter_mut() {
let Some(s) = c.as_string_view_opt() else {
continue;
};
let ideal_buffer_size: usize = s
.views()
.iter()
.map(|v| {
let len = (*v as u32) as usize;
if len > 12 {
len
} else {
0
}
})
.sum();
let actual_buffer_size = s.get_buffer_memory_size();

// Re-creating the array copies data and can be time consuming.
// We only do it if the array is sparse
if actual_buffer_size > (ideal_buffer_size * 2) {
// We set the block size to `ideal_buffer_size` so that the new StringViewArray only has one buffer, which accelerate later concat_batches.
// See https://github.com/apache/arrow-rs/issues/6094 for more details.
let mut builder = StringViewBuilder::with_capacity(s.len());
if ideal_buffer_size > 0 {
builder = builder.with_fixed_block_size(ideal_buffer_size as u32);
}

for v in s.iter() {
builder.append_option(v);
}
for v in s.iter() {
builder.append_option(v);
}

let gc_string = builder.finish();
let gc_string = builder.finish();

debug_assert!(gc_string.data_buffers().len() <= 1); // buffer count can be 0 if the `ideal_buffer_size` is 0
debug_assert!(gc_string.data_buffers().len() <= 1); // buffer count can be 0 if the `ideal_buffer_size` is 0
// Ensure the new array has the same number of rows as the input
let new_row_count = gc_string.len();
assert_eq!(
new_row_count, row_count,
"gc_string_view_batch: length mismatch got {new_row_count}, expected {row_count}"
);
*c = Arc::new(gc_string)
}
}

Arc::new(gc_string)
} else {
Arc::clone(c)
}
})
.collect();
let mut options = RecordBatchOptions::new();
options = options.with_row_count(Some(batch.num_rows()));
RecordBatch::try_new_with_options(batch.schema(), new_columns, &options)
.expect("Failed to re-create the gc'ed record batch")
// Safety: the input batch was valid, and we validated that the new output array
// had the same number of rows as the input.
unsafe { RecordBatch::new_unchecked(schema, columns, row_count) }
}

#[cfg(test)]
mod tests {
use super::*;
use arrow_array::builder::ArrayBuilder;
use arrow_array::{StringViewArray, UInt32Array};
use arrow_array::{ArrayRef, RecordBatchOptions, StringViewArray, UInt32Array};
use arrow_schema::{DataType, Field, Schema};
use std::ops::Range;

Expand Down Expand Up @@ -518,7 +517,7 @@ mod tests {
fn test_gc_string_view_test_batch_empty() {
    // Garbage-collecting a zero-column, zero-row batch must be a no-op:
    // same column count and same row count come back out.
    let empty = RecordBatch::new_empty(Schema::empty().into());
    let gc_result = gc_string_view_batch(empty.clone());
    assert_eq!(empty.num_columns(), gc_result.num_columns());
    assert_eq!(empty.num_rows(), gc_result.num_rows());
}
Expand Down Expand Up @@ -568,7 +567,7 @@ mod tests {
/// and ensures the number of rows are the same
fn do_gc(array: StringViewArray) -> StringViewArray {
let batch = RecordBatch::try_from_iter(vec![("a", Arc::new(array) as ArrayRef)]).unwrap();
let gc_batch = gc_string_view_batch(&batch);
let gc_batch = gc_string_view_batch(batch.clone());
assert_eq!(batch.num_rows(), gc_batch.num_rows());
assert_eq!(batch.schema(), gc_batch.schema());
gc_batch
Expand Down
Loading