diff --git a/arrow-select/src/coalesce.rs b/arrow-select/src/coalesce.rs
index 5a5e7e141c84..f2e7840a74da 100644
--- a/arrow-select/src/coalesce.rs
+++ b/arrow-select/src/coalesce.rs
@@ -21,9 +21,7 @@
 //! [`filter`]: crate::filter::filter
 //! [`take`]: crate::take::take
 use crate::concat::concat_batches;
-use arrow_array::{
-    builder::StringViewBuilder, cast::AsArray, Array, ArrayRef, RecordBatch, RecordBatchOptions,
-};
+use arrow_array::{builder::StringViewBuilder, cast::AsArray, Array, RecordBatch};
 use arrow_schema::{ArrowError, SchemaRef};
 use std::collections::VecDeque;
 use std::sync::Arc;
@@ -164,7 +162,7 @@ impl BatchCoalescer {
             return Ok(());
         }
 
-        let mut batch = gc_string_view_batch(&batch);
+        let mut batch = gc_string_view_batch(batch);
 
         // If pushing this batch would exceed the target batch size,
         // finish the current batch and start a new one
@@ -242,64 +240,65 @@ impl BatchCoalescer {
 /// However, after a while (e.g., after `FilterExec` or `HashJoinExec`) the
 /// `StringViewArray` may only refer to a small portion of the buffer,
 /// significantly increasing memory usage.
-fn gc_string_view_batch(batch: &RecordBatch) -> RecordBatch {
-    let new_columns: Vec<ArrayRef> = batch
-        .columns()
-        .iter()
-        .map(|c| {
-            // Try to re-create the `StringViewArray` to prevent holding the underlying buffer too long.
-            let Some(s) = c.as_string_view_opt() else {
-                return Arc::clone(c);
-            };
-            let ideal_buffer_size: usize = s
-                .views()
-                .iter()
-                .map(|v| {
-                    let len = (*v as u32) as usize;
-                    if len > 12 {
-                        len
-                    } else {
-                        0
-                    }
-                })
-                .sum();
-            let actual_buffer_size = s.get_buffer_memory_size();
-
-            // Re-creating the array copies data and can be time consuming.
-            // We only do it if the array is sparse
-            if actual_buffer_size > (ideal_buffer_size * 2) {
-                // We set the block size to `ideal_buffer_size` so that the new StringViewArray only has one buffer, which accelerate later concat_batches.
-                // See https://github.com/apache/arrow-rs/issues/6094 for more details.
-                let mut builder = StringViewBuilder::with_capacity(s.len());
-                if ideal_buffer_size > 0 {
-                    builder = builder.with_fixed_block_size(ideal_buffer_size as u32);
+#[inline(never)]
+fn gc_string_view_batch(batch: RecordBatch) -> RecordBatch {
+    let (schema, mut columns, row_count) = batch.into_parts();
+
+    for c in columns.iter_mut() {
+        let Some(s) = c.as_string_view_opt() else {
+            continue;
+        };
+        let ideal_buffer_size: usize = s
+            .views()
+            .iter()
+            .map(|v| {
+                let len = (*v as u32) as usize;
+                if len > 12 {
+                    len
+                } else {
+                    0
                 }
+            })
+            .sum();
+        let actual_buffer_size = s.get_buffer_memory_size();
+
+        // Re-creating the array copies data and can be time consuming.
+        // We only do it if the array is sparse
+        if actual_buffer_size > (ideal_buffer_size * 2) {
+            // We set the block size to `ideal_buffer_size` so that the new StringViewArray only has one buffer, which accelerate later concat_batches.
+            // See https://github.com/apache/arrow-rs/issues/6094 for more details.
+            let mut builder = StringViewBuilder::with_capacity(s.len());
+            if ideal_buffer_size > 0 {
+                builder = builder.with_fixed_block_size(ideal_buffer_size as u32);
+            }
 
-                for v in s.iter() {
-                    builder.append_option(v);
-                }
+            for v in s.iter() {
+                builder.append_option(v);
+            }
 
-                let gc_string = builder.finish();
+            let gc_string = builder.finish();
 
-                debug_assert!(gc_string.data_buffers().len() <= 1); // buffer count can be 0 if the `ideal_buffer_size` is 0
+            debug_assert!(gc_string.data_buffers().len() <= 1); // buffer count can be 0 if the `ideal_buffer_size` is 0
+            // Ensure the new array has the same number of rows as the input
+            let new_row_count = gc_string.len();
+            assert_eq!(
+                new_row_count, row_count,
+                "gc_string_view_batch: length mismatch got {new_row_count}, expected {row_count}"
+            );
+            *c = Arc::new(gc_string)
+        }
+    }
 
-                Arc::new(gc_string)
-            } else {
-                Arc::clone(c)
-            }
-        })
-        .collect();
-    let mut options = RecordBatchOptions::new();
-    options = options.with_row_count(Some(batch.num_rows()));
-    RecordBatch::try_new_with_options(batch.schema(), new_columns, &options)
-        .expect("Failed to re-create the gc'ed record batch")
+    // Safety: the input batch was valid, and we validated that the new output array
+    // had the same number of rows as the input.
+    unsafe { RecordBatch::new_unchecked(schema, columns, row_count) }
 }
 
 #[cfg(test)]
 mod tests {
     use super::*;
     use arrow_array::builder::ArrayBuilder;
-    use arrow_array::{StringViewArray, UInt32Array};
+    use arrow_array::{ArrayRef, RecordBatchOptions, StringViewArray, UInt32Array};
     use arrow_schema::{DataType, Field, Schema};
     use std::ops::Range;
 
@@ -518,7 +517,7 @@ mod tests {
     fn test_gc_string_view_test_batch_empty() {
         let schema = Schema::empty();
         let batch = RecordBatch::new_empty(schema.into());
-        let output_batch = gc_string_view_batch(&batch);
+        let output_batch = gc_string_view_batch(batch.clone());
         assert_eq!(batch.num_columns(), output_batch.num_columns());
         assert_eq!(batch.num_rows(), output_batch.num_rows());
     }
@@ -568,7 +567,7 @@ mod tests {
     /// and ensures the number of rows are the same
     fn do_gc(array: StringViewArray) -> StringViewArray {
         let batch = RecordBatch::try_from_iter(vec![("a", Arc::new(array) as ArrayRef)]).unwrap();
-        let gc_batch = gc_string_view_batch(&batch);
+        let gc_batch = gc_string_view_batch(batch.clone());
         assert_eq!(batch.num_rows(), gc_batch.num_rows());
         assert_eq!(batch.schema(), gc_batch.schema());
         gc_batch
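
For reference, the per-column heuristic that the new `gc_string_view_batch` applies can be exercised on its own. The sketch below is illustrative only and not part of the patch: `compact_if_sparse` is a hypothetical helper name and the example data is made up; it assumes an `arrow-array` version providing `StringViewBuilder::with_fixed_block_size`, the same API the diff already uses.

// Standalone sketch: the same "ideal vs. actual buffer size" check as above,
// applied to a single StringViewArray instead of every column of a RecordBatch.
use arrow_array::builder::StringViewBuilder;
use arrow_array::{Array, StringViewArray};

fn compact_if_sparse(s: &StringViewArray) -> StringViewArray {
    // Views longer than 12 bytes point into data buffers; shorter values are
    // inlined in the 16-byte view itself and need no buffer space at all.
    let ideal_buffer_size: usize = s
        .views()
        .iter()
        .map(|v| {
            let len = (*v as u32) as usize;
            if len > 12 {
                len
            } else {
                0
            }
        })
        .sum();

    // Copying is only worthwhile when more than half of the held bytes are unreferenced.
    if s.get_buffer_memory_size() <= ideal_buffer_size * 2 {
        return s.clone();
    }

    // Rebuild into a single right-sized buffer so a later concat stays cheap.
    let mut builder = StringViewBuilder::with_capacity(s.len());
    if ideal_buffer_size > 0 {
        builder = builder.with_fixed_block_size(ideal_buffer_size as u32);
    }
    for v in s.iter() {
        builder.append_option(v);
    }
    builder.finish()
}

fn main() {
    // A one-row slice still keeps the parent's entire data buffer alive...
    let parent = StringViewArray::from_iter_values(
        (0..1024).map(|i| format!("this string is longer than twelve bytes #{i}")),
    );
    let slice = parent.slice(0, 1);

    // ...until it is compacted.
    let compacted = compact_if_sparse(&slice);
    assert_eq!(compacted.len(), 1);
    assert!(compacted.get_buffer_memory_size() < slice.get_buffer_memory_size());
}

The rebuilt array carries at most one data buffer, which is what the `debug_assert!` in the patched function checks.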